自主代理模式Skill autonomous-agent-patterns

本技能提供构建自主编码代理的设计模式,包括工具集成、权限系统、浏览器自动化和人在环工作流。适用于AI代理开发、工具API设计、权限系统实现和自主编码助手创建。关键词:自主代理、AI代理、设计模式、工具调用、安全权限、浏览器自动化、上下文管理。

AI智能体 0 次安装 0 次浏览 更新于 3/21/2026

名称: 自主代理模式 描述: “用于构建自主编码代理的设计模式。涵盖工具集成、权限系统、浏览器自动化和人在环工作流。适用于构建AI代理、设计工具API、实现权限系统或创建自主编码助手。”

🕹️ 自主代理模式

用于构建自主编码代理的设计模式,灵感来自ClineOpenAI Codex

何时使用此技能

在以下情况下使用此技能:

  • 构建自主AI代理
  • 设计工具/函数调用API
  • 实现权限和审批系统
  • 为代理创建浏览器自动化
  • 设计人在环工作流

1. 核心代理架构

1.1 代理循环

┌─────────────────────────────────────────────────────────────┐
│                     代理循环                                  │
│                                                              │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐              │
│  │  思考    │───▶│  决策    │───▶│   行动   │              │
│  │ (推理)   │    │ (计划)   │    │ (执行)   │              │
│  └──────────┘    └──────────┘    └──────────┘              │
│       ▲                               │                     │
│       │         ┌──────────┐          │                     │
│       └─────────│  观察    │◀─────────┘                     │
│                 │ (结果)    │                                │
│                 └──────────┘                                │
└─────────────────────────────────────────────────────────────┘
class AgentLoop:
    def __init__(self, llm, tools, max_iterations=50):
        self.llm = llm
        self.tools = {t.name: t for t in tools}
        self.max_iterations = max_iterations
        self.history = []

    def run(self, task: str) -> str:
        self.history.append({"role": "user", "content": task})

        for i in range(self.max_iterations):
            # 思考:获取LLM响应和工具选项
            response = self.llm.chat(
                messages=self.history,
                tools=self._format_tools(),
                tool_choice="auto"
            )

            # 决策:检查代理是否想使用工具
            if response.tool_calls:
                for tool_call in response.tool_calls:
                    # 行动:执行工具
                    result = self._execute_tool(tool_call)

                    # 观察:将结果添加到历史记录
                    self.history.append({
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "content": str(result)
                    })
            else:
                # 没有工具调用 = 任务完成
                return response.content

        return "达到最大迭代次数"

    def _execute_tool(self, tool_call) -> Any:
        tool = self.tools[tool_call.name]
        args = json.loads(tool_call.arguments)
        return tool.execute(**args)

1.2 多模型架构

class MultiModelAgent:
    """
    使用不同模型用于不同目的:
    - 快速模型用于规划
    - 强大模型用于复杂推理
    - 专用模型用于代码生成
    """

    def __init__(self):
        self.models = {
            "fast": "gpt-3.5-turbo",      # 快速决策
            "smart": "gpt-4-turbo",        # 复杂推理
            "code": "claude-3-sonnet",     # 代码生成
        }

    def select_model(self, task_type: str) -> str:
        if task_type == "planning":
            return self.models["fast"]
        elif task_type == "analysis":
            return self.models["smart"]
        elif task_type == "code":
            return self.models["code"]
        return self.models["smart"]

2. 工具设计模式

2.1 工具模式

class Tool:
    """代理工具的基础类"""

    @property
    def schema(self) -> dict:
        """工具的JSON模式"""
        return {
            "name": self.name,
            "description": self.description,
            "parameters": {
                "type": "object",
                "properties": self._get_parameters(),
                "required": self._get_required()
            }
        }

    def execute(self, **kwargs) -> ToolResult:
        """执行工具并返回结果"""
        raise NotImplementedError

class ReadFileTool(Tool):
    name = "read_file"
    description = "从文件系统读取文件内容"

    def _get_parameters(self):
        return {
            "path": {
                "type": "string",
                "description": "文件的绝对路径"
            },
            "start_line": {
                "type": "integer",
                "description": "开始读取的行号(1索引)"
            },
            "end_line": {
                "type": "integer",
                "description": "停止读取的行号(包含)"
            }
        }

    def _get_required(self):
        return ["path"]

    def execute(self, path: str, start_line: int = None, end_line: int = None) -> ToolResult:
        try:
            with open(path, 'r') as f:
                lines = f.readlines()

            if start_line and end_line:
                lines = lines[start_line-1:end_line]

            return ToolResult(
                success=True,
                output="".join(lines)
            )
        except FileNotFoundError:
            return ToolResult(
                success=False,
                error=f"文件未找到: {path}"
            )

2.2 基本代理工具

CODING_AGENT_TOOLS = {
    # 文件操作
    "read_file": "读取文件内容",
    "write_file": "创建或覆盖文件",
    "edit_file": "对文件进行目标编辑",
    "list_directory": "列出文件和文件夹",
    "search_files": "按模式搜索文件",

    # 代码理解
    "search_code": "搜索代码模式(grep)",
    "get_definition": "查找函数/类定义",
    "get_references": "查找符号的所有引用",

    # 终端
    "run_command": "执行shell命令",
    "read_output": "读取命令输出",
    "send_input": "向运行中的命令发送输入",

    # 浏览器(可选)
    "open_browser": "在浏览器中打开URL",
    "click_element": "点击页面元素",
    "type_text": "在输入中键入文本",
    "screenshot": "捕获屏幕截图",

    # 上下文
    "ask_user": "向用户提问",
    "search_web": "在网络上搜索信息"
}

2.3 编辑工具设计

class EditFileTool(Tool):
    """
    带有冲突检测的精确文件编辑。
    使用搜索/替换模式进行可靠编辑。
    """

    name = "edit_file"
    description = "通过替换特定内容来编辑文件"

    def execute(
        self,
        path: str,
        search: str,
        replace: str,
        expected_occurrences: int = 1
    ) -> ToolResult:
        """
        参数:
            path: 要编辑的文件
            search: 要查找的精确文本(必须完全匹配,包括空格)
            replace: 要替换的文本
            expected_occurrences: 搜索应出现的次数(验证)
        """
        with open(path, 'r') as f:
            content = f.read()

        # 验证
        actual_occurrences = content.count(search)
        if actual_occurrences != expected_occurrences:
            return ToolResult(
                success=False,
                error=f"预期 {expected_occurrences} 次出现,找到 {actual_occurrences}"
            )

        if actual_occurrences == 0:
            return ToolResult(
                success=False,
                error="在文件中未找到搜索文本"
            )

        # 应用编辑
        new_content = content.replace(search, replace)

        with open(path, 'w') as f:
            f.write(new_content)

        return ToolResult(
            success=True,
            output=f"替换了 {actual_occurrences} 次出现"
        )

3. 权限与安全模式

3.1 权限级别

class PermissionLevel(Enum):
    # 完全自动 - 无需用户批准
    AUTO = "auto"

    # 每次会话询问一次
    ASK_ONCE = "ask_once"

    # 每次询问
    ASK_EACH = "ask_each"

    # 从不允许
    NEVER = "never"

PERMISSION_CONFIG = {
    # 低风险 - 可自动批准
    "read_file": PermissionLevel.AUTO,
    "list_directory": PermissionLevel.AUTO,
    "search_code": PermissionLevel.AUTO,

    # 中等风险 - 询问一次
    "write_file": PermissionLevel.ASK_ONCE,
    "edit_file": PermissionLevel.ASK_ONCE,

    # 高风险 - 每次询问
    "run_command": PermissionLevel.ASK_EACH,
    "delete_file": PermissionLevel.ASK_EACH,

    # 危险 - 从不自动批准
    "sudo_command": PermissionLevel.NEVER,
    "format_disk": PermissionLevel.NEVER
}

3.2 批准UI模式

class ApprovalManager:
    def __init__(self, ui, config):
        self.ui = ui
        self.config = config
        self.session_approvals = {}

    def request_approval(self, tool_name: str, args: dict) -> bool:
        level = self.config.get(tool_name, PermissionLevel.ASK_EACH)

        if level == PermissionLevel.AUTO:
            return True

        if level == PermissionLevel.NEVER:
            self.ui.show_error(f"工具 '{tool_name}' 不被允许")
            return False

        if level == PermissionLevel.ASK_ONCE:
            if tool_name in self.session_approvals:
                return self.session_approvals[tool_name]

        # 显示批准对话框
        approved = self.ui.show_approval_dialog(
            tool=tool_name,
            args=args,
            risk_level=self._assess_risk(tool_name, args)
        )

        if level == PermissionLevel.ASK_ONCE:
            self.session_approvals[tool_name] = approved

        return approved

    def _assess_risk(self, tool_name: str, args: dict) -> str:
        """分析特定调用的风险级别"""
        if tool_name == "run_command":
            cmd = args.get("command", "")
            if any(danger in cmd for danger in ["rm -rf", "sudo", "chmod"]):
                return "高"
        return "中"

3.3 沙盒化

class SandboxedExecution:
    """
    在隔离环境中执行代码/命令
    """

    def __init__(self, workspace_dir: str):
        self.workspace = workspace_dir
        self.allowed_commands = ["npm", "python", "node", "git", "ls", "cat"]
        self.blocked_paths = ["/etc", "/usr", "/bin", os.path.expanduser("~")]

    def validate_path(self, path: str) -> bool:
        """确保路径在工作空间内"""
        real_path = os.path.realpath(path)
        workspace_real = os.path.realpath(self.workspace)
        return real_path.startswith(workspace_real)

    def validate_command(self, command: str) -> bool:
        """检查命令是否被允许"""
        cmd_parts = shlex.split(command)
        if not cmd_parts:
            return False

        base_cmd = cmd_parts[0]
        return base_cmd in self.allowed_commands

    def execute_sandboxed(self, command: str) -> ToolResult:
        if not self.validate_command(command):
            return ToolResult(
                success=False,
                error=f"命令不被允许: {command}"
            )

        # 在隔离环境中执行
        result = subprocess.run(
            command,
            shell=True,
            cwd=self.workspace,
            capture_output=True,
            timeout=30,
            env={
                **os.environ,
                "HOME": self.workspace,  # 隔离主目录
            }
        )

        return ToolResult(
            success=result.returncode == 0,
            output=result.stdout.decode(),
            error=result.stderr.decode() if result.returncode != 0 else None
        )

4. 浏览器自动化

4.1 浏览器工具模式

class BrowserTool:
    """
    使用Playwright/Puppeteer进行代理的浏览器自动化。
    启用视觉调试和网络测试。
    """

    def __init__(self, headless: bool = True):
        self.browser = None
        self.page = None
        self.headless = headless

    async def open_url(self, url: str) -> ToolResult:
        """导航到URL并返回页面信息"""
        if not self.browser:
            self.browser = await playwright.chromium.launch(headless=self.headless)
            self.page = await self.browser.new_page()

        await self.page.goto(url)

        # 捕获状态
        screenshot = await self.page.screenshot(type='png')
        title = await self.page.title()

        return ToolResult(
            success=True,
            output=f"已加载: {title}",
            metadata={
                "screenshot": base64.b64encode(screenshot).decode(),
                "url": self.page.url
            }
        )

    async def click(self, selector: str) -> ToolResult:
        """点击元素"""
        try:
            await self.page.click(selector, timeout=5000)
            await self.page.wait_for_load_state("networkidle")

            screenshot = await self.page.screenshot()
            return ToolResult(
                success=True,
                output=f"已点击: {selector}",
                metadata={"screenshot": base64.b64encode(screenshot).decode()}
            )
        except TimeoutError:
            return ToolResult(
                success=False,
                error=f"未找到元素: {selector}"
            )

    async def type_text(self, selector: str, text: str) -> ToolResult:
        """在输入中键入文本"""
        await self.page.fill(selector, text)
        return ToolResult(success=True, output=f"已键入 {selector}")

    async def get_page_content(self) -> ToolResult:
        """获取页面的可访问文本内容"""
        content = await self.page.evaluate("""
            () => {
                // 获取可见文本
                const walker = document.createTreeWalker(
                    document.body,
                    NodeFilter.SHOW_TEXT,
                    null,
                    false
                );

                let text = '';
                while (walker.nextNode()) {
                    const node = walker.currentNode;
                    if (node.textContent.trim()) {
                        text += node.textContent.trim() + '\
';
                    }
                }
                return text;
            }
        """)
        return ToolResult(success=True, output=content)

4.2 视觉代理模式

class VisualAgent:
    """
    使用屏幕截图来理解网页的代理。
    可以在没有选择器的情况下视觉识别元素。
    """

    def __init__(self, llm, browser):
        self.llm = llm
        self.browser = browser

    async def describe_page(self) -> str:
        """使用视觉模型描述当前页面"""
        screenshot = await self.browser.screenshot()

        response = self.llm.chat([
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "描述这个网页。列出您看到的所有交互元素。"},
                    {"type": "image", "data": screenshot}
                ]
            }
        ])

        return response.content

    async def find_and_click(self, description: str) -> ToolResult:
        """通过视觉描述找到元素并点击它"""
        screenshot = await self.browser.screenshot()

        # 请求视觉模型找到元素
        response = self.llm.chat([
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": f"""
                        找到匹配的元素:"{description}"
                        返回近似坐标作为JSON:{{"x": number, "y": number}}
                        """
                    },
                    {"type": "image", "data": screenshot}
                ]
            }
        ])

        coords = json.loads(response.content)
        await self.browser.page.mouse.click(coords["x"], coords["y"])

        return ToolResult(success=True, output=f"在 ({coords['x']}, {coords['y']}) 处点击")

5. 上下文管理

5.1 上下文注入模式

class ContextManager:
    """
    管理提供给代理的上下文。
    灵感来自Cline的@-提及模式。
    """

    def __init__(self, workspace: str):
        self.workspace = workspace
        self.context = []

    def add_file(self, path: str) -> None:
        """@file - 将文件内容添加到上下文"""
        with open(path, 'r') as f:
            content = f.read()

        self.context.append({
            "type": "file",
            "path": path,
            "content": content
        })

    def add_folder(self, path: str, max_files: int = 20) -> None:
        """@folder - 添加文件夹中的所有文件"""
        for root, dirs, files in os.walk(path):
            for file in files[:max_files]:
                file_path = os.path.join(root, file)
                self.add_file(file_path)

    def add_url(self, url: str) -> None:
        """@url - 获取并添加URL内容"""
        response = requests.get(url)
        content = html_to_markdown(response.text)

        self.context.append({
            "type": "url",
            "url": url,
            "content": content
        })

    def add_problems(self, diagnostics: list) -> None:
        """@problems - 添加IDE诊断"""
        self.context.append({
            "type": "diagnostics",
            "problems": diagnostics
        })

    def format_for_prompt(self) -> str:
        """将所有上下文格式化以用于LLM提示"""
        parts = []
        for item in self.context:
            if item["type"] == "file":
                parts.append(f"## 文件: {item['path']}
```
{item['content']}
```")
            elif item["type"] == "url":
                parts.append(f"## URL: {item['url']}
{item['content']}")
            elif item["type"] == "diagnostics":
                parts.append(f"## 问题:
{json.dumps(item['problems'], indent=2)}")

        return "

".join(parts)

5.2 检查点/恢复

class CheckpointManager:
    """
    保存和恢复代理状态以进行长时间运行的任务。
    """

    def __init__(self, storage_dir: str):
        self.storage_dir = storage_dir
        os.makedirs(storage_dir, exist_ok=True)

    def save_checkpoint(self, session_id: str, state: dict) -> str:
        """保存当前代理状态"""
        checkpoint = {
            "timestamp": datetime.now().isoformat(),
            "session_id": session_id,
            "history": state["history"],
            "context": state["context"],
            "workspace_state": self._capture_workspace(state["workspace"]),
            "metadata": state.get("metadata", {})
        }

        path = os.path.join(self.storage_dir, f"{session_id}.json")
        with open(path, 'w') as f:
            json.dump(checkpoint, f, indent=2)

        return path

    def restore_checkpoint(self, checkpoint_path: str) -> dict:
        """从检查点恢复代理状态"""
        with open(checkpoint_path, 'r') as f:
            checkpoint = json.load(f)

        return {
            "history": checkpoint["history"],
            "context": checkpoint["context"],
            "workspace": self._restore_workspace(checkpoint["workspace_state"]),
            "metadata": checkpoint["metadata"]
        }

    def _capture_workspace(self, workspace: str) -> dict:
        """捕获相关工作空间状态"""
        # Git状态、文件哈希等。
        return {
            "git_ref": subprocess.getoutput(f"cd {workspace} && git rev-parse HEAD"),
            "git_dirty": subprocess.getoutput(f"cd {workspace} && git status --porcelain")
        }

6. MCP(模型上下文协议)集成

6.1 MCP服务器模式

from mcp import Server, Tool

class MCPAgent:
    """
    可以动态发现和使用MCP工具的代理。
    Cline中的'添加一个工具...'模式。
    """

    def __init__(self, llm):
        self.llm = llm
        self.mcp_servers = {}
        self.available_tools = {}

    def connect_server(self, name: str, config: dict) -> None:
        """连接到MCP服务器"""
        server = Server(config)
        self.mcp_servers[name] = server

        # 发现工具
        tools = server.list_tools()
        for tool in tools:
            self.available_tools[tool.name] = {
                "server": name,
                "schema": tool.schema
            }

    async def create_tool(self, description: str) -> str:
        """
        基于用户描述创建新的MCP服务器。
        '添加一个工具来获取Jira票据'
        """
        # 生成MCP服务器代码
        code = self.llm.generate(f"""
        创建一个Python MCP服务器,该服务器带有一个工具,用于:
        {description}

        使用FastMCP框架。包括适当的错误处理。
        仅返回Python代码。
        """)

        # 保存和安装
        server_name = self._extract_name(description)
        path = f"./mcp_servers/{server_name}/server.py"

        with open(path, 'w') as f:
            f.write(code)

        # 热重载
        self.connect_server(server_name, {"path": path})

        return f"已创建工具: {server_name}"

最佳实践清单

代理设计

  • [ ] 清晰的任务分解
  • [ ] 适当的工具粒度
  • [ ] 每个步骤的错误处理
  • [ ] 对用户的进度可见性

安全

  • [ ] 实现权限系统
  • [ ] 阻止危险操作
  • [ ] 不可信代码的沙盒化
  • [ ] 启用审计日志

用户体验

  • [ ] 批准UI清晰
  • [ ] 提供进度更新
  • [ ] 可用撤销/回滚
  • [ ] 行动解释

资源