flink介紹
1)Apache Flink 是一個框架和分布式處理引擎,用于對無界和有界數據流進行有狀態計算。Flink 被設計在所有常見的集群環境中運行,以內存執行速度和任意規模來執行計算。
2)在實時計算或離線任務中,往往需要與關系型數據庫交互,例如 MySQL、PostgreSQL 等。Apache Flink 提供了 JDBC Connector,可以方便地將流式數據寫入或讀取數據庫。
3)flink版本下載:https://archive.apache.org/dist/flink/
flink單機搭建
## 1. 下載并解壓flink
[root@localhost flink_soft]# mkdir /data/flink_soft
[root@localhost flink_soft]# tar -zxvf flink-1.16.1-bin-scala_2.12.tgz
## 2. 修改配置文件,把下面的三行全部去掉
[root@localhost flink-1.16.1]# cd /data/flink_soft/flink-1.16.1
[root@localhost flink-1.16.1]# vim /data/flink_soft/flink-1.16.1/conf/flink-conf.yaml
rest.port: 8081
rest.address: 0.0.0.0
rest.bind-address: 0.0.0.0
## 3. 啟動flink
[root@localhost flink-1.16.1]# ./bin/start-cluster.sh
## 4. 查詢進程是否存在
[root@localhost flink-1.16.1]# ps aux | grep flink
## 5. 訪問http://192.168.112.162:8081/ 即可。
將已經適配dameng的jar包放到lib目錄下
1)下載已經適配好的包https://github.com/gaoyuan98/flink-connector-jdbc-dameng/releases
提供了兩個版本的dameng適配驅動包,一個是實現JdbcFactory接口,還有一個是實現JdbcDialectFactory接口。
2)截止發文v3.3版本官方還未正式發版,所以大概率是用這個版本:flink-connector-jdbc-dameng_20250331_(適用于v3.2及以下版本)
3)將下載好的適配包放到flink的lib目錄下
DmJdbcDriver8.jar 達夢數據庫jdbc驅動,可以更換為與數據庫版本相同的驅動。
flink-connector-jdbc-3.1.jar flink使用jdbc方式連接數據庫時的橋接包,如果項目本身已經有flink-connector-jdbc包可忽略該包。
flink-connector-jdbc-dameng-1.0.jar flink使用jdbc方式連接達夢數據庫的適配包,源碼基于flink-connector-jdbc.jar包進行調整,所以該包必須存在。
如項目中已經有flink-connector-jdbc的包,那么只需要使用DmJdbcDriver8.jar跟flink-connector-jdbc-dameng-1.0.jar的驅動包即可。
如項目中沒有flink-connector-jdbc的包,就把這三個包全部放到lib下。
[root@localhost lib]# cd /data/flink_soft/flink-1.16.1/lib
[root@localhost lib]# ll
total 204020
-rw-r--r--. 1 root root 1615303 Jan 17 00:30 DmJdbcDriver8.jar
-rwxrwxrwx. 1 root root 198857 Jan 19 2023 flink-cep-1.16.1.jar
-rwxrwxrwx. 1 root root 516144 Jan 19 2023 flink-connector-files-1.16.1.jar
-rw-r--r--. 1 root root 277945 Mar 28 23:46 flink-connector-jdbc-3.1-SNAPSHOT.jar
-rw-r--r--. 1 root root 13458 Mar 29 00:13 flink-connector-jdbc-dameng-1.0-SNAPSHOT.jar
-rwxrwxrwx. 1 root root 102470 Jan 19 2023 flink-csv-1.16.1.jar
-rwxrwxrwx. 1 root root 117107159 Jan 19 2023 flink-dist-1.16.1.jar
-rwxrwxrwx. 1 root root 180248 Jan 19 2023 flink-json-1.16.1.jar
-rwxrwxrwx. 1 root root 21052640 Jan 19 2023 flink-scala_2.12-1.16.1.jar
-rwxrwxrwx. 1 root root 10737871 Jan 13 2023 flink-shaded-zookeeper-3.5.9.jar
-rwxrwxrwx. 1 root root 15367504 Jan 19 2023 flink-table-api-java-uber-1.16.1.jar
-rwxrwxrwx. 1 root root 36249667 Jan 19 2023 flink-table-planner-loader-1.16.1.jar
-rwxrwxrwx. 1 root root 3133690 Jan 19 2023 flink-table-runtime-1.16.1.jar
-rwxrwxrwx. 1 root root 208006 Jan 13 2023 log4j-1.2-api-2.17.1.jar
-rwxrwxrwx. 1 root root 301872 Jan 13 2023 log4j-api-2.17.1.jar
-rwxrwxrwx. 1 root root 1790452 Jan 13 2023 log4j-core-2.17.1.jar
-rwxrwxrwx. 1 root root 24279 Jan 13 2023 log4j-slf4j-impl-2.17.1.jar
重啟flink
[root@localhost flink-1.16.1]# cd /data/flink_soft/flink-1.16.1
[root@localhost flink-1.16.1]# ./bin/stop-cluster.sh
[root@localhost flink-1.16.1]# ./bin/start-cluster.sh## 如果報錯的話查看這個日志
tail -f $FLINK_HOME/log/flink-*-taskexecutor-*.log
flink驅動驗證
在達夢數據庫上創建表數據
CREATE TABLE source_table (id INT PRIMARY KEY,name VARCHAR(50),age INT
);
INSERT INTO source_table (id, name, age) VALUES (1, 'Alice', 30);
INSERT INTO source_table (id, name, age) VALUES (2, 'Bob', 25);
INSERT INTO source_table (id, name, age) VALUES (3, 'Charlie', 40);
COMMIT;
在 Flink SQL CLI 中定義達夢表
[root@localhost lib]# cd /data/flink_soft/flink-1.16.1/
[root@localhost flink-1.16.1]# ./bin/sql-client.sh embeddedCREATE TABLE source (id INT,name STRING,age INT
) WITH ('connector' = 'jdbc','url' = 'jdbc:dm://192.168.127.2:5236/SYSDBA','table-name' = 'source_table','driver' = 'dm.jdbc.driver.DmDriver','username' = 'SYSDBA','password' = 'SYSDBA123'
);## 在 Flink SQL CLI 中查詢數據
SELECT * FROM source;
## 篩選數據,比如 查詢年齡大于 30 的用戶:
SELECT id, name FROM source WHERE age > 30;
## 插入數據
INSERT INTO source (id, name, age) VALUES (3, '33', 33);
CREATE TABLE source1 (id INT,name STRING,age INT
) WITH ('connector' = 'dameng','url' = 'jdbc:dm://81.70.105.201:5236/SYSDBA','table-name' = 'source_table','driver' = 'dm.jdbc.driver.DmDriver','username' = 'SYSDBA','password' = '123456'
);
SELECT * FROM source1;
flink-jdbc-dameng選錯會怎么?
目前flink-connector-jdbc中,v3.0 - v3.2 都是同一個實現思路,也就是只需要集成實現JdbcDialectFactory接口的方法即可,main分支的話是實現JdbcFactory接口函數,也就是需要適配兩個版本。
因使用的是v3.3的dameng包,但flink-connector-jdbc是v3.2及以下版本,驅動包接口實現不對所以會報這個錯。