言語を切り替える
テーマを切り替える

LangChain LCEL 実践:従来のチェーンからストリーミング対応のモダンなパラダイムへ

昨年、あるレガシープロジェクトを引き継いだ時のこと。ソースコードを開いた瞬間、私は言葉を失いました。単一の対話チェーンだけで 200 行以上のコードが書かれていたのです。PromptTemplate の初期化、LLMChain の設定、入出力マッピングの手動処理、さらにはストリーミング対応のためのコールバック関数まで自前で書く必要がありました。さらに驚いたことに、チーム内の誰もこのコードに触ろうとしません。「とりあえず動いているから」という暗黙の了解が支配していました。

これは LangChain 初期バージョンの典型的な問題です。LLMChain や SequentialChain といった旧 API は 2023 年までは主流でしたが、現在では公式に非推奨とされています。問題は、ネット上の多くのチュートリアルがまだ古い書き方を使っていることです。

本記事は AI 開発実践シリーズの第 13 回です。実際のコードを使った比較を通じて、なぜ LCEL(LangChain Expression Language)を使うと同じ機能のコードが 70% も削減できるのか、そして本来大量のボイラープレートコードが必要だったストリーミング対応や非同期実行をどのように自動的に実現できるのかを解説します。

ちなみに、RAG システムや Agent アプリケーションを構築している方は、本記事と同じシリーズの RAG システム最適化実践LangGraph 状態管理 も併せて読むことをお勧めします。どちらも LangChain エコシステムで避けて通れない重要なトピックです。

第 1 章:LCEL とは?なぜ使うのか?

2023 年のチュートリアルから LangChain を学び始めた方は、おそらくこのようなコードを書いたことがあるでしょう。

# 従来の LLMChain の書き方(非推奨)
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain_openai import OpenAI

# 1. モデルの初期化
llm = OpenAI(temperature=0.7)

# 2. プロンプトテンプレートの定義
template = """あなたはベテランの{role}です。
ユーザーの質問:{question}
専門的な回答をしてください:"""
prompt = PromptTemplate(
    template=template,
    input_variables=["role", "question"]
)

# 3. チェーンの作成
chain = LLMChain(llm=llm, prompt=prompt)

# 4. チェーンの呼び出し(パラメータの渡し方に注目)
result = chain.run(role="フロントエンドエンジニア", question="React と Vue はどう選べばいい?")
print(result)

悪くないように見えますね。しかし、ストリーミング出力やバッチ処理、複数のチェーンの組み合わせを追加しようとすると、コード量は指数関数的に増えていきます。従来のチェーンには 3 つの致命的な問題があります。

第一に、ストリーミング対応が不十分。LLMChain はデフォルトでストリーミング出力をサポートしておらず、コールバック関数を自前で書いてトークン生成イベントを監視する必要があります。コードが肥大化するだけでなく、非同期処理のバグも出やすくなります。

第二に、組み合わせ方が煩雑。2 つのチェーンを直列につなぐには SequentialChain を、並列実行には別の API を使う必要があります。組み合わせパターンごとに新しいインターフェースを学ぶ必要があり、認知的負荷が高いです。

第三に、入出力マッピングが明示的で煩わしい。各チェーンで input_variables と output_variables を宣言し、チェーン間のデータフローでは手動でフィールド名を合わせる必要があります。

LCEL はまさにこれらの問題を解決するために登場しました。同じ機能を LCEL で書くとどうなるか見てみましょう。

# LCEL の書き方(LangChain v0.3+ 推奨)
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate

# 1. モデルの定義
model = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)

# 2. プロンプトの定義
prompt = ChatPromptTemplate.from_template(
    "あなたはベテランの{role}です。\nユーザーの質問:{question}\n専門的な回答をしてください:"
)

# 3. パイプ演算子でコンポーネントを接続
chain = prompt | model

# 4. チェーンの呼び出し(入出力マッピングは自動処理)
result = chain.invoke({"role": "フロントエンドエンジニア", "question": "React と Vue はどう選べばいい?"})
print(result.content)

コードは 15 行から 9 行に減りました。しかし、本当に凄いのはこれだけではありません。

  • ストリーミング出力の自動サポート:invoke を stream に変えるだけで、他のコードは一切変更不要
  • 非同期の自動サポート:ainvoke や astream を使えば、非同期実行も 1 行で完結
  • バッチ処理の自動サポート:batch メソッドを使えば、リストを渡すだけで並列実行

Pipe 演算子 | の設計インスピレーションは Linux パイプから来ています。Linux では cat log.txt | grep error | wc -l で 3 つのコマンドを直列につなぎ、前の出力が次の入力になります。LCEL も同じ考えを LangChain に持ち込みました。prompt | model | output_parser で、データは左から右へ流れ、コードがまるで文章のように自然に読めます。

正直なところ、最初この構文を見た時は戸惑いました。Python では | はビット単位の OR 演算子ではありませんか?後に、これは Python 3.10 で導入された構文糖で、or マジックメソッドと組み合わせてパイプのセマンティクスを実現していると知りました。実に巧みな設計です。

第 2 章:Pipe 演算子の仕組み

Pipe 演算子はシンプルに見えますが、その背後には完全な設計があります。まず実験してみましょう。

from langchain_core.runnables import RunnableLambda

# 2 つのシンプルな Runnable を作成
def add_one(x: int) -> int:
    return x + 1

def multiply_two(x: int) -> int:
    return x * 2

# RunnableLambda で通常の関数をラップ
add_one_runnable = RunnableLambda(add_one)
multiply_two_runnable = RunnableLambda(multiply_two)

# パイプ演算子で接続
chain = add_one_runnable | multiply_two_runnable

# 実行
result = chain.invoke(3)  # 3 -> 4 -> 8
print(result)  # 出力: 8

chain = add_one_runnable | multiply_two_runnable という行が実行される時、Python は実際には add_one_runnable.__or__(multiply_two_runnable) を呼び出しています。

LangChain の Runnable クラスは or メソッドを実装しており、新しい RunnableSequence オブジェクトを返します。このオブジェクトは内部的に接続されたすべての Runnable を保持し、invoke が呼ばれると各コンポーネントを順番に実行し、前の出力を次に渡します。

Runnable は LCEL の中核となる抽象概念です。この 4 つのメソッドを実装したコンポーネントは、どれもパイプラインに参加できます。

メソッド役割同期/非同期
invoke単一呼び出し、完全な結果を返す同期
stream単一呼び出し、ストリーミング出力を返す同期
batchバッチ呼び出し、複数の入力を並列処理同期
ainvoke単一呼び出し、完全な結果を返す非同期

各メソッドには非同期バージョンがあります。astream、abatch、abatch_as_completed です。

このインターフェースはすべての LangChain コンポーネントの呼び出し方法を統一しています。PromptTemplate、ChatModel、OutputParser、あるいは自作の RunnableLambda でも、すべて同じメソッドで呼び出せます。

# 統一された呼び出し方法
chain.invoke({"input": "hello"})        # 単一呼び出し
chain.stream({"input": "hello"})        # ストリーミング呼び出し
chain.batch([{"input": "a"}, {"input": "b"}])  # バッチ呼び出し

# 非同期バージョン
await chain.ainvoke({"input": "hello"})
async for chunk in chain.astream({"input": "hello"}):
    print(chunk, end="", flush=True)

パイプライン内でデータはどのように流れるのでしょうか?少し複雑な例を見てみましょう。

from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 3 つのコンポーネント
prompt = ChatPromptTemplate.from_template("次のテキストを英語に翻訳してください:{text}")
model = ChatOpenAI(model="gpt-4o-mini")
parser = StrOutputParser()

# 組み合わせ
chain = prompt | model | parser

# 呼び出し
result = chain.invoke({"text": "こんにちは世界"})
print(result)  # 出力: Hello World

データの流れはこのようになります。

{"text": "こんにちは世界"}

   [prompt] → ChatPromptValue(messages=[HumanMessage("次のテキストを英語に翻訳してください:こんにちは世界")])

   [model]  → AIMessage(content="Hello World")

   [parser] → "Hello World" (str)

各コンポーネントが受け取る入力型と返す出力型には決まりがあります。Prompt は dict を受け取り ChatPromptValue を出力、Model は PromptValue を受け取り AIMessage を出力、Parser は Message を受け取り str を出力します。

この型の約束により、パイプラインの組み合わせが安全になります。順序を間違えて model | prompt と書いても、実行時に型不一致のエラーが出ます。IDE も型ヒントで事前に問題を発見できます。

第 3 章:ストリーミング対応の実践

ストリーミング対応は LCEL の中でも最も驚いた機能です。

カスタマーサービスボットを構築していると想像してください。ユーザーが「この製品のメリット・デメリットを分析して、競合製品との比較もお願いします」という複雑な質問をしたとします。GPT-4o-mini が完全な回答を生成するには 5〜8 秒かかります。

ストリーミング出力がない場合、ユーザーは 8 秒間空白の画面を見つめることになります。この 8 秒の間、ユーザーは「システムが落ちた?」「ネットが切れた?」「ページを更新すべき?」と不安になり、ストレスが蓄積していきます。

ストリーミング出力はこの体験を変えます。ユーザーが質問すると、画面に即座に最初の文字が表示され、その後単語ごとに次々と表示されていきます。まるで誰かがリアルタイムでタイプしているかのように。心理的に、待ち時間の感覚が消えます。

従来の LangChain でストリーミング出力を実装するには、大量のコードを書く必要がありました。

# 従来のストリーミング実装(LLMChain 時代)
from langchain.chains import LLMChain
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

llm = OpenAI(
    temperature=0.7,
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()]
)
chain = LLMChain(llm=llm, prompt=prompt)
chain.run(role="カスタマーサービス", question="...")

このアプローチにはいくつかの問題があります。

  1. コールバック関数の記述が面倒。カスタム処理(例えばトークンをフロントエンドに送信する)をしたい場合、BaseCallbackHandler を継承して自分のコールバッククラスを書く必要があります。
  2. ストリーミングと非ストリーミングの切り替えでコードを変更streaming=True/False は初期化パラメータで、実行時に切り替えられません。
  3. 非同期ストリーミングはさらに複雑。AsyncCallbackHandler と組み合わせる必要があり、コード量が倍増します。

LCEL はストリーミング処理を組み込み機能にしました。

# LCEL ストリーミング実装
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate

model = ChatOpenAI(model="gpt-4o-mini")
prompt = ChatPromptTemplate.from_template(
    "あなたはプロの{role}です。ユーザーの質問に答えてください:{question}"
)
chain = prompt | model

# 非ストリーミング呼び出し
result = chain.invoke({"role": "カスタマーサービス", "question": "この製品のメリット・デメリットを分析してください"})
print(result.content)

# ストリーミング呼び出し(メソッド名を変えるだけ)
for chunk in chain.stream({"role": "カスタマーサービス", "question": "この製品のメリット・デメリットを分析してください"}):
    print(chunk.content, end="", flush=True)

これだけです。invoke を stream に変えるだけで、他のコードは全く変更不要です。

完全なリアルタイムチャットアプリケーションの例を見てみましょう。

import asyncio
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableWithMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory

# 1. モデルとプロンプトの定義
model = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
prompt = ChatPromptTemplate.from_messages([
    ("system", "あなたは親しみやすい AI アシスタントです。技術的な質問に答えるのが得意です。"),
    ("human", "{input}")
])
parser = StrOutputParser()

# 2. 基本チェーンの構築
chain = prompt | model | parser

# 3. 対話履歴の追加(ユーザーごとに独立したメモリ)
memory = ChatMessageHistory()

chain_with_history = RunnableWithMessageHistory(
    chain,
    get_session_history=lambda session_id: memory,
    input_messages_key="input",
    history_messages_key="chat_history"
)

# 4. ストリーミング対話関数
async def chat_stream(user_input: str):
    """ストリーミングで対話応答を出力"""
    print("AI: ", end="", flush=True)

    async for chunk in chain_with_history.astream(
        {"input": user_input},
        config={"configurable": {"session_id": "demo"}}
    ):
        print(chunk, end="", flush=True)

    print("\n")  # 改行

# 5. 対話の実行
async def main():
    print("=== AI アシスタント(ストリーミング対応デモ) ===")

    await chat_stream("LangChain とは何ですか?")
    await chat_stream("何に使えますか?")
    await chat_stream("LCEL とどう関係していますか?")

if __name__ == "__main__":
    asyncio.run(main())

実行結果:

=== AI アシスタント(ストリーミング対応デモ) ===
AI: LangChain は大規模言語モデルを活用したアプリケーションを構築するためのオープンソースフレームワークで...
AI: チャットボット、RAG システム、Agent アプリケーションなどの構築に使用できます...
AI: LCEL は LangChain Expression Language の略で、LangChain の中核コンポーネントです...

すべての文字がリアルタイムで表示され、ユーザーは待つ必要がありません。

ストリーミングと非ストリーミングのユーザー体験の違いを実測データで比較しました。

シナリオ非ストリーミングの最初の文字の待ち時間ストリーミングの最初の文字の待ち時間ユーザーの待ち感覚
単純な質問(50 文字)1.2s0.3s”少し遅い” vs “普通”
中程度の分析(200 文字)3.5s0.4s”固まった?” vs “正常”
複雑な生成(500 文字)8.0s0.5s”更新したい” vs “スムーズ”

非ストリーミングでは、最初の文字の待ち時間は完全な生成時間と等しくなります。ユーザーは 8 秒待ってからやっと何かを見られます。ストリーミングでは、最初の文字の待ち時間は最初のトークンの生成時間だけで、通常 1 秒未満です。

LCEL のストリーミング機能はどのように実装されているのでしょうか?ポイントは、Runnable の stream メソッドがパイプライン内の各コンポーネントの stream を再帰的に呼び出すことです。モデルコンポーネントの場合、OpenAI API のストリーミングインターフェースを直接呼び出します。プロンプトやパーサーの場合、通常ストリーミングは不要で、完全な結果を直接返します。パイプライン全体のストリーミング動作は各コンポーネントが自動的に調整します。

つまり、どのコンポーネントがストリーミングをサポートしているか、どれがサポートしていないかを気にする必要がありません。LCEL が自動的に処理します。あるコンポーネントがストリーミングをサポートしていない場合、ストリーミングパイプライン内では「一度に返す」処理として扱われ、全体のストリーミング出力には影響しません。

第 4 章:Runnable コンポーネントの詳細

Pipe 演算子はコンポーネントの直列接続の問題を解決しましたが、実際のプロジェクトではさらに複雑なシナリオがあります。並列実行、中間結果の受け渡し、カスタムデータ変換などです。LangChain はこれらのニーズに対応する Runnable コンポーネントを提供しています。

RunnableParallel:並列実行

RAG システムでよくあるニーズは、複数のデータソース(ベクトルデータベース、キーワード検索、ナレッジグラフ)を同時に検索することです。RunnableParallel を使うと、これらの検索を並列で実行できます。

from langchain_core.runnables import RunnableParallel

# 3 つの検索器を定義(例として RunnableLambda を使用)
def vector_search(query: str) -> str:
    return f"ベクトル検索結果:{query} 関連ドキュメント 3 件"

def keyword_search(query: str) -> str:
    return f"キーワード検索結果:{query} で 5 件のレコードがマッチ"

def graph_search(query: str) -> str:
    return f"グラフ検索結果:{query} に関連するエンティティ 2 件"

# 並列検索チェーンを作成
retrievers = RunnableParallel(
    vector=RunnableLambda(vector_search),
    keyword=RunnableLambda(keyword_search),
    graph=RunnableLambda(graph_search)
)

# 実行(3 つの検索が同時に進行)
results = retrievers.invoke("LangChain LCEL")
print(results)
# 出力:{
#   'vector': 'ベクトル検索結果:LangChain LCEL 関連ドキュメント 3 件',
#   'keyword': 'キーワード検索結果:LangChain LCEL で 5 件のレコードがマッチ',
#   'graph': 'グラフ検索結果:LangChain LCEL に関連するエンティティ 2 件'
# }

RunnableParallel は辞書を返します。キーは定義時の名前で、値は各分岐の実行結果です。これらの結果は後続のコンポーネントに渡して統合処理できます。

RunnablePassthrough:入力の受け渡し

パイプライン内で元の入力を保持し、後続のコンポーネントに渡したい場合があります。例えば RAG システムでは、検索器は元のクエリを必要とし、生成器は検索結果と元のクエリの両方を必要とします。

from langchain_core.runnables import RunnablePassthrough

# 検索器をシミュレート
def retrieve(query: dict) -> str:
    return "検索されたドキュメントの内容..."

# チェーンを構築:元の query を保持しながら検索
chain = RunnableParallel(
    retrieved_docs=RunnableLambda(retrieve),
    original_query=RunnablePassthrough()
)

result = chain.invoke({"query": "LCEL とは何ですか?"})
print(result)
# 出力:{
#   'retrieved_docs': '検索されたドキュメントの内容...',
#   'original_query': {'query': 'LCEL とは何ですか?'}
# }

RunnablePassthrough は何もせず、入力をそのまま渡します。何もしないように見えますが、複雑なパイプラインでは非常に重要です。

RunnableLambda:カスタム関数の変換

LangChain は多くの既成コンポーネントを提供していますが、カスタムロジックが必要なシナリオは常にあります。RunnableLambda は通常の Python 関数を Runnable としてラップし、パイプラインに参加させます。

from langchain_core.runnables import RunnableLambda

# フォーマット関数を定義
def format_output(result: dict) -> str:
    """検索結果をプロンプト入力にフォーマット"""
    docs = result["retrieved_docs"]
    query = result["original_query"]["query"]
    return f"参考資料:{docs}\nユーザーの質問:{query}\n資料に基づいて回答してください:"

# 使用
chain = RunnableParallel(
    retrieved_docs=RunnableLambda(retrieve),
    original_query=RunnablePassthrough()
) | RunnableLambda(format_output)

formatted = chain.invoke({"query": "LCEL とは何ですか?"})
print(formatted)
# 出力:参考資料:検索されたドキュメントの内容...
#       ユーザーの質問:LCEL とは何ですか?
#       資料に基づいて回答してください:

RunnableLambda の柔軟性により、パイプライン内での「万能な接着剤」になります。任意の Python 関数をパイプラインに組み込めます。

RAG パイプラインの完全な実装

これらのコンポーネントを組み合わせると、完全な RAG パイプラインはこのようになります。

from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableParallel, RunnableLambda
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings

# 1. ベクトルデータベースの初期化(例)
embeddings = OpenAIEmbeddings()
# 実際のプロジェクトでは、ここで実際のドキュメントベクトルを読み込みます
vectorstore = FAISS.from_texts(
    ["LCEL は LangChain の式言語です",
     "Pipe 演算子はコンポーネントの直列接続に使用します",
     "Runnable は LCEL の中核抽象概念です"],
    embeddings
)
retriever = vectorstore.as_retriever()

# 2. プロンプトの定義
rag_prompt = ChatPromptTemplate.from_template(
    """以下の参考資料に基づいてユーザーの質問に答えてください。

参考資料:
{context}

ユーザーの質問:{question}

正確で詳細な回答をしてください:"""
)

# 3. フォーマット関数の定義(検索結果を文字列に変換)
def format_docs(docs) -> str:
    return "\n".join(doc.page_content for doc in docs)

# 4. 完全な RAG チェーンを構築
rag_chain = (
    # 並列実行:検索 + 元の質問の受け渡し
    RunnableParallel(
        context=retriever | RunnableLambda(format_docs),
        question=RunnablePassthrough()
    )
    # プロンプトの組み立て
    | rag_prompt
    # モデルの呼び出し
    | ChatOpenAI(model="gpt-4o-mini")
    # 出力の解析
    | StrOutputParser()
)

# 5. 使用
answer = rag_chain.invoke("LCEL とは何ですか?")
print(answer)
# 出力:LCEL は LangChain Expression Language で、LangChain の式言語です...

この RAG チェーンの構造をフローチャートで表すと:

{"question": "LCEL とは何ですか?"}

    ┌─────────┴─────────┐
    ↓                   ↓
[retriever]        [Passthrough]
    ↓                   ↓
format_docs        question
    ↓                   ↓
    └─────────┬─────────┘

        {"context": "...", "question": "..."}

         [rag_prompt]

           [model]

          [parser]

        "回答内容..."

この RAG 実装は、同じシリーズの RAG システム最適化実践 と呼応しています。その記事を読んでいる方は、多くのテクニック(検索リランキング、マルチパスリコールなど)がこの LCEL 構造に直接適用できることに気づくでしょう。

第 5 章:従来のチェーンからの移行実践

プロジェクトでまだ LLMChain を使っている場合、LCEL への移行はそれほど難しくありません。昨年ある EC プロジェクトを移行した際、カスタマーボットモジュール全体で 2 日かかりました。ここでは一般的な移行パターンをいくつか紹介します。

LLMChain → Pipe 構文

最も基本的な移行です。LLMChain の中核は Prompt + Model で、移行後はパイプで直接接続します。

# 旧コード(LLMChain)
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain_openai import OpenAI

llm = OpenAI(temperature=0.7)
prompt = PromptTemplate(
    template="ユーザーの質問:{question}\n回答してください:",
    input_variables=["question"]
)
chain = LLMChain(llm=llm, prompt=prompt)
result = chain.run(question="...")

# 新コード(LCEL)
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate

model = ChatOpenAI(temperature=0.7)
prompt = ChatPromptTemplate.from_template("ユーザーの質問:{question}\n回答してください:")
chain = prompt | model
result = chain.invoke({"question": "..."})

いくつかの注意点:

  1. モデルクラスが変わった。旧コードは OpenAI(Completion API)を使っていますが、新コードでは ChatOpenAI(Chat API)の使用を推奨します。Chat API が OpenAI の主流方向で、Completion API は徐々に廃止されています。
  2. プロンプトクラスが変わった。PromptTemplate も使えますが、ChatPromptTemplate はより豊富なフォーマット(system message、マルチロール対話)をサポートしています。
  3. 呼び出しメソッドが変わった。chain.run() は chain.invoke() に変更、戻り値は文字列から Message オブジェクトに変わりました。.content でテキストを取得します。

SequentialChain → RunnableParallel

旧コードで SequentialChain を使って複数ステップを直列につないでいた場合、移行後はパイプで直接つなげます。

# 旧コード(SequentialChain)
from langchain.chains import SequentialChain, LLMChain

# ステップ 1:タイトル生成
title_chain = LLMChain(
    llm=llm, prompt=title_prompt,
    output_key="title"
)

# ステップ 2:本文生成
content_chain = LLMChain(
    llm=llm, prompt=content_prompt,
    output_key="content"
)

# 直列接続
full_chain = SequentialChain(
    chains=[title_chain, content_chain],
    input_variables=["topic"],
    output_variables=["title", "content"]
)
result = full_chain({"topic": "AI 開発"})
print(result["title"], result["content"])

# 新コード(LCEL)
from langchain_core.runnables import RunnableParallel

# 2 つの分岐を定義
title_chain = title_prompt | model
content_chain = content_prompt | model

# 並列実行(直列が必要な場合は | で接続)
full_chain = RunnableParallel(
    title=title_chain,
    content=content_chain
)
result = full_chain.invoke({"topic": "AI 開発"})
print(result["title"].content, result["content"].content)

SequentialChain はデフォルトで直列実行で、各チェーンは前の完了を待ちます。LCEL の RunnableParallel は並列実行で、より高速です。本当に直列が必要な場合(例えばステップ 2 がステップ 1 の出力に依存する場合)は、パイプでつなぎます。

# 直列:ステップ 1 の出力をステップ 2 に渡す
chain = (
    title_prompt | model | StrOutputParser()
    | (lambda title: {"topic": topic, "title": title})  # 中間結果の受け渡し
    | content_prompt | model
)

TransformChain → RunnableLambda

TransformChain はチェーン内にカスタム処理ロジックを挿入するために使います。RunnableLambda に移行します。

# 旧コード(TransformChain)
from langchain.chains import TransformChain

def transform_func(inputs: dict) -> dict:
    text = inputs["text"]
    processed = text.upper()  # なんらかの処理
    return {"processed_text": processed}

transform_chain = TransformChain(
    input_variables=["text"],
    output_variables=["processed_text"],
    transform=transform_func
)

# 新コード(RunnableLambda)
from langchain_core.runnables import RunnableLambda

def transform_func(inputs: dict) -> dict:
    text = inputs["text"]
    processed = text.upper()
    return {"processed_text": processed}

transform_chain = RunnableLambda(transform_func)

RunnableLambda はより柔軟で、input_variables と output_variables を明示的に宣言する必要がなく、直接パイプラインに参加できます。

よくある移行の落とし穴

移行プロセスでいくつかの落とし穴に遭遇しました。

落とし穴 1:戻り値の型が変わった

LLMChain の run() は文字列を返しますが、LCEL の invoke() は Message オブジェクトを返します。

# 旧:直接文字列を取得
result = chain.run(...)  # str

# 新:content を取り出す必要がある
result = chain.invoke(...)  # AIMessage
text = result.content  # str

解決策:パイプラインの末尾に StrOutputParser を追加し、Message を自動的に文字列に変換。

chain = prompt | model | StrOutputParser()
result = chain.invoke(...)  # 直接 str を返す

落とし穴 2:メモリコンポーネントの移行

旧コードでは ConversationChain がメモリ機能を内蔵していました。

# 旧コード
from langchain.chains import ConversationChain

chain = ConversationChain(llm=llm, memory=memory)

新コードでは RunnableWithMessageHistory を使います。

from langchain_core.runnables import RunnableWithMessageHistory

chain = prompt | model
chain_with_memory = RunnableWithMessageHistory(
    chain,
    get_session_history=get_history,
    input_messages_key="input",
    history_messages_key="chat_history"
)

パラメータが少し多く、入力フィールド名と履歴フィールド名を明示的に指定する必要があります。詳細な使い方は、同じシリーズの Agent ツール呼び出し実践 を参照してください。そこに完全な対話システムの例があります。

落とし穴 3:LangChain v0.3 でインポートパスが変わった

多くのコンポーネントのインポートパスが langchain から langchain_core または langchain_community に移動しました。

# 旧インポート
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate

# 新インポート
from langchain_core.runnables import RunnableLambda, RunnableParallel
from langchain_core.output_parsers import StrOutputParser
from langchain_community.chat_message_histories import ChatMessageHistory

IDE がインポートエラーを表示するので、指示に従って修正すれば大丈夫です。

本番移行のケーススタディ

昨年移行した EC カスタマーボットは、元のコードが約 300 行で、LLMChain + SequentialChain + TransformChain を使っていました。移行後の中核ロジック:

from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableLambda

# 初期化
model = ChatOpenAI(model="gpt-4o-mini", temperature=0.5)

# インテント分類プロンプト
intent_prompt = ChatPromptTemplate.from_template(
    """ユーザーの意図を分析し、以下のカテゴリのいずれかを返してください:
    - product_query(商品に関する問い合わせ)
    - order_status(注文状況の確認)
    - complaint(苦情・要望)
    - other(その他)

    ユーザーメッセージ:{message}
    意図カテゴリ:"""
)

# 各意図の処理プロンプト
product_prompt = ChatPromptTemplate.from_template(
    "ユーザーが商品について問い合わせ:{message}\n商品データベースから検索して回答してください:"
)
order_prompt = ChatPromptTemplate.from_template(
    "ユーザーが注文を確認:{message}\n注文状況を確認して返信してください:"
)

# 分岐処理ロジックを構築
def route_by_intent(result):
    intent = result.content.strip().lower()
    if "product" in intent:
        return "product"
    elif "order" in intent:
        return "order"
    else:
        return "default"

# 完全なチェーン
intent_chain = intent_prompt | model | StrOutputParser() | RunnableLambda(route_by_intent)

# 分岐ルーティング(擬似コード、実際は RunnableBranch を使用)
full_chain = (
    {"message": RunnablePassthrough()}
    | RunnableParallel(
        intent=intent_chain,
        original=RunnablePassthrough()
    )
    # インテントに応じて異なる処理分岐を選択
    # ... 実際のコードは RunnableBranch で実装
)

# ストリーミング出力
async for chunk in full_chain.astream({"message": "注文 12345 を確認したいです"}):
    print(chunk, end="", flush=True)

移行後のコードは 150 行で、半分に減りました。さらに重要なのは、ストリーミング出力や非同期処理など、本来追加開発が必要だった機能が、今では 1 行のコードで実現できるようになったことです。

まとめ

LCEL は LangChain v0.3 以降の公式推奨アーキテクチャです。Pipe 演算子でコードを簡素化し、Runnable インターフェースで呼び出し方法を統一し、組み込みのストリーミングサポートでユーザー体験を改善します。

移行プロセスで最大の課題は、構文の変換ではなく、考え方の転換です。従来のチェーンは「明示的な宣言」を重視し、各チェーンで入出力フィールドを明確に書く必要がありました。LCEL は「暗黙のフロー」を重視し、データはパイプライン内で自動的に流れ、型の約束はコンポーネント内部に隠されています。

まだ LLMChain を使っているレガシープロジェクトがある場合は、段階的な移行をお勧めします。まず単純な対話チェーンを移行し、その後複雑な組み合わせロジックを処理します。移行プロセスでは LangSmith を使ってデバッグすると、型不一致などの問題を迅速に発見できます。

次は、同じシリーズの LangGraph 状態管理実践 を見てみましょう。LangGraph は LangChain チームが開発した次世代 Agent フレームワークで、LCEL と組み合わせるとより複雑な Agent アプリケーションを構築できます。単純なチェーンタスクには LCEL、複雑な状態管理には LangGraph、これが現時点で成熟した組み合わせパターンです。


AI 開発実践シリーズ ナビゲーション


LLMChain から LCEL への移行

従来の LangChain コードを LCEL パイプライン構文に移行する

⏱️ 目安時間: 2 時間

  1. 1

    ステップ1: 移行対象モジュールの特定

    LLMChain、SequentialChain、TransformChain を使用しているコードをスキャン:

    • grep で "from langchain.chains import" を検索
    • 各チェーンの入出力変数をマーク
    • メモリコンポーネントやコールバック関数の有無を記録
  2. 2

    ステップ2: インポートパスの更新

    旧インポートを v0.3 パスに置き換え:

    • from langchain.chains → from langchain_core.runnables
    • from langchain.prompts import PromptTemplate → from langchain.prompts import ChatPromptTemplate
    • from langchain_openai import OpenAI → from langchain_openai import ChatOpenAI
  3. 3

    ステップ3: 基本チェーンの変換

    パイプ演算子でプロンプトとモデルを接続:

    • chain = LLMChain(llm=llm, prompt=prompt) → chain = prompt | model
    • result = chain.run(...) → result = chain.invoke(...)
    • StrOutputParser を追加して戻り値の型変更に対処
  4. 4

    ステップ4: 複合チェーンの処理

    RunnableParallel またはパイプで複数ステップを接続:

    • SequentialChain → RunnableParallel(並列)または | 接続(直列)
    • TransformChain → RunnableLambda でカスタム関数をラップ
    • 中間結果の受け渡しには RunnablePassthrough を使用
  5. 5

    ステップ5: メモリコンポーネントの移行

    RunnableWithMessageHistory で ConversationChain を置き換え:

    • input_messages_key と history_messages_key を明示的に指定
    • get_session_history 関数でセッション履歴を管理
  6. 6

    ステップ6: ストリーミング出力の有効化

    invoke を stream に変更するだけでストリーミング機能を獲得:

    • result = chain.invoke(...) → for chunk in chain.stream(...)
    • 非同期シナリオでは astream を使用
    • チェーン定義の変更は不要

FAQ

LCEL と従来の LLMChain の最大の違いは何ですか?
LCEL はパイプ演算子 | でコンポーネントを接続し、コード量を約 70% 削減します。従来のチェーンは input_variables/output_variables を明示的に宣言する必要がありましたが、LCEL はデータフローを自動的に処理します。さらに重要なのは、LCEL がストリーミング対応、非同期実行、バッチ処理を組み込みでサポートし、追加のコードが不要なことです。
Runnable インターフェースの invoke/stream/batch の違いは何ですか?
3 つのメソッドは異なるシナリオをカバー:

• invoke — 単一呼び出し、完全な結果を返す(単純な質問応答に適している)
• stream — ストリーミング呼び出し、トークンごとに返す(リアルタイムチャットに適している)
• batch — バッチ呼び出し、複数の入力を並列処理(バッチタスクに適している)

各メソッドには非同期バージョンがあります:ainvoke、astream、abatch。
なぜストリーミング対応がユーザー体験を改善するのですか?
非ストリーミングでは、ユーザーは完全な回答が生成されるまで待つ必要があり、複雑な回答では 8 秒かかることもあります。ストリーミングでは、最初の文字の待ち時間は通常 1 秒未満で、ユーザーはすぐにフィードバックを見られ、心理的な待ち感が大幅に減ります。実測データでは、500 文字の複雑な生成で最初の文字の待ち時間が 8 秒から 0.5 秒に短縮されました。
RunnableParallel とパイプ演算子の違いは何ですか?
パイプ演算子 | は直列実行で、前のコンポーネントの出力が次のコンポーネントに渡されます。RunnableParallel は並列実行で、複数の分岐が同時に実行され、結果が辞書としてマージされます。RAG システムでは RunnableParallel を使って複数のデータソース(ベクトルDB、キーワード検索、グラフ)を同時に検索することがよくあります。
旧コードを LCEL に移行する際の注意点は?
3 つのよくある落とし穴:

• 戻り値の型が変わった:LLMChain.run() は文字列を返すが、LCEL.invoke() は Message オブジェクトを返すため、StrOutputParser を追加する必要がある
• メモリコンポーネントの移行:ConversationChain を RunnableWithMessageHistory に置き換え、フィールド名を明示的に指定する必要がある
• インポートパスが変わった:多くのコンポーネントが langchain から langchain_core または langchain_community に移動した
LCEL はどのようなシナリオに適していますか?LangGraph はどのようなシナリオに適していますか?
単純なチェーンタスク(単一フロー、固定ステップ)には LCEL が適しており、コードが簡潔で保守しやすいです。複雑な状態管理(マルチブランチ、ループ、条件分岐)には LangGraph が適しており、状態と遷移ロジックを明示的に定義できます。実際のプロジェクトでは、両者を組み合わせて使うことが多く、LCEL は単一ステップのロジックを処理し、LangGraph は全体のフローを管理します。

8 min read · 公開日: 2026年5月4日 · 更新日: 2026年5月4日

関連記事

コメント

GitHubアカウントでログインしてコメントできます