一、基礎概念與原理
1. Canal是什么?
阿里巴巴開源的MySQL binlog增量訂閱與消費組件,通過偽裝為MySQL Slave監聽Master的binlog變更,實現實時數據同步。
Canal 官方網站:https://github.com/alibaba/canal
Canal Demo:?https://gitee.com/original-intention/canal-gorgor-demo
2. 工作原理
關鍵角色:
2.1? canal.deployer(服務端/Server)
-
核心作用:偽裝成 MySQL 的從庫(Slave),監聽主庫的?
binlog
?變更,解析并轉發數據變更事件。 -
關鍵功能:
-
連接 MySQL 主庫,訂閱?
binlog
?并解析為結構化數據(如?CanalEntry
)。 -
支持將解析后的數據通過?TCP、Kafka、RocketMQ?等方式投遞給下游消費者(如?
canal.adapter
)。 -
管理多個同步實例(
instance
),每個實例對應一個獨立的數據同步通道58。
-
-
配置文件:
-
conf/canal.properties
:全局參數(如端口、存儲模式)。 -
conf/example/instance.properties
:實例級配置(如源數據庫地址、賬號、表過濾規則)。
-
2.2 canal.adapter(客戶端適配器)
-
核心作用:消費?
canal.deployer
?解析的數據,并同步到目標數據源(如 MySQL、Elasticsearch、OceanBase 等)。 -
關鍵功能:
-
支持多種目標源適配器(
rdb
、es7
、hbase
?等)。 -
提供?全量 & 增量同步能力,通過 REST API 觸發全量同步(如?
curl /etl/rdb/mysql1/user.yml
)。 -
支持多表映射、字段轉換、批量提交等配置。
-
-
配置文件:
-
conf/application.yml
:定義數據源、消費模式(TCP/MQ)、目標適配器。 -
conf/rdb/*.yml
?或?conf/es7/*.yml
:表級同步規則(如源表、目標表、主鍵映射)。
-
2.3 canal.admin(管理平臺)
-
核心作用:提供?Web 可視化界面,集中管理?
canal.deployer
?集群和實例配置。 -
關鍵功能:
-
動態管理實例(啟動/停止/配置)。
-
監控同步狀態和日志。
-
支持高可用部署(依賴 ZooKeeper)。
-
-
部署要求:
-
需初始化元數據庫(執行?
canal_manager.sql
)。 -
通過?
conf/application.yml
?配置數據庫連接和權限。
-
3. 核心應用場景:
-
業務解耦(如訂單狀態變更觸發消息通知)
-
實時緩存更新(Redis)
-
跨數據庫/機房數據同步(如MySQL→MySQL、MySQL→Elasticsearch)
-
數據庫鏡像
-
數據庫實時備份
二、環境準備與部署
1.?MySQL配置
- 開啟binlog:
查看配置
show VARIABLES LIKE '%log_bin%';
show VARIABLES LIKE '%binlog_format%';
show VARIABLES LIKE '%server_id%';
修改my.cnf
,添加:
[mysqld]
log-bin=mysql-bin
binlog-format=ROW # 必須為ROW模式
server_id=1 # 與Canal的slaveId不重復
- 創建Canal賬號:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
?2.?Canal Server部署
- 下載與解壓:canal.deployer-1.1.4.tar.gz
- 配置實例(
conf/canal.properties
)



# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=mysqlbinlog.000065
canal.instance.master.position=238116155# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.master.journal.name和canal.instance.master.position的值,通過一下命令獲取
show master STATUS;
?第二處、是要對哪些相關的業務表進行監視,比如我們這里是course課程信息,數據放在
# table regex
canal.instance.filter.regex=seckill_order.course

?三、數據同步實戰
1. 引入依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.5.3</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.gorgor.canal</groupId><artifactId>canal-gorgor-demo</artifactId><version>0.0.1-SNAPSHOT</version><name>canal-gorgor-demo</name><description>Demo project for Spring Boot</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><scope>runtime</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.20</version><optional>true</optional></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version><exclusions><exclusion><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId></exclusion></exclusions></dependency></dependencies></project>
2. 配置(application.yml)
canal:server:ip: localhostport: 11111course:destination: coursebatchSize: 1000spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/shardingdb1?useSSL=false&serverTimezone=UTCusername: rootpassword: root
3. 配置?CanalConnector 連接
@Configuration
@EnableScheduling
@EnableAsync
public class CanalCourseConfig {@Value("${canal.server.ip}")private String canalServerIp;@Value("${canal.server.port}")private int canalServerPort;@Value("${canal.server.username:blank}")private String userName;@Value("${canal.server.password:blank}")private String password;@Value("${canal.course.destination}")private String destination;@Bean("secKillConnector")public CanalConnector newSingleConnector(){String userNameStr = "blank".equals(userName) ? "" : userName;String passwordStr = "blank".equals(password) ? "" : password;return CanalConnectors.newSingleConnector(new InetSocketAddress(canalServerIp,canalServerPort), destination, userNameStr, passwordStr);}}
4. 數據同步代碼
@Service
@Slf4j
public class SecKillData implements IProcessCanalData {private final static String COURSE_ID = "cid";private final static String COURSE_NAME = "cname";private final static String USER_ID = "user_id";private final static String COURSE_STATUS = "cstatus";@Autowired@Qualifier("secKillConnector")private CanalConnector connector;@Value("${canal.seckill.subscribe:server}")private String subscribe;@Value("${canal.course.batchSize}")private int batchSize;@Autowiredprivate JdbcTemplate jdbcTemplate;@PostConstruct@Overridepublic void connect() {connector.connect();if ("server".equals(subscribe))connector.subscribe(null);elseconnector.subscribe(subscribe);connector.rollback();}@PreDestroy@Overridepublic void disConnect() {connector.disconnect();}@Async@Scheduled(initialDelayString = "${canal.course.initialDelay:5000}", fixedDelayString = "${canal.course.fixedDelay:5000}")@Overridepublic void processData() {try {if (!connector.checkValid()) {log.warn("與Canal服務器的連接失效!!!重連,下個周期再檢查數據變更");this.connect();return; // 重連后等待下個周期處理}Message message = connector.getWithoutAck(batchSize);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {log.info("本次[{}]沒有檢測到課程數據更新。", batchId);// 空消息也必須確認connector.ack(batchId);return;}log.info("本次[{}]課程數據共有[{}]次更新需要處理", batchId, size);for (CanalEntry.Entry entry : message.getEntries()) {// 跳過事務開始/結束事件if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN ||entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());EventType eventType = rowChange.getEventType();for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {try {if (eventType == EventType.DELETE) {processDeleteEvent(rowData);} else if (eventType == EventType.INSERT) {processInsertEvent(rowData);} else if (eventType == EventType.UPDATE) {processUpdateEvent(rowData);}} catch (Exception e) {log.error("處理行數據失敗: {}", e.getMessage(), e);}}}connector.ack(batchId); // 批量確認log.info("本次[{}]處理課程Canal同步數據完成", batchId);} catch (Exception e) {log.error("處理課程Canal同步數據失敗,請檢查:", e);}}/*** 處理刪除事件*/private void processDeleteEvent(CanalEntry.RowData rowData) {// 刪除事件使用Before列獲取數據Map<String, String> beforeColumns = getColumnsMap(rowData.getBeforeColumnsList());Long cid = parseLongSafely(beforeColumns.get(COURSE_ID));if (cid != null) {jdbcTemplate.update("DELETE FROM course WHERE cid = ?", cid);log.info("刪除課程活動: cid={}", cid);} else {log.error("刪除事件中未找到有效的課程ID");}}/*** 處理插入事件*/private void processInsertEvent(CanalEntry.RowData rowData) {Map<String, String> afterColumns = getColumnsMap(rowData.getAfterColumnsList());Long cid = parseLongSafely(afterColumns.get(COURSE_ID));String cname = afterColumns.get(COURSE_NAME);Long userId = parseLongSafely(afterColumns.get(USER_ID));String cstatus = afterColumns.get(COURSE_STATUS);if (cid != null && cname != null && userId != null && cstatus != null) {jdbcTemplate.update("INSERT INTO course (cid, cname, user_id, cstatus) VALUES (?, ?, ?, ?)",cid, cname, userId, cstatus);log.info("新增課程活動: cid={}, cname={}", cid, cname);} else {log.error("插入事件中缺失必要字段: cid={}, cname={}, userId={}, cstatus={}",cid, cname, userId, cstatus);}}/*** 處理更新事件*/private void processUpdateEvent(CanalEntry.RowData rowData) {Map<String, String> afterColumns = getColumnsMap(rowData.getAfterColumnsList());Long cid = parseLongSafely(afterColumns.get(COURSE_ID));String cname = afterColumns.get(COURSE_NAME);Long userId = parseLongSafely(afterColumns.get(USER_ID));String cstatus = afterColumns.get(COURSE_STATUS);if (cid != null && cname != null && userId != null && cstatus != null) {jdbcTemplate.update("UPDATE course SET cname = ?, user_id = ?, cstatus = ? WHERE cid = ?",cname, userId, cstatus, cid);log.info("更新課程活動: cid={}, cname={}", cid, cname);} else {log.error("更新事件中缺失必要字段: cid={}, cname={}, userId={}, cstatus={}",cid, cname, userId, cstatus);}}/*** 將列列表轉換為Map (列名 -> 值)*/private Map<String, String> getColumnsMap(List<Column> columns) {return columns.stream().collect(Collectors.toMap(Column::getName,Column::getValue,(existing, replacement) -> existing));}/*** 安全轉換Long類型*/private Long parseLongSafely(String value) {try {return value != null && !value.isEmpty() ? Long.parseLong(value) : null;} catch (NumberFormatException e) {log.error("轉換Long失敗: {}", value);return null;}}
}
具體代碼在上面?Canal Demo 案例鏈接項目中。
初始化sql 在項目 resources/sql 目錄下。
四、相關開源&產品
- canal 消費端開源項目: Otter
- 阿里巴巴去 Oracle 數據遷移同步工具: yugong
- 阿里巴巴離線同步開源項目 DataX
- 阿里巴巴數據庫連接池開源項目 Druid
- 阿里巴巴實時數據同步工具 DTS