name: agentic-development description: 构建具有Pydantic AI(Python)和Claude SDK(Node.js)的AI智能体
Agentic Development 技能
加载与:base.md + llm-patterns.md + [language].md
用于构建执行多步骤任务的工具的自主AI智能体。
来源: Claude Agent SDK | Anthropic Claude代码最佳实践 | Pydantic AI | Google Gemini智能体开发 | OpenAI构建智能体
按语言选择框架
| 语言/框架 | 默认 | 为什么 |
|---|---|---|
| Python | Pydantic AI | 类型安全,Pydantic验证,多模型,生产就绪 |
| Node.js / Next.js | Claude Agent SDK | 官方Anthropic SDK,工具,多智能体,原生流式处理 |
Python: Pydantic AI (默认)
from pydantic_ai import Agent
from pydantic import BaseModel
class SearchResult(BaseModel):
title: str
url: str
summary: str
agent = Agent(
'claude-sonnet-4-20250514',
result_type=list[SearchResult],
system_prompt='你是一名研究助理。',
)
# 类型安全的结果
result = await agent.run('查找关于AI智能体的文章')
for item in result.data:
print(f"{item.title}: {item.url}")
Node.js / Next.js: Claude Agent SDK (默认)
import Anthropic from "@anthropic-ai/sdk";
const client = new Anthropic();
// 定义工具
const tools: Anthropic.Tool[] = [
{
name: "web_search",
description: "搜索网络信息",
input_schema: {
type: "object",
properties: {
query: { type: "string", description: "搜索查询" },
},
required: ["query"],
},
},
];
// Agentic循环
async function runAgent(prompt: string) {
const messages: Anthropic.MessageParam[] = [
{ role: "user", content: prompt },
];
while (true) {
const response = await client.messages.create({
model: "claude-sonnet-4-20250514",
max_tokens: 4096,
tools,
messages,
});
// 检查工具使用
if (response.stop_reason === "tool_use") {
const toolUse = response.content.find((b) => b.type === "tool_use");
if (toolUse) {
const result = await executeTool(toolUse.name, toolUse.input);
messages.push({ role: "assistant", content: response.content });
messages.push({
role: "user",
content: [{ type: "tool_result", tool_use_id: toolUse.id, content: result }],
});
continue;
}
}
// 完成 - 返回最终响应
return response.content.find((b) => b.type === "text")?.text;
}
}
核心原则
先计划,逐步行动,总是验证。
研究和计划后再执行的智能体始终优于直接行动的智能体。将复杂任务分解为可验证的步骤,谨慎使用工具,并在执行过程中保持清晰的州。
智能体架构
三个组件(OpenAI)
┌─────────────────────────────────────────────────┐
│ 智能体 │
├─────────────────────────────────────────────────┤
│ 模型(大脑) │ LLM用于推理和 │
│ │ 决策制定 │
├─────────────────────┼───────────────────────────┤
│ 工具(臂/腿) │ APIs,函数,外部│
│ │ 系统用于行动 │
├─────────────────────┼───────────────────────────┤
│ 指令 │ 系统提示定义 │
│ (规则) │ 行为和边界 │
└─────────────────────┴───────────────────────────┘
项目结构
project/
├── src/
│ ├── agents/
│ │ ├── orchestrator.ts # 主智能体协调器
│ │ ├── specialized/ # 特定任务智能体
│ │ │ ├── researcher.ts
│ │ │ ├── coder.ts
│ │ │ └── reviewer.ts
│ │ └── base.ts # 共享智能体接口
│ ├── tools/
│ │ ├── definitions/ # 工具模式
│ │ ├── implementations/ # 工具逻辑
│ │ └── registry.ts # 工具发现
│ ├── prompts/
│ │ ├── system/ # 智能体指令
│ │ └── templates/ # 任务模板
│ └── memory/
│ ├── conversation.ts # 短期上下文
│ └── persistent.ts # 长期存储
├── tests/
│ ├── agents/ # 智能体行为测试
│ ├── tools/ # 工具单元测试
│ └── evals/ # 端到端评估
└── skills/ # 智能体技能(Anthropic模式)
├── skill-name/
│ ├── instructions.md
│ ├── scripts/
│ └── resources/
工作流程模式:探索-计划-执行-验证
1. 探索阶段
// 在行动前收集上下文
async function explore(task: Task): Promise<Context> {
const relevantFiles = await agent.searchCodebase(task.query);
const existingPatterns = await agent.analyzePatterns(relevantFiles);
const dependencies = await agent.identifyDependencies(task);
return { relevantFiles, existingPatterns, dependencies };
}
2. 计划阶段(关键)
// 在执行前明确计划
async function plan(task: Task, context: Context): Promise<Plan> {
const prompt = `
任务:${task.description}
上下文:${JSON.stringify(context)}
创建一个分步计划。对于每个步骤:
1. 采取什么行动
2. 使用什么工具
3. 如何验证成功
4. 可能出错的地方
输出包含步骤数组的JSON。
`;
return await llmCall({ prompt, schema: PlanSchema });
}
3. 执行阶段
// 带验证的执行
async function execute(plan: Plan): Promise<Result[]> {
const results: Result[] = [];
for (const step of plan.steps) {
// 执行单个步骤
const result = await executeStep(step);
// 验证后再继续
if (!await verify(step, result)) {
// 自我纠正或升级
const corrected = await selfCorrect(step, result);
if (!corrected.success) {
return handleFailure(step, results);
}
}
results.push(result);
}
return results;
}
4. 验证阶段
// 独立验证防止过度拟合
async function verify(step: Step, result: Result): Promise<boolean> {
// 如果有,运行测试
if (step.testCommand) {
const testResult = await runCommand(step.testCommand);
if (!testResult.success) return false;
}
// 使用LLM验证标准
const verification = await llmCall({
prompt: `
步骤:${step.description}
预期:${step.successCriteria}
实际:${JSON.stringify(result)}
结果是否满足成功标准?
用{ "passes": boolean, "reasoning": string }回应
`,
schema: VerificationSchema
});
return verification.passes;
}
工具设计
工具定义模式
// tools/definitions/file-operations.ts
import { z } from 'zod';
export const ReadFileTool = {
name: 'read_file',
description: '读取文件内容。在修改任何文件之前使用。',
parameters: z.object({
path: z.string().describe('文件的绝对路径'),
startLine: z.number().optional().describe('开始行(1-索引)'),
endLine: z.number().optional().describe('结束行(1-索引)'),
}),
// 风险等级用于护栏(OpenAI模式)
riskLevel: 'low' as const,
};
export const WriteFileTool = {
name: 'write_file',
description: '写入文件内容。始终先读取以了解上下文。',
parameters: z.object({
path: z.string().describe('文件的绝对路径'),
content: z.string().describe('完整的文件内容'),
}),
riskLevel: 'medium' as const,
// 高风险操作需要确认
requiresConfirmation: true,
};
工具实现
// tools/implementations/file-operations.ts
export async function readFile(
params: z.infer<typeof ReadFileTool.parameters>
): Promise<ToolResult> {
try {
const content = await fs.readFile(params.path, 'utf-8');
const lines = content.split('
');
const start = (params.startLine ?? 1) - 1;
const end = params.endLine ?? lines.length;
return {
success: true,
data: lines.slice(start, end).join('
'),
metadata: { totalLines: lines.length }
};
} catch (error) {
return {
success: false,
error: `Failed to read file: ${error.message}`
};
}
}
优先使用内置工具(OpenAI)
// 使用平台提供的工具时可用
const agent = createAgent({
tools: [
// 内置工具(由平台处理)
{ type: 'web_search' },
{ type: 'code_interpreter' },
// 仅在需要时自定义工具
{ type: 'function', function: customDatabaseTool },
],
});
多智能体模式
单一智能体(默认)
对于大多数任务,使用一个智能体。多个智能体增加复杂性。
智能体作为工具模式(OpenAI)
// 将专业智能体作为可调用工具
const researchAgent = createAgent({
name: 'researcher',
instructions: '你研究主题并返回结构化发现。',
tools: [webSearchTool, documentReadTool],
});
const mainAgent = createAgent({
tools: [
{
type: 'function',
function: {
name: 'research_topic',
description: '委托研究给专业智能体',
parameters: ResearchQuerySchema,
handler: async (query) => researchAgent.run(query),
},
},
],
});
交接模式(OpenAI)
// 智能体之间的单向转移
const customerServiceAgent = createAgent({
tools: [
// 需要时移交给专家
{
name: 'transfer_to_billing',
description: '转移至账单专家处理支付问题',
handler: async (context) => {
return { handoff: 'billing_agent', context };
},
},
],
});
使用多个智能体的时候
- 具有不重叠工具的不同任务域
- 需要不同的授权级别
- 具有清晰的交接点的复杂工作流程
- 并行执行独立子任务
记忆与状态
对话记忆
// memory/conversation.ts
interface ConversationMemory {
messages: Message[];
maxTokens: number;
add(message: Message): void;
getContext(): Message[];
summarize(): Promise<string>;
}
// 在工具调用之间维护状态(Gemini模式)
interface AgentState {
thoughtSignature?: string; // 加密推理状态
conversationId: string; // 用于共享记忆
currentPlan?: Plan;
completedSteps: Step[];
}
持久记忆
// memory/persistent.ts
interface PersistentMemory {
// 跨会话存储学习
store(key: string, value: any): Promise<void>;
retrieve(key: string): Promise<any>;
// 过去交互的语义搜索
search(query: string, limit: number): Promise<Memory[]>;
}
护栏与安全
多层保护(OpenAI)
// guards/index.ts
interface GuardrailConfig {
// 输入验证
inputClassifier: (input: string) => Promise<SafetyResult>;
// 输出验证
outputValidator: (output: string) => Promise<SafetyResult>;
// 工具风险评估
toolRiskLevels: Record<string, 'low' | 'medium' | 'high'>;
// 需要人工批准的操作
humanInTheLoop: string[];
}
async function executeWithGuardrails(
agent: Agent,
input: string,
config: GuardrailConfig
): Promise<Result> {
// 1. 检查输入安全
const inputCheck = await config.inputClassifier(input);
if (!inputCheck.safe) {
return { blocked: true, reason: inputCheck.reason };
}
// 2. 执行工具监控
const result = await agent.run(input, {
beforeTool: async (tool, params) => {
const risk = config.toolRiskLevels[tool.name];
if (risk === 'high' || config.humanInTheLoop.includes(tool.name)) {
return await requestHumanApproval(tool, params);
}
return { approved: true };
},
});
// 3. 验证输出
const outputCheck = await config.outputValidator(result.output);
if (!outputCheck.safe) {
return { blocked: true, reason: outputCheck.reason };
}
return result;
}
范围执行(OpenAI)
// 智能体必须在定义范围内
const agentInstructions = `
你是Acme公司的客服智能体。
范围边界(不可协商):
- 仅回答关于Acme产品和服务的问题
- 永远不要提供法律,医疗或财务建议
- 永远不要访问或修改数据超出你的授权范围
- 如果请求超出范围,请礼貌拒绝并解释原因
如果你不能在范围内完成任务,请通知用户
并在继续之前请求明确的批准。
`;
模型选择
匹配任务到模型
| 任务复杂性 | 推荐模型 | 注释 |
|---|---|---|
| 简单,快速 | gpt-5-mini, claude-haiku | 低延迟 |
| 通用 | gpt-4.1, claude-sonnet | 平衡 |
| 复杂推理 | o4-mini, claude-opus | 更高准确性 |
| 深度规划 | gpt-5 + 推理,ultrathink | 最大能力 |
Gemini特定
// 使用thinking_level进行推理深度
const response = await gemini.generate({
model: 'gemini-3',
thinking_level: 'high', // 用于复杂规划
temperature: 1.0, // 优化为推理引擎
});
// 在工具调用之间保持思考状态
const nextResponse = await gemini.generate({
thoughtSignature: response.thoughtSignature, // 函数调用必需
// ... 其余参数
});
Claude特定(思考模式)
// 使用关键词触发扩展思考
const thinkingLevels = {
'think': '标准分析',
'think hard': '更深层次的推理',
'think harder': '广泛分析',
'ultrathink': '最大推理预算',
};
const prompt = `
在提出解决方案之前,仔细思考这个问题。
任务:${task.description}
`;
测试智能体
单元测试(工具)
describe('readFile tool', () => {
it('reads file content correctly', async () => {
const result = await readFile({ path: '/test/file.txt' });
expect(result.success).toBe(true);
expect(result.data).toContain('expected content');
});
});
行为测试(智能体决策)
describe('agent planning', () => {
it('creates plan before executing file modifications', async () => {
const trace = await agent.runWithTrace('Refactor the auth module');
// 验证计划首先发生
const firstToolCall = trace.toolCalls[0];
expect(firstToolCall.name).toBe('read_file');
// 验证没有读取就没有写入
const writeIndex = trace.toolCalls.findIndex(t => t.name === 'write_file');
const readIndex = trace.toolCalls.findIndex(t => t.name === 'read_file');
expect(readIndex).toBeLessThan(writeIndex);
});
});
评估测试
// 每晚运行,不在常规CI中
describe('Agent Accuracy (Eval)', () => {
const testCases = loadTestCases('./evals/coding-tasks.json');
it.each(testCases)('completes $name correctly', async (testCase) => {
const result = await agent.run(testCase.input);
// 验证预期结果
expect(result.filesModified).toEqual(testCase.expectedFiles);
expect(await runTests(testCase.testCommand)).toBe(true);
}, 120000);
});
Pydantic AI模式(Python默认)
项目结构(Python)
project/
├── src/
│ ├── agents/
│ │ ├── __init__.py
│ │ ├── researcher.py # 研究智能体
│ │ ├── coder.py # 编码智能体
│ │ └── orchestrator.py # 主协调器
│ ├── tools/
│ │ ├── __init__.py
│ │ ├── web.py # 网络搜索工具
│ │ ├── files.py # 文件操作
│ │ └── database.py # 数据库查询
│ ├── models/
│ │ ├── __init__.py
│ │ └── schemas.py # Pydantic模型
│ └── deps.py # 依赖项
├── tests/
│ ├── test_agents.py
│ └── test_tools.py
└── pyproject.toml
智能体与工具
from pydantic_ai import Agent, RunContext
from pydantic import BaseModel
from httpx import AsyncClient
class SearchResult(BaseModel):
title: str
url: str
snippet: str
class ResearchDeps(BaseModel):
http_client: AsyncClient
api_key: str
research_agent = Agent(
'claude-sonnet-4-20250514',
deps_type=ResearchDeps,
result_type=list[SearchResult],
system_prompt='你是一名研究助理。使用工具查找信息。',
)
@research_agent.tool
async def web_search(ctx: RunContext[ResearchDeps], query: str) -> list[dict]:
"""搜索网络信息。"""
response = await ctx.deps.http_client.get(
'https://api.search.com/search',
params={'q': query},
headers={'Authorization': f'Bearer {ctx.deps.api_key}'},
)
return response.json()['results']
@research_agent.tool
async def read_webpage(ctx: RunContext[ResearchDeps], url: str) -> str:
"""读取并提取网页内容。"""
response = await ctx.deps.http_client.get(url)
return response.text[:5000] # 截断以适应上下文
# 用法
async def main():
async with AsyncClient() as client:
deps = ResearchDeps(http_client=client, api_key='...')
result = await research_agent.run(
'查找关于LLM智能体的最新文章',
deps=deps,
)
for item in result.data:
print(f"- {item.title}")
结构化输出与验证
from pydantic import BaseModel, Field
from pydantic_ai import Agent
class CodeReview(BaseModel):
summary: str = Field(description="简要总结审查")
issues: list[str] = Field(description="发现的问题列表")
suggestions: list[str]