【Flink系列四】Window及Watermark

3.1、window

在 Flink 中 Window 可以將無限流切分成有限流,是處理有限流的核心組件,現在 Flink 中 Window 可以是時間驅動的(Time Window),也可以是數據驅動的(Count Window)。

Flink中的窗口可以分成:滾動窗口(Tumbling Window,無重疊),滑動窗口(Sliding Window,可能有重疊),會話窗口(Session Window,活動間隙),全局窗口(Gobal Window)

3.1.1、Tumbling Windows 滾動窗口

滾動窗口的assigner分發元素到指定大小的窗口。滾動窗口的大小是固定的,且各自范圍之間不重疊。

// 滾動event-time窗口
input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.seconds(5))).<windowed transformation>(<window function>);// 滾動processing-time窗口
input.keyBy(<key selector>).window(TumblingProcessingTimeWindows.of(Time.second(5))).<windowed transformation>(<window function>);// 長度為一天的滾動event-time窗口, 偏移量為-8小時
input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))).<windowed transformation>(<window function>);

如上一個例子所示,滾動窗口的 assigners 也可以傳入可選的 offset 參數。這個參數可以用來對齊窗口。 比如說,不設置 offset 時,長度為一小時的滾動窗口會與 linux 的 epoch 對齊。 你會得到如 1:00:00.000 - 1:59:59.999、2:00:00.000 - 2:59:59.999 等。 如果你想改變對齊方式,你可以設置一個 offset。如果設置了 15 分鐘的 offset, 你會得到 1:15:00.000 - 2:14:59.999、2:15:00.000 - 3:14:59.999 等。 一個重要的 offset 用例是根據 UTC-0 調整窗口的時差。比如說,在中國你可能會設置 offset 為 Time.hours(-8)。

3.1.2、Sliding Windows滑動窗口

滑動窗口的assigner 分發元素到指定大小的窗口,窗口大小通過 window size 參數設置。 滑動窗口需要一個額外的滑動距離(滑動步長window slide)參數來控制生成新窗口的頻率。 因此,如果 slide 小于窗口大小,滑動窗口可以允許窗口重疊。這種情況下,一個元素可能會被分發到多個窗口。

// 滑動 event-time 窗口
input.keyBy(<key selector>).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑動 processing-time 窗口
input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑動 processing-time 窗口,偏移量為 -8 小時
input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))).<windowed transformation>(<window function>);

3.1.3、Session Windows 會話窗口

會話窗口的 assigner 會把數據按活躍的會話分組。 與滾動窗口和滑動窗口不同,會話窗口不會相互重疊,且沒有固定的開始或結束時間。 會話窗口在一段時間沒有收到數據之后會關閉,即在一段不活躍的間隔之后。 會話窗口的 assigner 可以設置固定的會話間隔(session gap)或 用 session gap extractor 函數來動態地定義多長時間算作不活躍。 當超出了不活躍的時間段,當前的會話就會關閉,并且將接下來的數據分發到新的會話窗口。

// 設置了固定間隔的event-time會話窗口
input.keyBy(<key selector>).window(EventTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 設置了動態間隔的event-time會話窗口
input.keyBy(<key selector>).window(EventTimeSessionWindows.withDynamicGap((element)-> {// 決定并返回會話間隔})).<windowed transformation>(<window function>);// 設置了固定間隔的 processing-time session 窗口
input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 設置了動態間隔的 processing-time 會話窗口
input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {// 決定并返回會話間隔}))

3.1.4、Global Windows 全局窗口

全局窗口的 assigner 將擁有相同 key 的所有數據分發到一個全局窗口。 這樣的窗口模式僅在你指定了自定義的?trigger?時有用。 否則,計算不會發生,因為全局窗口沒有天然的終點去觸發其中積累的數據。

input.keyBy(<key selector>).window(GlobalWindows.create()).<windowed transformation>(<window function>);
3.1.5、Triggers窗口觸發

Trigger決定了一個窗口(由window assigner定義)何時可以被window function處理。一般來說,watermark的時間戳>=window endTime并且在窗口內有數據,就會觸發窗口的計算。每個WindowAssigner都有一個默認的Trigger。如果默認trigger無法滿足需求,可以在trigger(...)調用中指定自定義的trigger。

  • onElement() 每次往 window 增加一個元素的時候都會觸發
  • onEventTime() 當 event-time timer 被觸發的時候會調用
  • onProcessingTime() 當 processing-time timer 被觸發的時候會調用
  • onMerge() 對兩個 trigger 的 state 進行 merge 操作
  • clear() window 銷毀的時候被調用

上面的接口中前三個會返回一個 TriggerResult,TriggerResult 有如下幾種可能的選擇:

  • CONTINUE 不做任何事情
  • FIRE 觸發 window
  • PURGE 清空整個 window 的元素并銷毀窗口
  • FIRE_AND_PURGE 觸發窗口,然后銷毀窗口

3.2、time和watermark

3.2.1、time

在 Flink 中 Time 可以分為三種Event-Time,Processing-Time 以及 Ingestion-Time,三者的關系我們可以從下圖中得知:

3.2.2、watermark

Flink提出了watermark,專門處理EventTime窗口計算,其本質其實就是一個時間戳。因為對于遲到數據late element,不可能一直無限期等待,必須有一個機制來保證一個特定的時間后,必須取觸發window去進行計算,這種機制就是watermark

watermark本質上也是一種時間戳,由Apache Flink Source或者自定義的Watermark生成器按照需求Punctuated或者Periodic兩種方式生成的一種系統Event,與普通數據流Event一樣流轉到對應的下游算子,接收到Watermark Event的算子以此不斷調整自己管理的EventTime clock。 Apache Flink 框架保證Watermark單調遞增,算子接收到一個Watermark時候,框架知道不會再有任何小于該Watermark的時間戳的數據元素到來了,所以Watermark可以看做是告訴Apache Flink框架數據流已經處理到什么位置(時間維度)的方式。 Watermark的產生和Apache Flink內部處理邏輯如下圖所示:?

目前Apache Flink 有兩種生產Watermark的方式,如下:

  • Punctuated - 數據流中每一個遞增的EventTime都會產生一個Watermark。 在實際的生產中Punctuated方式在TPS很高的場景下會產生大量的Watermark在一定程度上對下游算子造成壓力,所以只有在實時性要求非常高的場景才會選擇Punctuated的方式進行Watermark的生成。
  • Periodic - 周期性的(一定時間間隔或者達到一定的記錄條數)產生一個Watermark。在實際的生產中Periodic的方式必須結合時間和積累條數兩個維度繼續周期性產生Watermark,否則在極端情況下會有很大的延時。

參閱:Apache Flink 漫談系列(03) - Watermark-阿里云開發者社區

我們可以考慮一個這樣的例子:某 App 會記錄用戶的所有點擊行為,并回傳日志(在網絡不好的情況下,先保存在本地,延后回傳)。A 用戶在 11:02 對 App 進行操作,B 用戶在 11:03 操作了 App,但是 A 用戶的網絡不太穩定,回傳日志延遲了,導致我們在服務端先接受到 B 用戶 11:03 的消息,然后再接受到 A 用戶 11:02 的消息,消息亂序了。那我們怎么保證基于 event-time 的窗口在銷毀的時候,已經處理完了所有的數據呢?這就是 watermark 的功能所在。watermark 會攜帶一個單調遞增的時間戳 t,watermark(t) 表示所有時間戳不大于 t 的數據都已經到來了,未來小于等于t的數據不會再來,因此可以放心地觸發和銷毀窗口了。下圖中給了一個亂序數據流中的 watermark 例子

3.2.3、遲到的數據

上面的 watermark 讓我們能夠應對亂序的數據,但是真實世界中我們沒法得到一個完美的 watermark 數值 — 要么沒法獲取到,要么耗費太大,因此實際工作中我們會使用近似 watermark — 生成 watermark(t) 之后,還有較小的概率接受到時間戳 t 之前的數據,在 Flink 中將這些數據定義為 “late elements”, 同樣我們可以在 window 中指定是允許延遲的最大時間(默認為 0),可以使用下面的代碼進行設置

設置allowedLateness之后,遲來的數據同樣可以觸發窗口,進行輸出,利用 Flink 的 side output 機制,我們可以獲取到這些遲到的數據,使用方式如下:

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/209669.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/209669.shtml
英文地址,請注明出處:http://en.pswp.cn/news/209669.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

c jpeg YUV圖片幀分割成 8*8 塊 ,與逆向把8*8還原為幀

1. 正向分割為若干8*8 塊 下面的程序為通用程序&#xff0c;可以分割任意塊 #include <stdlib.h> #include <string.h> #include <stdio.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <stdlib.h>…

如果微軟20年前開發.net core,JAVA會不會和IE一樣倒下了

可以跨平臺&#xff0c;大量類庫&#xff0c;微軟親自操刀&#xff0c;性能一流&#xff0c;因為沒有做跨平臺&#xff0c;.NET被 python,javascript等搶了一半以上市場。 如果微軟早早的推出類似.net core這樣的跨平臺語言&#xff0c;.net程序猿還會出在這樣的尷尬局面嗎眾所…

Java基礎-開發流程以及HelloWorld程序

目錄 1. Java的開發流程2. HelloWorld 1. Java的開發流程 開發Java程序&#xff0c;需要三個步驟&#xff1a;編寫代碼&#xff0c;編譯代碼&#xff0c;運行代碼 2. HelloWorld 編寫代碼 public class HelloWorld {public static void main(String[] args) {System.out.pri…

Ribbon 饑餓加載

Ribbon默認是采用懶加載&#xff0c;即第一次訪問時才會去創建LoadBalanceClient&#xff0c;請求時間會很長而饑餓加載則會在項目啟動時創建&#xff0c;降低第一次訪問的耗時&#xff0c;通過下面配置開啟饑餓加載: 一、懶加載 Ribbon 默認為懶加載即在首次啟動Application…

代碼隨想錄二刷 |二叉樹 | 二叉樹的層序遍歷

代碼隨想錄二刷 &#xff5c;二叉樹 &#xff5c; 二叉樹的層序遍歷 題目描述解題思路代碼實現 題目描述 102.二叉樹的層序遍歷 給你二叉樹的根節點 root &#xff0c;返回其節點值的 層序遍歷 。 &#xff08;即逐層地&#xff0c;從左到右訪問所有節點&#xff09;。 示例…

Flask 最佳實踐(一)

Flask是一個輕量級而強大的Python Web框架&#xff0c;它的簡潔性和靈活性使其成為許多開發者的首選。然而&#xff0c;為了確保項目的可維護性和可擴展性&#xff0c;我們需要遵循一些最佳實踐。本文將探討Flask中一些關鍵的最佳實踐。 1. 項目結構 構建一個清晰的項目結構是…

Java實現Socket聊天室

一、網絡編程是什么&#xff1f; 在網絡通信協議下&#xff0c;不同計算機上運行的程序&#xff0c;進行數據傳輸。 應用場景&#xff1a;即時通訊、網游對戰、金融證券、國際貿易、郵件、等等。 不管是什么場景&#xff0c;都是計算機與計算機之間通過網絡進行數據傳輸。 …

軟件測試之接口測試自動化(詳解版)

本著以和大家交流如何實現高效的接口測試為出發點&#xff0c;本文包含了我在接口測試領域的一些方法和心得&#xff0c;希望大家一起討論和分享&#xff0c;內容包括但不僅限于&#xff1a; 服務端接口測試介紹接口測試自動化介紹接口測試自動化實踐關于接口測試自動化的思考…

質量工程化,交付快速化

質量和速度之間權衡讓人很難取舍&#xff0c;而通過推進質量工程&#xff0c;以系統化的方式識別和優化系統痛點&#xff0c;可以幫助團隊構建既快又好的精益軟件生產系統。原文: Quality Engineered, Speed Delivered 所有人都想要更快的速度。 但需要解決復雜問題: 權衡質量會…

Kotlin(十四) 擴展函數和運算符重載

目錄 擴展函數 語法結構 代碼示例 運算符重載 語法結構 一元操作符 二元操作符 數值類型操作符 等于和不等于操作符 比較操作符 調用操作符 擴展函數 語法結構 對于擴張函數的語法結構其實很簡單&#xff0c;你想在那個類中添加擴張函數&#xff0c;那么你就用該類…

6. Zigzag Conversion

按照下標找規律注意leetcode的運行輸出&#xff0c;如果其中一組用例出現死循環&#xff0c;輸出結果會在一個文件&#xff0c;即部分測試用例正確&#xff0c;部分錯誤且出現死循環&#xff0c;則需辨別輸出結果屬于哪一份測試用例 class Solution { public:string convert(s…

(二)五種最新算法(SWO、COA、LSO、GRO、LO)求解無人機路徑規劃MATLAB

一、五種算法&#xff08;SWO、COA、LSO、GRO、LO&#xff09;簡介 1、蜘蛛蜂優化算法SWO 蜘蛛蜂優化算法&#xff08;Spider wasp optimizer&#xff0c;SWO&#xff09;由Mohamed Abdel-Basset等人于2023年提出&#xff0c;該算法模型雌性蜘蛛蜂的狩獵、筑巢和交配行為&…

w3school學習筆記3(NumPy)

系列文章目錄 文章目錄 系列文章目錄前言一、NumPy簡介二、NumPy入門三、NumPy創建四、NumPy數組索引五、NumPy數組裁切六、NumPy數據類型七、NumPy副本/視圖八、NumPy數據形狀九、NumPy數組重塑十、NumPy數組迭代總結 前言 一、NumPy簡介 1、什么是Numpy&#xff1f; NumPy是…

線上盲盒小程序,開啟互聯網盲盒時代

近年來&#xff0c;盲盒經濟在國內非常火爆&#xff0c;各類盲盒品牌層出不窮&#xff0c;深受國內外年輕人、消費者的喜愛。 目前&#xff0c;根據數據顯示&#xff0c;盲盒市場不僅在線下異常火熱&#xff0c;線上盲盒也是成為了大眾的新選擇。各類電商平臺中盲盒的成交額更…

Esxi7Esxi8設置VMFSL虛擬閃存的大小

Esxi7Esxi8設置VMFSL虛擬閃存的大小 ESXi7,8 默認安裝會分配一個 VMFSL(VMFS-L)(Local VMFS)很大空間(120G), 感覺很浪費, 實際給 8G 就可以了, 最少 6G , 經實驗,給2G沒法安裝 . Esxi7是虛擬閃存的 修改的方法是: 在安裝時修改 設置 autoPartitionOSDataSize8192 在cdromBoo…

快捷切換raw頁面到repo頁面-Raw2Repo插件

Raw2Repo By Rick &#x1f4d6;快捷切換代碼托管平臺raw頁面到repo頁面 &#x1f517;github鏈接 https://github.com/rickhqh/Raw2Repo ?Features 功能&#xff1a; ?單擊 Raw2Repo 插件按鈕&#xff0c;即可跳轉到相應的代碼倉庫頁面。?支持 GitHub、Gitee、GitCode …

spring boot整合mybatis進行部門管理管理的增刪改查

部門列表查詢&#xff1a; 功能實現&#xff1a; 需求&#xff1a;查詢數據庫表中的所有部門數據&#xff0c;展示在頁面上。 準備工作&#xff1a; 準備數據庫表dept&#xff08;部門表&#xff09;&#xff0c;實體類Dept。在項目中引入mybatis的起步依賴&#xff0c;mysql的…

【ET8】1.ET8入門-運行指南

主要學習網址 論壇地址為&#xff1a;https://et-framework.cn Git地址為&#xff1a;GitHub - egametang/ET: Unity3D Client And C# Server Framework 官方QQ群 : 474643097 項目檢出 檢出項目切換到release8.0分支 GitHub地址&#xff1a;GitHub - egametang/ET: Unity…

[足式機器人]Part2 Dr. CAN學習筆記-數學基礎Ch0-5Laplace Transform of Convolution卷積的拉普拉斯變換

本文僅供學習使用 本文參考&#xff1a; B站&#xff1a;DR_CAN Dr. CAN學習筆記-數學基礎Ch0-5Laplace Transform of Convolution卷積的拉普拉斯變換 Laplace Transform : X ( s ) L [ x ( t ) ] ∫ 0 ∞ x ( t ) e ? s t d t X\left( s \right) \mathcal{L} \left[ x\lef…

基于Swin_Transformer的圖像超分辨率系統

1.研究背景與意義 項目參考AAAI Association for the Advancement of Artificial Intelligence 研究背景與意義 隨著科技的不斷發展&#xff0c;圖像超分辨率技術在計算機視覺領域中變得越來越重要。圖像超分辨率是指通過使用計算機算法將低分辨率圖像轉換為高分辨率圖像的過…