Flink是一個有狀態的流,👅一起深入了解這個有狀態的流
3?? 目標
- 掌握State知識
- 掌握Flink三種State Backend
- 掌握Flink checkpoint和savepoint原理
- 了解Flink的重啟策略
- checkpoint+two phase commit保證E-O語義
4?? 要點
📖 1. Flink的State
1.1 state概述
Apache Flink? — Stateful Computations over Data Streams
Flink 是一個默認就有狀態的分析引擎,前面的WordCount 案例可以做到單詞的數量的累加,其實是因為在內存中保證了每個單詞的出現的次數,這些數據其實就是狀態數據。但是如果一個 Task 在處理過程中掛掉了,那么它在內存中的狀態都會丟失,所有的數據都需要重新計算。從容錯和消息處理的語義(At -least-once 和 Exactly-once)上來說,Flink引入了State 和 CheckPoint。State一般指一個具體的 Task/Operator 的狀態,State數據默認保存在 Java 的堆內存中。
-
回顧單詞計數的例子
package com.kaikeba.demo1import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, WindowedStream} import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow/*** 使用滑動窗口* 每隔1秒鐘統計最近2秒鐘的每個單詞出現的次數*/ object FlinkStream {def main(args: Array[String]): Unit = {//構建流處理的環境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//從socket獲取數據 val sourceStream: DataStream[String] = env.socketTextStream("node01",9999)//導入隱式轉換的包import org.apache.flink.api.scala._//對數據進行處理val result: DataStream[(String, Int)] = sourceStream.flatMap(x => x.split(" ")) //按照空格切分.map(x => (x, 1)) //每個單詞計為1.keyBy(0) //按照下標為0的單詞進行分組.sum(1) //按照下標為1累加相同單詞出現的1//對數據進行打印result.print()//開啟任務env.execute("FlinkStream") }}
-
輸入
hadoop hadoophadoophive hadoop
-
輸出
8> (hadoop,1)1> (hive,1)8> (hadoop,2)8> (hadoop,3)8> (hadoop,4)
1.2 state類型
- Flink中有兩種基本類型的State, ,他們兩種都可以以兩種形式存在:
- 原生狀態(raw state)
- 由算子自己管理數據結構,當觸發Checkpoint操作過程中,Flink并不知道狀態數據內部的數據結構,只是將數據轉換成bytes數據存儲在Checkpoints中,當從Checkpoints恢復任務時,算子自己再反序列化出狀態的數據結構。
- 托管狀態(managed state)
- 由Flink Runtime控制和管理狀態數據,并將狀態數據轉換成為內存的Hash tables或 RocksDB的對象存儲,然后將這些數據通過內部的接口持久化到checkpoints中,任務異常時可以通過這些狀態數據恢復任務。
- 推薦使用ManagedState管理狀態數據,ManagedState更好的支持狀態數據的重平衡以及更加完善的內存管理
- 原生狀態(raw state)
Managed State | Raw State | |
---|---|---|
狀態管理方式 | Flink Runtime托管,自動存儲、自動恢復、自動伸縮 | 用戶自己管理 |
狀態數據結構 | Flink提供的常用數據結構,如ListState、MapState等 | 字節數組:byte[] |
使用場景 | 絕大多數Flink算子 | 用戶自定義算子 |
1.2.1 Operator State(算子狀態)
- operator state是task級別的state,說白了就是每個task對應一個state。
- Kafka Connector source中的每個分區(task)都需要記錄消費的topic的partition和offset等信息。
- 對于Operator State,我們還需進一步實現
CheckpointedFunction
接口。 - Operator State的實際應用場景不如Keyed State多,它經常被用在Source或Sink等算子上,用來保存流入數據的偏移量或對輸出數據做緩存,以保證Flink應用的Exactly-Once語義。
1.2.2 keyed State(鍵控狀態)
-
Keyed State:
- 顧名思義就是基于KeyedStream上的狀態,這個狀態是跟特定的Key 綁定的。KeyedStream流上的每一個Key,都對應一個State。Flink針對 Keyed State 提供了以下可以保存State的數據結構.
-
Keyed state托管狀態有六種類型:
-
1、ValueState
保存一個可以更新和檢索的值(如上所述,每個值都對應到當前的輸入數據的key,因此算子接收到的每個key都可能對應一個值)。 這個值可以通過update(T) 進行更新,通過 T value() 進行檢索
-
2、ListState
保存一個元素的列表。可以往這個列表中追加數據,并在當前的列表上進行檢索。可以通過 add(T) 或者 addAll(List<T>) 進行添加元素,通過 Iterable<T> get() 獲得整個列表。還可以通過 update(List<T>) 覆蓋當前的列表 。
-
3、MapState
維護了一個映射列表。 你可以添加鍵值對到狀態中,也可以獲得 反映當前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map<UK,UV>) 添加映射。 使用 get(UK) 檢索特定 key。 使用 entries(),keys() 和 values() 分別檢索映射、 鍵和值的可迭代視圖。
-
4、ReducingState
保存一個單值,表示添加到狀態的所有值的聚合。接口與ListState類似,但使用add(T) 增加元素,會使用提供的 ReduceFunction 進行聚合。
-
5、AggregatingState
AggregatingState<IN, OUT>: 保留一個單值,表示添加到狀態的所有值的聚合。和 ReducingState 相反的是, 聚合類型可能與添加到狀態的元素的類型不同。 接口與 ListState類似,但使用 add(IN) 添加的元素會用指定的 AggregateFunction 進行聚合
-
-
keyedState使用方法
- 1、只能用于
RichFunction
- 2、將
State
聲明為實例變量 - 3、在
open()
方法中為State賦值- 創建一個
StateDescriptor
- 利用
getRuntimeContext().getXXState(...)
構建不同的State
- 創建一個
- 4、調用State的方法進行
讀寫
- 例如 state.value()、state.update(…)等等
- 1、只能用于
1.3 Keyed State案例演示
1.3.1 ValueState
-
作用
- 保存一個可以更新和檢索的值
-
需求
- 使用valueState實現平均值求取
-
代碼開發
package com.kaikeba.keystateimport org.apache.flink.api.common.functions.RichFlatMapFunction import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.util.Collector/*** 使用valueState實現平均值求取*/ object ValueStateOperate {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._env.fromCollection(List((1L, 3d),(1L, 5d),(1L, 7d),(1L, 4d),(1L, 2d))).keyBy(_._1).flatMap(new CountAverageWithValue()).print()env.execute()} }class CountAverageWithValue extends RichFlatMapFunction[(Long, Double), (Long, Double)] {//定義ValueState類型的變量private var sum: ValueState[(Long, Double)] = _override def open(parameters: Configuration): Unit = {//初始化獲取歷史狀態的值sum = getRuntimeContext.getState(new ValueStateDescriptor[(Long, Double)]("average", classOf[(Long, Double)])) }override def flatMap(input: (Long, Double), out: Collector[(Long, Double)]): Unit = {// access the state valueval tmpCurrentSum = sum.value// If it hasn't been used before, it will be nullval currentSum = if (tmpCurrentSum != null) {tmpCurrentSum} else {(0L, 0d)}// update the countval newSum = (currentSum._1 + 1, currentSum._2 + input._2)// update the statesum.update(newSum)// if the count reaches 2, emit the average and clear the stateif (newSum._1 >= 2) {out.collect((input._1, newSum._2 / newSum._1))//將狀態清除//sum.clear()}}}
1.3.2 ListState
-
作用
- 用于保存每個key的歷史數據數據成為一個列表
-
需求
- 使用ListState求取數據平均值
-
代碼開發
package com.kaikeba.keystateimport java.langimport java.util.Collectionsimport org.apache.flink.api.common.functions.RichFlatMapFunctionimport org.apache.flink.api.common.state.{ListState, ListStateDescriptor}import org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.util.Collector/*** 使用ListState實現平均值求取* ListState<T> :這個狀態為每一個 key 保存集合的值* get() 獲取狀態值* add() / addAll() 更新狀態值,將數據放到狀態中* clear() 清除狀態*/object ListStateOperate {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._env.fromCollection(List((1L, 3d),(1L, 5d),(1L, 7d),(2L, 4d),(2L, 2d),(2L, 6d))).keyBy(_._1).flatMap(new CountAverageWithList).print()env.execute()}}class CountAverageWithList extends RichFlatMapFunction[(Long,Double),(Long,Double)]{//定義我們歷史所有的數據獲取private var elementsByKey: ListState[(Long,Double)] = _override def open(parameters: Configuration): Unit = {//初始化獲取歷史狀態的值,每個key對應的所有歷史值,都存儲在list集合里面了val listState = new ListStateDescriptor[(Long,Double)]("listState",classOf[(Long,Double)])elementsByKey = getRuntimeContext.getListState(listState)}override def flatMap(element: (Long, Double), out: Collector[(Long, Double)]): Unit = {//獲取當前key的狀態值val currentState: lang.Iterable[(Long, Double)] = elementsByKey.get()//如果初始狀態為空,那么就進行初始化,構造一個空的集合出來,準備用于存儲后續的數據if(currentState == null){elementsByKey.addAll(Collections.emptyList())}//添加元素elementsByKey.add(element)import scala.collection.JavaConverters._val allElements: Iterator[(Long, Double)] = elementsByKey.get().iterator().asScalaval allElementList: List[(Long, Double)] = allElements.toListif(allElementList.size >= 3){var count = 0Lvar sum = 0dfor(eachElement <- allElementList){count +=1sum += eachElement._2}out.collect((element._1,sum/count))}}}
1.3.3 MapState
-
作用
- 用于將每個key對應的數據都保存成一個map集合
-
需求
- 使用MapState求取每個key對應的平均值
-
代碼開發
package com.kaikeba.keystateimport java.util.UUIDimport org.apache.flink.api.common.functions.RichFlatMapFunction import org.apache.flink.api.common.state.{MapState, MapStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.util.Collector import org.apache.flink.api.scala._/*** 使用MapState求取每個key對應的平均值*/ object MapStateOperate {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.fromCollection(List((1L, 3d),(1L, 5d),(1L, 7d),(2L, 4d),(2L, 2d),(2L, 6d))).keyBy(_._1).flatMap(new CountAverageMapState).print()env.execute()} }class CountAverageMapState extends RichFlatMapFunction[(Long,Double),(Long,Double)]{private var mapState:MapState[String,Double] = _//初始化獲取mapState對象override def open(parameters: Configuration): Unit = {val mapStateOperate = new MapStateDescriptor[String,Double]("mapStateOperate",classOf[String],classOf[Double])mapState = getRuntimeContext.getMapState(mapStateOperate)}override def flatMap(input: (Long, Double), out: Collector[(Long, Double)]): Unit = {//將相同的key對應的數據放到一個map集合當中去,就是這種對應 key -> Map((key1, value1),(key2, value2)) //每次都構建一個map集合mapState.put(UUID.randomUUID().toString,input._2)import scala.collection.JavaConverters._//獲取map集合當中所有的value,我們每次將數據的value給放到map的value里面去val listState: List[Double] = mapState.values().iterator().asScala.toListif(listState.size >=3){var count = 0Lvar sum = 0dfor(eachState <- listState){count +=1sum += eachState}out.collect(input._1,sum/count)}} }
1.3.4 ReducingState
-
作用
- 用于數據的聚合
-
需求
- 使用ReducingState求取每個key對應的平均值
-
代碼開發
package com.kaikeba.keystateimport org.apache.flink.api.common.functions.{ReduceFunction, RichFlatMapFunction} import org.apache.flink.api.common.state.{ReducingState, ReducingStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.util.Collector/*** ReducingState<T> :這個狀態為每一個 key 保存一個聚合之后的值* get() 獲取狀態值* add() 更新狀態值,將數據放到狀態中* clear() 清除狀態*/object ReduceingStateOperate {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._env.fromCollection(List((1L, 3d),(1L, 5d),(1L, 7d),(2L, 4d),(2L, 2d),(2L, 6d))).keyBy(_._1).flatMap(new CountAverageReduceStage).print()env.execute()} }class CountAverageReduceStage extends RichFlatMapFunction[(Long,Double),(Long,Double)]{//定義ReducingStateprivate var reducingState:ReducingState[Double] = _//定義一個計數器var counter=0Loverride def open(parameters: Configuration): Unit = {val reduceSum = new ReducingStateDescriptor[Double]("reduceSum", new ReduceFunction[ Double] {override def reduce(value1: Double, value2: Double): Double = {value1+ value2}}, classOf[Double])//初始化獲取reducingState對象reducingState = getRuntimeContext.getReducingState[Double](reduceSum)} override def flatMap(input: (Long, Double), out: Collector[(Long, Double)]): Unit = {//計數器+1counter+=1//添加數據到reducingStatereducingState.add(input._2)out.collect(input._1,reducingState.get()/counter)} }
1.3.5 AggregatingState
-
作用
- 將相同key的數據進行聚合
-
需求
- 將相同key的數據聚合成為一個字符串
-
代碼開發
package com.kaikeba.keystateimport org.apache.flink.api.common.functions.{AggregateFunction, RichFlatMapFunction}import org.apache.flink.api.common.state.{AggregatingState, AggregatingStateDescriptor}import org.apache.flink.configuration.Configurationimport org.apache.flink.runtime.state.memory.MemoryStateBackendimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.util.Collectorimport org.apache.flink.api.scala._/*** 將相同key的數據聚合成為一個字符串*/object AggregrageStateOperate {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.fromCollection(List((1L, 3d),(1L, 5d),(1L, 7d),(2L, 4d),(2L, 2d),(2L, 6d))).keyBy(_._1).flatMap(new AggregrageState).print()env.execute()}}/*** (1L, 3d),(1L, 5d),(1L, 7d), 把相同key的value拼接字符串:Contains-3-5-7*/class AggregrageState extends RichFlatMapFunction[(Long,Double),(Long,String)]{//定義AggregatingStateprivate var aggregateTotal:AggregatingState[Double, String] = _override def open(parameters: Configuration): Unit = {/*** name: String,* aggFunction: AggregateFunction[IN, ACC, OUT],* stateType: Class[ACC]*/val aggregateStateDescriptor = new AggregatingStateDescriptor[Double, String, String]("aggregateState", new AggregateFunction[Double, String, String] {//創建一個初始值override def createAccumulator(): String = {"Contains"}//對數據進行累加override def add(value: Double, accumulator: String): String = {accumulator + "-" + value}//獲取累加的結果override def getResult(accumulator: String): String = {accumulator}//數據合并的規則override def merge(a: String, b: String): String = {a + "-" + b}}, classOf[String])//獲取AggregatingState對象aggregateTotal = getRuntimeContext.getAggregatingState(aggregateStateDescriptor)}override def flatMap(input: (Long, Double), out: Collector[(Long, String)]): Unit = {aggregateTotal.add(input._2)out.collect(input._1,aggregateTotal.get())}}
1.4 Operator State案例演示
- 需求
-
實現每兩條數據進行輸出打印一次,不用區分數據的key
-
這里使用ListState實現
package com.kaikeba.operatorstateimport org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}import scala.collection.mutable.ListBuffer/*** 實現每兩條數據進行輸出打印一次,不用區分數據的key*/ object OperatorListState {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._val sourceStream: DataStream[(String, Int)] = env.fromCollection(List(("spark", 3),("hadoop", 5),("hive", 7),("flume", 9)))sourceStream.addSink(new OperateTaskState).setParallelism(1)env.execute()}}class OperateTaskState extends SinkFunction[(String,Int)]{//定義一個list 用于我們每兩條數據打印一下private var listBuffer:ListBuffer[(String,Int)] = new ListBuffer[(String, Int)]override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {listBuffer.+=(value)if(listBuffer.size ==2){println(listBuffer)//清空state狀態listBuffer.clear()}}}
-
📖 2. Flink的狀態管理之State Backend
- 默認情況下,state會保存在taskmanager的內存中,checkpoint會存儲在JobManager的內存中。state 的存儲和checkpoint的位置取決于State Backend的配置。
- Flink一共提供了3種StateBackend
- MemoryStateBackend
- 基于內存存儲
- FsStateBackend
- 基于文件系統存儲
- RocksDBStateBackend
- 基于數據庫存儲
- MemoryStateBackend
- 可以通過 ==StreamExecutionEnvironment.setStateBackend(…)==來設置state存儲的位置
2.1 MemoryStateBackend
將數據持久化狀態存儲到內存當中,state數據保存在java堆內存中,執行checkpoint的時候,會把state的快照數據保存到jobmanager的內存中。基于內存的state backend在生產環境下不建議使用。
- 代碼配置:
environment.setStateBackend(new MemoryStateBackend())
- 使用場景:
(1)本地調試
(2)flink任務狀態數據量較小的場景
2.2 FsStateBackend
state數據保存在taskmanager的內存中,執行checkpoint的時候,會把state的快照數據保存到配置的文件系統中。可以使用hdfs等分布式文件系統.FsStateBackend 適合場景:狀態數據特別的多,還有長時間的window算子等,它很安全,因為基于hdfs,所以數據有備份很安全。
- 代碼配置:
environment.setStateBackend(new FsStateBackend("hdfs://node01:8020/flink/checkDir"))
- 適用場景:
(1)大狀態、長窗口、大key/value狀態的的任務
(2)全高可用配置
2.3 RocksDBStateBackend (生產中推薦)
RocksDB介紹:RocksDB使用一套日志結構的數據庫引擎,它是Flink中內置的第三方狀態管理器,為了更好的性能,這套引擎是用C++編寫的。 Key和value是任意大小的字節流。RocksDB跟上面的都略有不同,它會在本地文件系統中維護狀態,state會直接寫入本地rocksdb中。同時它需要配置一個遠端的filesystem uri(一般是HDFS),在做checkpoint的時候,會把本地的數據直接復制到fileSystem中。fail over的時候從fileSystem中恢復到本地RocksDB克服了state受內存限制的缺點,同時又能夠持久化到遠端文件系統中,比較適合在生產中使用.
- 代碼配置:導入jar包然后配置代碼
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_2.11</artifactId><version>1.9.2</version>
</dependency>
- 配置代碼
environment.setStateBackend(new RocksDBStateBackend("hdfs://node01:8020/flink/checkDir",true))
- 使用場景
(1)大狀態、長窗口、大key/value狀態的的任務
(2)全高可用配置由于RocksDBStateBackend將工作狀態存儲在taskManger的本地文件系統,狀態數量僅僅受限于本地磁盤容量限制,對比于FsStateBackend保存工作狀態在內存中,RocksDBStateBackend能避免flink任務持續運行可能導致的狀態數量暴增而內存不足的情況,因此適合在生產環境使用。
2.4 修改state-backend的兩種方式
- 第一種:單任務調整
- 修改當前任務代碼
env.setStateBackend(
new FsStateBackend("hdfs://node01:8020/flink/checkDir"))
或者new MemoryStateBackend()
或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依賴】
-
第二種:全局調整
- 修改flink-conf.yaml
state.backend: filesystem state.checkpoints.dir: hdfs://node01:8020/flink/checkDir
- 注意:state.backend的值可以是下面幾種
(1) jobmanager 表示使用 MemoryStateBackend (2) filesystem 表示使用 FsStateBackend (3) rocksdb 表示使用 RocksDBStateBackend
📖 3. Flink的checkPoint保存數據實現容錯
3.1 checkPoint的基本概念
為了保證state的容錯性,Flink需要對state進行checkpoint。Checkpoint是Flink實現容錯機制最核心的功能,它能夠根據配置周期性地基于Stream中各個Operator/task的狀態來生成快照,從而將這些狀態數據定期持久化存儲下來,當Flink程序一旦意外崩潰時,重新運行程序時可以有選擇地從這些快照進行恢復,從而修正因為故障帶來的程序數據異常。
3.2 checkPoint的前提
- Flink的checkpoint機制可以與(stream和state)的持久化存儲交互的前提
-
1、持久化的source,它需要支持在一定時間內重放事件。這種sources的典型例子是持久化的消息隊列(比如Apache Kafka,RabbitMQ等)或文件系統(比如HDFS,S3,GFS等)
-
2、用于state的持久化存儲,例如分布式文件系統(比如HDFS,S3,GFS等)
-
3.3 Flink進行checkpoint步驟
- (1)暫停新數據的輸入
- (2)等待流中on-the-fly的數據被處理干凈,此時得到flink graph的一個snapshot
- (3)將所有Task中的State拷貝到State Backend中,如HDFS。此動作由各個Task Manager完成
- (4)各個Task Manager將Task State的位置上報給Job Manager,完成checkpoint
- (5)恢復數據的輸入
如上所述,這里才需要“暫停輸入 + 排干on-the-fly 數據”的操作,這樣才能拿到同一時刻下所有subtask的state
3.4 配置checkPoint
-
默認checkpoint功能是disabled的,想要使用的時候需要先啟用
-
checkpoint開啟之后,默認的checkPointMode是Exactly-once
-
checkpoint的checkPointMode有兩種
- Exactly-once: 數據處理且只被處理一次
- At-least-once:數據至少被處理一次
Exactly-once對于大多數應用來說是最合適的。At-least-once可能用在某些延遲超低的應用程序(始終延遲為幾毫秒)
//默認checkpoint功能是disabled的,想要使用的時候需要先啟用
// 每隔1000 ms進行啟動一個檢查點【設置checkpoint的周期】
environment.enableCheckpointing(1000);
// 高級選項:
// 設置模式為exactly-once (這是默認值)
environment.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 確保檢查點之間有至少500 ms的間隔【checkpoint最小間隔】
environment.getCheckpointConfig.setMinPauseBetweenCheckpoints(500);
// 檢查點必須在一分鐘內完成,或者被丟棄【checkpoint的超時時間】
environment.getCheckpointConfig.setCheckpointTimeout(60000);
// 同一時間只允許進行一個檢查點
environment.getCheckpointConfig.setMaxConcurrentCheckpoints(1);
// 表示一旦Flink處理程序被cancel后,會保留Checkpoint數據,以便根據實際需要恢復到指定的Checkpoint【詳細解釋見備注】/*** ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink處理程序被cancel后,會保留Checkpoint數據,以便根據實際需要恢復到指定的Checkpoint* ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink處理程序被cancel后,會刪除Checkpoint數據,只有job執行失敗的時候才會保存checkpoint*/
environment.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
??3.5 重啟策略概述
- Flink支持不同的重啟策略,以在故障發生時控制作業如何重啟,集群在啟動時會伴隨一個默認的重啟策略,在沒有定義具體重啟策略時會使用該默認策略。
- 如果在工作提交時指定了一個重啟策略,該策略會覆蓋集群的默認策略,默認的重啟策略可以通過 Flink 的配置文件 flink-conf.yaml 指定。配置參數 restart-strategy 定義了哪個策略被使用。
- 常用的重啟策略
- (1)固定間隔 (Fixed delay)
- (2)失敗率 (Failure rate)
- (3)無重啟 (No restart)
- 如果沒有啟用 checkpointing,則使用無重啟 (no restart) 策略。
- 如果啟用了 checkpointing,重啟策略可以在flink-conf.yaml中配置,表示全局的配置。也可以在應用代碼中動態指定,會覆蓋全局配置
- 但沒有配置重啟策略,則使用固定間隔 (fixed-delay) 策略, 嘗試重啟次數默認值是:Integer.MAX_VALUE,。
3.6 重啟策略配置實現
- 固定間隔 (Fixed delay)
第一種:全局配置 flink-conf.yaml
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s第二種:應用代碼設置//重啟次數、重啟時間間隔
environment.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,10000))
- 失敗率 (Failure rate)
第一種:全局配置 flink-conf.yaml
//5分鐘內若失敗了3次則認為該job失敗,重試間隔為10s
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s第二種:應用代碼設置
environment.setRestartStrategy(RestartStrategies.failureRateRestart(3, org.apache.flink.api.common.time.Time.seconds(100), org.apache.flink.api.common.time.Time.seconds(10)))
- 無重啟 (No restart)
第一種:全局配置 flink-conf.yaml
restart-strategy: none第二種:應用代碼設置
environment.setRestartStrategy(RestartStrategies.noRestart())
??📖 4. 從checkPoint恢復數據以及checkPoint保存多個歷史版本
4.1 保存多個歷史版本
-
默認情況下,如果設置了Checkpoint選項,則Flink只保留最近成功生成的1個Checkpoint,而當Flink程序失敗時,可以從最近的這個Checkpoint來進行恢復。
-
如果我們希望保留多個Checkpoint,并能夠根據實際需要選擇其中一個進行恢復,這樣會更加靈活,比如,我們發現最近4個小時數據記錄處理有問題,希望將整個狀態還原到4小時之前
-
Flink可以支持保留多個Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的個數。
state.checkpoints.num-retained: 20
- 這樣設置以后就查看對應的Checkpoint在HDFS上存儲的文件目錄
hdfs dfs -ls hdfs://node01:8020/flink/checkpoints
- 如果希望回退到某個Checkpoint點,只需要指定對應的某個Checkpoint路徑即可實現
4.2 恢復歷史某個版本數據
- 如果Flink程序異常失敗,或者最近一段時間內數據處理錯誤,我們可以將程序從某一個Checkpoint點進行恢復
flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 -s hdfs://node01:8020/fsStateBackend/971ae7ac4d5f20e704747ea7c549b356/chk-50/_metadata -c com.kaikeba.checkpoint.TestCheckPoint original-flink_study-1.0-SNAPSHOT.jar
- 程序正常運行后,還會按照Checkpoint配置進行運行,繼續生成Checkpoint數據
📖 5. Flink的savePoint保存數據
5.1 savePoint的介紹
-
savePoint是檢查點一種特殊實現,底層其實也是使用Checkpoints的機制。
-
savePoint是用戶以手工命令的方式觸發checkpoint,并將結果持久化到指定的存儲目錄中
-
作用
- 1、應用程序代碼升級
- 通過觸發保存點并從該保存點處運行新版本,下游的應用程序并不會察覺到不同
- 2、Flink版本更新
- Flink 自身的更新也變得簡單,因為可以針對正在運行的任務觸發保存點,并從保存點處用新版本的 Flink 重啟任務。
- 3、維護和遷移
- 使用保存點,可以輕松地“暫停和恢復”應用程序
- 1、應用程序代碼升級
5.2 savePoint的使用
- 1:在flink-conf.yaml中配置Savepoint存儲位置
不是必須設置,但是設置后,后面創建指定Job的Savepoint時,可以不用在手動執行命令時指定Savepoint的位置
state.savepoints.dir: hdfs://node01:8020/flink/savepoints
-
2:觸發一個savepoint
-
(1)手動觸發savepoint
#【針對on standAlone模式】 bin/flink savepoint jobId [targetDirectory] #【針對on yarn模式需要指定-yid參數】 bin/flink savepoint jobId [targetDirectory] [-yid yarnAppId]#jobId 需要觸發savepoint的jobId編號 #targetDirectory 指定savepoint存儲數據目錄 #-yid 指定yarnAppId ##例如: flink savepoint 8d1bb7f88a486815f9b9cf97c304885b -yid application_1594807273214_0004
-
(2)取消任務并手動觸發savepoint
##【針對on standAlone模式】 bin/flink cancel -s [targetDirectory] jobId ##【針對on yarn模式需要指定-yid參數】 bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId]##例如: flink cancel 8d1bb7f88a486815f9b9cf97c304885b -yid application_1594807273214_0004
-
-
3:從指定的savepoint啟動job
bin/flink run -s savepointPath [runArgs]##例如: flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 -s hdfs://node01:8020/flink/savepoints/savepoint-8d1bb7-c9187993ca94 -c com.kaikeba.checkpoint.TestCheckPoint original-flink_study-1.0-SNAPSHOT.jar
-
4、清除savepoint數據
bin/flink savepoint -d savepointPath
📖 6. Flink流式處理集成kafka
-
對于實時處理當中,我們實際工作當中的數據源一般都是使用kafka,所以我們一起來看看如何通過Flink來集成kafka
-
Flink提供了一個特有的kafka connector去讀寫kafka topic的數據。flink消費kafka數據,并不是完全通過跟蹤kafka消費組的offset來實現去保證exactly-once的語義,而是flink內部去跟蹤offset和做checkpoint去實現exactly-once的語義,而且對于kafka的partition,Flink會啟動對應的并行度去處理kafka當中的每個分區的數據
-
Flink整合kafka官網介紹
- https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html
6.1 導入pom依賴
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.9.2</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_2.11</artifactId><version>1.9.2</version>
</dependency>
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.1.0</version>
</dependency>
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.25</version>
</dependency>
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version>
</dependency>
6.2 將kafka作為flink的source來使用
- 實際工作當中一般都是將kafka作為flink的source來使用
6.2.1 創建kafka的topic
- 安裝好kafka集群,并啟動kafka集群,然后在node01執行以下命令創建kafka的topic為test
kafka-topics.sh --create --partitions 3 --topic test --replication-factor 1 --zookeeper node01:2181,node02:2181,node03:2181
6.2.2 代碼實現:
package com.kaikeba.kafkaimport java.util.Propertiesimport org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema/*** 將kafka作為flink的source來使用*/
object FlinkKafkaSource {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//**隱式轉換import org.apache.flink.api.scala._//checkpoint**配置env.enableCheckpointing(100)env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)env.getCheckpointConfig.setCheckpointTimeout(60000)env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)//設置statebackendenv.setStateBackend(new RocksDBStateBackend("hdfs://node01:8020/flink_kafka_sink/checkpoints",true));val topic = "test"val prop = new Properties()prop.setProperty("bootstrap.servers","node01:9092,node02:9092,node03:9092")prop.setProperty("group.id","con1")prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");val kafkaConsumer = new FlinkKafkaConsumer[String]("test",new SimpleStringSchema,prop)kafkaConsumer.setCommitOffsetsOnCheckpoints(true)val kafkaSource: DataStream[String] = env.addSource(kafkaConsumer)kafkaSource.print()env.execute()}
}
6.2.3 kafka生產數據
- node01執行以下命令,通過shell命令行來生產數據到kafka當中去
##創建topickafka-topics.sh --create --topic test --partitions 3 --replication-factor 2 --zookeeper node01:2181,node02:2181,node03:2181 ##發送數據
kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
6.3 將kafka作為flink的sink來使用
- 我們也可以將kafka作為flink的sink來使用,就是將flink處理完成之后的數據寫入到kafka當中去
6.3.1 socket發送數據
- node01執行以下命令,從socket當中發送數據
nc -lk 9999
6.3.2 代碼實現
package com.kaikeba.kafkaimport java.util.Propertiesimport org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper
import org.apache.flink.streaming.util.serialization.SimpleStringSchema/*** 將kafka作為flink的sink來使用*/
object FlinkKafkaSink {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//隱式轉換import org.apache.flink.api.scala._//checkpoint配置env.enableCheckpointing(5000);env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500);env.getCheckpointConfig.setCheckpointTimeout(60000);env.getCheckpointConfig.setMaxConcurrentCheckpoints(1);env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//設置statebackendenv.setStateBackend(new RocksDBStateBackend("hdfs://node01:8020/flink_kafka_sink/checkpoints",true));val socketStream = env.socketTextStream("node01",9999)val topic = "test"val prop = new Properties()prop.setProperty("bootstrap.servers","node01:9092,node02:9092,node03:9092")prop.setProperty("group.id","kafka_group1")//第一種解決方案,設置FlinkKafkaProducer里面的事務超時時間//設置事務超時時間prop.setProperty("transaction.timeout.ms",60000*15+"");//第二種解決方案,設置kafka的最大事務超時時間//FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer<>(brokerList, topic, new SimpleStringSchema());//使用支持僅一次語義的形式/*** defaultTopic: String,* serializationSchema: KafkaSerializationSchema[IN],* producerConfig: Properties,* semantic: FlinkKafkaProducer.Semantic*/val kafkaSink = new FlinkKafkaProducer[String](topic,new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), prop,FlinkKafkaProducer.Semantic.EXACTLY_ONCE)socketStream.addSink(kafkaSink)env.execute("StreamingFromCollectionScala")}
}
6.3.3 啟動kafka消費者
- node01執行以下命令啟動kafka消費者,消費數據
kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic test
📖 7. Flink當中的window窗口
-
對于流式處理,如果我們需要求取總和,平均值,或者最大值,最小值等,是做不到的,因為數據一直在源源不斷的產生,即數據是沒有邊界的,所以沒法求最大值,最小值,平均值等,所以為了一些數值統計的功能,我們必須指定時間段,對某一段時間的數據求取一些數據值是可以做到的。或者對某一些數據求取數據值也是可以做到的
-
所以,流上的聚合需要由 window 來劃定范圍,比如 “計算過去的5分鐘” ,或者 “最后100個元素的和” 。
-
window是一種可以把無限數據切割為有限數據塊的手段
- 窗口可以是 時間驅動的 【Time Window】(比如:每30秒)
- 或者 數據驅動的【Count Window】 (比如:每100個元素)
-
窗口類型匯總:
7.1 窗口的基本類型介紹
- 窗口通常被區分為不同的類型:
-
tumbling windows:滾動窗口 【沒有重疊】
- 滾動窗口下窗口之間之間不重疊,且窗口長度是固定的
-
sliding windows:滑動窗口 【有重疊】
- 滑動窗口以一個步長(Slide)不斷向前滑動,窗口的長度固定
-
session windows:會話窗口 ,一般沒人用
- Session window的窗口大小,則是由數據本身決定,它沒有固定的開始和結束時間。
- 會話窗口根據Session gap切分不同的窗口,當一個窗口在大于Session gap的時間內沒有接收到新數據時,窗口將關閉
-
7.2 Flink的窗口介紹
7.2.1 Time Window窗口的應用
- time window又分為滾動窗口和滑動窗口,這兩種窗口調用方法都是一樣的,都是調用timeWindow這個方法,如果傳入一個參數就是滾動窗口,如果傳入兩個參數就是滑動窗口
?
-
需求:每隔5s時間,統計最近10s出現的數據
-
代碼實現:
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Timeobject TestTimeWindow {def main(args: Array[String]): Unit = {val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._val socketSource: DataStream[String] = environment.socketTextStream("node01",9999)socketSource.flatMap(x => x.split(" ")).map(x =>(x,1)).keyBy(0).timeWindow(Time.seconds(10),Time.seconds(5)).sum(1).print()environment.execute()}}
7.2.2 Count Windos窗口的應用
-
與timeWindow類型,CountWinodw也可以分為滾動窗口和滑動窗口,這兩個窗口調用方法一樣,都是調用countWindow,如果傳入一個參數就是滾動窗口,如果傳入兩個參數就是滑動窗口
?
-
需求:使用count Window 統計最近5條數的最大值
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}/*** 使用countWindow統計最近5條數據的最大值*/
object TestCountWindow {def main(args: Array[String]): Unit = {val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._val socketSource: DataStream[String] = environment.socketTextStream("node01",9999)/*** 發送數據* spark 1* spark 2* spark 3* spark 4* spark 5* hello 100* hello 90* hello 80* hello 70* hello 60* hello 10*/socketSource.map(x => (x.split(" ")(0),x.split(" ")(1).toInt)).keyBy(0).countWindow(5).aggregate(new AggregateFunction[(String,Int),Int,Double]{var initAccumulator :Int = 0override def createAccumulator(): Int = {initAccumulator}override def add(value: (String, Int), accumulator: Int): Int = {if(accumulator >= value._2){accumulator}else{value._2}}override def getResult(accumulator: Int): Double = {accumulator}override def merge(a: Int, b: Int): Int = {if(a>=b){a}else{b}}}).print()environment.execute()}
}
7.2.3 自定義window的應用
-
如果time window 和 countWindow 還不夠用的話,我們還可以使用自定義window來實現數據的統計等功能。
7.3 window窗口數據的集合統計
-
前面我們可以通過aggregrate實現數據的聚合,對于求最大值,最小值,平均值等操作,我們也可以通過process方法來實現
-
對于某一個window內的數值統計,我們可以增量的聚合統計或者全量的聚合統計
7.3.1 增量聚合統計
- 窗口當中每加入一條數據,就進行一次統計
- 常用的聚合算子
- reduce(reduceFunction)
- aggregate(aggregateFunction)
-
需求
- 通過接收socket當中輸入的數據,統計每5秒鐘數據的累計的值
-
代碼實現
package com.kaikeba.windowimport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Timeobject FlinkTimeCount {def main(args: Array[String]): Unit = {val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._val socketStream: DataStream[String] = environment.socketTextStream("node01",9999)socketStream.map(x => (1, x.toInt)).keyBy(0).timeWindow(Time.seconds(5)).reduce((c1,c2)=>(c1._1,c1._2+c2._2)).print()environment.execute("FlinkTimeCount")}}
7.3.2 全量聚合統計
-
等到窗口截止,或者窗口內的數據全部到齊,然后再進行統計,可以用于求窗口內的數據的最大值,或者最小值,平均值等
-
等屬于窗口的數據到齊,才開始進行聚合計算【可以實現對窗口內的數據進行排序等需求】
- apply(windowFunction)
- process(processWindowFunction)
- processWindowFunction比windowFunction提供了更多的上下文信息。
-
需求
- 通過全量聚合統計,求取每3條數據的平均值
-
代碼實現
package com.kaikeba.windowimport org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector/*** 求取每3條數據的平均值*/
object FlinkCountWindowAvg {/*** 輸入數據* 1* 2* 3* 4* 5* 6* @param args*/def main(args: Array[String]): Unit = {val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._val socketStream: DataStream[String] = environment.socketTextStream("node01",9999)//統計一個窗口內的數據的平均值socketStream.map(x => (1, x.toInt)).keyBy(0).countWindow(3)//通過process方法來統計窗口的平均值.process(new MyProcessWindowFunctionclass).print()//必須調用execute方法,否則程序不會執行environment.execute("count avg")}
}/**ProcessWindowFunction 需要跟四個參數* 輸入參數類型,輸出參數類型,聚合的key的類型,window的下界**/
class MyProcessWindowFunctionclass extends ProcessWindowFunction[(Int , Int) , Double , Tuple , GlobalWindow]{override def process(key: Tuple, context: Context, elements: Iterable[(Int, Int)], out: Collector[Double]): Unit = {var totalNum = 0;var countNum = 0;for(data <- elements){totalNum +=1countNum += data._2}out.collect(countNum/totalNum)}
}
📖 ??8. checkpoint機制原理深度剖析
- checkpoint是flink為了解決state一致性和容錯性引入的一種分布式的狀態快照機制。
8.1 Flink分布式快照流程
- 首先我們來看一下一個簡單的Checkpoint的大致流程:
- 暫停處理新流入數據,將新數據緩存起來。
- 將算子子任務的本地狀態數據拷貝到一個遠程的持久化存儲上。
- 繼續處理新流入的數據,包括剛才緩存起來的數據。
8.2 Barrier機制
? flink是如何來實現分布式狀態快照的呢,由于flink是流式的計算引擎,基于這種特定的場景,Flink通過向流數據中注入特殊的事件來作為快照的信號,這種特殊事件就叫Barrier(屏障,柵欄)。當算子任務處理到Barrier n的時候就會執行狀態的快照并把它標記為n的狀態快照。
-
checkpoint的調用流程:
-
- 首先是JobManager中的checkpoint Coordinator(協調器) 向任務中的所有source Task周期性發送barrier(柵欄)進行快照請求。
-
- source Task接受到barrier后, 會把當前自己的狀態進行snapshot(可以保存在HDFS上)。
-
- source向checkpoint coordinator確認snapshot已經完成。
-
- source繼續向下游transformation operator發送 barrier。
-
- transformation operator重復source的操作,直到sink operator向協調器確認snapshot完成。
-
- coordinator確認完成本周期的snapshot已經完成。
// 5秒啟動一次checkpoint env.enableCheckpointing(5000)// 設置checkpoint只checkpoint一次 env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)// 設置兩次checkpoint的最小時間間隔 env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)// checkpoint超時的時長 env.getCheckpointConfig.setCheckpointTimeout(60000)// 允許的最大checkpoint并行度 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)// 當程序關閉的時,觸發額外的checkpoint env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)// 設置checkpoint的地址 env.setStateBackend(new FsStateBackend("hdfs://node01:8020/flink-checkpoint/"))
-
-
注意
Checkpoint Barrier被插入到數據流中,它將數據流切分成段。Flink的Checkpoint邏輯是,一段新數據流入導致狀態發生了變化,Flink的算子接收到Checkpoint Barrier后,對狀態進行快照。每個Checkpoint Barrier有一個ID,表示該段數據屬于哪次Checkpoint。如圖所示,當ID為n的Checkpoint Barrier到達每個算子后,表示要對n-1和n之間狀態的更新做快照。
8.3 多任務并行下的checkpoint
- 我們構建一個并行數據流圖,用這個并行數據流圖來演示Flink的分布式快照機制。這個數據流圖有兩個Source子任務,數據流會在這些并行算子上從Source流動到Sink。
- 首先,Flink的檢查點協調器(Checkpoint Coordinator)觸發一次Checkpoint(Trigger Checkpoint),這個請求會發送給Source的各個子任務。
- 各Source算子子任務接收到這個Checkpoint請求之后,會將自己的狀態寫入到狀態后端,生成一次快照,并且會向下游廣播Checkpoint Barrier。
- Source算子做完快照后,還會給Checkpoint Coodinator發送一個確認,告知自己已經做完了相應的工作。這個確認中包括了一些元數據,其中就包括剛才備份到State Backend的狀態句柄,或者說是指向狀態的指針。至此,Source完成了一次Checkpoint。跟Watermark的傳播一樣,一個算子子任務要把Checkpoint Barrier發送給所連接的所有下游算子子任務。
- 對于下游算子來說,可能有多個與之相連的上游輸入,我們將算子之間的邊稱為通道。Source要將一個ID為n的Checkpoint Barrier向所有下游算子廣播,這也意味著下游算子的多個輸入里都有同一個Checkpoint Barrier,而且不同輸入里Checkpoint Barrier的流入進度可能不同。Checkpoint Barrier傳播的過程需要進行對齊(Barrier Alignment),我們從數據流圖中截取一小部分來分析Checkpoint Barrier是如何在算子間傳播和對齊的。
如上圖所示,對齊分為四步:
(1). 算子子任務在某個輸入通道中收到第一個ID為n的Checkpoint Barrier,但是其他輸入通道中ID為n的Checkpoint Barrier還未到達,該算子子任務開始準備進行對齊。
(2). 算子子任務將第一個輸入通道的數據緩存下來,同時繼續處理其他輸入通道的數據,這個過程被稱為對齊。
(3). 第二個輸入通道的Checkpoint Barrier抵達該算子子任務,該算子子任務執行快照,將狀態寫入State Backend,然后將ID為n的Checkpoint Barrier向下游所有輸出通道廣播。
(4). 對于這個算子子任務,快照執行結束,繼續處理各個通道中新流入數據,包括剛才緩存起來的數據。
- 數據流圖中的每個算子子任務都要完成一遍上述的對齊、快照、確認的工作,當最后所有Sink算子確認完成快照之后,說明ID為n的Checkpoint執行結束,Checkpoint Coordinator向State Backend寫入一些本次Checkpoint的元數據。
? 之所以要進行barrier對齊,主要是為了保證一個Flink作業所有算子的狀態是一致的。也就是說,某個ID為n的Checkpoint Barrier從前到后流入所有算子子任務后,所有算子子任務都能將同樣的一段數據寫入快照。
8.4 快照性能優化方案
-
上面講到了一致性快照的具體流程,這種方式保證了數據的一致性,但有一些潛在的問題
-
(1)每次進行Checkpoint前,都需要暫停處理新流入數據,然后開始執行快照,假如狀態比較大,一次快照可能長達幾秒甚至幾分鐘。
-
(2)Checkpoint Barrier對齊時,必須等待所有上游通道都處理完,假如某個上游通道處理很慢,這可能造成整個數據流堵塞。
-
-
優化方案
- (1)對于第一個問題,Flink提供了異步快照(Asynchronous Snapshot)的機制。當實際執行快照時,Flink可以立即向下廣播Checkpoint Barrier,表示自己已經執行完自己部分的快照。一旦數據同步完成,再給Checkpoint Coordinator發送確認信息
- (2)對于第二個問題,Flink允許跳過對齊這一步,或者說一個算子子任務不需要等待所有上游通道的Checkpoint Barrier,直接將Checkpoint Barrier廣播,執行快照并繼續處理后續流入數據。為了保證數據一致性,Flink必須將那些較慢的數據流中的元素也一起快照,一旦重啟,這些元素會被重新處理一遍。
??
Barrier對齊會影響執行效率,怎么跳過Barrier對齊,跳過后還能保證?Exactly-Once語義嗎?
8.5 任務重啟恢復流程
-
Flink的重啟恢復邏輯相對比較簡單:
-
1、重啟應用,在集群上重新部署數據流圖。
-
2、從持久化存儲上讀取最近一次的Checkpoint數據,加載到各算子子任務上。
-
3、繼續處理新流入的數據。
-
-
這樣的機制可以保證Flink內部狀態的Excatly-Once一致性。至于端到端的Exactly-Once一致性,要根據Source和Sink的具體實現而定。當發生故障時,一部分數據有可能已經流入系統,但還未進行Checkpoint,Source的Checkpoint記錄了輸入的Offset;當重啟時,Flink能把最近一次的Checkpoint恢復到內存中,并根據Offset,讓Source從該位置重新發送一遍數據,以保證數據不丟不重。像Kafka等消息隊列是提供重發功能的,
socketTextStream
就不具有這種功能,也意味著不能保證Exactly-Once投遞保障。
📖 9. Flink兩階段提交 TwoPhaseCommit
9.1 EXACTLY_ONCE語義概述
- 何為EXACTLY_ONCE?
- EXACTLY_ONCE簡稱EOS,每條輸入消息只會影響最終結果一次,注意這里是影響一次,而非處理一次,Flink一直宣稱自己支持EOS,實際上主要是對于Flink應用內部來說的,對于外部系統(端到端)則有比較強的限制
- Flink實現端到端的EXACTLY_ONCE語義需要滿足:
- 1.外部系統寫入支持冪等性
- 2.外部系統支持以事務的方式寫入
- Flink的基本思路就是將狀態定時地checkpiont到hdfs中去,當發生failure的時候恢復上一次的狀態,然后將輸出update到外部。這里需要注意的是輸入流的offset也是狀態的一部分,因此一旦發生failure就能從最后一次狀態恢復,從而保證輸出的結果是exactly once。這是Flink1.4之前的實現。
- Flink在1.4.0版本引入了TwoPhaseCommitSinkFunction接口,并在Kafka Producer的connector中實現了它,支持了對外部Kafka Sink的EXACTLY_ONCE語義,來讓開發者用更少的代碼來實現端到端的exactly-once語義
9.2 兩階段提交協議介紹
-
兩階段提交協議是協調所有分布式原子事務參與者,并決定提交或取消(回滾)的分布式算法
-
協議參與者
兩階段提交指的是一種協議,經常用來實現分布式事務,可以簡單理解為預提交+實際提交,一般分為協調器Coordinator(以下簡稱C)和若干事務參與者Participant(以下簡稱P)兩種角色。
-
兩個階段的執行
-
1.請求階段(commit-request phase,或稱表決階段,voting phase)
在請求階段,協調者將通知事務參與者準備提交或取消事務,然后進入表決過程。 在表決過程中,參與者將告知協調者自己的決策:同意(事務參與者本地作業執行成功)或取消(本地作業執行故障)。
-
- 提交階段(commit phase)
在該階段,協調者將基于第一個階段的投票結果進行決策:提交或取消。 當且僅當所有的參與者同意提交事務協調者才通知所有的參與者提交事務,否則協調者將通知所有的參與者取消事務。 參與者在接收到協調者發來的消息后將執行響應的操作。
-
9.3 兩階段提交實現原理機制
-
Flink和外部系統(如Kafka)之間的消息傳遞如何做到exactly once呢?
-
先看下面這幅圖會出現的問題
- 當sink A已經往Kafka寫入了數據,而sink B fail.
- 根據Flink的exactly once保證,系統會回滾到最近的checkpoint,
- 但是sink A已經把數據寫入到kafka了.
- Flink無法回滾kafka的state.因此,kafka將在之后再次接收到一份同樣的來自sink A的數據,
- 這樣的message delivery便成為了at least once
-
Flink采用Two phase commit來解決這個問題.
-
Two phase commit
- Phase 1: Pre-commit 預提交
- Flink的JobManager向source注入checkpoint barrier以開啟這snapshot,barrier從source流向sink,
- 每個進行snapshot的算子成功snapshot后,都會向JobManager發送ACK.
- 當sink完成snapshot后, 向JobManager發送ACK的同時向kafka進行pre-commit.
- Phase 2: Commit 實際提交
- 當JobManager接收到所有算子的ACK后, 就會通知所有的算子這次checkpoint已經完成
- Sink接收到這個通知后, 就向kafka進行commit, 正式把數據寫入到kafka
- Phase 1: Pre-commit 預提交
-
-
下面我們來看看flink消費并寫入kafka的例子是如何通過兩部提交來保證exactly-once語義的。
-
kafka從0.11開始支持事物操作,若要使用flink端到端exactly-once語義需要flink的sink的kafka是0.11版本以上的
-
這個例子包括以下幾個步驟:
- 從kafka讀取數據
- 一個聚合窗操作
- 向kafka寫入數據
-
1、JobManager向Source發送Barrier,開始進入pre-Commit階段,當Source收到Barrier后,將自身的狀態進行保存,后端可以根據配置進行選擇,這里的狀態是指消費的每個分區對應的offset。然后將Barrier發送給下一個Operator。
-
2、當Window這個Operator收到Barrier之后,對自己的狀態進行保存,這里的狀態是指聚合的結果(sum或count的結果),然后將Barrier發送給Sink。Sink收到后也對自己的狀態進行保存,之后會進行一次預提交。
-
3、預提交成功后,JobManager通知每個Operator,這一輪檢查點已經完成,這個時候,Kafka Sink會向Kafka進行真正的事務Commit。
-
以上便是兩階段的完整流程,不同階段fail over的recovery舉措:
? (1) 在pre-commit前fail over, 系統恢復到最近的checkponit
? (2) 在pre-commit后,commit前fail over,系統恢復到剛完成pre-commit時的狀態
因此,所有opeartor必須對checkpoint最終結果達成共識:
? 即所有operator都必須認定數據提交要么成功執行,要么被終止然后回滾。
9.4 兩階段提交的TwoPhaseCommitSinkFunction類
-
在使用兩步提交算子時,我們可以繼承TwoPhaseCommitSinkFunction這個類。
-
TwoPhaseCommitSinkFunction有4個方法
-
-
beginTransaction()
開啟事務:創建一個臨時文件.后續把原要寫入到外部系統的數據寫入到這個臨時文件
-
-
- preCommit()
預提交:flush并close這個文件,之后便不再往其中寫數據.同時開啟一個新的事務供下個checkponit使用
-
-
commit()
正式提交: 把pre-committed的臨時文件移動到指定目錄
-
-
-
abort()
丟棄: 刪除掉pre-committed的臨時文件
-
-
-