社交媒体情报Skill social-media-intelligence

社交媒体情报技能专注于监控、分析和调查社交媒体数据,以支持新闻工作者的调查和报道。它涉及追踪病毒内容传播、检测协调活动、监测突发新闻、验证账户真实性,并识别虚假信息模式。关键词:社交媒体监控、叙事跟踪、OSINT、新闻调查、数据分析、开源情报。

数据分析 0 次安装 0 次浏览 更新于 3/15/2026

名称: 社交媒体情报 描述: 面向记者的社交媒体监控、叙事跟踪和开源情报。在追踪病毒内容传播、分析协调活动、监测社交媒体平台上的突发新闻、调查账户真实性或检测虚假信息模式时使用。对报道在线叙事和数字调查的记者至关重要。

社交媒体情报

用于新闻工作的社交媒体监控、分析和调查的系统方法。

何时激活

  • 追踪故事在平台间的传播
  • 调查潜在的协调虚假行为
  • 监测社交媒体平台上的突发新闻
  • 分析账户网络和关系
  • 检测机器人活动或操纵活动
  • 为数字调查构建证据链
  • 在删除前存档社交内容

实时监控

多平台追踪器

from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional, Dict
from enum import Enum
import hashlib

class Platform(Enum):
    """Social-media platforms supported by this toolkit.

    The string values act as stable identifiers: they are folded into
    content hashes (SocialPost.__post_init__) and stored in claim
    appearance records (Claim.add_appearance).
    """
    TWITTER = "twitter"
    FACEBOOK = "facebook"
    INSTAGRAM = "instagram"
    TIKTOK = "tiktok"
    YOUTUBE = "youtube"
    REDDIT = "reddit"
    THREADS = "threads"
    BLUESKY = "bluesky"
    MASTODON = "mastodon"

@dataclass
class SocialPost:
    """A single captured post from any monitored platform."""

    platform: Platform
    post_id: str
    author: str
    content: str
    timestamp: datetime
    url: str
    engagement: Dict[str, int] = field(default_factory=dict)
    media_urls: List[str] = field(default_factory=list)
    archived_urls: List[str] = field(default_factory=list)
    content_hash: str = ""

    def __post_init__(self):
        # Fingerprint the content (scoped per platform) so duplicates can
        # be detected cheaply. MD5 is acceptable here: this is
        # deduplication, not a security boundary.
        fingerprint_source = f"{self.platform.value}:{self.content}"
        self.content_hash = hashlib.md5(fingerprint_source.encode()).hexdigest()

@dataclass
class MonitoringQuery:
    """A reusable description of what to watch for across platforms."""

    keywords: List[str]
    platforms: List[Platform]
    accounts: List[str] = field(default_factory=list)
    hashtags: List[str] = field(default_factory=list)
    exclude_terms: List[str] = field(default_factory=list)
    start_date: Optional[datetime] = None

    def to_search_string(self, platform: Platform) -> str:
        """Build a search query string for the given platform.

        NOTE(review): `platform` is currently unused -- the same string is
        produced for every platform; confirm whether per-platform query
        syntax was intended.
        """
        clauses = []

        # Quoted keywords, OR-combined.
        if self.keywords:
            quoted = [f'"{kw}"' for kw in self.keywords]
            clauses.append(' OR '.join(quoted))

        # Hashtags, OR-combined.
        if self.hashtags:
            tagged = [f'#{tag}' for tag in self.hashtags]
            clauses.append(' OR '.join(tagged))

        # Negated exclusion terms.
        if self.exclude_terms:
            negated = [f'-{term}' for term in self.exclude_terms]
            clauses.append(' '.join(negated))

        return ' '.join(clauses)

突发新闻监测器

from collections import defaultdict
from datetime import datetime, timedelta

class BreakingNewsDetector:
    """Flag sudden spikes in keyword mentions relative to a rolling baseline."""

    def __init__(self, baseline_window_hours: int = 24):
        # Window over which the "normal" hourly mention rate is computed.
        self.baseline_window = timedelta(hours=baseline_window_hours)
        # keyword -> list of mention timestamps (pruned on insert)
        self.mention_history = defaultdict(list)

    def add_mention(self, keyword: str, timestamp: datetime):
        """Record one mention, discarding entries older than twice the baseline window."""
        history = self.mention_history[keyword]
        history.append(timestamp)
        cutoff = datetime.now() - self.baseline_window * 2
        self.mention_history[keyword] = [ts for ts in history if ts > cutoff]

    def is_spiking(self, keyword: str, threshold_multiplier: float = 3.0) -> bool:
        """True when last-hour mentions exceed the baseline hourly rate times the multiplier."""
        now = datetime.now()
        one_hour_ago = now - timedelta(hours=1)
        last_hour = sum(1 for ts in self.mention_history[keyword] if ts > one_hour_ago)

        in_window = [ts for ts in self.mention_history[keyword]
                     if ts > now - self.baseline_window]
        # Mentions per hour over the baseline window.
        hourly_baseline = len(in_window) / self.baseline_window.total_seconds() * 3600

        if hourly_baseline == 0:
            # Brand-new topic: fall back to an absolute threshold.
            return last_hour > 10

        return last_hour > hourly_baseline * threshold_multiplier

    def get_trending(self, top_n: int = 10) -> List[tuple]:
        """Return up to top_n (keyword, last-hour count) pairs, strongest spike first."""
        spiking = []
        for keyword in self.mention_history:
            if not self.is_spiking(keyword):
                continue
            one_hour_ago = datetime.now() - timedelta(hours=1)
            count = sum(1 for ts in self.mention_history[keyword] if ts > one_hour_ago)
            spiking.append((keyword, count))

        spiking.sort(key=lambda pair: pair[1], reverse=True)
        return spiking[:top_n]

账户分析

真实性指标

from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional

@dataclass
class AccountAnalysis:
    """Authenticity profile for a single social-media account."""

    username: str
    platform: Platform
    created_date: Optional[datetime] = None
    follower_count: int = 0
    following_count: int = 0
    post_count: int = 0

    # Authenticity signals (None = not yet evaluated).
    profile_photo_is_stock: Optional[bool] = None
    bio_contains_keywords: List[str] = field(default_factory=list)
    posts_primarily_reshares: Optional[bool] = None
    posting_pattern_irregular: Optional[bool] = None
    engagement_ratio_suspicious: Optional[bool] = None

    def calculate_red_flags(self) -> dict:
        """Collect authenticity red flags, keyed by flag name."""
        flags = {}

        # Very young account.
        if self.created_date:
            account_age_days = (datetime.now() - self.created_date).days
            if account_age_days < 30:
                flags['new_account'] = f"创建于 {account_age_days} 天前"

        # Following far more accounts than follow back.
        if self.following_count > 0:
            follow_ratio = self.follower_count / self.following_count
            if follow_ratio < 0.1:
                flags['low_follower_ratio'] = f"比例: {follow_ratio:.2f}"

        # Implausibly high posting frequency.
        if self.created_date and self.post_count > 0:
            account_age_days = max(1, (datetime.now() - self.created_date).days)
            daily_rate = self.post_count / account_age_days
            if daily_rate > 50:
                flags['excessive_posting'] = f"{daily_rate:.0f} 条/天"

        # Stock profile photo.
        if self.profile_photo_is_stock:
            flags['stock_profile_photo'] = "个人资料图片似乎是库存图像"

        return flags

    def authenticity_score(self) -> int:
        """Score 0-100; higher is more likely authentic. Each flag costs 20 points."""
        return max(0, 100 - len(self.calculate_red_flags()) * 20)

网络映射

from collections import defaultdict
from typing import Set, Dict

class AccountNetwork:
    """Map interaction relationships between accounts."""

    def __init__(self):
        # from_account -> to_account -> interaction count
        self.interactions = defaultdict(lambda: defaultdict(int))
        self.accounts = {}

    def add_interaction(self, from_account: str, to_account: str,
                       interaction_type: str = "mention"):
        """Record one interaction between two accounts.

        NOTE(review): `interaction_type` is accepted but not stored; all
        interaction kinds are counted together -- confirm this is intended.
        """
        self.interactions[from_account][to_account] += 1

    def find_clusters(self, min_interactions: int = 3) -> List[Set[str]]:
        """Find groups of accounts that interact frequently.

        Builds an undirected graph keeping only edges with at least
        `min_interactions` interactions, then returns its connected
        components (size > 1), largest first.
        """
        adjacency = defaultdict(set)
        for from_acc, targets in self.interactions.items():
            for to_acc, count in targets.items():
                if count >= min_interactions:
                    adjacency[from_acc].add(to_acc)
                    adjacency[to_acc].add(from_acc)

        # Connected components via iterative DFS.
        visited = set()
        clusters = []

        for account in adjacency:
            if account in visited:
                continue

            cluster = set()
            stack = [account]

            while stack:
                current = stack.pop()
                if current in visited:
                    continue
                visited.add(current)
                cluster.add(current)
                stack.extend(adjacency[current] - visited)

            if len(cluster) > 1:
                clusters.append(cluster)

        return sorted(clusters, key=len, reverse=True)

    def coordination_score(self, accounts: Set[str]) -> float:
        """Score how densely a group of accounts interacts (0.0 - 1.0).

        Returns the fraction of possible directed pairs with at least one
        recorded interaction.

        Bug fix: reads go through dict.get instead of indexing the
        defaultdict, which previously inserted empty entries for every
        account pair merely queried (a read method mutating state).
        """
        if len(accounts) < 2:
            return 0.0

        total_possible = len(accounts) * (len(accounts) - 1)
        actual_connections = 0

        for acc in accounts:
            outgoing = self.interactions.get(acc)
            if not outgoing:
                continue
            for other in accounts:
                if acc != other and outgoing.get(other, 0) > 0:
                    actual_connections += 1

        return actual_connections / total_possible if total_possible > 0 else 0

叙事跟踪

声明传播追踪器

from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Dict, Optional

@dataclass
class Claim:
    text: str
    first_seen: datetime
    first_seen_url: str
    variations: List[str] = field(default_factory=list)
    appearances: List[Dict] = field(default_factory=list)

    def add_appearance(self, url: str, platform: Platform,
                       timestamp: datetime, author: str):
        """跟踪此声明出现的地方。"""
        self.appearances.append({
            'url': url,
            'platform': platform.value,
            'timestamp': timestamp,
            'author': author
        })

    def spread_timeline(self) -> List[Dict]:
        """获取声明的按时间顺序的传播。"""
        return sorted(self.appearances, key=lambda x: x['timestamp'])

    def platforms_reached(self) -> Dict[str, int]:
        """按平台统计出现次数。"""
        counts = defaultdict(int)
        for app in self.appearances:
            counts[app['platform']] += 1
        return dict(counts)

    def velocity(self, window_hours: int = 24) -> float:
        """计算传播速率,以每小时出现次数计。"""
        if not self.appearances:
            return 0.0

        recent = [
            a for a in self.appearances
            if a['timestamp'] > datetime.now() - timedelta(hours=window_hours)
        ]
        return len(recent) / window_hours

话题标签分析

from collections import Counter
from datetime import datetime, timedelta

class HashtagAnalyzer:
    """Analyze hashtag usage patterns across collected posts."""

    def __init__(self):
        # lowercased hashtag -> list of posts that carried it
        self.hashtag_posts = defaultdict(list)

    def add_post(self, hashtags: List[str], post: SocialPost):
        """Index the post under each of its hashtags (case-insensitive)."""
        for tag in hashtags:
            self.hashtag_posts[tag.lower()].append(post)

    def co_occurrence(self, hashtag: str, top_n: int = 10) -> List[tuple]:
        """Hashtags that most often appear alongside this one."""
        companions = Counter()
        own_tag = f'#{hashtag.lower()}'

        for post in self.hashtag_posts.get(hashtag.lower(), []):
            # Pull '#'-prefixed tokens straight out of the post text.
            for word in post.content.split():
                token = word.lower()
                if token.startswith('#') and token != own_tag:
                    companions[token] += 1

        return companions.most_common(top_n)

    def posting_pattern(self, hashtag: str) -> Dict:
        """When posts carrying this hashtag were published, by hour and weekday."""
        posts = self.hashtag_posts.get(hashtag.lower(), [])

        by_hour = Counter(p.timestamp.hour for p in posts)
        by_day = Counter(p.timestamp.strftime('%A') for p in posts)
        authors = {p.author for p in posts}

        return {
            'by_hour': dict(by_hour),
            'by_day': dict(by_day),
            'total_posts': len(posts),
            'unique_authors': len(authors)
        }

证据保存

在消失前存档

import requests
from datetime import datetime
from typing import Optional

class SocialArchiver:
    """Archive social content before it can be deleted."""

    def __init__(self):
        # original URL -> metadata about where/when it was archived
        self.archived = {}

    def archive_to_wayback(self, url: str) -> Optional[str]:
        """Submit the URL to the Internet Archive's Wayback Machine.

        Returns the archived URL on success, None on failure.
        """
        try:
            response = requests.get(f"https://web.archive.org/save/{url}", timeout=30)
            if response.status_code == 200:
                snapshot = response.url
                self.archived[url] = {
                    'wayback': snapshot,
                    'archived_at': datetime.now().isoformat()
                }
                return snapshot
        except Exception as e:
            # Best-effort: archiving failures are reported, never raised.
            print(f"存档失败: {e}")
        return None

    def archive_to_archive_today(self, url: str) -> Optional[str]:
        """Submit the URL to archive.today; return the archive URL or None."""
        try:
            response = requests.post(
                'https://archive.today/submit/',
                data={'url': url},
                timeout=60
            )
            if response.status_code == 200:
                return response.url
        except Exception as e:
            print(f"archive.today失败: {e}")
        return None

    def full_archive(self, url: str) -> dict:
        """Archive the URL to multiple services for redundancy."""
        report = {
            'original_url': url,
            'archived_at': datetime.now().isoformat(),
            'archives': {}
        }

        wayback_snapshot = self.archive_to_wayback(url)
        if wayback_snapshot:
            report['archives']['wayback'] = wayback_snapshot

        today_snapshot = self.archive_to_archive_today(url)
        if today_snapshot:
            report['archives']['archive_today'] = today_snapshot

        return report

协调检测

行为信号检查清单

## 协调虚假行为指标

### 时间模式
- [ ] 多个账户在几分钟内发布相同内容
- [ ] 跨账户同步发布的时间
- [ ] 爆发活动后休眠
- [ ] 帖子出现速度超过人类打字速度

### 内容模式
- [ ] 跨账户相同或近乎相同的文本
- [ ] 多个账户分享相同的图像/媒体
- [ ] 相同的拼写错误或格式错误
- [ ] 可见的复制粘贴痕迹

### 账户模式
- [ ] 账户在同一时间创建
- [ ] 相似的命名约定(名称 + 数字)
- [ ] 通用或库存个人资料照片
- [ ] 最少的个人内容,主要是分享
- [ ] 关注相同的账户
- [ ] 相互间不成比例地互动

### 网络模式
- [ ] 在网络分析中形成密集集群
- [ ] 放大相同的外部来源
- [ ] 针对相同的账户或标签
- [ ] 可见的跨平台协调

自动化协调打分

def coordination_likelihood(posts: List[SocialPost]) -> dict:
    """Score the likelihood (0-100) that posts represent a coordinated campaign.

    Signals checked:
      * heavy duplication of post text
      * bursts of posts published less than a minute apart
      * many similar posts from a small author pool

    Returns a dict with 'score', 'signals', 'posts_analyzed', 'unique_authors'.

    Bug fix: inter-post gaps are measured with timedelta.total_seconds().
    The previous `.seconds` attribute ignores the day component, so posts
    days apart (e.g. 2 days + 10 s) were wrongly counted as rapid-fire.
    """
    if len(posts) < 2:
        # Too few posts to analyze; return the same shape as the full result.
        return {
            'score': 0,
            'signals': [],
            'posts_analyzed': len(posts),
            'unique_authors': len({p.author for p in posts})
        }

    signals = []
    score = 0

    # Identical-content check.
    contents = [p.content for p in posts]
    unique_contents = set(contents)
    if len(unique_contents) < len(contents) * 0.5:
        signals.append("高内容重复")
        score += 30

    # Temporal clustering: count consecutive posts under 60 s apart.
    timestamps = sorted(p.timestamp for p in posts)
    rapid_posts = sum(
        1 for earlier, later in zip(timestamps, timestamps[1:])
        if (later - earlier).total_seconds() < 60
    )
    if rapid_posts > len(posts) * 0.3:
        signals.append("可疑时间集群")
        score += 25

    # Few authors, many similar posts.
    # NOTE(review): the condition requires MORE than 5 authors yet the
    # signal text says "few authors" -- confirm the intended threshold.
    authors = {p.author for p in posts}
    if len(authors) > 5 and len(contents) / len(authors) > 2:
        signals.append("作者少,相似帖子多")
        score += 20

    return {
        'score': min(100, score),
        'signals': signals,
        'posts_analyzed': len(posts),
        'unique_authors': len(authors)
    }

平台特定工具

平台 监控工具 备注
Twitter/X TweetDeck, Brandwatch API 访问日益受限
Facebook CrowdTangle(有限) 现仅学术访问
Instagram Later, Brandwatch 无公开搜索 API
TikTok Exolyt, Pentos 历史数据有限
Reddit Pushshift, Arctic Shift 存档访问各异
YouTube YouTube Data API 良好的元数据访问
Bluesky Firehose API 开放,实时访问

道德指南

  • 仅存档公开内容
  • 不为监控创建虚假账户
  • 尊重平台服务条款
  • 保护分享社交内容的来源
  • 在发布协调相关声明前验证
  • 在放大有害内容前考虑上下文

相关技能

  • 来源验证 - 验证在社交媒体上找到的账户和声明
  • 网络爬取 - 公开内容的程序化收集
  • 数据新闻 - 分析社交数据以发现模式

技能元数据

字段
版本 1.0.0
创建日期 2025-12-26
作者 Claude Skills for Journalism
领域 新闻学, OSINT
复杂度 高级