1. Time(時間機制)
時間概念
- 處理時間:執行具體操作時的機器時間(例如 Java的
System.currentTimeMillis()
) ) - 事件時間:數據本身攜帶的時間,事件產生時的時間。
- 攝入時間:數據進入 Flink 的時間;在系統內部,會把它當做事件時間來處理。
事件時間在實際應用中更為廣泛,從Flink 1.12版本開始,Flink已經將
事件時間作為默認的時間語義
。
Flink 可以根據不同的時間概念處理數據。
2. Window(窗口計算)
收集窗口時間內的數據,對窗口中收集數據進行聚合運算這就是窗口機制
。
窗口的生命周期
創建:屬于該窗口的第一個元素到達時
就會創建該窗口,窗口事先定義好就是固定的,但是窗口創建時間不固定【窗口開始時間以水印所攜帶的時間戳作為標準】
銷毀:窗口結束時間之后,就會銷毀當前窗口
Flink窗口分類以及窗口 API
Watermark處理亂序數據
3. State(狀態機制)
什么是Flink的狀態
狀態
其實是個變量,這個變量保存了數據流的歷史數據, 如果有新的數據流進來,會讀取狀態變量,將新的數據和歷史一起計算。
狀態分類
托管狀態(Managed State)和原始狀態(Raw State)
托管狀態
就是由 Flink 統一管理的,狀態的存儲訪問、故障恢復和重組等一系列問題都由Flink實現,直接使用API
原始狀態
則是自定義的,相當于就是開辟了一塊內存,需要自己管理,實現狀態的序列化和故障恢復。
通常采用 Flink 托管狀態來實現需求
。
算子狀態(Operator State)和按鍵分區狀態(Keyed State)
可以將托管狀態分為兩類
:算子狀態和按鍵分區狀態。
keyBy 將DataStream轉換為KeyedStream,KeyedStream是特殊的DataStream。
KeyedState只能應用于KeyedStream,因此KeyedState的計算只能放在KeyBy之后
基于狀態(KeyedState)計算實現詞頻統計
代碼實現
事先定義一個實體類:
public class WordCount {private String word;private Integer count;// setter&getter&toString方法
}
Flink程序基本流程:
/*** description: 基于狀態(KeyedState)計算實現詞頻統計*/
public class WordCountWithStateful {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> lines =env.socketTextStream("127.0.0.1",8888,"\n");lines.flatMap((String input, Collector<WordCount> output)-> {String[] words = input.split(" ");for(String word:words) {output.collect(new WordCount(word,1));}}).returns(WordCount.class)// keyBy之后,每個key都有對應的狀態,同一個key只能操作自己對應的狀態.keyBy(WordCount::getWord)// 狀態計算.flatMap(new WordCountStateFunc()).print();env.execute();}
}
計算函數:
public class WordCountStateFunc extends RichFlatMapFunction<WordCount, WordCount> {/*** 狀態變量*/private ValueState<WordCount> keyedState;/*** description: open方法中狀態變量的初始化*/@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor<WordCount> valueStateDescriptor =// valueState描述器new ValueStateDescriptor<>(// 描述器的名稱"wordcountState",/* * 描述器的數據類型:** Flink有自己的一套數據類型,包含了JAVA和Scala的所有數據類型* 這些數據類型都是TypeInformation對象的子類。* TypeInformation對象統一了所有數據類型的序列化實現*/TypeInformation.of(WordCount.class));keyedState = getRuntimeContext().getState(valueStateDescriptor);}/*** description: keyedState計算邏輯*/@Overridepublic void flatMap(WordCount input, Collector<WordCount> output) throws Exception {// 讀取狀態WordCount lastKeyedState = keyedState.value();// 更新狀態if (lastKeyedState == null) {// 狀態還未賦值的情況 更新狀態keyedState.update(input);// 返回原數據output.collect(input);} else {// 狀態存在舊的狀態數據的情況Integer count = lastKeyedState.getCount() + input.getCount();WordCount newWordCount = new WordCount(input.getWord(), count);// 更新狀態keyedState.update(newWordCount);// 返回新的數據output.collect(newWordCount);}}}
keyedState狀態計算步驟
- 繼承Rich函數
- 重寫Open方法,對狀態變量進行初始化
- 狀態計算邏輯
為什么要進行有狀態的計算 ?
如果Flink發生了異常退出,checkpoint機制可以讀取保存的狀態,進行恢復。
廣播流、廣播狀態
有時希望算子并行子任務都保持同一份“全局”狀態,用來做統一的配置和規則設定。這時所有分區的所有數據都會訪問到同一個狀態,狀態就像被“廣播”到所有分區一樣,這種特殊的算子狀態,就叫作廣播狀態
(BroadcastState)。【可以動態修改配置】
編碼步驟
- 構建事件流
- 構建廣播流
- 將事件流和廣播流連接
- 對連接后的流進行處理
狀態后端
Flink中,狀態的存儲、訪問以及維護,都是由一個可插拔的組件決定的,這個組件就叫作狀態后端(state backend)
。狀態后端主要負責管理本地狀態的存儲方式和位置。
- Memory State Backend 【java內存HashMap】
- FS State Backend 【HDFS】
- RocksDB State Backend 【可持久化的key value存儲引擎】
選擇正確的狀態后端
HashMapStateBackend 是內存計算
,讀寫速度非常快;但是,狀態的大小會受到集群可用內存的限制,如果應用的狀態隨著時間不停地增長,就會耗盡內存資源。
RocksDB 是硬盤存儲
,可以根據可用的磁盤空間進行擴展,所以它非常適合于超級海量狀態的存儲。不過由于每個狀態的讀寫都需要做序列化/反序列化,而且可能需要直接從磁盤讀取數據,這就會導致性能的降低,平均讀寫性能要比HashMapStateBackend慢一個數量級。
空間和時間的抉擇
4. Checkpoint(容錯機制)
什么是Checkpoint(檢查點)
Checkpoint能生成快照(Snapshot)
若Flink程序崩潰,重新運行程序時可以有選擇地從這些快照進行恢復
Checkpoint是Flink可靠性的基石
Checkpoint和State的區別
State指某個算子的數據狀態
(中間狀態),Checkpoint指所有算子的數據狀態
(全局快照)
State保存在堆內存
,Checkpoint持久化保存
Checkpoint分布式快照流程(重點)
水用擋板擋停讓水靜止,進行快照存儲;Checkpoint機制也是如此,Checkpoint Barrier類似擋板
步驟一:
當Source子任務收到了Checkpoint請求
,該算子會對自己的數據狀態保存快照
向自己的下一個算子發送Checkpoint Barrier
下一個算子只有收到上一個算子廣播過來的Checkpoint Barrier,才進行快照保存
步驟二:
當Sink算子已經收到了所有上游的Checkpoint Barrier
時,進行以下2步操作:
1.保存自己的數據狀態,2.并直接通知檢查點協調器
檢查點協調器在收集所有的task通知后
,就認為這次的Checkpoint全局完成了,從而進行持久化操作
Checkpoint如何保證數據的一致性(重點)
至少一次(at-least-once)
發生故障,可能會有重復數據
精確一次(exactly-once)
發生故障,能保證不丟失數據,也沒有重復數據
讀取最近一次存放的快照,數據重放重新計算,Checkpoint機制保證exactly-once
Checkpoint Barrier對齊機制
Barrie對齊機制
保證了Checkpoint數據狀態的精確一致
下游算子上面對應多個上游算子,下游算子必須要等到上游算子所有的Checkpoint Barrier到齊之后,下游算子才會進行快照的輸入。(會把先到的Checkpoint Barrier數據先緩存起來,直到所有的Checkpoint Barrier全部到達,該算子才會進行快照操作)
什么是savepoint(保存點)
基于checkpoint機制的快照
Checkpoint和Savepoint區別
Checkpoint是自動容錯恢復機制
,Savepoint某個時間點的全局狀態鏡像
Checkpoint是Flink系統行為
,Savepoint是用戶觸發
Checkpoint默認程序刪除
,Savepoint會一直保存