流式API模式 streaming-api-patterns

本技能专注于实现实时数据流传输技术,涵盖服务器发送事件(SSE)、WebSocket和ReadableStream API三大核心技术。适用于大语言模型(LLM)流式响应、实时股票价格推送、聊天应用、实时通知等场景。关键词:实时数据流、SSE、WebSocket、背压处理、LLM流式传输、API开发、2025实时应用、服务器推送、双向通信、数据流处理。

后端开发 0 次安装 0 次浏览 更新于 3/2/2026

name: streaming-api-patterns description: 使用服务器发送事件(SSE)、WebSocket和ReadableStream API实现实时数据流。掌握背压处理、重连策略和LLM流式传输,适用于2025+实时应用场景。 version: 1.0.0 author: AI Agent Hub tags: [流式传输, sse, websocket, 实时, api, 2025]

流式API模式

概述

现代应用需要实时数据交付。本技能涵盖服务器发送事件(SSE)用于服务器到客户端流式传输、WebSocket用于双向通信,以及Streams API用于处理背压和高效数据流。

何时使用此技能:

  • 流式传输LLM响应(ChatGPT风格界面)
  • 实时通知和更新
  • 实时数据源(股票价格、分析数据)
  • 聊天应用
  • 长时间运行任务的进度更新
  • 协作编辑功能

核心技术

1. 服务器发送事件(SSE)

最适合:服务器到客户端流式传输(LLM响应、通知)

// Next.js路由处理器
export async function GET(req: Request) {
  const encoder = new TextEncoder()

  const stream = new ReadableStream({
    async start(controller) {
      // 发送数据
      controller.enqueue(encoder.encode('data: Hello

'))

      // 保持连接活跃
      const interval = setInterval(() => {
        controller.enqueue(encoder.encode(': keepalive

'))
      }, 30000)

      // 清理
      req.signal.addEventListener('abort', () => {
        clearInterval(interval)
        controller.close()
      })
    }
  })

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    }
  })
}

// 客户端
const eventSource = new EventSource('/api/stream')
eventSource.onmessage = (event) => {
  console.log(event.data)
}

2. WebSocket

最适合:双向实时通信(聊天、协作)

// WebSocket服务器(Next.js with ws)
import { WebSocketServer } from 'ws'

const wss = new WebSocketServer({ port: 8080 })

wss.on('connection', (ws) => {
  ws.on('message', (data) => {
    // 广播给所有客户端
    wss.clients.forEach((client) => {
      if (client.readyState === WebSocket.OPEN) {
        client.send(data)
      }
    })
  })
})

// 客户端
const ws = new WebSocket('ws://localhost:8080')
ws.onmessage = (event) => console.log(event.data)
ws.send(JSON.stringify({ type: 'message', text: 'Hello' }))

3. ReadableStream API

最适合:处理带有背压的大数据流

async function* generateData() {
  for (let i = 0; i < 1000; i++) {
    await new Promise(resolve => setTimeout(resolve, 100))
    yield `data-${i}`
  }
}

const stream = new ReadableStream({
  async start(controller) {
    for await (const chunk of generateData()) {
      controller.enqueue(new TextEncoder().encode(chunk + '
'))
    }
    controller.close()
  }
})

LLM流式传输模式

// 服务器
import OpenAI from 'openai'

const openai = new OpenAI()

export async function POST(req: Request) {
  const { messages } = await req.json()

  const stream = await openai.chat.completions.create({
    model: 'gpt-4-turbo-preview',
    messages,
    stream: true
  })

  const encoder = new TextEncoder()

  return new Response(
    new ReadableStream({
      async start(controller) {
        for await (const chunk of stream) {
          const content = chunk.choices[0]?.delta?.content
          if (content) {
            controller.enqueue(encoder.encode(`data: ${JSON.stringify({ content })}

`))
          }
        }
        controller.enqueue(encoder.encode('data: [DONE]

'))
        controller.close()
      }
    }),
    {
      headers: {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache'
      }
    }
  )
}

// 客户端
async function streamChat(messages) {
  const response = await fetch('/api/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ messages })
  })

  const reader = response.body.getReader()
  const decoder = new TextDecoder()

  while (true) {
    const { done, value } = await reader.read()
    if (done) break

    const chunk = decoder.decode(value)
    const lines = chunk.split('
')

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = line.slice(6)
        if (data === '[DONE]') return

        const json = JSON.parse(data)
        console.log(json.content) // 流式令牌
      }
    }
  }
}

重连策略

class ReconnectingEventSource {
  private eventSource: EventSource | null = null
  private reconnectDelay = 1000
  private maxReconnectDelay = 30000

  constructor(private url: string, private onMessage: (data: string) => void) {
    this.connect()
  }

  private connect() {
    this.eventSource = new EventSource(this.url)

    this.eventSource.onmessage = (event) => {
      this.reconnectDelay = 1000 // 成功时重置
      this.onMessage(event.data)
    }

    this.eventSource.onerror = () => {
      this.eventSource?.close()

      // 指数退避
      setTimeout(() => this.connect(), this.reconnectDelay)
      this.reconnectDelay = Math.min(this.reconnectDelay * 2, this.maxReconnectDelay)
    }
  }

  close() {
    this.eventSource?.close()
  }
}

最佳实践

SSE

  • ✅ 用于单向服务器到客户端流式传输
  • ✅ 实现自动重连
  • ✅ 每30秒发送保持活跃消息
  • ✅ 处理浏览器连接限制(每个域名6个)
  • ✅ 使用HTTP/2以获得更好性能

WebSocket

  • ✅ 用于双向实时通信
  • ✅ 实现心跳/乒乓机制
  • ✅ 使用指数退避处理重连
  • ✅ 验证和清理消息
  • ✅ 为离线期间实现消息队列

背压

  • ✅ 使用带有适当流量控制的ReadableStream
  • ✅ 监控缓冲区大小
  • ✅ 当消费者较慢时暂停生产
  • ✅ 为慢速消费者实现超时

性能

  • ✅ 压缩数据(gzip/brotli)
  • ✅ 批量处理小消息
  • ✅ 对大数据使用二进制格式(MessagePack、Protobuf)
  • ✅ 实现客户端缓冲
  • ✅ 监控连接数和资源使用情况

资源