大语言模型函数调用 LLMFunctionCalling

大语言模型函数调用(工具使用)技能,使大语言模型能够通过调用预定义函数与外部系统交互,实现结构化输出和外部集成。该技能包括函数定义、执行流程、错误处理、安全考虑等关键技术,适用于构建AI智能体、自动化任务和增强LLM应用能力。关键词:大语言模型,函数调用,工具使用,AI集成,结构化输出,LLM扩展。

AI智能体 0 次安装 0 次浏览 更新于 3/5/2026

名称:大语言模型函数调用 描述:使用大语言模型实现函数调用(工具使用),以获取结构化输出和外部集成。

大语言模型函数调用

概述

大语言模型(LLM)函数调用(也称为工具使用)使大语言模型能够通过调用预定义函数与外部系统交互。LLM不仅能生成文本,还能请求执行具有结构化参数的特定函数,接收结果,并基于这些结果继续推理。

前提条件

  • 理解LLM API及其功能
  • 了解JSON Schema和函数定义格式
  • 熟悉错误处理和重试模式
  • 了解函数执行的安全最佳实践
  • 基本了解异步/等待模式以实现并行执行
  • 熟悉验证库(如Pydantic、jsonschema)

关键概念

什么是函数调用(工具使用)

函数调用允许LLMs:

  1. 理解意图:识别用户请求何时需要外部操作
  2. 选择工具:选择适当的函数进行调用
  3. 生成参数:创建正确格式的函数参数
  4. 执行函数:运行函数并获取结果
  5. 处理结果:使用函数输出来回答用户

示例流程

用户:"东京的天气怎么样?"
         ↓
LLM:我需要调用get_weather函数,参数为:
       {"location": "东京"}
         ↓
系统:执行get_weather("东京")
         ↓
结果:{"temperature": 22, "condition": "晴朗"}
         ↓
LLM:东京的天气晴朗,温度为22°C。

OpenAI函数调用API

基本函数定义

from openai import OpenAI

client = OpenAI()

# 定义函数
functions = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "获取某个位置的当前天气",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {
                        "type": "string",
                        "description": "城市和州,例如旧金山,加州"
                    }
                },
                "required": ["location"]
            }
        }
    }
]

# 使用函数调用发出请求
response = client.chat.completions.create(
    model="gpt-4",
    messages=[
        {"role": "user", "content": "东京的天气怎么样?"}
    ],
    functions=functions
)

# 检查LLM是否要调用函数
if response.choices[0].finish_reason == "function_calls":
    function_call = response.choices[0].message.function_calls[0]
    
    # 执行函数
    if function_call.name == "get_weather":
        args = json.loads(function_call.arguments)
        weather_data = get_weather(args["location"])
        
        # 将结果发送回LLM
        second_response = client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "user", "content": "东京的天气怎么样?"},
                response.choices[0].message,  # 助理消息,包含函数调用
                {
                    "role": "function",
                    "name": "get_weather",
                    "content": json.dumps(weather_data)
                }
            ]
        )
        
        print(second_response.choices[0].message.content)

多个函数

functions = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "获取当前天气",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {"type": "string"}
                },
                "required": ["location"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "get_time",
            "description": "获取当前时间",
            "parameters": {
                "type": "object",
                "properties": {
                    "timezone": {
                        "type": "string",
                        "description": "时区,例如美国/纽约"
                    }
                },
                "required": []
            }
        }
    }
]

response = client.chat.completions.create(
    model="gpt-4",
    messages=[
        {"role": "user", "content": "东京的天气和时间怎么样?"}
    ],
    functions=functions
)

带函数调用的流式处理

from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-4",
    messages=[
        {"role": "user", "content": "计算25 * 47"}
    ],
    functions=[{
        "type": "function",
        "function": {
            "name": "calculate",
            "description": "执行数学计算",
            "parameters": {
                "type": "object",
                "properties": {
                    "operation": {
                        "type": "string",
                        "enum": ["add", "subtract", "multiply", "divide"]
                    },
                    "a": {"type": "number"},
                    "b": {"type": "number"}
                },
                "required": ["operation", "a", "b"]
            }
        }
    }],
    stream=True
)

for chunk in stream:
    if chunk.choices[0].finish_reason == "function_calls":
        function_call = chunk.choices[0].delta.function_calls[0]
        # 执行函数
        result = execute_function(function_call)
        
        # 用结果继续流式处理
        stream = client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "user", "content": "计算25 * 47"},
                {"role": "assistant", "content": None, "function_calls": [function_call]},
                {"role": "function", "name": function_call.name, "content": json.dumps(result)}
            ],
            stream=True
        )
        for response_chunk in stream:
            print(response_chunk.choices[0].delta.content)

Anthropic工具使用API

基本工具定义

import anthropic

client = anthropic.Anthropic()

# 定义工具
tools = [
    {
        "name": "get_weather",
        "description": "获取某个位置的当前天气",
        "input_schema": {
            "type": "object",
            "properties": {
                "location": {
                    "type": "string",
                    "description": "城市和州"
                }
            },
            "required": ["location"]
        }
    }
]

# 发出请求
message = client.messages.create(
    model="claude-3-opus-20240229",
    max_tokens=1024,
    tools=tools,
    messages=[
        {"role": "user", "content": "东京的天气怎么样?"}
    ]
)

# 检查工具使用
if message.stop_reason == "tool_use":
    for tool_use in message.content:
        if tool_use.type == "tool_use":
            tool_name = tool_use.name
            tool_input = tool_use.input
            
            # 执行工具
            if tool_name == "get_weather":
                result = get_weather(tool_input["location"])
                
                # 将结果发送回
                response = client.messages.create(
                    model="claude-3-opus-20240229",
                    max_tokens=1024,
                    messages=[
                        {"role": "user", "content": "东京的天气怎么样?"},
                        message,  # 助理消息,包含工具使用
                        {
                            "role": "user",
                            "content": f"工具结果:{json.dumps(result)}"
                        }
                    ]
                )
                
                print(response.content[0].text)

多个工具

tools = [
    {
        "name": "search_database",
        "description": "搜索产品数据库",
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {"type": "string"},
                "category": {"type": "string"}
            },
            "required": ["query"]
        }
    },
    {
        "name": "get_user_profile",
        "description": "获取用户配置文件信息",
        "input_schema": {
            "type": "object",
            "properties": {
                "user_id": {"type": "string"}
            },
            "required": ["user_id"]
        }
    }
]

response = client.messages.create(
    model="claude-3-opus-20240229",
    max_tokens=1024,
    tools=tools,
    messages=[
        {"role": "user", "content": "为用户123找到跑步鞋"}
    ]
)

# 处理多个工具调用
for content in response.content:
    if content.type == "tool_use":
        tool_name = content.name
        tool_input = content.input
        
        if tool_name == "search_database":
            results = search_products(tool_input["query"], tool_input.get("category"))
        elif tool_name == "get_user_profile":
            results = get_user_profile(tool_input["user_id"])
        
        # 发送所有结果回
        tool_results = [{"type": "tool_result", "tool_use_id": content.id, "content": json.dumps(results)}]
        
        final_response = client.messages.create(
            model="claude-3-opus-20240229",
            max_tokens=1024,
            messages=[
                {"role": "user", "content": "为用户123找到跑步鞋"},
                response.content,
                *tool_results
            ]
        )

函数定义模式

JSON Schema

functions = [
    {
        "type": "function",
        "function": {
            "name": "create_user",
            "description": "创建新用户账户",
            "parameters": {
                "type": "object",
                "properties": {
                    "email": {
                        "type": "string",
                        "format": "email",
                        "description": "用户的电子邮件地址"
                    },
                    "password": {
                        "type": "string",
                        "minLength": 8,
                        "description": "用户的密码(至少8个字符)"
                    },
                    "name": {
                        "type": "string",
                        "minLength": 2,
                        "description": "用户的显示名称"
                    },
                    "age": {
                        "type": "integer",
                        "minimum": 13,
                        "maximum": 120,
                        "description": "用户的年龄"
                    },
                    "subscribe_newsletter": {
                        "type": "boolean",
                        "default": False,
                        "description": "是否订阅新闻通讯"
                    }
                },
                "required": ["email", "password", "name"],
                "additionalProperties": False
            }
        }
    }
]

参数描述

functions = [
    {
        "type": "function",
        "function": {
            "name": "search_products",
            "description": "在目录中搜索产品",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "搜索查询 - 可以包括产品名称、类别或关键字"
                    },
                    "category": {
                        "type": "string",
                        "description": "按特定类别筛选(可选)",
                        "enum": ["电子产品", "服装", "运动", "家居"]
                    },
                    "min_price": {
                        "type": "number",
                        "description": "最低价格筛选(可选)"
                    },
                    "max_price": {
                        "type": "number",
                        "description": "最高价格筛选(可选)"
                    },
                    "sort_by": {
                        "type": "string",
                        "description": "按字段排序结果",
                        "enum": ["price", "name", "rating", "relevance"]
                    },
                    "limit": {
                        "type": "integer",
                        "description": "要返回的最大结果数",
                        "default": 10,
                        "minimum": 1,
                        "maximum": 50
                    }
                },
                "required": ["query"]
            }
        }
    }
]

必需与可选

functions = [
    {
        "type": "function",
        "function": {
            "name": "book_flight",
            "description": "预订航班机票",
            "parameters": {
                "type": "object",
                "properties": {
                    "origin": {
                        "type": "string",
                        "description": "出发机场代码(例如JFK、LAX)"
                    },
                    "destination": {
                        "type": "string",
                        "description": "目的地机场代码"
                    },
                    "date": {
                        "type": "string",
                        "description": "出发日期,格式为YYYY-MM-DD"
                    },
                    "passengers": {
                        "type": "integer",
                        "description": "乘客数量",
                        "default": 1
                    },
                    "class": {
                        "type": "string",
                        "description": "航班舱位",
                        "enum": ["economy", "business", "first"],
                        "default": "economy"
                    }
                },
                "required": ["origin", "destination", "date"]
            }
        }
    }
]

结构化输出提取

提取结构化数据

functions = [
    {
        "type": "function",
        "function": {
            "name": "extract_order_info",
            "description": "从用户消息中提取订单信息",
            "parameters": {
                "type": "object",
                "properties": {
                    "product_name": {
                        "type": "string",
                        "description": "产品名称"
                    },
                    "quantity": {
                        "type": "integer",
                        "description": "订购数量"
                    },
                    "address": {
                        "type": "string",
                        "description": "送货地址"
                    },
                    "payment_method": {
                        "type": "string",
                        "description": "支付方式",
                        "enum": ["credit_card", "debit_card", "paypal", "bank_transfer"]
                    }
                },
                "required": []
            }
        }
    }
]

response = client.chat.completions.create(
    model="gpt-4",
    messages=[
        {"role": "system", "content": "从用户消息中提取订单信息。"},
        {"role": "user", "content": "我想订购2双耐克跑步鞋,使用我的信用卡送到纽约市123主街,NY 10001"}
    ],
    functions=functions
)

# 检查是否调用了函数
if response.choices[0].finish_reason == "function_calls":
    function_call = response.choices[0].message.function_calls[0]
    args = json.loads(function_call.arguments)
    
    # 处理提取的数据
    print(f"产品:{args['product_name']}")
    print(f"数量:{args['quantity']}")
    print(f"地址:{args['address']}")
    print(f"支付方式:{args['payment_method']}")

数据验证

from pydantic import BaseModel, EmailStr, validator
from typing import Optional

class OrderInfo(BaseModel):
    product_name: str
    quantity: int = 1
    address: str
    payment_method: str
    email: Optional[EmailStr] = None
    notes: Optional[str] = None
    
    @validator('quantity')
    def validate_quantity(cls, v):
        if v < 1 or v > 100:
            raise ValueError('数量必须在1到100之间')
        return v

# 与函数调用一起使用
functions = [
    {
        "type": "function",
        "function": {
            "name": "create_order",
            "description": "创建新订单",
            "parameters": {
                "type": "object",
                "properties": {
                    "product_name": {"type": "string"},
                    "quantity": {"type": "integer"},
                    "address": {"type": "string"},
                    "payment_method": {"type": "string"},
                    "email": {"type": "string"},
                    "notes": {"type": "string"}
                },
                "required": ["product_name", "address", "payment_method"]
            }
        }
    }
]

# 验证和处理
def process_order(args):
    try:
        order = OrderInfo(**args)
        # 处理订单
        return {"success": True, "order_id": "12345"}
    except ValueError as e:
        return {"success": False, "error": str(e)}

多函数调用

顺序函数调用

functions = [
    {
        "type": "function",
        "function": {
            "name": "get_user_balance",
            "description": "获取用户账户余额",
            "parameters": {
                "type": "object",
                "properties": {
                    "user_id": {"type": "string"}
                },
                "required": ["user_id"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "transfer_money",
            "description": "在账户之间转账",
            "parameters": {
                "type": "object",
                "properties": {
                    "from_user_id": {"type": "string"},
                    "to_user_id": {"type": "string"},
                    "amount": {"type": "number"}
                },
                "required": ["from_user_id", "to_user_id", "amount"]
            }
        }
    }
]

response = client.chat.completions.create(
    model="gpt-4",
    messages=[
        {"role": "user", "content": "从用户123转账50美元到用户456"}
    ],
    functions=functions
)

# 处理多个函数调用
if response.choices[0].finish_reason == "function_calls":
    function_calls = response.choices[0].message.function_calls
    
    # 顺序执行
    results = []
    for fc in function_calls:
        if fc.name == "get_user_balance":
            balance = get_user_balance(fc.arguments)
            results.append({"function": "get_user_balance", "result": balance})
        elif fc.name == "transfer_money":
            transfer_result = transfer_money(fc.arguments)
            results.append({"function": "transfer_money", "result": transfer_result})
    
    # 将结果发送回
    second_response = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "user", "content": "从用户123转账50美元到用户456"},
            response.choices[0].message,
            *[{
                "role": "function",
                "name": r["function"],
                "content": json.dumps(r["result"])
            } for r in results]
        ]
    )

并行函数调用

functions = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "获取某个位置的天气",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {"type": "string"}
                },
                "required": ["location"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "get_time",
            "description": "获取当前时间",
            "parameters": {
                "type": "object",
                "properties": {
                    "timezone": {"type": "string"}
                },
                "required": []
            }
        }
    }
]

response = client.chat.completions.create(
    model="gpt-4",
    messages=[
        {"role": "user", "content": "东京和纽约的天气和时间怎么样?"}
    ],
    functions=functions
)

# 处理并行函数调用
if response.choices[0].finish_reason == "function_calls":
    function_calls = response.choices[0].message.function_calls
    
    # 并行执行
    import asyncio
    
    async def execute_all(calls):
        tasks = []
        for fc in calls:
            if fc.name == "get_weather":
                task = asyncio.create_task(get_weather(fc.arguments))
            elif fc.name == "get_time":
                task = asyncio.create_task(get_time(fc.arguments))
            tasks.append(task)
        
        results = await asyncio.gather(*tasks)
        return results
    
    results = asyncio.run(execute_all(function_calls))
    
    # 将结果发送回
    second_response = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "user", "content": "东京和纽约的天气和时间怎么样?"},
            response.choices[0].message,
            *[{
                "role": "function",
                "name": fc.name,
                "content": json.dumps(r)
            } for fc, r in zip(function_calls, results)]
        ]
    )

函数调用路由

智能路由

class FunctionRouter:
    def __init__(self):
        self.functions = {
            "weather": self.get_weather,
            "time": self.get_time,
            "database": self.search_database,
            "user": self.get_user,
        }
    
    def route(self, function_name, arguments):
        if function_name in self.functions:
            return self.functions[function_name](arguments)
        else:
            raise ValueError(f"未知函数:{function_name}")
    
    def get_weather(self, args):
        location = args.get("location")
        return {"temperature": 22, "condition": "晴朗"}
    
    def get_time(self, args):
        timezone = args.get("timezone", "UTC")
        return {"time": "2024-01-16 12:00:00", "timezone": timezone}
    
    def search_database(self, args):
        query = args.get("query")
        return {"results": [{"id": 1, "name": "产品1"}]}
    
    def get_user(self, args):
        user_id = args.get("user_id")
        return {"id": user_id, "name": "张三"}

router = FunctionRouter()

# 与LLM一起使用
response = client.chat.completions.create(
    model="gpt-4",
    messages=[
        {"role": "user", "content": "东京的天气怎么样?"}
    ],
    functions=[{
        "type": "function",
        "function": {
            "name": "weather",
            "description": "获取天气",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {"type": "string"}
                },
                "required": ["location"]
            }
        }
    }]
)

if response.choices[0].finish_reason == "function_calls":
    fc = response.choices[0].message.function_calls[0]
    result = router.route(fc.name, json.loads(fc.arguments))

动态函数加载

import importlib
import os

class DynamicFunctionLoader:
    def __init__(self, functions_dir="functions"):
        self.functions_dir = functions_dir
        self.loaded_functions = {}
        self.load_functions()
    
    def load_functions(self):
        for filename in os.listdir(self.functions_dir):
            if filename.endswith('.py') and not filename.startswith('_'):
                module_name = filename[:-3]
                try:
                    module = importlib.import_module(f"functions.{module_name}")
                    for attr_name in dir(module):
                        attr = getattr(module, attr_name)
                        if callable(attr) and hasattr(attr, 'tool_definition'):
                            self.loaded_functions[attr.tool_definition['name']] = attr
                except Exception as e:
                    print(f"加载{module_name}失败:{e}")
    
    def get_function_definitions(self):
        definitions = []
        for name, func in self.loaded_functions.items():
            definitions.append(func.tool_definition)
        return definitions
    
    def execute_function(self, name, arguments):
        if name in self.loaded_functions:
            return self.loaded_functions[name](**arguments)
        else:
            raise ValueError(f"未找到函数:{name}")

# 示例函数模块
# functions/weather.py
def get_weather(location: str) -> dict:
    """获取某个位置的天气"""
    return {"temperature": 22, "condition": "晴朗", "location": location}

get_weather.tool_definition = {
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "获取某个位置的当前天气",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {
                    "type": "string",
                    "description": "城市和州"
                }
            },
            "required": ["location"]
        }
    }
}

# 使用
loader = DynamicFunctionLoader()
functions = loader.get_function_definitions()

response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "东京的天气怎么样?"}],
    functions=functions
)

错误处理

无效函数调用

def safe_execute_function(name, arguments):
    try:
        # 验证函数名称
        if name not in available_functions:
            return {
                "error": f"未知函数:{name}",
                "available_functions": list(available_functions.keys())
            }
        
        # 验证参数
        func_schema = available_functions[name]
        for param in func_schema.get("required", []):
            if param not in arguments:
                return {
                    "error": f"缺少必需参数:{param}",
                    "function": name
                }
        
        # 执行函数
        result = available_functions[name]["handler"](**arguments)
        return {"success": True, "result": result}
        
    except Exception as e:
        return {
            "error": str(e),
            "function": name
        }

# 与LLM响应一起使用
if response.choices[0].finish_reason == "function_calls":
    fc = response.choices[0].message.function_calls[0]
    args = json.loads(fc.arguments)
    result = safe_execute_function(fc.name, args)
    
    # 将错误发送回LLM
    if "error" in result:
        second_response = client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "user", "content": original_user_message},
                response.choices[0].message,
                {
                    "role": "function",
                    "name": fc.name,
                    "content": json.dumps(result)
                }
            ]
        )

重试策略

import time
from functools import wraps

def retry_on_failure(max_retries=3, delay=1):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt < max_retries - 1:
                        time.sleep(delay * (attempt + 1))
            
            raise last_exception
        return wrapper
    return decorator

# 应用到函数
@retry_on_failure(max_retries=3, delay=2)
def get_weather(location):
    # 可能偶尔失败
    return call_weather_api(location)

后备机制

class RobustFunctionExecutor:
    def __init__(self):
        self.functions = {}
        self.fallbacks = {}
    
    def register_function(self, name, handler, fallback=None):
        self.functions[name] = handler
        if fallback:
            self.fallbacks[name] = fallback
    
    def execute(self, name, arguments):
        try:
            return {"success": True, "result": self.functions[name](**arguments)}
        except Exception as e:
            if name in self.fallbacks:
                try:
                    fallback_result = self.fallbacks[name](**arguments)
                    return {
                        "success": True,
                        "result": fallback_result,
                        "warning": f"主函数失败,使用了后备:{str(e)}"
                    }
                except Exception as fe:
                    return {
                        "success": False,
                        "error": f"主函数和后备都失败:{str(e)}"
                    }
            else:
                return {
                    "success": False,
                    "error": str(e)
                }

# 使用
executor = RobustFunctionExecutor()

executor.register_function(
    "get_weather",
    lambda location: call_weather_api(location),
    lambda location: {"temperature": 20, "condition": "未知"}  # 后备
)

验证和清理

输入验证

from pydantic import BaseModel, validator

class WeatherQuery(BaseModel):
    location: str
    
    @validator('location')
    def validate_location(cls, v):
        if len(v) < 2 or len(v) > 100:
            raise ValueError('位置必须在2到100个字符之间')
        # 清理输入
        return v.strip().lower()

def safe_weather_handler(args):
    try:
        validated = WeatherQuery(**args)
        return get_weather(validated.location)
    except ValueError as e:
        return {"error": str(e)}

输出清理

def sanitize_output(data):
    """从函数输出中移除敏感信息"""
    sensitive_keys = ['password', 'ssn', 'credit_card', 'api_key']
    
    sanitized = data.copy()
    for key in sensitive_keys:
        if key in sanitized:
            sanitized[key] = "***已屏蔽***"
    
    return sanitized

# 与函数调用一起使用
def get_user_profile(user_id):
    user_data = fetch_user_from_db(user_id)
    return sanitize_output(user_data)

模式验证

import jsonschema

# 定义模式
function_schema = {
    "type": "object",
    "properties": {
        "email": {"type": "string", "format": "email"},
        "age": {"type": "integer", "minimum": 0, "maximum": 120}
    },
    "required": ["email"]
}

def validate_arguments(args, schema):
    try:
        jsonschema.validate(instance=args, schema=schema)
        return {"valid": True}
    except jsonschema.ValidationError as e:
        return {"valid": False, "errors": e.message}

# 与函数调用一起使用
if response.choices[0].finish_reason == "function_calls":
    fc = response.choices[0].message.function_calls[0]
    args = json.loads(fc.arguments)
    validation = validate_arguments(args, function_schema)
    
    if not validation["valid"]:
        # 将错误发送回LLM
        second_response = client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "user", "content": original_message},
                response.choices[0].message,
                {
                    "role": "function",
                    "name": fc.name,
                    "content": json.dumps({"error": validation["errors"]})
                }
            ]
        )

安全考虑

输入验证

def validate_function_input(function_name, arguments):
    # 检查SQL注入
    for value in arguments.values():
        if isinstance(value, str):
            dangerous_patterns = ["'", ";", "--", "/*", "xp_"]
            if any(pattern in value.lower() for pattern in dangerous_patterns):
                raise ValueError(f"检测到潜在危险输入")
    
    # 检查命令注入
    dangerous_commands = ["eval(", "exec(", "system(", "__import__"]
    for value in arguments.values():
        if isinstance(value, str):
            if any(cmd in value for cmd in dangerous_commands):
                raise ValueError(f"检测到命令注入尝试")
    
    return True

权限检查

class SecureFunctionExecutor:
    def __init__(self):
        self.function_permissions = {
            "get_user_data": ["read:users"],
            "update_user": ["write:users"],
            "delete_user": ["delete:users"],
            "admin_functions": ["admin:access"]
        }
        self.user_permissions = set()
    
    def set_user_permissions(self, permissions):
        self.user_permissions = set(permissions)
    
    def check_permission(self, function_name):
        required = self.function_permissions.get(function_name, [])
        if not required:
            return True
        return all(perm in self.user_permissions for perm in required)
    
    def execute(self, function_name, arguments):
        if not self.check_permission(function_name):
            return {
                "error": "权限被拒绝",
                "required_permissions": self.function_permissions.get(function_name, [])
            }
        
        return execute_function(function_name, arguments)

# 使用
executor = SecureFunctionExecutor()
executor.set_user_permissions(["read:users", "write:users"])

result = executor.execute("get_user_data", {"user_id": "123"})  # 成功
result = executor.execute("delete_user", {"user_id": "123"})  # 权限被拒绝

速率限制

from collections import defaultdict
from datetime import datetime, timedelta
import threading

class RateLimiter:
    def __init__(self, max_calls=100, window=timedelta(minutes=1)):
        self.max_calls = max_calls
        self.window = window
        self.calls = defaultdict(list)
        self.lock = threading.Lock()
    
    def check_rate_limit(self, user_id, function_name):
        now = datetime.now()
        key = f"{user_id}:{function_name}"
        
        with self.lock:
            # 移除窗口外的旧调用
            self.calls[key] = [
                call_time for call_time in self.calls[key]
                if now - call_time < self.window
            ]
            
            # 检查是否超过限制
            if len(self.calls[key]) >= self.max_calls:
                return False
            
            # 记录此调用
            self.calls[key].append(now)
            return True
    
    def record_call(self, user_id, function_name):
        self.check_rate_limit(user_id, function_name)

# 使用
limiter = RateLimiter(max_calls=10, window=timedelta(minutes=1))

def execute_with_rate_limit(user_id, function_name, arguments):
    if not limiter.check_rate_limit(user_id, function_name):
        return {
            "error": "超过速率限制。请稍后再试。"
        }
    
    return execute_function(function_name, arguments)

常见模式

数据库查询

functions = [
    {
        "type": "function",
        "function": {
            "name": "query_database",
            "description": "在数据库上执行SQL查询",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "要执行的SQL查询"
                    },
                    "params": {
                        "type": "array",
                        "description": "查询参数"
                    }
                },
                "required": ["query"]
            }
        }
    }
]

def execute_sql_query(query, params):
    # 验证查询(只允许SELECT)
    if not query.strip().upper().startswith("SELECT"):
        raise ValueError("只允许SELECT查询")
    
    # 使用参数化查询执行
    cursor.execute(query, params)
    return cursor.fetchall()

API集成

functions = [
    {
        "type": "function",
        "function": {
            "name": "call_external_api",
            "description": "向外部API发出请求",
            "parameters": {
                "type": "object",
                "properties": {
                    "endpoint": {
                        "type": "string",
                        "description": "要调用的API端点"
                    },
                    "method": {
                        "type": "string",
                        "enum": ["GET", "POST", "PUT", "DELETE"],
                        "default": "GET"
                    },
                    "headers": {
                        "type": "object",
                        "description": "请求头"
                    },
                    "body": {
                        "type": "object",
                        "description": "请求体(用于POST/PUT)"
                    }
                },
                "required": ["endpoint"]
            }
        }
    }
]

def call_external_api(endpoint, method="GET", headers=None, body=None):
    # 白名单允许的端点
    allowed_endpoints = [
        "/api/weather",
        "/api/products",
        "/api/users"
    ]
    
    if not any(endpoint.startswith(prefix) for prefix in allowed_endpoints):
        raise ValueError(f"端点不允许:{endpoint}")
    
    # 发出请求
    if method == "GET":
        response = requests.get(endpoint, headers=headers)
    elif method == "POST":
        response = requests.post(endpoint, headers=headers, json=body)
    
    return response.json()

代码执行

import subprocess
import tempfile
import os

functions = [
    {
        "type": "function",
        "function": {
            "name": "execute_code",
            "description": "在沙盒环境中执行Python代码",
            "parameters": {
                "type": "object",
                "properties": {
                    "code": {
                        "type": "string",
                        "description": "要执行的Python代码"
                    }
                },
                "required": ["code"]
            }
        }
    }
]

def execute_code_safely(code):
    # 验证代码
    dangerous_keywords = ["import os", "import subprocess", "exec(", "eval("]
    if any(keyword in code.lower() for keyword in dangerous_keywords):
        raise ValueError("代码包含危险关键字")
    
    # 在临时文件中执行
    with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
        f.write(code)
        temp_file = f.name
    
    try:
        # 使用超时执行
        result = subprocess.run(
            ['python', temp_file],
            capture_output=True,
            text=True,
            timeout=10  # 10秒超时
        )
        
        # 清理
        os.unlink(temp_file)
        
        return {
            "stdout": result.stdout,
            "stderr": result.stderr,
            "returncode": result.returncode
        }
    except subprocess.TimeoutExpired:
        os.unlink(temp_file)
        return {"error": "代码执行超时"}

文件操作

import os
import shutil

functions = [
    {
        "type": "function",
        "function": {
            "name": "read_file",
            "description": "读取文件内容",
            "parameters": {
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "要读取的文件路径"
                    },
                    "max_lines": {
                        "type": "integer",
                        "description": "要读取的最大行数"
                    }
                },
                "required": ["path"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "write_file",
            "description": "将内容写入文件",
            "parameters": {
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "要写入的文件路径"
                    },
                    "content": {
                        "type": "string",
                        "description": "要写入的内容"
                    },
                    "mode": {
                        "type": "string",
                        "enum": ["append", "overwrite"],
                        "default": "overwrite"
                    }
                },
                "required": ["path", "content"]
            }
        }
    }
]

def safe_read_file(path, max_lines=None):
    # 验证路径在允许的目录内
    allowed_dir = "/safe/files"
    full_path = os.path.abspath(path)
    
    if not full_path.startswith(os.path.abspath(allowed_dir)):
        raise ValueError("路径超出允许目录")
    
    # 读取文件
    with open(full_path, 'r') as f:
        if max_lines:
            lines = []
            for i, line in enumerate(f):
                if i >= max_lines:
                    break
                lines.append(line.rstrip())
            return {"content": "
".join(lines)}
        else:
            content = f.read()
            return {"content": content}

def safe_write_file(path, content, mode="overwrite"):
    allowed_dir = "/safe/files"
    full_path = os.path.abspath(path)
    
    if not full_path.startswith(os.path.abspath(allowed_dir)):
        raise ValueError("路径超出允许目录")
    
    if mode == "append":
        with open(full_path, 'a') as f:
            f.write(content + "
")
    else:
        with open(full_path, 'w') as f:
            f.write(content)
    
    return {"success": True, "path": full_path}

使用工具构建AI代理

工具感知的代理

class ToolAwareAgent:
    def __init__(self, llm_client, tools):
        self.client = llm_client
        self.tools = tools
        self.tool_registry = {tool["name"]: tool for tool in tools}
        self.conversation_history = []
    
    def process(self, user_message):
        # 将用户消息添加到历史
        self.conversation_history.append({
            "role": "user",
            "content": user_message
        })
        
        # 获取LLM响应
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=self.conversation_history,
            tools=self.tools
        )
        
        assistant_message = response.choices[0].message
        self.conversation_history.append(assistant_message)
        
        # 处理函数调用
        if assistant_message.function_calls:
            tool_results = []
            
            for fc in assistant_message.function_calls:
                tool_name = fc.name
                tool_args = json.loads(fc.arguments)
                
                # 执行工具
                tool_result = self.execute_tool(tool_name, tool_args)
                tool_results.append({
                    "role": "tool",
                    "tool_use_id": fc.id,
                    "name": tool_name,
                    "content": json.dumps(tool_result)
                })
            
            # 获取最终响应
            final_response = self.client.chat.completions.create(
                model="gpt-4",
                messages=self.conversation_history + tool_results
            )
            
            self.conversation_history.append(final_response.choices[0].message)
            return final_response.choices[0].message.content
        
        return assistant_message.content
    
    def execute_tool(self, tool_name, arguments):
        tool = self.tool_registry.get(tool_name)
        if not tool:
            raise ValueError(f"未找到工具:{tool_name}")
        
        # 执行工具
        handler = tool.get("handler")
        return handler(**arguments)

多步推理

class MultiStepAgent:
    def __init__(self, llm_client):
        self.client = llm_client
        self.max_steps = 5
        self.current_step = 0
    
    def process(self, user_message):
        self.current_step = 0
        conversation = [{"role": "user", "content": user_message}]
        
        while self.current_step < self.max_steps:
            response = self.client.chat.completions.create(
                model="gpt-4",
                messages=conversation,
                tools=self.get_tools()
            )
            
            assistant_message = response.choices[0].message
            conversation.append(assistant_message)
            
            # 检查是否有函数调用
            if assistant_message.function_calls:
                # 执行工具
                for fc in assistant_message.function_calls:
                    tool_result = self.execute_tool(fc.name, fc.arguments)
                    conversation.append({
                        "role": "tool",
                        "name": fc.name,
                        "content": json.dumps(tool_result)
                    })
                
                self.current_step += 1
            else:
                # 没有更多函数调用,完成
                break
        
        return conversation[-1]["content"]
    
    def get_tools(self):
        return [
            {
                "type": "function",
                "function": {
                    "name": "search",
                    "description": "搜索信息"
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "analyze",
                    "description": "分析找到的信息"
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "summarize",
                    "description": "总结分析"
                }
            }
        ]

工具编排

顺序编排

class ToolOrchestrator:
    def __init__(self):
        self.tools = {}
        self.workflows = {
            "data_analysis": ["search", "analyze", "summarize"],
            "user_lookup": ["get_user", "get_profile", "get_orders"],
            "order_processing": ["check_inventory", "calculate_price", "create_order"]
        }
    
    def register_tool(self, name, handler):
        self.tools[name] = handler
    
    def execute_workflow(self, workflow_name, context):
        if workflow_name not in self.workflows:
            raise ValueError(f"未知工作流:{workflow_name}")
        
        workflow = self.workflows[workflow_name]
        results = {}
        
        for tool_name in workflow:
            if tool_name in self.tools:
                results[tool_name] = self.tools[tool_name](context)
        
        return results

# 使用
orchestrator = ToolOrchestrator()
orchestrator.register_tool("search", search_tool)
orchestrator.register_tool("analyze", analyze_tool)
orchestrator.register_tool("summarize", summarize_tool)

results = orchestrator.execute_workflow("data_analysis", {"query": "市场趋势"})

并行编排

import asyncio

class ParallelOrchestrator:
    def __init__(self):
        self.tools = {}
    
    def register_tool(self, name, handler):
        self.tools[name] = handler
    
    async def execute_parallel(self, tool_calls):
        tasks = []
        
        for call in tool_calls:
            if call["name"] in self.tools:
                task = asyncio.create_task(
                    self.tools[call["name"]](**call["arguments"])
                )
                tasks.append(task)
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        return [
            {
                "name": call["name"],
                "success": not isinstance(result, Exception),
                "result": result if not isinstance(result, Exception) else str(result)
            }
            for call, result in zip(tool_calls, results)
        ]

# 使用
orchestrator = ParallelOrchestrator()
orchestrator.register_tool("get_weather", get_weather)
orchestrator.register_tool("get_time", get_time)

tool_calls = [
    {"name": "get_weather", "arguments": {"location": "东京"}},
    {"name": "get_time", "arguments": {"timezone": "亚洲/东京"}}
]

results = asyncio.run(orchestrator.execute_parallel(tool_calls))

带函数调用的流式处理

实时工具执行

from openai import OpenAI

client = OpenAI()

async def streaming_function_call():
    stream = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "user", "content": "计算1到100的和"}
        ],
        functions=[{
            "type": "function",
            "function": {
                "name": "calculate_sum",
                "description": "计算数字的和",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "numbers": {
                            "type": "array",
                            "items": {"type": "integer"},
                            "description": "要求和的数字列表"
                        }
                    },
                    "required": ["numbers"]
                }
            }
        }],
        stream=True
    )
    
    async for chunk in stream:
        if chunk.choices[0].finish_reason == "function_calls":
            fc = chunk.choices[0].delta.function_calls[0]
            
            # 执行函数
            args = json.loads(fc.arguments)
            result = calculate_sum(args["numbers"])
            
            # 将结果流式传回
            result_stream = client.chat.completions.create(
                model="gpt-4",
                messages=[
                    {"role": "user", "content": "计算1到100的和"},
                    {"role": "assistant", "content": None, "function_calls": [fc]},
                    {
                        "role": "function",
                        "name": fc.name,
                        "content": json.dumps(result)
                    }
                ],
                stream=True
            )
            
            async for result_chunk in result_stream:
                if result_chunk.choices[0].delta.content:
                    print(result_chunk.choices[0].delta.content, end='', flush=True)

成本优化

令牌使用监控

class TokenTracker:
    def __init__(self):
        self.usage = {
            "prompt_tokens": 0,
            "completion_tokens": 0,
            "total_tokens": 0
        }
    
    def track(self, response):
        if hasattr(response, 'usage'):
            self.usage["prompt_tokens"] += response.usage.prompt_tokens
            self.usage["completion_tokens"] += response.usage.completion_tokens
            self.usage["total_tokens"] += response.usage.total_tokens
    
    def get_stats(self):
        return self.usage

# 使用
tracker = TokenTracker()
tracker.track(response)
print(f"总使用令牌数:{tracker.get_stats()['total_tokens']}")

缓存函数结果

from functools import lru_cache
import hashlib

class FunctionCache:
    def __init__(self, ttl=300):
        self.cache = {}
        self.ttl = ttl  # 存活时间(秒)
    
    def get_cache_key(self, function_name, arguments):
        key_str = f"{function_name}:{json.dumps(arguments, sort_keys=True)}"
        return hashlib.md5(key_str.encode()).hexdigest()
    
    def get(self, function_name, arguments):
        key = self.get_cache_key(function_name, arguments)
        
        if key in self.cache:
            entry = self.cache[key]
            if time.time() - entry["timestamp"] < self.ttl:
                return {"cached": True, "result": entry["result"]}
            else:
                del self.cache[key]
        
        return None
    
    def set(self, function_name, arguments, result):
        key = self.get_cache_key(function_name, arguments)
        self.cache[key] = {
            "result": result,
            "timestamp": time.time()
        }

# 使用
cache = FunctionCache(ttl=300)

def cached_function_call(function_name, arguments):
    # 检查缓存
    cached = cache.get(function_name, arguments)
    if cached:
        return cached
    
    # 执行函数
    result = execute_function(function_name, arguments)
    
    # 存储到缓存
    cache.set(function_name, arguments, result)
    
    return {"cached": False, "result": result}

最小化函数调用

def smart_function_selection(user_query, available_functions):
    """基于查询选择最相关的函数"""
    query_lower = user_query.lower()
    
    # 基于关键词匹配为函数评分
    scored_functions = []
    for func in available_functions:
        score = 0
        description = func.get("description", "").lower()
        
        # 检查关键词匹配
        for keyword in ["weather", "temperature", "forecast"]:
            if keyword in query_lower and keyword in description:
                score += 2
        
        scored_functions.append((score, func))
    
    # 按分数排序并返回最高分函数
    scored_functions.sort(key=lambda x: x[0], reverse=True)
    
    # 只使用最高分函数
    if scored_functions and scored_functions[0][0] > 0:
        return [scored_functions[0][1]]
    
    return available_functions

测试函数调用

单元测试

import pytest
from unittest.mock import Mock, patch

def test_function_calling():
    # 模拟LLM响应
    with patch('openai.OpenAI') as mock_openai:
        mock_client = Mock()
        mock_openai.return_value = mock_client
        
        # 模拟函数调用响应
        mock_response = Mock()
        mock_response.choices = [Mock()]
        mock_response.choices[0].message = Mock()
        mock_response.choices[0].message.function_calls = [
            Mock(name="get_weather", arguments='{"location": "东京"}')
        ]
        mock_client.chat.completions.create.return_value = mock_response
        
        # 测试
        client = mock_openai()
        response = client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": "东京的天气怎么样?"}],
            functions=functions
        )
        
        # 断言函数调用已发出
        assert response.choices[0].message.function_calls is not None
        assert response.choices[0].message.function_calls[0].name == "get_weather"

集成测试

def test_end_to_end_function_calling():
    # 模拟用户交互
    user_message = "东京的天气怎么样?"
    
    # 获取LLM响应
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": user_message}],
        functions=functions
    )
    
    # 检查函数是否被调用
    assert response.choices[0].finish_reason == "function_calls"
    
    # 执行函数
    fc = response.choices[0].message.function_calls[0]
    args = json.loads(fc.arguments)
    weather_data = get_weather(args["location"])
    
    # 将结果发送回
    final_response = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "user", "content": user_message},
            response.choices[0].message,
            {
                "role": "function",
                "name": fc.name,
                "content": json.dumps(weather_data)
            }
        ]
    )
    
    # 验证最终响应
    assert "晴朗" in final_response.choices[0].message.content.lower()

监控和日志记录

函数调用日志记录

import logging

class FunctionCallLogger:
    def __init__(self):
        self.logger = logging.getLogger('function_calls')
        self.logger.setLevel(logging.INFO)
        
        handler = logging.FileHandler('function_calls.log')
        handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        ))
        self.logger.addHandler(handler)
    
    def log_call(self, function_name, arguments, result, duration_ms):
        self.logger.info(
            f"函数:{function_name}, "
            f"参数:{arguments}, "
            f"结果:{result}, "
            f"持续时间:{duration_ms}毫秒"
        )
    
    def log_error(self, function_name, arguments, error):
        self.logger.error(
            f"函数:{function_name}, "
            f"参数:{arguments}, "
            f"错误:{error}"
        )

# 使用
logger = FunctionCallLogger()

import time
start = time.time()
try:
    result = get_weather("东京")
    logger.log_call("get_weather", {"location": "东京"}, result, (time.time() - start) * 1000)
except Exception as e:
    logger.log_error("get_weather", {"location": "东京"}, str(e))

最佳实践

  1. 函数设计

    • 保持函数专注和单一目的
    • 使用清晰、描述性的名称
    • 提供详细的参数描述
    • 清楚定义必需与可选参数
  2. 错误处理

    • 执行前验证所有输入
    • 提供清晰的错误消息
    • 为临时故障实现重试逻辑
    • 尽可能使用后备机制
  3. 安全

    • 验证和清理所有输入
    • 实现权限检查
    • 使用速率限制
    • 永远不在输出中暴露敏感数据
  4. 性能

    • 适当时缓存函数结果
    • 尽可能使用并行执行
    • 监控令牌使用
    • 优化函数执行时间
  5. 测试

    • 单元测试单个函数
    • 集成测试端到端流程
    • 测试错误场景
    • 在生产中监控函数调用模式

相关技能