我們對 Windmill 進行了基準測試,認為它是 Airflow、Prefect 甚至 Temporal 中最快的自托管通用工作流引擎。對于 Airflow,有速度快了 10 倍!
工作流引擎編排工作人員的有向無環圖 (DAG) 中定義的作業,同時尊重依賴性。
主要優點包括資源分配、并行性、可觀察性和持久性。高效的工作流引擎可以快速調度工作人員的作業,并且工作人員可以快速拉取和運行作業。
由于使用 Postgresql 進行狀態管理和原始 SQL 語句之間的轉換,避免了最終的一致性問題,Windmill 被強調為非常快。工作被流水線化以提高效率。
網友評論:
- Windmill 因其快速的性能、對 Discord 的積極開發人員支持以及按需觸發工作流程和腳本的能力而受到贊譽。
- 雖然速度很重要,但更重要的因素是工作流引擎是否可以支持同時運行的多個作業,并且每個作業的性能可預測。
- 使用 Postgres 作為后端受到質疑,建議探索 MongoDB 等基于文檔的數據庫,以避免數據庫遷移問題。
- 由于 Windmill 的腳本功能以及與 VS Code 等工具的集成,Windmill 被認為特別適合開發人員,但非開發人員也成功將其用于內部工具。
Temporal
從某種程度上來說,Temporal不是一個工作流引擎,而是一個專門的持久執行引擎。
Temporal 實際上并不管理工作,而只管理任務隊列。因此,即使在編寫了臨時工作流程之后,您仍然需要單獨管理您的工作人員。
Windmill 還支持反應性(又名等待事件),并且也可以作為持久執行引擎。
Temporal 的功能令人驚嘆,如果 Windmill 和 Temporal 之間存在重疊,那么顯然在某些用例中您應該使用 Temporal 而不是 Windmill(作為 Uber 規模的微服務異步模式的骨干)
另一方面,將任意作業發送到內部集群超出了 Temporal 的范圍,因為您仍然需要事先痛苦地部署“工作程序”。
我們將 Spark 或 Dagster 等分析/ETL 引擎排除在外,因為它們本身不是工作流引擎,即使它們是構建在工作流引擎之上的。
工作流引擎與作業
作業隊列是工作流引擎的核心,構成任何后臺作業處理的關鍵。已經有很多出色的隊列實現,以托管服務 (SQS)、分布式可擴展服務 (Kafka) 和軟件(例如:帶有 rmsq 的 Redis)或庫 (Orban) 的形式。它們大多足以自行使用,并且許多開發人員會通過圍繞作業隊列構建自己的邏輯來完全避免使用工作流引擎,從而獲得滿足。這類似于編寫您自己的專用工作流引擎。
什么是工作流引擎,什么是“包羅萬象”的工作流
首先是一些定義,工作流是由代表作業規范的節點組成的有向無環圖 DAG。工作流引擎是一種分布式系統,它能接收工作流,并協調工作流在工作者身上完成,同時尊重每個作業的所有依賴約束。工作流的種類繁多,許多軟件都有特定領域的工作流引擎和工作流規范(如果你是一名軟件工程師,你可能已經在不知不覺中編寫了一個)。
在這里,我們感興趣的是至少能用一種主要編程語言(Python/Typescript/Go/Bash)運行任意代碼的工作流。這些工作流最通用,但也最復雜,最難優化。每個節點都是一段代碼,它從其他步驟(或流程輸入)獲取參數和數據作為輸入,執行一些副作用(http 請求、計算、寫入磁盤/s3),然后返回一些數據供其他步驟使用。
工作流程引擎的主要5個優勢:
- 資源分配:可充分利用集群,將每項作業分配給擁有不同資源(CPU、內存、GPU)的不同工作員,并保證工作員的全部資源都能為作業所用。
- 并行性:并行性:當工作流的約束條件允許某些步驟并行運行(分支、for-loop)時,工作流引擎可以將這些步驟分派給多個物理上獨立的工作者,而不僅僅是線程。
- 可觀察性:每個作業都有一個唯一的 ID,可以單獨觀察:可以檢查輸入、日志、輸出和狀態
- 耐用性:機器死機、副作用因意外原因失效。工作流需要在接近意外事件發生時可重新啟動。實現這一點的方法之一是惰性:單個操作與多個相同操作的效果相同。如果有疑問,可以重放整個流程而不產生任何后果。這通常通過日志文件和 sdk 來實現,當操作附加的唯一 ID 是日志的一部分時,sdk 會跳過副作用。另一種方法是對流程狀態進行事務快照,存儲每次操作后的狀態。要恢復時,只需重新加載最后的狀態,然后從那里開始執行。Windmill 采用的是后者,并假定在用戶領域需要時可以實現冪等性。
- 反應性:暫停流程,直到根據 webhook 或批準等事件再次恢復流程。
此外,一個包羅萬象的工作流引擎應能動態注冊新的可用工作人員,并能輕松部署新的工作流,將不同的工作分配給不同的工作人員,以及監控工作人員和系統本身的健康狀況。
工作流引擎之上的開發者平臺應能處理權限問題,使不同權限級別的用戶可以運行不同的工作流,并使這些工作流能根據調用者的角色訪問不同的資源。
為什么 Windmill 速度很快?
在工作流引擎中,"效率 "一詞取決于:
-
計算轉換的效率,根據上次完成的工作安排新工作的效率,以及工人本身提取已安排工作并運行它們的效率
-
在步驟之間傳遞數據的效率
-
工作程序提取作業、開始執行代碼并提交結果和新狀態的效率
Windmill 的速度極快,因為它在這三個方面都采用了簡單的設計,處處優化,最大限度地利用了 Postgresql 和 Rust。
系統設計和隊列
Windmill 提供了一個二進制文件(由 Rust 編譯而成),既可以作為 api 服務器運行,也可以作為 Worker 運行。Worker 和服務器都連接到 Postgresql,但彼此互不連接。服務器只公開 api 和前端。隊列是在 Postgresql 本身中實現的。作業可以通過調用 API 從外部觸發,API 會將新作業推送到隊列中。
作業存儲在 Postgresql 的兩個表中:
-
queue(當作業未完成時,甚至在運行時)
-
completed_job
作業在啟動時不會從隊列中移除,但其字段 running 會被設置為 true。隊列是通過傳統的 UPDATE SKIP LOCKED 來實現的。
UPDATE queue
SET running = true
, started_at = coalesce(started_at, now())
, last_ping = now()
, suspend_until = null
WHERE id = (
SELECT id
FROM queue
WHERE running = false AND scheduled_for <= now() AND tag = ANY($1)
ORDER BY priority DESC NULLS LAST, scheduled_for, created_at
FOR UPDATE SKIP LOCKED
LIMIT 1
)
RETURNING *
只要每個字段都有適當的索引,Postgresql 就能做到最快的速度。這種方法的一些變種是將作業從隊列中刪除,而不是更新標記,或者設置一個時間,在這個時間段內,其他作業無法被拉動。在引擎蓋下的原理都是一樣的。
推送流程作業時,其輸入、指向不可變流程定義的指針和初始流程狀態(見下)都會被推送到作業行中(這是一個很大的行!)。
然后,一個 Worker 會選擇流程作業,讀取流程定義和流程狀態,并意識到它需要推送隊列中的第一步,僅此而已。這就是流程的初始轉換。
如果作業是流程的一個步驟(該流程作業是一個單獨的作業,并將父作業設置為該流程作業),則工作者會一次拉一個作業,將其運行至完成,并推進狀態。服務器本身不進行協調,每個流程的轉換都由工人自己完成。
狀態
工作流引擎用有限狀態機(FSM)來表示作業。常用的 4 種主要狀態是
-
等待先決條件(收到 webhook 等事件,或完成所有相關作業)
-
前提條件已滿足,等待工作者拉取作業并執行它
-
運行
-
完成(成功或失敗)
其他狀態通常是對上述 4 種狀態的細化
在 Windmill 中,整個流程本身就是一個有限狀態機,流程的規格和流程狀態都是易于讀取的結構。
流程狀態完全由步進計數器和流程模塊狀態數組定義。有些流程模塊狀態更為復雜,例如與 for 循環和分支相對應的狀態。需要注意的一個有趣方面是,分支或 for 循環迭代等子流程都是各自定義明確的作業,它們的指針指向觸發這些作業的父流程(parent_flow)。
因此,分支和 for-loop 是一種特殊的流程狀態,其中包含一個 ID 數組,指向所有已啟動的子流程。當按順序執行 for 循環/分支時,過渡包括查看流程狀態,如果還有迭代,則開始下一次迭代;如果沒有迭代,則完成該步驟。
這種設計的方便之處在于,每個狀態轉換都只是一個事務性的 Postgresql 語句。Postgresql 是 ACID,我們可以充分利用這些特性。在工作流引擎中,達到最終一致性是最難、最慢的事情。我們可以通過以下方式跳過困難的部分
-
使用實現了 MVCC 和行級鎖的 Postgresql。
-
在工作結束時,由工作者自己完成轉換
流狀態是以 JSONB 的形式實現的,因此狀態轉換是以原始 sql 編寫的,它直接更改隊列中與流作業相對應的行中的流狀態。這既正確又極其高效。這部分并沒有真正受益于 Rust,可以用任何語言實現,甚至可以直接用 PL/SQL 實現。
最棘手的兩個過渡
-
嵌套流程的最后一步(分支的分支)
-
并行分支/迭代
詳細介紹
- 在嵌套流程的最后一步,工作程序會更新父流程的狀態,但會意識到該流程現在也已完成,因此會簡單地遞歸到該流程的父流程,并進行流程轉換。
- 并行步驟:在啟動該步驟時,所有子流程都會排隊,而不是一次運行一個流程。然后,任何完成每個分支子流程最后一步的工作者都會原子式地增加一個計數器。將計數器增加到與迭代次數一樣長的工作者知道,由于整個步驟已經完成,因此它需要對整個步驟進行轉換。
此外,對已完成任務的處理是在后臺 tokio 任務中完成的,該任務在通道中接收已完成的任務,從而在一定程度上實現了流水線作業。工人無需等待數據庫完全確認作業,就能接收另一個可用作業。
Windmill 的轉換速度非常快,因為它們是作為原始 Postgresql 語句實現的,而且在執行作業和更新數據庫之間有管道連接。
數據傳遞
在 Windmill 中,有 3 種主要的數據傳遞方式:
-
一個步驟的每個輸入都可以是一個 javascript 表達式,可以引用任何步驟的輸出
typecript、python、go、bash 的每個腳本都有其主要簽名(由前端的 WASM 程序解析),可以預先計算給定步驟所需的不同輸入。對于每個輸入,我們都可以定義一個靜態輸入或一個 javascript 表達式,它可以引用任何步驟的結果,例如:results.d.foo,其中 d 是步驟的 id。復雜的 javascript 表達式將由嵌入式 v8 使用 Deno 運行時進行評估。按表達式計算大約需要 8 毫秒。
鑒于每個步驟結果本身就是一個作業,其中包含 json 格式的結果,因此只需一條 sql 語句就能檢索到所需的結果。節點 id 到作業 id 的映射保存在流程作業狀態中。
此外,大多數表達式都很瑣碎,可以直接轉換為原始 jsonb 語句:
SELECT result #> $3 as result FROM completed_job WHERE id = $1 AND workspace_id = $2"
其中 $3?為
json*path.map(|x| x.split(".").map(|x| x.to_string()).collect::<Vec<*>>())
- 共享臨時文件夾中的數據 可以將流程配置為完全在同一個 Worker 上執行。在這種情況下,每個作業的臨時文件夾內都會共享一個文件夾并用同義詞鏈接(作業在臨時文件夾中啟動,執行結束后會刪除該文件夾)
- 使用 s3 集成在 s3 中傳遞數據(該部分的具體更新將在第 5 天介紹)
工作者worker效率
在正常模式下,工作者一次拉取一個作業,識別作業使用的語言(python、typecript、go、bash、snowflake、Postgresql、mysql、mssql、bigquery),然后生成相應的運行時,然后運行作業。
與基于容器的工作流引擎相比,Worker 可以裸運行作業,而無需運行容器,從而提高了性能。不過,出于沙箱的目的,工作程序本身可以在容器內運行,并在 nsjail 沙箱中運行每個作業。
對于查詢語言來說,除了建立連接外,沒有冷啟動。bash 沒有冷啟動,而 go 腳本在 AOT 中編譯,然后在本地緩存二進制文件,以避免冷啟動。
支持執行任意的 python 代碼很難,因為我們必須支持任何帶鎖文件的導入。由于 Worker 不會在容器中運行 python 代碼,因此必須動態處理依賴關系。為此,我們開發了一種高效的分布式緩存系統,它可以動態 pip 安裝特定的一對(軟件包、版本),并在運行中創建一個動態虛擬環境來執行代碼。在運行代碼之前,我們會對導入進行有效分析。
類似地,在 typescript 中,運行時要么是 deno,要么是 bun,這樣它們就能從全局緩存中獲益,永遠不必重復安裝相同的依賴關系。
然而,快速處理依賴關系還不足以達到最高速度,還需要冷啟動生成一個 python(約 60 毫秒)或 deno/bun (約 30 毫秒)進程。在事件流情況下,邏輯本身大約需要 1 毫秒,因此冷啟動是運行腳本開銷的 30 倍。
幸運的是,我們最近為腳本實現了專用 Worker,它在 Worker 開始時催生一次 python 進程,然后在 while 循環中執行腳本邏輯,在 stdin 中接收作業輸入,并將輸出返回到 sdout。
我們將這種方法擴展為流程專用 Worker,本質上是相同的。一開始,這些 Worker 會為 Python 或 Typescript 實現的每個步驟生成一個相應的專用進程。這就徹底消除了冷啟動,使 Windmill 流能夠處理偶數流用例,每個 Worker 每秒可處理多達 1000 個步驟。
流的專用工作者會拉取與流相關的任何作業,然后將其路由到適當的專用進程。仍然一次只執行一個作業,但在預熱流程中執行。
結論
Windmill 的速度非常快,因為它依賴于 Postgresql 和 Rust,并采用了簡單的設計,能夠優化每個部分,無論大小。這與用 Zig 實現的 Bun 快速的原因有點類似,都是盡可能地進行了優化。
Windmill是一個開源且可自托管的無服務器運行時和平臺,將代碼的強大功能與低代碼的速度相結合。我們將您的腳本轉換為內部應用程序和可組合的流程步驟,以自動化重復的工作流程。
https://www.jdon.com/70126.html