name: building-rag-systems
description: |
  构建生产级RAG系统,包含语义分块、增量索引和过滤检索。
  适用于实现文档摄取流水线、使用Qdrant进行向量搜索或上下文感知检索。
  涵盖分块策略、变更检测、负载索引和上下文扩展。
  不适用于没有生产要求的简单相似性搜索。
构建RAG系统
生产级RAG系统,具备语义分块、增量更新和过滤检索功能。
快速开始
# 依赖项
pip install qdrant-client openai pydantic python-frontmatter
# 核心组件
# 1. 爬虫 → 发现文件,提取路径元数据
# 2. 解析器 → 提取前置元数据,计算文件哈希值
# 3. 分块器 → 基于##标题进行语义分割,400个标记,15%重叠
# 4. 嵌入器 → 批处理OpenAI嵌入
# 5. 上传器 → 使用索引负载将数据上传至Qdrant
摄取流水线
架构
┌──────────┐ ┌────────┐ ┌─────────┐ ┌──────────┐ ┌──────────┐
│ 爬虫 │ -> │ 解析器 │ -> │ 分块器 │ -> │ 嵌入器 │ -> │ 上传器 │
└──────────┘ └────────┘ └─────────┘ └──────────┘ └──────────┘
│ │ │ │ │
发现文件 提取前置元数据 按语义边界分割 生成向量 上传至Qdrant
+ 文件哈希值 (批处理) (批处理)
语义分块(非固定大小)
class SemanticChunker:
    """Production-grade semantic chunker.

    - Splits on ``## `` headings (semantic boundaries), not fixed sizes.
    - Targets ~400 tokens per chunk (NVIDIA benchmark optimum).
    - 15% overlap budget for context continuity.
    - Links prev/next chunk ids so retrieval can expand context later.

    NOTE(review): ``target_words``/``overlap_words`` are computed but not yet
    used by ``chunk()`` — oversized sections are emitted as-is. Confirm
    whether secondary size-based splitting is intended.
    """

    # Zero-width lookahead keeps each "## " heading attached to its section.
    SECTION_PATTERN = re.compile(r"(?=^## )", re.MULTILINE)
    # Rough words->tokens ratio used to convert token budgets into word counts.
    TOKENS_PER_WORD = 1.3

    def __init__(
        self,
        target_tokens: int = 400,
        max_tokens: int = 512,
        overlap_percent: float = 0.15,
    ):
        self.target_words = int(target_tokens / self.TOKENS_PER_WORD)
        self.overlap_words = int(self.target_words * overlap_percent)

    def chunk(self, content: str, file_hash: str) -> list[Chunk]:
        """Split *content* into a doubly linked list of chunks.

        Fix: a document beginning with "## " used to yield an empty first
        section (the lookahead matches at offset 0), producing an empty chunk
        and inflating ``total_chunks``. Blank sections are now dropped.
        """
        sections = [s for s in self.SECTION_PATTERN.split(content) if s.strip()]
        chunks: list[Chunk] = []
        for idx, section in enumerate(sections):
            # Content-derived id: stable across re-runs while the text is unchanged.
            content_hash = hashlib.sha256(section.encode()).hexdigest()[:16]
            chunk_id = f"{file_hash[:8]}_{content_hash}_{idx}"
            chunks.append(Chunk(
                id=chunk_id,
                text=section,
                chunk_index=idx,
                total_chunks=len(sections),
                prev_chunk_id=chunks[-1].id if chunks else None,
                content_hash=content_hash,
                source_file_hash=file_hash,
            ))
            # Back-link the previous chunk to the one just appended.
            if len(chunks) > 1:
                chunks[-2].next_chunk_id = chunk_id
        return chunks
变更检测(增量更新)
def compute_file_hash(file_path: str) -> str:
    """Return the SHA-256 hex digest of *file_path* for change detection.

    Reads in fixed-size chunks so large files are hashed without loading
    them fully into memory (the previous version slurped the whole file).
    """
    digest = hashlib.sha256()
    with open(file_path, 'rb') as f:
        # 64 KiB blocks: large enough to amortize syscalls, small footprint.
        for block in iter(lambda: f.read(65536), b''):
            digest.update(block)
    return digest.hexdigest()
class QdrantStateTracker:
    """Tracks indexing state directly in Qdrant payloads — no external state DB."""

    def get_indexed_files(self, book_id: str) -> dict[str, str]:
        """Return {source_file: source_file_hash} for *book_id* from Qdrant."""
        book_filter = Filter(must=[
            FieldCondition(key="book_id", match=MatchValue(value=book_id))
        ])
        indexed: dict[str, str] = {}
        cursor = None
        while True:
            # Page through the collection; scroll returns (points, next_offset).
            points, cursor = self.client.scroll(
                collection_name=self.collection,
                scroll_filter=book_filter,
                limit=100,
                offset=cursor,
                with_payload=["source_file", "source_file_hash"],
                with_vectors=False,
            )
            indexed.update(
                (p.payload["source_file"], p.payload["source_file_hash"])
                for p in points
            )
            if cursor is None:
                return indexed

    def detect_changes(self, current: dict[str, str], indexed: dict[str, str]):
        """Diff filesystem state vs. the index -> (new, modified, deleted)."""
        new, modified = [], []
        for path, digest in current.items():
            if path not in indexed:
                new.append(path)
            elif indexed[path] != digest:
                modified.append(path)
        deleted = [path for path in indexed if path not in current]
        return new, modified, deleted
批处理嵌入
class OpenAIEmbedder:
    """Embeds chunks via the OpenAI embeddings API, batching requests."""

    def __init__(self, model: str = "text-embedding-3-small", batch_size: int = 20):
        self.client = OpenAI()
        self.model = model
        self.batch_size = batch_size  # OpenAI-recommended batch size

    def embed_chunks(self, chunks: list[Chunk]) -> list[EmbeddedChunk]:
        """Embed *chunks* batch-by-batch; returns chunks enriched with vectors."""
        embedded: list[EmbeddedChunk] = []
        start = 0
        while start < len(chunks):
            batch = chunks[start:start + self.batch_size]
            start += self.batch_size
            # One API call per batch; response order mirrors input order.
            response = self.client.embeddings.create(
                input=[chunk.text for chunk in batch],
                model=self.model,
            )
            embedded.extend(
                EmbeddedChunk(**chunk.dict(), embedding=item.embedding)
                for chunk, item in zip(batch, response.data)
            )
        return embedded
带有负载索引的Qdrant集合
def create_collection(self, recreate: bool = False):
    """Create the collection plus payload indexes for filtered retrieval.

    Fix: the ``recreate`` flag was previously accepted but ignored; it now
    drops an existing collection first so the call is repeatable.
    """
    if recreate and self.client.collection_exists(self.collection):
        self.client.delete_collection(collection_name=self.collection)
    self.client.create_collection(
        collection_name=self.collection,
        vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
    )
    # Index every field used in filters — unindexed payload filters scan.
    indexes = [
        ("book_id", PayloadSchemaType.KEYWORD),           # tenant isolation
        ("module", PayloadSchemaType.KEYWORD),            # content filtering
        ("chapter", PayloadSchemaType.INTEGER),           # range filtering
        ("hardware_tier", PayloadSchemaType.INTEGER),     # personalization
        ("proficiency_level", PayloadSchemaType.KEYWORD),
        ("parent_doc_id", PayloadSchemaType.KEYWORD),     # context expansion
        ("source_file_hash", PayloadSchemaType.KEYWORD),  # change detection
    ]
    for field, schema in indexes:
        self.client.create_payload_index(
            collection_name=self.collection,
            field_name=field,
            field_schema=schema,
        )
检索模式
综合过滤器构建器
def build_filter(self, query: SearchQuery) -> Filter:
    """Build a Qdrant filter combining all query conditions (AND logic).

    Fix: optional chapter bounds are now tested with ``is not None`` —
    the previous truthiness checks silently dropped legitimate zero
    values (e.g. ``chapter_min=0`` disabled the range condition).
    """
    conditions = [
        # Required: tenant isolation.
        FieldCondition(key="book_id", match=MatchValue(value=query.book_id)),
        # Required: hardware tier (lte = "tier X or below").
        FieldCondition(key="hardware_tier", range=Range(lte=query.hardware_tier)),
    ]
    # Optional: exact module match.
    if query.module:
        conditions.append(FieldCondition(
            key="module", match=MatchValue(value=query.module)
        ))
    # Optional: chapter range — either bound may be open.
    if query.chapter_min is not None or query.chapter_max is not None:
        chapter_range = Range()
        if query.chapter_min is not None:
            chapter_range.gte = query.chapter_min
        if query.chapter_max is not None:
            chapter_range.lte = query.chapter_max
        conditions.append(FieldCondition(key="chapter", range=chapter_range))
    # Optional: proficiency levels combined with OR semantics.
    if query.proficiency_levels:
        conditions.append(FieldCondition(
            key="proficiency_level",
            match=MatchAny(any=query.proficiency_levels),
        ))
    return Filter(must=conditions)
上下文扩展(遍历块链)
def expand_context(self, chunk_id: str, prev: int = 1, next: int = 1) -> list[Chunk]:
    """Follow prev_chunk_id/next_chunk_id links to collect surrounding context.

    Returns up to *prev* chunks before and *next* chunks after the anchor,
    in document order, or [] when *chunk_id* is unknown.
    """
    anchor = self.get_chunk_by_id(chunk_id)
    if not anchor:
        return []

    def walk(start_id, limit, attr):
        # Follow one link direction until the limit or a broken link.
        found, link = [], start_id
        while link and len(found) < limit:
            node = self.get_chunk_by_id(link)
            if not node:
                break
            found.append(node)
            link = getattr(node, attr)
        return found

    before = walk(anchor.prev_chunk_id, prev, "prev_chunk_id")
    before.reverse()  # collected nearest-first; flip into document order
    after = walk(anchor.next_chunk_id, next, "next_chunk_id")
    return before + [anchor] + after
完整文档检索
def get_document_chunks(self, parent_doc_id: str) -> list[Chunk]:
    """Fetch every chunk of a document, sorted by chunk_index.

    Fix: the previous version issued a single scroll call with limit=100,
    silently truncating documents with more than 100 chunks; this version
    pages through the full result set.
    """
    doc_filter = Filter(must=[
        FieldCondition(key="parent_doc_id", match=MatchValue(value=parent_doc_id))
    ])
    chunks: list[Chunk] = []
    offset = None
    while True:
        points, offset = self.client.scroll(
            collection_name=self.collection,
            scroll_filter=doc_filter,
            limit=100,
            offset=offset,
            with_payload=True,
            with_vectors=False,
        )
        chunks.extend(self._to_chunk(p) for p in points)
        if offset is None:  # Qdrant signals the last page with a None offset
            break
    chunks.sort(key=lambda c: c.chunk_index)
    return chunks
负载模式
class ChunkPayload(BaseModel):
    """Full payload stored per point, supporting filtered retrieval and context expansion."""
    # Tenant isolation
    book_id: str
    # Content filters (all indexed in Qdrant)
    module: str
    chapter: int
    lesson: int
    hardware_tier: int
    proficiency_level: str
    # Display content
    text: str
    section_title: Optional[str]
    source_file: str
    # Context expansion (prev/next links + position within the parent document)
    parent_doc_id: str
    chunk_index: int
    total_chunks: int
    prev_chunk_id: Optional[str]
    next_chunk_id: Optional[str]
    # Change detection (per-section and per-file hashes)
    content_hash: str
    source_file_hash: str
反模式
| 不要做 | 应该做 |
|---|---|
| 固定字符分块 | 语义边界(##标题) |
| 基于位置的分块ID | 内容哈希以获取稳定ID |
| 块之间无重叠 | 10-20%重叠以确保连续性 |
| 每次变更都完全重新索引 | 使用文件哈希检测进行增量更新 |
| 缺少负载索引 | 索引所有用于过滤的字段 |
| 同步嵌入 | 使用后台作业进行批处理 |
| 外部状态数据库 | Qdrant原生状态跟踪 |
验证
运行:python scripts/verify.py
相关技能
- scaffolding-fastapi-dapr - 搜索端点的API模式
- streaming-llm-responses - 流式RAG响应
参考资料
- references/ingestion-patterns.md - 完整摄取流水线
- references/retrieval-patterns.md - 过滤策略、上下文扩展