目錄
1.維表
2.數據準備
創建源數據
創建維度表
創建Sink表
3.配置任務
Flink SQL創建kafka源表
Flink SQL創建MySQL維表
Flink SQL創建MySQL結果表
編寫計算任務
核驗數據
1.維表
目前在實時計算的場景中,大多數都使用過MySQL、Hbase、redis作為維表引擎存儲一些維度數據,然后在DataStream API中調用MySQL、Hbase、redis客戶端去獲取到維度數據進行維度擴充。
本案例采用MySQL創建維表,與創建MySQL sink表語法相同。
2.數據準備
創建源數據
重啟kafka,創建Topic:? case_kafka_mysql
寫入json格式的數據
? {"ts": "20201011","id": 8,"price_amt":211}
創建維度表
在MySQL中創建名為product_dim的表
CREATE TABLE `product_dim` (`id` bigint(11) NOT NULL,`coupon_price_amt` bigint(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
向數據表插入如下數據:
INSERT INTO `product_dim` VALUES (1, 1);
INSERT INTO `product_dim` VALUES (3, 1);
INSERT INTO `product_dim` VALUES (8, 1);
創建Sink表
在MySQL中創建名為sync_test_3的表
CREATE TABLE `sync_test_3` (`id` bigint(11) NOT NULL AUTO_INCREMENT,`ts` varchar(64) DEFAULT NULL,`total_gmv` bigint(11) DEFAULT NULL,PRIMARY KEY (`id`),UNIQUE KEY `uidx` (`ts`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;
3.配置任務
Flink SQL創建kafka源表
create table flink_test_3 (id BIGINT,ts VARCHAR,price_amt BIGINT,proctime AS PROCTIME ()
)with ('connector' = 'kafka','topic' = 'case_kafka_mysql','properties.bootstrap.servers' = '127.0.0.1:9092','properties.group.id' = 'flink_gp_test3','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true','properties.zookeeper.connect' = '127.0.0.1:2181/kafka');
Flink SQL創建MySQL維表
create table flink_test_3_dim (id BIGINT,coupon_price_amt BIGINT
)
WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8','table-name' = 'product_dim','username' = 'root','password' = 'Admin','lookup.max-retries' = '3','lookup.cache.max-rows' = 1000);
WITH參數
參數 | 說明 | 類型 | 備注 |
lookup.cache.max-rows | 指定緩存的最大行數。如果超過該值,則最老的行記錄將會過期,會被新的記錄替換掉。 | Integer | 默認情況下,維表Cache是未開啟的。 |
lookup.cache.ttl | 指定緩存中每行記錄的最大存活時間。如果某行記錄超過該時間,則該行記錄將會過期。 | Duration | 默認情況下,維表Cache是未開啟的。你可以設置lookup.cache.max-rows和?lookup.cache.ttl參數來啟用維表Cache。啟用緩存時,采用的是LRU策略緩存。 |
lookup.cache.caching-missing-key | 是否緩存空的查詢結果。 | Boolean | 參數取值如下: true(默認值):緩存空的查詢結果。 false:不緩存空的查詢結果。 |
lookup.max-retries | 查詢數據庫失敗的最大重試次數。 | Integer | 默認值為3。 |
Flink SQL創建MySQL結果表
CREATE TABLE sync_test_3 (ts string,total_gmv bigint,PRIMARY KEY (ts) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8','table-name' = 'sync_test_3','username' = 'root','password' = 'Admin');
編寫計算任務
INSERT INTO sync_test_3
SELECTts,SUM(price_amt - coupon_price_amt) AS total_gmv
FROM(SELECTa.ts as ts,a.price_amt as price_amt,b.coupon_price_amt as coupon_price_amtFROMflink_test_3 as aLEFT JOIN flink_test_3_dim? FOR SYSTEM_TIME AS OF? a.proctime? as bON b.id = a.id)
GROUP BY ts;
核驗數據
SELECT id, ts, total_gmv FROM sync_test_3;