Flink 詳解(二):核心篇 Ⅲ
29、Flink 通過什么實現可靠的容錯機制?
Flink 使用 輕量級分布式快照,設計檢查點(checkpoint)實現可靠容錯。
30、什么是 Checkpoin 檢查點?
Checkpoint 被叫做 檢查點,是 Flink 實現容錯機制最核心的功能,是 Flink 可靠性的基石,它能夠根據配置周期性地基于 Stream 中各個 Operator 的 狀態 來生成 Snapshot 快照,從而將這些狀態數據定期持久化存儲下來,當 Flink 程序一旦意外崩潰時,重新運行程序時可以有選擇地從這些 Snapshot 進行恢復,從而修正因為故障帶來的程序數據狀態中斷。
Flink 的 Checkpoint 機制原理來自 Chandy-Lamport algorithm
算法(一種分布式快照算法)。
注意區分 State 和 Checkpoint:
1.State
- 一般指一個具體的 Task/Operator 的狀態(Operator 的狀態表示一些算子在運行的過程中會產生的一些中間結果)。
- State 數據默認保存在 Java 的堆內存中 / TaskManage 節點的內存中。
- State 可以被記錄,在失敗的情況下數據還可以恢復。
2.Checkpoint
-
表示了一個 FlinkJob 在一個特定時刻的一份全局狀態快照,即包含了所有 Task/Operator 的狀態。
-
可以理解為 Checkpoint 是把 State 數據定時持久化存儲了。
-
比如 KafkaConsumer 算子中維護的 Offset 狀態,當任務重新恢復的時候可以從 Checkpoint 中獲取。
31、什么是 Savepoint 保存點?
保存點 在 Flink 中叫作 Savepoint,是基于 Flink 檢查點機制的應用完整快照備份機制,用來保存狀態,可以在另一個集群或者另一個時間點從保存的狀態中將作業恢復回來。適用于 應用升級、集群遷移、 Flink 集群版本更新、A/B 測試 以及 假定場景、暫停和重啟、歸檔 等場景。保存點可以視為一個(算子 ID → State)的 Map,對于每一個有狀態的算子,Key 是算子 ID,Value 是算子 State。
32、什么是 CheckpointCoordinator 檢查點協調器?
Flink 中檢查點協調器叫作 CheckpointCoordinator,負責協調 Flink 算子的 State 的分布式快照。當觸發快照的時候,CheckpointCoordinator 向 Source 算子中注入 Barrier 消息 ,然后等待所有的 Task 通知檢查點確認完成,同時持有所有 Task 在確認完成消息中上報的 State 句柄。
33、Checkpoint 中保存的是什么信息?
檢查點里面到底保存著什么信息呢?我們以 Flink 消費 Kafka 數據 Wordcount 為例:
1、我們從 Kafka 讀取到一條條的日志,從日志中解析出 app_id
,然后將統計的結果放到內存中一個 Map 集合,app_id
做為 key,對應的 pv
做為 value,每次只需要將相應 app_id
的 pv
值 + 1 +1 +1 后 put 到 Map 中即可;
2、kafka topic:test;
3、Flink 運算流程如下:
kafka topic 有且只有一個分區。
假設 kafka 的 topic-test 只有一個分區,flink 的 Source task 記錄了當前消費到 kafka test topic 的所有 partition 的 offset
。
例:(0,1000)表示 0 號 partition 目前消費到 offset 為 1000 的數據。
Flink 的 pv task 記錄了當前計算的各 app 的 pv
值,為了方便講解,我這里有兩個 app:app1、app2。
例:(app1,50000)(app2,10000)
表示 app1 當前 pv 值為 50000
表示 app2 當前 pv 值為 10000
每來一條數據,只需要確定相應 app_id,將相應的 value 值 +1 后 put 到 map 中即可。
該案例中,CheckPoint 保存的其實就是 第 n 次 CheckPoint 消費的 offset
信息和各 app 的 pv
值信息,記錄一下發生 CheckPoint 當前的狀態信息,并將該狀態信息保存到相應的狀態后端。圖下代碼:(注:狀態后端是保存狀態的地方,決定狀態如何保存,如何保障狀態高可用,我們只需要知道,我們能從狀態后端拿到 offset
信息和 pv
信息即可。狀態后端必須是高可用的,否則我們的狀態后端經常出現故障,會導致無法通過 checkpoint 來恢復我們的應用程序)。
chk-100
offset:(0,1000)
pv:(app1,50000)(app2,10000)
該狀態信息表示第 100 次 CheckPoint 的時候, partition 0 offset 消費到了 1000,pv 統計。
34、當作業失敗后,檢查點如何恢復作業?
Flink 提供了 應用自動恢復機制 和 手動作業恢復機制。
1、應用自動恢復機制
Flink 設置有作業失敗重啟策略,包含三種:
-
定期恢復策略(
fixed-delay
):固定延遲重啟策略會嘗試一個給定的次數來重啟 Job,如果超過最大的重啟次數,Job 最終將失敗,在連續兩次重啟嘗試之間,重啟策略會等待一個固定時間,默認Integer.MAX_VALUE
次。 -
失敗比率策略(
failure-rate
):失敗比率重啟策略在 Job 失敗后重啟,但是超過失敗率后,Job 會最終被認定失敗,在兩個連續的重啟嘗試之間,重啟策略會等待一個固定的時間。 -
直接失敗策略(
None
):失敗不重啟。
2、手動作業恢復機制
因為 Flink 檢查點目錄分別對應的是 JobId,每通過 flink run 方式 / 頁面提交方式恢復都會重新生成 JobId,Flink 提供了在啟動之時通過設置 -s
參數指定檢查點目錄的功能,讓新的 Jobld 讀取該檢查點元文件信息和狀態信息,從而達到指定時間節點啟動作業的目的。
啟動方式如下:
/bin/flink -s /flink/checkpoints/03112312a12398740a87393/chk-50/_metadata
35、當作業失敗后,從保存點如何恢復作業?
從保存點恢復作業并不簡單,尤其是在作業變更(如修改邏輯、修復 Bug)的情況下, 需要考慮如下幾點:
- 算子的順序改變。如果對應的 UID 沒變,則可以恢復;如果對應的 UID 變了,恢復失敗。
- 作業中添加了新的算子。如果是無狀態算子,沒有影響,可以正常恢復;如果是有狀態的算子,跟無狀態的算子一樣處理。
- 從作業中刪除了一個有狀態的算子。默認需要恢復保存點中所記錄的所有算子的狀態,如果刪除了一個有狀態的算子,從保存點恢復的時候被刪除的 OperatorID 找不到,所以會報錯,可以通過在命令中添加
--allowNonReStoredSlale(short: -n)
跳過無法恢復的算子 。 - 添加和刪除無狀態的算子。如果手動設置了 UID 則可以恢復,保存點中不記錄無狀態的算子。如果是自動分配的 UID ,那么有狀態算子的可能會變(Flink 一個單調遞增的計數器生成 UID,DAG 改變,計數器極有可能會變)很有可能恢復失敗。
36、Flink 如何實現輕量級異步分布式快照?
要實現分布式快照,最關鍵的是能夠將數據流切分。Flink 中使用 屏障(Barrier
)來切分數據流。 Barrierr 會周期性地注入數據流中,作為數據流的一部分,從上游到下游被算子處理。Barrier 會嚴格保證順序,不會超過其前邊的數據。Barrier 將記錄分割成記錄集,兩個 Barrier 之間的數據流中的數據隸屬于同一個檢查點。每一個 Barrier 都攜帶一個其所屬快照的 ID 編號。Barrier 隨著數據向下流動,不會打斷數據流,因此非常輕量。 在一個數據流中,可能會存在多個隸屬于不同快照的 Barrier ,并發異步地執行分布式快照,如下圖所示:
Barrier 會在數據流源頭被注人并行數據流中。Barrier n n n 所在的位置就是恢復時數據重新處理的起始位置。 例如,在 Kafka 中,這個位置就是最后一個記錄在分區內的偏移量(offset
),作業恢復時,會根據這個位置從這個偏移量之后向 Kafka 請求數據,這個偏移量就是 State 中保存的內容之一。
Barrier 接著向下游傳遞。當一個非數據源算子從所有的輸入流中收到了快照 n n n 的 Barrier 時,該算子就會對自己的 State 保存快照,并向自己的下游 廣播發送 快照 n n n 的 Barrier。一旦 Sink 算子接收到 Barrier,有兩種情況:
- 如果是引擎內嚴格一次處理保證,當 Sink 算子已經收到了所有上游的 Barrie n n n 時, Sink 算子對自己的 State 進行快照,然后通知檢查點協調器(
CheckpointCoordinator
)。當所有的算子都向檢查點協調器匯報成功之后,檢查點協調器向所有的算子確認本次快照完成。 - 如果是端到端嚴格一次處理保證,當 Sink 算子已經收到了所有上游的 Barrie n n n 時, Sink 算子對自己的 State 進行快照,并 預提交事務(兩階段提交的第一階段),再通知檢查點協調器(
CheckpointCoordinator
),檢查點協調器向所有的算子確認本次快照完成,Sink 算子 提交事務(兩階段提交的第二階段),本次事務完成。
我們接著 33 33 33 的案例來具體說一下如何執行分布式快照:
對應到 pv
案例中就是,Source Task 接收到 JobManager 的編號為 chk-100
(從最近一次恢復)的 CheckPoint 觸發請求后,發現自己恰好接收到 Kafka offset(0,1000)
處的數據,所以會往 offset(0,1000)
數據之后,offset(0,1001)
數據之前,安插一個 Barrier,然后自己開始做快照,也就是將 offset(0,1000)
保存到狀態后端 chk-100
中。然后 Barrier 接著往下游發送,當統計 pv
的 task 接收到 Barrier 后,也會暫停處理數據,將自己內存中保存的 pv
信息 (app1,50000)
、(app2,10000)
保存到狀態后端 chk-100
中。OK,Flink 大概就是通過這個原理來保存快照的。
統計 pv
的 task 接收到 Barrier,就意味著 Barrier 之前的數據都處理了,所以說,不會出現丟數據的情況。
37、什么是 Barrier 對齊?
上圖從左至右分別表示:開始對齊,對齊,執行檢查點,繼續處理數據。
一旦 operator 從輸入流接收到 checkpoint barrier n n n,它就不能處理來自該流的任何數據記錄,直到它從其他所有輸入接收到 barrier n n n 為止。否則,它會混合屬于快照 n n n 的記錄和屬于快照 n + 1 n + 1 n+1 的記錄;
如上圖所示:
- 圖 1 1 1:算子收到數字流的 Barrier,字母流對應的 Barrier 尚未到達。
- 圖 2 2 2:算子收到數字流的 Barrier,會繼續從數字流中接收數據,但這些流只能被擱置,記錄不能被處理,而是放入緩存中,等待字母流 Barrier 到達。在字母流到達前, 1 、 2 、 3 1、2、3 1、2、3 數據已經被緩存。
- 圖 3 3 3:字母流到達,算子開始對齊 State 進行異步快照,并將 Barrier 向下游廣播,并不等待快照執行完畢。
- 圖 4 4 4:算子做異步快照,首先處理緩存中積壓數據,然后再從輸入通道中獲取數據。
38、什么是 Barrier 不對齊?
Checkpoint 是要等到所有的 Barrier 全部都到才算完成。
上述圖 2 2 2 中,當還有其他輸入流的 Barrier 還沒有到達時,會把已到達的 Barrier 之后的數據 1 、 2 、 3 1、2、3 1、2、3 擱置在緩沖區,等待其他流的 Barrier 到達后才能處理。
Barrier 不對齊:就是指當還有其他流的 Barrier 還沒到達時,為了不影響性能,也不用理會,直接處理 Barrier 之后的數據。等到所有流的 Barrier 的都到達后,就可以對該 Operator 做 Checkpoint 了。
39、為什么要進行 Barrier 對齊?不對齊到底行不行?
Exactly Once
時必須 Barrier 對齊,如果 Barrier 不對齊就變成了 At Least Once
。
Checkpoint 的目的就是為了保存快照,如果不對齊,那么在 chk-100
快照之前,已經處理了一些 chk-100
對應的 offset
之后的數據,當程序從 chk-100
恢復任務時,chk-100
對應的 offset
之后的數據還會被處理一次,所以就出現了重復消費。
41、要實現 Exactly-Once 需具備什么條件?
流系統要實現 Exactly-Once
,需要保證上游 Source 層、中間計算層和下游 Sink 層三部分同時滿足端到端嚴格一次處理,如下圖:
Source 端:數據從上游進入 Flink,必須保證消息嚴格一次消費。同時 Source 端必須滿足可重放(replay
)。否則 Flink 計算層收到消息后未計算,卻發生 failure 而重啟,消息就會丟失。
Flink 計算層:利用 Checkpoint 機制,把狀態數據定期持久化存儲下來,Flink 程序一旦發生故障的時候,可以選擇狀態點恢復,避免數據的丟失、重復。
Sink 端:Flink 將處理完的數據發送到 Sink 端時,通過 兩階段提交協議 ,即 TwoPhaseCommitSinkFunction
函數。該 SinkFunction 提取并封裝了兩階段提交協議中的公共邏輯,保證 Flink 發送 Sink 端時實現嚴格一次處理語義。同時,Sink 端必須支持事務機制,能夠進行數據回滾或者滿足冪等性。
- 回滾機制:即當作業失敗后,能夠將部分寫入的結果回滾到之前寫入的狀態。
- 冪等性:就是一個相同的操作,無論重復多少次,造成的結果和只操作一次相等。即當作業失敗后,寫入部分結果,但是當重新寫入全部結果時,不會帶來負面結果,重復寫入不會帶來錯誤結果。
42、什么是兩階段提交協議?
兩階段提交協議(Two-Phase Commit
,2PC
)是解決分布式事務問題最常用的方法,它可以保證在分布式事務中,要么所有參與進程都提交事務,要么都取消,即實現 A C I D ACID ACID 中的 A A A(原子性)。
兩階段提交協議中有兩個重要角色,協調者(Coordinator
)和 參與者(Participant
),其中協調者只有一個,起到分布式事務的協調管理作用,參與者有多個。
兩階段提交階段分為兩個階段:投票階段(Voting
)和 提交階段(Commit
)。
(1)投票階段
- 協調者向所有參與者發送
prepare
請求和事務內容,詢問是否可以準備事務提交,等待參與者的響應。 - 參與者執行事務中包含的操作,并記錄
undo
日志(用于回滾)和redo
日志(用于重放),但不真正提交。 - 參與者向協調者返回事務操作的執行結果,執行成功返回
yes
,失敗返回no
。
(2)提交階段
- 分為成功與失敗兩種情況。
- 若所有參與者都返回
yes
,說明事務可以提交:- 協調者向所有參與者發送
commit
請求。 - 參與者收到
commit
請求后,將事務真正地提交上去,并釋放占用的事務資源,并向協調者返回ack
。 - 協調者收到所有參與者的
ack
消息,事務成功完成,如下圖:
- 協調者向所有參與者發送
- 若有參與者返回
no
或者超時未返回,說明事務中斷,需要回滾:- 協調者向所有參與者發送
rollback
請求。 - 參與者收到
rollback
請求后,根據undo
日志回滾到事務執行前的狀態,釋放占用的事務資源,并向協調者返回ack
。 - 協調者收到所有參與者的
ack
消息,事務回滾完成。
- 協調者向所有參與者發送
43、Flink 如何保證 Exactly-Once 語義?
Flink 通過 兩階段提交協議 來保證 Exactly-Once
語義。
對于 Source 端:Source 端嚴格一次處理比較簡單,因為數據要進入 Flink 中,所以 Flink 只需要保存消費數據的偏移量(offset)即可。如果 Source 端為 Kafka,Flink 將 Kafka Consumer 作為 Source,可以將偏移量保存下來,如果后續任務出現了故障,恢復的時候可以由連接器重置偏移量,重新消費數據,保證一致性。
對于 Sink 端:Sink 端是最復雜的,因為數據是落地到其他系統上的,數據一旦離開 Flink 之后,Flink 就監控不到這些數據了,所以嚴格一次處理語義必須也要應用于 Flink 寫入數據的外部系統,故這些外部系統必須提供一種手段允許提交或回滾這些寫入操作,同時還要保證與 Flink Checkpoint 能夠協調使用(Kafka 0.11 0.11 0.11 版本已經實現精確一次處理語義)。
我們以 Kafka - Flink - Kafka
為例 說明如何保證 Exactly-Once
語義。
如上圖所示:Flink 作業包含以下算子。
- 一個 Source 算子,從 Kafka 中讀取數據(即
KafkaConsumer
) - 一個窗口算子,基于時間窗口化的聚合運算(即
window+window
函數) - 一個 Sink 算子,將結果寫會到 Kafka(即
KafkaProducer
)
Flink 使用 兩階段提交協議,即 預提交(Pre-commit
)階段和 提交(Commit
)階段保證端到端嚴格一次。
1、預提交階段
(1)當 Checkpoint 啟動時,進入預提交階段,JobManager 向 Source Task 注入檢查點分界線(CheckpointBarrier
),Source Task 將 CheckpointBarrier 插入數據流,向下游廣播開啟本次快照,如下圖所示:
(2)Source 端:Flink Data Source 負責保存 KafkaTopic 的 offset 偏移量,當 Checkpoint 成功時 Flink 負責提交這些寫入,否則就終止取消掉它們,當 Checkpoint 完成位移保存,它會將 Checkpoint Barrier(檢查點分界線) 傳給下一個 Operator,然后每個算子會對當前的狀態做個快照,保存到 狀態后端(State Backend)。
對于 Source 任務而言,就會把當前的 offset
作為狀態保存起來。下次從 Checkpoint 恢復時,Source 任務可以重新提交偏移量,從上次保存的位置開始重新消費數據,如下圖所示:
(3)Slink 端:從 Source 端開始,每個內部的 Transformation 任務遇到 Checkpoint Barrier(檢查點分界線)時,都會把狀態存到 Checkpoint 里。數據處理完畢到 Sink 端時,Sink 任務首先把數據寫入外部 Kafka,這些數據都屬于預提交的事務(還不能被消費),此時的 Pre-commit 預提交階段下 ,Data Sink 在保存狀態到狀態后端的同時還必須預提交它的外部事務,如下圖所示:
2、提交階段
(4)當所有算子任務的快照完成(所有創建的快照都被視為是 Checkpoint 的一部分),也就是這次的 Checkpoint 完成時,JobManager 會向所有任務發通知,確認這次 Checkpoint 完成,此時 Pre-commit 預提交階段才算完成。才正式到兩階段提交協議的第二個階段:Commit 階段。該階段中 JobManager 會為應用中每個 Operator 發起 Checkpoint 已完成的回調邏輯。
本例中的 Data Source 和窗口操作無外部狀態,因此在該階段,這兩個 Opeartor 無需執行任何邏輯,但是 Data Sink 是有外部狀態的,此時我們必須提交外部事務,當 Sink 任務收到確認通知,就會正式提交之前的事務,Kafka 中未確認的數據就改為 “已確認”,數據就真正可以被消費了,如下圖所示:
注:Flink 由 JobManager 協調各個 TaskManager 進行 Checkpoint 存儲,Checkpoint 保存在 StateBackend(狀態后端) 中,默認 StateBackend 是內存級的,也可以改為文件級的進行持久化保存。
44、數的很好,很清楚,那你對 Flink 端到端 嚴格一次 Exactly-Once 語義做個總結。