名称:大语言模型函数调用 描述:使用大语言模型实现函数调用(工具使用),以获取结构化输出和外部集成。
大语言模型函数调用
概述
大语言模型(LLM)函数调用(也称为工具使用)使大语言模型能够通过调用预定义函数与外部系统交互。LLM不仅能生成文本,还能请求执行具有结构化参数的特定函数,接收结果,并基于这些结果继续推理。
前提条件
- 理解LLM API及其功能
- 了解JSON Schema和函数定义格式
- 熟悉错误处理和重试模式
- 了解函数执行的安全最佳实践
- 基本了解异步/等待模式以实现并行执行
- 熟悉验证库(如Pydantic、jsonschema)
关键概念
什么是函数调用(工具使用)
函数调用允许LLMs:
- 理解意图:识别用户请求何时需要外部操作
- 选择工具:选择适当的函数进行调用
- 生成参数:创建正确格式的函数参数
- 执行函数:运行函数并获取结果
- 处理结果:使用函数输出来回答用户
示例流程
用户:"东京的天气怎么样?"
↓
LLM:我需要调用get_weather函数,参数为:
{"location": "东京"}
↓
系统:执行get_weather("东京")
↓
结果:{"temperature": 22, "condition": "晴朗"}
↓
LLM:东京的天气晴朗,温度为22°C。
OpenAI函数调用API
基本函数定义
from openai import OpenAI
client = OpenAI()
# 定义函数
functions = [
{
"type": "function",
"function": {
"name": "get_weather",
"description": "获取某个位置的当前天气",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "城市和州,例如旧金山,加州"
}
},
"required": ["location"]
}
}
}
]
# 使用函数调用发出请求
response = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "user", "content": "东京的天气怎么样?"}
],
functions=functions
)
# 检查LLM是否要调用函数
if response.choices[0].finish_reason == "function_calls":
function_call = response.choices[0].message.function_calls[0]
# 执行函数
if function_call.name == "get_weather":
args = json.loads(function_call.arguments)
weather_data = get_weather(args["location"])
# 将结果发送回LLM
second_response = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "user", "content": "东京的天气怎么样?"},
response.choices[0].message, # 助理消息,包含函数调用
{
"role": "function",
"name": "get_weather",
"content": json.dumps(weather_data)
}
]
)
print(second_response.choices[0].message.content)
多个函数
functions = [
{
"type": "function",
"function": {
"name": "get_weather",
"description": "获取当前天气",
"parameters": {
"type": "object",
"properties": {
"location": {"type": "string"}
},
"required": ["location"]
}
}
},
{
"type": "function",
"function": {
"name": "get_time",
"description": "获取当前时间",
"parameters": {
"type": "object",
"properties": {
"timezone": {
"type": "string",
"description": "时区,例如美国/纽约"
}
},
"required": []
}
}
}
]
response = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "user", "content": "东京的天气和时间怎么样?"}
],
functions=functions
)
带函数调用的流式处理
from openai import OpenAI
client = OpenAI()
stream = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "user", "content": "计算25 * 47"}
],
functions=[{
"type": "function",
"function": {
"name": "calculate",
"description": "执行数学计算",
"parameters": {
"type": "object",
"properties": {
"operation": {
"type": "string",
"enum": ["add", "subtract", "multiply", "divide"]
},
"a": {"type": "number"},
"b": {"type": "number"}
},
"required": ["operation", "a", "b"]
}
}
}],
stream=True
)
for chunk in stream:
if chunk.choices[0].finish_reason == "function_calls":
function_call = chunk.choices[0].delta.function_calls[0]
# 执行函数
result = execute_function(function_call)
# 用结果继续流式处理
stream = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "user", "content": "计算25 * 47"},
{"role": "assistant", "content": None, "function_calls": [function_call]},
{"role": "function", "name": function_call.name, "content": json.dumps(result)}
],
stream=True
)
for response_chunk in stream:
print(response_chunk.choices[0].delta.content)
Anthropic工具使用API
基本工具定义
import anthropic
client = anthropic.Anthropic()
# 定义工具
tools = [
{
"name": "get_weather",
"description": "获取某个位置的当前天气",
"input_schema": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "城市和州"
}
},
"required": ["location"]
}
}
]
# 发出请求
message = client.messages.create(
model="claude-3-opus-20240229",
max_tokens=1024,
tools=tools,
messages=[
{"role": "user", "content": "东京的天气怎么样?"}
]
)
# 检查工具使用
if message.stop_reason == "tool_use":
for tool_use in message.content:
if tool_use.type == "tool_use":
tool_name = tool_use.name
tool_input = tool_use.input
# 执行工具
if tool_name == "get_weather":
result = get_weather(tool_input["location"])
# 将结果发送回
response = client.messages.create(
model="claude-3-opus-20240229",
max_tokens=1024,
messages=[
{"role": "user", "content": "东京的天气怎么样?"},
message, # 助理消息,包含工具使用
{
"role": "user",
"content": f"工具结果:{json.dumps(result)}"
}
]
)
print(response.content[0].text)
多个工具
tools = [
{
"name": "search_database",
"description": "搜索产品数据库",
"input_schema": {
"type": "object",
"properties": {
"query": {"type": "string"},
"category": {"type": "string"}
},
"required": ["query"]
}
},
{
"name": "get_user_profile",
"description": "获取用户配置文件信息",
"input_schema": {
"type": "object",
"properties": {
"user_id": {"type": "string"}
},
"required": ["user_id"]
}
}
]
response = client.messages.create(
model="claude-3-opus-20240229",
max_tokens=1024,
tools=tools,
messages=[
{"role": "user", "content": "为用户123找到跑步鞋"}
]
)
# 处理多个工具调用
for content in response.content:
if content.type == "tool_use":
tool_name = content.name
tool_input = content.input
if tool_name == "search_database":
results = search_products(tool_input["query"], tool_input.get("category"))
elif tool_name == "get_user_profile":
results = get_user_profile(tool_input["user_id"])
# 发送所有结果回
tool_results = [{"type": "tool_result", "tool_use_id": content.id, "content": json.dumps(results)}]
final_response = client.messages.create(
model="claude-3-opus-20240229",
max_tokens=1024,
messages=[
{"role": "user", "content": "为用户123找到跑步鞋"},
response.content,
*tool_results
]
)
函数定义模式
JSON Schema
functions = [
{
"type": "function",
"function": {
"name": "create_user",
"description": "创建新用户账户",
"parameters": {
"type": "object",
"properties": {
"email": {
"type": "string",
"format": "email",
"description": "用户的电子邮件地址"
},
"password": {
"type": "string",
"minLength": 8,
"description": "用户的密码(至少8个字符)"
},
"name": {
"type": "string",
"minLength": 2,
"description": "用户的显示名称"
},
"age": {
"type": "integer",
"minimum": 13,
"maximum": 120,
"description": "用户的年龄"
},
"subscribe_newsletter": {
"type": "boolean",
"default": False,
"description": "是否订阅新闻通讯"
}
},
"required": ["email", "password", "name"],
"additionalProperties": False
}
}
}
]
参数描述
functions = [
{
"type": "function",
"function": {
"name": "search_products",
"description": "在目录中搜索产品",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索查询 - 可以包括产品名称、类别或关键字"
},
"category": {
"type": "string",
"description": "按特定类别筛选(可选)",
"enum": ["电子产品", "服装", "运动", "家居"]
},
"min_price": {
"type": "number",
"description": "最低价格筛选(可选)"
},
"max_price": {
"type": "number",
"description": "最高价格筛选(可选)"
},
"sort_by": {
"type": "string",
"description": "按字段排序结果",
"enum": ["price", "name", "rating", "relevance"]
},
"limit": {
"type": "integer",
"description": "要返回的最大结果数",
"default": 10,
"minimum": 1,
"maximum": 50
}
},
"required": ["query"]
}
}
}
]
必需与可选
functions = [
{
"type": "function",
"function": {
"name": "book_flight",
"description": "预订航班机票",
"parameters": {
"type": "object",
"properties": {
"origin": {
"type": "string",
"description": "出发机场代码(例如JFK、LAX)"
},
"destination": {
"type": "string",
"description": "目的地机场代码"
},
"date": {
"type": "string",
"description": "出发日期,格式为YYYY-MM-DD"
},
"passengers": {
"type": "integer",
"description": "乘客数量",
"default": 1
},
"class": {
"type": "string",
"description": "航班舱位",
"enum": ["economy", "business", "first"],
"default": "economy"
}
},
"required": ["origin", "destination", "date"]
}
}
}
]
结构化输出提取
提取结构化数据
functions = [
{
"type": "function",
"function": {
"name": "extract_order_info",
"description": "从用户消息中提取订单信息",
"parameters": {
"type": "object",
"properties": {
"product_name": {
"type": "string",
"description": "产品名称"
},
"quantity": {
"type": "integer",
"description": "订购数量"
},
"address": {
"type": "string",
"description": "送货地址"
},
"payment_method": {
"type": "string",
"description": "支付方式",
"enum": ["credit_card", "debit_card", "paypal", "bank_transfer"]
}
},
"required": []
}
}
}
]
response = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": "从用户消息中提取订单信息。"},
{"role": "user", "content": "我想订购2双耐克跑步鞋,使用我的信用卡送到纽约市123主街,NY 10001"}
],
functions=functions
)
# 检查是否调用了函数
if response.choices[0].finish_reason == "function_calls":
function_call = response.choices[0].message.function_calls[0]
args = json.loads(function_call.arguments)
# 处理提取的数据
print(f"产品:{args['product_name']}")
print(f"数量:{args['quantity']}")
print(f"地址:{args['address']}")
print(f"支付方式:{args['payment_method']}")
数据验证
from pydantic import BaseModel, EmailStr, validator
from typing import Optional
class OrderInfo(BaseModel):
    """Validated order fields extracted from a user message via function calling.

    NOTE(review): uses the pydantic v1-style `@validator`; on pydantic v2 this
    is deprecated in favor of `@field_validator` — confirm the pinned version.
    """
    product_name: str
    quantity: int = 1          # defaults to a single item
    address: str
    payment_method: str
    email: Optional[EmailStr] = None   # optional contact address
    notes: Optional[str] = None        # free-form order notes
    @validator('quantity')
    def validate_quantity(cls, v):
        """Bound quantity to a sane range before the order is processed."""
        if v < 1 or v > 100:
            raise ValueError('数量必须在1到100之间')
        return v
# 与函数调用一起使用
functions = [
{
"type": "function",
"function": {
"name": "create_order",
"description": "创建新订单",
"parameters": {
"type": "object",
"properties": {
"product_name": {"type": "string"},
"quantity": {"type": "integer"},
"address": {"type": "string"},
"payment_method": {"type": "string"},
"email": {"type": "string"},
"notes": {"type": "string"}
},
"required": ["product_name", "address", "payment_method"]
}
}
}
]
# 验证和处理
def process_order(args):
    """Validate LLM-extracted arguments as an OrderInfo and report the outcome.

    Returns {"success": True, "order_id": ...} when validation passes,
    otherwise {"success": False, "error": <validation message>}.
    """
    try:
        OrderInfo(**args)
        # Order accepted (demo: fixed id instead of real persistence).
        return {"success": True, "order_id": "12345"}
    except ValueError as exc:
        return {"success": False, "error": str(exc)}
多函数调用
顺序函数调用
functions = [
{
"type": "function",
"function": {
"name": "get_user_balance",
"description": "获取用户账户余额",
"parameters": {
"type": "object",
"properties": {
"user_id": {"type": "string"}
},
"required": ["user_id"]
}
}
},
{
"type": "function",
"function": {
"name": "transfer_money",
"description": "在账户之间转账",
"parameters": {
"type": "object",
"properties": {
"from_user_id": {"type": "string"},
"to_user_id": {"type": "string"},
"amount": {"type": "number"}
},
"required": ["from_user_id", "to_user_id", "amount"]
}
}
}
]
response = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "user", "content": "从用户123转账50美元到用户456"}
],
functions=functions
)
# 处理多个函数调用
if response.choices[0].finish_reason == "function_calls":
function_calls = response.choices[0].message.function_calls
# 顺序执行
results = []
for fc in function_calls:
if fc.name == "get_user_balance":
balance = get_user_balance(fc.arguments)
results.append({"function": "get_user_balance", "result": balance})
elif fc.name == "transfer_money":
transfer_result = transfer_money(fc.arguments)
results.append({"function": "transfer_money", "result": transfer_result})
# 将结果发送回
second_response = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "user", "content": "从用户123转账50美元到用户456"},
response.choices[0].message,
*[{
"role": "function",
"name": r["function"],
"content": json.dumps(r["result"])
} for r in results]
]
)
并行函数调用
functions = [
{
"type": "function",
"function": {
"name": "get_weather",
"description": "获取某个位置的天气",
"parameters": {
"type": "object",
"properties": {
"location": {"type": "string"}
},
"required": ["location"]
}
}
},
{
"type": "function",
"function": {
"name": "get_time",
"description": "获取当前时间",
"parameters": {
"type": "object",
"properties": {
"timezone": {"type": "string"}
},
"required": []
}
}
}
]
response = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "user", "content": "东京和纽约的天气和时间怎么样?"}
],
functions=functions
)
# 处理并行函数调用
if response.choices[0].finish_reason == "function_calls":
function_calls = response.choices[0].message.function_calls
# 并行执行
import asyncio
async def execute_all(calls):
tasks = []
for fc in calls:
if fc.name == "get_weather":
task = asyncio.create_task(get_weather(fc.arguments))
elif fc.name == "get_time":
task = asyncio.create_task(get_time(fc.arguments))
tasks.append(task)
results = await asyncio.gather(*tasks)
return results
results = asyncio.run(execute_all(function_calls))
# 将结果发送回
second_response = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "user", "content": "东京和纽约的天气和时间怎么样?"},
response.choices[0].message,
*[{
"role": "function",
"name": fc.name,
"content": json.dumps(r)
} for fc, r in zip(function_calls, results)]
]
)
函数调用路由
智能路由
class FunctionRouter:
    """Dispatches LLM function calls to local handler methods by name."""

    def __init__(self):
        # Registry: function name exposed to the LLM -> bound handler.
        self.functions = {
            "weather": self.get_weather,
            "time": self.get_time,
            "database": self.search_database,
            "user": self.get_user,
        }

    def route(self, function_name, arguments):
        """Invoke the registered handler; raise ValueError for unknown names."""
        try:
            handler = self.functions[function_name]
        except KeyError:
            raise ValueError(f"未知函数:{function_name}") from None
        return handler(arguments)

    def get_weather(self, args):
        """Demo stub: canned weather data regardless of location."""
        args.get("location")
        return {"temperature": 22, "condition": "晴朗"}

    def get_time(self, args):
        """Demo stub: fixed timestamp in the requested (or UTC) timezone."""
        tz = args.get("timezone", "UTC")
        return {"time": "2024-01-16 12:00:00", "timezone": tz}

    def search_database(self, args):
        """Demo stub: one canned product row."""
        args.get("query")
        return {"results": [{"id": 1, "name": "产品1"}]}

    def get_user(self, args):
        """Demo stub: echoes the id with a canned name."""
        uid = args.get("user_id")
        return {"id": uid, "name": "张三"}
router = FunctionRouter()
# 与LLM一起使用
response = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "user", "content": "东京的天气怎么样?"}
],
functions=[{
"type": "function",
"function": {
"name": "weather",
"description": "获取天气",
"parameters": {
"type": "object",
"properties": {
"location": {"type": "string"}
},
"required": ["location"]
}
}
}]
)
if response.choices[0].finish_reason == "function_calls":
fc = response.choices[0].message.function_calls[0]
result = router.route(fc.name, json.loads(fc.arguments))
动态函数加载
import importlib
import os
class DynamicFunctionLoader:
    """Discovers tool functions from ``.py`` modules in a package directory.

    A module attribute is registered when it is callable and carries a
    ``tool_definition`` attribute (either a flat ``{"name": ...}`` spec or the
    OpenAI-style nested ``{"function": {"name": ...}}`` spec).
    """

    def __init__(self, functions_dir="functions"):
        # Directory (must be an importable package) holding tool modules.
        self.functions_dir = functions_dir
        self.loaded_functions = {}
        self.load_functions()

    def load_functions(self):
        """Import every non-private .py module and register its tools.

        Best-effort: a module that fails to import or register is reported
        and skipped so one broken plugin cannot abort discovery.
        """
        # BUG FIX: the import prefix was hard-coded to "functions." and
        # ignored functions_dir; derive the package name from the directory.
        package = os.path.basename(os.path.normpath(self.functions_dir))
        for filename in os.listdir(self.functions_dir):
            if filename.endswith('.py') and not filename.startswith('_'):
                module_name = filename[:-3]
                try:
                    module = importlib.import_module(f"{package}.{module_name}")
                    for attr_name in dir(module):
                        attr = getattr(module, attr_name)
                        if callable(attr) and hasattr(attr, 'tool_definition'):
                            td = attr.tool_definition
                            # BUG FIX: support the nested OpenAI-style spec
                            # used elsewhere in this file, not just a flat
                            # {"name": ...} dict.
                            name = td.get('name') or td.get('function', {}).get('name')
                            if name:
                                self.loaded_functions[name] = attr
                except Exception as e:
                    print(f"加载{module_name}失败:{e}")

    def get_function_definitions(self):
        """Return the tool_definition spec of every loaded function."""
        return [func.tool_definition for func in self.loaded_functions.values()]

    def execute_function(self, name, arguments):
        """Invoke a loaded function by name with keyword arguments.

        Raises ValueError when *name* was never registered.
        """
        if name in self.loaded_functions:
            return self.loaded_functions[name](**arguments)
        raise ValueError(f"未找到函数:{name}")
# 示例函数模块
# functions/weather.py
def get_weather(location: str) -> dict:
    """Return canned weather data for *location* (demo tool module stub)."""
    return {"temperature": 22, "condition": "晴朗", "location": location}

# OpenAI-style spec attached to the handler so a loader can discover it.
get_weather.tool_definition = {
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "获取某个位置的当前天气",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {
                    "type": "string",
                    "description": "城市和州",
                },
            },
            "required": ["location"],
        },
    },
}
# 使用
loader = DynamicFunctionLoader()
functions = loader.get_function_definitions()
response = client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": "东京的天气怎么样?"}],
functions=functions
)
错误处理
无效函数调用
def safe_execute_function(name, arguments):
    """Execute a registered function defensively, returning errors as data.

    Errors come back as dicts (never raised) so they can be fed straight back
    to the LLM as a function result. Relies on the module-level
    ``available_functions`` registry: name -> schema dict with a "handler".
    """
    try:
        # Unknown function: tell the model what IS available.
        if name not in available_functions:
            return {
                "error": f"未知函数:{name}",
                "available_functions": list(available_functions.keys())
            }
        # Report the first missing required parameter, if any.
        schema = available_functions[name]
        missing = [p for p in schema.get("required", []) if p not in arguments]
        if missing:
            return {
                "error": f"缺少必需参数:{missing[0]}",
                "function": name
            }
        return {"success": True, "result": available_functions[name]["handler"](**arguments)}
    except Exception as e:
        # Any handler failure is reported, not propagated.
        return {
            "error": str(e),
            "function": name
        }
# 与LLM响应一起使用
if response.choices[0].finish_reason == "function_calls":
fc = response.choices[0].message.function_calls[0]
args = json.loads(fc.arguments)
result = safe_execute_function(fc.name, args)
# 将错误发送回LLM
if "error" in result:
second_response = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "user", "content": original_user_message},
response.choices[0].message,
{
"role": "function",
"name": fc.name,
"content": json.dumps(result)
}
]
)
重试策略
import time
from functools import wraps
def retry_on_failure(max_retries=3, delay=1):
    """Decorator: retry the wrapped call up to *max_retries* times.

    Sleeps ``delay * attempt_number`` between attempts (linear backoff) and
    re-raises the last exception once every attempt has failed.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exc = None
            for attempt in range(1, max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    last_exc = exc
                    if attempt < max_retries:
                        time.sleep(delay * attempt)
            raise last_exc
        return wrapper
    return decorator
# 应用到函数
@retry_on_failure(max_retries=3, delay=2)
def get_weather(location):
    """Fetch weather via the external API, retried up to 3 times on failure.

    NOTE(review): `call_weather_api` is not defined in this file — presumably
    an HTTP client wrapper; confirm against the surrounding application.
    """
    # The upstream API may fail transiently; the decorator absorbs retries.
    return call_weather_api(location)
后备机制
class RobustFunctionExecutor:
    """Executes registered functions with optional per-function fallbacks.

    Results are returned as dicts with a ``success`` flag so failures can be
    reported back to an LLM instead of raising.
    """

    def __init__(self):
        self.functions = {}   # name -> primary handler
        self.fallbacks = {}   # name -> fallback handler (optional)

    def register_function(self, name, handler, fallback=None):
        """Register *handler* under *name*, with an optional *fallback*."""
        self.functions[name] = handler
        if fallback:
            self.fallbacks[name] = fallback

    def execute(self, name, arguments):
        """Run the primary handler; on failure, try the fallback.

        Returns {"success": True, "result": ...} on success — plus a
        "warning" when the fallback was used — else {"success": False,
        "error": ...}.
        """
        try:
            return {"success": True, "result": self.functions[name](**arguments)}
        except Exception as e:
            if name in self.fallbacks:
                try:
                    fallback_result = self.fallbacks[name](**arguments)
                    return {
                        "success": True,
                        "result": fallback_result,
                        "warning": f"主函数失败,使用了后备:{str(e)}"
                    }
                except Exception as fe:
                    # BUG FIX: the fallback's own exception was captured but
                    # never reported; include both failures for diagnosis.
                    return {
                        "success": False,
                        "error": f"主函数和后备都失败:{str(e)};后备错误:{str(fe)}"
                    }
            return {
                "success": False,
                "error": str(e)
            }
# 使用
executor = RobustFunctionExecutor()
executor.register_function(
"get_weather",
lambda location: call_weather_api(location),
lambda location: {"temperature": 20, "condition": "未知"} # 后备
)
验证和清理
输入验证
from pydantic import BaseModel, validator
class WeatherQuery(BaseModel):
    """Validated input model for the get_weather tool call.

    NOTE(review): pydantic v1-style `@validator`; deprecated on pydantic v2.
    """
    location: str
    @validator('location')
    def validate_location(cls, v):
        """Reject out-of-range lengths, then normalize the location string."""
        if len(v) < 2 or len(v) > 100:
            raise ValueError('位置必须在2到100个字符之间')
        # Normalize: trim surrounding whitespace and lowercase.
        return v.strip().lower()
def safe_weather_handler(args):
    """Validate *args* with WeatherQuery, then fetch the weather.

    Any ValueError — from validation or the lookup itself — is returned as
    {"error": ...} instead of propagating.
    """
    try:
        query = WeatherQuery(**args)
        return get_weather(query.location)
    except ValueError as exc:
        return {"error": str(exc)}
输出清理
def sanitize_output(data):
    """Return a copy of *data* with sensitive fields masked.

    IMPROVED: recurses into nested dicts and lists — the original's shallow
    ``copy()`` left sensitive values inside nested structures untouched.
    Non-container values are returned unchanged; flat-dict behavior is
    identical to before.
    """
    sensitive_keys = {'password', 'ssn', 'credit_card', 'api_key'}
    if isinstance(data, dict):
        return {
            key: "***已屏蔽***" if key in sensitive_keys else sanitize_output(value)
            for key, value in data.items()
        }
    if isinstance(data, list):
        return [sanitize_output(item) for item in data]
    return data
# 与函数调用一起使用
def get_user_profile(user_id):
    """Fetch a user record and mask sensitive fields before returning it.

    NOTE(review): `fetch_user_from_db` is not defined in this file —
    presumably a DB access helper; confirm against the caller.
    """
    user_data = fetch_user_from_db(user_id)
    # Strip passwords/keys/etc. before the data reaches the LLM.
    return sanitize_output(user_data)
模式验证
import jsonschema
# 定义模式
function_schema = {
"type": "object",
"properties": {
"email": {"type": "string", "format": "email"},
"age": {"type": "integer", "minimum": 0, "maximum": 120}
},
"required": ["email"]
}
def validate_arguments(args, schema):
    """Check *args* against a JSON Schema without raising.

    Returns {"valid": True} on success, else {"valid": False, "errors": msg}
    so the failure can be handed back to the LLM as a function result.
    """
    try:
        jsonschema.validate(instance=args, schema=schema)
    except jsonschema.ValidationError as err:
        return {"valid": False, "errors": err.message}
    return {"valid": True}
# 与函数调用一起使用
if response.choices[0].finish_reason == "function_calls":
fc = response.choices[0].message.function_calls[0]
args = json.loads(fc.arguments)
validation = validate_arguments(args, function_schema)
if not validation["valid"]:
# 将错误发送回LLM
second_response = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "user", "content": original_message},
response.choices[0].message,
{
"role": "function",
"name": fc.name,
"content": json.dumps({"error": validation["errors"]})
}
]
)
安全考虑
输入验证
def validate_function_input(function_name, arguments):
    """Reject string argument values containing injection markers.

    Checks SQL-style markers first (case-insensitive), then code-execution
    markers (case-sensitive), mirroring a defense-in-depth blocklist.
    NOTE(review): blocklists are best-effort — parameterized queries and
    sandboxing remain the real defenses.
    Raises ValueError on suspicious input; returns True otherwise.
    """
    sql_markers = ("'", ";", "--", "/*", "xp_")
    code_markers = ("eval(", "exec(", "system(", "__import__")
    string_values = [v for v in arguments.values() if isinstance(v, str)]
    for value in string_values:
        if any(marker in value.lower() for marker in sql_markers):
            raise ValueError(f"检测到潜在危险输入")
    for value in string_values:
        if any(marker in value for marker in code_markers):
            raise ValueError(f"检测到命令注入尝试")
    return True
权限检查
class SecureFunctionExecutor:
    """Gates function execution behind per-function permission requirements."""

    def __init__(self):
        # Required permission strings per function; unlisted names are open.
        self.function_permissions = {
            "get_user_data": ["read:users"],
            "update_user": ["write:users"],
            "delete_user": ["delete:users"],
            "admin_functions": ["admin:access"]
        }
        self.user_permissions = set()

    def set_user_permissions(self, permissions):
        """Replace the current user's permission set."""
        self.user_permissions = set(permissions)

    def check_permission(self, function_name):
        """True when the user holds every permission the function requires.

        Functions with no declared requirements are always permitted
        (``all`` over an empty list is True).
        """
        needed = self.function_permissions.get(function_name, [])
        return all(perm in self.user_permissions for perm in needed)

    def execute(self, function_name, arguments):
        """Run the function if permitted, else return a permission-error dict."""
        if self.check_permission(function_name):
            return execute_function(function_name, arguments)
        return {
            "error": "权限被拒绝",
            "required_permissions": self.function_permissions.get(function_name, [])
        }
# 使用
executor = SecureFunctionExecutor()
executor.set_user_permissions(["read:users", "write:users"])
result = executor.execute("get_user_data", {"user_id": "123"}) # 成功
result = executor.execute("delete_user", {"user_id": "123"}) # 权限被拒绝
速率限制
from collections import defaultdict
from datetime import datetime, timedelta
import threading
class RateLimiter:
    """Sliding-window rate limiter keyed by (user, function).

    Thread-safe: the per-key call log is guarded by a single lock.
    """

    def __init__(self, max_calls=100, window=timedelta(minutes=1)):
        self.max_calls = max_calls
        self.window = window
        self.calls = defaultdict(list)   # key -> timestamps inside the window
        self.lock = threading.Lock()

    def check_rate_limit(self, user_id, function_name):
        """Record one call; return False when the window is already full."""
        now = datetime.now()
        key = f"{user_id}:{function_name}"
        with self.lock:
            # Drop timestamps that have aged out of the window.
            recent = [t for t in self.calls[key] if now - t < self.window]
            if len(recent) >= self.max_calls:
                self.calls[key] = recent
                return False
            recent.append(now)
            self.calls[key] = recent
            return True

    def record_call(self, user_id, function_name):
        """Kept for interface compatibility; check_rate_limit already records."""
        self.check_rate_limit(user_id, function_name)
# 使用
limiter = RateLimiter(max_calls=10, window=timedelta(minutes=1))
def execute_with_rate_limit(user_id, function_name, arguments):
if not limiter.check_rate_limit(user_id, function_name):
return {
"error": "超过速率限制。请稍后再试。"
}
return execute_function(function_name, arguments)
常见模式
数据库查询
functions = [
{
"type": "function",
"function": {
"name": "query_database",
"description": "在数据库上执行SQL查询",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "要执行的SQL查询"
},
"params": {
"type": "array",
"description": "查询参数"
}
},
"required": ["query"]
}
}
}
]
def execute_sql_query(query, params):
    """Execute a read-only SQL query with bound parameters.

    NOTE(review): `cursor` is not defined in this file — presumably a
    module-level DB cursor; confirm against the surrounding application.
    NOTE(review): the SELECT-prefix check is advisory only — it can be
    bypassed (CTEs, leading comments); enforce read-only at the DB layer.
    """
    # Validate the statement: only SELECT is permitted.
    if not query.strip().upper().startswith("SELECT"):
        raise ValueError("只允许SELECT查询")
    # Parameterized execution keeps values out of the SQL text.
    cursor.execute(query, params)
    return cursor.fetchall()
API集成
functions = [
{
"type": "function",
"function": {
"name": "call_external_api",
"description": "向外部API发出请求",
"parameters": {
"type": "object",
"properties": {
"endpoint": {
"type": "string",
"description": "要调用的API端点"
},
"method": {
"type": "string",
"enum": ["GET", "POST", "PUT", "DELETE"],
"default": "GET"
},
"headers": {
"type": "object",
"description": "请求头"
},
"body": {
"type": "object",
"description": "请求体(用于POST/PUT)"
}
},
"required": ["endpoint"]
}
}
}
]
def call_external_api(endpoint, method="GET", headers=None, body=None):
    """Call a whitelisted external endpoint and return its JSON payload.

    Args:
        endpoint: must start with one of the allowed prefixes.
        method: "GET", "POST", "PUT" or "DELETE".
        headers: optional request headers.
        body: optional JSON body (sent for methods that carry one).
    Raises:
        ValueError: when the endpoint is not whitelisted.
    """
    # Whitelist: only these endpoint prefixes may be called.
    allowed_endpoints = [
        "/api/weather",
        "/api/products",
        "/api/users"
    ]
    if not any(endpoint.startswith(prefix) for prefix in allowed_endpoints):
        raise ValueError(f"端点不允许:{endpoint}")
    # BUG FIX: PUT and DELETE previously fell through both branches and
    # crashed with an unbound `response`; requests.request covers every verb.
    response = requests.request(method, endpoint, headers=headers, json=body)
    return response.json()
代码执行
import subprocess
import tempfile
import os
functions = [
{
"type": "function",
"function": {
"name": "execute_code",
"description": "在沙盒环境中执行Python代码",
"parameters": {
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "要执行的Python代码"
}
},
"required": ["code"]
}
}
}
]
def execute_code_safely(code):
    """Run Python *code* in a subprocess with a keyword blocklist and timeout.

    Returns {"stdout", "stderr", "returncode"} on completion, or
    {"error": ...} on timeout. Raises ValueError when the code contains a
    blocklisted keyword.
    NOTE(review): a keyword blocklist is not a sandbox — run untrusted code
    in an isolated container/jail.
    """
    import sys  # local import keeps this example self-contained

    # Cheap pre-filter for obviously dangerous constructs.
    dangerous_keywords = ["import os", "import subprocess", "exec(", "eval("]
    if any(keyword in code.lower() for keyword in dangerous_keywords):
        raise ValueError("代码包含危险关键字")
    # Write the snippet to a temp file for the child interpreter.
    with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
        f.write(code)
        temp_file = f.name
    try:
        # BUG FIX: use the running interpreter rather than whatever "python"
        # happens to resolve to on PATH (possibly nothing, or Python 2).
        result = subprocess.run(
            [sys.executable, temp_file],
            capture_output=True,
            text=True,
            timeout=10  # hard cap on runtime
        )
        return {
            "stdout": result.stdout,
            "stderr": result.stderr,
            "returncode": result.returncode
        }
    except subprocess.TimeoutExpired:
        return {"error": "代码执行超时"}
    finally:
        # BUG FIX: the temp file previously leaked when run() raised anything
        # other than TimeoutExpired; finally guarantees cleanup.
        os.unlink(temp_file)
文件操作
import os
import shutil
functions = [
{
"type": "function",
"function": {
"name": "read_file",
"description": "读取文件内容",
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "要读取的文件路径"
},
"max_lines": {
"type": "integer",
"description": "要读取的最大行数"
}
},
"required": ["path"]
}
}
},
{
"type": "function",
"function": {
"name": "write_file",
"description": "将内容写入文件",
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "要写入的文件路径"
},
"content": {
"type": "string",
"description": "要写入的内容"
},
"mode": {
"type": "string",
"enum": ["append", "overwrite"],
"default": "overwrite"
}
},
"required": ["path", "content"]
}
}
}
]
def safe_read_file(path, max_lines=None, allowed_dir="/safe/files"):
    """Read a file, refusing any path that escapes *allowed_dir*.

    GENERALIZED: allowed_dir is now a parameter; the default preserves the
    original hard-coded directory. With max_lines set, each line is
    right-stripped and the lines are re-joined with "\\n".
    Returns {"content": ...}; raises ValueError for disallowed paths.
    """
    base = os.path.abspath(allowed_dir)
    full_path = os.path.abspath(path)
    # BUG FIX: a plain startswith(base) also accepted sibling directories
    # such as "/safe/files2"; require the separator boundary.
    if full_path != base and not full_path.startswith(base + os.sep):
        raise ValueError("路径超出允许目录")
    with open(full_path, 'r') as f:
        if max_lines:
            lines = []
            for i, line in enumerate(f):
                if i >= max_lines:
                    break
                lines.append(line.rstrip())
            # BUG FIX: the original source had a literal line break inside
            # the string; "\n".join is the intended behavior.
            return {"content": "\n".join(lines)}
        return {"content": f.read()}
def safe_write_file(path, content, mode="overwrite", allowed_dir="/safe/files"):
    """Write *content* to a file confined under *allowed_dir*.

    mode="append" adds content plus a trailing newline; "overwrite"
    (default) replaces the file with content exactly.
    GENERALIZED: allowed_dir is now a parameter; the default preserves the
    original hard-coded directory.
    Returns {"success": True, "path": ...}; raises ValueError for
    disallowed paths.
    """
    base = os.path.abspath(allowed_dir)
    full_path = os.path.abspath(path)
    # Require the separator boundary so sibling dirs are rejected too.
    if full_path != base and not full_path.startswith(base + os.sep):
        raise ValueError("路径超出允许目录")
    if mode == "append":
        with open(full_path, 'a') as f:
            # BUG FIX: restore the "\n" that was mangled into a literal
            # line break in the original source.
            f.write(content + "\n")
    else:
        with open(full_path, 'w') as f:
            f.write(content)
    return {"success": True, "path": full_path}
使用工具构建AI代理
工具感知的代理
class ToolAwareAgent:
    """Conversational agent that lets the LLM invoke registered tools.

    Tools are dicts expected to carry "name" and a callable under "handler".
    NOTE(review): this reads `message.function_calls` and emits role="tool"
    messages keyed by "tool_use_id" — current OpenAI SDKs expose
    `message.tool_calls` and expect "tool_call_id"; verify against the SDK
    version actually in use.
    """
    def __init__(self, llm_client, tools):
        # Keep both the raw tool list (sent to the API) and a name index.
        self.client = llm_client
        self.tools = tools
        self.tool_registry = {tool["name"]: tool for tool in tools}
        self.conversation_history = []
    def process(self, user_message):
        """Run one user turn: query the LLM, execute tool calls, reply."""
        # Append the user message to the running history.
        self.conversation_history.append({
            "role": "user",
            "content": user_message
        })
        # First round trip: the model may answer or request tools.
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=self.conversation_history,
            tools=self.tools
        )
        assistant_message = response.choices[0].message
        self.conversation_history.append(assistant_message)
        # If the model requested tools, run each and feed the results back.
        if assistant_message.function_calls:
            tool_results = []
            for fc in assistant_message.function_calls:
                tool_name = fc.name
                tool_args = json.loads(fc.arguments)
                # Execute the tool locally.
                tool_result = self.execute_tool(tool_name, tool_args)
                tool_results.append({
                    "role": "tool",
                    "tool_use_id": fc.id,
                    "name": tool_name,
                    "content": json.dumps(tool_result)
                })
            # Second round trip: let the model read the tool outputs.
            final_response = self.client.chat.completions.create(
                model="gpt-4",
                messages=self.conversation_history + tool_results
            )
            self.conversation_history.append(final_response.choices[0].message)
            return final_response.choices[0].message.content
        return assistant_message.content
    def execute_tool(self, tool_name, arguments):
        """Dispatch to the registered handler; raise ValueError if unknown."""
        tool = self.tool_registry.get(tool_name)
        if not tool:
            raise ValueError(f"未找到工具:{tool_name}")
        # Handlers live under the "handler" key of the tool dict.
        handler = tool.get("handler")
        return handler(**arguments)
多步推理
class MultiStepAgent:
    """Loops LLM ↔ tool rounds until no more tool calls or a step cap.

    NOTE(review): `execute_tool` is not defined on this class — presumably
    supplied by a subclass; confirm.
    NOTE(review): the final `conversation[-1]["content"]` subscripts what is
    normally an SDK message object, not a dict — likely needs `.content`.
    NOTE(review): `message.function_calls` is not an attribute of current
    OpenAI SDK responses (`tool_calls` is); verify the SDK version.
    """
    def __init__(self, llm_client):
        self.client = llm_client
        self.max_steps = 5     # hard cap on tool-execution rounds
        self.current_step = 0  # rounds consumed by the current process() call
    def process(self, user_message):
        """Drive up to max_steps reasoning/tool rounds for one user message."""
        self.current_step = 0
        conversation = [{"role": "user", "content": user_message}]
        while self.current_step < self.max_steps:
            response = self.client.chat.completions.create(
                model="gpt-4",
                messages=conversation,
                tools=self.get_tools()
            )
            assistant_message = response.choices[0].message
            conversation.append(assistant_message)
            # Any tool calls? Execute them all, then loop for another round.
            if assistant_message.function_calls:
                for fc in assistant_message.function_calls:
                    tool_result = self.execute_tool(fc.name, fc.arguments)
                    conversation.append({
                        "role": "tool",
                        "name": fc.name,
                        "content": json.dumps(tool_result)
                    })
                self.current_step += 1
            else:
                # No further tool calls: the model has produced its answer.
                break
        return conversation[-1]["content"]
    def get_tools(self):
        """Tool specs advertised to the model.

        NOTE(review): these specs omit the "parameters" schema the OpenAI
        tools API normally requires.
        """
        return [
            {
                "type": "function",
                "function": {
                    "name": "search",
                    "description": "搜索信息"
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "analyze",
                    "description": "分析找到的信息"
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "summarize",
                    "description": "总结分析"
                }
            }
        ]
工具编排
顺序编排
class ToolOrchestrator:
    """Runs named tools in fixed, pre-declared workflow sequences."""

    def __init__(self):
        self.tools = {}
        # Workflow name -> ordered list of tool names to execute.
        self.workflows = {
            "data_analysis": ["search", "analyze", "summarize"],
            "user_lookup": ["get_user", "get_profile", "get_orders"],
            "order_processing": ["check_inventory", "calculate_price", "create_order"]
        }

    def register_tool(self, name, handler):
        """Make *handler* available under *name*."""
        self.tools[name] = handler

    def execute_workflow(self, workflow_name, context):
        """Run each registered tool of the workflow, passing *context* to all.

        Unregistered tool names are skipped silently; an unknown workflow
        raises ValueError. Returns {tool_name: result} in execution order.
        """
        try:
            steps = self.workflows[workflow_name]
        except KeyError:
            raise ValueError(f"未知工作流:{workflow_name}") from None
        return {
            name: self.tools[name](context)
            for name in steps
            if name in self.tools
        }
# 使用
orchestrator = ToolOrchestrator()
orchestrator.register_tool("search", search_tool)
orchestrator.register_tool("analyze", analyze_tool)
orchestrator.register_tool("summarize", summarize_tool)
results = orchestrator.execute_workflow("data_analysis", {"query": "市场趋势"})
并行编排
import asyncio
class ParallelOrchestrator:
    """Runs registered async tool handlers concurrently with asyncio."""

    def __init__(self):
        self.tools = {}

    def register_tool(self, name, handler):
        """Register an async callable under *name*."""
        self.tools[name] = handler

    async def execute_parallel(self, tool_calls):
        """Execute every known call concurrently; return one report per call.

        Calls whose name is unregistered are dropped. Exceptions are captured
        per call and reported as success=False with the stringified error.
        """
        # BUG FIX: the original zipped ALL tool_calls against results for
        # only the registered ones, so any skipped (unknown) call shifted
        # every later result onto the wrong call. Filter first, then zip.
        known = [call for call in tool_calls if call["name"] in self.tools]
        tasks = [
            asyncio.create_task(self.tools[call["name"]](**call["arguments"]))
            for call in known
        ]
        outcomes = await asyncio.gather(*tasks, return_exceptions=True)
        report = []
        for call, outcome in zip(known, outcomes):
            failed = isinstance(outcome, Exception)
            report.append({
                "name": call["name"],
                "success": not failed,
                "result": str(outcome) if failed else outcome
            })
        return report
# 使用
orchestrator = ParallelOrchestrator()
orchestrator.register_tool("get_weather", get_weather)
orchestrator.register_tool("get_time", get_time)
tool_calls = [
{"name": "get_weather", "arguments": {"location": "东京"}},
{"name": "get_time", "arguments": {"timezone": "亚洲/东京"}}
]
results = asyncio.run(orchestrator.execute_parallel(tool_calls))
带函数调用的流式处理
实时工具执行
from openai import OpenAI
client = OpenAI()
async def streaming_function_call():
stream = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "user", "content": "计算1到100的和"}
],
functions=[{
"type": "function",
"function": {
"name": "calculate_sum",
"description": "计算数字的和",
"parameters": {
"type": "object",
"properties": {
"numbers": {
"type": "array",
"items": {"type": "integer"},
"description": "要求和的数字列表"
}
},
"required": ["numbers"]
}
}
}],
stream=True
)
async for chunk in stream:
if chunk.choices[0].finish_reason == "function_calls":
fc = chunk.choices[0].delta.function_calls[0]
# 执行函数
args = json.loads(fc.arguments)
result = calculate_sum(args["numbers"])
# 将结果流式传回
result_stream = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "user", "content": "计算1到100的和"},
{"role": "assistant", "content": None, "function_calls": [fc]},
{
"role": "function",
"name": fc.name,
"content": json.dumps(result)
}
],
stream=True
)
async for result_chunk in result_stream:
if result_chunk.choices[0].delta.content:
print(result_chunk.choices[0].delta.content, end='', flush=True)
成本优化
令牌使用监控
class TokenTracker:
    """Accumulates token usage across LLM API responses."""

    # Usage counters mirrored from the API's usage object.
    _FIELDS = ("prompt_tokens", "completion_tokens", "total_tokens")

    def __init__(self):
        self.usage = {name: 0 for name in self._FIELDS}

    def track(self, response):
        """Add the response's usage counts; responses without usage are ignored."""
        if hasattr(response, 'usage'):
            for name in self._FIELDS:
                self.usage[name] += getattr(response.usage, name)

    def get_stats(self):
        """Return the accumulated usage dict."""
        return self.usage
# 使用
tracker = TokenTracker()
tracker.track(response)
print(f"总使用令牌数:{tracker.get_stats()['total_tokens']}")
缓存函数结果
from functools import lru_cache
import hashlib
class FunctionCache:
    """TTL cache for function-call results, keyed by name plus arguments."""

    def __init__(self, ttl=300):
        self.cache = {}
        self.ttl = ttl  # seconds an entry stays valid

    def get_cache_key(self, function_name, arguments):
        """Stable key: MD5 of the name plus canonically-serialized arguments."""
        payload = f"{function_name}:{json.dumps(arguments, sort_keys=True)}"
        return hashlib.md5(payload.encode()).hexdigest()

    def get(self, function_name, arguments):
        """Return {"cached": True, "result": ...} for a live entry, else None.

        Expired entries are evicted lazily on access.
        """
        key = self.get_cache_key(function_name, arguments)
        entry = self.cache.get(key)
        if entry is None:
            return None
        if time.time() - entry["timestamp"] < self.ttl:
            return {"cached": True, "result": entry["result"]}
        del self.cache[key]
        return None

    def set(self, function_name, arguments, result):
        """Store *result* stamped with the current time."""
        self.cache[self.get_cache_key(function_name, arguments)] = {
            "result": result,
            "timestamp": time.time()
        }
# Usage: one module-level cache shared by every cached call.
cache = FunctionCache(ttl=300)

def cached_function_call(function_name, arguments):
    """Run a function, serving repeated calls from the TTL cache.

    Returns a dict of the form {"cached": bool, "result": ...}.
    """
    hit = cache.get(function_name, arguments)
    if hit:
        return hit
    # Cache miss: execute, then remember the result for next time.
    outcome = execute_function(function_name, arguments)
    cache.set(function_name, arguments, outcome)
    return {"cached": False, "result": outcome}
最小化函数调用
def smart_function_selection(user_query, available_functions):
    """Narrow the tool list to the single most relevant function.

    Scores each function by keyword overlap between the query and the
    function's description. If any function scores above zero, only the
    best one is returned (first wins on ties); otherwise the full list is
    passed through unchanged.
    """
    query = user_query.lower()
    keywords = ("weather", "temperature", "forecast")

    def relevance(func):
        # +2 for every keyword present in both the query and the description.
        text = func.get("description", "").lower()
        return sum(2 for kw in keywords if kw in query and kw in text)

    best = max(available_functions, key=relevance, default=None)
    if best is not None and relevance(best) > 0:
        return [best]
    return available_functions
测试函数调用
单元测试
import pytest
from unittest.mock import Mock, patch
def test_function_calling():
    """Unit test: verify the (mocked) LLM requests the expected function call."""
    with patch('openai.OpenAI') as mock_openai:
        mock_client = Mock()
        mock_openai.return_value = mock_client
        # Build the mocked function-call object.
        # NOTE: `name` is a reserved Mock constructor argument (it only sets
        # the mock's repr name), so Mock(name="get_weather").name would return
        # a child Mock, not the string, and the assertion below would fail.
        # The attribute must be assigned after construction.
        mock_call = Mock(arguments='{"location": "东京"}')
        mock_call.name = "get_weather"
        mock_response = Mock()
        mock_response.choices = [Mock()]
        mock_response.choices[0].message = Mock()
        mock_response.choices[0].message.function_calls = [mock_call]
        mock_client.chat.completions.create.return_value = mock_response
        # Exercise the mocked client exactly as production code would.
        client = mock_openai()
        response = client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": "东京的天气怎么样?"}],
            functions=functions
        )
        # Assert a function call was requested with the right name.
        assert response.choices[0].message.function_calls is not None
        assert response.choices[0].message.function_calls[0].name == "get_weather"
集成测试
def test_end_to_end_function_calling():
    """Integration test: full round trip from user question to grounded answer."""
    question = "东京的天气怎么样?"
    # Ask the model; it should decide to call a function.
    first = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": question}],
        functions=functions
    )
    assert first.choices[0].finish_reason == "function_calls"
    # Run the requested function locally.
    call = first.choices[0].message.function_calls[0]
    location = json.loads(call.arguments)["location"]
    weather = get_weather(location)
    # Feed the result back so the model can answer in natural language.
    followup = [
        {"role": "user", "content": question},
        first.choices[0].message,
        {
            "role": "function",
            "name": call.name,
            "content": json.dumps(weather)
        }
    ]
    final = client.chat.completions.create(
        model="gpt-4",
        messages=followup
    )
    # Verify the final answer reflects the function output.
    assert "晴朗" in final.choices[0].message.content.lower()
监控和日志记录
函数调用日志记录
import logging
class FunctionCallLogger:
    """Structured logger for function-call activity, written to function_calls.log."""

    def __init__(self):
        self.logger = logging.getLogger('function_calls')
        self.logger.setLevel(logging.INFO)
        # Guard against duplicate handlers: logging.getLogger returns the
        # same singleton for a given name, so instantiating this class more
        # than once would otherwise attach a new FileHandler each time and
        # duplicate every log line.
        if not self.logger.handlers:
            handler = logging.FileHandler('function_calls.log')
            handler.setFormatter(logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            ))
            self.logger.addHandler(handler)

    def log_call(self, function_name, arguments, result, duration_ms):
        """Record a successful call with its arguments, result and latency."""
        self.logger.info(
            f"函数:{function_name}, "
            f"参数:{arguments}, "
            f"结果:{result}, "
            f"持续时间:{duration_ms}毫秒"
        )

    def log_error(self, function_name, arguments, error):
        """Record a failed call together with the error it raised."""
        self.logger.error(
            f"函数:{function_name}, "
            f"参数:{arguments}, "
            f"错误:{error}"
        )
# Usage example: time a call and log either success or failure.
logger = FunctionCallLogger()
import time
start = time.time()
try:
    result = get_weather("东京")
    logger.log_call("get_weather", {"location": "东京"}, result, (time.time() - start) * 1000)
except Exception as e:
    logger.log_error("get_weather", {"location": "东京"}, str(e))
最佳实践
- 函数设计
- 保持函数专注和单一目的
- 使用清晰、描述性的名称
- 提供详细的参数描述
- 清楚定义必需与可选参数
- 错误处理
- 执行前验证所有输入
- 提供清晰的错误消息
- 为临时故障实现重试逻辑
- 尽可能使用后备机制
- 安全
- 验证和清理所有输入
- 实现权限检查
- 使用速率限制
- 永远不在输出中暴露敏感数据
- 性能
- 适当时缓存函数结果
- 尽可能使用并行执行
- 监控令牌使用
- 优化函数执行时间
- 测试
- 单元测试单个函数
- 集成测试端到端流程
- 测试错误场景
- 在生产中监控函数调用模式