名称: mcp-integration-patterns 描述: 构建MCP(Model Context Protocol)服务器和客户端,用于通过自定义工具、资源和提示扩展AI助手的能力。 许可证: MIT
MCP集成模式
这个技能提供构建Model Context Protocol(MCP)服务器和客户端的指南,用于通过自定义能力扩展AI助手。
核心能力
- MCP服务器开发: 暴露工具、资源和提示
- MCP客户端集成: 连接到MCP服务器
- 传输协议: stdio、HTTP/SSE、WebSocket
- 安全: 认证、授权、沙箱化
MCP基础
MCP提供的内容
┌─────────────────────────────────────────────────────────────────────┐
│ AI助手(Claude) │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 工具 │ │ 资源 │ │ 提示 │ │
│ │ (动作) │ │ (数据) │ │(模板) │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ └────────────────┼────────────────┘ │
│ │ │
│ MCP协议 │
│ │ │
├──────────────────────────┼──────────────────────────────────────────┤
│ │ │
│ ┌───────────────────────┼───────────────────────────────────────┐ │
│ │ MCP服务器 │ │
│ │ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │数据库 │ │ Git │ │ Slack │ │ 自定义 │ │ │
│ │ │ 服务器 │ │ 服务器 │ │ 服务器 │ │ 服务器 │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ └────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
MCP原语
| 原语 | 目的 | 方向 |
|---|---|---|
| 工具 | 执行动作 | 客户端 → 服务器 |
| 资源 | 暴露数据 | 客户端 ← 服务器 |
| 提示 | 模板交互 | 客户端 ← 服务器 |
| 采样 | 请求完成 | 客户端 ← 服务器 |
构建MCP服务器
Python SDK服务器
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent, Resource
# 初始化服务器
server = Server("my-custom-server")
# 定义工具
@server.list_tools()
async def list_tools():
return [
Tool(
name="search_documents",
description="按查询搜索内部文档",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索查询"
},
"limit": {
"type": "integer",
"description": "最大结果数",
"default": 10
}
},
"required": ["query"]
}
),
Tool(
name="create_ticket",
description="创建支持工单",
inputSchema={
"type": "object",
"properties": {
"title": {"type": "string"},
"description": {"type": "string"},
"priority": {
"type": "string",
"enum": ["low", "medium", "high"]
}
},
"required": ["title", "description"]
}
)
]
@server.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "search_documents":
results = await search_documents(
arguments["query"],
arguments.get("limit", 10)
)
return [TextContent(
type="text",
text=format_results(results)
)]
elif name == "create_ticket":
ticket = await create_ticket(
arguments["title"],
arguments["description"],
arguments.get("priority", "medium")
)
return [TextContent(
type="text",
text=f"Created ticket #{ticket.id}"
)]
raise ValueError(f"Unknown tool: {name}")
# 运行服务器
async def main():
async with stdio_server() as (read_stream, write_stream):
await server.run(read_stream, write_stream)
if __name__ == "__main__":
import asyncio
asyncio.run(main())
TypeScript SDK服务器
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { ListToolsRequestSchema, CallToolRequestSchema } from "@modelcontextprotocol/sdk/types.js";
const server = new Server(
{ name: "my-custom-server", version: "1.0.0" },
{ capabilities: { tools: {} } }
);
// 列出可用工具
server.setRequestHandler(ListToolsRequestSchema, async () => ({
tools: [
{
name: "execute_query",
description: "在数据库上执行SQL查询",
inputSchema: {
type: "object",
properties: {
query: { type: "string", description: "要执行的SQL查询" },
database: { type: "string", description: "目标数据库" }
},
required: ["query"]
}
}
]
}));
// 处理工具调用
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
if (name === "execute_query") {
const results = await executeQuery(args.query, args.database);
return {
content: [{ type: "text", text: JSON.stringify(results, null, 2) }]
};
}
throw new Error(`Unknown tool: ${name}`);
});
// 启动服务器
const transport = new StdioServerTransport();
await server.connect(transport);
资源
暴露AI可以读取的数据。
from mcp.types import Resource, ResourceTemplate
@server.list_resources()
async def list_resources():
return [
Resource(
uri="file:///config/settings.json",
name="应用程序设置",
description="当前应用程序配置",
mimeType="application/json"
),
Resource(
uri="db://users/schema",
name="用户表架构",
description="用户表的数据库架构",
mimeType="text/plain"
)
]
@server.list_resource_templates()
async def list_resource_templates():
return [
ResourceTemplate(
uriTemplate="file:///logs/{date}.log",
name="每日日志",
description="特定日期的应用程序日志"
),
ResourceTemplate(
uriTemplate="db://tables/{table_name}/schema",
name="表架构",
description="任何数据库表的架构"
)
]
@server.read_resource()
async def read_resource(uri: str):
if uri == "file:///config/settings.json":
settings = await load_settings()
return settings
if uri.startswith("db://"):
# 解析URI并从数据库获取
return await fetch_db_resource(uri)
if uri.startswith("file:///logs/"):
date = uri.split("/")[-1].replace(".log", "")
return await read_log_file(date)
raise ValueError(f"Unknown resource: {uri}")
提示
提供可重用的提示模板。
from mcp.types import Prompt, PromptArgument, PromptMessage
@server.list_prompts()
async def list_prompts():
return [
Prompt(
name="code_review",
description="审查代码以检查最佳实践和错误",
arguments=[
PromptArgument(
name="code",
description="要审查的代码",
required=True
),
PromptArgument(
name="language",
description="编程语言",
required=False
)
]
),
Prompt(
name="summarize_document",
description="用关键点总结文档",
arguments=[
PromptArgument(
name="document_uri",
description="要总结的文档URI",
required=True
),
PromptArgument(
name="max_points",
description="提取的最大关键点数",
required=False
)
]
)
]
@server.get_prompt()
async def get_prompt(name: str, arguments: dict):
if name == "code_review":
code = arguments["code"]
language = arguments.get("language", "unknown")
return {
"description": f"{language}的代码审查",
"messages": [
PromptMessage(
role="user",
content={
"type": "text",
"text": f"""审查此{language}代码以检查:
1. 错误和潜在问题
2. 安全漏洞
3. 性能问题
4. 最佳实践违规
代码:
```{language}
{code}
提供具体、可操作的反馈。“”" } ) ] }
raise ValueError(f"Unknown prompt: {name}")
## 传输协议
### stdio(默认)
当MCP服务器作为子进程运行时使用。
```json
// claude_desktop_config.json
{
"mcpServers": {
"my-server": {
"command": "python",
"args": ["-m", "my_mcp_server"],
"env": {
"DATABASE_URL": "postgresql://..."
}
}
}
}
HTTP with SSE
用于远程服务器:
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Route
sse = SseServerTransport("/messages")
async def handle_sse(request):
async with sse.connect_sse(
request.scope, request.receive, request._send
) as streams:
await server.run(streams[0], streams[1])
app = Starlette(routes=[
Route("/sse", endpoint=handle_sse),
Route("/messages", endpoint=sse.handle_post_message, methods=["POST"])
])
服务器模式
数据库集成
import asyncpg
class DatabaseMCPServer:
def __init__(self, database_url):
self.database_url = database_url
self.pool = None
async def initialize(self):
self.pool = await asyncpg.create_pool(self.database_url)
@server.call_tool()
async def call_tool(self, name: str, arguments: dict):
if name == "query":
# 验证查询(防止危险操作)
query = arguments["query"]
if self._is_dangerous_query(query):
return [TextContent(
type="text",
text="错误:查询包含不允许的操作"
)]
async with self.pool.acquire() as conn:
results = await conn.fetch(query)
return [TextContent(
type="text",
text=self._format_results(results)
)]
def _is_dangerous_query(self, query):
"""阻止破坏性查询"""
dangerous = ['DROP', 'DELETE', 'TRUNCATE', 'UPDATE', 'INSERT']
query_upper = query.upper()
return any(d in query_upper for d in dangerous)
文件系统访问
import os
from pathlib import Path
class FileSystemMCPServer:
def __init__(self, allowed_paths):
self.allowed_paths = [Path(p).resolve() for p in allowed_paths]
def _validate_path(self, path_str):
"""确保路径在允许的目录内"""
path = Path(path_str).resolve()
for allowed in self.allowed_paths:
try:
path.relative_to(allowed)
return path
except ValueError:
continue
raise PermissionError(f"访问被拒绝:{path_str}")
@server.list_resources()
async def list_resources(self):
resources = []
for allowed_path in self.allowed_paths:
for file in allowed_path.rglob("*"):
if file.is_file():
resources.append(Resource(
uri=f"file://{file}",
name=file.name,
mimeType=self._get_mimetype(file)
))
return resources
@server.read_resource()
async def read_resource(self, uri: str):
if uri.startswith("file://"):
path = self._validate_path(uri[7:])
return path.read_text()
raise ValueError(f"未知的URI方案:{uri}")
外部API集成
import httpx
class APIIntegrationServer:
def __init__(self, api_key):
self.api_key = api_key # allow-secret
self.client = httpx.AsyncClient()
@server.call_tool()
async def call_tool(self, name: str, arguments: dict):
if name == "fetch_weather":
response = await self.client.get(
"https://api.weather.com/current",
params={"city": arguments["city"]},
headers={"Authorization": f"Bearer {self.api_key}"}
)
return [TextContent(type="text", text=response.text)]
if name == "send_notification":
response = await self.client.post(
"https://api.notifications.com/send",
json={
"to": arguments["recipient"],
"message": arguments["message"]
},
headers={"Authorization": f"Bearer {self.api_key}"}
)
return [TextContent(
type="text",
text=f"Notification sent: {response.json()['id']}"
)]
安全考虑
输入验证
from pydantic import BaseModel, validator
class QueryInput(BaseModel):
query: str
limit: int = 10
@validator('query')
def validate_query(cls, v):
# 防止SQL注入
dangerous = [';', '--', '/*', '*/', 'DROP', 'DELETE']
for d in dangerous:
if d.lower() in v.lower():
raise ValueError(f"查询包含不允许的模式:{d}")
return v
@validator('limit')
def validate_limit(cls, v):
if v < 1 or v > 100:
raise ValueError("限制必须在1到100之间")
return v
@server.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "search":
# 验证输入
validated = QueryInput(**arguments)
return await execute_search(validated.query, validated.limit)
速率限制
from datetime import datetime, timedelta
from collections import defaultdict
class RateLimiter:
def __init__(self, requests_per_minute=60):
self.requests_per_minute = requests_per_minute
self.requests = defaultdict(list)
def check(self, client_id: str):
now = datetime.now()
cutoff = now - timedelta(minutes=1)
# 清理旧请求
self.requests[client_id] = [
t for t in self.requests[client_id] if t > cutoff
]
if len(self.requests[client_id]) >= self.requests_per_minute:
raise RateLimitError("请求过多")
self.requests[client_id].append(now)
rate_limiter = RateLimiter()
@server.call_tool()
async def call_tool(name: str, arguments: dict):
rate_limiter.check("default") # 或使用实际客户端ID
# ... 处理请求
认证
import os
class AuthenticatedServer:
def __init__(self):
self.api_key = os.environ.get("MCP_API_KEY") # allow-secret
if not self.api_key:
raise ValueError("MCP_API_KEY未设置")
def authenticate(self, request_headers: dict):
provided_key = request_headers.get("Authorization", "").replace("Bearer ", "")
if not provided_key or provided_key != self.api_key:
raise AuthenticationError("无效的API密钥")
测试MCP服务器
import pytest
from mcp.client import Client
from mcp.client.stdio import stdio_client
@pytest.fixture
async def mcp_client():
"""创建连接到服务器的测试客户端"""
async with stdio_client(
command="python",
args=["-m", "my_mcp_server"]
) as (read, write):
client = Client("test-client", "1.0.0")
await client.connect(read, write)
yield client
@pytest.mark.asyncio
async def test_list_tools(mcp_client):
tools = await mcp_client.list_tools()
assert len(tools) > 0
assert any(t.name == "search_documents" for t in tools)
@pytest.mark.asyncio
async def test_call_tool(mcp_client):
result = await mcp_client.call_tool(
"search_documents",
{"query": "test", "limit": 5}
)
assert result.content
assert result.content[0].type == "text"
参考文献
references/mcp-specification.md- 完整的MCP协议规范references/server-examples.md- 完整的服务器实现示例references/deployment-patterns.md- 生产部署策略