3.1、window
在 Flink 中 Window 可以將無限流切分成有限流,是處理有限流的核心組件,現在 Flink 中 Window 可以是時間驅動的(Time Window),也可以是數據驅動的(Count Window)。
Flink中的窗口可以分成:滾動窗口(Tumbling Window,無重疊),滑動窗口(Sliding Window,可能有重疊),會話窗口(Session Window,活動間隙),全局窗口(Gobal Window)
3.1.1、Tumbling Windows 滾動窗口
滾動窗口的assigner分發元素到指定大小的窗口。滾動窗口的大小是固定的,且各自范圍之間不重疊。
// 滾動event-time窗口 input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.seconds(5))).<windowed transformation>(<window function>);// 滾動processing-time窗口 input.keyBy(<key selector>).window(TumblingProcessingTimeWindows.of(Time.second(5))).<windowed transformation>(<window function>);// 長度為一天的滾動event-time窗口, 偏移量為-8小時 input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))).<windowed transformation>(<window function>);
如上一個例子所示,滾動窗口的 assigners 也可以傳入可選的 offset 參數。這個參數可以用來對齊窗口。 比如說,不設置 offset 時,長度為一小時的滾動窗口會與 linux 的 epoch 對齊。 你會得到如 1:00:00.000 - 1:59:59.999、2:00:00.000 - 2:59:59.999 等。 如果你想改變對齊方式,你可以設置一個 offset。如果設置了 15 分鐘的 offset, 你會得到 1:15:00.000 - 2:14:59.999、2:15:00.000 - 3:14:59.999 等。 一個重要的 offset 用例是根據 UTC-0 調整窗口的時差。比如說,在中國你可能會設置 offset 為 Time.hours(-8)。
3.1.2、Sliding Windows滑動窗口
滑動窗口的assigner 分發元素到指定大小的窗口,窗口大小通過 window size 參數設置。 滑動窗口需要一個額外的滑動距離(滑動步長window slide)參數來控制生成新窗口的頻率。 因此,如果 slide 小于窗口大小,滑動窗口可以允許窗口重疊。這種情況下,一個元素可能會被分發到多個窗口。
// 滑動 event-time 窗口 input.keyBy(<key selector>).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑動 processing-time 窗口 input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑動 processing-time 窗口,偏移量為 -8 小時 input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))).<windowed transformation>(<window function>);
3.1.3、Session Windows 會話窗口
會話窗口的 assigner 會把數據按活躍的會話分組。 與滾動窗口和滑動窗口不同,會話窗口不會相互重疊,且沒有固定的開始或結束時間。 會話窗口在一段時間沒有收到數據之后會關閉,即在一段不活躍的間隔之后。 會話窗口的 assigner 可以設置固定的會話間隔(session gap)或 用 session gap extractor 函數來動態地定義多長時間算作不活躍。 當超出了不活躍的時間段,當前的會話就會關閉,并且將接下來的數據分發到新的會話窗口。
// 設置了固定間隔的event-time會話窗口 input.keyBy(<key selector>).window(EventTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 設置了動態間隔的event-time會話窗口 input.keyBy(<key selector>).window(EventTimeSessionWindows.withDynamicGap((element)-> {// 決定并返回會話間隔})).<windowed transformation>(<window function>);// 設置了固定間隔的 processing-time session 窗口 input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 設置了動態間隔的 processing-time 會話窗口 input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {// 決定并返回會話間隔}))
3.1.4、Global Windows 全局窗口
全局窗口的 assigner 將擁有相同 key 的所有數據分發到一個全局窗口。 這樣的窗口模式僅在你指定了自定義的?trigger?時有用。 否則,計算不會發生,因為全局窗口沒有天然的終點去觸發其中積累的數據。
input.keyBy(<key selector>).window(GlobalWindows.create()).<windowed transformation>(<window function>);
3.1.5、Triggers窗口觸發
Trigger決定了一個窗口(由window assigner定義)何時可以被window function處理。一般來說,watermark的時間戳>=window endTime并且在窗口內有數據,就會觸發窗口的計算。每個WindowAssigner都有一個默認的Trigger。如果默認trigger無法滿足需求,可以在trigger(...)調用中指定自定義的trigger。
- onElement() 每次往 window 增加一個元素的時候都會觸發
- onEventTime() 當 event-time timer 被觸發的時候會調用
- onProcessingTime() 當 processing-time timer 被觸發的時候會調用
- onMerge() 對兩個 trigger 的 state 進行 merge 操作
- clear() window 銷毀的時候被調用
上面的接口中前三個會返回一個 TriggerResult,TriggerResult 有如下幾種可能的選擇:
- CONTINUE 不做任何事情
- FIRE 觸發 window
- PURGE 清空整個 window 的元素并銷毀窗口
- FIRE_AND_PURGE 觸發窗口,然后銷毀窗口
3.2、time和watermark
3.2.1、time
在 Flink 中 Time 可以分為三種Event-Time,Processing-Time 以及 Ingestion-Time,三者的關系我們可以從下圖中得知:
3.2.2、watermark
Flink提出了watermark,專門處理EventTime窗口計算,其本質其實就是一個時間戳。因為對于遲到數據late element,不可能一直無限期等待,必須有一個機制來保證一個特定的時間后,必須取觸發window去進行計算,這種機制就是watermark
watermark本質上也是一種時間戳,由Apache Flink Source或者自定義的Watermark生成器按照需求Punctuated或者Periodic兩種方式生成的一種系統Event,與普通數據流Event一樣流轉到對應的下游算子,接收到Watermark Event的算子以此不斷調整自己管理的EventTime clock。 Apache Flink 框架保證Watermark單調遞增,算子接收到一個Watermark時候,框架知道不會再有任何小于該Watermark的時間戳的數據元素到來了,所以Watermark可以看做是告訴Apache Flink框架數據流已經處理到什么位置(時間維度)的方式。 Watermark的產生和Apache Flink內部處理邏輯如下圖所示:?
目前Apache Flink 有兩種生產Watermark的方式,如下:
- Punctuated - 數據流中每一個遞增的EventTime都會產生一個Watermark。 在實際的生產中Punctuated方式在TPS很高的場景下會產生大量的Watermark在一定程度上對下游算子造成壓力,所以只有在實時性要求非常高的場景才會選擇Punctuated的方式進行Watermark的生成。
- Periodic - 周期性的(一定時間間隔或者達到一定的記錄條數)產生一個Watermark。在實際的生產中Periodic的方式必須結合時間和積累條數兩個維度繼續周期性產生Watermark,否則在極端情況下會有很大的延時。
參閱:Apache Flink 漫談系列(03) - Watermark-阿里云開發者社區
我們可以考慮一個這樣的例子:某 App 會記錄用戶的所有點擊行為,并回傳日志(在網絡不好的情況下,先保存在本地,延后回傳)。A 用戶在 11:02 對 App 進行操作,B 用戶在 11:03 操作了 App,但是 A 用戶的網絡不太穩定,回傳日志延遲了,導致我們在服務端先接受到 B 用戶 11:03 的消息,然后再接受到 A 用戶 11:02 的消息,消息亂序了。那我們怎么保證基于 event-time 的窗口在銷毀的時候,已經處理完了所有的數據呢?這就是 watermark 的功能所在。watermark 會攜帶一個單調遞增的時間戳 t,watermark(t) 表示所有時間戳不大于 t 的數據都已經到來了,未來小于等于t的數據不會再來,因此可以放心地觸發和銷毀窗口了。下圖中給了一個亂序數據流中的 watermark 例子
3.2.3、遲到的數據
上面的 watermark 讓我們能夠應對亂序的數據,但是真實世界中我們沒法得到一個完美的 watermark 數值 — 要么沒法獲取到,要么耗費太大,因此實際工作中我們會使用近似 watermark — 生成 watermark(t) 之后,還有較小的概率接受到時間戳 t 之前的數據,在 Flink 中將這些數據定義為 “late elements”, 同樣我們可以在 window 中指定是允許延遲的最大時間(默認為 0),可以使用下面的代碼進行設置
設置allowedLateness
之后,遲來的數據同樣可以觸發窗口,進行輸出,利用 Flink 的 side output 機制,我們可以獲取到這些遲到的數據,使用方式如下: