kafka jdbc connector適配kadb數據實時同步

  • 測試結論

源端增量獲取方式包括:bulk、incrementing、timestamp、incrementing+timestamp(混合),各種方式說明如下:

bulk: 一次同步整個表的數據

incrementing: 使用嚴格的自增列標識增量數據。不支持對舊數據的更新和刪除

timestamp: 使用時間戳標識增量數據,每次更新數據都要修改時間戳,時間戳嚴格遞增

timestamp+incrementing: 使用兩個列,一個為自增列,一個為時間戳列。綜合incrementing和timestamp的功能

  • 環境說明

本文在kafka的standalone模式下,適配kafka jdbc connector從源端mysql數據庫實時同步數據到kadb中。驗證1. 增量數據獲取及增量數據獲取方式

  1. kadb版本:V8R3
  2. mysql版本:5.7
  3. 操作系統:centos 7.6
  4. jdbc connector版本:10.8.3。下載地址:JDBC Connector (Source and Sink) | Confluent Hub: Apache Kafka Connectors for Streaming Data.
  5. mysql驅動:mysql-connector-java-5.1.39-bin.jar
  6. kadb驅動:postgresql-42.7.4.jar
  7. java版本:17.0.12 (kafka要求必須為17或者18版本,否則kafka安裝報錯)
  8. kafka版本:kafka_2.13-4.0.0
  9. kafka jdbc connector參考資料:

JDBC Source and Sink Connector for Confluent Platform | Confluent Documentation

  1. kafka connector參考資料

https://kafka.apache.org/documentation/

  • 環境部署
  1. kafka部署

解壓

tar -xzf kafka_2.13-4.0.0.tgz

cd kafka_2.13-4.0.0

產生集群UUID

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

格式化日志目錄

bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties

啟動kafka

bin/kafka-server-start.sh config/server.properties

  1. jdbc connector部署

下載jdbc connector,將解壓的內容保存到kafka解壓目錄的plugins下(plugins目錄需自己創建內容如下:

[root@nanri plugins]# ls -l

total 8

drwxr-xr-x. 2 root root?? 43 Apr 17 21:50 assets

drwxr-xr-x. 3 root root? 108 Apr 17 21:50 doc

drwxr-xr-x. 2 root root?? 90 Apr 17 21:50 etc

drwxr-xr-x. 2 root root 4096 Apr 17 21:50 lib

-rw-r--r--. 1 root root 2687 Apr 17 21:50 manifest.json

[root@nanri plugins]# pwd

/root/kafka_2.13-4.0.0/plugins

  1. 源端/目標端jdbc驅動

將源端mysql的jdbc驅動文件和目標端kadb驅動文件拷貝至kafka的解壓目錄的libs目錄下:

[root@nanri libs]# ls -l mysql* postgres*

-rw-r--r--. 1 root root? 989497 Apr 17 23:15 mysql-connector-java-5.1.39-bin.jar

-rw-r--r--. 1 root root 1086687 Apr 17 23:14 postgresql-42.7.4.jar

[root@nanri libs]# pwd

/root/kafka_2.13-4.0.0/libs

  1. 配置文件修改
  1. 連接器配置文件:connect-standalone.properties

添加插件路徑參數:(絕對路徑)

plugin.path=/root/kafka_2.13-4.0.0/plugins,/root/kafka_2.13-4.0.0/libs/connect-file-4.0.0.jar

  1. 源端配置文件:connect-mysql-source.properties文件內容,參數意義參考:

https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/source_config_options.html

#productor名字

name=connect-mysql-source????????????????

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector??? //固定值,使用jdbc connector的類

# topic名稱列表,源端和目標端的topic必須一致

topics=test

# 配置jdbc連接

connection.url=jdbc:mysql://192.168.85.145:3306/test_source?useUnicode=true&characterEncoding=utf8&user=root&password=Kingbase@1234&serverTimezone=Asia/Shanghai&useSSL=false

#增量獲取方式,支持bulk,incrementing,timestamp等等

mode=incrementing

  1. 目標端配置文件:connect-kadb-sink.properties文件內容,參數意義參考:

https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/sink_config_options.html

#consumer名字

name=connect-kadb-sink

# 為當前connector創建的最大線程數

tasks.max=1

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector //固定值,必須設置

# topic名稱列表

topics=test

# 配置jdbc連接

connection.url=jdbc:postgresql://192.168.85.145:5432/test_sink

connection.user=mppadmin

# 自動創建表

auto.create=true

# 寫入模式

insert.mode=insert

  1. 啟動connect

bin/connect-standalone.sh

config/connect-standalone.properties ?????????????? //connect配置參數

config/connect-mysql-source.properties ?? //源端配置參數

config/connect-kadb-sink.properties??????????? //目標端參數

  1. 測試
  1. mysql源端創建表,目標端會自動創建對應的表

mysql> desc test

??? -> ;

+-------+-------------+------+-----+---------+----------------+

| Field | Type??????? | Null | Key | Default | Extra????????? |

+-------+-------------+------+-----+---------+----------------+

| a???? | int(11)???? | NO?? | PRI | NULL??? | auto_increment |??? //使用increment ing方式,必須是自增列

| b???? | varchar(10) | YES? |???? | NULL??? |??????????????? |

+-------+-------------+------+-----+---------+----------------+

2 rows in set (0.00 sec)

  1. 源端插入數據

mysql> insert into test(b) values('dddd');

Query OK, 1 row affected (0.00 sec)

  1. connect日志:

[2025-04-18 22:39:27,665] INFO [connect-kadb-sink|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:27,680] INFO [connect-kadb-sink|task-0] Completed write operation for 1 records to the database (io.confluent.connect.jdbc.sink.JdbcDbWriter:100)

[2025-04-18 22:39:27,680] INFO [connect-kadb-sink|task-0] Successfully wrote 1 records. (io.confluent.connect.jdbc.sink.JdbcSinkTask:91)

[2025-04-18 22:39:32,637] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:32,641] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:34,208] INFO [connect-mysql-source|task-0|offsets] WorkerSourceTask{id=connect-mysql-source-0} Committing offsets for 0 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:236)

[2025-04-18 22:39:37,642] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:37,644] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:42,645] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:42,648] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:44,210] INFO [connect-mysql-source|task-0|offsets] WorkerSourceTask{id=connect-mysql-source-0} Committing offsets for 1 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:236)

[2025-04-18 22:39:47,649] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:47,650] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:52,653] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:52,657] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:54,192] INFO Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

  1. 使用kafka-console-consumer.sh查看topic中的事件

bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"a"},{"type":"string","optional":true,"field":"b"}],"optional":false,"name":"test"},"payload":{"a":5,"b":"dddd"}}

  1. 目標端數據

1 | aaa

?2 | bbb

?3 | ccc

?4 | ddd

?5 | dddd

(844 rows)

test_sink=#

  1. 源端數據

mysql> select * from test;

+---+------+

| a | b??? |

+---+------+

| 5 | dddd |

+---+------+

1 row in set (0.00 sec)

  1. 命令參考

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --delete --topic sys_config

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic sys_config

bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic sys_config --from-beginning

bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 –list

bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --all-groups

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group connect-local-file-sink –state

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic __consumer_offsets

bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/77883.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/77883.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/77883.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

基于Hadoop的音樂推薦系統(源碼+lw+部署文檔+講解),源碼可白嫖!

摘要 本畢業生數據分析與可視化系統采用B/S架構,數據庫是MySQL,網站的搭建與開發采用了先進的Java語言、爬蟲技術進行編寫,使用了Spring Boot框架。該系統從兩個對象:由管理員和用戶來對系統進行設計構建。主要功能包括&#xff…

CentOS的安裝以及網絡配置

CentOS的下載 在學習docker之前,我們需要知道的就是docker是運行在Linux內核之上的,所以我們需要Linux環境的操作系統,當然了你也可以選擇安裝ubuntu等操作系統,如果你不想在本機安裝的話還可以考慮買阿里或者華為的云服務器&…

【條形碼識別改名工具】如何批量識別圖片條形碼,并以條碼內容批量重命名,基于WPF和Zxing的開發總結

批量圖片條形碼識別與重命名系統 (WPF + ZXing)開發總結 項目適用場景 ??電商商品管理??:批量處理商品圖片,根據條形碼自動分類歸檔??圖書館系統??:掃描圖書條形碼快速建立電子檔案??醫療檔案管理??:通過藥品條形碼整理醫療圖片資料??倉儲管理??:自動化識…

RAGFlow安裝+本地知識庫+踩坑記錄

RAGFlow是一種融合了數據檢索與生成式模型的新型系統架構,其核心思想在于將大規模檢索系統與先進的生成式模型(如Transformer、GPT系列)相結合,從而在回答查詢時既能利用海量數據的知識庫,又能生成符合上下文語義的自然…

android liveData observeForever 與 observe對比

LiveData 是一個非常有用的組件,用于在數據變化時通知觀察者。LiveData 提供了兩種主要的觀察方法:observe 和 observeForever。這兩種方法在使用場景、生命周期感知以及內存管理等方面有所不同。 一、observe 方法?? ??1. 基本介紹?? ??生命周期感知??:observe…

web-ssrfme

一、題目源碼 <?php highlight_file(__file__); function curl($url){ $ch curl_init();curl_setopt($ch, CURLOPT_URL, $url);curl_setopt($ch, CURLOPT_HEADER, 0);echo curl_exec($ch);curl_close($ch); }if(isset($_GET[url])){$url $_GET[url];if(preg_match(/file…

企業AI應用模式解析:從本地部署到混合架構

在人工智能快速發展的今天&#xff0c;企業如何選擇合適的大模型應用方式成為了一個關鍵問題。本文將詳細介紹六種主流的企業AI應用模式&#xff0c;幫助您根據自身需求做出最優選擇。 1. 本地部署&#xff08;On-Premise Deployment&#xff09; 特點&#xff1a;將模型下載…

OpenCV 圖形API(49)顏色空間轉換-----將 NV12 格式的圖像數據轉換為 BGR 顏色空間函數NV12toBGR()

操作系統&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 編程語言&#xff1a;C11 算法描述 將圖像從NV12&#xff08;YUV420p&#xff09;顏色空間轉換為BGR。 該函數將輸入圖像從NV12顏色空間轉換為RGB。Y、U和V通道值的常規范圍是0到25…

【java實現+4種變體完整例子】排序算法中【桶排序】的詳細解析,包含基礎實現、常見變體的完整代碼示例,以及各變體的對比表格

以下是桶排序的詳細解析&#xff0c;包含基礎實現、常見變體的完整代碼示例&#xff0c;以及各變體的對比表格&#xff1a; 一、桶排序基礎實現 原理 將數據分到有限數量的桶中&#xff0c;每個桶內部使用其他排序算法&#xff08;如插入排序或快速排序&#xff09;&#xf…

Linux[基本指令]

Linux[基本指令] pwd 查看當前所處的工作目錄 斜杠在Linux中作為路徑分割符 路徑存在的價值為了確定文件的唯一性 cd指令 更改路徑 cd 你要去的路徑(直接進入) cd . 當前目錄 cd . . 上級目錄(路徑回退) 最后的’/為根目錄(根節點) Linux還是window的目錄結構都是樹狀…

git -- 對遠程倉庫的操作 -- 查看,添加(與clone對比),抓取和拉取,推送(注意點,抓取更新+合并的三種方法,解決沖突,對比),移除

目錄 對遠程倉庫的操作 介紹 查看 (git remote) 介紹 查看詳細信息 添加(git remote add) 介紹 與 git clone對比 從遠程倉庫中抓取與拉取 抓取(git fetch) 拉取(git pull) 推送(git push) 介紹 注意 抓取更新合并的方法 git fetch git merge 解決沖突 git …

vue3 excel文件導入

文章目錄 前言使用在vue文件中的使用 前言 最近寫小組官網涉及到了excel文件導入的功能 場景是導入小組成員年級 班級 郵箱 組別 姓名等基本信息的excel表格用于展示各組信息 使用 先下載js庫 npm install xlsx為了提高代碼的復用性 我將它寫成了一個通用的函數 import ap…

Docker環境下SpringBoot程序內存溢出(OOM)問題深度解析與實戰調優

文章目錄 一、問題背景與現象還原**1. 業務背景****2. 故障特征****3. 核心痛點****4. 解決目標** 二、核心矛盾點分析**1. JVM 與容器內存協同失效****2. 非堆內存泄漏****3. 容器內存分配策略缺陷** 三、系統性解決方案**1. Docker 容器配置**2. JVM參數優化&#xff08;容器…

【PGCCC】Postgres MVCC 內部:更新與插入的隱性成本

為什么 Postgres 中的更新操作有時感覺比插入操作慢&#xff1f;答案在于 Postgres 如何在后臺管理數據版本。 Postgres 高效處理并發事務能力的核心是多版本并發控制&#xff08;MVCC&#xff09;。 在本文中&#xff0c;我將探討 MVCC 在 Postgres 中的工作原理以及它如何影響…

Docker使用、容器遷移

Docker 簡介 Docker 是一個開源的容器化平臺&#xff0c;用于打包、部署和運行應用程序及其依賴環境。Docker 容器是輕量級的虛擬化單元&#xff0c;運行在宿主機操作系統上&#xff0c;通過隔離機制&#xff08;如命名空間和控制組&#xff09;確保應用運行環境的一致性和可移…

c#清理釋放內存

雖然c#具有內存管理和垃圾回收機制&#xff0c;但是在arcobjects二次開發嵌入到arcgis data reviewet還會報內存錯誤。需要強制清理某變量內存方法如下: 1設置靜態函數ReleaseCom函數 public static void ReleaseCom(object o) { try{System.Runtime.InteropServices.Marsh…

Linux:進程:進程控制

進程創建 在Linux中我們使用fork函數創建新進程&#xff1a; fork函數 fork函數是Linux中的一個系統調用&#xff0c;用于創建一個新的進程&#xff0c;創建的新進程是原來進程的子進程 返回值&#xff1a;如果子進程創建失敗&#xff0c;返回值是-1。如果子進程創建成功&a…

day1-小白學習JAVA---JDK安裝和環境變量配置(mac版)

JDK安裝和環境變量配置 我的電腦系統一、下載JDK1、oracle官網下載適合的JDK安裝包&#xff0c;選擇Mac OS對應的版本。 二、安裝三、配置環境變量1、終端輸入/usr/libexec/java_home -V查詢所在的路徑&#xff0c;復制備用2、輸入ls -a3、檢查文件目錄中是否有.bash_profile文…

Python項目--基于機器學習的股票預測分析系統

1. 項目介紹 在當今數字化時代&#xff0c;金融市場的數據分析和預測已經成為投資決策的重要依據。本文將詳細介紹一個基于Python的股票預測分析系統&#xff0c;該系統利用機器學習算法對歷史股票數據進行分析&#xff0c;并預測未來股票價格走勢&#xff0c;為投資者提供決策…

計算機視覺與深度學習 | 基于YOLOv8與光流法的目標檢測與跟蹤(Python代碼)

===================================================== github:https://github.com/MichaelBeechan CSDN:https://blog.csdn.net/u011344545 ===================================================== 目標檢測與跟蹤 關鍵實現邏輯檢測-跟蹤協作機制?特征點選擇策略?運動…