name: implementing-realtime-sync description: 用于实时更新的实时通信模式,支持协作和存在感。在构建聊天应用、协作工具、实时仪表板或流接口(如LLM响应、指标)时使用。涵盖SSE(服务器发送事件,用于单向流)、WebSocket(双向通信)、WebRTC(点对点视频/音频)、CRDTs(Yjs、Automerge用于无冲突协作)、存在模式、离线同步和扩展策略。支持Python、Rust、Go和TypeScript。
实时同步
实现实时通信,用于应用程序中的实时更新、协作和存在感。
何时使用
在构建以下应用时使用此技能:
- LLM流接口 - 逐步流式传输LLM令牌(ai-chat集成)
- 实时仪表板 - 向客户端推送指标和更新
- 协作编辑 - 使用CRDTs进行多用户文档/电子表格编辑
- 聊天应用 - 具有存在感的实时消息传递
- 多用户功能 - 光标跟踪、实时更新、存在感
- 离线优先应用 - 移动/PWA,支持重新连接时同步
协议选择框架
根据通信模式选择传输协议:
决策树
单向(仅服务器 → 客户端)
├─ LLM流式传输、通知、实时源
└─ 使用SSE(服务器发送事件)
├─ 自动重新连接(浏览器原生)
├─ 事件ID用于恢复
└─ 简单的HTTP实现
双向(客户端 ↔ 服务器)
├─ 聊天、游戏、协作编辑
└─ 使用WebSocket
├─ 需要手动重新连接
├─ 支持二进制和文本
└─ 双向通信的较低延迟
协作编辑
├─ 多用户文档/电子表格
└─ 使用WebSocket + CRDT(Yjs或Automerge)
├─ CRDT处理冲突解决
├─ WebSocket用于传输
└─ 离线优先并同步
点对点媒体
├─ 视频、屏幕共享、语音通话
└─ 使用WebRTC
├─ WebSocket用于信令
├─ 直接P2P连接
└─ STUN/TURN用于NAT穿越
协议比较
| 协议 | 方向 | 重新连接 | 复杂度 | 最佳用途 |
|---|---|---|---|---|
| SSE | 服务器 → 客户端 | 自动 | 低 | 实时源、LLM流式传输 |
| WebSocket | 双向 | 手动 | 中 | 聊天、游戏、协作 |
| WebRTC | 点对点 | 复杂 | 高 | 视频、屏幕共享、语音 |
实现模式
模式1:使用SSE的LLM流式传输
逐步流式传输LLM令牌到前端(ai-chat集成)。
Python(FastAPI):
from sse_starlette.sse import EventSourceResponse
@app.post("/chat/stream")
async def stream_chat(prompt: str):
async def generate():
async for chunk in llm_stream:
yield {"event": "token", "data": chunk.content}
yield {"event": "done", "data": "[DONE]"}
return EventSourceResponse(generate())
前端:
const es = new EventSource('/chat/stream')
es.addEventListener('token', (e) => appendToken(e.data))
参考 references/sse.md 获取完整实现、重新连接和事件ID恢复。
模式2:WebSocket聊天
聊天应用的双向通信。
Python(FastAPI):
connections: set[WebSocket] = set()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
connections.add(websocket)
try:
while True:
data = await websocket.receive_text()
for conn in connections:
await conn.send_text(data)
except WebSocketDisconnect:
connections.remove(websocket)
参考 references/websockets.md 获取多语言示例、认证、心跳和扩展。
模式3:使用CRDTs的协作编辑
使用Yjs进行无冲突的多用户编辑。
TypeScript(Yjs):
import * as Y from 'yjs'
import { WebsocketProvider } from 'y-websocket'
const doc = new Y.Doc()
const provider = new WebsocketProvider('ws://localhost:1234', 'doc-id', doc)
const ytext = doc.getText('content')
ytext.observe(event => console.log('Changes:', event.changes))
ytext.insert(0, 'Hello collaborative world!')
参考 references/crdts.md 获取冲突解决、Yjs vs Automerge和高级模式。
模式4:存在感
跟踪在线用户、光标位置和输入指示器。
Yjs Awareness API:
const awareness = provider.awareness
awareness.setLocalState({ user: { name: 'Alice' }, cursor: { x: 100, y: 200 } })
awareness.on('change', () => {
awareness.getStates().forEach((state, clientId) => {
renderCursor(state.cursor, state.user)
})
})
参考 references/presence-patterns.md 获取光标跟踪、输入指示器和在线状态。
模式5:离线同步(移动/PWA)
在本地排队变更,并在连接恢复时同步。
TypeScript(Yjs + IndexedDB):
import { IndexeddbPersistence } from 'y-indexeddb'
import { WebsocketProvider } from 'y-websocket'
const doc = new Y.Doc()
const indexeddbProvider = new IndexeddbPersistence('my-doc', doc)
const wsProvider = new WebsocketProvider('wss://api.example.com/sync', 'my-doc', doc)
wsProvider.on('status', (e) => {
console.log(e.status === 'connected' ? 'Online' : 'Offline')
})
参考 references/offline-sync.md 获取冲突解决和同步策略。
库推荐
Python
WebSocket:
websockets 13.x- 基于AsyncIO,生产就绪FastAPI WebSocket- 内置,依赖注入Flask-SocketIO- Socket.IO协议,带后备
SSE:
sse-starlette- FastAPI/Starlette,异步,基于生成器Flask-SSE- Redis后端用于pub/sub
Rust
WebSocket:
tokio-tungstenite 0.23- Tokio集成,生产就绪axum WebSocket- 内置提取器,tower中间件
SSE:
axum SSE- 原生支持,异步流
Go
WebSocket:
gorilla/websocket- 经久耐用,支持压缩nhooyr/websocket- 现代API,支持上下文
SSE:
net/http(原生) - Flusher接口,无依赖
TypeScript
WebSocket:
ws- 原生WebSocket服务器,轻量级Socket.io 4.x- 自动重新连接,后备,房间Hono WebSocket- 边缘运行时(Cloudflare Workers, Deno)
SSE:
EventSource(原生) - 浏览器原生,自动重试- Node.js
http(原生) - 服务器端,无依赖
CRDT:
Yjs- 成熟,TypeScript/Rust,富文本编辑Automerge- Rust/JS,JSON-like数据,时间旅行
重新连接策略
SSE: 浏览器的EventSource自动处理重新连接,采用指数回退。 WebSocket: 实施手动指数回退,带抖动以防雷群。
参考 references/sse.md 和 references/websockets.md 获取完整实现模式。
安全模式
认证: 使用基于cookie(同源)或在Sec-WebSocket-Protocol头中的令牌。 速率限制: 使用滑动窗口实施每用户消息节流。
参考 references/websockets.md 获取认证和速率限制实现。
使用Redis Pub/Sub进行扩展
对于水平扩展,使用Redis pub/sub在多个后端服务器之间广播消息。
参考 references/websockets.md 获取完整的Redis扩展实现。
前端集成
React Hooks模式
用于LLM流式传输的SSE(ai-chat):
useEffect(() => {
const es = new EventSource(`/api/chat/stream?prompt=${prompt}`)
es.addEventListener('token', (e) => setContent(prev => prev + e.data))
return () => es.close()
}, [prompt])
用于实时指标的WebSocket(仪表板):
useEffect(() => {
const ws = new WebSocket('ws://localhost:8000/metrics')
ws.onmessage = (e) => setMetrics(JSON.parse(e.data))
return () => ws.close()
}, [])
用于协作表格的Yjs:
useEffect(() => {
const doc = new Y.Doc()
const provider = new WebsocketProvider('ws://localhost:1234', docId, doc)
const yarray = doc.getArray('rows')
yarray.observe(() => setRows(yarray.toArray()))
return () => provider.destroy()
}, [docId])
参考文档
有关详细实现模式,请查阅:
references/sse.md- SSE协议、重新连接、事件IDreferences/websockets.md- WebSocket认证、心跳、扩展references/crdts.md- Yjs vs Automerge、冲突解决references/presence-patterns.md- 光标跟踪、输入指示器references/offline-sync.md- 移动模式、冲突策略
示例项目
可运行的工作实现可在以下位置找到:
examples/llm-streaming-sse/- FastAPI SSE用于LLM流式传输(可运行)examples/chat-websocket/- Python FastAPI + TypeScript聊天examples/collaborative-yjs/- Yjs协作编辑器
测试工具
使用脚本验证实现:
scripts/test_websocket_connection.py- WebSocket连接测试