Flinksql--訂單寬表

參考: https://chbxw.blog.csdn.net/article/details/115078261 (datastream 實現)

一、ODS

模擬訂單表及訂單明細表

CREATE TABLE orders (order_id STRING,user_id STRING,order_time TIMESTAMP(3),-- 定義事件時間及 Watermark(允許5秒亂序)WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'orders','properties.bootstrap.servers' = 'chb1:9092',-- 如果source被多個任務使用,不在定義時指定group.id-- 通過hint指定  OPTIONS('properties.group.id'='test_group2')  注意是group.id 是點不是下劃線-- 'properties.group.id' = 'flink-sql-group-orders',  -- 消費者組 ID'scan.startup.mode' = 'earliest-offset','format' = 'json'
);CREATE TABLE order_details (detail_id STRING,order_id STRING,product_id STRING,price DECIMAL(10,2),quantity INT,detail_time TIMESTAMP(3),-- 定義事件時間及 Watermark(允許5秒亂序)WATERMARK FOR detail_time AS detail_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'order_details','properties.bootstrap.servers' = 'chb1:9092',-- 'properties.group.id' = 'flink-sql-group-order_details',  -- 消費者組 ID'scan.startup.mode' = 'earliest-offset','format' = 'json'
);-- 造數據
insert into order_details values ('d001', 'o001', 'car', 5000, 1, now());
insert into orders values('o001', 'u001', now());insert into orders values('o003', 'u003', now());insert into order_details values ('d003', 'o003', 'water', 2, 12, now());
insert into order_details values ('d003', 'o003', 'food', 50, 3, now());

二、DWD 訂單和訂單明細關聯


-- sink
CREATE TABLE dwd_trd_order (detail_id STRING,order_id STRING,product_id STRING,price DECIMAL(10,2),quantity INT,detail_time TIMESTAMP(3),user_id STRING,order_time TIMESTAMP(3),-- 定義事件時間及 Watermark(允許5秒亂序)WATERMARK FOR detail_time AS detail_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'dwd_trd_order','properties.bootstrap.servers' = 'chb1:9092','scan.startup.mode' = 'earliest-offset','format' = 'json'
);insert into dwd_trd_order
SELECT d.detail_id,o.order_id,d.product_id,d.price,d.quantity,d.detail_time,user_id,order_time
FROM orders o
JOIN order_details d 
ON o.order_id = d.order_id
AND d.detail_time BETWEEN o.order_time AND o.order_time + INTERVAL '10' MINUTE;

報錯:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: The query contains more than one rowtime attribute column [detail_time, order_time] for writing into table 'default_catalog.default_database.dwd_trd_order'.
Please select the column that should be used as the event-time timestamp for the table sink by casting all other columns to regular TIMESTAMP or TIMESTAMP_LTZ.

在 Flink SQL 中,每個表只能有一個 行時間屬性(rowtime attribute) 用于定義事件時間(Event Time)。當寫入目標表時,若查詢結果包含多個行時間屬性字段(如 order_timedetail_time),會導致沖突。以下是解決方案:


1. 問題定位

錯誤信息表明目標表 dwd_trd_order 在寫入時檢測到多個行時間屬性字段(detail_timeorder_time)。
根本原因是:JOIN 后的結果保留了雙表的事件時間字段,且未被轉換為普通時間戳


2. 解決方案

方案一:僅保留一個行時間屬性

在查詢中選擇一個時間字段作為事件時間,將其他時間字段轉為普通 TIMESTAMP 類型。
假設目標表 dwd_trd_order 使用 order_time 作為事件時間:

INSERT INTO dwd_trd_order
SELECT o.order_id,o.user_id,d.product_id,d.price,d.quantity,o.order_time,  -- 保留為行時間屬性(需與目標表定義一致)CAST(d.detail_time AS TIMESTAMP(3)) AS detail_time  -- 轉為普通時間戳
FROM orders o
JOIN order_details d ON o.order_id = d.order_id;
方案二:調整目標表定義

若業務需要同時保留兩個時間字段,需在目標表 DDL 中 僅定義一個行時間屬性,其他字段轉為普通時間戳:

CREATE TABLE dwd_trd_order (order_id STRING,user_id STRING,product_id STRING,price DECIMAL(10,2),quantity INT,order_time TIMESTAMP(3),  -- 行時間屬性detail_time TIMESTAMP(3),  -- 普通時間戳WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND  -- 僅一個事件時間
) WITH (...);

3. 關鍵步驟說明

  1. 檢查目標表 DDL
    確保目標表僅有一個 WATERMARK 定義,且對應字段為行時間屬性。

  2. 轉換多余的行時間屬性
    在查詢中使用 CAST 將非主時間字段轉為普通 TIMESTAMPTIMESTAMP_LTZ

    CAST(detail_time AS TIMESTAMP(3))  -- 轉為非行時間屬性
    
  3. 驗證查詢結果
    使用 DESCRIBE 確認查詢結果的字段類型:

    DESCRIBE (SELECT ... FROM ...);
    
目標表 DDL(僅一個行時間屬性)
CREATE TABLE dwd_trd_order (order_id STRING,user_id STRING,product_id STRING,price DECIMAL(10,2),quantity INT,order_time TIMESTAMP(3),  -- 行時間屬性detail_time TIMESTAMP(3),  -- 普通時間戳WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (...);
寫入數據的 SQL(轉換多余時間字段)
INSERT INTO dwd_trd_order
SELECT o.order_id,o.user_id,d.product_id,d.price,d.quantity,o.order_time,  -- 保留為行時間屬性CAST(d.detail_time AS TIMESTAMP(3)) AS detail_time  -- 轉為普通時間戳
FROM orders o
JOIN order_details d ON o.order_id = d.order_id;

三、DWS

CREATE TABLE dws_trd_order (window_start TIMESTAMP(3),window_end TIMESTAMP(3),product_num bigint,uv bigint,total_amount DECIMAL(10,2)
) WITH ('connector' = 'kafka','topic' = 'dws_trd_order','properties.bootstrap.servers' = 'chb1:9092','scan.startup.mode' = 'earliest-offset','format' = 'json'
);-- dws 
insert into dws_trd_order
SELECTwindow_start, window_end,COUNT(1) AS product_num,COUNT(DISTINCT user_id) AS uv,SUM(price * quantity) AS total_amount
FROM TABLE(CUMULATE(TABLE dwd_trd_order, DESCRIPTOR(detail_time), INTERVAL '5' SECOND, INTERVAL '1' DAY)
)
GROUP BY window_start, window_end;

有個問題: 為什么窗口結束時間從 2025-04-02 20:48:50.000 開始???


dwd_trd_order 表的時間如下order_time              detail_time2025-04-02 20:06:01.281 2025-04-02 20:07:35.4942025-04-02 20:50:27.975 2025-04-02 20:50:33.2332025-04-02 20:50:27.975 2025-04-02 20:50:34.405累計窗口運算如下selectwindow_start, window_end,count(1) product_num,count(distinct user_id) uv,sum(price*quantity) as total_amountfrom TABLE(CUMULATE(TABLE dwd_trd_order, DESCRIPTOR(detail_time ), INTERVAL '5' SECOND, INTERVAL '1' DAY)
)
group by window_start,window_end;
為什么窗口結束時間從 2025-04-02 20:48:50.000 開始???window_start              window_end                    product_num                   uv                             total_amount2025-04-02 00:00:00.000 2025-04-02 20:48:50.000                    1                    1                                  5000.002025-04-02 00:00:00.000 2025-04-02 20:48:55.000                    1                    1                                  5000.002025-04-02 00:00:00.000 2025-04-02 20:49:00.000                    1                    1                                  5000.002025-04-02 00:00:00.000 2025-04-02 20:49:05.000                    1                    1                                  5000.002025-04-02 00:00:00.000 2025-04-02 20:49:10.000                    1                    1                                  5000.002025-04-02 00:00:00.000 2025-04-02 20:49:15.000                    1                    1                                  5000.002025-04-02 00:00:00.000 2025-04-02 20:49:20.000                    1                    1                                  5000.002025-04-02 00:00:00.000 2025-04-02 20:49:25.000                    1                    1                                  5000.002025-04-02 00:00:00.000 2025-04-02 20:49:30.000

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/899890.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/899890.shtml
英文地址,請注明出處:http://en.pswp.cn/news/899890.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

粒子濾波介紹

目錄 粒子濾波的主要流程可以分為以下 5 個步驟: 粒子濾波(PF) vs. ESKF(誤差狀態卡爾曼濾波) 粒子濾波的主要流程可以分為以下 5 個步驟: 初始化(Initialization) 生成 N 個粒子&…

一場國際安全廠商的交流會議簡記

今天參與了一場國際安全廠商A公司組織的交流會議 與會有國際TOP企業跨境企業 還有國內一些頭部商業公司。 A公司很有意思介紹了自己是怎么做安全運營中心SOC的。 介紹了很多內容,包括他們自己的員工量/設備量/事件量/SOC中心人員量,其中人員量只有個位數…

Java面試黃金寶典30

1. 請詳細列舉 30 條常用 SQL 優化方法 定義 SQL 優化是指通過對 SQL 語句、數據庫表結構、索引等進行調整和改進,以提高 SQL 查詢的執行效率,減少系統資源消耗,提升數據庫整體性能的一系列操作。 要點 從索引運用、查詢語句結構優化、數據…

花灑洗澡完畢并關閉后過段時間會突然滴水的原因探究

洗澡完畢后的殘留水 在洗澡的過程中,我們通常會使用到大量的水。這些水會通過花灑管子到達花灑頂噴流出。由于大頂噴花灑的噴頭較大,關閉后里面的存水會更多。 氣壓失衡后的滴水 當花灑關閉后,內部的水管和花灑頭中仍存有一定量的水。由于…

QSettings用法實戰(相機配置文件的寫入和讀取)

很多情況,在做項目開發的時候,將參數獨立出來是比較好的方法 例如:相機的曝光次數、曝光時長等參數,獨立成ini文件,用戶可以在外面修改即可生效,無需在動代碼重新編譯等工作 QSettings便可以實現該功能 內…

運維培訓班之最佳選擇(The best Choice for Operation and Maintenance Training Courses)

運維培訓班之最佳選擇 從面試官的角度聊聊培訓班對運維的幫助,同時給培訓班出身的運維一些建議~ 談到運維(尤其是零基礎非科班轉行的運維)找工作,培訓班是個不可回避的討論熱點。雖然本人也做過兼職運維培訓老師,多少…

網絡安全與防護策略

隨著信息技術的飛速發展,互聯網已成為現代社會不可或缺的一部分。從日常生活到企業運營,幾乎所有活動都離不開網絡。然而,網絡的開放性和廣泛性也使得網絡安全問題愈發嚴峻。無論是個人數據泄露,還是大規模的網絡攻擊,…

LLM 分詞器Tokenizer 如何從 0 到 1 訓練出來

寫在前面 大型語言模型(LLM)處理的是人類的自然語言,但計算機本質上只能理解數字。Tokenizer(分詞器) 就是架在自然語言和計算機數字表示之間的一座至關重要的橋梁。它負責將我們輸入的文本字符串分解成模型能夠理解的最小單元——Token,并將這些 Token 轉換成對應的數字…

【ArcGIS微課1000例】0142:如何從谷歌地球保存高清影像圖片

文章目錄 一、選取影像區域1. 搜索地圖區域2. 導入矢量范圍二、添加輸出圖層三、保存高清影像1. 地圖選項2. 輸出分辨率3. 保存圖像四、注意事項一、選取影像區域 首先需要選取影像區域,可通過以下方式快速定位。 1. 搜索地圖區域 在搜索框內輸入關鍵詞,例如青海湖,點擊【…

Unity注冊表修改分辨率:探索幕后設置與手動調控

Unity注冊表修改分辨率:探索幕后設置與手動調控 在Unity開發中,調整分辨率和顯示模式是開發過程中常見的需求,尤其是當我們打包并運行應用時,可能會遇到顯示模式不符合預期的情況。Unity在首次運行時會自動保存這些設置&#xff…

外部流輸入的 Layer

在 Android 的 SurfaceFlinger 體系中,外部流輸入的 Layer 通常通過 Sideband Stream 或 BufferQueue 機制傳遞給 SurfaceFlinger,然后由 HWC(Hardware Composer)或 OpenGL ES 進行合成。 1. 什么是外部流輸入的 Layer&#xff1f…

31-體測管理系統

介紹 技術: 基于 B/S 架構 SpringBootMySQLvueelementui 環境: Idea mysql maven jdk1.8 node 用戶端功能 1.系統首頁展示輪播圖及公告信息 2.測試項目:展示可以參加測試的項目列表 3.公告信息:公告信息列表及詳情 可進行點贊和收藏 4.在線留言 5.個人…

NVR接入錄像回放平臺EasyCVR視頻系統守護舌尖上的安全,打造“明廚亮灶”云監管平臺

一、方案背景 近年來,餐飲行業食品安全和衛生等問題頻發,比如后廚衛生臟亂差等,持續引發關注,這些事情導致連鎖反應,使其收益遭受損失。同時,給消費者造成了心理和生理上的傷害。 加強餐飲行業的監管成為…

Python辦公自動化(3)對Excel的操作

1.讀取excel文件 1.安裝工具 終端下載讀取excel文檔的工具庫: pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple xlrd 若對版本有特殊需求: 刪除當前版本:pip3 uninstall xlrd 下載所需要的版本:pip3 install -i htt…

go語言:開發一個最簡單的用戶登錄界面

1.用deepseek生成前端頁面&#xff1a; 1.提問&#xff1a;請你用html幫我設計一個用戶登錄頁面&#xff0c;要求特效采用科技感的背景渲染加粒子流動&#xff0c;用css、div、span標簽&#xff0c;并給出最終合并后的代碼。 生成的完整代碼如下&#xff1a; <!DOCTYPE h…

blender二次元上色

前&#xff1a; 后&#xff1a;&#xff08;臉自己會發光) 參考&#xff1a;05-模型導入與材質整理_嗶哩嗶哩_bilibili

Mysql+Demo 獲取當前日期時間的方式

記錄一下使用Mysql獲取當前日期時間的方式 獲取當前完整的日期時間有常見的四種方式&#xff0c;獲取得到的默認格式(mysql的格式標準)是 %Y-%m-%d %H:%i:%s其它格式 %Y-%m-%d %H:%i:%s.%f方式一&#xff1a;now()函數 select now();mysql> select now(); -------------…

C#核心學習(六)面向對象--封裝(5)靜態成員及靜態構造函數和靜態類 以及和常量的區別

目錄 一、什么是靜態的&#xff1f;什么是常量&#xff1f; 1. ?靜態&#xff08;Static&#xff09;? 2. ?常量&#xff08;const&#xff09;? 二、類中的靜態成員有什么用&#xff1f; 1. ?共享數據 2. ?工具方法與全局配置 3. ?單例模式 三、靜態類和靜態成…

FreeRTOS源碼下載分享

FreeRTOS源碼下載分享 官網下載太慢了&#xff0c;分享下FreeRTOSv202411 FreeRTOSv202411.00.zip 鏈接: https://pan.baidu.com/s/1P4sVS5WroYEl0WTlPD7GXg 提取碼: g6aq

2025年win10使用dockerdesktop安裝k8s

一、寫作背景 百度了一圈&#xff0c; 要么教程老&#xff0c;很多操作步驟冗余&#xff0c; 要么跑不通&#xff0c;或者提供的鏈接失效等情況。 二、看前須知 1、安裝過程使用的AI輔助&#xff0c; 因為參考的部分博客卡柱了。 2、如果操作過程中遇到卡頓&#xff0c; …