flink cdc mysql整理與總結

文章目錄

  • 一、業務中常見的需要數據同步的場景
    • CDC是什么
    • FlinkCDC是什么
    • CDC原理
    • 為什么是FlinkCDC
    • 業務場景
    • flink cdc對應flink的版本
  • 二、模擬案例
    • 1.阿里云flink sql
    • 2.開源flink sql(單機模式)
    • flink 安裝
    • 安裝mysql
    • 3.flink datastream
  • 三、總結


提示:以下是本篇文章正文內容,下面案例可供參考

一、業務中常見的需要數據同步的場景

1、多個庫的表合并到一張表。不同的業務線或者微服務在不同的數據庫里開發,但是此時有些報表需要將多個庫的類似的數據合并后做查詢統計。或者,某些歷史原因,類似剛開始的商業模式不清晰,導致一些業務線分分合合。或者某些邊緣業務逐步融合到了主業務。早起的數據是分開的,業務運營也是分開,后來又合并成了一個大塊業務。

2、某個數據需要寫到多個存儲中。業務數據需要寫入到多個中間件或者存儲中,比如業務的數據存儲再Mysql的數據中,后來為了方便檢索需要寫入到ES,或者為了緩存需要寫入到Redis,或者是Mysql分表的數據合并寫入到Doris中。

3、數據倉庫的場景。比如將表里的數據實時寫入到DWS數據倉庫的寬表中。

4、應急場景。如果不采專用CDC的方案,那么要達到實時查詢的效果,只能在BFF層的代碼調用多個中心層的查詢API,然后再BFF層做各種聚合,運算。這種方式開發效率低下,萬一有的中心層沒有提供合適的查詢API,臨時開發的話,會讓開發進度不可控。

總之,不管是數據多寫、還是多表合并、還是建立數據倉庫,都屬于數據同步任務。

示例:pandas 是基于NumPy 的一種工具,該工具是為了解決數據分析任務而創建的。

CDC是什么

CDC 是變更數據捕獲(Change Data Capture)技術的縮寫,它可以將源數據庫(Source)的增量變動記錄,同步到一個或多個數據目的(Sink)。在同步過程中,還可以對數據進行一定的處理,例如過濾、關聯、分組、統計等。

目前專業做數據庫事件接受和解析的中間件是Debezium,如果是捕獲Mysql,還有Canal。

FlinkCDC是什么

官網地址:官網FlinkCDC

官方定義:This project provides a set of source connectors for Apache Flink? directly ingesting changes coming from different databases using Change Data Capture(CDC)。根據FlinkCDC官方給出的定義,FlinkCDC提供一組源數據的連接器,使用變更數據捕獲的方式,直接吸收來自不同數據庫的變更數據。

CDC原理

CDC的原理是,當數據源表發生變動時,會通過附加在表上的觸發器或者 binlog 等途徑,將操作記錄下來。下游可以通過數據庫底層的協議,訂閱并消費這些事件,然后對數據庫變動記錄做重放,從而實現同步。這種方式的優點是實時性高,可以精確捕捉上游的各種變動。

為什么是FlinkCDC

1、FlinkCDC 提供了對 Debezium 連接器的封裝和集成,簡化了配置和使用的過程,并提供了更高級的 API 和功能,例如數據格式轉換、事件時間處理等。Flink CDC 使用 Debezium 連接器作為底層的實現,將其與 Flink 的數據處理能力結合起來。通過配置和使用 Flink CDC,您可以輕松地將數據庫中的變化數據流轉化為 Flink 的 DataStream 或 Table,并進行實時的數據處理、轉換和分析。

2、Flink的DataStream和SQL比較成熟和易用

3、Flink支持狀態后端(State Backends),允許存儲海量的數據狀態

4、Flink有更好的生態,更多的Source和Sink的支持

業務場景

  • 數據合并流向:
    在這里插入圖片描述
  • 數據多寫流向:
    在這里插入圖片描述
  • 單數據源寫單表流向:
    在這里插入圖片描述
  • 數據鏈路對比
    通過下圖,我們可以看到Canal處理數據的鏈路比FlinkCDC更長,數據鏈路一旦變長意味著,出錯的可能性更高。
    在這里插入圖片描述
    在這里插入圖片描述

flink cdc對應flink的版本

在這里插入圖片描述

二、模擬案例

1.阿里云flink sql

  • 驗證mysql開啟binlog
    在這里插入圖片描述
    flink sql 定義binlog源數據,拿到數據處理(業務邏輯)再寫表,后面很簡單了
    在這里插入圖片描述

2.開源flink sql(單機模式)

背景:win10電腦安裝vmware(虛擬化)軟件,虛擬機中安裝
linux節點一個,flink,mysql(yum默認安裝的最新版本:8.0.37),java環境(此處安裝java環境略,網上有)

flink 安裝

###在linux下載flink包
[root@slave2 ~]# wget https://dlcdn.apache.org/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz
### 加壓包到當前目錄下
[root@slave2 ~]# tar zxvf flink-1.16.3-bin-scala_2.12.tgz 
[root@slave2 ~]# cd flink-1.16.3
[root@slave2 lib]# wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.0/flink-sql-connector-mysql-cdc-2.4.0.jar修改flink 文件
[root@slave2 flink-1.16.3]# cat conf/flink-conf.yaml 修改內容如下(如果不修改則win10本地電腦無法訪問flink web UI,這里浪費很多時間):
taskmanager.host: localhost
rest.bind-address: 0.0.0.0###啟動flink
[root@slave2 flink-1.16.3]# bin/start-cluster.sh
###啟動客戶端
root@slave2 flink-1.16.3]# ./bin/sql-client.sh embedded 

在這里插入圖片描述

安裝mysql

安裝mysql遇到很多小問題

[root@slave2 ~]# yum -y install mysql-community-server
“MySQL 8.0 Community Server” 的 GPG 密鑰已安裝,但是不適用于此軟件包。請檢查源的公鑰 URL 是否配置正確。

失敗的軟件包是:mysql-community-libs-8.0.37-1.el7.x86_64

在這里插入圖片描述

###用于跳過GPG簽名檢查 可以安裝成功
[root@slave2 ~]# yum -y install mysql-server --nogpgcheck   
###驗證myqsl是否可用
[root@slave2 ~]# systemctl start mysqld
[root@slave2 ~]# mysql -u root -p
Enter password: 
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 63
Server version: 8.0.37 MySQL Community Server - GPL
Copyright (c) 2000, 2024, Oracle and/or its affiliates.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> ### 配置開啟binlog
[root@slave2 ~]# vim /etc/my.cnf
log_bin=mysql_bin 
binlog-format=Row 
server-id=1   
###重啟mysql
[root@slave2 ~]# systemctl restart mysqld
###重新登陸mysql并查看binlog開啟情況
mysql> SHOW VARIABLES LIKE 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.01 sec)

在這里插入圖片描述
mysql創建新用戶等

###創建新用戶時發現一直報錯。查看mysql建密碼時要求比較高(新版本mysql對密碼要求高)
mysql> SHOW VARIABLES LIKE 'validate_password%';
+-------------------------------------------------+--------+
| Variable_name                                   | Value  |
+-------------------------------------------------+--------+
| validate_password.changed_characters_percentage | 0      |
| validate_password.check_user_name               | ON     |
| validate_password.dictionary_file               |        |
| validate_password.length                        | 10     |
| validate_password.mixed_case_count              | 1      |
| validate_password.number_count                  | 1      |
| validate_password.policy                        | MEDIUM |
| validate_password.special_char_count            | 1      |
+-------------------------------------------------+--------+
8 rows in set (0.00 sec)
###修改密碼要求(在工作生產環境不建議這么做)
mysql> SET GLOBAL validate_password.length = 3;
Query OK, 0 rows affected (0.03 sec)
mysql> SET GLOBAL validate_password.policy = LOW;
Query OK, 0 rows affected (0.00 sec)
###再次查看對新建用戶密碼要求
mysql> SHOW VARIABLES LIKE 'validate_password%';
+-------------------------------------------------+-------+
| Variable_name                                   | Value |
+-------------------------------------------------+-------+
| validate_password.changed_characters_percentage | 0     |
| validate_password.check_user_name               | ON    |
| validate_password.dictionary_file               |       |
| validate_password.length                        | 4     |
| validate_password.mixed_case_count              | 1     |
| validate_password.number_count                  | 1     |
| validate_password.policy                        | LOW   |
| validate_password.special_char_count            | 1     |
+-------------------------------------------------+-------+
8 rows in set (0.00 sec)
###新建用戶:root1234
mysql> CREATE USER 'root1234'@'localhost' IDENTIFIED BY 'root1234';
Query OK, 0 rows affected (0.01 sec)mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'root1234'@'localhost';
Query OK, 0 rows affected (0.02 sec)mysql> GRANT SELECT ON *.* TO 'root1234'@'localhost';
Query OK, 0 rows affected (0.01 sec)mysql> SELECT User, Host FROM mysql.user;
+------------------+-----------+
| User             | Host      |
+------------------+-----------+
| mysql.infoschema | localhost |
| mysql.session    | localhost |
| mysql.sys        | localhost |
| root             | localhost |
| root1234         | localhost |
+------------------+-----------+
5 rows in set (0.00 sec)mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.00 sec)

建mysql庫表(數據源頭庫表)

Flink SQL>
select * from test_flink_cdc5;
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: Can’t find any matched tables, please check your configured database-name: [mysql] and table-name: [mysql.test_cdc]

###mysql 默認只有這4個庫(當時直接用默認庫mysql建表導致flink 報一些上面奇詭的錯)
mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| mysql              |
| performance_schema |
| sys                |
+--------------------+
###新建一個庫:test
mysql> CREATE DATABASE test CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
Query OK, 1 row affected (0.02 sec)mysql> use test;
Database changed
###建表
mysql>  CREATE TABLE `test_cdc` (->   `id` int NOT NULL AUTO_INCREMENT,->   `name` varchar(255) DEFAULT NULL,->   PRIMARY KEY (`id`)->  ) ENGINE=InnoDB ;
Query OK, 0 rows affected (0.04 sec)mysql> show tables;
+----------------+
| Tables_in_test |
+----------------+
| test_cdc       |
+----------------+
1 row in set (0.00 sec)

##開始驗證

##mysql 使用上面創建的用戶密碼登錄mysql
[root@slave2 ~]# mysql -u root -p'root1234'
mysql> use test
##flink登錄
[root@slave2 flink-1.16.3]# bin/stop-cluster.sh
[root@slave2 flink-1.16.3]# ./bin/sql-client.sh embedded 
##在flink sql中定義mysql源
CREATE TABLE test_flink_cdc ( id INT, name STRING,primary key(id)  NOT ENFORCED
) WITH ('connector' = 'mysql-cdc', 'hostname' = '127.0.0.1', 'port' = '3306', 'username'='root1234', 'password'='root1234', 'database-name'='test', 'table-name'='test_cdc'
);
###查詢flink 接收到的binlog數據
Flink SQL> select * from test_flink_cdc;###到mysql sql界面向test_cdc表插入數據
mysql> INSERT INTO test_cdc VALUES (001, 'test01');
Query OK, 1 row affected (0.02 sec)mysql> INSERT INTO test_cdc VALUES (002, 'test02');
Query OK, 1 row affected (0.01 sec)mysql> INSERT INTO test_cdc VALUES (003, 'test03');
Query OK, 1 row affected (0.04 sec)mysql> INSERT INTO test_cdc VALUES (004, 'test04');
Query OK, 1 row affected (0.01 sec)mysql> INSERT INTO test_cdc VALUES (005, 'test05');
Query OK, 1 row affected (0.01 sec)mysql> INSERT INTO test_cdc VALUES (006, 'test06');
Query OK, 1 row affected (0.00 sec)


向mysql表中插入數據
在這里插入圖片描述
flink sql這時可以接到binlog數據
在這里插入圖片描述
查看flink UI job情況
在這里插入圖片描述
在這里插入圖片描述
小結:當flink可以拿到mysql binlog源頭數據,下面就好做了,根據自己的業務處理sink到任何數據庫或組件中(例如sink到mysql,hbase,hive,pg,kafka等等),后面sink就不演示了。

下載鏈接:
1.mysql jdbc jar包驅動下載
2.flink cdc驅動下載
3.flink下載

3.flink datastream

datastream 比較靈活簡單,下面是舉例代碼片段(datastream的CDC比flink sql還簡單,打個jar包在flink web UI界面上傳運行即可,此處不做舉例)

public class MySqlSourceExample {public static void main(String[] args) throws Exception {MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("yourHostname").port(yourPort).databaseList("yourDatabaseName") // set captured database.tableList("yourDatabaseName.yourTableName") // set captured table.username("yourUsername").password("yourPassword").deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String.build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// enable checkpointenv.enableCheckpointing(3000);env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")// set 4 parallel source tasks.setParallelism(4).print().setParallelism(1); // use parallelism 1 for sink to keep message orderingenv.execute("Print MySQL Snapshot + Binlog");}
}

三、總結

  • 公司用的cdc不能分享出來,所以搭建上面案例時遇到很多問題,遇到很多坑。
  • 工作中使用的就是如下截圖流程,沒有使用canal和kafka,使用的是logtail和SLS(阿里云的組件,類似kafka,但要比kafka等功能強大)
  • 上面說了很多關于flink cdc的優點。我結合工作中使用的flink cdc說一些缺點(自己結合業務場景)。

1.使用flink cdc不適合直觀觀察binlog的數據(例如臟數據,數據斷流,不能直觀看到最近的binglog情況,表的更新頻率不高等造成的困擾)。


2.使用阿里云SLS收集mysql binlog數據,將數據保存近1個月(保持時間可設置),同時可以在logstore查看數據結構樣式(sls支持sql語法查詢日志數據),方便后續flink代碼開發,也方便flink sql和datastream代碼debug。
3.使用flink cdc做數據質量監控比較后知后覺(只能監控已表數據),我這邊做法是直接監控sls原始數據質量,發現有數據質量問題會報出來。sls也支持實時斷流提醒。
4.一般對于簡單的不太重要的業務適合使用flink cdc,這樣開發快,數據流不用咋校驗。對于復雜數據或復雜業務或重要數據,需要觀察binlog數據結果(不信任上游其他部門數據)還是使用類似sls比較好。方便查詢數據變化與匯總,方便做數據報警等。
5.總之還是要根據自己的業務場景和自己公司現有技術組件組合著使用比較好。

在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/16275.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/16275.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/16275.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

mac中文件夾怎么顯示.git隱藏文件

1. 打開終端應用程序&#xff0c;然后進入到包含.git文件夾的目錄&#xff0c;可以使用以下命令來顯示隱藏文件和文件夾&#xff1a; defaults write com.apple.finder AppleShowAllFiles YES 2. 然后重啟 Finder&#xff1a; killall Finder

kali基本掃描工具(自帶)

免責聲明:本文僅做技術交流與學習...請勿非法破壞... 詳細用法: 命令 -h/百度/翻譯 fping 用法 hostlist 文件里面為ip fping -a -q -f hostlist -a 只看存活的 fping -g 202.100.1.1 202.100.1.255 -a -q > Ahost 輸出到Ahost文件上 nping nping -c 1 201.100.2.155-244 …

工具方法 - 如何在網上找資料

在查詢USB相關的技術資料時&#xff0c;官網的文檔中心里找到個spec的記錄&#xff0c;但下載鏈接沒有。然后在Google上搜索&#xff1a; fileytpe:pdf my_keyword 只找到一個收費的文檔下載網站&#xff0c;這讓我不開心。 于是在Yandex上搜了下&#xff0c;找到了兩個網站可以…

香橙派AIpro使用SSH遠程登錄

香橙派AIpro可以連接HDMI顯示器使用&#xff0c;也可以遠程登錄。這里采用MobaXterm軟件遠程登錄開發板。 首先要使得控制電腦和香橙派開發板連接到同一個局域網&#xff0c;兩者的IP地址能夠ping通。在Windows 下可以使用MobaXterm 遠程登錄開發板&#xff0c;首先新建一個ss…

屬于程序員的浪漫,一顆會跳動的心!!!

繪制一顆會跳動的心? 嘿嘿 可以說是程序員的專屬浪漫了吧&#xff0c;就像點燃一顆LED燈一樣&#xff1f;&#xff08;我瞎說的啊&#xff0c;大家別當真&#xff0c;我很菜的&#xff01;&#xff01;&#xff01;&#xff01;&#xff09; 程序就在下面啦&#xff0c;然…

hive結合Hbase實現實時數據處理和批量分析

問題背景 Hive主要設計為一個用于大數據集的批處理查詢引擎&#xff0c;并不是為實時查詢或實時數據更新而設計的。它主要用于執行數據摘要、查詢和分析。因此&#xff0c;Hive本身不支持實時數據更新或實時查詢&#xff0c;它更適合用于對大量數據進行批量處理和分析。 分析…

Java8Stream

目錄 什么是Stream? IO流&#xff1a; Java8Stream&#xff1a; 什么是流&#xff1f; stream圖解 獲取流 集合類&#xff0c;使用 Collection 接口下的 stream() 代碼 數組類&#xff0c;使用 Arrays 中的 stream() 方法 代碼 stream&#xff0c;使用 Stream 中的…

重生之 SpringBoot3 入門保姆級學習(02、打包部署)

重生之 SpringBoot3 入門保姆級學習&#xff08;02、打包部署&#xff09; 1.6 打包插件1.7 測試 jar 包1.8 application.properties 的相關配置 1.6 打包插件 官網鏈接 https://docs.spring.io/spring-boot/docs/current/reference/html/getting-started.html#getting-starte…

【Python】 XGBoost模型的使用案例及原理解析

原諒把你帶走的雨天 在漸漸模糊的窗前 每個人最后都要說再見 原諒被你帶走的永遠 微笑著容易過一天 也許是我已經 老了一點 那些日子你會不會舍不得 思念就像關不緊的門 空氣里有幸福的灰塵 否則為何閉上眼睛的時候 又全都想起了 誰都別說 讓我一個人躲一躲 你的承諾 我竟然沒懷…

自學動態規劃—— 一和零

一和零 474. 一和零 - 力扣&#xff08;LeetCode&#xff09; 其實遇到這種還好說&#xff0c;我寧愿遇見這種&#xff0c;也不想遇見那些奇奇怪怪遞推公式的題目。 這里其實相當背包要滿足兩個條件&#xff0c;所以我們可以將dp開成二維的&#xff0c;之后的操作&#xff0…

Kubernetes(K8S) 集群環境搭建指南

Kubernetes&#xff08;簡稱K8s&#xff09;是一個開源的容器編排平臺&#xff0c;旨在自動化部署、擴展和管理容器化應用。K8S環境搭建過程比較復雜&#xff0c;涉及到非常多組件安裝和系統配置&#xff0c;本文將會詳細介紹如何在服務器上搭建好Kubernetes集群環境。 在學習…

C語言---求一個整數存儲在內存中的二進制中1的個數--3種方法

//編寫代碼實現&#xff1a;求一個整數存儲在內存中的二進制中1的個數 //第一種寫法 /*int count_bit_one(unsigned int n) {int count 0;while (n )//除到最后余數是0&#xff0c;那么這個循環就結束了{//這個題就是可以想成求15的二進制的過程//每次都除以2&#xff0c;余數…

跟小伙伴們說一下

因為很忙&#xff0c;有一段時間沒有更新了&#xff0c;這次先把菜鳥教程停更一下&#xff0c;因為自己要查缺補漏一些細節問題&#xff0c;而且為了方便大家0基礎也想學C語言&#xff0c;這里打算給大家開一個免費專欄&#xff0c;這里大家就可以好好學習啦&#xff0c;哪怕0基…

面試題·棧和隊列的相互實現·詳解

A. 用隊列實現棧 用隊列實現棧 實現代碼如下 看著是隊列&#xff0c;其實實際實現更接近數組模擬 typedef struct {int* queue1; // 第一個隊列int* queue2; // 第二個隊列int size; // 棧的大小int front1, rear1, front2, rear2; // 兩個隊列的首尾指針 } MyS…

圖像處理ASIC設計方法 筆記25 紅外成像技術:未來視覺的革命

在當今科技飛速發展的時代,紅外成像技術以其獨特的優勢,在醫療、工業檢測等多個領域扮演著越來越重要的角色。本章節(P146 第7章紅外焦平面非均勻性校正SoC)將深入探討紅外成像系統中的關鍵技術——非均勻性校正SoC,以及它如何推動紅外成像技術邁向新的高度。 紅外成像系統…

6.Redis之String命令

1.String類型基本介紹 redis 所有的 key 都是字符串, value 的類型是存在差異的~~ 一般來說,redis 遇到亂碼問題的概率更小~~ Redis 中的字符串,直接就是按照二進制數據的方式存儲的. (不會做任何的編碼轉換【講 mysql 的時候,知道 mysql 默認的字符集, 是拉丁文,插入中文…

Jenkins--從入門到入土

Jenkins–從入門到入土 文章目錄 Jenkins--從入門到入土〇、概念提要--什么是CI/DI&#xff1f;1、CI&#xff08;Continuous Integration&#xff0c;持續集成&#xff09;2、DI&#xff08;DevOps Integration&#xff0c;DevOps 集成&#xff09;3、解決的問題 一、Jenkins安…

iOS 開發系列:基于VNRecognizeTextRequest識別圖片文字

1.添加Vision Kit依賴 在項目設置中點擊"General"選項卡&#xff0c;然后在"Frameworks, Libraries, and Embedded Content"&#xff08;框架、庫和嵌入內容&#xff09;部分&#xff0c;點擊""按鈕。搜索并選擇"Vision.framework"。…

[AIGC] flink sql 消費kafka消息,然后寫到mysql中的demo

這是一個使用 Flink SQL 從 Kafka 中消費數據并寫入 MySQL 的示例。在這個示例中&#xff0c;我們將假設有一個 Kafka 主題 “input_topic”&#xff0c;它產生格式為 (user_id: int, item_id: int, behavior: string, timestamp: long) 的數據&#xff0c;我們需要把這些數據寫…

world machine學習筆記(4)

選擇設備&#xff1a; select acpect&#xff1a; heading&#xff1a;太陽的方向 elevation&#xff1a;太陽的高度 select colour&#xff1a;選擇顏色 select convexity&#xff1a;選擇突起&#xff08;曲率&#xff09; select height&#xff1a;選擇高度 falloff&a…