Flink深入淺出之03:狀態、窗口、checkpoint、兩階段提交

Flink是一個有狀態的流,👅一起深入了解這個有狀態的流

3?? 目標

  1. 掌握State知識
  2. 掌握Flink三種State Backend
  3. 掌握Flink checkpoint和savepoint原理
  4. 了解Flink的重啟策略
  5. checkpoint+two phase commit保證E-O語義

4?? 要點

📖 1. Flink的State

1.1 state概述

Apache Flink? — Stateful Computations over Data Streams

	Flink 是一個默認就有狀態的分析引擎,前面的WordCount 案例可以做到單詞的數量的累加,其實是因為在內存中保證了每個單詞的出現的次數,這些數據其實就是狀態數據。但是如果一個 Task 在處理過程中掛掉了,那么它在內存中的狀態都會丟失,所有的數據都需要重新計算。從容錯和消息處理的語義(At -least-once 和 Exactly-once)上來說,Flink引入了State 和 CheckPoint。State一般指一個具體的 Task/Operator 的狀態,State數據默認保存在 Java 的堆內存中。
  • 回顧單詞計數的例子

    package com.kaikeba.demo1import org.apache.flink.api.java.tuple.Tuple
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, WindowedStream}
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow/*** 使用滑動窗口* 每隔1秒鐘統計最近2秒鐘的每個單詞出現的次數*/
    object FlinkStream {def main(args: Array[String]): Unit = {//構建流處理的環境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//從socket獲取數據
    val sourceStream: DataStream[String] = env.socketTextStream("node01",9999)//導入隱式轉換的包import org.apache.flink.api.scala._//對數據進行處理val result: DataStream[(String, Int)] = sourceStream.flatMap(x => x.split(" ")) //按照空格切分.map(x => (x, 1))  //每個單詞計為1.keyBy(0)         //按照下標為0的單詞進行分組.sum(1)          //按照下標為1累加相同單詞出現的1//對數據進行打印result.print()//開啟任務env.execute("FlinkStream")
    }}
    
  • 輸入

      hadoop hadoophadoophive hadoop 
    
  • 輸出

      8> (hadoop,1)1> (hive,1)8> (hadoop,2)8> (hadoop,3)8> (hadoop,4)
    

在這里插入圖片描述

1.2 state類型
  • Flink中有兩種基本類型的State, ,他們兩種都可以以兩種形式存在:
    • 原生狀態(raw state)
      • 由算子自己管理數據結構,當觸發Checkpoint操作過程中,Flink并不知道狀態數據內部的數據結構,只是將數據轉換成bytes數據存儲在Checkpoints中,當從Checkpoints恢復任務時,算子自己再反序列化出狀態的數據結構。
    • 托管狀態(managed state)
      • Flink Runtime控制和管理狀態數據,并將狀態數據轉換成為內存的Hash tables或 RocksDB的對象存儲,然后將這些數據通過內部的接口持久化到checkpoints中,任務異常時可以通過這些狀態數據恢復任務。
      • 推薦使用ManagedState管理狀態數據,ManagedState更好的支持狀態數據的重平衡以及更加完善的內存管理
Managed StateRaw State
狀態管理方式Flink Runtime托管,自動存儲、自動恢復、自動伸縮用戶自己管理
狀態數據結構Flink提供的常用數據結構,如ListState、MapState等字節數組:byte[]
使用場景絕大多數Flink算子用戶自定義算子
1.2.1 Operator State(算子狀態)
  • operator state是task級別的state,說白了就是每個task對應一個state。
  • Kafka Connector source中的每個分區(task)都需要記錄消費的topic的partition和offset等信息。
  • 對于Operator State,我們還需進一步實現CheckpointedFunction接口。
  • Operator State的實際應用場景不如Keyed State多,它經常被用在Source或Sink等算子上,用來保存流入數據的偏移量或對輸出數據做緩存,以保證Flink應用的Exactly-Once語義。

在這里插入圖片描述

1.2.2 keyed State(鍵控狀態)
  • Keyed State:

    • 顧名思義就是基于KeyedStream上的狀態,這個狀態是跟特定的Key 綁定的。KeyedStream流上的每一個Key,都對應一個State。Flink針對 Keyed State 提供了以下可以保存State的數據結構.
  • Keyed state托管狀態有六種類型:

    • 1、ValueState

       保存一個可以更新和檢索的值(如上所述,每個值都對應到當前的輸入數據的key,因此算子接收到的每個key都可能對應一個值)。 這個值可以通過update(T) 進行更新,通過 T value() 進行檢索	
      
    • 2、ListState

      	保存一個元素的列表。可以往這個列表中追加數據,并在當前的列表上進行檢索。可以通過 add(T) 或者 addAll(List<T>) 進行添加元素,通過 Iterable<T> get() 獲得整個列表。還可以通過 update(List<T>) 覆蓋當前的列表	。
      
    • 3、MapState

      	維護了一個映射列表。 你可以添加鍵值對到狀態中,也可以獲得 反映當前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map<UK,UV>) 添加映射。 使用 get(UK) 檢索特定 key。 使用 entries(),keys() 和 values() 分別檢索映射、 鍵和值的可迭代視圖。
      
    • 4、ReducingState

      	保存一個單值,表示添加到狀態的所有值的聚合。接口與ListState類似,但使用add(T) 增加元素,會使用提供的 ReduceFunction 進行聚合。
      
    • 5、AggregatingState

      	AggregatingState<IN, OUT>: 保留一個單值,表示添加到狀態的所有值的聚合。和 ReducingState 相反的是, 聚合類型可能與添加到狀態的元素的類型不同。 接口與 ListState類似,但使用 add(IN) 添加的元素會用指定的 AggregateFunction 進行聚合
      
  • keyedState使用方法

    • 1、只能用于RichFunction
    • 2、將State 聲明為實例變量
    • 3、在 open() 方法中為State賦值
      • 創建一個StateDescriptor
      • 利用getRuntimeContext().getXXState(...) 構建不同的State
    • 4、調用State的方法進行讀寫
      • 例如 state.value()、state.update(…)等等

在這里插入圖片描述

1.3 Keyed State案例演示
1.3.1 ValueState
  • 作用

    • 保存一個可以更新和檢索的值
  • 需求

    • 使用valueState實現平均值求取
  • 代碼開發

    package com.kaikeba.keystateimport org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.util.Collector/*** 使用valueState實現平均值求取*/
    object ValueStateOperate {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._env.fromCollection(List((1L, 3d),(1L, 5d),(1L, 7d),(1L, 4d),(1L, 2d))).keyBy(_._1).flatMap(new CountAverageWithValue()).print()env.execute()}
    }class CountAverageWithValue extends RichFlatMapFunction[(Long, Double), (Long, Double)] {//定義ValueState類型的變量private var sum: ValueState[(Long, Double)] = _override def open(parameters: Configuration): Unit = {//初始化獲取歷史狀態的值sum = getRuntimeContext.getState(new ValueStateDescriptor[(Long, Double)]("average", classOf[(Long, Double)]))
    }override def flatMap(input: (Long, Double), out: Collector[(Long, Double)]): Unit = {// access the state valueval tmpCurrentSum = sum.value// If it hasn't been used before, it will be nullval currentSum = if (tmpCurrentSum != null) {tmpCurrentSum} else {(0L, 0d)}// update the countval newSum = (currentSum._1 + 1, currentSum._2 + input._2)// update the statesum.update(newSum)// if the count reaches 2, emit the average and clear the stateif (newSum._1 >= 2) {out.collect((input._1, newSum._2 / newSum._1))//將狀態清除//sum.clear()}}}
    
1.3.2 ListState
  • 作用

    • 用于保存每個key的歷史數據數據成為一個列表
  • 需求

    • 使用ListState求取數據平均值
  • 代碼開發

    package com.kaikeba.keystateimport java.langimport java.util.Collectionsimport org.apache.flink.api.common.functions.RichFlatMapFunctionimport org.apache.flink.api.common.state.{ListState, ListStateDescriptor}import org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.util.Collector/*** 使用ListState實現平均值求取* ListState<T> :這個狀態為每一個 key 保存集合的值*      get() 獲取狀態值*      add() / addAll() 更新狀態值,將數據放到狀態中*      clear() 清除狀態*/object ListStateOperate {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._env.fromCollection(List((1L, 3d),(1L, 5d),(1L, 7d),(2L, 4d),(2L, 2d),(2L, 6d))).keyBy(_._1).flatMap(new CountAverageWithList).print()env.execute()}}class CountAverageWithList extends RichFlatMapFunction[(Long,Double),(Long,Double)]{//定義我們歷史所有的數據獲取private var elementsByKey: ListState[(Long,Double)] = _override def open(parameters: Configuration): Unit = {//初始化獲取歷史狀態的值,每個key對應的所有歷史值,都存儲在list集合里面了val listState = new ListStateDescriptor[(Long,Double)]("listState",classOf[(Long,Double)])elementsByKey = getRuntimeContext.getListState(listState)}override def flatMap(element: (Long, Double), out: Collector[(Long, Double)]): Unit = {//獲取當前key的狀態值val currentState: lang.Iterable[(Long, Double)] = elementsByKey.get()//如果初始狀態為空,那么就進行初始化,構造一個空的集合出來,準備用于存儲后續的數據if(currentState == null){elementsByKey.addAll(Collections.emptyList())}//添加元素elementsByKey.add(element)import scala.collection.JavaConverters._val allElements: Iterator[(Long, Double)] = elementsByKey.get().iterator().asScalaval allElementList: List[(Long, Double)] = allElements.toListif(allElementList.size >= 3){var count = 0Lvar sum = 0dfor(eachElement <- allElementList){count +=1sum += eachElement._2}out.collect((element._1,sum/count))}}}
    
1.3.3 MapState
  • 作用

    • 用于將每個key對應的數據都保存成一個map集合
  • 需求

    • 使用MapState求取每個key對應的平均值
  • 代碼開發

    package com.kaikeba.keystateimport java.util.UUIDimport org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.util.Collector
    import org.apache.flink.api.scala._/*** 使用MapState求取每個key對應的平均值*/
    object MapStateOperate {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.fromCollection(List((1L, 3d),(1L, 5d),(1L, 7d),(2L, 4d),(2L, 2d),(2L, 6d))).keyBy(_._1).flatMap(new CountAverageMapState).print()env.execute()}
    }class CountAverageMapState extends RichFlatMapFunction[(Long,Double),(Long,Double)]{private var mapState:MapState[String,Double] = _//初始化獲取mapState對象override def open(parameters: Configuration): Unit = {val mapStateOperate = new MapStateDescriptor[String,Double]("mapStateOperate",classOf[String],classOf[Double])mapState = getRuntimeContext.getMapState(mapStateOperate)}override def flatMap(input: (Long, Double), out: Collector[(Long, Double)]): Unit = {//將相同的key對應的數據放到一個map集合當中去,就是這種對應  key -> Map((key1, value1),(key2, value2)) //每次都構建一個map集合mapState.put(UUID.randomUUID().toString,input._2)import scala.collection.JavaConverters._//獲取map集合當中所有的value,我們每次將數據的value給放到map的value里面去val listState: List[Double] = mapState.values().iterator().asScala.toListif(listState.size >=3){var count = 0Lvar sum = 0dfor(eachState <- listState){count +=1sum += eachState}out.collect(input._1,sum/count)}}
    }
    
1.3.4 ReducingState
  • 作用

    • 用于數據的聚合
  • 需求

    • 使用ReducingState求取每個key對應的平均值
  • 代碼開發

    package com.kaikeba.keystateimport org.apache.flink.api.common.functions.{ReduceFunction, RichFlatMapFunction}
    import org.apache.flink.api.common.state.{ReducingState, ReducingStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.util.Collector/*** ReducingState<T> :這個狀態為每一個 key 保存一個聚合之后的值* get() 獲取狀態值* add()  更新狀態值,將數據放到狀態中* clear() 清除狀態*/object ReduceingStateOperate {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._env.fromCollection(List((1L, 3d),(1L, 5d),(1L, 7d),(2L, 4d),(2L, 2d),(2L, 6d))).keyBy(_._1).flatMap(new CountAverageReduceStage).print()env.execute()}
    }class CountAverageReduceStage extends RichFlatMapFunction[(Long,Double),(Long,Double)]{//定義ReducingStateprivate var reducingState:ReducingState[Double] = _//定義一個計數器var counter=0Loverride def open(parameters: Configuration): Unit = {val reduceSum = new ReducingStateDescriptor[Double]("reduceSum", new ReduceFunction[ Double] {override def reduce(value1: Double, value2: Double): Double = {value1+ value2}}, classOf[Double])//初始化獲取reducingState對象reducingState = getRuntimeContext.getReducingState[Double](reduceSum)}
    override def flatMap(input: (Long, Double), out: Collector[(Long, Double)]): Unit = {//計數器+1counter+=1//添加數據到reducingStatereducingState.add(input._2)out.collect(input._1,reducingState.get()/counter)}
    }
    
1.3.5 AggregatingState
  • 作用

    • 將相同key的數據進行聚合
  • 需求

    • 將相同key的數據聚合成為一個字符串
  • 代碼開發

    package com.kaikeba.keystateimport org.apache.flink.api.common.functions.{AggregateFunction, RichFlatMapFunction}import org.apache.flink.api.common.state.{AggregatingState, AggregatingStateDescriptor}import org.apache.flink.configuration.Configurationimport org.apache.flink.runtime.state.memory.MemoryStateBackendimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.util.Collectorimport org.apache.flink.api.scala._/*** 將相同key的數據聚合成為一個字符串*/object AggregrageStateOperate {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.fromCollection(List((1L, 3d),(1L, 5d),(1L, 7d),(2L, 4d),(2L, 2d),(2L, 6d))).keyBy(_._1).flatMap(new AggregrageState).print()env.execute()}}/***   (1L, 3d),(1L, 5d),(1L, 7d),   把相同key的value拼接字符串:Contains-3-5-7*/class AggregrageState extends RichFlatMapFunction[(Long,Double),(Long,String)]{//定義AggregatingStateprivate var aggregateTotal:AggregatingState[Double, String] = _override def open(parameters: Configuration): Unit = {/*** name: String,* aggFunction: AggregateFunction[IN, ACC, OUT],* stateType: Class[ACC]*/val aggregateStateDescriptor = new AggregatingStateDescriptor[Double, String, String]("aggregateState", new AggregateFunction[Double, String, String] {//創建一個初始值override def createAccumulator(): String = {"Contains"}//對數據進行累加override def add(value: Double, accumulator: String): String = {accumulator + "-" + value}//獲取累加的結果override def getResult(accumulator: String): String = {accumulator}//數據合并的規則override def merge(a: String, b: String): String = {a + "-" + b}}, classOf[String])//獲取AggregatingState對象aggregateTotal = getRuntimeContext.getAggregatingState(aggregateStateDescriptor)}override def flatMap(input: (Long, Double), out: Collector[(Long, String)]): Unit = {aggregateTotal.add(input._2)out.collect(input._1,aggregateTotal.get())}}
1.4 Operator State案例演示
  • 需求
    • 實現每兩條數據進行輸出打印一次,不用區分數據的key

    • 這里使用ListState實現

      package com.kaikeba.operatorstateimport org.apache.flink.streaming.api.functions.sink.SinkFunction
      import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}import scala.collection.mutable.ListBuffer/*** 實現每兩條數據進行輸出打印一次,不用區分數據的key*/
      object OperatorListState {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._val sourceStream: DataStream[(String, Int)] = env.fromCollection(List(("spark", 3),("hadoop", 5),("hive", 7),("flume", 9)))sourceStream.addSink(new OperateTaskState).setParallelism(1)env.execute()}}class OperateTaskState extends SinkFunction[(String,Int)]{//定義一個list 用于我們每兩條數據打印一下private var listBuffer:ListBuffer[(String,Int)] = new ListBuffer[(String, Int)]override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {listBuffer.+=(value)if(listBuffer.size ==2){println(listBuffer)//清空state狀態listBuffer.clear()}}}

📖 2. Flink的狀態管理之State Backend

  • 默認情況下,state會保存在taskmanager的內存中,checkpoint會存儲在JobManager的內存中。state 的存儲和checkpoint的位置取決于State Backend的配置。
  • Flink一共提供了3種StateBackend
    • MemoryStateBackend
      • 基于內存存儲
    • FsStateBackend
      • 基于文件系統存儲
    • RocksDBStateBackend
      • 基于數據庫存儲
  • 可以通過 ==StreamExecutionEnvironment.setStateBackend(…)==來設置state存儲的位置
2.1 MemoryStateBackend
	將數據持久化狀態存儲到內存當中,state數據保存在java堆內存中,執行checkpoint的時候,會把state的快照數據保存到jobmanager的內存中。基于內存的state backend在生產環境下不建議使用。

在這里插入圖片描述

  • 代碼配置:
environment.setStateBackend(new MemoryStateBackend())
  • 使用場景:
(1)本地調試
(2)flink任務狀態數據量較小的場景
2.2 FsStateBackend
	state數據保存在taskmanager的內存中,執行checkpoint的時候,會把state的快照數據保存到配置的文件系統中。可以使用hdfs等分布式文件系統.FsStateBackend 適合場景:狀態數據特別的多,還有長時間的window算子等,它很安全,因為基于hdfs,所以數據有備份很安全。

在這里插入圖片描述

  • 代碼配置:
environment.setStateBackend(new FsStateBackend("hdfs://node01:8020/flink/checkDir"))
  • 適用場景:
(1)大狀態、長窗口、大key/value狀態的的任務
(2)全高可用配置
2.3 RocksDBStateBackend (生產中推薦)
RocksDB介紹:RocksDB使用一套日志結構的數據庫引擎,它是Flink中內置的第三方狀態管理器,為了更好的性能,這套引擎是用C++編寫的。 Key和value是任意大小的字節流。RocksDB跟上面的都略有不同,它會在本地文件系統中維護狀態,state會直接寫入本地rocksdb中。同時它需要配置一個遠端的filesystem uri(一般是HDFS),在做checkpoint的時候,會把本地的數據直接復制到fileSystem中。fail over的時候從fileSystem中恢復到本地RocksDB克服了state受內存限制的缺點,同時又能夠持久化到遠端文件系統中,比較適合在生產中使用.

在這里插入圖片描述

  • 代碼配置:導入jar包然后配置代碼
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_2.11</artifactId><version>1.9.2</version>
</dependency>
  • 配置代碼
environment.setStateBackend(new RocksDBStateBackend("hdfs://node01:8020/flink/checkDir",true))
  • 使用場景
(1)大狀態、長窗口、大key/value狀態的的任務
(2)全高可用配置由于RocksDBStateBackend將工作狀態存儲在taskManger的本地文件系統,狀態數量僅僅受限于本地磁盤容量限制,對比于FsStateBackend保存工作狀態在內存中,RocksDBStateBackend能避免flink任務持續運行可能導致的狀態數量暴增而內存不足的情況,因此適合在生產環境使用。
2.4 修改state-backend的兩種方式
  • 第一種:單任務調整
    • 修改當前任務代碼
env.setStateBackend(
new FsStateBackend("hdfs://node01:8020/flink/checkDir"))
或者new MemoryStateBackend()
或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依賴】
  • 第二種:全局調整

    • 修改flink-conf.yaml
    state.backend: filesystem
    state.checkpoints.dir: hdfs://node01:8020/flink/checkDir
    
    • 注意:state.backend的值可以是下面幾種
    (1) jobmanager    表示使用 MemoryStateBackend
    (2) filesystem    表示使用 FsStateBackend
    (3) rocksdb       表示使用 RocksDBStateBackend
    

📖 3. Flink的checkPoint保存數據實現容錯

3.1 checkPoint的基本概念
為了保證state的容錯性,Flink需要對state進行checkpoint。Checkpoint是Flink實現容錯機制最核心的功能,它能夠根據配置周期性地基于Stream中各個Operator/task的狀態來生成快照,從而將這些狀態數據定期持久化存儲下來,當Flink程序一旦意外崩潰時,重新運行程序時可以有選擇地從這些快照進行恢復,從而修正因為故障帶來的程序數據異常。
3.2 checkPoint的前提
  • Flink的checkpoint機制可以與(stream和state)的持久化存儲交互的前提
    • 1、持久化的source,它需要支持在一定時間內重放事件。這種sources的典型例子是持久化的消息隊列(比如Apache Kafka,RabbitMQ等)或文件系統(比如HDFS,S3,GFS等)

    • 2、用于state的持久化存儲,例如分布式文件系統(比如HDFS,S3,GFS等)

3.3 Flink進行checkpoint步驟
  • (1)暫停新數據的輸入
  • (2)等待流中on-the-fly的數據被處理干凈,此時得到flink graph的一個snapshot
  • (3)將所有Task中的State拷貝到State Backend中,如HDFS。此動作由各個Task Manager完成
  • (4)各個Task Manager將Task State的位置上報給Job Manager,完成checkpoint
  • (5)恢復數據的輸入

如上所述,這里才需要“暫停輸入 + 排干on-the-fly 數據”的操作,這樣才能拿到同一時刻下所有subtask的state

3.4 配置checkPoint
  • 默認checkpoint功能是disabled的,想要使用的時候需要先啟用

  • checkpoint開啟之后,默認的checkPointMode是Exactly-once

  • checkpoint的checkPointMode有兩種

    • Exactly-once: 數據處理且只被處理一次
    • At-least-once:數據至少被處理一次

Exactly-once對于大多數應用來說是最合適的。At-least-once可能用在某些延遲超低的應用程序(始終延遲為幾毫秒)

//默認checkpoint功能是disabled的,想要使用的時候需要先啟用
// 每隔1000 ms進行啟動一個檢查點【設置checkpoint的周期】
environment.enableCheckpointing(1000);
// 高級選項:
// 設置模式為exactly-once (這是默認值)
environment.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 確保檢查點之間有至少500 ms的間隔【checkpoint最小間隔】
environment.getCheckpointConfig.setMinPauseBetweenCheckpoints(500);
// 檢查點必須在一分鐘內完成,或者被丟棄【checkpoint的超時時間】
environment.getCheckpointConfig.setCheckpointTimeout(60000);
// 同一時間只允許進行一個檢查點
environment.getCheckpointConfig.setMaxConcurrentCheckpoints(1);
// 表示一旦Flink處理程序被cancel后,會保留Checkpoint數據,以便根據實際需要恢復到指定的Checkpoint【詳細解釋見備注】/*** ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink處理程序被cancel后,會保留Checkpoint數據,以便根據實際需要恢復到指定的Checkpoint* ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink處理程序被cancel后,會刪除Checkpoint數據,只有job執行失敗的時候才會保存checkpoint*/
environment.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
??3.5 重啟策略概述
  • Flink支持不同的重啟策略,以在故障發生時控制作業如何重啟,集群在啟動時會伴隨一個默認的重啟策略,在沒有定義具體重啟策略時會使用該默認策略。
  • 如果在工作提交時指定了一個重啟策略,該策略會覆蓋集群的默認策略,默認的重啟策略可以通過 Flink 的配置文件 flink-conf.yaml 指定。配置參數 restart-strategy 定義了哪個策略被使用。
  • 常用的重啟策略
    • (1)固定間隔 (Fixed delay)
    • (2)失敗率 (Failure rate)
    • (3)無重啟 (No restart)
  • 如果沒有啟用 checkpointing,則使用無重啟 (no restart) 策略。
  • 如果啟用了 checkpointing,重啟策略可以在flink-conf.yaml中配置,表示全局的配置。也可以在應用代碼中動態指定,會覆蓋全局配置
    • 但沒有配置重啟策略,則使用固定間隔 (fixed-delay) 策略, 嘗試重啟次數默認值是:Integer.MAX_VALUE,。
3.6 重啟策略配置實現
  • 固定間隔 (Fixed delay)
第一種:全局配置 flink-conf.yaml
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s第二種:應用代碼設置//重啟次數、重啟時間間隔
environment.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,10000))
  • 失敗率 (Failure rate)
第一種:全局配置 flink-conf.yaml
//5分鐘內若失敗了3次則認為該job失敗,重試間隔為10s
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s第二種:應用代碼設置
environment.setRestartStrategy(RestartStrategies.failureRateRestart(3, org.apache.flink.api.common.time.Time.seconds(100), org.apache.flink.api.common.time.Time.seconds(10)))
  • 無重啟 (No restart)
第一種:全局配置 flink-conf.yaml
restart-strategy: none第二種:應用代碼設置
environment.setRestartStrategy(RestartStrategies.noRestart())

??📖 4. 從checkPoint恢復數據以及checkPoint保存多個歷史版本

4.1 保存多個歷史版本
  • 默認情況下,如果設置了Checkpoint選項,則Flink只保留最近成功生成的1個Checkpoint,而當Flink程序失敗時,可以從最近的這個Checkpoint來進行恢復。

  • 如果我們希望保留多個Checkpoint,并能夠根據實際需要選擇其中一個進行恢復,這樣會更加靈活,比如,我們發現最近4個小時數據記錄處理有問題,希望將整個狀態還原到4小時之前

  • Flink可以支持保留多個Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的個數。

state.checkpoints.num-retained: 20
  • 這樣設置以后就查看對應的Checkpoint在HDFS上存儲的文件目錄
hdfs dfs -ls hdfs://node01:8020/flink/checkpoints
  • 如果希望回退到某個Checkpoint點,只需要指定對應的某個Checkpoint路徑即可實現
4.2 恢復歷史某個版本數據
  • 如果Flink程序異常失敗,或者最近一段時間內數據處理錯誤,我們可以將程序從某一個Checkpoint點進行恢復
flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 -s hdfs://node01:8020/fsStateBackend/971ae7ac4d5f20e704747ea7c549b356/chk-50/_metadata -c com.kaikeba.checkpoint.TestCheckPoint original-flink_study-1.0-SNAPSHOT.jar
  • 程序正常運行后,還會按照Checkpoint配置進行運行,繼續生成Checkpoint數據

📖 5. Flink的savePoint保存數據

5.1 savePoint的介紹
  • savePoint是檢查點一種特殊實現,底層其實也是使用Checkpoints的機制。

  • savePoint是用戶以手工命令的方式觸發checkpoint,并將結果持久化到指定的存儲目錄中

  • 作用

    • 1、應用程序代碼升級
      • 通過觸發保存點并從該保存點處運行新版本,下游的應用程序并不會察覺到不同
    • 2、Flink版本更新
      • Flink 自身的更新也變得簡單,因為可以針對正在運行的任務觸發保存點,并從保存點處用新版本的 Flink 重啟任務。
    • 3、維護和遷移
      • 使用保存點,可以輕松地“暫停和恢復”應用程序
5.2 savePoint的使用
  • 1:在flink-conf.yaml中配置Savepoint存儲位置

不是必須設置,但是設置后,后面創建指定Job的Savepoint時,可以不用在手動執行命令時指定Savepoint的位置

state.savepoints.dir: hdfs://node01:8020/flink/savepoints
  • 2:觸發一個savepoint

    • (1)手動觸發savepoint

      
      #【針對on standAlone模式】
      bin/flink savepoint jobId [targetDirectory] #【針對on yarn模式需要指定-yid參數】
      bin/flink savepoint jobId [targetDirectory] [-yid yarnAppId]#jobId 				需要觸發savepoint的jobId編號
      #targetDirectory     指定savepoint存儲數據目錄
      #-yid                指定yarnAppId ##例如:
      flink savepoint 8d1bb7f88a486815f9b9cf97c304885b  -yid application_1594807273214_0004
      
    • (2)取消任務并手動觸發savepoint

      ##【針對on standAlone模式】
      bin/flink cancel -s [targetDirectory] jobId ##【針對on yarn模式需要指定-yid參數】
      bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId]##例如:
      flink cancel 8d1bb7f88a486815f9b9cf97c304885b -yid application_1594807273214_0004
      
  • 3:從指定的savepoint啟動job

    bin/flink run -s savepointPath [runArgs]##例如:
    flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 -s hdfs://node01:8020/flink/savepoints/savepoint-8d1bb7-c9187993ca94 -c com.kaikeba.checkpoint.TestCheckPoint original-flink_study-1.0-SNAPSHOT.jar
  • 4、清除savepoint數據

    bin/flink savepoint -d savepointPath
    

📖 6. Flink流式處理集成kafka

  • 對于實時處理當中,我們實際工作當中的數據源一般都是使用kafka,所以我們一起來看看如何通過Flink來集成kafka

  • Flink提供了一個特有的kafka connector去讀寫kafka topic的數據。flink消費kafka數據,并不是完全通過跟蹤kafka消費組的offset來實現去保證exactly-once的語義,而是flink內部去跟蹤offset和做checkpoint去實現exactly-once的語義,而且對于kafka的partition,Flink會啟動對應的并行度去處理kafka當中的每個分區的數據

  • Flink整合kafka官網介紹

    • https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html
6.1 導入pom依賴
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.9.2</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_2.11</artifactId><version>1.9.2</version>
</dependency>
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.1.0</version>
</dependency>
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.25</version>
</dependency>
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version>
</dependency>
6.2 將kafka作為flink的source來使用
  • 實際工作當中一般都是將kafka作為flink的source來使用
6.2.1 創建kafka的topic
  • 安裝好kafka集群,并啟動kafka集群,然后在node01執行以下命令創建kafka的topic為test
kafka-topics.sh --create --partitions 3 --topic test --replication-factor 1 --zookeeper node01:2181,node02:2181,node03:2181
6.2.2 代碼實現:
package com.kaikeba.kafkaimport java.util.Propertiesimport org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema/***  將kafka作為flink的source來使用*/
object FlinkKafkaSource {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//**隱式轉換import org.apache.flink.api.scala._//checkpoint**配置env.enableCheckpointing(100)env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)env.getCheckpointConfig.setCheckpointTimeout(60000)env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)//設置statebackendenv.setStateBackend(new RocksDBStateBackend("hdfs://node01:8020/flink_kafka_sink/checkpoints",true));val topic = "test"val prop = new Properties()prop.setProperty("bootstrap.servers","node01:9092,node02:9092,node03:9092")prop.setProperty("group.id","con1")prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");val kafkaConsumer = new FlinkKafkaConsumer[String]("test",new SimpleStringSchema,prop)kafkaConsumer.setCommitOffsetsOnCheckpoints(true)val kafkaSource: DataStream[String] = env.addSource(kafkaConsumer)kafkaSource.print()env.execute()}
}
6.2.3 kafka生產數據
  • node01執行以下命令,通過shell命令行來生產數據到kafka當中去
##創建topickafka-topics.sh --create --topic test --partitions 3 --replication-factor 2 --zookeeper node01:2181,node02:2181,node03:2181 ##發送數據
kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic  test
6.3 將kafka作為flink的sink來使用
  • 我們也可以將kafka作為flink的sink來使用,就是將flink處理完成之后的數據寫入到kafka當中去
6.3.1 socket發送數據
  • node01執行以下命令,從socket當中發送數據
 nc -lk 9999
6.3.2 代碼實現
package com.kaikeba.kafkaimport java.util.Propertiesimport org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper
import org.apache.flink.streaming.util.serialization.SimpleStringSchema/*** 將kafka作為flink的sink來使用*/
object FlinkKafkaSink {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//隱式轉換import org.apache.flink.api.scala._//checkpoint配置env.enableCheckpointing(5000);env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500);env.getCheckpointConfig.setCheckpointTimeout(60000);env.getCheckpointConfig.setMaxConcurrentCheckpoints(1);env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//設置statebackendenv.setStateBackend(new RocksDBStateBackend("hdfs://node01:8020/flink_kafka_sink/checkpoints",true));val socketStream = env.socketTextStream("node01",9999)val topic = "test"val prop = new Properties()prop.setProperty("bootstrap.servers","node01:9092,node02:9092,node03:9092")prop.setProperty("group.id","kafka_group1")//第一種解決方案,設置FlinkKafkaProducer里面的事務超時時間//設置事務超時時間prop.setProperty("transaction.timeout.ms",60000*15+"");//第二種解決方案,設置kafka的最大事務超時時間//FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer<>(brokerList, topic, new SimpleStringSchema());//使用支持僅一次語義的形式/*** defaultTopic: String,* serializationSchema: KafkaSerializationSchema[IN],* producerConfig: Properties,* semantic: FlinkKafkaProducer.Semantic*/val kafkaSink = new FlinkKafkaProducer[String](topic,new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), prop,FlinkKafkaProducer.Semantic.EXACTLY_ONCE)socketStream.addSink(kafkaSink)env.execute("StreamingFromCollectionScala")}
}
6.3.3 啟動kafka消費者
  • node01執行以下命令啟動kafka消費者,消費數據
kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic test

📖 7. Flink當中的window窗口

  • 對于流式處理,如果我們需要求取總和,平均值,或者最大值,最小值等,是做不到的,因為數據一直在源源不斷的產生,即數據是沒有邊界的,所以沒法求最大值,最小值,平均值等,所以為了一些數值統計的功能,我們必須指定時間段,對某一段時間的數據求取一些數據值是可以做到的。或者對某一些數據求取數據值也是可以做到的

  • 所以,流上的聚合需要由 window 來劃定范圍,比如 “計算過去的5分鐘” ,或者 “最后100個元素的和” 。

  • window是一種可以把無限數據切割為有限數據塊的手段

    • 窗口可以是 時間驅動的 【Time Window】(比如:每30秒)
    • 或者 數據驅動的【Count Window】 (比如:每100個元素)
      在這里插入圖片描述
  • 窗口類型匯總:
    在這里插入圖片描述

7.1 窗口的基本類型介紹
  • 窗口通常被區分為不同的類型:
    • tumbling windows:滾動窗口 【沒有重疊】

      • 滾動窗口下窗口之間之間不重疊,且窗口長度是固定的

      在這里插入圖片描述

    • sliding windows:滑動窗口 【有重疊】

      • 滑動窗口以一個步長(Slide)不斷向前滑動,窗口的長度固定

      在這里插入圖片描述

    • session windows:會話窗口 ,一般沒人用

      • Session window的窗口大小,則是由數據本身決定,它沒有固定的開始和結束時間。
      • 會話窗口根據Session gap切分不同的窗口,當一個窗口在大于Session gap的時間內沒有接收到新數據時,窗口將關閉

在這里插入圖片描述

7.2 Flink的窗口介紹
7.2.1 Time Window窗口的應用
  • time window又分為滾動窗口和滑動窗口,這兩種窗口調用方法都是一樣的,都是調用timeWindow這個方法,如果傳入一個參數就是滾動窗口,如果傳入兩個參數就是滑動窗口

? 在這里插入圖片描述

  • 需求:每隔5s時間,統計最近10s出現的數據

  • 代碼實現:

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Timeobject TestTimeWindow {def main(args: Array[String]): Unit = {val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._val socketSource: DataStream[String] = environment.socketTextStream("node01",9999)socketSource.flatMap(x => x.split(" ")).map(x =>(x,1)).keyBy(0).timeWindow(Time.seconds(10),Time.seconds(5)).sum(1).print()environment.execute()}}
7.2.2 Count Windos窗口的應用
  • 與timeWindow類型,CountWinodw也可以分為滾動窗口和滑動窗口,這兩個窗口調用方法一樣,都是調用countWindow,如果傳入一個參數就是滾動窗口,如果傳入兩個參數就是滑動窗口

    ?
    在這里插入圖片描述

  • 需求:使用count Window 統計最近5條數的最大值

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}/*** 使用countWindow統計最近5條數據的最大值*/
object TestCountWindow {def main(args: Array[String]): Unit = {val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._val socketSource: DataStream[String] = environment.socketTextStream("node01",9999)/*** 發送數據* spark 1* spark 2* spark 3* spark 4* spark 5* hello 100* hello 90* hello 80* hello 70* hello 60* hello 10*/socketSource.map(x => (x.split(" ")(0),x.split(" ")(1).toInt)).keyBy(0).countWindow(5).aggregate(new AggregateFunction[(String,Int),Int,Double]{var initAccumulator :Int = 0override def createAccumulator(): Int = {initAccumulator}override def add(value: (String, Int), accumulator: Int): Int = {if(accumulator >= value._2){accumulator}else{value._2}}override def getResult(accumulator: Int): Double = {accumulator}override def merge(a: Int, b: Int): Int = {if(a>=b){a}else{b}}}).print()environment.execute()}
}
7.2.3 自定義window的應用
  • 如果time window 和 countWindow 還不夠用的話,我們還可以使用自定義window來實現數據的統計等功能。

    在這里插入圖片描述

7.3 window窗口數據的集合統計
  • 前面我們可以通過aggregrate實現數據的聚合,對于求最大值,最小值,平均值等操作,我們也可以通過process方法來實現

  • 對于某一個window內的數值統計,我們可以增量的聚合統計或者全量的聚合統計

7.3.1 增量聚合統計
  • 窗口當中每加入一條數據,就進行一次統計
  • 常用的聚合算子
    • reduce(reduceFunction)
    • aggregate(aggregateFunction)

在這里插入圖片描述

  • 需求

    • 通過接收socket當中輸入的數據,統計每5秒鐘數據的累計的值
  • 代碼實現

package com.kaikeba.windowimport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Timeobject FlinkTimeCount {def main(args: Array[String]): Unit = {val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._val socketStream: DataStream[String] = environment.socketTextStream("node01",9999)socketStream.map(x => (1, x.toInt)).keyBy(0).timeWindow(Time.seconds(5)).reduce((c1,c2)=>(c1._1,c1._2+c2._2)).print()environment.execute("FlinkTimeCount")}}
7.3.2 全量聚合統計
  • 等到窗口截止,或者窗口內的數據全部到齊,然后再進行統計,可以用于求窗口內的數據的最大值,或者最小值,平均值等

  • 等屬于窗口的數據到齊,才開始進行聚合計算【可以實現對窗口內的數據進行排序等需求】

    • apply(windowFunction)
    • process(processWindowFunction)
    • processWindowFunction比windowFunction提供了更多的上下文信息。
  • 需求

    • 通過全量聚合統計,求取每3條數據的平均值
  • 代碼實現

package com.kaikeba.windowimport org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector/*** 求取每3條數據的平均值*/
object FlinkCountWindowAvg {/*** 輸入數據* 1* 2* 3* 4* 5* 6* @param args*/def main(args: Array[String]): Unit = {val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._val socketStream: DataStream[String] = environment.socketTextStream("node01",9999)//統計一個窗口內的數據的平均值socketStream.map(x => (1, x.toInt)).keyBy(0).countWindow(3)//通過process方法來統計窗口的平均值.process(new MyProcessWindowFunctionclass).print()//必須調用execute方法,否則程序不會執行environment.execute("count avg")}
}/**ProcessWindowFunction 需要跟四個參數* 輸入參數類型,輸出參數類型,聚合的key的類型,window的下界**/
class MyProcessWindowFunctionclass extends ProcessWindowFunction[(Int , Int) , Double , Tuple , GlobalWindow]{override def process(key: Tuple, context: Context, elements: Iterable[(Int, Int)], out: Collector[Double]): Unit = {var totalNum = 0;var countNum = 0;for(data <-  elements){totalNum +=1countNum += data._2}out.collect(countNum/totalNum)}
}

📖 ??8. checkpoint機制原理深度剖析

  • checkpoint是flink為了解決state一致性和容錯性引入的一種分布式的狀態快照機制。
8.1 Flink分布式快照流程
  • 首先我們來看一下一個簡單的Checkpoint的大致流程:
    1. 暫停處理新流入數據,將新數據緩存起來。
    2. 將算子子任務的本地狀態數據拷貝到一個遠程的持久化存儲上。
    3. 繼續處理新流入的數據,包括剛才緩存起來的數據。
8.2 Barrier機制

? flink是如何來實現分布式狀態快照的呢,由于flink是流式的計算引擎,基于這種特定的場景,Flink通過向流數據中注入特殊的事件來作為快照的信號,這種特殊事件就叫Barrier(屏障,柵欄)。當算子任務處理到Barrier n的時候就會執行狀態的快照并把它標記為n的狀態快照。
在這里插入圖片描述

  • checkpoint的調用流程:

      1. 首先是JobManager中的checkpoint Coordinator(協調器) 向任務中的所有source Task周期性發送barrier(柵欄)進行快照請求。
      1. source Task接受到barrier后, 會把當前自己的狀態進行snapshot(可以保存在HDFS上)。
      1. source向checkpoint coordinator確認snapshot已經完成。
      1. source繼續向下游transformation operator發送 barrier。
      1. transformation operator重復source的操作,直到sink operator向協調器確認snapshot完成。
      1. coordinator確認完成本周期的snapshot已經完成。
    
    // 5秒啟動一次checkpoint
    env.enableCheckpointing(5000)// 設置checkpoint只checkpoint一次
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)// 設置兩次checkpoint的最小時間間隔
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)// checkpoint超時的時長
    env.getCheckpointConfig.setCheckpointTimeout(60000)// 允許的最大checkpoint并行度
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)// 當程序關閉的時,觸發額外的checkpoint
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)// 設置checkpoint的地址
    env.setStateBackend(new FsStateBackend("hdfs://node01:8020/flink-checkpoint/"))
    
  • 注意

    Checkpoint Barrier被插入到數據流中,它將數據流切分成段。Flink的Checkpoint邏輯是,一段新數據流入導致狀態發生了變化,Flink的算子接收到Checkpoint Barrier后,對狀態進行快照。每個Checkpoint Barrier有一個ID,表示該段數據屬于哪次Checkpoint。如圖所示,當ID為n的Checkpoint Barrier到達每個算子后,表示要對n-1和n之間狀態的更新做快照。

在這里插入圖片描述

8.3 多任務并行下的checkpoint
  • 我們構建一個并行數據流圖,用這個并行數據流圖來演示Flink的分布式快照機制。這個數據流圖有兩個Source子任務,數據流會在這些并行算子上從Source流動到Sink。

在這里插入圖片描述

  • 首先,Flink的檢查點協調器(Checkpoint Coordinator)觸發一次Checkpoint(Trigger Checkpoint),這個請求會發送給Source的各個子任務。

在這里插入圖片描述

  • 各Source算子子任務接收到這個Checkpoint請求之后,會將自己的狀態寫入到狀態后端,生成一次快照,并且會向下游廣播Checkpoint Barrier。

在這里插入圖片描述

  • Source算子做完快照后,還會給Checkpoint Coodinator發送一個確認,告知自己已經做完了相應的工作。這個確認中包括了一些元數據,其中就包括剛才備份到State Backend的狀態句柄,或者說是指向狀態的指針。至此,Source完成了一次Checkpoint。跟Watermark的傳播一樣,一個算子子任務要把Checkpoint Barrier發送給所連接的所有下游算子子任務。
  • 對于下游算子來說,可能有多個與之相連的上游輸入,我們將算子之間的邊稱為通道。Source要將一個ID為n的Checkpoint Barrier向所有下游算子廣播,這也意味著下游算子的多個輸入里都有同一個Checkpoint Barrier,而且不同輸入里Checkpoint Barrier的流入進度可能不同。Checkpoint Barrier傳播的過程需要進行對齊(Barrier Alignment),我們從數據流圖中截取一小部分來分析Checkpoint Barrier是如何在算子間傳播和對齊的

在這里插入圖片描述

如上圖所示,對齊分為四步:

(1). 算子子任務在某個輸入通道中收到第一個ID為n的Checkpoint Barrier,但是其他輸入通道中ID為n的Checkpoint Barrier還未到達,該算子子任務開始準備進行對齊。

(2). 算子子任務將第一個輸入通道的數據緩存下來,同時繼續處理其他輸入通道的數據,這個過程被稱為對齊。

(3). 第二個輸入通道的Checkpoint Barrier抵達該算子子任務,該算子子任務執行快照,將狀態寫入State Backend,然后將ID為n的Checkpoint Barrier向下游所有輸出通道廣播。

(4). 對于這個算子子任務,快照執行結束,繼續處理各個通道中新流入數據,包括剛才緩存起來的數據。

  • 數據流圖中的每個算子子任務都要完成一遍上述的對齊、快照、確認的工作,當最后所有Sink算子確認完成快照之后,說明ID為n的Checkpoint執行結束,Checkpoint Coordinator向State Backend寫入一些本次Checkpoint的元數據。

在這里插入圖片描述

? 之所以要進行barrier對齊,主要是為了保證一個Flink作業所有算子的狀態是一致的。也就是說,某個ID為n的Checkpoint Barrier從前到后流入所有算子子任務后,所有算子子任務都能將同樣的一段數據寫入快照。

8.4 快照性能優化方案
  • 上面講到了一致性快照的具體流程,這種方式保證了數據的一致性,但有一些潛在的問題

    • (1)每次進行Checkpoint前,都需要暫停處理新流入數據,然后開始執行快照,假如狀態比較大,一次快照可能長達幾秒甚至幾分鐘。

    • (2)Checkpoint Barrier對齊時,必須等待所有上游通道都處理完,假如某個上游通道處理很慢,這可能造成整個數據流堵塞。

  • 優化方案

    • (1)對于第一個問題,Flink提供了異步快照(Asynchronous Snapshot)的機制。當實際執行快照時,Flink可以立即向下廣播Checkpoint Barrier,表示自己已經執行完自己部分的快照。一旦數據同步完成,再給Checkpoint Coordinator發送確認信息
    • (2)對于第二個問題,Flink允許跳過對齊這一步,或者說一個算子子任務不需要等待所有上游通道的Checkpoint Barrier,直接將Checkpoint Barrier廣播,執行快照并繼續處理后續流入數據。為了保證數據一致性,Flink必須將那些較慢的數據流中的元素也一起快照,一旦重啟,這些元素會被重新處理一遍。

??
Barrier對齊會影響執行效率,怎么跳過Barrier對齊,跳過后還能保證?Exactly-Once語義嗎?

8.5 任務重啟恢復流程
  • Flink的重啟恢復邏輯相對比較簡單:

    • 1、重啟應用,在集群上重新部署數據流圖。

    • 2、從持久化存儲上讀取最近一次的Checkpoint數據,加載到各算子子任務上。

    • 3、繼續處理新流入的數據。

  • 這樣的機制可以保證Flink內部狀態的Excatly-Once一致性。至于端到端的Exactly-Once一致性,要根據Source和Sink的具體實現而定。當發生故障時,一部分數據有可能已經流入系統,但還未進行Checkpoint,Source的Checkpoint記錄了輸入的Offset;當重啟時,Flink能把最近一次的Checkpoint恢復到內存中,并根據Offset,讓Source從該位置重新發送一遍數據,以保證數據不丟不重。像Kafka等消息隊列是提供重發功能的,socketTextStream就不具有這種功能,也意味著不能保證Exactly-Once投遞保障。

📖 9. Flink兩階段提交 TwoPhaseCommit

9.1 EXACTLY_ONCE語義概述
  • 何為EXACTLY_ONCE?
    • EXACTLY_ONCE簡稱EOS,每條輸入消息只會影響最終結果一次,注意這里是影響一次,而非處理一次,Flink一直宣稱自己支持EOS,實際上主要是對于Flink應用內部來說的,對于外部系統(端到端)則有比較強的限制
    • Flink實現端到端的EXACTLY_ONCE語義需要滿足:
      • 1.外部系統寫入支持冪等性
      • 2.外部系統支持以事務的方式寫入
  • Flink的基本思路就是將狀態定時地checkpiont到hdfs中去,當發生failure的時候恢復上一次的狀態,然后將輸出update到外部。這里需要注意的是輸入流的offset也是狀態的一部分,因此一旦發生failure就能從最后一次狀態恢復,從而保證輸出的結果是exactly once。這是Flink1.4之前的實現。
  • Flink在1.4.0版本引入了TwoPhaseCommitSinkFunction接口,并在Kafka Producer的connector中實現了它,支持了對外部Kafka Sink的EXACTLY_ONCE語義,來讓開發者用更少的代碼來實現端到端的exactly-once語義
9.2 兩階段提交協議介紹
  • 兩階段提交協議是協調所有分布式原子事務參與者,并決定提交或取消(回滾)的分布式算法

  • 協議參與者

兩階段提交指的是一種協議,經常用來實現分布式事務,可以簡單理解為預提交+實際提交,一般分為協調器Coordinator(以下簡稱C)和若干事務參與者Participant(以下簡稱P)兩種角色。

在這里插入圖片描述

  • 兩個階段的執行

    • 1.請求階段(commit-request phase,或稱表決階段,voting phase)

      在請求階段,協調者將通知事務參與者準備提交或取消事務,然后進入表決過程。
      在表決過程中,參與者將告知協調者自己的決策:同意(事務參與者本地作業執行成功)或取消(本地作業執行故障)。
      
      1. 提交階段(commit phase)
      在該階段,協調者將基于第一個階段的投票結果進行決策:提交或取消。
      當且僅當所有的參與者同意提交事務協調者才通知所有的參與者提交事務,否則協調者將通知所有的參與者取消事務。
      參與者在接收到協調者發來的消息后將執行響應的操作。
      
9.3 兩階段提交實現原理機制
  • Flink和外部系統(如Kafka)之間的消息傳遞如何做到exactly once呢?

  • 先看下面這幅圖會出現的問題

    在這里插入圖片描述

    • 當sink A已經往Kafka寫入了數據,而sink B fail.
    • 根據Flink的exactly once保證,系統會回滾到最近的checkpoint,
    • 但是sink A已經把數據寫入到kafka了.
    • Flink無法回滾kafka的state.因此,kafka將在之后再次接收到一份同樣的來自sink A的數據,
    • 這樣的message delivery便成為了at least once
  • Flink采用Two phase commit來解決這個問題.

    • Two phase commit

      • Phase 1: Pre-commit 預提交
        • Flink的JobManager向source注入checkpoint barrier以開啟這snapshot,barrier從source流向sink,
        • 每個進行snapshot的算子成功snapshot后,都會向JobManager發送ACK.
        • 當sink完成snapshot后, 向JobManager發送ACK的同時向kafka進行pre-commit.
      • Phase 2: Commit 實際提交
        • 當JobManager接收到所有算子的ACK后, 就會通知所有的算子這次checkpoint已經完成
        • Sink接收到這個通知后, 就向kafka進行commit, 正式把數據寫入到kafka
  • 下面我們來看看flink消費并寫入kafka的例子是如何通過兩部提交來保證exactly-once語義的。

    • kafka從0.11開始支持事物操作,若要使用flink端到端exactly-once語義需要flink的sink的kafka是0.11版本以上的

    • 這個例子包括以下幾個步驟:

      • 從kafka讀取數據
      • 一個聚合窗操作
      • 向kafka寫入數據

      在這里插入圖片描述

    • 1、JobManager向Source發送Barrier,開始進入pre-Commit階段,當Source收到Barrier后,將自身的狀態進行保存,后端可以根據配置進行選擇,這里的狀態是指消費的每個分區對應的offset。然后將Barrier發送給下一個Operator。

      在這里插入圖片描述

    • 2、當Window這個Operator收到Barrier之后,對自己的狀態進行保存,這里的狀態是指聚合的結果(sum或count的結果),然后將Barrier發送給Sink。Sink收到后也對自己的狀態進行保存,之后會進行一次預提交。

      在這里插入圖片描述

    • 3、預提交成功后,JobManager通知每個Operator,這一輪檢查點已經完成,這個時候,Kafka Sink會向Kafka進行真正的事務Commit。

      在這里插入圖片描述

以上便是兩階段的完整流程,不同階段fail over的recovery舉措:

? (1) 在pre-commit前fail over, 系統恢復到最近的checkponit

? (2) 在pre-commit后,commit前fail over,系統恢復到剛完成pre-commit時的狀態

因此,所有opeartor必須對checkpoint最終結果達成共識:

? 即所有operator都必須認定數據提交要么成功執行,要么被終止然后回滾。

9.4 兩階段提交的TwoPhaseCommitSinkFunction類
  • 在使用兩步提交算子時,我們可以繼承TwoPhaseCommitSinkFunction這個類。

    • TwoPhaseCommitSinkFunction有4個方法

        1. beginTransaction()

          開啟事務:創建一個臨時文件.后續把原要寫入到外部系統的數據寫入到這個臨時文件
          
        1. preCommit()
        預提交:flush并close這個文件,之后便不再往其中寫數據.同時開啟一個新的事務供下個checkponit使用
        
        1. commit()

          正式提交: 把pre-committed的臨時文件移動到指定目錄
          
        1. abort()

          丟棄: 刪除掉pre-committed的臨時文件
          

7?? 把所有的代碼都敲一遍

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/72054.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/72054.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/72054.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

在資源有限中逆勢突圍:從抗戰智謀到寒門高考的破局智慧

目錄 引言 一、歷史中的非對稱作戰&#xff1a;從李牧到八路軍的智謀傳承 李牧戍邊&#xff1a;古代軍事博弈中的資源重構 八路軍的游擊戰&#xff1a;現代戰爭中的智慧延續 二、創業界的逆襲之道&#xff1a;小米與拼多多的資源重構 從MVP到杠桿解 社交裂變與資源錯配 …

C#方法之詳解

一、方法基礎語法? C#方法是封裝代碼邏輯的基本單元&#xff0c;用于執行特定操作并支持模塊化編程?。 定義與結構? C#方法由訪問修飾符、返回值、方法名、參數列表和方法體構成。基礎語法如下&#xff1a; [訪問修飾符] [static] 返回值類型 方法名(參數列表) { // 方…

網頁打印很簡單!用web打印插件lodop輕松實現文件打印

最近&#xff0c;給客戶發一個事件提醒軟件&#xff0c;其中客戶要求實現打印功能&#xff0c;因為是用asp.net mvc 開發首先考慮到用水晶報表來實現&#xff08;crystalReport&#xff09;&#xff0c;以前開發c# winform程序&#xff0c;感覺水晶報表還是蠻好的&#xff0c;但…

Claude、ChatGPT、Gemini等主流AI模型。分別詳細介紹它們并進行對比,需要指出關鍵的時間點

以下是關于Claude、ChatGPT和Gemini三大主流AI模型的詳細介紹及對比分析&#xff0c;結合關鍵時間點和核心技術特征&#xff1a; 1. Claude&#xff08;Anthropic&#xff09; 關鍵時間點與版本迭代 2023年3月&#xff1a;初代Claude發布&#xff0c;定位為安全可控的對話模型…

統計登錄系統10秒內連續登錄失敗超過3次的用戶

為防止暴力破解用戶賬號的行為&#xff0c;在輸入賬號和密碼時一般都會限制用戶嘗試密碼輸出錯誤的次數&#xff0c;如果用戶多次輸錯密碼后&#xff0c;將在一段時間內鎖定賬號&#xff0c;常見的有銀行類APP、個稅App等應用&#xff0c;如下是用戶賬號密碼輸入錯誤的提示圖&a…

vue3通過render函數實現一個菜單下拉框

背景說明 鼠標移動到產品服務上時&#xff0c;出現標紅的下拉框。 使用純css的方案實現最簡單&#xff0c;但是沒什么技術含量&#xff0c;棄之&#xff1b;使用第三方組件庫&#xff0c;樣式定制麻煩棄之。因此&#xff0c;我們使用vue3直接在頁面創建一個dom作為下拉框吧。…

二、重學C++—C語言核心

上一章節&#xff1a; 一、重學C—C語言基礎-CSDN博客https://blog.csdn.net/weixin_36323170/article/details/146002496?spm1001.2014.3001.5502 本章節代碼&#xff1a; cPart2 CuiQingCheng/cppstudy - 碼云 - 開源中國https://gitee.com/cuiqingcheng/cppstudy/tree/…

2-003:MySQL 三層 B+ 樹能存多少數據?

1. 計算 B 樹能存儲多少數據 參數設定 每個數據頁&#xff08;Page&#xff09;大小&#xff1a;16KB&#xff08;16384 字節&#xff09;每個索引節點存儲的子節點數量&#xff1a; 索引項大小&#xff1a; 假設 bigint&#xff08;主鍵&#xff09;占 8 字節每個索引項存儲…

幾種常見的虛擬環境工具(Virtualenv、Conda、System Interpreter、Pipenv、Poetry)的區別和特點總結

在 PyCharm 中創建虛擬環境是一個非常直接的過程&#xff0c;可以幫助你管理項目依賴&#xff0c;確保不同項目之間的依賴不會沖突。 通過 PyCharm 創建虛擬環境 打開 PyCharm 并選擇或創建一個項目。 打開項目設置&#xff1a; 在 Windows/Linux 上&#xff0c;可以通過點擊…

Windows系統編程項目(四)窗口管理器

本章我們講解基于對話框的MFC窗口相關的操作 該管理器要實現以下功能 初始化列表 初始化列表表頭 初始化圖像列表 初始化列表 功能實現 加載菜單 刷新列表 結束進程 隱藏窗口 最大化窗口 最小化窗口 手搓窗口管理器 // CWindowManage.cpp: 實現文件 //#include "pch.h&…

優化 NFS 掛載參數以提升可靠性與容錯性

在現代 IT 基礎設施中&#xff0c;NFS&#xff08;網絡文件系統&#xff09;被廣泛用于共享文件和存儲。雖然 NFS 提供了便利&#xff0c;但在某些情況下&#xff0c;掛載失敗或網絡問題可能導致掛載操作不穩定。為了提高掛載的可靠性和容錯性&#xff0c;我們可以通過優化 NFS…

JavaScript事件循環機制

JavaScript 事件循環機制&#xff08;Event Loop&#xff09;詳解 JavaScript 是 單線程、非阻塞 語言&#xff0c;依賴 事件循環&#xff08;Event Loop&#xff09; 來實現異步編程。它的執行模型包括 調用棧&#xff08;Call Stack&#xff09;、任務隊列&#xff08;Task …

大模型架構記錄4-文檔切分 (chunks構建)

chunks&#xff1a; 塊 trunks : 樹干 “RAG”通常指 檢索增強生成&#xff08;Retrieval-Augmented Generation&#xff09; 主要框架&#xff1a;用戶提query&#xff0c;找到和它相關的&#xff0c;先把問題轉換為向量&#xff0c;和向量數據庫的數據做比較&#xff0c;檢…

物聯網IoT系列之MQTT協議基礎知識

文章目錄 物聯網IoT系列之MQTT協議基礎知識物聯網IoT是什么&#xff1f;什么是MQTT&#xff1f;為什么說MQTT是適用于物聯網的協議&#xff1f;MQTT工作原理核心組件核心機制 MQTT工作流程1. 建立連接2. 發布和訂閱3. 消息確認4. 斷開連接 MQTT工作流程圖MQTT在物聯網中的應用 …

第27周JavaSpringboot電商進階開發 1.企業級用戶驗證

課程筆記&#xff1a;注冊郵箱驗證 一、概述 從本小節開始&#xff0c;將學習如何進行注冊郵箱驗證。主要任務是給項目配置一個公共郵箱&#xff08;可自己注冊或由公司提供&#xff09;&#xff0c;用于向用戶發送驗證碼&#xff0c;幫助用戶完成注冊流程。課程中以QQ郵箱為…

數據庫---sqlite3

數據庫&#xff1a; 數據庫文件與普通文件區別: 1.普通文件對數據管理(增刪改查)效率低 2.數據庫對數據管理效率高,使用方便 常用數據庫: 1.關系型數據庫: 將復雜的數據結構簡化為二維表格形式 大型:Oracle、DB2 中型:MySql、SQLServer …

音視頻軟件工程師面試題

一、基礎知識 編解碼相關 H.264 和 H.265(HEVC)的主要區別是什么?視頻編解碼的基本流程是什么?關鍵技術有哪些?音頻編解碼(如 AAC、MP3、Opus)的區別和應用場景?什么是 B 幀、P 幀、I 幀?它們的作用是什么? 流媒體協議RTMP、HTTP-FLV、HLS、WebRTC 的區別和應用場景…

【系統架構設計師】測試方法

目錄 1. 說明2. 靜態測試3. 動態測試4. 黑盒測試5. 白盒測試6. 灰盒測試7. 自動化測試8.例題8.1 例題1 1. 說明 1.軟件測試方法的分類有很多種&#xff0c;以測試過程中程序執行狀態為依據可分為靜態測試&#xff08;Static Testing&#xff0c;ST&#xff09;和動態測試&…

tomcat配置應用----server.xml文件具體配置

1.tomcat項目目錄 默認項目目錄&#xff1a;tomcat安裝目錄/webapps目錄 如上圖所示&#xff0c;在tomcat的項目目錄下有很多子文件夾&#xff0c;這些子文件夾中都有一個項目首頁。 如上圖所示&#xff0c;將來我們去使用IP加端口號的方式去訪問tomcat的時候&#xff0c;默認是…