MCP传输SSE设置 mcp-transport-sse-setup

MCP传输SSE设置技能用于为基于Web的MCP服务器配置HTTP/SSE传输层,包括服务器端事件流、跨域资源共享、身份验证和连接管理。该技能支持TypeScript和Python语言,提供Express和FastAPI框架的完整实现模板,帮助开发者快速搭建实时通信的MCP服务器。关键词:MCP服务器、SSE传输、实时通信、HTTP端点、CORS配置、身份验证、连接管理、WebSocket替代方案、服务器端事件、AI模型通信协议。

后端开发 0 次安装 0 次浏览 更新于 2/23/2026

名称: mcp-transport-sse-setup 描述: 为基于Web的MCP服务器配置HTTP/SSE传输,包括正确的端点、身份验证和CORS。 允许的工具: 读取、写入、编辑、Bash、Glob、Grep

MCP传输SSE设置

为基于Web的MCP服务器配置HTTP/SSE传输。

功能

  • 为MCP服务器配置SSE传输
  • 为MCP通信设置HTTP端点
  • 实现身份验证中间件
  • 为浏览器客户端配置CORS
  • 设置健康检查端点
  • 实现连接管理

使用场景

在以下情况下调用此技能:

  • 设置基于Web的MCP服务器传输
  • 为实时通信配置SSE
  • 为MCP端点实现身份验证
  • 为基于浏览器的客户端启用CORS

输入参数

参数 类型 必填 描述
language 字符串 目标语言(typescript、python)
framework 字符串 Web框架(express、fastify、fastapi)
auth 对象 身份验证配置
cors 对象 CORS配置

生成模式

TypeScript Express SSE传输

import express from 'express';
import cors from 'cors';
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js';

const app = express();

// CORS配置
app.use(cors({
  origin: process.env.ALLOWED_ORIGINS?.split(',') || '*',
  credentials: true,
}));

app.use(express.json());

// 存储活动连接
const connections = new Map<string, SSEServerTransport>();

// SSE端点
app.get('/sse', async (req, res) => {
  const connectionId = req.query.connectionId as string || crypto.randomUUID();

  // 设置SSE头部
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.setHeader('X-Accel-Buffering', 'no');

  // 创建传输
  const transport = new SSEServerTransport('/message', res);
  connections.set(connectionId, transport);

  // 为此连接创建服务器实例
  const server = new Server(
    { name: 'my-mcp-server', version: '1.0.0' },
    { capabilities: { tools: {}, resources: {} } }
  );

  // 注册处理器...
  registerToolHandlers(server);

  // 处理连接关闭
  req.on('close', () => {
    connections.delete(connectionId);
    transport.close();
  });

  // 连接传输
  await server.connect(transport);
});

// 消息端点
app.post('/message', async (req, res) => {
  const connectionId = req.query.connectionId as string;
  const transport = connections.get(connectionId);

  if (!transport) {
    res.status(404).json({ error: '连接未找到' });
    return;
  }

  try {
    await transport.handlePostMessage(req, res);
  } catch (error) {
    res.status(500).json({ error: '处理消息失败' });
  }
});

// 健康检查
app.get('/health', (req, res) => {
  res.json({
    status: '健康',
    connections: connections.size,
    timestamp: new Date().toISOString(),
  });
});

const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`MCP服务器监听端口 ${PORT}`);
});

Python FastAPI SSE传输

from fastapi import FastAPI, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from sse_starlette.sse import EventSourceResponse
from mcp.server import Server
from mcp.server.sse import SseServerTransport
import uuid
import asyncio

app = FastAPI()

# CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 存储连接
connections: dict[str, SseServerTransport] = {}

@app.get("/sse")
async def sse_endpoint(request: Request):
    connection_id = request.query_params.get("connectionId") or str(uuid.uuid4())

    async def event_generator():
        transport = SseServerTransport("/message")
        connections[connection_id] = transport

        server = Server("my-mcp-server")
        register_handlers(server)

        try:
            async for event in transport.events():
                yield event
        finally:
            connections.pop(connection_id, None)

    return EventSourceResponse(event_generator())

@app.post("/message")
async def message_endpoint(request: Request):
    connection_id = request.query_params.get("connectionId")
    transport = connections.get(connection_id)

    if not transport:
        return Response(status_code=404, content="连接未找到")

    body = await request.json()
    await transport.handle_message(body)
    return Response(status_code=200)

@app.get("/health")
async def health():
    return {
        "status": "健康",
        "connections": len(connections),
    }

工作流程

  1. 配置框架 - 设置Web框架
  2. 添加CORS - 配置跨域请求
  3. 创建SSE端点 - 设置事件流
  4. 添加消息处理器 - POST端点用于消息处理
  5. 实现身份验证 - 可选的身份验证
  6. 添加健康检查 - 监控端点

目标流程

  • mcp-transport-layer
  • mcp-server-bootstrap
  • mcp-server-monitoring-debugging