flink部署使用(flink-connector-jdbc)連接達夢數據庫并寫入讀取數據

flink介紹

1)Apache Flink 是一個框架和分布式處理引擎,用于對無界和有界數據流進行有狀態計算。Flink 被設計在所有常見的集群環境中運行,以內存執行速度和任意規模來執行計算。

2)在實時計算或離線任務中,往往需要與關系型數據庫交互,例如 MySQL、PostgreSQL 等。Apache Flink 提供了 JDBC Connector,可以方便地將流式數據寫入或讀取數據庫。

3)flink版本下載:https://archive.apache.org/dist/flink/

flink單機搭建

## 1. 下載并解壓flink
[root@localhost flink_soft]# mkdir /data/flink_soft
[root@localhost flink_soft]# tar -zxvf flink-1.16.1-bin-scala_2.12.tgz
## 2. 修改配置文件,把下面的三行全部去掉
[root@localhost flink-1.16.1]# cd /data/flink_soft/flink-1.16.1
[root@localhost flink-1.16.1]# vim /data/flink_soft/flink-1.16.1/conf/flink-conf.yaml
rest.port: 8081
rest.address: 0.0.0.0
rest.bind-address: 0.0.0.0
## 3. 啟動flink
[root@localhost flink-1.16.1]# ./bin/start-cluster.sh
## 4. 查詢進程是否存在
[root@localhost flink-1.16.1]# ps aux | grep flink
## 5. 訪問http://192.168.112.162:8081/ 即可。

將已經適配dameng的jar包放到lib目錄下

1)下載已經適配好的包https://github.com/gaoyuan98/flink-connector-jdbc-dameng/releases

提供了兩個版本的dameng適配驅動包,一個是實現JdbcFactory接口,還有一個是實現JdbcDialectFactory接口。

2)截止發文v3.3版本官方還未正式發版,所以大概率是用這個版本:flink-connector-jdbc-dameng_20250331_(適用于v3.2及以下版本)

3)將下載好的適配包放到flink的lib目錄下

DmJdbcDriver8.jar 達夢數據庫jdbc驅動,可以更換為與數據庫版本相同的驅動。

flink-connector-jdbc-3.1.jar flink使用jdbc方式連接數據庫時的橋接包,如果項目本身已經有flink-connector-jdbc包可忽略該包。

flink-connector-jdbc-dameng-1.0.jar flink使用jdbc方式連接達夢數據庫的適配包,源碼基于flink-connector-jdbc.jar包進行調整,所以該包必須存在。

如項目中已經有flink-connector-jdbc的包,那么只需要使用DmJdbcDriver8.jar跟flink-connector-jdbc-dameng-1.0.jar的驅動包即可。

如項目中沒有flink-connector-jdbc的包,就把這三個包全部放到lib下。

[root@localhost lib]# cd /data/flink_soft/flink-1.16.1/lib
[root@localhost lib]# ll
total 204020
-rw-r--r--. 1 root root   1615303 Jan 17 00:30 DmJdbcDriver8.jar
-rwxrwxrwx. 1 root root    198857 Jan 19  2023 flink-cep-1.16.1.jar
-rwxrwxrwx. 1 root root    516144 Jan 19  2023 flink-connector-files-1.16.1.jar
-rw-r--r--. 1 root root    277945 Mar 28 23:46 flink-connector-jdbc-3.1-SNAPSHOT.jar
-rw-r--r--. 1 root root     13458 Mar 29 00:13 flink-connector-jdbc-dameng-1.0-SNAPSHOT.jar
-rwxrwxrwx. 1 root root    102470 Jan 19  2023 flink-csv-1.16.1.jar
-rwxrwxrwx. 1 root root 117107159 Jan 19  2023 flink-dist-1.16.1.jar
-rwxrwxrwx. 1 root root    180248 Jan 19  2023 flink-json-1.16.1.jar
-rwxrwxrwx. 1 root root  21052640 Jan 19  2023 flink-scala_2.12-1.16.1.jar
-rwxrwxrwx. 1 root root  10737871 Jan 13  2023 flink-shaded-zookeeper-3.5.9.jar
-rwxrwxrwx. 1 root root  15367504 Jan 19  2023 flink-table-api-java-uber-1.16.1.jar
-rwxrwxrwx. 1 root root  36249667 Jan 19  2023 flink-table-planner-loader-1.16.1.jar
-rwxrwxrwx. 1 root root   3133690 Jan 19  2023 flink-table-runtime-1.16.1.jar
-rwxrwxrwx. 1 root root    208006 Jan 13  2023 log4j-1.2-api-2.17.1.jar
-rwxrwxrwx. 1 root root    301872 Jan 13  2023 log4j-api-2.17.1.jar
-rwxrwxrwx. 1 root root   1790452 Jan 13  2023 log4j-core-2.17.1.jar
-rwxrwxrwx. 1 root root     24279 Jan 13  2023 log4j-slf4j-impl-2.17.1.jar

重啟flink

[root@localhost flink-1.16.1]# cd /data/flink_soft/flink-1.16.1
[root@localhost flink-1.16.1]# ./bin/stop-cluster.sh
[root@localhost flink-1.16.1]# ./bin/start-cluster.sh## 如果報錯的話查看這個日志
tail -f $FLINK_HOME/log/flink-*-taskexecutor-*.log

flink驅動驗證

在達夢數據庫上創建表數據

CREATE TABLE source_table (id INT PRIMARY KEY,name VARCHAR(50),age INT
);
INSERT INTO source_table (id, name, age) VALUES (1, 'Alice', 30);
INSERT INTO source_table (id, name, age) VALUES (2, 'Bob', 25);
INSERT INTO source_table (id, name, age) VALUES (3, 'Charlie', 40);
COMMIT;

在 Flink SQL CLI 中定義達夢表

[root@localhost lib]# cd /data/flink_soft/flink-1.16.1/
[root@localhost flink-1.16.1]#  ./bin/sql-client.sh embeddedCREATE TABLE source (id INT,name STRING,age INT
) WITH ('connector' = 'jdbc','url' = 'jdbc:dm://192.168.127.2:5236/SYSDBA','table-name' = 'source_table','driver' = 'dm.jdbc.driver.DmDriver','username' = 'SYSDBA','password' = 'SYSDBA123'
);## 在 Flink SQL CLI 中查詢數據
SELECT * FROM source;
## 篩選數據,比如 查詢年齡大于 30 的用戶:
SELECT id, name FROM source WHERE age > 30;
## 插入數據
INSERT INTO source (id, name, age) VALUES (3, '33', 33);

CREATE TABLE source1 (id INT,name STRING,age INT
) WITH ('connector' = 'dameng','url' = 'jdbc:dm://81.70.105.201:5236/SYSDBA','table-name' = 'source_table','driver' = 'dm.jdbc.driver.DmDriver','username' = 'SYSDBA','password' = '123456'
);
SELECT * FROM source1;

flink-jdbc-dameng選錯會怎么?

目前flink-connector-jdbc中,v3.0 - v3.2 都是同一個實現思路,也就是只需要集成實現JdbcDialectFactory接口的方法即可,main分支的話是實現JdbcFactory接口函數,也就是需要適配兩個版本。

因使用的是v3.3的dameng包,但flink-connector-jdbc是v3.2及以下版本,驅動包接口實現不對所以會報這個錯。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/77267.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/77267.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/77267.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

用swift playground寫個ios應用和大模型或者網站交互

import SwiftUIstruct ContentView: View {State private var textFieldText: String ""State private var outputText: String "輸出將會顯示在這里"private let tip:String "消息已發送,請等待"State private var history:[Stri…

springboot+vue2集成JWT token實現權限驗證

前端項目搭建參考: Vue項目的搭建和啟動_vue項目啟動 csdn-CSDN博客 Vue ElementUI 登錄頁面_vue用戶登錄頁面-CSDN博客 跨域問題前端解決-CSDN博客 實現思路: 1. 實現的目的:為了保護網站安全信息,使用jwt進行權限驗證&#xf…

Cursor編程-從入門到精通__0409

早期的Github Copilot 最近更新了,支持Agent編程,字節跳動Trae使用(免費),但成熟程度不如Cursor,Cursor前50次免費 Copilot VS Cursor*** 1,Cursor VSCode 二次開發,IDE級別 2&…

MyBatis 詳解及代碼示例

MyBatis 是一個 半自動 ORM 框架,主要用于 Java 與數據庫之間的持久化操作,它本質是對 JDBC 的封裝 全名:MyBatis(前身 iBATIS)核心作用:自動將 SQL 執行結果映射為 Java 對象;也可以將 Java 對…

1.6-抓包技術(Burp Suite\Yakit抓包\Web、APP、小程序)

1.6-抓包技術(Burp Suite\Yakit抓包\Web、APP、小程序) 如果要使用抓包軟件,基本上第一步都是要安裝證書的。原因如下: 客戶端(瀏覽器或應用)會檢測到證書不受信任,并彈出 證書錯誤&#xff0…

Java 大視界 -- 基于 Java 的大數據隱私保護在金融客戶信息管理中的實踐與挑戰(178)

💖親愛的朋友們,熱烈歡迎來到 青云交的博客!能與諸位在此相逢,我倍感榮幸。在這飛速更迭的時代,我們都渴望一方心靈凈土,而 我的博客 正是這樣溫暖的所在。這里為你呈上趣味與實用兼具的知識,也…

第十屆 藍橋杯 嵌入式 省賽

一、分析 這屆的真題,有點像第七屆的液位檢測。 這屆的題目開始,貌似比賽描述的功能,邏輯上變得更好梳理了。一開始就把大致的功能給你說明一遍,不像之前都是一塊一塊的說明。 1. 基本功能 1)測量競賽板上電位器 R…

實現usb的MTP功能

前言:最終結果根據用戶自主選擇可實現host和device功能的切換。 效果展示: 當插入usb時設備會彈窗 當用戶選擇設備模式時pc端就會出現mtp設備盤符 實現mtp設備 ubuntu架構根文件系統通過uMTP-Responder實現usb的MTP功能 添加服務 /home/flynn/firfly_rootfs/lib/system…

React-05React中props屬性(傳遞數據),propTypes校驗,類式與函數式組件props的使用

1.類式組件props基本數據讀取與解構運算符傳遞 <script type"text/babel">// 創建組件class PersonalInfo extends React.Component {render() {// 讀取props屬性 并讀取值console.log(props,this.props);return(<ul><li>姓名&#xff1a;{this.p…

PCI認證 密鑰注入 ECC算法工具 NID_secp521r1 國密算法 openssl 全套證書生成,從證書提取公私鑰數組 x,y等

步驟 1.全套證書已經生成。OK 2.找國芯要ECC加密解密簽名驗簽代碼。給的邏輯說明沒有示例代碼很難的上。 3.集成到工具 與SP聯調。 1.用openssl全套證書生成及驗證 注意&#xff1a;這里CA 簽發 KLD 證書用的是SHA256。因為芯片只支持SHA256算法,不支持SHA512。改成統一。…

藍橋杯每日刷題c++

目錄 P9240 [藍橋杯 2023 省 B] 冶煉金屬 - 洛谷 (luogu.com.cn) P8748 [藍橋杯 2021 省 B] 時間顯示 - 洛谷 (luogu.com.cn) P10900 [藍橋杯 2024 省 C] 數字詩意 - 洛谷 (luogu.com.cn) P10424 [藍橋杯 2024 省 B] 好數 - 洛谷 (luogu.com.cn) P8754 [藍橋杯 2021 省 AB2…

oracle 數據庫字段類型為NUMBER(5,2)時,并且數據庫值為0.1,為什么Java執行SQL查出來時為“.1“?

在 Oracle 數據庫中&#xff0c;當字段類型為 NUMBER(5,2) 且存儲的值為 0.1 時&#xff0c;Java 程序查詢結果可能顯示為 ".1"&#xff08;省略前導零&#xff09;&#xff0c;這是由 Oracle JDBC 驅動默認的數字格式化行為 導致的。以下是原因分析和解決方案&#…

3月AI論文精選十篇

1. Feature-Level Insights into Artificial Text Detection with Sparse Autoencoders[1] 核心貢獻&#xff1a;通過稀疏自編碼器揭示AI生成文本的檢測特征&#xff0c;提出基于特征分布的鑒別方法。研究發現&#xff0c;AI文本在稀疏編碼空間中呈現獨特的"高頻低幅"…

STM32在裸機(無RTOS)環境下,需要手動實現隊列機制來替代FreeRTOS的CAN發送接收函數

xQueueSendToBackFromISR(ecuCanRxQueue, hcan->pRxMsg, &xHigherPriorityTaskWoken)&#xff0c;xQueueReceive(mscCanRxQueue,&mscRxMsg,0)和xQueueSendToBack(mscCanTxQueue, &TxMessageTemp, 0 )這3個函數&#xff0c;在裸機下實現&#xff1a; 在裸機&…

使用PX4,gazebo,mavros為旋翼添加下視的相機(仿真采集openrealm數據集-第一步)

目錄 一.方法一&#xff08;沒成功&#xff09; 1.運行PX4 2.運行mavros通訊 3.啟動仿真世界和無人機 &#xff08;1&#xff09;單獨測試相機 &#xff08;2&#xff09;make px4_sitl gazebo啟動四旋翼iris無人機 二.方法二&#xff08;成功&#xff09; 1.通過 rosl…

7、nRF52xx藍牙學習(nrf_gpiote.c庫函數學習)

續前一篇文章。 3、nrfx_gpiote_in_event_enable void nrfx_gpiote_in_event_enable(nrfx_gpiote_pin_t pin, bool int_enable) {NRFX_ASSERT(nrf_gpio_pin_present_check(pin));NRFX_ASSERT(pin_in_use_by_gpiote(pin));if (pin_in_use_by_port(pin)){nrf_gpiote_polarity_t…

Java 實現插入排序:[通俗易懂的排序算法系列之三]

引言 大家好!歡迎繼續關注我的排序算法系列。今天,我們要學習的是另一種非常基礎且重要的排序算法——插入排序 (Insertion Sort)。 插入排序的思路非常貼近我們日常整理撲克牌的方式,理解起來相對自然。雖然它在最壞情況下的效率不高,但在某些特定場景下,它的表現甚至優…

Java的spring boot項目編譯成功啟動報錯

問題現象&#xff1a;spring boot項目&#xff0c;候刪除一些無用代碼后&#xff0c;build成功&#xff0c;啟動時報錯&#xff1a;找不到java.util.Map或者其他對象&#xff08;用Lombok注解Data&#xff09;中的字段屬性找不到等錯誤。解答&#xff1a; 常見是Lombok版本問題…

PyTorch參數管理詳解:從訪問到初始化與共享

本文通過實例代碼講解如何在PyTorch中管理神經網絡參數&#xff0c;包括參數訪問、多種初始化方法、自定義初始化以及參數綁定技術。所有代碼可直接運行&#xff0c;適合深度學習初學者進階學習。 1. 定義網絡與參數訪問 1.1 定義單隱藏層多層感知機 import torch from torch…

基于springboot+vue的課程管理系統

一、系統架構 前端&#xff1a;vue | element-ui 后端&#xff1a;springboot | mybatis-plus 環境&#xff1a;jdk1.8 | mysql8 | maven | node v16.20.2 | idea 二、代碼及數據 三、功能介紹 01. 登錄 02. 管理員-首頁 03. 管理員-系管理 04. 管理員-專業管理 05. 管…