1. 概念
flink的狀態和容錯繞不開3個概念,state backends和checkpoint、savepoint。本文重心即搞清楚這3部分內容。
容錯機制是基于在狀態快照的一種恢復方式。但是狀態和容錯要分開來看。
- 什么是狀態,為什么需要狀態?
流計算和批計算在數據源上最大的區別是,流計算中的數據是無邊界的,數據持續不斷,而批計算中數據是有邊界的,在計算時可以一次性將數據全部拿到。在流計算中無法拿到全部數據進行計算結果,因此需要將歷史數據的處理結果記錄下來,這個就是狀態。通過狀態實現歷史數據和未來數據的結合處理。
舉個例子,在對數據求和的場景中,第n條數據的求和結果是前n-1條數據的求和結果再加上第n條數據的值。因此必須記錄前n-1條數據的累加和,才能在第n條數據到達時,得到前n條數據的累加和。
記錄的前n-1條數據的累加和就是狀態數據。
并不是全部的流計算場景都需要狀態,如單詞大小寫轉換的場景中,每條數據的處理僅和當前數據有關,因此不需要狀態。
- 什么是state backend?
state backend用于存儲flink管理的狀態。有多種實現方式,不同實現方式中數據結構和存儲方式不同,對快照的支持也有所區別。
- 什么是checkpoint?
checkpoint可以簡單理解為游戲中的進度存檔,在游戲中當玩家死亡或再次打開游戲時,可以從最近的游戲存檔繼續,而無需重頭開始。checkpoint即作業在某個時刻的快照信息(狀態的快照),如某個時刻數據源(消息隊列)的消費位移,算子狀態等。當發生故障恢復時,會從最新(最近)的一次checkpoint來恢復整個應用程序。
checkpoint是容錯機制。checkpoint默認關閉,開啟后會根據配置來持續自動保存。
- 什么是savepoint?
checkpoint是根據配置項自動周期性進行的“存檔,而savepoint則是需要手動觸發的“存檔”。savepoint是手動觸發的checkpoint。
snapshot、checkpoint、savepoint在一些語境中是可以互換的,表示相同的含義。
- state backend、checkpoint和savepoint的關系?
state backend表示的是狀態,而checkpoint和savepoint表示的是狀態在某個時刻的快照。state backend是狀態存儲。checkpoint和savepoint是容錯機制。
2. checkpoint
2.1. 工作原理
checkpoint是由Jobmanager中的checkpoint coordinator來協調并在TaskManager執行的。當checkpoint開始時,所有的Source將會記錄數據的偏移量,并將有編號的barrier插入到流中。barrier將流分劃分了前一個checkpoint和下一個checkpoint兩部分。當job graph中每個運算符收到其中一個barrier時,將開始記錄狀態。
當具有兩個Input的運算符,默認情況下會執行barrier alignment。一個算子可能有多個Input,每個Input中都會攜帶barrier,根據運算符是否要等待全部Input中barrier將checkpoint分成aligned和unaligned(對齊和不對齊)的checkpoint。
state backend使用copy-on-write機制允許流處理不受阻礙得繼續執行,同時對舊版本的狀態進行異步快照,當快照被持久化后,舊版本的狀態被清理。
2.2. 精確一次語義和端到端的精確一次?
- 精確一次語義
精確一次語義的含義是,每個事件都會只影響flink管理的狀態一次。并不是每個事件都只會處理一次。barrier alignment僅僅在精確一次語義提供保障,如果不需要精確一次語義,可以使用至少一次來獲取一些性能。這具有禁用barrier alignment的效果。
- 端到端的精確一次
端到端的精確一次語句是指來自Source中每個每個事件恰好影響sink一次。實現這個必須具備兩個條件,Source必須是可重放的(如kafka)并且sink必須支持事務或冪等的。
2.3. checkpoint storage
checkpoint期間狀態快照保留在哪里取決于所配置的checkpoint storage。
checkpoint storage提供了兩種實現,基于分布式文件系統和基于JobManager jvm heap
- 基于分布式文件系統,
FileSystemCheckpointStorage
- 基于Jobmanager jvm heap,
JobManagerCheckpointStorage
,默認方式。
JobManagerCheckpointStorage
使用方式
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage(MAX_MEM_STATE_SIZE));
MAX_MEM_STATE_SIZE的默認值5M,當快照超過此大小時checkpoint將失敗,從而避免jobmanager OOM。無論配置的狀態最大值是多少,狀態都不能大于akka frame size(用于控制jogmanager和taskmanager之間發送消息的最大大小)。
FileSystemCheckpointStorage,配置了checkpoint路徑后將會使用此方式,在執行checkpoint期間,狀態快照將寫入配置的文件系統和目錄中的文件中。極少的元數據存儲在JobManager內存中。
使用方式
// 全局配置 state.checkpoints.dir: hdfs:///checkpoints/
// 或在代碼中為每個job配置
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints-data/");
// 或
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs:///checkpoints-data/"));
目錄結構
/user-defined-checkpoint-dir/{job-id}|+ --shared/ (這個目錄表示多個checkpoint一部分的狀態)+ --taskowned/ (該目錄表示jobmanager絕對無法丟失的狀態)+ --chk-1/ (其他目錄表示單獨屬于一個checkpoint的狀態)+ --chk-2/+ --chk-3/...
2.4. retained checkpoint
默認情況下,flink僅僅保存最近的n個checkpoint并且取消作業或作業失敗時刪除它們,checkpoint結果僅用于在作業失敗時自動使用其進行恢復作業。可以手動配置將checkpoint保留下來。這樣在作業取消或失敗時,可以手動使用保留下來的快照進行作業恢復。
使用方式
CheckpointConfig config = env.getCheckpointConfig();
config.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 可選項
// NO_EXTERNALIZED_CHECKPOINTS,默認值,禁用retained checkpoint
// DELETE_ON_CANCELLATION,作業被取消時ck將被刪除。但當作業狀態為JobStatus.FAILED時ck會保留
// RETAIN_ON_CANCELLATION,作業被取消時ck將保留(保留下來的數據需要手動刪除)。但當作業狀態為JobStatus.FAILED時ck會保留
2.5. 相關配置
- checkpoint間隔
env.enableCheckpointing(1000);
- checkpoint存儲,設置存儲checkpoint快照數據的位置,默認情況下使用jobmanager heap
// 當配置路徑后,將使用FileSystemCheckpointStorage方式的checkpoint storage
env.getCheckpointConfig().setCheckpointStorage("hdfs:///my/checkpoint/dir");
- 外部化的checkpoint,即上文retained checkpoint
env.getCheckpointConfig().setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
- 精確一次 至少一次
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
- 超時時間
env.getCheckpointConfig().setCheckpointTimeout(60000);
- 兩次checkpoint的最小時間間隔,如果將該值設置為5s,則無論持續時間和間隔時間設置為何值,下一次checkpoint都將在上一個checkpoint完成后不早于5秒啟動,這個值意味間隔時間將永遠不會小于該時間,這個值還以意味著checkpoint的并發量為1。
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
- 可以容忍的checkpoint連續失敗的個數,默認為0。
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
- checkpoint的并行數量,默認情況下flink將不會再一個checkpoint進行時觸發另一個checkpoint。定義了最小間隔時間后則不能使用該選項。
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
- 非對齊的checkpoint
env.getCheckpointConfig().enableUnalignedCheckpoints();
- dag中包含有界數據源時的checkpoint,從1.14版本開始支持,1.15版本默認開啟。
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env.configure(config);
2.6. checkpointing under backpressure
通常情況下,checkpoint時間由此過程中的同步和異步部分主導。但是在作業處于背壓下時,checkpoint端到端時間主要影響因數是barriers傳播到所有subtask/operators的時間。
3. savepoint
savepoint也是由checkpoint機制創建的。
savepoint由兩部分組成,包含二進制文件的目錄(通常較大)和元數據文件(相對較小)。二進制文件是狀態快照的純凈文件,元數據文件中包含了二進制文件相對路徑的指針。
通過state.savepoints.dir
來指定savepoint文件路徑。
env.setDefaultSavepointDir("hdfs:///flink/savepoints");
savepoint可以將整個文件目錄移動或復制到其他地方使用, 而checkpoint由于包含一些絕對路徑,無法使用移動或復制后文件。
3.1. savepoint和checkpoint
savepoint和checkpoint之間的不同類似于傳統數據中備份與恢復日志的不同。savepoint 代表數據庫中的備份,checkpoint 代表數據中的日志恢復。
checkpoint的主要意圖是一種異常時的容錯機制。生命周期由flink管理,無需用戶交互。checkpoint被頻繁觸發和依賴于故障恢復,因此checkpoint主要設計目標是盡可能輕量級的創建和盡可能的盡快恢復。
盡管savepoint也是使用checkpoint機制來創建的。但是和checkpoint的概念是不同的。savepoint的設計意圖是可移植性和操作靈活性,尤其是作業更改方面,用于有計劃的手動操作。如升級flink版本,更改job graph等,使用savepoint對作業進行恢復。
4. state backend
flink狀態存儲在state backend。state backend有兩種實現:基于rocksDB(本地磁盤)和基于jvm heap(內存)
- 基于rocksDB,
EmbeddedRocksDBStateBackend
,訪問和更新狀態涉及序列化和反序列化,成本更高,相對內存而言較慢,但是可以允許巨量的狀態。支持增量快照。 - 基于Java heap,
HashMapStateBackend
,默認方式,訪問和更新狀態涉及在heap上讀寫對象。
HashMapStateBackend
狀態將作為jvm heap上的對象進行存儲。由于作為對象進行存儲,因此對象重用(reuse Object)是不安全的。這種建議將 managed memory 設置為0。從而使JVM為用戶代碼分配最大內存。狀態大小受限于集群可用的內容大小。
EmbeddedRocksDBStateBackend
狀態存儲在rocksDB數據庫中,即TaskManager的本地磁盤中,數據存儲為序列化的字節數組。狀態大小受限于磁盤空間大小。對象重用是安全的。當前唯一支持增量checkpoint的實現方式。
使用方式
// state.backend 配置項,可選項hashmap (HashMapStateBackend), rocksdb (EmbeddedRocksDBStateBackend)
// 或
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
4.1. 舊版的state backend實現
從1.13版本開始,社區重新設計了state backend實現類,新實現類目的是為了幫助用戶更好的理解狀態存儲和checkpoint存儲的分離。并未影響flink的state backend和checkpoint進行運行時實現和特征。可以使用新api來遷移老版本的應用程序,且不會丟失狀態和一致性。
- 舊
MemoryStateBackend
,等價于HashMapStateBackend
andJobManagerCheckpointStorage
. - 舊
FsStateBackend
,等價于HashMapStateBackend
andFileSystemCheckpointStorage
. - 舊
RocksDBStateBackend
,等價于EmbeddedRocksDBStateBackend
andFileSystemCheckpointStorage
.
4.2. 基于rocksDB的state backend
rocksDB的方式支持增量快照,增量快照是建立在先前舊快照基礎上的。flink利用了rocksDB的內部的壓縮機制對舊快照進行合并,因此增量快照的歷史記錄不會無限增長,舊的快照結果最終會被自動合并和修剪。
增量快照需要手動開啟。開啟增量快照后,在web UI上展示checkpoint data size僅僅表示增量數據的大小。
使用方式
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// true表示開啟增量快照
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
env.setStateBackend(backend);
4.2.1. 內存管理
rocksDB state backend的性能很大程度上取決于它可用的內存量。增加內存對提高性能有很大幫助。或者調整內存的功能。
默認情況下,RocksDB State Backend 使用 Flink 的managed memory預算作為 RocksDB 的緩沖區和緩存(state.backend.rocksdb.memory.managed:true
)。
flink并不是直接將Manager memory的內存分配給rocksDB,而是將通過配置和限制來確保rocksDB的內存使用保持在Manager momery內存范圍內。
flink在單個slot中的所有rocksDB實例之間共享緩存和寫緩沖區。這種共享管理幫助限制rocksDB主要內存消耗組件的內存使用。
- block cache:用于緩存從磁盤讀取的數據塊
- index and bloom filters:用于加速數據檢索,過濾不必要的讀取
- memtables:用于暫時存儲寫入的數據,直到他們被刷新到磁盤。
flink提供了額外的配置來調整rocksDB中寫路徑和讀路徑之間的內存分配
- 寫路徑(memtable),如果發生頻繁的刷新,則表示寫緩存區內存不足,適當調整這部分內存。
state.backend.rocksdb.memory.write-buffer-ratio
,默認0.5,即50% - 讀路徑(index and bloom fliters,剩余緩存),如果發生頻繁緩存未命中,則表示讀操作的內存不足,適當調整這部分內存。
state.backend.rocksdb.memory.high-prio-pool-ratio
,默認0.1,即10%
在內存層面的調優,大多數情況下,增加Manager memory。
建議設置的配置。
state.backend.rocksdb.predefined-options
,這個參數允許用戶選擇一組預定義的 RocksDB 配置,以優化不同的使用場景。具體來說,這個參數可以幫助用戶在性能和資源使用之間進行權衡,而不需要手動調整大量的 RocksDB 配置項。
- DEFAULT: 使用 RocksDB 的默認配置。
- SPINNING_DISK_OPTIMIZED: 針對傳統硬盤(HDD)進行優化。建議設置成此項
- FLASH_SSD_OPTIMIZED: 針對固態硬盤(SSD)進行優化。
- SPINNING_DISK_OPTIMIZED_HIGH_MEM: 針對高內存環境下的 HDD 進行優化。單個slot的狀態達到GB,且托管內存充裕,設置為此最佳。
- FLASH_SSD_OPTIMIZED_HIGH_MEM: 針對高內存環境下的 SSD 進行優化。
必要的RocksDB監控,觀察是否有性能瓶頸,觀察完畢后關閉它們
state.backend.rocksdb.metrics.block-cache-capacity
,顯示了為塊緩存分配的總內存量,幫助了解塊緩存的大小是否適合當前的需求。state.backend.rocksdb.metrics.block-cache-usage
,監視塊緩存內存的使用情況,幫助了解在運行時使用了多少內存來緩存數據塊。state.backend.rocksdb.metrics.cur-size-all-mem-tables
,監視以字節為單位的active和unflush的不可變 memtables 的大致大小。state.backend.rocksdb.metrics.mem-table-flush-pending
,監控 RocksDB 中掛起的 memtable 刷新次數。state.backend.rocksdb.metrics.num-running-flushes
,監控當前正在運行的flush次數。state.backend.rocksdb.metrics.num-running-compactions
,監控當前運行的壓縮次數。
4.3. Timers
當state backend選則rocksDB時,定時器默認也存儲在rocksDB中,這是更健壯和可擴展的選則。但是這樣會有較高的成本,因此flink提供了基于JVM heap memory存儲的定時器的選項(state.backend.rocksdb.timer-service.factory
=heap)。當在無窗口、在ProcessFunction為使用定時器的場景下,將定時器存儲在Heap中將會有更優的性能。但是可能會增加checkpoint時間,并且無法自然的擴展到內存之外。
當使用基于rocksDb的state backend和基于heap存儲的定時器時,定時器不支持異步快照,其他狀態仍會異步存儲。