基于Canal實現MySQL數據庫數據同步

一、基礎概念與原理

1. Canal是什么?

阿里巴巴開源的MySQL binlog增量訂閱與消費組件,通過偽裝為MySQL Slave監聽Master的binlog變更,實現實時數據同步。

Canal 官方網站:https://github.com/alibaba/canal

Canal Demo:?https://gitee.com/original-intention/canal-gorgor-demo


2. 工作原理

關鍵角色:

2.1? canal.deployer(服務端/Server)
  • 核心作用:偽裝成 MySQL 的從庫(Slave),監聽主庫的?binlog?變更,解析并轉發數據變更事件。

  • 關鍵功能

    • 連接 MySQL 主庫,訂閱?binlog?并解析為結構化數據(如?CanalEntry)。

    • 支持將解析后的數據通過?TCPKafkaRocketMQ?等方式投遞給下游消費者(如?canal.adapter)。

    • 管理多個同步實例(instance),每個實例對應一個獨立的數據同步通道58。

  • 配置文件

    • conf/canal.properties:全局參數(如端口、存儲模式)。

    • conf/example/instance.properties:實例級配置(如源數據庫地址、賬號、表過濾規則)。


2.2 canal.adapter(客戶端適配器)
  • 核心作用:消費?canal.deployer?解析的數據,并同步到目標數據源(如 MySQL、Elasticsearch、OceanBase 等)。

  • 關鍵功能

    • 支持多種目標源適配器(rdbes7hbase?等)。

    • 提供?全量 & 增量同步能力,通過 REST API 觸發全量同步(如?curl /etl/rdb/mysql1/user.yml)。

    • 支持多表映射、字段轉換、批量提交等配置。

  • 配置文件

    • conf/application.yml:定義數據源、消費模式(TCP/MQ)、目標適配器。

    • conf/rdb/*.yml?或?conf/es7/*.yml:表級同步規則(如源表、目標表、主鍵映射)。


2.3 canal.admin(管理平臺)
  • 核心作用:提供?Web 可視化界面,集中管理?canal.deployer?集群和實例配置。

  • 關鍵功能

    • 動態管理實例(啟動/停止/配置)。

    • 監控同步狀態和日志。

    • 支持高可用部署(依賴 ZooKeeper)。

  • 部署要求

    • 需初始化元數據庫(執行?canal_manager.sql)。

    • 通過?conf/application.yml?配置數據庫連接和權限。


3. 核心應用場景:

  • 業務解耦(如訂單狀態變更觸發消息通知)

  • 實時緩存更新(Redis)

  • 跨數據庫/機房數據同步(如MySQL→MySQL、MySQL→Elasticsearch)

  • 數據庫鏡像

  • 數據庫實時備份


二、環境準備與部署

1.?MySQL配置

  • 開啟binlog

查看配置

show VARIABLES LIKE '%log_bin%';
show VARIABLES LIKE '%binlog_format%';
show VARIABLES LIKE '%server_id%';

修改my.cnf,添加:

[mysqld]
log-bin=mysql-bin
binlog-format=ROW  # 必須為ROW模式
server_id=1        # 與Canal的slaveId不重復
  • 創建Canal賬號
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

?2.?Canal Server部署

  • 下載與解壓:canal.deployer-1.1.4.tar.gz

  • 配置實例conf/canal.properties

進入conf目錄,修改canal.properties文件,比較關鍵的是canal.destinations
?canal.destinations =?example 修改成??canal.destinations =?course
這里表示,我們需要監控與course課程相關的數據變動,而相關的數據庫、表配置會分
別放在course目錄下,如果沒有這個目錄就需要新建這個目錄。可以從example目錄拷貝一
個過來,并且將名字修改成?course
修改?conf/example/instance.properties 文件
對于instance.properties的修改比較關鍵的就是幾處,
第一 、是MySQL主服務的連接配置
# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=mysqlbinlog.000065
canal.instance.master.position=238116155# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

canal.instance.master.journal.name和canal.instance.master.position的值,通過一下命令獲取

show master STATUS;

?第二處、是要對哪些相關的業務表進行監視,比如我們這里是course課程信息,數據放在

seckill_order庫中:
# table regex
canal.instance.filter.regex=seckill_order.course
配置完成后,進入bin目錄,執行startup.bat即可

?三、數據同步實戰

1. 引入依賴

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.5.3</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.gorgor.canal</groupId><artifactId>canal-gorgor-demo</artifactId><version>0.0.1-SNAPSHOT</version><name>canal-gorgor-demo</name><description>Demo project for Spring Boot</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><scope>runtime</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.20</version><optional>true</optional></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version><exclusions><exclusion><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId></exclusion></exclusions></dependency></dependencies></project>

2. 配置(application.yml)

canal:server:ip: localhostport: 11111course:destination: coursebatchSize: 1000spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/shardingdb1?useSSL=false&serverTimezone=UTCusername: rootpassword: root

3. 配置?CanalConnector 連接

@Configuration
@EnableScheduling
@EnableAsync
public class CanalCourseConfig {@Value("${canal.server.ip}")private String canalServerIp;@Value("${canal.server.port}")private int canalServerPort;@Value("${canal.server.username:blank}")private String userName;@Value("${canal.server.password:blank}")private String password;@Value("${canal.course.destination}")private String destination;@Bean("secKillConnector")public CanalConnector newSingleConnector(){String userNameStr = "blank".equals(userName) ? "" : userName;String passwordStr = "blank".equals(password) ? "" : password;return CanalConnectors.newSingleConnector(new InetSocketAddress(canalServerIp,canalServerPort), destination, userNameStr, passwordStr);}}

4. 數據同步代碼

@Service
@Slf4j
public class SecKillData implements IProcessCanalData {private final static String COURSE_ID = "cid";private final static String COURSE_NAME = "cname";private final static String USER_ID = "user_id";private final static String COURSE_STATUS = "cstatus";@Autowired@Qualifier("secKillConnector")private CanalConnector connector;@Value("${canal.seckill.subscribe:server}")private String subscribe;@Value("${canal.course.batchSize}")private int batchSize;@Autowiredprivate JdbcTemplate jdbcTemplate;@PostConstruct@Overridepublic void connect() {connector.connect();if ("server".equals(subscribe))connector.subscribe(null);elseconnector.subscribe(subscribe);connector.rollback();}@PreDestroy@Overridepublic void disConnect() {connector.disconnect();}@Async@Scheduled(initialDelayString = "${canal.course.initialDelay:5000}", fixedDelayString = "${canal.course.fixedDelay:5000}")@Overridepublic void processData() {try {if (!connector.checkValid()) {log.warn("與Canal服務器的連接失效!!!重連,下個周期再檢查數據變更");this.connect();return; // 重連后等待下個周期處理}Message message = connector.getWithoutAck(batchSize);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {log.info("本次[{}]沒有檢測到課程數據更新。", batchId);// 空消息也必須確認connector.ack(batchId);return;}log.info("本次[{}]課程數據共有[{}]次更新需要處理", batchId, size);for (CanalEntry.Entry entry : message.getEntries()) {// 跳過事務開始/結束事件if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN ||entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());EventType eventType = rowChange.getEventType();for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {try {if (eventType == EventType.DELETE) {processDeleteEvent(rowData);} else if (eventType == EventType.INSERT) {processInsertEvent(rowData);} else if (eventType == EventType.UPDATE) {processUpdateEvent(rowData);}} catch (Exception e) {log.error("處理行數據失敗: {}", e.getMessage(), e);}}}connector.ack(batchId); // 批量確認log.info("本次[{}]處理課程Canal同步數據完成", batchId);} catch (Exception e) {log.error("處理課程Canal同步數據失敗,請檢查:", e);}}/*** 處理刪除事件*/private void processDeleteEvent(CanalEntry.RowData rowData) {// 刪除事件使用Before列獲取數據Map<String, String> beforeColumns = getColumnsMap(rowData.getBeforeColumnsList());Long cid = parseLongSafely(beforeColumns.get(COURSE_ID));if (cid != null) {jdbcTemplate.update("DELETE FROM course WHERE cid = ?", cid);log.info("刪除課程活動: cid={}", cid);} else {log.error("刪除事件中未找到有效的課程ID");}}/*** 處理插入事件*/private void processInsertEvent(CanalEntry.RowData rowData) {Map<String, String> afterColumns = getColumnsMap(rowData.getAfterColumnsList());Long cid = parseLongSafely(afterColumns.get(COURSE_ID));String cname = afterColumns.get(COURSE_NAME);Long userId = parseLongSafely(afterColumns.get(USER_ID));String cstatus = afterColumns.get(COURSE_STATUS);if (cid != null && cname != null && userId != null && cstatus != null) {jdbcTemplate.update("INSERT INTO course (cid, cname, user_id, cstatus) VALUES (?, ?, ?, ?)",cid, cname, userId, cstatus);log.info("新增課程活動: cid={}, cname={}", cid, cname);} else {log.error("插入事件中缺失必要字段: cid={}, cname={}, userId={}, cstatus={}",cid, cname, userId, cstatus);}}/*** 處理更新事件*/private void processUpdateEvent(CanalEntry.RowData rowData) {Map<String, String> afterColumns = getColumnsMap(rowData.getAfterColumnsList());Long cid = parseLongSafely(afterColumns.get(COURSE_ID));String cname = afterColumns.get(COURSE_NAME);Long userId = parseLongSafely(afterColumns.get(USER_ID));String cstatus = afterColumns.get(COURSE_STATUS);if (cid != null && cname != null && userId != null && cstatus != null) {jdbcTemplate.update("UPDATE course SET cname = ?, user_id = ?, cstatus = ? WHERE cid = ?",cname, userId, cstatus, cid);log.info("更新課程活動: cid={}, cname={}", cid, cname);} else {log.error("更新事件中缺失必要字段: cid={}, cname={}, userId={}, cstatus={}",cid, cname, userId, cstatus);}}/*** 將列列表轉換為Map (列名 -> 值)*/private Map<String, String> getColumnsMap(List<Column> columns) {return columns.stream().collect(Collectors.toMap(Column::getName,Column::getValue,(existing, replacement) -> existing));}/*** 安全轉換Long類型*/private Long parseLongSafely(String value) {try {return value != null && !value.isEmpty() ? Long.parseLong(value) : null;} catch (NumberFormatException e) {log.error("轉換Long失敗: {}", value);return null;}}
}

具體代碼在上面?Canal Demo 案例鏈接項目中。

初始化sql 在項目 resources/sql 目錄下。


四、相關開源&產品

  • canal 消費端開源項目: Otter
  • 阿里巴巴去 Oracle 數據遷移同步工具: yugong
  • 阿里巴巴離線同步開源項目 DataX
  • 阿里巴巴數據庫連接池開源項目 Druid
  • 阿里巴巴實時數據同步工具 DTS

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/914717.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/914717.shtml
英文地址,請注明出處:http://en.pswp.cn/news/914717.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

算法第23天|貪心算法:基礎理論、分發餅干、擺動序列、最大子序和

今日總結&#xff1a; 擺動序列的三種特殊情況需要著重思考&#xff0c;感覺是沒有思考清楚 基礎理論 1、貪心的本質&#xff1a; 貪心的本質是選擇每一階段的局部最優&#xff0c;從而達到全局最優。 例如&#xff1a;一堆鈔票&#xff0c;只能拿走10張&#xff0c;如何拿走最…

Q-chunking——帶有動作分塊的強化學習:基于人類演示,進行一定的連貫探索(且可做到無偏的n步價值回溯)

前言 我在之前的文章中提到過多次&#xff0c;長沙具身團隊是我司建設的第二支具身團隊&#xff0c;通過5月份的全力招聘&#xff0c;為了沖刺6月底和7月初來長沙辦公室考察的第一批客戶&#xff0c;過去一個多月來&#xff0c;長沙分部(一開始就5人&#xff0c;另外5人 實習…

NW956NW961美光固態閃存NW964NW968

美光固態閃存深度解析&#xff1a;NW956、NW961、NW964與NW968的全方位評測一、產品概述與市場定位在當今數據爆炸的時代&#xff0c;固態硬盤&#xff08;SSD&#xff09;作為存儲領域的佼佼者&#xff0c;其性能與穩定性成為了用戶關注的焦點。美光&#xff08;Micron&#x…

C++修煉:IO流

Hello大家好&#xff01;很高興我們又見面啦&#xff01;給生活添點passion&#xff0c;開始今天的編程之路&#xff01; 我的博客&#xff1a;<但凡. 我的專欄&#xff1a;《編程之路》、《數據結構與算法之美》、《C修煉之路》、《Linux修煉&#xff1a;終端之內 洞悉真理…

語音識別的速度革命:從 Whisper 到 Whisper-CTranslate2,我經歷了什么?

Whisper-CTranslate2&#xff1a;語音識別的速度革命 大家好&#xff0c;一個沉迷于 AI 語音技術的 “音頻獵人”。最近在處理大量播客轉錄項目時&#xff0c;我被傳統語音識別工具折磨得苦不堪言 ——RTX 3090 跑一個小時的音頻要整整 20 分鐘&#xff0c;服務器內存分分鐘爆滿…

JVM 內存模型詳解:GC 是如何拯救內存世界的?

JVM 內存模型詳解&#xff1a;GC 是如何拯救內存世界的&#xff1f; 引言 Java 虛擬機&#xff08;JVM&#xff09;是 Java 程序運行的基礎&#xff0c;其核心特性之一就是自動內存管理。與 C/C 不同&#xff0c;Java 開發者無需手動分配和釋放內存&#xff0c;而是由 JVM 自動…

分布式全局唯一ID生成:雪花算法 vs Redis Increment,怎么選?

在黑馬點評項目實戰中&#xff0c;關于全局唯一ID生成的實現方案選擇中&#xff0c;我看到有人提到了雪花算法&#xff0c;本文就來簡單了解一下雪花算法與Redis的incr方案的不同。在分布式系統開發中&#xff0c;“全局唯一ID”是繞不開的核心問題。無論是分庫分表的數據庫設計…

(新手友好)MySQL學習筆記(完):事務和鎖

事務和鎖事務transaction&#xff0c;一組原子性的SQL查詢&#xff0c;或者說是一個獨立的工作單元。如果能夠成功執行這組查詢的全部語句&#xff0c;就會執行這組查詢&#xff1b;如果其中任何一條語句無法成功執行&#xff0c;那么這組查詢的所有語句都不會執行。也就是說&a…

【CMake】使用 CMake 將單模塊 C 項目構建為庫并鏈接主程序

目錄1. 項目結構設計&#x1f4e6; 結構說明2. 項目文件內容2.1 頂層 CMakeLists.txt2.2 模塊 src/color/CMakeLists.txt ?【推薦寫法】?是否需要寫 project()&#xff1f;2.3 模塊頭文件 include/color.h2.4 模塊實現文件 src/color/color.c2.5 主程序 src/main.c3. 構建與運…

從零開始的云計算生活——番外4,使用 Keepalived 實現 MySQL 高可用

目錄 前言 一、架構原理? ?Keepalived 作用? ?MySQL 主從復制? 二、環境準備? 服務器要求?&#xff1a; 安裝基礎軟件? 三、配置 MySQL 主從復制 四、配置 Keepalived 主節點配置?&#xff08;/etc/keepalived/keepalived.conf&#xff09; 從節點配置 五、…

list類的常用接口實現及迭代器

目錄 1. list類的介紹 2.list類的常用接口 2.1 list類的常用構造 2.2 list類對象的容量操作 2.3 list迭代器 2.4 list類的常用操作 3.list的模擬實現 1. list類的介紹 list代表的是雙向鏈表&#xff0c;常見的有創建&#xff0c;增&#xff0c;刪&#xff0c;改幾個接口…

vscode Cline接入火山引擎的Deepseek R1

創建火山引擎Deepseek R1的API 在火山引擎管理控制臺中創建Deepseek R1推理接入點&#xff08;大模型&#xff09;&#xff0c;創建成功后會看到下圖效果。在操作中選擇API調用&#xff0c;在頁面中選擇OpenAI SDK&#xff0c;按照步驟找到baseUrl地址和API_KEY&#xff0c;后續…

新手向:自動化圖片格式轉換工具

大家好&#xff01;今天我要分享一個非常實用的Python小工具——圖片格式批量轉換器。如果你經常需要處理大量不同格式的圖片文件&#xff0c;或者需要統一圖片格式以便于管理&#xff0c;那么這個工具將會成為你的得力助手&#xff01;一、為什么需要圖片格式轉換&#xff1f;…

CUDA中的內存管理、鎖頁內存、UVA統一虛擬地址、零拷貝、統一內存

文章目錄0 前言1 swap內存跟鎖頁內存2 UVA(Unified Virtual Addressing)統一虛擬地址3 先看最普通的cuda內存分配、釋放、傳輸4 申請鎖頁內存4.1 cudaHostAllocDefault4.2 cudaHostAllocPortable4.3 cudaHostAllocWriteCombined4.3 cudaHostAllocMapped4.4 幾種鎖頁內存總結4.5…

微服務環境下的灰度發布與金絲雀發布實戰經驗分享

微服務環境下的灰度發布與金絲雀發布實戰經驗分享 在大規模微服務架構中&#xff0c;如何平滑安全地上線新功能是每個后端團隊的痛點。本文將結合生產環境中的真實案例&#xff0c;分享灰度發布&#xff08;Gray Release&#xff09;與金絲雀發布&#xff08;Canary Release&am…

MEF 在 WPF 中的簡單應用

MEF核心筆記MEF 的開發模式主要適用于插件化的業務場景中&#xff0c;C/S 和 B/S 中都有相應的使用場景&#xff0c;其中包括但不限于 ASP.NET MVC 、ASP WebForms、WPF、UWP 等開發框架。當然&#xff0c;DotNet Core 也是支持的。 以下是搜索到一些比較好的博文供參考&#…

Gitlab跑CICD的時候,maven鏡像和pom.xml使用的maven版本沖突導致沒辦法build成功的解決方法

是這樣的&#xff01;最近遇到一個非常棘手的難題&#xff0c;我搞了大概2周時間才把他弄出來&#xff0c;因為自己搭了個私服的maven倉庫&#xff0c;他不像maven官方倉庫一樣&#xff0c;可以跟nginx一樣轉的&#xff0c;所以遇到好幾個難點&#xff01;第一點&#xff1a;就…

Linux內核IPv4路由查找:LPC-Trie算法的深度實踐

在互聯網基礎設施的核心領域,路由查找性能直接決定了網絡轉發效率。Linux內核作為現代網絡系統的基石,其IPv4路由子系統采用了一種名為LPC-Trie(Level-Compressed Trie) 的創新數據結構,在net/ipv4/fib_trie.c文件中實現了高效的路由管理方案。本文將深入剖析這一機制的設…

【設計模式】裝飾(器)模式 透明裝飾模式與半透明裝飾模式

裝飾模式&#xff08;Decorator Pattern&#xff09;詳解一、裝飾模式簡介 裝飾模式&#xff08;Decorator Pattern&#xff09; 是一種 結構型設計模式&#xff0c;它允許你動態地給對象添加行為或職責&#xff0c;而無需修改其源代碼&#xff0c;也不需要使用繼承來擴展功能。…

NAT原理與實驗指南:網絡地址轉換技術解析與實踐

NAT實驗 NAT&#xff08;Network Address Translation&#xff0c;網絡地址轉換&#xff09;&#xff1a; NAT技術的介紹&#xff1a; 隨著Internet用戶的快速增長&#xff0c;以及地址分配不均等因素&#xff0c;IPv4地址&#xff08;約40億的空間地址&#xff09;已經陷入不…