Name: digital-archive. Description: Digital archive workflows with AI enrichment, entity extraction, and knowledge graph construction. Use when building content archives, implementing AI-driven categorization, extracting entities and relationships, or integrating multiple data sources. Covers patterns from the Jay Rosen digital archive project.
Digital Archive Methodology
Patterns for building production-quality digital archives with AI-driven analysis and knowledge graph construction.
Archive Architecture
Multi-Source Integration Pattern
┌─────────────────┐   ┌──────────────────┐   ┌────────────────┐
│  OCR Pipeline   │   │   Web Scraping   │   │  Social Media  │
│  (newspapers)   │   │    (articles)    │   │ (transcripts)  │
└────────┬────────┘   └────────┬─────────┘   └───────┬────────┘
         │                     │                     │
         └─────────────────────┼─────────────────────┘
                               │
                   ┌───────────▼───────────┐
                   │    Unified Schema     │
                   │     (35+ fields)      │
                   └───────────┬───────────┘
                               │
        ┌──────────────────────┼──────────────────────┐
        │                      │                      │
┌───────▼─────────┐ ┌──────────▼──────────┐ ┌─────────▼─────┐
│  AI Enrichment  │ │  Entity Extraction  │ │  PDF Archive  │
│    (Gemini)     │ │  (knowledge graph)  │ │  (WCAG 2.1)   │
└───────┬─────────┘ └──────────┬──────────┘ └─────────┬─────┘
        │                      │                      │
        └──────────────────────┼──────────────────────┘
                               │
                   ┌───────────▼───────────┐
                   │     Google Sheets     │
                   │   (master database)   │
                   └───────────┬───────────┘
                               │
                   ┌───────────▼───────────┐
                   │   Frontend Exports    │
                   │      (JSON/CSV)       │
                   └───────────────────────┘
Unified Schema Design
from dataclasses import dataclass, field
from datetime import date
from typing import Optional
from enum import Enum
class ContentType(Enum):
    ARTICLE = 'article'
    VIDEO = 'video'
    AUDIO = 'audio'
    SOCIAL = 'social media post'
    NEWSPAPER = 'newspaper article'

class ThematicCategory(Enum):
    PRESS_CRITICISM = 'Press & Media Criticism'
    JOURNALISM_THEORY = 'Journalism Theory'
    POLITICS = 'Politics & Democracy'
    TECHNOLOGY = 'Technology & Digital Media'
    EDUCATION = 'Journalism Education'
    AUDIENCE = 'Audience & Public Engagement'

class HistoricalEra(Enum):
    ERA_1990s = '1990-1999'
    ERA_2000_04 = '2000-2004'
    ERA_2005_09 = '2005-2009'
    ERA_2010_15 = '2010-2015'
    ERA_2016_20 = '2016-2020'
    ERA_2021_PRESENT = '2021-present'
@dataclass
class ArchiveRecord:
    # Core identifiers
    id: str  # Format: SOURCE-00001
    url: str
    title: str
    # Content
    author: Optional[str] = None
    publication_date: Optional[date] = None
    publication: Optional[str] = None
    content_type: ContentType = ContentType.ARTICLE
    text: str = ''
    # AI enrichment fields
    summary: Optional[str] = None
    pull_quote: Optional[str] = None
    categories: list[ThematicCategory] = field(default_factory=list)
    key_concepts: list[str] = field(default_factory=list)
    tags: list[str] = field(default_factory=list)
    era: Optional[HistoricalEra] = None
    scope: Optional[str] = None  # theory, commentary, case study, etc.
    # Entity references
    entities_mentioned: list[str] = field(default_factory=list)
    related_to: list[str] = field(default_factory=list)
    responds_to: list[str] = field(default_factory=list)
    # Archival metadata
    pdf_url: Optional[str] = None
    transcript_url: Optional[str] = None
    verified: bool = False
    processing_status: str = 'pending'
    last_updated: Optional[date] = None
def generate_record_id(source: str, sequence: int) -> str:
"""生成带有源前缀的唯一ID。"""
prefixes = {
'nytimes': 'NYT',
'columbia journalism review': 'CJR',
'pressthink': 'PT',
'twitter': 'TW',
'youtube': 'YT',
'newspaper': 'NEWS',
}
prefix = prefixes.get(source.lower(), 'MISC')
return f"{prefix}-{sequence:05d}"
AI-Driven Categorization
Taxonomy-Based Classification
import google.generativeai as genai
import json
from typing import Optional
TAXONOMY = {
    "thematic_categories": [
        "Press & Media Criticism",
        "Journalism Theory",
        "Politics & Democracy",
        "Technology & Digital Media",
        "Journalism Education",
        "Audience & Public Engagement"
    ],
    "key_concepts": [
        "the view from nowhere",
        "verification vs. assertion",
        "citizens vs. consumers",
        "public journalism",
        "the Rosen test",
        "savvy vs. naive",
        "professionals vs. amateurs",
        "production vs. distribution",
        "trust and transparency",
        "horse-race coverage",
        "both-sides journalism",
        "audience atomization",
        "the church of the savvy"
    ],
    "scope_types": [
        "theory",
        "commentary",
        "history",
        "case study",
        "pedagogy",
        "personal reflection"
    ]
}
class ArchiveCategorizer:
def __init__(self, model: str = 'gemini-2.0-flash'):
self.model = genai.GenerativeModel(model)
    def categorize(self, record: ArchiveRecord) -> dict:
        prompt = f"""Analyze this archival content and classify it against the taxonomy.

Content:
Title: {record.title}
Author: {record.author or 'Unknown'}
Date: {record.publication_date or 'Unknown'}
Text (first 8,000 characters):
{record.text[:8000]}

Taxonomy:
{json.dumps(TAXONOMY, indent=2)}

Respond in JSON with:
{{
  "categories": ["category1", "category2"],  // 1-3 from thematic_categories
  "key_concepts": ["concept1", "concept2"],  // 0-5 from the key_concepts list
  "scope": "scope_type",  // one of scope_types
  "era": "YYYY-YYYY",  // era range, e.g. 2005-2009
  "tags": ["tag1", "tag2", "tag3", "tag4", "tag5"],  // 5 contextual keywords
  "summary": "2-3 sentence summary",
  "pull_quote": "the most impactful quote from the text"
}}

Important:
- Use only categories/concepts from the taxonomy
- Tags should be lowercase, hyphenated keywords
- The summary should capture the main argument
- The pull quote must be an exact excerpt from the text
"""
response = self.model.generate_content(prompt)
result = self._parse_response(response.text)
        # Validate against the taxonomy
result['categories'] = [c for c in result.get('categories', [])
if c in TAXONOMY['thematic_categories']]
result['key_concepts'] = [c for c in result.get('key_concepts', [])
if c in TAXONOMY['key_concepts']]
return result
def _parse_response(self, text: str) -> dict:
"""从响应中提取JSON,处理Markdown代码块。"""
# 如果存在Markdown代码块,则移除
if '```json' in text:
text = text.split('```json')[1].split('```')[0]
elif '```' in text:
text = text.split('```')[1].split('```')[0]
return json.loads(text.strip())
def validate_response(self, result: dict, text: str) -> bool:
"""检测AI幻觉模式。"""
# 检查统一响应签名(所有值相同)
if len(set(result.get('tags', []))) < 3:
return False
        # Check that the pull quote actually appears in the text
pull_quote = result.get('pull_quote', '')
if pull_quote and pull_quote.lower() not in text.lower():
return False
        # Check that the summary is not generic boilerplate
generic_phrases = ['this article discusses', 'the author explores', 'this piece examines']
summary = result.get('summary', '').lower()
if any(phrase in summary for phrase in generic_phrases):
return False
return True
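
The categorizer and the hallucination check compose naturally into a retry loop. The sketch below is one plausible wiring; the retry count and fallback behavior are illustrative choices, not part of the original pipeline:

def categorize_with_retries(categorizer: ArchiveCategorizer,
                            record: ArchiveRecord,
                            max_attempts: int = 3) -> Optional[dict]:
    """Retry categorization until a response passes the hallucination checks."""
    for _ in range(max_attempts):
        try:
            result = categorizer.categorize(record)
        except (json.JSONDecodeError, IndexError):
            continue  # malformed or unfenced JSON; ask the model again
        if categorizer.validate_response(result, record.text):
            return result
    return None  # caller should flag the record for manual review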
Entity Extraction and Knowledge Graph
Entity Types and Relationships
from dataclasses import dataclass
from typing import Literal
EntityType = Literal['Person', 'Organization', 'Work', 'Concept', 'Event', 'Location']

RelationshipType = Literal[
    'mentions', 'critiques', 'cites', 'discusses', 'extends', 'supports',
    'founded_by', 'pioneered', 'inspired_by',
    'affiliated_with', 'published_in', 'originated_in', 'occurred_at',
    'owns', 'owned_by'
]
@dataclass
class Entity:
    id: str  # P-001, O-001, W-001, etc.
    name: str
    type: EntityType
    aliases: list[str]  # alternative names/spellings
    prominence: float  # 0-10, based on depth of discussion
    mention_count: int = 0
    first_mentioned_in: str = ''  # record ID
@dataclass
class Relationship:
source_entity_id: str
target_entity_id: str
relationship_type: RelationshipType
    source_record_id: str  # which record established this relationship
confidence: float = 1.0
class EntityRegistry:
"""实体的去重和规范化。"""
NORMALIZATIONS = {
'nyt': '纽约时报',
'new york times': '纽约时报',
'ny times': '纽约时报',
'washington post': '华盛顿邮报',
'wapo': '华盛顿邮报',
'cnn': 'CNN',
'fox': '福克斯新闻',
'fox news channel': '福克斯新闻',
}
def __init__(self):
self.entities: dict[str, Entity] = {}
self.name_to_id: dict[str, str] = {}
def normalize_name(self, name: str) -> str:
"""将实体名称规范化为标准形式。"""
name_lower = name.lower().strip()
return self.NORMALIZATIONS.get(name_lower, name.strip())
def find_or_create(self, name: str, entity_type: EntityType) -> Entity:
"""查找现有实体或创建新实体。"""
normalized = self.normalize_name(name)
# 检查是否已存在
if normalized.lower() in self.name_to_id:
entity_id = self.name_to_id[normalized.lower()]
entity = self.entities[entity_id]
entity.mention_count += 1
return entity
# 创建新实体
type_prefix = entity_type[0].upper() # P, O, W, C, E, L
count = sum(1 for e in self.entities.values() if e.type == entity_type)
entity_id = f"{type_prefix}-{count + 1:04d}"
entity = Entity(
id=entity_id,
name=normalized,
type=entity_type,
aliases=[name] if name != normalized else [],
prominence=0.0,
mention_count=1
)
self.entities[entity_id] = entity
self.name_to_id[normalized.lower()] = entity_id
return entity
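
A quick check of the deduplication behavior, using surface forms taken from the normalization table above:

registry = EntityRegistry()
a = registry.find_or_create('NYT', 'Organization')
b = registry.find_or_create('New York Times', 'Organization')
c = registry.find_or_create('The New York Times', 'Organization')
assert a.id == b.id == c.id      # all three resolve to one entity
assert a.mention_count == 3      # each lookup counts as a mention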
AI-Driven Entity Extraction
class EntityExtractor:
def __init__(self, registry: EntityRegistry):
self.registry = registry
self.model = genai.GenerativeModel('gemini-2.0-flash')
def extract(self, record: ArchiveRecord) -> tuple[list[Entity], list[Relationship]]:
prompt = f"""从档案内容中提取命名实体和关系。
内容:
标题: {record.title}
文本: {record.text[:10000]}
实体类型:
- 人: 记者、政治家、学者、媒体人物
- 组织: 新闻机构、媒体公司、学术机构
- 作品: 文章、书籍、博客文章、研究、报告
- 概念: 新闻学理论、媒体批评框架
- 事件: 会议、选举、媒体危机
- 地点: 与媒体上下文相关的地理位置
关系类型:
- 提及、批评、引用、讨论、扩展、支持
- 由...创立、开创、受...启发
- 隶属于、发表于、起源于、发生于
- 拥有、被...拥有
以JSON响应:
{{
"entities": [
{{"name": "实体名称", "type": "人|组织|...", "prominence": 1-10}}
],
"relationships": [
{{"source": "实体名称", "target": "实体名称", "type": "关系类型"}}
]
}}
重要:
- 显著性: 1-3 = 简要提及,4-6 = 讨论,7-10 = 中心焦点
- 仅提取实际讨论的实体,不是仅提及
- 关系必须连接同一文本中出现的实体
"""
        response = self.model.generate_content(prompt)
        # Strip Markdown code fences if the model wraps its JSON output
        text = response.text.strip()
        if text.startswith('```'):
            text = text.split('```')[1].removeprefix('json')
        data = json.loads(text.strip())
entities = []
entity_name_to_obj = {}
        # Process entities
for e in data.get('entities', []):
entity = self.registry.find_or_create(e['name'], e['type'])
entity.prominence = max(entity.prominence, e.get('prominence', 5))
entities.append(entity)
entity_name_to_obj[e['name'].lower()] = entity
        # Process relationships
relationships = []
for r in data.get('relationships', []):
source = entity_name_to_obj.get(r['source'].lower())
target = entity_name_to_obj.get(r['target'].lower())
if source and target:
relationships.append(Relationship(
source_entity_id=source.id,
target_entity_id=target.id,
relationship_type=r['type'],
source_record_id=record.id
))
return entities, relationships
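
Extracted entities and relationships accumulate into the knowledge graph. A minimal sketch of folding them into an adjacency map, enough to drive a frontend graph view; the function name and return shape are illustrative:

from collections import defaultdict

def build_adjacency(relationships: list[Relationship]) -> dict[str, list[tuple[str, str]]]:
    """Map each source entity ID to its (relationship type, target ID) edges."""
    graph: dict[str, list[tuple[str, str]]] = defaultdict(list)
    for rel in relationships:
        graph[rel.source_entity_id].append(
            (rel.relationship_type, rel.target_entity_id))
    return dict(graph)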
PDF Archive Generation
from reportlab.lib.pagesizes import letter
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
from reportlab.lib import colors
from reportlab.lib.units import inch
from pathlib import Path
class ArchivePDFGenerator:
"""为档案保存生成可访问的PDF。"""
def __init__(self, output_dir: Path):
self.output_dir = output_dir
self.output_dir.mkdir(parents=True, exist_ok=True)
self.styles = getSampleStyleSheet()
        # Custom styles
self.styles.add(ParagraphStyle(
'ArchiveTitle',
parent=self.styles['Heading1'],
fontSize=16,
spaceAfter=12
))
self.styles.add(ParagraphStyle(
'ArchiveMeta',
parent=self.styles['Normal'],
fontSize=10,
            textColor=colors.HexColor('#666666'),
spaceAfter=6
))
def generate(self, record: ArchiveRecord) -> Path:
output_path = self.output_dir / f"{record.id}.pdf"
doc = SimpleDocTemplate(
str(output_path),
pagesize=letter,
title=record.title,
            author=record.author or 'Unknown',
            subject=f"Archive record {record.id}"
)
story = []
        # Title
story.append(Paragraph(record.title, self.styles['ArchiveTitle']))
        # Metadata block
        meta_lines = [
            f"<b>Author:</b> {record.author or 'Unknown'}",
            f"<b>Date:</b> {record.publication_date or 'Unknown'}",
            f"<b>Source:</b> {record.publication or 'Unknown'}",
            f"<b>URL:</b> {record.url}",
            f"<b>Archive ID:</b> {record.id}",
]
for line in meta_lines:
story.append(Paragraph(line, self.styles['ArchiveMeta']))
story.append(Spacer(1, 0.25 * inch))
        # Summary (if available)
        if record.summary:
            story.append(Paragraph("<b>Summary:</b>", self.styles['Heading2']))
story.append(Paragraph(record.summary, self.styles['Normal']))
story.append(Spacer(1, 0.25 * inch))
        # Main content
        story.append(Paragraph("<b>Full text:</b>", self.styles['Heading2']))
        # Split into paragraphs and add each one
        paragraphs = record.text.split('\n\n')
for para in paragraphs:
if para.strip():
story.append(Paragraph(para.strip(), self.styles['Normal']))
story.append(Spacer(1, 0.1 * inch))
        # Build the PDF
doc.build(story)
return output_path
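
Hypothetical usage, assuming a populated record from earlier in the pipeline; the output directory is an illustrative choice:

generator = ArchivePDFGenerator(Path('archive/pdfs'))
pdf_path = generator.generate(record)
record.pdf_url = str(pdf_path)   # e.g. archive/pdfs/PT-00001.pdf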
Data Quality and Validation
from dataclasses import dataclass
from typing import Literal
@dataclass
class ValidationResult:
field: str
valid: bool
message: str
severity: Literal['error', 'warning', 'info']
class ArchiveValidator:
"""验证档案记录的完整性和一致性。"""
REQUIRED_FIELDS = ['id', 'url', 'title', 'text']
CRITICAL_FIELDS = ['publication_date', 'author', 'summary']
OPTIONAL_FIELDS = ['categories', 'tags', 'pull_quote']
def validate(self, record: ArchiveRecord) -> list[ValidationResult]:
results = []
        # Required fields
        for field in self.REQUIRED_FIELDS:
            value = getattr(record, field, None)
            if not value:
                results.append(ValidationResult(
                    field=field,
                    valid=False,
                    message=f"Required field '{field}' is missing",
                    severity='error'
                ))
        # Critical fields (should be present, but missing values don't block)
        for field in self.CRITICAL_FIELDS:
            value = getattr(record, field, None)
            if not value:
                results.append(ValidationResult(
                    field=field,
                    valid=False,
                    message=f"Critical field '{field}' is missing",
                    severity='warning'
                ))
        # Content length check
        if record.text and len(record.text) < 100:
            results.append(ValidationResult(
                field='text',
                valid=False,
                message=f"Text is unusually short ({len(record.text)} characters)",
                severity='warning'
            ))
        # Date format validation
        if record.publication_date:
            try:
                # Confirm the date value is well-formed
                _ = record.publication_date.isoformat()
            except (AttributeError, ValueError):
                results.append(ValidationResult(
                    field='publication_date',
                    valid=False,
                    message="Invalid date format",
                    severity='error'
                ))
        # Category validation (accepts enum members or raw string values)
        valid_categories = {c.value for c in ThematicCategory}
        for cat in record.categories:
            cat_value = cat.value if isinstance(cat, ThematicCategory) else cat
            if cat_value not in valid_categories:
                results.append(ValidationResult(
                    field='categories',
                    valid=False,
                    message=f"Unknown category: {cat_value}",
                    severity='warning'
                ))
return results
def is_complete(self, record: ArchiveRecord) -> bool:
"""检查记录是否所有关键字段都已填充。"""
results = self.validate(record)
errors = [r for r in results if r.severity == 'error']
return len(errors) == 0
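
A sketch of surfacing validation results without blocking the batch; the 'needs_review' status value is a hypothetical convention, not from the original pipeline:

validator = ArchiveValidator()
for issue in validator.validate(record):
    marker = '!!' if issue.severity == 'error' else '--'
    print(f"{marker} [{issue.field}] {issue.message}")
if not validator.is_complete(record):
    record.processing_status = 'needs_review'   # illustrative status value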
Integrated Workflow
class ArchiveWorkflow:
"""协调完整的档案处理管道。"""
def __init__(self, config: Config):
self.scraper = ScrapingCascade()
self.categorizer = ArchiveCategorizer()
self.entity_registry = EntityRegistry()
self.entity_extractor = EntityExtractor(self.entity_registry)
self.pdf_generator = ArchivePDFGenerator(config.PDF_DIR)
self.sheets_service = SheetsService(config.CREDENTIALS_PATH)
self.validator = ArchiveValidator()
self.progress = ProgressTracker(config.PROGRESS_FILE)
def process_url(self, url: str, record_id: str) -> ArchiveRecord:
"""通过完整管道处理单个URL。"""
# 1. 爬取内容
result = self.scraper.fetch(url)
if not result:
raise ValueError(f"爬取失败: {url}")
# 2. 创建初始记录
record = ArchiveRecord(
id=record_id,
url=url,
title=result.title,
text=result.content
)
        # 3. AI categorization
categories = self.categorizer.categorize(record)
record.summary = categories.get('summary')
record.pull_quote = categories.get('pull_quote')
record.categories = categories.get('categories', [])
record.key_concepts = categories.get('key_concepts', [])
record.tags = categories.get('tags', [])
record.era = categories.get('era')
record.scope = categories.get('scope')
        # 4. Entity extraction
entities, relationships = self.entity_extractor.extract(record)
record.entities_mentioned = [e.id for e in entities]
        # 5. Generate the PDF
pdf_path = self.pdf_generator.generate(record)
record.pdf_url = str(pdf_path)
        # 6. Validate
        validation = self.validator.validate(record)
        record.verified = self.validator.is_complete(record)
        record.processing_status = 'completed'
return record
def run_batch(self, input_csv: Path):
"""从输入CSV处理所有URL。"""
for row in read_input(input_csv):
if self.progress.is_processed(row.id):
continue
try:
record = self.process_url(row.url, row.id)
self.sheets_service.append_row(self.worksheet, record_to_row(record))
self.progress.mark_processed(row.id)
except Exception as e:
self.progress.log_error(row.id, str(e))
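
ScrapingCascade, SheetsService, read_input, and record_to_row are assumed to come from companion modules. For ProgressTracker, a minimal file-backed sketch consistent with the calls above (not the project's actual implementation) could look like:

import json
from pathlib import Path

class ProgressTracker:
    """Track processed IDs and errors in a JSON file so batches can resume."""
    def __init__(self, path: Path):
        self.path = Path(path)
        if self.path.exists():
            self.state = json.loads(self.path.read_text())
        else:
            self.state = {'processed': [], 'errors': {}}

    def is_processed(self, record_id: str) -> bool:
        return record_id in self.state['processed']

    def mark_processed(self, record_id: str):
        self.state['processed'].append(record_id)
        self._save()

    def log_error(self, record_id: str, message: str):
        self.state['errors'][record_id] = message
        self._save()

    def _save(self):
        self.path.write_text(json.dumps(self.state, indent=2))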
Exports for Frontend Consumption
import json
import pandas as pd
from dataclasses import asdict
from datetime import datetime
from pathlib import Path

def export_for_frontend(
    records: list[ArchiveRecord],
    entity_registry: EntityRegistry,
    all_relationships: list[Relationship],
    output_dir: Path,
):
    """Export archive data in frontend-friendly formats."""
    # Master archive JSON
    archive_data = {
        'metadata': {
            'total_records': len(records),
            'last_updated': datetime.now().isoformat(),
            'schema_version': '2.0'
        },
        'records': [asdict(r) for r in records]
    }
    (output_dir / 'archive-data.json').write_text(
        json.dumps(archive_data, indent=2, default=str)
    )
    # Entity export
    entities_data = [asdict(e) for e in entity_registry.entities.values()]
    (output_dir / 'entities.json').write_text(
        json.dumps(entities_data, indent=2)
    )
    # Relationship export
    relationships_data = [asdict(r) for r in all_relationships]
    (output_dir / 'relationships.json').write_text(
        json.dumps(relationships_data, indent=2)
    )
    # CSV export for spreadsheet compatibility
    records_df = pd.DataFrame([asdict(r) for r in records])
    records_df.to_csv(output_dir / 'archive_records.csv', index=False)
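
A hypothetical call at the end of a batch run; the variable names and output directory are placeholders:

export_for_frontend(
    records=all_records,
    entity_registry=workflow.entity_registry,
    all_relationships=collected_relationships,
    output_dir=Path('site/data'),
)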