SERIES · Langchain Agent 应用开发

LangChain Streaming:会不会流式输出?

2026-03-31 · 20 min read · by GUMP

LangChain Streaming:会不会流式输出?

在 LLM 应用里,Streaming 从来不只是“把字一个一个吐出来”。对真实项目来说,它解决的是另外几个更硬的问题:首字延迟太长、Agent 调工具时用户以为系统卡死、长链路任务没有过程反馈、线上排障时看不见中间状态。LangChain 在官方文档里把 Streaming 能力拆成了 updates、messages、custom 三类,而且支持同时开启多种模式;LangChain Agent 本身又是构建在 LangGraph 之上的,所以这些能力本质上是“把 Agent 执行过程变成事件流”。

这篇文章不打算重复文档定义,而是从工程视角回答一个更实际的问题:

LangChain Streaming 在真实项目里应该怎么设计、怎么接入、怎么落地、会踩哪些坑?


一、为什么工程里一定要做 Streaming

很多文章会说 Streaming 能“提升用户体验”,但这句话太空。工程里真正的价值,通常体现在下面四件事上。

1)降低“首字可见时间”,不是只降低总耗时

很多 LLM 请求总耗时其实没有夸张到不能接受,问题在于用户在前 2~5 秒里什么都看不到。

对聊天系统来说,用户感知最强的指标往往不是“10 秒返回完”,而是“300ms 内先看到系统开始说话”。

messages 模式的价值就在这里。它会从所有发生 LLM 调用的节点里持续产出 (token, metadata),也就是你可以尽早把 token 推到前端,而不是等整个 Agent 完成后再统一返回。官方文档明确说明,messages 就是用来流式拿 LLM token 的。

2)Agent 调工具时,必须让用户知道“系统还活着”

普通聊天补全文本时,只要 token 一直在滚动,用户一般不会焦虑。

但 Agent 一旦开始调工具,事情就不一样了:模型可能先生成工具调用,再等待外部 API、数据库、检索服务、内部系统响应。这段时间如果没有反馈,用户很容易认为“卡住了”。

LangChain 的 updates 模式会在每个 agent step 后产出状态更新。对于一次典型的 tool 调用流程,官方文档给出的 step 顺序就是:

  • model:产出工具调用请求
  • tools:产出工具执行结果
  • model:产出最终回答

这其实非常适合映射到前端状态:“正在分析问题” → “正在调用工具” → “工具返回中” → “正在组织最终答案”。

3)长链路任务需要“过程反馈”,而不是只有最终结果

比如:

  • 客服 Agent 去查订单、查物流、查退款规则
  • 数据分析 Agent 要走“取数 → 清洗 → 聚合 → 生成结论”
  • 搜索 Agent 要查多个数据源,再做归纳

这里光有 messages 不够。因为 token 只能说明“模型在输出”,但不能说明“业务流程走到哪一步”。

这正是 custom 的作用:你可以在工具或图节点内部主动发业务进度,比如:

  • 正在查询订单库
  • 已获取 120/500 条记录
  • 开始合并多数据源结果
  • 准备生成最终答复

官方文档把这个能力描述为:使用 get_stream_writer() 从图节点内部流出任意自定义数据。

4)Streaming 不只是给用户看,也是给工程师看的

这一点经常被忽略。

真实项目里,复杂 Agent 的问题通常不是“最终答错了”这么简单,而是:

  • 模型到底有没有决定调用工具
  • 工具参数是不是被拆碎了
  • 工具返回后模型为什么又绕回去
  • 哪一步最慢
  • 某次异常到底发生在模型、工具、网络还是业务逻辑层

如果你只拿最终 answer,定位问题会非常痛苦。

updates + messages 的组合,天然就是一条轻量级执行观测链路:你能看到 step 级状态,也能看到 token 级输出,必要时还能把 custom 进度事件和 trace id 绑在一起做日志关联。LangGraph 文档也明确说明,v2 流式输出里每个 chunk 都是带 typeStreamPart,适合后端统一分发。


二、三种 Streaming 模式,工程里分别该怎么用

LangChain 官方把 Streaming 分成三类:

  • updates
  • messages
  • custom

定义很简单,但工程里更重要的是:它们各自解决什么问题


1)updates:看“流程步骤”,不看“逐字输出”

官方定义是“每个 agent step 之后流出状态更新”。如果放到工程里理解,它更像是:

Agent 执行阶段的状态流

最适合的场景:

  • 前端展示“当前阶段”
  • 后端记录 step 级日志
  • 调试工具调用链路
  • 做卡点定位和耗时统计

例如一次 create_agent 的典型执行中,你会看到:

  • model 节点先产出一个 tool call
  • tools 节点返回一个 ToolMessage
  • model 节点再生成最终回复

这正好就是用户界面里“分析中 / 查数据中 / 回答中”的来源。

优点

  • 粒度刚好,适合展示阶段状态
  • 很适合调试 agent step
  • 能拿到完成后的状态,而不只是碎片 token

缺点

  • 不适合直接做打字机效果
  • 前端如果只消费 updates,文本输出会显得“顿挫”

一句话判断

你的需求如果是“我想知道 Agent 现在跑到哪一步了”,优先看 updates


2)messages:看“模型流式输出”,包括 token 和工具调用碎片

官方定义是“从发生 LLM 调用的图节点中流出 (token, metadata) 元组”。工程上可以理解成:

LLM 层的增量输出流

这不只是普通文本 token。文档明确展示了,在 Agent 场景下,messages 还会产出 tool_call_chunk,也就是工具调用参数生成过程中的增量片段。比如模型先吐出工具名,再逐步吐出参数 JSON 的各个片段。

最适合的场景:

  • 聊天回复逐字显示
  • 显示 reasoning / thinking token
  • 想看模型是如何一步步拼出 tool call 的
  • 想在前端做到“先看到内容,再等完成”

优点

  • 首字响应最快
  • 最适合用户可见的打字机效果
  • 还能观察 reasoning 和 tool call 生成过程

缺点

  • 数据是“增量碎片”,不是完成态
  • 会出现 tool_call_chunk,前端如果直接拼文本,很容易被打乱
  • 单用它时,不容易准确表达“当前处于哪个步骤”

一句话判断

你的需求如果是“我要尽快把模型输出推给用户”,优先看 messages


3)custom:发业务进度,不依赖模型文本

官方给的能力非常直接:在工具或图节点里通过 get_stream_writer() 主动输出任意数据。文档还特别说明,加入 get_stream_writer() 后,这个工具就依赖 LangGraph 执行上下文,脱离该上下文单独调用会失败。

工程里可以把它理解成:

你自己定义的业务事件流

适合的场景:

  • 进度提示
  • 阶段说明
  • 埋点 / 可观测性补充信息
  • 把非 LangChain 原生流式源接进来

LangGraph 文档还提到,custom 可以用来包装任意外部流式客户端,即便那个 LLM API 本身不实现 LangChain chat model 接口,也可以通过 writer 手动往外发 chunk。

优点

  • 最灵活
  • 最贴合业务
  • 特别适合长耗时工具

缺点

  • 需要你自己设计事件协议
  • 不能替代 token 流
  • 用不好会让前后端协议变乱

一句话判断

你的需求如果是“我要告诉用户系统具体在做什么业务动作”,用 custom


4)一个项目里怎么组合这三种模式

这是最关键的。

我自己的建议是:

  • 简单聊天messages
  • 有工具但流程不复杂messages + updates
  • 复杂 Agent / 长耗时工具 / 多数据源任务messages + updates + custom

原因很简单:

  • messages 负责“内容输出”
  • updates 负责“流程阶段”
  • custom 负责“业务进度”

三者不是互斥关系,而是三层不同视角。官方文档也明确支持把多个 stream_mode 作为列表同时传入,而且在 version="v2" 下,每个 chunk 都会带 type 字段,天然适合服务端做统一分发。


三、一个最小可运行示例:先把 Streaming 跑起来

下面先给一个尽量短、但工程上有意义的例子:

  • create_agent
  • 带一个 tool
  • 同时开启 messagesupdatescustom
  • 展示后端如何区分不同 chunk

说明:下面代码是基于 LangChain 官方 Streaming/Agents 文档能力写的最小整合版。要运行,需要你自己配置模型 provider 的 API Key。官方文档中的 Agent 示例使用了 create_agent(...),Streaming 通过 stream() / astream() 消费。

python
import json
import time
from typing import Any
from langchain.agents import create_agent
from langchain.tools import tool
from langchain.messages import AIMessageChunk, AIMessage, ToolMessage
from langgraph.config import get_stream_writer
@tool
def query_order_status(order_id: str) -> str:
"""查询订单状态"""
writer = get_stream_writer()
writer({
"event": "custom_progress",
"stage": "tool_query_order",
"message": f"开始查询订单 {order_id}"
})
time.sleep(0.5)
writer({
"event": "custom_progress",
"stage": "tool_query_order",
"message": f"订单 {order_id} 查询成功"
})
return json.dumps({
"order_id": order_id,
"status": "shipped",
"carrier": "SF Express",
"eta": "2026-04-02"
}, ensure_ascii=False)
agent = create_agent(
model="openai:gpt-5", # 换成你自己可用的模型
tools=[query_order_status],
)
input_data = {
"messages": [
{
"role": "user",
"content": "帮我查询订单 A10086 的状态,并用中文告诉我结果"
}
]
}
for chunk in agent.stream(
input_data,
stream_mode=["messages", "updates", "custom"],
version="v2",
):
chunk_type = chunk["type"]
if chunk_type == "messages":
token, metadata = chunk["data"]
# token 可能是普通文本 chunk,也可能是工具调用 chunk
if isinstance(token, AIMessageChunk):
if token.text:
print("[TOKEN]", token.text, flush=True)
if token.tool_call_chunks:
print("[TOOL_CALL_CHUNK]", token.tool_call_chunks, flush=True)
elif chunk_type == "updates":
# updates 是“步骤完成后的状态”
for step, data in chunk["data"].items():
last_msg = data["messages"][-1]
if isinstance(last_msg, AIMessage) and last_msg.tool_calls:
print("[STEP]", step, "tool_calls =", last_msg.tool_calls, flush=True)
elif isinstance(last_msg, ToolMessage):
print("[STEP]", step, "tool_result =", last_msg.content, flush=True)
else:
print("[STEP]", step, "final =", getattr(last_msg, "content", None), flush=True)
elif chunk_type == "custom":
print("[CUSTOM]", chunk["data"], flush=True)

这段代码在工程里分别起什么作用

@tool + get_stream_writer()

这部分不是为了“让工具能工作”,而是为了让工具执行过程可以被外部观察。

例如查订单、查物流、跑 SQL、调用内部服务时,工具本身可能要 1~5 秒,这时只有最终返回值远远不够,最好中间也有阶段信息。官方文档就是把 custom 定位为“从工具执行中流出任意更新”。

stream_mode=["messages", "updates", "custom"]

这行代码是整个工程接入的核心。

它等于同时订阅三种视角:

  • messages:模型产生了什么增量内容
  • updates:当前 step 的完成态是什么
  • custom:业务内部自定义进度

这样后端可以只写一套消费循环,再根据 chunk["type"] 路由到不同处理器。LangGraph v2 文档明确推荐这种按 type 分发的方式。

token.texttoken.tool_call_chunks

这里是最容易踩坑的地方。

很多人第一次看到 messages,以为拿到的一定是文本 token。其实文档已经明确展示了,模型在准备调用工具时,产出的可能是一串 tool_call_chunk,包括工具名、参数 JSON 的部分片段。

所以工程上不要这样做:

python
# 错误示意:把所有 messages 都当文本拼接
full_text+=token.text

而应该区分:

  • 文本 token → 给用户显示
  • tool_call_chunk → 进入工具调用缓冲区,或者只用于调试/状态展示

updates 里的 completed state

如果你想拿到完成后的工具调用,只看 messages 不够,因为它给的是增量片段。

官方文档明确建议,在消息被 agent state 跟踪的场景下,用 stream_mode=["messages", "updates"] 来同时拿:

  • messages:看增量
  • updates:看完成态的 message / tool_calls / ToolMessage

这在工程里非常重要。因为前端显示可以基于 token,但后端决定“现在是否已正式进入工具执行阶段”,更可靠的依据往往是 updates 里的完成态。


四、更接近真实项目的示例:FastAPI + SSE 把 LangChain 流转成前端可消费事件

真实项目里,前端通常不会直接理解 LangChain 的原始 chunk。

更稳妥的做法是:

  1. 后端消费 LangChain 的流式事件
  2. 转换成你自己的统一事件协议
  3. 通过 SSE 或 WebSocket 推给前端

这里我用 FastAPI + SSE 举例。原因很简单:

  • 服务端实现简单
  • 浏览器原生支持
  • 对单向流式输出很友好
  • 比 WebSocket 更适合“问一个问题,流一条回答”的聊天场景

五、先设计事件协议,再写代码

我推荐的前后端协议最少包含下面几类事件:

  • token
  • step
  • tool_start
  • tool_end
  • custom_progress
  • done
  • error

为什么这样拆

token

给前端做打字机效果,只放最终要显示给用户的文本 token。

step

告诉前端 Agent 当前处于哪个阶段,例如:

  • model
  • tools
  • finalizing

tool_start

模型已经确定要调用哪个工具,参数是什么。

tool_end

工具执行完成,拿到了什么结果。

custom_progress

工具内部更细的进度,例如“已拉取第 3 页数据”。

done

本轮对话流结束。

error

链路中的任何异常,都统一映射成这类事件,前端好处理。


六、FastAPI + SSE 后端示例

python
import asyncio
import json
import logging
from typing import AsyncIterator
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from langchain.agents import create_agent
from langchain.tools import tool
from langchain.messages import AIMessageChunk, AIMessage, ToolMessage
from langgraph.config import get_stream_writer
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
app = FastAPI()
@tool
def search_kb(query: str) -> str:
"""搜索知识库"""
writer = get_stream_writer()
writer({
"event": "custom_progress",
"stage": "kb_search",
"message": f"开始搜索知识库: {query}"
})
# 模拟耗时
import time
time.sleep(0.8)
writer({
"event": "custom_progress",
"stage": "kb_search",
"message": "知识库检索完成,命中 3 条候选结果"
})
return (
"退款规则:商品签收后 7 天内支持无理由退款;"
"如已发货,退款到账时效 1~3 个工作日。"
)
agent = create_agent(
model="openai:gpt-5", # 替换为你自己的模型
tools=[search_kb],
)
def sse_pack(event: str, data: dict) -> str:
return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
@app.get("/chat/stream")
async def chat_stream(request: Request, q: str):
async def event_generator() -> AsyncIterator[str]:
final_text_parts = []
try:
async for chunk in agent.astream(
{
"messages": [
{"role": "user", "content": q}
]
},
stream_mode=["messages", "updates", "custom"],
version="v2",
):
# 客户端断开连接时尽快停止
if await request.is_disconnected():
logger.info("client disconnected")
break
chunk_type = chunk["type"]
if chunk_type == "messages":
token, metadata = chunk["data"]
if isinstance(token, AIMessageChunk):
# 1) 文本 token:直接推给前端展示
if token.text:
final_text_parts.append(token.text)
yield sse_pack("token", {
"text": token.text,
"node": metadata.get("langgraph_node"),
})
# 2) 工具调用 chunk:不要当成文本展示
if token.tool_call_chunks:
yield sse_pack("tool_call_delta", {
"chunks": token.tool_call_chunks,
"node": metadata.get("langgraph_node"),
})
elif chunk_type == "updates":
for step, data in chunk["data"].items():
last_msg = data["messages"][-1]
yield sse_pack("step", {
"step": step,
"message_type": last_msg.__class__.__name__,
})
if isinstance(last_msg, AIMessage) and last_msg.tool_calls:
yield sse_pack("tool_start", {
"step": step,
"tool_calls": last_msg.tool_calls,
})
elif isinstance(last_msg, ToolMessage):
yield sse_pack("tool_end", {
"step": step,
"tool_output": last_msg.content,
})
elif chunk_type == "custom":
data = chunk["data"]
yield sse_pack("custom_progress", data if isinstance(data, dict) else {
"message": str(data)
})
yield sse_pack("done", {
"text": "".join(final_text_parts)
})
except asyncio.CancelledError:
logger.warning("request cancelled")
raise
except Exception as e:
logger.exception("stream failed")
yield sse_pack("error", {
"error": type(e).__name__,
"message": str(e),
})
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)

这段后端代码的工程意义

1)不要把 LangChain 原始 chunk 直接透传给前端

原始 chunk 是框架内部语义,前端不应该依赖太深。

比如今天是 AIMessageChunk.tool_call_chunks,未来框架结构变了,你前端就会被迫改。更稳妥的做法是后端做一层协议转换,前端只认你定义的事件名。

2)tokentool_call_delta 一定要分开

这是 Streaming 接入里最常见的 bug。

因为 messages 里既可能有文本 token,也可能有 tool_call_chunk。官方文档展示得非常清楚:调用工具时,模型会一段一段流出工具名和参数 JSON。

如果你把这些 chunk 当普通文本显示,前端就会出现:

  • 屏幕上冒出半截 JSON
  • 文本还没结束就突然插入工具参数
  • 用户看到一堆内部结构

所以正确做法是:

  • token.text → 用户可见文本
  • tool_call_chunks → 单独处理,最多拿来做调试或状态提示

3)工具真正“开始执行”的判断,尽量基于 updates

messages 里的 tool_call_chunk 只是“模型正在生成调用意图”,还不代表工具已经真正执行。

updates 里拿到完成态的 AIMessage.tool_calls,以及后续的 ToolMessage,更适合映射成 tool_start / tool_end。官方文档也建议在需要拿到完成后的 parsed tool call 时,把 messagesupdates 一起开。

4)request.is_disconnected() 很重要

很多线上 Streaming 接口的问题不是“不会流”,而是“用户关了页面后后端还在傻跑”。

SSE / WebSocket 场景都要考虑客户端中断:

  • 浏览器切页
  • 用户手动停止
  • 网关断开
  • 手机网络切换

不及时取消,模型调用和工具调用会继续消耗资源。


七、前端应该如何理解这些事件

一个稳定的聊天前端,通常应该维护一个明确的状态机,而不是“收到什么就 render 什么”。

我建议至少区分四种可见状态:

  1. 模型正在思考 触发时机:收到 step(model),但还没有用户可见文本 token
  2. 正在调用工具 触发时机:收到 tool_start
  3. 工具执行完成,正在组织答案 触发时机:收到 tool_end,但最终 answer token 还没开始
  4. 最终回答输出中 触发时机:收到文本 token

这样用户对系统行为的预期是稳定的,不会因为工具链路中间断流就以为系统死掉。


八、工程里最容易踩的坑

下面这部分最重要。


坑 1:为什么 messages 模式下会出现 tool_call_chunk,而不是直接文本?

因为 messages 流的是LLM 的增量消息块,不是“面向用户的最终文本”。

官方文档明确写到,在 Streaming tool calls 场景里,你可能既想拿到“工具调用生成中的部分 JSON”,也想拿到“最终被执行的完整 tool calls”。messages 给的是前者的增量片段,所以会出现 tool_call_chunk。完成态则需要结合 updates 或你自己聚合。

工程结论:

  • messages 不是“纯文本流”
  • 它是“模型输出流”
  • 模型输出里当然可能包含工具调用片段

坑 2:为什么 custom 里的 get_stream_writer() 不能脱离 LangGraph 执行上下文单独调用?

官方文档直接说明了这一点:如果你在工具里加了 get_stream_writer(),那这个工具就不能脱离 LangGraph 执行上下文独立调用。

原因用工程语言解释就是:

get_stream_writer() 不是普通全局对象,它依赖当前 graph run 的 runtime context。

没有这层上下文,就没有地方把事件写出去。

这会带来一个实际问题:

很多人会把 tool 单独写单元测试,然后直接调用函数,结果一跑就炸。

更稳妥的写法有两个:

方案 A:给工具核心逻辑和流式包装分层

python
def _query_order_core(order_id: str) -> dict:
return {"order_id": order_id, "status": "shipped"}
@tool
def query_order(order_id: str) -> str:
writer = get_stream_writer()
writer({"event": "progress", "message": "start"})
result = _query_order_core(order_id)
writer({"event": "progress", "message": "done"})
return json.dumps(result, ensure_ascii=False)

这样 _query_order_core() 可以单独测试,query_order() 负责 LangChain 包装。

方案 B:让 writer 变成可注入依赖

这样普通环境传 None,流式环境传真实 writer。

LangGraph 文档还提到,在 Python < 3.11 的 async 环境里,get_stream_writer() 不可用,需要改成显式传 writer。这个思路本身也说明:writer 最好在工程上被视为一种依赖,而不是业务函数的唯一入口。


坑 3:前端如何区分“模型正在思考”“正在调用工具”“工具执行完成”“最终回答输出中”?

不要试图仅靠 token 猜。

更稳的做法是组合事件:

  • step(model) 且暂无文本 → 思考中
  • tool_start → 调工具中
  • tool_end → 工具完成
  • 首个文本 token → 最终回答输出中
  • done → 结束

也就是说,前端状态主要看 updates,文本展示主要看 messages


坑 4:如何避免前端一边展示 token,一边被 tool call 打乱?

这是最常见的前端渲染问题。

原则只有一条:不要把所有 messages 里的 chunk 都拼进同一个文本缓冲区。

推荐做法:

  • token.text → 追加到 answer_buffer
  • token.tool_call_chunks → 追加到 tool_call_buffer
  • tool_call_buffer 不直接给用户展示,只用于:
    • 调试日志
    • 可选的“正在调用 xxx” 状态提示
    • updates 的完成态对账

如果你真的想展示工具调用过程,也要单独渲染成“系统状态”,不要混进回答正文。


坑 5:多种 stream mode 同时开启时,后端事件分发怎么写?

不要写一大坨 if else 然后直接业务逻辑塞进去。

建议抽成标准分发器:

python
handlers = {
"messages": handle_messages,
"updates": handle_updates,
"custom": handle_custom,
}
async for chunk in agent.astream(...):
handler = handlers.get(chunk["type"])
if handler:
async for event in handler(chunk):
yield event

因为 LangGraph v2 的核心约定就是:每个 chunk 都是带 typeStreamPart。既然框架已经把事件类型分好了,后端最好也按事件总线的方式处理。


坑 6:异常处理、超时处理、取消请求处理怎么做?

这块是 Streaming 落地里最容易被忽略的地方。

异常处理

无论是模型异常、工具异常、序列化异常,最终都要变成统一的 error 事件返回给前端。

不要让连接直接断掉,否则前端体验会非常糟。

超时处理

至少要分两层:

  • 模型调用超时
  • 工具调用超时

Agent 文档里就展示了模型实例可以设置 timeout

工程上建议:

  • 模型 timeout 比网关 timeout 更短
  • 工具 timeout 再按业务单独控制
  • 超时异常进入统一 error 事件

取消请求

SSE 场景用 request.is_disconnected();WebSocket 场景则监听断连。

取消之后应尽快:

  • 停止继续向客户端写
  • 中断后续工具调用
  • 记录 cancel 日志

坑 7:日志怎么记,才能排查线上问题?

推荐至少记录三层日志:

第一层:请求级

  • request_id
  • session_id
  • user_id
  • trace_id
  • prompt 摘要
  • 开始/结束时间

第二层:step 级

  • step 名称
  • step 开始/结束时间
  • tool name
  • tool args 摘要
  • tool result 摘要
  • 耗时

第三层:流事件级

  • token 数量
  • 首字时间
  • done 时间
  • error 类型
  • 客户端是否中断

最关键的一点是:

日志要和流式事件协议保持同构。

例如你已经定义了:

  • tool_start
  • tool_end
  • custom_progress

那日志里最好也对应这三个阶段。这样线上复盘时,你看日志就像在“回放事件流”。


九、工程建议:不同项目该怎么选 Streaming 方案

1)什么场景只用 messages 就够了

适合:

  • 简单聊天机器人
  • 没有工具调用,或者工具调用很轻
  • 只关心首字响应和打字机效果
  • 前端只要“内容流”,不要过程状态

例如一个纯问答助手,大部分时候只输出文本,没有明显的业务步骤。

判断标准:

如果你的前端 UI 不打算展示“正在查库 / 正在检索 / 正在调用工具”,那只用 messages 完全可以。


2)什么场景必须配合 updates

适合:

  • Agent 会调工具
  • 想明确知道当前执行阶段
  • 想拿到完成态 tool_calls / ToolMessage
  • 想做线上调试和可观测性

官方文档对工具调用流式的建议,本质上就是:

只看 messages 你拿到的是增量碎片;想拿完成态,要结合 updates

我的建议:

只要是 Agent + Tools,就默认把 updates 打开。


3)什么场景应该补充 custom

适合:

  • 工具执行耗时明显
  • 需要业务进度反馈
  • 有多数据源、多阶段任务
  • 你想把内部系统状态暴露给前端
  • 需要补充埋点、调试信息、阶段说明

比如:

  • 搜索 5 个数据源
  • 跑 SQL + 再做汇总
  • 文件解析 + OCR + 结构化抽取
  • 复杂客服流程:查订单、查库存、查规则、决定方案

这些场景如果只靠 messagesupdates,信息还是不够细。

这时候 custom 最有价值。


4)中小项目怎么选

中小项目目标一般是“尽快上线,不要把协议搞太复杂”。

我建议:

  • 后端:SSE
  • LangChain:messages + updates
  • 协议:token / step / tool_start / tool_end / done / error
  • custom 暂时只给长耗时工具加

这样已经能覆盖 80% 的需求。


5)复杂 Agent 项目怎么选

复杂项目往往有这些特征:

  • 多工具
  • 多跳推理
  • 多数据源
  • 子 Agent / 子图
  • 线上需要排障

我建议:

  • 后端:SSE 或 WebSocket 都行,但事件协议必须稳定
  • LangChain:默认 messages + updates + custom
  • 每个事件都带:
    • request_id
    • run_id
    • step
    • timestamp
    • optional trace metadata
  • 日志和事件协议统一
  • 前端维护显式状态机,而不是“收到啥渲染啥”

十、我推荐的默认实践方案

这部分给一套能直接落地的大多数业务系统的做法。

默认后端方案

  • 框架:FastAPI
  • 推送协议:SSE
  • LangChain 调用方式:astream(..., stream_mode=["messages", "updates", "custom"], version="v2")
  • 服务端做一层事件适配,不直接暴露 LangChain 原始 chunk

默认事件协议

前端只认这几类:

  • token
  • step
  • tool_start
  • tool_end
  • custom_progress
  • done
  • error

默认消费规则

  • messages
    • 文本 token → token
    • tool_call_chunk → 只进内部缓冲,不直接显示
  • updates
    • AIMessage.tool_callstool_start
    • ToolMessagetool_end
    • 其他 step → step
  • custom
    • 统一转成 custom_progress

默认前端规则

  • token 做正文打字机输出
  • step/tool_start/tool_end/custom_progress 做状态栏
  • 不把工具调用碎片混进正文
  • 维护清晰状态机:
    • thinking
    • calling_tool
    • finalizing
    • streaming_answer
    • done
    • error

默认日志规则

每次请求至少记录:

  • request_id
  • 首字时间
  • 总耗时
  • step 耗时
  • tool 名称与参数摘要
  • error 类型
  • 是否客户端中断

默认代码组织建议

不要把 writer、业务逻辑、工具包装混在一起。

推荐分层:

  • service/xxx.py:纯业务逻辑,可单测
  • tools/xxx.py:LangChain tool 包装,负责 get_stream_writer()
  • streaming/adapter.py:把 LangChain chunk 转成前端事件
  • api/chat.py:SSE/WebSocket 接口

这样后面你要替换模型、替换推送协议、替换前端协议,都不会牵一发动全身。


总结

LangChain Streaming 真正有价值的地方,不是“能流式”,而是它把 Agent 执行过程拆成了三层可观测信号:

  • messages:模型增量输出,解决首字延迟和打字机效果
  • updates:step 完成态,解决阶段可见性和工具链路调试
  • custom:业务自定义进度,解决长任务反馈和可观测性补充

如果只把 Streaming 理解成“让回答一个字一个字往外蹦”,那它的价值只用了 30%。

真正适合工程落地的做法,是把它当成一条事件流总线:后端消费 LangChain 流,转成你自己的前端协议和日志协议,再把用户体验、调试体验、线上可观测性统一起来。官方文档已经把基础能力铺好了,关键在于你不要停留在 API 层,而要把这些 chunk 设计成系统行为。


推荐落地方案

如果你现在要给一个大多数业务系统上 Streaming,我推荐直接用下面这套默认架构:

方案选型

  • 后端传输:SSE
  • LangChain 模式messages + updates 默认开启
  • 复杂工具链路:补 custom
  • 流式版本:统一用 version="v2"

事件协议

  • token:只传用户可见文本
  • step:阶段更新
  • tool_start:工具开始
  • tool_end:工具结束
  • custom_progress:业务进度
  • done:完成
  • error:错误

默认策略

  • 简单聊天:messages
  • 带工具的普通 Agent:messages + updates
  • 复杂 Agent / 长耗时任务:messages + updates + custom

最重要的三条工程原则

  1. 不要把 messages 里的所有 chunk 都当成文本。
  2. 不要让前端直接依赖 LangChain 原始 chunk 结构。
  3. 不要只做用户可见 Streaming,不做日志和状态流。

按这套方案做,基本能覆盖绝大多数客服、搜索、知识库问答、数据查询类 LLM 应用,而且后面要扩展可观测性、取消请求、问题排查,也不会推翻重来。