MySQL數據準備
-- Demo data for the CDC walkthrough: recreates table test.stu and loads three rows.
-- The script drops stu first, so it can be re-run to reset the table.
create database if not exists test;
use test;
drop table if exists stu;
-- id is the primary key and is generated by auto_increment.
-- utf8mb4 is declared explicitly because the name column stores Chinese text
-- and the server default character set may not be able to hold it.
create table stu (
    id int primary key auto_increment,
    name varchar(100),
    age int
) default character set utf8mb4;
-- String literals use single quotes: double quotes are only accepted as string
-- delimiters by MySQL when the ANSI_QUOTES sql_mode is off.
insert into stu (name, age) values ('張三', 18);
insert into stu (name, age) values ('李四', 20);
insert into stu (name, age) values ('王五', 21);
注意:表必須有主鍵
開啟MySQL binlog
修改MySQL配置,開啟binlog
$ sudo vim /etc/my.cnf
在 [mysqld] 區段下添加如下設置:
# Binary-log settings for the MySQL server (these options belong to the [mysqld] group of my.cnf).
# Server id used for replication; must be set when binary logging is enabled.
server-id = 1
# Turn on the binary log; "mysql-bin" is the base name of the log files
# (the change events later in this file report file "mysql-bin.000001").
log-bin=mysql-bin
# Row-based logging: changes are logged as row images rather than as SQL statements.
binlog_format=row
# Only write changes made in the "test" database to the binary log.
binlog-do-db=test
注意:binlog-do-db 指定要記錄 binlog 的數據庫(此處為 test),需根據實際情況修改。
重啟mysql
$ sudo systemctl restart mysqld
代碼開發
依賴
Flink CDC依賴
<!-- Flink CDC connector for MySQL -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.4.0</version>
</dependency>
<!-- Flink Table API Java bridge; its version follows the flink.version property -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
</dependency>
完整依賴
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.17.1</flink.version>
    <!-- Property name corrected from the misspelled "flink-cdc.vesion"; it is only referenced inside this fragment. -->
    <flink-cdc.version>2.4.0</flink-cdc.version>
</properties>

<!--
    Each artifact is declared exactly once. The original fragment repeated
    flink-table-api-java-bridge, flink-table-planner-loader, flink-table-runtime and
    flink-connector-files (once with a conflicting "provided" scope), which makes Maven
    warn that dependency declarations must be unique.
-->
<dependencies>
    <!-- Flink core / runtime -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Flink Table API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-loader</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Connectors -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-files</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.33</version>
    </dependency>
    <!--
        The JDBC connector is released separately from Flink; a released build for
        Flink 1.17 replaces the former 1.17-SNAPSHOT, which only resolves when a
        snapshot repository is configured.
    -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>3.1.1-1.17</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-datagen</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- State backends -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-changelog</artifactId>
        <version>${flink.version}</version>
        <scope>runtime</scope>
    </dependency>

    <!-- Hadoop client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.3.4</version>
        <!-- <scope>provided</scope> -->
    </dependency>

    <dependency>
        <groupId>com.google.code.findbugs</groupId>
        <artifactId>jsr305</artifactId>
        <version>1.3.9</version>
    </dependency>

    <!-- Flink CDC connector for MySQL -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>${flink-cdc.version}</version>
    </dependency>
</dependencies>
Flink代碼
Flink CDC捕獲MySQL變更數據(增加、修改、刪除),輸出到控制臺。
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FlinkCDCDemo {public static void main(String[] args) throws Exception {// 環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(4);// 數據源MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("node4").port(3306).username("root").password("000000").databaseList("test").tableList("test.stu").deserializer(new JsonDebeziumDeserializationSchema()).startupOptions(StartupOptions.initial()).build();DataStreamSource<String> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(),"mysql_source").setParallelism(1);// 處理數據// 輸出數據dataStreamSource.print();// 執行env.execute();}
}
運行程序,確保程序無報錯,看到如下輸出:
18:58:51,826 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource   - Keepalive thread is running
測試
添加數據
mysql添加數據
mysql> insert into stu(name, age) values('趙六', 23); -- single-quoted literal: also valid when ANSI_QUOTES is enabled
IDEA控制臺輸出
{"before":null,"after":{"id":4,"name":"趙六","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1719831654000,"snapshot":"false","db":"test","sequence":null,"table":"stu","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":2300,"row":0,"thread":13,"query":null},"op":"c","ts_ms":1719831654692,"transaction":null}
格式化輸出
{
  "before": null,
  "after": {
    "id": 4,
    "name": "趙六",
    "age": 23
  },
  "source": {
    "version": "1.9.7.Final",
    "connector": "mysql",
    "name": "mysql_binlog_source",
    "ts_ms": 1719831654000,
    "snapshot": "false",
    "db": "test",
    "sequence": null,
    "table": "stu",
    "server_id": 1,
    "gtid": null,
    "file": "mysql-bin.000001",
    "pos": 2300,
    "row": 0,
    "thread": 13,
    "query": null
  },
  "op": "c",
  "ts_ms": 1719831654692,
  "transaction": null
}
可以看到 before 為 null、after 為新增的行,符合新增數據的邏輯;op 為 c(create)表示新增數據。
修改數據
mysql修改數據
mysql> update stu set name='zl', age=19 where name='趙六'; -- single-quoted literals: also valid when ANSI_QUOTES is enabled
IDEA控制臺輸出
{"before":{"id":4,"name":"趙六","age":23},"after":{"id":4,"name":"zl","age":19},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1719831987000,"snapshot":"false","db":"test","sequence":null,"table":"stu","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":2604,"row":0,"thread":13,"query":null},"op":"u","ts_ms":1719831987238,"transaction":null}
格式化輸出
{
  "before": {
    "id": 4,
    "name": "趙六",
    "age": 23
  },
  "after": {
    "id": 4,
    "name": "zl",
    "age": 19
  },
  "source": {
    "version": "1.9.7.Final",
    "connector": "mysql",
    "name": "mysql_binlog_source",
    "ts_ms": 1719831987000,
    "snapshot": "false",
    "db": "test",
    "sequence": null,
    "table": "stu",
    "server_id": 1,
    "gtid": null,
    "file": "mysql-bin.000001",
    "pos": 2604,
    "row": 0,
    "thread": 13,
    "query": null
  },
  "op": "u",
  "ts_ms": 1719831987238,
  "transaction": null
}
可以看到 before 為更新前的行、after 為更新後的行,符合更新數據的邏輯;op 為 u(update)表示更新數據。
刪除數據
mysql刪除數據
mysql> delete from stu where id=4; -- removes the row with id 4, i.e. the row inserted and then updated in the previous steps
IDEA控制臺輸出
{"before":{"id":4,"name":"zl","age":19},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1719832151000,"snapshot":"false","db":"test","sequence":null,"table":"stu","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":2913,"row":0,"thread":13,"query":null},"op":"d","ts_ms":1719832151198,"transaction":null}
格式化輸出
{
  "before": {
    "id": 4,
    "name": "zl",
    "age": 19
  },
  "after": null,
  "source": {
    "version": "1.9.7.Final",
    "connector": "mysql",
    "name": "mysql_binlog_source",
    "ts_ms": 1719832151000,
    "snapshot": "false",
    "db": "test",
    "sequence": null,
    "table": "stu",
    "server_id": 1,
    "gtid": null,
    "file": "mysql-bin.000001",
    "pos": 2913,
    "row": 0,
    "thread": 13,
    "query": null
  },
  "op": "d",
  "ts_ms": 1719832151198,
  "transaction": null
}
可以看到 before 為刪除前的行、after 為 null,符合刪除數據的邏輯;op 為 d(delete)表示刪除數據。
完成!enjoy it!