FlinkSQL 常用語法指南
FlinkSQL 是 Apache Flink 提供的 SQL 接口，允許用戶使用標準 SQL 或擴展的 SQL 語法來處理流式和批式數據。以下是 FlinkSQL 的常用語言元素和操作：
- 基本查詢
-- Select every column of a table
SELECT *
FROM table_name;

-- Filtered query: project two columns from the rows that satisfy a predicate
SELECT
    column1,
    column2
FROM table_name
WHERE condition;

-- Grouped aggregation: count the rows of orders for each user_id
SELECT
    user_id,
    COUNT(*) AS cnt
FROM orders
GROUP BY user_id;
- 時間屬性定義
-- Processing-time attribute.
-- Each clause sits on its own line: a `--` comment runs to the end of the line,
-- so a clause sharing a line with a comment is silently dropped and the column
-- list ends in a dangling comma.
CREATE TABLE orders (
    order_id   STRING,
    product    STRING,
    amount     DOUBLE,
    order_time TIMESTAMP(3),
    -- Declare the processing-time attribute as a computed column
    proc_time AS PROCTIME()
) WITH (...);

-- Event-time attribute with a watermark.
-- This is an alternative definition that reuses the table name `orders`.
CREATE TABLE orders (
    order_id   STRING,
    product    STRING,
    amount     DOUBLE,
    order_time TIMESTAMP(3),
    -- Declare order_time as the event-time attribute; the watermark trails it by 5 seconds
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (...);
- 窗口操作
-- Tumbling window: total amount per 1-hour window over order_time
SELECT
    window_start,
    window_end,
    SUM(amount) AS total_amount
FROM TABLE(
    TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' HOUR)
)
GROUP BY window_start, window_end;

-- Hopping (sliding) window: 1-hour windows that advance every 5 minutes,
-- total amount per window and user_id
SELECT
    window_start,
    window_end,
    user_id,
    SUM(amount) AS total_amount
FROM TABLE(
    HOP(TABLE orders, DESCRIPTOR(order_time), INTERVAL '5' MINUTES, INTERVAL '1' HOUR)
)
GROUP BY window_start, window_end, user_id;

-- Session window: number of events per user session, with a 10-minute gap.
-- PARTITION BY user_id builds the sessions separately for each user; without it
-- the sessions are formed over all rows and user_id only splits the counts
-- inside those global sessions.
-- NOTE(review): the session window TVF needs a recent Flink release — confirm
-- against the target version.
SELECT
    window_start,
    window_end,
    user_id,
    COUNT(*) AS event_count
FROM TABLE(
    SESSION(TABLE orders PARTITION BY user_id, DESCRIPTOR(order_time), INTERVAL '10' MINUTES)
)
GROUP BY window_start, window_end, user_id;
- 連接操作
-- Regular join: pair each order with the user row that has the same user_id
SELECT
    o.order_id,
    o.product,
    u.user_name
FROM orders AS o
INNER JOIN users AS u
    ON o.user_id = u.user_id;

-- Join with a time-range predicate: orders whose order_time falls between the
-- start_time and end_time of a promotion on the same product_id.
-- NOTE(review): the bounds are two ordinary columns of promotions rather than a
-- fixed interval around a time attribute, so this is presumably planned as a
-- regular join, not as a Flink interval join — confirm.
SELECT
    o.order_id,
    p.promotion_name,
    o.order_time,
    o.amount
FROM orders AS o
INNER JOIN promotions AS p
    ON o.product_id = p.product_id
    AND o.order_time BETWEEN p.start_time AND p.end_time;

-- Interval join: match an order with a shipment of the same order_id when
-- order_time lies within the hour up to ship_time. (A Flink "window join"
-- instead joins two windowing-TVF results on window_start / window_end.)
SELECT
    o.order_id,
    s.shipment_id,
    o.order_time,
    s.ship_time,
    TIMESTAMPDIFF(HOUR, o.order_time, s.ship_time) AS hours_to_ship
FROM orders AS o
INNER JOIN shipments AS s
    ON o.order_id = s.order_id
    AND o.order_time BETWEEN s.ship_time - INTERVAL '1' HOUR AND s.ship_time;
- 常用函數
標量函數
-- String functions: lower-case a name and take 5 characters of email starting at position 1
SELECT
    LOWER(name),
    SUBSTRING(email, 1, 5)
FROM users;

-- Math functions: absolute value, and rounding to 2 decimal places
SELECT
    ABS(amount),
    ROUND(price, 2)
FROM products;

-- Date/time functions: format order_time as 'yyyy-MM-dd' and compute the
-- difference in days between order_time and the current timestamp
SELECT
    DATE_FORMAT(order_time, 'yyyy-MM-dd'),
    TIMESTAMPDIFF(DAY, order_time, CURRENT_TIMESTAMP)
FROM orders;
聚合函數
-- Whole-table aggregates over orders: row count plus sum, average,
-- maximum and minimum of amount
SELECT
    COUNT(*)    AS total_orders,
    SUM(amount) AS total_amount,
    AVG(amount) AS avg_amount,
    MAX(amount) AS max_amount,
    MIN(amount) AS min_amount
FROM orders;
窗口函數
-- OVER-window functions, both partitioned by product_id and ordered by order_time:
--   row_num    - sequence number of the row inside its partition
--   moving_sum - sum of amount over the current row and the two rows before it
SELECT
    product_id,
    order_time,
    amount,
    ROW_NUMBER() OVER (
        PARTITION BY product_id
        ORDER BY order_time
    ) AS row_num,
    SUM(amount) OVER (
        PARTITION BY product_id
        ORDER BY order_time
        ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
    ) AS moving_sum
FROM orders;
- DDL 語句
-- Create a table: Kafka-backed orders in JSON format, with order_time as the
-- event-time attribute and a watermark that trails it by 5 seconds
CREATE TABLE orders (
    order_id   STRING,
    product_id STRING,
    amount     DECIMAL(10, 2),
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json'
);

-- Create a view: orders whose amount exceeds 1000.
-- The columns are listed explicitly (same set and order as the table above)
-- so the view's schema is spelled out instead of depending on `*`.
CREATE VIEW large_orders AS
SELECT
    order_id,
    product_id,
    amount,
    order_time
FROM orders
WHERE amount > 1000;

-- Register a user-defined function under the name my_udf, backed by the given class
CREATE FUNCTION my_udf AS 'com.example.MyUDF';
- DML 語句
-- Insert: write the rows of source_table with amount > 100 into target_table
INSERT INTO target_table
SELECT *
FROM source_table
WHERE amount > 100;

-- Update: set amount to 200 for order '123'.
-- Limited support: per the Flink documentation, row-level UPDATE was introduced
-- in Flink 1.17 (not 1.12), runs in batch mode only, and requires a connector
-- that implements row-level updates.
UPDATE orders
SET amount = 200
WHERE order_id = '123';

-- Delete: remove order '456'.
-- Limited support: per the Flink documentation, DELETE was likewise introduced
-- in Flink 1.17, batch mode only, and requires a connector that implements
-- row-level deletes.
DELETE FROM orders
WHERE order_id = '456';
- 模式匹配 (MATCH_RECOGNIZE)
-- Pattern matching per user, in order_time order: a start row, then one or more
-- rows with a falling amount, then a row whose amount rises again.
-- Each match yields one row with the order_id of the start row, of the last
-- falling row, and of the rising row.
SELECT *
FROM orders
MATCH_RECOGNIZE (
    PARTITION BY user_id
    ORDER BY order_time
    MEASURES
        START_ROW.order_id        AS start_order,
        LAST(PRICE_DOWN.order_id) AS bottom_order,
        LAST(PRICE_UP.order_id)   AS end_order
    ONE ROW PER MATCH
    AFTER MATCH SKIP TO LAST PRICE_UP
    -- The final variable carries no `+`: Flink does not accept a greedy
    -- quantifier on the last variable of a pattern.
    PATTERN (START_ROW PRICE_DOWN+ PRICE_UP)
    DEFINE
        -- First falling row is compared with the start row, later ones with
        -- the previous falling row
        PRICE_DOWN AS
            (LAST(PRICE_DOWN.amount, 1) IS NULL AND PRICE_DOWN.amount < START_ROW.amount)
            OR PRICE_DOWN.amount < LAST(PRICE_DOWN.amount, 1),
        PRICE_UP AS
            PRICE_UP.amount > LAST(PRICE_DOWN.amount, 1)
) MR;
- 配置參數
-- Set configuration options: enable mini-batch execution, then set its
-- allow-latency option to '5 s' and its size option to '1000'
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5 s';
SET 'table.exec.mini-batch.size' = '1000';
- 常用連接器配置
-- Kafka source table: JSON records from topic 'input_topic', consumer group
-- 'testGroup', starting at the latest offset; event_time is the event-time
-- attribute with a watermark that trails it by 5 seconds
CREATE TABLE kafka_source (
    id         INT,
    name       STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'input_topic',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

-- JDBC result table: maps to 'sink_table' in the MySQL database at the given
-- URL; id is declared as a primary key that Flink does not enforce
CREATE TABLE jdbc_sink (
    id   INT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysql:3306/mydb',
    'table-name' = 'sink_table',
    'username' = 'user',
    'password' = 'password'
);
FlinkSQL 仍在不斷演進，以上只列舉了部分常用語句；最新的語法和功能請參考官方文檔。