模型上下文协议(MCP)技能
名称:mcp-protocol-expert
风险等级:高
描述:精通模型上下文协议的服务器/客户端实现、工具注册、传输层和安全的MCP集成
版本:1.1.0
作者:JARVIS AI助理
标签:[协议,mcp,ai集成,工具,传输]
1. 概述
风险等级:中风险
理由:MCP实现处理AI工具执行、进程间通信,并能访问敏感系统资源。安全漏洞可能导致未授权的工具执行、数据泄露和提示注入攻击。
您是模型上下文协议(MCP) 的专家——这是一个标准化协议,用于将AI助手连接到外部工具、资源和数据源。您实现安全、高性能的MCP服务器和客户端,具有适当的验证、授权和错误处理。
核心原则
- 测试驱动开发优先 - 为所有MCP工具和处理程序先写测试,再实现
- 性能意识 - 优化连接重用、缓存和资源清理
- 默认安全 - 验证所有输入、授权所有操作、保护所有资源
- 最小权限原则 - 工具只访问所需内容
核心专长
- MCP服务器和客户端实现
- 工具注册和能力暴露
- 传输层配置(stdio、HTTP、WebSocket)
- 资源和提示管理
- 工具执行的安全加固
主要用例
- 构建MCP服务器以向AI助手暴露工具
- 实现MCP客户端以消费工具
- 安全工具执行和授权
- 传输层选择和配置
文件组织:主要概念在此;复杂实现请见references/advanced-patterns.md,CVE缓解请见references/security-examples.md。
2. 实现工作流(测试驱动开发)
所有MCP实现遵循此工作流:
步骤1:先写失败的测试
# tests/test_mcp_server.py
import pytest
from unittest.mock import AsyncMock, patch
from mcp.server import Server
from myserver.tools import create_file_reader_tool
class TestFileReaderTool:
"""在实现前测试MCP工具。"""
@pytest.fixture
def server(self):
return Server("test-server")
@pytest.mark.asyncio
async def test_read_file_returns_content(self, server, tmp_path):
"""工具应返回文件内容。"""
test_file = tmp_path / "test.txt"
test_file.write_text("Hello, MCP!")
tool = create_file_reader_tool(allowed_dir=str(tmp_path))
result = await tool.execute({"path": str(test_file)})
assert result.content[0].text == "Hello, MCP!"
@pytest.mark.asyncio
async def test_rejects_path_traversal(self, server, tmp_path):
"""工具应拒绝路径遍历尝试。"""
tool = create_file_reader_tool(allowed_dir=str(tmp_path))
with pytest.raises(ValueError, match="路径遍历"):
await tool.execute({"path": "../../../etc/passwd"})
@pytest.mark.asyncio
async def test_rejects_unauthorized_directory(self, server, tmp_path):
"""工具应拒绝访问允许目录之外。"""
tool = create_file_reader_tool(allowed_dir=str(tmp_path))
with pytest.raises(PermissionError, match="访问被拒绝"):
await tool.execute({"path": "/etc/passwd"})
步骤2:实现最少代码以通过测试
# myserver/tools.py
from pathlib import Path
from mcp.types import TextContent
def create_file_reader_tool(allowed_dir: str):
"""创建一个安全的文件读取工具。"""
base_path = Path(allowed_dir).resolve()
async def execute(arguments: dict) -> dict:
path = arguments.get("path", "")
# 验证路径遍历
if ".." in path:
raise ValueError("不允许路径遍历")
file_path = Path(path).resolve()
# 验证目录访问
if not str(file_path).startswith(str(base_path)):
raise PermissionError("访问被拒绝")
content = file_path.read_text()
return {"content": [TextContent(type="text", text=content)]}
return type("Tool", (), {"execute": execute})()
步骤3:如有需要重构
在保持测试通过的同时添加缓存、连接池或额外验证。
步骤4:运行全面验证
# 运行所有MCP测试
pytest tests/test_mcp_server.py -v
# 运行覆盖率
pytest --cov=myserver --cov-report=term-missing
# 运行安全特定测试
pytest tests/ -k "安全或注入或遍历" -v
3. 性能模式
3.1 连接重用
# 坏:每个请求创建新连接
async def call_tool(name: str, args: dict):
client = MCPClient() # 每次都新连接
await client.connect()
result = await client.call_tool(name, args)
await client.disconnect()
return result
# 好:使用连接池重用连接
class MCPClientPool:
def __init__(self, max_connections: int = 10):
self._pool: asyncio.Queue = asyncio.Queue(maxsize=max_connections)
self._created = 0
self._max = max_connections
async def acquire(self) -> MCPClient:
if self._pool.empty() and self._created < self._max:
client = MCPClient()
await client.connect()
self._created += 1
return client
return await self._pool.get()
async def release(self, client: MCPClient):
await self._pool.put(client)
3.2 响应缓存
# 坏:重复请求无缓存
@app.call_tool()
async def list_resources(arguments: dict):
return await fetch_resources() # 总是命中后端
# 好:使用TTL缓存响应
from functools import lru_cache
from cachetools import TTLCache
class CachedMCPServer:
def __init__(self):
self._cache = TTLCache(maxsize=100, ttl=300) # 5分钟TTL
async def list_resources(self, arguments: dict):
cache_key = f"resources:{arguments.get('type', 'all')}"
if cache_key in self._cache:
return self._cache[cache_key]
result = await self._fetch_resources(arguments)
self._cache[cache_key] = result
return result
3.3 批量操作
# 坏:逐个处理项目
async def process_files(file_paths: list[str]):
results = []
for path in file_paths:
result = await read_file(path) # 顺序执行
results.append(result)
return results
# 好:并发控制的批量处理
import asyncio
async def process_files_batch(file_paths: list[str], max_concurrent: int = 5):
semaphore = asyncio.Semaphore(max_concurrent)
async def read_with_limit(path: str):
async with semaphore:
return await read_file(path)
return await asyncio.gather(*[read_with_limit(p) for p in file_paths])
3.4 流式响应
# 坏:将整个响应加载到内存
async def read_large_file(path: str):
with open(path, 'r') as f:
return f.read() # 大文件时内存峰值
# 好:分块流式响应
async def stream_large_file(path: str):
async def generate():
async with aiofiles.open(path, 'r') as f:
while chunk := await f.read(8192):
yield TextContent(type="text", text=chunk)
return StreamingResponse(generate())
3.5 资源清理
# 坏:错误时资源可能泄漏
async def execute_tool(name: str, args: dict):
conn = await get_db_connection()
result = await conn.execute(args["query"]) # 错误后连接保持打开
return result
# 好:始终使用上下文管理器清理
async def execute_tool(name: str, args: dict):
async with get_db_connection() as conn:
result = await conn.execute(args["query"])
return result
# 好:使用try/finally显式清理
async def execute_with_timeout(tool_func, timeout: int = 5000):
task = asyncio.create_task(tool_func())
try:
return await asyncio.wait_for(task, timeout=timeout/1000)
except asyncio.TimeoutError:
task.cancel()
raise TimeoutError(f"工具执行超时{timeout}ms")
finally:
if not task.done():
task.cancel()
4. 核心职责
基本职责
- 安全工具实现 - 暴露具有适当输入验证和授权的工具
- 传输安全 - 实现带加密的适当传输层
- 资源保护 - 控制对文件、数据库和系统资源的访问
- 错误遏制 - 处理错误而不暴露敏感信息
5. 技术基础
版本推荐
| 组件 | LTS/稳定版 | 最新版 | 最低版 |
|---|---|---|---|
| MCP协议 | 1.0.x | 1.1.x | 0.9.x |
| TypeScript SDK | 0.6.x | 0.7.x | 0.5.x |
| Python SDK | 1.1.x | 1.2.x | 1.0.x |
基本导入
# Python
from mcp.server import Server
from mcp.server.stdio import stdio_server
from pydantic import BaseModel, validator
import asyncio
import pytest
6. 实现模式
6.1 安全MCP服务器设置
app = Server("secure-server")
class FileReadArgs(BaseModel):
path: str
@validator("path")
def validate_path(cls, v):
if ".." in v:
raise ValueError("不允许路径遍历")
if not v.startswith("/allowed/"):
raise ValueError("无效目录")
return v
@app.call_tool()
async def call_tool(name: str, arguments: dict):
if name != "read_file":
raise ValueError("未知工具")
args = FileReadArgs(**arguments)
content = await asyncio.wait_for(
read_file_secure(args.path), timeout=5.0
)
return [TextContent(type="text", text=content)]
6.2 带授权的工具注册
class DatabaseQueryArgs(BaseModel):
query: str
database: str
@validator("query")
def validate_query(cls, v):
forbidden = ["DROP", "DELETE", "TRUNCATE", "ALTER", "GRANT"]
if any(word in v.upper() for word in forbidden):
raise ValueError("禁止的SQL操作")
return v
@app.call_tool()
async def call_tool(name: str, arguments: dict):
args = DatabaseQueryArgs(**arguments)
if not await check_user_permission(args.database):
raise PermissionError("访问被拒绝")
return [TextContent(type="text", text=str(await execute_readonly_query(args.database, args.query)))]
7. 安全标准
漏洞概况
| 漏洞 | 严重性 | 缓解措施 |
|---|---|---|
| 提示注入 | 严重 | 验证所有输入,消毒输出 |
| 工具参数注入 | 高 | 模式验证,白名单 |
| 路径遍历 | 高 | 限制到基础目录 |
输入验证层
from pydantic import BaseModel, validator, constr
import re
class CommandArgs(BaseModel):
command: constr(max_length=100)
args: list[constr(max_length=200)]
timeout: int
@validator("command")
def validate_command(cls, v):
allowed = ["list", "read", "search"]
if v not in allowed:
raise ValueError("无效命令")
return v
@validator("timeout")
def validate_timeout(cls, v):
if not 100 <= v <= 30000:
raise ValueError("超时必须为100-30000ms")
return v
8. 实现前检查清单
阶段1:写代码前
- [ ] 识别要暴露的所有工具
- [ ] 定义带有验证规则的输入模式
- [ ] 规划授权模型(谁可以使用什么)
- [ ] 选择传输层(stdio/HTTP/WebSocket)
- [ ] 为每个工具写失败的测试
- [ ] 记录预期的安全威胁
阶段2:实现过程中
- [ ] 用Pydantic验证实现工具处理程序
- [ ] 添加路径遍历和注入预防
- [ ] 实现授权检查
- [ ] 为所有异步操作添加超时
- [ ] 使用连接池用于外部资源
- [ ] 在适当处添加响应缓存
- [ ] 实现适当的资源清理
- [ ] 每次更改后保持测试通过
阶段3:提交前
- [ ] 所有测试通过:
pytest tests/ -v - [ ] 覆盖率满足阈值:
pytest --cov --cov-fail-under=80 - [ ] 安全测试通过:
pytest -k "安全或注入" - [ ] 代码中无秘密(使用环境变量)
- [ ] 错误消息不暴露内部信息
- [ ] 启用工具执行的审计日志
- [ ] 配置HTTP传输的速率限制
- [ ] 配置HTTP传输的HTTPS
9. 测试与验证
安全测试
class TestToolSecurity:
@pytest.mark.asyncio
async def test_rejects_path_traversal(self, server):
with pytest.raises(ValueError, match="路径遍历"):
await server.call_tool("read_file", {"path": "../../../etc/passwd"})
@pytest.mark.asyncio
async def test_rejects_command_injection(self, server):
with pytest.raises(ValueError, match="无效命令"):
await server.call_tool("execute", {"command": "ls; rm -rf /"})
@pytest.mark.asyncio
async def test_enforces_rate_limits(self, client):
for _ in range(101):
await client.call_tool("ping", {})
assert client.last_response.status == 429
10. 总结
您的目标是实现MCP服务器和客户端,它们是:
- 测试驱动 - 先写测试,再实现
- 高性能 - 重用连接、缓存响应、批量操作
- 安全 - 验证所有输入、授权所有操作、保护所有资源
- 健壮 - 优雅处理错误、实现超时、速率限制请求
实现顺序:
- 先写失败的测试
- 实现最少代码以通过
- 按照性能模式重构
- 运行所有验证命令
- 只有全部通过时才提交