名称: agentica-sdk 描述: 使用Agentica SDK构建Python代理 - @agentic装饰器、spawn()、持久化、MCP集成 允许工具: [Bash, Read, Write, Edit]
Agentica SDK 参考 (v0.3.1)
使用Agentica框架在Python中构建AI代理。代理可以实现函数、维护状态、使用工具并相互协调。
使用时机
在以下情况下使用此技能:
- 构建新的Python代理
- 向现有代码添加代理能力
- 将MCP工具与代理集成
- 实现多代理协调
- 调试代理行为
快速开始
代理函数(最简单)
from agentica import agentic
@agentic()
async def add(a: int, b: int) -> int:
"""返回a和b的和"""
...
result = await add(1, 2) # 代理计算:3
生成代理(更多控制)
from agentica import spawn
agent = await spawn(premise="你是一个说实话的人。")
result: bool = await agent.call(bool, "地球是平的")
# 返回:False
核心模式
返回类型
# 字符串(默认)
result = await agent.call("2+2等于多少?")
# 类型化输出
result: int = await agent.call(int, "2+2等于多少?")
result: dict[str, int] = await agent.call(dict[str, int], "统计项目")
# 仅副作用
await agent.call(None, "向John发送消息")
前提与系统提示
# 前提:添加到默认系统提示
agent = await spawn(premise="你是数学专家。")
# 系统:完全控制(替换默认)
agent = await spawn(system="你是一个仅响应JSON的响应者。")
传递工具(作用域)
from agentica import agentic, spawn
# 在装饰器中
@agentic(scope={'web_search': web_search_fn})
async def researcher(query: str) -> str:
"""研究一个主题。"""
...
# 在spawn中
agent = await spawn(
premise="数据分析器",
scope={"analyze": custom_analyzer}
)
# 每次调用的作用域
result = await agent.call(
dict[str, int],
"分析数据集",
dataset=data, # 可用作'dataset'
analyzer=custom_fn # 可用作'analyzer'
)
SDK集成模式
from slack_sdk import WebClient
slack = WebClient(token=SLACK_TOKEN)
# 提取特定方法
@agentic(scope={
'list_users': slack.users_list,
'send_message': slack.chat_postMessage
})
async def team_notifier(message: str) -> None:
"""发送团队通知。"""
...
代理实例化
spawn() - 异步(大多数情况)
agent = await spawn(premise="有帮助的助手")
Agent() - 同步(用于 __init__)
from agentica.agent import Agent
class CustomAgent:
def __init__(self):
# 同步 - 使用 Agent() 而不是 spawn()
self._brain = Agent(
premise="专门助手",
scope={"tool": some_tool}
)
async def run(self, task: str) -> str:
return await self._brain(str, task)
模型选择
# 在 spawn 中
agent = await spawn(
premise="快速响应",
model="openai:gpt-5" # 默认: openai:gpt-4.1
)
# 在装饰器中
@agentic(model="anthropic:claude-sonnet-4.5")
async def analyze(text: str) -> dict:
"""分析文本。"""
...
可用模型:
openai:gpt-3.5-turbo,openai:gpt-4o,openai:gpt-4.1,openai:gpt-5anthropic:claude-sonnet-4,anthropic:claude-opus-4.1anthropic:claude-sonnet-4.5,anthropic:claude-opus-4.5- 任何 OpenRouter slug(例如
google/gemini-2.5-flash)
持久化(有状态代理)
@agentic(persist=True)
async def chatbot(message: str) -> str:
"""记住对话历史。"""
...
await chatbot("我的名字是Alice")
await chatbot("我的名字是什么?") # 知道: Alice
对于 spawn() 代理,状态在同一实例的调用间自动保持。
令牌限制
from agentica import spawn, MaxTokens
# 简单限制
agent = await spawn(
premise="简短响应",
max_tokens=500
)
# 精细控制
agent = await spawn(
premise="受控输出",
max_tokens=MaxTokens(
per_invocation=5000, # 所有轮次的总数
per_round=1000, # 每轮推理
rounds=5 # 最大推理轮次
)
)
令牌使用跟踪
from agentica import spawn, last_usage, total_usage
agent = await spawn(premise="你是有帮助的。")
await agent.call(str, "你好!")
# 代理方法
usage = agent.last_usage()
print(f"上次: {usage.input_tokens} 输入, {usage.output_tokens} 输出")
usage = agent.total_usage()
print(f"总计: {usage.total_tokens} 已处理")
# 对于 @agentic 函数
@agentic()
async def my_fn(x: str) -> str: ...
await my_fn("test")
print(last_usage(my_fn))
print(total_usage(my_fn))
流式处理
from agentica import spawn
from agentica.logging.loggers import StreamLogger
import asyncio
agent = await spawn(premise="你是有帮助的。")
stream = StreamLogger()
with stream:
result = asyncio.create_task(
agent.call(bool, "巴黎是法国的首都吗?")
)
# 首先消费流以获取实时输出
async for chunk in stream:
print(chunk.content, end="", flush=True)
# chunk.role 是 'user'、'agent' 或 'system'
# 然后等待结果
final = await result
MCP集成
from agentica import spawn, agentic
# 通过配置文件
agent = await spawn(
premise="使用工具的代理",
mcp="path/to/mcp_config.json"
)
@agentic(mcp="path/to/mcp_config.json")
async def tool_user(query: str) -> str:
"""使用MCP工具。"""
...
mcp_config.json 格式:
{
"mcpServers": {
"tavily-remote-mcp": {
"command": "npx -y mcp-remote https://mcp.tavily.com/mcp/?tavilyApiKey=<key>",
"env": {}
}
}
}
日志记录
默认行为
- 以颜色打印到标准输出
- 写入
./logs/agent-<id>.log
上下文日志记录
from agentica.logging.loggers import FileLogger, PrintLogger
from agentica.logging.agent_logger import NoLogging
# 仅文件
with FileLogger():
agent = await spawn(premise="调试代理")
await agent.call(int, "计算")
# 静默
with NoLogging():
agent = await spawn(premise="静默代理")
每代理日志记录
# 监听器在 agent_listener 子模块中(不从 agentica.logging 导出)
from agentica.logging.agent_listener import (
PrintOnlyListener, # 仅控制台输出
FileOnlyListener, # 仅文件日志记录
StandardListener, # 控制台 + 文件(默认)
NoopListener, # 静默 - 无日志记录
)
agent = await spawn(
premise="自定义日志记录",
listener=PrintOnlyListener
)
# 静默代理
agent = await spawn(
premise="静默代理",
listener=NoopListener
)
全局配置
from agentica.logging.agent_listener import (
set_default_agent_listener,
get_default_agent_listener,
PrintOnlyListener,
)
set_default_agent_listener(PrintOnlyListener)
set_default_agent_listener(None) # 禁用所有
错误处理
from agentica.errors import (
AgenticaError, # 所有SDK错误的基础
RateLimitError, # 速率限制
InferenceError, # 推理的HTTP错误
MaxTokensError, # 令牌限制超出
MaxRoundsError, # 最大推理轮次超出
ContentFilteringError, # 内容过滤
APIConnectionError, # 网络问题
APITimeoutError, # 请求超时
InsufficientCreditsError,# 积分不足
OverloadedError, # 服务器过载
ServerError, # 通用服务器错误
)
try:
result = await agent.call(str, "做某事")
except RateLimitError:
await asyncio.sleep(60)
result = await agent.call(str, "做某事")
except MaxTokensError:
# 减少作用域或增加限制
pass
except ContentFilteringError:
# 内容被过滤
pass
except InferenceError as e:
logger.error(f"推理失败: {e}")
except AgenticaError as e:
logger.error(f"SDK错误: {e}")
自定义异常
class DataValidationError(Exception):
"""无效输入数据。"""
pass
@agentic(DataValidationError) # 传递异常类型
async def analyze(data: str) -> dict:
"""
分析数据。
引发:
DataValidationError: 如果数据格式错误
"""
...
try:
result = await analyze(raw_data)
except DataValidationError as e:
logger.warning(f"无效: {e}")
多代理模式
自定义代理类
from agentica.agent import Agent
class ResearchAgent:
def __init__(self, web_search_fn):
self._brain = Agent(
premise="研究助手。",
scope={"web_search": web_search_fn}
)
async def research(self, topic: str) -> str:
return await self._brain(str, f"研究: {topic}")
async def summarize(self, text: str) -> str:
return await self._brain(str, f"总结: {text}")
代理协调
class LeadResearcher:
def __init__(self):
self._brain = Agent(
premise="跨子代理协调研究。",
scope={"SubAgent": ResearchAgent}
)
async def __call__(self, query: str) -> str:
return await self._brain(str, query)
lead = LeadResearcher()
report = await lead("研究2025年AI代理框架")
追踪与调试
OpenTelemetry追踪
from agentica import initialize_tracing
# 初始化追踪(返回 TracerProvider)
tracer = initialize_tracing(
service_name="my-agent-app",
environment="development", # 可选
tempo_endpoint="http://localhost:4317", # 可选: Grafana Tempo
organization_id="my-org", # 可选
log_level="INFO", # DEBUG, INFO, WARNING, ERROR
instrument_httpx=False, # 可选: 追踪HTTP调用
)
SDK调试日志记录
from agentica import enable_sdk_logging
# 启用内部SDK日志(用于调试SDK本身)
disable_fn = enable_sdk_logging(log_tags="1")
# ... 运行代理 ...
disable_fn() # 完成后禁用
顶级导出
# 从 agentica 主要导入
from agentica import (
# 核心
Agent, # 同步代理类
agentic, # @agentic 装饰器
spawn, # 异步代理创建
# 配置
ModelStrings, # 模型字符串类型提示
AgenticFunction, # 代理函数类型
# 令牌跟踪
last_usage, # 获取上次调用的令牌使用
total_usage, # 获取累积令牌使用
# 追踪/日志记录
initialize_tracing, # OpenTelemetry 设置
enable_sdk_logging, # SDK 调试日志
# 版本
__version__, # "0.3.1"
)
检查清单
在使用 Agentica 之前:
- [ ] 使用
@agentic()的函数必须是async - [ ]
spawn()返回可等待对象 - 使用await spawn(...) - [ ]
agent.call()是可等待的 - 使用await agent.call(...) - [ ]
call()的第一个参数是返回类型,第二个是提示字符串 - [ ] 在
@agentic中使用persist=True以保留对话记忆 - [ ] 在同步
__init__中使用Agent()(而不是spawn()) - [ ] 在文档字符串中记录异常以供代理引发
- [ ] 从
agentica.logging.agent_listener导入监听器(而不是agentica.logging)