1 概述
AnalyticDB和taurusdb都是高度兼容mysql協議的數據庫,從現有的AnalyticDB官方數據同步方案來看,只有FlinkSQL合適。
同步方案官方文檔:
https://help.aliyun.com/zh/analyticdb/analyticdb-for-mysql/user-guide/flink-subscribes-binlog
2 數據同步方案
2.1 架構
2.2 限制條件
AnalyticDB for MySQL產品系列為企業版、基礎版、湖倉版和數倉版彈性模式。
AnalyticDB for MySQL集群的內核版本需為3.2.1.0及以上版本。
Flink實時計算引擎需為VVR 8.0.4及以上版本。
XUANWU_V2表不支持開啟Binlog,因此不能通過訂閱Binlog實現AnalyticDB for MySQL集群XUANWU_V2表的數據同步和流式計算。
Flink僅支持處理AnalyticDB for MySQL Binlog中的所有基礎數據類型和復雜數據類型JSON。
Flink不會處理AnalyticDB for MySQL Binlog中的DDL操作記錄和分區表自動分區刪除的操作記錄。
2.3 操作步驟
2.3.1 步驟1:在AnalyticDB中初始化一張示例源表
CREATE DATABASE demo;
use demo;
CREATE TABLE users (id INT AUTO_INCREMENT PRIMARY KEY,username VARCHAR(255) NOT NULL,email VARCHAR(255) NOT NULL
);
INSERT INTO users (username, email) VALUES ('user1', 'user1@example.com');
INSERT INTO users (username, email) VALUES ('user2', 'user2@example.com');
INSERT INTO users (username, email) VALUES ('user3', 'user3@example.com');
2.3.2 步驟2:為AnalyticDB的示例源表開啟binlog
binlog開啟僅支持表級別,需要為每一張表開啟binlog。例如,為users表開啟binglog,命令如下:
ALTER TABLE users BINLOG=true;
ALTER TABLE users binlog_ttl='3d';
2.3.3 步驟3:在目標數據庫mysql創建表結構
CREATE DATABASE demo;
use demo;
CREATE TABLE users (id INT AUTO_INCREMENT PRIMARY KEY,username VARCHAR(255) NOT NULL,email VARCHAR(255) NOT NULL
);
2.3.4 步驟4:在阿里云實時計算平臺新建自定義連接器
參考官方文檔,執行如下步驟:
1)下載連接器。
2)登錄實時計算控制臺。在Flink全托管頁簽,單擊目標工作空間操作列下的控制臺。
3)在左側導航欄,單擊數據連接。
4)在數據連接頁面,單擊創建自定義連接器。上傳步驟1下載的連接器。上傳完成后,單擊下一步,單擊完成。創建完成的自定義連接器會出現在連接器列表中。
2.3.5 步驟5:在阿里云實時計算平臺創建ETL任務
點擊ETL左側菜單,新建流作業,FlinkSQL如下:
CREATE TEMPORARY TABLE adb_source_users (id INT,username VARCHAR(255) NOT NULL,email VARCHAR(255) NOT NULL,PRIMARY KEY (`id`) NOT ENFORCED
) WITH ('connector' = 'adb-mysql-cdc','hostname' = '你的AnalyticDB','username' = 'dba','password' = '你的密碼','database-name' = 'demo','table-name' = 'users'
);CREATE TEMPORARY TABLE adb_sink_users (id INT,username VARCHAR(255) NOT NULL,email VARCHAR(255) NOT NULL,PRIMARY KEY (`id`) NOT ENFORCED
) WITH ('connector' = 'mysql','hostname' = '你的mysql','port' = '3306','username' = 'root','password' = '你的密碼','database-name' = 'demo','table-name' = 'users'
);INSERT INTO adb_sink_users
SELECT * FROM adb_source_users;
對ETL作業進行部署:
在目標數據庫查詢數據,發現源表的存量數據已自動同步:
2.3.6 步驟6:測試增量數據同步
在源表插入一條新數據,SQL如下:
INSERT INTO users (id, username, email) VALUES (4, 'user4', 'user4@example.com');
在目標數據庫查詢數據,發現源表的增量數據已自動同步: