Flink2.0學習筆記:Table API SQL

stevensu1/EC0720

表 API 和 SQL#

表 API 和 SQL——用于統一流和批處理 加工。表 API 是適用于 Java、Scala 和 Python 的語言集成查詢 API,它 允許組合來自關系運算符的查詢,例如 selection、filter 和 join in 一種非常直觀的方式。Flink 的 SQL 支持基于實現 SQL 標準的?Apache Calcite。任一接口中指定的查詢具有相同的語義 并指定相同的結果,無論輸入是連續的(流式處理:無界)還是有界的(批處理:有界)。

我們的目標是同步mysql表和數據

先完成maven依賴:這里我們只引入flink-table-api-java:概覽 |Apache Flink

如果在ide中運行:還要引入<!--flink-clients,flink-table-runtime,flink-table-planner-loader- -->三個模塊:概覽 |Apache Flink

接著是mysql連接相關JDBC |Apache Flink

JDBC SQL 連接器

JDBC 連接器允許使用 JDBC 驅動程序從任何關系數據庫讀取數據和將數據寫入任何關系數據庫。本文檔介紹如何設置 JDBC 連接器以針對關系數據庫運行 SQL 查詢。

如果在 DDL 上定義了主鍵,則 JDBC 接收器以更新插入模式運行,以便與外部系統交換 UPDATE/DELETE 消息,否則,它以追加模式運行,不支持使用 UPDATE/DELETE 消息。

依次引入對應maven依賴:<!--flink-connector-jdbc-core,mysql-connector-java,flink-connector-jdbc-mysql -->
到此所需的依賴引入完成。不過程序通常需要打包并通過web ui上傳到Fink服務器上運行,Fink服務器通過java SPI服務發現運行我們的jar,關于java SPI接口,前面的文章《關于Red Hat Single Sign-On的User Storage SPI》里有提到過。

這是官網的插件配置地址:

第一步 |Apache Flink,所以要需要添加官方提供的maven打包插件:使用 Maven |Apache Flink

最后完整的依賴如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>FLINKTAS-TEST-Catalog</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><name>FLINKTAS-TEST-Catalog</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>2.0.0</flink.version></properties><dependencies><!--flink-clients,flink-table-runtime,flink-table-planner-loader- --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>2.0.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>2.0.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>2.0.0</version></dependency><!--flink-table-api-java    --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>2.0.0</version></dependency><!--flink-connector-jdbc-core,mysql-connector-java,flink-connector-jdbc-mysql		--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc-mysql</artifactId><version>4.0.0-2.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc-core</artifactId><version>4.0.0-2.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><!-- Replace this with the main class of your job --><mainClass>org.example.App</mainClass></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/></transformers></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>15</source><target>15</target></configuration></plugin></plugins></build>
</project>

現在來實現java處理流程:

先理解一下Catalogs:他可把整個數據庫一次性注冊到表環境TableEnvironment中

Catalogs | Apache Flink

flink-connector-jdbc-mysql模塊已經對mysql的Catalogs 做了實現MySqlCatalog,但是它不能創建物理表,對此需要對其進行擴展實現對應的建表邏輯。

這是我的實現:

package org.example;import org.apache.flink.connector.jdbc.mysql.database.catalog.MySqlCatalog;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.*;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.types.DataType;import java.sql.*;
import java.util.*;public class MyMySqlCatalog extends MySqlCatalog {public MyMySqlCatalog(ClassLoader userClassLoader, String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);}public MyMySqlCatalog(ClassLoader userClassLoader, String catalogName, String defaultDatabase, String baseUrl, Properties connectionProperties) {super(userClassLoader, catalogName, defaultDatabase, baseUrl, connectionProperties);}public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {// 檢查數據庫是否存在if (!databaseExists(tablePath.getDatabaseName())) {throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());}// 檢查表是否已存在if (tableExists(tablePath)) {if (!ignoreIfExists) {return;}}Connection conn = null;try {conn = DriverManager.getConnection(baseUrl + tablePath.getDatabaseName(), this.getUsername(), this.getPassword());String createTableSql = generateCreateTableSql(tablePath.getObjectName(), table);try (PreparedStatement stmt = conn.prepareStatement(createTableSql)) {stmt.execute();}} catch (SQLException e) {throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e);} finally {try {if (conn != null) {conn.close();}} catch (SQLException e) {e.printStackTrace();}}}private String generateCreateTableSql(String tableName, CatalogBaseTable table) {StringBuilder sql = new StringBuilder();sql.append("CREATE TABLE IF NOT EXISTS `").append(tableName).append("` (");// 構建列定義Schema schema = table.getUnresolvedSchema();List<String> columnDefs = new ArrayList<>();for (Schema.UnresolvedColumn column : schema.getColumns()) {if (column instanceof Schema.UnresolvedPhysicalColumn) {Schema.UnresolvedPhysicalColumn physicalColumn =(Schema.UnresolvedPhysicalColumn) column;String columnDef = String.format("`%s` %s",physicalColumn.getName(),convertFlinkTypeToMySql((DataType) physicalColumn.getDataType()));columnDefs.add(columnDef);}}sql.append(String.join(", ", columnDefs));sql.append(")");return sql.toString();}private String convertFlinkTypeToMySql(DataType dataType) {// 簡化的類型轉換,您可以根據需要擴展String typeName = dataType.getLogicalType().getTypeRoot().name();switch (typeName) {case "INTEGER":return "INT";case "VARCHAR":return "VARCHAR(255)";case "BIGINT":return "BIGINT";case "DOUBLE":return "DOUBLE";case "BOOLEAN":return "BOOLEAN";case "TIMESTAMP_WITHOUT_TIME_ZONE":return "TIMESTAMP";default:return "TEXT";}}
}

最后貼一下做數據同步過程的代碼:

package org.example;import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import java.util.List;import static org.apache.flink.table.api.DataTypes.INT;
import static org.apache.flink.table.api.DataTypes.STRING;/*** Hello world!*/
public class App {public static void main(String[] args) throws DatabaseNotExistException, TableAlreadyExistException {EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();TableEnvironment tableEnv = TableEnvironment.create(settings);String name = "my_catalog";String defaultDatabase = "test";String username = "root";String password = "root";String baseUrl = "jdbc:mysql://localhost:3306";MyMySqlCatalog catalog = new MyMySqlCatalog(ClassLoader.getSystemClassLoader(),name,defaultDatabase,username,password,baseUrl);tableEnv.registerCatalog("my_catalog", catalog);// set the JdbcCatalog as the current catalog of the sessiontableEnv.useCatalog("my_catalog");List<String> tables = catalog.listTables(defaultDatabase);boolean exists = catalog.tableExists(ObjectPath.fromString("test.my_table_03"));//如果表不存在,則創建if (!exists) {// 定義表的字段和類型List<Column> columns = List.of(Column.physical("id", INT().notNull()),Column.physical("name", STRING()));Schema.Builder chemaB = Schema.newBuilder();chemaB.column("id", INT().notNull());chemaB.column("name", STRING());chemaB.primaryKey("id");Schema chema = chemaB.build();CatalogTable catalogTable = CatalogTable.newBuilder().schema(chema).build();catalog.createTable(ObjectPath.fromString("test.my_table_03"), catalogTable, true);}tableEnv.executeSql("SELECT * FROM my_table_01").print();tableEnv.executeSql("SELECT * FROM my_table_03").print();// 執行同步tableEnv.executeSql("INSERT INTO my_table_03 SELECT id, name FROM my_table_01");System.out.println("Hello World!");}
}

執行結果:

但是如果系統表太多,注冊Catalogs可能會很消耗Flink內存,所以也可以只把需要的表注冊到表環境中,

package org.example;import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;/*** Hello world!*/
public class App {public static void main(String[] args) {EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();TableEnvironment tableEnv = TableEnvironment.create(settings);registerMySqlTable(tableEnv);Table table1 = tableEnv.from("my_table_01");table1.printSchema();tableEnv.executeSql("SELECT * FROM my_table_01").print();registerMySqlTable02(tableEnv); // my_table_02Table table2 = tableEnv.from("my_table_02");table2.printSchema();tableEnv.executeSql("SELECT * FROM my_table_02").print();// 執行同步tableEnv.executeSql("INSERT INTO my_table_02 SELECT id, name FROM my_table_01");System.out.println("Hello World!");}/*** 注冊 MySQL 表 my_table_02 到 Flink 表環境中*/public static void registerMySqlTable02(TableEnvironment tableEnv) {tableEnv.executeSql("CREATE TABLE my_table_02 (" +"id INT PRIMARY KEY NOT ENFORCED, " +"name STRING" +") WITH (" +"'connector' = 'jdbc', " +"'url' = 'jdbc:mysql://localhost:3306/test', " +"'table-name' = 'my_table_02', " +"'username' = 'root', " +"'password' = 'root'" +")");}/*** 注冊 MySQL 表到 Flink 表環境中*/public static void registerMySqlTable(TableEnvironment tableEnv) {tableEnv.executeSql("CREATE TABLE my_table_01 (" +"id INT PRIMARY KEY NOT ENFORCED," +"name STRING" +") WITH (" +"'connector' = 'jdbc'," +"'url' = 'jdbc:mysql://localhost:3306/test'," +"'table-name' = 'my_table_01'," +"'username' = 'root'," +"'password' = 'root'" +")");}
}

這樣也可以實現數據同步。最后優化建議可以使用jdbc連接池技術。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/92034.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/92034.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/92034.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【 SpringAI核心特性 | Prompt工程 】

1. Prompt 工程 基本概念&#xff1a;Prompt ?工程又叫提示?詞工程&#xff0c;簡單來說&#xff0c;就是輸入?給 AI 的指令。 比如下面?這段內容&#xff0c;就是提示詞&#xff1a; 請問桂林電子科技大學是一個怎么樣的學校&#xff1f;1.1 Prompt分類 在 AI ?對話中…

windows wsl2-06-docker hello world

hello-world 例子 就像其他任何一門語言一樣&#xff0c;我們來體驗 docker 的 hello world $ docker run hello-world但是報錯 :~$ docker run hello-world Unable to find image hello-world:latest locally docker: Error response from daemon: Get "https://registry…

Python知識點4-嵌套循環break和continue使用死循環

一、循環【重點掌握】 1.嵌套循環類似于嵌套if語句 語法&#xff1a; while 表達式1&#xff1a;while 表達式2&#xff1a;語句# 1. # 循環5次&#xff0c;打印0~4 m 0 while m < 5:print(m)m 1 # 循環3次&#xff0c;打印0~2 n 0 while n < 3:print(n)n 1print(&qu…

將HTML+JS+CSS數獨游戲包裝為安卓App

HTMLJSCSS制作一個數獨游戲-CSDN博客 中開發了一個數獨游戲&#xff0c;這個數獨游戲提供了一次性回退到指定步驟的輔助功能&#xff0c;在解決復雜數獨問題時十分有幫助&#xff0c;可作為玩數獨游戲的輔助工具&#xff0c;因此&#xff0c;考慮將它改裝成安卓App安裝在手機上…

編程語言Java入門——核心技術篇(一)封裝、繼承和多態

同專欄基礎知識篇寫在這里&#xff0c;有興趣的可以去看看&#xff1a; 編程語言Java入門——基礎知識篇&#xff08;一&#xff09;-CSDN博客 編程語言Java入門——基礎知識篇&#xff08;二&#xff09;-CSDN博客 編程語言Java入門——基礎知識篇&#xff08;三&#xff0…

【39】MFC入門到精通——C++ /MFC操作文件行(讀取,刪除,修改指定行)

文章目錄1 通過關鍵詞&#xff0c;讀取某一行 &#xff08;3種方法&#xff09;2 刪除 指定行3 修改 指定行1 通過關鍵詞&#xff0c;讀取某一行 &#xff08;3種方法&#xff09; 通過定位關鍵詞&#xff0c;讀取某一行信息,返回CString //通過定位關鍵詞&#xff0c;讀取某…

5 種可行的方法:如何將 Redmi 聯系人備份到 Mac

將 Redmi 聯系人備份到 Mac 是防止因手機損壞、丟失或更換設備而導致數據丟失的重要措施。雖然云服務提供了便利性&#xff0c;但擁有離線備份可以提供額外的安全性&#xff0c;而無需完全依賴互聯網。如果您想知道如何將 Redmi 聯系人備份到 Mac&#xff0c;本文將為您介紹 5 …

LeRobot 具身智能機械臂 SO-ARM100 從搭建到訓練全流程

今天給大家分享一下 LeRobot 具身智能機械臂 SO-ARM100 的完整使用流程&#xff0c;包括設備組裝、環境配置、遠程控制、數據錄制到模型訓練的全過程。適合剛入門具身智能的小伙伴參考學習。 一、前期準備與資源獲取 在開始之前&#xff0c;我們需要準備好相關的資源和工具&a…

LINUX720 SWAP擴容;新增邏輯卷;邏輯卷擴容;數據庫遷移;gdisk

SWAP空間擴展 方法一 增加硬盤或分區擴展 swap -s mkswap /dev/sdd6 blkid /dev/sdd6 swapon /dev/sdd6 swapon -s vim /etc/fstab /dev/sdd6 swap swap defaults 0 0 開機自動擴容 swap -s [rootweb ~]# lsblk NAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINT sd…

Python 進程間通信:TCP安全加密數據傳輸

最近在寫安全方面的程序&#xff0c;有需求&#xff0c;就做了這些TCP加密數據傳輸類。 utils.safeUtils的內容詳見&#xff1a; SafeObj&#xff1a;Python 高安全性加密數據容器類-CSDN博客SafeKey&#xff1a;Python 高安全性加密密碼容器類-CSDN博客 如有任何問題或漏洞歡迎…

Windows批量修改文件屬性方法

標題使用icacls命令&#xff08;推薦批量操作&#xff09;打開管理員權限的命令提示符&#xff08;CMD&#xff09;執行以下命令&#xff1a;cmd icacls "文件夾路徑" /grant 用戶名:(OI)(CI)F /T /C 參數說明&#xff1a;(OI)&#xff1a;對象繼承 - 適用于文件夾(C…

Entity Component System架構

ECS架構 1 簡介 在當今快速發展的軟件開發領域&#xff0c;游戲開發、實時模擬等場景對系統的性能、靈活性和可擴展性提出了極高的要求。傳統的面向對象架構在面對復雜且動態變化的實體時&#xff0c;往往會出現代碼耦合度高、擴展性差等問題。? ECS&#xff08;Entity - Com…

.vscode 擴展配置

一、vue快捷鍵配置 在項目.vscode下新建vue3.0.code-snippets 每當輸入vue3.0后自動生成代碼片段 {"Vue3.0快速生成模板": {"scope": "vue","prefix": "Vue3.0","body": ["<template>"," &…

一個基于阿里云的C端Java服務的整體項目架構

1.背景介紹 總結一下工作使用到的基于通常的公有云的項目整體架構&#xff0c;如何基于公有云建設安全可靠的服務&#xff0c;以阿里云為例的整體架構&#xff1b;1. 全局流量治理層&#xff08;用戶請求入口&#xff09;1.1 域名與 DNS 解析域名注冊與備案&#xff1a;通過阿里…

《剝開洋蔥看中間件:Node.js請求處理效率與錯誤控制的深層邏輯》

在Node.js的運行時環境中&#xff0c;中間件如同一系列精密咬合的齒輪&#xff0c;驅動著請求從進入到響應的完整旅程&#xff0c;而洋蔥模型則是這組齒輪的傳動系統。它以一種看似矛盾的方式融合了順序與逆序、分離與協作——讓每個處理環節既能獨立工作&#xff0c;又能感知全…

GaussDB union 的用法

1 union 的作用union 運算符用于組合兩個或更多 select 語句的結果集。2 union 使用前提union 中的每個 select 語句必須具有相同的列數這些列也必須具有相似的數據類型每個 select 語句中的列也必須以相同的順序排列3 union 語法select column_name(s) from table1 union sele…

構建足球實時比分APP:REST API與WebSocket接入方案詳解

在開發足球實時比分應用時&#xff0c;數據接入方式的選擇直接影響用戶體驗和系統性能。本文將客觀分析REST API和WebSocket兩種主流接入方案的技術特點、適用場景和實現策略&#xff0c;幫助開發者做出合理選擇。一、REST API&#xff1a;靈活的數據獲取方案核心優勢標準化接口…

Linux文件系統三要素:塊劃分、分區管理與inode結構解析

理解文件系統 我們知道文件可以分為磁盤文件和內存文件&#xff0c;內存文件前面我們已經談過了&#xff0c;下面我們來談談磁盤文件。 目錄 一、引入"塊"概念 解析 stat demo.c 命令輸出 基本信息 設備信息 索引節點信息 權限信息 時間戳 二、引入"分區…

基于paddleDetect的半監督目標檢測實戰

基于paddleDetect的半監督目標檢測實戰前言相關介紹前提條件實驗環境安裝環境項目地址使用paddleDetect的半監督方法訓練自己的數據集準備數據分割數據集配置參數文件PaddleDetection-2.7.0/configs/semi_det/denseteacher/denseteacher_ppyoloe_plus_crn_l_coco_semi010.ymlPa…

計算機網絡:(十)虛擬專用網 VPN 和網絡地址轉換 NAT

計算機網絡&#xff1a;&#xff08;十&#xff09;虛擬專用網 VPN 和網絡地址轉換 NAT前言一、虛擬專用網 VPN1. 基礎概念與作用2. 工作原理3. 常見類型4. 協議對比二、NAT&#xff1a;網絡地址轉換1. 基礎概念與作用2. 工作原理與類型3. 優缺點與問題4. 進階類型三、VPN 與 N…