RAG架构模式 rag-architecture-patterns

RAG架构模式是一种系统设计技能,用于构建高效的RAG系统,涉及数据摄取、分块策略、嵌入模型、向量数据库、检索优化和生成编排等关键环节。

RAG应用 0 次安装 0 次浏览 更新于 3/5/2026

RAG 架构模式

概览

RAG (Retrieval-Augmented Generation) 架构模式涵盖了构建高效、可扩展和准确的RAG系统的系统设计。这项技能涵盖了数据摄取、分块策略、嵌入模型、向量数据库、检索优化和生成编排。

何时使用这项技能: 当为生产应用设计或实现RAG系统时。

目录

  1. RAG系统架构
  2. 数据摄取管道
  3. 分块策略
  4. 嵌入模型
  5. 向量数据库选择
  6. 检索优化
  7. 生成编排
  8. RAG清单
  9. 快速参考

RAG系统架构

核心组件

graph TD
    A[数据源] --> B[摄取管道]
    B --> C[分块策略]
    C --> D[嵌入模型]
    D --> E[向量数据库]
    E --> F[检索服务]
    F --> G[生成服务]
    G --> H[响应服务]
    
    I[用户查询] --> F
    F --> J[检索结果]
    J --> G
    G --> H[生成响应]
    H --> K[最终输出]

架构模式

模式 描述 用例
简单RAG 没有优化的简单检索 快速原型,小数据集
高级RAG 重排、融合、混合搜索 生产系统,高准确度
代理RAG 代理用于检索和推理 复杂查询,多步骤任务
图RAG 基于知识图谱的检索 结构化数据,关系查询
模块化RAG 分离检索和生成服务 可扩展系统,独立扩展

系统组件

# RAG系统组件
components:
  # 数据层
  data_ingestion:
      - document_loaders
      - text_extractors
      - data_cleaners
  
  # 处理层
  chunking:
      - semantic_chunker
      - fixed_size_chunker
      - recursive_chunker
  embedding:
      - text_embedding_model
      - embedding_cache
      - batch_processor
  
  # 存储层
  vector_database:
      - vector_store
      - metadata_store
      - index_manager
  
  # 检索层
  retrieval:
      - similarity_search
      - hybrid_search
      - reranker
      - query_expander
  
  # 生成层
  generation:
      - prompt_builder
      - llm_client
      - response_formatter
      - citation_generator

数据摄取管道

摄取工作流程

graph LR
    A[原始数据] --> B[提取]
    B --> C[清洗]
    C --> D[标准化]
    D --> E[验证]
    E --> F[分块]
    F --> G[嵌入]
    G --> H[向量存储]

文档处理

# 文档摄取管道
class DocumentIngestor:
    def __init__(self, loader, chunker, embedder):
        self.loader = loader
        self.chunker = chunker
        self.embedder = embedder
    
    async def ingest(self, documents):
        """将文档摄取到RAG系统"""
        results = []
        for doc in documents:
            # 提取文本
            text = await self.loader.extract(doc)
            
            # 清洗和标准化
            text = self.cleaner.normalize(text)
            
            # 分块文档
            chunks = await self.chunker.chunk(text)
            
            # 生成嵌入
            embeddings = await self.embedder.embed_batch(chunks)
            
            # 存储到向量数据库
            await self.vector_store.store(chunks, embeddings)
            
            results.append({
                'doc_id': doc.id,
                'chunk_count': len(chunks)
                'status': 'success'
            })
        
        return results

数据质量检查

## 数据质量清单

### 预摄取
- [ ] 文档格式验证
- [ ] 编码检测和处理
- [ ] 文件大小限制
- [ ] 内容类型识别
- [ ] 语言检测

### 摄取中
- [ ] 文本提取完成
- [ ] 特殊字符处理
- [ ] 空白标准化
- [ ] 重复项删除
- [ ] 元数据提取

### 后摄取
- [ ] 块成功存储
- [ ] 嵌入生成
- [ ] 向量索引更新
- [ ] 错误日志审核
- [ ] 质量度量计算

分块策略

分块方法

策略 描述 优点 缺点
固定大小 固定字符/标记计数 简单,可预测 可能破坏上下文
语义 按段落/句子分割 保留意义 变化的块大小
递归 层次分割 保持结构 实现复杂
滑动窗口 重叠块 保持上下文 更多存储
混合 结合多种方法 两者兼得 更复杂

块大小指南

# 块大小优化
CHUNK_SIZES = {
    'small': {
        'max_chars': 500,
        'max_tokens': 150,
        'overlap': 50,
        'use_case': '快速查询,低延迟'
    },
    'medium': {
        'max_chars': 1000,
        'max_tokens': 300,
        'overlap': 100,
        'use_case': '平衡检索和上下文'
    },
    'large': {
        'max_chars': 2000,
        'max_tokens': 500,
        'overlap': 200,
        'use_case': '复杂查询,完整上下文'
    }
}

def get_chunk_size_for_model(model_name: str) -> dict:
    """为模型获取最优块大小"""
    model_sizes = {
        'gpt-3.5-turbo': CHUNK_SIZES['small'],
        'gpt-4': CHUNK_SIZES['medium'],
        'claude-3': CHUNK_SIZES['large']
    }
    return model_sizes.get(model_name, CHUNK_SIZES['medium'])

语义分块

# 语义分块与NLP
import nltk
from typing import List

class SemanticChunker:
    def __init__(self, max_chunk_size: int = 1000):
        self.max_chunk_size = max_chunk_size
        self.sent_detector = nltk.load('tokenizers/punkt/english.pickle')
    
    def chunk(self, text: str) -> List[str]:
        """将文本分割为语义块"""
        sentences = self.sent_detector.tokenize(text)
        chunks = []
        current_chunk = ""
        
        for sentence in sentences:
            if len(current_chunk) + len(sentence) < self.max_chunk_size:
                current_chunk += " " + sentence
            else:
                chunks.append(current_chunk.strip())
                current_chunk = sentence
        
        if current_chunk:
            chunks.append(current_chunk.strip())
        
        return chunks

嵌入模型

模型选择标准

因素 考虑 选项
维度 384, 768, 1024, 1536 更高 = 更多信息,更多存储
性能 推理速度,批量大小 更快 = 低延迟
成本 每令牌API定价 平衡质量与成本
语言支持 多语言模型 匹配内容语言
领域 通用与专业 领域特定可获得更好结果

流行嵌入模型

模型 维度 成本 最适合
OpenAI text-embedding-3-small 1536 $$ 通用文本
OpenAI text-embedding-3-large 3072 $$$ 复杂文档
Cohere embed-v3 1024 $ 多语言
HuggingFace all-MiniLM-L6-v2 384 免费 通用目的
E5-large-v2 1024 免费 英文文本

嵌入最佳实践

# 嵌入优化
import asyncio
from typing import List

class EmbeddingService:
    def __init__(self, model, batch_size: int = 100):
        self.model = model
        self.batch_size = batch_size
        self.cache = {}
    
    async def embed_batch(self, texts: List[str]) -> List[List[float]]:
        """批量嵌入与缓存"""
        # 首先检查缓存
        uncached_texts = [t for t in texts if t not in self.cache]
        
        if uncached_texts:
            # 批量嵌入未缓存文本
            embeddings = await self.model.embed(uncached_texts)
            
            # 缓存结果
            for text, emb in zip(uncached_texts, embeddings):
                self.cache[text] = emb
        
        # 按顺序返回所有嵌入
        return [self.cache.get(t) for t in texts]
    
    async def embed_with_retry(self, text: str, max_retries: int = 3) -> List[float]:
        """嵌入重试逻辑"""
        for attempt in range(max_retries):
            try:
                return await self.model.embed(text)
            except Exception as e:
                if attempt == max_retries - 1:
                    raise e
                await asyncio.sleep(2 ** attempt)  # 指数退避

向量数据库选择

数据库比较

数据库 最适合 扩展 成本 特性
Pinecone 生产应用 托管 $ 无服务器,易设置
Qdrant 大数据集 自托管 $ 高级过滤
Weaviate 企业 自托管 $$ 模块化,混合搜索
Chroma 小应用 自托管 免费 开源,易本地
pgvector PostgreSQL用户 自托管 $ SQL基础,熟悉
Milvus 生产 托管 $ 高级搜索,过滤

向量数据库配置

# 向量数据库配置
VECTOR_DB_CONFIG = {
    'pinecone': {
        'index_type': 'hnsw',  # 层次可导航小世界
        'metric': 'cosine',
        'dimension': 1536,
        'pods': 1,  # 副本数量
        'replicas': 2,  # 每个pod的副本数
        'environment': 'us-west-2'
    },
    'qdrant': {
        'distance': 'cosine',
        'vector_params': {
            'm': 16,
            'ef_construction': None
        },
        'hnsw_config': {
            'ef': 128,
            'm': 16
        }
    },
    'weaviate': {
        'vectorizer': 'text2vec-openai',
        'module_config': {
            'min_distance': 0.25,
            'vector_cache_max_objects': 1000000000
        }
    }
}

检索优化

检索策略

策略 描述 实施
密集检索 向量相似性搜索 快速,语义
稀疏检索 关键词/BM25搜索 精确匹配,快速
混合检索 结合密集+稀疏 两者兼得
重排 重新评分检索结果 更高准确度
查询扩展 用同义词扩展 更好召回
过滤 预过滤元数据 更快,更相关

混合搜索实现

# 混合搜索实现
class HybridRetriever:
    def __init__(self, vector_db, keyword_index):
        self.vector_db = vector_db
        self.keyword_index = keyword_index
    
    async def retrieve(self, query: str, top_k: int = 10) -> List[dict]:
        """结合向量和关键词搜索的混合检索"""
        # 向量搜索
        vector_results = await self.vector_db.search(
            query_vector=await self.embed(query),
            top_k=top_k * 2  # 获取更多用于重排
        )
        
        # 关键词搜索
        keyword_results = await self.keyword_index.search(
            query=query,
            top_k=top_k * 2
        )
        
        # 合并和去重
        combined = self._combine_results(vector_results, keyword_results)
        
        # 重排合并结果
        reranked = self._rerank(query, combined)
        
        return reranked[:top_k]
    
    def _combine_results(self, vector_results, keyword_results):
        """合并和去重结果"""
        seen = set()
        combined = []
        
        for result in vector_results + keyword_results:
            if result['id'] not in seen:
                combined.append(result)
                seen.add(result['id'])
        
        return combined
    
    def _rerank(self, query: str, results: List[dict]) -> List[dict]:
        """基于查询相关性重排结果"""
        scored = []
        for result in results:
            # 计算相关性得分
            score = self._calculate_relevance(query, result)
            scored.append({**result, 'score': score})
        
        # 按得分排序
        scored.sort(key=lambda x: x['score'], reverse=True)
        return scored

生成编排

提示构建

# RAG提示构建
class PromptBuilder:
    def __init__(self, template: str = None):
        self.template = template or self._default_template()
    
    def _default_template(self) -> str:
        """默认RAG提示模板"""
        return """根据以下上下文回答问题:

{context}

问题:{question}

指令:
- 仅使用提供的上下文回答问题
- 如果答案不在上下文中,请说“我不知道”
- 不要使用外部知识
- 简洁准确
- 如可能,请引用来源

答案:"""
    
    def build_prompt(self, query: str, context_chunks: List[str]) -> str:
        """用上下文构建RAG提示"""
        # 格式化上下文块
        formatted_context = self._format_context(context_chunks)
        
        # 构建提示
        prompt = self.template.format(
            context=formatted_context,
            question=query
        )
        
        return prompt
    
    def _format_context(self, chunks: List[str]) -> str:
        """为提示格式化上下文块"""
        context_parts = []
        for i, chunk in enumerate(chunks, 1):
            context_parts.append(f"[{i}] {chunk['text']}")
        
        return "

".join(context_parts)

响应生成

# RAG响应生成
class RAGGenerator:
    def __init__(self, llm_client, retriever):
        self.llm = llm_client
        self.retriever = retriever
    
    async def generate(self, query: str) -> str:
        """使用RAG生成响应"""
        # 检索相关上下文
        context = await self.retriever.retrieve(query, top_k=5)
        
        # 构建提示
        prompt = self._build_prompt(query, context)
        
        # 生成响应
        response = await self.llm.generate(prompt)
        
        return response
    
    async def generate_with_citations(self, query: str) -> dict:
        """生成带源引用的响应"""
        context = await self.retriever.retrieve(query, top_k=5)
        
        # 构建带引用指令的提示
        prompt = self._build_citation_prompt(query, context)
        
        response = await self.llm.generate(prompt)
        
        return {
            'response': response,
            'sources': [c['source'] for c in context]
        }

RAG清单

系统设计

## RAG系统设计清单

### 架构
- [ ] 确定数据源
- [ ] 设计摄取管道
- [ ] 选择分块策略
- [ ] 选择嵌入模型
- [ ] 选择向量数据库
- [ ] 定义检索策略
- [ ] 确定生成方法

### 可扩展性
- [ ] 计划水平扩展
- [ ] 实施缓存策略
- [ ] 配置负载均衡
- [ ] 考虑数据库分片
- [ ] CDN用于静态资产

### 性能
- [ ] 定义延迟目标(< 500ms)
- [ ] 定义吞吐量目标
- [ ] 缓存嵌入
- [ ] 实施批量处理
- [ ] 计划查询优化

### 可靠性
- [ ] 定义错误处理
- [ ] 实施重试逻辑
- [ ] 计划回退机制
- [ ] 配置监控
- [ ] 设置警报阈值

实施

## RAG实施清单

### 数据管道
- [ ] 实施文档加载器
- [ ] 测试文本提取
- [ ] 验证数据清洗
- [ ] 测试分块逻辑
- [ ] 集成嵌入服务
- [ ] 连接向量存储

### 检索
- [ ] 配置相似性搜索
- [ ] 实施混合搜索
- [ ] 添加重排逻辑
- [ ] 考虑查询扩展
- [ ] 按元数据过滤

### 生成
- [ ] 创建提示模板
- [ ] 测试LLM集成
- [ ] 定义响应格式化
- [ ] 添加引用生成
- [ ] 考虑流式响应

快速参考

RAG管道命令

# 初始化RAG系统
from rag_system import RAGSystem

# 使用默认组件创建RAG系统
rag = RAGSystem(
    embedding_model='openai/text-embedding-3-large',
    vector_db='pinecone',
    retriever='hybrid',
    generator='claude-3'
)

# 摄取文档
await rag.ingest_documents([
    'document1.pdf',
    'document2.txt'
])

# 查询系统
response = await rag.query("什么是退货政策?")

# 获取检索详情
retrieval = await rag.get_retrieval_details(query_id)

性能指标

指标 目标 如何测量
端到端延迟 < 500ms 查询到响应时间
检索准确度 > 85% 相关块在顶部K
生成质量 人工评级 响应准确度得分
吞吐量 > 100 QPS 每秒查询数
缓存命中率 > 80% 嵌入缓存命中

常见问题

问题 解决方案
检索不佳 改进分块,使用混合搜索
嵌入慢 批量处理,缓存
高延迟 优化向量DB,使用CDN
响应不准确 更好的提示,重排
上下文窗口问题 调整块大小,重叠

常见陷阱

  1. 分块不佳 - 使用语义分块,而不是固定大小
  2. 嵌入模型错误 - 匹配模型到用例和语言
  3. 没有重排 - 向量相似性并不总是最佳
  4. 忽略元数据 - 使用所有可用的元数据进行过滤
  5. 没有缓存 - 嵌入昂贵,积极缓存
  6. 单一检索策略 - 结合多种方法
  7. 提示不佳 - 用清晰的指令构建提示
  8. 没有监控 - 你无法改进你不了解的

额外资源