name: streaming-llm-responses description: | 为AI聊天应用实现实时流式UI模式。适用于添加响应生命周期处理器、进度指示器、客户端效果或线程状态同步时。 涵盖onResponseStart/End、onEffect、ProgressUpdateEvent和客户端工具。 不适用于构建没有实时反馈的基础聊天功能。
流式LLM响应
构建具有流式反馈的响应式实时聊天界面。
快速开始
import { useChatKit } from "@openai/chatkit-react";
const chatkit = useChatKit({
api: { url: API_URL, domainKey: DOMAIN_KEY },
onResponseStart: () => setIsResponding(true),
onResponseEnd: () => setIsResponding(false),
onEffect: ({ name, data }) => {
if (name === "update_status") updateUI(data);
},
});
响应生命周期
用户发送消息
↓
onResponseStart() 触发
↓
[流式传输:令牌到达,显示ProgressUpdateEvents]
↓
onResponseEnd() 触发
↓
UI解锁,准备下一次交互
核心模式
1. 响应生命周期处理器
在AI响应期间锁定UI以防止竞态条件:
function ChatWithLifecycle() {
const [isResponding, setIsResponding] = useState(false);
const lockInteraction = useAppStore((s) => s.lockInteraction);
const unlockInteraction = useAppStore((s) => s.unlockInteraction);
const chatkit = useChatKit({
api: { url: API_URL, domainKey: DOMAIN_KEY },
onResponseStart: () => {
setIsResponding(true);
lockInteraction(); // 禁用地图/画布/表单交互
},
onResponseEnd: () => {
setIsResponding(false);
unlockInteraction();
},
onError: ({ error }) => {
console.error("ChatKit错误:", error);
setIsResponding(false);
unlockInteraction();
},
});
return (
<div>
{isResponding && <LoadingOverlay />}
<ChatKit control={chatkit.control} />
</div>
);
}
2. 客户端效果(即发即弃)
服务器发送效果以更新客户端UI,无需等待响应:
后端 - 流式效果:
from chatkit.types import ClientEffectEvent
async def respond(self, thread, item, context):
# ... 智能体处理 ...
# 触发客户端效果更新UI
yield ClientEffectEvent(
name="update_status",
data={
"state": {"energy": 80, "happiness": 90},
"flash": "状态已更新!"
}
)
# 另一个效果
yield ClientEffectEvent(
name="show_notification",
data={"message": "任务完成!"}
)
前端 - 处理效果:
const chatkit = useChatKit({
api: { url: API_URL, domainKey: DOMAIN_KEY },
onEffect: ({ name, data }) => {
switch (name) {
case "update_status":
applyStatusUpdate(data.state);
if (data.flash) setFlashMessage(data.flash);
break;
case "add_marker":
addMapMarker(data);
break;
case "select_mode":
setSelectionMode(data.mode);
break;
}
},
});
3. 进度更新
在长时间操作期间显示“正在搜索…”、“正在加载…”、“正在分析…”:
from chatkit.types import ProgressUpdateEvent
@function_tool
async def search_articles(ctx: AgentContext, query: str) -> str:
"""搜索与查询匹配的文章。"""
yield ProgressUpdateEvent(message="正在搜索文章...")
results = await article_store.search(query)
yield ProgressUpdateEvent(message=f"找到 {len(results)} 篇文章...")
for i, article in enumerate(results):
if i % 5 == 0:
yield ProgressUpdateEvent(
message=f"正在处理文章 {i+1}/{len(results)}..."
)
return format_results(results)
4. 线程生命周期事件
跟踪线程变化以实现持久化和UI更新:
const chatkit = useChatKit({
api: { url: API_URL, domainKey: DOMAIN_KEY },
onThreadChange: ({ threadId }) => {
setThreadId(threadId);
if (threadId) localStorage.setItem("lastThreadId", threadId);
clearSelections();
},
onThreadLoadStart: ({ threadId }) => {
setIsLoadingThread(true);
},
onThreadLoadEnd: ({ threadId }) => {
setIsLoadingThread(false);
},
});
5. 客户端工具(状态查询)
AI需要读取客户端状态以做出决策:
后端 - 定义客户端工具:
@function_tool(name_override="get_selected_items")
async def get_selected_items(ctx: AgentContext) -> dict:
"""获取画布上当前选中的项目。
这是一个客户端工具 - 在浏览器中执行,结果返回。
"""
yield ProgressUpdateEvent(message="正在读取选择...")
pass # 实际执行发生在客户端
前端 - 处理客户端工具:
const chatkit = useChatKit({
api: { url: API_URL, domainKey: DOMAIN_KEY },
onClientTool: ({ name, params }) => {
switch (name) {
case "get_selected_items":
return { itemIds: selectedItemIds };
case "get_current_viewport":
return {
center: mapRef.current.getCenter(),
zoom: mapRef.current.getZoom(),
};
case "get_form_data":
return { values: formRef.current.getValues() };
default:
throw new Error(`未知客户端工具: ${name}`);
}
},
});
客户端效果 vs 客户端工具
| 类型 | 方向 | 需要响应 | 使用场景 |
|---|---|---|---|
| 客户端效果 | 服务器 → 客户端 | 否(即发即弃) | 更新UI,显示通知 |
| 客户端工具 | 服务器 → 客户端 → 服务器 | 是(返回值) | 获取客户端状态供AI决策 |
按使用场景的常见模式
交互式地图/画布
onResponseStart: () => lockCanvas(),
onResponseEnd: () => unlockCanvas(),
onEffect: ({ name, data }) => {
if (name === "add_marker") addMarker(data);
if (name === "pan_to") panTo(data.location);
},
onClientTool: ({ name }) => {
if (name === "get_selection") return getSelectedItems();
},
基于表单的UI
onResponseStart: () => setFormDisabled(true),
onResponseEnd: () => setFormDisabled(false),
onClientTool: ({ name }) => {
if (name === "get_form_values") return form.getValues();
},
游戏/模拟
onResponseStart: () => pauseSimulation(),
onResponseEnd: () => resumeSimulation(),
onEffect: ({ name, data }) => {
if (name === "update_entity") updateEntity(data);
if (name === "show_notification") showToast(data.message);
},
线程标题生成
根据对话动态更新线程标题:
class TitleAgent:
async def generate_title(self, first_message: str) -> str:
result = await Runner.run(
Agent(
name="TitleGenerator",
instructions="生成3-5个词的标题。",
model="gpt-4o-mini", # 快速模型
),
input=f"第一条消息: {first_message}",
)
return result.final_output
# 在ChatKitServer中
async def respond(self, thread, item, context):
if not thread.title and item:
title = await self.title_agent.generate_title(item.content)
thread.title = title
await self.store.save_thread(thread, context)
反模式
- 响应期间不锁定UI - 导致竞态条件
- 在效果中阻塞 - 效果应该是即发即弃的
- 在onEffect中进行繁重计算 - 使用requestAnimationFrame进行DOM更新
- 缺少错误处理 - 始终处理onError以解锁UI
- 不持久化线程状态 - 使用onThreadChange保存上下文
验证
运行:python3 scripts/verify.py
预期:✓ streaming-llm-responses 技能就绪
如果验证失败
- 检查:references/文件夹中有streaming-patterns.md
- 如果仍然失败,停止并报告
参考资料
- references/streaming-patterns.md - 完整的流式配置