名称: mcp-transport-sse-setup 描述: 为基于Web的MCP服务器配置HTTP/SSE传输,包括正确的端点、身份验证和CORS。 允许的工具: 读取、写入、编辑、Bash、Glob、Grep
MCP传输SSE设置
为基于Web的MCP服务器配置HTTP/SSE传输。
功能
- 为MCP服务器配置SSE传输
- 为MCP通信设置HTTP端点
- 实现身份验证中间件
- 为浏览器客户端配置CORS
- 设置健康检查端点
- 实现连接管理
使用场景
在以下情况下调用此技能:
- 设置基于Web的MCP服务器传输
- 为实时通信配置SSE
- 为MCP端点实现身份验证
- 为基于浏览器的客户端启用CORS
输入参数
| 参数 | 类型 | 必填 | 描述 |
|---|---|---|---|
| language | 字符串 | 是 | 目标语言(typescript、python) |
| framework | 字符串 | 否 | Web框架(express、fastify、fastapi) |
| auth | 对象 | 否 | 身份验证配置 |
| cors | 对象 | 否 | CORS配置 |
生成模式
TypeScript Express SSE传输
import express from 'express';
import cors from 'cors';
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js';
const app = express();
// CORS配置
app.use(cors({
origin: process.env.ALLOWED_ORIGINS?.split(',') || '*',
credentials: true,
}));
app.use(express.json());
// 存储活动连接
const connections = new Map<string, SSEServerTransport>();
// SSE端点
app.get('/sse', async (req, res) => {
const connectionId = req.query.connectionId as string || crypto.randomUUID();
// 设置SSE头部
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Accel-Buffering', 'no');
// 创建传输
const transport = new SSEServerTransport('/message', res);
connections.set(connectionId, transport);
// 为此连接创建服务器实例
const server = new Server(
{ name: 'my-mcp-server', version: '1.0.0' },
{ capabilities: { tools: {}, resources: {} } }
);
// 注册处理器...
registerToolHandlers(server);
// 处理连接关闭
req.on('close', () => {
connections.delete(connectionId);
transport.close();
});
// 连接传输
await server.connect(transport);
});
// 消息端点
app.post('/message', async (req, res) => {
const connectionId = req.query.connectionId as string;
const transport = connections.get(connectionId);
if (!transport) {
res.status(404).json({ error: '连接未找到' });
return;
}
try {
await transport.handlePostMessage(req, res);
} catch (error) {
res.status(500).json({ error: '处理消息失败' });
}
});
// 健康检查
app.get('/health', (req, res) => {
res.json({
status: '健康',
connections: connections.size,
timestamp: new Date().toISOString(),
});
});
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`MCP服务器监听端口 ${PORT}`);
});
Python FastAPI SSE传输
from fastapi import FastAPI, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from sse_starlette.sse import EventSourceResponse
from mcp.server import Server
from mcp.server.sse import SseServerTransport
import uuid
import asyncio
app = FastAPI()
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 存储连接
connections: dict[str, SseServerTransport] = {}
@app.get("/sse")
async def sse_endpoint(request: Request):
connection_id = request.query_params.get("connectionId") or str(uuid.uuid4())
async def event_generator():
transport = SseServerTransport("/message")
connections[connection_id] = transport
server = Server("my-mcp-server")
register_handlers(server)
try:
async for event in transport.events():
yield event
finally:
connections.pop(connection_id, None)
return EventSourceResponse(event_generator())
@app.post("/message")
async def message_endpoint(request: Request):
connection_id = request.query_params.get("connectionId")
transport = connections.get(connection_id)
if not transport:
return Response(status_code=404, content="连接未找到")
body = await request.json()
await transport.handle_message(body)
return Response(status_code=200)
@app.get("/health")
async def health():
return {
"status": "健康",
"connections": len(connections),
}
工作流程
- 配置框架 - 设置Web框架
- 添加CORS - 配置跨域请求
- 创建SSE端点 - 设置事件流
- 添加消息处理器 - POST端点用于消息处理
- 实现身份验证 - 可选的身份验证
- 添加健康检查 - 监控端点
目标流程
- mcp-transport-layer
- mcp-server-bootstrap
- mcp-server-monitoring-debugging