文章目錄
- 前言
- 一、背景
- 二、Canal 簡介
- 三、主庫數據庫配置
- 1.主庫配置
- 2.創建 Canal 用戶并授予權限
- 四.配置 Canal Server
- 1.Canal Server 配置文件
- 2.啟動 Canal Server
- 五.開發 Spring Boot 客戶端
- 1. 引入依賴
- 2. 配置 Canal 客戶端
- 3. 實現數據同步邏輯
- 六.啟動并測試
- 七.注意事項
- 八.總結
前言
在分布式系統中,數據同步是一個常見的需求。例如,我們可能需要將主庫的數據實時同步到多個從庫,或者將數據從一個數據庫集群同步到另一個集群。本篇內容通過一個實際案例,介紹如何使用 Spring Boot 和 Canal 實現 MySQL 數據庫之間的數據同步。
一、背景
假設我們有以下數據庫架構:
- 兩個主庫:db_1 和 db_2。
每個主庫對應兩個從庫:db_1_bk_1、db_1_bk_2 和 db_2_bk_1、db_2_bk_2。 - 我們的目標是:
將 db_1 的數據同步到 db_1_bk_1 和 db_1_bk_2。
將 db_2 的數據同步到 db_2_bk_1 和 db_2_bk_2。
二、Canal 簡介
Canal 是阿里巴巴開源的一款基于 MySQL Binlog 的增量數據訂閱與分發工具。它通過模擬 MySQL 的從節點,實時捕獲主庫的 Binlog 日志,并將數據變更事件推送給下游消費者。Canal 支持多種下游適配器,如 Kafka、RabbitMQ 和直接消費。
三、主庫數據庫配置
1.主庫配置
為了使 Canal 能夠正常解析 Binlog 日志,主庫需要進行以下配置:
- 開啟 Binlog 日志:確保主庫開啟了 Binlog 日志,并且設置為 ROW 模式。
- 配置 server-id:為每個主庫設置唯一的 server-id。
- 創建 Canal 用戶并授予權限:創建一個用戶供 Canal 使用,并授予必要的權限。
編輯主庫的配置文件(my.cnf 或 my.ini),添加以下內容:
[mysqld]
# 開啟 Binlog 日志
log-bin=mysql-bin
# 設置 Binlog 格式為 ROW 模式
binlog-format=ROW
# 設置唯一的 server-id
server-id=1
注意:
- 如果你有多個主庫,每個主庫的 server-id 必須是唯一的。
- 修改配置后,需要重啟 MySQL 服務以使配置生效。
2.創建 Canal 用戶并授予權限
Canal 需要一個具有讀取 Binlog 權限的 MySQL 用戶。以下是創建用戶并授予權限的步驟:
# 登錄 MySQL
mysql -u root -p
# 創建用戶
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
# 授予權限
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
# 刷新權限
FLUSH PRIVILEGES;
說明:
- canal 用戶需要足夠的權限來讀取 Binlog 數據,但不需要對數據庫進行寫操作。
- 如果你的 MySQL 版本較新(8.x),可能需要使用 ALTER USER 命令來設置密碼:
ALTER USER 'canal'@'%' IDENTIFIED BY 'canal';
四.配置 Canal Server
Canal Server 是 Canal 的核心組件,負責連接主庫并解析 Binlog 數據。我們需要為每個主庫配置一個 Canal 實例。
1.Canal Server 配置文件
在 Canal Server 的配置目錄下,創建兩個實例配置文件:conf/db_1/instance.properties 和 conf/db_2/instance.properties。
conf/db_1/instance.properties:
# 主庫的地址和端口
canal.instance.master.address=db_1_ip:3306
# Canal 連接主庫的用戶名和密碼
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 需要同步的表正則表達式,這里表示同步 db_1 數據庫的所有表
canal.instance.filter.regex=db_1\\..*
conf/db_2/instance.properties:
# 主庫的地址和端口
canal.instance.master.address=db_2_ip:3306
# Canal 連接主庫的用戶名和密碼
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 需要同步的表正則表達式,這里表示同步 db_2 數據庫的所有表
canal.instance.filter.regex=db_2\\..*
2.啟動 Canal Server
使用以下命令啟動 Canal Server:
nohup sh bin/canal.sh start &
注意:
- 確保主庫的 Binlog 位置和文件名正確。如果不確定,可以通過 SHOW MASTER STATUS; 命令查看。
- 如果主庫已經運行了一段時間,需要指定 Binlog 的起始位置,避免重復同步舊數據。
五.開發 Spring Boot 客戶端
Spring Boot 客戶端作為 Canal 的消息消費者,負責接收數據變更事件并同步到目標從庫。
1. 引入依賴
在 Spring Boot 項目的 pom.xml
文件中,引入 Canal 客戶端依賴:
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.8</version>
</dependency>
2. 配置 Canal 客戶端
在 application.yml
文件中,配置 Canal Server 的地址:
canal:server.ip: canal_server_ipserver.port: 11111
3. 實現數據同步邏輯
創建一個 Canal 客戶端服務類,用于接收和處理數據變更事件。
CanalClientService.java:
@Service
public class CanalClientService {private final CanalConnector canalConnector;public CanalClientService(@Value("${canal.server.ip}") String canalServerIp, @Value("${canal.server.port}") int canalServerPort) {this.canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalServerIp, canalServerPort), "example", "", "");}@PostConstructpublic void start() {canalConnector.connect();canalConnector.subscribe("db_1..*, db_2..*"); // 訂閱 db_1 和 db_2 的所有表new Thread(this::process).start();}private void process() {while (true) {Message message = canalConnector.getWithoutAck(100);long batchId = message.getId();if (batchId == -1 || message.getEntries().isEmpty()) {continue;}for (Entry entry : message.getEntries()) {handleData(entry);}canalConnector.ack(batchId);}}private void handleData(Entry entry) {String schemaName = entry.getHeader().getSchemaName(); // 數據庫名String tableName = entry.getHeader().getTableName(); // 表名EventType eventType = entry.getHeader().getEventType(); // 數據變更類型System.out.println("Schema: " + schemaName + ", Table: " + tableName + ", Type: " + eventType);// 根據來源數據庫同步到對應的從庫if ("db_1".equals(schemaName)) {syncToBackupDbs(entry, "db_1_bk_1", "db_1_bk_2");} else if ("db_2".equals(schemaName)) {syncToBackupDbs(entry, "db_2_bk_1", "db_2_bk_2");}}private void syncToBackupDbs(Entry entry, String... backupDbs) {// 根據事件類型同步到從庫if (entry.getHeader().getEventType() == EventType.INSERT) {for (String db : backupDbs) {syncInsert(entry, db);}} else if (entry.getHeader().getEventType() == EventType.UPDATE) {for (String db : backupDbs) {syncUpdate(entry, db);}} else if (entry.getHeader().getEventType() == EventType.DELETE) {for (String db : backupDbs) {syncDelete(entry, db);}}}private void syncInsert(Entry entry, String backupDb) {// 使用 MyBatis 將數據插入到對應的從庫System.out.println("INSERT into " + backupDb);}private void syncUpdate(Entry entry, String backupDb) {// 使用 MyBatis 將數據更新到對應的從庫System.out.println("UPDATE into " + backupDb);}private void syncDelete(Entry entry, String backupDb) {// 使用 MyBatis 將數據從對應的從庫刪除System.out.println("DELETE from " + backupDb);}
}
六.啟動并測試
- 啟動 Canal Server。
- 啟動 Spring Boot 應用。
- 在主庫 db_1 或 db_2 中插入、更新或刪除數據。
- 觀察從庫 db_1_bk_1、db_1_bk_2、db_2_bk_1 和 db_2_bk_2 是否同步成功。
七.注意事項
- 數據一致性:確保從庫的數據與主庫保持一致。可以通過事務或鎖機制來避免沖突。
- 性能優化:如果數據量較大,建議結合中間件(如 Kafka)進行緩沖和負載均衡。
- 錯誤處理:在同步過程中,需要處理網絡異常、數據庫連接異常等情況。
- Canal Server 高可用:在生產環境中,建議部署 Canal Server 的集群,以提高系統的可用性。
八.總結
通過 Spring Boot 和 Canal,我們可以實現 MySQL 數據庫之間的高效數據同步。Canal 提供了強大的 Binlog 解析能力,而 Spring Boot 則提供了靈活的開發框架,兩者結合可以輕松應對復雜的分布式數據同步需求。希望本文對你有所幫助,如果有任何問題,歡迎在評論區留言。