01. 數據狀態與歸納函數
在前面的課時中,我們說過在 LangGraph 中?節點?在默認情況下返回的字典數據會將原始數據覆蓋,例如下面的代碼最終返回結果是?{"messages": [4]}?而不是?[1,2,3,4],如下
class MyState(TypedDict):
????messages: list
def fn1(state: MyState):
????return {"messages": [4]}
# ... (ignore codes of start->fn1->end, blah blah)
r = graph.invoke({"messages": [1, 2, 3]})
如果就是想要?[1, 2, 3, 4]?呢?第一種方法就是拿到?原始狀態?的值,更新新數據,然后返回def fn1(state: MyState):
????old = state.get("messages", [])
return {"messages": old + [4]}
除此之外,在 LangGraph 中,還針對?Annotated?進行了封裝,在 Python 中?Annotated?只是另外一種形態的?注釋,對類型的聲明+使用并沒有任何影響,例如
不過這樣聲明有一個好處,在程序中,我們可以通過?.__metadata__?元數據屬性拿到這個值,如下
cn_salary.__metadata__
于是在 LangGraph 中就針對?注釋+元數據?進行了封裝,使用?Annotated?外掛需要處理數據的?歸納函數,如果外掛了則使用,不外掛也沒有任何影響,這就是利用?歸納函數?來更新狀態的核心。
代碼經過更新后,就可以正常對數據執行相應操作了:
def concat_lists(original: list, new: list) -> list:
????return original + new
class MyState(TypedDict):
????# messages: list
????messages: Annotated[list, concat_lists]
def fn1(state: MyState):
????return {"messages": [4]}
r = graph.invoke({"messages": [1, 2, 3]})
print(r)
# 輸出是 {'messages': [1, 2, 3, 4]}
使用?歸納函數?的優點也非常明顯,可以讓每個?節點?獨立執行,不用理會別人在做啥,不需要花額外的功夫去處理 state 里的其他數據,而且在更新 state 結構時,也不需要逐個節點更新,但是添加?歸納函數?后要想執行一些特殊的操作也非常麻煩,要額外花很多功夫,例如在?add_messages()?中的?RemoveMessage?和?更新消息,就進行了額外的判斷與處理
02. 多節點并行同時執行
在 LangGraph 中,END?節點非常特殊,并不是?圖結構?程序走到?END?節點就終止了,只是?當前路線結束?了, 也就是說?END?是結束當前?路線,并不是結束?圖,理解好這個概念才能處理好?多節點并行執行?的情況。
例如如下并行路線
在上述的節點中,如果將圖結構轉換成帶有層級的圖,則?左1?和?右1?處于同一層級上,所以這兩個節點是并行執行的,但是順序不一定能保證,雖然在 LangGraph 中會按照連接的順序來執行,最終輸出就是:START->左1->右1->合并。
如果是以下的并行路線
在這個?圖結構?中,合并?雖然屬于?END?節點,并且在?左1?執行完成之后就會執行?合并,但是?合并節點?并不會終止整個圖的執行,而是會和?右2?作為同一層一起執行(并行執行,順序不確定),所以最終輸出:START->左1->右1->合并->右2->右3->合并。
如果想讓?合并?節點只執行一次,只要把?左1?和?右3?合并同時連接到?合并?節點上即可,這樣這兩個節點就處于同一層,更新代碼如下
graph.add_edge(["left1", "right3"], "merge")
03. 檢查點 CheckPoint
檢查點的概念因為它的名字,初次使用理解起來可能會比較吃力,其實只需要把?檢查點?看成是一個?存儲介質,用來記錄這些資料,就好比游戲存檔、不同玩家不同場次、可以存起來,然后載入,甚至篡改更新
所以在 LangGraph 圖程序中加入?檢查點?就等同于加入了一個?外部存儲介質,會將每一個節點的?狀態?都存儲起來(StateSnapshot),變成一個歷史的 list,所以對于?圖程序?必須配置檢查點才可以拿到?snapshot游戲存檔:
- graph.get_state(config):拿到?檢查點?的最后一次存檔(最后一個節點更新后的狀態)。
- graph.get_state_history(config):拿到檢查點的所有存檔(每個節點更新后的狀態列表)。
在前面的課時中,我們傳遞的?config?里只有?thread_id,但是在?get_state_history(config)?拿到的所有存檔列表中,還存在另外一個字段?thread_ts?代表線程的執行時間,通過該字段就可以唯一定位檢查點中的某個存檔,例如
StateSnapshot(..., config={'configurable': {'thread_id': '1', 'thread_ts': '1ef2985c-bed5-6dee-8003-6037939ae5aa'}}, ...)
和游戲存檔回退一樣,如果我們想回退到指定的存檔,只需調用?invoke()?玩游戲,并載入指定存檔的配置即可
for s in graph.stream(
????????input=None,
????????config=past_config, ?# <--
????????stream_mode="values"
):
????print(s)
甚至是我們想篡改?存檔?的數據也是可以的,還記得?update_state()?這個函數么,同樣可以傳入對應的?config,只需要在修改時,傳遞需要更改的?存檔配置?即可,例如
graph.update_state(
????config=past_config,
????values={"crew": [66, 77], "v": "BAD GUY"}
)
不過因為回退機制用得比較少,所以該功能在 LangGraph 的官網藏得也比較深,也沒有過多文章做出詳細的講解