最近我們組在大規模上線Flink SQL作業。首先,在進行跑批量初始化完歷史數據后,剩下的就是消費Kafka歷史數據進行追數了。但是發現某些作業的追數過程十分緩慢,要運行一晚上甚至三四天才能追上最新數據。由于是實時數倉指標計算上線初期,經常驗證作業如果有問題就得重蹈覆轍重新追數,效率很低,于是我開始分析Flink SQL的優化。
問題
insert into tableB
select a, max(b), max(c), sum(d) ...
from tableA
group by a
上面這個作業的簡化版SQL,主要就是做一個分組聚合:
- 從tableA分組聚合出結果插入tableB
- tableA的聯合主鍵是:a,b(但是a的離散度已經很高了)
- tableA的Flink表類型為upset-kafka
- tableB的Flink表類型為HBase
初步分析
這個作業跑在集群上的job graph如下:
可以看到有三個vertex:
- 第一個是TableSourceScan
- 第二個是ChangelogNormalize
- 第三個是GroupAggregate
TableSourceScan接入tableA表的upsert-kafka流;
ChangelogNormalize對upset-kafka進行撤回語義的解析;
GroupAggregate對撤回流進行分組聚合,然后寫入tableB的HBase;
優化思路1:local/global agg
agg分類:
- group agg
select count(a) from t group by b
- over agg
select count(a) over (partition by b order by c) from t
- window agg
select count(a) from t group by tumble(ts, interval '10' seconds), b
local/global agg:
核心思想與hadoop的combiner是一致的,就是在mapreduce的過程中,在map階段就做一個預聚合,即combine操作。
[圖片上傳失敗…(image-c0ad24-1650075387085)]
帶來的收益是:減少網絡shuffle數據,提升計算引擎的性能。
前提條件:
- agg的所有agg function都是mergeable(實現merge方法)
- table.optimizer.agg-phase-strategy為AUTO或TWO_PHASE
- Stream下,minibatch開啟;Batch下,AUTO會根據cost選擇
解釋說明:
mergeable其實就是能用分治法解決的計算問題,例如sum、count等,而avg就不能用分治法先計算部分元素的avg,再計算最終avg了,結果有時候會出錯。
table.optimizer.agg-phase-strategy:默認為AUTO,意思是引擎盡量做預聚合;TWO_PHASE表示所有聚合操作都做預聚合;ONE_PHASE表示所有聚合都不做預聚合。
minibatch:即開啟微批模式。主要有三個參數:
table.exec.mini-batch.enabled:是否開啟,默認不開啟
table.exec.mini-batch.size:微批的record buffer大小
table.exec.mini-batch.allow-latency:微批的time buffer大小
minibatch的本質就是平衡實時性和吞吐量的刻度尺。
所以,local/global agg一共需要三個參數控制。
驗證
經過對比驗證,在這個SQL場景下的效率提升很小。
local/global agg降低了第二個vertex即ChangelogNormalize的sent records的數據量,而并沒有使得第一個vertex的數據處理效率有顯著提升。
所以,這個作業的瓶頸并不在vertex間, 而在于第一個vertex的處理數據效率。
優化思路二:調大并行度
這個思路的關鍵在于source upsert-kafka的分區數,這是制約吞吐量的瓶頸。因為在upsert-kafka中,每個partition最多被一個Flink線程讀取。
增加了10倍的并行度,source分區也增加10倍后,作業周轉時間縮短了將近一半。
優化思路三:RocksDB性能調優
仔細分析這個SQL作業,是對一個聯合主鍵的字段做group by,那么state一定會非常大。
經過在對這個表在數倉中的數據進行分析,發現這個字段的離散度幾乎接近于主鍵的離散度。
而進行group by必然要根據每一條upsert kafka的數據去查驗在flink statebackend中物化的source table中該字段值的分布情況,這應該是才是瓶頸所在!
沿著這個思路,開始分析Flink的statebackend機制。
這里我們簡單回顧一下Flink statebackend(后面再做專題總結):
由 Flink 管理的 keyed state 是一種分片的鍵/值存儲,每個 keyed state 的工作副本都保存在負責該鍵的 taskmanager 本地中。另外,Operator state 也保存在機器節點本地。Flink 定期獲取所有狀態的快照,并將這些快照復制到持久化的位置,例如分布式文件系統。
如果發生故障,Flink 可以恢復應用程序的完整狀態并繼續處理,就如同沒有出現過異常。
Flink 管理的狀態存儲在 state backend 中。Flink 有兩種 state backend 的實現 – 一種基于 RocksDB 內嵌 key/value 存儲將其工作狀態保存在磁盤上的,另一種基于堆的 state backend,將其工作狀態保存在 Java 的堆內存中。這種基于堆的 state backend 有兩種類型:FsStateBackend,將其狀態快照持久化到分布式文件系統;MemoryStateBackend,它使用 JobManager 的堆保存狀態快照。
當使用基于堆的 state backend 保存狀態時,訪問和更新涉及在堆上讀寫對象。但是對于保存在 RocksDBStateBackend 中的對象,訪問和更新涉及序列化和反序列化,所以會有更大的開銷。但 RocksDB 的狀態量僅受本地磁盤大小的限制。還要注意,只有 RocksDBStateBackend 能夠進行增量快照,這對于具有大量變化緩慢狀態的應用程序來說是大有裨益的。
所有這些 state backends 都能夠異步執行快照,這意味著它們可以在不妨礙正在進行的流處理的情況下執行快照。
我們的線上一般采用的是RocksDB作為狀態后端,checkpoint dir采用hdfs文件系統。其實我個人覺得這個應該根據作業的特性進行選擇,根據我個人的經驗以及知識沉淀,選擇的主要因素是作業的state大小及對處理數據性能的要求:
- RocksDBStateBackend可以突破內存的限制,rocksDB的數據邏輯結構和redis相似,但是數據的物理存儲結構又和hbase相似,繼承自levelDB的LSM樹思想,缺點是性能太低
- 而FsStateBackend是在做snapshot的時候才將內存的state持久化到遠端,速度接近于內存狀態
- MemoryStateBackend是純內存的,一般只用做調試。
但是由于這個大狀態作業追數速度實在太慢,我甚至想過:
在追數的時候用FsStateBackend,并配置大內存,且把managed memory調成0,同時將ck的周期設置的很大,基本上不做ck,追上后savepoint。再把狀態后端換成RocksDB,并且從FSSatebackend的savepoint處恢復,但是發現1.13才支持savepoint切換statebackend類型。
只剩下調優RocksDB一條路了。根據之前對HBase的LSM原理的理解,進行知識遷移,馬上對RocksDB有了一定的認識。在HBase中調優效果最明顯無乎:
blockcache讀緩存、memStore寫緩存、增加布隆過濾器、提升compact效率
沿著這個思路,再查閱了一番RocksDB資料后,決定先對如下參數進行調優:
- state.backend.rocksdb.block.cache-size
state.backend.rocksdb.block.blocksize
Block 塊是 RocksDB 保存在磁盤中的 SST 文件的基本單位,它包含了一系列列有序的 Key 和 Value 集合,可以設置固定的大小。
但是,通過增加 Block Size,會顯著增加讀放大(Read Amplification)效應,令讀取數據時,吞吐量下降。原因是 Block Size增加以后,如果 Block Cache 的大小沒有變,就會?大減少 Cache 中可存放的 Block 數。如果 Cache 中還存處理索引和過濾?等內容,那么可放置的數據塊數目就會更少,可能需要更多的磁盤 IO 操作,找到數據就更更慢了,此時讀取性能會大幅下降。反之,如果減小BlockSize,會讓讀的性能有不少提升,但是寫性能會下降,?而且對 SSD 壽命也不利。
因此我的調優經驗是,如果需要增加 Block Size 的大小來提升讀寫性能,請務必一并增加 Block Cache Size 的大小,這樣才可以取得比較好的讀寫性能。Block Cache,緩存清除算法?用的是 LRU(Least Recently Used)。
驗證
測試對比后發現,原本半天左右完成的作業只需要一到兩個小時即可追上數據!
感悟
性能調優就如同把脈治病,關鍵在于對癥下藥。
前期,要分析當前場景下真正制約性能的瓶頸所在,后期,在癥結處用效果最明顯的方式處理癥結。