Java應用Flink CDC監聽MySQL數據變動內容輸出到控制臺

文章目錄

  • maven 依賴
  • 自定義數據變化處理器
  • flink cdc監聽
  • 驗證

maven 依賴

<properties><flink.version>1.14.0</flink.version><flink-cdc.version>2.3.0</flink-cdc.version></properties><dependencies><!-- Flink dependencies --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>${flink-cdc.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.12</artifactId><version>${flink.version}</version></dependency></dependencies>

自定義數據變化處理器

package org.example;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;public class CustomSink extends RichSinkFunction<String> {@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);}@Overridepublic void close() throws Exception {super.close();}@Overridepublic void invoke(String value, Context context) throws Exception {//0P字段,該字段也有4種取值。分別是C(Create ) , U(Updlate) . D(Delete ),Read 。// 對于U操作,其數據部分同時包含了Before和After.System.out.println(">>>" + value);}
}

flink cdc監聽

package org.example;import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class MysqlSourceExample {public static void main(String[] args) throws Exception {DebeziumDeserializationSchema debeziumDeserializationSchema = new JsonDebeziumDeserializationSchema();MySqlSource<String> source = MySqlSource.builder().hostname("127.0.0.1").port(3306).databaseList("canal_manager")// set captured database.tableList("canal_manager.canal_user")// set captured table.startupOptions(StartupOptions.latest()) // 設置從最新的修改記錄開始讀取.username("root").password("123456").deserializer(debeziumDeserializationSchema) // converts SourceRecord to JSON string.includeSchemaChanges(true).build();//啟動一個webuI。Configuration configuration = new Configuration();configuration.setInteger(RestOptions.PORT, 8081);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);//檢者點間隔時間env .enableCheckpointing(5000);DataStreamSink<String> sink = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source").addSink(new CustomSink());env.execute();}
}

驗證

啟動后web頁面地址訪問http://localhost:8081/,MySQL數據庫canal_manager中的表canal_user數據發生修改,控制臺有輸出json:
在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/909105.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/909105.shtml
英文地址,請注明出處:http://en.pswp.cn/news/909105.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

獵板厚銅PCB工藝能力如何?

在電子產業向高功率、高集成化狂奔的今天&#xff0c;電路板早已不是沉默的配角。當5G基站、新能源汽車、工業電源等領域對電流承載、散熱效率提出嚴苛要求時&#xff0c;一塊能夠“扛得住大電流、耐得住高溫”的厚銅PCB&#xff0c;正成為決定產品性能的關鍵拼圖。而在這條賽道…

業務:資產管理功能

文章目錄 一、項目背景1.1概述1.2編寫目的 二、注意點說明三、頁面效果四、代碼AssetManagementControllerHwinfoAssetManagementHwinfoAssetManagementServiceHwinfoAssetManagementServiceImplHwinfoAssetManagementMapperHwinfoAssetManagementMapper.xmlSfpAssetManagement…

【MySQL進階】MySQL程序

目錄 一.有哪些MySQL程序 二. mysqld —— MySQL服務器 三.mysql——MySQL客戶端 3.1.連接mysql客?端 3.2.mysql客戶端選項 3.2.1.mysql常用選項 3.2.2.在命令?中使?選項 3.3.MySQL 選項&#xff08;配置&#xff09;文件 3.3.1.Linux環境下默認配置文件的路徑 3.…

Docker 運行 Kafka 帶 SASL 認證教程

Docker 運行 Kafka 帶 SASL 認證教程 Docker 運行 Kafka 帶 SASL 認證教程一、說明二、環境準備三、編寫 Docker Compose 和 jaas文件docker-compose.yml代碼說明&#xff1a;server_jaas.conf 四、啟動服務五、驗證服務六、連接kafka服務七、總結 Docker 運行 Kafka 帶 SASL 認…

??CentOS 7.9?? 上配置 ??Fail2ban 自動封禁 IP?? 的完整步驟,整合了多篇權威資料的最佳實踐

&#x1f527; ??一、安裝 Fail2ban?? ??啟用 EPEL 倉庫?? yum install epel-release -y ??安裝 Fail2ban?? yum install fail2ban -y ??啟動并設置開機自啟?? systemctl start fail2ban systemctl enable fail2ban ?? 注意&#xff1a;CentOS 7.9 默認 Py…

損壞的RAID5 第十六次CCF-CSP計算機軟件能力認證

純大模擬 提前打好板子 我只通過4個用例點 然后就超時了。 #include<iostream> #include<cstring> #include<algorithm> #include<unordered_map> #include<bits/stdc.h> using namespace std; int n, s, l; unordered_map<int, string>…

Kafka Topic中的數據在消費后還存在嗎

在 Kafka 的主題(Topic)和分區(Partition)中,數據在被消費者消費后是否仍然存在,取決于 Kafka 的設計機制和配置策略。

Linuxkernel學習-deepseek-2

以下是國際上廣受好評的 Linux 內核權威公開課&#xff0c;均來自頂級高校和技術組織&#xff0c;附課程鏈接和特色說明&#xff1a; —### 一、殿堂級大學課程1. MIT 6.S081: Operating System Engineering - 核心&#xff1a;基于 RISC-V 架構 重寫 Unix 內核&#xff08;xv6…

高頻面試之6Hive

Hive 文章目錄 Hive6.1 Hive的架構6.2 HQL轉換為MR流程6.3 Hive和數據庫比較6.4 內部表和外部表6.5 系統函數6.6 自定義UDF、UDTF函數6.7 窗口函數6.8 Hive優化6.8.1 分組聚合6.8.2 Map Join6.8.3 SMB Map Join6.8.4 Reduce并行度6.8.5 小文件合并6.8.6 謂詞下推6.8.7 并行執行…

分類場景數據集大全「包含數據標注+訓練腳本」 (持續原地更新)

一、作者介紹&#xff1a;六年算法開發經驗、AI 算法經理、阿里云專家博主。擅長&#xff1a;檢測、分割、理解、大模型 等算法訓練與推理部署任務。 二、數據集介紹&#xff1a; 質量高&#xff1a;高質量圖片、高質量標注數據&#xff0c;吐血標注、整理&#xff0c;可以作為…

從硬件視角審視Web3安全:CertiK CTO主持Proof of Talk圓桌論壇

6月10日&#xff0c;在備受矚目的全球Web3與AI峰會Proof of Talk 2025上&#xff0c;CertiK首席技術官Li Kang博士主持了一場聚焦“Web3錢包與托管安全”&#xff08;Web3 Wallet and Custodial Security&#xff09;的圓桌論壇。本次論壇從硬件與系統軟件的底層視角出發&#…

從DevOps到AIOps:智能體如何接管持續交付流程

引言&#xff1a;從DevOps到AIOps的時代躍遷 DevOps 作為軟件開發與運維一體化的最佳實踐&#xff0c;已經廣泛應用于現代軟件工程體系中。在 CI/CD&#xff08;持續集成/持續交付&#xff09;的支撐下&#xff0c;軟件交付從季度變為月度、從周變為日&#xff0c;乃至分鐘級更…

MAC-安裝Homebrew、安裝Git

1.首先嘗試用中科大和清華的源發現不行 中國科學技術大學(USTC)提供了 Homebrew 的鏡像倉庫,同步官方更新,適合國內用戶。 安裝命令??: /bin/bash -c "$(curl -fsSL https://mirrors.ustc.edu.cn/brew/install.sh)" 步驟說明??: 復制上述命令到終端,按…

flutter基礎面試知識匯總(二)

一、全局狀態管理工具-----GetX、Provider、Bloc 1.Provider Provider 是 Flutter 中一個流行的狀態管理庫&#xff0c;它簡化了數據共享和狀態管理的過程。它通過依賴注入的方式&#xff0c;讓不同的 Widget 共享數據&#xff0c;而無需過多地傳遞參數。Provider也是官方推薦…

基于YOLOv12的電力高空作業安全檢測:為電力作業“保駕護航”,告別安全隱患!

在電力行業&#xff0c;尤其是高空作業場景&#xff0c;安全隱患無處不在。高空作業本身就存在著極高的風險&#xff0c;尤其是對于電力維護和檢修工作來說&#xff0c;稍有不慎便可能造成嚴重的安全事故。傳統的安全監管方式&#xff0c;如人工巡檢和視頻監控&#xff0c;存在…

大話軟工筆記—需求分析匯總

需求調研和分析完成&#xff0c;可匯總形成兩份文檔&#xff1a;需求規格說明書和解決方案。 1. 需求規格說明書 1.1 主要內容 引言&#xff0c;包括項目目的、背景、用語等基礎信息。項目概述&#xff0c;對項目自身的說明、包括范圍、主要處理對象、與其他系統的關系等。功…

openstack實例創建過程分析

用戶驗證 1、某用戶以登錄web界面或執行rc文件的方式&#xff0c;通過RESTful API向keystone獲取credentials&#xff1b; 2、keystone進行authentication&#xff0c;若正確則生成并返回auth-token&#xff1b; 3、以攜帶auth-token的形式&#xff0c;在web界面或命令行cli&a…

安卓首次啟動Fallbackhome是否可以直接去除?--學員作業

背景&#xff1a; 有學員朋友在vip群提出一個需求相關的問題&#xff0c;他想要把settings裁剪掉&#xff0c;但是發現裁剪后Fallbackhome肯定就沒了&#xff0c;發現Launcher居然無法啟動了&#xff0c;一直處于Bootanimation的畫面&#xff0c;無法進入系統。 針對這個去除…

C++ 實現環形緩沖區

環形緩沖區&#xff08;Ring Buffer&#xff09;是一種常見的用于數據流緩沖的結構&#xff0c;通常用于生產者-消費者模型、音視頻處理等場景。 因為環形緩沖區使用的場景大多為性能敏感的場景&#xff0c;我們采用數組的數據結構和位運算來實現&#xff0c;以提高代碼效率。…

MySQL虛擬列:一個被低估的MySQL特性

前言 最近在做訂單系統重構時&#xff0c;遇到了一個有趣的問題。 系統里有很多地方都要計算訂單的總價&#xff08;數量單價&#xff09;&#xff0c;這個計算邏輯分散在各個服務中&#xff0c;產生了不少相似甚至重復的代碼。 代碼評審時&#xff0c;同事提出了一個建議 —…