【核心重點】Flink四大基石

1. Time(時間機制)

時間概念

  • 處理時間:執行具體操作時的機器時間(例如 Java的 System.currentTimeMillis()) )
  • 事件時間:數據本身攜帶的時間,事件產生時的時間。
  • 攝入時間:數據進入 Flink 的時間;在系統內部,會把它當做事件時間來處理。

事件時間在實際應用中更為廣泛,從Flink 1.12版本開始,Flink已經將事件時間作為默認的時間語義

Flink 可以根據不同的時間概念處理數據。

2. Window(窗口計算)

收集窗口時間內的數據,對窗口中收集數據進行聚合運算這就是窗口機制

窗口的生命周期

創建:屬于該窗口的第一個元素到達時就會創建該窗口,窗口事先定義好就是固定的,但是窗口創建時間不固定【窗口開始時間以水印所攜帶的時間戳作為標準】

銷毀:窗口結束時間之后,就會銷毀當前窗口

Flink窗口分類以及窗口 API

Watermark處理亂序數據

3. State(狀態機制)

什么是Flink的狀態

狀態其實是個變量,這個變量保存了數據流的歷史數據, 如果有新的數據流進來,會讀取狀態變量,將新的數據和歷史一起計算。

狀態分類

托管狀態(Managed State)和原始狀態(Raw State)

托管狀態就是由 Flink 統一管理的,狀態的存儲訪問、故障恢復和重組等一系列問題都由Flink實現,直接使用API

原始狀態則是自定義的,相當于就是開辟了一塊內存,需要自己管理,實現狀態的序列化和故障恢復。

通常采用 Flink 托管狀態來實現需求

算子狀態(Operator State)和按鍵分區狀態(Keyed State)

可以將托管狀態分為兩類:算子狀態和按鍵分區狀態。

keyBy 將DataStream轉換為KeyedStream,KeyedStream是特殊的DataStream。

KeyedState只能應用于KeyedStream,因此KeyedState的計算只能放在KeyBy之后

基于狀態(KeyedState)計算實現詞頻統計

代碼實現

事先定義一個實體類:

public class WordCount {private String word;private Integer count;// setter&getter&toString方法
}  

Flink程序基本流程:

/*** description: 基于狀態(KeyedState)計算實現詞頻統計*/
public class WordCountWithStateful {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> lines =env.socketTextStream("127.0.0.1",8888,"\n");lines.flatMap((String input, Collector<WordCount> output)-> {String[] words = input.split(" ");for(String word:words) {output.collect(new WordCount(word,1));}}).returns(WordCount.class)// keyBy之后,每個key都有對應的狀態,同一個key只能操作自己對應的狀態.keyBy(WordCount::getWord)// 狀態計算.flatMap(new WordCountStateFunc()).print();env.execute();}
}

計算函數:

public class WordCountStateFunc extends RichFlatMapFunction<WordCount, WordCount> {/*** 狀態變量*/private ValueState<WordCount> keyedState;/*** description: open方法中狀態變量的初始化*/@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor<WordCount> valueStateDescriptor =// valueState描述器new ValueStateDescriptor<>(// 描述器的名稱"wordcountState",/* * 描述器的數據類型:** Flink有自己的一套數據類型,包含了JAVA和Scala的所有數據類型* 這些數據類型都是TypeInformation對象的子類。* TypeInformation對象統一了所有數據類型的序列化實現*/TypeInformation.of(WordCount.class));keyedState = getRuntimeContext().getState(valueStateDescriptor);}/*** description: keyedState計算邏輯*/@Overridepublic void flatMap(WordCount input, Collector<WordCount> output) throws Exception {// 讀取狀態WordCount lastKeyedState = keyedState.value();// 更新狀態if (lastKeyedState == null) {// 狀態還未賦值的情況 更新狀態keyedState.update(input);// 返回原數據output.collect(input);} else {// 狀態存在舊的狀態數據的情況Integer count = lastKeyedState.getCount() + input.getCount();WordCount newWordCount = new WordCount(input.getWord(), count);// 更新狀態keyedState.update(newWordCount);// 返回新的數據output.collect(newWordCount);}}}

keyedState狀態計算步驟

  1. 繼承Rich函數
  2. 重寫Open方法,對狀態變量進行初始化
  3. 狀態計算邏輯

為什么要進行有狀態的計算 ?

如果Flink發生了異常退出,checkpoint機制可以讀取保存的狀態,進行恢復。

廣播流、廣播狀態

有時希望算子并行子任務都保持同一份“全局”狀態,用來做統一的配置和規則設定。這時所有分區的所有數據都會訪問到同一個狀態,狀態就像被“廣播”到所有分區一樣,這種特殊的算子狀態,就叫作廣播狀態(BroadcastState)。【可以動態修改配置】

編碼步驟

  1. 構建事件流
  2. 構建廣播流
  3. 將事件流和廣播流連接
  4. 對連接后的流進行處理

狀態后端

Flink中,狀態的存儲、訪問以及維護,都是由一個可插拔的組件決定的,這個組件就叫作狀態后端(state backend)。狀態后端主要負責管理本地狀態的存儲方式和位置。

  1. Memory State Backend 【java內存HashMap】
  2. FS State Backend 【HDFS】
  3. RocksDB State Backend 【可持久化的key value存儲引擎】

選擇正確的狀態后端

HashMapStateBackend 是內存計算讀寫速度非常快;但是,狀態的大小到集群可用內存的限制,如果應用的狀態隨著時間不停地增長,就會耗盡內存資源。

RocksDB 是硬盤存儲,可以根據可用的磁盤空間進行擴展,所以它非常適合于超級海量狀態的存儲。不過由于每個狀態的讀寫都需要做序列化/反序列化,而且可能需要直接從磁盤讀取數據,這就會導致性能的降低,平均讀寫性能要比HashMapStateBackend慢一個數量級

空間和時間的抉擇

4. Checkpoint(容錯機制)

什么是Checkpoint(檢查點)

Checkpoint能生成快照(Snapshot)
若Flink程序崩潰,重新運行程序時可以有選擇地從這些快照進行恢復
Checkpoint是Flink可靠性的基石

Checkpoint和State的區別

State指某個算子的數據狀態(中間狀態),Checkpoint指所有算子的數據狀態(全局快照)
State保存在堆內存,Checkpoint持久化保存

Checkpoint分布式快照流程(重點)

水用擋板擋停讓水靜止,進行快照存儲;Checkpoint機制也是如此,Checkpoint Barrier類似擋板

步驟一

Source子任務收到了Checkpoint請求,該算子會對自己的數據狀態保存快照向自己的下一個算子發送Checkpoint Barrier
下一個算子只有收到上一個算子廣播過來的Checkpoint Barrier,才進行快照保存

步驟二

Sink算子已經收到了所有上游的Checkpoint Barrier時,進行以下2步操作:
1.保存自己的數據狀態,2.并直接通知檢查點協調器
檢查點協調器在收集所有的task通知后,就認為這次的Checkpoint全局完成了,從而進行持久化操作

Checkpoint如何保證數據的一致性(重點)

至少一次(at-least-once)

發生故障,可能會有重復數據

精確一次(exactly-once)

發生故障,能保證不丟失數據,也沒有重復數據

讀取最近一次存放的快照,數據重放重新計算,Checkpoint機制保證exactly-once

Checkpoint Barrier對齊機制

Barrie對齊機制保證了Checkpoint數據狀態的精確一致

下游算子上面對應多個上游算子,下游算子必須要等到上游算子所有的Checkpoint Barrier到齊之后,下游算子才會進行快照的輸入。(會把先到的Checkpoint Barrier數據先緩存起來,直到所有的Checkpoint Barrier全部到達,該算子才會進行快照操作

什么是savepoint(保存點)

基于checkpoint機制的快照

Checkpoint和Savepoint區別
Checkpoint是自動容錯恢復機制,Savepoint某個時間點的全局狀態鏡像
Checkpoint是Flink系統行為,Savepoint是用戶觸發
Checkpoint默認程序刪除,Savepoint會一直保存

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/210149.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/210149.shtml
英文地址,請注明出處:http://en.pswp.cn/news/210149.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

linux vim 基礎設置-自動填充文件頭

前言 當前為vimrc腳本設置&#xff0c;腳本位置在 ~/.vimrc or /etc/vimrc 當前為首次打開 C C Shell 文件&#xff0c;自動填充對應文件頭信息&#xff0c;再次打開時會修改對應時間信息 :set nu "顯示行號 :set hlsearch "搜索時 高亮"新建 .c .cpp .sh文件&a…

理解Go語言中的defer

引言 Go有許多在其他編程語言中可以找到的常見控制流關鍵字,例如if、switch、for等。defer是其他大多數編程語言中沒有的關鍵字,盡管它不太常見,但你很快就會看到它在你的程序中有多么有用。 defer語句的主要用途之一是清理資源,例如打開的文件,網絡連接和數據庫句柄。在…

在AWS Lambda上部署EC2編譯的FFmpeg工具——自定義層的方案

大綱 1 確定Lambda運行時環境1.1 Lambda系統、鏡像、內核版本1.2 運行時1.2.1 Python1.2.2 Java 2 環境準備2.1 創建EC2實例 3 編譯FFmpeg3.1 連接EC2 4 編譯5 上傳S3存儲桶5.1 創建S3桶5.2 創建IAM策略5.3 創建IAM角色5.4 EC2關聯角色5.5 修改桶策略5.6 打包并上傳 6 創建Lamb…

智能優化算法應用:基于海鷗算法3D無線傳感器網絡(WSN)覆蓋優化 - 附代碼

智能優化算法應用&#xff1a;基于海鷗算法3D無線傳感器網絡(WSN)覆蓋優化 - 附代碼 文章目錄 智能優化算法應用&#xff1a;基于海鷗算法3D無線傳感器網絡(WSN)覆蓋優化 - 附代碼1.無線傳感網絡節點模型2.覆蓋數學模型及分析3.海鷗算法4.實驗參數設定5.算法結果6.參考文獻7.MA…

【nuxt3】cannot read preperties of null (reading ‘$nuxt‘)

問題描述 vue3 中&#xff0c;通過 createVNode 創建子組件實例時&#xff0c;發現子組件無法獲取到父組件中的 router、store 信息&#xff0c;一旦子組件使用就會報錯。 問題原因 通過控制臺斷點調試&#xff0c;發現時 appContext 值為空導致的。懷疑是創建子組件的時候&a…

海外地區開啟IPV6無法訪問服務器問題

前言 最近有海外地區的用戶反饋無法訪問公司的網絡&#xff0c;無法下載應用和系統進行升級。了解到瀏覽器可以正常訪問公司域名&#xff0c;谷歌&#xff0c;油管等都能正常使用。日志分析GET請求服務器數據時沒有得到應答&#xff0c;最終查詢網絡相關修改確認與網絡IPV6有關…

掌握游戲開發的全方位知識:這些內容你一定要知道

游戲開發是一項涉及多學科的綜合性工作&#xff0c;從游戲設計到編程、美術、音頻、測試等多個方面都需要開發者具備廣泛的知識。以下是進行游戲開發時需要掌握的主要知識領域。 首先&#xff0c;游戲設計是整個過程的基石。這包括游戲機制和玩法設計、關卡設計、用戶界面&…

表示你的shell未被正確配置以使用conda activate--換成清華源anaconda

1 CommandNotFoundError: Your shell has not been properly configured to use conda activate. If using conda activate from a batch script, change your invocation to CALL conda.bat activate.To initialize your shell, run$ conda init <SHELL_NAME>這個錯誤提…

uniapp-獲取手機型號

要獲取當前設備的手機型號&#xff0c;您可以使用uni-app提供的uni.getSystemInfo() API來實現此目的。 代碼示例&#xff1a; uni.getSystemInfo({success: function(res) {console.log("手機型號&#xff1a;" res.platform)} })該方法會返回一個包含設備信息的…

JFrog推出面向Hugging Face的原生集成,為 ML 模型提供強大支持,實現DevOps、安全和AI的協調統一

2023年12月5日 —— 流式軟件公司、企業軟件供應鏈平臺提供商JFrog推出ML模型管理功能&#xff0c;這是業界首套旨在簡化機器學習&#xff08;ML&#xff09;模型管理和安全性的功能。JFrog 平臺中的全新ML模型管理功能使AI交付與企業現有的 DevOps 和 DevSecOps 實踐保持一致&…

計算機評價的主要性能指標

對計算機評價的主要性能指標如下&#xff1a; 1&#xff0e;時鐘頻率&#xff08;主頻&#xff09; 主頻是計算機的主要性能指標之一&#xff0c;在很大程度上決定了計算機的運算速度。CPU 的工作節拍是由主時鐘來控制的&#xff0c;主時鐘不斷產生固定頻率的時鐘脈沖&#xff…

一個簡單的可視化的A星自動尋路

一個簡單的應用場景&#xff0c;流程圖連線 源碼&#xff1a; addExample("A星路徑查找", function () {return {template: <div><div ref"main"></div></div>,data() { return {}; },computed: {},methods: {},mounted() {var c…

Python中的比較兩個字符串

更多資料獲取 &#x1f4da; 個人網站&#xff1a;ipengtao.com 在Python編程中&#xff0c;字符串比較是一項常見且關鍵的操作&#xff0c;涵蓋了諸多方法和技巧。比較兩個字符串是否相等、大小寫是否一致&#xff0c;或者在一個字符串中尋找特定的子字符串&#xff0c;都是日…

征途漫漫:汽車MCU的國產替代往事

01.西雁東飛&#xff0c;南下創業 1985年&#xff0c;山東大學物理系畢業的周生明加入878廠&#xff08;“北霸天”&#xff09;參與MOS電路研發&#xff0c;隨后幾年&#xff0c;大洋彼岸的英特爾相繼推出CPU 386\486、奔騰系列等產品。在摩爾定律的凸顯、進口和走私的劇烈沖…

基于Java房屋租賃管理系統

基于Java房屋租賃管理系統 功能需求 1、房源信息管理&#xff1a;系統需要能夠記錄和管理所有房源的詳細信息&#xff0c;包括房屋地址、房屋面積、租金、付款方式、房屋類型等。管理員應該可以添加、編輯和刪除房源信息。 2、租戶信息管理&#xff1a;系統需要能夠記錄和管…

class067 二維動態規劃【算法】

class067 二維動態規劃 code1 64. 最小路徑和 // 最小路徑和 // 給定一個包含非負整數的 m x n 網格 grid // 請找出一條從左上角到右下角的路徑&#xff0c;使得路徑上的數字總和為最小。 // 說明&#xff1a;每次只能向下或者向右移動一步。 // 測試鏈接 : https://leetcode…

<JavaEE> 經典設計模式之 -- 線程池

目錄 一、線程池的概念 二、Java 標準庫中的線程池類 2.1 ThreadPoolExecutor 類 2.1.1 corePoolSize 和 maximumPoolSize 2.1.2 keepAliveTime 和 unit 2.1.3 workQueue 2.1.4 threadFactory 2.1.5 handler 2.1.6 創建一個參數自定義的線程池 2.2 Executors 類 2.3…

go學習筆記(17)Blob and ArrayBuffer

最近在學習go websocket的時候&#xff0c;在學習實驗過程遇到一個比較奇怪問題。為什么我的數據返回是blob&#xff0c;而不是arrayBuffer&#xff1f;百思不得其解。 直到同事打包的時候微信小游戲遇到了一個報錯。FileReader不支持。 經過在社區查詢&#xff0c;官方答復是…

Qt之QCache和QContiguousCache

一.QCache QCache在構造的時候指定了緩存中允許的最大成本,也就是如下構造函數中的參數maxCost。默認情況下,QCaches maxCost() 是100。 QCache(int maxCost = 100) ~QCache() void clear() bool contains(const Key &key) const int count() const bool insert(const …

[原創] 電源芯片輸出端的紋波測試

網上有很多文章講解&#xff0c;電源芯片的紋波測試&#xff0c;原理圖各種講解&#xff0c;理論有余&#xff0c;實質性測試細節不夠細致&#xff0c;想寫一些測試步驟&#xff0c;作為分享和記錄。 1、設置示波器參數 1.1 校準示波器 1.2 探頭按鈕推到X1&#xff08;代表波…