?一、介紹
Join大體分類只有兩種:Window Join和Interval Join
Window Join有可以根據Window的類型細分出3種:Tumbling(滾動) Window Join、Sliding(滑動) Window Join、Session(會話) Widnow Join。
????????🌸Window 類型的join都是利用window的機制,先將數據緩存在Window State中,當窗口觸發計算時,執行join操作。
????????🌸Interval join也是利用state存儲數據再處理,區別在于state中的數據有失效機制,依靠數據觸發數據清理,目前Stream join的結果是數據的卡爾積。
二、Window Join
?Tumbling Window Join
????????執行翻滾窗口聯接時,具有公共鍵和公告翻滾窗口的所有元素將成對組合聯接,并傳遞JoinFunction或FlatJoinFunction。因為它的行為類似于內部連接,所以一個流中的元素在其滾動窗口中沒有來自另一個流的元素,因此不會被發射。
????????如圖所示,我們定義了一個為2毫秒的翻滾窗口,結果窗口的形式為[0,1]、[2,3]..............該圖顯示了每個窗口中所以元素的成對組合,這些元素將傳遞給JoinFunction。注意在翻滾窗口[6,7]中沒有發射任何東西,因為綠色流中不存在與橙色元素⑥和⑦結合的元素。
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(TumblingEventTimeWindows.of(Time.milliseconds(2))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});
?Sliding Window Join
? ? ? ? 在執行滑動窗口聯接時,具有公共鍵和公共滑動窗口的所以元素將作為成對組合聯接,并傳遞JoinFunction或FlatJoinFunction。在當前滑動窗口中,一個流的元素沒有來自另一個流的元素,則不會發射!請注意,某些元素可能會聯接到一個滑動窗口中,但不會聯接到另一個滑動窗口中!
? ? ? ? 在本例中,我們使用大小為2毫秒的滑動窗口,并將其滑動1毫秒,從而產生滑動窗口[-1,0],[1,2],[2,3]...........x軸下方的連續元素時傳遞給每個滑動窗口的Join Function的元素。在這里,你還可以看到,例如在窗口[2,3]中,橙色②和綠色③連接,但在窗口[1,2]中沒有與任何對象連接。
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */)).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});
?Session Window Join
? ? ? ? 在執行會話窗口聯接時,具有相同鍵(當“組合”滿足會話條件)的所有元素以成對組合方式聯接,并傳遞給JoinFunction或FlatJoinFunction。同樣,這執行一個內部連接,所以如果有一個會話窗口只包含來自一個流的元素,則不會發出任何輸出!
? ? ? ? 這里,我們定義一個會話窗口連接,其中每個會話被至少1毫秒的時間分割。有三個會話,在前兩個會話中,來自兩個流的連接元素被傳遞給JoinFunction。在第三個會話中,綠色流中沒有元素,所以⑧和⑨沒有連接!
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(EventTimeSessionWindows.withGap(Time.milliseconds(1))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});
三、Interval Join
????????前面學習的Window Join必須要在一個Window中進行Join,那如果沒有Window如何處理呢?interval join也是使用相同的key來join兩個流(流A、流B),并且流B中的元素中的時間戳,和流A元素的時間戳,有一個時間間隔。
b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]?or
a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
也就是:流B的元素的時間戳 ≥ 流A的元素時間戳 + 下界,且,流B的元素的時間戳 ≤ 流A的元素時間戳
?
在上面的示例中,我們將兩個流“orange”和“green”連接起來,其下限為-2毫秒,上限為+1毫秒。默認情況下,這些邊界是包含的,但是可以應用.lowerBoundExclusive()和.upperBoundExclusive來更改行為orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound?
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.keyBy(<KeySelector>).intervalJoin(greenStream.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String(){@Overridepublic void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {out.collect(first + "," + second);}});
?