MySQL 部分
1. 查看是否開啟 binlog
MySQL 8 默認開啟 binlog。可以通過以下命令查看是否開啟:
SHOW VARIABLES LIKE 'log_bin';
如果返回結果為 ON
,則表示 binlog 已開啟。
Variable_name | Value |
---|---|
log_bin | ON |
2. 若未開啟 binlog,則需手動配置
如果 binlog 未開啟,需要在 MySQL 配置文件中添加以下配置:
log-bin=mysql-bin # 開啟 binlog
server_id=1 # 配置 MySQL replication 需要定義,確保不與 Canal 的 slaveId 重復
修改完成后,重啟 MySQL 使配置生效。
3. 創建 Canal 使用的 MySQL 用戶
Canal 需要連接到 MySQL 并讀取 binlog,因此需要創建一個專門的用戶并授予相應權限。
# 創建用戶
CREATE USER canal IDENTIFIED WITH mysql_native_password BY 'canal';# 授予權限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';# 刷新權限
FLUSH PRIVILEGES;
MQ 部分
在 RabbitMQ 中,我們需要創建一個交換機和隊列,并將它們綁定在一起。我使用的是已經創建過的 Virtual Host : trovebox_dev
,你可以根據實際情況決定是否創建新的 Virtual Host。
1. 新建交換機
在 RabbitMQ 管理界面中,創建一個新的交換機,命名為 canal.exchange
。
2. 添加隊列
創建一個新的隊列,命名為 canal.queue
。
3. 綁定交換機
將隊列 canal.queue
綁定到交換機 canal.exchange
,并設置路由鍵為 canal.routing.key
。
Canal 部分
Docker 安裝 Canal
使用 Docker 安裝 Canal 非常簡單,以下是安裝步驟:
-
拉取 Canal 鏡像:沒有tag默認最新的
docker pull canal/canal-server
-
運行 Canal 容器:
docker run -p 11111:11111 -p 11110:11110 -p 11112:11112 \--name canal \-e canal.destinations=destination \-e canal.instance.master.address=ip:port \-e canal.instance.dbUsername=canal \-e canal.instance.dbPassword=canal \-e canal.instance.connectionCharset=UTF-8 \-e canal.instance.tsdb.enable=true \-e canal.instance.gtidon=false \-e canal.instance.filter.regex=dataBaseName\\..* \-d canal/canal-server:latest
-
將 Canal 的配置文件和日志文件拷貝到宿主機:
docker cp containerId:/home/admin/canal-server/conf /www/dk_project/dk_app/canal/ docker cp containerId:/home/admin/canal-server/logs /www/dk_project/dk_app/canal/
-
修改配置文件
conf/canal.properties
:canal.serverMode = rabbitMQ rabbitmq.host = ip rabbitmq.virtual.host = trovebox_dev rabbitmq.exchange = canal.exchange rabbitmq.username = trovebox_dev rabbitmq.password = troveboxadmin
-
修改配置文件
conf/destination/canal.properties
:canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.mq.topic=canal.routing.key
-
刪除并重新創建 Canal 容器:
docker rm -f canaldocker run -p 11111:11111 -p 11110:11110 -p 11112:11112 \--name canal \-e canal.destinations=destination \-e canal.instance.master.address=ip:port \-e canal.instance.dbUsername=canal \-e canal.instance.dbPassword=canal \-e canal.instance.connectionCharset=UTF-8 \-e canal.instance.tsdb.enable=true \-v /www/dk_project/dk_app/canal/conf:/home/admin/canal-server/conf/ \-v /home/admin/canal-server/logs:/home/admin/canal-server/logs/ \-e canal.instance.filter.regex=dataBaseName\\..* \-d canal/canal-server:latest
Java 部分代碼
BinLogDto.java
BinLogDto
類用于解析 Canal 發送的 binlog 數據。
package online.trovebox.ruyiai.common.dto;import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import lombok.Data;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;@Data
public class BinLogDto {private String database; // 數據庫private String table; // 表private String type; // 操作類型private JSONArray data; // 操作數據private JSONArray old; // 變更前數據private JSONArray pkNames; // 主鍵名稱private String sql; // 執行 SQL 語句private Long es;private String gtid;private Long id;private Boolean isDdl;private JSONObject mysqlType;private JSONObject sqlType;private Long ts;public <T> List<T> getData(Class<T> clazz) {if (this.data == null || this.data.size() == 0) {return null;}return this.data.toList(clazz);}public <T> List<T> getOld(Class<T> clazz) {if (this.old == null || this.old.size() == 0) {return null;}return this.old.toList(clazz);}public List<String> getPkNames() {if (this.pkNames == null || this.pkNames.size() == 0) {return null;}List<String> pkNames = new ArrayList<>();for (Object pkName : this.pkNames) {pkNames.add(pkName.toString());}return pkNames;}public Map<String, String> getMysqlType() {if (this.mysqlType == null) {return null;}Map<String, String> mysqlTypeMap = new HashMap<>();this.mysqlType.forEach((k, v) -> {mysqlTypeMap.put(k, v.toString());});return mysqlTypeMap;}public Map<String, Integer> getSqlType() {if (this.sqlType == null) {return null;}Map<String, Integer> sqlTypeMap = new HashMap<>();this.sqlType.forEach((k, v) -> {sqlTypeMap.put(k, Integer.valueOf(v.toString()));});return sqlTypeMap;}
}
Listener.java
Listener
類用于監聽 RabbitMQ 中的消息,并處理 binlog 數據。
package online.trovebox.ruyiai.listener;import com.alibaba.fastjson2.JSON;
import jakarta.annotation.Resource;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import online.trovebox.ruyiai.common.dto.BinLogDto;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;@Component
@Slf4j
@RequiredArgsConstructor
public class CanalListener {@Resourceprivate RedisTemplate<String, Object> redisTemplate;String[] prefixes = new String[]{"coin_change_log","log","message",};@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "canal.queue", durable = "true"),exchange = @Exchange(value = "canal.exchange"),key = "canal.routing.key")})public void handleDataChange(@Payload Message message) {String content = new String(message.getBody(), StandardCharsets.UTF_8);BinLogDto binLog = JSON.parseObject(content, BinLogDto.class);String type = binLog.getType();if (type.equalsIgnoreCase("select")) {return;}String table = binLog.getTable();for (String prefix : prefixes) {if (table.startsWith(prefix)) {System.err.println(table);return;}}log.info("表:{} 操作類型:{}", table, binLog.getType());log.info("操作后數據:{} ", binLog.getData().toStringPretty());deleteKeysStartingWith(table);}public void deleteKeysStartingWith(String prefix) {String cursor = "0";do {Set<String> keys = scanKeys(cursor, prefix);cursor = keys.isEmpty() ? "0" : "1";if (!keys.isEmpty()) {redisTemplate.delete(keys);}} while (!"0".equals(cursor));}private Set<String> scanKeys(String cursor, String prefix) {return redisTemplate.execute((RedisCallback<Set<String>>) connection -> {ScanOptions options = ScanOptions.scanOptions().match(prefix + "*").count(1000).build();Cursor<byte[]> cursorScan = connection.scan(options);Set<String> keys = new HashSet<>();while (cursorScan.hasNext()) {byte[] keyBytes = cursorScan.next();keys.add(new String(keyBytes, StandardCharsets.UTF_8));}return keys;});}
}
效果圖
知識點說明
- Binlog:MySQL 的二進制日志,用于記錄數據庫的所有更改操作。Canal 通過讀取 binlog 來獲取數據庫的變更數據。
- Canal:阿里巴巴開源的數據庫同步工具,基于 MySQL 的 binlog 實現數據同步。
- RabbitMQ:消息隊列中間件,用于在分布式系統中傳遞消息。Canal 可以將 binlog 數據發送到 RabbitMQ,供其他服務消費。
- Redis:內存數據庫,用于緩存數據。在監聽 binlog 變更時,可以通過 Redis 緩存相關數據,并在數據變更時清除緩存。
通過以上步驟和代碼,你可以實現 MySQL 數據庫的變更監聽,并將變更數據通過 RabbitMQ 發送到其他服務進行處理。