
RAG Chunking Metadata Strategy

Overview

The RAG chunking metadata strategy covers systematic methods for adding, managing, and exploiting metadata in a RAG pipeline. This skill spans metadata schemas, chunk-level metadata, document-level metadata, and metadata-driven retrieval strategies.

When to use this skill: when designing or implementing a RAG system that needs rich metadata to improve retrieval precision and context management.

Contents

  1. Metadata Schema Design
  2. Chunk-Level Metadata
  3. Document-Level Metadata
  4. Metadata-Driven Retrieval
  5. Metadata Storage
  6. Chunking Metadata Checklist
  7. Quick Reference

Metadata Schema Design

Core Metadata Fields

| Field | Type | Description | Example |
|---|---|---|---|
| chunk_id | string | Unique chunk identifier | doc_123_chunk_45 |
| document_id | string | Parent document identifier | doc_123 |
| chunk_index | integer | Position within the document | 0, 1, 2 |
| text | string | Chunk content | "The quick brown fox..." |
| token_count | integer | Number of tokens | 150 |
| embedding_id | string | Vector database reference | vec_abc123 |
| created_at | timestamp | Creation time | 2024-01-15T10:30:00Z |
| updated_at | timestamp | Last update time | 2024-01-15T10:30:00Z |
| source_type | enum | Content origin | pdf, web, database |
| content_type | enum | Document section type | introduction, methodology, results |
| language | string | Detected language | en, es, fr |

Extended Metadata Fields

| Field | Type | Description | Example |
|---|---|---|---|
| title | string | Chunk title | "Introduction to Machine Learning" |
| summary | string | Chunk summary | "Overview of ML concepts" |
| keywords | array | Search keywords | ["machine learning", "AI", "data"] |
| entities | array | Named entities | [{"type": "PERSON", "text": "John Doe"}] |
| section_hierarchy | array | Document structure | ["chapter", "section", "subsection"] |
| cross_references | array | Links to other chunks | ["doc_123_chunk_44", "doc_123_chunk_46"] |
| quality_score | float | Content quality score | 0.95 |
| access_level | enum | Permission level | public, internal, restricted |
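
The two tables above describe one record; a minimal sketch of composing such a record (field names follow the tables, values are illustrative):

```python
def make_chunk_record(document_id: str, chunk_index: int, text: str) -> dict:
    """Build a chunk metadata record covering the core identifier fields."""
    return {
        'chunk_id': f"{document_id}_chunk_{chunk_index}",  # doc_123_chunk_45 style
        'document_id': document_id,
        'chunk_index': chunk_index,
        'text': text,
        'token_count': len(text.split()),  # rough whitespace approximation
        'keywords': [],           # extended fields start empty and are
        'cross_references': [],   # enriched later in the pipeline
        'access_level': 'internal',
    }

record = make_chunk_record('doc_123', 45, 'The quick brown fox jumps over the lazy dog.')
```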

Metadata JSON Schema

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "RAG Chunk Metadata",
  "type": "object",
  "required": ["chunk_id", "document_id", "text"],
  "properties": {
    "chunk_id": {
      "type": "string",
      "description": "Unique chunk identifier"
    },
    "document_id": {
      "type": "string",
      "description": "Parent document identifier"
    },
    "text": {
      "type": "string",
      "description": "Chunk content text"
    },
    "chunk_index": {
      "type": "integer",
      "description": "Position within document"
    },
    "token_count": {
      "type": "integer",
      "description": "Number of tokens in chunk"
    },
    "embedding_id": {
      "type": "string",
      "description": "Vector database reference"
    },
    "created_at": {
      "type": "string",
      "format": "date-time",
      "description": "Creation timestamp"
    },
    "updated_at": {
      "type": "string",
      "format": "date-time",
      "description": "Last update timestamp"
    },
    "source_type": {
      "type": "string",
      "enum": ["pdf", "web", "database", "api", "manual"],
      "description": "Content origin"
    },
    "content_type": {
      "type": "string",
      "enum": ["introduction", "methodology", "results", "conclusion", "appendix", "references", "body"],
      "description": "Document section type"
    },
    "language": {
      "type": "string",
      "description": "Detected content language"
    },
    "title": {
      "type": "string",
      "description": "Chunk title"
    },
    "summary": {
      "type": "string",
      "description": "Chunk summary"
    },
    "keywords": {
      "type": "array",
      "items": {
        "type": "string"
      },
      "description": "Search keywords"
    },
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "type": {
            "type": "string",
            "enum": ["PERSON", "ORG", "LOCATION", "DATE", "MONEY", "PERCENT", "EMAIL", "URL"]
          },
          "text": {
            "type": "string"
          }
        }
      }
    },
    "section_hierarchy": {
      "type": "array",
      "items": {
        "type": "string"
      },
      "description": "Document structure hierarchy"
    },
    "cross_references": {
      "type": "array",
      "items": {
        "type": "string"
      },
      "description": "Links to related chunks"
    },
    "quality_score": {
      "type": "number",
      "minimum": 0,
      "maximum": 1,
      "description": "Content quality score"
    },
    "access_level": {
      "type": "string",
      "enum": ["public", "internal", "restricted"],
      "description": "Permission level"
    }
  }
}
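
A lightweight check of the schema's `required` list and enum constraints can be done with the standard library alone (a sketch; for complete validation, a JSON-Schema library such as `jsonschema` would consume the schema above directly):

```python
# Condensed constraints lifted from the chunk metadata schema above
REQUIRED = ["chunk_id", "document_id", "text"]
ENUMS = {
    "source_type": {"pdf", "web", "database", "api", "manual"},
    "access_level": {"public", "internal", "restricted"},
}

def check_chunk(record: dict) -> list:
    """Return a list of constraint violations (an empty list means valid)."""
    errors = [f"missing required field: {f}" for f in REQUIRED if f not in record]
    for field, allowed in ENUMS.items():
        if field in record and record[field] not in allowed:
            errors.append(f"invalid {field}: {record[field]}")
    return errors

good = {"chunk_id": "doc_123_chunk_45", "document_id": "doc_123",
        "text": "The quick brown fox...", "source_type": "pdf"}
bad = {"chunk_id": "doc_123_chunk_45", "source_type": "scroll"}

errors_good = check_chunk(good)  # no violations
errors_bad = check_chunk(bad)    # two missing fields plus a bad enum value
```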

Chunk-Level Metadata

Automatic Metadata Extraction

# Automatic metadata extraction
import re
from datetime import datetime

class MetadataExtractor:
    def __init__(self):
        pass
    
    def extract_chunk_metadata(self, chunk: str, chunk_index: int, document_id: str) -> dict:
        """Extract metadata for a single chunk."""
        # Basic metadata
        metadata = {
            'chunk_id': f"{document_id}_chunk_{chunk_index}",
            'document_id': document_id,
            'chunk_index': chunk_index,
            'text': chunk,
            # Whitespace word count as a rough token approximation;
            # use a real tokenizer for accurate counts
            'token_count': len(chunk.split()),
            'created_at': datetime.utcnow().isoformat(),
            'updated_at': datetime.utcnow().isoformat()
        }
        
        # Content type inference
        metadata['content_type'] = self._infer_content_type(chunk)
        
        # Language detection
        metadata['language'] = self._detect_language(chunk)
        
        # Entity extraction
        metadata['entities'] = self._extract_entities(chunk)
        
        # Keyword extraction
        metadata['keywords'] = self._extract_keywords(chunk)
        
        # Title generation
        metadata['title'] = self._generate_title(chunk)
        
        # Summary generation
        metadata['summary'] = self._generate_summary(chunk)
        
        return metadata
    
    def _infer_content_type(self, text: str) -> str:
        """Infer the content type from text with simple keyword heuristics."""
        text_lower = text.lower()
        
        if any(word in text_lower for word in ['abstract', 'introduction', 'overview']):
            return 'introduction'
        elif any(word in text_lower for word in ['method', 'approach', 'algorithm', 'implementation']):
            return 'methodology'
        elif any(word in text_lower for word in ['result', 'finding', 'data']):
            return 'results'
        elif any(word in text_lower for word in ['conclusion', 'summary', 'final']):
            return 'conclusion'
        else:
            return 'body'
    
    def _detect_language(self, text: str) -> str:
        """Detect the content language."""
        # Placeholder implementation; in production, use a proper
        # language-detection library
        return 'en'  # default to English
    
    def _extract_entities(self, text: str) -> list:
        """Extract named entities from text via simple pattern matching."""
        # In production, use an NER model instead
        entities = []
        
        # Extract dates
        date_pattern = r'\d{1,2}[-/]\d{1,2}[-/]\d{4}'
        for date in re.findall(date_pattern, text):
            entities.append({'type': 'DATE', 'text': date})
        
        # Extract email addresses
        email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}'
        for email in re.findall(email_pattern, text):
            entities.append({'type': 'EMAIL', 'text': email})
        
        # Extract URLs
        url_pattern = r'https?://\S+'
        for url in re.findall(url_pattern, text):
            entities.append({'type': 'URL', 'text': url})
        
        return entities
    
    def _extract_keywords(self, text: str) -> list:
        """Extract keywords from text by filtering out common stop words."""
        # Words of three characters or fewer are dropped by the length
        # filter below, so only longer stop words need listing
        stop_words = {'this', 'that', 'these', 'those', 'with', 'from', 'into', 'about', 'have', 'been', 'were', 'will', 'would', 'could', 'should', 'might', 'must', 'shall', 'need', 'seem', 'their', 'there', 'which', 'while', 'where', 'when', 'what', 'your', 'very', 'also', 'more', 'most', 'some', 'such', 'only', 'other', 'than', 'then', 'them', 'they', 'does', 'made', 'make', 'take', 'taken', 'said'}
        words = text.lower().split()
        keywords = [word for word in words if word not in stop_words and len(word) > 3]
        
        # Deduplicate (note: set order is not stable)
        keywords = list(set(keywords))
        
        return keywords[:10]  # return at most 10 keywords
    
    def _generate_title(self, chunk: str) -> str:
        """Generate a title from the chunk."""
        # Take the first sentence, truncated to 100 characters
        sentences = chunk.split('. ')
        first_sentence = sentences[0] if sentences else chunk
        
        return first_sentence[:100]
    
    def _generate_summary(self, chunk: str) -> str:
        """Generate a summary from the chunk."""
        # Take the first sentence, truncated to 200 characters
        sentences = chunk.split('. ')
        first_sentence = sentences[0] if sentences else chunk
        
        return first_sentence[:200]

Manual Metadata Enhancement

# Manual metadata enhancement
from datetime import datetime

class MetadataEnhancer:
    def __init__(self):
        pass
    
    def add_cross_references(self, metadata: dict, references: list) -> dict:
        """Add cross-references to metadata."""
        metadata['cross_references'] = references
        return metadata
    
    def update_quality_score(self, metadata: dict, score: float) -> dict:
        """Update the quality score."""
        metadata['quality_score'] = score
        metadata['updated_at'] = datetime.utcnow().isoformat()
        return metadata
    
    def add_access_level(self, metadata: dict, level: str) -> dict:
        """Add an access level to metadata."""
        metadata['access_level'] = level
        metadata['updated_at'] = datetime.utcnow().isoformat()
        return metadata
    
    def add_section_hierarchy(self, metadata: dict, hierarchy: list) -> dict:
        """Add a section hierarchy to metadata."""
        metadata['section_hierarchy'] = hierarchy
        metadata['updated_at'] = datetime.utcnow().isoformat()
        return metadata

Document-Level Metadata

Document Metadata Schema

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Document Metadata",
  "type": "object",
  "required": ["document_id", "title", "created_at", "chunk_count"],
  "properties": {
    "document_id": {
      "type": "string",
      "description": "Unique document identifier"
    },
    "title": {
      "type": "string",
      "description": "Document title"
    },
    "created_at": {
      "type": "string",
      "format": "date-time",
      "description": "Creation timestamp"
    },
    "updated_at": {
      "type": "string",
      "format": "date-time",
      "description": "Last update timestamp"
    },
    "chunk_count": {
      "type": "integer",
      "description": "Total number of chunks"
    },
    "source_type": {
      "type": "string",
      "enum": ["pdf", "web", "database", "api", "manual"],
      "description": "Content origin"
    },
    "language": {
      "type": "string",
      "description": "Detected primary language"
    },
    "file_path": {
      "type": "string",
      "description": "Original file path"
    },
    "file_size": {
      "type": "integer",
      "description": "File size in bytes"
    },
    "total_tokens": {
      "type": "integer",
      "description": "Total tokens across all chunks"
    },
    "authors": {
      "type": "array",
      "items": {
        "type": "string"
      },
      "description": "Document authors"
    },
    "tags": {
      "type": "array",
      "items": {
        "type": "string"
      },
      "description": "Document tags"
    },
    "version": {
      "type": "string",
      "description": "Document version"
    },
    "status": {
      "type": "string",
      "enum": ["draft", "processing", "indexed", "published", "archived"],
      "description": "Document processing status"
    }
  }
}

Document Metadata Management

# Document metadata management
import os
import uuid
from datetime import datetime

class DocumentMetadata:
    def __init__(self):
        pass
    
    def create_document_metadata(self, file_path: str, title: str) -> dict:
        """Create document metadata."""
        metadata = {
            'document_id': self._generate_id(file_path),
            'title': title,
            'created_at': datetime.utcnow().isoformat(),
            'updated_at': datetime.utcnow().isoformat(),
            'source_type': self._infer_source_type(file_path),
            'file_path': file_path,
            'file_size': self._get_file_size(file_path),
            'chunk_count': 0,
            'status': 'processing',
            'language': 'en'  # default
        }
        
        return metadata
    
    def update_document_status(self, document_id: str, status: str) -> dict:
        """Update the document status."""
        # In practice this would write through to the database
        metadata = {
            'document_id': document_id,
            'status': status,
            'updated_at': datetime.utcnow().isoformat()
        }
        
        return metadata
    
    def add_chunk_count(self, document_id: str, count: int) -> dict:
        """Set the chunk count on document metadata."""
        # In practice this would write through to the database
        metadata = {
            'document_id': document_id,
            'chunk_count': count,
            'updated_at': datetime.utcnow().isoformat()
        }
        
        return metadata
    
    def _generate_id(self, file_path: str) -> str:
        """Generate a unique document ID."""
        # A random UUID; hash the file path instead if IDs
        # must be stable across runs
        return str(uuid.uuid4())
    
    def _infer_source_type(self, file_path: str) -> str:
        """Infer the source type from the file extension."""
        extension = file_path.lower().split('.')[-1]
        
        # The source_type enum has no entries for Office or plain-text
        # formats, so these map to the closest available value
        source_types = {
            'pdf': 'pdf',
            'docx': 'pdf',
            'doc': 'pdf',
            'txt': 'web',
            'html': 'web',
            'md': 'web'
        }
        
        return source_types.get(extension, 'manual')
    
    def _get_file_size(self, file_path: str) -> int:
        """Get the file size in bytes."""
        return os.path.getsize(file_path)
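
`_generate_id` above returns a random UUID, so re-ingesting the same file yields a new ID each run. If IDs must be stable, hashing the file path is one alternative (a sketch; the `doc_` prefix and 12-character digest length are arbitrary choices, not part of the schema):

```python
import hashlib

def deterministic_document_id(file_path: str) -> str:
    """Derive a stable document ID from the file path."""
    digest = hashlib.sha256(file_path.encode('utf-8')).hexdigest()
    return f"doc_{digest[:12]}"  # short prefix keeps IDs readable

a = deterministic_document_id('/data/reports/q1.pdf')
b = deterministic_document_id('/data/reports/q1.pdf')  # same path, same ID
```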

Metadata-Driven Retrieval

Metadata-Based Filtering

# Metadata-driven retrieval
class MetadataRetriever:
    def __init__(self, vector_db, metadata_store):
        self.vector_db = vector_db
        self.metadata_store = metadata_store
    
    async def retrieve_by_metadata(self, filters: dict) -> list:
        """Retrieve chunks based on metadata filters."""
        # Build a query from the filters
        query = self._build_metadata_query(filters)
        
        # Search the vector database
        results = await self.vector_db.search(query)
        
        # Filter by metadata
        filtered_results = self._filter_by_metadata(results, filters)
        
        return filtered_results
    
    def _build_metadata_query(self, filters: dict) -> str:
        """Build a query string from metadata filters."""
        query_parts = []
        
        # Content type filter
        if 'content_type' in filters:
            query_parts.append(f"content_type:{filters['content_type']}")
        
        # Language filter
        if 'language' in filters:
            query_parts.append(f"language:{filters['language']}")
        
        # Access level filter
        if 'access_level' in filters:
            query_parts.append(f"access_level:{filters['access_level']}")
        
        # Date range filters
        if 'date_from' in filters:
            query_parts.append(f"created_at:>{filters['date_from']}")
        if 'date_to' in filters:
            query_parts.append(f"created_at:<{filters['date_to']}")
        
        # Keyword filter
        if 'keywords' in filters:
            keywords = ' '.join(filters['keywords'])
            query_parts.append(f"keywords:{keywords}")
        
        return ' '.join(query_parts)
    
    def _filter_by_metadata(self, results: list, filters: dict) -> list:
        """Filter results by metadata."""
        filtered = []
        
        for result in results:
            metadata = result.get('metadata', {})
            
            # Content type filter
            if 'content_type' in filters:
                if metadata.get('content_type') != filters['content_type']:
                    continue
            
            # Language filter
            if 'language' in filters:
                if metadata.get('language') != filters['language']:
                    continue
            
            # Access level filter
            if 'access_level' in filters:
                if metadata.get('access_level') != filters['access_level']:
                    continue
            
            # Date range filters (ISO-8601 strings compare chronologically)
            if 'date_from' in filters:
                if metadata.get('created_at', '') < filters['date_from']:
                    continue
            if 'date_to' in filters:
                if metadata.get('created_at', '') > filters['date_to']:
                    continue
            
            # Keyword filter (at least one keyword must match)
            if 'keywords' in filters:
                result_keywords = set(metadata.get('keywords', []))
                filter_keywords = set(filters['keywords'])
                if not result_keywords.intersection(filter_keywords):
                    continue
            
            filtered.append(result)
        
        return filtered
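
The date-range filter above compares `created_at` strings directly. This works only because ISO-8601 UTC timestamps sort lexicographically in chronological order, as a quick illustration shows:

```python
# ISO-8601 UTC timestamps compare correctly as plain strings,
# which is what the date_from/date_to checks rely on
timestamps = [
    "2024-03-01T09:00:00Z",
    "2024-01-15T10:30:00Z",
    "2024-02-20T23:59:59Z",
]

def in_range(ts: str, date_from: str, date_to: str) -> bool:
    """Plain string comparison stands in for datetime comparison."""
    return date_from <= ts <= date_to

hits = [ts for ts in timestamps if in_range(ts, "2024-01-01", "2024-02-28")]
```

Note this guarantee breaks once timestamps mix time zones or formats, so normalize to UTC at ingestion time.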

Metadata-Augmented Search

# Metadata-augmented search
class MetadataAugmentedSearch:
    def __init__(self, vector_db, metadata_store):
        self.vector_db = vector_db
        self.metadata_store = metadata_store
    
    async def search_with_metadata_boost(self, query: str, metadata_filters: dict) -> list:
        """Search with metadata boosting."""
        # Standard vector search
        vector_results = await self.vector_db.search(query)
        
        # Fetch metadata for each result
        for result in vector_results:
            metadata = await self.metadata_store.get(result['id'])
            result['metadata'] = metadata
        
        # Apply metadata filters
        filtered_results = self._apply_metadata_filters(vector_results, metadata_filters)
        
        # Rerank using metadata boosts
        reranked = self._rerank_with_metadata(filtered_results)
        
        return reranked
    
    def _apply_metadata_filters(self, results: list, filters: dict) -> list:
        """Apply metadata filters to results, accumulating boosts."""
        filtered = []
        
        for result in results:
            metadata = result.get('metadata', {})
            
            # Content type boost
            if 'content_type' in filters:
                if metadata.get('content_type') == filters['content_type']:
                    result['boost'] = result.get('boost', 1.0) + 0.5
                else:
                    continue
            
            # Language boost
            if 'language' in filters:
                if metadata.get('language') == filters['language']:
                    result['boost'] = result.get('boost', 1.0) + 0.3
                else:
                    continue
            
            filtered.append(result)
        
        return filtered
    
    def _rerank_with_metadata(self, results: list) -> list:
        """Rerank using metadata boost scores."""
        for result in results:
            base_score = result.get('score', 0.5)
            
            # Add the boost set in _apply_metadata_filters (it lives on the
            # result itself, not on its metadata)
            boost = result.get('boost', 0.0)
            result['score'] = base_score + boost
        
        # Sort by the new score
        results.sort(key=lambda x: x['score'], reverse=True)
        
        return results
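
The boost-and-rerank step reduces to a pure function over score/boost pairs, which makes it easy to test in isolation (a sketch; the boost values are illustrative):

```python
def rerank(results: list) -> list:
    """Add each result's boost to its base score and sort descending."""
    for r in results:
        r['score'] = r.get('score', 0.5) + r.get('boost', 0.0)
    return sorted(results, key=lambda r: r['score'], reverse=True)

ranked = rerank([
    {'id': 'a', 'score': 0.80, 'boost': 0.0},
    {'id': 'b', 'score': 0.60, 'boost': 0.5},  # metadata match lifts b above a
])
```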

Metadata Storage

Storage Strategies

| Storage type | Use case | Advantage |
|---|---|---|
| Document store | Document metadata | Fast document lookup |
| Chunk store | Chunk metadata | Fast chunk filtering |
| Vector store | Vectors + metadata | Combined retrieval |
| Hybrid store | Documents + chunks + vectors | Complete metadata management |

Storage Implementation

# Metadata store implementation
class MetadataStore:
    def __init__(self, storage_backend):
        self.storage = storage_backend
    
    async def store_chunk_metadata(self, metadata: dict) -> str:
        """Store chunk metadata."""
        # Keyed by chunk_id in the backing store
        await self.storage.put(
            key=f"chunk:{metadata['chunk_id']}",
            value=metadata
        )
        
        return metadata['chunk_id']
    
    async def store_document_metadata(self, metadata: dict) -> str:
        """Store document metadata."""
        # Keyed by document_id in the backing store
        await self.storage.put(
            key=f"document:{metadata['document_id']}",
            value=metadata
        )
        
        return metadata['document_id']
    
    async def get_chunk_metadata(self, chunk_id: str) -> dict:
        """Fetch chunk metadata."""
        metadata = await self.storage.get(f"chunk:{chunk_id}")
        return metadata
    
    async def get_document_metadata(self, document_id: str) -> dict:
        """Fetch document metadata."""
        metadata = await self.storage.get(f"document:{document_id}")
        return metadata
    
    async def update_metadata(self, key: str, updates: dict) -> dict:
        """Merge updates into an existing record."""
        current = await self.storage.get(key)
        updated = {**current, **updates}
        await self.storage.put(key, updated)
        
        return updated
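
`MetadataStore` only assumes a backend exposing async `put`/`get`; for local tests, a dict-backed stand-in is enough (a sketch; the backend interface is inferred from the calls above):

```python
import asyncio

class InMemoryBackend:
    """Dict-backed stand-in for a real key-value store."""
    def __init__(self):
        self._data = {}

    async def put(self, key: str, value: dict):
        self._data[key] = value

    async def get(self, key: str) -> dict:
        return self._data.get(key)

async def demo():
    backend = InMemoryBackend()
    await backend.put(key="chunk:doc_123_chunk_0",
                      value={"chunk_id": "doc_123_chunk_0", "text": "..."})
    return await backend.get("chunk:doc_123_chunk_0")

record = asyncio.run(demo())
```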

Chunking Metadata Checklist

Preprocessing

## Preprocessing Checklist

### Document Analysis
- [ ] Analyze document structure
- [ ] Determine content types
- [ ] Detect language
- [ ] Extract entities
- [ ] Extract keywords
- [ ] Map section hierarchy

### Metadata Schema
- [ ] Design the schema
- [ ] Define required fields
- [ ] Define optional fields
- [ ] Define validation rules
- [ ] Create the JSON Schema

Chunking Process

## Chunking Process Checklist

### Chunk Creation
- [ ] Determine chunk boundaries
- [ ] Extract metadata for each chunk
- [ ] Add cross-references
- [ ] Compute quality scores
- [ ] Validate token counts
- [ ] Generate embeddings

### Metadata Storage
- [ ] Store chunk metadata
- [ ] Store document metadata
- [ ] Create indexes
- [ ] Optimize storage
- [ ] Configure backups

Quality Control

## Quality Control Checklist

### Validation
- [ ] Passes schema validation
- [ ] Required fields present
- [ ] Data types correct
- [ ] Format validation passes
- [ ] Quality scores within range
- [ ] Passes duplicate detection

### Monitoring
- [ ] Track metadata completeness
- [ ] Compute quality metrics
- [ ] Monitor storage performance
- [ ] Measure query performance

Quick Reference

Metadata Operations

# Metadata operations
from datetime import datetime
from typing import Dict, List

class MetadataOperations:
    # Helper methods (_generate_id, _infer_source_type, _get_file_size,
    # _infer_content_type, _build_metadata_query, _filter_by_metadata)
    # are assumed to be implemented as in the classes above
    def __init__(self, metadata_store):
        self.store = metadata_store
    
    async def create_document(self, file_path: str, title: str) -> str:
        """Create a document with metadata."""
        # Build the document metadata
        doc_metadata = {
            'document_id': self._generate_id(file_path),
            'title': title,
            'created_at': datetime.utcnow().isoformat(),
            'source_type': self._infer_source_type(file_path),
            'file_path': file_path,
            'file_size': self._get_file_size(file_path),
            'status': 'processing',
            'chunk_count': 0,
            'language': 'en'
        }
        
        # Store the document metadata
        await self.store.store_document_metadata(doc_metadata)
        
        return doc_metadata['document_id']
    
    async def add_chunk(self, document_id: str, chunk: str, chunk_index: int) -> str:
        """Add a chunk with metadata."""
        # Build the chunk metadata
        chunk_metadata = {
            'chunk_id': f"{document_id}_chunk_{chunk_index}",
            'document_id': document_id,
            'chunk_index': chunk_index,
            'text': chunk,
            'token_count': len(chunk.split()),
            'created_at': datetime.utcnow().isoformat(),
            'updated_at': datetime.utcnow().isoformat(),
            'content_type': self._infer_content_type(chunk),
            'language': 'en'
        }
        
        # Store the chunk metadata
        await self.store.store_chunk_metadata(chunk_metadata)
        
        return chunk_metadata['chunk_id']
    
    async def search_by_metadata(self, filters: Dict[str, str]) -> List[Dict]:
        """Search for chunks by metadata."""
        # Build the metadata query
        query = self._build_metadata_query(filters)
        
        # Search the backing store
        results = await self.store.search(query)
        
        # Filter by metadata
        filtered = self._filter_by_metadata(results, filters)
        
        return filtered
    
    async def get_document_info(self, document_id: str) -> Dict[str, str]:
        """Get complete document information."""
        # Fetch the document metadata
        doc_metadata = await self.store.get_document_metadata(document_id)
        
        # Fetch all of its chunks
        chunks = await self.store.get_chunks_by_document(document_id)
        
        return {
            'document': doc_metadata,
            'chunks': chunks
        }

Metadata Query Examples

# Metadata query examples
# (run inside an async function, with metadata_ops a MetadataOperations instance)

# Query by content type
filters = {
    'content_type': 'introduction'
}
results = await metadata_ops.search_by_metadata(filters)

# Query by language
filters = {
    'language': 'en'
}
results = await metadata_ops.search_by_metadata(filters)

# Query by date range
filters = {
    'date_from': '2024-01-01',
    'date_to': '2024-01-31'
}
results = await metadata_ops.search_by_metadata(filters)

# Query by keywords
filters = {
    'keywords': ['machine learning', 'AI', 'data']
}
results = await metadata_ops.search_by_metadata(filters)

# Combined filters
filters = {
    'content_type': 'methodology',
    'language': 'en',
    'date_from': '2024-01-01'
}
results = await metadata_ops.search_by_metadata(filters)

Metadata Validation

# Metadata validation
class MetadataValidator:
    def __init__(self, schema: dict):
        # The schema is kept for reference; the checks below are hand-rolled
        self.schema = schema
    
    def validate_chunk_metadata(self, metadata: dict) -> bool:
        """Validate chunk metadata against the schema rules."""
        # Check required fields
        required_fields = ['chunk_id', 'document_id', 'text']
        for field in required_fields:
            if field not in metadata:
                return False
        
        # Check data types
        if not isinstance(metadata['chunk_id'], str):
            return False
        if not isinstance(metadata['document_id'], str):
            return False
        if not isinstance(metadata['text'], str):
            return False
        
        # Check enum values
        if 'content_type' in metadata:
            valid_types = ['introduction', 'methodology', 'results', 'conclusion', 'appendix', 'references', 'body']
            if metadata['content_type'] not in valid_types:
                return False
        
        if 'access_level' in metadata:
            valid_levels = ['public', 'internal', 'restricted']
            if metadata['access_level'] not in valid_levels:
                return False
        
        # Check ranges
        if 'quality_score' in metadata:
            if not 0 <= metadata['quality_score'] <= 1:
                return False
        
        return True
    
    def validate_document_metadata(self, metadata: dict) -> bool:
        """Validate document metadata against the schema rules."""
        # Check required fields
        required_fields = ['document_id', 'title', 'created_at']
        for field in required_fields:
            if field not in metadata:
                return False
        
        # Check data types
        if not isinstance(metadata['document_id'], str):
            return False
        if not isinstance(metadata['title'], str):
            return False
        
        # Check enum values
        if 'status' in metadata:
            valid_statuses = ['draft', 'processing', 'indexed', 'published', 'archived']
            if metadata['status'] not in valid_statuses:
                return False
        
        # Check ranges
        if 'chunk_count' in metadata:
            if not isinstance(metadata['chunk_count'], int) or metadata['chunk_count'] < 0:
                return False
        
        return True

Common Pitfalls

  1. Missing metadata - always extract and store metadata for every chunk
  2. Inconsistent schemas - use one consistent metadata schema across documents
  3. No validation - validate metadata before storing it
  4. Low quality scores - use objective quality metrics
  5. No cross-references - link related chunks for better context
  6. Ignoring language - use language detection for better retrieval
  7. No access control - implement access levels for security
  8. Stale metadata - keep metadata up to date

Additional Resources