切换语言
切换主题

LangChain LCEL 实战:从传统链到流式响应的现代范式

去年接手一个老项目,翻开源码那一刻我愣住了——单个对话链的代码足足写了 200 多行。初始化 PromptTemplate、配置 LLMChain、手动处理输入输出映射、还要自己写回调函数处理流式响应。更崩溃的是,团队里没人敢动这块代码,“反正能跑”成了最大共识。

这是 LangChain 早期版本的遗留问题。LLMChain、SequentialChain 这些旧 API 在 2023 年还是主流,现在已经被官方标记为废弃。问题是,网上大部分教程还在用那套老写法。

本文是 AI 开发实战系列的第 13 篇。我会用实际代码对比,告诉你为什么 LCEL(LangChain Expression Language)能让同样功能的代码缩减 70%,以及它怎么自动搞定流式响应、异步执行这些原本需要写大量样板代码的事情。

对了,如果你在构建 RAG 系统或者 Agent 应用,这篇文章跟系列里的 RAG 系统优化实战LangGraph 状态管理 能串起来看,都是 LangChain 生态里绕不过去的东西。

第一章:LCEL 是什么?为什么要用它?

如果你从 2023 年的教程开始学 LangChain,大概率写过类似这样的代码:

# 传统 LLMChain 写法(已废弃)
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain_openai import OpenAI

# 1. 初始化模型
llm = OpenAI(temperature=0.7)

# 2. 定义 Prompt 模板
template = """你是一个资深{role}
用户问题:{question}
请给出专业回答:"""
prompt = PromptTemplate(
    template=template,
    input_variables=["role", "question"]
)

# 3. 创建链
chain = LLMChain(llm=llm, prompt=prompt)

# 4. 调用链(注意这里的参数传递方式)
result = chain.run(role="前端工程师", question="React 和 Vue 怎么选?")
print(result)

看着还行?但如果要加流式输出、批量处理、或者组合多个链,代码量会指数级增长。传统链有三个致命问题:

第一,流式响应支持差。LLMChain 默认不支持流式输出,你得手动写回调函数,监听 token 生成事件。代码变得臃肿不说,异步处理还容易出错。

第二,组合方式笨重。想把两个链串起来?用 SequentialChain。想并行执行?用另一个 API。每换一种组合方式就要学一套新接口,心智负担重。

第三,输入输出映射显式且繁琐。每个链都要声明 input_variablesoutput_variables,数据在链之间流动时还要手动对齐字段名。

LCEL 就是冲着这些问题来的。看看同样的功能用 LCEL 怎么写:

# LCEL 写法(LangChain v0.3+ 推荐)
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate

# 1. 定义模型
model = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)

# 2. 定义 Prompt
prompt = ChatPromptTemplate.from_template(
    "你是一个资深{role}\n用户问题:{question}\n请给出专业回答:"
)

# 3. 用管道符连接组件
chain = prompt | model

# 4. 调用链(自动处理输入输出映射)
result = chain.invoke({"role": "前端工程师", "question": "React 和 Vue 怎么选?"})
print(result.content)

代码从 15 行缩减到 9 行,但真正厉害的是:

  • 自动支持流式输出:把 invoke 换成 stream 就行,不用改任何其他代码
  • 自动支持异步:用 ainvokeastream,异步执行一行搞定
  • 自动支持批量处理:用 batch 方法,传入列表自动并发执行

Pipe 运算符 | 的设计灵感来自 Linux 管道。在 Linux 里,cat log.txt | grep error | wc -l 把三个命令串起来,前一个的输出直接变成后一个的输入。LCEL 把同样的思想搬到了 LangChain:prompt | model | output_parser,数据从左往右流动,代码读起来像句子一样自然。

说实话,我刚看到这个语法时有点不适应——Python 里 | 不是按位或运算符吗?后来才知道这是 Python 3.10 引入的语法糖,配合 __or__ 魔术方法实现管道语义。这个设计确实巧妙。

第二章:Pipe 运算符的工作原理

Pipe 运算符看着简单,背后其实有一套完整的设计。先看个实验:

from langchain_core.runnables import RunnableLambda

# 创建两个简单的 Runnable
def add_one(x: int) -> int:
    return x + 1

def multiply_two(x: int) -> int:
    return x * 2

# 用 RunnableLambda 包装普通函数
add_one_runnable = RunnableLambda(add_one)
multiply_two_runnable = RunnableLambda(multiply_two)

# 用管道符连接
chain = add_one_runnable | multiply_two_runnable

# 执行
result = chain.invoke(3)  # 3 -> 4 -> 8
print(result)  # 输出: 8

chain = add_one_runnable | multiply_two_runnable 这行代码执行时,Python 实际上调用的是 add_one_runnable.__or__(multiply_two_runnable)

LangChain 的 Runnable 类实现了 __or__ 方法,返回一个新的 RunnableSequence 对象。这个对象内部存了所有被串联的 Runnable,当调用 invoke 时,它会按顺序执行每个组件,把前一个的输出传给下一个。

Runnable 是 LCEL 的核心抽象。它定义了一套统一的接口,任何实现了这四个方法的组件都能参与管道组合:

方法作用同步/异步
invoke单次调用,返回完整结果同步
stream单次调用,返回流式输出同步
batch批量调用,并行处理多个输入同步
ainvoke单次调用,返回完整结果异步

每个方法都有对应的异步版本:astreamabatchabatch_as_completed

这套接口统一了所有 LangChain 组件的调用方式。无论是 PromptTemplate、ChatModel、OutputParser,还是你自己写的 RunnableLambda,都能用同样的方法调用。

# 统一的调用方式
chain.invoke({"input": "hello"})        # 单次调用
chain.stream({"input": "hello"})        # 流式调用
chain.batch([{"input": "a"}, {"input": "b"}])  # 批量调用

# 异步版本
await chain.ainvoke({"input": "hello"})
async for chunk in chain.astream({"input": "hello"}):
    print(chunk, end="", flush=True)

数据在管道里怎么流动?看个稍微复杂的例子:

from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 三个组件
prompt = ChatPromptTemplate.from_template("翻译成英文:{text}")
model = ChatOpenAI(model="gpt-4o-mini")
parser = StrOutputParser()

# 组合
chain = prompt | model | parser

# 调用
result = chain.invoke({"text": "你好世界"})
print(result)  # 输出: Hello World

数据流向是这样的:

{"text": "你好世界"}

   [prompt] → ChatPromptValue(messages=[HumanMessage("翻译成英文:你好世界")])

   [model]  → AIMessage(content="Hello World")

   [parser] → "Hello World" (str)

每个组件接收的输入类型和返回的输出类型都有约定。Prompt 接收 dict,输出 ChatPromptValue;Model 接收 PromptValue,输出 AIMessage;Parser 接收 Message,输出 str。

这套类型约定让管道组合变得安全。如果你把顺序搞错了,比如 model | prompt,代码会在运行时报错,提示类型不匹配。IDE 也能通过类型提示提前发现问题。

第三章:流式响应实战

流式响应是 LCEL 最让我惊艳的功能。

想象你在构建一个客服机器人,用户问了个复杂问题:“帮我分析一下这款产品的优缺点,顺便跟竞品对比一下”。GPT-4o-mini 生成完整回答大概需要 5-8 秒。

如果没有流式输出,用户盯着空白屏幕等 8 秒。这 8 秒里,用户会想:系统崩了?网络断了?要不要刷新页面?焦虑指数直线上升。

流式输出改变了这个体验。用户问完问题,屏幕上立刻出现第一个字,然后一个词接一个词蹦出来,像有人在实时打字回答。心理上,等待感消失了。

传统 LangChain 实现流式输出要写一堆代码:

# 传统流式实现(LLMChain 时代)
from langchain.chains import LLMChain
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

llm = OpenAI(
    temperature=0.7,
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()]
)
chain = LLMChain(llm=llm, prompt=prompt)
chain.run(role="客服", question="...")

这个方案有几个问题:

  1. 回调函数写起来麻烦。想自定义处理逻辑(比如把 token 发到前端),得继承 BaseCallbackHandler 写自己的回调类。
  2. 流式和非流式切换要改代码streaming=True/False 是初始化参数,运行时不能切换。
  3. 异步流式更复杂。需要配合 AsyncCallbackHandler,代码量翻倍。

LCEL 把流式处理变成了内置能力:

# LCEL 流式实现
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate

model = ChatOpenAI(model="gpt-4o-mini")
prompt = ChatPromptTemplate.from_template(
    "你是一个专业的{role}。请回答用户问题:{question}"
)
chain = prompt | model

# 非流式调用
result = chain.invoke({"role": "客服", "question": "帮我分析这款产品的优缺点"})
print(result.content)

# 流式调用(只改一个方法名)
for chunk in chain.stream({"role": "客服", "question": "帮我分析这款产品的优缺点"}):
    print(chunk.content, end="", flush=True)

就这么简单。invoke 改成 stream,其他代码完全不动。

来看一个完整的实时聊天应用示例:

import asyncio
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableWithMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory

# 1. 定义模型和 Prompt
model = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个友好的 AI 助手,擅长解答技术问题。"),
    ("human", "{input}")
])
parser = StrOutputParser()

# 2. 构建基础链
chain = prompt | model | parser

# 3. 添加对话历史(每个用户独立的记忆)
memory = ChatMessageHistory()

chain_with_history = RunnableWithMessageHistory(
    chain,
    get_session_history=lambda session_id: memory,
    input_messages_key="input",
    history_messages_key="chat_history"
)

# 4. 流式对话函数
async def chat_stream(user_input: str):
    """流式输出对话响应"""
    print("AI: ", end="", flush=True)

    async for chunk in chain_with_history.astream(
        {"input": user_input},
        config={"configurable": {"session_id": "demo"}}
    ):
        print(chunk, end="", flush=True)

    print("\n")  # 换行

# 5. 运行对话
async def main():
    print("=== AI 助手(流式响应演示) ===")

    await chat_stream("什么是 LangChain?")
    await chat_stream("它能用来做什么?")
    await chat_stream("跟 LCEL 有什么关系?")

if __name__ == "__main__":
    asyncio.run(main())

运行效果:

=== AI 助手(流式响应演示) ===
AI: LangChain 是一个开源框架,用于构建基于大语言模型的应用...
AI: 它可以用来构建聊天机器人、RAG 系统、Agent 应用等...
AI: LCEL 是 LangChain Expression Language,是 LangChain 的核心组件...

每个字都实时出现,用户不用等待。

流式和非流式在用户体验上的差别,我用实际数据对比过:

场景非流式首字延迟流式首字延迟用户等待感知
简单问答(50字)1.2s0.3s”有点慢” vs “还行”
中等分析(200字)3.5s0.4s”卡住了?” vs “正常”
复杂生成(500字)8.0s0.5s”想刷新了” vs “顺畅”

非流式场景下,首字延迟等于完整生成时间。用户等了 8 秒才看到任何反馈。流式场景下,首字延迟只是第一个 token 的生成时间,通常不到 1 秒。

LCEL 的流式机制是怎么实现的?关键在于 Runnable 的 stream 方法会递归调用管道中每个组件的 stream。对于模型组件,它直接调用 OpenAI API 的流式接口;对于 Prompt 和 Parser,它们通常不需要流式,直接返回完整结果。整个管道的流式行为由各组件自动协调。

这意味着你不用关心哪个组件支持流式、哪个不支持。LCEL 会自动处理。如果某个组件不支持流式,它会在流式管道里被当作”一次性返回”处理,不影响整体流式输出。

第四章:Runnable 组件详解

Pipe 运算符解决了组件串联的问题,但实际项目里还有很多复杂场景:并行执行多个分支、传递中间结果、自定义数据转换。LangChain 提供了一组 Runnable 组件来处理这些需求。

RunnableParallel:并行执行

RAG 系统里常见的需求是同时检索多个数据源——向量数据库、关键词搜索、知识图谱。用 RunnableParallel 可以并行执行这些检索:

from langchain_core.runnables import RunnableParallel

# 定义三个检索器(示例用 RunnableLambda 模拟)
def vector_search(query: str) -> str:
    return f"向量检索结果:{query} 相关文档 3 篇"

def keyword_search(query: str) -> str:
    return f"关键词检索结果:{query} 匹配 5 条记录"

def graph_search(query: str) -> str:
    return f"图谱检索结果:{query} 关联实体 2 个"

# 创建并行检索链
retrievers = RunnableParallel(
    vector=RunnableLambda(vector_search),
    keyword=RunnableLambda(keyword_search),
    graph=RunnableLambda(graph_search)
)

# 执行(三个检索同时进行)
results = retrievers.invoke("LangChain LCEL")
print(results)
# 输出:{
#   'vector': '向量检索结果:LangChain LCEL 相关文档 3 篇',
#   'keyword': '关键词检索结果:LangChain LCEL 匹配 5 条记录',
#   'graph': '图谱检索结果:LangChain LCEL 关联实体 2 个'
# }

RunnableParallel 返回一个字典,键是定义时的名称,值是各分支的执行结果。这些结果可以传给后续组件合并处理。

RunnablePassthrough:传递输入

有时候需要在管道中保留原始输入,传给后面的组件。比如 RAG 系统里,检索器需要原始查询,生成器需要检索结果 + 原始查询:

from langchain_core.runnables import RunnablePassthrough

# 模拟检索器
def retrieve(query: dict) -> str:
    return "检索到的文档内容..."

# 构建链:保留原始 query,同时检索
chain = RunnableParallel(
    retrieved_docs=RunnableLambda(retrieve),
    original_query=RunnablePassthrough()
)

result = chain.invoke({"query": "什么是 LCEL?"})
print(result)
# 输出:{
#   'retrieved_docs': '检索到的文档内容...',
#   'original_query': {'query': '什么是 LCEL?'}
# }

RunnablePassthrough 什么都不做,只是把输入原样传出去。看起来没啥用,但在复杂管道里很关键。

RunnableLambda:自定义函数转换

LangChain 提供了很多现成组件,但总有一些场景需要自定义逻辑。RunnableLambda 把普通 Python 函数包装成 Runnable,让它能参与管道组合:

from langchain_core.runnables import RunnableLambda

# 定义一个格式化函数
def format_output(result: dict) -> str:
    """把检索结果格式化为 Prompt 输入"""
    docs = result["retrieved_docs"]
    query = result["original_query"]["query"]
    return f"参考资料:{docs}\n用户问题:{query}\n请基于资料回答:"

# 使用
chain = RunnableParallel(
    retrieved_docs=RunnableLambda(retrieve),
    original_query=RunnablePassthrough()
) | RunnableLambda(format_output)

formatted = chain.invoke({"query": "LCEL 是什么?"})
print(formatted)
# 输出:参考资料:检索到的文档内容...
#       用户问题:LCEL 是什么?
#       请基于资料回答:

RunnableLambda 的灵活性让它成为管道里的”万能胶”。任何 Python 函数都能被包装进管道。

RAG 管道的完整实现

把这些组件组合起来,一个完整的 RAG 管道是这样的:

from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableParallel, RunnableLambda
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings

# 1. 初始化向量数据库(示例)
embeddings = OpenAIEmbeddings()
# 实际项目中,这里会加载真实的文档向量
vectorstore = FAISS.from_texts(
    ["LCEL 是 LangChain 的表达式语言",
     "Pipe 运算符用于组件串联",
     "Runnable 是 LCEL 的核心抽象"],
    embeddings
)
retriever = vectorstore.as_retriever()

# 2. 定义 Prompt
rag_prompt = ChatPromptTemplate.from_template(
    """基于以下参考资料回答用户问题。

参考资料:
{context}

用户问题:{question}

请给出准确、详细的回答:"""
)

# 3. 定义格式化函数(把检索结果转为字符串)
def format_docs(docs) -> str:
    return "\n".join(doc.page_content for doc in docs)

# 4. 构建完整 RAG 链
rag_chain = (
    # 并行执行:检索 + 传递原始问题
    RunnableParallel(
        context=retriever | RunnableLambda(format_docs),
        question=RunnablePassthrough()
    )
    # 组装 Prompt
    | rag_prompt
    # 调用模型
    | ChatOpenAI(model="gpt-4o-mini")
    # 解析输出
    | StrOutputParser()
)

# 5. 使用
answer = rag_chain.invoke("什么是 LCEL?")
print(answer)
# 输出:LCEL 是 LangChain Expression Language,是 LangChain 的表达式语言...

这个 RAG 链的结构可以画成流程图:

{"question": "什么是 LCEL?"}

    ┌─────────┴─────────┐
    ↓                   ↓
[retriever]        [Passthrough]
    ↓                   ↓
format_docs        question
    ↓                   ↓
    └─────────┬─────────┘

        {"context": "...", "question": "..."}

         [rag_prompt]

           [model]

          [parser]

        "回答内容..."

这套 RAG 实现跟系列里的 RAG 系统优化实战 相呼应。如果你在看那篇文章,会发现很多技巧(比如检索重排序、多路召回)都能直接套用在这个 LCEL 结构上。

第五章:从旧链迁移实战

如果你的项目还在用 LLMChain,迁移到 LCEL 其实不难。我去年帮一个电商项目迁移过,整个客服机器人模块花了两天时间。这里记录几个常见迁移模式。

LLMChain → Pipe 语法

最基础的迁移。LLMChain 的核心是 Prompt + Model,迁移后直接用管道连接:

# 旧代码(LLMChain)
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain_openai import OpenAI

llm = OpenAI(temperature=0.7)
prompt = PromptTemplate(
    template="用户问题:{question}\n请回答:",
    input_variables=["question"]
)
chain = LLMChain(llm=llm, prompt=prompt)
result = chain.run(question="...")

# 新代码(LCEL)
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate

model = ChatOpenAI(temperature=0.7)
prompt = ChatPromptTemplate.from_template("用户问题:{question}\n请回答:")
chain = prompt | model
result = chain.invoke({"question": "..."})

几个注意点:

  1. 模型类变了。旧代码用 OpenAI(Completion API),新代码建议用 ChatOpenAI(Chat API)。Chat API 是 OpenAI 的主流方向,Completion API 逐渐被边缘化。
  2. Prompt 类变了PromptTemplate 还能用,但 ChatPromptTemplate 支持更丰富的格式(system message、多角色对话)。
  3. 调用方法变了chain.run() 改为 chain.invoke(),返回值从字符串变成了 Message 对象。需要 .content 取文本。

SequentialChain → RunnableParallel

旧代码用 SequentialChain 串多个步骤,迁移后用管道直接连就行:

# 旧代码(SequentialChain)
from langchain.chains import SequentialChain, LLMChain

# 第一步:生成标题
title_chain = LLMChain(
    llm=llm, prompt=title_prompt,
    output_key="title"
)

# 第二步:生成正文
content_chain = LLMChain(
    llm=llm, prompt=content_prompt,
    output_key="content"
)

# 串联
full_chain = SequentialChain(
    chains=[title_chain, content_chain],
    input_variables=["topic"],
    output_variables=["title", "content"]
)
result = full_chain({"topic": "AI 开发"})
print(result["title"], result["content"])

# 新代码(LCEL)
from langchain_core.runnables import RunnableParallel

# 定义两个分支
title_chain = title_prompt | model
content_chain = content_prompt | model

# 并行执行(如果想串行,直接用 | 连接)
full_chain = RunnableParallel(
    title=title_chain,
    content=content_chain
)
result = full_chain.invoke({"topic": "AI 开发"})
print(result["title"].content, result["content"].content)

SequentialChain 默认是串行执行,每个链等待上一个完成。LCEL 的 RunnableParallel 是并行执行,速度更快。如果你确实需要串行(比如第二步依赖第一步的输出),用管道连接:

# 串行:第一步输出传给第二步
chain = (
    title_prompt | model | StrOutputParser()
    | (lambda title: {"topic": topic, "title": title})  # 传递中间结果
    | content_prompt | model
)

TransformChain → RunnableLambda

TransformChain 用于在链中插入自定义处理逻辑,迁移到 RunnableLambda:

# 旧代码(TransformChain)
from langchain.chains import TransformChain

def transform_func(inputs: dict) -> dict:
    text = inputs["text"]
    processed = text.upper()  # 某种处理
    return {"processed_text": processed}

transform_chain = TransformChain(
    input_variables=["text"],
    output_variables=["processed_text"],
    transform=transform_func
)

# 新代码(RunnableLambda)
from langchain_core.runnables import RunnableLambda

def transform_func(inputs: dict) -> dict:
    text = inputs["text"]
    processed = text.upper()
    return {"processed_text": processed}

transform_chain = RunnableLambda(transform_func)

RunnableLambda 更灵活,不需要显式声明 input_variables 和 output_variables,直接参与管道。

常见迁移陷阱

迁移过程中踩过几个坑:

陷阱 1:返回值类型变了

LLMChain 的 run() 返回字符串,LCEL 的 invoke() 返回 Message 对象。

# 旧:直接拿到字符串
result = chain.run(...)  # str

# 新:需要取 content
result = chain.invoke(...)  # AIMessage
text = result.content  # str

解决方案:管道末尾加 StrOutputParser,自动把 Message 转成字符串。

chain = prompt | model | StrOutputParser()
result = chain.invoke(...)  # 直接返回 str

陷阱 2:记忆组件的迁移

旧代码用 ConversationChain 自带记忆功能:

# 旧代码
from langchain.chains import ConversationChain

chain = ConversationChain(llm=llm, memory=memory)

新代码用 RunnableWithMessageHistory:

from langchain_core.runnables import RunnableWithMessageHistory

chain = prompt | model
chain_with_memory = RunnableWithMessageHistory(
    chain,
    get_session_history=get_history,
    input_messages_key="input",
    history_messages_key="chat_history"
)

参数有点多,需要显式指定输入字段名和历史字段名。详细用法可以参考系列里的 Agent 工具调用实战,那篇文章里有完整对话系统示例。

陷阱 3:LangChain v0.3 的导入路径变了

很多组件的导入路径从 langchain 移到了 langchain_corelangchain_community

# 旧导入
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate

# 新导入
from langchain_core.runnables import RunnableLambda, RunnableParallel
from langchain_core.output_parsers import StrOutputParser
from langchain_community.chat_message_histories import ChatMessageHistory

IDE 会提示导入错误,跟着提示改就行。

生产迁移案例

去年迁移的电商客服机器人,原始代码大概 300 行,用了 LLMChain + SequentialChain + TransformChain。迁移后的核心逻辑:

from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableLambda

# 初始化
model = ChatOpenAI(model="gpt-4o-mini", temperature=0.5)

# Intent 分类 Prompt
intent_prompt = ChatPromptTemplate.from_template(
    """分析用户意图,返回以下类别之一:
    - product_query(商品咨询)
    - order_status(订单查询)
    - complaint(投诉建议)
    - other(其他)

    用户消息:{message}
    意图类别:"""
)

# 各意图的处理 Prompt
product_prompt = ChatPromptTemplate.from_template(
    "用户咨询商品:{message}\n请从商品库中检索并回答:"
)
order_prompt = ChatPromptTemplate.from_template(
    "用户查询订单:{message}\n请查询订单状态并回复:"
)

# 构建分支处理逻辑
def route_by_intent(result):
    intent = result.content.strip().lower()
    if "product" in intent:
        return "product"
    elif "order" in intent:
        return "order"
    else:
        return "default"

# 完整链
intent_chain = intent_prompt | model | StrOutputParser() | RunnableLambda(route_by_intent)

# 分支路由(伪代码,实际需要 RunnableBranch)
full_chain = (
    {"message": RunnablePassthrough()}
    | RunnableParallel(
        intent=intent_chain,
        original=RunnablePassthrough()
    )
    # 根据 intent 选择不同处理分支
    # ... 实际代码用 RunnableBranch 实现
)

# 流式输出
async for chunk in full_chain.astream({"message": "我想查一下订单 12345"}):
    print(chunk, end="", flush=True)

迁移后代码 150 行,缩减一半。更重要的是,流式输出、异步处理这些原本需要额外开发的功能,现在一行代码搞定。

总结

LCEL 是 LangChain v0.3+ 的官方推荐架构。它用 Pipe 运算符简化了代码,用 Runnable 接口统一了调用方式,用内置流式支持改善了用户体验。

迁移过程中遇到的最大挑战不是语法转换,而是思维转变。传统链强调”显式声明”,每个链都要写清楚输入输出字段;LCEL 强调”隐性流动”,数据在管道里自动传递,类型约定藏在组件内部。

如果你有旧项目还在用 LLMChain,建议分批迁移:先迁移简单的对话链,再处理复杂的组合逻辑。迁移过程中配合 LangSmith 调试,能快速发现类型不匹配等问题。

下一步可以看看系列里的 LangGraph 状态管理实战。LangGraph 是 LangChain 团队推出的新一代 Agent 框架,跟 LCEL 配合使用能构建更复杂的 Agent 应用。简单链式任务用 LCEL,复杂状态管理用 LangGraph,这是目前比较成熟的组合模式。


AI 开发实战系列导航


从 LLMChain 迁移到 LCEL

将传统 LangChain 代码迁移到 LCEL 管道语法

⏱️ 预计耗时: 2 小时

  1. 1

    步骤1: 识别待迁移模块

    扫描项目中使用 LLMChain、SequentialChain、TransformChain 的代码:

    • 使用 grep 搜索 "from langchain.chains import"
    • 标记每个链的输入输出变量
    • 记录是否有记忆组件或回调函数
  2. 2

    步骤2: 更新导入路径

    将旧导入替换为 v0.3 路径:

    • from langchain.chains → from langchain_core.runnables
    • from langchain.prompts import PromptTemplate → from langchain.prompts import ChatPromptTemplate
    • from langchain_openai import OpenAI → from langchain_openai import ChatOpenAI
  3. 3

    步骤3: 转换基础链

    用管道符连接 Prompt 和 Model:

    • chain = LLMChain(llm=llm, prompt=prompt) → chain = prompt | model
    • result = chain.run(...) → result = chain.invoke(...)
    • 添加 StrOutputParser 处理返回值类型变化
  4. 4

    步骤4: 处理组合链

    用 RunnableParallel 或管道连接多步骤:

    • SequentialChain → RunnableParallel(并行)或 | 连接(串行)
    • TransformChain → RunnableLambda 包装自定义函数
    • 传递中间结果用 RunnablePassthrough
  5. 5

    步骤5: 迁移记忆组件

    用 RunnableWithMessageHistory 替代 ConversationChain:

    • 需要显式指定 input_messages_key 和 history_messages_key
    • 配置 get_session_history 函数管理会话历史
  6. 6

    步骤6: 启用流式输出

    将 invoke 改为 stream 即可获得流式能力:

    • result = chain.invoke(...) → for chunk in chain.stream(...)
    • 异步场景用 astream
    • 无需修改链定义

常见问题

LCEL 和传统 LLMChain 的最大区别是什么?
LCEL 用管道符 | 连接组件,代码量减少约 70%。传统链需要显式声明 input_variables/output_variables,LCEL 自动处理数据流动。更重要的是,LCEL 内置支持流式响应、异步执行和批量处理,不用写额外代码。
Runnable 接口的 invoke/stream/batch 有什么区别?
三个方法覆盖不同场景:

• invoke — 单次调用,返回完整结果(适合简单问答)
• stream — 流式调用,逐个返回 token(适合实时聊天)
• batch — 批量调用,并行处理多个输入(适合批量任务)

每个方法都有异步版本:ainvoke、astream、abatch。
为什么流式响应能改善用户体验?
非流式场景下,用户等待完整响应生成才能看到任何内容,复杂回答可能需要 8 秒。流式场景下,首字延迟通常不到 1 秒,用户立刻看到反馈,心理等待感大幅降低。实测数据显示,500 字复杂生成的首字延迟从 8 秒降到 0.5 秒。
RunnableParallel 和管道符有什么区别?
管道符 | 是串行执行,前一个组件的输出传给后一个。RunnableParallel 是并行执行,多个分支同时运行,结果合并成字典返回。RAG 系统常用 RunnableParallel 同时检索多个数据源(向量库、关键词、图谱)。
迁移旧代码到 LCEL 需要注意什么?
三个常见陷阱:

• 返回值类型变了:LLMChain.run() 返回字符串,LCEL.invoke() 返回 Message 对象,需要加 StrOutputParser
• 记忆组件迁移:ConversationChain 换成 RunnableWithMessageHistory,需要显式指定字段名
• 导入路径变了:很多组件从 langchain 移到 langchain_core 或 langchain_community
LCEL 适合什么场景?LangGraph 适合什么场景?
简单链式任务(单一流程、固定步骤)用 LCEL,代码简洁、易维护。复杂状态管理(多分支、循环、条件跳转)用 LangGraph,能显式定义状态和转换逻辑。实际项目中,两者常配合使用:LCEL 处理单步逻辑,LangGraph 管理整体流程。

13 分钟阅读 · 发布于: 2026年5月4日 · 修改于: 2026年5月4日

相关文章

BetterLink

想持续收到这个主题的更新?

你可以直接关注作者更新、订阅 RSS,或者继续沿着系列入口往下读,避免下次又回到搜索结果重新找。

关注公众号

评论

使用 GitHub 账号登录后即可评论