1. 多組件 invoke 嵌套的缺點
prompt = ChatPromptTemplate.from_template("{query}")
llm = ChatOpenAI(model="gpt-3.5-turbo-16k")
parser = StrOutputParser()
# 獲取輸出內容
content = parser.invoke(
llm.invoke(
prompt.invoke(
{"query": req.query.data}
)
)
)
這種寫法雖然能實現對應的功能,但是存在很多缺陷:
- 嵌套式寫法讓程序的維護性與可閱讀性大大降低,當需要修改某個組件時,變得異常困難。
- 沒法得知每一步的具體結果與執行進度,出錯時難以排查。
- 嵌套式寫法沒法集成大量的組件,組件越來越多時,代碼會變成“一次性”代碼
2. 手寫一個"Chain"優化代碼
觀察發現,雖然 Prompt、Model、OutputParser 分別有自己獨立的調用方式,例如:
Prompt 組件 :format、invoke、to_string、to_messages。
Model 組件 :generate、invoke、batch。
OutputParser 組件 :parse、invoke
但是有一個共同的調用方法:invoke,并且每一個組件的輸出都是下一個組件的輸入,是否可以將所有組件組裝得到一個列表,然后循環依次調用 invoke 執行每一個組件,然后將當前組件的輸出作為下一個組件的輸入。
from typing import Any
import dotenv
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
dotenv.load_dotenv()
prompt = ChatPromptTemplate.from_template("{query}")
llm = ChatOpenAI(model="gpt-3.5-turbo-16k")
parser = StrOutputParser()
class Chain: steps: list = []
def __init__(self, steps: list): self.steps = steps
def invoke(self, input: Any) -> Any: output: Any = input
for step in self.steps: output = step.invoke(output)
print(step)
print("執行結果:", output)
print("===============")
return output
chain = Chain([prompt, llm, parser])
print(chain.invoke({"query": "你好,你是?"}))
輸出結果為:
input_variables=['query'] messages=
[HumanMessagePromptTemplate(prompt=PromptTemplate(input_variables=['query'],
template='{query}'))]
執行結果: messages=[HumanMessage(content='你好,你是?')] ===============
client=<openai.resources.chat.completions.Completions object at
0x000001C6BF694310> async_client=
<openai.resources.chat.completions.AsyncCompletions object at 0x000001C6BF695BD0> model_name='gpt-3.5-turbo-16k' openai_api_key=SecretStr('**********') openai_api_base='https://api.xty.app/v1' openai_proxy='' 執行結果: content='你好!我是 ChatGPT,一個由OpenAI開發的人工智能語言模型。我可以回答各種各樣
的問題,幫助解決問題,提供信息和創意。有什么我可以幫助你的嗎?' response_metadata=
{'token_usage': {'completion_tokens': 72, 'prompt_tokens': 13, 'total_tokens': 85}, 'model_name': 'gpt-3.5-turbo-16k', 'system_fingerprint': 'fp_b28b39ffa8',
'finish_reason': 'stop', 'logprobs': None} id='run-5bf9e183-4b28-4be9-bf65- ce0ad9590785-0' =============== 執行結果: 你好!我是 ChatGPT,一個由OpenAI開發的人工智能語言模型。我可以回答各種各樣的問題,幫
助解決問題,提供信息和創意。有什么我可以幫助你的嗎?
=============== 你好!我是 ChatGPT,一個由OpenAI開發的人工智
3. Runnable 簡介與 LCEL 表達式
為了盡可能簡化創建自定義鏈,LangChain 官方實現了一個 Runnable 協議,這個協議適用于
LangChain 中的絕大部分組件,并實現了大量的標準接口,涵蓋:
- stream :將組件的響應塊流式返回,如果組件不支持流式則會直接輸出。
- invoke :調用組件并得到對應的結果。
- batch :批量調用組件并得到對應的結果。
- astream :stream 的異步版本。
- ainvoke :invoke 的異步版本。
- abatch :batch 的異步版本。
- astream_log :除了流式返回最終響應塊之外,還會流式返回中間步驟。
除此之外,在 Runnable 中還重寫了 or 和 ror 方法,這是 Python 中 | 運算符的計算邏輯,所有的 Runnable 組件,均可以通過 | 或者 pipe() 的方式將多個組件拼接起來形成一條鏈。
組件 | 輸入類型 | 輸出類型 |
---|---|---|
Prompt 提示 | Dict 字典 | PromptValue 提示值 |
ChatModel 聊天模型 | 字符串、聊天消息列表、PromptValue提示值 | ChatMessage 聊天消息 |
LLM 大語言模型 | 字符串、聊天消息列表、PromptValue提示值 | String 字符串 |
OutputParser 輸出解析器 | LLM 或聊天模型的輸出 | 取決于解析器 |
Retriever 檢索器 | 單個字符串 | List of Document 文檔列表 |
Tool 工具 | 字符串、字典或取決于工具 | 取決于工具 |
例如上面自定義"Chain"的寫法等同于如下的代碼:
from typing import Any
import dotenv
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_openai import ChatOpenAI
dotenv.load_dotenv()
prompt = ChatPromptTemplate.from_template("{query}")
llm = ChatOpenAI(model="gpt-3.5-turbo-16k")
parser = StrOutputParser()
chain = prompt | llm | parser # 等價于以下寫法
composed_chain_with_pipe = (
RunnableParallel({"query": RunnablePassthrough()})
.pipe(prompt)
.pipe(llm)
.pipe(parser)
)
print(chain.invoke({"query": "你好,你是?"}))
Runnable 底層的運行邏輯本質上也是將每一個組件添加到列表中,然后按照順序執行并返回最終結果,這里參考核心源碼:
def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:
from langchain_core.beta.runnables.context import config_with_context
# setup callbacks and context config = config_with_context(ensure_config(config), self.steps) callback_manager = get_callback_manager_for_config(config)
# start the root run
run_manager = callback_manager.on_chain_start(
dumpd(self),
input, name=config.get("run_name") or self.get_name(), run_id=config.pop("run_id", None),
)
# 調用所有步驟并逐個執行得到對應的輸出,然后作為下一個的輸入
try:
for i, step in enumerate(self.steps):
input = step.invoke(
input, # mark each step as a child run patch_config( config, callbacks=run_manager.get_child(f"seq:step:{i+1}")
),
)
# finish the root run
except BaseException as e:
run_manager.on_chain_error(e)
raise
else:
run_manager.on_chain_end(input)
return cast(Output, input)
4. 兩個Runnable核心類的講解與使用
1. RunnableParallel 并行運行
RunnableParallel 是 LangChain 中封裝的支持運行多個 Runnable 的類,一般用于操作 Runnable 的輸出,以匹配序列中下一個 Runnable 的輸入,起到并行運行 Runnable 并格式化輸出結構的作用。例如 RunnableParallel 可以讓我們同時執行多條 Chain,然后以字典的形式返回各個 Chain 的結果,對比每一條鏈單獨執行,效率會高很多。
import dotenv
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel
from langchain_openai import ChatOpenAI
dotenv.load_dotenv()
# 1.編排2個提示模板
joke_prompt = ChatPromptTemplate.from_template("請講一個關于{subject}的冷笑話,盡可能
短")
poem_prompt = ChatPromptTemplate.from_template("請寫一篇關于{subject}的詩,盡可能短")
# 2.創建大語言模型
llm = ChatOpenAI(model="gpt-3.5-turbo-16k")
# 3.創建輸出解析器
parser = StrOutputParser()
# 4.構建兩條鏈
joke_chain = joke_prompt | llm | parser poem_chain = poem_prompt | llm | parser
# 5.使用RunnableParallel創建并行可運行
map_chain = RunnableParallel(joke=joke_chain, poem=poem_chain)
# 6.運行并行可運行組件得到響應結果
resp = map_chain.invoke({"subject": "程序員"})
print(resp)
輸出內容:
{'joke': '為什么程序員總是用尺子測電腦屏幕?因為他們聽說了“像素”是屏幕上的一種“尺寸”。',
'poem': '在代碼的海洋里徜徉,\n程序員心懷夢想與創意。\n鍵盤敲擊是旋律,\nbug 是詩歌的瑕疵。
\n\n算法如詩的韻律,\n邏輯是句子的構思。\n編程者如詩人般,\n創造出數字的奇跡。'}
除了并行執行,RunnableParallel 還可以用于操作 Runnable 的輸出,用于產生符合下一個 Runnable 組件的數據。
例如:用戶傳遞數據,并行執行檢索策略得到上下文隨后傳遞給 Prompt 組件,如下。
import dotenv
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_openai import ChatOpenAI
dotenv.load_dotenv()
def retrieval(query: str) -> str: """模擬一個檢索器,傳入查詢query,輸出文本""" print("執行檢索:", query)
return "我叫老鐵,是一名AI應用開發工程師。"
# 1.編排Prompt
prompt = ChatPromptTemplate.from_template("""請根據用戶的提問回答問題,可以參考對應的上下
文進行回復。
<context>
{context} <context> 用戶的問題是: {query}""")
# 2.構建大語言模型
llm = ChatOpenAI(model="gpt-3.5-turbo")
# 3.創建輸出解析器
parser = StrOutputParser()
# 4.編排鏈
chain = RunnableParallel( context=retrieval, query=RunnablePassthrough(),
) | prompt | llm | parser
# 5.調用鏈生成結果
content = chain.invoke("你好,我叫什么?")
print(content)
輸出內容:
執行檢索: 你好,我叫什么?
你好,你叫老鐵。
在創建 RunnableParallel 的時候,支持傳遞字典、函數、映射、鍵值對數據等多種方式,
RunnableParallel 底層會執行檢測并將數據統一轉換為 Runnable,核心源碼如下:
# langchain-core/runnables/base.py
def __init__( self, steps__: Optional[
Mapping[
str, Union[
Runnable[Input, Any], Callable[[Input], Any], Mapping[str, Union[Runnable[Input, Any], Callable[[Input], Any]]],
],
]
] = None, **kwargs: Union[
Runnable[Input, Any], Callable[[Input], Any], Mapping[str, Union[Runnable[Input, Any], Callable[[Input], Any]]],
],
) -> None: # 1.檢測是否傳遞字典,如果傳遞,則提取字段內的所有鍵值對
merged = {**steps__} if steps__ is not None else {}
# 2.傳遞了鍵值對,則將鍵值對更新到merged進行合并
merged.update(kwargs) super().__init__( # type: ignore[call-arg]
# 3.循環遍歷merged的所有鍵值對,并將每一個元素轉換成Runnable
steps__={key: coerce_to_runnable(r) for key, r in merged.items()}
)
除此之外,在 Chains 中使用時,可以簡寫成字典的方式, or 和 ror 會自動將字典轉換成
RunnableParallel,核心源碼:
# langchain_core/runnables/base.py
def coerce_to_runnable(thing: RunnableLike) -> Runnable[Input, Output]: """Coerce a runnable-like object into a Runnable.
Args:
thing: A runnable-like object.
Returns: A Runnable. """
if isinstance(thing, Runnable): return thing
elif is_async_generator(thing) or inspect.isgeneratorfunction(thing):
return RunnableGenerator(thing) elif callable(thing): return RunnableLambda(cast(Callable[[Input], Output], thing)) elif isinstance(thing, dict): # 如果類型為字典,使用字典創建RunnableParallel并轉換成Runnable格式
return cast(Runnable[Input, Output], RunnableParallel(thing)) else:
raise TypeError(
f"Expected a Runnable, callable or dict."
f"Instead got an unsupported type: {type(thing)}"
)
2. RunnablePassthrough 傳遞數據
除了 RunnablePassthrough,在 LangChain 中,另外一個高頻使用的 Runnable 類是RunnablePassthrough,這個類透傳上游參數輸入,簡單來說,就是可以獲取上游的數據,并保持不變或者新增額外的鍵。通常與 RunnableParallel 一起使用,將數據分配給映射中的新鍵。
例如:使用 RunnablePassthrough 來簡化 invoke 的調用流程。
import dotenv
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
dotenv.load_dotenv()
# 1.編排prompt
prompt = ChatPromptTemplate.from_template("{query}")
# 2.構建大語言模型
llm = ChatOpenAI(model="gpt-3.5-turbo-16k")
# 3.創建鏈
chain = {"query": RunnablePassthrough()} | prompt | llm | StrOutputParser()
# 4.調用鏈并獲取結果
content = chain.invoke("你好,你是")
print(content)
輸出內容:
你好!是的,我是ChatGPT,你需要什么幫助嗎?
RunnablePassthrough() 獲取的是整個輸入的內容(字符串或者字典),如果想獲取字典內的某個部
分,可以使用 itemgetter 函數,并傳入對應的字段名即可,如下:
from operator import itemgetter
chain = {"query": itemgetter("query")} | prompt | llm | StrOutputParser()
content = chain.invoke({"query": "你好,你是?"})
除此之外,如果想在傳遞的數據中添加數據,還可以使用 RunnablePassthrough.assign() 方法來實現快速添加。例如:為
import dotenv
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
dotenv.load_dotenv()
def retrieval(query: str) -> str: """模擬一個檢索器,傳入查詢query,輸出文本""" print("執行檢索:", query)
return "我叫老鐵,是一名AI應用開發工程師。"
# 1.編排Prompt
prompt = ChatPromptTemplate.from_template("""請根據用戶的提問回答問題,可以參考對應的上下
文進行回復。
<context>
{context} <context> 用戶的問題是: {query}""")
# 2.構建大語言模型
llm = ChatOpenAI(model="gpt-3.5-turbo")
# 3.創建輸出解析器
parser = StrOutputParser()
# 4.編排鏈,RunnablePassthrough.assign寫法
chain = (
RunnablePassthrough.assign(context=lambda query: retrieval(query)) |
prompt |
llm |
parser
)
# 5.調用鏈生成結果
content = chain.invoke({"query": "你好,我叫什么?"})
print(content)
5. LangChain 中 Runnable 的進階組合用法
1. 整體流程說明
一個典型的 AI 應用鏈可以分為以下幾個模塊,每個模塊都是一個 Runnable
:
用戶輸入 → Prompt 模板 → LLM 模型 → 輸出解析器 → 最終輸出
2. 所需組件導入
from langchain_core.runnables import RunnableLambda, RunnableMap
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
3. 定義 Prompt 模板(PromptTemplate)
prompt = PromptTemplate.from_template("請用中文寫一首關于{topic}的詩。")
這是一個可填充的提示模板,接收參數 topic
。
4. 初始化 LLM 模型(ChatOpenAI)
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7)
你也可以替換為 Claude、Gemini、Qwen 等其他模型。
5. 定義輸出解析器(StrOutputParser)
parser = StrOutputParser()
用于將 LLM 的結構化輸出(如 JSON、Message)轉換為純字符串。
6. 構建完整的鏈(組合多個 Runnable)
# 將輸入 topic 映射為 prompt 所需格式
input_mapper = RunnableLambda(lambda x: {"topic": x})# 構建完整鏈條:輸入 → Prompt → LLM → 解析器
chain = input_mapper | prompt | llm | parser
7. 調用鏈條(invoke、batch、stream)
7.1 單次調用
result = chain.invoke("長城")
print(result)
7.2 批量調用
topics = ["長江", "黃山", "故宮"]
results = chain.batch(topics)
print(results)
7.3 流式輸出(逐 token 打印)
for chunk in chain.stream("泰山"):print(chunk, end="")
8. 使用 RunnableMap 處理多輸入字段
如果你的提示模板需要多個輸入字段(如 product
和 audience
),可以使用 RunnableMap
:
prompt = PromptTemplate.from_template("請寫一個關于{product},面向{audience}的廣告文案。")# 輸入映射器:將用戶輸入拆分為多個字段
input_map = RunnableMap({"product": lambda x: x["product"],"audience": lambda x: x["audience"]
})# 構建鏈
chain = input_map | prompt | llm | parser# 調用
result = chain.invoke({"product": "智能手表","audience": "年輕上班族"
})
print(result)
9. 使用 OutputParser 解析 JSON 輸出(可選)
如果你的 LLM 輸出是結構化 JSON,可以使用 JsonOutputParser
:
from langchain.output_parsers import JsonOutputParserparser = JsonOutputParser()
然后在提示中引導模型輸出 JSON 格式:
prompt = PromptTemplate.from_template("""
請以 JSON 格式回答以下問題:
問題:{question}
輸出格式:
{{"answer": "...","confidence": "高/中/低"
}}
""")
10. 總結:鏈式組合的優勢
特性 | 描述 |
---|---|
可組合 | 各模塊可自由組合、替換 |
可調試 | 每個模塊可單獨測試 |
可擴展 | 支持加入工具、檢索器、函數調用等 |
可追蹤 | 可與 LangSmith 集成,進行鏈路追蹤 |
完整組合鏈一覽
from langchain_core.runnables import RunnableLambda
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI# 模塊定義
prompt = PromptTemplate.from_template("請寫一首關于{topic}的七言絕句。")
llm = ChatOpenAI(model="gpt-3.5-turbo")
parser = StrOutputParser()# 輸入映射
input_mapper = RunnableLambda(lambda x: {"topic": x})# 鏈式組合
chain = input_mapper | prompt | llm | parser# 調用鏈
print(chain.invoke("西湖"))