MySQL與Canal、RabbitMQ集成指南

MySQL 部分

1. 查看是否開啟 binlog

MySQL 8 默認開啟 binlog。可以通過以下命令查看是否開啟:

SHOW VARIABLES LIKE 'log_bin';

如果返回結果為 ON,則表示 binlog 已開啟。

Variable_nameValue
log_binON

2. 若未開啟 binlog,則需手動配置

如果 binlog 未開啟,需要在 MySQL 配置文件中添加以下配置:

log-bin=mysql-bin  # 開啟 binlog
server_id=1        # 配置 MySQL replication 需要定義,確保不與 Canal 的 slaveId 重復

修改完成后,重啟 MySQL 使配置生效。

3. 創建 Canal 使用的 MySQL 用戶

Canal 需要連接到 MySQL 并讀取 binlog,因此需要創建一個專門的用戶并授予相應權限。

# 創建用戶
CREATE USER canal IDENTIFIED WITH mysql_native_password BY 'canal';# 授予權限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';# 刷新權限
FLUSH PRIVILEGES;

MQ 部分

在 RabbitMQ 中,我們需要創建一個交換機和隊列,并將它們綁定在一起。我使用的是已經創建過的 Virtual Host : trovebox_dev,你可以根據實際情況決定是否創建新的 Virtual Host。

1. 新建交換機

在 RabbitMQ 管理界面中,創建一個新的交換機,命名為 canal.exchange

新建交換機

2. 添加隊列

創建一個新的隊列,命名為 canal.queue

添加隊列

3. 綁定交換機

將隊列 canal.queue 綁定到交換機 canal.exchange,并設置路由鍵為 canal.routing.key

綁定交換機

綁定交換機

Canal 部分

Docker 安裝 Canal

使用 Docker 安裝 Canal 非常簡單,以下是安裝步驟:

  1. 拉取 Canal 鏡像:沒有tag默認最新的

    docker pull canal/canal-server
    
  2. 運行 Canal 容器:

    docker run -p 11111:11111 -p 11110:11110 -p 11112:11112 \--name canal \-e canal.destinations=destination \-e canal.instance.master.address=ip:port \-e canal.instance.dbUsername=canal \-e canal.instance.dbPassword=canal \-e canal.instance.connectionCharset=UTF-8 \-e canal.instance.tsdb.enable=true \-e canal.instance.gtidon=false \-e canal.instance.filter.regex=dataBaseName\\..* \-d canal/canal-server:latest
    
  3. 將 Canal 的配置文件和日志文件拷貝到宿主機:

    docker cp containerId:/home/admin/canal-server/conf /www/dk_project/dk_app/canal/
    docker cp containerId:/home/admin/canal-server/logs /www/dk_project/dk_app/canal/
    
  4. 修改配置文件 conf/canal.properties

    canal.serverMode = rabbitMQ
    rabbitmq.host = ip
    rabbitmq.virtual.host = trovebox_dev
    rabbitmq.exchange = canal.exchange
    rabbitmq.username = trovebox_dev
    rabbitmq.password = troveboxadmin
    
  5. 修改配置文件 conf/destination/canal.properties

    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal
    canal.mq.topic=canal.routing.key
    
  6. 刪除并重新創建 Canal 容器:

    docker rm -f canaldocker run -p 11111:11111 -p 11110:11110 -p 11112:11112 \--name canal \-e canal.destinations=destination \-e canal.instance.master.address=ip:port \-e canal.instance.dbUsername=canal \-e canal.instance.dbPassword=canal \-e canal.instance.connectionCharset=UTF-8 \-e canal.instance.tsdb.enable=true \-v /www/dk_project/dk_app/canal/conf:/home/admin/canal-server/conf/ \-v /home/admin/canal-server/logs:/home/admin/canal-server/logs/ \-e canal.instance.filter.regex=dataBaseName\\..* \-d canal/canal-server:latest
    

Java 部分代碼

BinLogDto.java

BinLogDto 類用于解析 Canal 發送的 binlog 數據。

package online.trovebox.ruyiai.common.dto;import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import lombok.Data;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;@Data
public class BinLogDto {private String database;  // 數據庫private String table;     // 表private String type;      // 操作類型private JSONArray data;   // 操作數據private JSONArray old;    // 變更前數據private JSONArray pkNames; // 主鍵名稱private String sql;       // 執行 SQL 語句private Long es;private String gtid;private Long id;private Boolean isDdl;private JSONObject mysqlType;private JSONObject sqlType;private Long ts;public <T> List<T> getData(Class<T> clazz) {if (this.data == null || this.data.size() == 0) {return null;}return this.data.toList(clazz);}public <T> List<T> getOld(Class<T> clazz) {if (this.old == null || this.old.size() == 0) {return null;}return this.old.toList(clazz);}public List<String> getPkNames() {if (this.pkNames == null || this.pkNames.size() == 0) {return null;}List<String> pkNames = new ArrayList<>();for (Object pkName : this.pkNames) {pkNames.add(pkName.toString());}return pkNames;}public Map<String, String> getMysqlType() {if (this.mysqlType == null) {return null;}Map<String, String> mysqlTypeMap = new HashMap<>();this.mysqlType.forEach((k, v) -> {mysqlTypeMap.put(k, v.toString());});return mysqlTypeMap;}public Map<String, Integer> getSqlType() {if (this.sqlType == null) {return null;}Map<String, Integer> sqlTypeMap = new HashMap<>();this.sqlType.forEach((k, v) -> {sqlTypeMap.put(k, Integer.valueOf(v.toString()));});return sqlTypeMap;}
}

Listener.java

Listener 類用于監聽 RabbitMQ 中的消息,并處理 binlog 數據。

package online.trovebox.ruyiai.listener;import com.alibaba.fastjson2.JSON;
import jakarta.annotation.Resource;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import online.trovebox.ruyiai.common.dto.BinLogDto;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;@Component
@Slf4j
@RequiredArgsConstructor
public class CanalListener {@Resourceprivate RedisTemplate<String, Object> redisTemplate;String[] prefixes = new String[]{"coin_change_log","log","message",};@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "canal.queue", durable = "true"),exchange = @Exchange(value = "canal.exchange"),key = "canal.routing.key")})public void handleDataChange(@Payload Message message) {String content = new String(message.getBody(), StandardCharsets.UTF_8);BinLogDto binLog = JSON.parseObject(content, BinLogDto.class);String type = binLog.getType();if (type.equalsIgnoreCase("select")) {return;}String table = binLog.getTable();for (String prefix : prefixes) {if (table.startsWith(prefix)) {System.err.println(table);return;}}log.info("表:{} 操作類型:{}", table, binLog.getType());log.info("操作后數據:{} ", binLog.getData().toStringPretty());deleteKeysStartingWith(table);}public void deleteKeysStartingWith(String prefix) {String cursor = "0";do {Set<String> keys = scanKeys(cursor, prefix);cursor = keys.isEmpty() ? "0" : "1";if (!keys.isEmpty()) {redisTemplate.delete(keys);}} while (!"0".equals(cursor));}private Set<String> scanKeys(String cursor, String prefix) {return redisTemplate.execute((RedisCallback<Set<String>>) connection -> {ScanOptions options = ScanOptions.scanOptions().match(prefix + "*").count(1000).build();Cursor<byte[]> cursorScan = connection.scan(options);Set<String> keys = new HashSet<>();while (cursorScan.hasNext()) {byte[] keyBytes = cursorScan.next();keys.add(new String(keyBytes, StandardCharsets.UTF_8));}return keys;});}
}

效果圖

效果圖

知識點說明

  1. Binlog:MySQL 的二進制日志,用于記錄數據庫的所有更改操作。Canal 通過讀取 binlog 來獲取數據庫的變更數據。
  2. Canal:阿里巴巴開源的數據庫同步工具,基于 MySQL 的 binlog 實現數據同步。
  3. RabbitMQ:消息隊列中間件,用于在分布式系統中傳遞消息。Canal 可以將 binlog 數據發送到 RabbitMQ,供其他服務消費。
  4. Redis:內存數據庫,用于緩存數據。在監聽 binlog 變更時,可以通過 Redis 緩存相關數據,并在數據變更時清除緩存。

通過以上步驟和代碼,你可以實現 MySQL 數據庫的變更監聽,并將變更數據通過 RabbitMQ 發送到其他服務進行處理。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/897980.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/897980.shtml
英文地址,請注明出處:http://en.pswp.cn/news/897980.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

X86 RouterOS 7.18 設置筆記十:上海電信IPTV使用msd_lite實現組播轉單撥

X86 j4125 4網口小主機折騰筆記五&#xff1a;PVE安裝ROS RouterOS X86 RouterOS 7.18 設置筆記一&#xff1a;基礎設置 X86 RouterOS 7.18 設置筆記二&#xff1a;網絡基礎設置(IPV4) X86 RouterOS 7.18 設置筆記三&#xff1a;防火墻設置(IPV4) X86 RouterOS 7.18 設置筆記四…

Select 選擇器選項位置偏移的解決方案

Select 選擇器選項位置偏移的解決方案 在使用 Select 組件時&#xff0c;可能會遇到下拉選項位置偏移的問題。這通常由 CSS 樣式、組件 渲染方式 或 父級元素的影響 造成。以下是詳細的排查步驟和解決方案。 一、常見原因 position: relative; 或 overflow: hidden; 影響下拉菜…

LeetCode 解題思路 17(Hot 100)

解題思路&#xff1a; 找到鏈表中點&#xff1a; 使用快慢指針法&#xff0c;快指針每次移動兩步&#xff0c;慢指針每次移動一步。當快指針到達末尾時&#xff0c;慢指針指向中點。遞歸分割與排序&#xff1a; 將鏈表從中點處分割為左右兩個子鏈表&#xff0c;分別對這兩個子…

數學建模歷程之初見

第一次接觸數學建模是在上大學前&#xff0c;當時只是聽過。起源于我在大學的老鄉群里聊天&#xff0c;由于當時年輕有點傻&#xff0c;說的話太多了&#xff0c;什么都問哈哈哈哈哈。 后來有個學長從老鄉群里加我&#xff0c;問我怎么話那么多&#xff0c;你們懂當時對我幼小…

Python 科學計算與機器學習入門:NumPy + Scikit-Learn 實戰指南

Langchain系列文章目錄 01-玩轉LangChain&#xff1a;從模型調用到Prompt模板與輸出解析的完整指南 02-玩轉 LangChain Memory 模塊&#xff1a;四種記憶類型詳解及應用場景全覆蓋 03-全面掌握 LangChain&#xff1a;從核心鏈條構建到動態任務分配的實戰指南 04-玩轉 LangChai…

「自動駕駛背后的數學:從傳感器數據到控制指令的函數嵌套」—— 揭秘人工智能中的線性函數、ReLU 與復合函數

引言 自動駕駛技術是人工智能領域的一個重要應用&#xff0c;其核心在于如何將傳感器數據轉化為車輛控制指令。這一過程涉及大量的數學知識&#xff0c;包括線性函數、激活函數&#xff08;如 ReLU&#xff09;以及復合函數的嵌套使用。本文將深入探討自動駕駛中的數學原理&am…

詳解SQL數據定義功能

數據定義 1. 數據庫模式&#xff08;Schema&#xff09;的定義與刪除定義模式刪除模式 2. 基本表的定義、修改與刪除定義表約束1. NOT NULL 約束2. DEFAULT 約束3. UNIQUE 約束4. PRIMARY KEY 約束多列主鍵示例&#xff1a; 5. FOREIGN KEY 約束6. CHECK 約束7. AUTO_INCREMENT…

Redis超高并發分key實現

Redis扛并發的能力是非常強的&#xff0c;所以高并發場景下經常會使用Redis&#xff0c;但是Redis單分片的寫入瓶頸在2w左右&#xff0c;讀瓶頸在10w左右&#xff0c;如果在超高并發下即使是集群部署Redis&#xff0c;單分片的Redis也是有可能扛不住的&#xff0c;如下圖所示&a…

AI Agent 時代開幕-Manus AI與OpenAI Agent SDK掀起新風暴

【本周AI新聞: AI Agent 時代開幕-Manus AI與OpenAI Agent SDK掀起新風暴】 https://www.bilibili.com/video/BV1bkQyYCEvQ/?share_sourcecopy_web&vd_source32ed33e1165d68429b2e2eb4749f3f26 最近AI圈子里最火的話題非Manus莫屬&#xff01;這款由中國武漢創業公司“蝴…

多時間尺度的配電網深度強化學習無功優化策略的Python示例代碼框架

以下是一個簡單的多時間尺度的配電網深度強化學習無功優化策略的Python示例代碼框架&#xff0c;用于幫助你理解如何使用深度強化學習&#xff08;以深度Q網絡 DQN 為例&#xff09;來處理配電網的無功優化問題。在實際應用中&#xff0c;你可能需要根據具體的配電網模型和需求…

劍指 Offer II 081. 允許重復選擇元素的組合

comments: true edit_url: https://github.com/doocs/leetcode/edit/main/lcof2/%E5%89%91%E6%8C%87%20Offer%20II%20081.%20%E5%85%81%E8%AE%B8%E9%87%8D%E5%A4%8D%E9%80%89%E6%8B%A9%E5%85%83%E7%B4%A0%E7%9A%84%E7%BB%84%E5%90%88/README.md 劍指 Offer II 081. 允許重復選擇…

Webpack 前端性能優化全攻略

文章目錄 1. 性能優化全景圖1.1 優化維度概覽1.2 優化效果指標 2. 構建速度優化2.1 緩存策略2.2 并行處理2.3 減少構建范圍 3. 輸出質量優化3.1 代碼分割3.2 Tree Shaking3.3 壓縮優化 4. 運行時性能優化4.1 懶加載4.2 預加載4.3 資源優化 5. 高級優化策略5.1 持久化緩存5.2 模…

虛擬電商-數據庫分庫分表(二)

本文章介紹&#xff1a;使用Sharding-JDBC實現數據庫分庫分表&#xff0c;數據庫分片策略&#xff0c;實現數據庫按月分表 一、Sharding-JDBC使用 1.1.準備環境 步驟一&#xff1a;分庫分表sql腳本導入 創建了兩個數據庫&#xff1a;chongba_schedule0 和chongba_schedule1…

向量數據庫對比以及Chroma操作

一、向量數據庫與傳統類型數據庫 向量數據庫&#xff08;Vector Storage Engine&#xff09;與傳統類型的數據庫如關系型數據庫&#xff08;MySQL&#xff09;、文檔型數據庫&#xff08;MongoDB&#xff09;、鍵值存儲&#xff08;Redis&#xff09;、全文搜索引擎&#xff0…

python列表基礎知識

列表 創建列表 1.列表的定義&#xff1a;可變的&#xff0c;有序的數據結構&#xff0c;可以隨時添加或者刪除其中的元素 2.基本語法&#xff1a;字面量【元素1&#xff0c;元素2&#xff0c;元素3】使用[]創建列表 定義變量&#xff1a;變量名稱【元素1&#xff0c;元素2&…

Node.js 的模塊作用域和 module 對象詳細介紹

目錄 代碼示例 1. 創建模塊文件 module-demo.js 2. 導入模塊并使用 module-demo.js 運行結果 總結 在 Node.js 中&#xff0c;每個文件都是一個獨立的模塊&#xff0c;具有自己的作用域。與瀏覽器 JavaScript 代碼不同&#xff0c;Node.js 采用模塊作用域&#xff0c;這意味…

美暢物聯丨WebRTC 技術詳解:構建實時通信的數字橋梁

在互聯網技術飛速發展的今天&#xff0c;實時通信已成為數字生活的核心需求。WebRTC作為一個開源項目&#xff0c;憑借卓越的技術實力與創新理念&#xff0c;為網頁和移動應用帶來了顛覆性的實時通信能力。它突破了傳統通信方式的限制&#xff0c;實現了音頻、視頻和數據在用戶…

excel中兩個表格的合并

使用函數&#xff1a; VLOOKUP函數 如果涉及在excel中兩個工作表之間進行配對合并&#xff0c;則&#xff1a; VLOOKUP(C1,工作表名字!A:B,2,0) 參考&#xff1a; excel表格中vlookup函數的使用方法步驟https://haokan.baidu.com/v?pdwisenatural&vid132733503560775…

單引號與雙引號在不同編程語言中的使用與支持

在編程語言中&#xff0c;單引號和雙引號是常見的符號&#xff0c;它們通常用來表示字符和字符串。然而&#xff0c;如何使用這兩種符號在不同的編程語言中有所不同&#xff0c;甚至有一些語言并不區分單引號和雙引號的用途。本文將詳細介紹不同編程語言中單引號與雙引號的支持…

怎么鑒別金媒v10.51和v10.5的區別!單單從CRM上區分!

2.怎么鑒別程序是10.5還是10.51 &#xff1f;* 作為商業用戶&#xff0c;升級完全沒有這個擔心&#xff0c;但是這次升級從全局來看清晰度不是很高&#xff0c;不像10.5的升級后臺UI都變化了&#xff01;你說有漏洞但是我沒遇到過 所以我也不知道升級了啥只能看版本數字是無法區…