張馮君(遠遠)
Koupleless PMC
螞蟻集團技術工程師
就職于螞蟻集團中間件團隊,參與維護與建設螞蟻 SOFAArk 和 Koupleless 開源項目、內部 SOFAServerless 產品的研發和實踐。
本文?3488?字,預計閱讀?11?分鐘?
業務背景
基于開源 Apache Flink 打造的螞蟻流式計算引擎在螞蟻有著廣泛的應用,基本覆蓋螞蟻所有實時業務。螞蟻 Flink 實時計算服務的提交服務主要負責提交、重啟、重置、停止作業等一系列運維操作。服務端在處理 Flink 作業提交請求的時候,需要對用戶提交上來的 Java 代碼或者 SQL 代碼進行編譯,將用戶代碼翻譯成 Flink 引擎可以識別的執行計劃,這部分邏輯需要依賴 Flink 引擎層的代碼,同時服務端需要支持 Flink 多版本的編譯,為保證不同編譯請求的正確性、隔離性和安全性,采用了目前業界常用的進程模型來處理編譯請求——服務端每收到一次編譯請求,都起一個子進程來執行編譯邏輯。
上圖是一次編譯請求的執行過程,這樣的方式具有進程模型的固有缺陷:
響應速度慢:實際業務中大部分編譯請求本身并不是很復雜的操作,但是啟動子進程需要經歷 VM 冷啟動、字節碼文件加載,以及 JIT 編譯技術對解釋執行的字節碼進行優化,生成本地執行代碼的過程還需加上 JVM 內部垃圾回收所耗費的時間。據統計實際平均請求耗時達到了?15s 及以上,嚴重影響用戶提交任務的體驗。
資源消耗大:啟動子進程需要更多內存,CPU 耗時更多,需要消耗大量的資源,導致服務端單機處理能力有限,系統穩定性差,極大影響整體的吞吐率。
實時計算團隊對編譯任務優化做了很多的探索,包括采用 CDS(Class Data Sharing)進程熱啟動、復用引擎類加載器的線程模型等方式,但都遇到了不同的資源消耗加大,資源不安全等問題,在生產上基本不可用,無法全量推開。
在進一步探索中,實時計算團隊嘗試將進程模型改造為線程模型,但面臨核心問題:線程執行意味著共享 JVM,在需要支持不同 Flink 引擎版本、不同業務自定義 ODF 包的場景下,需要一個機制來為不同編譯任務實現版本隔離、包隔離。并確保在運行時不同的編譯任務互相不影響且結果正確。
Koupleless 開源以來,除了致力于多應用合并部署節省資源外,一直在進行輕量化模塊研發的探索。其特性高度契合 Flink 編譯場景:
輕量化:適合依賴簡單、邏輯簡單,甚至是代碼片段的應用,Flink 編譯任務通常輕量級,流量觸發運行,運行完即可結束。
原生隔離能力:Koupleless 進行合并部署的底層原理就是通過類加載隔離來實現多應用的代碼隔離,這塊的類隔離框架正好符合多 Flink 編譯任務進行類隔離的訴求。
整體方案
讀到這里默認大家對 Koupleless 類隔離框架有一定的了解(不了解的話推薦閱讀:https://github.com/sofastack/sofa-ark),就不再贅述框架細節,直接分享在原類加載機制下,針對當前編譯任務場景做了哪些架構升級的改造和技術特性的支持。
業務改造
Flink 提交作業的核心在于編譯用戶提交過來的代碼,這個過程會解析提交請求,獲取提交請求依賴的 Flink 引擎包,Connector/Backend 插件包,以及用戶上傳的 UDF 包,然后定義環境變量和 Classpath,啟動子進程來編譯用戶代碼,獲取執行計劃。Flink 編譯任務改造前的進程模型流程:
編譯進程的核心在于正確的構造 Classpath,在進程模型下,很容易做到,而在線程模型下,則是需要保證 ClassLoader 的正確性,需要滿足以下需求:
編譯結果準確性:每次編譯線程都應該獲取正確完整的 ClassLoader,主要包括 Flink Lib 包的 ClassLoader,Flink Connector/Backend 插件的 ClassLoader,用戶 UDF 的 ClassLoader。
資源高效復用:編譯請求需要盡可能復用通用 Flink Lib 包,Flink Connector/Backend 包等,以期達到最優的性能。
多版本類隔離:不同的編譯請求會依賴不同版本的引擎,需要對不同版本的 Flink 包進行類隔離,使得多個編譯請求同時運行時互不影響。
基于以上訴求,我們設計出了一套類加載的框架:
針對 Flink Lib,Flink Connector/Backend 的高頻通用的 ClassLoader 會直接構建出來,常駐在內存中,可直接復用,生命周期和服務端進程一致;
針對用戶 UDF 代碼,構建請求級別的 ClassLoader,生命周期和請求一致。
執行流程如下:
在接收到編譯請求的時候,復用對應的 flink-lib,flink-connector/backend ClassLoader
基于請求依賴的 UDF 包,構建請求級別的 ClassLoader
啟動線程執行編譯邏輯,并且回收 UDF ClassLoader
Flink 編譯任務進行 Koupleless 改造后的線程模型:
服務端收到編譯請求
解析請求參數,得到依賴的 flink-lib 包版本、flink-opt 包版本、UDF 業務自定義包信息等
根據參數準備本次編譯任務需要的所有 Ark Plugin
若發現存在 Plugin 未安裝,動態構造 Classpath 并啟動
所有 Plugin 準備完畢后,構造本地編譯任務對應的 Biz
異步線程啟動 Biz 執行編譯任務
同時,為了盡可能復用和降低 Plugin、Biz 構建的開銷,我們設計整個模式支持運行時動態構建 Plugin、Biz 及其 Classpath,而無需提前準備眾多 Plugin FatJar,同時支持運行時動態按需加載 Plugin。
引擎無關的能力放在 Container 層,也稱基座層,實際上就是服務端進程原有的能力;
引擎相關的能力放在 Plugin 層,將引擎相關的包各自構造獨立的 ClassLoader,抽象成 Plugin,實際上就是把上文所說的 flink-lib,flink-connector/backend 抽象成了 Ark Plugin 組件,編譯請求來了之后會加載對應的 Plugin,并根據指定順序來加載類,這一層可以實現 ClassLoader 的復用,注意這一層需要將每個版本的引擎的每個包都抽象成 Plugin,這樣可以保證不同的編譯請求可以復用正確的 Plugin;
具體的編譯請求由 Biz 層處理,這里的 Biz 層實際上就是針對每一個編譯請求,會啟動一個新的線程,從 Plugin 層加載需要的 ClassLoader,并構造 Biz ClassLoader 來加載一些特定類,最后使用線程模型來啟動編譯請求對應的 main 函數,實現線程化編譯。
在整個三層結構中,基座層幾乎沒有特殊的改造,核心設計優化聚焦于 Plugin 層與 Biz 層,設計了更靈活的 Plugin、Biz 構造方式。
由于無法預先得知編譯請求需要的 Flink 引擎版本列表,需要提前在服務器中準備好所有版本的 Plugin 供請求來時直接使用,我們支持按需動態構建運行時 Plugin 并動態裝載到 JVM 中,因此,我們無需為 Flink 各個 SDK 的所有版本提前構建完整的 Plugin FatJar,同時無需提前做所有版本 Plugin 的預熱,只需要在請求到來時,檢查所需 Plugin 是否已裝載,若沒有,按需裝載即可。
為了穩定性和該模型的持續運行,我們建設了配套的線程回收邏輯和自愈流程。因為本方案使用了線程模型,不可避免的會存在少量資源泄漏問題,我們設計了一套線程回收邏輯:
定時掃描內存中處于空閑狀態的線程池;
判斷線程池對應的 ClassLoader,若線程池對應的 ClassLoader 是 Biz CalssLoader 或 Plugin CalssLoader,那么該線程池是編譯期間構造的;
此時追蹤到對應的編譯請求,若請求已經失效,直接強制回收線程池。
此外,為了解決編譯線程緩慢的 Meta 增長問題,我們建設了 Meta 檢測,超過一定閾值時觸發 JVM 重啟等自愈流程。
技術特性
階段一
動態裝配 Plugin 及其 Classpath
在 Flink 編譯任務中,業務依賴很復雜,不同編譯請求可能依賴不同的 flink-lib、flink-opt、UDF 包等,為了編譯的正確性需要類隔離,同時因 Flink 引擎包是所有編譯任務都需要依賴的,因此對通用的包需要能共享且最大程度提高共享,降低隔離重復加載成本。在 Koupleless 類加載模型下,天然針對這一特性設計了 Plugin、Biz 的加載方案。
每個 Flink、UDF 包都對應一個 Plugin,為了不在服務啟動時就加載全量 Plugin,我們支持了動態裝配 Plugin 的特性(目前暫無需要動態卸載的場景),根據請求按需加載 Plugin。
同時由于版本很多,為每個包的每個版本都提前構建 Plugin FatJar 也是很大的工程,比如要 flink-version1-plugin、flink-version2-plugin、flink-opt1-plugin、flink-opt2-plugin 等等,我們更輕量化地支持了在運行時根據 Plugin 依賴的 Jar List 動態構造 Classpath 的能力。
如 flink-version1-plugin classpath = common-plugin url + jar1 url + jar2 url + ... + jarN url 動態構成。這樣的模式只需要提前構建最簡單的 common-plugin 供所有 Flink、Opt Plugin 復用即可。
階段二
Biz 運行時只對依賴的 Plugin 可見
之前 Ark Container 中的所有 Plugin 對所有 Biz 可見,Biz 進行類加載時,會檢索所有的運行時 Plugin Export 列表,查找 export 了當前 class/resource 的 Plugin 進行委托加載。實際 Flink 編譯任務中,每次編譯請求對應創建一個新 Biz,每次編譯請求只會依賴部分指定版本的 Flink、Opt 包,即只依賴部分 Plugin,Biz 運行時進行類加載時只在這些依賴的 Plugin 中查找 Export 信息并進行委托加載。
比如編譯任務 Biz1 在進行類加載時,只從依賴的 flink-lib-plugin、flink-opt2-plugin 中委托加載,其余 Plugin 對該編譯任務 Biz1 完全不可見。
結果
隨機抽取一部分作業,直接測試進程模型和線程模型編譯結果的一致性,直接比對生成的執行計劃內容。
隨機抽取一部分包含 UDF(用戶自定義依賴)和不包含 UDF 的作業,直接使用線程模型編譯,觀察成功率,耗時,機器負載等指標。
整體來說,Flink 編譯任務使用線程模型編譯從功能上來說,可以正確替代原來進程模型的能力,編譯的結果一致,編譯出來的執行計劃一致。編譯任務執行耗時從原來的平均?10s 多降低到 5.6s,平均降低?50%,吞吐從?5~10/min?個編譯任務提升到?50/min?及以上,提升?5 倍及以上。
總結
這次 Flink 編譯任務,是 Koupleless 在新的實時計算場景中落地的成功探索,以一種新的方式使用類加載框架。在一個大基座上面運行 Job 類模塊,流量觸發運行,請求完即執行卸載,輕量快捷。歡迎大家碰到相關場景時使用 Koupleless,一起探索 Koupleless 更多的使用場景吧~