AI代理模式 AIAgentPatterns

这个技能是关于设计模式,用于构建自主AI代理,能够进行推理、规划和执行任务。它涵盖了代理架构(如ReAct、Plan-and-Execute)、核心组件(内存、规划、工具使用)、框架(LangChain、AutoGPT、CrewAI、Semantic Kernel)、工具设计、多代理系统、错误恢复、护栏、评估和生产部署。适用于AI智能体开发和自主系统构建,关键词包括:AI代理、模式、自主系统、LLM、推理、规划、执行、架构设计、AI智能体开发、代理系统、工具使用、内存管理、安全护栏。

AI智能体 0 次安装 0 次浏览 更新于 3/5/2026

name: AI 代理模式 description: 用于构建能够推理、规划和执行任务的自主AI代理的设计模式。

AI 代理模式

概述

AI 代理是使用大型语言模型(LLMs)进行推理、规划和执行任务的自主系统。与简单的聊天机器人不同,代理能够将复杂问题分解为步骤,使用工具与外部系统交互,并根据反馈调整行为。本技能涵盖代理架构(ReAct、Plan-and-Execute、Tree of Thoughts、Reflexion)、核心组件(内存、规划、工具使用)、框架(LangChain、AutoGPT、CrewAI、Semantic Kernel)、工具设计、多代理系统、错误恢复、护栏、评估和生产部署。

先决条件

  • 了解LLMs及其能力
  • 掌握提示工程和系统提示知识
  • 熟悉函数调用和工具API
  • 理解代理架构和设计模式
  • 基础知识:向量数据库和嵌入
  • 理解AI系统的安全性和护栏

关键概念

代理特征

  • 自主性:无需人工干预即可做出决策
  • 推理能力:能将复杂任务分解为子任务
  • 规划能力:能创建和执行多步骤计划
  • 工具使用:能通过函数调用与外部系统交互
  • 内存:能在交互间维护上下文
  • 适应性:能从反馈中学习并调整行为

简单代理 vs 复杂代理

  • 简单代理:单步执行(如天气查询 → 结果)
  • 复杂代理:多步规划(如旅行规划 → 搜索 → 预订 → 确认)

代理架构

  • ReAct:循环中的推理 + 执行
  • Plan-and-Execute:先创建计划,然后逐步执行
  • Tree of Thoughts:在决策前探索多个推理路径
  • Reflexion:反思行动并从错误中学习

核心组件

  • 内存:短期(对话历史)、长期(知识库)、情景(特定经验)
  • 规划:将目标分解为可操作步骤
  • 工具使用:安全地注册、验证和执行工具
  • 反思:分析行动并改进行为

部署模式

  • FastAPI 服务器:用于代理推理的REST API
  • Docker 部署:容器化代理服务
  • Kubernetes:支持GPU的可扩展部署
  • 多代理系统:代理协作和委派

实施指南

代理架构

ReAct(推理 + 执行)

ReAct 模式在循环中结合推理和执行。

from openai import OpenAI

class ReActAgent:
    def __init__(self, llm_client, tools):
        self.client = llm_client
        self.tools = tools
        self.max_iterations = 10

    def run(self, query):
        thoughts = []

        for i in range(self.max_iterations):
            # 思考:我应该做什么?
            thought_response = self.client.chat.completions.create(
                model="gpt-4",
                messages=[
                    {"role": "system", "content": self.get_system_prompt()},
                    *thoughts,
                    {"role": "user", "content": query}
                ],
                tools=self.tools
            )

            thought = thought_response.choices[0].message.content
            thoughts.append({"role": "assistant", "content": thought})

            # 检查代理是否想使用工具
            if thought_response.choices[0].finish_reason == "function_calls":
                fc = thought_response.choices[0].message.function_calls[0]

                # 行动:执行工具
                tool_result = self.execute_tool(fc.name, fc.arguments)

                # 观察:我观察到了什么?
                observation = f"工具 {fc.name} 返回:{json.dumps(tool_result)}"
                thoughts.append({"role": "assistant", "content": observation})
            else:
                # 无需更多行动
                break

        return thoughts

    def get_system_prompt(self):
        return """您是一个有帮助的助手,可以访问工具。
使用以下格式:

思考:关于要做什么的思考
行动:要采取的行动(工具调用或最终答案)
观察:行动的结果
...(根据需要重复思考/行动/观察)"""

    def execute_tool(self, tool_name, arguments):
        # 执行工具
        for tool in self.tools:
            if tool["name"] == tool_name:
                handler = tool["handler"]
                return handler(**arguments)
        return None

# 用法
tools = [
    {
        "type": "function",
        "function": {
            "name": "search",
            "description": "搜索信息",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {"type": "string"}
                },
                "required": ["query"]
            }
        },
        "handler": search_tool
    }
]

agent = ReActAgent(client, tools)
conversation = agent.run("法国首都是什么?")

Plan-and-Execute

代理先创建计划,然后逐步执行。

class PlanAndExecuteAgent:
    def __init__(self, llm_client, tools):
        self.client = llm_client
        self.tools = tools

    def run(self, query):
        # 阶段 1:规划
        plan_response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[
                {
                    "role": "system",
                    "content": """您是一个规划代理。
将用户请求分解为编号的步骤列表。
每个步骤应该是可操作的,并使用可用工具。

可用工具:
- search:搜索信息
- calculate:执行计算
- database:查询数据库

将响应格式化为:
步骤 1:[行动]
步骤 2:[行动]
..."""
                },
                {"role": "user", "content": query}
            ],
            tools=self.tools
        )

        plan_text = plan_response.choices[0].message.content
        steps = self.parse_plan(plan_text)

        # 阶段 2:执行
        results = []
        for step in steps:
            result = self.execute_step(step)
            results.append(result)

        # 阶段 3:合成
        synthesis_response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[
                {
                    "role": "system",
                    "content": "将结果合成为最终答案。"
                },
                {"role": "user", "content": query},
                {"role": "assistant", "content": plan_text},
                *[{"role": "assistant", "content": f"步骤 {i+1} 结果:{json.dumps(r)}"} 
                   for i, r in enumerate(results)]
            ]
        )

        return synthesis_response.choices[0].message.content

    def parse_plan(self, plan_text):
        steps = []
        lines = plan_text.split('
')
        for line in lines:
            if line.strip().startswith('步骤'):
                steps.append(line.strip())
        return steps

    def execute_step(self, step):
        # 从步骤中提取工具名称和参数
        tool_name = self.extract_tool_name(step)
        arguments = self.extract_arguments(step)

        # 执行工具
        for tool in self.tools:
            if tool["name"] == tool_name:
                handler = tool["handler"]
                return handler(**arguments)

        return None

Tree of Thoughts

代理在决策前探索多个推理路径。

class TreeOfThoughtsAgent:
    def __init__(self, llm_client, tools):
        self.client = llm_client
        self.tools = tools
        self.max_depth = 3
        self.max_branches = 5

    def run(self, query):
        # 生成多个候选思考
        thoughts_response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[
                {
                    "role": "system",
                    "content": f"""您是一个深思熟虑的代理。
生成 {self.max_branches} 个不同的可能思考/路径来回答查询。

每个思考应探索不同的方法。

格式为:
思考 1:[描述]
思考 2:[描述]
...
"""
                },
                {"role": "user", "content": query}
            ],
            tools=self.tools
        )

        thoughts_text = thoughts_response.choices[0].message.content
        thoughts = thoughts_text.split('
')

        # 评估每个思考
        evaluations = []
        for i, thought in enumerate(thoughts):
            evaluation = self.evaluate_thought(thought, query)
            evaluations.append((i, evaluation))

        # 选择最佳思考
        best = max(evaluations, key=lambda x: x[1])
        best_thought = thoughts[best[0]]

        # 执行最佳思考
        result = self.execute_thought(best_thought)

        return result

    def evaluate_thought(self, thought, query):
        # 基于各种标准评分思考
        score = 0

        # 清晰度
        if "clear" in thought.lower():
            score += 2

        # 完整性
        if "step" in thought.lower():
            score += 2

        # 效率
        if "quick" in thought.lower() or "fast" in thought.lower():
            score += 1

        return score

    def execute_thought(self, thought):
        # 从思考中提取工具调用
        tool_calls = self.extract_tool_calls(thought)

        # 执行所有工具调用
        results = []
        for tc in tool_calls:
            result = self.execute_tool(tc["name"], tc["arguments"])
            results.append(result)

        return results

Reflexion

代理反思其行动并从错误中学习。

class ReflexionAgent:
    def __init__(self, llm_client, tools):
        self.client = llm_client
        self.tools = tools
        self.memory = []
        self.reflections = []

    def run(self, query):
        while True:
            # 行动:生成行动
            action_response = self.client.chat.completions.create(
                model="gpt-4",
                messages=[
                    {
                        "role": "system",
                        "content": "您是一个有帮助的助手。回答用户的查询。"
                    },
                    *self.memory,
                    {"role": "user", "content": query}
                ],
                tools=self.tools
            )

            action = action_response.choices[0].message
            self.memory.append({"role": "assistant", "content": action})

            # 检查是否采取了行动
            if action_response.choices[0].finish_reason == "function_calls":
                fc = action_response.choices[0].message.function_calls[0]

                # 执行行动
                result = self.execute_tool(fc.name, fc.arguments)

                # 反思:这个行动成功了吗?
                reflection = self.reflect(action, result)
                self.reflections.append(reflection)

                # 用反思更新内存
                self.memory.append({
                    "role": "system",
                    "content": f"反思:{reflection}"
                })

                # 如果行动失败,再试一次
                if "failed" in reflection.lower():
                    continue
                else:
                    # 成功 - 生成最终答案
                    final_response = self.client.chat.completions.create(
                        model="gpt-4",
                        messages=[
                            *self.memory,
                            {
                                "role": "user",
                                "content": query
                            },
                            {
                                "role": "function",
                                "name": fc.name,
                                "content": json.dumps(result)
                            }
                        ]
                    )
                    return final_response.choices[0].message.content
            else:
                # 无需行动 - 返回答案
                return action

    def reflect(self, action, result):
        # 分析行动和结果
        reflection_parts = []

        # 行动是否达到了目标?
        if result.get("success", False):
            reflection_parts.append("行动失败。")

        # 出了什么问题?
        if "error" in result:
            reflection_parts.append(f"错误:{result['error']}")

        # 如何改进?
        if "error" in result:
            reflection_parts.append("下次,先验证参数。")

        return " ".join(reflection_parts)

核心组件

内存(短期、长期)

from typing import List, Dict, Any
from datetime import datetime

class AgentMemory:
    def __init__(self):
        self.short_term = []  # 对话历史
        self.long_term = {}  # 持久知识
        self.episodic = []  # 特定经验

    def add_to_short_term(self, role, content):
        """添加到对话历史"""
        self.short_term.append({
            "role": role,
            "content": content,
            "timestamp": datetime.now().isoformat()
        })

        # 保留最后10条消息
        if len(self.short_term) > 10:
            self.short_term = self.short_term[-10:]

    def add_to_long_term(self, key, value):
        """添加到持久知识库"""
        self.long_term[key] = {
            "value": value,
            "timestamp": datetime.now().isoformat(),
            "access_count": self.long_term.get(key, {}).get("access_count", 0) + 1
        }

    def add_episode(self, episode_type, data):
        """添加特定经验/事件"""
        self.episodic.append({
            "type": episode_type,
            "data": data,
            "timestamp": datetime.now().isoformat()
        })

    def retrieve_relevant(self, query, max_results=5):
        """基于查询检索相关信息"""
        # 长期内存的简单关键词匹配
        relevant = []
        query_lower = query.lower()

        for key, value in self.long_term.items():
            if query_lower in key.lower() or key.lower() in query_lower():
                relevant.append({
                    "key": key,
                    "value": value["value"],
                    "relevance": self.calculate_relevance(query, key)
                })

        # 按相关性排序并返回顶部结果
        relevant.sort(key=lambda x: x["relevance"], reverse=True)
        return relevant[:max_results]

    def calculate_relevance(self, query, key):
        """计算相关性分数"""
        query_words = set(query.lower().split())
        key_words = set(key.lower().split())

        # Jaccard 相似度
        intersection = len(query_words & key_words)
        union = len(query_words | key_words)

        return intersection / union if union > 0 else 0

    def get_context(self, max_messages=10):
        """获取最近对话上下文"""
        return self.short_term[-max_messages:]

规划

from typing import List, Dict

class Planner:
    def __init__(self, llm_client):
        self.client = llm_client

    def create_plan(self, goal, constraints=None):
        """创建分步计划以实现目标"""

        system_prompt = f"""您是一个规划代理。
创建详细计划以实现目标:{goal}

约束:
{constraints if constraints else "无"}

可用操作:
- search:搜索信息
- query_database:查询数据库
- calculate:执行计算
- api_call:调用外部API

将您的计划格式化为:
1. [操作] - [描述]
2. [操作] - [描述]
...

每个步骤应具体且可操作。
"""

        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": goal}
            ],
            tools=self.tools
        )

        plan_text = response.choices[0].message.content
        return self.parse_plan(plan_text)

    def parse_plan(self, plan_text):
        """将计划解析为结构化格式"""
        steps = []
        lines = plan_text.split('
')

        for line in lines:
            if line.strip() and line.strip()[0].isdigit():
                parts = line.split('-', 1)
                if len(parts) == 2:
                    operation = parts[0].strip()
                    description = parts[1].strip()
                    steps.append({
                        "step": int(operation[0]),
                        "operation": operation,
                        "description": description
                    })

        return steps

    def update_plan(self, plan, completed_step, new_step=None):
        """完成步骤后更新计划"""
        updated_plan = []

        for step in plan:
            if step["step"] == completed_step:
                continue  # 跳过已完成的步骤

            if new_step:
                # 在完成的步骤后插入新步骤
                if step["step"] == completed_step - 1:
                    updated_plan.append(step)
                    updated_step = {
                        "step": completed_step,
                        "operation": new_step["operation"],
                        "description": new_step["description"]
                    }
                    # 重编号剩余步骤
                    for s in plan[step["step"]:]:
                        updated_step = s.copy()
                        updated_step["step"] = len(updated_plan) + 1
                    updated_plan.append(updated_step)
                    break
            else:
                updated_plan.append(step)

        return updated_plan

工具使用

from typing import Dict, Callable, Any

class ToolRegistry:
    def __init__(self):
        self.tools: Dict[str, Dict] = {}

    def register(self, name: str, handler: Callable, description: str, parameters: Dict):
        """注册工具"""
        self.tools[name] = {
            "handler": handler,
            "description": description,
            "parameters": parameters
        }

    def get_tool(self, name: str) -> Dict:
        """按名称获取工具"""
        return self.tools.get(name)

    def list_tools(self) -> List[Dict]:
        """列出所有可用工具"""
        return list(self.tools.values())

    def execute(self, name: str, arguments: Dict) -> Any:
        """使用给定参数执行工具"""
        tool = self.get_tool(name)
        if not tool:
            raise ValueError(f"未找到工具:{name}")

        handler = tool["handler"]
        return handler(**arguments)

    def get_openai_schema(self):
        """将工具转换为OpenAI函数调用格式"""
        return [
            {
                "type": "function",
                "function": {
                    "name": name,
                    "description": tool["description"],
                    "parameters": tool["parameters"]
                }
            }
            for name, tool in self.tools.items()
        ]

# 用法
def search_database(query: str) -> Dict:
    """搜索数据库"""
    # 实施
    return {"results": [], "query": query}

def calculate(expression: str) -> float:
    """计算数学表达式"""
    # 实施
    return eval(expression)

registry = ToolRegistry()
registry.register("search", search_database, "搜索数据库", {
    "type": "object",
    "properties": {
        "query": {"type": "string"}
    },
    "required": ["query"]
})
registry.register("calculate", calculate, "执行计算", {
    "type": "object",
    "properties": {
        "expression": {"type": "string"}
    },
    "required": ["expression"]
})

反思

class ReflectionEngine:
    def __init__(self, llm_client):
        self.client = llm_client
        self.reflection_history = []

    def reflect_on_action(self, action, result, context):
        """反思行动及其结果"""

        reflection_prompt = f"""
您是一个反思引擎。分析以下行动和结果:

行动:{action}
结果:{result}
上下文:{context}

提供一个反思,包括:
1. 这个行动的目标是什么?
2. 行动成功了吗?
3. 如果不成功,出了什么问题?
4. 下次如何改进?
5. 我们应该尝试不同的方法吗?

将您的反思格式化为:
分析:[您的分析]
建议:[您的建议]
"""

        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": reflection_prompt}]
        )

        reflection = response.choices[0].message.content
        self.reflection_history.append(reflection)

        return reflection

    def get_improvement_suggestion(self, recent_reflections):
        """从最近的反思中获取改进建议"""
        if not recent_reflections:
            return None

        # 分析反思中的模式
        suggestions = []

        for reflection in recent_reflections:
            if "improve" in reflection.lower() or "better" in reflection.lower():
                # 提取改进建议
                lines = reflection.split('
')
                for line in lines:
                    if "recommendation" in line.lower():
                        suggestions.append(line.split(':', 1)[1].strip())

        return suggestions if suggestions else None

代理框架

LangChain 代理

from langchain.agents import initialize_agent, Tool, AgentExecutor
from langchain.chat_models import ChatOpenAI
from langchain.utilities import SerpAPIWrapper

# 初始化LLM
llm = ChatOpenAI(temperature=0)

# 定义工具
search = SerpAPIWrapper()
tools = [
    Tool(
        name="搜索",
        func=search.run,
        description="搜索最新信息"
    )
]

# 初始化代理
agent = initialize_agent(
    tools,
    llm=llm,
    agent="chat-zero-shot-react-description",
    verbose=True
)

# 创建执行器
executor = AgentExecutor.from_agent_and_tools(
    agent=agent,
    tools=tools,
    verbose=True
)

# 运行代理
result = executor.run("AI 的最新新闻是什么?")
print(result)

AutoGPT 模式

from langchain.experimental import AutoGPT
from langchain.chat_models import ChatOpenAI

llm = ChatOpenAI(temperature=0)

# 定义工具
tools = [
    {
        "name": "google_search",
        "description": "在Google上搜索信息",
        "func": google_search,
        "parameters": {
            "type": "object",
            "properties": {
                "query": {"type": "string"}
            },
            "required": ["query"]
        }
    }
]

# 创建AutoGPT代理
agent = AutoGPT.from_llm_and_tools(
    llm,
    tools,
    agent_type="zero-shot-react-description",
    verbose=True
)

# 运行代理
result = agent.run([
    "研究AI的最新发展",
    "总结关键发现",
    "提供建议"
])
print(result)

CrewAI

from crewai import Agent, Task, Crew, Process
from crewai.tools import SerperDevTool

# 定义代理
researcher = Agent(
    role='研究员',
    goal='查找和分析信息',
    backstory='您是一名专家研究员。',
    verbose=True,
    tools=[SerperDevTool()]
)

writer = Agent(
    role='作者',
    goal='基于研究创建吸引人的内容',
    backstory='您是一名熟练的作者。',
    verbose=True
)

# 定义任务
research_task = Task(
    description='研究最新AI发展',
    agent=researcher,
    expected_output='详细的研究发现'
)

write_task = Task(
    description='撰写关于AI发展的博客文章',
    agent=writer,
    context=[research_task],
    expected_output='最终博客文章'
)

# 创建团队
crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, write_task],
    process=Process.sequential,
    verbose=True
)

# 执行团队
result = crew.kickoff()
print(result)

Semantic Kernel

import semantic_kernel as sk
from semantic_kernel.connectors.ai.open_ai import AzureOpenAIChatCompletion

# 初始化内核
kernel = sk.Kernel()

kernel.add_chat_service("chat-gpt", AzureOpenAIChatCompletion())

# 定义插件
class WeatherPlugin:
    @sk.kernel_function(
        description="获取当前位置的天气",
        name="get_weather"
    )
    def get_weather(location: str) -> str:
        # 调用天气API
        return f"{location}的天气:晴朗,22°C"

# 添加插件到内核
kernel.add_plugin(WeatherPlugin())

# 创建代理
agent = sk.ChatCompletionAgent(
    service_id="chat-gpt",
    kernel=kernel,
    instructions="您是一个有帮助的助手,可以访问天气信息。"
)

# 运行代理
result = agent.chat("东京的天气如何?")
print(result)

代理工具设计

工具命名约定

# 好的工具名称
GOOD_TOOL_NAMES = [
    "search_database",      # 清晰、描述性
    "calculate_sum",        # 面向行动
    "get_user_profile",     # 获取 + 资源
    "send_email",          # 行动 + 资源
    "validate_address",    # 行动 + 资源
]

# 坏的工具名称
BAD_TOOL_NAMES = [
    "tool1",              # 不描述性
    "do_stuff",           # 模糊
    "helper",             # 通用
    "func",               # 缩写
]

工具文档

def search_products(
    query: str,
    category: str = None,
    min_price: float = None,
    max_price: float = None,
    limit: int = 10
) -> dict:
    """
    在目录中搜索产品。

    参数:
        query: 搜索查询 - 可以包括产品名称、类别或关键词
        category: 按特定类别过滤(可选)
        min_price: 最低价格过滤器(可选)
        max_price: 最高价格过滤器(可选)
        limit: 返回的最大结果数(默认:10)

    返回:
        dict: 包含的字典:
            - success: 搜索是否成功
            - results: 匹配产品列表
            - total: 匹配总数
            - error: 如果不成功,错误消息
    """

    # 实施
    try:
        # 构建查询
        db_query = "SELECT * FROM products WHERE name LIKE ?"
        params = [f"%{query}%"]

        if category:
            db_query += " AND category = ?"
            params.append(category)

        if min_price is not None:
            db_query += " AND price >= ?"
            params.append(min_price)

        if max_price is not None:
            db_query += " AND price <= ?"
            params.append(max_price)

        db_query += f" LIMIT {limit}"

        # 执行查询
        cursor.execute(db_query, params)
        results = cursor.fetchall()

        return {
            "success": True,
            "results": results,
            "total": len(results)
        }

    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }

工具错误处理

from typing import Optional, Dict, Any

class ToolError(Exception):
    """工具错误的自定义异常"""
    pass

def safe_tool_execute(tool_name: str, arguments: Dict, tool_handler) -> Dict[str, Any]:
    """
    执行具有全面错误处理的工具

    参数:
        tool_name: 要执行的工具名称
        arguments: 传递给工具的参数
        tool_handler: 工具处理函数

    返回:
        dict: 结果字典,包含成功状态和结果或错误
    """

    try:
        # 验证参数
        validation_result = validate_tool_arguments(tool_name, arguments)
        if not validation_result["valid"]:
            return {
                "success": False,
                "error": validation_result["error"],
                "error_code": "VALIDATION_ERROR"
            }

        # 执行工具
        result = tool_handler(**arguments)

        return {
            "success": True,
            "result": result
        }

    except ToolError as e:
        return {
            "success": False,
            "error": e.message,
            "error_code": "TOOL_ERROR"
        }

    except Exception as e:
        return {
            "success": False,
            "error": f"意外错误:{str(e)}",
            "error_code": "UNEXPECTED_ERROR"
        }

def validate_tool_arguments(tool_name: str, arguments: Dict) -> Dict[str, Any]:
    """验证工具参数"""
    errors = []

    # 检查缺失的必需参数
    required_params = get_required_params(tool_name)
    for param in required_params:
        if param not in arguments:
            errors.append(f"缺失必需参数:{param}")

    # 验证参数类型
    param_types = get_param_types(tool_name)
    for param, value in arguments.items():
        if param in param_types:
            expected_type = param_types[param]
            if not isinstance(value, expected_type):
                errors.append(f"参数 {param} 应该是 {expected_type.__name__}")

    return {
        "valid": len(errors) == 0,
        "error": "; ".join(errors) if errors else None
    }

内存模式

对话缓冲区

class ConversationBuffer:
    def __init__(self, max_messages=20):
        self.messages = []
        self.max_messages = max_messages

    def add(self, role, content):
        """添加消息到缓冲区"""
        self.messages.append({
            "role": role,
            "content": content,
            "timestamp": datetime.now().isoformat()
        })

        # 如果超过最大,修剪
        if len(self.messages) > self.max_messages:
            self.messages = self.messages[-self.max_messages:]

    def get_recent(self, n=10):
        """获取 n 个最新消息"""
        return self.messages[-n:]

    def get_all(self):
        """获取所有消息"""
        return self.messages

    def clear(self):
        """清除所有消息"""
        self.messages = []

    def get_context_string(self):
        """获取LLM的格式化上下文"""
        if not self.messages:
            return ""

        return "
".join([
            f"{msg['role']}: {msg['content']}"
            for msg in self.messages
        ])

摘要内存

class SummaryMemory:
    def __init__(self, llm_client):
        self.client = llm_client
        self.summaries = {}

    def add_messages(self, messages):
        """添加消息并创建摘要"""
        if not messages:
            return

        # 创建摘要
        text = "
".join([m["content"] for m in messages])

        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[
                {
                    "role": "system",
                    "content": "用2-3句话总结以下对话。"
                },
                {
                    "role": "user",
                    "content": text
                }
            ]
        )

        summary = response.choices[0].message.content
        key = self.generate_key(messages)

        self.summaries[key] = {
            "summary": summary,
            "messages": messages,
            "timestamp": datetime.now().isoformat()
        }

    def create_summary(self, messages):
        """创建消息摘要"""
        text = "
".join([m["content"] for m in messages])

        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[
                {
                    "role": "system",
                    "content": "用2-3句话总结以下对话。"
                },
                {
                    "role": "user",
                    "content": text
                }
            ]
        )

        return response.choices[0].message.content

    def generate_key(self, messages):
        """为存储摘要生成密钥"""
        # 使用第一条消息作为密钥
        if messages:
            return messages[0]["content"][:50]  # 前50个字符
        return "default"

向量内存

class VectorMemory:
    def __init__(self, vector_db):
        self.vector_db = vector_db
        self.embeddings = {}

    def add(self, key: str, value: str, metadata: dict = None):
        """添加具有嵌入的内存"""
        # 生成嵌入
        embedding = self.generate_embedding(value)

        # 存储在向量数据库中
        self.vector_db.upsert([{
            "id": key,
            "values": embedding,
            "metadata": metadata or {"text": value}
        }])

        self.embeddings[key] = {
            "value": value,
            "embedding": embedding,
            "metadata": metadata,
            "timestamp": datetime.now().isoformat()
        }

    def search(self, query: str, top_k: int = 5):
        """搜索相关内存"""
        query_embedding = self.generate_embedding(query)

        results = self.vector_db.query(
            vector=query_embedding.tolist(),
            top_k=top_k
        )

        return [
            {
                "key": r["id"],
                "value": r["metadata"]["text"],
                "score": r["score"],
                "metadata": r["metadata"]
            }
            for r in results["matches"]
        ]

    def generate_embedding(self, text: str):
        """为文本生成嵌入"""
        # 如果可用,使用缓存的嵌入
        if text in self.embeddings:
            return self.embeddings[text]["embedding"]

        # 生成新嵌入
        # 实施取决于嵌入模型
        embedding = get_embedding_from_model(text)
        return embedding

多代理系统

代理协作

class MultiAgentSystem:
    def __init__(self, agents):
        self.agents = agents
        self.message_bus = []

    def send_message(self, from_agent: str, to_agent: str, message: dict):
        """从一个代理发送消息到另一个代理"""
        msg = {
            "from": from_agent,
            "to": to_agent,
            "message": message,
            "timestamp": datetime.now().isoformat()
        }
        self.message_bus.append(msg)

    def get_messages_for_agent(self, agent_name: str):
        """获取特定代理的消息"""
        return [
            msg for msg in self.message_bus
            if msg["to"] == agent_name
        ]

    def broadcast(self, from_agent: str, message: dict):
        """广播消息到所有代理"""
        for agent in self.agents:
            if agent != from_agent:
                self.send_message(from_agent, agent, message)

# 用法
system = MultiAgentSystem(["researcher", "analyzer", "writer"])

# 研究员发送发现给分析员
system.send_message("researcher", "analyzer", {
    "type": "research_complete",
    "data": {"topic": "AI趋势", "findings": "..."}
})

# 分析员发送摘要给作者
system.send_message("analyzer", "writer", {
    "type": "analysis_complete",
    "summary": "关键趋势:..."
})

代理委派

class DelegatingAgent:
    def __init__(self, llm_client, sub_agents):
        self.client = llm_client
        self.sub_agents = sub_agents

    def delegate_task(self, task: str, context: dict):
        """委派任务给适当的子代理"""

        # 确定哪个代理应处理任务
        agent = self.select_agent(task, context)

        # 委派给代理
        result = agent.execute(task, context)

        return {
            "delegated_to": agent.name,
            "result": result
        }

    def select_agent(self, task: str, context: dict):
        """为任务选择最佳代理"""
        task_lower = task.lower()

        # 简单关键词匹配
        scores = []
        for agent in self.sub_agents:
            score = 0
            for keyword in agent.keywords:
                if keyword in task_lower:
                    score += 1

            scores.append((score, agent))

        # 返回分数最高的代理
        if scores:
            return max(scores, key=lambda x: x[0])[1]

        # 默认返回第一个代理
        return self.sub_agents[0]

# 用法
delegator = DelegatingAgent(client, [searcher, analyzer, writer])
result = delegator.delegate_task("搜索Python教程", {})

主管模式

class SupervisorAgent:
    def __init__(self, llm_client, workers):
        self.client = llm_client
        self.workers = workers
        self.task_queue = []
        self.completed_tasks = []

    def assign_task(self, task: dict):
        """分配任务给可用的工作者"""
        # 找到任务的最佳工作者
        worker = self.select_worker(task)

        # 分配任务
        task["worker"] = worker.name
        task["status"] = "assigned"
        task["assigned_at"] = datetime.now().isoformat()

        self.task_queue.append(task)

        return worker.execute(task)

    def select_worker(self, task: dict):
        """为任务选择最佳工作者"""
        task_type = task.get("type", "")

        # 找到可以处理此任务类型的工作者
        eligible_workers = [
            w for w in self.workers
            if task_type in w.capabilities
        ]

        if not eligible_workers:
            return self.workers[0]

        # 选择负载最低的工作者
        return min(eligible_workers, key=lambda w: w.current_tasks)

    def get_status(self):
        """获取整体状态"""
        assigned = len([t for t in self.task_queue if t["status"] == "assigned"])
        completed = len(self.completed_tasks)

        return {
            "total_tasks": assigned + completed,
            "assigned": assigned,
            "completed": completed,
            "workers": {w.name: w.current_tasks for w in self.workers}
        }

错误恢复和自校正

指数退避重试

import time
import random

class RetryHandler:
    def __init__(self, max_retries=3, base_delay=1, max_delay=60):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay

    def execute_with_retry(self, func, *args, **kwargs):
        """执行函数,带指数退避重试"""
        last_exception = None

        for attempt in range(self.max_retries):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                last_exception = e

                if attempt < self.max_retries - 1:
                    # 计算带抖动的延迟
                    delay = min(
                        self.base_delay * (2 ** attempt),
                        self.max_delay
                    )
                    jitter = random.uniform(0.8, 1.2)
                    time.sleep(delay * jitter)
                else:
                    # 最后一次尝试失败
                    raise e

        # 所有重试失败
        raise last_exception

# 用法
@RetryHandler(max_retries=3)
def unreliable_tool_call(data):
    # 可能偶尔失败
    if random.random() < 0.3:
        raise Exception("随机失败")
    return {"result": data}

result = unreliable_tool_call("测试数据")

后备策略

class FallbackManager:
    def __init__(self):
        self.primary_tools = {}
        self.fallback_tools = {}

    def register_tool(self, name, primary, fallback):
        """注册带主要和后备的工具"""
        self.primary_tools[name] = primary
        self.fallback_tools[name] = fallback

    def execute(self, tool_name, arguments):
        """执行带后备的工具"""
        # 尝试主要工具
        try:
            result = self.primary_tools[tool_name](**arguments)
            return {
                "success": True,
                "result": result,
                "method": "primary"
            }
        except Exception as e:
            # 使用后备
            if tool_name in self.fallback_tools:
                try:
                    result = self.fallback_tools[tool_name](**arguments)
                    return {
                        "success": True,
                        "result": result,
                        "method": "fallback",
                        "warning": f"主要工具失败:{str(e)}"
                    }
                except Exception as fe:
                    return {
                        "success": False,
                        "error": f"主要和后备都失败:{str(fe)}"
                    }
            else:
                return {
                    "success": False,
                    "error": f"工具失败:{str(e)}"
                }

# 用法
def primary_search(query):
    # 可能失败
    if random.random() < 0.2:
        raise Exception("搜索服务不可用")
    return {"results": []}

def fallback_search(query):
    # 更简单、更可靠
    return {"results": ["后备结果"]}

manager = FallbackManager()
manager.register_tool("search", primary_search, fallback_search)

从失败中学习

class LearningAgent:
    def __init__(self, llm_client):
        self.client = llm_client
        self.failure_history = []
        self.success_patterns = {}

    def record_failure(self, action, error):
        """为学习记录失败"""
        self.failure_history.append({
            "action": action,
            "error": str(error),
            "timestamp": datetime.now().isoformat()
        })

    def record_success(self, action, result):
        """记录成功行动"""
        pattern = self.extract_pattern(action)

        if pattern not in self.success_patterns:
            self.success_patterns[pattern] = 0

        self.success_patterns[pattern] += 1

    def extract_pattern(self, action):
        """从行动中提取模式"""
        # 简单模式提取
        if "search" in action.lower():
            return "search"
        elif "calculate" in action.lower():
            return "calculate"
        else:
            return "unknown"

    def get_recommendation(self, action):
        """基于失败历史获取建议"""
        pattern = self.extract_pattern(action)

        # 找到相似失败
        similar_failures = [
            f for f in self.failure_history
            if pattern in f["action"].lower()
        ]

        if similar_failures:
            # 分析常见失败原因
            error_types = [f["error"] for f in similar_failures]
            from collections import Counter
            common_error = Counter(error_types).most_common(1)[0]

            # 获取最常见错误
            suggestions = {
                "timeout": "尝试增加超时或使用更快的端点",
                "rate_limit": "实现速率限制并带退避重试",
                "invalid_input": "添加更好的输入验证",
                "permission": "在执行前检查用户权限",
                "network": "检查网络连接并尝试替代端点"
            }

            return suggestions.get(common_error, "尝试不同的参数")

护栏和安全

输出验证

import re

class OutputValidator:
    def __init__(self):
        self.rules = {
            "no_code_execution": True,
            "no_pii": True,
            "max_length": 1000,
            "allowed_patterns": []
        }

    def validate(self, output: str) -> dict:
        """根据规则验证代理输出"""
        violations = []

        # 检查代码执行
        if self.rules["no_code_execution"]:
            code_patterns = [
                r'```.*?python',
                r'```.*?javascript',
                r'```.*?bash',
                r'exec\(',
                r'eval\('
            ]
            for pattern in code_patterns:
                if re.search(pattern, output, re.IGNORECASE):
                    violations.append("包含代码执行尝试")

        # 检查PII
        if self.rules["no_pii"]:
            pii_patterns = [
                r'\b\d{3}[-.\d{2}[-.\d{4}\b',  # SSN
                r'\b\d{3}[-.\d{2}[-.\d{4}\b',  # 电话
                r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',  # 邮箱
                r'\b\d{16}\b'  # 信用卡
            ]
            for pattern in pii_patterns:
                if re.search(pattern, output):
                    violations.append("包含潜在PII")

        # 检查长度
        if len(output) > self.rules["max_length"]:
            violations.append(f"输出太长({len(output)} 字符)")

        # 检查允许的模式
        if self.rules["allowed_patterns"]:
            for pattern in self.rules["allowed_patterns"]:
                if not re.search(pattern, output):
                    violations.append(f"不匹配所需模式:{pattern}")

        return {
            "valid": len(violations) == 0,
            "violations": violations
        }

行动限制

class ActionLimiter:
    def __init__(self, max_actions_per_minute=10, max_actions_per_hour=100):
        self.max_per_minute = max_actions_per_minute
        self.max_per_hour = max_actions_per_hour
        self.action_log = []

    def check_limit(self, action_type: str) -> bool:
        """检查行动是否允许"""
        now = datetime.now()
        minute_ago = now - timedelta(minutes=1)
        hour_ago = now - timedelta(hours=1)

        # 计数最后一分钟的行动
        recent_minute = [
            a for a in self.action_log
            if a["type"] == action_type and a["timestamp"] > minute_ago.isoformat()
        ]

        if len(recent_minute) >= self.max_per_minute:
            return False

        # 计数最后一小时的行动
        recent_hour = [
            a for a in self.action_log
            if a["type"] == action_type and a["timestamp"] > hour_ago.isoformat()
        ]

        if len(recent_hour) >= self.max_per_hour:
            return False

        return True

    def record_action(self, action_type: str, details: dict = None):
        """记录行动"""
        self.action_log.append({
            "type": action_type,
            "details": details,
            "timestamp": datetime.now().isoformat()
        })

        # 清理旧日志(保留最后24小时)
        day_ago = (datetime.now() - timedelta(hours=24)).isoformat()
        self.action_log = [
            a for a in self.action_log
            if a["timestamp"] > day_ago
        ]

人在环路中

class HumanApprovalFlow:
    def __init__(self):
        self.pending_approvals = {}
        self.approved_actions = {}

    def request_approval(self, agent_name: str, action: dict, user_id: str):
        """请求人类批准行动"""
        approval_id = f"{agent_name}_{datetime.now().timestamp()}"

        self.pending_approvals[approval_id] = {
            "agent": agent_name,
            "action": action,
            "user_id": user_id,
            "requested_at": datetime.now().isoformat(),
            "status": "pending"
        }

        # 通知用户(实施取决于通知系统)
        self.notify_user(user_id, approval_id, action)

        return {
            "approval_id": approval_id,
            "status": "pending"
        }

    def approve_action(self, approval_id: str):
        """批准待定行动"""
        if approval_id not in self.pending_approvals:
            return {"error": "未找到批准"}

        approval = self.pending_approvals[approval_id]

        # 执行行动
        result = self.execute_action(approval["action"])

        # 记录批准
        self.approved_actions[approval_id] = {
            "approval": approval,
            "executed_at": datetime.now().isoformat(),
            "result": result
        }

        # 从待定中移除
        del self.pending_approvals[approval_id]

        return result

    def reject_action(self, approval_id: str, reason: str):
        """拒绝待定行动"""
        if approval_id not in self.pending_approvals:
            return {"error": "未找到批准"}

        approval = self.pending_approvals[approval_id]

        # 记录拒绝
        self.approved_actions[approval_id] = {
            "approval": approval,
            "rejected_at": datetime.now().isoformat(),
            "reason": reason
        }

        # 从待定中移除
        del self.pending_approvals[approval_id]

        return {"status": "rejected", "reason": reason}

评估和测试

单元测试代理

import pytest
from unittest.mock import Mock, patch

def test_agent_tool_execution():
    """测试代理工具执行"""
    # 模拟LLM响应
    with patch('openai.OpenAI') as mock_openai:
        mock_client = Mock()
        mock_response = Mock()
        mock_response.choices = [Mock()]
        mock_response.choices[0].message = Mock()
        mock_response.choices[0].message.function_calls = [
            Mock(name="test_tool", arguments='{"param": "value"}')
        ]

        mock_client.chat.completions.create.return_value = mock_response

        # 测试
        agent = Agent(mock_client, [test_tool])
        result = agent.run("测试查询")

        # 验证工具被调用
        assert mock_response.choices[0].message.function_calls[0].name == "test_tool"

集成测试

def test_end_to_end_agent_workflow():
    """测试完整代理工作流"""
    # 设置测试环境
    llm_client = OpenAI()
    tools = [search_tool, database_tool]

    # 创建代理
    agent = ReActAgent(llm_client, tools)

    # 测试场景
    user_query = "查找我关于Python编程的信息"

    # 运行代理
    result = agent.run(user_query)

    # 验证工作流
    assert len(result) > 0  # 代理产生了输出

    # 检查是否调用了工具
    tool_calls = [msg for msg in result if msg.get("role") == "function"]
    assert len(tool_calls) > 0  # 使用了工具

    # 验证最终答案
    final_answer = [msg for msg in result if msg.get("role") == "assistant"]][-1]
    assert "Python" in final_answer.get("content", "")  # 答案相关

成本管理

class AgentCostTracker:
    def __init__(self):
        self.tool_costs = {}
        self.llm_costs = {
            "prompt_tokens": 0,
            "completion_tokens": 0,
            "total_cost": 0
        }

    def track_tool_call(self, tool_name: str, arguments: dict):
        """跟踪工具执行成本"""
        # 估计工具成本(可能是API调用、计算时间等)
        cost = self.estimate_tool_cost(tool_name, arguments)

        self.tool_costs[tool_name] = self.tool_costs.get(tool_name, 0) + cost
        return cost

    def track_llm_call(self, prompt_tokens: int, completion_tokens: int):
        """跟踪LLM API成本"""
        # GPT-4 定价(示例)
        prompt_cost = prompt_tokens * 0.00003  # $0.03 每1K令牌
        completion_cost = completion_tokens * 0.00006  # $0.06 每1K令牌

        self.llm_costs["prompt_tokens"] += prompt_tokens
        self.llm_costs["completion_tokens"] += completion_tokens
        self.llm_costs["total_cost"] += prompt_cost + completion_cost

        return prompt_cost + completion_cost

    def get_total_cost(self):
        """获取代理执行总成本"""
        tool_cost = sum(self.tool_costs.values())
        llm_cost = self.llm_costs["total_cost"]

        return {
            "tool_calls": tool_cost,
            "llm_calls": llm_cost,
            "total": tool_cost + llm_cost
        }

可观察性和调试

代理日志

import logging

class AgentLogger:
    def __init__(self, agent_name: str):
        self.agent_name = agent_name
        self.logger = logging.getLogger(f"agent.{agent_name}")
        self.logger.setLevel(logging.DEBUG)

        # 文件处理器
        handler = logging.FileHandler(f'logs/{agent_name}.log')
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)

    def log_thought(self, thought: str):
        """记录代理思考过程"""
        self.logger.debug(f"思考:{thought}")

    def log_action(self, action: str, details: dict):
        """记录代理行动"""
        self.logger.info(f"行动:{action},细节:{details}")

    def log_tool_call(self, tool_name: str, arguments: dict, result):
        """记录工具执行"""
        self.logger.info(f"工具:{tool_name},参数:{arguments},结果:{result}")

    def log_error(self, error: str, context: dict):
        """记录错误"""
        self.logger.error(f"错误:{error},上下文:{context}")

    def log_completion(self, result: str):
        """记录任务完成"""
        self.logger.info(f"完成:{result}")

调试工具

class AgentDebugger:
    def __init__(self, agent):
        self.agent = agent
        self.step_history = []

    def trace_execution(self, query: str):
        """逐步追踪代理执行"""
        self.step_history = []

        # 运行带追踪的代理
        original_run = self.agent.run

        # 捕获每一步
        for step in original_run:
            self.step_history.append({
                "step": len(self.step_history) + 1,
                "type": self.get_step_type(step),
                "content": step,
                "timestamp": datetime.now().isoformat()
            })

        return {
            "steps": self.step_history,
            "final_result": original_run[-1] if original_run else None
        }

    def get_step_type(self, step):
        """确定步骤类型"""
        content = str(step)

        if "思考:" in content:
            return "thought"
        elif "行动:" in content:
            return "action"
        elif "观察:" in content:
            return "observation"
        elif "函数:" in content:
            return "function_call"
        else:
            return "other"

    def visualize_trace(self):
        """生成执行追踪可视化"""
        mermaid_diagram = "graph TD;
"

        for step in self.step_history:
            step_num = step["step"]
            content = step["content"][:50]  # 截断

            step_type = step["type"]

            if step_type == "thought":
                mermaid_diagram += f'S{step_num}[思考:"{content}"]
'
            elif step_type == "action":
                mermaid_diagram += f'S{step_num}[行动:"{content}"]
'
            elif step_type == "observation":
                mermaid_diagram += f'S{step_num}[观察:"{content}"]
'
            elif step_type == "function_call":
                mermaid_diagram += f'S{step_num}[函数:"{content}"]
'

        mermaid_diagram += "end;"

        return mermaid_diagram

生产部署

部署考虑

class ProductionAgentConfig:
    def __init__(self):
        self.config = {
            # LLM 设置
            "model": "gpt-4",
            "temperature": 0.7,
            "max_tokens": 4096,
            "timeout": 30,

            # 代理设置
            "max_iterations": 10,
            "max_tools_per_call": 5,

            # 安全设置
            "enable_guardrails": True,
            "require_human_approval_for": ["delete", "transfer"],
            "max_cost_per_session": 1.00,

            # 监控
            "enable_logging": True,
            "enable_tracing": True,
            "log_level": "INFO",

            # 性能
            "enable_caching": True,
            "cache_ttl": 300,
            "log_level": "INFO"
        }

    def validate(self) -> bool:
        """验证配置"""
        # 检查必需设置
        required = ["model"]

        for key in required:
            if key not in self.config:
                return False

        # 验证值
        if self.config["temperature"] < 0 or self.config["temperature"] > 2:
            return False

        return True

    def get_llm_config(self) -> dict:
        """获取LLM配置"""
        return {
            "model": self.config["model"],
            "temperature": self.config["temperature"],
            "max_tokens": self.config["max_tokens"],
            "timeout": self.config["timeout"]
        }

扩展策略

class AgentPool:
    def __init__(self, agent_factory, pool_size=5):
        self.agent_factory = agent_factory
        self.pool_size = pool_size
        self.agents = []
        self.current_index = 0

    def initialize_pool(self):
        """初始化代理池"""
        for i in range(self.pool_size):
            agent = self.agent_factory(f"agent_{i}")
            self.agents.append(agent)

        return self.agents

    def get_agent(self) -> object:
        """获取下一个可用代理(轮询)"""
        if not self.agents:
            raise Exception("代理池未初始化")

        agent = self.agents[self.current_index]
        self.current_index = (self.current_index + 1) % self.pool_size

        return agent

    def get_all_agents(self) -> list:
        """获取池中所有代理"""
        return self.agents

常见用例

研究助手

class ResearchAssistantAgent:
    def __init__(self, llm_client, tools):
        self.client = llm_client
        self.tools = tools
        self.memory = ConversationBuffer(max_messages=20)

    def research(self, topic: str):
        """研究主题"""
        # 阶段 1:计划研究
        plan = self.create_research_plan(topic)

        # 阶段 2:执行研究
        findings = []
        for step in plan:
            result = self.execute_research_step(step)
            findings.append(result)

        # 阶段 3:合成
        synthesis = self.synthesize_findings(findings)

        return {
            "topic": topic,
            "plan": plan,
            "findings": findings,
            "synthesis": synthesis
        }

    def create_research_plan(self, topic: str):
        """创建研究计划"""
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[
                {
                    "role": "system",
                    "content": "为以下主题创建分步研究计划:" + topic + "。"
                },
                {"role": "user", "content": f"研究:{topic}"}
            ],
            tools=self.tools
        )

        return self.parse_plan(response.choices[0].message.content)

    def execute_research_step(self, step):
        """执行研究步骤"""
        # 实施
        return {"result": f"步骤的研究结果"}

    def synthesize_findings(self, findings):
        """将发现合成为最终答案"""
        return f"合成报告:{findings}"

代码生成代理

class CodeGenerationAgent:
    def __init__(self, llm_client):
        self.client = llm_client

    def generate_code(self, requirements: str, language: str = "python"):
        """基于需求生成代码"""
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[
                {
                    "role": "system",
                    "content": f"""您是一名专家 {language} 开发者。
基于需求生成干净、有良好文档的代码。
包括类型提示和文档字符串。
"""
                },
                {
                    "role": "user",
                    "content": f"需求:{requirements}"
                }
            ]
        )

        code = response.choices[0].message.content

        # 验证和格式化代码
        return self.format_code(code)

    def format_code(self, code: str) -> str:
        """用语法高亮格式化代码"""
        # 添加Markdown代码块
        if "```" not in code:
            code = f"```{language}
{code}
```"

        return code

客户服务代理

class CustomerServiceAgent:
    def __init__(self, llm_client, tools):
        self.client = llm_client
        self.tools = tools
        self.customer_db = None

    def handle_query(self, query: str, customer_id: str):
        """处理客户服务查询"""
        # 获取客户上下文
        customer = self.get_customer_context(customer_id)

        # 分析查询意图
        intent = self.analyze_intent(query)

        # 路由到适当的处理器
        if intent == "order_status":
            return self.check_order_status(query, customer_id)
        elif intent == "refund_request":
            return self.process_refund(query, customer_id)
        elif intent == "product_info":
            return self.get_product_info(query)
        else:
            return self.general_inquiry(query)

    def analyze_intent(self, query: str) -> str:
        """分析客户查询意图"""
        if "order" in query.lower() and "status" in query.lower():
            return "order_status"
        elif "refund" in query.lower():
            return "refund_request"
        elif "product" in query.lower():
            return "product_info"
        else:
            return "general_inquiry"

    def get_customer_context(self, customer_id: str):
        """获取客户上下文"""
        # 实施
        return {"customer_id": customer_id, "data": {}}

数据分析代理

class DataAnalysisAgent:
    def __init__(self, llm_client, tools):
        self.client = llm_client
        self.tools = tools

    def analyze_data(self, data: dict):
        """分析提供的数据"""
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[
                {
                    "role": "system",
                    "content": """您是一名数据分析师。
分析提供的数据并提供见解。
关注趋势、模式和可操作建议。

可用工具:
- search:搜索信息
- calculate:执行计算
- database:查询数据库
"""
                },
                {
                    "role": "user",
                    "content": f"数据:{json.dumps(data)}"
                }
            ],
            tools=self.tools
        )

        analysis = response.choices[0].message.content

        return {
            "data": data,
            "analysis": analysis,
            "insights": self.extract_insights(analysis)
        }

    def extract_insights(self, analysis: str) -> list:
        """从分析中提取关键见解"""
        insights = []

        # 查找见解关键词
        insight_keywords = [
            "trend", "pattern", "recommendation", "anomaly", "increase", "decrease", "correlation"
        ]

        for keyword in insight_keywords:
            if keyword in analysis.lower():
                # 提取包含关键词的句子
                sentences = analysis.split('.')
                for sentence in sentences:
                    if keyword in sentence.lower():
                        insights.append(sentence.strip())

        return insights

最佳实践

代理设计

  • 保持代理专注于特定任务
  • 使用清晰的工具接口
  • 实现适当的错误处理
  • 为安全添加护栏
  • 为您的用例使用适当的内存类型
  • 实现内存检索策略
  • 定期清理旧内存
  • 为高效检索索引内存
  • 记录所有代理决策
  • 跟踪工具执行
  • 监控成本
  • 为异常设置警报
  • 实现调试工具

内存管理

  • 为您的用例使用适当的内存类型

    • 短期:用于对话上下文
    • 长期:用于持久知识
    • 情景:用于特定经验
  • 实现内存检索策略

    • 为高效检索索引内存
    • 定期清理旧内存

工具使用

  • 设计具有清晰接口的工具
  • 实现适当的验证
  • 为工具执行使用超时
  • 提供有意义的错误消息
  • 实现重试逻辑

规划

  • 将复杂任务分解为较小步骤
  • 执行前验证计划
  • 优雅处理计划失败
  • 允许计划调整

安全

  • 实现输出验证
  • 为关键行动使用人类批准
  • 设置行动限制
  • 监控代理行为
    • 为审计记录所有行动
    • 为异常设置警报

可观察性

  • 记录所有代理决策
  • 跟踪工具执行
  • 监控成本
  • 为异常设置警报
  • 实现调试工具

生产部署

  • 为模型加载使用单例模式
  • 配置适当的超时
  • 为性能实现缓存
  • 设置监控
  • 使用适当的硬件

相关技能