目錄
一、前置基礎知識
1、主從復制(Replication)
2、數據恢復
3、數據庫熱備
4、讀寫分離
5、存儲位置及命名
二、Maxwell簡介
1、簡介
2、Maxwell同步數據特點
2.1.歷史記錄同步
2.2.斷點續傳
三、前期準備
1、查看網卡:
2、配置靜態IP
3、設置主機名
4、配置IP與主機名映射
5、關閉防火墻
6、配置免密登錄
四、JDK的安裝
五、Maxwell的安裝與部署
1、Maxwell的下載安裝?
2、目錄及腳本說明
3、配置Mysql文件
4、創建Maxwell所需數據庫和用戶
4.1.創建元數據庫??
4.2.設置安全級別
4.3.創建Maxwell的數據庫用戶并賦權限
5、配置Maxwell
六、Maxwell的使用?
1、啟動Kafka集群
2、創建數據庫與表?
3、Maxwell啟停腳本
4、啟動Maxwell
5、啟動Kafka消費者
6、增量數據同步
6.1 模擬生成數據
6.2 觀察Kafka消費者
一、前置基礎知識
? ? ? ?MySQL 的二進制日志(Binary Log,簡稱 Binlog)是 MySQL 服務端非常重要的一種日志文件,用于記錄數據庫中所有數據變更操作 (如 INSERT、UPDATE、DELETE、DDL 等),但不包括 SELECT 和 SHOW 這類不會修改數據的操作。Binlog 在 MySQL 數據庫的高可用、災備恢復和讀寫分離等場景中起著至關重要的作用。
???????Binlog 的主要作用包括:主從復制(Replication)、數據恢復、數據庫熱備、讀寫分離
1、主從復制(Replication)
? ? ? ? Binlog 是實現 MySQL 主從復制的基礎。主庫將所有的數據變更記錄到 Binlog 中,從庫通過讀取主庫的 Binlog 并在本地重放這些操作,從而實現與主庫的數據同步。具體流程如下:
- Master 主庫將數據變更寫入 二進制日志(Binary Log)文件 。
- Slave 從庫通過 I/O 線程向 Master 發送 Dump 協議請求 ,讀取 Master 的 Binary Log 中的事件(Events),并將其復制到本地的 中繼日志(Relay Log) 中。
- Slave 從庫的 SQL 線程讀取 中繼日志 Relay Log 中的事件(Events),并在本地執行這些事件,從而實現數據的同步更新。
2、數據恢復
? ? ? ?當發生誤刪數據或系統故障時,可以通過解析 Binlog 文件,回放特定時間段內的數據變更操作,從而實現數據的精準恢復。
3、數據庫熱備
? ? ? ?在主庫出現故障的情況下,可以快速切換到一個保持同步的從庫,保證數據庫服務的連續性,提高系統的高可用性。
4、讀寫分離
??????主數據庫只負責業務數據的寫入操作,而多個從數據庫只負責業務數據的查詢工作,在讀多寫少場景下,可以提高數據庫工作效率。
5、存儲位置及命名
??????默認情況下,Binlog 文件會被保存在 MySQL 的數據目錄中,通常為 /usr/local/mysql/data
或 /var/lib/mysql/
,具體路徑可以在 MySQL 配置文件 /etc/my.cnf
中配置。如圖所示:
? ? ? ?Binlog 文件是以二進制形式存在的,每當一個 Binlog 文件達到指定大小(由 max_binlog_size 控制,默認為 1GB)時,MySQL 會自動創建一個新的 Binlog 文件,并按順序遞增編號。
? ? 除了實際的 Binlog 文件外,還有一個名為 mysql-bin.index 的索引文件,它記錄了當前所有的 Binlog 文件列表,方便 MySQL 管理和讀取這些文件。如圖所示:
? ? ? ?如果希望自定義 Binlog 的前綴名稱(即默認的 binlog 改為其他名稱),需要編輯 MySQL 的配置文件 /etc/my.cnf,添加或修改如下配置項:
vi /etc/my.cnf
[mysqld]
# 設置 Binlog 文件的前綴名,比如 mysql-bin,則生成的文件為 mysql-bin.000001 等
log-bin=mysql-bin
????保存后重啟 MySQL 服務,新的配置才會生效。查看效果。
service mysql restart;
ll /usr/local/mysql/data
二、Maxwell簡介
1、簡介
? ? ? ?Maxwell 是由美國Zendesk公司開源,用Java編寫的MySQL變更數據抓取軟件。它會實時監控MySQL數據庫的數據變更操作(包括insert、update、delete),并將變更數據以 JSON 格式發送給 Kafka、Kinesi等流數據處理平臺。官網地址:http://maxwells-daemon.io/
? ? ? ?Maxwell的工作原理是實時讀取MySQL數據庫的二進制日志(Binlog),從中獲取變更數據,再將變更數據以JSON格式發送至Kafka等流處理平臺。
? ? ? ?Maxwell的工作原理就是將自己偽裝成slave,并遵循MySQL主從復制的協議,從master同步數據。
2、Maxwell同步數據特點
2.1.歷史記錄同步
? ? ? ?例如開始未開啟 binlog ,在表中增加100條數據。然后開啟binlog,但binlog中未記錄開啟前插入的 100條數據記錄。但在maxwell中有個腳本,到指定的數據庫的表中進行全表掃描,然后把歷史數據進行同步。
2.2.斷點續傳
? ? ? ?就是在服務掛了重啟后是否會從上次斷開的位置繼續。Maxwell會記錄斷點前的偏量,這個偏量記錄在存儲元數據的數據庫文件中,因此在使用Maxwell前要創建 存儲元數據的數據庫。
三、前期準備
1、查看網卡:
2、配置靜態IP
vi /etc/sysconfig/network-scripts/ifcfg-ens32? ----? 根據自己網卡設置。
3、設置主機名
hostnamectl --static set-hostname ?主機名
例如:
hostnamectl --static set-hostname ?hadoop001
4、配置IP與主機名映射
vi /etc/hosts
5、關閉防火墻
systemctl stop firewalld
systemctl disable?firewalld
6、配置免密登錄
傳送門
四、JDK的安裝
傳送門
五、Maxwell的安裝與部署
1、Maxwell的下載安裝?
1.1. 下載
?https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz
??注:Maxwell-1.30.0及以上版本不再支持JDK1.8。?
1.2 上傳
使用xshell上傳到指定安裝路徑此處是安裝路徑是 /opt/module
??
1.3 解壓重命名
tar -zxvf maxwell-1.29.2.tar.gz
mv maxwell-1.29.2/ maxwell
??
2、目錄及腳本說明
- maxwell腳本是同步binlog日志,同步后直接生成json格式。
- maxwell-bootstrap腳本是從表中同步歷史記錄,但同步完并不直接生成json格式,需要交給maxwell本身再進行處理。
3、配置Mysql文件
vi /etc/my.cnf
#數據庫id
server-id = 1
#啟動binlog,該參數的值會作為binlog的文件名
log-bin=mysql-bin
#binlog類型,Maxwell要求Binlog采用Row-based模式。
binlog_format=row
#配置Maxwell監控的?Mysql數據庫管理系統中的數據庫名稱#如果不配置則會監控?Mysql數據庫管理系統下的所有數據庫
#maxwell監控多個數據庫,則增加即可
binlog-do-db=hwadee01#binlog-do-db=hwadee02
#binlog-do-db=hwadee03
參數說明:
server-id = 1? ? ? ? ? ? ? ? ?#將自己偽裝成Mysql主從的slave
log-bin=mysql-bin? ? ? ? #修改Mysql的日志名稱
binlog_format=row? ? ? ?#binlog類型,maxwell要求為row類型
binlog-do-db=hwadee??#啟用binlog的數據庫,需要進行創建數據庫主從復制三種方式:
Statement-based:基于語句,主機的執行的所有寫操作的SQL語句(包括insert、update、delete)等全部記錄到Binlog日志中,在從機slave執行相同語句。
Row-based:基于行,主機上操作影響的行記錄(每次寫操作后被操作行記錄的變化) 寫到Binlog日志,在從機將影響的行進行同步
mixed:混合模式,默認是Statement-based,如果SQL語句可能導致數據不一致,就自動切換到Row-based。
優缺點:
Statement-based:
優點:節省空間
缺點:有可能造成數據不一致,例如insert語句中包含now()函數。
Row-based:
優點:保持數據的絕對一致性。
缺點:占用較大空間。
4、創建Maxwell所需數據庫和用戶
4.1.創建元數據庫??
?在Mysql數據庫管理系統中創建一個 maxwell 庫用于存儲 Maxwell 的元數據。名字不要修改。
CREATE DATABASE maxwell;
4.2.設置安全級別
注意:此設置是 Mysql數據庫 8.0 以下版本才需要設置
set global validate_password.length=4;
set global validate_password.policy=0;
4.3.創建Maxwell的數據庫用戶并賦權限
? ? ?創建一個用戶,專門用來進行數據同步
CREATE USER 'maxwell'@'%' IDENTIFIED BY 'root';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';FLUSH PRIVILEGES;
5、配置Maxwell
? ? ?主要配置:從哪里拿數據,誰拿,數據送到哪里
?????數據發送給 Kafka的主題,則需要配置Kafka的主題所在服務器地址
?????從哪臺機器上讀數據,則需配置主機IP或主機名和maxwell的數據庫
?????誰來負責同步數據,則需配置用于同步數據的用戶的用戶名和密碼
cd /opt/module/maxwell
cp config.properties.example config.propertiesvi config.properties
#數據送到哪里?有stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
#此處選Kafka為案例,配置Kafka的主題所在服務器地址及主體
#Kafka topic,可靜態配置如:topic_db,可動態配置如:%{database}_%{table}
producer=kafka
kafka.bootstrap.servers=hadoop001:9092,hadoop002:9092,hadoop003:9092
kafka_topic=topic_db# 元數據數據庫相關配置
#?從哪臺機器上讀數據--從hadoop001上讀取數據
host=hadoop001#?誰來負責同步數據?用于同步數據的用戶(用戶名和密碼)
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true#用于表示唯一,后續同步歷史記錄用,不加則默認
client_id=maxwell_1
# 過濾
# 過濾掉不需要的表,如hwadee01數據庫中z_log表是日志數據備份,無須采集
filter=exclude:hwadee01.z_log
# 根據當前變化數據的主鍵進行hash,再用hash值進行決定到哪個分區。# 指定數據按照主鍵分組進入Kafka不同分區,避免數據傾斜
producer_partition_by=primary_key
六、Maxwell的使用?
? ? ? ?我們使用maxwell監控Mysql管理的數據庫中的數據變化記錄,?將變化的記錄發送到Kafka對應的主題topic_db上。
1、啟動Kafka集群
? ? ?若Maxwell發送數據的目的地為Kafka集群,則需要先確保Kafka集群為啟動狀態。并創建kafka的主?topic_db (3個分區,每個分區將來對應一個消費者),具體見Kafka的安裝與使用。
Kafka安裝與啟動用傳送門
啟動zookeeper集群:/usr/bin/zkall.sh start
啟動kafka集群? ? ? ? :/usr/bin/kfall.sh start
#創建Kafka主題與分區
bin/kafka-topics.sh --bootstrap-server hadoop001:9092 --create?--partitions 3 --replication-factor 3 --topic topic_db
2、創建數據庫與表?
#創建數據庫
CREATE DATABASE hwadee01;
# 創建表
DROP TABLE IF EXISTS `activity_info`; CREATE TABLE `activity_info` (`id` bigint NOT NULL AUTO_INCREMENT COMMENT '活動id',`activity_name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '活動名稱',`activity_type` varchar(10) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '活動類型(1:滿減,2:折扣)',`activity_desc` varchar(2000) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '活動描述',`start_time` datetime NULL DEFAULT NULL COMMENT '開始時間',`end_time` datetime NULL DEFAULT NULL COMMENT '結束時間',`create_time` datetime NULL DEFAULT NULL COMMENT '創建時間',`operate_time` datetime NULL DEFAULT NULL COMMENT '修改時間',PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '活動表' ROW_FORMAT = DYNAMIC;/*用于歷史數據全量同步使用 */ INSERT INTO `activity_info` VALUES (1, '小米手機專場', '3101', '小米手機滿減2', '2022-01-13 01:01:54', '2023-06-19 00:00:00', '2022-05-27 00:00:00', NULL); INSERT INTO `activity_info` VALUES (2, 'CAREMiLLE口紅滿兩個8折', '3102', 'CAREMiLLE口紅滿兩個8折', '2022-01-13 01:01:54', '2023-06-19 00:00:00', '2022-05-27 00:00:00', NULL);
3、Maxwell啟停腳本
啟動:/opt/module/maxwell/bin/maxwell --config /opt/module/maxwell/config.properties --daemon
停止:ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
腳本:
touch?/usr/bin/mxw.sh
chmod 777 /usr/bin/mxw.sh
vi /usr/bin/mxw.sh
#!/bin/bashMAXWELL_HOME=/opt/module/maxwellstatus_maxwell(){result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l`return $result }start_maxwell(){status_maxwellif [[ $? -lt 1 ]]; thenecho "啟動Maxwell"$MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemonelseecho "Maxwell正在運行"fi }stop_maxwell(){status_maxwellif [[ $? -gt 0 ]]; thenecho "停止Maxwell"ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9elseecho "Maxwell未在運行"fi }case $1 instart )start_maxwell;;stop )stop_maxwell;;restart )stop_maxwellstart_maxwell;; esac
4、啟動Maxwell
啟動 Maxwell并查看進程與maxwell數據庫
啟動:/usr/bin/mxw.sh start? ? ? ? ? ? ? ??
jps
停止:/usr/bin/mxw.sh stop
5、啟動Kafka消費者
cd?/opt/module/kafka
bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic topic_db?
6、增量數據同步
6.1 模擬生成數據
INSERT INTO `activity_info` VALUES (3, '小米手機專場', '3101', '小米手機滿減2', '2022-01-13 01:01:54', '2023-06-19 00:00:00', '2022-05-27 00:00:00', NULL); INSERT INTO `activity_info` VALUES (4, 'CAREMiLLE口紅滿兩個8折', '3102', 'CAREMiLLE口紅滿兩個8折', '2022-01-13 01:01:54', '2023-06-19 00:00:00', '2022-05-27 00:00:00', NULL); INSERT INTO `activity_info` VALUES (5, '聯想活動專場滿減', '3101', '聯想活動專場滿減', '2022-01-13 01:01:54', '2023-06-19 00:00:00', '2022-05-27 00:00:00', NULL); INSERT INTO `activity_info` VALUES (6, 'TCL全場9折', '3103', 'TCL全場9折', '2022-01-13 01:01:54', '2023-06-19 00:00:00', '2022-05-27 00:00:00', NULL);
6.2 觀察Kafka消費者
7、歷史數據全量同步
? ? ? ?我們已經實現了使用Maxwell實時增量同步MySQL變更數據的功能。但有時可能需要使用到MySQL數據庫中從歷史至今的一個完整的數據集。這就需要我們在進行增量同步之前,先進行一次歷史數據的全量同步。這樣就能保證得到一個完整的數據集。
1、maxwell-bootstrap
? ? ? 此時就需要使用 Maxwell提供的 bootstrap 功能來進行歷史數據的全量同步,使用maxwell-bootstrap,命令如下:
???????/opt/module/maxwell/bin/maxwell-bootstrap --database hwadee01?--table activity_info --config /opt/module/maxwell/config.properties
#注意:maxwell-bootstrap 只是把表的數據查出來,但自己本身沒有將數據發送到kafka的能力,因此需要通過maxwell本身來發送,因此需要根據maxwell的client_id找到maxwell,而client_id在config.properties進行了配置。
腳本:
touch?/usr/bin/mysql_to_kafka_init.sh
chmod 777 /usr/bin/mysql_to_kafka_init.sh
vi /usr/bin/mysql_to_kafka_init.sh
#!/bin/bash#該腳本的作用是初始化所有的業務數據,只需執行一次 MAXWELL_HOME=/opt/module/maxwellimport_data(){$MAXWELL_HOME/bin/maxwell-bootstrap --database hwadee01 --table $1 --config $MAXWELL_HOME/config.properties }case $1 in"activity_info" )import_data activity_info;;"sku_info" )import_data sku_info;;"all" )import_data activity_infoimport_data sku_info;; esac
歷史數據進行全量同步:
指定要全量同步的表
/usr/bin/mysql_to_kafka_init.sh?activity_info
2、執行后結果格式
cd?/opt/module/kafka
bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic topic_db?
注意事項:
(1)第一條type為bootstrap-start和最后一條type為bootstrap-complete的數據,是bootstrap開始和結束的標志,不包含數據,中間的type為bootstrap-insert的數據才包含數據。
(2)一次bootstrap輸出的所有記錄的ts都相同,為bootstrap開始的時間。