水位線和窗口

水位線特點

  1. 插入到數據流中的一個標記,可以認為是一個特殊的數據
  2. 主要內容是一個時間戳
  3. 水位線是基于數據的時間戳生成的,即事件時間
  4. 水位線必須單調遞增
  5. 水位線可以通過設置延遲,來保證正確處理亂序數據
  6. 一個水位線,表示事件時間已經達到了時間戳t
  7. 水位線是Flink流處理中保證結果正確性的核心機制

窗口

錯誤理解:窗口是一個固定位置的框,數據流源源不斷地流過來,到某個時間窗口該關閉了,就停止收集數據,觸發計算并窗口關閉輸出結果。

Flink中窗口是動態創建的,當有落在這個窗口區間范圍的數據達到時,才創建對應的窗口。事實上,觸發計算和窗口關閉兩個行為可以分開。

總體原則

水位線出現表示這個時間之前的數據已經全部到齊,之后再也不會出現了,不過要保證絕對正確,就必須等足夠長的時間,這會帶來更高的延遲。水位線是流處理中對低延遲和結果正確性的一個權衡機制。

水位線生成方案

水位線的生成位置:越靠近數據源越好

WatermarkStrategy:水位線策略對象
1. 水位線生成器 WatermarkGenerator
- onEvent() 給每條數據生成水位線
- onPeriodicEmit():周期性生成水位線
2. 時間戳分配器 TimestampAssigner
- extractTimestamp()

有序流水位線生成的代碼如下:

public class Flink01_UserDefineWaterMarkStrategy {public static void main(String[] args) {//1.創建運行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默認是最大并行度env.setParallelism(1);//設置生成水位線的周期env.getConfig().setAutoWatermarkInterval(1000);//tom,/home,1000SingleOutputStreamOperator<Event> ds = env.socketTextStream("hadoop102", 8888).map(line -> {String[] words = line.split(",");return new Event(words[0].trim(),words[1].trim(),Long.valueOf(words[2].trim()));});ds.print("input");ds.assignTimestampsAndWatermarks(new MyWatermarkStrategy());ds.print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}public static class MyWatermarkStrategy implements WatermarkStrategy<Event>{/*** 創建水位線生成器* @param context* @return*/@Overridepublic WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new MyWatermarkGenerator();}public static class MyWatermarkGenerator implements WatermarkGenerator<Event>{private Long maxTs = Long.MIN_VALUE;/*** 每一條數據調用一次,用于生成一次水位線* @param event* @param eventTimestamp* @param output*/@Overridepublic void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {//有序流,每條數據生成水位線
//                System.out.println("有序流每條數據生成水位線===》"+eventTimestamp);
//                output.emitWatermark(new Watermark(eventTimestamp));maxTs = Math.max(maxTs, eventTimestamp);}/*** 周期性生成水位線* 默認周期是200ms* @param output*/@Overridepublic void onPeriodicEmit(WatermarkOutput output) {//有序流,周期性生成水位線System.out.println("有序流周期性生成水位線===》"+maxTs);output.emitWatermark(new Watermark(maxTs));}}/*** 創建時間戳分配器,用于從數據中提取時間戳* @param context* @return*/@Overridepublic TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new MyTimestampAssigner();}}public static class MyTimestampAssigner implements TimestampAssigner<Event>{/*** 從數據中提取時間戳* @param element The element that the timestamp will be assigned to.* @param recordTimestamp The current internal timestamp of the element, or a negative value, if*     no timestamp has been assigned yet.* @return*/@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.getTs();}}}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/207469.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/207469.shtml
英文地址,請注明出處:http://en.pswp.cn/news/207469.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

[FPGA 學習記錄] 數碼管動態顯示

數碼管動態顯示 文章目錄 1 理論學習1.1 數碼管動態掃描顯示原理 2 實戰演練2.1 實驗目標2.2 程序設計2.2.1 框圖繪制2.2.2 數據生成模塊 data_gen2.2.2.1 波形繪制2.2.2.2 代碼編寫2.2.2.3 代碼編譯2.2.2.4 邏輯仿真2.2.2.4.1 仿真代碼編寫2.2.2.4.2 仿真代碼編譯2.2.2.4.3 波…

如何解決el-table中動態添加固定列時出現的行錯位

問題描述 在使用el-table組件時&#xff0c;我們有時需要根據用戶的操作動態地添加或刪除一些固定列&#xff0c;例如操作列或選擇列。但是&#xff0c;當我們使用v-if指令來控制固定列的顯示或隱藏時&#xff0c;可能會出現表格的行錯位的問題&#xff0c;即固定列和非固定列…

el-tree數據量過大,造成瀏覽器卡死、崩潰

el-tree數據量過大&#xff0c;造成瀏覽器卡死、崩潰 場景&#xff1a;樹形結構展示&#xff0c;數據超級多&#xff0c;超過萬條&#xff0c;每次打開都會崩潰 我這里采用的是引入新的插件虛擬樹&#xff0c;它是參照element-plus 中TreeV2改造vue2.x版本虛擬化樹形控件&…

2024年強烈推薦mac 讀寫NTFS工具Tuxera NTFS for Mac2023中文破解版

大家好啊&#xff5e;今天要給大家推薦的是 Tuxera NTFS for Mac2023中文破解版&#xff01; 小可愛們肯定知道&#xff0c;Mac系統一直以來都有一個小小的痛點&#xff0c;就是無法直接讀寫NTFS格式的移動硬盤和U盤。但是&#xff0c;有了Tuxera NTFS for Mac2023&#xff0c;…

正則表達式:字符串處理的瑞士軍刀

&#x1f90d; 前端開發工程師&#xff08;主業&#xff09;、技術博主&#xff08;副業&#xff09;、已過CET6 &#x1f368; 阿珊和她的貓_CSDN個人主頁 &#x1f560; 牛客高級專題作者、在牛客打造高質量專欄《前端面試必備》 &#x1f35a; 藍橋云課簽約作者、已在藍橋云…

記一次xss通殺挖掘歷程

前言 前端時間&#xff0c;要開放一個端口&#xff0c;讓我進行一次安全檢測&#xff0c;發現的一個漏洞。 經過 訪問之后發現是類似一個目錄索引的端口。(這里上厚碼了哈) 錯誤案例測試 亂輸內容asdasffda之后看了一眼Burp的抓包&#xff0c;抓到的內容是可以發現這是一個…

MuJoCo機器人動力學仿真平臺安裝與教程

MuJoCo是一個機器人動力學仿真平臺&#xff0c;它包括一系列的物理引擎、可視化工具和機器人模擬器等工具&#xff0c;用于研究和模擬機器人的運動和動力學特性。以下是MuJoCo的安裝教程&#xff1a; 下載和安裝MuJoCo Pro。可以從MuJoCo的官方網站上下載最新版本的安裝包。根…

【Python機器學習系列】一文徹底搞懂機器學習中表格數據的輸入形式(理論+源碼)

一、問題 機器學習或者深度學習在處理表格數據&#xff08;Tabular data&#xff09;、圖像數據&#xff08;Image data&#xff09;、文本數據&#xff08;Text data&#xff09;、時間序列數據&#xff08;Time series data&#xff09;上得到了廣泛的應用。 其中&#xff0c…

微信小程序 - 創建 ZIP 壓縮包

微信小程序 - 創建 ZIP 壓縮包 場景分享代碼片段導入 JSZip創建ZIP文件追加寫入文件測試方法參考資料 場景 微信小程序只提供了解壓ZIP的API&#xff0c;并沒有提供創建ZIP的方法。 當我們想把自己處理好的保存&#xff0c;打包ZIP保存下來時就需要自己實現了。 分享代碼片段…

無重復字符的最長子串(LeetCode 3)

文章目錄 1.問題描述2.難度等級3.熱門指數4.解題思路方法一&#xff1a;暴力法方法二&#xff1a;滑動窗口 參考文獻 1.問題描述 給定一個字符串 s &#xff0c;請你找出其中不含有重復字符的最長子串的長度。 s 由英文字母、數字、符號和空格組成。 示例 1&#xff1a; 輸…

基于Java商品銷售管理系統

基于Java商品銷售管理系統 功能需求 1、商品管理&#xff1a;系統需要提供商品信息的管理功能&#xff0c;包括商品的錄入、編輯、查詢和刪除。每個商品應包含基本信息如名稱、編碼、類別、價格、庫存量等。 2、客戶管理&#xff1a;系統需要能夠記錄客戶的基本信息&#xf…

算法:常見的哈希表算法

文章目錄 兩數之和判斷是否互為字符重排存在重復元素存在重復元素字母異位詞分組 本文總結的是關于哈希表常見的算法 哈希表其實就是一個存儲數據的容器&#xff0c;所以其實它本身的算法難度并不高&#xff0c;只是利用哈希表可以對于一些場景進行優化 兩數之和 class Solut…

Michael.W基于Foundry精讀Openzeppelin第41期——ERC20Capped.sol

Michael.W基于Foundry精讀Openzeppelin第41期——ERC20Capped.sol 0. 版本0.1 ERC20Capped.sol 1. 目標合約2. 代碼精讀2.1 constructor() && cap()2.2 _mint(address account, uint256 amount) 0. 版本 [openzeppelin]&#xff1a;v4.8.3&#xff0c;[forge-std]&…

AI智能降重軟件大全,免費最新AI智能降重軟件

在當今信息爆炸的時代&#xff0c;內容創作者們面臨著巨大的寫作壓力&#xff0c;如何在保持高質量的前提下提高效率成為擺在許多人面前的難題。AI智能降重軟件因其獨特的算法和功能逐漸成為提升文案質量的得力助手。本文將專心分享一些優秀的AI智能降重軟件。 147SEO改寫軟件 …

云貝教育 |【技術文章】PostgreSQL中誤刪除數據怎么辦(一)

原文鏈接&#xff1a;【PostgreSQL】PostgreSQL中誤刪除數據怎么辦&#xff08;一&#xff09; - 課程體系 - 云貝教育 (yunbee.net) 在我們學習完PG的MVCC機制之后&#xff0c;對于DML操作&#xff0c;被操作的行其實并未被刪除&#xff0c;只能手工vacuum或自動vacuum觸發才會…

【分享】我想上手機器學習

目錄 前言 一、理解機器學習 1.1 機器學習的目的 1.2 機器學習的模型 1.3 機器學習的數據 二、學習機器學習要學什么 2.1 學習機器學習的核心內容 2.2 怎么選擇模型 2.3 怎么獲取訓練數據 2.4 怎么訓練模型 三、機器學習的門檻 3.1 機器學習的第一道門檻 3.2 機器…

最新版IDEA專業版大學生申請免費許可證教學(無需學校教育郵箱+官方途徑+非破解手段)

文章目錄 前言1. 申請學籍在線驗證報告2. 進入IDEA官網進行認證3. 申請 JB (IDEA) 賬號4. 打開 IDEA 專業版總結 前言 當你進入本篇文章時, 你應該是已經遇到了 IDEA 社區版無法解決的問題, 或是想進一步體驗 IDEA 專業版的強大. 本文是一篇學生申請IDEA免費許可證的教學, 在學…

unity 2d 入門 飛翔小鳥 小鳥碰撞 及死亡(九)

1、給地面&#xff0c;柱體這種添加2d盒裝碰撞器&#xff0c;小鳥移動碰到就不會動了 2、修改小鳥的腳本&#xff08;腳本命名不規范&#xff0c;不要在意&#xff09; using System.Collections; using System.Collections.Generic; using UnityEngine;public class Fly : Mo…

kafka高吞吐、低延時、高性能的實現原理

作者&#xff1a;源碼時代-Raymon老師 Kafka的高吞吐、低延時、高性能的實現原理 Kafka是大數據領域無處不在的消息中間件&#xff0c;目前廣泛使用在企業內部的實時數據管道&#xff0c;并幫助企業構建自己的流計算應用程序。Kafka雖然是基于磁盤做的數據存儲&#xff0c;但…

可信固件-M (TF-M)

概述&#xff1a; 參考: Trusted Firmware-M Documentation — Trusted Firmware-M v2.0.0 documentation 開源代碼托管&#xff1a; trusted-firmware-m.git - Trusted Firmware for M profile Arm CPUs STM32 U5支持TF-M : STM32U5 — Trusted Firmware-M v2.0.0 document…