构建RAG系统Skill building-rag-systems

本技能详细介绍了如何构建生产级的检索增强生成(RAG)系统,涵盖从文档摄取到智能检索的全流程。核心内容包括:语义分块策略(基于标题分割,避免固定大小)、增量索引与变更检测(通过文件哈希实现高效更新)、批处理向量嵌入(使用OpenAI API)、以及基于Qdrant向量数据库的过滤检索和上下文扩展。适用于需要构建企业级知识库、智能问答系统或文档智能检索应用的开发者,关键词包括:RAG系统、语义分块、增量索引、向量检索、Qdrant、OpenAI嵌入、生产级部署。

RAG应用 0 次安装 11 次浏览 更新于 3/2/2026

name: building-rag-systems description: | 构建生产级RAG系统,包含语义分块、增量索引和过滤检索。 适用于实现文档摄取流水线、使用Qdrant进行向量搜索或上下文感知检索。 涵盖分块策略、变更检测、负载索引和上下文扩展。 不适用于没有生产要求的简单相似性搜索。

构建RAG系统

生产级RAG系统,具备语义分块、增量更新和过滤检索功能。

快速开始

# 依赖项
pip install qdrant-client openai pydantic python-frontmatter

# 核心组件
# 1. 爬虫 → 发现文件,提取路径元数据
# 2. 解析器 → 提取前置元数据,计算文件哈希值
# 3. 分块器 → 基于##标题进行语义分割,400个标记,15%重叠
# 4. 嵌入器 → 批处理OpenAI嵌入
# 5. 上传器 → 使用索引负载将数据上传至Qdrant

摄取流水线

架构

┌──────────┐    ┌────────┐    ┌─────────┐    ┌──────────┐    ┌──────────┐
│  爬虫    │ -> │ 解析器 │ -> │ 分块器 │ -> │  嵌入器  │ -> │  上传器  │
└──────────┘    └────────┘    └─────────┘    └──────────┘    └──────────┘
     │              │              │              │              │
发现文件      提取前置元数据   按语义边界分割   生成向量       上传至Qdrant
               + 文件哈希值                      (批处理)       (批处理)

语义分块(非固定大小)

class SemanticChunker:
    """
    生产级分块:
    - 基于##标题分割(语义边界)
    - 目标400个标记(NVIDIA基准测试最优)
    - 15%重叠以确保上下文连续性
    - 跟踪前/后块以进行上下文扩展
    """
    SECTION_PATTERN = re.compile(r"(?=^## )", re.MULTILINE)
    TOKENS_PER_WORD = 1.3

    def __init__(
        self,
        target_tokens: int = 400,
        max_tokens: int = 512,
        overlap_percent: float = 0.15,
    ):
        self.target_words = int(target_tokens / self.TOKENS_PER_WORD)
        self.overlap_words = int(self.target_words * overlap_percent)

    def chunk(self, content: str, file_hash: str) -> list[Chunk]:
        sections = self.SECTION_PATTERN.split(content)
        chunks = []

        for idx, section in enumerate(sections):
            content_hash = hashlib.sha256(section.encode()).hexdigest()[:16]
            chunk_id = f"{file_hash[:8]}_{content_hash}_{idx}"

            chunks.append(Chunk(
                id=chunk_id,
                text=section,
                chunk_index=idx,
                total_chunks=len(sections),
                prev_chunk_id=chunks[-1].id if chunks else None,
                content_hash=content_hash,
                source_file_hash=file_hash,
            ))

            # 为前一个块设置next_chunk_id
            if len(chunks) > 1:
                chunks[-2].next_chunk_id = chunk_id

        return chunks

变更检测(增量更新)

def compute_file_hash(file_path: str) -> str:
    """SHA-256用于变更检测。"""
    with open(file_path, 'rb') as f:
        return hashlib.sha256(f.read()).hexdigest()

class QdrantStateTracker:
    """直接查询Qdrant负载 - 无需外部状态数据库。"""

    def get_indexed_files(self, book_id: str) -> dict[str, str]:
        """从Qdrant返回{文件路径: 文件哈希值}。"""
        indexed = {}
        offset = None

        while True:
            points, next_offset = self.client.scroll(
                collection_name=self.collection,
                scroll_filter=Filter(must=[
                    FieldCondition(key="book_id", match=MatchValue(value=book_id))
                ]),
                limit=100,
                offset=offset,
                with_payload=["source_file", "source_file_hash"],
                with_vectors=False,
            )

            for point in points:
                indexed[point.payload["source_file"]] = point.payload["source_file_hash"]

            if next_offset is None:
                break
            offset = next_offset

        return indexed

    def detect_changes(self, current: dict[str, str], indexed: dict[str, str]):
        """比较文件系统与索引。"""
        new = [p for p in current if p not in indexed]
        deleted = [p for p in indexed if p not in current]
        modified = [p for p in current if p in indexed and current[p] != indexed[p]]
        return new, modified, deleted

批处理嵌入

class OpenAIEmbedder:
    def __init__(self, model: str = "text-embedding-3-small", batch_size: int = 20):
        self.client = OpenAI()
        self.model = model
        self.batch_size = batch_size  # OpenAI推荐

    def embed_chunks(self, chunks: list[Chunk]) -> list[EmbeddedChunk]:
        embedded = []
        for i in range(0, len(chunks), self.batch_size):
            batch = chunks[i:i + self.batch_size]
            response = self.client.embeddings.create(
                input=[c.text for c in batch],
                model=self.model,
            )
            for chunk, data in zip(batch, response.data):
                embedded.append(EmbeddedChunk(**chunk.dict(), embedding=data.embedding))
        return embedded

带有负载索引的Qdrant集合

def create_collection(self, recreate: bool = False):
    """创建集合,为过滤检索配置适当的索引。"""
    self.client.create_collection(
        collection_name=self.collection,
        vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
    )

    # 索引所有用于过滤的字段
    indexes = [
        ("book_id", PayloadSchemaType.KEYWORD),      # 租户隔离
        ("module", PayloadSchemaType.KEYWORD),       # 内容过滤
        ("chapter", PayloadSchemaType.INTEGER),      # 范围过滤
        ("hardware_tier", PayloadSchemaType.INTEGER),# 个性化
        ("proficiency_level", PayloadSchemaType.KEYWORD),
        ("parent_doc_id", PayloadSchemaType.KEYWORD),# 上下文扩展
        ("source_file_hash", PayloadSchemaType.KEYWORD),  # 变更检测
    ]

    for field, schema in indexes:
        self.client.create_payload_index(
            collection_name=self.collection,
            field_name=field,
            field_schema=schema,
        )

检索模式

综合过滤器构建器

def build_filter(self, query: SearchQuery) -> Filter:
    """构建包含所有条件的Qdrant过滤器(AND逻辑)。"""
    conditions = []

    # 必需:租户隔离
    conditions.append(FieldCondition(
        key="book_id", match=MatchValue(value=query.book_id)
    ))

    # 必需:硬件层级(lte = "X级或更低")
    conditions.append(FieldCondition(
        key="hardware_tier", range=Range(lte=query.hardware_tier)
    ))

    # 可选:模块精确匹配
    if query.module:
        conditions.append(FieldCondition(
            key="module", match=MatchValue(value=query.module)
        ))

    # 可选:章节范围
    if query.chapter_min or query.chapter_max:
        chapter_range = Range()
        if query.chapter_min:
            chapter_range.gte = query.chapter_min
        if query.chapter_max:
            chapter_range.lte = query.chapter_max
        conditions.append(FieldCondition(key="chapter", range=chapter_range))

    # 可选:熟练度OR逻辑
    if query.proficiency_levels:
        conditions.append(FieldCondition(
            key="proficiency_level",
            match=MatchAny(any=query.proficiency_levels),
        ))

    return Filter(must=conditions)

上下文扩展(遍历块链)

def expand_context(self, chunk_id: str, prev: int = 1, next: int = 1) -> list[Chunk]:
    """遍历prev_chunk_id/next_chunk_id链以获取周围上下文。"""
    current = self.get_chunk_by_id(chunk_id)
    if not current:
        return []

    # 向后遍历
    prev_chunks = []
    prev_id = current.prev_chunk_id
    for _ in range(prev):
        if not prev_id:
            break
        chunk = self.get_chunk_by_id(prev_id)
        if not chunk:
            break
        prev_chunks.insert(0, chunk)
        prev_id = chunk.prev_chunk_id

    # 向前遍历
    next_chunks = []
    next_id = current.next_chunk_id
    for _ in range(next):
        if not next_id:
            break
        chunk = self.get_chunk_by_id(next_id)
        if not chunk:
            break
        next_chunks.append(chunk)
        next_id = chunk.next_chunk_id

    return prev_chunks + [current] + next_chunks

完整文档检索

def get_document_chunks(self, parent_doc_id: str) -> list[Chunk]:
    """获取文档的所有块,按chunk_index排序。"""
    points, _ = self.client.scroll(
        collection_name=self.collection,
        scroll_filter=Filter(must=[
            FieldCondition(key="parent_doc_id", match=MatchValue(value=parent_doc_id))
        ]),
        limit=100,
        with_payload=True,
        with_vectors=False,
    )

    chunks = [self._to_chunk(p) for p in points]
    chunks.sort(key=lambda c: c.chunk_index)
    return chunks

负载模式

class ChunkPayload(BaseModel):
    """用于过滤检索和上下文扩展的完整负载。"""

    # 租户隔离
    book_id: str

    # 内容过滤器(全部索引)
    module: str
    chapter: int
    lesson: int
    hardware_tier: int
    proficiency_level: str

    # 显示内容
    text: str
    section_title: Optional[str]
    source_file: str

    # 上下文扩展
    parent_doc_id: str
    chunk_index: int
    total_chunks: int
    prev_chunk_id: Optional[str]
    next_chunk_id: Optional[str]

    # 变更检测
    content_hash: str
    source_file_hash: str

反模式

不要做 应该做
固定字符分块 语义边界(##标题)
基于位置的分块ID 内容哈希以获取稳定ID
块之间无重叠 10-20%重叠以确保连续性
每次变更都完全重新索引 使用文件哈希检测进行增量更新
缺少负载索引 索引所有用于过滤的字段
同步嵌入 使用后台作业进行批处理
外部状态数据库 Qdrant原生状态跟踪

验证

运行:python scripts/verify.py

相关技能

  • scaffolding-fastapi-dapr - 搜索端点的API模式
  • streaming-llm-responses - 流式RAG响应

参考资料