《Flink SQL 語法篇》系列,共包含以下 10 篇文章:
- Flink SQL 語法篇(一):CREATE
- Flink SQL 語法篇(二):WITH、SELECT & WHERE、SELECT DISTINCT
- Flink SQL 語法篇(三):窗口聚合(TUMBLE、HOP、SESSION、CUMULATE)
- Flink SQL 語法篇(四):Group 聚合、Over 聚合
- Flink SQL 語法篇(五):Regular Join、Interval Join
- Flink SQL 語法篇(六):Temporal Join
- Flink SQL 語法篇(七):Lookup Join、Array Expansion、Table Function
- Flink SQL 語法篇(八):集合、Order By、Limit、TopN
- Flink SQL 語法篇(九):Window TopN、Deduplication
- Flink SQL 語法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints
😊 如果您覺得這篇文章有用 ?? 的話,請給博主一個一鍵三連 🚀🚀🚀 吧 (點贊 🧡、關注 💛、收藏 💚)!!!您的支持 💖💖💖 將激勵 🔥 博主輸出更多優質內容!!!
Flink SQL 語法篇(五):Regular Join、Interval Join
- 1.Regular Join
- 1.1 Inner Join 案例
- 1.2 Left Join 案例
- 1.3 Full Join 案例
- 2.Interval Join(時間區間 Join)
- 2.1 Inner Interval Join
- 2.2 Left Interval Join
- 2.3 Full Interval Join
Flink 也支持了非常多的數據 Join 方式,主要包括以下三種:
- 動態表(流)與動態表(流)的 Join
- 動態表(流)與外部維表(比如 Redis)的 Join
- 動態表字段的列轉行(一種特殊的 Join)
細分 Flink SQL 支持的 Join:
Regular Join
:流與流的 Join,包括 Inner Equal Join、Outer Equal JoinInterval Join
:流與流的 Join,兩條流一段時間區間內的 JoinTemporal Join
:流與流的 Join,包括事件時間,處理時間的 Temporal Join,類似于離線中的快照 JoinLookup Join
:流與外部維表的 JoinArray Expansion
:表字段的列轉行,類似于 Hive 的 explode 數據炸開的列轉行Table Function
:自定義函數的表字段的列轉行,支持 Inner Join 和 Left Outer Join
1.Regular Join
Regular Join 定義(支持 Batch / Streaming):Regular Join 其實就是和離線 Hive SQL 一樣的 Regular Join,通過條件關聯兩條流數據輸出。
應用場景:Join 其實在我們的數倉建設過程中應用是非常廣泛的。離線數倉可以說基本上是離不開 Join 的。那么實時數倉的建設也必然離不開 Join,比如日志關聯擴充維度數據,構建寬表;日志通過 ID 關聯計算 CTR。
Regular Join 包含以下幾種(以 L
作為左流中的數據標識,R
作為右流中的數據標識):
Inner Join
(Inner Equal Join
):流任務中,只有兩條流 Join 到才輸出,輸出+[L, R]
。Left Join
(Outer Equal Join
):流任務中,左流數據到達之后,無論有沒有 Join 到右流的數據,都會輸出(Join 到輸出+[L, R]
,沒 Join 到輸出+[L, null]
),如果右流之后數據到達之后,發現左流之前輸出過沒有 Join 到的數據,則會發起回撤流,先輸出-[L, null]
,然后輸出+[L, R]
。Right Join
(Outer Equal Join
):有 Left Join 一樣,左表和右表的執行邏輯完全相反。Full Join
(Outer Equal Join
):流任務中,左流或者右流的數據到達之后,無論有沒有 Join 到另外一條流的數據,都會輸出(對右流來說:Join 到輸出+[L, R]
,沒 Join 到輸出+[null, R]
;對左流來說:Join 到輸出+[L, R]
,沒 Join 到輸出+[L, null]
)。如果一條流的數據到達之后,發現之前另一條流之前輸出過沒有 Join 到的數據,則會發起回撤流(左流數據到達為例:回撤-[null, R]
,輸出+[L, R]
,右流數據到達為例:回撤-[L, null]
,輸出+[L, R]
)。
1.1 Inner Join 案例
實際案例:案例為 曝光日志 關聯 點擊日志 篩選既有曝光又有點擊的數據。
-- 曝光日志數據
CREATE TABLE show_log_table (log_id BIGINT,show_params STRING
) WITH ('connector' = 'datagen','rows-per-second' = '2','fields.show_params.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '100'
);-- 點擊日志數據
CREATE TABLE click_log_table (log_id BIGINT,click_params STRING
)
WITH ('connector' = 'datagen','rows-per-second' = '2','fields.click_params.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE sink_table (s_id BIGINT,s_params STRING,c_id BIGINT,c_params STRING
) WITH ('connector' = 'print'
);-- 流的 INNER JOIN,條件為 log_id
INSERT INTO sink_table
SELECTshow_log_table.log_id as s_id,show_log_table.show_params as s_params,click_log_table.log_id as c_id,click_log_table.click_params as c_params
FROM show_log_table
INNER JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id;
輸出結果如下:
+I[5, d, 5, f]
+I[5, d, 5, 8]
+I[5, d, 5, 2]
+I[3, 4, 3, 0]
+I[3, 4, 3, 3]
...
1.2 Left Join 案例
CREATE TABLE show_log_table (log_id BIGINT,show_params STRING
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.show_params.length' = '3','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE click_log_table (log_id BIGINT,click_params STRING
)
WITH ('connector' = 'datagen','rows-per-second' = '1','fields.click_params.length' = '3','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE sink_table (s_id BIGINT,s_params STRING,c_id BIGINT,c_params STRING
) WITH ('connector' = 'print'
);INSERT INTO sink_table
SELECTshow_log_table.log_id as s_id,show_log_table.show_params as s_params,click_log_table.log_id as c_id,click_log_table.click_params as c_params
FROM show_log_table
LEFT JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id;
輸出結果如下:
+I[5, f3c, 5, c05]
+I[5, 6e2, 5, 1f6]
+I[5, 86b, 5, 1f6]
+I[5, f3c, 5, 1f6]
-D[3, 4ab, null, null]
-D[3, 6f2, null, null]
+I[3, 4ab, 3, 765]
+I[3, 6f2, 3, 765]
+I[2, 3c4, null, null]
+I[3, 4ab, 3, a8b]
+I[3, 6f2, 3, a8b]
+I[2, c03, null, null]
...
1.3 Full Join 案例
CREATE TABLE show_log_table (log_id BIGINT,show_params STRING
) WITH ('connector' = 'datagen','rows-per-second' = '2','fields.show_params.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE click_log_table (log_id BIGINT,click_params STRING
)
WITH ('connector' = 'datagen','rows-per-second' = '2','fields.click_params.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE sink_table (s_id BIGINT,s_params STRING,c_id BIGINT,c_params STRING
) WITH ('connector' = 'print'
);INSERT INTO sink_table
SELECTshow_log_table.log_id as s_id,show_log_table.show_params as s_params,click_log_table.log_id as c_id,click_log_table.click_params as c_params
FROM show_log_table
FULL JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id;
輸出結果如下:
+I[null, null, 7, 6]
+I[6, 5, null, null]
-D[1, c, null, null]
+I[1, c, 1, 2]
+I[3, 1, null, null]
+I[null, null, 7, d]
+I[10, 0, null, null]
+I[null, null, 2, 6]
-D[null, null, 7, 6]
-D[null, null, 7, d]
...
關于 Regular Join 的注意事項:
- 實時 Regular Join 可以不是 等值 Join。等值 Join 和 非等值 Join 區別在于,等值 Join 數據 Shuffle 策略是
Hash
,會按照 Join on 中的等值條件作為id
發往對應的下游;非等值 Join 數據 Shuffle 策略是Global
,所有數據發往一個并發,按照非等值條件進行關聯。 - Join 的流程是左流新來一條數據之后,會和右流中符合條件的所有數據做 Join,然后輸出。
- 流的上游是無限的數據,所以要做到關聯的話,Flink 會將兩條流的所有數據都存儲在 State 中,所以 Flink 任務的 State 會無限增大,因此你需要為 State 配置合適的 TTL,以防止 State 過大。
2.Interval Join(時間區間 Join)
Interval Join 定義(支持 Batch / Streaming):Interval Join 在離線的概念中是沒有的。Interval Join 可以讓一條流去 Join 另一條流中前后一段時間內的數據。
應用場景:為什么有 Regular Join 還要 Interval Join 呢?剛剛的案例也講了,Regular Join 會產生 回撤流,但是在實時數倉中一般寫入的 Sink 都是類似于 Kafka 這樣的消息隊列,然后后面接 Clickhouse 等引擎,這些引擎又不具備處理回撤流的能力。所以博主理解 Interval Join 就是用于消滅回撤流的。
Interval Join 包含以下幾種(以 L
作為左流中的數據標識,R
作為右流中的數據標識):
Inner Interval Join
:流任務中,只有兩條流 Join 到(滿足 Join on 中的條件:兩條流的數據在時間區間 + 滿足其他等值條件)才輸出,輸出+[L, R]
Left Interval Join
:流任務中,左流數據到達之后,如果沒有 Join 到右流的數據,就會等待(放在 State 中等),如果之后右流之后數據到達之后,發現能和剛剛那條左流數據 Join 到,則會輸出+[L, R]
。事件時間中隨著 Watermark 的推進(也支持處理時間)。如果發現發現左流 State 中的數據過期了,就把左流中過期的數據從 State 中刪除,然后輸出+[L, null]
,如果右流 State 中的數據過期了,就直接從 State 中刪除。Right Interval Join
:和 Left Interval Join 執行邏輯一樣,只不過左表和右表的執行邏輯完全相反Full Interval Join
:流任務中,左流或者右流的數據到達之后,如果沒有 Join 到另外一條流的數據,就會等待(左流放在左流對應的 State 中等,右流放在右流對應的 State 中等),如果之后另一條流數據到達之后,發現能和剛剛那條數據 Join 到,則會輸出+[L, R]
。事件時間中隨著 Watermark 的推進(也支持處理時間),發現 State 中的數據過期了,就將這些數據從 State 中刪除并且輸出(左流過期輸出+[L, null]
,右流過期輸出+[null, R]
)
可以發現
Inner Interval Join
和其他三種Outer Interval Join
的區別在于,Outer 在隨著時間推移的過程中,如果有數據過期了之后,會根據是否是 Outer 將沒有 Join 到的數據也給輸出。
2.1 Inner Interval Join
實際案例:還是剛剛的案例,曝光日志 關聯 點擊日志 篩選既有曝光又有點擊的數據,條件是曝光關聯之后發生 4 4 4 小時之內的點擊。
CREATE TABLE show_log_table (log_id BIGINT,show_params STRING,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.show_params.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE click_log_table (log_id BIGINT,click_params STRING,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time
)
WITH ('connector' = 'datagen','rows-per-second' = '1','fields.click_params.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE sink_table (s_id BIGINT,s_params STRING,c_id BIGINT,c_params STRING
) WITH ('connector' = 'print'
);INSERT INTO sink_table
SELECTshow_log_table.log_id as s_id,show_log_table.show_params as s_params,click_log_table.log_id as c_id,click_log_table.click_params as c_params
FROM show_log_table INNER JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id
AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL '4' HOUR AND click_log_table.row_time;
輸出結果如下:
6> +I[2, a, 2, 6]
6> +I[2, 6, 2, 6]
2> +I[4, 1, 4, 5]
2> +I[10, 8, 10, d]
2> +I[10, 7, 10, d]
2> +I[10, d, 10, d]
2> +I[5, b, 5, d]
6> +I[1, a, 1, 7]
2.2 Left Interval Join
CREATE TABLE show_log (log_id BIGINT,show_params STRING,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.show_params.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE click_log (log_id BIGINT,click_params STRING,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time
)
WITH ('connector' = 'datagen','rows-per-second' = '1','fields.click_params.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE sink_table (s_id BIGINT,s_params STRING,c_id BIGINT,c_params STRING
) WITH ('connector' = 'print'
);INSERT INTO sink_table
SELECTshow_log.log_id as s_id,show_log.show_params as s_params,click_log.log_id as c_id,click_log.click_params as c_params
FROM show_log LEFT JOIN click_log ON show_log.log_id = click_log.log_id
AND show_log.row_time BETWEEN click_log.row_time - INTERVAL '5' SECOND AND click_log.row_time + INTERVAL '5' SECOND;
輸出結果如下:
+I[6, e, 6, 7]
+I[11, d, null, null]
+I[7, b, null, null]
+I[8, 0, 8, 3]
+I[13, 6, null, null]
2.3 Full Interval Join
CREATE TABLE show_log (log_id BIGINT,show_params STRING,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.show_params.length' = '1','fields.log_id.min' = '5','fields.log_id.max' = '15'
);CREATE TABLE click_log (log_id BIGINT,click_params STRING,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time
)
WITH ('connector' = 'datagen','rows-per-second' = '1','fields.click_params.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
);CREATE TABLE sink_table (s_id BIGINT,s_params STRING,c_id BIGINT,c_params STRING
) WITH ('connector' = 'print'
);INSERT INTO sink_table
SELECTshow_log.log_id as s_id,show_log.show_params as s_params,click_log.log_id as c_id,click_log.click_params as c_params
FROM show_log LEFT JOIN click_log ON show_log.log_id = click_log.log_id
AND show_log.row_time BETWEEN click_log.row_time - INTERVAL '5' SECOND AND click_log.row_time + INTERVAL '5' SECOND;
輸出結果如下:
+I[6, 1, null, null]
+I[7, 3, 7, 8]
+I[null, null, 6, 6]
+I[null, null, 4, d]
+I[8, d, null, null]
+I[null, null, 3, b]
關于 Interval Join 的注意事項:
- 實時 Interval Join 可以不是 等值 Join。等值 Join 和 非等值 Join 區別在于,等值 Join 數據 Shuffle 策略是
Hash
,會按照 Join on 中的等值條件作為id
發往對應的下游;非等值 Join 數據 Shuffle 策略是Global
,所有數據發往一個并發,然后將滿足條件的數據進行關聯輸出。