新闻详情

新闻详情

首页 / 资讯中心 / 详情

狼人杀 AI 对局:后端如何用 SSE 流式推送到前端?

发布时间:2026/6/25 22:47:28
狼人杀 AI 对局:后端如何用 SSE 流式推送到前端?
一、为什么要流式而不是等一局跑完再返回 JSON九人局里一个阶段可能连续触发多次 LLM 调用狼人讨论、白天发言、逐人投票……单次推理往往要几秒到十几秒。如果后端等 LangGraph 整段跑完再return {state: ...}前端只能转圈等待用户看不到Bot 正在想什么发言是否正在生成投票是否一人一人公布。所以我们采用 SSEServer-Sent Events在一次 HTTP 连接里后端持续推送多条 JSON 事件前端边收边更新 UI连接结束时再推一条done: true的最终快照。二、总体架构核心思想是一个队列多路生产者每个对局用thread_id区分对应一个asyncio.QueueLangGraph 在后台 task里跑不阻塞 SSE 写出图节点里产生的流式事件都投进同一个队列SSE 生成器只负责queue.get()→yield data: ...\n\n三、后端入口所有推进游戏的接口都走同一条流/start、/advance、/action/speak、/action/vote等最终都调用run_and_stream返回return StreamingResponse(event_stream(), media_typetext/event-stream)event_stream的逻辑async def run_and_stream(input_data, thread_id): thought_queue asyncio.Queue() register_thought_stream(thread_id, thought_queue, loop) async def graph_producer(): async for event in graph.astream_events(input_data, cfg(thread_id), versionv1): if event[event] on_chat_model_stream: chunk event[data][chunk] if chunk.content: await thought_queue.put({type: token, content: chunk.content}) await thought_queue.put(SENTINEL) # 图跑完 graph_task asyncio.create_task(graph_producer()) while True: item await thought_queue.get() if item is SENTINEL: break yield fdata: {json.dumps(item)}\n\n await graph_task s graph.get_state(cfg(thread_id)) final_data {state: pick_values(s.values), next: s.next, done: True} yield fdata: {json.dumps(final_data)}\n\n为什么用astream_eventsLangGraph 对 LangChain 模型调用会发出on_chat_model_stream事件可以拿到 LangChain 路径下的 token推给前端做消息打字机。为什么最后还要get_state流式过程中前端只做预览/增量节点结束时 state 可能还有 patch如__waiting_for__等人机交互。最终以 checkpoint 快照为准避免前后端状态漂移。四、同步 DSPy 节点怎么往异步 SSE 里推 tokenLangGraph 里很多节点调 DSPy同步而 SSE 消费在 asyncio 事件循环里。直接阻塞会卡死整个服务。stream_context.py线程本地 callback 跨线程入队# 全局thread_id → (Queue, EventLoop) _registries: dict[str, tuple] {} def setup_node_streaming(thread_id): queue, loop _registries[thread_id] def callback(token: str): loop.call_soon_threadsafe(queue.put_nowait, { type: thought_token, content: token, }) _local.thought_callback callback # threading.localDSPy 自定义 LMai_dspy/__init__.py在__call__里检测 callback有则streamTrue调 OpenAI API每个 token 回调callback get_thought_callback() if callback is not None: for chunk in stream: token chunk.choices[0].delta.content callback(token) # → 安全投进 asyncio.Queuegame_logic.py异步节点里用asyncio.to_thread包 DSPyasync def _run_dspy_streamed(thread_id, label, func, *args, **kwargs): emit_thought_event(thread_id, {type: thought_start, label: label}) def _run(): setup_node_streaming(thread_id) try: return func(*args, **kwargs) finally: teardown_node_streaming() return await asyncio.to_thread(_run) # 同步 DSPy 不阻塞事件循环白天发言等路径则用_run_llm_streamed同样to_threadsetup_node_streaming直接流式调 OpenAI-compatible API。五、为什么不用 WebSocketSSE 单向推送足够服务端 → 客户端操作仍用 POST。实现简单和 FastAPIStreamingResponse天然契合Demo 阶段性价比最高。
网站建设 高端定制 企业官网