
在 LLM 应用里,Streaming 从来不只是“把字一个一个吐出来”。对真实项目来说,它解决的是另外几个更硬的问题:首字延迟太长、Agent 调工具时用户以为系统卡死、长链路任务没有过程反馈、线上排障时看不见中间状态。LangChain 在官方文档里把 Streaming 能力拆成了 updates、messages、custom 三类,而且支持同时开启多种模式;LangChain Agent 本身又是构建在 LangGraph 之上的,所以这些能力本质上是“把 Agent 执行过程变成事件流”。
这篇文章不打算重复文档定义,而是从工程视角回答一个更实际的问题:
LangChain Streaming 在真实项目里应该怎么设计、怎么接入、怎么落地、会踩哪些坑?
一、为什么工程里一定要做 Streaming
很多文章会说 Streaming 能“提升用户体验”,但这句话太空。工程里真正的价值,通常体现在下面四件事上。
1)降低“首字可见时间”,不是只降低总耗时
很多 LLM 请求总耗时其实没有夸张到不能接受,问题在于用户在前 2~5 秒里什么都看不到。
对聊天系统来说,用户感知最强的指标往往不是“10 秒返回完”,而是“300ms 内先看到系统开始说话”。
messages 模式的价值就在这里。它会从所有发生 LLM 调用的节点里持续产出 (token, metadata),也就是你可以尽早把 token 推到前端,而不是等整个 Agent 完成后再统一返回。官方文档明确说明,messages 就是用来流式拿 LLM token 的。
2)Agent 调工具时,必须让用户知道“系统还活着”
普通聊天补全文本时,只要 token 一直在滚动,用户一般不会焦虑。
但 Agent 一旦开始调工具,事情就不一样了:模型可能先生成工具调用,再等待外部 API、数据库、检索服务、内部系统响应。这段时间如果没有反馈,用户很容易认为“卡住了”。
LangChain 的 updates 模式会在每个 agent step 后产出状态更新。对于一次典型的 tool 调用流程,官方文档给出的 step 顺序就是:
- model:产出工具调用请求
- tools:产出工具执行结果
- model:产出最终回答
这其实非常适合映射到前端状态:“正在分析问题” → “正在调用工具” → “工具返回中” → “正在组织最终答案”。
3)长链路任务需要“过程反馈”,而不是只有最终结果
比如:
- 客服 Agent 去查订单、查物流、查退款规则
- 数据分析 Agent 要走“取数 → 清洗 → 聚合 → 生成结论”
- 搜索 Agent 要查多个数据源,再做归纳
这里光有 messages 不够。因为 token 只能说明“模型在输出”,但不能说明“业务流程走到哪一步”。
这正是 custom 的作用:你可以在工具或图节点内部主动发业务进度,比如:
正在查询订单库已获取 120/500 条记录开始合并多数据源结果准备生成最终答复
官方文档把这个能力描述为:使用 get_stream_writer() 从图节点内部流出任意自定义数据。
4)Streaming 不只是给用户看,也是给工程师看的
这一点经常被忽略。
真实项目里,复杂 Agent 的问题通常不是“最终答错了”这么简单,而是:
- 模型到底有没有决定调用工具
- 工具参数是不是被拆碎了
- 工具返回后模型为什么又绕回去
- 哪一步最慢
- 某次异常到底发生在模型、工具、网络还是业务逻辑层
如果你只拿最终 answer,定位问题会非常痛苦。
而 updates + messages 的组合,天然就是一条轻量级执行观测链路:你能看到 step 级状态,也能看到 token 级输出,必要时还能把 custom 进度事件和 trace id 绑在一起做日志关联。LangGraph 文档也明确说明,v2 流式输出里每个 chunk 都是带 type 的 StreamPart,适合后端统一分发。
二、三种 Streaming 模式,工程里分别该怎么用
LangChain 官方把 Streaming 分成三类:
updatesmessagescustom
定义很简单,但工程里更重要的是:它们各自解决什么问题。
1)updates:看“流程步骤”,不看“逐字输出”
官方定义是“每个 agent step 之后流出状态更新”。如果放到工程里理解,它更像是:
Agent 执行阶段的状态流
最适合的场景:
- 前端展示“当前阶段”
- 后端记录 step 级日志
- 调试工具调用链路
- 做卡点定位和耗时统计
例如一次 create_agent 的典型执行中,你会看到:
model节点先产出一个 tool calltools节点返回一个 ToolMessagemodel节点再生成最终回复
这正好就是用户界面里“分析中 / 查数据中 / 回答中”的来源。
优点
- 粒度刚好,适合展示阶段状态
- 很适合调试 agent step
- 能拿到完成后的状态,而不只是碎片 token
缺点
- 不适合直接做打字机效果
- 前端如果只消费
updates,文本输出会显得“顿挫”
一句话判断
你的需求如果是“我想知道 Agent 现在跑到哪一步了”,优先看 updates。
2)messages:看“模型流式输出”,包括 token 和工具调用碎片
官方定义是“从发生 LLM 调用的图节点中流出 (token, metadata) 元组”。工程上可以理解成:
LLM 层的增量输出流
这不只是普通文本 token。文档明确展示了,在 Agent 场景下,messages 还会产出 tool_call_chunk,也就是工具调用参数生成过程中的增量片段。比如模型先吐出工具名,再逐步吐出参数 JSON 的各个片段。
最适合的场景:
- 聊天回复逐字显示
- 显示 reasoning / thinking token
- 想看模型是如何一步步拼出 tool call 的
- 想在前端做到“先看到内容,再等完成”
优点
- 首字响应最快
- 最适合用户可见的打字机效果
- 还能观察 reasoning 和 tool call 生成过程
缺点
- 数据是“增量碎片”,不是完成态
- 会出现
tool_call_chunk,前端如果直接拼文本,很容易被打乱 - 单用它时,不容易准确表达“当前处于哪个步骤”
一句话判断
你的需求如果是“我要尽快把模型输出推给用户”,优先看 messages。
3)custom:发业务进度,不依赖模型文本
官方给的能力非常直接:在工具或图节点里通过 get_stream_writer() 主动输出任意数据。文档还特别说明,加入 get_stream_writer() 后,这个工具就依赖 LangGraph 执行上下文,脱离该上下文单独调用会失败。
工程里可以把它理解成:
你自己定义的业务事件流
适合的场景:
- 进度提示
- 阶段说明
- 埋点 / 可观测性补充信息
- 把非 LangChain 原生流式源接进来
LangGraph 文档还提到,custom 可以用来包装任意外部流式客户端,即便那个 LLM API 本身不实现 LangChain chat model 接口,也可以通过 writer 手动往外发 chunk。
优点
- 最灵活
- 最贴合业务
- 特别适合长耗时工具
缺点
- 需要你自己设计事件协议
- 不能替代 token 流
- 用不好会让前后端协议变乱
一句话判断
你的需求如果是“我要告诉用户系统具体在做什么业务动作”,用 custom。
4)一个项目里怎么组合这三种模式
这是最关键的。
我自己的建议是:
- 简单聊天:
messages - 有工具但流程不复杂:
messages + updates - 复杂 Agent / 长耗时工具 / 多数据源任务:
messages + updates + custom
原因很简单:
messages负责“内容输出”updates负责“流程阶段”custom负责“业务进度”
三者不是互斥关系,而是三层不同视角。官方文档也明确支持把多个 stream_mode 作为列表同时传入,而且在 version="v2" 下,每个 chunk 都会带 type 字段,天然适合服务端做统一分发。
三、一个最小可运行示例:先把 Streaming 跑起来
下面先给一个尽量短、但工程上有意义的例子:
- 用
create_agent - 带一个 tool
- 同时开启
messages、updates、custom - 展示后端如何区分不同 chunk
说明:下面代码是基于 LangChain 官方 Streaming/Agents 文档能力写的最小整合版。要运行,需要你自己配置模型 provider 的 API Key。官方文档中的 Agent 示例使用了
create_agent(...),Streaming 通过stream()/astream()消费。
import jsonimport timefrom typing import Anyfrom langchain.agents import create_agentfrom langchain.tools import toolfrom langchain.messages import AIMessageChunk, AIMessage, ToolMessagefrom langgraph.config import get_stream_writer@tooldef query_order_status(order_id: str) -> str:"""查询订单状态"""writer = get_stream_writer()writer({"event": "custom_progress","stage": "tool_query_order","message": f"开始查询订单 {order_id}"})time.sleep(0.5)writer({"event": "custom_progress","stage": "tool_query_order","message": f"订单 {order_id} 查询成功"})return json.dumps({"order_id": order_id,"status": "shipped","carrier": "SF Express","eta": "2026-04-02"}, ensure_ascii=False)agent = create_agent(model="openai:gpt-5", # 换成你自己可用的模型tools=[query_order_status],)input_data = {"messages": [{"role": "user","content": "帮我查询订单 A10086 的状态,并用中文告诉我结果"}]}for chunk in agent.stream(input_data,stream_mode=["messages", "updates", "custom"],version="v2",):chunk_type = chunk["type"]if chunk_type == "messages":token, metadata = chunk["data"]# token 可能是普通文本 chunk,也可能是工具调用 chunkif isinstance(token, AIMessageChunk):if token.text:print("[TOKEN]", token.text, flush=True)if token.tool_call_chunks:print("[TOOL_CALL_CHUNK]", token.tool_call_chunks, flush=True)elif chunk_type == "updates":# updates 是“步骤完成后的状态”for step, data in chunk["data"].items():last_msg = data["messages"][-1]if isinstance(last_msg, AIMessage) and last_msg.tool_calls:print("[STEP]", step, "tool_calls =", last_msg.tool_calls, flush=True)elif isinstance(last_msg, ToolMessage):print("[STEP]", step, "tool_result =", last_msg.content, flush=True)else:print("[STEP]", step, "final =", getattr(last_msg, "content", None), flush=True)elif chunk_type == "custom":print("[CUSTOM]", chunk["data"], flush=True)
这段代码在工程里分别起什么作用
@tool + get_stream_writer()
这部分不是为了“让工具能工作”,而是为了让工具执行过程可以被外部观察。
例如查订单、查物流、跑 SQL、调用内部服务时,工具本身可能要 1~5 秒,这时只有最终返回值远远不够,最好中间也有阶段信息。官方文档就是把 custom 定位为“从工具执行中流出任意更新”。
stream_mode=["messages", "updates", "custom"]
这行代码是整个工程接入的核心。
它等于同时订阅三种视角:
messages:模型产生了什么增量内容updates:当前 step 的完成态是什么custom:业务内部自定义进度
这样后端可以只写一套消费循环,再根据 chunk["type"] 路由到不同处理器。LangGraph v2 文档明确推荐这种按 type 分发的方式。
token.text 和 token.tool_call_chunks
这里是最容易踩坑的地方。
很多人第一次看到 messages,以为拿到的一定是文本 token。其实文档已经明确展示了,模型在准备调用工具时,产出的可能是一串 tool_call_chunk,包括工具名、参数 JSON 的部分片段。
所以工程上不要这样做:
# 错误示意:把所有 messages 都当文本拼接full_text+=token.text
而应该区分:
- 文本 token → 给用户显示
- tool_call_chunk → 进入工具调用缓冲区,或者只用于调试/状态展示
updates 里的 completed state
如果你想拿到完成后的工具调用,只看 messages 不够,因为它给的是增量片段。
官方文档明确建议,在消息被 agent state 跟踪的场景下,用 stream_mode=["messages", "updates"] 来同时拿:
messages:看增量updates:看完成态的 message / tool_calls / ToolMessage
这在工程里非常重要。因为前端显示可以基于 token,但后端决定“现在是否已正式进入工具执行阶段”,更可靠的依据往往是 updates 里的完成态。
四、更接近真实项目的示例:FastAPI + SSE 把 LangChain 流转成前端可消费事件
真实项目里,前端通常不会直接理解 LangChain 的原始 chunk。
更稳妥的做法是:
- 后端消费 LangChain 的流式事件
- 转换成你自己的统一事件协议
- 通过 SSE 或 WebSocket 推给前端
这里我用 FastAPI + SSE 举例。原因很简单:
- 服务端实现简单
- 浏览器原生支持
- 对单向流式输出很友好
- 比 WebSocket 更适合“问一个问题,流一条回答”的聊天场景
五、先设计事件协议,再写代码
我推荐的前后端协议最少包含下面几类事件:
tokensteptool_starttool_endcustom_progressdoneerror
为什么这样拆
token
给前端做打字机效果,只放最终要显示给用户的文本 token。
step
告诉前端 Agent 当前处于哪个阶段,例如:
modeltoolsfinalizing
tool_start
模型已经确定要调用哪个工具,参数是什么。
tool_end
工具执行完成,拿到了什么结果。
custom_progress
工具内部更细的进度,例如“已拉取第 3 页数据”。
done
本轮对话流结束。
error
链路中的任何异常,都统一映射成这类事件,前端好处理。
六、FastAPI + SSE 后端示例
import asyncioimport jsonimport loggingfrom typing import AsyncIteratorfrom fastapi import FastAPI, Requestfrom fastapi.responses import StreamingResponsefrom langchain.agents import create_agentfrom langchain.tools import toolfrom langchain.messages import AIMessageChunk, AIMessage, ToolMessagefrom langgraph.config import get_stream_writerlogger = logging.getLogger(__name__)logging.basicConfig(level=logging.INFO)app = FastAPI()@tooldef search_kb(query: str) -> str:"""搜索知识库"""writer = get_stream_writer()writer({"event": "custom_progress","stage": "kb_search","message": f"开始搜索知识库: {query}"})# 模拟耗时import timetime.sleep(0.8)writer({"event": "custom_progress","stage": "kb_search","message": "知识库检索完成,命中 3 条候选结果"})return ("退款规则:商品签收后 7 天内支持无理由退款;""如已发货,退款到账时效 1~3 个工作日。")agent = create_agent(model="openai:gpt-5", # 替换为你自己的模型tools=[search_kb],)def sse_pack(event: str, data: dict) -> str:return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"@app.get("/chat/stream")async def chat_stream(request: Request, q: str):async def event_generator() -> AsyncIterator[str]:final_text_parts = []try:async for chunk in agent.astream({"messages": [{"role": "user", "content": q}]},stream_mode=["messages", "updates", "custom"],version="v2",):# 客户端断开连接时尽快停止if await request.is_disconnected():logger.info("client disconnected")breakchunk_type = chunk["type"]if chunk_type == "messages":token, metadata = chunk["data"]if isinstance(token, AIMessageChunk):# 1) 文本 token:直接推给前端展示if token.text:final_text_parts.append(token.text)yield sse_pack("token", {"text": token.text,"node": metadata.get("langgraph_node"),})# 2) 工具调用 chunk:不要当成文本展示if token.tool_call_chunks:yield sse_pack("tool_call_delta", {"chunks": token.tool_call_chunks,"node": metadata.get("langgraph_node"),})elif chunk_type == "updates":for step, data in chunk["data"].items():last_msg = data["messages"][-1]yield sse_pack("step", {"step": step,"message_type": last_msg.__class__.__name__,})if isinstance(last_msg, AIMessage) and last_msg.tool_calls:yield sse_pack("tool_start", {"step": step,"tool_calls": last_msg.tool_calls,})elif isinstance(last_msg, ToolMessage):yield sse_pack("tool_end", {"step": step,"tool_output": last_msg.content,})elif chunk_type == "custom":data = chunk["data"]yield sse_pack("custom_progress", data if isinstance(data, dict) else {"message": str(data)})yield sse_pack("done", {"text": "".join(final_text_parts)})except asyncio.CancelledError:logger.warning("request cancelled")raiseexcept Exception as e:logger.exception("stream failed")yield sse_pack("error", {"error": type(e).__name__,"message": str(e),})return StreamingResponse(event_generator(),media_type="text/event-stream",headers={"Cache-Control": "no-cache","Connection": "keep-alive","X-Accel-Buffering": "no",},)
这段后端代码的工程意义
1)不要把 LangChain 原始 chunk 直接透传给前端
原始 chunk 是框架内部语义,前端不应该依赖太深。
比如今天是 AIMessageChunk.tool_call_chunks,未来框架结构变了,你前端就会被迫改。更稳妥的做法是后端做一层协议转换,前端只认你定义的事件名。
2)token 和 tool_call_delta 一定要分开
这是 Streaming 接入里最常见的 bug。
因为 messages 里既可能有文本 token,也可能有 tool_call_chunk。官方文档展示得非常清楚:调用工具时,模型会一段一段流出工具名和参数 JSON。
如果你把这些 chunk 当普通文本显示,前端就会出现:
- 屏幕上冒出半截 JSON
- 文本还没结束就突然插入工具参数
- 用户看到一堆内部结构
所以正确做法是:
token.text→ 用户可见文本tool_call_chunks→ 单独处理,最多拿来做调试或状态提示
3)工具真正“开始执行”的判断,尽量基于 updates
messages 里的 tool_call_chunk 只是“模型正在生成调用意图”,还不代表工具已经真正执行。
而 updates 里拿到完成态的 AIMessage.tool_calls,以及后续的 ToolMessage,更适合映射成 tool_start / tool_end。官方文档也建议在需要拿到完成后的 parsed tool call 时,把 messages 和 updates 一起开。
4)request.is_disconnected() 很重要
很多线上 Streaming 接口的问题不是“不会流”,而是“用户关了页面后后端还在傻跑”。
SSE / WebSocket 场景都要考虑客户端中断:
- 浏览器切页
- 用户手动停止
- 网关断开
- 手机网络切换
不及时取消,模型调用和工具调用会继续消耗资源。
七、前端应该如何理解这些事件
一个稳定的聊天前端,通常应该维护一个明确的状态机,而不是“收到什么就 render 什么”。
我建议至少区分四种可见状态:
- 模型正在思考
触发时机:收到
step(model),但还没有用户可见文本 token - 正在调用工具
触发时机:收到
tool_start - 工具执行完成,正在组织答案
触发时机:收到
tool_end,但最终 answer token 还没开始 - 最终回答输出中
触发时机:收到文本
token
这样用户对系统行为的预期是稳定的,不会因为工具链路中间断流就以为系统死掉。
八、工程里最容易踩的坑
下面这部分最重要。
坑 1:为什么 messages 模式下会出现 tool_call_chunk,而不是直接文本?
因为 messages 流的是LLM 的增量消息块,不是“面向用户的最终文本”。
官方文档明确写到,在 Streaming tool calls 场景里,你可能既想拿到“工具调用生成中的部分 JSON”,也想拿到“最终被执行的完整 tool calls”。messages 给的是前者的增量片段,所以会出现 tool_call_chunk。完成态则需要结合 updates 或你自己聚合。
工程结论:
messages不是“纯文本流”- 它是“模型输出流”
- 模型输出里当然可能包含工具调用片段
坑 2:为什么 custom 里的 get_stream_writer() 不能脱离 LangGraph 执行上下文单独调用?
官方文档直接说明了这一点:如果你在工具里加了 get_stream_writer(),那这个工具就不能脱离 LangGraph 执行上下文独立调用。
原因用工程语言解释就是:
get_stream_writer()不是普通全局对象,它依赖当前 graph run 的 runtime context。没有这层上下文,就没有地方把事件写出去。
这会带来一个实际问题:
很多人会把 tool 单独写单元测试,然后直接调用函数,结果一跑就炸。
更稳妥的写法有两个:
方案 A:给工具核心逻辑和流式包装分层
def _query_order_core(order_id: str) -> dict:return {"order_id": order_id, "status": "shipped"}@tooldef query_order(order_id: str) -> str:writer = get_stream_writer()writer({"event": "progress", "message": "start"})result = _query_order_core(order_id)writer({"event": "progress", "message": "done"})return json.dumps(result, ensure_ascii=False)
这样 _query_order_core() 可以单独测试,query_order() 负责 LangChain 包装。
方案 B:让 writer 变成可注入依赖
这样普通环境传 None,流式环境传真实 writer。
LangGraph 文档还提到,在 Python < 3.11 的 async 环境里,get_stream_writer() 不可用,需要改成显式传 writer。这个思路本身也说明:writer 最好在工程上被视为一种依赖,而不是业务函数的唯一入口。
坑 3:前端如何区分“模型正在思考”“正在调用工具”“工具执行完成”“最终回答输出中”?
不要试图仅靠 token 猜。
更稳的做法是组合事件:
step(model)且暂无文本 → 思考中tool_start→ 调工具中tool_end→ 工具完成- 首个文本
token→ 最终回答输出中 done→ 结束
也就是说,前端状态主要看 updates,文本展示主要看 messages。
坑 4:如何避免前端一边展示 token,一边被 tool call 打乱?
这是最常见的前端渲染问题。
原则只有一条:不要把所有 messages 里的 chunk 都拼进同一个文本缓冲区。
推荐做法:
token.text→ 追加到answer_buffertoken.tool_call_chunks→ 追加到tool_call_buffertool_call_buffer不直接给用户展示,只用于:- 调试日志
- 可选的“正在调用 xxx” 状态提示
- 和
updates的完成态对账
如果你真的想展示工具调用过程,也要单独渲染成“系统状态”,不要混进回答正文。
坑 5:多种 stream mode 同时开启时,后端事件分发怎么写?
不要写一大坨 if else 然后直接业务逻辑塞进去。
建议抽成标准分发器:
handlers = {"messages": handle_messages,"updates": handle_updates,"custom": handle_custom,}async for chunk in agent.astream(...):handler = handlers.get(chunk["type"])if handler:async for event in handler(chunk):yield event
因为 LangGraph v2 的核心约定就是:每个 chunk 都是带 type 的 StreamPart。既然框架已经把事件类型分好了,后端最好也按事件总线的方式处理。
坑 6:异常处理、超时处理、取消请求处理怎么做?
这块是 Streaming 落地里最容易被忽略的地方。
异常处理
无论是模型异常、工具异常、序列化异常,最终都要变成统一的 error 事件返回给前端。
不要让连接直接断掉,否则前端体验会非常糟。
超时处理
至少要分两层:
- 模型调用超时
- 工具调用超时
Agent 文档里就展示了模型实例可以设置 timeout。
工程上建议:
- 模型 timeout 比网关 timeout 更短
- 工具 timeout 再按业务单独控制
- 超时异常进入统一
error事件
取消请求
SSE 场景用 request.is_disconnected();WebSocket 场景则监听断连。
取消之后应尽快:
- 停止继续向客户端写
- 中断后续工具调用
- 记录 cancel 日志
坑 7:日志怎么记,才能排查线上问题?
推荐至少记录三层日志:
第一层:请求级
- request_id
- session_id
- user_id
- trace_id
- prompt 摘要
- 开始/结束时间
第二层:step 级
- step 名称
- step 开始/结束时间
- tool name
- tool args 摘要
- tool result 摘要
- 耗时
第三层:流事件级
- token 数量
- 首字时间
- done 时间
- error 类型
- 客户端是否中断
最关键的一点是:
日志要和流式事件协议保持同构。
例如你已经定义了:
tool_starttool_endcustom_progress
那日志里最好也对应这三个阶段。这样线上复盘时,你看日志就像在“回放事件流”。
九、工程建议:不同项目该怎么选 Streaming 方案
1)什么场景只用 messages 就够了
适合:
- 简单聊天机器人
- 没有工具调用,或者工具调用很轻
- 只关心首字响应和打字机效果
- 前端只要“内容流”,不要过程状态
例如一个纯问答助手,大部分时候只输出文本,没有明显的业务步骤。
判断标准:
如果你的前端 UI 不打算展示“正在查库 / 正在检索 / 正在调用工具”,那只用 messages 完全可以。
2)什么场景必须配合 updates
适合:
- Agent 会调工具
- 想明确知道当前执行阶段
- 想拿到完成态 tool_calls / ToolMessage
- 想做线上调试和可观测性
官方文档对工具调用流式的建议,本质上就是:
只看 messages 你拿到的是增量碎片;想拿完成态,要结合 updates。
我的建议:
只要是 Agent + Tools,就默认把 updates 打开。
3)什么场景应该补充 custom
适合:
- 工具执行耗时明显
- 需要业务进度反馈
- 有多数据源、多阶段任务
- 你想把内部系统状态暴露给前端
- 需要补充埋点、调试信息、阶段说明
比如:
- 搜索 5 个数据源
- 跑 SQL + 再做汇总
- 文件解析 + OCR + 结构化抽取
- 复杂客服流程:查订单、查库存、查规则、决定方案
这些场景如果只靠 messages 和 updates,信息还是不够细。
这时候 custom 最有价值。
4)中小项目怎么选
中小项目目标一般是“尽快上线,不要把协议搞太复杂”。
我建议:
- 后端:SSE
- LangChain:
messages + updates - 协议:
token / step / tool_start / tool_end / done / error custom暂时只给长耗时工具加
这样已经能覆盖 80% 的需求。
5)复杂 Agent 项目怎么选
复杂项目往往有这些特征:
- 多工具
- 多跳推理
- 多数据源
- 子 Agent / 子图
- 线上需要排障
我建议:
- 后端:SSE 或 WebSocket 都行,但事件协议必须稳定
- LangChain:默认
messages + updates + custom - 每个事件都带:
- request_id
- run_id
- step
- timestamp
- optional trace metadata
- 日志和事件协议统一
- 前端维护显式状态机,而不是“收到啥渲染啥”
十、我推荐的默认实践方案
这部分给一套能直接落地的大多数业务系统的做法。
默认后端方案
- 框架:FastAPI
- 推送协议:SSE
- LangChain 调用方式:
astream(..., stream_mode=["messages", "updates", "custom"], version="v2") - 服务端做一层事件适配,不直接暴露 LangChain 原始 chunk
默认事件协议
前端只认这几类:
tokensteptool_starttool_endcustom_progressdoneerror
默认消费规则
messages- 文本 token →
token tool_call_chunk→ 只进内部缓冲,不直接显示
- 文本 token →
updatesAIMessage.tool_calls→tool_startToolMessage→tool_end- 其他 step →
step
custom- 统一转成
custom_progress
- 统一转成
默认前端规则
- 用
token做正文打字机输出 - 用
step/tool_start/tool_end/custom_progress做状态栏 - 不把工具调用碎片混进正文
- 维护清晰状态机:
- thinking
- calling_tool
- finalizing
- streaming_answer
- done
- error
默认日志规则
每次请求至少记录:
request_id- 首字时间
- 总耗时
- step 耗时
- tool 名称与参数摘要
- error 类型
- 是否客户端中断
默认代码组织建议
不要把 writer、业务逻辑、工具包装混在一起。
推荐分层:
service/xxx.py:纯业务逻辑,可单测tools/xxx.py:LangChain tool 包装,负责get_stream_writer()streaming/adapter.py:把 LangChain chunk 转成前端事件api/chat.py:SSE/WebSocket 接口
这样后面你要替换模型、替换推送协议、替换前端协议,都不会牵一发动全身。
总结
LangChain Streaming 真正有价值的地方,不是“能流式”,而是它把 Agent 执行过程拆成了三层可观测信号:
messages:模型增量输出,解决首字延迟和打字机效果updates:step 完成态,解决阶段可见性和工具链路调试custom:业务自定义进度,解决长任务反馈和可观测性补充
如果只把 Streaming 理解成“让回答一个字一个字往外蹦”,那它的价值只用了 30%。
真正适合工程落地的做法,是把它当成一条事件流总线:后端消费 LangChain 流,转成你自己的前端协议和日志协议,再把用户体验、调试体验、线上可观测性统一起来。官方文档已经把基础能力铺好了,关键在于你不要停留在 API 层,而要把这些 chunk 设计成系统行为。
推荐落地方案
如果你现在要给一个大多数业务系统上 Streaming,我推荐直接用下面这套默认架构:
方案选型
- 后端传输:SSE
- LangChain 模式:
messages + updates默认开启 - 复杂工具链路:补
custom - 流式版本:统一用
version="v2"
事件协议
token:只传用户可见文本step:阶段更新tool_start:工具开始tool_end:工具结束custom_progress:业务进度done:完成error:错误
默认策略
- 简单聊天:
messages - 带工具的普通 Agent:
messages + updates - 复杂 Agent / 长耗时任务:
messages + updates + custom
最重要的三条工程原则
- 不要把
messages里的所有 chunk 都当成文本。 - 不要让前端直接依赖 LangChain 原始 chunk 结构。
- 不要只做用户可见 Streaming,不做日志和状态流。
按这套方案做,基本能覆盖绝大多数客服、搜索、知识库问答、数据查询类 LLM 应用,而且后面要扩展可观测性、取消请求、问题排查,也不会推翻重来。