參考: https://chbxw.blog.csdn.net/article/details/115078261 (datastream 實現)
一、ODS
模擬訂單表及訂單明細表
-- ODS layer: source tables read from Kafka.
-- As pasted, the `--` comments were fused onto the same physical line as the
-- code after them, which would comment out the WATERMARK clause and several
-- connector options; reformatted so every clause is actually executed.

-- orders: one row per order header.
CREATE TABLE orders (
    order_id   STRING,
    user_id    STRING,
    order_time TIMESTAMP(3),
    -- Event time with a 5-second out-of-orderness allowance.
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'chb1:9092',
    -- If this source is shared by several jobs, do not hard-code group.id in
    -- the DDL; pass it per query with a hint:
    --   /*+ OPTIONS('properties.group.id'='test_group2') */
    -- (note it is group.id with a dot, not an underscore).
    -- 'properties.group.id' = 'flink-sql-group-orders',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

-- order_details: one row per line item of an order.
CREATE TABLE order_details (
    detail_id   STRING,
    order_id    STRING,
    product_id  STRING,
    price       DECIMAL(10, 2),
    quantity    INT,
    detail_time TIMESTAMP(3),
    -- Event time with a 5-second out-of-orderness allowance.
    WATERMARK FOR detail_time AS detail_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'order_details',
    'properties.bootstrap.servers' = 'chb1:9092',
    -- 'properties.group.id' = 'flink-sql-group-order_details',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

-- 造數據 (seed data below)
-- Seed data for the ODS topics; each INSERT is submitted as its own Flink job.
-- NOTE(review): the detail row for 'o001' is inserted before its order header.
insert into order_details values ('d001', 'o001', 'car', 5000, 1, now());
-- NOTE(review): detail_id 'd003' is reused by the two rows below for
-- different products — presumably the second should be 'd004'; confirm
-- before relying on detail_id uniqueness downstream.
insert into orders values('o001', 'u001', now());insert into orders values('o003', 'u003', now());insert into order_details values ('d003', 'o003', 'water', 2, 12, now());
insert into order_details values ('d003', 'o003', 'food', 50, 3, now());
二、DWD 訂單和訂單明細關聯
-- sink
-- DWD sink: one row per order detail enriched with its order header.
-- WARNING: this first version fails at INSERT time with
--   TableException: The query contains more than one rowtime attribute
--   column [detail_time, order_time] ...
-- because both source rowtime columns reach the sink unchanged; cast one of
-- them to a plain TIMESTAMP(3) in the SELECT (corrected version further down).
-- Also note the pasted original had the `--` comment fused onto the same line
-- as the WATERMARK clause, which would have commented the watermark out.
CREATE TABLE dwd_trd_order (
    detail_id   STRING,
    order_id    STRING,
    product_id  STRING,
    price       DECIMAL(10, 2),
    quantity    INT,
    detail_time TIMESTAMP(3),
    user_id     STRING,
    order_time  TIMESTAMP(3),
    -- Event time with a 5-second out-of-orderness allowance.
    WATERMARK FOR detail_time AS detail_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_trd_order',
    'properties.bootstrap.servers' = 'chb1:9092',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

-- Interval join: a detail row must fall within 10 minutes after its order header.
insert into dwd_trd_order
SELECT
    d.detail_id,
    o.order_id,
    d.product_id,
    d.price,
    d.quantity,
    d.detail_time,
    user_id,
    order_time
FROM orders o
JOIN order_details d
    ON o.order_id = d.order_id
    AND d.detail_time BETWEEN o.order_time AND o.order_time + INTERVAL '10' MINUTE;
報錯:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: The query contains more than one rowtime attribute column [detail_time, order_time] for writing into table 'default_catalog.default_database.dwd_trd_order'.
Please select the column that should be used as the event-time timestamp for the table sink by casting all other columns to regular TIMESTAMP or TIMESTAMP_LTZ.
在 Flink SQL 中,每個表只能有一個行時間屬性(rowtime attribute)用於定義事件時間(Event Time)。當寫入目標表時,若查詢結果包含多個行時間屬性字段(如 order_time 和 detail_time),會導致衝突。以下是解決方案:
1. 問題定位
錯誤信息表明:目標表 dwd_trd_order 在寫入時檢測到多個行時間屬性字段(detail_time 和 order_time)。
根本原因是:JOIN 后的結果保留了雙表的事件時間字段,且未被轉換為普通時間戳。
2. 解決方案
方案一:僅保留一個行時間屬性
在查詢中選擇一個時間字段作為事件時間,將其他時間字段轉為普通 TIMESTAMP 類型。
假設目標表 dwd_trd_order 使用 order_time 作為事件時間:
-- Corrected INSERT: keep exactly one rowtime attribute in the projection.
-- In the pasted original the `--` comment was fused onto the same line as the
-- CAST(...) column, which would have commented the cast out entirely.
INSERT INTO dwd_trd_order
SELECT
    o.order_id,
    o.user_id,
    d.product_id,
    d.price,
    d.quantity,
    -- Kept as the rowtime attribute (must match the sink's WATERMARK column).
    o.order_time,
    -- Demoted to a plain timestamp so only one rowtime reaches the sink.
    CAST(d.detail_time AS TIMESTAMP(3)) AS detail_time
FROM orders o
JOIN order_details d ON o.order_id = d.order_id;
方案二:調整目標表定義
若業務需要同時保留兩個時間字段,需在目標表 DDL 中 僅定義一個行時間屬性,其他字段轉為普通時間戳:
-- Option 2: keep both time columns in the sink, but declare only ONE of them
-- as the rowtime attribute. (In the pasted original the inline `--` comments
-- were fused onto the same line as the next column and the WATERMARK clause,
-- which would have commented them out.)
CREATE TABLE dwd_trd_order (
    order_id    STRING,
    user_id     STRING,
    product_id  STRING,
    price       DECIMAL(10, 2),
    quantity    INT,
    order_time  TIMESTAMP(3),   -- rowtime attribute
    detail_time TIMESTAMP(3),   -- plain timestamp
    -- Exactly one event-time definition per table.
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (...);  -- connector options elided in the article
3. 關鍵步驟說明
- 檢查目標表 DDL:確保目標表僅有一個 WATERMARK 定義,且對應字段為行時間屬性。
- 轉換多餘的行時間屬性:在查詢中使用 CAST 將非主時間字段轉為普通 TIMESTAMP 或 TIMESTAMP_LTZ:
  CAST(detail_time AS TIMESTAMP(3)) -- 轉為非行時間屬性
- 驗證查詢結果:使用 DESCRIBE 確認查詢結果的字段類型:DESCRIBE (SELECT ... FROM ...);
目標表 DDL(僅一個行時間屬性)
-- Final target DDL (single rowtime attribute); same fix as Option 2 above —
-- the pasted original's fused `--` comments would have swallowed the
-- detail_time column and the WATERMARK clause.
CREATE TABLE dwd_trd_order (
    order_id    STRING,
    user_id     STRING,
    product_id  STRING,
    price       DECIMAL(10, 2),
    quantity    INT,
    order_time  TIMESTAMP(3),   -- rowtime attribute
    detail_time TIMESTAMP(3),   -- plain timestamp
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (...);  -- connector options elided in the article
寫入數據的 SQL(轉換多余時間字段)
-- Final INSERT: cast the surplus rowtime column so only order_time remains a
-- rowtime attribute. (Pasted original fused the `--` comment onto the CAST
-- line, commenting the cast out.)
INSERT INTO dwd_trd_order
SELECT
    o.order_id,
    o.user_id,
    d.product_id,
    d.price,
    d.quantity,
    o.order_time,                                        -- kept as rowtime attribute
    CAST(d.detail_time AS TIMESTAMP(3)) AS detail_time   -- demoted to plain timestamp
FROM orders o
JOIN order_details d ON o.order_id = d.order_id;
三、DWS
-- DWS layer: cumulative daily windows that advance every 5 seconds.
-- (Pasted original had the fused token `SELECTwindow_start`, which is a
-- syntax error; reformatted.)
CREATE TABLE dws_trd_order (
    window_start TIMESTAMP(3),
    window_end   TIMESTAMP(3),
    product_num  BIGINT,
    uv           BIGINT,
    total_amount DECIMAL(10, 2)
) WITH (
    'connector' = 'kafka',
    'topic' = 'dws_trd_order',
    'properties.bootstrap.servers' = 'chb1:9092',
    -- Only relevant if this topic is later read back as a source.
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

-- CUMULATE windows share a common daily start and grow in 5-second steps.
-- NOTE(review): DESCRIPTOR(detail_time) requires detail_time to be the
-- rowtime attribute of dwd_trd_order — true for the first DDL in this file,
-- but the "Option 2" DDL makes order_time the rowtime instead; confirm which
-- DDL is actually deployed.
insert into dws_trd_order
SELECT
    window_start,
    window_end,
    COUNT(1) AS product_num,
    COUNT(DISTINCT user_id) AS uv,
    SUM(price * quantity) AS total_amount
FROM TABLE(
    CUMULATE(TABLE dwd_trd_order, DESCRIPTOR(detail_time), INTERVAL '5' SECOND, INTERVAL '1' DAY)
)
GROUP BY window_start, window_end;
有個問題: 為什么窗口結束時間從 2025-04-02 20:48:50.000 開始???
dwd_trd_order 表的時間如下:

order_time               detail_time
2025-04-02 20:06:01.281  2025-04-02 20:07:35.494
2025-04-02 20:50:27.975  2025-04-02 20:50:33.233
2025-04-02 20:50:27.975  2025-04-02 20:50:34.405

累計窗口運算如下:

select
    window_start, window_end,
    count(1) as product_num,
    count(distinct user_id) as uv,
    sum(price * quantity) as total_amount
from TABLE(
    CUMULATE(TABLE dwd_trd_order, DESCRIPTOR(detail_time), INTERVAL '5' SECOND, INTERVAL '1' DAY)
)
group by window_start, window_end;
為什么窗口結束時間從 2025-04-02 20:48:50.000 開始???window_start window_end product_num uv total_amount2025-04-02 00:00:00.000 2025-04-02 20:48:50.000 1 1 5000.002025-04-02 00:00:00.000 2025-04-02 20:48:55.000 1 1 5000.002025-04-02 00:00:00.000 2025-04-02 20:49:00.000 1 1 5000.002025-04-02 00:00:00.000 2025-04-02 20:49:05.000 1 1 5000.002025-04-02 00:00:00.000 2025-04-02 20:49:10.000 1 1 5000.002025-04-02 00:00:00.000 2025-04-02 20:49:15.000 1 1 5000.002025-04-02 00:00:00.000 2025-04-02 20:49:20.000 1 1 5000.002025-04-02 00:00:00.000 2025-04-02 20:49:25.000 1 1 5000.002025-04-02 00:00:00.000 2025-04-02 20:49:30.000