43、Flink 的 Window Join 詳解

1.Window Join
a)概述

Window join 作用在兩個流中有相同 key 且處于相同窗口的元素上,窗口可以通過 window assigner 定義,并且兩個流中的元素都會被用于計算窗口的結果。

兩個流中的元素在組合之后,會被傳遞給用戶定義的 JoinFunctionFlatJoinFunction,可以用它們輸出符合 join 要求的結果。

stream.join(otherStream).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<JoinFunction>)

注意

  • 從兩個流中創建成對的元素與 inner-join 類似,即一個流中的元素在與另一個流中對應的元素完成 join 之前不會被輸出。
  • 完成 join 的元素會將他們的 timestamp 設為對應窗口中允許的最大 timestamp。比如一個邊界為 [5, 10) 窗口中的元素在 join 之后的 timestamp 為 9。
b)滾動 Window Join

使用滾動 window join 時,所有 key 相同且共享一個滾動窗口的元素會被組合成對,并傳遞給 JoinFunctionFlatJoinFunction

行為與 inner join 類似,所以一個流中的元素如果沒有與另一個流中的元素組合起來,它就不會被輸出!

在這里插入圖片描述

如圖所示,定義了一個大小為 2 毫秒的滾動窗口,即形成了邊界為 [0,1], [2,3], ... 的窗口,將每個窗口中的元素組合成對,組合的結果將被傳遞給 JoinFunction

注意:滾動窗口 [6,7] 將不會輸出任何數據,因為綠色流當中沒有數據可以與橙色流的 ⑥ 和 ⑦ 配對。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(TumblingEventTimeWindows.of(Time.milliseconds(2))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});
c)滑動 Window Join

當使用滑動 window join 時,所有 key 相同且處于同一個滑動窗口的元素將被組合成對,并傳遞給 JoinFunctionFlatJoinFunction,當前滑動窗口內,如果一個流中的元素沒有與另一個流中的元素組合起來,它就不會被輸出!

注意:在某個滑動窗口中被 join 的元素不一定會在其他滑動窗口中被 join。

在這里插入圖片描述

本例中定義了長度為兩毫秒,滑動距離為一毫秒的滑動窗口,生成的窗口實例區間為 [-1, 0],[0,1],[1,2],[2,3], …。 X 軸下方是每個滑動窗口中被 join 后傳遞給 JoinFunction 的元素;圖中可以看到橙色 ② 與綠色 ③ 在窗口 [2,3] 中 join,但沒有與窗口 [1,2] 中任何元素 join。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */)).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});
d)會話 Window Join

使用會話 window join 時,所有 key 相同且組合后符合會話要求的元素將被組合成對,并傳遞給 JoinFunctionFlatJoinFunction,這個操作同樣是 inner join,如果一個會話窗口中只含有某一個流的元素,這個窗口將不會產生輸出!

在這里插入圖片描述

定義了一個間隔為至少一毫秒的會話窗口。圖中總共有三個會話,前兩者中兩個流都有元素,它們被 join 并傳遞給 JoinFunction,而第三個會話中,綠流沒有任何元素,所以 ⑧ 和 ⑨ 沒有被 join!

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(EventTimeSessionWindows.withGap(Time.milliseconds(1))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/17778.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/17778.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/17778.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

外匯天眼:野村證券和Laser Digital與GMO互聯網集團合作發行日元和美元穩定幣

野村控股和Laser Digital將與GMO互聯網集團合作&#xff0c;在日本探索發行日元和美元穩定幣。GMO互聯網集團的美國子公司GMO-Z.com Trust Company, Inc. 在紐約州金融服務部的監管框架下&#xff0c;在以太坊、恒星幣和Solana等主要區塊鏈上發行穩定幣。GMO-Z.com Trust Compa…

MySQL增刪查改進階

數據庫約束表的關系增刪查改 目錄 一.數據庫約束類型 NOT NULL約束類型 UNIQUE 唯一約束 DEFAULT 默認值約束 PRIMARY KEY&#xff1a;主鍵約束 FOREIGN KEY :W外鍵約束 二&#xff0c;查詢 count&#xff08;&#xff09;兩種用法 sum&#xff0c;avg&#xff0c;max…

Vue3_創建項目

目錄 一、創建vue項目 1.下載vue 2.進入剛才創建的項目 3.安裝依賴 4.運行項目 ?5.打包項目放入生產環境 二、vue項目組成 1.項目文件結構 2.項目重要文件 Vue (發音為 /vju?/&#xff0c;類似 view) 是一款用于構建用戶界面的 JavaScript 框架。它基于標準 HTML、C…

Go語言中實現RSA加解密、簽名驗證算法

隨著互聯網的高速發展&#xff0c;人們對安全的要求也越來越高。密碼學中兩大經典算法&#xff0c;一個是對稱加解密&#xff0c;另一個是非對稱加解密&#xff0c;這里就來分享一下非對稱加密算法的代表&#xff1a;RSA加解密。 在Go語言中實現RSA加解密還是比較簡單的&#…

【安全產品】基于HFish的MySQL蜜罐溯源實驗記錄

MySQL蜜罐對攻擊者機器任意文件讀取 用HFish在3306端口部署MySQL蜜罐 配置讀取文件路徑 攻擊者的mysql客戶端版本為5.7(要求低于8.0) 之后用命令行直連 mysql -h 124.222.136.33 -P 3306 -u root -p 可以看到成功連上蜜罐的3306服務&#xff0c;但進行查詢后會直接lost con…

ai機器人電銷資源有哪些?真的能幫到我們提高效率嗎ai智能語音機器人部署

隨著互聯網科技的發展&#xff0c;各種各樣的科技產物都應用于電銷企業&#xff0c;ai機器人電銷就是其中一個。那么ai機器人電銷可靠嗎&#xff1f;ai機器人電銷資源有哪些&#xff1f;我們一起來看看。 AI機器人在電銷資源方面有以下一些用途和功能&#xff1a; 自動識別潛在…

for循環綁定id,更新html頁面的文字內容

需求&#xff1a;將方法中內容對齊 實現方式 給for循環中每個方法添加一個動態的id在DOM結果渲染完后&#xff0c;更新頁面數據&#xff0c;否則會報錯&#xff0c;找不到對應節點或對應節點為空 <view v-for"(item, index) in itemList" :key"index"…

OWASP十大API漏洞解析:如何抵御Bot攻擊?

新型數字經濟中&#xff0c;API是物聯網設備、Web和移動應用以及業務合作伙伴流程的入口點。然而&#xff0c;API也是犯罪分子的前門&#xff0c;許多人依靠Bot來發動攻擊。對于安全團隊來說&#xff0c;保護API并緩解Bot攻擊至關重要。那么Bot在API攻擊中處于怎樣的地位&#…

香橙派Kunpeng Pro測評

目錄 香橙派Kunpeng Pro開發板試用體驗 觀察理解與適用場景 體驗步驟 試用感受

【ARM+Codesys案例】T3/RK3568/樹莓派+Codesys枕式包裝機運動控制器

枕式包裝機是一種包裝能力非常強&#xff0c;且能適合多種規格用于食品和非食品包裝的連續式包裝機。它不但能用于無商標包裝材料的包裝&#xff0c;而且能夠使用預先印有商標圖案的卷筒材料進行高速包裝。同時&#xff0c;具有穩定性高、生產效率高&#xff0c;適合連續包裝、…

C語言 數組—— 一維數組下標越界問題分析

目錄 數組元素的訪問 一維數組元素的越界訪問 二維數組元素的越界訪問 小結 數組元素的訪問 訪問數組元素時&#xff0c; 下標越界 是大忌&#xff01; ? 編譯器通常不檢查下標越界&#xff0c;導致程序運行時錯誤 ? 下標越界&#xff0c;將訪問數組以外的空間 ? …

pyqt窗體水印

pyqt窗體水印 介紹效果代碼 介紹 給窗體加上水印 效果 代碼 import sys from PyQt5.QtWidgets import QApplication, QMainWindow from PyQt5.QtGui import QPainter, QColor, QFont,QPen from PyQt5.QtCore import Qtclass WatermarkedWindow(QMainWindow):def __init__(se…

鴻蒙4.2小版本推出,鴻蒙5.0已經不遠了

上個月&#xff0c;市場上迎來了華為鴻蒙系統4字開頭的小升級&#xff0c;版本來到了4.2版本。 我們先來看看4.2版本都給用戶帶來哪些特色&#xff1a; 界面切換更流暢&#xff1a;無論是響應速度還是操作手感&#xff0c;用戶都將感受到更加迅速和順滑的體驗 搜星速度的顯著…

工具:Visual Studio Code

一、VSCode生成exe 二、在vs中斷點調試 如果沒效果需要安裝如下與unity相連接的插件 三、注釋 1、代碼注釋 注釋和取消都是都是同一個命令&#xff1a;選中代碼&#xff0c;然后按住CtrlShift/ 2、方法或類注釋 /// 四、導航 五、將變量注釋展示到解釋面板 1、直接顯示 [Too…

Git提交時出現Merge branch ‘master‘ of ...之解決方法

最近遇到了一個問題 我是用git提交代碼的時候 分支上 顯示的是merge 意思是 合并代碼了 每次都會 創建一個分支 因為我和另一個小伙伴共同 開發一個項目 所以 小伙伴告訴我 總是創建新的分支 我細細看了一下 測試了一下 我們兩個人 修改不同的文件 同時修改 他提交了 我再提…

pip安裝軟件包提示“沒有那個文件或目錄”問題的處理

文章目錄 一、Python.h&#xff1a;沒有那個文件或目錄二、lber.h&#xff1a;沒有那個文件或目錄 一、Python.h&#xff1a;沒有那個文件或目錄 pip install -I python-ldap3.0.0b1 #異常提示In file included from Modules/LDAPObject.c:3:0:Modules/common.h:9:20: 致命錯…

React hooks - useLayoutEffect

useLayoutEffect 用法區別 用法 useLayoutEffect 和 useEffect 的使用方式很相似&#xff1a; useLayoutEffect 接收一個函數和一個依賴項數組作為參數只有在數組中的依賴項發生改變時才會再次執行副作用函數useLayoutEffect 也可以返回一個清理函數 useEffect(()>{retur…

【NVM】持久內存的架構

1 內存數據持久化 1.1 數據持久化 持久內存系統包含如下關鍵組件&#xff1a;微處理器、連接微處理器內存總線上的持久內存模組&#xff08;Persistent MemoryModule&#xff0c;PMM&#xff09;及持久內存上的非易失性存儲介質。 使用持久內存來實現數據的持久化&#xff0c…

SpringCloud系列(22)--Ribbon默認負載輪詢算法原理及源碼解析

前言&#xff1a;在上一篇文章中我們介紹了如何去切換Ribbon的負載均衡模式&#xff0c;而本章節內容則是介紹Ribbon默認負載輪詢算法的原理。 1、負載輪詢算法公式 rest接口第N次請求數 % 服務器集群總數 實際調用服務器下標&#xff08;每次服務器重啟后rest接口計數從1開始…

爬蟲在金融領域的應用:股票數據收集

介紹 在金融領域&#xff0c;準確及時的數據收集對于市場分析和投資決策至關重要。股票價格作為金融市場的重要指標之一&#xff0c;通過網絡爬蟲技術可以高效地從多個網站獲取實時股票價格信息。本文將介紹網絡爬蟲在金融領域中的應用&#xff0c;重點討論如何利用Scrapy框架…