Social media intelligence
Systematic approaches for monitoring, analyzing, and investigating social media for journalism.
When to activate
- Tracking how a story spreads across platforms
- Investigating potential coordinated inauthentic behavior
- Monitoring breaking news across social platforms
- Analyzing account networks and relationships
- Detecting bot activity or manipulation campaigns
- Building evidence trails for digital investigations
- Archiving social content before deletion
Real-time monitoring
Multi-platform tracker
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional, Dict
from enum import Enum
import hashlib
class Platform(Enum):
    """Social platforms that a post or account can be attributed to.

    Every member's value is the lowercase slug of the platform name.
    """

    # Rebranded as X in 2023; the "twitter" slug is retained for legacy data.
    TWITTER = "twitter"
    FACEBOOK = "facebook"
    INSTAGRAM = "instagram"
    TIKTOK = "tiktok"
    YOUTUBE = "youtube"
    REDDIT = "reddit"
    THREADS = "threads"
    BLUESKY = "bluesky"
    MASTODON = "mastodon"
    TELEGRAM = "telegram"
@dataclass
class SocialPost:
    """A single post captured from a social platform.

    ``content_hash`` is recomputed in ``__post_init__`` from the platform
    value and the post text, so any value passed to the constructor for it
    is overwritten.
    """

    platform: Platform
    post_id: str
    author: str
    content: str
    timestamp: datetime
    url: str
    engagement: Dict[str, int] = field(default_factory=dict)
    media_urls: List[str] = field(default_factory=list)
    archived_urls: List[str] = field(default_factory=list)
    content_hash: str = ""

    def __post_init__(self):
        """Derive the fingerprint used for duplicate detection."""
        # The platform value is part of the fingerprint, so the same text
        # posted on two platforms produces two different hashes.
        fingerprint_source = f"{self.platform.value}:{self.content}"
        digest = hashlib.md5(fingerprint_source.encode())
        self.content_hash = digest.hexdigest()
@dataclass
class MonitoringQuery:
    """Search criteria for monitoring one or more platforms.

    Only ``keywords``, ``hashtags`` and ``exclude_terms`` contribute to the
    string built by ``to_search_string``; ``platforms``, ``accounts`` and
    ``start_date`` are carried on the query but not rendered by it.
    """

    keywords: List[str]
    platforms: List[Platform]
    accounts: List[str] = field(default_factory=list)
    hashtags: List[str] = field(default_factory=list)
    exclude_terms: List[str] = field(default_factory=list)
    start_date: Optional[datetime] = None

    def to_search_string(self, platform: Platform) -> str:
        """Build a boolean search string from this query.

        Keywords are quoted and OR-ed together, hashtags are OR-ed together,
        and every exclusion term is prefixed with ``-``. The clauses are
        joined with spaces (implicit AND).

        Args:
            platform: Target platform. Currently unused: the same generic
                syntax is produced for every platform.

        Returns:
            The search string, or ``""`` when the query has no keywords,
            hashtags or exclusion terms.
        """
        # Each entry is one OR-group: a list of interchangeable alternatives.
        or_groups = []
        if self.keywords:
            or_groups.append([f'"{k}"' for k in self.keywords])
        if self.hashtags:
            # Accept hashtags given with or without the leading '#', so
            # "#news" does not turn into "##news".
            or_groups.append([f'#{h.lstrip("#")}' for h in self.hashtags])

        exclusions = []
        for term in self.exclude_terms:
            # An unquoted multi-word exclusion ("-fake news") would exclude
            # only the first word and *require* the rest, so quote it.
            if len(term.split()) > 1 and not term.startswith('"'):
                term = f'"{term}"'
            exclusions.append(f'-{term}')

        # Search syntaxes that evaluate implicit AND before OR (X/Twitter's
        # among them) would otherwise attach a neighbouring clause to just
        # one alternative of an OR-group. Parentheses are only added when
        # the query has several clauses, so simple queries are unchanged.
        has_multiple_clauses = len(or_groups) + bool(exclusions) > 1

        clauses = []
        for alternatives in or_groups:
            clause = ' OR '.join(alternatives)
            if has_multiple_clauses and len(alternatives) > 1:
                clause = f'({clause})'
            clauses.append(clause)
        if exclusions:
            clauses.append(' '.join(exclusions))

        return ' '.join(clauses)
Breaking news monitor
from collections import defaultdict
from datetime import datetime, timedelta
class BreakingNewsDetector:
    """Detect sudden spikes in keyword mentions.

    Mentions are stored per keyword as a list of timestamps. A keyword is
    "spiking" when the number of mentions in the last hour exceeds a
    multiple of its hourly baseline, where the baseline is measured over
    the part of the baseline window that precedes the last hour.

    Timestamps may be naive or timezone-aware, but should be consistent
    for a given keyword; "now" is taken in the same timezone as the stored
    timestamps.
    """

    # Span that counts as "recent" when looking for a spike.
    _RECENT_WINDOW = timedelta(hours=1)
    # A keyword with no baseline activity needs more than this many recent
    # mentions to count as spiking (arbitrary threshold for new topics).
    _NEW_TOPIC_MIN_MENTIONS = 10

    def __init__(self, baseline_window_hours: int = 24):
        """Create a detector.

        Args:
            baseline_window_hours: Length of the look-back window, in hours,
                used to estimate a keyword's normal mention rate.
        """
        self.baseline_window = timedelta(hours=baseline_window_hours)
        self.mention_history = defaultdict(list)

    def add_mention(self, keyword: str, timestamp: datetime):
        """Record a mention of a keyword and prune stale mentions.

        Mentions older than twice the baseline window (relative to the
        current time) are dropped for that keyword.
        """
        history = self.mention_history[keyword]
        history.append(timestamp)
        # Take "now" in the timestamp's own timezone so aware timestamps can
        # be compared without raising TypeError.
        cutoff = datetime.now(tz=timestamp.tzinfo) - self.baseline_window * 2
        self.mention_history[keyword] = [t for t in history if t > cutoff]

    def is_spiking(self, keyword: str, threshold_multiplier: float = 3.0,
                   now: Optional[datetime] = None) -> bool:
        """Check if a keyword is spiking above its baseline.

        Args:
            keyword: Keyword to evaluate. Unknown keywords are never spiking
                and are not added to ``mention_history``.
            threshold_multiplier: How many times the hourly baseline the
                last hour's count must exceed.
            now: Reference time; defaults to the current time.

        Returns:
            True when the last hour's mentions exceed
            ``baseline * threshold_multiplier``, or, when there is no
            baseline activity, when they exceed the new-topic threshold.
        """
        # .get() avoids creating an empty entry for keywords never seen.
        timestamps = self.mention_history.get(keyword, ())
        if not timestamps:
            return False

        moment = self._resolve_now(timestamps, now)
        recent, baseline_hourly = self._rates(timestamps, moment)
        if baseline_hourly == 0:
            return recent > self._NEW_TOPIC_MIN_MENTIONS
        return recent > baseline_hourly * threshold_multiplier

    def get_trending(self, top_n: int = 10,
                     now: Optional[datetime] = None) -> List[tuple]:
        """Return spiking keywords, busiest first.

        Args:
            top_n: Maximum number of keywords to return.
            now: Reference time; defaults to the current time.

        Returns:
            Up to ``top_n`` ``(keyword, recent_count)`` tuples, ordered by
            the number of mentions in the last hour, descending.
        """
        spikes = []
        for keyword, timestamps in self.mention_history.items():
            # Read the clock once per keyword so the spike decision and the
            # reported count are based on the same instant.
            moment = self._resolve_now(timestamps, now)
            if self.is_spiking(keyword, now=moment):
                recent, _ = self._rates(timestamps, moment)
                spikes.append((keyword, recent))
        return sorted(spikes, key=lambda x: x[1], reverse=True)[:top_n]

    @staticmethod
    def _resolve_now(timestamps, now):
        """Return ``now``, defaulting to the current time in the history's timezone."""
        if now is not None:
            return now
        tz = timestamps[-1].tzinfo if timestamps else None
        return datetime.now(tz=tz)

    def _rates(self, timestamps, now):
        """Return ``(recent_count, baseline_mentions_per_hour)`` as of ``now``."""
        recent_start = now - self._RECENT_WINDOW
        baseline_start = now - self.baseline_window

        recent = 0
        earlier = 0
        for t in timestamps:
            if t > recent_start:
                recent += 1
            elif t > baseline_start:
                earlier += 1

        # The last hour is excluded from the baseline; otherwise a burst
        # inflates its own baseline and a single first mention of a new
        # keyword would register as a spike.
        baseline_hours = (
            self.baseline_window - self._RECENT_WINDOW
        ).total_seconds() / 3600
        baseline_hourly = earlier / baseline_hours if baseline_hours > 0 else 0.0
        return recent, baseline_hourly
Account analysis
Authenticity indicators
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional
@dataclass
class AccountAnalysis:
    """Profile metrics and authenticity signals for one account.

    ``calculate_red_flags`` turns the metrics and signals into named red
    flags; ``authenticity_score`` converts the number of flags into a
    0-100 score.
    """

    username: str
    platform: Platform
    created_date: Optional[datetime] = None
    follower_count: int = 0
    following_count: int = 0
    post_count: int = 0
    # Authenticity signals. ``None`` means "not assessed" and never raises
    # a flag. ``bio_contains_keywords`` is stored but not scored.
    profile_photo_is_stock: Optional[bool] = None
    bio_contains_keywords: List[str] = field(default_factory=list)
    posts_primarily_reshares: Optional[bool] = None
    posting_pattern_irregular: Optional[bool] = None
    engagement_ratio_suspicious: Optional[bool] = None

    def calculate_red_flags(self) -> dict:
        """Collect the red flags raised by this account.

        Returns:
            A dict mapping a flag name to a human-readable explanation.
            Possible keys: ``new_account`` (younger than 30 days),
            ``low_follower_ratio`` (followers/following below 0.1),
            ``excessive_posting`` (more than 50 posts per day),
            ``stock_profile_photo``, ``primarily_reshares``,
            ``irregular_posting_pattern`` and ``suspicious_engagement``.
        """
        flags = {}

        if self.created_date is not None:
            # Take "now" in the creation date's own timezone so that
            # timezone-aware dates do not raise TypeError on subtraction.
            now = datetime.now(tz=self.created_date.tzinfo)
            age_days = (now - self.created_date).days

            # Account age
            if age_days < 30:
                flags['new_account'] = f"Created {age_days} days ago"

            # Posting frequency; age is floored at one day so same-day
            # accounts do not divide by zero.
            if self.post_count > 0:
                posts_per_day = self.post_count / max(1, age_days)
                if posts_per_day > 50:
                    flags['excessive_posting'] = f"{posts_per_day:.0f} posts/day"

        # Follower ratio (skipped when the account follows nobody)
        if self.following_count > 0:
            ratio = self.follower_count / self.following_count
            if ratio < 0.1:
                flags['low_follower_ratio'] = f"Ratio: {ratio:.2f}"

        # Pre-assessed boolean signals: each one raises a flag when True.
        if self.profile_photo_is_stock:
            flags['stock_profile_photo'] = "Profile appears to be stock image"
        if self.posts_primarily_reshares:
            flags['primarily_reshares'] = "Posts are primarily reshares"
        if self.posting_pattern_irregular:
            flags['irregular_posting_pattern'] = "Posting pattern is irregular"
        if self.engagement_ratio_suspicious:
            flags['suspicious_engagement'] = "Engagement ratio looks suspicious"

        return flags

    def authenticity_score(self, penalty_per_flag: int = 20) -> int:
        """Score the account from 0 to 100; higher = more likely authentic.

        Args:
            penalty_per_flag: Points deducted for every red flag raised.

        Returns:
            ``100 - flags * penalty_per_flag``, floored at 0.
        """
        deduction = len(self.calculate_red_flags()) * penalty_per_flag
        return max(0, 100 - deduction)
Network mapping
from collections import defaultdict
from typing import Set, Dict
class AccountNetwork:
"""Map relationships between accounts."""
def __init__(self):
self.interactions = defaultdict(lambda: defaultdict(int))
self.accounts = {}
def add_interaction(self, from_account: str, to_account: str,
interaction_type: str = "mention"):
"""Record an interaction between accounts."""
self.interactions[from_account][to_account] += 1
def find_clusters(self, min_interactions: int = 3) -> List[Set[str]]:
"""Find groups of accounts that frequently interact."""
# Build adjacency with minimum threshold
adjacency = defaultdict(set)
for from_acc, targets in self.interactions.items():
for to_acc, count in targets.items():
if count >= min_interactions:
adjacency[from_acc].add(to_acc)
adjacency[to_acc].add(from_acc)
# Find connected components
visited = set()
clusters = []
for account in adjacency:
if account in visited:
continue
cluster = set()
stack = [account]
while stack:
current = stack.pop()
if current in visited:
continue
visited.add(current)
cluster.add(current)
stack.extend(adjacency[current] - visited)
if len(cluster) > 1:
clusters.append(cluster)
return sorted(clusters, key=len, reverse=True)
def coordination_score(self, accounts: Set[str]) -> float:
"""Score how coordinated a group of