数字档案构建技能Skill digital-archive

该技能专注于使用人工智能技术构建和管理数字档案,包括多源数据集成、AI驱动分类、实体提取、知识图构建、PDF生成和数据验证。适用于内容归档、智能分类、知识发现和档案管理,关键词包括数字档案、AI增强、实体提取、知识图、数据集成、NLP、AI应用。

AI应用 0 次安装 0 次浏览 更新于 3/15/2026

名称: digital-archive 描述: 带有AI增强、实体提取和知识图构建的数字档案工作流程。在构建内容档案、实施AI驱动分类、提取实体和关系或集成多个数据源时使用。涵盖Jay Rosen数字档案项目的模式。

数字档案方法

构建生产质量数字档案的模式,具有AI驱动分析和知识图构建。

档案架构

多源集成模式

┌─────────────────┐    ┌──────────────────┐    ┌────────────────┐
│  OCR管道        │    │  网络爬取        │    │  社交媒体      │
│  (报纸)         │    │  (文章)          │    │  (转录)        │
└────────┬────────┘    └────────┬─────────┘    └───────┬────────┘
         │                      │                      │
         └──────────────────────┼──────────────────────┘
                                │
                    ┌───────────▼───────────┐
                    │  统一模式             │
                    │  (35+字段)           │
                    └───────────┬───────────┘
                                │
         ┌──────────────────────┼──────────────────────┐
         │                      │                      │
┌────────▼────────┐  ┌──────────▼──────────┐  ┌───────▼───────┐
│  AI增强         │  │  实体提取           │  │  PDF档案      │
│  (Gemini)       │  │  (知识图)           │  │  (WCAG 2.1)   │
└────────┬────────┘  └──────────┬──────────┘  └───────┬───────┘
         │                      │                      │
         └──────────────────────┼──────────────────────┘
                                │
                    ┌───────────▼───────────┐
                    │  Google Sheets        │
                    │  (主数据库)           │
                    └───────────┬───────────┘
                                │
                    ┌───────────▼───────────┐
                    │  前端导出             │
                    │  (JSON/CSV)           │
                    └───────────────────────┘

统一模式设计

from dataclasses import dataclass, field
from datetime import date
from typing import Optional
from enum import Enum

class ContentType(Enum):
    ARTICLE = '文章'
    VIDEO = '视频'
    AUDIO = '音频'
    SOCIAL = '社交媒体帖子'
    NEWSPAPER = '报纸文章'

class ThematicCategory(Enum):
    PRESS_CRITICISM = '新闻与媒体批评'
    JOURNALISM_THEORY = '新闻学理论'
    POLITICS = '政治与民主'
    TECHNOLOGY = '技术与数字媒体'
    EDUCATION = '新闻教育'
    AUDIENCE = '受众与公众参与'

class HistoricalEra(Enum):
    ERA_1990s = '1990-1999'
    ERA_2000_04 = '2000-2004'
    ERA_2005_09 = '2005-2009'
    ERA_2010_15 = '2010-2015'
    ERA_2016_20 = '2016-2020'
    ERA_2021_PRESENT = '2021至今'

@dataclass
class ArchiveRecord:
    # 核心标识符
    id: str                              # 格式: SOURCE-00001
    url: str
    title: str

    # 内容
    author: Optional[str] = None
    publication_date: Optional[date] = None
    publication: Optional[str] = None
    content_type: ContentType = ContentType.ARTICLE
    text: str = ''

    # AI增强字段
    summary: Optional[str] = None
    pull_quote: Optional[str] = None
    categories: list[ThematicCategory] = field(default_factory=list)
    key_concepts: list[str] = field(default_factory=list)
    tags: list[str] = field(default_factory=list)
    era: Optional[HistoricalEra] = None
    scope: Optional[str] = None  # 理论、评论、案例研究等

    # 实体引用
    entities_mentioned: list[str] = field(default_factory=list)
    related_to: list[str] = field(default_factory=list)
    responds_to: list[str] = field(default_factory=list)

    # 档案元数据
    pdf_url: Optional[str] = None
    transcript_url: Optional[str] = None
    verified: bool = False
    processing_status: str = '待处理'
    last_updated: Optional[date] = None

def generate_record_id(source: str, sequence: int) -> str:
    """生成带有源前缀的唯一ID。"""
    prefixes = {
        'nytimes': 'NYT',
        'columbia journalism review': 'CJR',
        'pressthink': 'PT',
        'twitter': 'TW',
        'youtube': 'YT',
        'newspaper': 'NEWS',
    }
    prefix = prefixes.get(source.lower(), 'MISC')
    return f"{prefix}-{sequence:05d}"

AI驱动分类

基于分类法的分类

import google.generativeai as genai
import json
from typing import Optional

TAXONOMY = {
    "thematic_categories": [
        "新闻与媒体批评",
        "新闻学理论",
        "政治与民主",
        "技术与数字媒体",
        "新闻教育",
        "受众与公众参与"
    ],
    "key_concepts": [
        "无所不包的视角",
        "验证与断言",
        "公民与消费者",
        "公共新闻学",
        "罗森测试",
        "精明与天真",
        "专业与业余",
        "生产与分发",
        "信任与透明",
        "赛马式报道",
        "双方新闻学",
        "受众原子化",
        "精明教会"
    ],
    "scope_types": [
        "理论",
        "评论",
        "历史",
        "案例研究",
        "教学",
        "个人反思"
    ]
}

class ArchiveCategorizer:
    def __init__(self, model: str = 'gemini-2.0-flash'):
        self.model = genai.GenerativeModel(model)

    def categorize(self, record: ArchiveRecord) -> dict:
        prompt = f"""分析此档案内容并根据分类法进行分类。

内容:
标题: {record.title}
作者: {record.author or '未知'}
日期: {record.publication_date or '未知'}
文本(前8000字符):
{record.text[:8000]}

分类法:
{json.dumps(TAXONOMY, indent=2)}

以JSON格式响应,包含:
{{
  "categories": ["category1", "category2"],  // 1-3个来自thematic_categories
  "key_concepts": ["concept1", "concept2"],  // 0-5个来自key_concepts列表
  "scope": "scope_type",                     // 一个来自scope_types
  "era": "YYYY-YYYY",                        // 十年范围
  "tags": ["tag1", "tag2", "tag3", "tag4", "tag5"],  // 5个上下文关键词
  "summary": "2-3句摘要",
  "pull_quote": "文本中最有影响力的引用"
}}

重要:
- 仅使用分类法中的类别/概念
- 标签应为小写、连字符关键词
- 摘要应捕捉主要论点
- 引用必须是文本中的确切摘录
"""

        response = self.model.generate_content(prompt)
        result = self._parse_response(response.text)

        # 根据分类法验证
        result['categories'] = [c for c in result.get('categories', [])
                               if c in TAXONOMY['thematic_categories']]
        result['key_concepts'] = [c for c in result.get('key_concepts', [])
                                  if c in TAXONOMY['key_concepts']]

        return result

    def _parse_response(self, text: str) -> dict:
        """从响应中提取JSON,处理Markdown代码块。"""
        # 如果存在Markdown代码块,则移除
        if '```json' in text:
            text = text.split('```json')[1].split('```')[0]
        elif '```' in text:
            text = text.split('```')[1].split('```')[0]

        return json.loads(text.strip())

    def validate_response(self, result: dict, text: str) -> bool:
        """检测AI幻觉模式。"""
        # 检查统一响应签名(所有值相同)
        if len(set(result.get('tags', []))) < 3:
            return False

        # 检查引用是否存在于文本中
        pull_quote = result.get('pull_quote', '')
        if pull_quote and pull_quote.lower() not in text.lower():
            return False

        # 检查摘要是否不通用
        generic_phrases = ['this article discusses', 'the author explores', 'this piece examines']
        summary = result.get('summary', '').lower()
        if any(phrase in summary for phrase in generic_phrases):
            return False

        return True

实体提取和知识图

实体类型和关系

from dataclasses import dataclass
from typing import Literal

EntityType = Literal['人', '组织', '作品', '概念', '事件', '地点']
RelationshipType = Literal[
    '提及', '批评', '引用', '讨论', '扩展', '支持',
    '由...创立', '开创', '受...启发',
    '隶属于', '发表于', '起源于', '发生于',
    '拥有', '被...拥有'
]

@dataclass
class Entity:
    id: str                    # P-001, O-001, W-001等
    name: str
    type: EntityType
    aliases: list[str]         # 替代名称/拼写
    prominence: float          # 0-10基于讨论深度
    mention_count: int = 0
    first_mentioned_in: str = ''  # 记录ID

@dataclass
class Relationship:
    source_entity_id: str
    target_entity_id: str
    relationship_type: RelationshipType
    source_record_id: str      # 哪个记录建立了此关系
    confidence: float = 1.0

class EntityRegistry:
    """实体的去重和规范化。"""

    NORMALIZATIONS = {
        'nyt': '纽约时报',
        'new york times': '纽约时报',
        'ny times': '纽约时报',
        'washington post': '华盛顿邮报',
        'wapo': '华盛顿邮报',
        'cnn': 'CNN',
        'fox': '福克斯新闻',
        'fox news channel': '福克斯新闻',
    }

    def __init__(self):
        self.entities: dict[str, Entity] = {}
        self.name_to_id: dict[str, str] = {}

    def normalize_name(self, name: str) -> str:
        """将实体名称规范化为标准形式。"""
        name_lower = name.lower().strip()
        return self.NORMALIZATIONS.get(name_lower, name.strip())

    def find_or_create(self, name: str, entity_type: EntityType) -> Entity:
        """查找现有实体或创建新实体。"""
        normalized = self.normalize_name(name)

        # 检查是否已存在
        if normalized.lower() in self.name_to_id:
            entity_id = self.name_to_id[normalized.lower()]
            entity = self.entities[entity_id]
            entity.mention_count += 1
            return entity

        # 创建新实体
        type_prefix = entity_type[0].upper()  # P, O, W, C, E, L
        count = sum(1 for e in self.entities.values() if e.type == entity_type)
        entity_id = f"{type_prefix}-{count + 1:04d}"

        entity = Entity(
            id=entity_id,
            name=normalized,
            type=entity_type,
            aliases=[name] if name != normalized else [],
            prominence=0.0,
            mention_count=1
        )

        self.entities[entity_id] = entity
        self.name_to_id[normalized.lower()] = entity_id

        return entity

AI驱动实体提取

class EntityExtractor:
    def __init__(self, registry: EntityRegistry):
        self.registry = registry
        self.model = genai.GenerativeModel('gemini-2.0-flash')

    def extract(self, record: ArchiveRecord) -> tuple[list[Entity], list[Relationship]]:
        prompt = f"""从档案内容中提取命名实体和关系。

内容:
标题: {record.title}
文本: {record.text[:10000]}

实体类型:
- 人: 记者、政治家、学者、媒体人物
- 组织: 新闻机构、媒体公司、学术机构
- 作品: 文章、书籍、博客文章、研究、报告
- 概念: 新闻学理论、媒体批评框架
- 事件: 会议、选举、媒体危机
- 地点: 与媒体上下文相关的地理位置

关系类型:
- 提及、批评、引用、讨论、扩展、支持
- 由...创立、开创、受...启发
- 隶属于、发表于、起源于、发生于
- 拥有、被...拥有

以JSON响应:
{{
  "entities": [
    {{"name": "实体名称", "type": "人|组织|...", "prominence": 1-10}}
  ],
  "relationships": [
    {{"source": "实体名称", "target": "实体名称", "type": "关系类型"}}
  ]
}}

重要:
- 显著性: 1-3 = 简要提及,4-6 = 讨论,7-10 = 中心焦点
- 仅提取实际讨论的实体,不是仅提及
- 关系必须连接同一文本中出现的实体
"""

        response = self.model.generate_content(prompt)
        data = json.loads(response.text)

        entities = []
        entity_name_to_obj = {}

        # 处理实体
        for e in data.get('entities', []):
            entity = self.registry.find_or_create(e['name'], e['type'])
            entity.prominence = max(entity.prominence, e.get('prominence', 5))
            entities.append(entity)
            entity_name_to_obj[e['name'].lower()] = entity

        # 处理关系
        relationships = []
        for r in data.get('relationships', []):
            source = entity_name_to_obj.get(r['source'].lower())
            target = entity_name_to_obj.get(r['target'].lower())

            if source and target:
                relationships.append(Relationship(
                    source_entity_id=source.id,
                    target_entity_id=target.id,
                    relationship_type=r['type'],
                    source_record_id=record.id
                ))

        return entities, relationships

PDF档案生成

from reportlab.lib.pagesizes import letter
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image
from reportlab.lib.units import inch
from pathlib import Path

class ArchivePDFGenerator:
    """为档案保存生成可访问的PDF。"""

    def __init__(self, output_dir: Path):
        self.output_dir = output_dir
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.styles = getSampleStyleSheet()

        # 自定义样式
        self.styles.add(ParagraphStyle(
            'ArchiveTitle',
            parent=self.styles['Heading1'],
            fontSize=16,
            spaceAfter=12
        ))
        self.styles.add(ParagraphStyle(
            'ArchiveMeta',
            parent=self.styles['Normal'],
            fontSize=10,
            textColor='#666666',
            spaceAfter=6
        ))

    def generate(self, record: ArchiveRecord) -> Path:
        output_path = self.output_dir / f"{record.id}.pdf"

        doc = SimpleDocTemplate(
            str(output_path),
            pagesize=letter,
            title=record.title,
            author=record.author or '未知',
            subject=f"档案记录 {record.id}"
        )

        story = []

        # 标题
        story.append(Paragraph(record.title, self.styles['ArchiveTitle']))

        # 元数据块
        meta_lines = [
            f"<b>作者:</b> {record.author or '未知'}",
            f"<b>日期:</b> {record.publication_date or '未知'}",
            f"<b>来源:</b> {record.publication or '未知'}",
            f"<b>URL:</b> {record.url}",
            f"<b>档案ID:</b> {record.id}",
        ]
        for line in meta_lines:
            story.append(Paragraph(line, self.styles['ArchiveMeta']))

        story.append(Spacer(1, 0.25 * inch))

        # 摘要(如果可用)
        if record.summary:
            story.append(Paragraph("<b>摘要:</b>", self.styles['Heading2']))
            story.append(Paragraph(record.summary, self.styles['Normal']))
            story.append(Spacer(1, 0.25 * inch))

        # 主要内容
        story.append(Paragraph("<b>全文:</b>", self.styles['Heading2']))

        # 分割成段落并添加
        paragraphs = record.text.split('

')
        for para in paragraphs:
            if para.strip():
                story.append(Paragraph(para.strip(), self.styles['Normal']))
                story.append(Spacer(1, 0.1 * inch))

        # 构建PDF
        doc.build(story)

        return output_path

数据质量和验证

from dataclasses import dataclass
from typing import Callable

@dataclass
class ValidationResult:
    field: str
    valid: bool
    message: str
    severity: Literal['error', 'warning', 'info']

class ArchiveValidator:
    """验证档案记录的完整性和一致性。"""

    REQUIRED_FIELDS = ['id', 'url', 'title', 'text']
    CRITICAL_FIELDS = ['publication_date', 'author', 'summary']
    OPTIONAL_FIELDS = ['categories', 'tags', 'pull_quote']

    def validate(self, record: ArchiveRecord) -> list[ValidationResult]:
        results = []

        # 必需字段
        for field in self.REQUIRED_FIELDS:
            value = getattr(record, field, None)
            if not value:
                results.append(ValidationResult(
                    field=field,
                    valid=False,
                    message=f"必需字段 '{field}' 缺失",
                    severity='error'
                ))

        # 关键字段(应有但不阻塞)
        for field in self.CRITICAL_FIELDS:
            value = getattr(record, field, None)
            if not value:
                results.append(ValidationResult(
                    field=field,
                    valid=False,
                    message=f"关键字段 '{field}' 缺失",
                    severity='warning'
                ))

        # 内容长度检查
        if record.text and len(record.text) < 100:
            results.append(ValidationResult(
                field='text',
                valid=False,
                message=f"文本异常短 ({len(record.text)} 字符)",
                severity='warning'
            ))

        # 日期格式验证
        if record.publication_date:
            try:
                # 确保日期有效
                _ = record.publication_date.isoformat()
            except (AttributeError, ValueError):
                results.append(ValidationResult(
                    field='publication_date',
                    valid=False,
                    message="无效日期格式",
                    severity='error'
                ))

        # 类别验证
        for cat in record.categories:
            if cat not in ThematicCategory:
                results.append(ValidationResult(
                    field='categories',
                    valid=False,
                    message=f"未知类别: {cat}",
                    severity='warning'
                ))

        return results

    def is_complete(self, record: ArchiveRecord) -> bool:
        """检查记录是否所有关键字段都已填充。"""
        results = self.validate(record)
        errors = [r for r in results if r.severity == 'error']
        return len(errors) == 0

集成工作流程

class ArchiveWorkflow:
    """协调完整的档案处理管道。"""

    def __init__(self, config: Config):
        self.scraper = ScrapingCascade()
        self.categorizer = ArchiveCategorizer()
        self.entity_registry = EntityRegistry()
        self.entity_extractor = EntityExtractor(self.entity_registry)
        self.pdf_generator = ArchivePDFGenerator(config.PDF_DIR)
        self.sheets_service = SheetsService(config.CREDENTIALS_PATH)
        self.validator = ArchiveValidator()
        self.progress = ProgressTracker(config.PROGRESS_FILE)

    def process_url(self, url: str, record_id: str) -> ArchiveRecord:
        """通过完整管道处理单个URL。"""

        # 1. 爬取内容
        result = self.scraper.fetch(url)
        if not result:
            raise ValueError(f"爬取失败: {url}")

        # 2. 创建初始记录
        record = ArchiveRecord(
            id=record_id,
            url=url,
            title=result.title,
            text=result.content
        )

        # 3. AI分类
        categories = self.categorizer.categorize(record)
        record.summary = categories.get('summary')
        record.pull_quote = categories.get('pull_quote')
        record.categories = categories.get('categories', [])
        record.key_concepts = categories.get('key_concepts', [])
        record.tags = categories.get('tags', [])
        record.era = categories.get('era')
        record.scope = categories.get('scope')

        # 4. 实体提取
        entities, relationships = self.entity_extractor.extract(record)
        record.entities_mentioned = [e.id for e in entities]

        # 5. 生成PDF
        pdf_path = self.pdf_generator.generate(record)
        record.pdf_url = str(pdf_path)

        # 6. 验证
        validation = self.validator.validate(record)
        record.verified = self.validator.is_complete(record)
        record.processing_status = '已完成'

        return record

    def run_batch(self, input_csv: Path):
        """从输入CSV处理所有URL。"""
        for row in read_input(input_csv):
            if self.progress.is_processed(row.id):
                continue

            try:
                record = self.process_url(row.url, row.id)
                self.sheets_service.append_row(self.worksheet, record_to_row(record))
                self.progress.mark_processed(row.id)
            except Exception as e:
                self.progress.log_error(row.id, str(e))

前端消费导出

import json
from pathlib import Path

def export_for_frontend(records: list[ArchiveRecord], output_dir: Path):
    """以前端友好格式导出档案数据。"""

    # 主档案JSON
    archive_data = {
        'metadata': {
            'total_records': len(records),
            'last_updated': datetime.now().isoformat(),
            'schema_version': '2.0'
        },
        'records': [asdict(r) for r in records]
    }

    (output_dir / 'archive-data.json').write_text(
        json.dumps(archive_data, indent=2, default=str)
    )

    # 实体导出
    entities_data = [asdict(e) for e in entity_registry.entities.values()]
    (output_dir / 'entities.json').write_text(
        json.dumps(entities_data, indent=2)
    )

    # 关系导出
    relationships_data = [asdict(r) for r in all_relationships]
    (output_dir / 'relationships.json').write_text(
        json.dumps(relationships_data, indent=2)
    )

    # CSV导出以兼容电子表格
    records_df = pd.DataFrame([asdict(r) for r in records])
    records_df.to_csv(output_dir / 'archive_records.csv', index=False)