Langchain[3]:Langchain架構演進與功能擴展:流式事件處理、事件過濾機制、回調傳播策略及裝飾器應用
1. Langchain的演變
v0.1: 初始版本,包含基本功能。
-
從0.1~0.2完成的特性:
- 通過事件流 API 提供更好的流式支持。
- 標準化工具調用支持Tools Calling。
- 標準化的輸出結構接口。
- @chain 裝飾器,更容易創建 RunnableLambdas。
- 在 Python 中對許多核心抽象的更好異步支持。
- 在 AIMessage 中包含響應元數據,方便訪問底層模型的原始輸出。
- 可視化 runnables 或 langgraph 應用的工具。
- 大多數提供商之間的聊天消息歷史記錄互操作性。
- 超過 20 個 Python 流行集成的合作伙伴包。
-
LangChain 的未來發展
- 持續致力于 langgraph 的開發(向langgraph遷移),增強代理架構的能力。
- 重新審視 vectorstores 抽象,以提高可用性和可靠性。
- 改進文檔和版本化文檔。
- 計劃在 7 月至 9 月之間發布 0.3.0 版本,全面支持 Pydantic 2,并停止對 Pydantic 1 的支持。
注意:自 0.2.0 版本起,langchain 不再依賴 langchain-community。langchain-community 將依賴于 langchain-core 和 langchain。
- 具體變化
從 0.2.0 版開始,langchain 必須與集成無關。這意味著,langchain 中的代碼默認情況下不應實例化任何特定的聊天模型、llms、嵌入模型、vectorstores 等;相反,用戶需要明確指定這些模型。
以下這些API從0.2版本起要顯式的傳遞LLM
langchain.agents.agent_toolkits.vectorstore.toolkit.VectorStoreToolkit
langchain.agents.agent_toolkits.vectorstore.toolkit.VectorStoreRouterToolkit
langchain.chains.openai_functions.get_openapi_chain
langchain.chains.router.MultiRetrievalQAChain.from_retrievers
langchain.indexes.VectorStoreIndexWrapper.query
langchain.indexes.VectorStoreIndexWrapper.query_with_sources
langchain.indexes.VectorStoreIndexWrapper.aquery_with_sources
langchain.chains.flare.FlareChain
langchain.indexes.VectostoreIndexCreator
以下代碼已被移除
langchain.natbot.NatBotChain.from_default removed in favor of the from_llm class method.
- @tool修飾符:
@tool
def my_tool(x: str) -> str:"""Some description."""return "something"print(my_tool.description)
0.2前運行結果會是:my_tool: (x: str) -> str - Some description. 0.2后的運行結果是:Some description.
更多內容見langchain 0.2 :https://python.langchain.com/v0.2/docs/versions/v0_2/deprecations/
LangChain 簡化了 LLM 應用程序生命周期的每個階段:
開發
:使用 LangChain 的開源構建塊、組件和第三方集成構建您的應用程序。使用LangGraph構建具有一流流媒體和人機交互支持的狀態代理。生產化
:使用LangSmith檢查、監控和評估您的鏈,以便您可以不斷優化和自信地部署。部署
:使用LangGraph Cloud將您的 LangGraph 應用程序轉變為可用于生產的 API 和助手。
該框架目前將自身定位為覆蓋LLM應用開發全生命周期的框架。包含開發、部署、工程化三個大方向,在這三個大方向,都有專門的產品或產品集:
開發階段
:主要是python和javascript兩種語言的SDK,配合開放的社區組件模板,來便捷的實現跨LLM的APP開發工程化或產品化階段
:主要是以LangSmith為代表的產品,集監控、playground、評估等功能于一身部署階段
:主要是LangServer產品,基于fastapi封裝的LLM API服務器。
基本的方向是開發員的SDK和組件來壯大社區,然后通過類似LangSmith等工具產品實現商業化。
langchain-core
:主要的SDK依賴包,包括基本的抽象結構和LECL腳本語言。langchain-community
:第三方集成。- 合作伙伴包(例如langchain-openai、langchain-anthropic等):一些集成被進一步拆分成自己的僅依賴于的輕量級包langchain-core。
langchain
:構成應用程序認知架構的鏈、代理和檢索策略(剝離后只有Chains、Agents、以及構成應用程序認知結構的檢索策略)。LangGraph
:通過將步驟建模為圖中的邊和節點,使用 LLM 構建強大且有狀態的多參與者應用程序。與 LangChain 順利集成,但可以在沒有 LangChain 的情況下使用。【多Agents框架的實現】LangServe
:將 LangChain 鏈部署為 REST API。LangSmith
:功能很多包括提示詞模板聚合、監控、調試、評測LLM等等,部分功能會收費。
2. 如何遷移到0.2.x版本
- 安裝 0.2.x 版本的 langchain-core、langchain,并將可能使用的其他軟件包升級到最新版本。(例如,langgraph、langchain-community、langchain-openai 等)。
- 驗證代碼是否能在新軟件包中正常運行(例如,單元測試通過)。
- 安裝最新版本的 langchain-cli,并使用該工具將代碼中使用的舊導入替換為新導入。
- 手動解決所有剩余的棄用警告。
- 重新運行單元測試。
- 如果正在使用 astream_events,請查看如何遷移到 astream events v2。
- 如何遷移到0.2.x - 升級依賴包
0.2版本對依賴包做了較大調整,詳細參照下表:
- 如何遷移到0.2.x - 使用langchain-cli工具
安裝該工具
pip install langchain-cli
langchain-cli --version # <-- 確保版本至少為 0.0.22
注意,該工具并不完美,在遷移前你應該備份好你的代碼。使用的時候您需要運行兩次遷移腳本,因為每次運行只能應用一次導入替換。
#例如,您的代碼仍然使用
from langchain.chat_models import ChatOpenAI
#第一次運行后,您將得到:
from langchain_community.chat_models import ChatOpenAI
#第二次運行后,您將得到:
from langchain_openai import ChatOpenAI
ang-cli的其他命令:
#See help menu
langchain-cli migrate --help
#Preview Changes without applying
langchain-cli migrate --diff [path to code]
#run on code including ipython notebooks
#Apply all import updates except for updates from langchain to langchain-core
langchain-cli migrate --disable langchain_to_core --include-ipynb [path to code]
3.基于runnables的流式事件支持
大模型在推理時由于要對下一個字的概率進行計算,所以無論多么牛逼的LLM,在推理的時候或多或少都有一些延遲,而這種延遲在類似Chat的場景里,體驗非常不好,除了在LLM上下功夫外,提升最明顯的就是從用戶體驗著手,采用類似流式輸出的方式,加快反饋提升用戶體驗,讓用戶感覺快樂很多,這也是為什么chatG{T會采用這種類似打字機效果的原因。流式在langchain前面版本已經支持不少,在0.2版本里,主要是增加了事件支持,方便開發者對流可以有更細致的操作顆粒度。
- 流的主要接口
我們知道從0.1大版本開始,langchain就支持所謂的runnable協議,為大部分組件都添加了一些交互接口,其中流的接口有:- 同步方式的stream以及異步的astream:他們會以流的方式得到chain的最終結果。
- 異步方式的astream_event和astream_log:這兩個都可以獲得到流的中間步驟和最終結果。
3.1 直接使用大模型輸出流
from langchain_community.chat_models import ChatZhipuAI
import os
os.environ["ZHIPUAI_API_KEY"] = "zhipuai_api_key"model = ChatZhipuAI(model="glm-4",temperature=0,streaming=True,
)chunks = []
async for chunk in model.astream("你好關于降本增效你都知道什么?"): #采用異步比同步輸出更快chunks.append(chunk)print(chunk.content, end="|", flush=True)
- 結果
#異步輸出
降|本|增效|是企業|為了|提高|市場|競爭力|、|優化|資源配置|、|提升|經濟效益|而|采取|的一系列|措施|。|其|核心|是|降低|成本|、|提高|效率|,|具體|來說|,|包括|以下幾個方面|:1|.| **|成本|控制|**|:|企業|通過|精細|化管理|,|嚴格控制|生產|成本|,|減少|不必要的|開支|。|比如|,|優化|供應鏈|管理|,|降低|原材料|采購|成本|;|提高|能源|利用|效率|,|減少|能源|消耗|;|精|簡|人員|結構|,|提高|勞動|生產|率|等|。2|.| **|技術創新|與|研發|**|:|通過|技術創新|和|研發|,|改進|生產工藝|,|提高|產品質量|,|降低|單位|產品|成本|。|同時|,|新技術|、|新|產品的|開發|也能|提升|企業的|市場競爭|力和|盈利|能力|。3|.| **|管理|優化|**|:|優化|企業|內部|管理|流程|,|提高|決策|效率|和管理|效率|。|如|實施|信息化|管理|,|提高|數據處理|速度|和|準確性|,|減少|人為|錯誤|和|重復|勞動|。4|.| **|市場|與|銷售|策略|調整|**|:|根據|市場|變化|調整|銷售|策略|,|優化|產品|結構|,|提高|高|附加值|產品的|比重|,|增強|市場|適應|能力和|盈利|能力|。5|.| **|資金|運作|**|:|合理|規劃和|優化|企業|融資|結構|,|降低|財務|成本|。|比如|,|通過|發行|低|利率|債券|等方式|籌集|資金|,|減少|利息|支出|。6|.| **|規模|效應|**|:|擴大|生產|規模|,|實現|規模|經濟|,|降低|單位|成本|。以下|是根據|提供的|參考|信息|,|對|幾|家企業|降|本|增效|措施|的具體|案例分析|:-| **|山東|鋼鐵|**|:|面臨|行業|困境|,|山東|鋼鐵|通過|增持|公司|股份|增強|市場|信心|,|同時|實施|包括|提高|增量|、|降低|費用|、|加強|采購|優化|銷售等|在內的|多項|措施|,|并通過|財務|手段|降低|貸款|利率|,|成功|發行|低成本|融資|券|。-| **|銀|輪|股份|**|:|公司|通過|持續推進|降|本|增效|措施|,|提升|運營|效率|,|改善|海外|工廠|運營|,|提升|盈利|能力|,|并通過|加大|研發|投入|,|強化|技術|產品|優勢|。-| **|山|鷹|國際|**|:|通過|提升|產能|利用率|,|持續|推動|降|本|增效|和|精益|生產|,|優化|運營|資金|,|降低|管|銷|費用|。-| **|野|馬|電池|**|:|公司|優化|營銷|網絡|布局|,|加強|銷售|推廣|,|同時|推進|精細|化管理|,|全面|降|本|增效|,|提高|運營|效率|。-| **|麗|尚|國|潮|**|:|在|消費|復蘇|背景下|,|公司|通過|優化|商業模式|,|實施|精細化|運營|管理|,|提升|業務|效率和|盈利|能力|。-| **|白銀|有色|**|:|通過|強化|內部|管理|,|多|措|并舉|降|本|增效|、|開源|節|流|,|提升|經濟效益|,|改善|經營|狀況|。這些|案例|表明|,|降|本|增效|是|企業在|各種|市場|環境下|提升|競爭力|、|保證|可持續發展|的重要|途徑|。||
3.2 Chain中的流輸出
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplateimport os
os.environ["ZHIPUAI_API_KEY"] = "xxx"model = ChatZhipuAI(model="glm-4",temperature=0,streaming=True,
)prompt = ChatPromptTemplate.from_template("告訴我一個關于{topic}的笑話")
parser = StrOutputParser()
chain = prompt | model | parserasync for chunk in chain.astream({"topic": "裁員"}):print(chunk, end="|", flush=True)
有一天|,|公司|老板|走進|辦公室|,|對所有|員工|說|:“|我|有個|好消息|和一個|壞|消息|要|告訴大家|。”|員工|們|緊張|地|等待著|,|老板|接著|說|:“|壞|消息|是|,|我們|公司|要|裁員|了|。”|大家|一片|沉默|,|這時|老板|又|笑著說|:“|好消息|是|,|我們|公司|要|裁員|了|,|你們|終于|可以|擺脫|這些|無聊|的工作|,|去|追求|自己的|夢想|了|!”|員工|們|面|面|相|覷|,|其中|一個人|小|聲|嘀|咕|:“|那|我還是|先|回去|做|一下|簡歷|吧|。”|這個|笑話|雖然|有些|黑色|幽默|,|但也|反映了|裁員|這個|話題|在|職場|中的|敏感性|。|希望大家|在|現實生活中|都能|順利|度過|各種|職場|挑戰|。||
3.3 高級使用:在chain中使用流式輸出json結構
很多時候的實際場景是,我們希望接口輸出的是一個json結構,這樣在前端應用層面會比較靈活,但是如果是流式輸出,很可能因為字符結構沒有輸出結束會導致json報錯,這種情況可以這樣處理:
from langchain_community.chat_models import ChatZhipuAI
from langchain_core.output_parsers import JsonOutputParser
import osos.environ["ZHIPUAI_API_KEY"] = "key"model = ChatZhipuAI(model="glm-4",temperature=0,streaming=True,
)#異步方法
import asyncio
async def my_async_function():chain = (model | JsonOutputParser())async for text in chain.astream("輸出 JSON 格式的法國、西班牙和日本國家及其人口列表。 "'使用一個外鍵為 "countries "的 dict,其中包含一個國家列表。'"每個國家都應有 `name` 和 `population`鍵"):print(text, flush=True)async def main():await my_async_function()asyncio.run(main()) #In plain Python
#await main() # In jupyter jupyter 已經運行了loop,無需自己激活,采用await()調用即可
- 結果
{}
{'countries': []}
{'countries': [{}]}
{'countries': [{'name': ''}]}
{'countries': [{'name': 'France'}]}
{'countries': [{'name': 'France', 'population': 67}]}
{'countries': [{'name': 'France', 'population': 673}]}
{'countries': [{'name': 'France', 'population': 673900}]}
{'countries': [{'name': 'France', 'population': 67390000}]}
{'countries': [{'name': 'France', 'population': 67390000}, {}]}
{'countries': [{'name': 'France', 'population': 67390000}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 67390000}, {'name': 'Spain'}]}
{'countries': [{'name': 'France', 'population': 67390000}, {'name': 'Spain', 'population': 46}]}
{'countries': [{'name': 'France', 'population': 67390000}, {'name': 'Spain', 'population': 467}]}
{'countries': [{'name': 'France', 'population': 67390000}, {'name': 'Spain', 'population': 467330}]}
{'countries': [{'name': 'France', 'population': 67390000}, {'name': 'Spain', 'population': 46733038}]}
{'countries': [{'name': 'France', 'population': 67390000}, {'name': 'Spain', 'population': 46733038}, {}]}
{'countries': [{'name': 'France', 'population': 67390000}, {'name': 'Spain', 'population': 46733038}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 67390000}, {'name': 'Spain', 'population': 46733038}, {'name': 'Japan'}]}
{'countries': [{'name': 'France', 'population': 67390000}, {'name': 'Spain', 'population': 46733038}, {'name': 'Japan', 'population': 125}]}
{'countries': [{'name': 'France', 'population': 67390000}, {'name': 'Spain', 'population': 46733038}, {'name': 'Japan', 'population': 1258}]}
{'countries': [{'name': 'France', 'population': 67390000}, {'name': 'Spain', 'population': 46733038}, {'name': 'Japan', 'population': 125880}]}
{'countries': [{'name': 'France', 'population': 67390000}, {'name': 'Spain', 'population': 46733038}, {'name': 'Japan', 'population': 125880000}]}
看到流的輸出總是保持這合法的json結構,從而避免了報錯,如果我們期待在這種結構下,可以以流式來取到國家名稱該怎么做?是的這里就要在Json輸出后,繼續處理。
from langchain_community.chat_models import ChatZhipuAI
from langchain_core.output_parsers import JsonOutputParser
import osos.environ["ZHIPUAI_API_KEY"] = "9a0"model = ChatZhipuAI(model="glm-4",temperature=0,streaming=True,
)#異步方法
import asyncio#自定義函數用來過濾上一步的輸入
async def _extract_country_names_streaming(input_stream):"""A function that operates on input streams."""country_names_so_far = set()async for input in input_stream:if not isinstance(input, dict):continueif "countries" not in input:continuecountries = input["countries"]if not isinstance(countries, list):continuefor country in countries:name = country.get("name")if not name:continueif name not in country_names_so_far:yield namecountry_names_so_far.add(name)async def my_async_function():# 在json輸出后,調用自定義函數用來過濾國家這個字段chain = model | JsonOutputParser() | _extract_country_names_streamingasync for text in chain.astream("輸出 JSON 格式的法國、西班牙和日本國家及其人口列表。 "'使用一個外鍵為 "countries "的 dict,其中包含一個國家列表。'"每個國家都應有 `name` 和 `population`鍵"):#以|符號分割開字符print(text, end="|", flush=True)async def main():await my_async_function()asyncio.run(main())
Franc| Spain| Japan|
3.4 不支持流式的組件處理(檢索器)
并不是所有的組件都支持流式輸出,比如檢索器就不支持,在原生的langchain中,當你給不支持stram的組件調用流接口時,一般不會有打字機效果,而是和使用invoke效果差不多。而當你使用LCEL去調用類似檢索器組件的時候,它依然可以搞出來打字機效果,這也是為什么要盡量使用LCEL的原因。我們看個例子:
#安裝依賴
pip install faiss-cpu
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddingstemplate = """Answer the question based only on the following context:
{context}Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)vectorstore = FAISS.from_texts(["harrison worked at kensho", "harrison likes spicy food"],embedding=OpenAIEmbeddings(),
)
retriever = vectorstore.as_retriever()chunks = [chunk for chunk in retriever.stream("where did harrison work?")]
chunks
原生的檢索器在這種情況下只會返回最終結果,并沒有流的效果:
[[Document(page_content='harrison worked at kensho'),Document(page_content='harrison likes spicy food')]]
而使用LCEL調用后,則可以輸出中間的過程:
retrieval_chain = ({"context": retriever.with_config(run_name="Docs"),"question": RunnablePassthrough(),}| prompt| model| StrOutputParser()
)for chunk in retrieval_chain.stream("Where did harrison work? " "Write 3 made up sentences about this place."
):print(chunk, end="|", flush=True)
- 全代碼【智譜llM+百川詞嵌入模型】
from langchain_community.chat_models import ChatZhipuAI
from langchain_core.output_parsers import JsonOutputParser
from langchain_community.embeddings import BaichuanTextEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
import osos.environ["ZHIPUAI_API_KEY"] = "7c182nN"model = ChatZhipuAI(model="glm-4",temperature=0,streaming=True,
)template = """Answer the question based only on the following context:
{context}Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)embeddings=BaichuanTextEmbeddings(baichuan_api_key="sk-175510")vectorstore = FAISS.from_texts(["harrison worked at kensho", "harrison likes spicy food"],embedding=embeddings,
)
retriever = vectorstore.as_retriever()retrieval_chain = ({"context": retriever.with_config(run_name="Docs"),"question": RunnablePassthrough(),}| prompt| model| StrOutputParser()
)for chunk in retrieval_chain.stream("Where did harrison work? " "Write 3 made up sentences about this place."
):print(chunk, end="|", flush=True)
H|arrison| worked| at| Kens|ho|,| a| cutting|-edge| technology| company| known| for| its| innovative| AI| solutions|.|
1|.| Kens|ho| is| renowned| for| its| vibrant| work| culture|,| where| employees| are| encouraged| to| think| outside| the| box| and| push| the| boundaries| of| technology|.
2|.| The| office| environment| at| Kens|ho| is| dynamic| and| fast|-paced|,| with| a| strong| emphasis| on| collaboration| and| continuous| learning|.
3|.| Kens|ho| is| located| in| a| state|-of|-the|-art| facility|,| boasting| impressive| amenities| and| a| sleek|,| modern| design| that| fost|ers| creativity| and| productivity|.||
4. v0.2的核心特性:流中的事件支持
如要使用該特性,你首先要確認自己的langchain_core版本等于0.2
import langchain_core
langchain_core.__version__
#'0.2.18'
官方給到了一些注意事項:
- 使用流要盡量使用異步方式編程。
- 如果你自定義了函數一定要配置callback。
- 不使用LCEL的話盡量使用.astram來訪問LLM。
langchain將流的過程細化,并在每個階段給了開發者一個事件鉤子,每個階段都可以獲取輸出結果:
4.1 在chatmodel中使用:
from langchain_community.chat_models import ChatZhipuAI
from langchain_core.output_parsers import JsonOutputParser
import osos.environ["ZHIPUAI_API_KEY"] = "7epjHnN"model = ChatZhipuAI(model="glm-4",temperature=0,streaming=True,
)#異步方法
import asyncio
async def my_async_function():events = []async for event in model.astream_events("hello", version="v2"):events.append(event)print(events[:3])async def main():await my_async_function()#asyncio.run(main())
await main()
注意:version=v2這個參數表明events事件流依然是一個beta API,后面肯定還有更改,所以商業應用要慎重!該參數只在 langchain-core>=0.2.0作用!
- 結果
[{'event': 'on_chat_model_start', 'data': {'input': 'hello'}, 'name': 'ChatZhipuAI', 'tags': [], 'run_id': 'c87b9c20-6dbf-41d3-989a-0b609c0b3fb4', 'metadata': {'ls_model_type': 'chat'}, 'parent_ids': []}, {'event': 'on_chat_model_stream', 'run_id': 'c87b9c20-6dbf-41d3-989a-0b609c0b3fb4', 'name': 'ChatZhipuAI', 'tags': [], 'metadata': {'ls_model_type': 'chat'}, 'data': {'chunk': AIMessageChunk(content='Hello', id='run-c87b9c20-6dbf-41d3-989a-0b609c0b3fb4')}, 'parent_ids': []}, {'event': 'on_chat_model_stream', 'run_id': 'c87b9c20-6dbf-41d3-989a-0b609c0b3fb4', 'name': 'ChatZhipuAI', 'tags': [], 'metadata': {'ls_model_type': 'chat'}, 'data': {'chunk': AIMessageChunk(content=' 👋!', id='run-c87b9c20-6dbf-41d3-989a-0b609c0b3fb4')}, 'parent_ids': []}]
4.2 在Chain中的使用:
from langchain_community.chat_models import ChatZhipuAI
from langchain_core.output_parsers import JsonOutputParser
import osos.environ["ZHIPUAI_API_KEY"] = "7cjHnN"model = ChatZhipuAI(model="glm-4",temperature=0,streaming=True,
)#異步方法
import asyncioasync def my_async_function():chain = (model | JsonOutputParser())num_events = 0async for event in chain.astream_events("輸出 JSON 格式的法國、西班牙和日本國家及其人口列表。 "'使用一個外鍵為 "countries "的 dict,其中包含一個國家列表。'"每個國家都應有 `name` 和 `population`鍵",version="v2",):#篩選eventkind = event["event"]if kind == "on_chat_model_stream":print(f"Chat model chunk: {repr(event['data']['chunk'].content)}",flush=True,)if kind == "on_parser_stream":print(f"Parser chunk: {event['data']['chunk']}", flush=True)num_events += 1if num_events > 30:# Truncate the outputprint("...")breakasync def main():await my_async_function()#asyncio.run(main())
await main()
Chat model chunk: '以下是'
Chat model chunk: '按照'
Chat model chunk: '您'
Chat model chunk: '的要求'
Chat model chunk: ','
Chat model chunk: '以'
Chat model chunk: ' JSON'
Chat model chunk: ' 格'
Chat model chunk: '式'
Chat model chunk: '表示'
Chat model chunk: '法國'
Chat model chunk: '、'
Chat model chunk: '西班牙'
Chat model chunk: '和'
Chat model chunk: '日本'
Chat model chunk: '國家'
Chat model chunk: '及其'
Chat model chunk: '人口'
Chat model chunk: '的一個'
Chat model chunk: '示例'
Chat model chunk: ':\n\n'
Chat model chunk: 'json'
Chat model chunk: '\n{\n '
Parser chunk: {}
Chat model chunk: ' "'
Chat model chunk: 'countries'
Chat model chunk: ':'
...
5.事件過濾
結合事件以及配置參數,可以很方便的找出你想要的階段數據
通過定義名字實現事件的篩選,后續想要使用的塊
from langchain_community.chat_models import ChatZhipuAI
from langchain_core.output_parsers import JsonOutputParser
import osos.environ["ZHIPUAI_API_KEY"] = "key"model = ChatZhipuAI(model="glm-4",temperature=0,streaming=True,
)#異步方法
import asyncioasync def my_async_function():chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config({"run_name": "my_parser"}
)num_events = 0async for event in chain.astream_events("輸出 JSON 格式的法國、西班牙和日本國家及其人口列表。 "'使用一個外鍵為 "countries "的 dict,其中包含一個國家列表。'"每個國家都應有 `name` 和 `population`鍵",version="v2",include_names=["my_parser"],):print(event)max_events += 1if max_events > 10:# Truncate outputprint("...")breakasync def main():await my_async_function()asyncio.run(main())
6.回調傳播
在工具中使用調用可運行項,則需要將回調傳播到可運行項;否則,不會生成任何流事件。
from langchain_community.chat_models import ChatZhipuAI
from langchain_core.output_parsers import JsonOutputParser
import osos.environ["ZHIPUAI_API_KEY"] = "key"model = ChatZhipuAI(model="glm-4",temperature=0,streaming=True,
)from langchain_core.runnables import RunnableLambda
from langchain_core.tools import tool#反轉單詞
def reverse_word(word: str):return word[::-1]reverse_word = RunnableLambda(reverse_word)@tool
def correct_tool(word: str, callbacks):"""A tool that correctly propagates callbacks."""return reverse_word.invoke(word, {"callbacks": callbacks})async for event in correct_tool.astream_events("hello", version="v2"):print(event)
{'event': 'on_tool_start', 'data': {'input': 'hello'}, 'name': 'correct_tool', 'tags': [], 'run_id': '97cd4122-e699-4a54-8370-699ed9e6cdb4', 'metadata': {}, 'parent_ids': []}
{'event': 'on_chain_start', 'data': {'input': 'hello'}, 'name': 'reverse_word', 'tags': [], 'run_id': '9e6635fe-879f-4b74-9c23-8de768b49a39', 'metadata': {}, 'parent_ids': ['97cd4122-e699-4a54-8370-699ed9e6cdb4']}
{'event': 'on_chain_end', 'data': {'output': 'olleh', 'input': 'hello'}, 'run_id': '9e6635fe-879f-4b74-9c23-8de768b49a39', 'name': 'reverse_word', 'tags': [], 'metadata': {}, 'parent_ids': ['97cd4122-e699-4a54-8370-699ed9e6cdb4']}
{'event': 'on_tool_end', 'data': {'output': 'olleh'}, 'run_id': '97cd4122-e699-4a54-8370-699ed9e6cdb4', 'name': 'correct_tool', 'tags': [], 'metadata': {}, 'parent_ids': []}
更多優質內容請關注公號:汀丶人工智能;會提供一些相關的資源和優質文章,免費獲取閱讀。