Flink狀態和容錯-基礎篇

1. 概念

flink的狀態和容錯繞不開3個概念,state backends和checkpoint、savepoint。本文重心即搞清楚這3部分內容。

容錯機制是基于在狀態快照的一種恢復方式。但是狀態和容錯要分開來看。

  • 什么是狀態,為什么需要狀態?

流計算和批計算在數據源上最大的區別是,流計算中的數據是無邊界的,數據持續不斷,而批計算中數據是有邊界的,在計算時可以一次性將數據全部拿到。在流計算中無法拿到全部數據進行計算結果,因此需要將歷史數據的處理結果記錄下來,這個就是狀態。通過狀態實現歷史數據和未來數據的結合處理。

舉個例子,在對數據求和的場景中,第n條數據的求和結果是前n-1條數據的求和結果再加上第n條數據的值。因此必須記錄前n-1條數據的累加和,才能在第n條數據到達時,得到前n條數據的累加和。
記錄的前n-1條數據的累加和就是狀態數據。

并不是全部的流計算場景都需要狀態,如單詞大小寫轉換的場景中,每條數據的處理僅和當前數據有關,因此不需要狀態。

  • 什么是state backend?

state backend用于存儲flink管理的狀態。有多種實現方式,不同實現方式中數據結構和存儲方式不同,對快照的支持也有所區別。

  • 什么是checkpoint?

checkpoint可以簡單理解為游戲中的進度存檔,在游戲中當玩家死亡或再次打開游戲時,可以從最近的游戲存檔繼續,而無需重頭開始。checkpoint即作業在某個時刻的快照信息(狀態的快照),如某個時刻數據源(消息隊列)的消費位移,算子狀態等。當發生故障恢復時,會從最新(最近)的一次checkpoint來恢復整個應用程序。

checkpoint是容錯機制。checkpoint默認關閉,開啟后會根據配置來持續自動保存。

  • 什么是savepoint?

checkpoint是根據配置項自動周期性進行的“存檔,而savepoint則是需要手動觸發的“存檔”。savepoint是手動觸發的checkpoint。

snapshot、checkpoint、savepoint在一些語境中是可以互換的,表示相同的含義。

  • state backend、checkpoint和savepoint的關系?

state backend表示的是狀態,而checkpoint和savepoint表示的是狀態在某個時刻的快照。state backend是狀態存儲。checkpoint和savepoint是容錯機制。

2. checkpoint

2.1. 工作原理

checkpoint是由Jobmanager中的checkpoint coordinator來協調并在TaskManager執行的。當checkpoint開始時,所有的Source將會記錄數據的偏移量,并將有編號的barrier插入到流中。barrier將流分劃分了前一個checkpoint和下一個checkpoint兩部分。當job graph中每個運算符收到其中一個barrier時,將開始記錄狀態。

當具有兩個Input的運算符,默認情況下會執行barrier alignment。一個算子可能有多個Input,每個Input中都會攜帶barrier,根據運算符是否要等待全部Input中barrier將checkpoint分成aligned和unaligned(對齊和不對齊)的checkpoint。

state backend使用copy-on-write機制允許流處理不受阻礙得繼續執行,同時對舊版本的狀態進行異步快照,當快照被持久化后,舊版本的狀態被清理。

2.2. 精確一次語義和端到端的精確一次?

  • 精確一次語義

精確一次語義的含義是,每個事件都會只影響flink管理的狀態一次。并不是每個事件都只會處理一次。barrier alignment僅僅在精確一次語義提供保障,如果不需要精確一次語義,可以使用至少一次來獲取一些性能。這具有禁用barrier alignment的效果。

  • 端到端的精確一次

端到端的精確一次語句是指來自Source中每個每個事件恰好影響sink一次。實現這個必須具備兩個條件,Source必須是可重放的(如kafka)并且sink必須支持事務或冪等的。

2.3. checkpoint storage

checkpoint期間狀態快照保留在哪里取決于所配置的checkpoint storage。

checkpoint storage提供了兩種實現,基于分布式文件系統和基于JobManager jvm heap

  • 基于分布式文件系統,FileSystemCheckpointStorage
  • 基于Jobmanager jvm heap,JobManagerCheckpointStorage,默認方式。

JobManagerCheckpointStorage

使用方式

env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage(MAX_MEM_STATE_SIZE));

MAX_MEM_STATE_SIZE的默認值5M,當快照超過此大小時checkpoint將失敗,從而避免jobmanager OOM。無論配置的狀態最大值是多少,狀態都不能大于akka frame size(用于控制jogmanager和taskmanager之間發送消息的最大大小)。

FileSystemCheckpointStorage,配置了checkpoint路徑后將會使用此方式,在執行checkpoint期間,狀態快照將寫入配置的文件系統和目錄中的文件中。極少的元數據存儲在JobManager內存中。

使用方式

// 全局配置 state.checkpoints.dir: hdfs:///checkpoints/
// 或在代碼中為每個job配置
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints-data/");
// 或
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs:///checkpoints-data/"));

目錄結構

/user-defined-checkpoint-dir/{job-id}|+ --shared/ (這個目錄表示多個checkpoint一部分的狀態)+ --taskowned/ (該目錄表示jobmanager絕對無法丟失的狀態)+ --chk-1/ (其他目錄表示單獨屬于一個checkpoint的狀態)+ --chk-2/+ --chk-3/...  

2.4. retained checkpoint

默認情況下,flink僅僅保存最近的n個checkpoint并且取消作業或作業失敗時刪除它們,checkpoint結果僅用于在作業失敗時自動使用其進行恢復作業。可以手動配置將checkpoint保留下來。這樣在作業取消或失敗時,可以手動使用保留下來的快照進行作業恢復。

使用方式

CheckpointConfig config = env.getCheckpointConfig();
config.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 可選項
// NO_EXTERNALIZED_CHECKPOINTS,默認值,禁用retained checkpoint
// DELETE_ON_CANCELLATION,作業被取消時ck將被刪除。但當作業狀態為JobStatus.FAILED時ck會保留
// RETAIN_ON_CANCELLATION,作業被取消時ck將保留(保留下來的數據需要手動刪除)。但當作業狀態為JobStatus.FAILED時ck會保留

2.5. 相關配置

  • checkpoint間隔
env.enableCheckpointing(1000);
  • checkpoint存儲,設置存儲checkpoint快照數據的位置,默認情況下使用jobmanager heap
// 當配置路徑后,將使用FileSystemCheckpointStorage方式的checkpoint storage
env.getCheckpointConfig().setCheckpointStorage("hdfs:///my/checkpoint/dir");
  • 外部化的checkpoint,即上文retained checkpoint
env.getCheckpointConfig().setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  • 精確一次 至少一次
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  • 超時時間
env.getCheckpointConfig().setCheckpointTimeout(60000);
  • 兩次checkpoint的最小時間間隔,如果將該值設置為5s,則無論持續時間和間隔時間設置為何值,下一次checkpoint都將在上一個checkpoint完成后不早于5秒啟動,這個值意味間隔時間將永遠不會小于該時間,這個值還以意味著checkpoint的并發量為1。
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
  • 可以容忍的checkpoint連續失敗的個數,默認為0。
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
  • checkpoint的并行數量,默認情況下flink將不會再一個checkpoint進行時觸發另一個checkpoint。定義了最小間隔時間后則不能使用該選項。
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
  • 非對齊的checkpoint
env.getCheckpointConfig().enableUnalignedCheckpoints();
  • dag中包含有界數據源時的checkpoint,從1.14版本開始支持,1.15版本默認開啟。
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env.configure(config);

2.6. checkpointing under backpressure

通常情況下,checkpoint時間由此過程中的同步和異步部分主導。但是在作業處于背壓下時,checkpoint端到端時間主要影響因數是barriers傳播到所有subtask/operators的時間。

3. savepoint

savepoint也是由checkpoint機制創建的。

savepoint由兩部分組成,包含二進制文件的目錄(通常較大)和元數據文件(相對較小)。二進制文件是狀態快照的純凈文件,元數據文件中包含了二進制文件相對路徑的指針。

通過state.savepoints.dir來指定savepoint文件路徑。

env.setDefaultSavepointDir("hdfs:///flink/savepoints");

savepoint可以將整個文件目錄移動或復制到其他地方使用, 而checkpoint由于包含一些絕對路徑,無法使用移動或復制后文件。

3.1. savepoint和checkpoint

savepoint和checkpoint之間的不同類似于傳統數據中備份與恢復日志的不同。savepoint 代表數據庫中的備份,checkpoint 代表數據中的日志恢復。

checkpoint的主要意圖是一種異常時的容錯機制。生命周期由flink管理,無需用戶交互。checkpoint被頻繁觸發和依賴于故障恢復,因此checkpoint主要設計目標是盡可能輕量級的創建和盡可能的盡快恢復。

盡管savepoint也是使用checkpoint機制來創建的。但是和checkpoint的概念是不同的。savepoint的設計意圖是可移植性和操作靈活性,尤其是作業更改方面,用于有計劃的手動操作。如升級flink版本,更改job graph等,使用savepoint對作業進行恢復。

4. state backend

flink狀態存儲在state backend。state backend有兩種實現:基于rocksDB(本地磁盤)和基于jvm heap(內存)

  • 基于rocksDB,EmbeddedRocksDBStateBackend,訪問和更新狀態涉及序列化和反序列化,成本更高,相對內存而言較慢,但是可以允許巨量的狀態。支持增量快照。
  • 基于Java heap,HashMapStateBackend,默認方式,訪問和更新狀態涉及在heap上讀寫對象。

HashMapStateBackend

狀態將作為jvm heap上的對象進行存儲。由于作為對象進行存儲,因此對象重用(reuse Object)是不安全的。這種建議將 managed memory 設置為0。從而使JVM為用戶代碼分配最大內存。狀態大小受限于集群可用的內容大小。

EmbeddedRocksDBStateBackend

狀態存儲在rocksDB數據庫中,即TaskManager的本地磁盤中,數據存儲為序列化的字節數組。狀態大小受限于磁盤空間大小。對象重用是安全的。當前唯一支持增量checkpoint的實現方式。

使用方式

// state.backend 配置項,可選項hashmap (HashMapStateBackend), rocksdb (EmbeddedRocksDBStateBackend)
// 或
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());

4.1. 舊版的state backend實現

從1.13版本開始,社區重新設計了state backend實現類,新實現類目的是為了幫助用戶更好的理解狀態存儲和checkpoint存儲的分離。并未影響flink的state backend和checkpoint進行運行時實現和特征。可以使用新api來遷移老版本的應用程序,且不會丟失狀態和一致性。

  • MemoryStateBackend,等價于HashMapStateBackend and JobManagerCheckpointStorage.
  • FsStateBackend,等價于HashMapStateBackend and FileSystemCheckpointStorage.
  • RocksDBStateBackend,等價于EmbeddedRocksDBStateBackend and FileSystemCheckpointStorage.

4.2. 基于rocksDB的state backend

rocksDB的方式支持增量快照,增量快照是建立在先前舊快照基礎上的。flink利用了rocksDB的內部的壓縮機制對舊快照進行合并,因此增量快照的歷史記錄不會無限增長,舊的快照結果最終會被自動合并和修剪。

增量快照需要手動開啟。開啟增量快照后,在web UI上展示checkpoint data size僅僅表示增量數據的大小。

使用方式

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// true表示開啟增量快照
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
env.setStateBackend(backend);
4.2.1. 內存管理

rocksDB state backend的性能很大程度上取決于它可用的內存量。增加內存對提高性能有很大幫助。或者調整內存的功能。

默認情況下,RocksDB State Backend 使用 Flink 的managed memory預算作為 RocksDB 的緩沖區和緩存(state.backend.rocksdb.memory.managed:true)。

flink并不是直接將Manager memory的內存分配給rocksDB,而是將通過配置和限制來確保rocksDB的內存使用保持在Manager momery內存范圍內。

flink在單個slot中的所有rocksDB實例之間共享緩存和寫緩沖區。這種共享管理幫助限制rocksDB主要內存消耗組件的內存使用。

  • block cache:用于緩存從磁盤讀取的數據塊
  • index and bloom filters:用于加速數據檢索,過濾不必要的讀取
  • memtables:用于暫時存儲寫入的數據,直到他們被刷新到磁盤。

flink提供了額外的配置來調整rocksDB中寫路徑和讀路徑之間的內存分配

  • 寫路徑(memtable),如果發生頻繁的刷新,則表示寫緩存區內存不足,適當調整這部分內存。state.backend.rocksdb.memory.write-buffer-ratio,默認0.5,即50%
  • 讀路徑(index and bloom fliters,剩余緩存),如果發生頻繁緩存未命中,則表示讀操作的內存不足,適當調整這部分內存。state.backend.rocksdb.memory.high-prio-pool-ratio,默認0.1,即10%

在內存層面的調優,大多數情況下,增加Manager memory。

建議設置的配置。

state.backend.rocksdb.predefined-options,這個參數允許用戶選擇一組預定義的 RocksDB 配置,以優化不同的使用場景。具體來說,這個參數可以幫助用戶在性能和資源使用之間進行權衡,而不需要手動調整大量的 RocksDB 配置項。

  • DEFAULT: 使用 RocksDB 的默認配置。
  • SPINNING_DISK_OPTIMIZED: 針對傳統硬盤(HDD)進行優化。建議設置成此項
  • FLASH_SSD_OPTIMIZED: 針對固態硬盤(SSD)進行優化。
  • SPINNING_DISK_OPTIMIZED_HIGH_MEM: 針對高內存環境下的 HDD 進行優化。單個slot的狀態達到GB,且托管內存充裕,設置為此最佳。
  • FLASH_SSD_OPTIMIZED_HIGH_MEM: 針對高內存環境下的 SSD 進行優化。

必要的RocksDB監控,觀察是否有性能瓶頸,觀察完畢后關閉它們

  • state.backend.rocksdb.metrics.block-cache-capacity,顯示了為塊緩存分配的總內存量,幫助了解塊緩存的大小是否適合當前的需求。
  • state.backend.rocksdb.metrics.block-cache-usage,監視塊緩存內存的使用情況,幫助了解在運行時使用了多少內存來緩存數據塊。
  • state.backend.rocksdb.metrics.cur-size-all-mem-tables,監視以字節為單位的active和unflush的不可變 memtables 的大致大小。
  • state.backend.rocksdb.metrics.mem-table-flush-pending,監控 RocksDB 中掛起的 memtable 刷新次數。
  • state.backend.rocksdb.metrics.num-running-flushes,監控當前正在運行的flush次數。
  • state.backend.rocksdb.metrics.num-running-compactions,監控當前運行的壓縮次數。
4.3. Timers

當state backend選則rocksDB時,定時器默認也存儲在rocksDB中,這是更健壯和可擴展的選則。但是這樣會有較高的成本,因此flink提供了基于JVM heap memory存儲的定時器的選項(state.backend.rocksdb.timer-service.factory=heap)。當在無窗口、在ProcessFunction為使用定時器的場景下,將定時器存儲在Heap中將會有更優的性能。但是可能會增加checkpoint時間,并且無法自然的擴展到內存之外。

當使用基于rocksDb的state backend和基于heap存儲的定時器時,定時器不支持異步快照,其他狀態仍會異步存儲。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/86615.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/86615.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/86615.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【若依學習記錄】RuoYi后臺手冊——分頁實現

目錄 若依系統簡介 前端調用實現 前端調用舉例 后臺邏輯實現 若依系統簡介 RuoYi 是一個基于 Spring Boot、Apache Shiro、MyBatis 和 Thymeleaf 的后臺管理系統,旨在降低技術難度,助力開發者聚焦業務核心,從而節省人力成本、縮短項目周…

從臺式電腦硬件架構看前后端分離開發模式

在軟件開發領域,前后端分離早已成為主流架構設計理念。它將系統的業務邏輯處理與用戶界面展示解耦,提升開發效率與系統可維護性。有趣的是,我們日常生活中常見的臺式電腦硬件架構,竟與這一理念有著異曲同工之妙。今天,就讓我們從臺式電腦的硬件組成出發,深入探討其與前后…

可觀測性的哲學

在現代系統架構中,“可觀測性(Observability)”已不僅僅是一個工程實踐,是一種關于“理解世界”的哲學姿態, 還是一種幫助架構演變的認知工具。從柏拉圖的“洞穴寓言”出發,我們可以構建起一條從被動接受投影&#xff…

開疆智能CCLinkIE轉ModbusTCP網關連接傲博機器人配置案例

本案例是通過CClinkIE轉ModbusTCP網關,連接傲博機器人的配置案例 PLC配置 打開三菱PLC組態軟件GXWORK3設置CClinkIE一側的參數配置,首先設置PLC的IP地址 雙擊詳細設置進入CClinkIE配置 添加通用從站IP地址以及占用點數 設置好分配的軟元件,確…

Bash Shellshock

CVE-2014-6271(Bash Shellshock遠程命令注入漏洞) 該服務啟動后有路徑http://your-ip:port/victim.cgi和http://your-ip:port/safe.cgi。其中safe.cgi是新版頁面,victim是bash4.3生成的頁面。 漏洞位置在User-Agent中victim.cgi: User-Agent: () { foo; }; echo C…

以軟件系統開發為例,解釋PMO 與IPD、CMMI、項目管理什么區別和聯系

以「開發一套智能倉儲管理系統(WMS)」為例,拆解軟件項目經理視角下的IPD、CMMI、項目管理和PMO如何協同運作: 場景設定 項目目標:6個月內交付WMS系統,支持日均10萬訂單處理關鍵角色: 你&#x…

TDengine 3.3.5.0 新功能 —— 查看庫文件占用空間、壓縮率

1. 背景 TDengine 之前版本一直沒有通過 SQL 命令查看數據庫占用的磁盤空間大小,從 3.3.5.0 開始,增加了這個方便且實用的小功能,這里詳細介紹下。 2. SQL 基本語法 select expr from information_schema.ins_disk_usage [where condtion]…

螞蟻百寶箱體驗:如何快速創建“旅游小助手”AI智能體

螞蟻百寶箱作為站式智能體應用開發平臺,致力于為AI開發者提供簡單、高效、快捷的智能體創作體驗。作為業內領先的AI應用開發平臺,開發者可以根據自身的個性化需求,基于各式各樣的大模型來創建一個屬于自己的智能體應用。 螞蟻百寶箱&#xf…

AI助力JMeter—從靜態參數化到智能動態化的進化之路

Apache JMeter作為開源利器被廣泛應用于Web系統、API接口、數據庫及消息隊列等多場景性能驗證。而“變量的使用”作為測試腳本靈活性和可維護性的核心手段,決定了腳本的復用性、可擴展性和數據驅動能力。傳統的變量管理手段已難以應對大規模復雜測試任務中“動態化、…

第十六屆藍橋杯C/C++程序設計研究生組國賽 國二

應該是最后一次參加藍橋杯比賽了,很遺憾,還是沒有拿到國一。 大二第一次參加藍橋杯,印象最深刻的是居然不知道1s是1000ms,花了很多時間在這題,后面節奏都亂了,抗壓能力也不行,身體也不適。最后…

OpenCV計算機視覺實戰(12)——圖像金字塔與特征縮放

OpenCV計算機視覺實戰(12)——圖像金字塔與特征縮放 0. 前言1. 高斯金字塔1.1 應用場景1.2 實現過程 2. 拉普拉斯金字塔2.1 應用場景2.2 實現過程 3. 圖像融合實例3.1 應用場景3.2 實現過程 小結系列鏈接 0. 前言 圖像金字塔技術通過對原始圖像按不同分…

【案例】基于Python的生源數據可視化分析:從Excel處理到動態地圖展示

文章目錄 需求分析技術要點程序流程一些細節核心代碼表格的一些操作 心得體會代碼匯總 需求分析 請設計一個程序,要求能夠統計分析分散在不同表格中的數萬條信息,以信息中的身份證號碼或生源地代碼字段為目標字段,統計每一年全國各省份及本省…

設計模式 | 原型模式

原型模式通過克隆機制實現對象高效創建,是性能敏感場景的利器。本文結合C示例詳解實現原理、深拷貝陷阱、應用場景,并與工廠模式對比分析。 為何需要原型模式? 當遇到以下場景時,傳統構造方法面臨挑戰: 創建成本高&am…

Go 語言中的單元測試

1、如何編寫單元測試 在任何生產級別的項目開發中,單元測試都扮演著至關重要的角色。盡管許多初創項目在早期可能忽略了它,但隨著項目逐漸成熟并成為核心業務,為其編寫健壯的單元測試是保障代碼質量和項目穩定性的必然選擇。本文將帶您快速掌…

8. 接口專業測試報告生成pytest-html

pytest-html 終極指南:打造專業級接口測試報告 在接口自動化測試中,清晰的測試報告是質量保障的關鍵。本文將深入解析如何通過pytest-html插件生成專業級測試報告。 一、核心安裝與基礎使用 快速安裝(國內鏡像) pip install -i …

Day45 Tensorboard使用介紹

目錄 一、tensorboard的發展歷史和原理及基本操作 1.1 發展歷史 1.2 tensorboard的原理 1.3 日志目錄自動管理 1.4 記錄標量數據(Scalar) 1.5 可視化模型結構(Graph) 1.6 可視化圖像(Image) 1.7 記…

用AI給AR加“智慧”:揭秘增強現實智能互動的優化秘密

用AI給AR加“智慧”:揭秘增強現實智能互動的優化秘密 引子:增強現實,到底還能怎么更聰明? 還記得當年Pokmon GO火爆全球的場景嗎?玩家們手機對準街頭,虛擬小精靈活靈活現地跳出來,那就是增強現實(AR)最經典的應用之一。隨著硬件發展和算法進步,AR正逐步從“炫酷玩具…

1 Studying《Computer Vision: Algorithms and Applications 2nd Edition》1-5

目錄 Chapter 1 Introduction 1.1 什么是計算機視覺? 1.2 簡史 1.3 書籍概述 1.4 樣本教學大綱 1.5 符號說明 1.6 其他閱讀材料 Chapter 2 Image formation 2.1 幾何基本元素和變換 2.2 光度圖像形成 2.3 數碼相機 2.4 其他閱讀材料 2.5 練習 Chapter…

Augment插件macOS

macOS蘋果電腦vscode-augment免費額度續杯跑滿 前言 在AI輔助編程日益普及的今天,Augment作為VS Code中的智能代碼助手,為開發者提供了強大的代碼生成和優化功能。然而,免費版本每月300次的使用限制往往讓重度用戶感到困擾。本文將詳細介紹如…

OpenCV CUDA模塊設備層-----創建一個“常量指針訪問器” 的工具函數constantPtr()

操作系統:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 編程語言:C11 算法描述 在 CUDA 設備端模擬一個“指向常量值”的虛擬指針訪問器,使得你可以像訪問數組一樣訪問一個固定值。 這在某些核函數中非常有用&…