Apache Flink 的 Table API 是 Flink 提供的一種高級抽象,用于以聲明式方式處理批處理和流處理數據。它是基于關系模型的 API,用戶可以像編寫 SQL 一樣,以簡潔、類型安全的方式編寫數據處理邏輯。
一、基本概念
1. 什么是 Table API?
Table API 是 Flink 中用于處理結構化數據的 關系型編程接口,它支持:
-
批處理(Batch)
-
流處理(Streaming)
Table API 提供了類似 SQL 的語法風格,但以函數式 API 方式表達,具備更好的類型安全和 IDE 友好性。
二、核心組件
1. Table
-
Flink 中的
Table
是對結構化數據的一種抽象。 -
相當于數據庫中的表,可以進行過濾、聚合、連接等操作。
2. TableEnvironment
-
Table API 的執行上下文。
-
創建表、注冊 UDF、執行 SQL/Table API 操作等都依賴它。
3. Schema(模式)
-
定義表結構,包括字段名、數據類型、主鍵、水位線等。
三、編程模型
// 1. 創建 TableEnvironment
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);// 2. 注冊表(從外部數據源)
tableEnv.executeSql("""CREATE TABLE source_table (id STRING,ts TIMESTAMP(3),val INT,WATERMARK FOR ts AS ts - INTERVAL '5' SECOND) WITH ('connector' = 'kafka','topic' = 'test',...)
""");// 3. 使用 Table API 處理數據
Table result = tableEnv.from("source_table").filter($("val").isGreater(10)).groupBy($("id")).select($("id"), $("val").avg().as("avg_val"));// 4. 輸出結果到目標表
tableEnv.executeSql("""CREATE TABLE sink_table (id STRING,avg_val DOUBLE) WITH ('connector' = 'print')
""");result.executeInsert("sink_table");
四、常用操作
操作類型 | 示例 |
---|---|
過濾 | table.filter($("age").isGreater(18)) |
投影 | table.select($("name"), $("age")) |
聚合 | table.groupBy($("city")).select($("city"), $("salary").avg()) |
連接 | table1.join(table2).where(...).select(...) |
去重 | table.distinct() |
排序 | table.orderBy($("time").desc()) |
窗口 | table.window(...) (見下文) |
五、時間和窗口支持
Table API 支持兩種時間語義:
-
事件時間(Event Time)
-
處理時間(Processing Time)
常見的窗口類型:
-
滾動窗口(Tumble)
-
滑動窗口(Slide)
-
會話窗口(Session)
示例:
table.window(Tumble.over(lit(10).minutes()).on($("ts")).as("w")).groupBy($("id"), $("w")).select($("id"), $("w").start(), $("val").sum());
六、與 SQL 的關系
Table API 與 SQL 是等價的抽象:
-
SQL 更加聲明式,適合數據分析人員;
-
Table API 更加靈活、支持編程邏輯,適合開發者。
兩者可以混合使用,例如:
Table result = tableEnv.sqlQuery("SELECT id, COUNT(*) FROM source GROUP BY id");
七、數據源和連接器支持
Table API 支持多種數據源和 sink,通過 Flink Connector 實現:
常見的:
-
Kafka
-
HDFS
-
MySQL / JDBC
-
Elasticsearch
-
Hive
-
Iceberg / Delta / Hudi
-
Redis 等
通過 SQL 創建表:
CREATE TABLE example (...
) WITH ('connector' = 'kafka',...
);
八、UDF 擴展
可以定義自定義函數:
-
ScalarFunction(標量函數)
-
TableFunction(表函數)
-
AggregateFunction(聚合函數)
-
TableAggregateFunction(表聚合函數)
示例:
public class HashCode extends ScalarFunction {public int eval(String s) {return s.hashCode();}
}tableEnv.createTemporarySystemFunction("HashCode", HashCode.class);
table.select(call("HashCode", $("name")));
九、批與流統一
Flink 的 Table API 實現了 批流統一語義,相同的 API 可用于處理批或流數據,只需切換 EnvironmentSettings
即可。
十、優點總結
-
統一的 API:批流統一,開發邏輯一致
-
類型安全:Java/Scala 函數式風格,IDE 友好
-
與 SQL 無縫切換
-
可插拔的連接器與格式支持
-
強大的時間與窗口語義支持
-
與 Flink Runtime 深度整合,性能高效