name: llm-app-patterns
description: "Production-ready patterns for building LLM applications. Covers RAG pipelines, agent architectures, prompt IDEs, and LLMOps monitoring. Use when designing AI applications, implementing RAG, building agents, or setting up LLM observability."
🤖 LLM Application Patterns
Production-ready patterns for building LLM applications, inspired by Dify and industry best practices.
When to Use This Skill
Use this skill when:
- Designing LLM-powered applications
- Implementing RAG (retrieval-augmented generation)
- Building AI agents with tools
- Setting up LLMOps monitoring
- Choosing an agent architecture
1. RAG Pipeline Architecture
Overview
RAG (retrieval-augmented generation) grounds LLM responses in your own data.
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Ingest    │────▶│  Retrieve   │────▶│  Generate   │
│  Documents  │     │   Context   │     │  Response   │
└─────────────┘     └─────────────┘     └─────────────┘
      │                   │                   │
      ▼                   ▼                   ▼
┌─────────┐         ┌───────────┐       ┌───────────┐
│ Chunking│         │  Vector   │       │   LLM     │
│Embedding│         │  Search   │       │ + Context │
└─────────┘         └───────────┘       └───────────┘
1.1 Document Ingestion
# Chunking strategies
class ChunkingStrategy:
    # Fixed-size chunking (simple, but can split mid-context)
    FIXED_SIZE = "fixed_size"          # e.g. 512 tokens

    # Semantic chunking (preserves meaning)
    SEMANTIC = "semantic"              # split by paragraph/section

    # Recursive splitting (tries separators in order)
    RECURSIVE = "recursive"            # ["\n\n", "\n", " ", ""]

    # Document-aware (respects structure)
    DOCUMENT_AWARE = "document_aware"  # headings, lists, etc.

# Recommended settings
CHUNK_CONFIG = {
    "chunk_size": 512,      # tokens
    "chunk_overlap": 50,    # token overlap between chunks
    "separators": ["\n\n", "\n", ". ", " "],
}
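To make the recursive strategy concrete, here is a minimal sketch of a recursive splitter. It is illustrative, not a specific library's API: the function name and the crude ~4-characters-per-token estimate are assumptions; in practice, use a real tokenizer.

# Sketch: recursive splitting (illustrative, not a library API)
def recursive_split(text: str, chunk_size: int = 512,
                    separators=("\n\n", "\n", ". ", " ")) -> list[str]:
    if len(text) // 4 <= chunk_size:  # rough ~4 chars/token estimate
        return [text]
    for sep in separators:
        if sep in text:
            chunks, current = [], ""
            for part in text.split(sep):
                candidate = f"{current}{sep}{part}" if current else part
                if len(candidate) // 4 > chunk_size and current:
                    chunks.append(current)
                    current = part
                else:
                    current = candidate
            if current:
                chunks.append(current)
            # Recurse into any piece that is still too large
            return [piece
                    for chunk in chunks
                    for piece in recursive_split(chunk, chunk_size, separators)]
    # No separator matched: hard-cut by characters as a last resort
    step = chunk_size * 4
    return [text[i:i + step] for i in range(0, len(text), step)]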
1.2 Embedding & Storage
# Vector database options
VECTOR_DB_OPTIONS = {
    "pinecone": {
        "use_case": "production, managed service",
        "scale": "billions of vectors",
        "features": ["hybrid search", "metadata filtering"]
    },
    "weaviate": {
        "use_case": "self-hosted, multimodal",
        "scale": "millions of vectors",
        "features": ["GraphQL API", "modules"]
    },
    "chromadb": {
        "use_case": "development, prototyping",
        "scale": "thousands of vectors",
        "features": ["simple API", "in-memory option"]
    },
    "pgvector": {
        "use_case": "existing Postgres infrastructure",
        "scale": "millions of vectors",
        "features": ["SQL integration", "ACID compliance"]
    }
}

# Embedding model options
EMBEDDING_MODELS = {
    "openai/text-embedding-3-small": {
        "dimensions": 1536,
        "cost": "$0.02 / 1M tokens",
        "quality": "good for most use cases"
    },
    "openai/text-embedding-3-large": {
        "dimensions": 3072,
        "cost": "$0.13 / 1M tokens",
        "quality": "better for complex queries"
    },
    "local/bge-large": {
        "dimensions": 1024,
        "cost": "free (compute cost only)",
        "quality": "comparable to OpenAI small"
    }
}
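As a concrete end-to-end ingestion sketch, here is the prototyping tier above using ChromaDB; the collection name, sample chunks, and metadata are placeholders.

# Ingestion sketch with ChromaDB (development/prototyping tier)
import chromadb

client = chromadb.Client()  # in-memory; use PersistentClient for disk
collection = client.get_or_create_collection("docs")

# ChromaDB embeds documents with its default embedding model unless
# you pass precomputed embeddings
chunks = ["chunk one...", "chunk two..."]  # output of the chunking step
collection.add(
    documents=chunks,
    ids=[f"chunk-{i}" for i in range(len(chunks))],
    metadatas=[{"source": "example.md"} for _ in chunks],
)

results = collection.query(query_texts=["What does chunk one say?"], n_results=2)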
1.3 Retrieval Strategies
# Basic semantic search
def semantic_search(query: str, top_k: int = 5):
    query_embedding = embed(query)
    results = vector_db.similarity_search(
        query_embedding,
        top_k=top_k
    )
    return results

# Hybrid search (semantic + keyword)
def hybrid_search(query: str, top_k: int = 5, alpha: float = 0.5):
    """
    alpha=1.0: pure semantic
    alpha=0.0: pure keyword (BM25)
    alpha=0.5: balanced
    """
    semantic_results = vector_db.similarity_search(query)
    keyword_results = bm25_search(query)
    # Reciprocal rank fusion
    return rrf_merge(semantic_results, keyword_results, alpha)
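`rrf_merge` is assumed above; a minimal sketch of weighted reciprocal rank fusion might look like this (the `doc.id` attribute is an assumption about the result objects; k=60 is the commonly used constant):

# Sketch: weighted reciprocal rank fusion
def rrf_merge(semantic_results, keyword_results, alpha: float, k: int = 60):
    scores, docs = {}, {}
    for weight, results in ((alpha, semantic_results),
                            (1 - alpha, keyword_results)):
        for rank, doc in enumerate(results):
            # Each list contributes weight / (k + rank) per document
            scores[doc.id] = scores.get(doc.id, 0.0) + weight / (k + rank + 1)
            docs[doc.id] = doc
    ranked = sorted(scores, key=scores.get, reverse=True)
    return [docs[doc_id] for doc_id in ranked]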
# Multi-query retrieval
def multi_query_retrieval(query: str):
    """Generate several query variants to improve recall"""
    queries = llm.generate_query_variations(query, n=3)
    all_results = []
    for q in queries:
        all_results.extend(semantic_search(q))
    return deduplicate(all_results)
# Contextual compression
def compressed_retrieval(query: str):
    """Retrieve broadly, then compress to only the relevant parts"""
    docs = semantic_search(query, top_k=10)
    compressed = llm.extract_relevant_parts(docs, query)
    return compressed
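`llm.extract_relevant_parts` above is a placeholder; one way to implement the idea is a per-document extraction prompt, as in this sketch (the prompt wording and NONE sentinel are assumptions):

# Sketch: LLM-based context compression
COMPRESS_PROMPT = """Extract only the sentences from the document below that
help answer the question. If nothing is relevant, reply NONE.

Question: {query}
Document:
{doc}"""

def extract_relevant_parts(docs, query: str) -> list[str]:
    compressed = []
    for doc in docs:
        extract = llm.generate(COMPRESS_PROMPT.format(query=query, doc=doc.content))
        if extract.strip() != "NONE":
            compressed.append(extract)
    return compressed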
1.4 Generation with Context
RAG_PROMPT_TEMPLATE = """
Answer the user's question based only on the following context.
If the context does not contain enough information, say "I don't have enough information to answer this question."

Context:
{context}

Question: {question}

Answer:"""
def generate_with_rag(question: str):
    # Retrieve
    context_docs = hybrid_search(question, top_k=5)
    context = "\n\n".join([doc.content for doc in context_docs])

    # Generate
    prompt = RAG_PROMPT_TEMPLATE.format(
        context=context,
        question=question
    )
    response = llm.generate(prompt)

    # Return with citations
    return {
        "answer": response,
        "sources": [doc.metadata for doc in context_docs]
    }
2. Agent Architectures
2.1 ReAct Pattern (Reasoning + Acting)
Thought: I need to search for information about X
Action: search("X")
Observation: [search results]
Thought: Based on the results, I should...
Action: calculate(...)
Observation: [calculation result]
Thought: I now have enough information
Action: final_answer("The answer is...")
REACT_PROMPT = """
您是一个可以使用工具回答问题的 AI 助手。
可用工具:
{tools_description}
使用此格式:
思考:[您关于下一步做什么的推理]
行动:[tool_name(arguments)]
观察:[工具结果 - 将在此填充]
...(根据需要重复思考/行动/观察)
思考:我有足够信息回答
最终答案:[您的最终响应]
问题:{question}
"""
class ReActAgent:
    def __init__(self, tools: list, llm):
        self.tools = {t.name: t for t in tools}
        self.llm = llm
        self.max_iterations = 10

    def run(self, question: str) -> str:
        prompt = REACT_PROMPT.format(
            tools_description=self._format_tools(),
            question=question
        )
        for _ in range(self.max_iterations):
            response = self.llm.generate(prompt)

            if "Final Answer:" in response:
                return self._extract_final_answer(response)

            action = self._parse_action(response)
            observation = self._execute_tool(action)
            prompt += f"\nObservation: {observation}\n"

        return "Reached maximum iterations without a final answer"
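`_parse_action` is left undefined above; a regex-based sketch, assuming the model emits `Action: tool_name(arguments)` exactly as the prompt instructs, could be:

import re

# Sketch: parse "Action: tool_name(arguments)" lines from the model output.
# Returns a (tool_name, arguments) pair for _execute_tool to consume.
def _parse_action(self, response: str) -> tuple[str, str]:
    match = re.search(r"Action:\s*(\w+)\((.*)\)", response)
    if not match:
        raise ValueError(f"No parseable action in: {response[:200]}")
    tool_name, arguments = match.group(1), match.group(2)
    if tool_name not in self.tools:
        raise ValueError(f"Unknown tool: {tool_name}")
    return tool_name, arguments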
2.2 Function Calling Pattern
# Define tools as functions with schemas
TOOLS = [
    {
        "name": "search_web",
        "description": "Search the web for up-to-date information",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "The search query"
                }
            },
            "required": ["query"]
        }
    },
    {
        "name": "calculate",
        "description": "Perform a mathematical calculation",
        "parameters": {
            "type": "object",
            "properties": {
                "expression": {
                    "type": "string",
                    "description": "The math expression to evaluate"
                }
            },
            "required": ["expression"]
        }
    }
]
class FunctionCallingAgent:
    def run(self, question: str) -> str:
        messages = [{"role": "user", "content": question}]

        while True:
            response = self.llm.chat(
                messages=messages,
                tools=TOOLS,
                tool_choice="auto"
            )

            if response.tool_calls:
                # The assistant turn (with its tool calls) must be appended
                # before the tool results, or most chat APIs reject the
                # conversation; the exact attribute depends on your client
                messages.append(response.message)
                for tool_call in response.tool_calls:
                    result = self._execute_tool(
                        tool_call.name,
                        tool_call.arguments
                    )
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "content": str(result)
                    })
            else:
                return response.content
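The `_execute_tool` dispatcher is also assumed above; a minimal sketch maps tool names to local handlers (the `search_client` and `safe_eval` helpers are hypothetical stand-ins):

import json

# Sketch: dispatch a tool call to a local handler function
TOOL_HANDLERS = {
    "search_web": lambda query: search_client.search(query),  # assumed client
    "calculate": lambda expression: safe_eval(expression),    # assumed sandbox
}

def _execute_tool(self, name: str, arguments: str):
    handler = TOOL_HANDLERS.get(name)
    if handler is None:
        return f"Error: unknown tool {name}"
    try:
        args = json.loads(arguments)  # function-calling APIs return JSON strings
        return handler(**args)
    except Exception as e:
        # Return the error as the observation so the model can recover
        return f"Error: {e}"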
2.3 Plan-and-Execute Pattern
class PlanAndExecuteAgent:
    """
    1. Create a plan (a list of steps)
    2. Execute each step
    3. Re-plan if needed
    """
    def run(self, task: str) -> str:
        # Planning phase
        plan = self.planner.create_plan(task)
        # Returns: ["Step 1: ...", "Step 2: ...", ...]

        results = []
        # Index-based loop so a mid-run re-plan actually takes effect
        # (a plain for-loop would keep iterating the old plan)
        while len(results) < len(plan):
            step = plan[len(results)]
            result = self.executor.execute(step, context=results)
            results.append(result)

            # Check whether we need to re-plan; replan returns the
            # full updated plan
            if self._needs_replan(task, results):
                plan = self.planner.replan(
                    task,
                    completed=results,
                    remaining=plan[len(results):]
                )

        # Synthesize the final answer
        return self.synthesizer.summarize(task, results)
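`create_plan` is a placeholder; one plausible implementation asks the LLM for a numbered list and parses it. In this sketch, the prompt wording and the line-parsing heuristic are assumptions:

# Sketch: a planner that asks for numbered steps and parses them
PLAN_PROMPT = """Break this task into a short numbered list of concrete steps.

Task: {task}

Steps:"""

def create_plan(self, task: str) -> list[str]:
    raw = llm.generate(PLAN_PROMPT.format(task=task))
    steps = []
    for line in raw.splitlines():
        line = line.strip()
        if line and line[0].isdigit():  # keep "1. ..." style lines
            steps.append(line.split(".", 1)[-1].strip())
    return steps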
2.4 Multi-Agent Collaboration
class AgentTeam:
    """
    Specialized agents collaborating on a complex task
    """
    def __init__(self):
        self.agents = {
            "researcher": ResearchAgent(),
            "analyst": AnalystAgent(),
            "writer": WriterAgent(),
            "critic": CriticAgent()
        }
        self.coordinator = CoordinatorAgent()

    def solve(self, task: str) -> str:
        # The coordinator assigns subtasks
        assignments = self.coordinator.decompose(task)

        results = {}
        for assignment in assignments:
            agent = self.agents[assignment.agent]
            result = agent.execute(
                assignment.subtask,
                context=results
            )
            results[assignment.id] = result

        # The critic reviews the combined results
        critique = self.agents["critic"].review(results)
        if critique.needs_revision:
            # Iterate with feedback
            return self.solve_with_feedback(task, results, critique)

        return self.coordinator.synthesize(results)
3. Prompt IDE Patterns
3.1 Prompt Templates with Variables
class PromptTemplate:
    def __init__(self, template: str, variables: list[str]):
        self.template = template
        self.variables = variables

    def format(self, **kwargs) -> str:
        # Validate that all variables were provided
        missing = set(self.variables) - set(kwargs.keys())
        if missing:
            raise ValueError(f"Missing variables: {missing}")
        return self.template.format(**kwargs)

    def with_examples(self, examples: list[dict]) -> str:
        """Prepend few-shot examples"""
        example_text = "\n\n".join([
            f"Input: {ex['input']}\nOutput: {ex['output']}"
            for ex in examples
        ])
        return f"{example_text}\n\n{self.template}"
# Usage
summarizer = PromptTemplate(
    template="Summarize the following text in a {style} style:\n{text}",
    variables=["style", "text"]
)

prompt = summarizer.format(
    style="professional",
    text="Long article content..."
)
3.2 Prompt Versioning & A/B Testing
import hashlib
from datetime import datetime

class PromptRegistry:
    def __init__(self, db):
        self.db = db

    def register(self, name: str, template: str, version: str):
        """Store a versioned prompt"""
        self.db.save({
            "name": name,
            "template": template,
            "version": version,
            "created_at": datetime.now(),
            "metrics": {}
        })

    def get(self, name: str, version: str = "latest") -> str:
        """Retrieve a specific version"""
        return self.db.get(name, version)

    def ab_test(self, name: str, user_id: str) -> str:
        """Return a variant based on stable user bucketing"""
        variants = self.db.get_all_versions(name)
        # Built-in hash() is salted per process; use a stable digest
        # so the same user always lands in the same bucket
        digest = hashlib.sha256(user_id.encode()).hexdigest()
        bucket = int(digest, 16) % len(variants)
        return variants[bucket]

    def record_outcome(self, prompt_id: str, outcome: dict):
        """Track prompt performance"""
        self.db.update_metrics(prompt_id, outcome)
3.3 Prompt Chaining
class PromptChain:
    """
    Chain prompts together, feeding each step's output
    into the next step's input
    """
    def __init__(self, steps: list[dict]):
        self.steps = steps

    def run(self, initial_input: str) -> dict:
        context = {"input": initial_input}
        results = []

        for step in self.steps:
            prompt = step["prompt"].format(**context)
            output = llm.generate(prompt)

            # Parse the output if needed
            if step.get("parser"):
                output = step["parser"](output)

            context[step["output_key"]] = output
            results.append({
                "step": step["name"],
                "output": output
            })

        return {
            "final_output": context[self.steps[-1]["output_key"]],
            "intermediate_results": results
        }
# Example: research → analyze → summarize
chain = PromptChain([
    {
        "name": "research",
        "prompt": "Research the topic: {input}",
        "output_key": "research"
    },
    {
        "name": "analyze",
        "prompt": "Analyze these findings:\n{research}",
        "output_key": "analysis"
    },
    {
        "name": "summarize",
        "prompt": "Summarize this analysis in 3 bullet points:\n{analysis}",
        "output_key": "summary"
    }
])
4. LLMOps & Observability
4.1 Metrics to Track
LLM_METRICS = {
    # Performance
    "latency_p50": "50th percentile response time",
    "latency_p99": "99th percentile response time",
    "tokens_per_second": "generation speed",

    # Quality
    "user_satisfaction": "thumbs up/down ratio",
    "task_completion": "% of tasks completed successfully",
    "hallucination_rate": "% of responses with factual errors",

    # Cost
    "cost_per_request": "average USD per API call",
    "tokens_per_request": "average tokens used",
    "cache_hit_rate": "% of requests served from cache",

    # Reliability
    "error_rate": "% of failed requests",
    "timeout_rate": "% of requests that timed out",
    "retry_rate": "% of requests that needed a retry"
}
4.2 Logging & Tracing
import json
import logging
from datetime import datetime

from opentelemetry import trace

tracer = trace.get_tracer(__name__)

class LLMLogger:
    def log_request(self, request_id: str, data: dict):
        """Log an LLM request for debugging and analytics"""
        log_entry = {
            "request_id": request_id,
            "timestamp": datetime.now().isoformat(),
            "model": data["model"],
            "prompt": data["prompt"][:500],  # truncated for storage
            "prompt_tokens": data["prompt_tokens"],
            "temperature": data.get("temperature", 1.0),
            "user_id": data.get("user_id"),
        }
        logging.info(f"LLM_REQUEST: {json.dumps(log_entry)}")

    def log_response(self, request_id: str, data: dict):
        """Log an LLM response"""
        log_entry = {
            "request_id": request_id,
            "completion_tokens": data["completion_tokens"],
            "total_tokens": data["total_tokens"],
            "latency_ms": data["latency_ms"],
            "finish_reason": data["finish_reason"],
            "cost_usd": self._calculate_cost(data),
        }
        logging.info(f"LLM_RESPONSE: {json.dumps(log_entry)}")
# Distributed tracing
@tracer.start_as_current_span("llm_call")
def call_llm(prompt: str) -> str:
    span = trace.get_current_span()
    span.set_attribute("prompt.length", len(prompt))

    response = llm.generate(prompt)

    span.set_attribute("response.length", len(response.content))
    span.set_attribute("tokens.total", response.usage.total_tokens)
    return response.content
4.3 Evaluation Framework
class LLMEvaluator:
    """
    Score the quality of LLM outputs
    """
    def evaluate_response(self,
                          question: str,
                          response: str,
                          ground_truth: str = None) -> dict:
        scores = {}

        # Relevance: does it answer the question?
        scores["relevance"] = self._score_relevance(question, response)

        # Coherence: is it well structured?
        scores["coherence"] = self._score_coherence(response)

        # Groundedness: is it supported by the provided context?
        scores["groundedness"] = self._score_groundedness(response)

        # Accuracy: does it match the ground truth?
        if ground_truth:
            scores["accuracy"] = self._score_accuracy(response, ground_truth)

        # Safety: is it harmless?
        scores["safety"] = self._score_safety(response)

        return scores

    def run_benchmark(self, test_cases: list[dict]) -> dict:
        """Run the evaluation over a test set"""
        results = []
        for case in test_cases:
            response = llm.generate(case["prompt"])
            scores = self.evaluate_response(
                question=case["prompt"],
                response=response,
                ground_truth=case.get("expected")
            )
            results.append(scores)
        return self._aggregate_scores(results)
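The individual `_score_*` helpers are left abstract; a common approach is LLM-as-judge. Here is a sketch for relevance, where the rubric wording and 1-5 scale are assumptions:

# Sketch: LLM-as-judge relevance scoring on a 1-5 scale
JUDGE_PROMPT = """Rate from 1 to 5 how directly the answer addresses the question.
Reply with a single digit only.

Question: {question}
Answer: {response}

Score:"""

def _score_relevance(self, question: str, response: str) -> float:
    raw = llm.generate(JUDGE_PROMPT.format(question=question, response=response))
    try:
        return int(raw.strip()[0]) / 5.0  # normalize to 0-1
    except (ValueError, IndexError):
        return 0.0                        # unparseable judgment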
5. Production Patterns
5.1 Caching Strategies
import hashlib
import json

class LLMCache:
    def __init__(self, redis_client, ttl_seconds=3600):
        self.redis = redis_client
        self.ttl = ttl_seconds

    def _cache_key(self, prompt: str, model: str, **kwargs) -> str:
        """Build a deterministic cache key"""
        content = f"{model}:{prompt}:{json.dumps(kwargs, sort_keys=True)}"
        return hashlib.sha256(content.encode()).hexdigest()

    def get_or_generate(self, prompt: str, model: str, **kwargs) -> str:
        key = self._cache_key(prompt, model, **kwargs)

        # Check the cache
        cached = self.redis.get(key)
        if cached:
            return cached.decode()

        # Generate
        response = llm.generate(prompt, model=model, **kwargs)

        # Cache only deterministic outputs (temperature == 0)
        if kwargs.get("temperature", 1.0) == 0:
            self.redis.setex(key, self.ttl, response)

        return response
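A usage sketch (the Redis connection parameters are placeholders):

import redis

cache = LLMCache(redis.Redis(host="localhost", port=6379), ttl_seconds=3600)
answer = cache.get_or_generate("What is RAG?", model="gpt-4-turbo", temperature=0)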
5.2 Rate Limiting & Retries
import time
from tenacity import retry, retry_if_exception, stop_after_attempt, wait_exponential

class RateLimiter:
    def __init__(self, requests_per_minute: int):
        self.rpm = requests_per_minute
        self.timestamps = []

    def acquire(self):
        """Wait if the call would exceed the rate limit"""
        now = time.time()
        # Drop timestamps older than the 60-second window
        self.timestamps = [t for t in self.timestamps if now - t < 60]

        if len(self.timestamps) >= self.rpm:
            sleep_time = 60 - (now - self.timestamps[0])
            time.sleep(sleep_time)

        self.timestamps.append(time.time())
# Retries with exponential backoff
def _is_retryable(exc: BaseException) -> bool:
    # Retry rate limits and server-side (5xx) errors; client errors
    # are deterministic, so retrying them only wastes quota
    if isinstance(exc, RateLimitError):
        return True
    return isinstance(exc, APIError) and exc.status_code >= 500

@retry(
    wait=wait_exponential(multiplier=1, min=4, max=60),
    stop=stop_after_attempt(5),
    retry=retry_if_exception(_is_retryable),
)
def call_llm_with_retry(prompt: str) -> str:
    return llm.generate(prompt)
5.3 Fallback Strategies
class LLMWithFallback:
    def __init__(self, primary: str, fallbacks: list[str]):
        self.primary = primary
        self.fallbacks = fallbacks

    def generate(self, prompt: str, **kwargs) -> str:
        models = [self.primary] + self.fallbacks
        for model in models:
            try:
                return llm.generate(prompt, model=model, **kwargs)
            except (RateLimitError, APIError) as e:
                logging.warning(f"Model {model} failed: {e}")
                continue
        raise AllModelsFailedError("All models exhausted")

# Usage
llm_client = LLMWithFallback(
    primary="gpt-4-turbo",
    fallbacks=["gpt-3.5-turbo", "claude-3-sonnet"]
)
Architecture Decision Matrix
| Pattern | When to Use | Complexity | Cost |
|---|---|---|---|
| Simple RAG | FAQs, document search | Low | Low |
| Hybrid RAG | Mixed queries | Medium | Medium |
| ReAct agent | Multi-step tasks | Medium | Medium |
| Function calling | Structured tool use | Low | Low |
| Plan-and-execute | Complex tasks | High | High |
| Multi-agent | Research-style tasks | Very high | Very high |