Flink CDC的使用

MySQL數據準備

create database if not exists test;
use test;
drop table if exists stu;
create table stu (id int primary key auto_increment, name varchar(100), age int);
insert into stu(name, age) values("張三",18);
insert into stu(name, age) values("李四",20);
insert into stu(name, age) values("王五",21);

注意:表必須有主鍵

開啟MySQL binlog

修改MySQL配置,開啟binlog

$ sudo vim /etc/my.cnf,添加如下設置

server-id = 1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=test

注意:啟用binlog的數據庫,需根據實際情況作出修改

重啟mysql

$ sudo systemctl restart mysqld

代碼開發

依賴

Flink CDC依賴

<!--cdc 依賴--><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.4.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency>

完整依賴

    <properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.17.1</flink.version><flink-cdc.vesion>2.4.0</flink-cdc.vesion></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version></dependency><!--目前中央倉庫還沒有 jdbc的連接器,暫時用一個快照版本--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>1.17-SNAPSHOT</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-datagen</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb</artifactId><version>${flink.version}</version>
<!--            <scope>provided</scope>--></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.3.4</version>
<!--            <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-changelog</artifactId><version>${flink.version}</version><scope>runtime</scope></dependency><dependency><groupId>com.google.code.findbugs</groupId><artifactId>jsr305</artifactId><version>1.3.9</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><!--cdc 依賴--><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>${flink-cdc.vesion}</version></dependency></dependencies>

?Flink代碼

Flink CDC捕獲MySQL變更數據(增加、修改、刪除),輸出到控制臺。

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FlinkCDCDemo {public static void main(String[] args) throws Exception {// 環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(4);// 數據源MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("node4").port(3306).username("root").password("000000").databaseList("test").tableList("test.stu").deserializer(new JsonDebeziumDeserializationSchema()).startupOptions(StartupOptions.initial()).build();DataStreamSource<String> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(),"mysql_source").setParallelism(1);// 處理數據// 輸出數據dataStreamSource.print();// 執行env.execute();}
}

運行程序,確保程序無報錯,看到如下輸出:

18:58:51,826 INFO ?io.debezium.connector.mysql.MySqlStreamingChangeEventSource ? - Keepalive thread is running

測試

添加數據

mysql添加數據

mysql> insert into stu(name, age) values("趙六",23);

IDEA控制臺輸出

{"before":null,"after":{"id":4,"name":"趙六","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1719831654000,"snapshot":"false","db":"test","sequence":null,"table":"stu","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":2300,"row":0,"thread":13,"query":null},"op":"c","ts_ms":1719831654692,"transaction":null}

格式化輸出

{"before": null,"after": {"id": 4,"name": "趙六","age": 23},"source": {"version": "1.9.7.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1719831654000,"snapshot": "false","db": "test","sequence": null,"table": "stu","server_id": 1,"gtid": null,"file": "mysql-bin.000001","pos": 2300,"row": 0,"thread": 13,"query": null},"op": "c","ts_ms": 1719831654692,"transaction": null
}

關注before、after符合增加數據的邏輯,op為c表示添加數據

修改數據

mysql修改數據

mysql> update stu set name="zl", age=19 where name="趙六";

IDEA控制臺輸出

{"before":{"id":4,"name":"趙六","age":23},"after":{"id":4,"name":"zl","age":19},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1719831987000,"snapshot":"false","db":"test","sequence":null,"table":"stu","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":2604,"row":0,"thread":13,"query":null},"op":"u","ts_ms":1719831987238,"transaction":null}

格式化輸出

{"before": {"id": 4,"name": "趙六","age": 23},"after": {"id": 4,"name": "zl","age": 19},"source": {"version": "1.9.7.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1719831987000,"snapshot": "false","db": "test","sequence": null,"table": "stu","server_id": 1,"gtid": null,"file": "mysql-bin.000001","pos": 2604,"row": 0,"thread": 13,"query": null},"op": "u","ts_ms": 1719831987238,"transaction": null
}

關注before、after符合更新的邏輯,op為u表示更新數據

刪除數據

mysql刪除數據

mysql> delete from stu where id=4;

IDEA控制臺輸出

{"before":{"id":4,"name":"zl","age":19},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1719832151000,"snapshot":"false","db":"test","sequence":null,"table":"stu","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":2913,"row":0,"thread":13,"query":null},"op":"d","ts_ms":1719832151198,"transaction":null}
?

格式化輸出

{"before": {"id": 4,"name": "zl","age": 19},"after": null,"source": {"version": "1.9.7.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1719832151000,"snapshot": "false","db": "test","sequence": null,"table": "stu","server_id": 1,"gtid": null,"file": "mysql-bin.000001","pos": 2913,"row": 0,"thread": 13,"query": null},"op": "d","ts_ms": 1719832151198,"transaction": null
}

關注before、after符合刪除的邏輯,op為d表示刪除數據

完成!enjoy it!

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/38794.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/38794.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/38794.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

ESOP 系統助力電子設備公司的管理模式升級

在科技飛速發展的時代&#xff0c;電子設備行業競爭愈發激烈&#xff0c;企業要想在市場中立足并持續發展&#xff0c;不斷升級管理模式成為關鍵。ESOP系統的引入&#xff0c;為電子設備公司帶來了全新的機遇&#xff0c;有力地推動了管理模式的升級。 ESOP 系統首先為電子設備…

element el-table表格切換分頁保留分頁數據+限制多選數量

el-table表格并沒有相關的方法來禁用表頭里面的多選按鈕 那么我們可以另辟蹊徑&#xff0c;來實現相同的多選切換分頁&#xff08;保留分頁數據&#xff09; 限制多選數量的效果 <el-table:data"tableData"style"width: 100%">// 不使用el-talbe自帶…

農村程序員陳隨易2024年中總結

今天是 2024年7月1日&#xff0c;時間如白駒過隙&#xff0c;今年已去其一半。 總結一下今年上半年的情況&#xff0c;給大家提供一些參考和建議。 希望大家關注一下公眾號 陳隨易&#xff0c;有些內容只在公眾號發表。 先看看我的年初計劃&#xff0c;這個在今年年初的時候&…

泛微E9開發 限制明細表列的值重復

限制明細表列的值重復 1、需求說明2、實現方法3、擴展知識點3.1 修改單個字段值&#xff08;不支持附件類型&#xff09;3.1.1 格式3.1.2 參數3.1.3 案例 3.2 獲取明細行所有行標示3.2.1 格式3.2.2 參數說明 1、需求說明 限制明細表的“類型”字段&#xff0c;在同一個流程表單…

【全網首發】雙字重疊語序驗證碼識別

【省流&#xff1a;打算直接測試效果的可以訪問這個網址】 http://decaptcha.ai?project_namenetease_zh_overlap 【實現方案】 如圖所示&#xff0c;我們能看到&#xff0c;比起以往的“單個字”語序點選&#xff0c;這個驗證碼的難點在于“重疊漢字“&#xff0c;我們知道…

【Python機器學習】模型評估與改進——簡單的網格搜索

為了提升模型的泛化性能&#xff0c;我們可以通過調參來實現。 在嘗試調參之前&#xff0c;重要的是理解參數的含義&#xff0c;找到一個模型的重要參數&#xff08;提供最佳泛化性能的參數&#xff09;的取值是一項棘手的任務&#xff0c;但對于幾乎所有模型和數據集來說都是…

API-Window對象

學習目標&#xff1a; 掌握Window對象 學習內容&#xff1a; BOM&#xff08;瀏覽器對象模型&#xff09;定時器-延時函數JS執行機制location對象navigation對象history對象 BOM&#xff08;瀏覽器對象模型&#xff09;&#xff1a; BOM是瀏覽器對象模型。 window對象是一個全…

Windows 11的市場份額越來越大了,推薦你升級!

7月1日&#xff0c;系統之家發布最新數據&#xff0c;顯示Windows 11操作系統的市場份額正在穩步上升。自2021年10月Windows 11發布以來&#xff0c;Windows 10一直占據著市場主導地位&#xff0c;當時其市場份額高達81.44%。然而&#xff0c;隨著時間的推移&#xff0c;Window…

鴻蒙學習1:ArkTS基礎入門

1 變量和常量 1.1 變量 常見的基礎數據類型&#xff1a; string 字符串、number 數字、boolean布爾 判斷。 變量&#xff1a;專門用來存儲數據的容器。 語法&#xff1a;let 變量名: 數據類型 值。例如&#xff1a;let name: 張三;let price:number 12.4; let isSuccess …

【triton-inference-server】 官方python_backend 文檔及例子

https://github.com/triton-inference-server/python_backend#building-from-source 一。 從源碼構建python_backend root@ubuntu-server:/home/ubuntu/hzh# sudo apt-get install rapidjson-dev libarchive-dev zlib1g-dev Reading package lists... Done Building dependency…

vue3中的自定義指令

全局自定義指令 假設我們要創建一個全局指令v-highlight&#xff0c;用于高亮顯示元素。這個指令將接受一個顏色參數&#xff0c;并有一個可選的修飾符bold來決定是否加粗文本。 首先&#xff0c;在創建Vue應用時定義這個指令&#xff1a;&#xff08;這里可以將指令抽離成單…

昂科燒錄器支持BPS晶豐明源半導體的多相Buck控制器BPD93004E

芯片燒錄行業領導者-昂科技術近日發布最新的燒錄軟件更新及新增支持的芯片型號列表&#xff0c;其中BPS晶豐明源半導體的多相Buck控制器BPD93004E已經被昂科的通用燒錄平臺AP8000所支持。 BPD93004E是一款多相Buck控制器&#xff0c;支持原生1~4相&#xff0c;數字方式控制&am…

科普文:一文搞懂jvm原理(二)類加載器

概敘 科普文&#xff1a;一文搞懂jvm(一)jvm概敘-CSDN博客 前面我們介紹了jvm&#xff0c;jvm主要包括兩個子系統和兩個組件&#xff1a; Class loader(類裝載器) 子系統&#xff0c;Execution engine(執行引擎) 子系統&#xff1b;Runtime data area (運行時數據區域)組件&am…

Cambrian-1: A Fully Open, Vision-Centric Exploration of Multimodal LLMs

摘要 https://arxiv.org/pdf/2406.16860v1 我們介紹了Cambrian-1&#xff0c;這是一系列以視覺為中心的多模態大型語言模型&#xff08;MLLMs&#xff09;。盡管更強大的語言模型可以增強多模態能力&#xff0c;但視覺組件的設計選擇往往沒有得到充分的探索&#xff0c;并且與…

學習筆記(linux高級編程)9

void pthread_cleanup_push(void (*routine)(void *)&#xff0c; void *arg); 功能&#xff1a;注冊一個線程清理函數 參數&#xff0c;routine&#xff0c;線程清理函數的入口 arg&#xff0c;清理函數的參數。 返回值&#xff0c;無 void pthread_cleanup_pop(int execute)…

Perl語言入門指南

一、緒論 1.1 Perl語言概述 1.2 Perl的特色 1.3 Perl面臨的問題 1.4 Perl語言的應用領域 二、Perl語言基礎 2.1 Perl語言的歷史發展 2.2 Perl語言的基本語法 2.3 Perl語言的數據類型 三、Perl語言控制結構 3.1 條件語句 3.2 循環結構 3.3 函數和子程序 四、Perl語…

OpenStack開源虛擬化平臺(一)

目錄 一、OpenStack背景介紹&#xff08;一&#xff09;OpenStack是什么&#xff08;二&#xff09;OpenStack的主要服務 二、計算服務Nova&#xff08;一&#xff09;Nova組件介紹&#xff08;二&#xff09;Libvirt簡介&#xff08;三&#xff09;Nova中的RabbitMQ解析 OpenS…

MySQL-數據操作類型的角度理解 S鎖 X鎖

文章目錄 1、S鎖和S鎖互相兼容2、S鎖和X鎖互斥3、X鎖和X鎖也互斥4、X鎖和S鎖也互斥5、select * from account for update;6、select * from account for update nowait;7、select * from account for update skip locked; 1、S鎖和S鎖互相兼容 2、S鎖和X鎖互斥 3、X鎖和X鎖也互…

20240702 每日AI必讀資訊

&#x1f50d;GPTPdf&#xff1a;使用類似GPT-4o的多模態LLM分析PDF文件 - 使用類似 GPT-4o 多模態模型解析 PDF 文件&#xff0c;轉換為 Markdown 格式。 - 代碼簡潔高效&#xff0c;僅293行。 - 解析結果幾乎完美包括排版、數學公式、表格、圖片、圖表等內容。 &#x1…

【記錄】IDEA2023的激活與安裝

前言&#xff1a; 記錄IDEA2023的激活與安裝 第一步&#xff1a;官網下載安裝包&#xff1a; 下載地址&#xff1a;https://www.jetbrains.com/idea/download/other.html 這個最好選擇2023版本&#xff0c;用著很nice。 安裝步驟就不詳解了&#xff0c;無腦下一步就可以了…