flink 不設置水印_區分理解Flink水印延遲與窗口允許延遲的概念

link 在開窗處理事件時間(Event Time) 數據時,可設置水印延遲以及設置窗口允許延遲(allowedLateness)以保證數據的完整性。這兩者因都是設置延遲時間所以剛接觸時容易混淆。本文接下將展開討論分析“水印延遲”與“窗口允許延遲”概念及區別。

水印延遲(WaterMark)

(1) 水印

由于采用了事件時間,脫離了物理掛鐘。窗口不知道什么時候需要關閉并進行計算,這個時候需要借助水印來解決該問題。當窗口遇到水位標識時就默認是窗口時間段內的數據都到齊了,可以觸發窗口計算。

(2) 水印延遲

設置水印延遲時間的目的是讓水印延遲到達,從而可以解決亂序問題。通過水印延遲到達讓在延遲時間范圍內到達的遲到數據可以加入到窗口計算中,保證了數據的完整性。當水印到達后就會觸發窗口計算,在水印之后到達的遲到數據則會被丟棄。

窗口允許延遲(allowedLateness)

使用 StreamAPI 時,在進行開窗后可設置 allowedLateness 窗口延遲。官網中對其解釋如下:

默認情況下,當水印到達窗口末端時,遲到元素將會被刪除。但Flink允許為window operators指定允許的最大延遲。允許延遲指定元素在被刪除之前延遲的時間,默認值為0。當元素在水印經過窗口末端后到達,且它的到達時間在窗口末端加上運行延遲的時間之內,其仍會被添加到窗口中。根據所使用的觸發器,延遲但未被丟棄的元素可能會再次觸發窗口計算。EventTimeTrigger就是這種情況。為了做到這一點,Flink保持窗口的狀態,直到它們允許的延遲到期。一旦發生這種情況,Flink將刪除窗口并刪除其狀態,正如窗口生命周期部分中所描述的那樣。

簡單理解:通常在水印到達之后遲到數據將會被刪除,而窗口的延遲則是指數據在被刪除之前的允許保留時間。也就是說,在水印達到之后遲到數據本該被刪除,但是如果設置了窗口延遲,那么在水印之后到窗口延遲時間段內到達的遲到數據還是會被加入到窗口計算中,并再次觸發窗口計算。

一個Demo 兩個猜想

下面我用一個 Demo 和兩個猜想來幫助大家加深理解這兩個概念。

例子:接收 Kafka 數據,數據為 JSON 格式如:{"word":"a","count":1,"time":1604286564}。我們開一個 5 秒的 tumbling windows 滾動窗口,以 word 作為 key 在窗口內對 count 值進行累加。同時設置水印延遲 2 秒,窗口延遲 2 秒。代碼如下:

public?class?MyExample?{

public?static?void?main(String[]?args)?throws?Exception?{

//?創建環境

StreamExecutionEnvironment?env=StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

//?設置時間特性為

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

//?水印策略,其需要注入Timestamp?Assigner(描述了如何訪問事件時間戳)和?Watermark?Generator?(事件流顯示的超出正常范圍的程度)

WatermarkStrategywatermarkStrategy=WatermarkStrategy

//?forBoundedOutOfOrderness?屬于(periodic周期性),周期生成器通常通過onEvent()觀察傳入的事件,然后在框架調用onPeriodicEmit()時發出水印。

.forBoundedOutOfOrderness(Duration.ofSeconds(2))

.withTimestampAssigner(new?SerializableTimestampAssigner()?{

@Override

public?long?extractTimestamp(WC?wc,?long?l)?{

return?wc.getEventTime()?*?1000;

}

});

//?Kafka?配置

Properties?properties=newProperties();

properties.setProperty("bootstrap.servers",?"Kafka地址:9092");

properties.setProperty("group.id",?"test");

//?Flink?需要知道如何轉換Kafka消息為Java對象(反序列化),默認提供了?KafkaDeserializationSchema(序列化需要自己編寫)、JsonDeserializationSchema、AvroDeserializationSchema、TypeInformationSerializationSchema

env.addSource(new?FlinkKafkaConsumer<>("flinktest1",?new?JSONKeyValueDeserializationSchema(true),?properties).setStartFromLatest())

//?map?構建?WC?對象

.map(new?MapFunction()?{

@Override

public?WC?map(ObjectNode?jsonNode)?throws?Exception?{

JsonNode?valueNode=jsonNode.get("value");

WC?wc=newWC(valueNode.get("word").asText(),valueNode.get("count").asInt(),valueNode.get("time").asLong());

return?wc;

}

})

//?設定水印策略

.assignTimestampsAndWatermarks(watermarkStrategy)

.keyBy(WC::getWord)

//?窗口設置,這里設置為滾動窗口

.window(TumblingEventTimeWindows.of(Time.seconds(5)))

//?設置窗口延遲

.allowedLateness(Time.seconds(2))

.reduce(new?ReduceFunction()?{

@Override

public?WC?reduce(WC?wc,?WC?t1)?throws?Exception?{

return?new?WC(wc.getWord(),?wc.getCount()?+?t1.getCount());

}

})

.print();

env.execute();

}

static?class?WC?{

public?String?word;

public?int?count;

public?long?eventTime;

public?long?getEventTime()?{

return?eventTime;

}

public?void?setEventTime(long?eventTime)?{

this.eventTime=?eventTime;

}

public?String?getWord()?{

return?word;

}

public?void?setWord(String?word)?{

this.word=?word;

}

public?int?getCount()?{

return?count;

}

public?void?setCount(int?count)?{

this.count=?count;

}

public?WC(String?word,?int?count)?{

this.word=?word;

this.count=?count;

}

public?WC(String?word,?int?count,long?eventTime)?{

this.word=?word;

this.count=?count;

this.eventTime=?eventTime;

}

@Override

public?String?toString()?{

return?"WC{"?+

"word='"?+?word?+?'\''?+

",?count="?+?count?+

'}';

}

}

}

猜想1:

水印延遲 2s 達到,所以會在第 5 + 2 = 7s 時認為 [ 0 ,5 ) 窗口的數據全部到齊,并觸發窗口計算。

//?往?Kafka?中寫入數據

{"word":"a","count":1,"time":1604286560}???//2020-11-02?11:09:20

{"word":"a","count":1,"time":1604286561}???//2020-11-02?11:09:21

{"word":"a","count":1,"time":1604286562}???//2020-11-02?11:09:22

{"word":"a","count":1,"time":1604286566}???//2020-11-02?11:09:26

{"word":"a","count":1,"time":1604286567}???//2020-11-02?11:09:27?(觸發了窗口計算)

控制臺輸出

分析:通過測試發現最后在第 7s 也就是 11:09:27 時觸發了窗口計算,這符合了我們的猜想一。水印延遲 2s 達到,所以會在第 5 + 2 = 7s 時認為 [ 0 ,5 ) 窗口的數據全部到齊,并觸發窗口計算。計算結果為3,這是因為只有最前面的3條數據屬于 [0,5) 窗口計算范圍之內。

猜想2:

設置了窗口延遲2秒,那么只要在水印之后到窗口允許延遲的時間范圍內達到且屬于 [ 0,5) 窗口的遲到數據會被加入到窗口中,且再次觸發窗口運算:

//?繼續往?Kafka?中寫入數據

{"word":"a","count":1,"time":1604286568}???//2020-11-02?11:09:28?時間到達了第?8?秒

{"word":"a","count":1,"time":1604286563}???//2020-11-02?11:09:23?模擬一個在水印之后、在窗口允許延遲范圍內、且屬于[0,5)?窗口的遲到數據,該數據還是會觸發并參與到[0,5)?窗口的計算

控制臺輸出新增了一行

//?我們再繼續往?Kafka?中寫入數據

{"word":"a","count":1,"time":1604286569}??//2020-11-02?11:09:29??時間到達第9秒

{"word":"a","count":1,"time":1604286563}??//2020-11-02?11:09:23?模擬一個在水印之后且超出窗口允許延遲范圍、且屬于[0,5)?窗口的遲到數據,該數據不會參與和觸發[0,5)窗口計算

查看控制臺并沒有發現新的輸出打印。

解析:水印因延遲在第 7s 到達之后會觸發[0,5) 窗口計算,如果沒有設置窗口延遲的情況下,水印之后遲到且屬于 [0,5) 窗口的數據會被丟棄。上面我們實驗設置窗口延遲 2s,實現的效果就是在水印之后,窗口允許延遲時間之內(7 + 2 = 9s 之間),遲到且屬于 [0,5) 窗口的數據還是會觸發一次窗口計算,并參與到窗口計算中。而在 9s 之后,也就是超過窗口允許延時時間,那么遲到且屬于[0,5)的數據就會被丟棄。

總結

WaterMark 到達之前,窗口在攢數據,不會觸發計算。

WaterMark 等于 windowEndTime 時,第一次觸發窗口計算。

WaterMark 到達之后,allowlateness之前,如果來了數據,每條數據都會觸發窗口計算。

超過了allowlateness之后到達的遲到數據會丟棄。

水印用于解決亂序問題保證數據的完整性。而之所以有allowlateness的出現是因為如果WaterMark 加大會導致窗口計算延遲。WaterMark 設定的時間,是第一次觸發窗口計算的時間。allowlateness 表示,WaterMark 觸發窗口計算以后,還可以再等多久的遲到數據,每次符合條件的數據到達都會再次觸發一次窗口計算。allowlateness 是在 Watermark 基礎上再做了一層遲到數據的保證。

【責任編輯:趙寧寧 TEL:(010)68476606】

點贊 0

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/533467.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/533467.shtml
英文地址,請注明出處:http://en.pswp.cn/news/533467.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

愛特php文件管理器2.8_查找「超級蜘蛛池開發者中心 摳:44564876易」安卓應用 - 豌豆莢...

8.6萬人安裝開發者頭條 - 程序員分享平臺 2015 年獲「最美應用」官方推薦&#xff0c;程序員必裝的應用。 開發者頭條是由一群程序員創建的&#xff0c;我們運營了 developerWorks 的微博、微信&#xff0c;創建了碼農周刊&#xff0c;已覆蓋百萬程序員&#xff1b; 我們更懂程…

談華為鴻蒙內核和操作系統,談華為鴻蒙內核和操作系統

作者 | 陸首群談到華為自研鴻蒙內核和操作系統&#xff0c;從華為透漏出來的信息來看&#xff0c;有點自相矛盾、撲朔迷離&#xff01;我曾說過&#xff1a;真真假假&#xff0c;虛虛實實&#xff01;這里有技術原因&#xff0c;也有外部原因。一開始(大概是 2016 年左右)&…

彈跳機器人 桌游_MIT機器人輕松搞定桌游疊疊樂:你能玩過它算我輸 |《科學》子刊...

乾明 發自 凹非寺 量子位 報道 | 公眾號 QbitAI江湖上&#xff0c;一直流傳著一種疊疊樂的試煉。規則很簡單&#xff0c;從下方的積木中&#xff0c;抽一根往上搭。你能往上搭幾層&#xff1f;對MIT團隊研發的機器人來說&#xff0c;玩這個游戲基本上不費吹灰之力。而且&#x…

華為鴻蒙無人駕駛,特斯拉最大的對手竟是華為?Hicar+鴻蒙OS無人駕駛技術不再一家獨大!...

原標題&#xff1a;特斯拉最大的對手竟是華為&#xff1f;Hicar鴻蒙OS無人駕駛技術不再一家獨大&#xff01;短短幾個月的時間&#xff0c;特斯拉的市值翻了近4倍&#xff0c;對于一個超級企業來說一切都顯得那么不可思議&#xff0c;如果把它單純的看成一家車企&#xff0c;恐…

new_picview_一款漂亮的圖片查看器PictureViewer

前段時間寫了一款查看妹子圖片的客戶端宅男福利妹子客戶端SuperGank&#xff0c;于是后來就把其中的一個圖片查看的功能封裝成了一個library&#xff0c;使用簡單&#xff0c;可以進行多項設置。先來看一眼效果圖吧&#xff01;下面來看看如何使用它&#xff1a;首先把圖片url的…

鴻蒙系統會不會影響游戲,令人擔心,鴻蒙系統會不會讓人失望?未來難說

如今&#xff0c;一直被炒的沸沸揚揚的鴻蒙系統&#xff0c;終于在2019年8月9日發布了&#xff0c;次日&#xff0c;也就是8月10日&#xff0c;榮耀的智慧屏又帶著鴻蒙系統出現了一次&#xff0c;榮耀智慧屏也成為了首次搭載鴻蒙系統的終端&#xff0c;見證了中國操作系統的歷史…

buck電路上下管_推薦 | 學好電路設計與仿真?你不能錯過這兩本書籍 ~

網 友小編&#xff0c;有沒有 Saber 相關書籍可以推薦一下&#xff1f;還有&#xff0c;Saber 軟件下載那個版本比較好&#xff1f;當然有啦&#xff01;小 編《Saber 電路仿真及開關電源設計》柯福波 等編著本書以 Saber 開關電源為基礎&#xff0c;以具體工程電路為范例&am…

html5畫電池狀態,HTML5的一個顯示電池狀態的API簡介

這篇文章主要介紹了HTML5的一個顯示電池狀態的API簡介,由Mozilla設計,具體的設備和瀏覽器支持情況還要通過檢測才能確定,需要的朋友可以參考下移動設備的份額在網絡流量中在大量增長&#xff0c;其所貢獻的網絡流量非常龐大&#xff0c;以至于為了移動設備&#xff0c;我們單獨…

redux異步action_react-redux--異步Action

上兩篇文章敘述的都是同步操作&#xff0c;每當 dispatch action 時&#xff0c;state 會被立即更新。但是實際應用中&#xff0c;我們有很多操作執行后&#xff0c;過一段時間&#xff0c;才會得到結果。那么怎么處理這種情況呢&#xff1f;先熟悉一個概念中間件本質就是一個通…

怎么批量修改html文件后綴,如何批量修改文件后綴名

我們都知道電腦文件都有一個格式&#xff0c;比如JPG、MP3等等格式&#xff0c;每個格式都代表不一樣文件類型&#xff0c;那么我們該如何批量更改文件類型的后綴呢?比如把JPG更改為MP3&#xff0c;只要在電腦里設置不隱藏文件擴展名&#xff0c;然后建立統一的文件夾&#xf…

python 怎么調用 矩陣 第幾行_第58集 python機器學習:混淆矩陣精度指標

混淆矩陣的精度計算公式為&#xff1a;精度(TPTN)/(TPTNFPFN)&#xff0c;也就是說&#xff0c;精度就是指正確的預測數目除以所有樣本的數量。準確率、召回率與f-分數&#xff1a;總結混淆矩陣還有幾種方法&#xff0c;其中最常見的就是準確率和召回率。準確率度量的是被預測為…

android seekbar閃退,android seekbar 踩坑之路

最近項目中有用到seekbar&#xff0c;之前對這東西不太了解&#xff0c;趁機來踩坑。seekbar樣式按我覺得Material 中的還不算難看了。但是美工給了自己的樣式&#xff0c;還是得改。主要有這2個屬性:android:thumb"drawable/thumb"android:progressDrawable"dr…

tensorflow分類的loss函數_tensorflow 分類損失函數使用小記

多分類損失函數label.shape:[batch_size]; pred.shape: [batch_size, num_classes]使用 tf.keras.losses.sparse_categorical_crossentropy(y_true, y_pred, from_logitsFalse, axis-1)- y_true 真實值&#xff0c; y_pred 預測值- from_logits,我的理解是&#xff0c;如果預測…

華為鴻蒙系統還沒發布嗎,華為沒有孤軍奮戰,合作伙伴“雪中送炭”,鴻蒙系統正式發布!...

6月2日晚上&#xff0c;期待許久的鴻蒙0S 2終于正式登場了&#xff0c;這意味著鴻蒙手機已經變成了面向市場的正式產品&#xff0c;這是華為邁出的一小步&#xff0c;卻是國產系統與安卓、iOS競爭的開始。在鴻蒙OS正式推出后&#xff0c;不少華為手機用戶已經收到了系統更新的推…

freemark循環map_java與freemarker遍歷map

一、java遍歷MAP/*** 1.把key放到一個集合里&#xff0c;遍歷key值同時根據key得到值 (推薦)*/Set set map.keySet();Iterator itset.iterator();while(it.hasNext()){String s (String) it.next();System.out.println(map.get(s));}/*** 2.把一個map對象放到放到entry里&#…

.net 開發 html框架,Asp.net的開發框架

Asp.net的開發首先要選擇開發框架&#xff0c;選擇怎樣框架要看看開發什么養的網站用的&#xff0c;選擇一個適合的開發框架能節約很多的時間。20個優秀的前端框架&#xff1a;1. Twitter BootStrap (Apache v2.0&#xff1b;響應式)時髦、直觀并且強大的前端框架&#xff0c;讓…

baseresponse響應類_內部類、響應類Response、序列化基類、反序列化、全局局部鉤子...

一、內部類1、概念&#xff1a;將類定義在一個類的內部&#xff0c;被定義的類就是內部類2、特點&#xff1a;內部類及內部類的所以名稱空間&#xff0c;可以直接被外部類訪問的3、 應用&#xff1a;通過內部類的名稱空間&#xff0c;給外部類額外拓展一些特殊的屬性(配置)&…

python模塊名限定_python?正則表達式?匹配??的使用?限定符?sys.re模塊

特殊字符&#xff1a;注意\b是匹配單詞而非字符串的開始和結束&#xff1b; \w不能匹配漢字限定符放在匹配符的后邊2 括號分組&#xff1a;(\d\d\d){2} #有括號匹配6位數字\d\d\d{2} #沒有括號匹配4位數字3 &#xff1f;的使用4 sys.re模塊的常用函數表&#xff1a;查找、分隔、…

計算機不能進入桌面,電腦開機無法進入桌面,請高手解決。

該故障是Windows XP操作系統關機故障中最容易出現的故障。造成該故障的原因可能有以下幾方面原因&#xff1a;系統設置在添亂Windows XP默認情況下&#xff0c;當系統出現錯誤時會自動重新啟動&#xff0c;這樣當用戶關機時&#xff0c;如果關機過程中系統出現錯誤就會重新啟動…