name: AI 代理模式
description: 用于构建能够推理、规划和执行任务的自主AI代理的设计模式。
AI 代理模式
概述
AI 代理是使用大型语言模型(LLMs)进行推理、规划和执行任务的自主系统。与简单的聊天机器人不同,代理能够将复杂问题分解为步骤,使用工具与外部系统交互,并根据反馈调整行为。本技能涵盖代理架构(ReAct、Plan-and-Execute、Tree of Thoughts、Reflexion)、核心组件(内存、规划、工具使用)、框架(LangChain、AutoGPT、CrewAI、Semantic Kernel)、工具设计、多代理系统、错误恢复、护栏、评估和生产部署。
先决条件
- 了解LLMs及其能力
- 掌握提示工程和系统提示知识
- 熟悉函数调用和工具API
- 理解代理架构和设计模式
- 基础知识:向量数据库和嵌入
- 理解AI系统的安全性和护栏
关键概念
代理特征
- 自主性:无需人工干预即可做出决策
- 推理能力:能将复杂任务分解为子任务
- 规划能力:能创建和执行多步骤计划
- 工具使用:能通过函数调用与外部系统交互
- 内存:能在交互间维护上下文
- 适应性:能从反馈中学习并调整行为
简单代理 vs 复杂代理
- 简单代理:单步执行(如天气查询 → 结果)
- 复杂代理:多步规划(如旅行规划 → 搜索 → 预订 → 确认)
代理架构
- ReAct:循环中的推理 + 执行
- Plan-and-Execute:先创建计划,然后逐步执行
- Tree of Thoughts:在决策前探索多个推理路径
- Reflexion:反思行动并从错误中学习
核心组件
- 内存:短期(对话历史)、长期(知识库)、情景(特定经验)
- 规划:将目标分解为可操作步骤
- 工具使用:安全地注册、验证和执行工具
- 反思:分析行动并改进行为
部署模式
- FastAPI 服务器:用于代理推理的REST API
- Docker 部署:容器化代理服务
- Kubernetes:支持GPU的可扩展部署
- 多代理系统:代理协作和委派
实施指南
代理架构
ReAct(推理 + 执行)
ReAct 模式在循环中结合推理和执行。
import json
from datetime import datetime, timedelta

from openai import OpenAI
class ReActAgent:
    """ReAct (Reason + Act) agent: alternates LLM reasoning with tool calls in a loop."""

    def __init__(self, llm_client, tools):
        self.client = llm_client
        self.tools = tools  # OpenAI-style tool specs, each carrying an extra "handler" callable
        self.max_iterations = 10  # hard cap so the think/act loop cannot run forever

    def run(self, query):
        """Run the think/act/observe loop and return the message transcript."""
        thoughts = []
        for _ in range(self.max_iterations):
            # Think: ask the model what to do next, given the transcript so far.
            response = self.client.chat.completions.create(
                model="gpt-4",
                messages=[
                    {"role": "system", "content": self.get_system_prompt()},
                    *thoughts,
                    {"role": "user", "content": query}
                ],
                tools=self.tools
            )
            choice = response.choices[0]
            thought = choice.message.content
            thoughts.append({"role": "assistant", "content": thought})
            # Act: the OpenAI API signals tool use via finish_reason "tool_calls"
            # (the original checked a nonexistent "function_calls" attribute).
            if choice.finish_reason == "tool_calls":
                call = choice.message.tool_calls[0]
                # tool-call arguments arrive as a JSON-encoded string
                tool_result = self.execute_tool(
                    call.function.name, json.loads(call.function.arguments)
                )
                # Observe: feed the tool output back into the transcript.
                observation = f"工具 {call.function.name} 返回:{json.dumps(tool_result)}"
                thoughts.append({"role": "assistant", "content": observation})
            else:
                # No tool requested: the model produced a final answer.
                break
        return thoughts

    def get_system_prompt(self):
        """System prompt describing the Thought/Action/Observation format."""
        return """您是一个有帮助的助手,可以访问工具。
使用以下格式:
思考:关于要做什么的思考
行动:要采取的行动(工具调用或最终答案)
观察:行动的结果
...(根据需要重复思考/行动/观察)"""

    def execute_tool(self, tool_name, arguments):
        """Dispatch to the registered handler; returns None for an unknown tool.

        Fix: tool specs nest the name under "function" (see the usage example),
        while the original looked for a top-level "name" key that never exists.
        """
        for tool in self.tools:
            if tool["function"]["name"] == tool_name:
                return tool["handler"](**arguments)
        return None
# Usage
# NOTE(review): `client` and `search_tool` must be defined elsewhere in the file.
tools = [
    {
        "type": "function",
        "function": {
            "name": "search",
            "description": "搜索信息",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {"type": "string"}
                },
                "required": ["query"]
            }
        },
        # non-API key: the agent's execute_tool dispatches to this callable
        "handler": search_tool
    }
]
agent = ReActAgent(client, tools)
conversation = agent.run("法国首都是什么?")
Plan-and-Execute
代理先创建计划,然后逐步执行。
class PlanAndExecuteAgent:
    """Agent that first produces a numbered plan, executes it step by step, then synthesizes."""

    def __init__(self, llm_client, tools):
        self.client = llm_client
        self.tools = tools

    def run(self, query):
        """Plan -> execute each step -> synthesize a final answer."""
        # Phase 1: planning
        plan_response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[
                {
                    "role": "system",
                    "content": """您是一个规划代理。
将用户请求分解为编号的步骤列表。
每个步骤应该是可操作的,并使用可用工具。
可用工具:
- search:搜索信息
- calculate:执行计算
- database:查询数据库
将响应格式化为:
步骤 1:[行动]
步骤 2:[行动]
..."""
                },
                {"role": "user", "content": query}
            ],
            tools=self.tools
        )
        plan_text = plan_response.choices[0].message.content
        steps = self.parse_plan(plan_text)
        # Phase 2: execution
        results = []
        for step in steps:
            results.append(self.execute_step(step))
        # Phase 3: synthesis — feed plan and per-step results back to the model
        synthesis_response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[
                {
                    "role": "system",
                    "content": "将结果合成为最终答案。"
                },
                {"role": "user", "content": query},
                {"role": "assistant", "content": plan_text},
                *[{"role": "assistant", "content": f"步骤 {i+1} 结果:{json.dumps(r)}"}
                  for i, r in enumerate(results)]
            ]
        )
        return synthesis_response.choices[0].message.content

    def parse_plan(self, plan_text):
        """Parse the numbered plan text into a list of "步骤 ..." step strings."""
        steps = []
        # fix: the '\n' literal was broken across source lines (a syntax error)
        for line in plan_text.split('\n'):
            if line.strip().startswith('步骤'):
                steps.append(line.strip())
        return steps

    def execute_step(self, step):
        """Execute one plan step by dispatching to the matching tool.

        NOTE(review): extract_tool_name/extract_arguments are not defined in this
        snippet, and this tool schema uses a top-level "name" key (unlike the
        nested schema in the ReAct example) — confirm the intended tool format.
        """
        tool_name = self.extract_tool_name(step)
        arguments = self.extract_arguments(step)
        for tool in self.tools:
            if tool["name"] == tool_name:
                return tool["handler"](**arguments)
        return None
Tree of Thoughts
代理在决策前探索多个推理路径。
class TreeOfThoughtsAgent:
    """Generates several candidate reasoning paths, scores them, executes the best."""

    def __init__(self, llm_client, tools):
        self.client = llm_client
        self.tools = tools
        self.max_depth = 3      # reserved for deeper tree search; unused in this snippet
        self.max_branches = 5   # number of candidate thoughts to request

    def run(self, query):
        """Generate candidate thoughts, pick the highest-scoring one, execute it."""
        thoughts_response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[
                {
                    "role": "system",
                    "content": f"""您是一个深思熟虑的代理。
生成 {self.max_branches} 个不同的可能思考/路径来回答查询。
每个思考应探索不同的方法。
格式为:
思考 1:[描述]
思考 2:[描述]
...
"""
                },
                {"role": "user", "content": query}
            ],
            tools=self.tools
        )
        thoughts_text = thoughts_response.choices[0].message.content
        # fix: the '\n' literal was broken across source lines (a syntax error)
        thoughts = thoughts_text.split('\n')
        # score every candidate thought
        evaluations = []
        for i, thought in enumerate(thoughts):
            evaluations.append((i, self.evaluate_thought(thought, query)))
        # keep the best-scoring candidate (first one on ties, as max() guarantees)
        best = max(evaluations, key=lambda x: x[1])
        best_thought = thoughts[best[0]]
        return self.execute_thought(best_thought)

    def evaluate_thought(self, thought, query):
        """Heuristic keyword score for a candidate thought (clarity/steps/speed)."""
        score = 0
        # clarity
        if "clear" in thought.lower():
            score += 2
        # completeness
        if "step" in thought.lower():
            score += 2
        # efficiency
        if "quick" in thought.lower() or "fast" in thought.lower():
            score += 1
        return score

    def execute_thought(self, thought):
        """Execute every tool call extracted from the chosen thought.

        NOTE(review): extract_tool_calls/execute_tool are not defined in this
        snippet — they must be supplied by a subclass or mixin.
        """
        tool_calls = self.extract_tool_calls(thought)
        results = []
        for tc in tool_calls:
            results.append(self.execute_tool(tc["name"], tc["arguments"]))
        return results
Reflexion
代理反思其行动并从错误中学习。
class ReflexionAgent:
    """Agent that acts, reflects on each action's outcome, and retries after failures."""

    def __init__(self, llm_client, tools):
        self.client = llm_client
        self.tools = tools
        self.memory = []        # running transcript, including reflection notes
        self.reflections = []   # history of reflection strings
        self.max_attempts = 10  # guard: the original `while True` could loop forever

    def run(self, query):
        """Answer *query*, reflecting on failed tool calls and retrying."""
        action = None
        for _ in range(self.max_attempts):
            # Act: ask the model for the next step.
            action_response = self.client.chat.completions.create(
                model="gpt-4",
                messages=[
                    {
                        "role": "system",
                        "content": "您是一个有帮助的助手。回答用户的查询。"
                    },
                    *self.memory,
                    {"role": "user", "content": query}
                ],
                tools=self.tools
            )
            choice = action_response.choices[0]
            action = choice.message
            # store plain text — the raw message object is not a valid chat dict
            self.memory.append({"role": "assistant", "content": action.content or ""})
            # The OpenAI API signals tool use via finish_reason "tool_calls"
            # (the original checked a nonexistent "function_calls" attribute).
            if choice.finish_reason == "tool_calls":
                call = action.tool_calls[0]
                result = self.execute_tool(
                    call.function.name, json.loads(call.function.arguments)
                )
                # Reflect: did this action succeed?
                reflection = self.reflect(action, result)
                self.reflections.append(reflection)
                self.memory.append({
                    "role": "system",
                    "content": f"反思:{reflection}"
                })
                # Retry on failure. reflect() emits "行动失败。", so test for that
                # marker — the original looked for the English word "failed",
                # which reflect() never produces, so retries never happened.
                if "行动失败" in reflection:
                    continue
                # Success — synthesize a final answer including the tool result.
                final_response = self.client.chat.completions.create(
                    model="gpt-4",
                    messages=[
                        *self.memory,
                        {"role": "user", "content": query},
                        {
                            # legacy "function" role message carrying the tool output
                            "role": "function",
                            "name": call.function.name,
                            "content": json.dumps(result)
                        }
                    ]
                )
                return final_response.choices[0].message.content
            else:
                # No tool needed — return the model's direct answer text.
                return action.content
        # attempts exhausted: return the last answer text we have, if any
        return action.content if action is not None else None

    def reflect(self, action, result):
        """Build a short reflection describing whether *result* succeeded."""
        reflection_parts = []
        # Did the action achieve its goal? The original had this test inverted:
        # it reported failure exactly when success was True.
        if not result.get("success", False):
            reflection_parts.append("行动失败。")
        # What went wrong, and how to improve next time?
        if "error" in result:
            reflection_parts.append(f"错误:{result['error']}")
            reflection_parts.append("下次,先验证参数。")
        return " ".join(reflection_parts)

    def execute_tool(self, tool_name, arguments):
        """Dispatch to the registered handler (the original relied on an
        undefined method); returns None for an unknown tool."""
        for tool in self.tools:
            if tool.get("function", {}).get("name") == tool_name:
                return tool["handler"](**arguments)
        return None
核心组件
内存(短期、长期)
from typing import List, Dict, Any
from datetime import datetime
class AgentMemory:
    """Three-tier agent memory: short-term transcript, long-term store, episodes."""

    def __init__(self):
        self.short_term = []  # conversation history (rolling window of 10)
        self.long_term = {}   # persistent key/value knowledge
        self.episodic = []    # specific experiences/events

    def add_to_short_term(self, role, content):
        """Append to the conversation history, keeping only the last 10 messages."""
        self.short_term.append({
            "role": role,
            "content": content,
            "timestamp": datetime.now().isoformat()
        })
        if len(self.short_term) > 10:
            self.short_term = self.short_term[-10:]

    def add_to_long_term(self, key, value):
        """Store a fact persistently, counting how often the key has been written."""
        self.long_term[key] = {
            "value": value,
            "timestamp": datetime.now().isoformat(),
            "access_count": self.long_term.get(key, {}).get("access_count", 0) + 1
        }

    def add_episode(self, episode_type, data):
        """Record a specific experience/event."""
        self.episodic.append({
            "type": episode_type,
            "data": data,
            "timestamp": datetime.now().isoformat()
        })

    def retrieve_relevant(self, query, max_results=5):
        """Keyword-match long-term memory against *query*, best matches first."""
        relevant = []
        query_lower = query.lower()
        for key, value in self.long_term.items():
            # fix: query_lower is a string — the original called it like a
            # function (query_lower()), a guaranteed TypeError
            if query_lower in key.lower() or key.lower() in query_lower:
                relevant.append({
                    "key": key,
                    "value": value["value"],
                    "relevance": self.calculate_relevance(query, key)
                })
        # sort by relevance and return the top results
        relevant.sort(key=lambda x: x["relevance"], reverse=True)
        return relevant[:max_results]

    def calculate_relevance(self, query, key):
        """Jaccard similarity between the word sets of *query* and *key*."""
        query_words = set(query.lower().split())
        key_words = set(key.lower().split())
        intersection = len(query_words & key_words)
        union = len(query_words | key_words)
        return intersection / union if union > 0 else 0

    def get_context(self, max_messages=10):
        """Return the most recent conversation messages."""
        return self.short_term[-max_messages:]
规划
from typing import List, Dict
class Planner:
    """LLM-backed planner: turns a goal into structured, numbered steps."""

    def __init__(self, llm_client):
        self.client = llm_client

    def create_plan(self, goal, constraints=None):
        """Ask the LLM for a step-by-step plan to reach *goal* and parse it."""
        system_prompt = f"""您是一个规划代理。
创建详细计划以实现目标:{goal}
约束:
{constraints if constraints else "无"}
可用操作:
- search:搜索信息
- query_database:查询数据库
- calculate:执行计算
- api_call:调用外部API
将您的计划格式化为:
1. [操作] - [描述]
2. [操作] - [描述]
...
每个步骤应具体且可操作。
"""
        # fix: the original passed tools=self.tools, but Planner never defines
        # self.tools — that was a guaranteed AttributeError
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": goal}
            ]
        )
        plan_text = response.choices[0].message.content
        return self.parse_plan(plan_text)

    def parse_plan(self, plan_text):
        """Parse numbered plan text ("1. [op] - [desc]") into structured steps."""
        steps = []
        # fix: the '\n' literal was broken across source lines (a syntax error)
        for line in plan_text.split('\n'):
            stripped = line.strip()
            if stripped and stripped[0].isdigit():
                parts = stripped.split('-', 1)
                if len(parts) == 2:
                    operation = parts[0].strip()
                    description = parts[1].strip()
                    # take the full leading number — the original used only the
                    # first character, so step 10 parsed as step 1
                    digits = ''
                    for ch in operation:
                        if ch.isdigit():
                            digits += ch
                        else:
                            break
                    steps.append({
                        "step": int(digits),
                        "operation": operation,
                        "description": description
                    })
        return steps

    def update_plan(self, plan, completed_step, new_step=None):
        """Drop the completed step, optionally insert *new_step* in its place,
        and renumber the remaining steps sequentially.

        The original renumbering loop reused variables across iterations and
        could duplicate or drop steps; this is a clean equivalent of its intent.
        """
        remaining = [s for s in plan if s["step"] != completed_step]
        if new_step:
            # insert at the completed step's original position (clamped)
            insert_at = min(completed_step - 1, len(remaining))
            remaining.insert(insert_at, {
                "operation": new_step["operation"],
                "description": new_step["description"]
            })
        updated_plan = []
        for index, step in enumerate(remaining, start=1):
            entry = dict(step)
            entry["step"] = index
            updated_plan.append(entry)
        return updated_plan
工具使用
from typing import Dict, Callable, Any
class ToolRegistry:
    """Registry of named tool handlers, exportable as OpenAI function schemas."""

    def __init__(self):
        self.tools: Dict[str, Dict] = {}

    def register(self, name: str, handler: Callable, description: str, parameters: Dict):
        """Register a tool handler with its description and JSON-schema parameters."""
        self.tools[name] = {
            "handler": handler,
            "description": description,
            "parameters": parameters
        }

    def get_tool(self, name: str) -> Dict:
        """Look up a tool by name; None if unregistered."""
        return self.tools.get(name)

    def list_tools(self) -> list:
        """List all registered tool records.

        Fix: the original annotated this List[Dict], but only Dict/Callable/Any
        were imported — List was a NameError at class-definition time.
        """
        return list(self.tools.values())

    def execute(self, name: str, arguments: Dict) -> Any:
        """Execute a registered tool with keyword arguments.

        Raises:
            ValueError: if no tool with *name* is registered.
        """
        tool = self.get_tool(name)
        if not tool:
            raise ValueError(f"未找到工具:{name}")
        return tool["handler"](**arguments)

    def get_openai_schema(self):
        """Convert registered tools into the OpenAI function-calling format."""
        return [
            {
                "type": "function",
                "function": {
                    "name": name,
                    "description": tool["description"],
                    "parameters": tool["parameters"]
                }
            }
            for name, tool in self.tools.items()
        ]
# Usage
def search_database(query: str) -> Dict:
    """Search the database (stub implementation)."""
    return {"results": [], "query": query}

def calculate(expression: str) -> float:
    """Evaluate a math expression (stub implementation)."""
    # SECURITY: eval() on an arbitrary expression is remote code execution if
    # the input is untrusted — replace with a safe expression parser (e.g. an
    # ast-based evaluator) before production use.
    return eval(expression)

registry = ToolRegistry()
registry.register("search", search_database, "搜索数据库", {
    "type": "object",
    "properties": {
        "query": {"type": "string"}
    },
    "required": ["query"]
})
registry.register("calculate", calculate, "执行计算", {
    "type": "object",
    "properties": {
        "expression": {"type": "string"}
    },
    "required": ["expression"]
})
反思
class ReflectionEngine:
    """Asks the LLM to critique actions and mines the critiques for suggestions."""

    def __init__(self, llm_client):
        self.client = llm_client
        self.reflection_history = []

    def reflect_on_action(self, action, result, context):
        """Ask the LLM to analyze one action/result pair; store and return the text."""
        reflection_prompt = f"""
您是一个反思引擎。分析以下行动和结果:
行动:{action}
结果:{result}
上下文:{context}
提供一个反思,包括:
1. 这个行动的目标是什么?
2. 行动成功了吗?
3. 如果不成功,出了什么问题?
4. 下次如何改进?
5. 我们应该尝试不同的方法吗?
将您的反思格式化为:
分析:[您的分析]
建议:[您的建议]
"""
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": reflection_prompt}]
        )
        reflection = response.choices[0].message.content
        self.reflection_history.append(reflection)
        return reflection

    def get_improvement_suggestion(self, recent_reflections):
        """Extract improvement suggestions from recent reflections; None if none found."""
        if not recent_reflections:
            return None
        suggestions = []
        for reflection in recent_reflections:
            lowered = reflection.lower()
            # accept the Chinese marker too — the prompt above produces "建议:",
            # which the original English-only keywords never matched
            if "improve" in lowered or "better" in lowered or "改进" in reflection:
                # fix: the '\n' literal was broken across source lines
                for line in reflection.split('\n'):
                    if "recommendation" in line.lower() or line.strip().startswith("建议"):
                        # the prompt mandates a fullwidth colon; accept ASCII too
                        for sep in (':', ':'):
                            if sep in line:
                                suggestions.append(line.split(sep, 1)[1].strip())
                                break
        return suggestions if suggestions else None
代理框架
LangChain 代理
from langchain.agents import initialize_agent, Tool, AgentExecutor
from langchain.chat_models import ChatOpenAI
from langchain.utilities import SerpAPIWrapper

# Initialize the LLM (temperature 0 for deterministic tool use)
llm = ChatOpenAI(temperature=0)

# Define the tools
search = SerpAPIWrapper()
tools = [
    Tool(
        name="搜索",
        func=search.run,
        description="搜索最新信息"
    )
]

# Initialize the agent (zero-shot ReAct prompting over a chat model)
agent = initialize_agent(
    tools,
    llm=llm,
    agent="chat-zero-shot-react-description",
    verbose=True
)

# Create the executor
# NOTE(review): initialize_agent already returns an AgentExecutor in most
# LangChain versions — wrapping it again here may be redundant; confirm.
executor = AgentExecutor.from_agent_and_tools(
    agent=agent,
    tools=tools,
    verbose=True
)

# Run the agent
result = executor.run("AI 的最新新闻是什么?")
print(result)
AutoGPT 模式
from langchain.experimental import AutoGPT
from langchain.chat_models import ChatOpenAI

llm = ChatOpenAI(temperature=0)

# Define the tools
# NOTE(review): `google_search` must be defined elsewhere; it is not in this snippet.
tools = [
    {
        "name": "google_search",
        "description": "在Google上搜索信息",
        "func": google_search,
        "parameters": {
            "type": "object",
            "properties": {
                "query": {"type": "string"}
            },
            "required": ["query"]
        }
    }
]

# Create the AutoGPT agent
agent = AutoGPT.from_llm_and_tools(
    llm,
    tools,
    agent_type="zero-shot-react-description",
    verbose=True
)

# Run the agent against a list of goals
result = agent.run([
    "研究AI的最新发展",
    "总结关键发现",
    "提供建议"
])
print(result)
CrewAI
from crewai import Agent, Task, Crew, Process
from crewai.tools import SerperDevTool

# Define the agents
researcher = Agent(
    role='研究员',
    goal='查找和分析信息',
    backstory='您是一名专家研究员。',
    verbose=True,
    tools=[SerperDevTool()]
)
writer = Agent(
    role='作者',
    goal='基于研究创建吸引人的内容',
    backstory='您是一名熟练的作者。',
    verbose=True
)

# Define the tasks
research_task = Task(
    description='研究最新AI发展',
    agent=researcher,
    expected_output='详细的研究发现'
)
write_task = Task(
    description='撰写关于AI发展的博客文章',
    agent=writer,
    context=[research_task],  # the writing task consumes the research output
    expected_output='最终博客文章'
)

# Assemble the crew (sequential: research first, then writing)
crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, write_task],
    process=Process.sequential,
    verbose=True
)

# Execute the crew
result = crew.kickoff()
print(result)
Semantic Kernel
import semantic_kernel as sk
from semantic_kernel.connectors.ai.open_ai import AzureOpenAIChatCompletion

# Initialize the kernel
kernel = sk.Kernel()
kernel.add_chat_service("chat-gpt", AzureOpenAIChatCompletion())

# Define a plugin exposing one callable function
class WeatherPlugin:
    @sk.kernel_function(
        description="获取当前位置的天气",
        name="get_weather"
    )
    def get_weather(location: str) -> str:
        # NOTE(review): `self` is missing from this method signature — confirm
        # whether this Semantic Kernel version's decorator tolerates that.
        # A real implementation would call a weather API here.
        return f"{location}的天气:晴朗,22°C"

# Register the plugin with the kernel
kernel.add_plugin(WeatherPlugin())

# Create the agent
agent = sk.ChatCompletionAgent(
    service_id="chat-gpt",
    kernel=kernel,
    instructions="您是一个有帮助的助手,可以访问天气信息。"
)

# Run the agent
result = agent.chat("东京的天气如何?")
print(result)
代理工具设计
工具命名约定
# Good tool names
GOOD_TOOL_NAMES = [
    "search_database",   # clear and descriptive
    "calculate_sum",     # action-oriented
    "get_user_profile",  # verb + resource
    "send_email",        # verb + resource
    "validate_address",  # verb + resource
]

# Bad tool names
BAD_TOOL_NAMES = [
    "tool1",     # not descriptive
    "do_stuff",  # vague
    "helper",    # generic
    "func",      # abbreviation
]
工具文档
def search_products(
    query: str,
    category: str = None,
    min_price: float = None,
    max_price: float = None,
    limit: int = 10
) -> dict:
    """Search the product catalog.

    Args:
        query: search text matched against product names
        category: optional category filter
        min_price: optional minimum price filter
        max_price: optional maximum price filter
        limit: maximum number of results (default: 10)

    Returns:
        dict with "success", and either "results"/"total" or "error".
    """
    # NOTE(review): `cursor` must be an open DB cursor provided elsewhere.
    try:
        # build the parameterized query incrementally
        db_query = "SELECT * FROM products WHERE name LIKE ?"
        params = [f"%{query}%"]
        if category:
            db_query += " AND category = ?"
            params.append(category)
        if min_price is not None:
            db_query += " AND price >= ?"
            params.append(min_price)
        if max_price is not None:
            db_query += " AND price <= ?"
            params.append(max_price)
        # fix: bind LIMIT as a parameter instead of f-string interpolation,
        # so a non-integer limit can never be spliced into the SQL text
        db_query += " LIMIT ?"
        params.append(int(limit))
        cursor.execute(db_query, params)
        results = cursor.fetchall()
        return {
            "success": True,
            "results": results,
            "total": len(results)
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }
工具错误处理
from typing import Optional, Dict, Any
class ToolError(Exception):
    """Custom exception raised by tool handlers for expected, tool-level failures."""
    pass
def safe_tool_execute(tool_name: str, arguments: Dict, tool_handler) -> Dict[str, Any]:
    """Execute a tool with argument validation and uniform error reporting.

    Args:
        tool_name: name of the tool to execute
        arguments: keyword arguments passed to the tool
        tool_handler: the tool's handler callable

    Returns:
        dict with "success", plus either "result" or "error"/"error_code".
    """
    try:
        # validate before executing
        validation_result = validate_tool_arguments(tool_name, arguments)
        if not validation_result["valid"]:
            return {
                "success": False,
                "error": validation_result["error"],
                "error_code": "VALIDATION_ERROR"
            }
        result = tool_handler(**arguments)
        return {
            "success": True,
            "result": result
        }
    except ToolError as e:
        return {
            "success": False,
            # fix: ToolError has no .message attribute — str(e) is the
            # portable way to get an exception's text
            "error": str(e),
            "error_code": "TOOL_ERROR"
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"意外错误:{str(e)}",
            "error_code": "UNEXPECTED_ERROR"
        }
def validate_tool_arguments(tool_name: str, arguments: Dict) -> Dict[str, Any]:
    """Validate tool arguments; returns {"valid": bool, "error": str | None}.

    NOTE(review): relies on get_required_params()/get_param_types(), which are
    not defined in this snippet — presumably supplied by the tool registry.
    """
    errors = []
    # check for missing required parameters
    required_params = get_required_params(tool_name)
    for param in required_params:
        if param not in arguments:
            errors.append(f"缺失必需参数:{param}")
    # validate parameter types
    param_types = get_param_types(tool_name)
    for param, value in arguments.items():
        if param in param_types:
            expected_type = param_types[param]
            if not isinstance(value, expected_type):
                errors.append(f"参数 {param} 应该是 {expected_type.__name__}")
    return {
        "valid": len(errors) == 0,
        "error": "; ".join(errors) if errors else None
    }
内存模式
对话缓冲区
class ConversationBuffer:
    """Bounded rolling buffer of chat messages with timestamps."""

    def __init__(self, max_messages=20):
        self.messages = []
        self.max_messages = max_messages

    def add(self, role, content):
        """Append a message, trimming the buffer to the newest max_messages."""
        self.messages.append({
            "role": role,
            "content": content,
            "timestamp": datetime.now().isoformat()
        })
        if len(self.messages) > self.max_messages:
            self.messages = self.messages[-self.max_messages:]

    def get_recent(self, n=10):
        """Return the n most recent messages."""
        return self.messages[-n:]

    def get_all(self):
        """Return every buffered message."""
        return self.messages

    def clear(self):
        """Drop all buffered messages."""
        self.messages = []

    def get_context_string(self):
        """Format the buffer as "role: content" lines for an LLM prompt."""
        if not self.messages:
            return ""
        # fix: the "\n" join literal was broken across source lines
        return "\n".join([
            f"{msg['role']}: {msg['content']}"
            for msg in self.messages
        ])
摘要内存
class SummaryMemory:
    """Compresses message batches into short LLM-generated summaries."""

    def __init__(self, llm_client):
        self.client = llm_client
        self.summaries = {}  # key (first 50 chars of first message) -> record

    def add_messages(self, messages):
        """Summarize *messages* and store the summary keyed by their first message."""
        if not messages:
            return
        # reuse create_summary instead of duplicating the LLM call inline,
        # as the original did
        summary = self.create_summary(messages)
        key = self.generate_key(messages)
        self.summaries[key] = {
            "summary": summary,
            "messages": messages,
            "timestamp": datetime.now().isoformat()
        }

    def create_summary(self, messages):
        """Ask the LLM for a 2-3 sentence summary of the concatenated messages."""
        # fix: the "\n" join literal was broken across source lines
        text = "\n".join(m["content"] for m in messages)
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[
                {
                    "role": "system",
                    "content": "用2-3句话总结以下对话。"
                },
                {
                    "role": "user",
                    "content": text
                }
            ]
        )
        return response.choices[0].message.content

    def generate_key(self, messages):
        """Derive a storage key from the first message (first 50 characters)."""
        if messages:
            return messages[0]["content"][:50]
        return "default"
向量内存
class VectorMemory:
    """Long-term memory backed by a vector database, retrieved by embedding similarity."""

    def __init__(self, vector_db):
        self.vector_db = vector_db
        self.embeddings = {}        # memory key -> stored record (kept for compatibility)
        self._embedding_cache = {}  # raw text -> embedding; fixes the old key/text mix-up

    def add(self, key: str, value: str, metadata: dict = None):
        """Embed *value* and upsert it into the vector database under *key*."""
        embedding = self.generate_embedding(value)
        self.vector_db.upsert([{
            "id": key,
            "values": embedding,
            "metadata": metadata or {"text": value}
        }])
        self.embeddings[key] = {
            "value": value,
            "embedding": embedding,
            "metadata": metadata,
            "timestamp": datetime.now().isoformat()
        }

    def search(self, query: str, top_k: int = 5):
        """Return the top_k stored memories most similar to *query*."""
        query_embedding = self.generate_embedding(query)
        results = self.vector_db.query(
            vector=query_embedding.tolist(),
            top_k=top_k
        )
        return [
            {
                "key": r["id"],
                "value": r["metadata"]["text"],
                "score": r["score"],
                "metadata": r["metadata"]
            }
            for r in results["matches"]
        ]

    def generate_embedding(self, text: str):
        """Embed *text*, caching by the text itself.

        Fix: the original looked the cache up by raw text but populated it
        keyed by memory key, so the cache could never hit (and a key/text
        collision would have returned a dict, not an embedding).
        """
        if text in self._embedding_cache:
            return self._embedding_cache[text]
        # NOTE(review): get_embedding_from_model must be provided by the
        # embedding backend — it is not defined in this snippet.
        embedding = get_embedding_from_model(text)
        self._embedding_cache[text] = embedding
        return embedding
多代理系统
代理协作
class MultiAgentSystem:
    """Point-to-point and broadcast messaging between named agents via a shared bus."""

    def __init__(self, agents):
        self.agents = agents
        self.message_bus = []

    def send_message(self, from_agent: str, to_agent: str, message: dict):
        """Append one addressed message envelope to the shared bus."""
        envelope = {
            "from": from_agent,
            "to": to_agent,
            "message": message,
            "timestamp": datetime.now().isoformat(),
        }
        self.message_bus.append(envelope)

    def get_messages_for_agent(self, agent_name: str):
        """Return every bus message addressed to *agent_name*."""
        return [entry for entry in self.message_bus if entry["to"] == agent_name]

    def broadcast(self, from_agent: str, message: dict):
        """Deliver *message* to every agent except the sender."""
        for recipient in self.agents:
            if recipient == from_agent:
                continue
            self.send_message(from_agent, recipient, message)
# Usage
system = MultiAgentSystem(["researcher", "analyzer", "writer"])

# the researcher sends findings to the analyzer
system.send_message("researcher", "analyzer", {
    "type": "research_complete",
    "data": {"topic": "AI趋势", "findings": "..."}
})

# the analyzer sends its summary to the writer
system.send_message("analyzer", "writer", {
    "type": "analysis_complete",
    "summary": "关键趋势:..."
})
代理委派
class DelegatingAgent:
    """Routes tasks to the sub-agent whose keywords best match the task text."""

    def __init__(self, llm_client, sub_agents):
        self.client = llm_client
        self.sub_agents = sub_agents

    def delegate_task(self, task: str, context: dict):
        """Pick the best sub-agent for *task* and run it."""
        chosen = self.select_agent(task, context)
        outcome = chosen.execute(task, context)
        return {
            "delegated_to": chosen.name,
            "result": outcome
        }

    def select_agent(self, task: str, context: dict):
        """Score each sub-agent by keyword hits in *task*; first best wins."""
        lowered = task.lower()
        scored = [
            (sum(1 for keyword in agent.keywords if keyword in lowered), agent)
            for agent in self.sub_agents
        ]
        if scored:
            # max() returns the first maximal element, so ties keep list order
            return max(scored, key=lambda pair: pair[0])[1]
        # no sub-agents scored (empty list guard): fall back to the first one
        return self.sub_agents[0]
# Usage
# NOTE(review): `client`, `searcher`, `analyzer`, `writer` must be defined elsewhere.
delegator = DelegatingAgent(client, [searcher, analyzer, writer])
result = delegator.delegate_task("搜索Python教程", {})
主管模式
class SupervisorAgent:
    """Supervisor that routes each task to the least-loaded capable worker."""

    def __init__(self, llm_client, workers):
        self.client = llm_client
        self.workers = workers
        self.task_queue = []
        self.completed_tasks = []

    def assign_task(self, task: dict):
        """Annotate *task*, enqueue it, and hand it to the selected worker."""
        chosen = self.select_worker(task)
        task["worker"] = chosen.name
        task["status"] = "assigned"
        task["assigned_at"] = datetime.now().isoformat()
        self.task_queue.append(task)
        return chosen.execute(task)

    def select_worker(self, task: dict):
        """Prefer workers whose capabilities cover the task type; break ties by load."""
        task_type = task.get("type", "")
        capable = [worker for worker in self.workers if task_type in worker.capabilities]
        if not capable:
            # nobody advertises this capability — fall back to the first worker
            return self.workers[0]
        return min(capable, key=lambda worker: worker.current_tasks)

    def get_status(self):
        """Summarize queue counts and per-worker load."""
        assigned = sum(1 for entry in self.task_queue if entry["status"] == "assigned")
        completed = len(self.completed_tasks)
        return {
            "total_tasks": assigned + completed,
            "assigned": assigned,
            "completed": completed,
            "workers": {worker.name: worker.current_tasks for worker in self.workers}
        }
错误恢复和自校正
指数退避重试
import time
import random
class RetryHandler:
    """Retries a callable with exponential backoff and jitter.

    Also usable as a decorator (@RetryHandler(max_retries=3)) — the original
    was used that way in the example below it but had no __call__, so that
    usage raised TypeError.
    """

    def __init__(self, max_retries=3, base_delay=1, max_delay=60):
        self.max_retries = max_retries
        self.base_delay = base_delay  # seconds before the first retry
        self.max_delay = max_delay    # backoff ceiling in seconds

    def __call__(self, func):
        """Decorator support: wrap *func* so every call goes through the retry loop."""
        from functools import wraps

        @wraps(func)
        def wrapper(*args, **kwargs):
            return self.execute_with_retry(func, *args, **kwargs)
        return wrapper

    def execute_with_retry(self, func, *args, **kwargs):
        """Call func(*args, **kwargs), retrying with exponential backoff.

        Raises:
            The last exception raised by *func* if every attempt fails.
        """
        last_exception = None
        for attempt in range(self.max_retries):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                last_exception = e
                if attempt < self.max_retries - 1:
                    # exponential backoff, capped, with +/-20% jitter to
                    # avoid thundering-herd retries
                    delay = min(
                        self.base_delay * (2 ** attempt),
                        self.max_delay
                    )
                    jitter = random.uniform(0.8, 1.2)
                    time.sleep(delay * jitter)
                else:
                    # final attempt failed — propagate
                    raise e
        # unreachable when max_retries > 0; kept as a safety net
        raise last_exception
# Usage: wrap an unreliable call with retries.
# (The original applied @RetryHandler(...) directly as a decorator, which
# fails because plain RetryHandler instances are not callable — calling
# execute_with_retry explicitly works with the class as defined.)
handler = RetryHandler(max_retries=3)

def unreliable_tool_call(data):
    # simulated flakiness: fails ~30% of the time
    if random.random() < 0.3:
        raise Exception("随机失败")
    return {"result": data}

result = handler.execute_with_retry(unreliable_tool_call, "测试数据")
后备策略
class FallbackManager:
    """Runs a primary tool and transparently falls back to a backup on failure."""

    def __init__(self):
        self.primary_tools = {}
        self.fallback_tools = {}

    def register_tool(self, name, primary, fallback):
        """Register the primary handler and its fallback under *name*."""
        self.primary_tools[name] = primary
        self.fallback_tools[name] = fallback

    def execute(self, tool_name, arguments):
        """Try the primary handler; on any exception, try the fallback if present."""
        try:
            primary_result = self.primary_tools[tool_name](**arguments)
        except Exception as primary_err:
            return self._run_fallback(tool_name, arguments, primary_err)
        return {
            "success": True,
            "result": primary_result,
            "method": "primary"
        }

    def _run_fallback(self, tool_name, arguments, primary_err):
        """Execute the fallback, reporting the primary failure as a warning."""
        if tool_name not in self.fallback_tools:
            return {
                "success": False,
                "error": f"工具失败:{str(primary_err)}"
            }
        try:
            fallback_result = self.fallback_tools[tool_name](**arguments)
        except Exception as fe:
            return {
                "success": False,
                "error": f"主要和后备都失败:{str(fe)}"
            }
        return {
            "success": True,
            "result": fallback_result,
            "method": "fallback",
            "warning": f"主要工具失败:{str(primary_err)}"
        }
# Usage
def primary_search(query):
    # simulated flakiness: fails ~20% of the time
    if random.random() < 0.2:
        raise Exception("搜索服务不可用")
    return {"results": []}

def fallback_search(query):
    # simpler and more reliable backup
    return {"results": ["后备结果"]}

manager = FallbackManager()
manager.register_tool("search", primary_search, fallback_search)
从失败中学习
class LearningAgent:
    """Records successes/failures per action pattern and recommends mitigations."""

    def __init__(self, llm_client):
        self.client = llm_client
        self.failure_history = []
        self.success_patterns = {}

    def record_failure(self, action, error):
        """Record a failed action for later pattern analysis."""
        self.failure_history.append({
            "action": action,
            "error": str(error),
            "timestamp": datetime.now().isoformat()
        })

    def record_success(self, action, result):
        """Count a successful action under its extracted pattern."""
        pattern = self.extract_pattern(action)
        if pattern not in self.success_patterns:
            self.success_patterns[pattern] = 0
        self.success_patterns[pattern] += 1

    def extract_pattern(self, action):
        """Classify an action string into a coarse pattern bucket."""
        if "search" in action.lower():
            return "search"
        elif "calculate" in action.lower():
            return "calculate"
        else:
            return "unknown"

    def get_recommendation(self, action):
        """Suggest a mitigation based on the most common past failure for this
        action's pattern; None when no similar failures exist."""
        from collections import Counter
        pattern = self.extract_pattern(action)
        similar_failures = [
            f for f in self.failure_history
            if pattern in f["action"].lower()
        ]
        if not similar_failures:
            return None
        error_types = [f["error"] for f in similar_failures]
        # most_common(1) yields [(error, count)] — take the error string itself;
        # the original passed the whole (error, count) tuple to dict.get(),
        # so a suggestion could never match
        common_error, _count = Counter(error_types).most_common(1)[0]
        suggestions = {
            "timeout": "尝试增加超时或使用更快的端点",
            "rate_limit": "实现速率限制并带退避重试",
            "invalid_input": "添加更好的输入验证",
            "permission": "在执行前检查用户权限",
            "network": "检查网络连接并尝试替代端点"
        }
        # match by substring: recorded errors are full messages, not bare keywords
        for keyword, advice in suggestions.items():
            if keyword in common_error.lower():
                return advice
        return "尝试不同的参数"
护栏和安全
输出验证
import re
class OutputValidator:
    """Guardrail: validates agent output against code-execution, PII, and length rules."""

    def __init__(self):
        self.rules = {
            "no_code_execution": True,
            "no_pii": True,
            "max_length": 1000,
            "allowed_patterns": []  # regexes the output MUST match, if any
        }

    def validate(self, output: str) -> dict:
        """Check *output* against every enabled rule; returns validity + violations."""
        violations = []
        # check for code-execution attempts
        if self.rules["no_code_execution"]:
            code_patterns = [
                r'```.*?python',
                r'```.*?javascript',
                r'```.*?bash',
                r'exec\(',
                r'eval\('
            ]
            for pattern in code_patterns:
                if re.search(pattern, output, re.IGNORECASE):
                    violations.append("包含代码执行尝试")
        # check for PII
        if self.rules["no_pii"]:
            # fix: the original SSN/phone patterns had unbalanced character
            # classes ("[-.\d{2}[-.") and raised re.error at match time
            pii_patterns = [
                r'\b\d{3}[-. ]?\d{2}[-. ]?\d{4}\b',  # SSN
                r'\b\d{3}[-. ]?\d{3}[-. ]?\d{4}\b',  # phone number
                r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',  # email
                r'\b\d{16}\b'  # credit card
            ]
            for pattern in pii_patterns:
                if re.search(pattern, output):
                    violations.append("包含潜在PII")
        # check length
        if len(output) > self.rules["max_length"]:
            violations.append(f"输出太长({len(output)} 字符)")
        # check required patterns
        if self.rules["allowed_patterns"]:
            for pattern in self.rules["allowed_patterns"]:
                if not re.search(pattern, output):
                    violations.append(f"不匹配所需模式:{pattern}")
        return {
            "valid": len(violations) == 0,
            "violations": violations
        }
行动限制
class ActionLimiter:
    """Sliding-window rate limiter for agent actions (per-minute and per-hour caps)."""

    def __init__(self, max_actions_per_minute=10, max_actions_per_hour=100):
        self.max_per_minute = max_actions_per_minute
        self.max_per_hour = max_actions_per_hour
        self.action_log = []  # records of {"type", "details", "timestamp"}

    def check_limit(self, action_type: str) -> bool:
        """Return True if another action of *action_type* is allowed right now."""
        # NOTE(review): uses `timedelta`, which is not imported anywhere in this
        # snippet — ensure `from datetime import timedelta` exists at file top.
        now = datetime.now()
        minute_ago = now - timedelta(minutes=1)
        hour_ago = now - timedelta(hours=1)
        # count this action type in the last minute
        # (ISO-8601 timestamp strings compare chronologically)
        recent_minute = [
            a for a in self.action_log
            if a["type"] == action_type and a["timestamp"] > minute_ago.isoformat()
        ]
        if len(recent_minute) >= self.max_per_minute:
            return False
        # count this action type in the last hour
        recent_hour = [
            a for a in self.action_log
            if a["type"] == action_type and a["timestamp"] > hour_ago.isoformat()
        ]
        if len(recent_hour) >= self.max_per_hour:
            return False
        return True

    def record_action(self, action_type: str, details: dict = None):
        """Append an action record and prune entries older than 24 hours."""
        self.action_log.append({
            "type": action_type,
            "details": details,
            "timestamp": datetime.now().isoformat()
        })
        # keep only the last 24 hours of history
        day_ago = (datetime.now() - timedelta(hours=24)).isoformat()
        self.action_log = [
            a for a in self.action_log
            if a["timestamp"] > day_ago
        ]
人在环路中
class HumanApprovalFlow:
    """Human-in-the-loop gate: actions wait for explicit approval before execution."""

    def __init__(self):
        self.pending_approvals = {}  # approval_id -> pending request record
        self.approved_actions = {}   # approval_id -> approval/rejection record

    def request_approval(self, agent_name: str, action: dict, user_id: str):
        """Queue *action* for human approval and notify the user."""
        approval_id = f"{agent_name}_{datetime.now().timestamp()}"
        self.pending_approvals[approval_id] = {
            "agent": agent_name,
            "action": action,
            "user_id": user_id,
            "requested_at": datetime.now().isoformat(),
            "status": "pending"
        }
        # NOTE(review): notify_user is not defined in this class — it must be
        # provided by a subclass or the surrounding notification system.
        self.notify_user(user_id, approval_id, action)
        return {
            "approval_id": approval_id,
            "status": "pending"
        }

    def approve_action(self, approval_id: str):
        """Execute a pending action after human approval and record the outcome."""
        if approval_id not in self.pending_approvals:
            return {"error": "未找到批准"}
        approval = self.pending_approvals[approval_id]
        # NOTE(review): execute_action is also not defined here — confirm provider.
        result = self.execute_action(approval["action"])
        # record the approval
        self.approved_actions[approval_id] = {
            "approval": approval,
            "executed_at": datetime.now().isoformat(),
            "result": result
        }
        # remove from the pending queue
        del self.pending_approvals[approval_id]
        return result

    def reject_action(self, approval_id: str, reason: str):
        """Reject a pending action with a reason, recording the decision."""
        if approval_id not in self.pending_approvals:
            return {"error": "未找到批准"}
        approval = self.pending_approvals[approval_id]
        # record the rejection
        self.approved_actions[approval_id] = {
            "approval": approval,
            "rejected_at": datetime.now().isoformat(),
            "reason": reason
        }
        # remove from the pending queue
        del self.pending_approvals[approval_id]
        return {"status": "rejected", "reason": reason}
评估和测试
单元测试代理
import pytest
from unittest.mock import Mock, patch
def test_agent_tool_execution():
    """The agent should surface the tool call requested by the mocked LLM.

    NOTE(review): `Agent` and `test_tool` must be defined elsewhere in the suite.
    """
    with patch('openai.OpenAI') as mock_openai:
        mock_client = Mock()
        mock_response = Mock()
        mock_response.choices = [Mock()]
        mock_response.choices[0].message = Mock()
        # fix: Mock(name="test_tool") configures the mock's *own* repr name,
        # not a .name attribute — the original assertion below could never
        # see "test_tool". Set the attribute explicitly instead.
        fake_call = Mock(arguments='{"param": "value"}')
        fake_call.name = "test_tool"
        mock_response.choices[0].message.function_calls = [fake_call]
        mock_client.chat.completions.create.return_value = mock_response
        # exercise the agent against the mocked client
        agent = Agent(mock_client, [test_tool])
        result = agent.run("测试查询")
        # verify the tool call carried the expected name
        assert mock_response.choices[0].message.function_calls[0].name == "test_tool"
集成测试
def test_end_to_end_agent_workflow():
    """End-to-end smoke test of the ReAct workflow (requires live OpenAI access)."""
    llm_client = OpenAI()
    # NOTE(review): search_tool/database_tool are defined elsewhere in the suite.
    tools = [search_tool, database_tool]
    agent = ReActAgent(llm_client, tools)
    user_query = "查找我关于Python编程的信息"
    result = agent.run(user_query)
    # the agent produced some transcript
    assert len(result) > 0
    # at least one tool was invoked
    # NOTE(review): ReActAgent.run records observations as assistant messages,
    # so filtering on role "function" may need adjusting to match the agent.
    tool_calls = [msg for msg in result if msg.get("role") == "function"]
    assert len(tool_calls) > 0
    # fix: the original line ended with an unbalanced "]]" — a syntax error
    final_answer = [msg for msg in result if msg.get("role") == "assistant"][-1]
    # the final answer should be on-topic
    assert "Python" in final_answer.get("content", "")
成本管理
class AgentCostTracker:
    """Accumulates tool-call and LLM token costs for one agent run."""

    def __init__(self):
        self.tool_costs = {}
        self.llm_costs = {
            "prompt_tokens": 0,
            "completion_tokens": 0,
            "total_cost": 0
        }

    def track_tool_call(self, tool_name: str, arguments: dict):
        """Estimate and accumulate the cost of one tool invocation."""
        # NOTE(review): estimate_tool_cost is expected to be defined elsewhere.
        cost = self.estimate_tool_cost(tool_name, arguments)
        previous = self.tool_costs.get(tool_name, 0)
        self.tool_costs[tool_name] = previous + cost
        return cost

    def track_llm_call(self, prompt_tokens: int, completion_tokens: int):
        """Accumulate token usage at example GPT-4 rates; return this call's cost."""
        # $0.03 per 1K prompt tokens, $0.06 per 1K completion tokens
        prompt_cost = prompt_tokens * 0.00003
        completion_cost = completion_tokens * 0.00006
        call_cost = prompt_cost + completion_cost
        self.llm_costs["prompt_tokens"] += prompt_tokens
        self.llm_costs["completion_tokens"] += completion_tokens
        self.llm_costs["total_cost"] += call_cost
        return call_cost

    def get_total_cost(self):
        """Break down accumulated tool and LLM spend."""
        tool_total = sum(self.tool_costs.values())
        llm_total = self.llm_costs["total_cost"]
        return {
            "tool_calls": tool_total,
            "llm_calls": llm_total,
            "total": tool_total + llm_total
        }
可观察性和调试
代理日志
import logging
class AgentLogger:
    """File-backed structured logger for a single named agent."""

    def __init__(self, agent_name: str):
        import os  # local import: only needed to ensure the log directory exists

        self.agent_name = agent_name
        self.logger = logging.getLogger(f"agent.{agent_name}")
        self.logger.setLevel(logging.DEBUG)
        # Bug fix: FileHandler raises FileNotFoundError if 'logs/' is missing.
        os.makedirs('logs', exist_ok=True)
        # Bug fix: getLogger returns the same logger per name, so re-instantiating
        # this class would attach a second handler and duplicate every log line.
        if not self.logger.handlers:
            handler = logging.FileHandler(f'logs/{agent_name}.log')
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

    def log_thought(self, thought: str):
        """Log the agent's reasoning step."""
        self.logger.debug(f"思考:{thought}")

    def log_action(self, action: str, details: dict):
        """Log an action taken by the agent."""
        self.logger.info(f"行动:{action},细节:{details}")

    def log_tool_call(self, tool_name: str, arguments: dict, result):
        """Log a tool execution with its arguments and result."""
        self.logger.info(f"工具:{tool_name},参数:{arguments},结果:{result}")

    def log_error(self, error: str, context: dict):
        """Log an error together with its context."""
        self.logger.error(f"错误:{error},上下文:{context}")

    def log_completion(self, result: str):
        """Log task completion."""
        self.logger.info(f"完成:{result}")
调试工具
class AgentDebugger:
    """Step-by-step tracer and Mermaid visualizer for an agent's run() output."""

    def __init__(self, agent):
        self.agent = agent
        self.step_history = []

    def trace_execution(self, query: str):
        """Run the agent and record a typed, timestamped entry per step.

        Bug fix: the original iterated over the *bound method* ``agent.run``
        instead of calling it, which raises TypeError. We call ``run(query)``
        and iterate the returned list of steps.
        """
        self.step_history = []
        steps = self.agent.run(query)
        for step in steps:
            self.step_history.append({
                "step": len(self.step_history) + 1,
                "type": self.get_step_type(step),
                "content": step,
                "timestamp": datetime.now().isoformat()
            })
        return {
            "steps": self.step_history,
            "final_result": steps[-1] if steps else None
        }

    def get_step_type(self, step):
        """Classify a step by the ReAct marker prefix embedded in its text."""
        content = str(step)
        if "思考:" in content:
            return "thought"
        elif "行动:" in content:
            return "action"
        elif "观察:" in content:
            return "observation"
        elif "函数:" in content:
            return "function_call"
        else:
            return "other"

    def visualize_trace(self):
        """Render the recorded trace as a Mermaid flowchart definition.

        Bug fix: the original embedded raw line breaks inside string literals
        (a SyntaxError); they are replaced with explicit "\\n" escapes.
        """
        mermaid_diagram = "graph TD;\n"
        # Node label per step type (unlabelled types are skipped, as before).
        labels = {
            "thought": "思考",
            "action": "行动",
            "observation": "观察",
            "function_call": "函数",
        }
        for step in self.step_history:
            step_num = step["step"]
            content = step["content"][:50]  # truncate long content
            label = labels.get(step["type"])
            if label is not None:
                mermaid_diagram += f'S{step_num}[{label}:"{content}"]\n'
        mermaid_diagram += "end;"
        return mermaid_diagram
生产部署
部署考虑
class ProductionAgentConfig:
    """Default production configuration for an agent deployment."""

    def __init__(self):
        # Bug fix: the original dict defined "log_level" twice; in a dict
        # literal the later key silently overwrites the earlier one, so the
        # duplicate has been removed.
        self.config = {
            # LLM settings
            "model": "gpt-4",
            "temperature": 0.7,
            "max_tokens": 4096,
            "timeout": 30,
            # Agent settings
            "max_iterations": 10,
            "max_tools_per_call": 5,
            # Safety settings
            "enable_guardrails": True,
            "require_human_approval_for": ["delete", "transfer"],
            "max_cost_per_session": 1.00,
            # Monitoring
            "enable_logging": True,
            "enable_tracing": True,
            "log_level": "INFO",
            # Performance
            "enable_caching": True,
            "cache_ttl": 300,
        }

    def validate(self) -> bool:
        """Return True if the configuration is complete and in range."""
        # Check required settings.
        required = ["model"]
        for key in required:
            if key not in self.config:
                return False
        # Validate value ranges.
        if self.config["temperature"] < 0 or self.config["temperature"] > 2:
            return False
        return True

    def get_llm_config(self) -> dict:
        """Return only the LLM-related subset of the configuration."""
        return {
            "model": self.config["model"],
            "temperature": self.config["temperature"],
            "max_tokens": self.config["max_tokens"],
            "timeout": self.config["timeout"]
        }
扩展策略
class AgentPool:
    """Round-robin pool of agents produced by a factory callable."""

    def __init__(self, agent_factory, pool_size=5):
        self.agent_factory = agent_factory
        self.pool_size = pool_size
        self.agents = []
        self.current_index = 0

    def initialize_pool(self):
        """Build pool_size agents via the factory and return the pool."""
        self.agents.extend(
            self.agent_factory(f"agent_{idx}") for idx in range(self.pool_size)
        )
        return self.agents

    def get_agent(self) -> object:
        """Return the next available agent in round-robin order."""
        if not self.agents:
            raise Exception("代理池未初始化")
        chosen = self.agents[self.current_index]
        self.current_index = (self.current_index + 1) % self.pool_size
        return chosen

    def get_all_agents(self) -> list:
        """Return every agent currently in the pool."""
        return self.agents
常见用例
研究助手
class ResearchAssistantAgent:
    """Plan / execute / synthesize research agent."""

    def __init__(self, llm_client, tools):
        self.client = llm_client
        self.tools = tools
        # Short-term conversational memory (project-local helper class).
        self.memory = ConversationBuffer(max_messages=20)

    def research(self, topic: str):
        """Research a topic in three phases: plan, execute, synthesize."""
        # Phase 1: plan the research.
        plan = self.create_research_plan(topic)
        # Phase 2: execute each planned step.
        findings = []
        for step in plan:
            result = self.execute_research_step(step)
            findings.append(result)
        # Phase 3: synthesize the findings.
        synthesis = self.synthesize_findings(findings)
        return {
            "topic": topic,
            "plan": plan,
            "findings": findings,
            "synthesis": synthesis
        }

    def create_research_plan(self, topic: str):
        """Ask the LLM for a step-by-step research plan and parse it."""
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[
                {
                    "role": "system",
                    "content": "为以下主题创建分步研究计划:" + topic + "。"
                },
                {"role": "user", "content": f"研究:{topic}"}
            ],
            tools=self.tools
        )
        return self.parse_plan(response.choices[0].message.content)

    def parse_plan(self, plan_text):
        """Parse the LLM's plan text into a list of step strings.

        Bug fix: this method was called by create_research_plan but never
        defined (AttributeError at runtime). Each non-empty line becomes one
        step, with any leading "N." / "N)" / "-" / "*" list marker stripped.
        """
        import re  # local import to avoid changing module-level dependencies

        if not plan_text:
            return []
        steps = []
        for line in plan_text.splitlines():
            cleaned = re.sub(r'^(\d+[.)]|[-*])\s*', '', line.strip())
            if cleaned:
                steps.append(cleaned)
        return steps

    def execute_research_step(self, step):
        """Execute a single research step (placeholder implementation)."""
        # NOTE(review): placeholder result; the stray `f` prefix on the
        # original placeholder-free string was removed (no behavior change).
        return {"result": "步骤的研究结果"}

    def synthesize_findings(self, findings):
        """Combine the collected findings into a final report string."""
        return f"合成报告:{findings}"
代码生成代理
class CodeGenerationAgent:
    """Generate source code from natural-language requirements via an LLM."""

    def __init__(self, llm_client):
        self.client = llm_client

    def generate_code(self, requirements: str, language: str = "python"):
        """Generate code for the requirements, returned fenced in Markdown."""
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[
                {
                    "role": "system",
                    "content": f"""您是一名专家 {language} 开发者。
基于需求生成干净、有良好文档的代码。
包括类型提示和文档字符串。
"""
                },
                {
                    "role": "user",
                    "content": f"需求:{requirements}"
                }
            ]
        )
        code = response.choices[0].message.content
        # Validate and format the code before returning.
        return self.format_code(code, language)

    def format_code(self, code: str, language: str = "python") -> str:
        """Wrap code in a Markdown fence if it is not already fenced.

        Bug fix: `language` was referenced here but never in scope (NameError);
        it is now an explicit parameter defaulting to "python" for backward
        compatibility. The original also embedded raw line breaks inside an
        f-string literal (a SyntaxError), now written with "\\n" escapes.
        """
        if "```" not in code:
            code = f"```{language}\n{code}\n```"
        return code
客户服务代理
class CustomerServiceAgent:
    """Route customer-service queries to intent-specific handlers."""

    def __init__(self, llm_client, tools):
        self.client = llm_client
        self.tools = tools
        self.customer_db = None

    def handle_query(self, query: str, customer_id: str):
        """Dispatch a customer query to the handler for its detected intent."""
        # Fetch customer context for the downstream handlers.
        customer = self.get_customer_context(customer_id)
        # Classify the query, then dispatch via a handler table.
        intent = self.analyze_intent(query)
        dispatch = {
            "order_status": lambda: self.check_order_status(query, customer_id),
            "refund_request": lambda: self.process_refund(query, customer_id),
            "product_info": lambda: self.get_product_info(query),
        }
        handler = dispatch.get(intent, lambda: self.general_inquiry(query))
        return handler()

    def analyze_intent(self, query: str) -> str:
        """Classify the query with simple keyword matching."""
        text = query.lower()
        if "order" in text and "status" in text:
            return "order_status"
        if "refund" in text:
            return "refund_request"
        if "product" in text:
            return "product_info"
        return "general_inquiry"

    def get_customer_context(self, customer_id: str):
        """Fetch customer context (placeholder implementation)."""
        return {"customer_id": customer_id, "data": {}}
数据分析代理
class DataAnalysisAgent:
    """Analyze structured data with an LLM and extract key insights."""

    def __init__(self, llm_client, tools):
        self.client = llm_client
        self.tools = tools

    def analyze_data(self, data: dict):
        """Send the data to the LLM for analysis and extract insights."""
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[
                {
                    "role": "system",
                    "content": """您是一名数据分析师。
分析提供的数据并提供见解。
关注趋势、模式和可操作建议。
可用工具:
- search:搜索信息
- calculate:执行计算
- database:查询数据库
"""
                },
                {
                    "role": "user",
                    "content": f"数据:{json.dumps(data)}"
                }
            ],
            tools=self.tools
        )
        analysis = response.choices[0].message.content
        return {
            "data": data,
            "analysis": analysis,
            "insights": self.extract_insights(analysis)
        }

    def extract_insights(self, analysis: str) -> list:
        """Extract sentences containing insight keywords, in document order.

        Bug fix: the original re-split the text once per keyword and appended
        a sentence once per matching keyword, producing duplicate insights.
        The text is now scanned once and results are deduplicated while
        preserving document order.
        """
        insight_keywords = (
            "trend", "pattern", "recommendation", "anomaly",
            "increase", "decrease", "correlation",
        )
        insights = []
        seen = set()
        for sentence in analysis.split('.'):
            lowered = sentence.lower()
            if any(keyword in lowered for keyword in insight_keywords):
                cleaned = sentence.strip()
                if cleaned and cleaned not in seen:
                    seen.add(cleaned)
                    insights.append(cleaned)
        return insights
最佳实践
代理设计
- 保持代理专注于特定任务
- 使用清晰的工具接口
- 实现适当的错误处理
- 为安全添加护栏
- 为您的用例使用适当的内存类型
- 实现内存检索策略
- 定期清理旧内存
- 为高效检索索引内存
- 记录所有代理决策
- 跟踪工具执行
- 监控成本
- 为异常设置警报
- 实现调试工具
内存管理
- 为您的用例使用适当的内存类型
- 短期:用于对话上下文
- 长期:用于持久知识
- 情景:用于特定经验
- 实现内存检索策略
- 为高效检索索引内存
- 定期清理旧内存
工具使用
- 设计具有清晰接口的工具
- 实现适当的验证
- 为工具执行使用超时
- 提供有意义的错误消息
- 实现重试逻辑
规划
- 将复杂任务分解为较小步骤
- 执行前验证计划
- 优雅处理计划失败
- 允许计划调整
安全
- 实现输出验证
- 为关键行动使用人类批准
- 设置行动限制
- 监控代理行为
- 为审计记录所有行动
- 为异常设置警报
可观察性
- 记录所有代理决策
- 跟踪工具执行
- 监控成本
- 为异常设置警报
- 实现调试工具
生产部署
- 为模型加载使用单例模式
- 配置适当的超时
- 为性能实现缓存
- 设置监控
- 使用适当的硬件