名称: 社交媒体情报
描述: 社交媒体监控、叙事跟踪和开源情报,用于记者。在追踪病毒内容传播、分析协调活动、监测社交媒体平台上的突发新闻、调查账户真实性或检测虚假信息模式时使用。对报道在线叙事和数字调查的记者至关重要。
社交媒体情报
用于新闻工作的社交媒体监控、分析和调查的系统方法。
何时激活
- 追踪故事在平台间的传播
- 调查潜在的协调虚假行为
- 监测社交媒体平台上的突发新闻
- 分析账户网络和关系
- 检测机器人活动或操纵活动
- 为数字调查构建证据链
- 在删除前存档社交内容
实时监控
多平台追踪器
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional, Dict
from enum import Enum
import hashlib
class Platform(Enum):
    """Social-media platforms the monitoring toolkit can track.

    The string values are used as stable identifiers in search strings,
    archives, and appearance records (see SocialPost / Claim below).
    """
    TWITTER = "twitter"
    FACEBOOK = "facebook"
    INSTAGRAM = "instagram"
    TIKTOK = "tiktok"
    YOUTUBE = "youtube"
    REDDIT = "reddit"
    THREADS = "threads"
    BLUESKY = "bluesky"
    MASTODON = "mastodon"
@dataclass
class SocialPost:
    """A single post captured from a social platform.

    content_hash is filled in automatically after construction: it
    fingerprints platform + content so identical copypasta can be spotted
    across different authors and post IDs. Any value passed in for it is
    overwritten.
    """
    platform: Platform
    post_id: str
    author: str
    content: str
    timestamp: datetime
    url: str
    engagement: Dict[str, int] = field(default_factory=dict)
    media_urls: List[str] = field(default_factory=list)
    archived_urls: List[str] = field(default_factory=list)
    content_hash: str = ""

    def __post_init__(self):
        # Fingerprint the platform-scoped content for duplicate detection.
        # MD5 is acceptable here: this is a dedup key, not a security boundary.
        fingerprint = f"{self.platform.value}:{self.content}".encode()
        self.content_hash = hashlib.md5(fingerprint).hexdigest()
@dataclass
class MonitoringQuery:
    """A saved cross-platform monitoring query."""

    keywords: List[str]
    platforms: List["Platform"]
    accounts: List[str] = field(default_factory=list)
    hashtags: List[str] = field(default_factory=list)
    exclude_terms: List[str] = field(default_factory=list)
    start_date: Optional[datetime] = None

    def to_search_string(self, platform: "Platform") -> str:
        """Generate a platform-specific search query string.

        NOTE(review): `platform` is currently unused — the same generic
        syntax is emitted for every platform; confirm before relying on
        platform-specific quoting rules. `accounts` is likewise not folded
        into the query here.
        """
        parts = []
        # Exact-phrase keywords, OR-ed together.
        if self.keywords:
            parts.append(' OR '.join(f'"{k}"' for k in self.keywords))
        # Hashtags, OR-ed together.
        if self.hashtags:
            parts.append(' OR '.join(f'#{h}' for h in self.hashtags))
        # Negated terms. Multi-word exclusions must be quoted, otherwise
        # search engines would negate only the first word.
        if self.exclude_terms:
            parts.append(' '.join(
                f'-"{t}"' if ' ' in t else f'-{t}'
                for t in self.exclude_terms
            ))
        return ' '.join(parts)
突发新闻监测器
from collections import defaultdict
from datetime import datetime, timedelta
class BreakingNewsDetector:
    """Detects sudden spikes in keyword mentions against a rolling baseline."""

    def __init__(self, baseline_window_hours: int = 24):
        # Window used to estimate the "normal" hourly mention rate.
        self.baseline_window = timedelta(hours=baseline_window_hours)
        # keyword -> list of mention timestamps (naive local datetimes).
        self.mention_history = defaultdict(list)

    def add_mention(self, keyword: str, timestamp: datetime):
        """Record a mention of a keyword and prune stale history."""
        self.mention_history[keyword].append(timestamp)
        # Keep twice the baseline window so the baseline itself stays full.
        cutoff = datetime.now() - self.baseline_window * 2
        self.mention_history[keyword] = [
            t for t in self.mention_history[keyword] if t > cutoff
        ]

    def _count_since(self, keyword: str, since: datetime) -> int:
        """Count recorded mentions of `keyword` strictly after `since`.

        Uses .get() so that merely querying a keyword does not insert an
        empty defaultdict entry (the original leaked one per query).
        """
        return sum(1 for t in self.mention_history.get(keyword, ()) if t > since)

    def is_spiking(self, keyword: str, threshold_multiplier: float = 3.0) -> bool:
        """Check whether a keyword's last-hour volume exceeds its baseline."""
        now = datetime.now()
        recent = self._count_since(keyword, now - timedelta(hours=1))
        baseline_total = self._count_since(keyword, now - self.baseline_window)
        baseline_hourly = baseline_total / self.baseline_window.total_seconds() * 3600
        if baseline_hourly == 0:
            return recent > 10  # arbitrary floor for brand-new topics
        return recent > baseline_hourly * threshold_multiplier

    def get_trending(self, top_n: int = 10) -> List[tuple]:
        """Return (keyword, last-hour count) pairs sorted by spike intensity.

        Counts every keyword against one shared timestamp instead of
        re-reading the clock per keyword as the original did.
        """
        hour_ago = datetime.now() - timedelta(hours=1)
        spikes = [
            (keyword, self._count_since(keyword, hour_ago))
            for keyword in list(self.mention_history)
            if self.is_spiking(keyword)
        ]
        return sorted(spikes, key=lambda x: x[1], reverse=True)[:top_n]
账户分析
真实性指标
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional
@dataclass
class AccountAnalysis:
    """Authenticity profile for a single social-media account."""

    username: str
    platform: "Platform"
    created_date: Optional[datetime] = None
    follower_count: int = 0
    following_count: int = 0
    post_count: int = 0
    # Authenticity signals (None / empty = not evaluated).
    profile_photo_is_stock: Optional[bool] = None
    bio_contains_keywords: List[str] = field(default_factory=list)
    posts_primarily_reshares: Optional[bool] = None
    posting_pattern_irregular: Optional[bool] = None
    engagement_ratio_suspicious: Optional[bool] = None

    def calculate_red_flags(self) -> dict:
        """Collect red flags for account authenticity.

        Returns a dict of flag-name -> human-readable (Chinese) detail.
        """
        flags = {}
        # Very young accounts are a classic inauthenticity signal.
        if self.created_date:
            age_days = (datetime.now() - self.created_date).days
            if age_days < 30:
                flags['new_account'] = f"创建于 {age_days} 天前"
        # Following far more accounts than follow back.
        if self.following_count > 0:
            ratio = self.follower_count / self.following_count
            if ratio < 0.1:
                flags['low_follower_ratio'] = f"比例: {ratio:.2f}"
        # Posting volume beyond plausible human output.
        if self.created_date and self.post_count > 0:
            age_days = max(1, (datetime.now() - self.created_date).days)
            posts_per_day = self.post_count / age_days
            if posts_per_day > 50:
                flags['excessive_posting'] = f"{posts_per_day:.0f} 条/天"
        if self.profile_photo_is_stock:
            flags['stock_profile_photo'] = "个人资料图片似乎是库存图像"
        # The remaining declared signals were collected but never scored in
        # the original implementation — fold them into the flag set.
        if self.posts_primarily_reshares:
            flags['mostly_reshares'] = "帖子主要是转发"
        if self.posting_pattern_irregular:
            flags['irregular_posting'] = "发帖时间模式不规律"
        if self.engagement_ratio_suspicious:
            flags['suspicious_engagement'] = "互动比例可疑"
        if self.bio_contains_keywords:
            flags['bio_keywords'] = "简介包含关键词: " + ", ".join(self.bio_contains_keywords)
        return flags

    def authenticity_score(self) -> int:
        """0-100; higher means more likely authentic.

        Each red flag deducts a flat 20 points, floored at 0.
        """
        score = 100
        flags = self.calculate_red_flags()
        penalty_per_flag = 20
        score -= len(flags) * penalty_per_flag
        return max(0, score)
网络映射
from collections import defaultdict
from typing import Set, Dict
class AccountNetwork:
    """Maps interaction relationships between accounts.

    Interactions are directed: add_interaction(a, b) records `a` acting
    on `b` (mention, reply, etc.).
    """

    def __init__(self):
        # interactions[from][to] -> count of directed interactions.
        self.interactions = defaultdict(lambda: defaultdict(int))
        # Reserved for per-account metadata; not populated by this class.
        self.accounts = {}

    def add_interaction(self, from_account: str, to_account: str,
                        interaction_type: str = "mention"):
        """Record one interaction between two accounts.

        NOTE(review): interaction_type is accepted but not stored — all
        interaction kinds are pooled into a single count. Confirm whether
        per-type counts are needed.
        """
        self.interactions[from_account][to_account] += 1

    def find_clusters(self, min_interactions: int = 3) -> List[Set[str]]:
        """Find groups of accounts that interact frequently.

        Builds an undirected graph keeping only edges with at least
        `min_interactions` directed interactions, then returns its
        connected components of size > 1, largest first.
        """
        adjacency = defaultdict(set)
        for from_acc, targets in self.interactions.items():
            for to_acc, count in targets.items():
                if count >= min_interactions:
                    adjacency[from_acc].add(to_acc)
                    adjacency[to_acc].add(from_acc)
        # Connected components via iterative DFS.
        visited = set()
        clusters = []
        for account in adjacency:
            if account in visited:
                continue
            cluster = set()
            stack = [account]
            while stack:
                current = stack.pop()
                if current in visited:
                    continue
                visited.add(current)
                cluster.add(current)
                stack.extend(adjacency[current] - visited)
            if len(cluster) > 1:
                clusters.append(cluster)
        return sorted(clusters, key=len, reverse=True)

    def coordination_score(self, accounts: Set[str]) -> float:
        """Score how densely a group of accounts interacts (0.0-1.0).

        Ratio of observed directed edges to all possible directed edges
        within the group.
        """
        if len(accounts) < 2:
            return 0.0
        total_possible = len(accounts) * (len(accounts) - 1)
        actual_connections = 0
        for acc in accounts:
            # Use .get() lookups: indexing the nested defaultdict would
            # silently insert an empty entry for every queried account,
            # growing the interaction map on read.
            targets = self.interactions.get(acc, {})
            for other in accounts:
                if acc != other and targets.get(other, 0) > 0:
                    actual_connections += 1
        return actual_connections / total_possible if total_possible > 0 else 0
叙事跟踪
声明传播追踪器
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Dict, Optional
@dataclass
class Claim:
    """A claim tracked as it spreads across platforms."""

    text: str
    first_seen: datetime
    first_seen_url: str
    # Alternate phrasings of the same claim (populated by the caller).
    variations: List[str] = field(default_factory=list)
    # Each appearance: {'url', 'platform', 'timestamp', 'author'}.
    appearances: List[Dict] = field(default_factory=list)

    def add_appearance(self, url: str, platform: "Platform",
                       timestamp: datetime, author: str):
        """Track one place where this claim appeared."""
        self.appearances.append({
            'url': url,
            'platform': platform.value,
            'timestamp': timestamp,
            'author': author
        })

    def spread_timeline(self) -> List[Dict]:
        """Return appearances in chronological order."""
        return sorted(self.appearances, key=lambda x: x['timestamp'])

    def platforms_reached(self) -> Dict[str, int]:
        """Count appearances per platform.

        Self-contained plain-dict counting (the original leaned on a
        `defaultdict` import from an unrelated section of the file).
        """
        counts: Dict[str, int] = {}
        for app in self.appearances:
            counts[app['platform']] = counts.get(app['platform'], 0) + 1
        return counts

    def velocity(self, window_hours: int = 24) -> float:
        """Spread rate in appearances per hour over the trailing window.

        Raises:
            ValueError: if window_hours is not positive (the original
            raised ZeroDivisionError for 0 and returned nonsense for
            negative windows).
        """
        if window_hours <= 0:
            raise ValueError("window_hours must be positive")
        if not self.appearances:
            return 0.0
        cutoff = datetime.now() - timedelta(hours=window_hours)
        recent = [a for a in self.appearances if a['timestamp'] > cutoff]
        return len(recent) / window_hours
话题标签分析
from collections import Counter
from datetime import datetime, timedelta
class HashtagAnalyzer:
    """Analyzes hashtag usage patterns across collected posts."""

    def __init__(self):
        # lowercase hashtag (without '#') -> list of posts that used it.
        self.hashtag_posts = defaultdict(list)

    def add_post(self, hashtags: List[str], post: "SocialPost"):
        """Index a post under each of its hashtags (case-insensitive)."""
        for tag in hashtags:
            self.hashtag_posts[tag.lower()].append(post)

    def co_occurrence(self, hashtag: str, top_n: int = 10) -> List[tuple]:
        """Find hashtags that frequently appear alongside this one."""
        co_tags = Counter()
        own_tag = f'#{hashtag.lower()}'
        for post in self.hashtag_posts.get(hashtag.lower(), []):
            # Extract tags from the raw text, stripping trailing punctuation
            # so "#fire," and "#fire" are counted as the same tag (the
            # original counted them separately).
            tags = [
                word.lower().rstrip('.,!?;:)("\'][')
                for word in post.content.split()
                if word.startswith('#')
            ]
            for tag in tags:
                if tag != own_tag:
                    co_tags[tag] += 1
        return co_tags.most_common(top_n)

    def posting_pattern(self, hashtag: str) -> Dict:
        """Summarize when posts carrying this hashtag appear."""
        posts = self.hashtag_posts.get(hashtag.lower(), [])
        hour_counts = Counter(p.timestamp.hour for p in posts)
        day_counts = Counter(p.timestamp.strftime('%A') for p in posts)
        return {
            'by_hour': dict(hour_counts),
            'by_day': dict(day_counts),
            'total_posts': len(posts),
            'unique_authors': len(set(p.author for p in posts))
        }
证据保存
在消失前存档
import requests
from datetime import datetime
from typing import Optional
class SocialArchiver:
    """Archives social content to public archive services before deletion.

    All methods are best-effort network calls: failures are printed and
    reported as None rather than raised.
    """
    def __init__(self):
        # original URL -> details of the archives created for it
        self.archived = {}
    def archive_to_wayback(self, url: str) -> Optional[str]:
        """Submit a URL to the Internet Archive's Wayback Machine.

        Returns the archived URL on success, None on failure.
        NOTE(review): a 200 response is treated as success and
        response.url as the snapshot address — confirm this holds for the
        /save endpoint, which may redirect or queue asynchronously.
        """
        try:
            save_url = f"https://web.archive.org/save/{url}"
            response = requests.get(save_url, timeout=30)
            if response.status_code == 200:
                # response.url reflects any redirect to the snapshot.
                archived_url = response.url
                self.archived[url] = {
                    'wayback': archived_url,
                    'archived_at': datetime.now().isoformat()
                }
                return archived_url
        except Exception as e:
            # Deliberate best-effort: report, never raise.
            print(f"存档失败: {e}")
        return None
    def archive_to_archive_today(self, url: str) -> Optional[str]:
        """Submit a URL to archive.today; returns the archive URL or None."""
        try:
            response = requests.post(
                'https://archive.today/submit/',
                data={'url': url},
                timeout=60
            )
            if response.status_code == 200:
                return response.url
        except Exception as e:
            print(f"archive.today失败: {e}")
        return None
    def full_archive(self, url: str) -> dict:
        """Archive to multiple services for redundancy.

        Returns a dict with the original URL, a timestamp, and one
        'archives' entry per service that succeeded.
        """
        results = {
            'original_url': url,
            'archived_at': datetime.now().isoformat(),
            'archives': {}
        }
        wayback = self.archive_to_wayback(url)
        if wayback:
            results['archives']['wayback'] = wayback
        archive_today = self.archive_to_archive_today(url)
        if archive_today:
            results['archives']['archive_today'] = archive_today
        return results
协调检测
行为信号检查清单
## 协调虚假行为指标
### 时间模式
- [ ] 多个账户在几分钟内发布相同内容
- [ ] 跨账户同步发布的时间
- [ ] 爆发活动后休眠
- [ ] 帖子出现速度超过人类打字速度
### 内容模式
- [ ] 跨账户相同或近乎相同的文本
- [ ] 多个账户分享相同的图像/媒体
- [ ] 相同的拼写错误或格式错误
- [ ] 可见的复制粘贴痕迹
### 账户模式
- [ ] 账户在同一时间创建
- [ ] 相似的命名约定(名称 + 数字)
- [ ] 通用或库存个人资料照片
- [ ] 最少的个人内容,主要是分享
- [ ] 关注相同的账户
- [ ] 相互间不成比例地互动
### 网络模式
- [ ] 在网络分析中形成密集集群
- [ ] 放大相同的外部来源
- [ ] 针对相同的账户或标签
- [ ] 可见的跨平台协调
自动化协调打分
def coordination_likelihood(posts: List["SocialPost"]) -> dict:
    """Score the likelihood that a set of posts is a coordinated campaign.

    Returns a dict with 'score' (0-100), 'signals' (human-readable Chinese
    labels), 'posts_analyzed' and 'unique_authors'. The early return now
    carries the same keys as the main path, so callers can read a uniform
    shape.
    """
    if len(posts) < 2:
        return {
            'score': 0,
            'signals': [],
            'posts_analyzed': len(posts),
            'unique_authors': len({p.author for p in posts}),
        }
    signals = []
    score = 0
    # Signal 1: identical or near-identical text across posts.
    contents = [p.content for p in posts]
    unique_contents = set(contents)
    if len(unique_contents) < len(contents) * 0.5:
        signals.append("高内容重复")
        score += 30
    # Signal 2: bursts of posts published within a minute of each other.
    timestamps = sorted(p.timestamp for p in posts)
    rapid_posts = 0
    for i in range(1, len(timestamps)):
        # total_seconds(), not .seconds: .seconds discards the days part of
        # a timedelta, so posts a full day apart looked "rapid" before.
        if (timestamps[i] - timestamps[i - 1]).total_seconds() < 60:
            rapid_posts += 1
    if rapid_posts > len(posts) * 0.3:
        signals.append("可疑时间集群")
        score += 25
    # Signal 3: few authors producing many similar posts.
    # NOTE(review): the guard requires MORE than 5 distinct authors while
    # the label says "few authors" — thresholds may be inverted; behavior
    # preserved pending confirmation.
    authors = set(p.author for p in posts)
    if len(authors) > 5 and len(contents) / len(authors) > 2:
        signals.append("作者少,相似帖子多")
        score += 20
    return {
        'score': min(100, score),
        'signals': signals,
        'posts_analyzed': len(posts),
        'unique_authors': len(authors)
    }
平台特定工具
| 平台 | 监控工具 | 备注 |
| --- | --- | --- |
| Twitter/X | TweetDeck, Brandwatch | API 访问日益受限 |
| Facebook | CrowdTangle(有限) | 现仅学术访问 |
| Instagram | Later, Brandwatch | 无公开搜索 API |
| TikTok | Exolyt, Pentos | 历史数据有限 |
| Reddit | Pushshift, Arctic Shift | 存档访问各异 |
| YouTube | YouTube Data API | 良好的元数据访问 |
| Bluesky | Firehose API | 开放,实时访问 |
道德指南
- 仅存档公开内容
- 不为监控创建虚假账户
- 尊重平台服务条款
- 保护分享社交内容的来源
- 在发布协调相关声明前验证
- 在放大有害内容前考虑上下文
相关技能
- 来源验证 - 验证在社交媒体上找到的账户和声明
- 网络爬取 - 公开内容的程序化收集
- 数据新闻 - 分析社交数据以发现模式
技能元数据
| 字段 | 值 |
| --- | --- |
| 版本 | 1.0.0 |
| 创建日期 | 2025-12-26 |
| 作者 | Claude Skills for Journalism |
| 领域 | 新闻学, OSINT |
| 复杂度 | 高级 |