MCP集成模式 mcp-integration-patterns

这个技能专注于构建Model Context Protocol(MCP)服务器和客户端,用于扩展AI助手的功能,集成自定义工具、资源和提示模板。它覆盖MCP协议基础、服务器开发模式、资源管理、安全措施和测试方法,适用于AI应用开发。关键词:MCP协议,AI助手扩展,服务器开发,客户端集成,安全认证,协议规范。

AI应用 0 次安装 0 次浏览 更新于 3/7/2026

name: mcp-integration-patterns description: 构建MCP(Model Context Protocol)服务器和客户端,用于通过自定义工具、资源和提示扩展AI助手。 license: MIT

MCP集成模式

这个技能提供构建Model Context Protocol(MCP)服务器和客户端的指导,用于扩展AI助手,使其能够集成自定义功能。

核心能力

  • MCP服务器开发:暴露工具、资源和提示
  • MCP客户端集成:连接到MCP服务器
  • 传输协议:stdio、HTTP/SSE、WebSocket
  • 安全:认证、授权、沙箱化

MCP基础

MCP提供什么

┌─────────────────────────────────────────────────────────────────────┐
│                         AI助手(Claude)                        │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐                 │
│  │   工具      │  │   资源      │  │   提示      │                 │
│  │  (操作)   │  │  (数据)   │  │ (模板)    │                 │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘                 │
│         │                │                │                         │
│         └────────────────┼────────────────┘                         │
│                          │                                          │
│                    MCP协议                                          │
│                          │                                          │
├──────────────────────────┼──────────────────────────────────────────┤
│                          │                                          │
│  ┌───────────────────────┼───────────────────────────────────────┐ │
│  │                   MCP服务器                                  │ │
│  │                                                                │ │
│  │  ┌─────────┐   ┌─────────┐   ┌─────────┐   ┌─────────┐       │ │
│  │  │数据库   │   │   Git   │   │  Slack  │   │ 自定义  │       │ │
│  │  │ 服务器  │   │ 服务器  │   │ 服务器  │   │ 服务器  │       │ │
│  │  └─────────┘   └─────────┘   └─────────┘   └─────────┘       │ │
│  └────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘

MCP原语

原语 目的 方向
工具 执行操作 客户端 → 服务器
资源 暴露数据 客户端 ← 服务器
提示 模板交互 客户端 ← 服务器
采样 请求完成 客户端 ← 服务器

构建MCP服务器

Python SDK服务器

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent, Resource

# 初始化服务器
server = Server("my-custom-server")

# 定义工具
@server.list_tools()
async def list_tools():
    return [
        Tool(
            name="search_documents",
            description="按查询搜索内部文档",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "搜索查询"
                    },
                    "limit": {
                        "type": "integer",
                        "description": "最大结果数",
                        "default": 10
                    }
                },
                "required": ["query"]
            }
        ),
        Tool(
            name="create_ticket",
            description="创建支持工单",
            inputSchema={
                "type": "object",
                "properties": {
                    "title": {"type": "string"},
                    "description": {"type": "string"},
                    "priority": {
                        "type": "string",
                        "enum": ["low", "medium", "high"]
                    }
                },
                "required": ["title", "description"]
            }
        )
    ]

@server.call_tool()
async def call_tool(name: str, arguments: dict):
    if name == "search_documents":
        results = await search_documents(
            arguments["query"],
            arguments.get("limit", 10)
        )
        return [TextContent(
            type="text",
            text=format_results(results)
        )]

    elif name == "create_ticket":
        ticket = await create_ticket(
            arguments["title"],
            arguments["description"],
            arguments.get("priority", "medium")
        )
        return [TextContent(
            type="text",
            text=f"创建工单 #{ticket.id}"
        )]

    raise ValueError(f"未知工具: {name}")

# 运行服务器
async def main():
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream)

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

TypeScript SDK服务器

import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { ListToolsRequestSchema, CallToolRequestSchema } from "@modelcontextprotocol/sdk/types.js";

const server = new Server(
  { name: "my-custom-server", version: "1.0.0" },
  { capabilities: { tools: {} } }
);

// 列出可用工具
server.setRequestHandler(ListToolsRequestSchema, async () => ({
  tools: [
    {
      name: "execute_query",
      description: "对数据库执行SQL查询",
      inputSchema: {
        type: "object",
        properties: {
          query: { type: "string", description: "要执行的SQL查询" },
          database: { type: "string", description: "目标数据库" }
        },
        required: ["query"]
      }
    }
  ]
}));

// 处理工具调用
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  if (name === "execute_query") {
    const results = await executeQuery(args.query, args.database);
    return {
      content: [{ type: "text", text: JSON.stringify(results, null, 2) }]
    };
  }

  throw new Error(`未知工具: ${name}`);
});

// 启动服务器
const transport = new StdioServerTransport();
await server.connect(transport);

资源

暴露AI可以读取的数据。

from mcp.types import Resource, ResourceTemplate

@server.list_resources()
async def list_resources():
    return [
        Resource(
            uri="file:///config/settings.json",
            name="应用设置",
            description="当前应用配置",
            mimeType="application/json"
        ),
        Resource(
            uri="db://users/schema",
            name="用户表结构",
            description="用户表的数据库结构",
            mimeType="text/plain"
        )
    ]

@server.list_resource_templates()
async def list_resource_templates():
    return [
        ResourceTemplate(
            uriTemplate="file:///logs/{date}.log",
            name="每日日志",
            description="特定日期的应用日志"
        ),
        ResourceTemplate(
            uriTemplate="db://tables/{table_name}/schema",
            name="表结构",
            description="任何数据库表的结构"
        )
    ]

@server.read_resource()
async def read_resource(uri: str):
    if uri == "file:///config/settings.json":
        settings = await load_settings()
        return settings

    if uri.startswith("db://"):
        # 解析URI并从数据库获取
        return await fetch_db_resource(uri)

    if uri.startswith("file:///logs/"):
        date = uri.split("/")[-1].replace(".log", "")
        return await read_log_file(date)

    raise ValueError(f"未知资源: {uri}")

提示

提供可重用的提示模板。

from mcp.types import Prompt, PromptArgument, PromptMessage

@server.list_prompts()
async def list_prompts():
    return [
        Prompt(
            name="code_review",
            description="审查代码的最佳实践和错误",
            arguments=[
                PromptArgument(
                    name="code",
                    description="要审查的代码",
                    required=True
                ),
                PromptArgument(
                    name="language",
                    description="编程语言",
                    required=False
                )
            ]
        ),
        Prompt(
            name="summarize_document",
            description="用关键点总结文档",
            arguments=[
                PromptArgument(
                    name="document_uri",
                    description="要总结的文档URI",
                    required=True
                ),
                PromptArgument(
                    name="max_points",
                    description="提取的最大关键点数",
                    required=False
                )
            ]
        )
    ]

@server.get_prompt()
async def get_prompt(name: str, arguments: dict):
    if name == "code_review":
        code = arguments["code"]
        language = arguments.get("language", "unknown")

        return {
            "description": f"{language}代码审查",
            "messages": [
                PromptMessage(
                    role="user",
                    content={
                        "type": "text",
                        "text": f"""审查此{language}代码的:
1. 错误和潜在问题
2. 安全漏洞
3. 性能问题
4. 最佳实践违规

代码:
```{language}
{code}

提供具体、可操作的反馈。“”" } ) ] }

raise ValueError(f"未知提示: {name}")

## 传输协议

### stdio(默认)

当MCP服务器作为子进程运行时使用。

```json
// claude_desktop_config.json
{
  "mcpServers": {
    "my-server": {
      "command": "python",
      "args": ["-m", "my_mcp_server"],
      "env": {
        "DATABASE_URL": "postgresql://..."
      }
    }
  }
}

HTTP with SSE

用于远程服务器:

from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Route

sse = SseServerTransport("/messages")

async def handle_sse(request):
    async with sse.connect_sse(
        request.scope, request.receive, request._send
    ) as streams:
        await server.run(streams[0], streams[1])

app = Starlette(routes=[
    Route("/sse", endpoint=handle_sse),
    Route("/messages", endpoint=sse.handle_post_message, methods=["POST"])
])

服务器模式

数据库集成

import asyncpg

class DatabaseMCPServer:
    def __init__(self, database_url):
        self.database_url = database_url
        self.pool = None

    async def initialize(self):
        self.pool = await asyncpg.create_pool(self.database_url)

    @server.call_tool()
    async def call_tool(self, name: str, arguments: dict):
        if name == "query":
            # 验证查询(防止危险操作)
            query = arguments["query"]
            if self._is_dangerous_query(query):
                return [TextContent(
                    type="text",
                    text="错误:查询包含不允许的操作"
                )]

            async with self.pool.acquire() as conn:
                results = await conn.fetch(query)
                return [TextContent(
                    type="text",
                    text=self._format_results(results)
                )]

    def _is_dangerous_query(self, query):
        """阻止破坏性查询"""
        dangerous = ['DROP', 'DELETE', 'TRUNCATE', 'UPDATE', 'INSERT']
        query_upper = query.upper()
        return any(d in query_upper for d in dangerous)

文件系统访问

import os
from pathlib import Path

class FileSystemMCPServer:
    def __init__(self, allowed_paths):
        self.allowed_paths = [Path(p).resolve() for p in allowed_paths]

    def _validate_path(self, path_str):
        """确保路径在允许的目录内"""
        path = Path(path_str).resolve()
        for allowed in self.allowed_paths:
            try:
                path.relative_to(allowed)
                return path
            except ValueError:
                continue
        raise PermissionError(f"访问被拒绝: {path_str}")

    @server.list_resources()
    async def list_resources(self):
        resources = []
        for allowed_path in self.allowed_paths:
            for file in allowed_path.rglob("*"):
                if file.is_file():
                    resources.append(Resource(
                        uri=f"file://{file}",
                        name=file.name,
                        mimeType=self._get_mimetype(file)
                    ))
        return resources

    @server.read_resource()
    async def read_resource(self, uri: str):
        if uri.startswith("file://"):
            path = self._validate_path(uri[7:])
            return path.read_text()
        raise ValueError(f"未知URI方案: {uri}")

外部API集成

import httpx

class APIIntegrationServer:
    def __init__(self, api_key):
        self.api_key = api_key  # 允许秘密
        self.client = httpx.AsyncClient()

    @server.call_tool()
    async def call_tool(self, name: str, arguments: dict):
        if name == "fetch_weather":
            response = await self.client.get(
                "https://api.weather.com/current",
                params={"city": arguments["city"]},
                headers={"Authorization": f"Bearer {self.api_key}"}
            )
            return [TextContent(type="text", text=response.text)]

        if name == "send_notification":
            response = await self.client.post(
                "https://api.notifications.com/send",
                json={
                    "to": arguments["recipient"],
                    "message": arguments["message"]
                },
                headers={"Authorization": f"Bearer {self.api_key}"}
            )
            return [TextContent(
                type="text",
                text=f"通知已发送: {response.json()['id']}"
            )]

安全考虑

输入验证

from pydantic import BaseModel, validator

class QueryInput(BaseModel):
    query: str
    limit: int = 10

    @validator('query')
    def validate_query(cls, v):
        # 防止SQL注入
        dangerous = [';', '--', '/*', '*/', 'DROP', 'DELETE']
        for d in dangerous:
            if d.lower() in v.lower():
                raise ValueError(f"查询包含不允许的模式: {d}")
        return v

    @validator('limit')
    def validate_limit(cls, v):
        if v < 1 or v > 100:
            raise ValueError("限制必须在1到100之间")
        return v

@server.call_tool()
async def call_tool(name: str, arguments: dict):
    if name == "search":
        # 验证输入
        validated = QueryInput(**arguments)
        return await execute_search(validated.query, validated.limit)

速率限制

from datetime import datetime, timedelta
from collections import defaultdict

class RateLimiter:
    def __init__(self, requests_per_minute=60):
        self.requests_per_minute = requests_per_minute
        self.requests = defaultdict(list)

    def check(self, client_id: str):
        now = datetime.now()
        cutoff = now - timedelta(minutes=1)

        # 清理旧请求
        self.requests[client_id] = [
            t for t in self.requests[client_id] if t > cutoff
        ]

        if len(self.requests[client_id]) >= self.requests_per_minute:
            raise RateLimitError("请求过多")

        self.requests[client_id].append(now)

rate_limiter = RateLimiter()

@server.call_tool()
async def call_tool(name: str, arguments: dict):
    rate_limiter.check("default")  # 或使用实际客户端ID
    # ... 处理请求

认证

import os

class AuthenticatedServer:
    def __init__(self):
        self.api_key = os.environ.get("MCP_API_KEY")  # 允许秘密
        if not self.api_key:
            raise ValueError("未设置MCP_API_KEY")

    def authenticate(self, request_headers: dict):
        provided_key = request_headers.get("Authorization", "").replace("Bearer ", "")
        if not provided_key or provided_key != self.api_key:
            raise AuthenticationError("无效的API密钥")

测试MCP服务器

import pytest
from mcp.client import Client
from mcp.client.stdio import stdio_client

@pytest.fixture
async def mcp_client():
    """创建连接到服务器的测试客户端"""
    async with stdio_client(
        command="python",
        args=["-m", "my_mcp_server"]
    ) as (read, write):
        client = Client("test-client", "1.0.0")
        await client.connect(read, write)
        yield client

@pytest.mark.asyncio
async def test_list_tools(mcp_client):
    tools = await mcp_client.list_tools()
    assert len(tools) > 0
    assert any(t.name == "search_documents" for t in tools)

@pytest.mark.asyncio
async def test_call_tool(mcp_client):
    result = await mcp_client.call_tool(
        "search_documents",
        {"query": "test", "limit": 5}
    )
    assert result.content
    assert result.content[0].type == "text"

参考文献

  • references/mcp-specification.md - 完整的MCP协议规范
  • references/server-examples.md - 完整的服务器实现示例
  • references/deployment-patterns.md - 生产部署策略