LLM应用模式Skill llm-app-patterns

这是一个关于构建生产就绪的大型语言模型(LLM)应用模式的技能。涵盖了 RAG 管道、智能体架构、提示 IDE 和 LLMOps 监控,适用于设计 AI 应用、实现 RAG、构建智能体或设置 LLM 可观测性。关键词:LLM 应用、RAG、智能体、提示工程、LLMOps。

AI应用 0 次安装 0 次浏览 更新于 3/21/2026

name: llm-app-patterns description: "构建 LLM 应用的生产就绪模式。涵盖了 RAG 管道、智能体架构、提示 IDE 和 LLMOps 监控。适用于设计 AI 应用、实现 RAG、构建智能体或设置 LLM 可观测性。"

🤖 LLM 应用模式

构建 LLM 应用的生产就绪模式,灵感来自 Dify 和行业最佳实践。

何时使用此技能

在以下情况下使用此技能:

  • 设计 LLM 驱动的应用
  • 实现 RAG(检索增强生成)
  • 构建带工具的 AI 智能体
  • 设置 LLMOps 监控
  • 选择智能体架构

1. RAG 管道架构

概述

RAG(检索增强生成)将 LLM 响应基于您的数据。

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   文档摄取   │────▶│   检索上下文   │────▶│   生成响应   │
│  Documents  │     │   Context   │     │   Response  │
└─────────────┘     └─────────────┘     └─────────────┘
      │                   │                   │
      ▼                   ▼                   ▼
 ┌─────────┐       ┌───────────┐       ┌───────────┐
 │ 分块嵌入 │       │   向量搜索   │       │   LLM + 上下文 │
 │ Chunking│       │  Vector   │       │    LLM    │
 │Embedding│       │  Search   │       │  + Context│
 └─────────┘       └───────────┘       └───────────┘

1.1 文档摄取

# Chunking strategies for document ingestion.
class ChunkingStrategy:
    """Enumeration of text-chunking strategies for RAG ingestion."""

    # Fixed-size chunking (simple, but may split mid-context)
    FIXED_SIZE = "fixed_size"  # e.g. 512 tokens

    # Semantic chunking (preserves meaning)
    SEMANTIC = "semantic"      # split by paragraph/section

    # Recursive splitting (tries several separators in order)
    # BUGFIX: the separator example below had its "\n" escapes mangled into
    # literal newlines, which broke the class body.
    RECURSIVE = "recursive"    # ["\n\n", "\n", " ", ""]

    # Document-aware (respects structure)
    DOCUMENT_AWARE = "document_aware"  # headings, lists, etc.

# Recommended chunking configuration.
# BUGFIX: the separators list had its "\n" escapes mangled into literal
# newlines, which made the dict literal a syntax error.
CHUNK_CONFIG = {
    "chunk_size": 512,       # tokens per chunk
    "chunk_overlap": 50,     # token overlap between adjacent chunks
    # Separators tried in order by a recursive splitter.
    "separators": ["\n\n", "\n", ". ", " "],
}

1.2 嵌入与存储

# Vector database selection guide: one entry per candidate store, with the
# deployment scenario it fits, its practical scale, and notable features.
# (Values are human-readable guidance strings, intentionally in Chinese.)
VECTOR_DB_OPTIONS = {
    "pinecone": {
        "use_case": "生产环境,托管服务",
        "scale": "数十亿向量",
        "features": ["混合搜索", "元数据过滤"]
    },
    "weaviate": {
        "use_case": "自托管,多模态",
        "scale": "数百万向量",
        "features": ["GraphQL API", "模块"]
    },
    "chromadb": {
        "use_case": "开发,原型设计",
        "scale": "数千向量",
        "features": ["简单 API", "内存选项"]
    },
    "pgvector": {
        "use_case": "现有 Postgres 基础设施",
        "scale": "数百万向量",
        "features": ["SQL 集成", "ACID 合规性"]
    }
}

# Embedding model selection: dimensionality vs. cost vs. quality trade-offs.
# (Values are human-readable guidance strings, intentionally in Chinese.)
EMBEDDING_MODELS = {
    "openai/text-embedding-3-small": {
        "dimensions": 1536,
        "cost": "$0.02/100 万 token",
        "quality": "适用于大多数用例"
    },
    "openai/text-embedding-3-large": {
        "dimensions": 3072,
        "cost": "$0.13/100 万 token",
        "quality": "适用于复杂查询"
    },
    "local/bge-large": {
        "dimensions": 1024,
        "cost": "免费(仅计算成本)",
        "quality": "与 OpenAI small 相当"
    }
}

1.3 检索策略

# Basic semantic search over the vector store.
def semantic_search(query: str, top_k: int = 5):
    """Embed *query* and return its top_k nearest documents.

    Relies on module-level ``embed`` and ``vector_db`` helpers.
    """
    return vector_db.similarity_search(embed(query), top_k=top_k)

# Hybrid search (semantic + keyword).
def hybrid_search(query: str, top_k: int = 5, alpha: float = 0.5):
    """Blend dense and sparse retrieval with reciprocal-rank fusion.

    alpha=1.0: purely semantic
    alpha=0.0: purely keyword (BM25)
    alpha=0.5: balanced
    """
    # NOTE(review): top_k is accepted but never used below — presumably it
    # should cap the fused result; confirm against rrf_merge's contract.
    dense_hits = vector_db.similarity_search(query)
    sparse_hits = bm25_search(query)

    # Reciprocal-rank fusion, weighted by alpha.
    return rrf_merge(dense_hits, sparse_hits, alpha)

# Multi-query retrieval.
def multi_query_retrieval(query: str):
    """Boost recall by searching with several LLM-generated rephrasings."""
    variations = llm.generate_query_variations(query, n=3)
    hits = []
    for variant in variations:
        hits.extend(semantic_search(variant))
    # Collapse documents retrieved by more than one variation.
    return deduplicate(hits)

# Contextual compression.
def compressed_retrieval(query: str):
    """Retrieve broadly, then compress down to only the query-relevant parts."""
    candidates = semantic_search(query, top_k=10)
    return llm.extract_relevant_parts(candidates, query)

1.4 带上下文的生成

# Grounded-answer prompt: instructs the model to answer only from {context}
# and to refuse when the context is insufficient. The refusal sentence is a
# user-facing Chinese runtime string — do not translate or alter it.
RAG_PROMPT_TEMPLATE = """
仅根据以下上下文回答用户的问题。
如果上下文信息不足,请说“我没有足够的信息回答这个问题”。

上下文:
{context}

问题:{question}

回答:"""

def generate_with_rag(question: str):
    """Answer *question* with RAG: retrieve, generate, and return citations.

    Returns a dict with the generated "answer" and the "sources" metadata of
    every retrieved document, so callers can render citations.
    """
    # Retrieve
    context_docs = hybrid_search(question, top_k=5)
    # BUGFIX: the separator had its "\n" escapes mangled into literal
    # newlines, breaking the string literal; join chunks with a blank line.
    context = "\n\n".join(doc.content for doc in context_docs)

    # Generate
    prompt = RAG_PROMPT_TEMPLATE.format(
        context=context,
        question=question
    )

    response = llm.generate(prompt)

    # Return the answer together with its citations.
    return {
        "answer": response,
        "sources": [doc.metadata for doc in context_docs]
    }

2. 智能体架构

2.1 ReAct 模式(推理 + 行动)

思考:我需要搜索关于 X 的信息
行动:search("X")
观察:[搜索结果]
思考:基于结果,我应该...
行动:calculate(...)
观察:[计算结果]
思考:我现在有足够信息
行动:final_answer("答案是...")
# ReAct loop prompt: the model alternates 思考/行动/观察 (Thought/Action/
# Observation) lines until it emits a 最终答案 (Final Answer) line, which
# ReActAgent.run detects. The Chinese labels are runtime protocol markers —
# do not translate or alter them.
REACT_PROMPT = """
您是一个可以使用工具回答问题的 AI 助手。

可用工具:
{tools_description}

使用此格式:
思考:[您关于下一步做什么的推理]
行动:[tool_name(arguments)]
观察:[工具结果 - 将在此填充]
...(根据需要重复思考/行动/观察)
思考:我有足够信息回答
最终答案:[您的最终响应]

问题:{question}
"""

class ReActAgent:
    """Minimal ReAct (Reason + Act) agent loop.

    Relies on helper methods (_format_tools, _parse_action, _execute_tool,
    _extract_final_answer) assumed to be provided elsewhere.
    """

    def __init__(self, tools: list, llm):
        # Index tools by name for O(1) dispatch.
        self.tools = {t.name: t for t in tools}
        self.llm = llm
        self.max_iterations = 10  # hard cap to avoid infinite tool loops

    def run(self, question: str) -> str:
        """Drive the Thought/Action/Observation loop until a final answer."""
        prompt = REACT_PROMPT.format(
            tools_description=self._format_tools(),
            question=question
        )

        for _ in range(self.max_iterations):
            response = self.llm.generate(prompt)

            # The model signals completion with a "最终答案:" line.
            if "最终答案:" in response:
                return self._extract_final_answer(response)

            action = self._parse_action(response)
            observation = self._execute_tool(action)
            # BUGFIX: the "\n" escapes had been mangled into literal newlines,
            # breaking the f-string; feed the observation back to the model.
            prompt += f"\n观察: {observation}\n"

        return "达到最大迭代次数"

2.2 函数调用模式

# Tools defined as functions with JSON-Schema parameter specs
# (OpenAI function-calling format: name / description / parameters).
# The description strings are sent to the model at runtime — keep as-is.
TOOLS = [
    {
        "name": "search_web",
        "description": "搜索网络获取最新信息",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "搜索查询"
                }
            },
            "required": ["query"]
        }
    },
    {
        "name": "calculate",
        "description": "执行数学计算",
        "parameters": {
            "type": "object",
            "properties": {
                "expression": {
                    "type": "string",
                    "description": "要评估的数学表达式"
                }
            },
            "required": ["expression"]
        }
    }
]

class FunctionCallingAgent:
    """Agent loop built on native LLM function calling.

    The model decides when to call tools; results are appended as
    role="tool" messages until it answers directly.
    """

    def run(self, question: str) -> str:
        """Chat until the model replies with content instead of a tool call."""
        messages = [{"role": "user", "content": question}]

        while True:
            reply = self.llm.chat(
                messages=messages,
                tools=TOOLS,
                tool_choice="auto"
            )

            # No tool requested -> the model produced the final answer.
            if not reply.tool_calls:
                return reply.content

            # Execute every requested tool and append the results so the
            # model can see them on its next turn.
            for call in reply.tool_calls:
                outcome = self._execute_tool(call.name, call.arguments)
                messages.append({
                    "role": "tool",
                    "tool_call_id": call.id,
                    "content": str(outcome)
                })

2.3 计划与执行模式

class PlanAndExecuteAgent:
    """Plan-and-execute agent.

    1. Create a plan (a list of steps).
    2. Execute each step in order.
    3. Re-plan when the results call for it.
    """

    def run(self, task: str) -> str:
        # Planning phase; the planner returns e.g.
        # ["Step 1: ...", "Step 2: ...", ...]
        plan = self.planner.create_plan(task)

        completed = []
        for step in plan:
            # Execute the step with everything finished so far as context.
            completed.append(self.executor.execute(step, context=completed))

            # Decide whether the remaining plan is still viable.
            if self._needs_replan(task, completed):
                plan = self.planner.replan(
                    task,
                    completed=completed,
                    remaining=plan[len(completed):]
                )

        # Synthesize the final answer from all step results.
        return self.synthesizer.summarize(task, completed)

2.4 多智能体协作

class AgentTeam:
    """A team of specialist agents collaborating on a complex task."""

    def __init__(self):
        # Specialist roster, keyed by role.
        self.agents = {
            "researcher": ResearchAgent(),
            "analyst": AnalystAgent(),
            "writer": WriterAgent(),
            "critic": CriticAgent()
        }
        self.coordinator = CoordinatorAgent()

    def solve(self, task: str) -> str:
        """Decompose the task, farm out subtasks, review, and synthesize."""
        # The coordinator splits the task into per-agent assignments.
        assignments = self.coordinator.decompose(task)

        outputs = {}
        for job in assignments:
            worker = self.agents[job.agent]
            # Each worker sees the outputs produced so far.
            outputs[job.id] = worker.execute(job.subtask, context=outputs)

        # The critic reviews the collected outputs.
        verdict = self.agents["critic"].review(outputs)

        # Iterate with feedback when a revision is requested.
        if verdict.needs_revision:
            return self.solve_with_feedback(task, outputs, verdict)

        return self.coordinator.synthesize(outputs)

3. 提示 IDE 模式

3.1 带变量的提示模板

class PromptTemplate:
    """A prompt template with declared variables and optional few-shot examples."""

    def __init__(self, template: str, variables: list[str]):
        self.template = template
        self.variables = variables

    def format(self, **kwargs) -> str:
        """Fill the template.

        Raises ValueError when any declared variable is missing.
        """
        missing = set(self.variables) - set(kwargs.keys())
        if missing:
            raise ValueError(f"缺少变量: {missing}")

        return self.template.format(**kwargs)

    def with_examples(self, examples: list[dict]) -> str:
        """Prepend few-shot examples (dicts with 'input'/'output' keys).

        BUGFIX: the "\\n" escapes in both string literals below had been
        mangled into literal newlines, breaking the method.
        """
        example_text = "\n\n".join(
            f"输入: {ex['input']}\n输出: {ex['output']}"
            for ex in examples
        )
        return f"{example_text}\n\n{self.template}"

# Usage: a summarizer whose style and input text are template variables.
summarizer = PromptTemplate(
    # BUGFIX: restore the "\n\n" escape that had been mangled into literal
    # newlines inside the string literal.
    template="用{style}风格总结以下文本:\n\n{text}",
    variables=["style", "text"]
)

prompt = summarizer.format(
    style="专业",
    text="长文章内容..."
)

3.2 提示版本控制与 A/B 测试

class PromptRegistry:
    """Versioned prompt store with A/B bucketing and outcome tracking."""

    def __init__(self, db):
        self.db = db

    def register(self, name: str, template: str, version: str):
        """Persist a prompt under an explicit version."""
        record = {
            "name": name,
            "template": template,
            "version": version,
            "created_at": datetime.now(),
            "metrics": {}
        }
        self.db.save(record)

    def get(self, name: str, version: str = "latest") -> str:
        """Fetch a specific version of a stored prompt."""
        return self.db.get(name, version)

    def ab_test(self, name: str, user_id: str) -> str:
        """Pick a variant deterministically from the user's hash bucket."""
        variants = self.db.get_all_versions(name)
        return variants[hash(user_id) % len(variants)]

    def record_outcome(self, prompt_id: str, outcome: dict):
        """Attach observed performance metrics to a prompt."""
        self.db.update_metrics(prompt_id, outcome)

3.3 提示链式

class PromptChain:
    """Run prompts in sequence, feeding each output into the next prompt."""

    def __init__(self, steps: list[dict]):
        self.steps = steps

    def run(self, initial_input: str) -> dict:
        """Execute every step; return the final output plus intermediates."""
        context = {"input": initial_input}
        intermediates = []

        for step in self.steps:
            raw = llm.generate(step["prompt"].format(**context))

            # Optionally post-process the raw model output.
            parser = step.get("parser")
            output = parser(raw) if parser else raw

            # Expose this step's output to all later prompts.
            context[step["output_key"]] = output
            intermediates.append({
                "step": step["name"],
                "output": output
            })

        return {
            "final_output": context[self.steps[-1]["output_key"]],
            "intermediate_results": intermediates
        }

# Example chain: research -> analyze -> summarize.
# BUGFIX: the "\n" escapes inside the analyze/summarize prompts had been
# mangled into literal newlines, breaking the string literals.
chain = PromptChain([
    {
        "name": "research",
        "prompt": "研究主题: {input}",
        "output_key": "research"
    },
    {
        "name": "analyze",
        "prompt": "分析这些发现:\n{research}",
        "output_key": "analysis"
    },
    {
        "name": "summarize",
        "prompt": "用 3 个要点总结此分析:\n{analysis}",
        "output_key": "summary"
    }
])

4. LLMOps 与可观测性

4.1 要跟踪的指标

# Catalog of metrics worth tracking for LLMOps dashboards.
# Values are human-readable descriptions (user-facing Chinese strings).
LLM_METRICS = {
    # Performance
    "latency_p50": "第 50 百分位响应时间",
    "latency_p99": "第 99 百分位响应时间",
    "tokens_per_second": "生成速度",

    # Quality
    "user_satisfaction": "赞/踩比例",
    "task_completion": "成功完成任务的百分比",
    "hallucination_rate": "有事实错误的响应百分比",

    # Cost
    "cost_per_request": "平均每次 API 调用的美元成本",
    "tokens_per_request": "平均使用的 token 数",
    "cache_hit_rate": "从缓存服务的请求百分比",

    # Reliability
    "error_rate": "失败请求的百分比",
    "timeout_rate": "超时请求的百分比",
    "retry_rate": "需要重试的请求百分比"
}

4.2 日志记录与追踪

import logging
from opentelemetry import trace

# Module-level tracer used by the distributed-tracing examples below.
tracer = trace.get_tracer(__name__)

class LLMLogger:
    """Structured logging of LLM requests/responses for debugging & analytics."""

    def log_request(self, request_id: str, data: dict):
        """Emit a structured LLM_REQUEST log line."""
        entry = {
            "request_id": request_id,
            "timestamp": datetime.now().isoformat(),
            "model": data["model"],
            "prompt": data["prompt"][:500],  # truncated to bound storage
            "prompt_tokens": data["prompt_tokens"],
            "temperature": data.get("temperature", 1.0),
            "user_id": data.get("user_id"),
        }
        logging.info(f"LLM_REQUEST: {json.dumps(entry)}")

    def log_response(self, request_id: str, data: dict):
        """Emit a structured LLM_RESPONSE log line, including computed cost."""
        entry = {
            "request_id": request_id,
            "completion_tokens": data["completion_tokens"],
            "total_tokens": data["total_tokens"],
            "latency_ms": data["latency_ms"],
            "finish_reason": data["finish_reason"],
            "cost_usd": self._calculate_cost(data),
        }
        logging.info(f"LLM_RESPONSE: {json.dumps(entry)}")

# Distributed tracing wrapper around a single LLM call.
@tracer.start_as_current_span("llm_call")
def call_llm(prompt: str) -> str:
    """Generate a completion for *prompt*, annotating the active span."""
    span = trace.get_current_span()
    span.set_attribute("prompt.length", len(prompt))

    response = llm.generate(prompt)

    # BUGFIX: `response` is a completion object (it exposes .usage and
    # .content below), so measure the generated text, not the wrapper.
    span.set_attribute("response.length", len(response.content))
    span.set_attribute("tokens.total", response.usage.total_tokens)

    return response.content

4.3 评估框架

class LLMEvaluator:
    """
    评估 LLM 输出的质量
    """

    def evaluate_response(self,
                          question: str,
                          response: str,
                          ground_truth: str = None) -> dict:
        scores = {}

        # 相关性:是否回答问题?
        scores["relevance"] = self._score_relevance(question, response)

        # 连贯性:结构是否良好?
        scores["coherence"] = self._score_coherence(response)

        # 基于性:是否基于提供的上下文?
        scores["groundedness"] = self._score_groundedness(response)

        # 准确性:是否与真实情况匹配?
        if ground_truth:
            scores["accuracy"] = self._score_accuracy(response, ground_truth)

        # 有害性:是否安全?
        scores["safety"] = self._score_safety(response)

        return scores

    def run_benchmark(self, test_cases: list[dict]) -> dict:
        """在测试集上运行评估"""
        results = []
        for case in test_cases:
            response = llm.generate(case["prompt"])
            scores = self.evaluate_response(
                question=case["prompt"],
                response=response,
                ground_truth=case.get("expected")
            )
            results.append(scores)

        return self._aggregate_scores(results)

5. 生产模式

5.1 缓存策略

import hashlib
from functools import lru_cache

class LLMCache:
    """Redis-backed response cache for deterministic LLM calls."""

    def __init__(self, redis_client, ttl_seconds=3600):
        self.redis = redis_client
        self.ttl = ttl_seconds

    def _cache_key(self, prompt: str, model: str, **kwargs) -> str:
        """Derive a deterministic key from model, prompt, and sorted kwargs."""
        payload = f"{model}:{prompt}:{json.dumps(kwargs, sort_keys=True)}"
        return hashlib.sha256(payload.encode()).hexdigest()

    def get_or_generate(self, prompt: str, model: str, **kwargs) -> str:
        """Serve from cache when possible; otherwise generate (and maybe cache)."""
        key = self._cache_key(prompt, model, **kwargs)

        # Cache lookup first.
        hit = self.redis.get(key)
        if hit:
            return hit.decode()

        response = llm.generate(prompt, model=model, **kwargs)

        # Only deterministic (temperature == 0) outputs are safe to cache.
        if kwargs.get("temperature", 1.0) == 0:
            self.redis.setex(key, self.ttl, response)

        return response

5.2 限流与重试

import time
from tenacity import retry, retry_if_exception, stop_after_attempt, wait_exponential

class RateLimiter:
    """Sliding-window limiter: at most `requests_per_minute` per 60 seconds."""

    def __init__(self, requests_per_minute: int):
        self.rpm = requests_per_minute
        self.timestamps = []  # acquisition times within the current window

    def acquire(self):
        """Block until a request slot is available, then record it."""
        now = time.time()

        # Drop timestamps that have aged out of the 60-second window.
        self.timestamps = [t for t in self.timestamps if now - t < 60]

        # Window full: sleep until the oldest entry expires.
        if len(self.timestamps) >= self.rpm:
            time.sleep(60 - (now - self.timestamps[0]))

        self.timestamps.append(time.time())

# Retry with exponential backoff.
def _is_retryable(exc: BaseException) -> bool:
    """Retry on rate limits and 5xx server errors; never on 4xx client errors."""
    if isinstance(exc, RateLimitError):
        return True
    return isinstance(exc, APIError) and exc.status_code >= 500

# BUGFIX: the original re-raised in every except branch, but tenacity's
# default is to retry on *any* exception — so client (4xx) errors were
# retried despite the comment saying they should not be. A retry predicate
# enforces the intended policy.
@retry(
    wait=wait_exponential(multiplier=1, min=4, max=60),
    stop=stop_after_attempt(5),
    retry=retry_if_exception(_is_retryable),
)
def call_llm_with_retry(prompt: str) -> str:
    """Call the LLM with exponential-backoff retries on transient failures."""
    return llm.generate(prompt)

5.3 回退策略

class LLMWithFallback:
    """Try a primary model first, then each fallback model in order."""

    def __init__(self, primary: str, fallbacks: list[str]):
        self.primary = primary
        self.fallbacks = fallbacks

    def generate(self, prompt: str, **kwargs) -> str:
        """Generate with the first model that succeeds; raise when all fail."""
        for model in [self.primary, *self.fallbacks]:
            try:
                return llm.generate(prompt, model=model, **kwargs)
            except (RateLimitError, APIError) as e:
                # Log the failure and move on to the next candidate.
                logging.warning(f"模型 {model} 失败: {e}")

        raise AllModelsFailedError("所有模型已耗尽")

# Usage: GPT-4 Turbo as primary, with cheaper/alternative models as fallbacks.
llm_client = LLMWithFallback(
    primary="gpt-4-turbo",
    fallbacks=["gpt-3.5-turbo", "claude-3-sonnet"]
)

架构决策矩阵

模式 | 使用场景 | 复杂度 | 成本
简单 RAG | 常见问题解答、文档搜索 | 低 | 低
混合 RAG | 混合查询 | 中 | 中
ReAct 智能体 | 多步任务 | 中 | 中
函数调用 | 结构化工具 | 中 | 中
计划执行 | 复杂任务 | 高 | 高
多智能体 | 研究任务 | 非常高 | 非常高

资源