# Document Ingestion Pipeline

## Overview

The document ingestion pipeline covers the complete workflow of processing raw documents from various sources, extracting their content, and preparing it for a RAG system. This skill covers source connectors, text extraction, preprocessing, and quality validation.

**When to use this skill:** when building or maintaining document-processing pipelines for RAG applications.
## Pipeline Architecture

### Pipeline Stages
```mermaid
graph TD
    A[Source documents] --> B[Extraction]
    B --> C[Cleaning]
    C --> D[Validation]
    D --> E[Chunking]
    E --> F[Embedding]
    F --> G[Vector store]
    G --> H[Metadata store]
```
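The chunking stage between validation and embedding is not covered by the code in this document; a minimal sketch of fixed-size character chunking with overlap (the chunk size and overlap values are illustrative assumptions, not part of the original design):

```python
# Fixed-size chunking with overlap: the step between validation and embedding.
# Sketch only; chunk_size and overlap values are illustrative.
def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list:
    """Split text into overlapping character chunks."""
    if chunk_size <= overlap:
        raise ValueError("chunk_size must exceed overlap")
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + chunk_size])
        # Step forward by less than a full chunk so adjacent chunks share context
        start += chunk_size - overlap
    return chunks
```

Production pipelines usually chunk on token or sentence boundaries rather than raw characters, but the overlap idea is the same.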
### Component Design

```yaml
# Pipeline components
components:
  # Source connectors
  pdf_loader:
    module: document_loaders.pdf
    config:
      max_file_size: 100MB
      ocr_enabled: true
      table_extraction: true
  web_crawler:
    module: web_crawlers.playwright
    config:
      max_depth: 3
      follow_links: true
      respect_robots: true
  database_loader:
    module: database_connectors.postgres
    config:
      batch_size: 100
      connection_pool_size: 10
  # Processing
  text_extractor:
    module: text_extraction.nlp
    config:
      language_detection: true
      entity_extraction: true
      section_detection: true
  data_cleaner:
    module: data_cleaning.standard
    config:
      remove_duplicates: true
      normalize_whitespace: true
      fix_encoding: true
  validator:
    module: validation.schema_validator
    config:
      schema_path: ./schemas/document_schema.json
      strict_mode: false
  # Storage
  chunk_store:
    module: storage.document_store
    config:
      batch_size: 100
      index: document_id
  embedding_service:
    module: embedding.openai
    config:
      model: text-embedding-3-small
      batch_size: 100
  vector_store:
    module: vector_stores.pinecone
    config:
      index_name: documents
      dimension: 1536
```
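Once parsed (e.g. with `yaml.safe_load`), the `components` map can drive lazy instantiation by dotted module path. A minimal sketch; the dict mirrors the YAML above, and the `create(config)` factory convention is an assumption, not an API defined in this document:

```python
# Resolve pipeline components from a parsed config (sketch).
# Module paths are illustrative; each is assumed to expose a create(config) factory.
import importlib

PARSED_CONFIG = {
    "components": {
        "pdf_loader": {
            "module": "document_loaders.pdf",
            "config": {"max_file_size": "100MB", "ocr_enabled": True},
        },
        "validator": {
            "module": "validation.schema_validator",
            "config": {"strict_mode": False},
        },
    }
}


def build_registry(spec: dict) -> dict:
    """Map component name -> (module_path, config) without importing anything yet."""
    return {
        name: (entry["module"], entry.get("config", {}))
        for name, entry in spec["components"].items()
    }


def instantiate(module_path: str, config: dict):
    """Import the module lazily and call its assumed create(config) factory."""
    module = importlib.import_module(module_path)
    return module.create(config)
```

Deferring the import until `instantiate` keeps the pipeline startable even when optional connectors (and their heavy dependencies) are not installed.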
## Source Connectors

### PDF Processing
```python
# PDF document loader
import os
from datetime import datetime
from typing import Any, Dict

import PyPDF2


class PDFLoader:
    def __init__(self, config: dict):
        self.config = config
        self.max_file_size = config.get('max_file_size', 100 * 1024 * 1024)  # 100MB default

    def _check_file_size(self, file_path: str) -> int:
        """Raise if the file exceeds the configured size limit."""
        file_size = os.path.getsize(file_path)
        if file_size > self.max_file_size:
            raise ValueError(
                f"File too large: {file_size} bytes (max: {self.max_file_size})"
            )
        return file_size

    async def load(self, file_path: str) -> Dict[str, Any]:
        """Load a PDF document."""
        file_size = self._check_file_size(file_path)

        # Extract text from the PDF
        reader = PyPDF2.PdfReader(file_path)
        text = ""
        for page in reader.pages:
            text += page.extract_text() or ""

        # Extract metadata
        metadata = {
            'file_path': file_path,
            'file_size': file_size,
            'page_count': len(reader.pages),
            'created_at': datetime.utcnow().isoformat()
        }
        return {'text': text, 'metadata': metadata}

    async def load_with_ocr(self, file_path: str) -> Dict[str, Any]:
        """Load a PDF document using OCR."""
        file_size = self._check_file_size(file_path)

        # Extract images from the PDF
        reader = PyPDF2.PdfReader(file_path)
        images = []
        for page in reader.pages:
            for image in page.images:
                images.append(image)

        # Run OCR on the images.
        # This would integrate with an OCR service; return a placeholder for now.
        return {
            'text': "OCR text would go here",
            'images': images,
            'metadata': {
                'file_path': file_path,
                'file_size': file_size,
                'page_count': len(reader.pages),
                'created_at': datetime.utcnow().isoformat()
            }
        }
```
### Web Crawler

```python
# Web document crawler
from datetime import datetime
from typing import Any, Dict

from playwright.async_api import async_playwright


class WebCrawler:
    def __init__(self, config: dict):
        self.config = config
        self.max_depth = config.get('max_depth', 3)
        self.follow_links = config.get('follow_links', True)
        self.respect_robots = config.get('respect_robots', True)

    async def crawl(self, url: str) -> Dict[str, Any]:
        """Crawl a web page."""
        async with async_playwright() as p:
            browser = await p.chromium.launch()
            page = await browser.new_page()
            await page.goto(url)

            # Wait for the page to finish loading
            await page.wait_for_load_state('networkidle')

            # Extract the visible text content
            text = await page.inner_text('body')

            # Extract absolute HTTP(S) links
            links = await page.locator('a[href]').all()
            link_urls = [await link.get_attribute('href') for link in links]
            link_urls = [
                u for u in link_urls
                if u and u.startswith(('http://', 'https://'))
            ]

            # Extract metadata
            metadata = {
                'url': url,
                'title': await page.title(),
                'depth': 0,
                'link_count': len(link_urls),
                'created_at': datetime.utcnow().isoformat()
            }
            await browser.close()

        return {'text': text, 'links': link_urls, 'metadata': metadata}
```
### Database Loading

```python
# Database document loader
from typing import Any, Dict, List

import asyncpg


class DatabaseLoader:
    def __init__(self, config: dict):
        self.config = config
        self.batch_size = config.get('batch_size', 100)
        self.connection_pool_size = config.get('connection_pool_size', 10)
        self.connection_pool = None

    async def connect(self):
        """Create the connection pool."""
        self.connection_pool = await asyncpg.create_pool(
            min_size=1,
            max_size=self.connection_pool_size,
            host=self.config['host'],
            database=self.config['database'],
            user=self.config['user'],
            password=self.config['password']
        )

    async def load_documents(self, document_ids: List[str]) -> List[Dict[str, Any]]:
        """Load documents from the database."""
        if not self.connection_pool:
            await self.connect()

        async with self.connection_pool.acquire() as conn:
            query = """
                SELECT id, title, content, metadata, created_at, updated_at
                FROM documents
                WHERE id = ANY($1)
            """
            rows = await conn.fetch(query, document_ids)
            return [dict(row) for row in rows]
```
## Text Extraction

### Extraction Methods

| Method | Use case | Pros | Cons |
|--------|----------|------|------|
| PDF text extraction | PDF documents | Fast, accurate | May miss images |
| OCR | Scanned documents | Handles images | Slower, error-prone |
| Web scraping | Web pages | Dynamic content | May miss structured data |
| Database export | Database records | Structured data | Requires a connection |
| API extraction | API responses | Clean data | Rate limits |
### Extraction Service Implementation

```python
# Text extraction service
from typing import Any, Dict


class TextExtractionService:
    def __init__(self, config: Dict):
        self.config = config

    async def extract(self, source: Dict[str, Any]) -> str:
        """Extract text from a document source."""
        source_type = source.get('type')
        if source_type == 'pdf':
            return await self._extract_from_pdf(source)
        elif source_type == 'web':
            return await self._extract_from_web(source)
        elif source_type == 'database':
            return source.get('text', '')
        else:
            raise ValueError(f"Unsupported source type: {source_type}")

    async def _extract_from_pdf(self, source: Dict[str, Any]) -> str:
        """Extract text from a PDF source."""
        loader = PDFLoader(self.config.get('pdf', {}))
        result = await loader.load(source['path'])
        return result['text']

    async def _extract_from_web(self, source: Dict[str, Any]) -> str:
        """Extract text from a web source."""
        crawler = WebCrawler(self.config.get('web', {}))
        result = await crawler.crawl(source['url'])
        return result['text']
```
## Data Cleaning

### Cleaning Operations

```python
# Data cleaning operations
import hashlib
import re
from typing import Any, Dict, List


class DataCleaner:
    def __init__(self, config: Dict):
        self.config = config

    def clean_text(self, text: str) -> str:
        """Clean extracted text."""
        # Normalize line endings
        text = text.replace('\r\n', '\n')
        # Strip control characters (keep tab and newline for the next step)
        text = re.sub(r'[\x00-\x08\x0B-\x1F\x7F]+', '', text)
        # Collapse all runs of whitespace (including newlines) to single spaces
        text = ' '.join(text.split())
        return text

    def remove_duplicates(self, documents: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Remove duplicate documents."""
        seen = set()
        unique_documents = []
        for doc in documents:
            doc_hash = self._hash_document(doc)
            if doc_hash not in seen:
                unique_documents.append(doc)
                seen.add(doc_hash)
        return unique_documents

    def normalize_whitespace(self, text: str) -> str:
        """Normalize whitespace in text."""
        # Normalize tabs to spaces
        text = text.replace('\t', ' ')
        # Replace runs of spaces with a single space
        text = re.sub(r' +', ' ', text)
        return text

    def _hash_document(self, doc: Dict[str, Any]) -> str:
        """Generate a stable content hash for a document."""
        content = doc.get('text', '')
        return hashlib.sha256(content.encode('utf-8')).hexdigest()
```
## Quality Validation

```python
# Quality validation
import re
from typing import Any, Dict, List


class QualityValidator:
    def __init__(self, config: Dict):
        self.config = config
        self.min_length = config.get('min_length', 100)
        self.max_length = config.get('max_length', 100000)
        self.min_quality_score = config.get('min_quality', 0.7)

    def validate_text(self, text: str) -> Dict[str, Any]:
        """Validate text quality."""
        issues = []

        # Check length
        text_length = len(text)
        if text_length < self.min_length:
            issues.append({
                'type': 'length',
                'severity': 'warning',
                'message': f"Text too short: {text_length} chars (min: {self.min_length})"
            })
        elif text_length > self.max_length:
            issues.append({
                'type': 'length',
                'severity': 'warning',
                'message': f"Text too long: {text_length} chars (max: {self.max_length})"
            })

        # Check for control characters (excluding tab and newline)
        if re.search(r'[\x00-\x08\x0B-\x1F]', text):
            issues.append({
                'type': 'control_characters',
                'severity': 'warning',
                'message': 'Text contains control characters'
            })

        # Compute the quality score
        quality_score = self._calculate_quality_score(text, issues)
        return {
            'text': text,
            'issues': issues,
            'quality_score': quality_score,
            'passes_validation': quality_score >= self.min_quality_score
        }

    def _calculate_quality_score(self, text: str, issues: List[Dict]) -> float:
        """Compute a quality score from detected issues."""
        base_score = 1.0
        for issue in issues:
            if issue['severity'] == 'error':
                base_score -= 0.3
            elif issue['severity'] == 'warning':
                base_score -= 0.1
        return max(0.0, base_score)
```
## Ingestion Checklists

### Source Validation

```markdown
## Source Validation Checklist

### File Checks
- [ ] File exists
- [ ] File is accessible
- [ ] File size is within limits
- [ ] File format is supported
- [ ] File is not corrupted

### Content Checks
- [ ] Text extraction succeeded
- [ ] Content is not empty
- [ ] Content is readable
- [ ] Content is not junk
```
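The file checks above can be automated as a pre-flight function. A minimal sketch; the supported-extension set and size limit are illustrative assumptions (the limit mirrors the `pdf_loader` config):

```python
# Pre-flight file checks before ingestion (sketch; limits are illustrative).
import os

SUPPORTED_EXTENSIONS = {'.pdf', '.html', '.txt', '.md'}  # assumed format list
MAX_FILE_SIZE = 100 * 1024 * 1024  # 100MB, matching the pdf_loader config


def preflight_check(file_path: str) -> list:
    """Return a list of check failures; an empty list means the file passes."""
    if not os.path.exists(file_path):
        return ['file does not exist']
    failures = []
    if not os.access(file_path, os.R_OK):
        failures.append('file is not readable')
    if os.path.getsize(file_path) > MAX_FILE_SIZE:
        failures.append('file exceeds size limit')
    ext = os.path.splitext(file_path)[1].lower()
    if ext not in SUPPORTED_EXTENSIONS:
        failures.append(f'unsupported format: {ext}')
    return failures
```

Corruption and content checks (non-empty, readable, not junk) only become possible after extraction, which is why they live in the validator rather than here.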
### Processing Checks

```markdown
## Processing Checklist

### Data Cleaning
- [ ] Whitespace normalized
- [ ] Duplicates removed
- [ ] Special characters removed
- [ ] Line endings normalized
- [ ] Encoding fixed

### Validation
- [ ] Length requirements met
- [ ] Quality score acceptable
- [ ] No critical issues found
- [ ] Metadata extracted
- [ ] Links extracted (for web sources)
```
### Storage Checks

```markdown
## Storage Checklist

### Document Storage
- [ ] Document stored in the document store
- [ ] Metadata indexed
- [ ] Links to chunks created
- [ ] Access controls applied
- [ ] Backups created

### Quality Checks
- [ ] Quality metrics recorded
- [ ] Validation results logged
- [ ] Processing time tracked
- [ ] Error handling tested
```
## Quick Reference

### Pipeline Operations

```python
# Pipeline operations
import uuid
from typing import Any, Dict, List


class IngestionPipeline:
    def __init__(self, config: Dict):
        self.config = config
        self.text_extractor = TextExtractionService(config.get('extraction', {}))
        self.data_cleaner = DataCleaner(config.get('cleaning', {}))
        self.validator = QualityValidator(config.get('validation', {}))

    async def ingest_document(self, source: Dict[str, Any]) -> Dict[str, Any]:
        """Ingest a single document."""
        # Extract text
        text = await self.text_extractor.extract(source)

        # Clean the text
        cleaned = self.data_cleaner.clean_text(text)

        # Validate quality
        validated = self.validator.validate_text(cleaned)

        # Store the document
        document_id = await self._store_document(source, cleaned, validated)

        return {
            'document_id': document_id,
            'text': cleaned,
            'validation': validated,
            'status': 'success'
        }

    async def ingest_batch(self, sources: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Ingest documents in batch."""
        results = []
        for source in sources:
            try:
                result = await self.ingest_document(source)
                results.append(result)
            except Exception as e:
                results.append({
                    'source': source,
                    'status': 'error',
                    'error': str(e)
                })
        return results

    async def _store_document(self, source: Dict[str, Any], text: str,
                              validated: Dict[str, Any]) -> str:
        """Store the document in the document store."""
        # This would write to a document database
        document_id = str(uuid.uuid4())
        # await self.document_store.create({
        #     'id': document_id,
        #     'source': source,
        #     'text': text,
        #     'validation': validated,
        #     'created_at': datetime.utcnow().isoformat()
        # })
        return document_id
```
### Error Handling

```python
# Error handling
from datetime import datetime
from typing import Any, Dict


class ErrorHandler:
    MAX_RETRIES = 3

    def handle_extraction_error(self, source: Dict[str, Any],
                                error: Exception) -> Dict[str, Any]:
        """Handle an extraction error."""
        return {
            'source': source,
            'status': 'error',
            'error': str(error),
            'retry_count': source.get('retry_count', 0) + 1,
            'created_at': datetime.utcnow().isoformat()
        }

    def handle_validation_error(self, document_id: str,
                                error: Exception) -> Dict[str, Any]:
        """Handle a validation error."""
        return {
            'document_id': document_id,
            'status': 'error',
            'error': str(error),
            'created_at': datetime.utcnow().isoformat()
        }

    def should_retry(self, error: Dict[str, Any]) -> bool:
        """Decide whether a retry should be attempted."""
        return error.get('retry_count', 0) < self.MAX_RETRIES
```
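One way to act on `should_retry` is a retry loop with exponential backoff. A sketch under stated assumptions: the delay schedule and the `ingest` callable's contract are illustrative, not part of the original design:

```python
# Retry with exponential backoff around a flaky ingestion step (sketch).
import asyncio


async def ingest_with_retry(ingest, source: dict, max_retries: int = 3,
                            base_delay: float = 1.0) -> dict:
    """Call `ingest(source)` and retry on failure, doubling the delay each time."""
    last_error = None
    for attempt in range(max_retries + 1):
        try:
            return await ingest(source)
        except Exception as e:
            last_error = e
            if attempt < max_retries:
                # 1s, 2s, 4s, ... between attempts
                await asyncio.sleep(base_delay * (2 ** attempt))
    return {'source': source, 'status': 'error', 'error': str(last_error)}
```

Backoff matters most for rate-limited sources (web crawls, embedding APIs), where immediate retries tend to fail the same way.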
### Monitoring

```python
# Pipeline monitoring
from datetime import datetime
from typing import Any, Dict


class PipelineMonitor:
    def __init__(self):
        self.metrics = {
            'documents_processed': 0,
            'documents_failed': 0,
            'total_chars_processed': 0,
            'processing_time_ms': 0,
            'errors': []
        }

    def record_success(self, document_id: str):
        """Record a successful ingestion."""
        self.metrics['documents_processed'] += 1

    def record_error(self, error: str):
        """Record a processing error."""
        self.metrics['errors'].append({
            'error': error,
            'timestamp': datetime.utcnow().isoformat()
        })
        self.metrics['documents_failed'] += 1

    def get_metrics(self) -> Dict[str, Any]:
        """Return pipeline metrics."""
        return self.metrics
```
## Common Pitfalls

- **No validation** - always validate extracted content before storing it
- **Poor error handling** - implement retry logic and monitoring
- **No monitoring** - track metrics and performance
- **Ignoring quality** - quality checks keep bad data out of the index
- **No deduplication** - duplicate documents waste storage
- **No source tracking** - track document provenance to simplify debugging
- **No metadata** - metadata is essential for retrieval
- **No async processing** - synchronous processing blocks the pipeline
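The last pitfall can be addressed by ingesting sources concurrently rather than one at a time. A sketch using `asyncio.gather` with a semaphore to bound parallelism; the limit of 8 is an arbitrary assumption:

```python
# Concurrent batch ingestion with bounded parallelism (sketch).
import asyncio


async def ingest_batch_concurrent(ingest, sources: list, limit: int = 8) -> list:
    """Run `ingest(source)` for all sources, at most `limit` at a time."""
    semaphore = asyncio.Semaphore(limit)

    async def bounded(source):
        async with semaphore:
            try:
                return await ingest(source)
            except Exception as e:
                # Mirror the sequential ingest_batch error shape
                return {'source': source, 'status': 'error', 'error': str(e)}

    return await asyncio.gather(*(bounded(s) for s in sources))
```

The semaphore keeps slow sources from spawning unbounded browser sessions or database connections while still overlapping I/O waits.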
## Related Skills

- 07-document-processing/document-parsing
- 07-document-processing/image-preprocessing
- 07-document-processing/ocr-paddleocr
- 07-document-processing/ocr-tesseract
- 07-document-processing/pdf-processing
- 06-ai-ml-production/rag-implementation
- 06-ai-ml-production/embedding-models
- 04-database/database-connections