《Flink SQL 語法篇》系列,共包含以下 10 篇文章:
- Flink SQL 語法篇(一):CREATE
- Flink SQL 語法篇(二):WITH、SELECT & WHERE、SELECT DISTINCT
- Flink SQL 語法篇(三):窗口聚合(TUMBLE、HOP、SESSION、CUMULATE)
- Flink SQL 語法篇(四):Group 聚合、Over 聚合
- Flink SQL 語法篇(五):Regular Join、Interval Join
- Flink SQL 語法篇(六):Temporal Join
- Flink SQL 語法篇(七):Lookup Join、Array Expansion、Table Function
- Flink SQL 語法篇(八):集合、Order By、Limit、TopN
- Flink SQL 語法篇(九):Window TopN、Deduplication
- Flink SQL 語法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints
😊 如果您覺得這篇文章有用 ?? 的話,請給博主一個一鍵三連 🚀🚀🚀 吧 (點贊 🧡、關注 💛、收藏 💚)!!!您的支持 💖💖💖 將激勵 🔥 博主輸出更多優質內容!!!
Flink SQL 語法篇(六):Temporal Join
- 1.Versioned Table 的兩種定義方式
- 1.1 PRIMARY KEY 定義方式
- 1.2 Deduplicate 定義方式
- 2.應用案例
- 2.1 案例一(事件時間)
- 2.2 案例二(處理時間)
Temporal Join 定義(支持 Batch / Streaming):Temporal Join 在離線的概念中其實是沒有類似的 Join 概念的,但是離線中常常會維護一種表叫做 拉鏈快照表,使用一個明細表去 Join 這個 拉鏈快照表 的 Join 方式就叫做 Temporal Join。而 Flink SQL 中也有對應的概念,表叫做 Versioned Table
,使用一個明細表去 Join 這個 Versioned Table
的 Join 操作就叫做 Temporal Join。Temporal Join 中,Versioned Table
其實就是對同一條 key
(在 DDL 中以 Primary Key 標記同一個 key
)的歷史版本(根據時間劃分版本)做一個維護,當有明細表 Join 這個表時,可以根據明細表中的時間版本選擇 Versioned Table
對應時間區間內的快照數據進行 Join。
應用場景:比如常見的匯率數據(實時的根據匯率計算總金額),在 12 : 00 12:00 12:00 之前(事件時間),人民幣和美元匯率是 7 : 1 7:1 7:1,在 12 : 00 12:00 12:00 之后變為 6 : 1 6:1 6:1,那么在 12 : 00 12:00 12:00 之前數據就要按照 7 : 1 7:1 7:1 進行計算, 12 : 00 12:00 12:00 之后就要按照 6 : 1 6:1 6:1 計算。在事件時間語義的任務中,事件時間 12 : 00 12:00 12:00 之前的數據,要按照 7 : 1 7:1 7:1 進行計算, 12 : 00 12:00 12:00 之后的數據,要按照 6 : 1 6:1 6:1 進行計算。這其實就是離線中快照的概念,維護具體匯率的表在 Flink SQL 體系中就叫做 Versioned Table
。
1.Versioned Table 的兩種定義方式
Verisoned Table
:Verisoned Table 中存儲的數據通常是來源于 CDC 或者會發生更新的數據。Flink SQL 會為 Versioned Table 維護 Primary Key 下的所有歷史時間版本的數據。舉一個匯率場景的案例來看一下一個 Versioned Table 的兩種定義方式。
1.1 PRIMARY KEY 定義方式
-- 定義一個匯率 versioned 表,其中 versioned 表的概念下文會介紹到
CREATE TABLE currency_rates (currency STRING,conversion_rate DECIMAL(32, 2),update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,WATERMARK FOR update_time AS update_time,-- PRIMARY KEY 定義方式PRIMARY KEY(currency) NOT ENFORCED
) WITH ('connector' = 'kafka','value.format' = 'debezium-json',/* ... */
);
1.2 Deduplicate 定義方式
-- 定義一個 append-only 的數據源表
CREATE TABLE currency_rates (currency STRING,conversion_rate DECIMAL(32, 2),update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,WATERMARK FOR update_time AS update_time
) WITH ('connector' = 'kafka','value.format' = 'debezium-json',/* ... */
);-- 將數據源表按照 Deduplicate 方式定義為 Versioned Table
CREATE VIEW versioned_rates AS
SELECT currency, conversion_rate, update_time -- 1. 定義 `update_time` 為時間字段FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY currency -- 2. 定義 `currency` 為主鍵ORDER BY update_time DESC -- 3. ORDER BY 中必須是時間戳列) AS rownum FROM currency_rates)
WHERE rownum = 1;
2.應用案例
Temporal Join 支持的時間語義:事件時間、處理時間。
2.1 案例一(事件時間)
-- 1. 定義一個輸入訂單表
CREATE TABLE orders (order_id STRING,price DECIMAL(32,2),currency STRING,order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time
) WITH (/* ... */);-- 2. 定義一個匯率 versioned 表,其中 versioned 表的概念下文會介紹到
CREATE TABLE currency_rates (currency STRING,conversion_rate DECIMAL(32, 2),update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,WATERMARK FOR update_time AS update_time,PRIMARY KEY(currency) NOT ENFORCED
) WITH ('connector' = 'kafka','value.format' = 'debezium-json',/* ... */
);SELECT order_id,price,currency,conversion_rate,order_time,
FROM orders
-- 3. Temporal Join 邏輯
-- SQL 語法為:FOR SYSTEM_TIME AS OF
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;
結果如下,可以看到相同的貨幣匯率會根據具體數據的事件時間不同 Join 到對應時間的匯率:
order_id price 貨幣 匯率 order_time
======== ===== ======== =============== =========
o_001 11.11 EUR 1.14 12:00:00
o_002 12.51 EUR 1.10 12:06:00
- 事件時間的 Temporal Join 一定要給左右兩張表都設置 Watermark。
- 事件時間的 Temporal Join 一定要把 Versioned Table 的主鍵包含在 Join on 的條件中。
2.2 案例二(處理時間)
10:15> SELECT * FROM LatestRates;currency rate
======== ======
US Dollar 102
Euro 114
Yen 110:30> SELECT * FROM LatestRates;currency rate
======== ======
US Dollar 102
Euro 114
Yen 1-- 10:42 時,Euro 的匯率從 114 變為 116
10:52> SELECT * FROM LatestRates;currency rate
======== ======
US Dollar 102
Euro 116 <==== 從 114 變為 116
Yen 1-- 從 Orders 表查詢數據
SELECT * FROM Orders;amount currency
====== =========2 Euro <== 在處理時間 10:15 到達的一條數據1 US Dollar <== 在處理時間 10:30 到達的一條數據2 Euro <== 在處理時間 10:52 到達的一條數據-- 執行關聯查詢
SELECTo.amount, o.currency, r.rate, o.amount * r.rate
FROMOrders AS oJOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS rON r.currency = o.currency-- 結果如下:
amount currency rate amount*rate
====== ========= ======= ============2 Euro 114 228 <== 在處理時間 10:15 到達的一條數據1 US Dollar 102 102 <== 在處理時間 10:30 到達的一條數據2 Euro 116 232 <== 在處理時間 10:52 到達的一條數據
可以發現處理時間就比較好理解了,因為處理時間語義中是根據左流數據到達的時間決定拿到的匯率值。Flink 就只為 LatestRates
維護了最新的狀態數據,不需要關心歷史版本的數據。