一、Canal 簡介
1.1 Canal 是什么?
Canal 是阿里巴巴開源的一款基于 MySQL 數據庫增量日志解析(Binlog)中間件,它模擬 MySQL 的從機(Slave)行為,監聽 MySQL 主機的二進制日志(Binlog),并解析出數據變更事件(DML 和 DDL),然后將這些事件轉發給下游應用,從而實現數據的實時同步和處理。
1.2 Canal 的應用場景
-
數據同步:將 MySQL 數據庫的變更數據實時同步到其他存儲系統,如 Redis、Elasticsearch 等。
-
數據備份:實時備份 MySQL 數據庫的變更數據,用于數據恢復或異地備份。
-
數據一致性:在分布式系統中,保證 MySQL 數據庫和緩存(如 Redis)的數據一致性。
-
業務解耦:通過消息隊列(如 Kafka)將數據變更事件傳遞給下游應用,實現業務系統的解耦。
二、Canal 工作原理
2.1 MySQL 主從復制原理
在介紹 Canal 的工作原理之前,我們先復習一下 MySQL 的主從復制原理。MySQL 主從復制主要涉及以下幾個步驟:
-
主庫記錄二進制日志:主庫將數據變更操作(如 INSERT、UPDATE、DELETE)記錄到二進制日志(Binlog)中。
-
從庫讀取二進制日志:從庫連接到主庫,請求二進制日志,并將日志內容寫入到本地的中繼日志(Relay Log)中。
-
從庫應用中繼日志:從庫的 SQL 線程讀取中繼日志,并將日志中的數據變更操作應用到從庫的數據庫中。
2.2 Canal 模擬從庫行為
Canal 模擬了 MySQL 從庫的行為,通過以下步驟實現數據的監聽和解析:
-
連接到 MySQL 主庫:Canal 以從庫的身份連接到 MySQL 主庫,并請求二進制日志。
-
解析二進制日志:Canal 解析二進制日志中的數據變更事件(DML 和 DDL),并將其轉換為 Canal 自定義的事件格式。
-
轉發數據變更事件:Canal 將解析后的數據變更事件轉發給下游應用,如 Redis、Kafka 等。
2.3 Canal 的核心組件
-
Canal Server:負責與 MySQL 主庫建立連接,監聽二進制日志,并解析數據變更事件。
-
Canal Client:負責接收 Canal Server 轉發的數據變更事件,并進行相應的處理。
-
Canal Filter:用于過濾二進制日志中的數據變更事件,只處理感興趣的表和字段。
三、Canal 安裝與配置
3.1 開啟 MySQL 二進制日志
在使用 Canal 之前,需要確保 MySQL 的二進制日志已經開啟。可以通過以下命令查看二進制日志是否開啟:
SHOW VARIABLES LIKE 'log_bin';
如果 log_bin
的值為 OFF
,則需要在 MySQL 配置文件中開啟二進制日志:
[mysqld]
log-bin=mysql-bin
3.2 創建 Canal 用戶
為了保證數據安全,建議為 Canal 創建一個專用的 MySQL 用戶,并授予其必要的權限:
CREATE USER canal@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT, SUPER ON *.* TO 'canal'@'%';
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
FLUSH PRIVILEGES;
3.3 使用 Docker 安裝 Canal
Canal 提供了 Docker 鏡像,可以通過以下命令安裝并啟動 Canal:
docker pull canal/canal-server:v1.1.5docker run -p 11111:11111 --name canal \
-e canal.destinations=tingshuTopic \
-e canal.instance.master.address=192.168.200.130:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false \
-e canal.instance.filter.regex=.*\\..* \
-d canal/canal-server:v1.1.5
四、Spring Boot 整合 Canal
4.1 創建 Spring Boot 工程
使用 Spring Initializr 創建一個新的 Spring Boot 工程
4.2 添加依賴
在 pom.xml
文件中添加以下依賴:
-
目前canal不支持jdk17,變成jdk8版本
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.atguigu</groupId><artifactId>service-cdc</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><name>service-cdc</name><url>http://maven.apache.org</url><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.6.RELEASE</version><relativePath/></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><!--web 需要啟動項目--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>top.javatool</groupId><artifactId>canal-spring-boot-starter</artifactId><version>1.2.1-RELEASE</version></dependency><!-- 起到監聽的作用 --><dependency><groupId>javax.persistence</groupId><artifactId>persistence-api</artifactId><version>1.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency></dependencies>
</project>
4.3 配置 Canal 和 Redis
一個canal服務器有很多個客戶端,每個客戶端有自己的通道名字
在 application-dev.yml
文件中配置 Canal 和 Redis 的連接信息:
server:port: 7080
#canal配置
canal:destination: tingshuTopic #Canal服務端發送數據的話題名稱跟上面容器里參數destinations的一樣server: 192.168.200.130:11111spring:redis:host: 192.168.200.130port: 6379
4.4 創建實體類
通過實體類監聽到mysql變化的數據,但因為不同表的數據都不一樣,所以每個實體類的字段都不一樣,但是每個表都會有id,所以在實體類中加上變化字段的id
import lombok.Data;
import javax.persistence.Column;@Data
public class CDCEntity {// 注意Column 注解必須是persistence包下的,表示監聽表中的一個字段@Column(name = "id")private Long id;
}
4.5 創建 Canal 處理類
創建一個類實現 EntryHandler
接口,用于處理 Canal 解析的數據變更事件:它定義了三個方法,分別對應 INSERT
、UPDATE
和 DELETE
操作:
import com.alibaba.otter.canal.client.adapter.support.EntryHandler;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;@Slf4j
@Component
@CanalTable("album_info")
public class AlbumInfoCdcHandler implements EntryHandler<CDCEntity> {@Autowiredprivate RedisTemplate redisTemplate;@Overridepublic void insert(CDCEntity cdcEntity) {log.info("監聽到數據添加,ID: {}", cdcEntity.getId());}@Overridepublic void update(CDCEntity before, CDCEntity after) {log.info("監聽到數據更新,ID: {}", after.getId());String key = "album:info:" + after.getId();redisTemplate.delete(key);}@Overridepublic void delete(CDCEntity cdcEntity) {log.info("監聽到數據刪除,ID: {}", cdcEntity.getId());}
}
實例:
@Slf4j
@Component
@CanalTable("album_info") 監聽變更表
public class AlbumInfoCdcHandler implements EntryHandler<CDCEntity> {@Autowiredprivate RedisTemplate redisTemplate;//mysql執行添加操作,這個方法執行public void insert(CDCEntity cdcEntity) {log.info("監聽到數據修改,ID:{}", cdcEntity.getId());}//mysql執行修改操作,這個方法執行public void update(CDCEntity before, CDCEntity after) {log.info("監聽到數據修改,ID:{}", after.getId());String key = "album:info:" + after.getId();redisTemplate.delete(key);}//mysql執行刪除操作,這個方法執行public void delete(CDCEntity cdcEntity) {log.info("監聽到數據修改,ID:{}", cdcEntity.getId());}
}
?
五、總結
Canal 是一款強大的 MySQL 數據庫增量日志解析中間件,通過模擬 MySQL 從庫的行為,實現數據的實時同步和處理。在本文中,我們詳細介紹了 Canal 的工作原理、安裝配置方法以及如何與 Spring Boot 進行整合。通過 Canal,我們可以輕松地實現 MySQL 數據庫與 Redis 等其他存儲系統的數據一致性,為分布式系統的開發提供了有力的支持。
希望本文對你有所幫助。如果有任何問題或建議,歡迎在評論區留言。