LlamaIndex 工作流 并發執行

除了循環和分支之外,工作流還可以并發地執行步驟。當你有多個可以相互獨立運行的步驟,并且這些步驟中包含需要等待的耗時操作時,這種并發執行的方式就非常有用,因為它允許其他步驟并行運行。

觸發多個事件

到目前為止,在我們的示例中,每個步驟只觸發了一個事件。但在很多情況下,你可能希望并行執行多個步驟。要做到這一點,你需要觸發多個事件。你可以通過 send_event 來實現這一點:

import asyncio
import randomfrom llama_index.core.workflow import Workflow, step, Context, StartEvent, Event, StopEvent
from llama_index.utils.workflow import draw_all_possible_flowsclass StepTowEvent(Event):payload: strquery: strclass ParallelFlow(Workflow):@stepasync def step_one(self, ctx: Context, ev: StartEvent)-> StepTowEvent:print(ev.payload)ctx.send_event(StepTowEvent(payload="跳轉到第二步", query="查詢 1"))ctx.send_event(StepTowEvent(payload="跳轉到第二步", query="查詢 2"))ctx.send_event(StepTowEvent(payload="跳轉到第二步", query="查詢 3"))return None@step(num_workers=4)async def step_tow(self, ctx: Context, ev: StepTowEvent) -> StopEvent:print(ev.query)await asyncio.sleep(random.randint(1, 5))return StopEvent(result="結束!")async def run_workflow():w = ParallelFlow(timeout=60, verbose=True)result = await w.run(payload="開始工作流...")print(result)if __name__ == '__main__':asyncio.run(run_workflow())# 工作流可視化工具
draw_all_possible_flows(ParallelFlow, filename="parallel_workflow.html")

運行結果:

Running step step_one
開始工作流...
Step step_one produced no event
Running step step_tow
查詢 1
Running step step_tow
查詢 2
Running step step_tow
查詢 3
Step step_tow produced event StopEvent
結束!
parallel_workflow.html

流程圖:

在這個示例中,我們的起始步驟觸發了三個 StepTwoEvents。step_two 步驟使用了 num_workers=4 進行裝飾,這告訴工作流最多可以同時運行 4 個該步驟的實例(這也是默認值)。

收集事件

如果你執行前面的示例,你會發現工作流會在第一個完成的查詢后停止。有時候這種行為是有用的,但在其他情況下,你可能希望在繼續執行下一步之前,等待所有耗時操作都完成。你可以通過 collect_events 來實現這一點:

import asyncio
import randomfrom llama_index.core.workflow import Workflow, Event, step, StartEvent, Context, StopEventfrom llama_index.utils.workflow import draw_all_possible_flowsclass StepTwoEvent(Event):payload: strquery: strclass StepThreeEvent(Event):payload: strclass ConcurrentFlow(Workflow):@stepasync def step_one(self, ctx: Context, ev: StartEvent)-> StepTwoEvent:print(ev.payload)ctx.send_event(StepTwoEvent(payload="", query="查詢 1"))ctx.send_event(StepTwoEvent(payload="", query="查詢 2"))ctx.send_event(StepTwoEvent(payload="", query="查詢 3"))return None@step(num_workers=4)async def step_two(self, ctx: Context, ev: StepTwoEvent)-> StepThreeEvent:print("第二步...", ev.query)await asyncio.sleep(random.randint(1, 5))return StepThreeEvent(payload="來自第二步")@stepasync def step_three(self, ctx: Context, ev: StepThreeEvent)-> StopEvent:print("運行第三步...")# 等待直到我們接收到 3 個事件result = ctx.collect_events(ev, [StepThreeEvent] * 3)if result is None:return None# 將這三個結果一并進行處理print("將這三個結果一并進行處理", result)return StopEvent(result="Done")async def run_workflow():w = ConcurrentFlow(timeout=60, verbose=True)result = await w.run(payload="開始工作流..")print(result)if __name__ == '__main__':asyncio.run(run_workflow())# 工作流可視化工具
draw_all_possible_flows(ConcurrentFlow, filename="concurrent_workflow.html")

運行結果:

Running step step_one
開始工作流..
Step step_one produced no event
Running step step_two
第二步... 查詢 1
Running step step_two
第二步... 查詢 2
Running step step_two
第二步... 查詢 3
Step step_two produced event StepThreeEvent
Step step_two produced event StepThreeEvent
Step step_two produced event StepThreeEvent
Running step step_three
運行第三步...
Step step_three produced no event
Running step step_three
運行第三步...
Step step_three produced no event
Running step step_three
運行第三步...
將這三個結果一并進行處理 [StepThreeEvent(payload='來自第二步'), StepThreeEvent(payload='來自第二步'), StepThreeEvent(payload='來自第二步')]
Step step_three produced event StopEvent
Done
concurrent_workflow.html

流程圖:

collect_events 方法位于 Context 上,它接收觸發當前步驟的事件以及一個需要等待的事件類型數組作為參數。在本例中,我們正在等待 3 個相同類型的 StepThreeEvent 事件。

每當接收到一個 StepThreeEvent 事件時,step_three 步驟就會被觸發,但 collect_events 在接收到全部 3 個事件之前會一直返回 None。一旦收集到所有 3 個事件,該步驟將繼續執行,并可以將這三個結果一并進行處理。

collect_events 返回的結果是一個按接收順序排列的事件數組,其中包含所收集到的所有事件。

多種事件類型

當然,你并不要求必須等待相同類型的事件。你可以根據需要等待任意組合的事件,例如在以下示例中:

import asynciofrom llama_index.core.workflow import Event, Workflow, step, Context, StartEvent, StopEvent
from llama_index.utils.workflow import draw_all_possible_flowsclass StepAEvent(Event):query: str
class StepBEvent(Event):query: str
class StepCEvent(Event):query: strclass StepACompleteEvent(Event):query: str
class StepBCompleteEvent(Event):query: str
class StepCCompleteEvent(Event):query: strclass ConcurrentFlow(Workflow):@stepasync def step_one(self, ctx: Context, ev: StartEvent)-> StepAEvent | StepBEvent | StepCEvent:print(ev.payload)ctx.send_event(StepAEvent(query="A 查詢..."))ctx.send_event(StepBEvent(query="B 查詢..."))ctx.send_event(StepCEvent(query="C 查詢..."))return None@stepasync def step_a(self, ctx: Context, ev: StepAEvent)-> StepACompleteEvent:print(ev.query)return StepACompleteEvent(query="A 查詢...完成")@stepasync def step_b(self, ctx: Context, ev: StepBEvent)-> StepBCompleteEvent:print(ev.query)return StepBCompleteEvent(query="B 查詢...完成")@stepasync def step_c(self, ctx: Context, ev: StepCEvent)-> StepCCompleteEvent:print(ev.query)return StepCCompleteEvent(query="C 查詢...完成")@stepasync def step_tow(self,ctx: Context,ev: StepACompleteEvent | StepBCompleteEvent | StepCCompleteEvent,) -> StopEvent:print("接收到的事件 ", ev.query)# 等待直到我們接收到 3 個事件result = ctx.collect_events(ev, [StepCCompleteEvent, StepACompleteEvent, StepBCompleteEvent],)if result is None:return None# do something with all 3 results togetherreturn StopEvent(result="Done")async def run_workflow():w = ConcurrentFlow(timeout=60, verbose=True)result = await w.run(payload="開始工作流..")print(result)if __name__ == '__main__':asyncio.run(run_workflow())# 工作流可視化工具
draw_all_possible_flows(ConcurrentFlow, filename="concurrent_workflow.html")

運行結果:

Running step step_one
開始工作流..
Step step_one produced no event
Running step step_a
A 查詢...
Step step_a produced event StepACompleteEvent
Running step step_b
B 查詢...
Step step_b produced event StepBCompleteEvent
Running step step_c
C 查詢...
Step step_c produced event StepCCompleteEvent
Running step step_tow
接收到的事件  A 查詢...完成
Step step_tow produced no event
Running step step_tow
接收到的事件  B 查詢...完成
Step step_tow produced no event
Running step step_tow
接收到的事件  C 查詢...完成
Step step_tow produced event StopEvent
Done
concurrent_workflow.html

流程圖:

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/83671.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/83671.shtml
英文地址,請注明出處:http://en.pswp.cn/web/83671.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

精粹匯總:大廠編程規范(持續更新)

歡迎來到啾啾的博客🐱。 記錄學習點滴。分享工作思考和實用技巧,偶爾也分享一些雜談💬。 有很多很多不足的地方,歡迎評論交流,感謝您的閱讀和評論😄。 目錄 1 引言2 并發控制 (Concurrency Control)3 事務控…

curl 檢查重定向的命令總結

查看是否發生了重定向: curl -I http://yourdomain.com跟蹤整個重定向鏈: curl -IL http://yourdomain.com禁止跳轉,檢查是否返回 301/302: curl -I --max-redirs 0 http://yourdomain.com如果你只想看跳沒跳 HTTPS&#xff0c…

STM32 Bootloader:使用文件頭加載并啟動應用程序

文章目錄 STM32 Bootloader:使用文件頭加載并啟動應用程序的完整解析一、系統整體流程二、鏡像頭結構 image\_header\_t三、Bootloader 主函數流程1. 初始化 UART2. 調用啟動函數3. 拷貝 APP 并跳轉啟動 四、跳轉執行 APP 的實現五、總結與擴展思路 明白了&#xff…

無外接物理顯示器的Ubuntu系統的遠程桌面連接(升級版)

文章目錄 操作步驟實踐截圖配置 Xorg 的虛擬顯示界面(升級版) 操作步驟 “遠程連接”,在設置里直接打開就可以.進行配置就行. 1.配置 GRUB 以支持無顯示器啟動 sudo nano /etc/default/grub (里面有一行改為: GRUB_CMDLINE_LINUX_DEFAULT"quiet splash videovesa:off vi…

ACCU-100安科瑞協調控制器:精準調控光伏逆變器

產品概述 ACCU-100微電網協調控制器是一款應用于微電網、分布式發電、儲能等領域的智能協調控制器。它能接入光伏系統、風力發電、儲能系統以及充電樁等設備,通過對微電網系統進行數據采集與分析,實時監控各類設備的運行狀態和健康狀況。在此基礎上&…

長春光博會 | 麒麟信安:構建工業數字化安全基座,賦能智能制造轉型升級

6月10日-13日,2025長春國際光電博覽會Light國際會議(簡稱長春光博會)在長春東北亞國際博覽中心盛大舉行,吉林省委書記黃強出席并宣布開幕,省委副書記、省長胡玉亭致辭。本屆大會聚焦光電信息領域的前沿技術和最新產品&…

書寫時垂直筆畫比水平筆畫表現更好的心理機制分析

你有寫字的時候總是垂直方向筆畫好寫,水平方向的筆畫不好寫的情況存在嗎? 書寫時垂直筆畫比水平筆畫表現更好的心理機制分析 從人類認知和行為模式的角度來理解這種現象。以下是深度心理分析: 核心心理動因 重力知覺內化: 垂直…

SpringAI使用總結

SpringAI使用總結 基本使用ChatModel和ChatClient簡單對話流式輸出預設角色prompt(提示詞)function call(工具調用)參考 基本使用 ChatModel和ChatClient SpringAi支持非常多的模型,為了統一處理,SpringA…

歷史交易數據漲跌分級

歷史交易數據漲跌分級 # encoding:utf-8 import sys,traceback from loguru import loggersys.path.append("..") from QhSpiderTool import QhDorpFiled from QhCsvMode import *def QhZhangDieFenJi(QhDfData,QhFangFa"A"):"""歷史交易數…

Kafka入門4.0.0版本(基于Java、SpringBoot操作)

Kafka入門4.0.0版本(基于Java、SpringBoot操作) 一、kafka概述 Kafka最初是由LinkedIn公司開發的,是一個高可靠、高吞吐量、低延遲的分布式發布訂閱消息系統,它使用Scala語言編寫,并于2010年被貢獻給了Apache基金會&…

react react-router-dom中獲取自定義參數v6.4版本之后

路由配置, AutnToken 組件作為權限、登錄管理 import { createBrowserRouter, Navigate } from react-router-dom; import Layout from /layout/index; import Login from /pages/login; import Page404 from /pages/404;import AutnToken from /components/authToken; import…

AI中的Prompt

1. System 作用:設定 AI 的“角色設定”和“行為準則”。 內容:通常是描述 LLM 的身份、語氣、行為范圍、約束規則。 類似:在大語言模型中是最優先被考慮的提示。 示例: 你是一個專業的商品評價分析助手,請根據用戶…

從人工到智能:IACheck如何重構檢測報告審核工作流?

從人工到智能:IACheck如何重構檢測報告審核工作流? 在當今AI技術迅猛發展的時代,各行各業正經歷從“人工驅動”到“智能驅動”的根本性變革。檢測認證(TIC)行業作為關乎質量與安全的重要支柱,也不例外。在…

React事件處理:如何給按鈕綁定onClick點擊事件?

系列回顧: 在前幾篇文章中,我們已經學會了如何使用 State 管理組件的內部數據,以及如何通過 Props 實現父子組件之間的通信。我們的組件現在已經有了“數據”和“外觀”。但是,它還像一個只能看的“模型”,無法與用戶進…

【機器學習|學習筆記】粒子群優化(Particle Swarm Optimization, PSO)詳解,附代碼。

【機器學習|學習筆記】粒子群優化(Particle Swarm Optimization, PSO)詳解,附代碼。 【機器學習|學習筆記】粒子群優化(Particle Swarm Optimization, PSO)詳解,附代碼。 文章目錄 【機器學習|學習筆記】粒…

深度剖析:AI 社媒矩陣營銷工具,如何高效獲客?

在社交媒體營銷領域,競爭日益激烈,傳統的社媒矩陣運營方式面臨諸多挑戰。而 AI 社媒矩陣營銷工具的出現,正以前所未有的方式重構社媒矩陣的底層架構,為營銷人員帶來了全新的機遇與變革。接下來,我們將從技術破局、實戰…

Spring XML 常用命名空間配置

Spring XML 常用命名空間配置 下面是一個綜合性的Spring XML配置樣例&#xff0c;展示了各種常用命名空間的使用方式&#xff1a; <?xml version"1.0" encoding"UTF-8"?> <beans xmlns"http://www.springframework.org/schema/beans&quo…

UE5場景漫游——開始界面及關卡跳轉

UE中實現UMG游戲界面搭建及藍圖控制&#xff0c;點擊游戲界面中的按鈕實現關卡的跳轉效果。 一、游戲界面顯示。1.創建UMG&#xff0c;2.搭建UI。3.關卡藍圖控制顯示 二、點擊按鈕之后實現關卡跳轉

CSS 外邊距合并(Margin Collapsing)問題研究

在 CSS 中&#xff0c;margin-top 屬性會導致外部 DIV 移動的現象主要是由于 外邊距合并&#xff08;Margin Collapsing&#xff09; 造成的。這是 CSS 盒模型的一個特性&#xff0c;可能會與直覺相悖。 外邊距合并的原理 當一個元素&#xff08;如內部 DIV&#xff09;的 ma…

清理電腦C磁盤,方法N:使用【360軟件】中的【清理C盤空間】

1、先下載并打開【360安全衛士】&#xff0c;點擊如下位置&#xff1a; 之后&#xff0c;可以把這個東西&#xff0c;創建快捷方式到電腦桌面&#xff0c;方便以后使用&#xff1a;