【大數據】Flink SQL 語法篇(六):Temporal Join

Flink SQL 語法篇》系列,共包含以下 10 篇文章:

  • Flink SQL 語法篇(一):CREATE
  • Flink SQL 語法篇(二):WITH、SELECT & WHERE、SELECT DISTINCT
  • Flink SQL 語法篇(三):窗口聚合(TUMBLE、HOP、SESSION、CUMULATE)
  • Flink SQL 語法篇(四):Group 聚合、Over 聚合
  • Flink SQL 語法篇(五):Regular Join、Interval Join
  • Flink SQL 語法篇(六):Temporal Join
  • Flink SQL 語法篇(七):Lookup Join、Array Expansion、Table Function
  • Flink SQL 語法篇(八):集合、Order By、Limit、TopN
  • Flink SQL 語法篇(九):Window TopN、Deduplication
  • Flink SQL 語法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints

😊 如果您覺得這篇文章有用 ?? 的話,請給博主一個一鍵三連 🚀🚀🚀 吧 (點贊 🧡、關注 💛、收藏 💚)!!!您的支持 💖💖💖 將激勵 🔥 博主輸出更多優質內容!!!

Flink SQL 語法篇(六):Temporal Join

  • 1.Versioned Table 的兩種定義方式
    • 1.1 PRIMARY KEY 定義方式
    • 1.2 Deduplicate 定義方式
  • 2.應用案例
    • 2.1 案例一(事件時間)
    • 2.2 案例二(處理時間)

Temporal Join 定義(支持 Batch / Streaming):Temporal Join 在離線的概念中其實是沒有類似的 Join 概念的,但是離線中常常會維護一種表叫做 拉鏈快照表,使用一個明細表去 Join 這個 拉鏈快照表 的 Join 方式就叫做 Temporal Join。而 Flink SQL 中也有對應的概念,表叫做 Versioned Table,使用一個明細表去 Join 這個 Versioned Table 的 Join 操作就叫做 Temporal Join。Temporal Join 中,Versioned Table 其實就是對同一條 key(在 DDL 中以 Primary Key 標記同一個 key)的歷史版本(根據時間劃分版本)做一個維護,當有明細表 Join 這個表時,可以根據明細表中的時間版本選擇 Versioned Table 對應時間區間內的快照數據進行 Join。

應用場景:比如常見的匯率數據(實時的根據匯率計算總金額),在 12 : 00 12:00 12:00 之前(事件時間),人民幣和美元匯率是 7 : 1 7:1 7:1,在 12 : 00 12:00 12:00 之后變為 6 : 1 6:1 6:1,那么在 12 : 00 12:00 12:00 之前數據就要按照 7 : 1 7:1 7:1 進行計算, 12 : 00 12:00 12:00 之后就要按照 6 : 1 6:1 6:1 計算。在事件時間語義的任務中,事件時間 12 : 00 12:00 12:00 之前的數據,要按照 7 : 1 7:1 7:1 進行計算, 12 : 00 12:00 12:00 之后的數據,要按照 6 : 1 6:1 6:1 進行計算。這其實就是離線中快照的概念,維護具體匯率的表在 Flink SQL 體系中就叫做 Versioned Table

1.Versioned Table 的兩種定義方式

Verisoned Table:Verisoned Table 中存儲的數據通常是來源于 CDC 或者會發生更新的數據。Flink SQL 會為 Versioned Table 維護 Primary Key 下的所有歷史時間版本的數據。舉一個匯率場景的案例來看一下一個 Versioned Table 的兩種定義方式。

1.1 PRIMARY KEY 定義方式

-- 定義一個匯率 versioned 表,其中 versioned 表的概念下文會介紹到
CREATE TABLE currency_rates (currency STRING,conversion_rate DECIMAL(32, 2),update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,WATERMARK FOR update_time AS update_time,-- PRIMARY KEY 定義方式PRIMARY KEY(currency) NOT ENFORCED
) WITH ('connector' = 'kafka','value.format' = 'debezium-json',/* ... */
);

1.2 Deduplicate 定義方式

-- 定義一個 append-only 的數據源表
CREATE TABLE currency_rates (currency STRING,conversion_rate DECIMAL(32, 2),update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,WATERMARK FOR update_time AS update_time
) WITH ('connector' = 'kafka','value.format' = 'debezium-json',/* ... */
);-- 將數據源表按照 Deduplicate 方式定義為 Versioned Table
CREATE VIEW versioned_rates AS
SELECT currency, conversion_rate, update_time   -- 1. 定義 `update_time` 為時間字段FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY currency  -- 2. 定義 `currency` 為主鍵ORDER BY update_time DESC              -- 3. ORDER BY 中必須是時間戳列) AS rownum FROM currency_rates)
WHERE rownum = 1; 

2.應用案例

Temporal Join 支持的時間語義:事件時間、處理時間。

2.1 案例一(事件時間)

-- 1. 定義一個輸入訂單表
CREATE TABLE orders (order_id    STRING,price       DECIMAL(32,2),currency    STRING,order_time  TIMESTAMP(3),WATERMARK FOR order_time AS order_time
) WITH (/* ... */);-- 2. 定義一個匯率 versioned 表,其中 versioned 表的概念下文會介紹到
CREATE TABLE currency_rates (currency STRING,conversion_rate DECIMAL(32, 2),update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,WATERMARK FOR update_time AS update_time,PRIMARY KEY(currency) NOT ENFORCED
) WITH ('connector' = 'kafka','value.format' = 'debezium-json',/* ... */
);SELECT order_id,price,currency,conversion_rate,order_time,
FROM orders
-- 3. Temporal Join 邏輯
-- SQL 語法為:FOR SYSTEM_TIME AS OF
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;

結果如下,可以看到相同的貨幣匯率會根據具體數據的事件時間不同 Join 到對應時間的匯率:

order_id  price  貨幣       匯率             order_time
========  =====  ========  ===============  =========
o_001     11.11  EUR       1.14             12:00:00
o_002     12.51  EUR       1.10             12:06:00
  • 事件時間的 Temporal Join 一定要給左右兩張表都設置 Watermark。
  • 事件時間的 Temporal Join 一定要把 Versioned Table 的主鍵包含在 Join on 的條件中。

2.2 案例二(處理時間)

10:15> SELECT * FROM LatestRates;currency   rate
======== ======
US Dollar   102
Euro        114
Yen           110:30> SELECT * FROM LatestRates;currency   rate
======== ======
US Dollar   102
Euro        114
Yen           1-- 10:42 時,Euro 的匯率從 114 變為 116
10:52> SELECT * FROM LatestRates;currency   rate
======== ======
US Dollar   102
Euro        116     <====114 變為 116
Yen           1-- 從 Orders 表查詢數據
SELECT * FROM Orders;amount currency
====== =========2 Euro             <== 在處理時間 10:15 到達的一條數據1 US Dollar        <== 在處理時間 10:30 到達的一條數據2 Euro             <== 在處理時間 10:52 到達的一條數據-- 執行關聯查詢
SELECTo.amount, o.currency, r.rate, o.amount * r.rate
FROMOrders AS oJOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS rON r.currency = o.currency-- 結果如下:
amount currency     rate   amount*rate
====== ========= ======= ============2 Euro          114          228    <== 在處理時間 10:15 到達的一條數據1 US Dollar     102          102    <== 在處理時間 10:30 到達的一條數據2 Euro          116          232    <== 在處理時間 10:52 到達的一條數據

可以發現處理時間就比較好理解了,因為處理時間語義中是根據左流數據到達的時間決定拿到的匯率值。Flink 就只為 LatestRates 維護了最新的狀態數據,不需要關心歷史版本的數據。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/711801.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/711801.shtml
英文地址,請注明出處:http://en.pswp.cn/news/711801.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

機器視覺——硬件選型

1、相機選型 在選擇機器視覺相機時&#xff0c;通常需要考慮以下幾個方面&#xff1a; 1、分辨率&#xff1a;相機的分辨率決定了其拍攝圖像的清晰度和細節程度。根據具體的應用需求&#xff0c;可以選擇適當的分辨率范圍。 2、幀率&#xff1a;幀率表示相機每秒鐘能夠拍攝的…

2023年營養保健品線上電商市場行業分析(2024年營養保健行業未來趨勢分析)

近年來&#xff0c;受人口老齡化、養生年輕化等因素驅動&#xff0c;保健品行業增長強勁&#xff0c;加之越來越多的年輕人也加入養生大軍&#xff0c;成為保健品市場上的一股新力量&#xff0c;進一步帶動市場擴容。 鯨參謀數據顯示&#xff0c;2023年度&#xff0c;京東平臺…

[pdf]《軟件方法》2024版部分公開-共196頁

DDD領域驅動設計批評文集 做強化自測題獲得“軟件方法建模師”稱號 《軟件方法》各章合集 潘加宇《軟件方法》2024版部分公開pdf文件&#xff0c;共196頁&#xff0c;已上傳CSDN資源。 也可到以下地址下載&#xff1a; http://www.umlchina.com/url/softmeth2024.html 如果…

Ubuntu20.04 ssh終端登錄后未自動執行.bashrc

sudo vim ~/.profile輸入以下內容 if [ -n "$BASH_VERSION" ]; then if [ -f "$HOME/.bashrc" ]; then . "$HOME/.bashrc" fi fi 執行 source ~/.profile重新測試 其他答案 如果你的~/.bashrc文件在Ubuntu中沒有自動生效&#xff0c;…

3. 文檔概述(Documentation Overview)

3. 文檔概述&#xff08;Documentation Overview&#xff09; 本章節簡要介紹一下Spring Boot參考文檔。它包含本文檔其它部分的鏈接。 本文檔的最新版本可在 docs.spring.io/spring-boot/docs/current/reference/ 上獲取。 3.1 第一步&#xff08;First Steps&#xff09; …

解析電源模塊測試條件與測試步驟 快速完成測試

高溫高濕儲存測試是電源模塊環境適應性測試內容之一&#xff0c;在實際使用過程中由于應用場景不同電源所處的環境也是多樣的&#xff0c;因此需要測試電源對各種環境的適應能力&#xff0c;提高電源的性能和可靠性。 電源高溫高濕存儲測試的目的是為了測量環境對電源結構、元件…

C語言第三十三彈---動態內存管理(上)

?個人主頁&#xff1a; 熬夜學編程的小林 &#x1f497;系列專欄&#xff1a; 【C語言詳解】 【數據結構詳解】 動態內存管理 1、為什么要有動態內存分配 2、malloc和free 2.1、malloc 2.2、free 3、calloc和realloc 3.1、calloc 3.2、realloc 4、常見的動態內存的錯…

氣象數據收集

1、國家氣象科學數據中心 預報數據:需要定制,收費10萬+ 觀測數據:國家氣象信息中心-中國氣象數據網 (cma.cn)https://data.cma.cn/data/cdcdetail/dataCode/A.0012.0001.html 地面基本氣象觀測數據 滯后2天 滯后一天 路面數據同化系統,實時 國家氣象信息中心-中國氣象數…

11.以太網交換機工作原理

目錄 一、以太網協議二、以太網交換機原理三、交換機常見問題思考四、同網段數據通信全過程五、跨網段數據通信全過程六、關鍵知識七、調試命令 前言&#xff1a;在網絡中傳輸數據時需要遵循一些標準&#xff0c;以太網協議定義了數據幀在以太網上的傳輸標準&#xff0c;了解以…

android移動應用開發基礎答案,安卓工程師面試題

一線企業的app都是多線程和多進程的&#xff0c;而Android進程間通信機制就是Binder&#xff0c;原生的線程間通信則是Handler&#xff0c;Binder和Handler是了解安卓運行機制必須要掌握的一個知識點&#xff0c;更是一線企業面試必問的知識點&#xff01; 以下幾道就是大廠關于…

【QT+QGIS跨平臺編譯】之五十五:【QGIS_CORE跨平臺編譯】—【qgsmeshcalcparser.cpp生成】

文章目錄 一、Bison二、生成來源三、構建過程一、Bison GNU Bison 是一個通用的解析器生成器,它可以將注釋的無上下文語法轉換為使用 LALR (1) 解析表的確定性 LR 或廣義 LR (GLR) 解析器。Bison 還可以生成 IELR (1) 或規范 LR (1) 解析表。一旦您熟練使用 Bison,您可以使用…

Unity中URP實現水體(整理優化)

文章目錄 前言一、優化水的深度1、我們把 水流動的方向 和 水深淺過渡值&#xff0c;整合到一個四維變量中2、修改 水體流動方向3、在片元著色器中&#xff0c;修改使用過渡變量 二、優化泡沫三、優化水下的扭曲1、修復原本擾動UV的計算 四、優化水面高光1、把高光強度、光滑度…

紅隊基礎設施建設

文章目錄 一、ATT&CK二、T1583 獲取基礎架構2.1 匿名網絡2.2 專用設備2.3 滲透測試虛擬機 三、T1588.002 C23.1 開源/商用 C23.1.1 C2 調研SliverSliver 對比 CS 3.1.2 CS Beacon流量分析流量規避免殺上線 3.1.3 C2 魔改3.1.4 C2 隱匿3.1.5 C2 準入應用場景安裝配置說明工具…

UC++對象方法IsValid()、IsValidLowLevel()、IsValidLowLevelFast()的區別

在 Unreal Engine 中&#xff0c;IsValid(), IsValidLowLevel(), 和 IsValidLowLevelFast() 是用于檢查 UObject&#xff08;Unreal Object&#xff09;有效性的三個不同的方法。它們之間的區別主要在于檢查的級別和效率。 IsValid()&#xff1a; 檢查級別&#xff1a; IsVal…

深度學習 精選筆記(2)自動求導與概率

學習參考&#xff1a; 動手學深度學習2.0Deep-Learning-with-TensorFlow-bookpytorchlightning ①如有冒犯、請聯系侵刪。 ②已寫完的筆記文章會不定時一直修訂修改(刪、改、增)&#xff0c;以達到集多方教程的精華于一文的目的。 ③非常推薦上面&#xff08;學習參考&#x…

Linux系統——LAMP架構

目錄 一、LAMP架構組成 1.LAMP定義 2.各組件的主要作用 3.CGI和FastCGI 3.1CGI 3.3CGI和FastCGI比較 4.PHP 4.1PHP簡介 4.2PHP的Opcode語言 4.3PHP設置 二、LAMP架構實現 1.編譯安裝Apache httpd服務 2.編譯安裝Mysql 3.編譯安裝PHP 4.安裝論壇 5.搭建博客 W…

Linux編程 2.4 文件和目錄-Linux文件系統結構

1、文件操作基本元素 文件操作相關的最基本元素是&#xff1a;目錄結構、索引節點和文件的數據本身。 目錄結構&#xff08;目錄項&#xff09;索引節點&#xff08;i節點&#xff09;文件的數據 2、文件系統的三個區域 屬性&#xff1a; 超級塊&#xff1a;存放文件系統本身…

vs code快捷鍵

ShiftCtrlO vs code 提供很強大的功能&#xff0c;就是可以快速查文件中的符號列表和函數列表&#xff0c;我們首先打開一個源碼文件&#xff0c;比tcp.c&#xff0c;然后我們通過快捷鍵“ShiftCtrlO”即可打開對應源碼文件的符號列表和函數列表&#xff0c;通過查看這些列表&a…

【學習心得】Python調用JS的三種常用方法

在做JS逆向的時候&#xff0c;一種情況是直接用Python代碼復現JS代碼的功能&#xff0c;達成目的。但很多時候這種方法有明顯的缺點&#xff0c;那就是一旦JS代碼邏輯發生了更改&#xff0c;你就得重寫Python的代碼邏輯非常不便。于是第二種情況就出現了&#xff0c;我直接得到…

python自動化管理和zabbix監控網絡設備(防火墻和python自動化配置部分)

目錄 前言 一、ssh配置 1.FW1 2.core-sw1 3.core-sw2 二、python自動化配置防火墻 三、驗證DNAT 四、驗證DNAT 前言 視頻演示請訪問b站主頁 白帽小丑的個人空間-白帽小丑個人主頁-嗶哩嗶哩視頻 一、ssh配置 給需要自動化管理的設備配置ssh服務端用戶名和密碼 1.FW1 …