引入
上一篇我們提到了調優的常見切入點,核心就是通過數據產出情況發現問題,借助監控等手段收集信息排查瓶頸在哪,最后結合業務理解,等價重寫思路去解決問題。
在實際工作場景中,去保證數據鏈路產出SLA的時候,我們都會先默認任務的SQL實現都是沒問題的,也就是優先考慮,在不改寫SQL語句的前提下,通過調整參數配置、優化數據存儲等方法,提升任務效率,減少執行時間。
針對這一類調優,我們通常稱其為底層調優,下面我們就從參數和存儲兩方面去掌握。
參數調優
參數調優最簡單的思路就是加資源,比如加內存,提高并行度等,來讓任務更快的執行完成。除此之外,針對一些任務,我們可以通過調整框架自帶的一些特定參數,能更有效的使作業運行效率達到最優。
這里只例舉一下最常用的參數優化場景,針對不同數據處理引擎,不同業務場景,會有不同的參數優化方案,這一點在后續對應技術棧的專欄再去深入拓展。
并行執行
默認情況下,分布式任務會根據上下游依賴關系依次執行,每次僅執行一個任務。然而在某些條件下,如果任務之間沒有直接的依賴關系,就意味著這些任務可以并行執行。此時通過合理配置并行執行的相關參數,就可以顯著提高數據處理的效率和性能。具體參數的設置需要根據集群的硬件資源和數據量進行調整,才能達到最佳的并行處理效果。
Hive
相關參數:
- hive.exec.parallel:用于開啟或關閉并行執行,默認值為?true。設置為?true?時,啟用并行執行;設置為?false?時,禁用并行執行。
- hive.exec.parallel.thread.number:用于指定并行執行的線程數,默認值為?8。該參數決定了同時執行的任務數,可以根據集群的硬件資源情況進行調整。
- hive.exec.parallel.thread.queue.size:用于指定并行執行的線程隊列大小,默認值為?0。當并行執行線程數達到上限時,新的任務會被放入隊列中等待執行。
實現原理:
- Hive 的并行執行是通過將查詢任務分解為多個子任務(Map 和 Reduce 任務),并在集群中的多個節點上并行執行這些子任務來實現的。每個 Map 任務處理一個數據片段,多個 Map 任務可以并行執行。Reduce 階段也可以并行處理,不同的 Reduce 任務負責不同的數據分區。
優缺點:
- 優點:充分利用集群資源,加快查詢速度,尤其適用于處理大規模數據集。
- 缺點:如果并行度過高,可能會導致資源競爭,如 CPU、內存等,從而影響整體性能;同時,任務之間的依賴關系可能會變得復雜,增加調試和維護的難度。
Spark
相關參數:
- spark.default.parallelism:它定義了沒有顯式指定并行度時的默認并行度。這個參數對于一些操作如 RDD 的轉換操作、reduceByKey 等很重要。如果設置不當,可能會導致資源利用不充分或者任務執行時間過長。通常可以根據集群的資源情況和數據規模進行調整。
- spark.sql.shuffle.partitions:這個參數決定了在 shuffle 操作期間生成的分區數量。默認值通常取決于集群配置,但可以根據數據大小和集群資源進行調整。如果設置得太小,可能會導致數據傾斜,某些任務處理的數據量過大,從而延長作業執行時間;如果設置得太大,會產生過多的小任務,增加任務調度開銷。例如,在處理大規模數據集時,可以根據數據量和集群的核心數量進行合理調整,一般可以設置為集群核心數的 2-3 倍。
- spark.sql.adaptive.enabled:該參數用于啟用自適應查詢執行(AQE),允許 Spark 在運行時動態調整并行度。默認值為 false。
- spark.sql.adaptive.shuffle.targetPostShuffleInputSize:該參數用于設置 AQE 中每個 Shuffle 分區的目標大小(以字節為單位)。
實現原理:
- 并行度的設置:Spark SQL 通過設置 spark.sql.shuffle.partitions 和 spark.default.parallelism 參數來控制并行度。這些參數決定了數據在 Shuffle 操作和每個 Stage 中的分區數,從而影響任務的并行執行。當數據量較大時,增加并行度可以提高查詢性能,但過高的并行度可能會導致內存和網絡資源的瓶頸。
- 自適應查詢執行(AQE):AQE 通過在運行時動態調整并行度來優化查詢性能。當啟用 AQE 時,Spark 會根據實際數據分布動態調整分區數,從而避免因分區數過小或過大導致的性能問題。AQE 會根據 spark.sql.adaptive.shuffle.targetPostShuffleInputSize 參數來決定是否合并分區,從而優化查詢性能。
優缺點:
- 優點:
- 提高查詢性能:通過合理設置并行度,可以充分利用集群資源,提高查詢速度。
- 動態調整:啟用 AQE 后,Spark 可以根據實際數據分布動態調整并行度,從而優化查詢性能。
- 靈活性:可以根據不同的查詢需求和數據量,靈活調整并行度參數,以達到最佳性能。
- 缺點:
- 資源消耗:過高的并行度可能會導致內存和網絡資源的瓶頸,從而影響查詢性能。
- 動態調整的不確定性:啟用 AQE 后,動態調整并行度可能會引入一定的不確定性,需要仔細監控和調優,否則可能會出現反效果。
Flink
相關參數:
- parallelism.default:用于設置作業的并行度,可以在代碼中通過設置執行環境的并行度來指定整個作業的并行執行程度。這個參數可以控制任務執行的并發度,根據硬件資源和數據量的大小進行調整。
- table.exec.resource.default-parallelism:針對Flink SQL任務設置默認并行度,需與集群資源匹配,避免過高導致資源爭搶。
實現原理:
- 數據分區:Flink 將輸入數據進行分區,以便多個任務可以并行處理不同的數據分區。常見的分區方式有哈希分區、范圍分區等。例如,在進行聚合操作時,Flink 可以根據指定的鍵將數據進行哈希分區,使得相同鍵的數據被分配到同一個任務中進行處理。
- 任務調度:Flink 的任務調度器負責將任務分配到不同的任務槽(Task Slot)中執行。任務槽是 Flink 中用于執行任務的資源單元,可以理解為一個執行任務的容器。任務調度器會根據任務的并行度和可用的任務槽資源,將任務分配到不同的節點上執行。
- 數據交換:在并行執行過程中,不同任務之間需要進行數據交換。Flink 提供了多種數據交換的方式,如廣播(Broadcast)、重新分區(Repartitioning)等。例如,在進行連接操作時,Flink 可能需要將兩個輸入數據集進行重新分區,以便進行連接操作。
優缺點:
- 優點:
- 提高處理性能:通過并行執行,可以充分利用多核處理器和分布式計算環境,提高數據處理的速度和吞吐量。多個任務可以同時處理不同的數據分區,從而減少處理時間。
- 可擴展性強:Flink 的并行執行機制使得它可以很容易地擴展到大規模數據集和分布式計算環境中。可以根據數據量和計算資源的變化,動態調整并行度,以滿足不同的處理需求。
- 容錯性好:Flink 具有良好的容錯機制,當某個任務失敗時,它可以自動重新啟動失敗的任務,并從檢查點(Checkpoint)恢復狀態。在并行執行環境中,即使某個任務失敗,其他任務仍然可以繼續執行,不會影響整個作業的運行。
- 缺點:
- 資源競爭:在并行執行過程中,多個任務可能會競爭計算資源,如 CPU、內存等。如果資源分配不合理,可能會導致某些任務執行緩慢,影響整體性能。
- 數據傾斜:數據傾斜是指數據在不同任務之間分布不均勻的情況。如果某些任務處理的數據量遠遠大于其他任務,可能會導致這些任務執行時間過長,影響整個作業的性能。
預聚合
在一些聚合處理場景中,數據讀取和掃描之后,引擎可以先進行一次預聚合,然后再將聚合結果進行分發和Shuffle。這樣做可以減少在數據分發過程中的時間消耗和資源開銷,從而加快查詢進程。通過合理配置相關參數,可以顯著提高數據處理的效率和性能。
Hive
相關參數:
- hive.map.aggr:控制是否啟用 Map 端預聚合。當設置為 true 時,Hive 會在 Map 階段對數據進行局部聚合,從而減少傳輸到 Reduce 階段的數據量,提高查詢性能。
- hive.groupby.mapaggr.checkinterval:設置在 Map 端預聚合時,檢查聚合鍵數量的間隔。當 Map 端處理的數據行數達到這個值時,Hive 會檢查聚合鍵的數量。如果聚合鍵數量超過一定比例,Hive 會認為預聚合效果不理想,從而關閉預聚合功能。
- hive.groupby.skewindata:控制是否啟用數據傾斜處理。當設置為 true 時,Hive 會在處理數據傾斜時優化 Group By 操作,從而提高查詢性能。
實現原理:
- Hive 的預聚合是在 Map 階段對數據進行局部聚合,減少數據傳輸量和計算量。在 Map 階段,對每個分組鍵進行局部聚合,生成中間結果,然后將這些中間結果傳遞給 Reduce 階段進行最終聚合。
優缺點:
- 優點:顯著提高查詢效率,減少數據傳輸和計算開銷。
- 缺點:一些如計算中位數等的場景中,會導致最終結果不準確;增加內存使用量,內存不足時可能影響作業執行。
Spark
相關參數:
- spark.sql.autoBroadcastJoinThreshold:這個參數用于控制自動廣播連接的閾值。當一個表的數據量小于這個閾值時,Spark 會自動將該表廣播到所有的執行節點上,以避免 Shuffle 操作,從而提高連接操作的性能。在預聚合場景中,如果小表可以被廣播,那么可以在連接之前對小表進行預聚合,進一步提高性能。
- spark.sql.shuffle.partitions:該參數決定了 Shuffle 操作時的分區數。合理設置分區數可以避免數據傾斜,提高預聚合的效率。如果分區數過多,會導致過多的小文件,增加管理開銷;如果分區數過少,可能會導致數據傾斜,影響性能。
- spark.sql.inMemoryColumnarStorage.batchSize:這個參數控制內存列存儲的批處理大小。在預聚合過程中,合理調整這個參數可以優化內存使用和性能。較大的批處理大小可以減少內存開銷,但可能會增加內存壓力;較小的批處理大小可以更好地適應內存限制,但可能會降低性能。
實現原理:
- 廣播連接:當連接的表大小小于 spark.sql.autoBroadcastJoinThreshold 閾值時,Spark 會自動將小表廣播到所有節點,從而減少數據傳輸和 Shuffle 操作。通過廣播小表,可以避免數據的 Shuffle 操作,從而提高連接操作的性能。廣播連接適用于小表與大表的連接場景。
- Shuffle 分區數:通過設置 spark.sql.shuffle.partitions 參數,可以控制 Shuffle 操作的分區數,從而影響數據的并行處理能力。增加 Shuffle 分區數可以提高并行度,從而加快查詢速度,但過高的并行度可能會導致內存和網絡資源的瓶頸。
優缺點:
- 優點:
- 提高查詢性能:通過提前對數據進行聚合操作,可以減少后續計算的工作量,從而提高查詢性能。特別是在處理大規模數據和復雜查詢時,預聚合可以顯著縮短查詢執行時間。
- 減少數據傳輸和存儲開銷:預聚合可以減少中間結果的數據量,從而減少數據傳輸和存儲開銷。這對于分布式計算環境非常重要,因為數據傳輸和存儲開銷往往是性能瓶頸之一。
- 優化內存使用:通過合理配置預聚合相關參數,可以優化內存使用,避免內存溢出等問題。例如,可以調整 spark.sql.inMemoryColumnarStorage.batchSize 參數來控制內存列存儲的批處理大小,以適應不同的內存限制。
- 缺點:
- 增加計算開銷:預聚合需要額外的計算資源,特別是在數據量較大時,預聚合的計算開銷可能會比較大。如果預聚合的收益不足以抵消計算開銷,那么可能會導致性能下降。
- 可能導致數據傾斜:如果預聚合的結果不均勻,可能會導致數據傾斜。例如,如果某些分組的聚合結果非常大,而其他分組的聚合結果非常小,那么在后續的計算中可能會出現數據傾斜,影響性能。
- 不適合動態數據:預聚合通常適用于靜態數據或數據變化不頻繁的場景。如果數據變化頻繁,那么預聚合的結果可能會很快失效,需要重新進行計算,從而增加計算開銷。
Flink
在Flink聚合操作可以通過啟用微批處理和Local-Global策略來優化。這種方法將原本集中的聚合操作分解為兩個階段,首先在本地進行預聚合,然后在全局范圍內再次進行聚合。這樣做可以減少狀態訪問次數,提高處理吞吐量,并減少數據輸出量。
微批處理(MiniBatch)
實現原理:
微批處理通過緩存一定量的數據,然后一次性處理這些數據,從而減少對狀態(State)的訪問頻率,提升吞吐量并減少數據的輸出量。這種方式通過增加少量的延遲來換取更高的吞吐量。
參數配置:
- table.exec.mini-batch.enabled:是否開啟微批處理,默認值為?false。
- table.exec.mini-batch.allow-latency:批量輸出數據的時間間隔,例如?5s。
- table.exec.mini-batch.size:微批操作所緩存的最大數據條數,例如?20000。
優缺點:
- 優點:提升吞吐量,減少對狀態的訪問,適用于一般聚合場景。
- 缺點:增加延遲,不適用于對延遲要求極高的場景。
原理:
Local-Global 策略將聚合操作分為兩個階段:Local 聚合和 Global 聚合。第一階段在上游節點本地對數據進行聚合(LocalAgg),并輸出這次微批的增量值(Accumulator)。第二階段再將收到的 Accumulator 合并(Merge),得到最終的結果(GlobalAgg)。這種策略類似于 MapReduce 模型中的 Combine + Reduce 處理模式,通過本地聚合篩除部分傾斜數據,從而降低 Global 聚合的熱點,提升性能。
參數配置:
- table.optimizer.agg-phase-strategy:聚合階段的策略,可選值為 AUTO、TWO_PHASE(使用 Local-Global 兩階段聚合)和 ONE_PHASE(僅使用 Global 一階段聚合)。默認值為 AUTO。
優缺點:
- 優點:有效解決數據傾斜問題,提升聚合性能,適用于普通聚合操作。
- 缺點:需要聚合函數支持 Merge 方法,且在某些情況下可能效果不明顯。
在默認設置下,聚合算子會對每條流入的數據執行一系列操作,例如讀取狀態、更新狀態。當數據量較大時,這些狀態操作的開銷也會相應增加,從而影響整體效率,這種影響在使用如RocksDB這類序列化成本較高的State Backend時尤為明顯。而一旦啟用了Mini-Batch處理,流入的數據會被暫存于算子內部的緩沖區中,直到達到預設的容量或時間閾值,然后才會進行聚合邏輯處理。這種方法使得同一批數據中的每個唯一鍵(key)只需進行一次狀態的讀寫操作,特別是在鍵分布較為稀疏的情況下,這種優化的效果會更為顯著。
數據重用
如果查詢語句或任務中存在重復的查詢塊(即中間結果集)或子查詢,可以采取數據重用(Reuse)策略。例如根據不同的過濾條件多次篩選相同的表,每篩選一次就需要重新掃描一次表,通過數據緩存等方式,相較于之前多次掃描數據,則只需進行一次掃描,從而減少對表的讀取次數。這樣不僅降低了讀寫I/O和網絡的開銷,還能加快任務的執行速度,縮短產出時間。不同于常見的數據緩存(Cache),數據重用強調的是在程序中重復使用已計算的對象或結果,而緩存則是具體實現重用的一種機制。
Hive
參數配置:
- hive.optimize.cte.materialize.threshold:當設置為大于 0 的值時,如果一個 CTE(公共表表達式) 被引用的次數超過該閾值,Hive 會將該 CTE 的結果物化(即存儲到臨時表中),以避免重復計算。
CTE就是使用WITH AS語句
實現原理:
- 物化機制:當 CTE 的引用次數超過 hive.optimize.cte.materialize.threshold 的值時,Hive 會在執行主查詢之前,將 CTE 的結果存儲到一個臨時表中。
- 執行流程:
- Hive 在解析 SQL 時,會檢查 CTE 的引用次數。
- 如果引用次數超過閾值,Hive 會生成一個臨時表來存儲 CTE 的結果。
- 后續的查詢可以直接從臨時表中讀取數據,而不需要重新計算 CTE。
優缺點:
- 優點:
- 提高查詢效率:通過物化 CTE 的結果,減少了重復計算的開銷,特別是在 CTE 被多次引用的情況下,性能提升顯著。
- 簡化代碼:使用 CTE 可以使 SQL 代碼更加簡潔和易讀,同時通過物化避免了性能損失。
- 缺點:
- 存儲開銷:物化會占用額外的存儲空間,特別是當 CTE 的結果集較大時,可能會導致存儲壓力。
- 適用性限制:僅在 CTE 被多次引用時才有顯著效果,如果 CTE 僅被引用一次,物化可能不會帶來性能提升。
Spark
命令使用:
CACHE TABLE 是 Spark SQL 中的一個命令,用于將表的內容或查詢結果緩存到內存或磁盤中,以提高后續查詢的性能。通過緩存數據,可以減少對原始數據源的重復讀取,從而加快查詢速度。
- 緩存機制:CACHE TABLE 命令會將指定的表或查詢結果存儲在內存或磁盤中。Spark SQL 會根據配置的存儲級別(如 MEMORY_ONLY、MEMORY_AND_DISK 等)來決定數據的存儲方式。
- 存儲級別:可以通過 OPTIONS 子句指定存儲級別。例如,MEMORY_AND_DISK 表示數據優先存儲在內存中,如果內存不足則存儲在磁盤上。
- 查詢優化:緩存的數據會在后續查詢中直接使用,避免了重復的計算和 I/O 操作,從而提高查詢性能。
-
優點:
- 提高查詢性能:通過緩存數據,減少了對原始數據源的重復讀取,加快了查詢速度,特別是在多次查詢同一數據集的情況下,性能提升顯著。
- 靈活的存儲級別:可以根據實際需求選擇不同的存儲級別,以平衡性能和存儲資源的使用。
-
- 存儲資源消耗:緩存數據會占用內存或磁盤資源,如果數據量較大,可能會導致存儲壓力,影響系統的整體性能。
- 緩存一致性問題:如果底層數據發生變化,緩存的數據可能不再一致,需要手動刷新或清除緩存,以確保數據的準確性。
Flink
對Flink來說,基于其流式計算的底層設計,數據重用策略主要通過內存、Redis 和 RocksDB 等方式實現緩存。內存緩存通過內部內存去存儲數據,實現數據重用;Redis 和RocksDB緩存則是通過外部鍵值存儲實現數據共享和通信。
存儲調優
存儲調優的思路,主要是采用合適的存儲格式和壓縮類型,來減少大數據處理中需要傳輸的文件大小,從而提高任務完成效率。
存儲格式
常見存儲格式如下:
文本格式(TextFile):這是最基本的存儲格式,數據以純文本形式存儲,每行一條記錄。優點是簡單直觀,易于理解和查看,可直接使用文本編輯器打開。缺點是存儲效率低,占用空間大,并且在讀取和處理時相對較慢。例如,在處理大規模數據集時,讀取文本格式的數據可能會消耗大量的時間和資源。
JSON 格式:JSON 是一種輕量級的數據交換格式,它在數據處理中也被廣泛使用。在 Spark 中,可以使用 JSON 格式存儲數據。JSON 格式具有良好的可讀性和可擴展性,但是存儲效率相對較低。在對數據的可讀性要求較高或者需要與其他系統進行數據交換時,可以考慮使用 JSON 格式。
CSV (Comma-Separated Values)格式:CSV格式是一種常見的表格數據存儲格式,它以逗號分隔各個字段。在 Spark 中,可以使用 CSV 格式存儲數據。CSV 格式簡單直觀,易于理解和處理,但是存儲效率較低,并且在處理大規模數據集時可能會遇到性能問題。
ORC(Optimized Row Columnar)格式:ORC 是一種高效的列式存儲格式,它對數據進行了優化存儲,能夠提高數據的讀取和寫入效率。與文本格式相比,ORC 格式占用的存儲空間更小,查詢性能更好。ORC 文件支持復雜數據類型和嵌套結構,并且可以進行高效的壓縮。在 Hive 中使用 ORC 格式可以顯著提高查詢性能,特別是對于大規模數據集。
Parquet 格式:Parquet 也是一種列式存儲格式,它具有高效的壓縮比和良好的查詢性能。Parquet 格式支持多種壓縮算法,如 Snappy、Gzip 和 Zstandard(zstd)等。通過選擇合適的壓縮算法,可以進一步減小存儲占用空間并提高查詢效率。在 Hive 中,使用 Parquet 格式結合適當的壓縮算法可以提高存儲效率和查詢性能。
Avro 格式:Avro 是一種數據序列化系統,它具有良好的兼容性和可擴展性。在 Flink 中,可以使用 Avro 格式存儲數據。Avro 格式支持復雜數據類型和嵌套結構,并且可以進行高效的壓縮。通過使用 Avro 格式,可以方便地與其他系統進行數據交換和集成。
壓縮類型
Gzip 壓縮:最早由Jean-loup Gailly和Mark Adler創建,用于UNIX系統的文件壓縮。我們在Linux中經常會用到后綴為.gz的文件,它們就是GZIP格式的。Gzip 是一種廣泛使用的壓縮算法,它具有較高的壓縮比,可以有效地減小存儲占用空間。但是,Gzip 壓縮和解壓縮的速度相對較慢,可能會對數據處理的性能產生一定的影響。在對存儲空間要求較高而對性能要求相對較低的場景下,可以考慮使用 Gzip 壓縮。
Deflate 壓縮:Deflate 是一種通用的壓縮算法,它在壓縮比和壓縮和解壓縮速度之間取得了較好的平衡。
Snappy 壓縮:是一款由Google開發的開源壓縮庫。它的目的不是最大程度地壓縮或與任何其他壓縮庫兼容,相反,它旨在實現高速且合理的壓縮。例如,與Gzip的最快模式相比,Snappy的壓縮速度對于大多數輸入來說要快一個數量級,但生成的壓縮文件要大20%~100%。
LZO(Lempel-Ziv-Oberhumer) 壓縮:LZO是一種快速的壓縮算法,它在壓縮和解壓縮速度方面與 Snappy 類似。它能夠提供非常快速的壓縮和解壓功能。解壓并不需要內存的支持,即使使用非常大的壓縮比例緩慢壓縮出的數據,依然能夠非常快速地解壓。LZO遵循GPL。
LZ4壓縮:LZ4是無損壓縮算法,提供每核大于500MB/s的壓縮速度,可通過多核CPU進行擴展。它具有極快的解碼器,每個內核的速度可達數GB/s,通常在多核系統上達到RAM速度限制。一方面,LZ4的速度可以動態調整,即選擇一個“加速”因子,以壓縮比換取更快的速度。另一方面,它還提供了高壓縮率的衍生產品LZ4_HC,以CPU時間換取更高的壓縮率。所有版本都具有相同的解壓縮速度。
Zstandard(zstd)壓縮:Zstandard是Facebook在2016年開源的一種快速、無損壓縮算法,簡稱zstd算法,適用于實時壓縮場景,并擁有更好的壓縮比。相比業內其他壓縮算法,zstd算法的特點是當需要時,它可以將壓縮速度交換為更高的壓縮比率(壓縮速度與壓縮比率的權衡可以通過小增量來配置)?。
關于壓縮可以看看之前我寫的這篇文章。
在大數據開源系統中,Hadoop、Kafka、Pulsar等都支持若干種壓縮協議,ORC、Parquet、Avro等文件格式也支持其中大部分壓縮算法。在大數據系統中,尤其是在HDFS中,壓縮后的文件是否可拆分也是一個重要考量標準,LZO、LZ4等格式是支持拆分的,Zstandard也通過Hadoop 4mc實現了可拆分性。