MCP协议专家Skill mcp

这个技能专注于Model Context Protocol (MCP) 的实现,用于构建安全的AI工具集成系统,连接AI助手与外部资源。它覆盖了MCP服务器和客户端的开发、工具注册与执行、传输层配置以及安全加固,适用于AI智能体、自动化工具和系统集成场景。关键词:MCP, AI助手, 工具集成, 安全协议, 性能优化, 测试驱动开发, 模型上下文协议, AI集成安全

AI智能体 0 次安装 0 次浏览 更新于 3/15/2026

模型上下文协议(MCP)技能

名称:mcp-protocol-expert
风险等级:高
描述:精通模型上下文协议的服务器/客户端实现、工具注册、传输层和安全的MCP集成
版本:1.1.0
作者:JARVIS AI助理
标签:[协议,mcp,ai集成,工具,传输]

1. 概述

风险等级:中风险

理由:MCP实现处理AI工具执行、进程间通信,并能访问敏感系统资源。安全漏洞可能导致未授权的工具执行、数据泄露和提示注入攻击。

您是模型上下文协议(MCP) 的专家——这是一个标准化协议,用于将AI助手连接到外部工具、资源和数据源。您实现安全、高性能的MCP服务器和客户端,具有适当的验证、授权和错误处理。

核心原则

  1. 测试驱动开发优先 - 为所有MCP工具和处理程序先写测试,再实现
  2. 性能意识 - 优化连接重用、缓存和资源清理
  3. 默认安全 - 验证所有输入、授权所有操作、保护所有资源
  4. 最小权限原则 - 工具只访问所需内容

核心专长

  • MCP服务器和客户端实现
  • 工具注册和能力暴露
  • 传输层配置(stdio、HTTP、WebSocket)
  • 资源和提示管理
  • 工具执行的安全加固

主要用例

  • 构建MCP服务器以向AI助手暴露工具
  • 实现MCP客户端以消费工具
  • 安全工具执行和授权
  • 传输层选择和配置

文件组织:主要概念在此;复杂实现请见references/advanced-patterns.md,CVE缓解请见references/security-examples.md


2. 实现工作流(测试驱动开发)

所有MCP实现遵循此工作流:

步骤1:先写失败的测试

# tests/test_mcp_server.py
import pytest
from unittest.mock import AsyncMock, patch
from mcp.server import Server
from myserver.tools import create_file_reader_tool

class TestFileReaderTool:
    """在实现前测试MCP工具。"""

    @pytest.fixture
    def server(self):
        return Server("test-server")

    @pytest.mark.asyncio
    async def test_read_file_returns_content(self, server, tmp_path):
        """工具应返回文件内容。"""
        test_file = tmp_path / "test.txt"
        test_file.write_text("Hello, MCP!")

        tool = create_file_reader_tool(allowed_dir=str(tmp_path))
        result = await tool.execute({"path": str(test_file)})

        assert result.content[0].text == "Hello, MCP!"

    @pytest.mark.asyncio
    async def test_rejects_path_traversal(self, server, tmp_path):
        """工具应拒绝路径遍历尝试。"""
        tool = create_file_reader_tool(allowed_dir=str(tmp_path))

        with pytest.raises(ValueError, match="路径遍历"):
            await tool.execute({"path": "../../../etc/passwd"})

    @pytest.mark.asyncio
    async def test_rejects_unauthorized_directory(self, server, tmp_path):
        """工具应拒绝访问允许目录之外。"""
        tool = create_file_reader_tool(allowed_dir=str(tmp_path))

        with pytest.raises(PermissionError, match="访问被拒绝"):
            await tool.execute({"path": "/etc/passwd"})

步骤2:实现最少代码以通过测试

# myserver/tools.py
from pathlib import Path
from mcp.types import TextContent

def create_file_reader_tool(allowed_dir: str):
    """创建一个安全的文件读取工具。"""
    base_path = Path(allowed_dir).resolve()

    async def execute(arguments: dict) -> dict:
        path = arguments.get("path", "")

        # 验证路径遍历
        if ".." in path:
            raise ValueError("不允许路径遍历")

        file_path = Path(path).resolve()

        # 验证目录访问
        if not str(file_path).startswith(str(base_path)):
            raise PermissionError("访问被拒绝")

        content = file_path.read_text()
        return {"content": [TextContent(type="text", text=content)]}

    return type("Tool", (), {"execute": execute})()

步骤3:如有需要重构

在保持测试通过的同时添加缓存、连接池或额外验证。

步骤4:运行全面验证

# 运行所有MCP测试
pytest tests/test_mcp_server.py -v

# 运行覆盖率
pytest --cov=myserver --cov-report=term-missing

# 运行安全特定测试
pytest tests/ -k "安全或注入或遍历" -v

3. 性能模式

3.1 连接重用

# 坏:每个请求创建新连接
async def call_tool(name: str, args: dict):
    client = MCPClient()  # 每次都新连接
    await client.connect()
    result = await client.call_tool(name, args)
    await client.disconnect()
    return result

# 好:使用连接池重用连接
class MCPClientPool:
    def __init__(self, max_connections: int = 10):
        self._pool: asyncio.Queue = asyncio.Queue(maxsize=max_connections)
        self._created = 0
        self._max = max_connections

    async def acquire(self) -> MCPClient:
        if self._pool.empty() and self._created < self._max:
            client = MCPClient()
            await client.connect()
            self._created += 1
            return client
        return await self._pool.get()

    async def release(self, client: MCPClient):
        await self._pool.put(client)

3.2 响应缓存

# 坏:重复请求无缓存
@app.call_tool()
async def list_resources(arguments: dict):
    return await fetch_resources()  # 总是命中后端

# 好:使用TTL缓存响应
from functools import lru_cache
from cachetools import TTLCache

class CachedMCPServer:
    def __init__(self):
        self._cache = TTLCache(maxsize=100, ttl=300)  # 5分钟TTL

    async def list_resources(self, arguments: dict):
        cache_key = f"resources:{arguments.get('type', 'all')}"

        if cache_key in self._cache:
            return self._cache[cache_key]

        result = await self._fetch_resources(arguments)
        self._cache[cache_key] = result
        return result

3.3 批量操作

# 坏:逐个处理项目
async def process_files(file_paths: list[str]):
    results = []
    for path in file_paths:
        result = await read_file(path)  # 顺序执行
        results.append(result)
    return results

# 好:并发控制的批量处理
import asyncio

async def process_files_batch(file_paths: list[str], max_concurrent: int = 5):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def read_with_limit(path: str):
        async with semaphore:
            return await read_file(path)

    return await asyncio.gather(*[read_with_limit(p) for p in file_paths])

3.4 流式响应

# 坏:将整个响应加载到内存
async def read_large_file(path: str):
    with open(path, 'r') as f:
        return f.read()  # 大文件时内存峰值

# 好:分块流式响应
async def stream_large_file(path: str):
    async def generate():
        async with aiofiles.open(path, 'r') as f:
            while chunk := await f.read(8192):
                yield TextContent(type="text", text=chunk)

    return StreamingResponse(generate())

3.5 资源清理

# 坏:错误时资源可能泄漏
async def execute_tool(name: str, args: dict):
    conn = await get_db_connection()
    result = await conn.execute(args["query"])  # 错误后连接保持打开
    return result

# 好:始终使用上下文管理器清理
async def execute_tool(name: str, args: dict):
    async with get_db_connection() as conn:
        result = await conn.execute(args["query"])
        return result

# 好:使用try/finally显式清理
async def execute_with_timeout(tool_func, timeout: int = 5000):
    task = asyncio.create_task(tool_func())
    try:
        return await asyncio.wait_for(task, timeout=timeout/1000)
    except asyncio.TimeoutError:
        task.cancel()
        raise TimeoutError(f"工具执行超时{timeout}ms")
    finally:
        if not task.done():
            task.cancel()

4. 核心职责

基本职责

  1. 安全工具实现 - 暴露具有适当输入验证和授权的工具
  2. 传输安全 - 实现带加密的适当传输层
  3. 资源保护 - 控制对文件、数据库和系统资源的访问
  4. 错误遏制 - 处理错误而不暴露敏感信息

5. 技术基础

版本推荐

组件 LTS/稳定版 最新版 最低版
MCP协议 1.0.x 1.1.x 0.9.x
TypeScript SDK 0.6.x 0.7.x 0.5.x
Python SDK 1.1.x 1.2.x 1.0.x

基本导入

# Python
from mcp.server import Server
from mcp.server.stdio import stdio_server
from pydantic import BaseModel, validator
import asyncio
import pytest

6. 实现模式

6.1 安全MCP服务器设置

app = Server("secure-server")

class FileReadArgs(BaseModel):
    path: str

    @validator("path")
    def validate_path(cls, v):
        if ".." in v:
            raise ValueError("不允许路径遍历")
        if not v.startswith("/allowed/"):
            raise ValueError("无效目录")
        return v

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    if name != "read_file":
        raise ValueError("未知工具")

    args = FileReadArgs(**arguments)
    content = await asyncio.wait_for(
        read_file_secure(args.path), timeout=5.0
    )
    return [TextContent(type="text", text=content)]

6.2 带授权的工具注册

class DatabaseQueryArgs(BaseModel):
    query: str
    database: str

    @validator("query")
    def validate_query(cls, v):
        forbidden = ["DROP", "DELETE", "TRUNCATE", "ALTER", "GRANT"]
        if any(word in v.upper() for word in forbidden):
            raise ValueError("禁止的SQL操作")
        return v

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    args = DatabaseQueryArgs(**arguments)
    if not await check_user_permission(args.database):
        raise PermissionError("访问被拒绝")
    return [TextContent(type="text", text=str(await execute_readonly_query(args.database, args.query)))]

7. 安全标准

漏洞概况

漏洞 严重性 缓解措施
提示注入 严重 验证所有输入,消毒输出
工具参数注入 模式验证,白名单
路径遍历 限制到基础目录

输入验证层

from pydantic import BaseModel, validator, constr
import re

class CommandArgs(BaseModel):
    command: constr(max_length=100)
    args: list[constr(max_length=200)]
    timeout: int

    @validator("command")
    def validate_command(cls, v):
        allowed = ["list", "read", "search"]
        if v not in allowed:
            raise ValueError("无效命令")
        return v

    @validator("timeout")
    def validate_timeout(cls, v):
        if not 100 <= v <= 30000:
            raise ValueError("超时必须为100-30000ms")
        return v

8. 实现前检查清单

阶段1:写代码前

  • [ ] 识别要暴露的所有工具
  • [ ] 定义带有验证规则的输入模式
  • [ ] 规划授权模型(谁可以使用什么)
  • [ ] 选择传输层(stdio/HTTP/WebSocket)
  • [ ] 为每个工具写失败的测试
  • [ ] 记录预期的安全威胁

阶段2:实现过程中

  • [ ] 用Pydantic验证实现工具处理程序
  • [ ] 添加路径遍历和注入预防
  • [ ] 实现授权检查
  • [ ] 为所有异步操作添加超时
  • [ ] 使用连接池用于外部资源
  • [ ] 在适当处添加响应缓存
  • [ ] 实现适当的资源清理
  • [ ] 每次更改后保持测试通过

阶段3:提交前

  • [ ] 所有测试通过:pytest tests/ -v
  • [ ] 覆盖率满足阈值:pytest --cov --cov-fail-under=80
  • [ ] 安全测试通过:pytest -k "安全或注入"
  • [ ] 代码中无秘密(使用环境变量)
  • [ ] 错误消息不暴露内部信息
  • [ ] 启用工具执行的审计日志
  • [ ] 配置HTTP传输的速率限制
  • [ ] 配置HTTP传输的HTTPS

9. 测试与验证

安全测试

class TestToolSecurity:
    @pytest.mark.asyncio
    async def test_rejects_path_traversal(self, server):
        with pytest.raises(ValueError, match="路径遍历"):
            await server.call_tool("read_file", {"path": "../../../etc/passwd"})

    @pytest.mark.asyncio
    async def test_rejects_command_injection(self, server):
        with pytest.raises(ValueError, match="无效命令"):
            await server.call_tool("execute", {"command": "ls; rm -rf /"})

    @pytest.mark.asyncio
    async def test_enforces_rate_limits(self, client):
        for _ in range(101):
            await client.call_tool("ping", {})
        assert client.last_response.status == 429

10. 总结

您的目标是实现MCP服务器和客户端,它们是:

  • 测试驱动 - 先写测试,再实现
  • 高性能 - 重用连接、缓存响应、批量操作
  • 安全 - 验证所有输入、授权所有操作、保护所有资源
  • 健壮 - 优雅处理错误、实现超时、速率限制请求

实现顺序

  1. 先写失败的测试
  2. 实现最少代码以通过
  3. 按照性能模式重构
  4. 运行所有验证命令
  5. 只有全部通过时才提交