Apache Flink 提供了強大的 Table API 和 SQL 接口,用于統一處理批數據和流數據。它們為開發者提供了類 SQL 的編程方式,簡化了復雜的數據處理邏輯,并支持與外部系統集成。
🧩 一、Flink Table & SQL 核心概念
概念 | 描述 |
---|
Table API | 基于 Java/Scala 的 DSL,提供類型安全的操作接口 |
Flink SQL | 支持標準 ANSI SQL 語法的查詢語言 |
DataStream / DataSet ? Table | 可以在 DataStream 或 Table 之間互相轉換 |
Catalog | 元數據管理器,如 Hive Catalog、Memory Catalog |
TableEnvironment | 管理表、SQL 執行環境的核心類 |
Connectors | 支持 Kafka、Hive、MySQL、文件等數據源接入 |
Time Attributes | 定義事件時間(Event Time)、處理時間(Processing Time) |
Windowing | 支持滾動窗口、滑動窗口、會話窗口等 |
💻 二、Flink Table API 和 SQL 的優勢
特性 | 描述 |
---|
統一接口 | 同一套代碼可運行在 Batch 和 Streaming 場景下 |
高性能 | 底層使用 Apache Calcite 進行優化,自動進行查詢優化 |
易用性強 | 對熟悉 SQL 的用戶非常友好 |
生態集成好 | 支持 Kafka、Hive、JDBC、Elasticsearch 等多種數據源 |
狀態管理 | 在流式場景中自動管理狀態和窗口邏輯 |
📦 三、核心組件說明
1. TableEnvironment
- 是操作 Table 和 SQL 的入口
- 負責注冊表、執行查詢、管理元數據等
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
2. DataStream ? Table
轉換
示例:DataStream 轉 Table
DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));
Table table = tEnv.fromDataStream(dataStream);
tEnv.createTemporaryView("myTable", dataStream);
示例:Table 轉 DataStream
Table resultTable = tEnv.sqlQuery("SELECT * FROM myTable WHERE f1 > 1");
DataStream<Row> resultStream = tEnv.toDataStream(resultTable);
3. Flink SQL 查詢
示例:使用 SQL 查詢統計結果
tEnv.executeSql("CREATE TABLE MyKafkaSource (" +" user STRING," +" url STRING," +" ts BIGINT" +") WITH (" +" 'connector' = 'kafka'," +" 'format' = 'json'" +")"
);
Table result = tEnv.sqlQuery("SELECT user, COUNT(*) AS cnt FROM MyKafkaSource GROUP BY user");
tEnv.toDataStream(result).print();env.execute();
🧪 四、Java 示例:完整的 Table API + SQL 使用案例
? 功能:
從 Kafka 讀取日志數據,按用戶分組統計訪問次數
📁 依賴建議(pom.xml)
<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.17.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.17.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>1.17.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.17.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.17.1</version></dependency>
</dependencies>
🧱 五、完整 Java 示例代碼
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class FlinkTableAndSQLEntry {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);tEnv.executeSql("CREATE TABLE KafkaLog (" +" user STRING," +" url STRING," +" ts BIGINT" +") WITH (" +" 'connector' = 'kafka'," +" 'topic' = 'user_log'," +" 'properties.bootstrap.servers' = 'localhost:9092'," +" 'properties.group.id' = 'flink-sql-group'," +" 'format' = 'json'" +")");tEnv.executeSql("CREATE TABLE ConsoleSink (" +" user STRING," +" cnt BIGINT" +") WITH (" +" 'connector' = 'print'" +")");tEnv.executeSql("INSERT INTO ConsoleSink " +"SELECT user, COUNT(*) AS cnt " +"FROM KafkaLog " +"GROUP BY user");}
}
📊 六、SQL 查詢示例匯總
SQL 示例 | 描述 |
---|
SELECT * FROM table | 查詢所有字段 |
SELECT user, COUNT(*) FROM table GROUP BY user | 分組聚合 |
SELECT * FROM table WHERE ts > 1000 | 條件過濾 |
SELECT TUMBLE_END(ts, INTERVAL '5' SECOND), COUNT(*) ... | 時間窗口聚合 |
SELECT * FROM LATERAL TABLE(udtf(col)) | 使用 UDTF |
CREATE VIEW view_name AS SELECT ... | 創建視圖 |
INSERT INTO sink_table SELECT ... | 寫入到目標表 |
?? 七、時間屬性與窗口聚合
示例:定義事件時間并使用滾動窗口
CREATE TABLE EventTable (user STRING,url STRING,ts BIGINT,WATERMARK FOR ts AS ts - 1000
) WITH (...);
SELECT TUMBLE_END(ts, INTERVAL '5' SECOND) AS window_end,user,COUNT(*) AS cnt
FROM EventTable
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), user;
📁 八、連接器(Connector)配置示例
1. Kafka Source
CREATE TABLE KafkaSource (user STRING,url STRING,ts BIGINT
) WITH ('connector' = 'kafka','topic' = 'input-topic','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-sql-group','format' = 'json'
);
2. MySQL Sink
CREATE TABLE MysqlSink (user STRING,cnt BIGINT
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://localhost:3306/mydb','table-name' = 'user_access_log'
);
📈 九、Flink SQL + Table API 的典型應用場景
場景 | 示例 |
---|
實時 ETL | 從 Kafka 讀取數據 → 清洗 → 寫入 HDFS |
流式分析 | 統計每分鐘點擊量、異常檢測 |
數據質量監控 | 判斷字段是否為空、格式是否合法 |
風控規則引擎 | 使用 CEP 檢測異常行為 |
數倉建模 | 構建 DWD、DWS 層表結構 |
🧠 十、Table API vs SQL
特性 | Table API | SQL |
---|
語法風格 | 函數式鏈式調用 | 類 SQL 語法 |
易用性 | 對 Java 開發者更友好 | 對 SQL 用戶更友好 |
動態解析 | 不適合動態 SQL | 支持字符串拼接、模板引擎 |
性能 | 一致(底層都是 Calcite) | 一致 |
支持功能 | 大部分 SQL 功能都有對應 API | 支持完整 SQL 語法 |
調試難度 | 相對較難調試 | 更直觀、便于調試 |
? 十一、總結
技術點 | 描述 |
---|
Table API | 基于 Java/Scala 的函數式 API |
Flink SQL | 支持 ANSI SQL,易于上手 |
TableEnvironment | 管理表和 SQL 的核心類 |
Connectors | 支持 Kafka、Hive、JDBC、File、Print 等 |
Time Attributes | 支持事件時間、處理時間 |
Windowing | 支持滾動、滑動、會話窗口 |
State Backend | 支持 RocksDB、FS、Memory 狀態后端 |
🧩 十二、擴展學習方向
如果你希望我為你演示以下內容,請繼續提問:
- 自定義函數(UDF、UDAF、UDTF)
- Kafka + MySQL 實時同步方案
- 基于 Hive 的批處理 SQL 作業
- 使用 PyFlink 實現 SQL 作業
- 使用
WITH
子句定義臨時表 - 使用
LATERAL TABLE
調用 UDTF - 使用
MATCH_RECOGNIZE
實現 CEP 模式匹配
📌 一句話總結:
Flink Table API 和 SQL 提供了一種統一的批流一體編程模型,適合數據倉庫、實時分析、ETL、風控等多種大數據處理場景。