數據庫操作的準備 1、開啟 Binlog 寫入功能,配置 binlog-format 為 ROW 模式,my.cnf 中配置如下: vi /etc/my.cnf [mysqld] log-bin=mysql-bin # 開啟 binlog binlog-format=ROW # 選擇 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復重啟 MySQL [root@localhost etc]# systemctl restart mysql查看下binlog是否開啟,如果值為on 代表開啟。 SHOW VARIABLES LIKE 'log_bin';查看binlog的格式 show variables like '%binlog_format%';https://dev.mysql.com/doc/refman/8.4/en/ https://dev.mysql.com/doc/refman/8.4/en/show-master-status.html8.4中已經沒有SHOW MASTER STATUS;這條命令,需要用SHOW BINARY LOG STATUS SHOW MASTER STATUS; 是一條 MySQL 查詢命令,用于獲取當前 MySQL 服務器作為主服務器時的二進制日志狀態。這條命令對于設置從服務器的復制非常有用,因為它提供了從服務器需要的信息,以便知道從哪里開始復制數據。SHOW BINARY LOG STATUS binlog.000034 1582、授權 canal 鏈接 MySQL 賬號具有作為 MySQL slave 的權限, 如果已有賬戶可直接 grantCREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;MySQL的slave需要的權限說明如下: REPLICATION SLAVE權限,這是Slave服務器必須擁有的最重要的權限。它允許Slave服務器連接到Master服務器并獲取復制數據。如果Slave服務器沒有REPLICATION SLAVE權限,它將無法連接到Master服務器并執行復制操作。 REPLICATION CLIENT權限,這個權限允許Slave服務器向Master服務器發送查詢語句以獲取復制數據。如果Slave服務器沒有REPLICATION CLIENT權限,它將無法向Master服務器發送查詢語句,從而無法獲取復制數據。 SELECT權限,Slave服務器需要SELECT權限來讀取Master服務器上的數據。如果Slave服務器沒有SELECT權限,它將無法獲取Master服務器上的數據,從而無法保持與Master服務器的同步。查看權限 SHOW grants for 'canal'顯示:GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO `canal`@`%`1、下載用1.1.7這個版本比較穩定 1.1 https://github.com/alibaba/canal/releases/tag/canal-1.1.71.2 解壓縮和修改配置 mkdir canal mkdir canal-admin tar -zxvf canal.deployer-1.1.7.tar.gz -C canal/ tar -zxvf canal.admin-1.1.7.tar.gz -C canal-admin/修改conf下的canal.properties,主要改綁定的IP,注意:最好改成IP,別用127.0.0.1,因為要是程序和代碼沒在一臺機器的話連不上。 [root@localhost conf]# vi canal.properties # tcp bind ip canal.ip = 192.168.150.50修改/conf/example下的 instance.propertiescanal.instance.master.address=192.168.3.70:3306 # username/password這個是要同步的數據庫的賬號和密碼這個里用的canal,前提是得提前在數據庫里創建canal的賬號和授權。 canal.instance.dbUsername=canal canal.instance.dbPassword=canal1.3啟動 /usr/local/software/canal/conf /usr/local/software/canal/bin 看日志 /usr/local/software/canal/logs/canal/usr/local/software/canal/bin ./startup.sh ./stop.sh
使用注意事項:1)修改canal.admin-1.1.7\conf和/canal/conf下的application.yml 啟動的驅動driver-class-name: com.mysql.cj.jdbc.Driver2) java21去掉admin和canal bin目錄下startup.sh里的java8的參數。刪除掉:-XX:+AggressiveOpts -XX:-UseBiasedLocking 3)啟動提示druid找不到 將druid的jar包放在lib目錄就可以了。druid-1.2.22.jar測試通過 https://repo1.maven.org/maven2/com/alibaba/druid/1.2.22/ 2、加入依賴,Canal TCP模式需要的依賴 <dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.7</version> </dependency> <dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.protocol</artifactId><version>1.1.7</version> </dependency>3、canal 1.1.7 只能同步mysql8.0下的版本,mysql8.4版本的收不到消息。版本的觀念要建立起來,或者我哪弄的不對。4、RocketMQ 監聽 canal的變化。 1)啟動 RocketMQ 2)修改canal.properties # tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ canal.serverMode = rocketMQ 3)寫消費的監聽者 參考RocketMQTool類import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.demo.demo.entity.CanalBinlog; import org.apache.rocketmq.shaded.com.google.gson.Gson; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component;/*** canal.properties 設置生產者組rocketmq.producer.group = test 用的默認組* example/instance.properties 修改主題 canal.mq.topic=rocketmq_toppic_01*/ @Component @RocketMQMessageListener(topic = "example", consumerGroup = "test", selectorExpression="*") public class RocketMQTool implements RocketMQListener<String> {private Gson gson = new Gson();@Overridepublic void onMessage(String mqJson){CanalBinlog canalBinlog = gson.fromJson(mqJson, CanalBinlog.class);System.out.println(mqJson);System.out.println(canalBinlog);CanalBinlog canalBinlog2 = JSON.parseObject(mqJson, CanalBinlog.class);System.out.println("fastJson: "+JSON.toJSONString(canalBinlog2));} }
import java.util.List; import java.util.Map;@Data public class CanalBinlog {// 數據private List<Map<String,Object>> data;// 數據庫名稱private String database;private long es;// 遞增,從1開始private int id;// 是否是DDL語句private boolean isDdl;// 表結構的字段類型private Map<String,Object> mysqlType;private List<Map<String,Object>> old;// 主鍵名稱private List<String> pkNames;// sql語句private String sql;private Map<String,Object> sqlType;// 表名private String table;private long ts;// INSERT、UPDATE、DELETE、ERASE(刪除表)private String type; }