Flink雙流(join)

?一、介紹

Join大體分類只有兩種:Window Join和Interval Join

Window Join有可以根據Window的類型細分出3種:Tumbling(滾動) Window Join、Sliding(滑動) Window Join、Session(會話) Widnow Join。

????????🌸Window 類型的join都是利用window的機制,先將數據緩存在Window State中,當窗口觸發計算時,執行join操作。

????????🌸Interval join也是利用state存儲數據再處理,區別在于state中的數據有失效機制,依靠數據觸發數據清理,目前Stream join的結果是數據的卡爾積。

二、Window Join

?Tumbling Window Join

????????執行翻滾窗口聯接時,具有公共鍵和公告翻滾窗口的所有元素將成對組合聯接,并傳遞JoinFunction或FlatJoinFunction。因為它的行為類似于內部連接,所以一個流中的元素在其滾動窗口中沒有來自另一個流的元素,因此不會被發射。

????????如圖所示,我們定義了一個為2毫秒的翻滾窗口,結果窗口的形式為[0,1]、[2,3]..............該圖顯示了每個窗口中所以元素的成對組合,這些元素將傳遞給JoinFunction。注意在翻滾窗口[6,7]中沒有發射任何東西,因為綠色流中不存在與橙色元素⑥和⑦結合的元素。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(TumblingEventTimeWindows.of(Time.milliseconds(2))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});

?Sliding Window Join

? ? ? ? 在執行滑動窗口聯接時,具有公共鍵和公共滑動窗口的所以元素將作為成對組合聯接,并傳遞JoinFunction或FlatJoinFunction。在當前滑動窗口中,一個流的元素沒有來自另一個流的元素,則不會發射!請注意,某些元素可能會聯接到一個滑動窗口中,但不會聯接到另一個滑動窗口中!

? ? ? ? 在本例中,我們使用大小為2毫秒的滑動窗口,并將其滑動1毫秒,從而產生滑動窗口[-1,0],[1,2],[2,3]...........x軸下方的連續元素時傳遞給每個滑動窗口的Join Function的元素。在這里,你還可以看到,例如在窗口[2,3]中,橙色②和綠色③連接,但在窗口[1,2]中沒有與任何對象連接。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */)).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});

?Session Window Join

? ? ? ? 在執行會話窗口聯接時,具有相同鍵(當“組合”滿足會話條件)的所有元素以成對組合方式聯接,并傳遞給JoinFunction或FlatJoinFunction。同樣,這執行一個內部連接,所以如果有一個會話窗口只包含來自一個流的元素,則不會發出任何輸出

? ? ? ? 這里,我們定義一個會話窗口連接,其中每個會話被至少1毫秒的時間分割。有三個會話,在前兩個會話中,來自兩個流的連接元素被傳遞給JoinFunction。在第三個會話中,綠色流中沒有元素,所以⑧和⑨沒有連接!

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(EventTimeSessionWindows.withGap(Time.milliseconds(1))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});

三、Interval Join

????????前面學習的Window Join必須要在一個Window中進行Join,那如果沒有Window如何處理呢?interval join也是使用相同的key來join兩個流(流A、流B),并且流B中的元素中的時間戳,和流A元素的時間戳,有一個時間間隔。

b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]?or

a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

也就是:流B的元素的時間戳 ≥ 流A的元素時間戳 + 下界,且,流B的元素的時間戳 ≤ 流A的元素時間戳

?

在上面的示例中,我們將兩個流“orange”和“green”連接起來,其下限為-2毫秒,上限為+1毫秒。默認情況下,這些邊界是包含的,但是可以應用.lowerBoundExclusive()和.upperBoundExclusive來更改行為orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound?

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.keyBy(<KeySelector>).intervalJoin(greenStream.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String(){@Overridepublic void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {out.collect(first + "," + second);}});

?

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/696876.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/696876.shtml
英文地址,請注明出處:http://en.pswp.cn/news/696876.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【OpenFeign常用配置】

OpenFeign常用配置 快速入門&#xff1a;1、引入依賴2、啟用OpenFeign 實踐1、引入依賴2、開啟連接池功能3、模塊劃分4、日志5、重試 快速入門&#xff1a; OpenFeign是一個聲明式的http客戶端&#xff0c;是spring cloud在eureka公司開源的feign基礎上改造而來。其作用及時基于…

C++ template-2

第 5 章 基礎技巧 5.1 typename 關鍵字 關鍵字typename在C標準化過程中被引入進來&#xff0c;用來澄清模板內部的一個標識符代表的 是某種類型&#xff0c;而不是數據成員。考慮下面這個例子&#xff1a; template<typename T> class MyClass { public:void foo() {t…

【代碼隨想錄算法訓練營Day09】28.實現 strStr(); 459.重復的子字符串

文章目錄 Day 9 第四章 字符串part0228. 實現 strStr() &#xff08;本題可以跳過&#xff09;KMP 思路KMP 代碼 459.重復的子字符串 &#xff08;本題可以跳過&#xff09;字符串總結雙指針回顧 Day 9 第四章 字符串part02 今日任務 28.實現 strStr(); 459.重復的子字符串; 字…

題目:C++快速找到未知長度單鏈表的中間節點。普通方法和高級方法2種解題思路解析。

在數據結構的面試中&#xff0c;經常會出現這樣的問題&#xff1a;如何快速找到未知長度單鏈表的中間節點&#xff1f;通常&#xff0c;面試官會期待你提供兩種解法&#xff1a;一種是最基本的普通方法&#xff0c;另一種是更高效的 advanced 方法。本文將詳細介紹這兩種方法。…

Nginx -2

接著上文寫 5.4.7 驗證模塊 需要輸入用戶名和密碼 模塊名稱&#xff1a;ngx_http_auth_basic_module 訪問控制基于模塊 ngx_http_auth_basic_module 實現&#xff0c;可以通過匹配客戶端資源進行限制 語法&#xff1a; Syntax: auth_basic string | off; Default: auth_ba…

威爾金森功分器基本原理學習筆記

威爾金森功分器基本原理 威爾金森功率分配器的功能是將輸入信號等分或不等分的分配到各個輸出端口&#xff0c;并保持相同輸出相位。環形器雖然有類似功能&#xff0c;但威爾金森功率分配器在應用上具有更寬的帶寬。微帶形功分器的電路結構如圖所示&#xff0c;其中&#xff0…

【OpenAI Sora】何時開放使用?付費課程已上線(sora什么時候開放使用 )

Sora何時開放使用 根據提供的信息&#xff0c;Sora目前還未對廣大用戶開放。OpenAI在2024年2月15日展示了Sora的視頻&#xff0c;但沒有設立等待名單或提供API訪問。Sora仍在開發中&#xff0c;正在接受安全測試&#xff0c;并且尚未向公眾開放使用。 付費課程已上線 根據最…

Vue圖片瀏覽組件v-viewer,支持旋轉、縮放、翻轉等操作

Vue圖片瀏覽組件v-viewer&#xff0c;支持旋轉、縮放、翻轉等操作 之前用過viewer.js&#xff0c;算是市場上用過最全面的圖片預覽。v-viewer&#xff0c;是基于viewer.js的一個圖片瀏覽的Vue組件&#xff0c;支持旋轉、縮放、翻轉等操作。 基本使用 安裝&#xff1a;npm安裝…

費舍爾FISHER金屬探測器探測儀維修F70

美國FISHER LABS費舍爾地下金屬探測器&#xff0c;金屬探測儀等維修&#xff08;考古探金銀銅探寶等儀器&#xff09;。 費舍爾F70視聽目標ID金屬探測器&#xff0c;Fisher 金屬探測器公司成立于1931年&#xff0c;在實驗條件很艱苦的情況下&#xff0c;研發出了地下金屬探測器…

【Python】實現一個類似于Glass2k的Windows窗口透明化軟件

一 背景說明 網上看到一款Windows下的窗口透明化工具Glass2k&#xff08;Glass2k官網&#xff09;&#xff0c;可以簡單地通過快捷鍵實現任意窗口的透明化&#xff0c;還挺方便的&#xff0c;想用Python自己實現一下類似的功能。 軟件已經開源到&#xff1a;窗口透明化小工具開…

【Leetcode】889. 根據前序和后序遍歷構造二叉樹

文章目錄 題目思路代碼結果 題目 題目鏈接 給定兩個整數數組&#xff0c;preorder 和 postorder &#xff0c;其中 preorder 是一個具有 無重復 值的二叉樹的前序遍歷&#xff0c;postorder 是同一棵樹的后序遍歷&#xff0c;重構并返回二叉樹。 如果存在多個答案&#xff0c;…

CSS基礎屬性

【三】基礎屬性 【1】高度和寬度 &#xff08;1&#xff09;參數 width&#xff08;寬度&#xff09;&#xff1a;用于設置元素的寬度。可以使用具體的數值&#xff08;如像素值&#xff09;或百分比來指定寬度。 height&#xff08;高度&#xff09;&#xff1a;用于設置元…

Kubernetes 卷存儲 NFS | nfs搭建配置 原理介紹 nfs作為存儲卷使用

目錄 1、NFS介紹2、NFS服務部署2.1安裝nfs服務 (服務端配置)2.2啟動NFS服務2.3 服務檢查2.4 客戶端配置 3、nfs作為存儲卷使用3.1 nfs作為volume3.2 nfs存儲的缺點3.3 nfs作為PersistentVolum 4、nfs作為動態存儲提供5、總結 1、NFS介紹 NFS&#xff08;Network File System&a…

4.pom文件介紹Maven常用命令

1.pom.xml文件介紹. 1.1project標簽和modelVersion標簽介紹. pom.xml文件是maven的核心文件&#xff0c;POM(Project Object Model&#xff0c;項目對象模型)定義了項目的基本信息&#xff0c;用于描述如何構建&#xff0c;聲明項目依賴;&#xff1b; 1.2依賴坐標介紹. 依賴的…

得物面試:Kafka消息0丟失,如何實現?

得物面試&#xff1a;Kafka消息0丟失&#xff0c;如何實現&#xff1f; 尼恩說在前面 在40歲老架構師 尼恩的讀者交流群(50)中&#xff0c;最近有小伙伴拿到了一線互聯網企業如得物、阿里、滴滴、極兔、有贊、希音、百度、網易、美團的面試資格&#xff0c;遇到很多很重要的面…

新版Java面試專題視頻教程——多線程篇②

新版Java面試專題視頻教程——多線程篇② 0. 問題匯總0.1 線程的基礎知識0.2 線程中并發安全0.3 線程池0.4 使用場景 1.線程的基礎知識2.線程中并發鎖3.線程池3.1 說一下線程池的核心參數&#xff08;線程池的執行原理知道嘛&#xff09;3.2 線程池中有哪些常見的阻塞隊列Array…

高級語言期末2014級A卷

1.編寫函數 int delarr(int a[] ,int n)&#xff0c;刪除有n個元素的正整型數組a中所有素數&#xff0c;要求&#xff1a; 1&#xff09;數組a中剩余元素保持原來次序&#xff1b; 2&#xff09;將處理后的數組輸出&#xff1b; 3&#xff09;函數值返回剩余元素個數&#xff1…

MySQL索引面試題(高頻)

文章目錄 前言什么時候需要&#xff08;不需要&#xff09;)使用索引&#xff1f;有哪些優化索引的方法前綴索引優化索引覆蓋優化索引失效場景 總結 前言 今天來講一講 MySQL 索引的高頻面試題。主要是針對前一篇文章 MySQL索引入門&#xff08;一文搞定&#xff09;進行查漏補…

虛擬機的內存結構

一、摘要 熟悉 Java 語言特性的同學都知道&#xff0c;相比 C、C 等編程語言&#xff0c;Java 無需通過手動方式回收內存&#xff0c;內存中所有的對象都可以交給 Java 虛擬機來幫助自動回收&#xff1b;而像 C、C 等編程語言&#xff0c;需要開發者通過代碼手動釋放內存資源&…

MedicalGPT 訓練醫療大模型,實現了包括增量預訓練、有監督微調、RLHF(獎勵建模、強化學習訓練)和DPO(直接偏好優化)

MedicalGPT 訓練醫療大模型&#xff0c;實現了包括增量預訓練、有監督微調、RLHF(獎勵建模、強化學習訓練)和DPO(直接偏好優化)。 MedicalGPT: Training Your Own Medical GPT Model with ChatGPT Training Pipeline. 訓練醫療大模型&#xff0c;實現了包括增量預訓練、有監督微…