概念 :線性任務流設計
- ? 優點:邏輯清晰易調試,適合線性處理流程
- ? 缺點:缺乏動態分支能力
from typing import TypedDictfrom langgraph.graph import StateGraph, END# 定義后續用到的一些變量
class CustomState(TypedDict):planet1: str # 星球1的名稱planet2: str # 星球2的名稱mass1: float # 星球1的質量mass2: float # 星球2的質量total_mass: float # 總質量# 定義函數,后續作為節點
def add_numbers(state: CustomState):"""計算兩數之和參數:state (State): 包含兩個數字的字典返回:None"""state["total_mass"] = state["mass1"] + state["mass2"]return statedef get_planet_mass(state: CustomState):"""查詢星球質量(單位:千克)參數:state (State): 包含星球名稱的字典返回:None"""PLANET_MASSES = {'Mercury': 3.301e23,'Venus': 4.867e24,'Earth': 5.972e24,'Mars': 6.417e23,'Jupiter': 1.899e27,'Saturn': 5.685e26,'Uranus': 8.682e25,'Neptune': 1.024e26,'Sun': 1.989e30}state["mass1"] = PLANET_MASSES.get(state["planet1"], 0)state["mass2"] = PLANET_MASSES.get(state["planet2"], 0)return state#定義圖
workflow = StateGraph(CustomState)
# 定義節點
workflow.add_node("get_planet_mass", get_planet_mass)
workflow.add_node("add_numbers", add_numbers)# 定義邊
workflow.set_entry_point("get_planet_mass")
workflow.add_edge("get_planet_mass", "add_numbers")
workflow.add_edge("add_numbers", END)# 編譯
graph = workflow.compile()# 測試
custom_state = CustomState(planet1="Earth",planet2="Mars",mass1=0.0,mass2=0.0,total_mass=0.0
)
result = graph.invoke(custom_state)
print(result)
執行結果