文章目錄
- 一、業務中常見的需要數據同步的場景
- CDC是什么
- FlinkCDC是什么
- CDC原理
- 為什么是FlinkCDC
- 業務場景
- flink cdc對應flink的版本
- 二、模擬案例
- 1.阿里云flink sql
- 2.開源flink sql(單機模式)
- flink 安裝
- 安裝mysql
- 3.flink datastream
- 三、總結
提示:以下是本篇文章正文內容,下面案例可供參考
一、業務中常見的需要數據同步的場景
1、多個庫的表合并到一張表。不同的業務線或者微服務在不同的數據庫里開發,但是此時有些報表需要將多個庫的類似的數據合并后做查詢統計。或者,某些歷史原因,類似剛開始的商業模式不清晰,導致一些業務線分分合合。或者某些邊緣業務逐步融合到了主業務。早起的數據是分開的,業務運營也是分開,后來又合并成了一個大塊業務。
2、某個數據需要寫到多個存儲中。業務數據需要寫入到多個中間件或者存儲中,比如業務的數據存儲再Mysql的數據中,后來為了方便檢索需要寫入到ES,或者為了緩存需要寫入到Redis,或者是Mysql分表的數據合并寫入到Doris中。
3、數據倉庫的場景。比如將表里的數據實時寫入到DWS數據倉庫的寬表中。
4、應急場景。如果不采專用CDC的方案,那么要達到實時查詢的效果,只能在BFF層的代碼調用多個中心層的查詢API,然后再BFF層做各種聚合,運算。這種方式開發效率低下,萬一有的中心層沒有提供合適的查詢API,臨時開發的話,會讓開發進度不可控。
總之,不管是數據多寫、還是多表合并、還是建立數據倉庫,都屬于數據同步任務。
示例:pandas 是基于NumPy 的一種工具,該工具是為了解決數據分析任務而創建的。
CDC是什么
CDC 是變更數據捕獲(Change Data Capture)技術的縮寫,它可以將源數據庫(Source)的增量變動記錄,同步到一個或多個數據目的(Sink)。在同步過程中,還可以對數據進行一定的處理,例如過濾、關聯、分組、統計等。
目前專業做數據庫事件接受和解析的中間件是Debezium,如果是捕獲Mysql,還有Canal。
FlinkCDC是什么
官網地址:官網FlinkCDC
官方定義:This project provides a set of source connectors for Apache Flink? directly ingesting changes coming from different databases using Change Data Capture(CDC)。根據FlinkCDC官方給出的定義,FlinkCDC提供一組源數據的連接器,使用變更數據捕獲的方式,直接吸收來自不同數據庫的變更數據。
CDC原理
CDC的原理是,當數據源表發生變動時,會通過附加在表上的觸發器或者 binlog 等途徑,將操作記錄下來。下游可以通過數據庫底層的協議,訂閱并消費這些事件,然后對數據庫變動記錄做重放,從而實現同步。這種方式的優點是實時性高,可以精確捕捉上游的各種變動。
為什么是FlinkCDC
1、FlinkCDC 提供了對 Debezium 連接器的封裝和集成,簡化了配置和使用的過程,并提供了更高級的 API 和功能,例如數據格式轉換、事件時間處理等。Flink CDC 使用 Debezium 連接器作為底層的實現,將其與 Flink 的數據處理能力結合起來。通過配置和使用 Flink CDC,您可以輕松地將數據庫中的變化數據流轉化為 Flink 的 DataStream 或 Table,并進行實時的數據處理、轉換和分析。
2、Flink的DataStream和SQL比較成熟和易用
3、Flink支持狀態后端(State Backends),允許存儲海量的數據狀態
4、Flink有更好的生態,更多的Source和Sink的支持
業務場景
- 數據合并流向:
- 數據多寫流向:
- 單數據源寫單表流向:
- 數據鏈路對比
通過下圖,我們可以看到Canal處理數據的鏈路比FlinkCDC更長,數據鏈路一旦變長意味著,出錯的可能性更高。
flink cdc對應flink的版本
二、模擬案例
1.阿里云flink sql
- 驗證mysql開啟binlog
flink sql 定義binlog源數據,拿到數據處理(業務邏輯)再寫表,后面很簡單了
2.開源flink sql(單機模式)
背景:win10電腦安裝vmware(虛擬化)軟件,虛擬機中安裝
linux節點一個,flink,mysql(yum默認安裝的最新版本:8.0.37),java環境(此處安裝java環境略,網上有)
flink 安裝
###在linux下載flink包
[root@slave2 ~]# wget https://dlcdn.apache.org/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz
### 加壓包到當前目錄下
[root@slave2 ~]# tar zxvf flink-1.16.3-bin-scala_2.12.tgz
[root@slave2 ~]# cd flink-1.16.3
[root@slave2 lib]# wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.0/flink-sql-connector-mysql-cdc-2.4.0.jar修改flink 文件
[root@slave2 flink-1.16.3]# cat conf/flink-conf.yaml 修改內容如下(如果不修改則win10本地電腦無法訪問flink web UI,這里浪費很多時間):
taskmanager.host: localhost
rest.bind-address: 0.0.0.0###啟動flink
[root@slave2 flink-1.16.3]# bin/start-cluster.sh
###啟動客戶端
root@slave2 flink-1.16.3]# ./bin/sql-client.sh embedded
安裝mysql
安裝mysql遇到很多小問題
[root@slave2 ~]# yum -y install mysql-community-server
“MySQL 8.0 Community Server” 的 GPG 密鑰已安裝,但是不適用于此軟件包。請檢查源的公鑰 URL 是否配置正確。
失敗的軟件包是:mysql-community-libs-8.0.37-1.el7.x86_64
###用于跳過GPG簽名檢查 可以安裝成功
[root@slave2 ~]# yum -y install mysql-server --nogpgcheck
###驗證myqsl是否可用
[root@slave2 ~]# systemctl start mysqld
[root@slave2 ~]# mysql -u root -p
Enter password:
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 63
Server version: 8.0.37 MySQL Community Server - GPL
Copyright (c) 2000, 2024, Oracle and/or its affiliates.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> ### 配置開啟binlog
[root@slave2 ~]# vim /etc/my.cnf
log_bin=mysql_bin
binlog-format=Row
server-id=1
###重啟mysql
[root@slave2 ~]# systemctl restart mysqld
###重新登陸mysql并查看binlog開啟情況
mysql> SHOW VARIABLES LIKE 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
1 row in set (0.01 sec)
mysql創建新用戶等
###創建新用戶時發現一直報錯。查看mysql建密碼時要求比較高(新版本mysql對密碼要求高)
mysql> SHOW VARIABLES LIKE 'validate_password%';
+-------------------------------------------------+--------+
| Variable_name | Value |
+-------------------------------------------------+--------+
| validate_password.changed_characters_percentage | 0 |
| validate_password.check_user_name | ON |
| validate_password.dictionary_file | |
| validate_password.length | 10 |
| validate_password.mixed_case_count | 1 |
| validate_password.number_count | 1 |
| validate_password.policy | MEDIUM |
| validate_password.special_char_count | 1 |
+-------------------------------------------------+--------+
8 rows in set (0.00 sec)
###修改密碼要求(在工作生產環境不建議這么做)
mysql> SET GLOBAL validate_password.length = 3;
Query OK, 0 rows affected (0.03 sec)
mysql> SET GLOBAL validate_password.policy = LOW;
Query OK, 0 rows affected (0.00 sec)
###再次查看對新建用戶密碼要求
mysql> SHOW VARIABLES LIKE 'validate_password%';
+-------------------------------------------------+-------+
| Variable_name | Value |
+-------------------------------------------------+-------+
| validate_password.changed_characters_percentage | 0 |
| validate_password.check_user_name | ON |
| validate_password.dictionary_file | |
| validate_password.length | 4 |
| validate_password.mixed_case_count | 1 |
| validate_password.number_count | 1 |
| validate_password.policy | LOW |
| validate_password.special_char_count | 1 |
+-------------------------------------------------+-------+
8 rows in set (0.00 sec)
###新建用戶:root1234
mysql> CREATE USER 'root1234'@'localhost' IDENTIFIED BY 'root1234';
Query OK, 0 rows affected (0.01 sec)mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'root1234'@'localhost';
Query OK, 0 rows affected (0.02 sec)mysql> GRANT SELECT ON *.* TO 'root1234'@'localhost';
Query OK, 0 rows affected (0.01 sec)mysql> SELECT User, Host FROM mysql.user;
+------------------+-----------+
| User | Host |
+------------------+-----------+
| mysql.infoschema | localhost |
| mysql.session | localhost |
| mysql.sys | localhost |
| root | localhost |
| root1234 | localhost |
+------------------+-----------+
5 rows in set (0.00 sec)mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.00 sec)
建mysql庫表(數據源頭庫表)
Flink SQL>
select * from test_flink_cdc5;
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: Can’t find any matched tables, please check your configured database-name: [mysql] and table-name: [mysql.test_cdc]
###mysql 默認只有這4個庫(當時直接用默認庫mysql建表導致flink 報一些上面奇詭的錯)
mysql> show databases;
+--------------------+
| Database |
+--------------------+
| information_schema |
| mysql |
| performance_schema |
| sys |
+--------------------+
###新建一個庫:test
mysql> CREATE DATABASE test CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
Query OK, 1 row affected (0.02 sec)mysql> use test;
Database changed
###建表
mysql> CREATE TABLE `test_cdc` (-> `id` int NOT NULL AUTO_INCREMENT,-> `name` varchar(255) DEFAULT NULL,-> PRIMARY KEY (`id`)-> ) ENGINE=InnoDB ;
Query OK, 0 rows affected (0.04 sec)mysql> show tables;
+----------------+
| Tables_in_test |
+----------------+
| test_cdc |
+----------------+
1 row in set (0.00 sec)
##開始驗證
##mysql 使用上面創建的用戶密碼登錄mysql
[root@slave2 ~]# mysql -u root -p'root1234'
mysql> use test
##flink登錄
[root@slave2 flink-1.16.3]# bin/stop-cluster.sh
[root@slave2 flink-1.16.3]# ./bin/sql-client.sh embedded
##在flink sql中定義mysql源
CREATE TABLE test_flink_cdc ( id INT, name STRING,primary key(id) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc', 'hostname' = '127.0.0.1', 'port' = '3306', 'username'='root1234', 'password'='root1234', 'database-name'='test', 'table-name'='test_cdc'
);
###查詢flink 接收到的binlog數據
Flink SQL> select * from test_flink_cdc;###到mysql sql界面向test_cdc表插入數據
mysql> INSERT INTO test_cdc VALUES (001, 'test01');
Query OK, 1 row affected (0.02 sec)mysql> INSERT INTO test_cdc VALUES (002, 'test02');
Query OK, 1 row affected (0.01 sec)mysql> INSERT INTO test_cdc VALUES (003, 'test03');
Query OK, 1 row affected (0.04 sec)mysql> INSERT INTO test_cdc VALUES (004, 'test04');
Query OK, 1 row affected (0.01 sec)mysql> INSERT INTO test_cdc VALUES (005, 'test05');
Query OK, 1 row affected (0.01 sec)mysql> INSERT INTO test_cdc VALUES (006, 'test06');
Query OK, 1 row affected (0.00 sec)
向mysql表中插入數據
flink sql這時可以接到binlog數據
查看flink UI job情況
小結:當flink可以拿到mysql binlog源頭數據,下面就好做了,根據自己的業務處理sink到任何數據庫或組件中(例如sink到mysql,hbase,hive,pg,kafka等等),后面sink就不演示了。
下載鏈接:
1.mysql jdbc jar包驅動下載
2.flink cdc驅動下載
3.flink下載
3.flink datastream
datastream 比較靈活簡單,下面是舉例代碼片段(datastream的CDC比flink sql還簡單,打個jar包在flink web UI界面上傳運行即可,此處不做舉例)
public class MySqlSourceExample {public static void main(String[] args) throws Exception {MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("yourHostname").port(yourPort).databaseList("yourDatabaseName") // set captured database.tableList("yourDatabaseName.yourTableName") // set captured table.username("yourUsername").password("yourPassword").deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String.build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// enable checkpointenv.enableCheckpointing(3000);env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")// set 4 parallel source tasks.setParallelism(4).print().setParallelism(1); // use parallelism 1 for sink to keep message orderingenv.execute("Print MySQL Snapshot + Binlog");}
}
三、總結
- 公司用的cdc不能分享出來,所以搭建上面案例時遇到很多問題,遇到很多坑。
- 工作中使用的就是如下截圖流程,沒有使用canal和kafka,使用的是
logtail和SLS(阿里云的組件,類似kafka,但要比kafka等功能強大)
- 上面說了很多關于flink cdc的優點。我結合工作中使用的flink cdc說一些缺點(自己結合業務場景)。
1.使用flink cdc不適合直觀觀察binlog的數據(例如臟數據,數據斷流,不能直觀看到最近的binglog情況,表的更新頻率不高等造成的困擾)。
2.使用阿里云SLS收集mysql binlog數據,將數據保存近1個月(保持時間可設置),同時可以在logstore查看數據結構樣式(sls支持sql語法查詢日志數據),方便后續flink代碼開發,也方便flink sql和datastream代碼debug。
3.使用flink cdc做數據質量監控比較后知后覺(只能監控已表數據),我這邊做法是直接監控sls原始數據質量,發現有數據質量問題會報出來。sls也支持實時斷流提醒。
4.一般對于簡單的不太重要的業務適合使用flink cdc,這樣開發快,數據流不用咋校驗。對于復雜數據或復雜業務或重要數據,需要觀察binlog數據結果(不信任上游其他部門數據)還是使用類似sls比較好。方便查詢數據變化與匯總,方便做數據報警等。
5.總之還是要根據自己的業務場景和自己公司現有技術組件組合著使用比較好。