![]() | 博主歷時三年精心創作的《大數據平臺架構與原型實現:數據中臺建設實戰》一書現已由知名IT圖書品牌電子工業出版社博文視點出版發行,點擊《重磅推薦:建大數據平臺太難了!給我發個工程原型吧!》了解圖書詳情,京東購書鏈接:https://item.jd.com/12677623.html,掃描左側二維碼進入京東手機購書頁面。 |
我們知道,時態表(確切地說應該是版本表)提供了回溯歷史的能力,也就是能讀取一條記錄過去某個時刻所對應的值。要想查詢版本表在過去某個時刻對應的值,我們得在查詢時把這個時間作為參數傳遞給版本表,但這個時間參數絕不會是一個 where 條件,它是另一個維度(時間維度)上的參數,那么用怎樣的形式才能把這個時間參數合理地表達到查詢中呢? Flink 使用了 UDF 的形式,主要思路就是:注冊一個 UDF 來指代一張版本表,表名不能有參數,但函數可以有,這時把想訪問版本表的目標時間點作為參數傳給這個UDF,返回的就是當時表中的數據了,這個 UDF 就被稱作:Temporal Table Function!
例如:以下代碼將匯率表 currency_rates
注冊成了時態表函數 rates
。(注意:目前在 Flink SQL 中是不支持定義 Temporal Table Function 的!只能以代碼方式定義,但是 SQL 中可以定義 Temporal Table DDL)
rates = tEnv.from("currency_rates").createTemporalTableFunction("update_time", "currency")tEnv.createTemporarySystemFunction("rates", rates);
然后,使用下面的 SQL 就能查詢出在 11:05
時的匯率信息了:
SELECT * FROM rates('11:05');
可以說:是時態表函數是訪問時態表的“入口”,是時態表的“正確打開方式”!
但是,像上面那樣直接查詢某一時刻版本表上的數據的情形其實并不多,真正常見是:其他表主動 Join 一張時態表,期望獲得表中記錄所代表的事件在發生時刻時態表中的當時的數據,就是我們曾經解釋的“當時對當時”的需求場景(典型案例:Join 匯率表計算訂單當時的總價):
-- 基于時態表函數實現的Join,由于指定的 order_time 是一個事件時間
-- 所以該SQL實現的是:基于事件時間的 Temporal Join,也就是 Join 事件發生時刻關聯表當時的值
SELECTSUM(amount * rate) AS amount
FROMorders,LATERAL TABLE (rates(order_time))
WHERErates.currency = orders.currency
上面的 SQL 就是標準的 Temporal Table Function Join 語法,SQL 中使用了關鍵字 LATERAL TABLE
,填入一個 Temporal Table Function / 時態表函數 rates
,設定傳給時態表的時間屬性(基于什么時間查找時態表上的版本)order_time
。
這里,官方文檔其實隱去了一個背景信息,order_time
其實是 orders
表的事件時間屬性,所以,上述使用 Temporal Table Function Join 語法實現的是:基于事件時間的 Temporal Join,這種 Join 還可以通過 FOR SYSTEM_TIME AS OF
關鍵字實現, Temporal Table Function Join 語法除了能實現基于事件時間的 Temporal Join 外,還能實現基于處理時間的 Temporal Join 了,語法不變,只要將傳給 rates
函數的時間屬性從一個事件時間改為一個處理時間就可以了,就像 [ 官方文檔 ] 給出的示例中那樣,使用了一個 o_proctime
字段,這個字段是 orders
表的處理時間屬性:
-- 基于時態表函數實現的Join,由于指定的 o_proctime 是一個處理時間
-- 所以該SQL實現的是:基于處理時間的 Temporal Join,也就是總是 Join 關聯表當前最新狀態的數據
SELECTo_amount, r_rate
FROMOrders,LATERAL TABLE (rates(o_proctime))
WHEREr_currency = o_currency