1.Window Join
a)概述
Window join 作用在兩個流中有相同 key 且處于相同窗口的元素上,窗口可以通過 window assigner 定義,并且兩個流中的元素都會被用于計算窗口的結果。
兩個流中的元素在組合之后,會被傳遞給用戶定義的 JoinFunction
或 FlatJoinFunction
,可以用它們輸出符合 join 要求的結果。
stream.join(otherStream).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<JoinFunction>)
注意:
- 從兩個流中創建成對的元素與 inner-join 類似,即一個流中的元素在與另一個流中對應的元素完成 join 之前不會被輸出。
- 完成 join 的元素會將他們的 timestamp 設為對應窗口中允許的最大 timestamp。比如一個邊界為
[5, 10)
窗口中的元素在 join 之后的 timestamp 為 9。
b)滾動 Window Join
使用滾動 window join 時,所有 key 相同且共享一個滾動窗口的元素會被組合成對,并傳遞給 JoinFunction
或 FlatJoinFunction
。
行為與 inner join 類似,所以一個流中的元素如果沒有與另一個流中的元素組合起來,它就不會被輸出!
如圖所示,定義了一個大小為 2 毫秒的滾動窗口,即形成了邊界為 [0,1], [2,3], ...
的窗口,將每個窗口中的元素組合成對,組合的結果將被傳遞給 JoinFunction
。
注意:滾動窗口 [6,7]
將不會輸出任何數據,因為綠色流當中沒有數據可以與橙色流的 ⑥ 和 ⑦ 配對。
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(TumblingEventTimeWindows.of(Time.milliseconds(2))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});
c)滑動 Window Join
當使用滑動 window join 時,所有 key 相同且處于同一個滑動窗口的元素將被組合成對,并傳遞給 JoinFunction
或 FlatJoinFunction
,當前滑動窗口內,如果一個流中的元素沒有與另一個流中的元素組合起來,它就不會被輸出!
注意:在某個滑動窗口中被 join 的元素不一定會在其他滑動窗口中被 join。
本例中定義了長度為兩毫秒,滑動距離為一毫秒的滑動窗口,生成的窗口實例區間為 [-1, 0],[0,1],[1,2],[2,3], …
。 X 軸下方是每個滑動窗口中被 join 后傳遞給 JoinFunction
的元素;圖中可以看到橙色 ② 與綠色 ③ 在窗口 [2,3]
中 join,但沒有與窗口 [1,2]
中任何元素 join。
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */)).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});
d)會話 Window Join
使用會話 window join 時,所有 key 相同且組合后符合會話要求的元素將被組合成對,并傳遞給 JoinFunction
或 FlatJoinFunction
,這個操作同樣是 inner join,如果一個會話窗口中只含有某一個流的元素,這個窗口將不會產生輸出!
定義了一個間隔為至少一毫秒的會話窗口。圖中總共有三個會話,前兩者中兩個流都有元素,它們被 join 并傳遞給 JoinFunction
,而第三個會話中,綠流沒有任何元素,所以 ⑧ 和 ⑨ 沒有被 join!
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(EventTimeSessionWindows.withGap(Time.milliseconds(1))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});