文档摄入管道 DocumentIngestionPipeline

文档摄入管道是一种处理原始文档、提取内容并为RAG系统准备的技能,涉及源连接器、文本提取、预处理和质量验证等关键步骤。

RAG应用 0 次安装 0 次浏览 更新于 3/5/2026

文档摄入管道

概览

文档摄入管道涵盖了从各种源处理原始文档、提取内容并为RAG系统准备的完整工作流程。这项技能包括源连接器、文本提取、预处理和质量验证。

何时使用这项技能: 当构建或维护RAG应用的文档处理管道时。

目录

  1. 管道架构
  2. 源连接器
  3. 文本提取
  4. 数据清洗
  5. 质量验证
  6. 摄入检查表
  7. 快速参考

管道架构

管道阶段

graph TD
    A[源文档] --> B[提取]
    B --> C[清洗]
    C --> D[验证]
    D --> E[分块]
    E --> F[嵌入]
    F --> G[向量存储]
    G --> H[元数据存储]

组件设计

# 管道组件
components:
  # 源连接器
  pdf_loader:
      module: document_loaders.pdf
      config:
        max_file_size: 100MB
        ocr_enabled: true
        table_extraction: true
    
  web_crawler:
      module: web_crawlers.playwright
      config:
        max_depth: 3
        follow_links: true
        respect_robots: true
    
  database_loader:
      module: database_connectors.postgres
      config:
        batch_size: 100
      connection_pool_size: 10
  
  # 处理
  text_extractor:
      module: text_extraction.nlp
      config:
        language_detection: true
        entity_extraction: true
        section_detection: true
    
  data_cleaner:
      module: data_cleaning.standard
      config:
        remove_duplicates: true
        normalize_whitespace: true
        fix_encoding: true
  
  validator:
      module: validation.schema_validator
      config:
        schema_path: ./schemas/document_schema.json
        strict_mode: false
  
  # 存储
  chunk_store:
      module: storage.document_store
      config:
        batch_size: 100
        index: document_id
  
  embedding_service:
      module: embedding.openai
      config:
        model: text-embedding-3-small
        batch_size: 100
  
  vector_store:
      module: vector_stores.pinecone
      config:
        index_name: documents
        dimension: 1536

源连接器

PDF处理

# PDF文档加载器
import PyPDF2
from typing import Dict, List
import io

class PDFLoader:
    def __init__(self, config: dict):
        self.config = config
        self.max_file_size = config.get('max_file_size', 100 * 1024 * 1024)  # 100MB默认
    
    async def load(self, file_path: str) -> Dict[str, any]:
        """加载PDF文档"""
        # 检查文件大小
        import os
        file_size = os.path.getsize(file_path)
        
        if file_size > self.max_file_size:
            raise ValueError(f"文件过大:{file_size}字节(最大:{self.max_file_size}")
        
        # 从PDF提取文本
        reader = PyPDF2.PdfReader(file_path)
        text = ""
        
        for page in reader.pages:
            text += page.extract_text()
        
        # 提取元数据
        metadata = {
            'file_path': file_path,
            'file_size': file_size,
            'page_count': len(reader.pages),
            'created_at': datetime.utcnow().isoformat()
        }
        
        return {
            'text': text,
            'metadata': metadata
        }
    
    async def load_with_ocr(self, file_path: str) -> Dict[str, any]:
        """使用OCR加载PDF文档"""
        # 检查文件大小
        import os
        file_size = os.path.getsize(file_path)
        
        if file_size > self.max_file_size:
            raise ValueError(f"文件过大:{file_size}字节(最大:{self.max_file_size}")
        
        # 从PDF提取图像
        reader = PyPDF2.PdfReader(file_path)
        images = []
        
        for page in reader.pages:
            for image in page.images:
                images.append(image)
        
        # 对图像执行OCR
        # 这将与OCR服务集成
        # 现在,返回占位符
        
        return {
            'text': "OCR文本将在这里",
            'images': images,
            'metadata': {
                'file_path': file_path,
                'file_size': file_size,
                'page_count': len(reader.pages),
                'created_at': datetime.utcnow().isoformat()
            }
        }

网络爬虫

# 网络文档爬虫
from playwright.async_api import async_playwright
from typing import List

class WebCrawler:
    def __init__(self, config: dict):
        self.config = config
        self.max_depth = config.get('max_depth', 3)
        self.follow_links = config.get('follow_links', True)
        self.respect_robots = config.get('respect_robots', True)
    
    async def crawl(self, url: str) -> Dict[str, any]:
        """爬取网页"""
        browser = await async_playwright.async_api.async_playwright()
        page = await browser.new_page()
        
        await page.goto(url)
        
        # 等待页面加载
        await page.wait_for_load_state('networkidle')
        
        # 提取文本内容
        text = await page.inner_text()
        
        # 提取链接
        links = await page.locator('a[href^="https?://"]').all()
        link_urls = [await link.get_attribute('href') for link in links]
        
        # 提取元数据
        metadata = {
            'url': url,
            'title': await page.title(),
            'depth': 0,
            'link_count': len(link_urls),
            'created_at': datetime.utcnow().isoformat()
        }
        
        await browser.close()
        
        return {
            'text': text,
            'links': link_urls,
            'metadata': metadata
        }

数据库加载

# 数据库文档加载器
import asyncpg
from typing import Dict, List

class DatabaseLoader:
    def __init__(self, config: dict):
        self.config = config
        self.batch_size = config.get('batch_size', 100)
        self.connection_pool_size = config.get('connection_pool_size', 10)
        self.connection_pool = None
    
    async def connect(self):
        """创建连接池"""
        self.connection_pool = await asyncpg.create_pool(
            minsize=1,
            maxsize=self.connection_pool_size,
            host=self.config['host'],
            database=self.config['database'],
            user=self.config['user'],
            password=self.config['password']
        )
    
    async def load_documents(self, document_ids: List[str]) -> List[Dict[str, any]]:
        """从数据库加载文档"""
        if not self.connection_pool:
            await self.connect()
        
        async with self.connection_pool.acquire() as conn:
            query = """
                SELECT id, title, content, metadata, created_at, updated_at
                FROM documents
                WHERE id = ANY($1)
            """
            
            result = await conn.fetch(query, document_ids)
        
        return result

文本提取

文本提取方法

| 方法 | 用例 | 优点 | 缺点 | |--------|---------|------|------|-------| | PDF文本提取 | PDF文档 | 快速,准确 | 可能遗漏图像 | | OCR | 扫描文档 | 处理图像 | 较慢,容易出错 | | 网络抓取 | 网页 | 动态内容 | 可能遗漏结构化数据 | | 数据库导出 | 数据库记录 | 结构化数据 | 可能需要连接 | | API提取 | API响应 | 清洁数据 | 速率限制 |

文本提取实现

# 文本提取服务
class TextExtractionService:
    def __init__(self, extractors: Dict):
        self.extractors = extractors
    
    async def extract(self, source: Dict[str, any]) -> str:
        """从文档源提取文本"""
        source_type = source.get('type')
        
        if source_type == 'pdf':
            return await self._extract_from_pdf(source)
        elif source_type == 'web':
            return await self._extract_from_web(source)
        elif source_type == 'database':
            return source.get('text', '')
        else:
            raise ValueError(f"不支持的源类型:{source_type}")
    
    async def _extract_from_pdf(self, source: Dict[str, any]) -> str:
        """从PDF源提取文本"""
        loader = PDFLoader(self.config.get('pdf', {}))
        result = await loader.load(source['path'])
        
        return result['text']
    
    async def _extract_from_web(self, source: Dict[str, any]) -> str:
        """从网络源提取文本"""
        crawler = WebCrawler(self.config.get('web', {}))
        result = await crawler.crawl(source['url'])
        
        return result['text']

数据清洗

清洗操作

# 数据清洗操作
class DataCleaner:
    def __init__(self, config: Dict):
        self.config = config
    
    def clean_text(self, text: str) -> str:
        """清洗提取的文本"""
        # 移除额外的空格
        text = ' '.join(text.split())
        
        # 标准化换行符
        text = text.replace('\r
', '
')
        
        # 移除特殊字符
        import re
        text = re.sub(r'[\x00-\x1F]+', '', text)
        
        # 移除控制字符
        text = re.sub(r'[\x00-\x08-\x0B]+', '', text)
        
        return text
    
    def remove_duplicates(self, documents: List[Dict[str, any]]) -> List[Dict[str, any]]:
        """移除重复文档"""
        seen = set()
        unique_documents = []
        
        for doc in documents:
            doc_hash = self._hash_document(doc)
            
            if doc_hash not in seen:
                unique_documents.append(doc)
                seen.add(doc_hash)
        
        return unique_documents
    
    def normalize_whitespace(self, text: str) -> str:
        """标准化文本中的空白"""
        # 替换多个空格为单个空格
        import re
        text = re.sub(r' +', ' ', text)
        
        # 将制表符标准化为空格
        text = text.replace('\t', '    ')
        
        return text
    
    def _hash_document(self, doc: Dict[str, any]) -> str:
        """生成文档哈希"""
        content = doc.get('text', '')
        return hash(content)

质量验证

# 质量验证
class QualityValidator:
    def __init__(self, config: Dict):
        self.config = config
        self.min_length = config.get('min_length', 100)
        self.max_length = config.get('max_length', 100000)
        self.min_quality_score = config.get('min_quality', 0.7)
    
    def validate_text(self, text: str) -> Dict[str, any]:
        """验证文本质量"""
        issues = []
        
        # 检查长度
        text_length = len(text)
        if text_length < self.min_length:
            issues.append({
                'type': 'length',
                'severity': 'warning',
                'message': f"文本太短:{text_length}字符(最低:{self.min_length}"
            })
        elif text_length > self.max_length:
            issues.append({
                'type': 'length',
                'severity': 'warning',
                'message': f"文本太长:{text_length}字符(最大:{self.max_length}"
            })
        
        # 检查特殊字符
        import re
        if re.search(r'[\x00-\x1F]+', text):
            issues.append({
                'type': 'special_characters',
                'severity': 'warning',
                'message': '文本包含特殊字符'
            })
        
        # 检查控制字符
        if re.search(r'[\x00-\x08-\x0B]+', text):
            issues.append({
                'type': 'control_characters',
                'severity': 'warning',
                'message': '文本包含控制字符'
            })
        
        # 检查质量分数
        quality_score = self._calculate_quality_score(text, issues)
        
        return {
            'text': text,
            'issues': issues,
            'quality_score': quality_score,
            'passes_validation': quality_score >= self.min_quality_score
        }
    
    def _calculate_quality_score(self, text: str, issues: List[Dict]) -> float:
        """计算质量分数"""
        base_score = 1.0
        
        for issue in issues:
            if issue['severity'] == 'error':
                base_score -= 0.3
            elif issue['severity'] == 'warning':
                base_score -= 0.1
        
        return max(0.0, base_score)

摄入检查表

源验证

## 源验证清单

### 文件检查
- [ ] 文件存在
- [ ] 文件可访问
- [ ] 文件大小在限制内
- [ ] 文件格式支持
- [ ] 文件未损坏

### 内容检查
- [ ] 文本提取成功
- [ ] 内容不为空
- [ ] 内容可读
- [ ] 内容不是垃圾

处理检查

## 处理清单

### 数据清洗
- [ ] 空白标准化
- [ ] 重复项移除
- [ ] 特殊字符移除
- [ ] 换行符标准化
- [ ] 编码修复

### 验证
- [ ] 长度要求满足
- [ ] 质量分数可接受
- [ ] 未发现关键问题
- [ ] 元数据提取
- [ ] 链接提取(针对网络源)

存储检查

## 存储清单

### 文档存储
- [ ] 文档存储在文档存储中
- [ ] 元数据索引
- [ ] 创建到块的链接
- [ ] 应用访问控制
- [ ] 创建备份

### 质量检查
- [ ] 记录质量指标
- [ ] 登录验证结果
- [ ] 跟踪处理时间
- [ ] 测试错误处理

快速参考

管道操作

# 管道操作
class IngestionPipeline:
    def __init__(self, config: Dict):
        self.config = config
        self.source_connectors = SourceConnectors(config.get('sources', {}))
        self.text_extractor = TextExtractionService(config.get('extraction', {}))
        self.data_cleaner = DataCleaner(config.get('cleaning', {}))
        self.validator = QualityValidator(config.get('validation', {}))
    
    async def ingest_document(self, source: Dict[str, any]) -> Dict[str, any]:
        """摄入单个文档"""
        # 提取文本
        extracted = await self.text_extractor.extract(source)
        
        # 清洗文本
        cleaned = self.data_cleaner.clean_text(extracted['text'])
        
        # 验证质量
        validated = self.validator.validate_text(cleaned)
        
        # 存储文档
        document_id = await self._store_document(source, cleaned, validated)
        
        return {
            'document_id': document_id,
            'extraction': extracted,
            'cleaning': cleaned,
            'validation': validated,
            'status': 'success'
        }
    
    async def ingest_batch(self, sources: List[Dict[str, any]]) -> List[Dict[str, any]]:
        """批量摄入文档"""
        results = []
        
        for source in sources:
            try:
                result = await self.ingest_document(source)
                results.append(result)
            except Exception as e:
                results.append({
                    'source': source,
                    'status': 'error',
                    'error': str(e)
                })
        
        return results
    
    async def _store_document(self, source: Dict[str, any], text: str, validated: Dict[str, any]) -> str:
        """在文档存储中存储文档"""
        # 这将在文档数据库中存储
        import uuid
        document_id = str(uuid.uuid4())
        
        # 存储文档
        # await self.document_store.create({
        #     'id': document_id,
        #     'source': source,
        #     'text': text,
        #     'metadata': validated['metadata'],
        #     'created_at': datetime.utcnow().isoformat()
        # })
        
        return document_id

错误处理

# 错误处理
class ErrorHandler:
    def __init__(self):
        pass
    
    def handle_extraction_error(self, source: Dict[str, any], error: Exception) -> Dict[str, any]:
        """处理提取错误"""
        return {
            'source': source,
            'status': 'error',
            'error': str(error),
            'retry_count': source.get('retry_count', 0) + 1,
            'created_at': datetime.utcnow().isoformat()
        }
    
    def handle_validation_error(self, document_id: str, error: Exception) -> Dict[str, any]:
        """处理验证错误"""
        return {
            'document_id': document_id,
            'status': 'error',
            'error': str(error),
            'created_at': datetime.utcnow().isoformat()
        }
    
    def should_retry(self, error: Dict[str, any]) -> bool:
        """确定是否应该触发重试"""
        retry_count = error.get('retry_count', 0)
        max_retries = 3
        
        if retry_count < max_retries:
            return True
        
        return False

监控

# 管道监控
class PipelineMonitor:
    def __init__(self):
        self.metrics = {
            'documents_processed': 0,
            'documents_failed': 0,
            'total_chars_processed': 0,
            'processing_time_ms': 0,
            'errors': []
        }
    
    def record_success(self, document_id: str):
        """记录成功处理"""
        self.metrics['documents_processed'] += 1
        self.metrics['documents_failed'] += 0
    
    def record_error(self, error: str):
        """记录处理错误"""
        self.metrics['errors'].append({
            'error': error,
            'timestamp': datetime.utcnow().isoformat()
        })
        self.metrics['documents_failed'] += 1
    
    def get_metrics(self) -> Dict[str, any]:
        """获取管道指标"""
        return self.metrics

常见陷阱

  1. 无验证 - 总是在存储前验证提取的内容
  2. 错误处理不当 - 实施重试逻辑和监控
  3. 无监控 - 跟踪指标和性能
  4. 忽视质量 - 质量检查防止不良数据
  5. 无去重 - 重复文档浪费存储空间
  6. 无源跟踪 - 跟踪文档来源以便于调试
  7. 无元数据 - 元数据对于检索至关重要
  8. 无异步处理 - 同步处理阻塞管道

相关技能

额外资源