Flink Table SQL

Apache Flink 提供了強大的 Table API 和 SQL 接口,用于統一處理批數據和流數據。它們為開發者提供了類 SQL 的編程方式,簡化了復雜的數據處理邏輯,并支持與外部系統集成。


🧩 一、Flink Table & SQL 核心概念

概念描述
Table API基于 Java/Scala 的 DSL,提供類型安全的操作接口
Flink SQL支持標準 ANSI SQL 語法的查詢語言
DataStream / DataSet ? Table可以在 DataStream 或 Table 之間互相轉換
Catalog元數據管理器,如 Hive Catalog、Memory Catalog
TableEnvironment管理表、SQL 執行環境的核心類
Connectors支持 Kafka、Hive、MySQL、文件等數據源接入
Time Attributes定義事件時間(Event Time)、處理時間(Processing Time)
Windowing支持滾動窗口、滑動窗口、會話窗口等

💻 二、Flink Table API 和 SQL 的優勢

特性描述
統一接口同一套代碼可運行在 Batch 和 Streaming 場景下
高性能底層使用 Apache Calcite 進行優化,自動進行查詢優化
易用性強對熟悉 SQL 的用戶非常友好
生態集成好支持 Kafka、Hive、JDBC、Elasticsearch 等多種數據源
狀態管理在流式場景中自動管理狀態和窗口邏輯

📦 三、核心組件說明

1. TableEnvironment

  • 是操作 Table 和 SQL 的入口
  • 負責注冊表、執行查詢、管理元數據等
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

2. DataStream ? Table 轉換

示例:DataStream 轉 Table
DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));// 將 DataStream 轉換為 Table
Table table = tEnv.fromDataStream(dataStream);// 注冊為臨時表
tEnv.createTemporaryView("myTable", dataStream);
示例:Table 轉 DataStream
Table resultTable = tEnv.sqlQuery("SELECT * FROM myTable WHERE f1 > 1");
DataStream<Row> resultStream = tEnv.toDataStream(resultTable);

3. Flink SQL 查詢

示例:使用 SQL 查詢統計結果
// 創建臨時表
tEnv.executeSql("CREATE TABLE MyKafkaSource (" +"  user STRING," +"  url STRING," +"  ts BIGINT" +") WITH (" +"  'connector' = 'kafka'," +"  'format' = 'json'" +")"
);// 執行 SQL 查詢
Table result = tEnv.sqlQuery("SELECT user, COUNT(*) AS cnt FROM MyKafkaSource GROUP BY user");// 轉換為 DataStream 并輸出
tEnv.toDataStream(result).print();env.execute();

🧪 四、Java 示例:完整的 Table API + SQL 使用案例

? 功能:

從 Kafka 讀取日志數據,按用戶分組統計訪問次數

📁 依賴建議(pom.xml)

<dependencies><!-- Flink Core --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.17.1</version></dependency><!-- Flink Streaming --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.17.1</version></dependency><!-- Flink Table API & SQL --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>1.17.1</version></dependency><!-- Kafka Connector --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.17.1</version></dependency><!-- JSON Format --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.17.1</version></dependency>
</dependencies>

🧱 五、完整 Java 示例代碼

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class FlinkTableAndSQLEntry {public static void main(String[] args) throws Exception {// 1. 初始化流執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 創建 TableEnvironmentStreamTableEnvironment tEnv = StreamTableEnvironment.create(env);// 3. 創建 Kafka Source 表(模擬從 Kafka 讀取日志)tEnv.executeSql("CREATE TABLE KafkaLog (" +"  user STRING," +"  url STRING," +"  ts BIGINT" +") WITH (" +"  'connector' = 'kafka'," +"  'topic' = 'user_log'," +"  'properties.bootstrap.servers' = 'localhost:9092'," +"  'properties.group.id' = 'flink-sql-group'," +"  'format' = 'json'" +")");// 4. 創建 Sink 表(控制臺輸出)tEnv.executeSql("CREATE TABLE ConsoleSink (" +"  user STRING," +"  cnt BIGINT" +") WITH (" +"  'connector' = 'print'" +")");// 5. 使用 SQL 編寫業務邏輯tEnv.executeSql("INSERT INTO ConsoleSink " +"SELECT user, COUNT(*) AS cnt " +"FROM KafkaLog " +"GROUP BY user");}
}

📊 六、SQL 查詢示例匯總

SQL 示例描述
SELECT * FROM table查詢所有字段
SELECT user, COUNT(*) FROM table GROUP BY user分組聚合
SELECT * FROM table WHERE ts > 1000條件過濾
SELECT TUMBLE_END(ts, INTERVAL '5' SECOND), COUNT(*) ...時間窗口聚合
SELECT * FROM LATERAL TABLE(udtf(col))使用 UDTF
CREATE VIEW view_name AS SELECT ...創建視圖
INSERT INTO sink_table SELECT ...寫入到目標表

?? 七、時間屬性與窗口聚合

示例:定義事件時間并使用滾動窗口

-- 定義帶有事件時間的表
CREATE TABLE EventTable (user STRING,url STRING,ts BIGINT,WATERMARK FOR ts AS ts - 1000 -- 定義水印
) WITH (...);-- 使用滾動窗口進行統計
SELECT TUMBLE_END(ts, INTERVAL '5' SECOND) AS window_end,user,COUNT(*) AS cnt
FROM EventTable
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), user;

📁 八、連接器(Connector)配置示例

1. Kafka Source

CREATE TABLE KafkaSource (user STRING,url STRING,ts BIGINT
) WITH ('connector' = 'kafka','topic' = 'input-topic','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-sql-group','format' = 'json'
);

2. MySQL Sink

CREATE TABLE MysqlSink (user STRING,cnt BIGINT
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://localhost:3306/mydb','table-name' = 'user_access_log'
);

📈 九、Flink SQL + Table API 的典型應用場景

場景示例
實時 ETL從 Kafka 讀取數據 → 清洗 → 寫入 HDFS
流式分析統計每分鐘點擊量、異常檢測
數據質量監控判斷字段是否為空、格式是否合法
風控規則引擎使用 CEP 檢測異常行為
數倉建模構建 DWD、DWS 層表結構

🧠 十、Table API vs SQL

特性Table APISQL
語法風格函數式鏈式調用類 SQL 語法
易用性對 Java 開發者更友好對 SQL 用戶更友好
動態解析不適合動態 SQL支持字符串拼接、模板引擎
性能一致(底層都是 Calcite)一致
支持功能大部分 SQL 功能都有對應 API支持完整 SQL 語法
調試難度相對較難調試更直觀、便于調試

? 十一、總結

技術點描述
Table API基于 Java/Scala 的函數式 API
Flink SQL支持 ANSI SQL,易于上手
TableEnvironment管理表和 SQL 的核心類
Connectors支持 Kafka、Hive、JDBC、File、Print 等
Time Attributes支持事件時間、處理時間
Windowing支持滾動、滑動、會話窗口
State Backend支持 RocksDB、FS、Memory 狀態后端

🧩 十二、擴展學習方向

如果你希望我為你演示以下內容,請繼續提問:

  • 自定義函數(UDF、UDAF、UDTF)
  • Kafka + MySQL 實時同步方案
  • 基于 Hive 的批處理 SQL 作業
  • 使用 PyFlink 實現 SQL 作業
  • 使用 WITH 子句定義臨時表
  • 使用 LATERAL TABLE 調用 UDTF
  • 使用 MATCH_RECOGNIZE 實現 CEP 模式匹配

📌 一句話總結:

Flink Table API 和 SQL 提供了一種統一的批流一體編程模型,適合數據倉庫、實時分析、ETL、風控等多種大數據處理場景。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/81383.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/81383.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/81383.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【AWS入門】Amazon SageMaker簡介

【AWS入門】Amazon SageMaker簡介 [AWS Essentials] Brief Introduction to Amazon SageMaker By JacksonML 機器學習(Machine Learning&#xff0c;簡稱ML) 是當代流行的計算機科學分支技術。通常&#xff0c;人們在本地部署搭建環境&#xff0c;以滿足機器學習的要求。 AWS…

解決 Go 構建依賴超時問題:使用 GOPROXY 提升 Docker 構建穩定性

目錄 解決 Go 構建依賴超時問題&#xff1a;使用 GOPROXY 提升 Docker 構建穩定性 ? 問題背景 ? 正確做法&#xff1a;多階段中在 Go 階段設置 GOPROXY ? 實際收獲 &#x1f9ea; 小技巧&#xff1a;驗證 GOPROXY 設置是否生效 ? 總結 解決 Go 構建依賴超時問題&#x…

【周輸入】510周閱讀推薦-3

前文 【周輸入】510周閱讀推薦-1-CSDN博客 【周輸入】510周閱讀推薦-2-CSDN博客 本次推薦 目錄 前文 本次推薦 算法技術 模型產品 算法技術 vLLM和DeepSpeed部署模型的優缺點_vllm deepspeed-CSDN博客 優點缺點總結vLLM 適用于推理 優化內存管理 高效并行化 功能單…

Kubernetes控制平面組件:Kubelet詳解(七):容器網絡接口 CNI

云原生學習路線導航頁&#xff08;持續更新中&#xff09; kubernetes學習系列快捷鏈接 Kubernetes架構原則和對象設計&#xff08;一&#xff09;Kubernetes架構原則和對象設計&#xff08;二&#xff09;Kubernetes架構原則和對象設計&#xff08;三&#xff09;Kubernetes控…

【推薦】新準則下對照會計報表172個會計科目解釋

序號 科目名稱 對應的會計報表項目 序號 科目名稱 對應的會計報表項目   一、資產類     二、負債類   1 1001 庫存現金 貨幣資金 103 2001 短期借款 短期借款 2 1002 銀行存款 貨幣資金 104 2101 交易性金融負債 易性金融負債 3 1012 其他貨幣資…

MongoDB的安裝及簡單使用

MongoDB 是一個開源的文檔型 NoSQL 數據庫??&#xff0c;由 MongoDB Inc. 開發&#xff0c;專為靈活性和可擴展性設計。 特點&#xff1a; ??1.文檔模型??&#xff1a;數據以 BSON&#xff08;二進制 JSON&#xff09;格式存儲&#xff0c;支持嵌套結構。 ??2.動態 S…

Gartner《如何將生成式人工智能(GenAI)集成到應用架構》學習心得

針對軟件架構師、技術專業人士如何更好的把 GenAI 如何融入解決方案,提升用戶體驗、生產力并帶來差異化成果的趨勢,Gartner發布了《Integrating GenAI Into Your Application Architecture》研究報告。 報告首先介紹了 GenAI 的發展背景,指出其已成為主流趨勢,大型語言模型…

IDEA - Windows IDEA 代碼塊展開與折疊(基礎折疊操作、高級折疊操作)

一、基礎折疊操作 折疊當前代碼塊&#xff1a;Ctrl - # 操作方式按下 【Ctrl】 鍵&#xff0c;再按下 【-】 鍵展開當前代碼塊&#xff1a;Ctrl # 操作方式按下 【Ctrl】 鍵&#xff0c;再按下 【】 鍵折疊所有代碼塊&#xff1a;Ctrl Shift - # 操作方式按下 【Ctrl】…

基于STM32F103與Marvell88W8686的WIFI無線監控視頻傳輸系統研發(論文)

基于STM32F103與Marvell88W8686的WIFI無線監控視頻傳輸系統研發 中文摘要 在當今社會信息化進程不斷加速的時代背景下&#xff0c;眾多領域對于監控系統的需求日益增長&#xff0c;像車內安全監控、電梯運行監控等場景都離不開監控系統的支持。過去&#xff0c;不少領域普遍采用…

Java基礎知識總結(超詳細整理)

一&#xff1a;概述 1.1Java類及類的成員 屬性、方法、構造器、代碼塊、內部類 &#xff08;1&#xff09;數組 java虛擬機內存劃分 各區域作用 內存解析 基本使用 兩個變量指向一個一維數組 沒有new就不會在堆里新開辟空間 &#xff08;2&#xff09;對象數組 &#xff08;3&a…

StarRocks Community Monthly Newsletter (Apr)

版本動態 3.4.3 版本更新 核心功能升級 Routine Load和Stream Load新增Lambda表達式支持&#xff0c;支持復雜的列數據提取 增強JSON數據處理能力&#xff0c;支持將JSON Array/Object轉為ARRAY/MAP類型 優化information_schema.task_runs視圖查詢&#xff0c;新增LIMIT支持…

探索AI新領域:生成式人工智能認證(GAI認證)助力職場發展

在數字化時代的大潮中&#xff0c;人工智能&#xff08;AI&#xff09;技術以其強大的影響力和廣泛的應用前景&#xff0c;正逐步重塑我們的生活與工作方式。隨著生成式AI技術的崛起&#xff0c;掌握這一前沿技能已成為職場競爭中的關鍵優勢。那么&#xff0c;如何通過系統的學…

數據庫觸發器Trigger

在數據庫管理系統中&#xff0c;觸發器&#xff08;Trigger&#xff09;是一種特殊的存儲過程&#xff0c;它在特定的事件發生時自動執行。觸發器通常用于維護數據的完整性和一致性。通過事件觸發而被執行&#xff0c;不能直接調用。 觸發器的三要素 觸發事件 before/after&a…

如何利用 Java 爬蟲獲得某書筆記詳情:實戰指南

在知識分享和學習的領域&#xff0c;許多平臺提供了豐富的書籍筆記和學習資源。通過 Java 爬蟲技術&#xff0c;我們可以高效地獲取這些筆記的詳細信息&#xff0c;以便進行進一步的分析和整理。本文將詳細介紹如何利用 Java 爬蟲獲取某書筆記詳情&#xff0c;并提供完整的代碼…

主成分分析的應用之sklearn.decomposition模塊的PCA函數

主成分分析的應用之sklearn.decomposition模塊的PCA函數 一、模型建立整體步驟 二、數據 2297.86 589.62 474.74 164.19 290.91 626.21 295.20 199.03 2262.19 571.69 461.25 185.90 337.83 604.78 354.66 198.96 2303.29 589.99 516.21 236.55 403.92 730.05 438.41 225.80 …

【Redis】List 列表

文章目錄 初識列表常用命令lpushlpushxlrangerpushrpushxlpop & rpoplindexlinsertllen阻塞操作 —— blpop & brpop 內部編碼應用場景 初識列表 列表類型&#xff0c;用于存儲多個字符串。在操作和實現上&#xff0c;類似 C 的雙端隊列&#xff0c;支持隨機訪問(O(N)…

Android framework 中間件開發(三)

前兩篇我們講了中間件的開發和打包應用, Android framework 中間件開發(一) Android framework 中間件開發(二) 這邊我們來講一下在中間件中編寫JNI 1.新建C文件 找到frameworks\base\services\core\jni\路徑,新建一個cpp文件,文件名為com_android_server_DarkControlService.c…

深入了解linux系統—— 基礎IO(上)

文件 在之前學習C語言文件操作時&#xff0c;我們了解過什么是文件&#xff0c;這里簡單回顧一下&#xff1a; 文件存在磁盤中&#xff0c;文件有分為程序文件、數據文件&#xff1b;二進制文件和文本文件等。 詳細描述見文章&#xff1a;文件操作——C語言 文件在磁盤里&a…

Flink CDC—實時數據集成框架

Flink CDC 是一個基于流的數據集成工具&#xff0c;旨在為用戶提供一套功能更加全面的編程接口&#xff08;API&#xff09;&#xff0c;它基于數據庫日志的 CDC&#xff08;變更數據捕獲&#xff09;技術實現了統一的增量和全量數據讀取。 該工具使得用戶能夠以 YAML 配置文件…

ES(ES2023/ES14)最新更新內容,及如何減少內耗

截至2023年10月,JavaScript(ECMAScript)的最新版本是 ES2023(ES14)。 ES2023 引入了許多新特性,如findLast、toSorted等,同時優化了性能。通過減少全局變量、避免內存泄漏、優化循環、減少DOM操作、使用Web Workers、懶加載、緩存、高效數據結構和代碼壓縮,可以顯著降低…