Chukonu:一個將原生計算引擎集成到 Spark 中的全功能高性能大數據框架
摘要
Apache Spark 是一種廣泛部署的大數據分析框架,它提供了諸如彈性、負載均衡和豐富的生態系統等吸引人的特性。然而,其性能仍有很大的改進空間。盡管用原生編程語言編寫的數據并行系統可以顯著提高性能,但它可能需要重新實現 Spark 的許多功能才能成為一個功能齊全的系統。原生大數據系統最好只用原生語言編寫計算引擎以確保高效率,并重用 Spark 提供的其他成熟功能,而不是重新實現所有內容。
希望提升spark的性能,原來為了提高性能,需要掀了spark,自己實現一些功能,本文提出的方法復用了spark。
但是 JVM 和原生世界之間的交互可能會成為瓶頸。
本文提出了一種名為Chukonu的原生大數據框架,該框架復用了Spark提供的關鍵大數據特性。由于我們提出的新型DAG分割方法,潛在的Spark集成開銷得以減輕,甚至優于現有的純原生大數據框架。Chukonu將DAG程序分割為運行時部分和編譯時部分:運行時部分委托給Spark,以減輕由于特性實現而產生的復雜性。編譯時部分進行原生編譯。我們提出了一系列優化技術,應用于編譯時部分,例如算子融合、向量化和壓縮,以顯著降低Spark集成開銷。評估結果表明,在六個常用的的大數據應用上,Chukonu的加速比高達71.58倍(幾何平均數為6.09倍),超過Apache Spark,并且高達7.20倍(幾何平均數為2.30倍),超過純原生框架。通過將SparkSQL生成的物理計劃轉換為Chukonu程序,Chukonu將SparkSQL的TPC-DS性能提高了2.29倍。
引言
大數據指的是以異構格式快速增長的大量數據集,可以從中提取有價值的信息 [31]。Apache Hadoop [2] 是一個早期的開源大數據解決方案,它包括一個分布式文件系統 (HDFS) 用于持久存儲大數據,以及一個基于 MapReduce 抽象的分析框架 [13]。最近,Apache Spark [44] 引入了一種名為彈性分布式數據集 (RDD) [42] 的新抽象,以實現容錯數據重用,從而支持迭代工作負載。與 Hadoop MapReduce 相比,它可以實現數量級上的性能提升。Spark 提供了一個富有表現力且易于使用的 API,可以自然地表達函數式轉換、MapReduce 和連接,從而可以構建支持圖計算 [19]、流處理 [9, 43]、機器學習 [25] 和 SQL 查詢 [10] 的支持庫。如今,Spark 已被廣泛部署,用于提供大數據分析服務 [3]。
背景,大數據服務中spark很重要。
盡管Spark在內存數據集重用方面具有優勢,但最近的研究表明,其性能可以得到顯著提升。例如,通過使用C++構建大數據分析框架,Thrill [11] 在典型的大數據工作負載上實現了相對于Spark的3.26倍的幾何平均加速。實際上,性能提升仍有很大空間:對于Java矩陣乘法內核,切換到C語言可以帶來4.4倍的加速,并且通過C編譯器提供的向量化和AVX內部函數,性能可以進一步提高9.45倍 [23]。
重用的概念應該更多的是用的少,性能的顯著提升。
然而,性能只是大數據處理的一個方面。Spark 提供了許多其他關鍵特性,例如其基于血統的彈性:大數據分析通常在多租戶商品集群中執行,由于機器故障、網絡抖動和搶占式調度,任務失敗非常常見,這使得檢查點對于處理如此頻繁的故障效率不高。
跟檢查點也帶點關系
Spark 基于血統的容錯機制允許僅重新計算部分數據,而不是全部數據。
到底什么是血統容錯
lineagebased resiliency
大概意思是原生容錯,或者說設計的容錯。
其彈性還支持其他特性,例如負載均衡、落后者緩解和自動縮放,從而提高了集群的資源利用率。此外,Spark 的生態系統,例如帶有 Web UI 的性能分析器以及與各種資源管理器的集成,使其易于在各種私有或公共云上部署、監控和分析應用程序。
webUI好評,能夠借用spark生態系統好評
Thrill [11] 具有一種原生的類似 RDD 的抽象,稱為 DIA,但它將數據分布緊密地耦合到物理機器,這使得彈性失效。
看來彈性意味著容易移植的特性
Husky [41] 使用上游消息日志記錄容錯機制,即使在沒有故障的情況下也會產生不可忽略的開銷 [40, 42]。與 Spark 相比,它們缺乏成為功能完備的系統所需的許多基本特性。
顯然,擁有一個功能齊全的原生大數據框架是可取的。
這是目標
一個直接的解決方案是用原生編程語言重新實現一個新的純原生框架,這既直接又在理論上可行,但成本可能高得令人望而卻步,而且是不必要的:Spark 3.0.1 的核心組件有 74K 行代碼1,其中只有 9K 行與編程框架直接相關,包括 RDD API 和算子實現,其余是為各種大數據功能服務的組件。Ousterhout 等人 [32] 已經揭示,計算是 Spark 的瓶頸。
這啟發我們設計一個高效的大數據框架,通過構建一個可以重用 Spark 完善的大數據功能(超過 Spark 核心 LoC 的 87%)的原生編程框架,如圖 1 所示。
復用了87%
然而,特征重用方法帶來了一些挑戰。首先,現有的純原生大數據框架與Spark的執行模型不兼容,使得將它們集成到Spark中變得不可行。例如,Thrill將每個數據集分區耦合到特定的機器,而Husky依賴于有狀態的任務執行,這違反了Spark的動態調度和無狀態假設。
大概感覺是,我要從thrill和husky借一點東西放到spark里邊,但是借用沒有那么容易
其次,JVM和原生世界之間細粒度的交互,無論是通過JNI還是JNA,都會產生很高的開銷[15, 35],這有可能成為一個新的性能瓶頸。因此,需要一個尊重Spark執行模型的原生編程框架,并且最大限度地減少與Spark集成相關的開銷至關重要。
減少兼容性帶來的問題
本文介紹了Chukonu,一個原生的、具有完整功能和高性能的大數據框架,它同時采用了特征重用的方法。它通過集成到Spark并重用其特性來支持關鍵的大數據功能。Chukonu的開發投入了合理的人力,包括9K行的C++代碼和1.5K行的Scala代碼,因為它重用了已充分開發的大數據功能。盡管存在高集成開銷的風險,Chukonu已成功克服了這一挑戰:其性能甚至超越了為性能而設計的現有純原生大數據框架。
這歸功于我們新穎的方法,該方法將DAG程序拆分為編譯時部分和運行時部分:編譯時部分處理諸如轉換、數據過濾和按分區排序等簡單數據流行為。運行時部分處理諸如調度、緩存和通信等復雜的數據流行為。Chukonu將運行時部分委托給Spark,從而減輕了復雜性。
轉換,數據過濾,按分區排序,是開發最核心的內容。
為了減少Spark集成的開銷,Chukonu應用了一系列針對編譯時部分的優化:算子融合消除了不必要的JNA調用。向量化將每個元素的數據處理分批進行,以減少JNA調用的次數。壓縮將一批元素緊湊地存儲在幾個緩沖區中,以便在JNA調用中消除數據復制。
為了在Spark中執行運行時部分,Chukonu實現了對Spark的輕量級封裝。它是非侵入式的,可以輕松提交到現有環境,而無需重新配置現有集群資源管理器或重新編譯現有Spark。
不需要重新編譯spark,是一個即插即用的模塊。
它還進行了一些增強,以進一步減少Spark集成開銷,包括優化的序列化快速路徑、顯式指針傳遞和高效的數據加載。
盡管Chukonu的性能更優越,但編譯時部分所需的優化會延長編譯時間。這使得Chukonu不適合即席分析,因為縮短的執行時間不足以彌補額外編譯時間帶來的損失。
復雜的編譯過程,不適合在線
本文做出以下貢獻:
(1)我們提出了一種將原生計算引擎以低開銷集成到 Spark 中的方法,通過將 DAG 程序拆分為編譯時部分和運行時部分,分別用于原生編譯和 Spark 執行。
什么是原生計算引擎->就是一些加速運行的組件,轉換,數據過濾,按分區排序,這樣的思路。
(2)我們開發了一系列優化措施,以減少JNA調用次數并減輕JNA調用開銷,包括算子融合、向量化和壓縮。
JNA
JNA(Java Native Access)是 Java 平臺上的一個開源庫,用于實現 Java 代碼與本地代碼(如 C、C++ 編寫的代碼或動態鏈接庫)的交互。
簡單來說,Java 語言本身運行在 JVM(Java 虛擬機)中,受限于虛擬機的沙箱機制,直接調用操作系統底層的本地庫(如 Windows 的.dll 文件、Linux 的.so 文件)并不方便。而 JNA 提供了一種簡化的方式,讓 Java 程序可以直接訪問本地代碼的函數和數據結構,無需像傳統的 JNI(Java Native Interface)那樣編寫大量繁瑣的中間適配代碼。
在實際開發中,JNA 常被用于:
調用操作系統提供的底層 API(如系統硬件操作、特殊系統功能);
復用已有的 C/C++ 庫,避免重復開發;
優化性能敏感的模塊(某些場景下本地代碼執行效率更高)。
不過,JNA 調用本地代碼時存在一定的開銷(如數據類型轉換、跨虛擬機上下文切換等),因此在高頻率調用場景下,需要像你提到的 “算子融合、向量化和壓縮” 等優化措施來減少調用次數、降低開銷。
(3)據我們所知,這是首個基于有向無環圖(DAG)拆分方法構建的大數據框架的實現,它具有Spark核心的全部功能,并且能夠提供與純原生框架相媲美的性能。
(4)我們通過將Chukonu與Spark和純原生基線框架進行比較,從而對其進行了仔細的評估。
我們使用 YARN 管理的內部 Hadoop 集群評估了 Chukonu,并將 Chukonu 與 Spark RDD、Spark Tungsten [7]、Husky [41] 和 Thrill [11] 進行了比較。選擇了六個大數據應用程序來評估這些系統。我們還實現了一個代碼生成器,可以將 SparkSQL 生成的物理計劃轉換為 Chukonu 程序。TPC-DS 基準測試的所有查詢都用于評估 Spark 和 Chukonu 的結構化分析能力。**盡管它站在 Apache Spark 的肩膀上,但結果表明 Chukonu 提供了卓越的性能。**在六個大數據應用程序上,與 Spark、Husky 和 Thrill 相比,其平均加速比分別為 6.09×(最高 71.58×)、2.53×(最高 7.20×)和 2.08×(最高 3.45×)。對于 TPC-DS 基準測試,與 Spark 相比,其總查詢執行時間的加速比為 2.29×(Q67 最高可達 5.14×)。這表明 Chukonu 的性能明顯優于 Spark,并且具有與純原生框架相媲美的性能,這證明了 DAG 分割方法的合理性。
背景
數據并行系統。數據并行系統[1, 2, 13, 20, 22, 34, 44]為用戶提供了一種編程抽象,以隱藏分布式處理的復雜細節。MapReduce [13] 是一種用戶友好的編程模型,它能夠基于重新執行實現低開銷的容錯。集群計算中需要這樣做以抵抗服務器故障或搶占。Dryad [22] 引入了數據流圖以促進多階段管道。隨后提出了 RDD [42] 和 Apache Spark [44],以實現迭代工作負載的內存數據重用。這提供了基于譜系的容錯,以提供低開銷的彈性。Piccolo [34] 提供了一種替代方法,該方法將用戶暴露于分布式共享可變 KV 接口。它依賴于檢查點來實現容錯。
反正都是為了隱藏分布式細節,對于用戶高度抽象,對于底層高度優化,包括對于效率的考量和對于容錯/復用的優化
盡管Spark在內存數據集重用方面具有優勢,但最近的研究表明,其性能可以得到顯著提升。例如,通過使用C++構建大數據分析框架,Thrill [11] 在典型的大數據工作負載上實現了相對于Spark的3.26倍的幾何平均加速。實際上,性能提升仍有很大空間:對于Java矩陣乘法內核,切換到C語言可以帶來4.4倍的加速,并且通過C編譯器提供的向量化和AVX內部函數,性能可以進一步提高9.45倍 [23]。
把spark換掉,性能顯著提升,把某個算子換掉,性能還是顯著提升。
在JVM中優化大數據系統。一些研究已經考察了在JVM中優化現有大數據系統的性能。在JVM中進行優化允許API保持不變,從而用戶應用程序無需更改。Facade [30]、Lu等[24]和Gerenuk [27]提出了用戶自定義函數的自動轉換,使其能夠支持以序列化形式訪問數據,并通過連續存儲序列化數據來減少垃圾回收開銷。NumaGiC [18]提出了一個NUMA感知的JVM垃圾回收器。Yak [29]提出了一個大數據友好的垃圾回收器,該回收器能夠感知大數據的對象生命周期模式。
原生集成到 Apache Spark 中。利用原生編譯的能力有助于消除 JVM 的開銷,并實現顯著的加速 [8, 15, 25, 35, 38]。
java虛擬機層次帶來的靈活性,同時也意味著開銷
Rosenfeld 等人 [35] 在 C++ 原生引擎中加速了 SparkSQL,但將 Java UDF 留在嵌入式 JVM 中。Flare [15] 通過將整個查詢計劃原生編譯到單機原生后端,與 SparkSQL 相比,展現了數量級的加速,但放棄了 Spark 對分布式執行的支持。Anderson 等人 [8] 通過將 MPI [36] 程序集成到 Spark 中,提供了數量級的加速。然而,MPI 不支持商品服務器上大數據分析所需的低開銷容錯 [13]。MLlib [25] 和 SparkJNI [38] 通過卸載到原生內核函數來加速性能關鍵的計算,但這會導致在 JVM 世界中沒有原生內核的計算,這可能成為一個新的瓶頸,并產生跨語言的數據編組開銷。
這些優化都要舍棄一些事情。
純原生大數據系統。一些研究已經從頭開始構建原生系統以提高性能[11, 21, 41]。Thrill [11] 提出了一種基于分布式不可變數組 (DIA) 內存抽象的原生數據并行系統,該系統與 MPI 的執行模型緊密耦合,并且不支持像 RDD 那樣基于血統的容錯。Husky [41] 和 Tangram [21] 引入了數據突變以實現細粒度的訪問和異步執行,但代價是昂貴的數據日志記錄和用于容錯的檢查點。