使用 Spring Boot 和 Canal 實現 MySQL 數據庫同步

文章目錄

  • 前言
  • 一、背景
  • 二、Canal 簡介
  • 三、主庫數據庫配置
      • 1.主庫配置
      • 2.創建 Canal 用戶并授予權限
  • 四.配置 Canal Server
      • 1.Canal Server 配置文件
      • 2.啟動 Canal Server
  • 五.開發 Spring Boot 客戶端
      • 1. 引入依賴
      • 2. 配置 Canal 客戶端
      • 3. 實現數據同步邏輯
  • 六.啟動并測試
  • 七.注意事項
  • 八.總結


前言

在分布式系統中,數據同步是一個常見的需求。例如,我們可能需要將主庫的數據實時同步到多個從庫,或者將數據從一個數據庫集群同步到另一個集群。本篇內容通過一個實際案例,介紹如何使用 Spring Boot 和 Canal 實現 MySQL 數據庫之間的數據同步。


一、背景

假設我們有以下數據庫架構:

  • 兩個主庫:db_1 和 db_2。
    每個主庫對應兩個從庫:db_1_bk_1、db_1_bk_2 和 db_2_bk_1、db_2_bk_2。
  • 我們的目標是:
    將 db_1 的數據同步到 db_1_bk_1 和 db_1_bk_2。
    將 db_2 的數據同步到 db_2_bk_1 和 db_2_bk_2。

二、Canal 簡介

Canal 是阿里巴巴開源的一款基于 MySQL Binlog 的增量數據訂閱與分發工具。它通過模擬 MySQL 的從節點,實時捕獲主庫的 Binlog 日志,并將數據變更事件推送給下游消費者。Canal 支持多種下游適配器,如 Kafka、RabbitMQ 和直接消費。

三、主庫數據庫配置

1.主庫配置

為了使 Canal 能夠正常解析 Binlog 日志,主庫需要進行以下配置:

  • 開啟 Binlog 日志:確保主庫開啟了 Binlog 日志,并且設置為 ROW 模式。
  • 配置 server-id:為每個主庫設置唯一的 server-id。
  • 創建 Canal 用戶并授予權限:創建一個用戶供 Canal 使用,并授予必要的權限。

編輯主庫的配置文件(my.cnf 或 my.ini),添加以下內容:

[mysqld]
# 開啟 Binlog 日志
log-bin=mysql-bin
# 設置 Binlog 格式為 ROW 模式
binlog-format=ROW
# 設置唯一的 server-id
server-id=1

注意:

  • 如果你有多個主庫,每個主庫的 server-id 必須是唯一的。
  • 修改配置后,需要重啟 MySQL 服務以使配置生效。

2.創建 Canal 用戶并授予權限

Canal 需要一個具有讀取 Binlog 權限的 MySQL 用戶。以下是創建用戶并授予權限的步驟:

# 登錄 MySQL
mysql -u root -p
# 創建用戶
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
# 授予權限
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
# 刷新權限
FLUSH PRIVILEGES;

說明:

  • canal 用戶需要足夠的權限來讀取 Binlog 數據,但不需要對數據庫進行寫操作。
  • 如果你的 MySQL 版本較新(8.x),可能需要使用 ALTER USER 命令來設置密碼:
ALTER USER 'canal'@'%' IDENTIFIED BY 'canal';

四.配置 Canal Server

Canal Server 是 Canal 的核心組件,負責連接主庫并解析 Binlog 數據。我們需要為每個主庫配置一個 Canal 實例。

1.Canal Server 配置文件

在 Canal Server 的配置目錄下,創建兩個實例配置文件:conf/db_1/instance.properties 和 conf/db_2/instance.properties。
conf/db_1/instance.properties:

# 主庫的地址和端口
canal.instance.master.address=db_1_ip:3306
# Canal 連接主庫的用戶名和密碼
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 需要同步的表正則表達式,這里表示同步 db_1 數據庫的所有表
canal.instance.filter.regex=db_1\\..*

conf/db_2/instance.properties:

# 主庫的地址和端口
canal.instance.master.address=db_2_ip:3306
# Canal 連接主庫的用戶名和密碼
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 需要同步的表正則表達式,這里表示同步 db_2 數據庫的所有表
canal.instance.filter.regex=db_2\\..*

2.啟動 Canal Server

使用以下命令啟動 Canal Server:

nohup sh bin/canal.sh start &

注意:

  • 確保主庫的 Binlog 位置和文件名正確。如果不確定,可以通過 SHOW MASTER STATUS; 命令查看。
  • 如果主庫已經運行了一段時間,需要指定 Binlog 的起始位置,避免重復同步舊數據。

五.開發 Spring Boot 客戶端

Spring Boot 客戶端作為 Canal 的消息消費者,負責接收數據變更事件并同步到目標從庫。

1. 引入依賴

在 Spring Boot 項目的 pom.xml文件中,引入 Canal 客戶端依賴:

<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.8</version>
</dependency>

2. 配置 Canal 客戶端

application.yml 文件中,配置 Canal Server 的地址:

canal:server.ip: canal_server_ipserver.port: 11111

3. 實現數據同步邏輯

創建一個 Canal 客戶端服務類,用于接收和處理數據變更事件。
CanalClientService.java:

@Service
public class CanalClientService {private final CanalConnector canalConnector;public CanalClientService(@Value("${canal.server.ip}") String canalServerIp, @Value("${canal.server.port}") int canalServerPort) {this.canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalServerIp, canalServerPort), "example", "", "");}@PostConstructpublic void start() {canalConnector.connect();canalConnector.subscribe("db_1..*, db_2..*"); // 訂閱 db_1 和 db_2 的所有表new Thread(this::process).start();}private void process() {while (true) {Message message = canalConnector.getWithoutAck(100);long batchId = message.getId();if (batchId == -1 || message.getEntries().isEmpty()) {continue;}for (Entry entry : message.getEntries()) {handleData(entry);}canalConnector.ack(batchId);}}private void handleData(Entry entry) {String schemaName = entry.getHeader().getSchemaName(); // 數據庫名String tableName = entry.getHeader().getTableName();  // 表名EventType eventType = entry.getHeader().getEventType(); // 數據變更類型System.out.println("Schema: " + schemaName + ", Table: " + tableName + ", Type: " + eventType);// 根據來源數據庫同步到對應的從庫if ("db_1".equals(schemaName)) {syncToBackupDbs(entry, "db_1_bk_1", "db_1_bk_2");} else if ("db_2".equals(schemaName)) {syncToBackupDbs(entry, "db_2_bk_1", "db_2_bk_2");}}private void syncToBackupDbs(Entry entry, String... backupDbs) {// 根據事件類型同步到從庫if (entry.getHeader().getEventType() == EventType.INSERT) {for (String db : backupDbs) {syncInsert(entry, db);}} else if (entry.getHeader().getEventType() == EventType.UPDATE) {for (String db : backupDbs) {syncUpdate(entry, db);}} else if (entry.getHeader().getEventType() == EventType.DELETE) {for (String db : backupDbs) {syncDelete(entry, db);}}}private void syncInsert(Entry entry, String backupDb) {// 使用 MyBatis 將數據插入到對應的從庫System.out.println("INSERT into " + backupDb);}private void syncUpdate(Entry entry, String backupDb) {// 使用 MyBatis 將數據更新到對應的從庫System.out.println("UPDATE into " + backupDb);}private void syncDelete(Entry entry, String backupDb) {// 使用 MyBatis 將數據從對應的從庫刪除System.out.println("DELETE from " + backupDb);}
}

六.啟動并測試

  • 啟動 Canal Server。
  • 啟動 Spring Boot 應用。
  • 在主庫 db_1 或 db_2 中插入、更新或刪除數據。
  • 觀察從庫 db_1_bk_1、db_1_bk_2、db_2_bk_1 和 db_2_bk_2 是否同步成功。

七.注意事項

  • 數據一致性:確保從庫的數據與主庫保持一致。可以通過事務或鎖機制來避免沖突。
  • 性能優化:如果數據量較大,建議結合中間件(如 Kafka)進行緩沖和負載均衡。
  • 錯誤處理:在同步過程中,需要處理網絡異常、數據庫連接異常等情況。
  • Canal Server 高可用:在生產環境中,建議部署 Canal Server 的集群,以提高系統的可用性。

八.總結

通過 Spring Boot 和 Canal,我們可以實現 MySQL 數據庫之間的高效數據同步。Canal 提供了強大的 Binlog 解析能力,而 Spring Boot 則提供了靈活的開發框架,兩者結合可以輕松應對復雜的分布式數據同步需求。希望本文對你有所幫助,如果有任何問題,歡迎在評論區留言。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/70326.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/70326.shtml
英文地址,請注明出處:http://en.pswp.cn/web/70326.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Linux系統配置阿里云yum源,安裝docker

配置阿里云yum源 需要保證能夠訪問阿里云網站 可以先ping一下看看&#xff08;阿里云可能禁ping&#xff0c;只要能夠解析為正常的ip地址即可&#xff09; ping mirrors.aliyun.com腳本 #!/bin/bash mkdir /etc/yum.repos.d/bak mv /etc/yum.repos.d/*.repo /etc/yum.repos…

后端開發:開啟技術世界的新大門

在互聯網的廣闊天地中&#xff0c;后端開發宛如一座大廈的基石&#xff0c;雖不直接與用戶 “面對面” 交流&#xff0c;卻默默地支撐著整個互聯網產品的穩定運行。它是服務器端編程的核心領域&#xff0c;負責處理數據、執行業務邏輯以及與數據庫和其他后端服務進行交互。在當…

銀河麒麟系統安裝mysql5.7【親測可行】

一、安裝環境 cpu&#xff1a;I5-10代&#xff1b; 主板&#xff1a;華碩&#xff1b; OS&#xff1a;銀河麒麟V10&#xff08;SP1&#xff09;未激活 架構&#xff1a;Linux 5.10.0-9-generic x86_64 GNU/Linux mysql版本&#xff1a;mysql-5.7.34-linux-glibc2.12-x86_64.ta…

從零開始學習PX4源碼9(部署px4源碼到gitee)

目錄 文章目錄 目錄摘要1.gitee上創建倉庫1.1 gitee上創建倉庫PX4代碼倉庫1.2 gitee上創建子倉庫2.固件在gitee部署過程2.1下載固件到本地2.2切換本地分支2.3修改.gitmodules內容2.4同步子模塊倉庫地址2.5同步子模塊倉庫地址更新(下載)子模塊3.一級子模塊和二級子模塊的映射關…

【回溯算法2】

力扣17.電話號碼的字母組合 鏈接: link 思路 這道題容易想到用嵌套的for循環實現&#xff0c;但是如果輸入的數字變多&#xff0c;嵌套的for循環也會變長&#xff0c;所以暴力破解的方法不合適。 可以定義一個map將數字和字母對應&#xff0c;這樣就可以獲得數字字母的映射了…

科普:“Docker Desktop”和“Docker”以及“WSL”

“Docker Desktop”和“Docker”這兩個概念既有緊密聯系&#xff0c;又存在一定區別&#xff1a; 一、聯系 核心功能同源&#xff1a;Docker Desktop 本質上是基于 Docker 核心技術構建的。Docker 是一個用于開發、部署和運行應用程序的開源平臺&#xff0c;它利用容器化技術…

Flutter 網絡請求與數據處理:從基礎到單例封裝

Flutter 網絡請求與數據處理&#xff1a;從基礎到單例封裝 在 Flutter 開發中&#xff0c;網絡請求是一個非常常見的需求&#xff0c;比如獲取 API 數據、上傳文件、處理分頁加載等。為了高效地處理網絡請求和數據管理&#xff0c;我們需要選擇合適的工具并進行合理的封裝。 …

虛擬表格實現全解析

在數據展示越來越復雜的今天&#xff0c;大量數據的渲染就像是“滿漢全席”——如果把所有菜肴一次性擺上桌&#xff0c;既浪費資源也讓人眼花繚亂。幸運的是&#xff0c;我們有兩種選擇&#xff1a; 自己動手&#xff1a;通過二次封裝 Element Plus 的表格組件&#xff0c;實…

QT 讀寫鎖

一、概述 1、讀寫鎖是一種線程同步機制&#xff0c;用于解決多線程環境下的讀寫競爭問題。 2、讀寫鎖允許多個線程同時獲取讀鎖&#xff08;共享訪問&#xff09;&#xff0c;但只允許一個線程獲取寫鎖&#xff08;獨占訪問&#xff09;。 3、這種機制可以提高并發性能&…

2025 vue3面試題匯總,通俗易懂

一、基礎概念與核心特性 1. Vue3 相比 Vue2 的改進&#xff08;通俗版&#xff09; 問題&#xff1a;Vue3 比 Vue2 好在哪&#xff1f; 答案&#xff1a; 更快&#xff1a; Proxy 代理&#xff1a;Vue2 的響應式像“逐個監聽保險箱”&#xff08;每個屬性單獨監聽&#xff0…

第5章:在LangChain中如何使用AI Services

這篇文章詳細介紹了 LangChain4j 中的 AI Services 概念&#xff0c;展示了如何通過高層次的抽象來簡化與大語言模型&#xff08;LLM&#xff09;的交互。AI Services 的核心思想是隱藏底層復雜性&#xff0c;讓開發者專注于業務邏輯&#xff0c;同時支持聊天記憶、工具調用和 …

二叉樹(數據結構)

二叉樹 二叉樹也是用過遞歸定義的結構 先序遍歷又稱前序遍歷 ?? ?? 按照先序遍歷的方法去手算處理這個二叉樹 ?? 先A B C 再 A B D E C&#xff08;也就是把B換成BDE再放進去&#xff09; 再 A B D E C F 看這個插入的方法要掌握像二叉樹這樣向一個…

機器學習筆記——常用損失函數

大家好&#xff0c;這里是好評筆記&#xff0c;公主號&#xff1a;Goodnote&#xff0c;專欄文章私信限時Free。本筆記介紹機器學習中常見的損失函數和代價函數&#xff0c;各函數的使用場景。 熱門專欄 機器學習 機器學習筆記合集 深度學習 深度學習筆記合集 文章目錄 熱門…

Wireshark使用介紹

文章目錄 Wireshark介紹Wireshark使用工作模式介紹1. 混雜模式&#xff08;Promiscuous Mode&#xff09;2. 普通模式&#xff08;Normal Mode&#xff09;3. 監視模式&#xff08;Monitor Mode&#xff09; 界面分區捕獲過濾器語法基本語法邏輯運算符高級語法使用示例捕獲過濾…

#滲透測試#批量漏洞挖掘#暢捷通T+SQL注入漏洞

免責聲明 本教程僅為合法的教學目的而準備,嚴禁用于任何形式的違法犯罪活動及其他商業行為,在使用本教程前,您應確保該行為符合當地的法律法規,繼續閱讀即表示您需自行承擔所有操作的后果,如有異議,請立即停止本文章讀。 目錄 一、漏洞全景解析 1. 高危漏洞案例庫 2.…

【小游戲】C++控制臺版本俄羅斯輪盤賭

制作團隊&#xff1a;洛谷813622&#xff08;Igallta&#xff09; 989571&#xff08;_ayaka_&#xff09; Mod&#xff1a;_ayaka_ 雙人模式&#xff1a;Igallta 公告&#xff1a; 原先的9.8改名為 Alpha 1.0&#xff0c;以后每次更新都增加 0.1。 Alpha 1.11 改為 Beta 1…

nvm安裝、管理node多版本以及配置環境變量【保姆級教程】

引言 不同的項目運行時可能需要不同的node版本才可以運行&#xff0c;由于來回進行卸載不同版本的node比較麻煩&#xff1b;所以需要使用node工程多版本管理。 本人在配置時&#xff0c;通過網絡搜索教程&#xff0c;由于文章時間過老&#xff0c;或者文章的互相拷貝導致配置時…

框架--Mybatis3

一.特殊符號處理 < < > > " &quot; &apos; & &amp; 除了可以使用上述轉義字符外&#xff0c;還可以使<![CDATA[ ]]>用來包裹特殊字符。 二.mybatis 一級緩存二級緩存 1.為什么緩存 緩存&#xff1a;數據緩存&#xf…

純新手教程:用llama.cpp本地部署DeepSeek蒸餾模型

0. 前言 llama.cpp是一個基于純C/C實現的高性能大語言模型推理引擎&#xff0c;專為優化本地及云端部署而設計。其核心目標在于通過底層硬件加速和量化技術&#xff0c;實現在多樣化硬件平臺上的高效推理&#xff0c;同時保持低資源占用與易用性。 最近DeepSeek太火了&#x…

Netty入門詳解

引言 Netty 是一個基于 Java 的高性能、異步事件驅動的網絡應用框架&#xff0c;用于快速開發可維護的高性能網絡服務器和客戶端。它提供了一組豐富的 API&#xff0c;使得開發人員能夠輕松地處理各種網絡協議&#xff0c;如 TCP、UDP 等&#xff0c;并且支持多種編解碼方式&a…