实现实时同步Skill implementing-realtime-sync

此技能专注于实现实时通信系统,用于构建聊天应用、协作编辑工具、实时仪表板和流接口(如LLM响应)。关键技术和协议包括SSE、WebSocket、WebRTC和CRDTs,支持离线同步、存在感模式和扩展策略。适用于Python、Rust、Go、TypeScript等多种编程语言,方便SEO搜索实时通信、协作、同步、架构设计等关键词。

架构设计 0 次安装 0 次浏览 更新于 3/23/2026

name: implementing-realtime-sync description: 用于实时更新的实时通信模式,支持协作和存在感。在构建聊天应用、协作工具、实时仪表板或流接口(如LLM响应、指标)时使用。涵盖SSE(服务器发送事件,用于单向流)、WebSocket(双向通信)、WebRTC(点对点视频/音频)、CRDTs(Yjs、Automerge用于无冲突协作)、存在模式、离线同步和扩展策略。支持Python、Rust、Go和TypeScript。

实时同步

实现实时通信,用于应用程序中的实时更新、协作和存在感。

何时使用

在构建以下应用时使用此技能:

  • LLM流接口 - 逐步流式传输LLM令牌(ai-chat集成)
  • 实时仪表板 - 向客户端推送指标和更新
  • 协作编辑 - 使用CRDTs进行多用户文档/电子表格编辑
  • 聊天应用 - 具有存在感的实时消息传递
  • 多用户功能 - 光标跟踪、实时更新、存在感
  • 离线优先应用 - 移动/PWA,支持重新连接时同步

协议选择框架

根据通信模式选择传输协议:

决策树

单向(仅服务器 → 客户端)
├─ LLM流式传输、通知、实时源
└─ 使用SSE(服务器发送事件)
   ├─ 自动重新连接(浏览器原生)
   ├─ 事件ID用于恢复
   └─ 简单的HTTP实现

双向(客户端 ↔ 服务器)
├─ 聊天、游戏、协作编辑
└─ 使用WebSocket
   ├─ 需要手动重新连接
   ├─ 支持二进制和文本
   └─ 双向通信的较低延迟

协作编辑
├─ 多用户文档/电子表格
└─ 使用WebSocket + CRDT(Yjs或Automerge)
   ├─ CRDT处理冲突解决
   ├─ WebSocket用于传输
   └─ 离线优先并同步

点对点媒体
├─ 视频、屏幕共享、语音通话
└─ 使用WebRTC
   ├─ WebSocket用于信令
   ├─ 直接P2P连接
   └─ STUN/TURN用于NAT穿越

协议比较

协议 方向 重新连接 复杂度 最佳用途
SSE 服务器 → 客户端 自动 实时源、LLM流式传输
WebSocket 双向 手动 聊天、游戏、协作
WebRTC 点对点 复杂 视频、屏幕共享、语音

实现模式

模式1:使用SSE的LLM流式传输

逐步流式传输LLM令牌到前端(ai-chat集成)。

Python(FastAPI):

from sse_starlette.sse import EventSourceResponse

@app.post("/chat/stream")
async def stream_chat(prompt: str):
    async def generate():
        async for chunk in llm_stream:
            yield {"event": "token", "data": chunk.content}
        yield {"event": "done", "data": "[DONE]"}
    return EventSourceResponse(generate())

前端:

const es = new EventSource('/chat/stream')
es.addEventListener('token', (e) => appendToken(e.data))

参考 references/sse.md 获取完整实现、重新连接和事件ID恢复。

模式2:WebSocket聊天

聊天应用的双向通信。

Python(FastAPI):

connections: set[WebSocket] = set()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    connections.add(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            for conn in connections:
                await conn.send_text(data)
    except WebSocketDisconnect:
        connections.remove(websocket)

参考 references/websockets.md 获取多语言示例、认证、心跳和扩展。

模式3:使用CRDTs的协作编辑

使用Yjs进行无冲突的多用户编辑。

TypeScript(Yjs):

import * as Y from 'yjs'
import { WebsocketProvider } from 'y-websocket'

const doc = new Y.Doc()
const provider = new WebsocketProvider('ws://localhost:1234', 'doc-id', doc)
const ytext = doc.getText('content')

ytext.observe(event => console.log('Changes:', event.changes))
ytext.insert(0, 'Hello collaborative world!')

参考 references/crdts.md 获取冲突解决、Yjs vs Automerge和高级模式。

模式4:存在感

跟踪在线用户、光标位置和输入指示器。

Yjs Awareness API:

const awareness = provider.awareness
awareness.setLocalState({ user: { name: 'Alice' }, cursor: { x: 100, y: 200 } })
awareness.on('change', () => {
  awareness.getStates().forEach((state, clientId) => {
    renderCursor(state.cursor, state.user)
  })
})

参考 references/presence-patterns.md 获取光标跟踪、输入指示器和在线状态。

模式5:离线同步(移动/PWA)

在本地排队变更,并在连接恢复时同步。

TypeScript(Yjs + IndexedDB):

import { IndexeddbPersistence } from 'y-indexeddb'
import { WebsocketProvider } from 'y-websocket'

const doc = new Y.Doc()
const indexeddbProvider = new IndexeddbPersistence('my-doc', doc)
const wsProvider = new WebsocketProvider('wss://api.example.com/sync', 'my-doc', doc)

wsProvider.on('status', (e) => {
  console.log(e.status === 'connected' ? 'Online' : 'Offline')
})

参考 references/offline-sync.md 获取冲突解决和同步策略。

库推荐

Python

WebSocket:

  • websockets 13.x - 基于AsyncIO,生产就绪
  • FastAPI WebSocket - 内置,依赖注入
  • Flask-SocketIO - Socket.IO协议,带后备

SSE:

  • sse-starlette - FastAPI/Starlette,异步,基于生成器
  • Flask-SSE - Redis后端用于pub/sub

Rust

WebSocket:

  • tokio-tungstenite 0.23 - Tokio集成,生产就绪
  • axum WebSocket - 内置提取器,tower中间件

SSE:

  • axum SSE - 原生支持,异步流

Go

WebSocket:

  • gorilla/websocket - 经久耐用,支持压缩
  • nhooyr/websocket - 现代API,支持上下文

SSE:

  • net/http(原生) - Flusher接口,无依赖

TypeScript

WebSocket:

  • ws - 原生WebSocket服务器,轻量级
  • Socket.io 4.x - 自动重新连接,后备,房间
  • Hono WebSocket - 边缘运行时(Cloudflare Workers, Deno)

SSE:

  • EventSource(原生) - 浏览器原生,自动重试
  • Node.js http(原生) - 服务器端,无依赖

CRDT:

  • Yjs - 成熟,TypeScript/Rust,富文本编辑
  • Automerge - Rust/JS,JSON-like数据,时间旅行

重新连接策略

SSE: 浏览器的EventSource自动处理重新连接,采用指数回退。 WebSocket: 实施手动指数回退,带抖动以防雷群。

参考 references/sse.mdreferences/websockets.md 获取完整实现模式。

安全模式

认证: 使用基于cookie(同源)或在Sec-WebSocket-Protocol头中的令牌。 速率限制: 使用滑动窗口实施每用户消息节流。

参考 references/websockets.md 获取认证和速率限制实现。

使用Redis Pub/Sub进行扩展

对于水平扩展,使用Redis pub/sub在多个后端服务器之间广播消息。

参考 references/websockets.md 获取完整的Redis扩展实现。

前端集成

React Hooks模式

用于LLM流式传输的SSE(ai-chat):

useEffect(() => {
  const es = new EventSource(`/api/chat/stream?prompt=${prompt}`)
  es.addEventListener('token', (e) => setContent(prev => prev + e.data))
  return () => es.close()
}, [prompt])

用于实时指标的WebSocket(仪表板):

useEffect(() => {
  const ws = new WebSocket('ws://localhost:8000/metrics')
  ws.onmessage = (e) => setMetrics(JSON.parse(e.data))
  return () => ws.close()
}, [])

用于协作表格的Yjs:

useEffect(() => {
  const doc = new Y.Doc()
  const provider = new WebsocketProvider('ws://localhost:1234', docId, doc)
  const yarray = doc.getArray('rows')
  yarray.observe(() => setRows(yarray.toArray()))
  return () => provider.destroy()
}, [docId])

参考文档

有关详细实现模式,请查阅:

  • references/sse.md - SSE协议、重新连接、事件ID
  • references/websockets.md - WebSocket认证、心跳、扩展
  • references/crdts.md - Yjs vs Automerge、冲突解决
  • references/presence-patterns.md - 光标跟踪、输入指示器
  • references/offline-sync.md - 移动模式、冲突策略

示例项目

可运行的工作实现可在以下位置找到:

  • examples/llm-streaming-sse/ - FastAPI SSE用于LLM流式传输(可运行)
  • examples/chat-websocket/ - Python FastAPI + TypeScript聊天
  • examples/collaborative-yjs/ - Yjs协作编辑器

测试工具

使用脚本验证实现:

  • scripts/test_websocket_connection.py - WebSocket连接测试