RAG 架构模式
概览
RAG (Retrieval-Augmented Generation) 架构模式涵盖了构建高效、可扩展和准确的RAG系统的系统设计。这项技能涵盖了数据摄取、分块策略、嵌入模型、向量数据库、检索优化和生成编排。
何时使用这项技能: 当为生产应用设计或实现RAG系统时。
目录
- RAG系统架构
- 数据摄取管道
- 分块策略
- 嵌入模型
- 向量数据库选择
- 检索优化
- 生成编排
- RAG清单
- 快速参考
RAG系统架构
核心组件
graph TD
A[数据源] --> B[摄取管道]
B --> C[分块策略]
C --> D[嵌入模型]
D --> E[向量数据库]
E --> F[检索服务]
F --> G[生成服务]
G --> H[响应服务]
I[用户查询] --> F
F --> J[检索结果]
J --> G
G --> H[生成响应]
H --> K[最终输出]
架构模式
| 模式 |
描述 |
用例 |
| 简单RAG |
没有优化的简单检索 |
快速原型,小数据集 |
| 高级RAG |
重排、融合、混合搜索 |
生产系统,高准确度 |
| 代理RAG |
代理用于检索和推理 |
复杂查询,多步骤任务 |
| 图RAG |
基于知识图谱的检索 |
结构化数据,关系查询 |
| 模块化RAG |
分离检索和生成服务 |
可扩展系统,独立扩展 |
系统组件
# RAG系统组件
components:
# 数据层
data_ingestion:
- document_loaders
- text_extractors
- data_cleaners
# 处理层
chunking:
- semantic_chunker
- fixed_size_chunker
- recursive_chunker
embedding:
- text_embedding_model
- embedding_cache
- batch_processor
# 存储层
vector_database:
- vector_store
- metadata_store
- index_manager
# 检索层
retrieval:
- similarity_search
- hybrid_search
- reranker
- query_expander
# 生成层
generation:
- prompt_builder
- llm_client
- response_formatter
- citation_generator
数据摄取管道
摄取工作流程
graph LR
A[原始数据] --> B[提取]
B --> C[清洗]
C --> D[标准化]
D --> E[验证]
E --> F[分块]
F --> G[嵌入]
G --> H[向量存储]
文档处理
# 文档摄取管道
class DocumentIngestor:
def __init__(self, loader, chunker, embedder):
self.loader = loader
self.chunker = chunker
self.embedder = embedder
async def ingest(self, documents):
"""将文档摄取到RAG系统"""
results = []
for doc in documents:
# 提取文本
text = await self.loader.extract(doc)
# 清洗和标准化
text = self.cleaner.normalize(text)
# 分块文档
chunks = await self.chunker.chunk(text)
# 生成嵌入
embeddings = await self.embedder.embed_batch(chunks)
# 存储到向量数据库
await self.vector_store.store(chunks, embeddings)
results.append({
'doc_id': doc.id,
'chunk_count': len(chunks)
'status': 'success'
})
return results
数据质量检查
## 数据质量清单
### 预摄取
- [ ] 文档格式验证
- [ ] 编码检测和处理
- [ ] 文件大小限制
- [ ] 内容类型识别
- [ ] 语言检测
### 摄取中
- [ ] 文本提取完成
- [ ] 特殊字符处理
- [ ] 空白标准化
- [ ] 重复项删除
- [ ] 元数据提取
### 后摄取
- [ ] 块成功存储
- [ ] 嵌入生成
- [ ] 向量索引更新
- [ ] 错误日志审核
- [ ] 质量度量计算
分块策略
分块方法
| 策略 |
描述 |
优点 |
缺点 |
| 固定大小 |
固定字符/标记计数 |
简单,可预测 |
可能破坏上下文 |
| 语义 |
按段落/句子分割 |
保留意义 |
变化的块大小 |
| 递归 |
层次分割 |
保持结构 |
实现复杂 |
| 滑动窗口 |
重叠块 |
保持上下文 |
更多存储 |
| 混合 |
结合多种方法 |
两者兼得 |
更复杂 |
块大小指南
# 块大小优化
CHUNK_SIZES = {
'small': {
'max_chars': 500,
'max_tokens': 150,
'overlap': 50,
'use_case': '快速查询,低延迟'
},
'medium': {
'max_chars': 1000,
'max_tokens': 300,
'overlap': 100,
'use_case': '平衡检索和上下文'
},
'large': {
'max_chars': 2000,
'max_tokens': 500,
'overlap': 200,
'use_case': '复杂查询,完整上下文'
}
}
def get_chunk_size_for_model(model_name: str) -> dict:
"""为模型获取最优块大小"""
model_sizes = {
'gpt-3.5-turbo': CHUNK_SIZES['small'],
'gpt-4': CHUNK_SIZES['medium'],
'claude-3': CHUNK_SIZES['large']
}
return model_sizes.get(model_name, CHUNK_SIZES['medium'])
语义分块
# 语义分块与NLP
import nltk
from typing import List
class SemanticChunker:
def __init__(self, max_chunk_size: int = 1000):
self.max_chunk_size = max_chunk_size
self.sent_detector = nltk.load('tokenizers/punkt/english.pickle')
def chunk(self, text: str) -> List[str]:
"""将文本分割为语义块"""
sentences = self.sent_detector.tokenize(text)
chunks = []
current_chunk = ""
for sentence in sentences:
if len(current_chunk) + len(sentence) < self.max_chunk_size:
current_chunk += " " + sentence
else:
chunks.append(current_chunk.strip())
current_chunk = sentence
if current_chunk:
chunks.append(current_chunk.strip())
return chunks
嵌入模型
模型选择标准
| 因素 |
考虑 |
选项 |
| 维度 |
384, 768, 1024, 1536 |
更高 = 更多信息,更多存储 |
| 性能 |
推理速度,批量大小 |
更快 = 低延迟 |
| 成本 |
每令牌API定价 |
平衡质量与成本 |
| 语言支持 |
多语言模型 |
匹配内容语言 |
| 领域 |
通用与专业 |
领域特定可获得更好结果 |
流行嵌入模型
| 模型 |
维度 |
成本 |
最适合 |
| OpenAI text-embedding-3-small |
1536 |
$$ |
通用文本 |
| OpenAI text-embedding-3-large |
3072 |
$$$ |
复杂文档 |
| Cohere embed-v3 |
1024 |
$ |
多语言 |
| HuggingFace all-MiniLM-L6-v2 |
384 |
免费 |
通用目的 |
| E5-large-v2 |
1024 |
免费 |
英文文本 |
嵌入最佳实践
# 嵌入优化
import asyncio
from typing import List
class EmbeddingService:
def __init__(self, model, batch_size: int = 100):
self.model = model
self.batch_size = batch_size
self.cache = {}
async def embed_batch(self, texts: List[str]) -> List[List[float]]:
"""批量嵌入与缓存"""
# 首先检查缓存
uncached_texts = [t for t in texts if t not in self.cache]
if uncached_texts:
# 批量嵌入未缓存文本
embeddings = await self.model.embed(uncached_texts)
# 缓存结果
for text, emb in zip(uncached_texts, embeddings):
self.cache[text] = emb
# 按顺序返回所有嵌入
return [self.cache.get(t) for t in texts]
async def embed_with_retry(self, text: str, max_retries: int = 3) -> List[float]:
"""嵌入重试逻辑"""
for attempt in range(max_retries):
try:
return await self.model.embed(text)
except Exception as e:
if attempt == max_retries - 1:
raise e
await asyncio.sleep(2 ** attempt) # 指数退避
向量数据库选择
数据库比较
| 数据库 |
最适合 |
扩展 |
成本 |
特性 |
| Pinecone |
生产应用 |
托管 |
$ |
无服务器,易设置 |
| Qdrant |
大数据集 |
自托管 |
$ |
高级过滤 |
| Weaviate |
企业 |
自托管 |
$$ |
模块化,混合搜索 |
| Chroma |
小应用 |
自托管 |
免费 |
开源,易本地 |
| pgvector |
PostgreSQL用户 |
自托管 |
$ |
SQL基础,熟悉 |
| Milvus |
生产 |
托管 |
$ |
高级搜索,过滤 |
向量数据库配置
# 向量数据库配置
VECTOR_DB_CONFIG = {
'pinecone': {
'index_type': 'hnsw', # 层次可导航小世界
'metric': 'cosine',
'dimension': 1536,
'pods': 1, # 副本数量
'replicas': 2, # 每个pod的副本数
'environment': 'us-west-2'
},
'qdrant': {
'distance': 'cosine',
'vector_params': {
'm': 16,
'ef_construction': None
},
'hnsw_config': {
'ef': 128,
'm': 16
}
},
'weaviate': {
'vectorizer': 'text2vec-openai',
'module_config': {
'min_distance': 0.25,
'vector_cache_max_objects': 1000000000
}
}
}
检索优化
检索策略
| 策略 |
描述 |
实施 |
| 密集检索 |
向量相似性搜索 |
快速,语义 |
| 稀疏检索 |
关键词/BM25搜索 |
精确匹配,快速 |
| 混合检索 |
结合密集+稀疏 |
两者兼得 |
| 重排 |
重新评分检索结果 |
更高准确度 |
| 查询扩展 |
用同义词扩展 |
更好召回 |
| 过滤 |
预过滤元数据 |
更快,更相关 |
混合搜索实现
# 混合搜索实现
class HybridRetriever:
def __init__(self, vector_db, keyword_index):
self.vector_db = vector_db
self.keyword_index = keyword_index
async def retrieve(self, query: str, top_k: int = 10) -> List[dict]:
"""结合向量和关键词搜索的混合检索"""
# 向量搜索
vector_results = await self.vector_db.search(
query_vector=await self.embed(query),
top_k=top_k * 2 # 获取更多用于重排
)
# 关键词搜索
keyword_results = await self.keyword_index.search(
query=query,
top_k=top_k * 2
)
# 合并和去重
combined = self._combine_results(vector_results, keyword_results)
# 重排合并结果
reranked = self._rerank(query, combined)
return reranked[:top_k]
def _combine_results(self, vector_results, keyword_results):
"""合并和去重结果"""
seen = set()
combined = []
for result in vector_results + keyword_results:
if result['id'] not in seen:
combined.append(result)
seen.add(result['id'])
return combined
def _rerank(self, query: str, results: List[dict]) -> List[dict]:
"""基于查询相关性重排结果"""
scored = []
for result in results:
# 计算相关性得分
score = self._calculate_relevance(query, result)
scored.append({**result, 'score': score})
# 按得分排序
scored.sort(key=lambda x: x['score'], reverse=True)
return scored
生成编排
提示构建
# RAG提示构建
class PromptBuilder:
def __init__(self, template: str = None):
self.template = template or self._default_template()
def _default_template(self) -> str:
"""默认RAG提示模板"""
return """根据以下上下文回答问题:
{context}
问题:{question}
指令:
- 仅使用提供的上下文回答问题
- 如果答案不在上下文中,请说“我不知道”
- 不要使用外部知识
- 简洁准确
- 如可能,请引用来源
答案:"""
def build_prompt(self, query: str, context_chunks: List[str]) -> str:
"""用上下文构建RAG提示"""
# 格式化上下文块
formatted_context = self._format_context(context_chunks)
# 构建提示
prompt = self.template.format(
context=formatted_context,
question=query
)
return prompt
def _format_context(self, chunks: List[str]) -> str:
"""为提示格式化上下文块"""
context_parts = []
for i, chunk in enumerate(chunks, 1):
context_parts.append(f"[{i}] {chunk['text']}")
return "
".join(context_parts)
响应生成
# RAG响应生成
class RAGGenerator:
def __init__(self, llm_client, retriever):
self.llm = llm_client
self.retriever = retriever
async def generate(self, query: str) -> str:
"""使用RAG生成响应"""
# 检索相关上下文
context = await self.retriever.retrieve(query, top_k=5)
# 构建提示
prompt = self._build_prompt(query, context)
# 生成响应
response = await self.llm.generate(prompt)
return response
async def generate_with_citations(self, query: str) -> dict:
"""生成带源引用的响应"""
context = await self.retriever.retrieve(query, top_k=5)
# 构建带引用指令的提示
prompt = self._build_citation_prompt(query, context)
response = await self.llm.generate(prompt)
return {
'response': response,
'sources': [c['source'] for c in context]
}
RAG清单
系统设计
## RAG系统设计清单
### 架构
- [ ] 确定数据源
- [ ] 设计摄取管道
- [ ] 选择分块策略
- [ ] 选择嵌入模型
- [ ] 选择向量数据库
- [ ] 定义检索策略
- [ ] 确定生成方法
### 可扩展性
- [ ] 计划水平扩展
- [ ] 实施缓存策略
- [ ] 配置负载均衡
- [ ] 考虑数据库分片
- [ ] CDN用于静态资产
### 性能
- [ ] 定义延迟目标(< 500ms)
- [ ] 定义吞吐量目标
- [ ] 缓存嵌入
- [ ] 实施批量处理
- [ ] 计划查询优化
### 可靠性
- [ ] 定义错误处理
- [ ] 实施重试逻辑
- [ ] 计划回退机制
- [ ] 配置监控
- [ ] 设置警报阈值
实施
## RAG实施清单
### 数据管道
- [ ] 实施文档加载器
- [ ] 测试文本提取
- [ ] 验证数据清洗
- [ ] 测试分块逻辑
- [ ] 集成嵌入服务
- [ ] 连接向量存储
### 检索
- [ ] 配置相似性搜索
- [ ] 实施混合搜索
- [ ] 添加重排逻辑
- [ ] 考虑查询扩展
- [ ] 按元数据过滤
### 生成
- [ ] 创建提示模板
- [ ] 测试LLM集成
- [ ] 定义响应格式化
- [ ] 添加引用生成
- [ ] 考虑流式响应
快速参考
RAG管道命令
# 初始化RAG系统
from rag_system import RAGSystem
# 使用默认组件创建RAG系统
rag = RAGSystem(
embedding_model='openai/text-embedding-3-large',
vector_db='pinecone',
retriever='hybrid',
generator='claude-3'
)
# 摄取文档
await rag.ingest_documents([
'document1.pdf',
'document2.txt'
])
# 查询系统
response = await rag.query("什么是退货政策?")
# 获取检索详情
retrieval = await rag.get_retrieval_details(query_id)
性能指标
| 指标 |
目标 |
如何测量 |
| 端到端延迟 |
< 500ms |
查询到响应时间 |
| 检索准确度 |
> 85% |
相关块在顶部K |
| 生成质量 |
人工评级 |
响应准确度得分 |
| 吞吐量 |
> 100 QPS |
每秒查询数 |
| 缓存命中率 |
> 80% |
嵌入缓存命中 |
常见问题
| 问题 |
解决方案 |
| 检索不佳 |
改进分块,使用混合搜索 |
| 嵌入慢 |
批量处理,缓存 |
| 高延迟 |
优化向量DB,使用CDN |
| 响应不准确 |
更好的提示,重排 |
| 上下文窗口问题 |
调整块大小,重叠 |
常见陷阱
- 分块不佳 - 使用语义分块,而不是固定大小
- 嵌入模型错误 - 匹配模型到用例和语言
- 没有重排 - 向量相似性并不总是最佳
- 忽略元数据 - 使用所有可用的元数据进行过滤
- 没有缓存 - 嵌入昂贵,积极缓存
- 单一检索策略 - 结合多种方法
- 提示不佳 - 用清晰的指令构建提示
- 没有监控 - 你无法改进你不了解的
额外资源