# RAG Chunking Metadata Strategies

## Overview

RAG chunking metadata strategies cover systematic approaches to adding, managing, and exploiting metadata in a RAG pipeline. This skill spans metadata schema design, chunk-level metadata, document-level metadata, and metadata-driven retrieval strategies.

**When to use this skill:** when designing or implementing a RAG system that needs rich metadata to improve retrieval precision and context management.
## Table of Contents

- Metadata Schema Design
- Chunk-Level Metadata
- Document-Level Metadata
- Metadata-Driven Retrieval
- Metadata Storage
- Chunking Metadata Checklist
- Quick Reference
## Metadata Schema Design

### Core Metadata Fields
| Field | Type | Description | Example |
|-------|------|-------------|---------|
| chunk_id | string | Unique chunk identifier | doc_123_chunk_45 |
| document_id | string | Parent document identifier | doc_123 |
| chunk_index | integer | Position within the document | 0, 1, 2 |
| text | string | Chunk content | "The quick brown fox..." |
| token_count | integer | Number of tokens | 150 |
| embedding_id | string | Vector database reference | vec_abc123 |
| created_at | timestamp | Creation time | 2024-01-15T10:30:00Z |
| updated_at | timestamp | Last update time | 2024-01-15T10:30:00Z |
| source_type | enum | Content origin | pdf, web, database |
| content_type | enum | Document section type | introduction, methodology, results |
| language | string | Detected language | en, es, fr |
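For readers who prefer code to tables, the same core fields can be sketched as a typed record. This is an illustrative `TypedDict` only; the field names mirror the table above, and nothing here prescribes a storage format.

```python
# A minimal sketch of the core chunk metadata as a Python TypedDict.
# Field names mirror the table above; the type itself is illustrative.
from typing import TypedDict

class ChunkMetadata(TypedDict, total=False):
    chunk_id: str       # e.g. "doc_123_chunk_45"
    document_id: str    # e.g. "doc_123"
    chunk_index: int    # position within the document
    text: str           # chunk content
    token_count: int    # number of tokens
    embedding_id: str   # vector database reference, e.g. "vec_abc123"
    created_at: str     # ISO 8601 timestamp
    updated_at: str     # ISO 8601 timestamp
    source_type: str    # "pdf" | "web" | "database"
    content_type: str   # "introduction" | "methodology" | "results"
    language: str       # e.g. "en"
```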
### Extended Metadata Fields

| Field | Type | Description | Example |
|-------|------|-------------|---------|
| title | string | Chunk title | "Introduction to Machine Learning" |
| summary | string | Chunk summary | "Overview of ML concepts" |
| keywords | array | Search keywords | ["machine learning", "AI", "data"] |
| entities | array | Named entities | [{"type": "PERSON", "text": "John Doe"}] |
| section_hierarchy | array | Document structure | ["chapter", "section", "subsection"] |
| cross_references | array | Links to other chunks | ["doc_123_chunk_44", "doc_123_chunk_46"] |
| quality_score | float | Content quality score | 0.95 |
| access_level | enum | Permission level | public, internal, restricted |
### Metadata JSON Schema
```json
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "RAG Chunk Metadata",
  "type": "object",
  "required": ["chunk_id", "document_id", "text"],
  "properties": {
    "chunk_id": {
      "type": "string",
      "description": "Unique chunk identifier"
    },
    "document_id": {
      "type": "string",
      "description": "Parent document identifier"
    },
    "text": {
      "type": "string",
      "description": "Chunk content text"
    },
    "chunk_index": {
      "type": "integer",
      "description": "Position within document"
    },
    "token_count": {
      "type": "integer",
      "description": "Number of tokens in chunk"
    },
    "embedding_id": {
      "type": "string",
      "description": "Vector database reference"
    },
    "created_at": {
      "type": "string",
      "format": "date-time",
      "description": "Creation timestamp"
    },
    "updated_at": {
      "type": "string",
      "format": "date-time",
      "description": "Last update timestamp"
    },
    "source_type": {
      "type": "string",
      "enum": ["pdf", "web", "database", "api", "manual"],
      "description": "Content origin"
    },
    "content_type": {
      "type": "string",
      "enum": ["introduction", "methodology", "results", "conclusion", "appendix", "references", "body"],
      "description": "Document section type"
    },
    "language": {
      "type": "string",
      "description": "Detected content language"
    },
    "title": {
      "type": "string",
      "description": "Chunk title"
    },
    "summary": {
      "type": "string",
      "description": "Chunk summary"
    },
    "keywords": {
      "type": "array",
      "items": { "type": "string" },
      "description": "Search keywords"
    },
    "entities": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "type": {
            "type": "string",
            "enum": ["PERSON", "ORG", "LOCATION", "DATE", "MONEY", "PERCENT"]
          },
          "text": { "type": "string" }
        }
      }
    },
    "section_hierarchy": {
      "type": "array",
      "items": { "type": "string" },
      "description": "Document structure hierarchy"
    },
    "cross_references": {
      "type": "array",
      "items": { "type": "string" },
      "description": "Links to related chunks"
    },
    "quality_score": {
      "type": "number",
      "minimum": 0,
      "maximum": 1,
      "description": "Content quality score"
    },
    "access_level": {
      "type": "string",
      "enum": ["public", "internal", "restricted"],
      "description": "Permission level"
    }
  }
}
```

Note: `body` is included in the `content_type` enum so that the extraction code below, which returns it as a fallback, produces schema-valid metadata.
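As a sketch of how this schema might be enforced, the snippet below validates a chunk against it using the third-party `jsonschema` package (an assumption; any JSON Schema draft-07 validator would do). The schema file name is hypothetical.

```python
import json
from jsonschema import validate, ValidationError  # pip install jsonschema

# Load the schema above from disk (hypothetical file name)
with open("rag_chunk_metadata.schema.json") as f:
    schema = json.load(f)

chunk = {
    "chunk_id": "doc_123_chunk_45",
    "document_id": "doc_123",
    "text": "The quick brown fox...",
    "quality_score": 0.95,
}

try:
    validate(instance=chunk, schema=schema)
    print("metadata is valid")
except ValidationError as err:
    print(f"invalid metadata: {err.message}")
```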
## Chunk-Level Metadata

### Automatic Metadata Extraction
```python
# Automatic metadata extraction
import re
from datetime import datetime


class MetadataExtractor:
    def extract_chunk_metadata(self, chunk: str, chunk_index: int, document_id: str) -> dict:
        """Extract metadata for a single chunk."""
        # Basic metadata
        metadata = {
            'chunk_id': f"{document_id}_chunk_{chunk_index}",
            'document_id': document_id,
            'chunk_index': chunk_index,
            'text': chunk,
            'token_count': len(chunk.split()),  # rough whitespace token count
            'created_at': datetime.utcnow().isoformat(),
            'updated_at': datetime.utcnow().isoformat()
        }
        # Content type inference
        metadata['content_type'] = self._infer_content_type(chunk)
        # Language detection
        metadata['language'] = self._detect_language(chunk)
        # Entity extraction
        metadata['entities'] = self._extract_entities(chunk)
        # Keyword extraction
        metadata['keywords'] = self._extract_keywords(chunk)
        # Title and summary generation
        metadata['title'] = self._generate_title(chunk)
        metadata['summary'] = self._generate_summary(chunk)
        return metadata

    def _infer_content_type(self, text: str) -> str:
        """Infer the content type from the text using simple heuristics."""
        text_lower = text.lower()
        if any(word in text_lower for word in ['abstract', 'introduction', 'overview']):
            return 'introduction'
        elif any(word in text_lower for word in ['method', 'approach', 'algorithm', 'implementation']):
            return 'methodology'
        elif any(word in text_lower for word in ['conclusion', 'summary', 'final']):
            # checked before 'results' so concluding sections are not misclassified
            return 'conclusion'
        elif any(word in text_lower for word in ['result', 'finding', 'data']):
            return 'results'
        else:
            return 'body'

    def _detect_language(self, text: str) -> str:
        """Detect the content language.

        Placeholder implementation; in production, use a proper language
        detection library.
        """
        return 'en'  # default to English

    def _extract_entities(self, text: str) -> list:
        """Extract named entities via simple pattern matching.

        In production, use an NER model instead.
        """
        entities = []
        # Extract dates
        date_pattern = r'\d{1,2}[-/]\d{1,2}[-/]\d{4}'
        for date in re.findall(date_pattern, text):
            entities.append({'type': 'DATE', 'text': date})
        # Extract email addresses
        email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}'
        for email in re.findall(email_pattern, text):
            entities.append({'type': 'EMAIL', 'text': email})
        # Extract URLs
        url_pattern = r'https?://\S+'
        for url in re.findall(url_pattern, text):
            entities.append({'type': 'URL', 'text': url})
        return entities

    def _extract_keywords(self, text: str) -> list:
        """Extract keywords by removing common stop words (abbreviated list)."""
        stop_words = {
            'the', 'a', 'an', 'is', 'are', 'was', 'were', 'been', 'be',
            'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would',
            'could', 'should', 'may', 'might', 'must', 'shall', 'can',
            'this', 'that', 'with', 'from', 'and', 'for', 'not', 'but'
        }
        words = text.lower().split()
        keywords = [word for word in words if word not in stop_words and len(word) > 3]
        # Remove duplicates
        keywords = list(set(keywords))
        return keywords[:10]  # return the top 10 keywords

    def _generate_title(self, chunk: str) -> str:
        """Generate a title from the chunk."""
        # Take the first sentence, truncated to a reasonable length
        sentences = chunk.split('. ')
        first_sentence = sentences[0] if sentences else chunk
        return first_sentence[:100]

    def _generate_summary(self, chunk: str) -> str:
        """Generate a summary from the chunk."""
        # Take the first sentence, truncated to a reasonable length
        sentences = chunk.split('. ')
        first_sentence = sentences[0] if sentences else chunk
        return first_sentence[:200]
```
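A usage sketch for the extractor above, with illustrative input:

```python
# Example usage (illustrative values)
extractor = MetadataExtractor()
metadata = extractor.extract_chunk_metadata(
    chunk="Machine learning is a subfield of AI. Contact john@example.com.",
    chunk_index=0,
    document_id="doc_123",
)
print(metadata["chunk_id"])   # doc_123_chunk_0
print(metadata["entities"])   # [{'type': 'EMAIL', 'text': 'john@example.com'}]
```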
### Manual Metadata Enhancement
```python
# Manual metadata enhancement
from datetime import datetime


class MetadataEnhancer:
    def add_cross_references(self, metadata: dict, references: list) -> dict:
        """Add cross-references to the metadata."""
        metadata['cross_references'] = references
        return metadata

    def update_quality_score(self, metadata: dict, score: float) -> dict:
        """Update the quality score."""
        metadata['quality_score'] = score
        metadata['updated_at'] = datetime.utcnow().isoformat()
        return metadata

    def add_access_level(self, metadata: dict, level: str) -> dict:
        """Add an access level to the metadata."""
        metadata['access_level'] = level
        metadata['updated_at'] = datetime.utcnow().isoformat()
        return metadata

    def add_section_hierarchy(self, metadata: dict, hierarchy: list) -> dict:
        """Add a section hierarchy to the metadata."""
        metadata['section_hierarchy'] = hierarchy
        metadata['updated_at'] = datetime.utcnow().isoformat()
        return metadata
```
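These enhancers compose naturally with the extractor's output; for example:

```python
# Layer manual enhancements onto extracted metadata (illustrative values)
enhancer = MetadataEnhancer()
metadata = enhancer.add_cross_references(metadata, ["doc_123_chunk_44", "doc_123_chunk_46"])
metadata = enhancer.update_quality_score(metadata, 0.95)
metadata = enhancer.add_access_level(metadata, "internal")
```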
## Document-Level Metadata

### Document Metadata Schema
```json
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Document Metadata",
  "type": "object",
  "required": ["document_id", "title", "created_at", "chunk_count"],
  "properties": {
    "document_id": {
      "type": "string",
      "description": "Unique document identifier"
    },
    "title": {
      "type": "string",
      "description": "Document title"
    },
    "created_at": {
      "type": "string",
      "format": "date-time",
      "description": "Creation timestamp"
    },
    "updated_at": {
      "type": "string",
      "format": "date-time",
      "description": "Last update timestamp"
    },
    "chunk_count": {
      "type": "integer",
      "description": "Total number of chunks"
    },
    "source_type": {
      "type": "string",
      "enum": ["pdf", "web", "database", "api", "manual"],
      "description": "Content origin"
    },
    "language": {
      "type": "string",
      "description": "Detected primary language"
    },
    "file_path": {
      "type": "string",
      "description": "Original file path"
    },
    "file_size": {
      "type": "integer",
      "description": "File size in bytes"
    },
    "total_tokens": {
      "type": "integer",
      "description": "Total tokens across all chunks"
    },
    "authors": {
      "type": "array",
      "items": { "type": "string" },
      "description": "Document authors"
    },
    "tags": {
      "type": "array",
      "items": { "type": "string" },
      "description": "Document tags"
    },
    "version": {
      "type": "string",
      "description": "Document version"
    },
    "status": {
      "type": "string",
      "enum": ["draft", "processing", "indexed", "published", "archived"],
      "description": "Document processing status"
    }
  }
}
```
### Document Metadata Management
```python
# Document metadata management
import os
import uuid
from datetime import datetime


class DocumentMetadata:
    def create_document_metadata(self, file_path: str, title: str) -> dict:
        """Create document metadata."""
        metadata = {
            'document_id': self._generate_id(file_path),
            'title': title,
            'created_at': datetime.utcnow().isoformat(),
            'updated_at': datetime.utcnow().isoformat(),
            'source_type': self._infer_source_type(file_path),
            'file_path': file_path,
            'file_size': self._get_file_size(file_path),
            'chunk_count': 0,
            'status': 'processing',
            'language': 'en'  # default
        }
        return metadata

    def update_document_status(self, document_id: str, status: str) -> dict:
        """Update the document status (in practice this would write to the database)."""
        metadata = {
            'document_id': document_id,
            'status': status,
            'updated_at': datetime.utcnow().isoformat()
        }
        return metadata

    def add_chunk_count(self, document_id: str, count: int) -> dict:
        """Record the chunk count (in practice this would write to the database)."""
        metadata = {
            'document_id': document_id,
            'chunk_count': count,
            'updated_at': datetime.utcnow().isoformat()
        }
        return metadata

    def _generate_id(self, file_path: str) -> str:
        """Generate a unique document ID."""
        # A UUID is used here; a hash of the file path would also work
        return str(uuid.uuid4())

    def _infer_source_type(self, file_path: str) -> str:
        """Infer the source type from the file extension."""
        extension = file_path.lower().split('.')[-1]
        # Extensions are folded into the schema's enum values: word-processor
        # formats map to 'pdf' and text/web formats to 'web', since the enum
        # has no dedicated entries for them
        source_types = {
            'pdf': 'pdf',
            'docx': 'pdf',
            'doc': 'pdf',
            'txt': 'web',
            'html': 'web',
            'md': 'web'
        }
        return source_types.get(extension, 'manual')

    def _get_file_size(self, file_path: str) -> int:
        """Get the file size in bytes."""
        return os.path.getsize(file_path)
```
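A sketch of the intended lifecycle, assuming the file exists on disk (the path and values are hypothetical):

```python
# Create document metadata, then update it as processing progresses
doc_meta = DocumentMetadata()
metadata = doc_meta.create_document_metadata("/data/report.pdf", "Q1 Report")
doc_meta.add_chunk_count(metadata["document_id"], 42)                # after chunking
doc_meta.update_document_status(metadata["document_id"], "indexed")  # after embedding
```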
## Metadata-Driven Retrieval

### Metadata-Based Filtering
```python
# Metadata-driven retrieval
class MetadataRetriever:
    def __init__(self, vector_db, metadata_store):
        self.vector_db = vector_db
        self.metadata_store = metadata_store

    async def retrieve_by_metadata(self, filters: dict) -> list:
        """Retrieve chunks matching the given metadata filters."""
        # Build a query from the filters
        query = self._build_metadata_query(filters)
        # Search the vector database
        results = await self.vector_db.search(query)
        # Filter the results by metadata
        filtered_results = self._filter_by_metadata(results, filters)
        return filtered_results

    def _build_metadata_query(self, filters: dict) -> str:
        """Build a query string from the metadata filters."""
        query_parts = []
        # Content type filter
        if 'content_type' in filters:
            query_parts.append(f"content_type:{filters['content_type']}")
        # Language filter
        if 'language' in filters:
            query_parts.append(f"language:{filters['language']}")
        # Access level filter
        if 'access_level' in filters:
            query_parts.append(f"access_level:{filters['access_level']}")
        # Date range filters
        if 'date_from' in filters:
            query_parts.append(f"created_at:>{filters['date_from']}")
        if 'date_to' in filters:
            query_parts.append(f"created_at:<{filters['date_to']}")
        # Keyword filter
        if 'keywords' in filters:
            keywords = ' '.join(filters['keywords'])
            query_parts.append(f"keywords:{keywords}")
        return ' '.join(query_parts)

    def _filter_by_metadata(self, results: list, filters: dict) -> list:
        """Filter results by their metadata."""
        filtered = []
        for result in results:
            metadata = result.get('metadata', {})
            # Content type filter
            if 'content_type' in filters:
                if metadata.get('content_type') != filters['content_type']:
                    continue
            # Language filter
            if 'language' in filters:
                if metadata.get('language') != filters['language']:
                    continue
            # Access level filter
            if 'access_level' in filters:
                if metadata.get('access_level') != filters['access_level']:
                    continue
            # Date range filters (ISO 8601 strings compare lexicographically)
            if 'date_from' in filters:
                if metadata.get('created_at', '') < filters['date_from']:
                    continue
            if 'date_to' in filters:
                if metadata.get('created_at', '') > filters['date_to']:
                    continue
            # Keyword filter: keep results sharing at least one keyword
            if 'keywords' in filters:
                result_keywords = set(metadata.get('keywords', []))
                filter_keywords = set(filters['keywords'])
                if not result_keywords.intersection(filter_keywords):
                    continue
            filtered.append(result)
        return filtered
```
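For example, retrieval can be restricted to English methodology chunks from a given date onward (the `vector_db` and `metadata_store` backends are assumed to exist):

```python
# Retrieve only chunks matching all of the given metadata filters
retriever = MetadataRetriever(vector_db, metadata_store)
results = await retriever.retrieve_by_metadata({
    "content_type": "methodology",
    "language": "en",
    "date_from": "2024-01-01",
})
```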
### Metadata-Augmented Search
```python
# Metadata-augmented search
class MetadataAugmentedSearch:
    def __init__(self, vector_db, metadata_store):
        self.vector_db = vector_db
        self.metadata_store = metadata_store

    async def search_with_metadata_boost(self, query: str, metadata_filters: dict) -> list:
        """Search with metadata-based score boosting."""
        # Standard vector search
        vector_results = await self.vector_db.search(query)
        # Attach metadata to each result
        for result in vector_results:
            metadata = await self.metadata_store.get(result['id'])
            result['metadata'] = metadata
        # Apply the metadata filters, accumulating boosts
        filtered_results = self._apply_metadata_filters(vector_results, metadata_filters)
        # Re-rank using the accumulated boosts
        reranked = self._rerank_with_metadata(filtered_results)
        return reranked

    def _apply_metadata_filters(self, results: list, filters: dict) -> list:
        """Drop non-matching results and accumulate a boost on matching ones."""
        filtered = []
        for result in results:
            metadata = result.get('metadata', {})
            # Content type boost
            if 'content_type' in filters:
                if metadata.get('content_type') == filters['content_type']:
                    result['boost'] = result.get('boost', 0.0) + 0.5
                else:
                    continue
            # Language boost
            if 'language' in filters:
                if metadata.get('language') == filters['language']:
                    result['boost'] = result.get('boost', 0.0) + 0.3
                else:
                    continue
            filtered.append(result)
        return filtered

    def _rerank_with_metadata(self, results: list) -> list:
        """Re-rank by combining the base score with the metadata boost."""
        for result in results:
            base_score = result.get('score', 0.5)
            # The boost was stored on the result by _apply_metadata_filters
            boost = result.get('boost', 0.0)
            result['score'] = base_score + boost
        # Sort by the new score
        results.sort(key=lambda x: x['score'], reverse=True)
        return results
```
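Usage mirrors the plain retriever, except matching chunks are both filtered and boosted before re-ranking (backends again assumed):

```python
# Search, keeping only English introductions and boosting their scores
search = MetadataAugmentedSearch(vector_db, metadata_store)
results = await search.search_with_metadata_boost(
    "what is machine learning",
    {"content_type": "introduction", "language": "en"},
)
```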
## Metadata Storage

### Storage Strategies
| Storage Type | Use Case | Advantage |
|--------------|----------|-----------|
| Document store | Document metadata | Fast document lookup |
| Chunk store | Chunk metadata | Fast chunk filtering |
| Vector store | Vectors + metadata | Combined retrieval |
| Hybrid store | Documents + chunks + vectors | Complete metadata management |
### Storage Implementation
```python
# Metadata store implementation
class MetadataStore:
    def __init__(self, storage_backend):
        self.storage = storage_backend

    async def store_chunk_metadata(self, metadata: dict) -> str:
        """Store chunk metadata, keyed by chunk_id."""
        await self.storage.put(
            key=f"chunk:{metadata['chunk_id']}",
            value=metadata
        )
        return metadata['chunk_id']

    async def store_document_metadata(self, metadata: dict) -> str:
        """Store document metadata, keyed by document_id."""
        await self.storage.put(
            key=f"document:{metadata['document_id']}",
            value=metadata
        )
        return metadata['document_id']

    async def get_chunk_metadata(self, chunk_id: str) -> dict:
        """Fetch chunk metadata."""
        return await self.storage.get(f"chunk:{chunk_id}")

    async def get_document_metadata(self, document_id: str) -> dict:
        """Fetch document metadata."""
        return await self.storage.get(f"document:{document_id}")

    async def update_metadata(self, key: str, updates: dict) -> dict:
        """Merge updates into an existing record (key includes its prefix)."""
        current = await self.storage.get(key)
        updated = {**current, **updates}
        await self.storage.put(key=key, value=updated)
        return updated
```
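The `storage_backend` is left abstract above. A minimal in-memory backend satisfying the expected `put`/`get` interface might look like this (a sketch for testing, not a production store):

```python
# Minimal async key-value backend for MetadataStore (illustrative)
class InMemoryBackend:
    def __init__(self):
        self._data: dict = {}

    async def put(self, key: str, value: dict) -> None:
        self._data[key] = value

    async def get(self, key: str) -> dict:
        return self._data.get(key, {})

store = MetadataStore(InMemoryBackend())
```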
## Chunking Metadata Checklist

### Pre-Processing
```markdown
## Pre-Processing Checklist

### Document Analysis
- [ ] Analyze document structure
- [ ] Determine content types
- [ ] Detect language
- [ ] Extract entities
- [ ] Extract keywords
- [ ] Map the section hierarchy

### Metadata Schema
- [ ] Design the schema
- [ ] Define required fields
- [ ] Define optional fields
- [ ] Define validation rules
- [ ] Create the JSON Schema
```
### Chunking Process

```markdown
## Chunking Process Checklist

### Chunk Creation
- [ ] Determine chunk boundaries
- [ ] Extract metadata for each chunk
- [ ] Add cross-references
- [ ] Compute quality scores
- [ ] Validate token counts
- [ ] Generate embeddings

### Metadata Storage
- [ ] Store chunk metadata
- [ ] Store document metadata
- [ ] Create indexes
- [ ] Optimize storage
- [ ] Configure backups
```
### Quality Control

```markdown
## Quality Control Checklist

### Validation
- [ ] Passes schema validation
- [ ] Required fields present
- [ ] Data types correct
- [ ] Format validation passes
- [ ] Quality scores within range
- [ ] Passes duplicate detection

### Monitoring
- [ ] Track metadata completeness
- [ ] Compute quality metrics
- [ ] Monitor storage performance
- [ ] Measure query performance
```
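For the "track metadata completeness" item above, a hypothetical helper might report how often each optional field is populated across a batch of chunks:

```python
# Hypothetical helper: per-field completeness ratios for a batch of chunk metadata
def metadata_completeness(chunks: list, fields: list) -> dict:
    total = len(chunks) or 1  # avoid division by zero on an empty batch
    return {field: sum(1 for c in chunks if c.get(field)) / total for field in fields}

# e.g. metadata_completeness(all_chunks, ["summary", "keywords", "quality_score"])
```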
## Quick Reference

### Metadata Operations
```python
# Metadata operations
from datetime import datetime
from typing import Dict, List


class MetadataOperations:
    """Convenience facade; the private helpers (_generate_id, _infer_source_type,
    _get_file_size, _infer_content_type, _build_metadata_query,
    _filter_by_metadata) mirror those defined in the classes above."""

    def __init__(self, metadata_store):
        self.store = metadata_store

    async def create_document(self, file_path: str, title: str) -> str:
        """Create a document with metadata."""
        doc_metadata = {
            'document_id': self._generate_id(file_path),
            'title': title,
            'created_at': datetime.utcnow().isoformat(),
            'source_type': self._infer_source_type(file_path),
            'file_path': file_path,
            'file_size': self._get_file_size(file_path),
            'status': 'processing',
            'chunk_count': 0,
            'language': 'en'
        }
        # Store the document metadata
        await self.store.store_document_metadata(doc_metadata)
        return doc_metadata['document_id']

    async def add_chunk(self, document_id: str, chunk: str, chunk_index: int) -> str:
        """Add a chunk with its metadata."""
        chunk_metadata = {
            'chunk_id': f"{document_id}_chunk_{chunk_index}",
            'document_id': document_id,
            'chunk_index': chunk_index,
            'text': chunk,
            'token_count': len(chunk.split()),
            'created_at': datetime.utcnow().isoformat(),
            'updated_at': datetime.utcnow().isoformat(),
            'content_type': self._infer_content_type(chunk),
            'language': 'en'
        }
        # Store the chunk metadata
        await self.store.store_chunk_metadata(chunk_metadata)
        return chunk_metadata['chunk_id']

    async def search_by_metadata(self, filters: Dict[str, str]) -> List[Dict]:
        """Search indexed content by metadata filters (assumes the store exposes search())."""
        # Build the metadata query
        query = self._build_metadata_query(filters)
        # Search the store
        results = await self.store.search(query)
        # Filter the results by metadata
        return self._filter_by_metadata(results, filters)

    async def get_document_info(self, document_id: str) -> Dict:
        """Get full document information."""
        # Fetch the document metadata
        doc_metadata = await self.store.get_document_metadata(document_id)
        # Fetch all of its chunks
        chunks = await self.store.get_chunks_by_document(document_id)
        return {
            'document': doc_metadata,
            'chunks': chunks
        }
```
### Metadata Query Examples
```python
# Metadata query examples

# Query by content type
filters = {'content_type': 'introduction'}
results = await metadata_ops.search_by_metadata(filters)

# Query by language
filters = {'language': 'en'}
results = await metadata_ops.search_by_metadata(filters)

# Query by date range
filters = {'date_from': '2024-01-01', 'date_to': '2024-01-31'}
results = await metadata_ops.search_by_metadata(filters)

# Query by keywords
filters = {'keywords': ['machine learning', 'AI', 'data']}
results = await metadata_ops.search_by_metadata(filters)

# Combined filters
filters = {
    'content_type': 'methodology',
    'language': 'en',
    'date_from': '2024-01-01'
}
results = await metadata_ops.search_by_metadata(filters)
```
### Metadata Validation
```python
# Metadata validation
class MetadataValidator:
    def __init__(self, schema: dict):
        self.schema = schema

    def validate_chunk_metadata(self, metadata: dict) -> bool:
        """Validate chunk metadata against the schema rules."""
        # Check required fields
        required_fields = ['chunk_id', 'document_id', 'text']
        for field in required_fields:
            if field not in metadata:
                return False
        # Check data types
        if not isinstance(metadata['chunk_id'], str):
            return False
        if not isinstance(metadata['document_id'], str):
            return False
        if not isinstance(metadata['text'], str):
            return False
        # Check enum values
        if 'content_type' in metadata:
            valid_types = ['introduction', 'methodology', 'results', 'conclusion', 'appendix', 'references', 'body']
            if metadata['content_type'] not in valid_types:
                return False
        if 'access_level' in metadata:
            valid_levels = ['public', 'internal', 'restricted']
            if metadata['access_level'] not in valid_levels:
                return False
        # Check ranges
        if 'quality_score' in metadata:
            score = metadata['quality_score']
            if not isinstance(score, (int, float)) or not 0 <= score <= 1:
                return False
        return True

    def validate_document_metadata(self, metadata: dict) -> bool:
        """Validate document metadata against the schema rules."""
        # Check required fields
        required_fields = ['document_id', 'title', 'created_at']
        for field in required_fields:
            if field not in metadata:
                return False
        # Check data types
        if not isinstance(metadata['document_id'], str):
            return False
        if not isinstance(metadata['title'], str):
            return False
        # Check enum values
        if 'status' in metadata:
            valid_statuses = ['draft', 'processing', 'indexed', 'published', 'archived']
            if metadata['status'] not in valid_statuses:
                return False
        # Check ranges
        if 'chunk_count' in metadata:
            if not isinstance(metadata['chunk_count'], int) or metadata['chunk_count'] < 0:
                return False
        return True
```
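A usage sketch tying the validator to storage, per the "no validation" pitfall below. The `store` and `chunk_metadata` values are assumed from the earlier examples, and note that these hand-rolled checks do not consult the `schema` argument:

```python
# Validate before storing, rejecting malformed chunks
validator = MetadataValidator(schema={})  # schema unused by the checks above
if validator.validate_chunk_metadata(chunk_metadata):
    await store.store_chunk_metadata(chunk_metadata)
else:
    print(f"rejected chunk {chunk_metadata.get('chunk_id')}")
```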
## Common Pitfalls

- Missing metadata: always extract and store metadata for every chunk
- Inconsistent schemas: use one consistent metadata schema across documents
- No validation: validate metadata before storing it
- Unreliable quality scores: base quality scores on objective metrics
- No cross-references: link related chunks for better context
- Ignoring language: use language detection for better retrieval
- No access control: enforce access levels for security
- Stale metadata: keep metadata up to date
## Additional Resources