name: streaming-api-patterns description: 使用服务器发送事件(SSE)、WebSocket和ReadableStream API实现实时数据流。掌握背压处理、重连策略和LLM流式传输,适用于2025+实时应用场景。 version: 1.0.0 author: AI Agent Hub tags: [流式传输, sse, websocket, 实时, api, 2025]
流式API模式
概述
现代应用需要实时数据交付。本技能涵盖服务器发送事件(SSE)用于服务器到客户端流式传输、WebSocket用于双向通信,以及Streams API用于处理背压和高效数据流。
何时使用此技能:
- 流式传输LLM响应(ChatGPT风格界面)
- 实时通知和更新
- 实时数据源(股票价格、分析数据)
- 聊天应用
- 长时间运行任务的进度更新
- 协作编辑功能
核心技术
1. 服务器发送事件(SSE)
最适合:服务器到客户端流式传输(LLM响应、通知)
// Next.js路由处理器
export async function GET(req: Request) {
const encoder = new TextEncoder()
const stream = new ReadableStream({
async start(controller) {
// 发送数据
controller.enqueue(encoder.encode('data: Hello
'))
// 保持连接活跃
const interval = setInterval(() => {
controller.enqueue(encoder.encode(': keepalive
'))
}, 30000)
// 清理
req.signal.addEventListener('abort', () => {
clearInterval(interval)
controller.close()
})
}
})
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
}
})
}
// 客户端
const eventSource = new EventSource('/api/stream')
eventSource.onmessage = (event) => {
console.log(event.data)
}
2. WebSocket
最适合:双向实时通信(聊天、协作)
// WebSocket服务器(Next.js with ws)
import { WebSocketServer } from 'ws'
const wss = new WebSocketServer({ port: 8080 })
wss.on('connection', (ws) => {
ws.on('message', (data) => {
// 广播给所有客户端
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(data)
}
})
})
})
// 客户端
const ws = new WebSocket('ws://localhost:8080')
ws.onmessage = (event) => console.log(event.data)
ws.send(JSON.stringify({ type: 'message', text: 'Hello' }))
3. ReadableStream API
最适合:处理带有背压的大数据流
async function* generateData() {
for (let i = 0; i < 1000; i++) {
await new Promise(resolve => setTimeout(resolve, 100))
yield `data-${i}`
}
}
const stream = new ReadableStream({
async start(controller) {
for await (const chunk of generateData()) {
controller.enqueue(new TextEncoder().encode(chunk + '
'))
}
controller.close()
}
})
LLM流式传输模式
// 服务器
import OpenAI from 'openai'
const openai = new OpenAI()
export async function POST(req: Request) {
const { messages } = await req.json()
const stream = await openai.chat.completions.create({
model: 'gpt-4-turbo-preview',
messages,
stream: true
})
const encoder = new TextEncoder()
return new Response(
new ReadableStream({
async start(controller) {
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content
if (content) {
controller.enqueue(encoder.encode(`data: ${JSON.stringify({ content })}
`))
}
}
controller.enqueue(encoder.encode('data: [DONE]
'))
controller.close()
}
}),
{
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache'
}
}
)
}
// 客户端
async function streamChat(messages) {
const response = await fetch('/api/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ messages })
})
const reader = response.body.getReader()
const decoder = new TextDecoder()
while (true) {
const { done, value } = await reader.read()
if (done) break
const chunk = decoder.decode(value)
const lines = chunk.split('
')
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6)
if (data === '[DONE]') return
const json = JSON.parse(data)
console.log(json.content) // 流式令牌
}
}
}
}
重连策略
class ReconnectingEventSource {
private eventSource: EventSource | null = null
private reconnectDelay = 1000
private maxReconnectDelay = 30000
constructor(private url: string, private onMessage: (data: string) => void) {
this.connect()
}
private connect() {
this.eventSource = new EventSource(this.url)
this.eventSource.onmessage = (event) => {
this.reconnectDelay = 1000 // 成功时重置
this.onMessage(event.data)
}
this.eventSource.onerror = () => {
this.eventSource?.close()
// 指数退避
setTimeout(() => this.connect(), this.reconnectDelay)
this.reconnectDelay = Math.min(this.reconnectDelay * 2, this.maxReconnectDelay)
}
}
close() {
this.eventSource?.close()
}
}
最佳实践
SSE
- ✅ 用于单向服务器到客户端流式传输
- ✅ 实现自动重连
- ✅ 每30秒发送保持活跃消息
- ✅ 处理浏览器连接限制(每个域名6个)
- ✅ 使用HTTP/2以获得更好性能
WebSocket
- ✅ 用于双向实时通信
- ✅ 实现心跳/乒乓机制
- ✅ 使用指数退避处理重连
- ✅ 验证和清理消息
- ✅ 为离线期间实现消息队列
背压
- ✅ 使用带有适当流量控制的ReadableStream
- ✅ 监控缓冲区大小
- ✅ 当消费者较慢时暂停生产
- ✅ 为慢速消费者实现超时
性能
- ✅ 压缩数据(gzip/brotli)
- ✅ 批量处理小消息
- ✅ 对大数据使用二进制格式(MessagePack、Protobuf)
- ✅ 实现客户端缓冲
- ✅ 监控连接数和资源使用情况