除了循環和分支之外,工作流還可以并發地執行步驟。當你有多個可以相互獨立運行的步驟,并且這些步驟中包含需要等待的耗時操作時,這種并發執行的方式就非常有用,因為它允許其他步驟并行運行。
觸發多個事件
到目前為止,在我們的示例中,每個步驟只觸發了一個事件。但在很多情況下,你可能希望并行執行多個步驟。要做到這一點,你需要觸發多個事件。你可以通過 send_event 來實現這一點:
import asyncio
import randomfrom llama_index.core.workflow import Workflow, step, Context, StartEvent, Event, StopEvent
from llama_index.utils.workflow import draw_all_possible_flowsclass StepTowEvent(Event):payload: strquery: strclass ParallelFlow(Workflow):@stepasync def step_one(self, ctx: Context, ev: StartEvent)-> StepTowEvent:print(ev.payload)ctx.send_event(StepTowEvent(payload="跳轉到第二步", query="查詢 1"))ctx.send_event(StepTowEvent(payload="跳轉到第二步", query="查詢 2"))ctx.send_event(StepTowEvent(payload="跳轉到第二步", query="查詢 3"))return None@step(num_workers=4)async def step_tow(self, ctx: Context, ev: StepTowEvent) -> StopEvent:print(ev.query)await asyncio.sleep(random.randint(1, 5))return StopEvent(result="結束!")async def run_workflow():w = ParallelFlow(timeout=60, verbose=True)result = await w.run(payload="開始工作流...")print(result)if __name__ == '__main__':asyncio.run(run_workflow())# 工作流可視化工具
draw_all_possible_flows(ParallelFlow, filename="parallel_workflow.html")
運行結果:
Running step step_one
開始工作流...
Step step_one produced no event
Running step step_tow
查詢 1
Running step step_tow
查詢 2
Running step step_tow
查詢 3
Step step_tow produced event StopEvent
結束!
parallel_workflow.html
流程圖:
在這個示例中,我們的起始步驟觸發了三個 StepTwoEvents。step_two 步驟使用了 num_workers=4 進行裝飾,這告訴工作流最多可以同時運行 4 個該步驟的實例(這也是默認值)。
收集事件
如果你執行前面的示例,你會發現工作流會在第一個完成的查詢后停止。有時候這種行為是有用的,但在其他情況下,你可能希望在繼續執行下一步之前,等待所有耗時操作都完成。你可以通過 collect_events 來實現這一點:
import asyncio
import randomfrom llama_index.core.workflow import Workflow, Event, step, StartEvent, Context, StopEventfrom llama_index.utils.workflow import draw_all_possible_flowsclass StepTwoEvent(Event):payload: strquery: strclass StepThreeEvent(Event):payload: strclass ConcurrentFlow(Workflow):@stepasync def step_one(self, ctx: Context, ev: StartEvent)-> StepTwoEvent:print(ev.payload)ctx.send_event(StepTwoEvent(payload="", query="查詢 1"))ctx.send_event(StepTwoEvent(payload="", query="查詢 2"))ctx.send_event(StepTwoEvent(payload="", query="查詢 3"))return None@step(num_workers=4)async def step_two(self, ctx: Context, ev: StepTwoEvent)-> StepThreeEvent:print("第二步...", ev.query)await asyncio.sleep(random.randint(1, 5))return StepThreeEvent(payload="來自第二步")@stepasync def step_three(self, ctx: Context, ev: StepThreeEvent)-> StopEvent:print("運行第三步...")# 等待直到我們接收到 3 個事件result = ctx.collect_events(ev, [StepThreeEvent] * 3)if result is None:return None# 將這三個結果一并進行處理print("將這三個結果一并進行處理", result)return StopEvent(result="Done")async def run_workflow():w = ConcurrentFlow(timeout=60, verbose=True)result = await w.run(payload="開始工作流..")print(result)if __name__ == '__main__':asyncio.run(run_workflow())# 工作流可視化工具
draw_all_possible_flows(ConcurrentFlow, filename="concurrent_workflow.html")
運行結果:
Running step step_one
開始工作流..
Step step_one produced no event
Running step step_two
第二步... 查詢 1
Running step step_two
第二步... 查詢 2
Running step step_two
第二步... 查詢 3
Step step_two produced event StepThreeEvent
Step step_two produced event StepThreeEvent
Step step_two produced event StepThreeEvent
Running step step_three
運行第三步...
Step step_three produced no event
Running step step_three
運行第三步...
Step step_three produced no event
Running step step_three
運行第三步...
將這三個結果一并進行處理 [StepThreeEvent(payload='來自第二步'), StepThreeEvent(payload='來自第二步'), StepThreeEvent(payload='來自第二步')]
Step step_three produced event StopEvent
Done
concurrent_workflow.html
流程圖:
collect_events 方法位于 Context 上,它接收觸發當前步驟的事件以及一個需要等待的事件類型數組作為參數。在本例中,我們正在等待 3 個相同類型的 StepThreeEvent 事件。
每當接收到一個 StepThreeEvent 事件時,step_three 步驟就會被觸發,但 collect_events 在接收到全部 3 個事件之前會一直返回 None。一旦收集到所有 3 個事件,該步驟將繼續執行,并可以將這三個結果一并進行處理。
collect_events 返回的結果是一個按接收順序排列的事件數組,其中包含所收集到的所有事件。
多種事件類型
當然,你并不要求必須等待相同類型的事件。你可以根據需要等待任意組合的事件,例如在以下示例中:
import asynciofrom llama_index.core.workflow import Event, Workflow, step, Context, StartEvent, StopEvent
from llama_index.utils.workflow import draw_all_possible_flowsclass StepAEvent(Event):query: str
class StepBEvent(Event):query: str
class StepCEvent(Event):query: strclass StepACompleteEvent(Event):query: str
class StepBCompleteEvent(Event):query: str
class StepCCompleteEvent(Event):query: strclass ConcurrentFlow(Workflow):@stepasync def step_one(self, ctx: Context, ev: StartEvent)-> StepAEvent | StepBEvent | StepCEvent:print(ev.payload)ctx.send_event(StepAEvent(query="A 查詢..."))ctx.send_event(StepBEvent(query="B 查詢..."))ctx.send_event(StepCEvent(query="C 查詢..."))return None@stepasync def step_a(self, ctx: Context, ev: StepAEvent)-> StepACompleteEvent:print(ev.query)return StepACompleteEvent(query="A 查詢...完成")@stepasync def step_b(self, ctx: Context, ev: StepBEvent)-> StepBCompleteEvent:print(ev.query)return StepBCompleteEvent(query="B 查詢...完成")@stepasync def step_c(self, ctx: Context, ev: StepCEvent)-> StepCCompleteEvent:print(ev.query)return StepCCompleteEvent(query="C 查詢...完成")@stepasync def step_tow(self,ctx: Context,ev: StepACompleteEvent | StepBCompleteEvent | StepCCompleteEvent,) -> StopEvent:print("接收到的事件 ", ev.query)# 等待直到我們接收到 3 個事件result = ctx.collect_events(ev, [StepCCompleteEvent, StepACompleteEvent, StepBCompleteEvent],)if result is None:return None# do something with all 3 results togetherreturn StopEvent(result="Done")async def run_workflow():w = ConcurrentFlow(timeout=60, verbose=True)result = await w.run(payload="開始工作流..")print(result)if __name__ == '__main__':asyncio.run(run_workflow())# 工作流可視化工具
draw_all_possible_flows(ConcurrentFlow, filename="concurrent_workflow.html")
運行結果:
Running step step_one
開始工作流..
Step step_one produced no event
Running step step_a
A 查詢...
Step step_a produced event StepACompleteEvent
Running step step_b
B 查詢...
Step step_b produced event StepBCompleteEvent
Running step step_c
C 查詢...
Step step_c produced event StepCCompleteEvent
Running step step_tow
接收到的事件 A 查詢...完成
Step step_tow produced no event
Running step step_tow
接收到的事件 B 查詢...完成
Step step_tow produced no event
Running step step_tow
接收到的事件 C 查詢...完成
Step step_tow produced event StopEvent
Done
concurrent_workflow.html
流程圖: