使用 Apache Flink CDC 3.0 實現 MySQL 到 Elasticsearch 的數據同步

下面我將創建一個完整的 Spring Boot 項目,使用 Flink CDC 3.0 基于 MySQL 的 binlog 實現數據同步到 Elasticsearch。

項目概述

這個項目將:

  1. 使用 Flink CDC 連接 MySQL 并讀取 binlog
  2. 處理數據變化(插入、更新、刪除)
  3. 將數據同步到 Elasticsearch
  4. 提供 REST API 管理同步任務

項目結構

src/main/java/
├── com/example/cdc/
│   ├── config/
│   │   ├── FlinkConfig.java
│   │   └── ElasticsearchConfig.java
│   ├── model/
│   │   └── User.java
│   ├── service/
│   │   ├── SyncService.java
│   │   └── JobManager.java
│   ├── controller/
│   │   └── SyncController.java
│   └── FlinkCdcApplication.java

1. 添加依賴 (pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>FlinkCDC</artifactId><version>0.0.1-SNAPSHOT</version><name>FlinkCDC</name><description>FlinkCDC</description><properties><java.version>11</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><spring-boot.version>2.7.6</spring-boot.version><flink.version>1.16.0</flink.version><flink-cdc.version>3.0.1</flink-cdc.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Flink CDC --><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>${flink-cdc.version}</version></dependency><!-- Flink Connector Elasticsearch --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch7</artifactId><version>${flink.version}</version></dependency><!-- Flink JSON --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><!-- Flink Java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- Flink CLI --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.36</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>11</source><target>11</target><encoding>UTF-8</encoding></configuration></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring-boot.version}</version><configuration><mainClass>com.example.cdc.FlinkCdcApplication</mainClass><skip>true</skip></configuration><executions><execution><id>repackage</id><goals><goal>repackage</goal></goals></execution></executions></plugin></plugins></build>
</project>

2. 數據模型 (User.java)

package com.example.cdc.model;import lombok.Data;@Data
public class User {private Long id;private String name;private String email;private Long createdAt;private Long updatedAt;private Boolean deleted;
}

3. Flink 配置 (FlinkConfig.java)

package com.example.cdc.config;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FlinkConfig {@Beanpublic StreamExecutionEnvironment streamExecutionEnvironment() {return StreamExecutionEnvironment.getExecutionEnvironment();}
}

4. Elasticsearch 配置 (ElasticsearchConfig.java)

package com.example.cdc.config;import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ElasticsearchConfig {@Value("${elasticsearch.host:localhost}")private String host;@Value("${elasticsearch.port:9200}")private int port;@Bean(destroyMethod = "close")public RestHighLevelClient restHighLevelClient() {return new RestHighLevelClient(RestClient.builder(new HttpHost(host, port, "http")));}
}

5. 同步服務 (SyncService.java)

package com.example.cdc.service;import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;import java.util.ArrayList;
import java.util.List;@Service
public class SyncService {@Autowiredprivate StreamExecutionEnvironment env;@Value("${mysql.host:localhost}")private String mysqlHost;@Value("${mysql.port:3306}")private int mysqlPort;@Value("${mysql.username:root}")private String mysqlUsername;@Value("${mysql.password:password}")private String mysqlPassword;@Value("${mysql.database:test}")private String mysqlDatabase;@Value("${mysql.table:users}")private String mysqlTable;@Value("${elasticsearch.host:localhost}")private String esHost;@Value("${elasticsearch.port:9200}")private int esPort;@Value("${elasticsearch.index:users}")private String esIndex;public void startSync() throws Exception {// 創建 MySQL CDC SourceMySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname(mysqlHost).port(mysqlPort).databaseList(mysqlDatabase).tableList(mysqlDatabase + "." + mysqlTable).username(mysqlUsername).password(mysqlPassword).deserializer(new JsonDebeziumDeserializationSchema()).build();// 創建數據流DataStream<String> stream = env.fromSource(mySqlSource,org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(),"MySQL Source");// 轉換和處理數據DataStream<String> processedStream = stream.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {// 這里可以添加自定義的數據處理邏輯return value;}});// 配置 Elasticsearch SinkList<HttpHost> httpHosts = new ArrayList<>();httpHosts.add(new HttpHost(esHost, esPort, "http"));ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,(element, ctx, indexer) -> {IndexRequest request = Requests.indexRequest().index(esIndex).source(element, XContentType.JSON);indexer.add(request);});// 配置批量請求esSinkBuilder.setBulkFlushMaxActions(1);// 將數據發送到 Elasticsearch - 使用 addSink 而不是 sinkToprocessedStream.addSink(esSinkBuilder.build());// 啟動任務env.execute("MySQL to Elasticsearch Sync");}
}

6. 任務管理器 (JobManager.java)

package com.example.cdc.service;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PreDestroy;@Component
public class JobManager {@Autowiredprivate StreamExecutionEnvironment env;@Autowiredprivate SyncService syncService;private Thread jobThread;public void startJob() {jobThread = new Thread(() -> {try {syncService.startSync();} catch (Exception e) {e.printStackTrace();}});jobThread.start();}public void stopJob() {if (env != null) {try {env.close();} catch (Exception e) {e.printStackTrace();}}if (jobThread != null && jobThread.isAlive()) {jobThread.interrupt();}}@PreDestroypublic void onDestroy() {stopJob();}
}

7. REST 控制器 (SyncController.java)

package com.example.cdc.controller;import com.example.cdc.service.JobManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/api/sync")
public class SyncController {@Autowiredprivate JobManager jobManager;@PostMapping("/start")public String startSync() {try {jobManager.startJob();return "Sync job started successfully";} catch (Exception e) {return "Failed to start sync job: " + e.getMessage();}}@PostMapping("/stop")public String stopSync() {try {jobManager.stopJob();return "Sync job stopped successfully";} catch (Exception e) {return "Failed to stop sync job: " + e.getMessage();}}
}

8. 應用主類 (FlinkCdcApplication.java)

package com.example.cdc;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class FlinkCdcApplication {public static void main(String[] args) {SpringApplication.run(FlinkCdcApplication.class, args);}
}

9. 配置文件 (application.yml)

server:port: 8080spring:application:name: mysql-cdc-to-esmysql:host: localhostport: 3306username: rootpassword: your_mysql_passworddatabase: your_databasetable: your_tableelasticsearch:host: localhostport: 9200index: your_es_indexflink:parallelism: 1

使用說明

  1. 確保 MySQL 已開啟 binlog:

    SHOW VARIABLES LIKE 'log_bin';
    

    如果未開啟,需要在 MySQL 配置文件中添加:

    [mysqld]
    server-id=1
    log-bin=mysql-bin
    binlog_format=row
    binlog_row_image=full
    
  2. 創建具有復制權限的 MySQL 用戶:

    CREATE USER 'flink_user'@'%' IDENTIFIED BY 'password';
    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_user'@'%';
    FLUSH PRIVILEGES;
    
  3. 啟動應用程序:

    mvn spring-boot:run
    
  4. 通過 REST API 啟動同步任務:

    POST http://localhost:8080/api/sync/start
    

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/96455.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/96455.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/96455.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Web網站的運行原理2

請求Web網站的文件-HTTP 可以使用HTTP協議在Web瀏覽器和Web服務器應用程序之間傳輸Web網頁的文件。 在進行HTTP傳輸之前&#xff0c;需要先在Web瀏覽器和Web服務器應用程序之間建立TCP連接。 使用HTTP請求可以要求Web瀏覽器向Web服務器應用程序傳輸文件。 傳輸Web網站的文件-HT…

論文閱讀:Do As I Can, Not As I Say: Grounding Language in Robotic Affordances

地址&#xff1a;Do As I Can, Not As I Say: Grounding Language in Robotic Affordances 摘要 大型語言模型&#xff08;LLM&#xff09;能夠編碼豐富的世界語義知識&#xff0c;這類知識對于機器人執行自然語言表達的高層級、時間擴展指令具有重要價值。然而&#xff0c;語…

Django管理后臺結合剪映實現課件視頻生成應用

在教學內容的數字化制作中&#xff0c;如何將課件與音頻快速轉換為視頻是一項高頻需求。借助管理后臺和剪輯工具&#xff0c;可以實現課件內容的下載、轉換和草稿生成&#xff0c;大幅減少重復操作。 【AI教育教學考試系統】課件在線剪映視頻草稿生成應用這里實現的課件PPT部分…

AI升級社區便民服務:AI辦事小程序高效辦證+應急系統秒響應,告別跑腿愁住得更安心

朋友&#xff0c;你有沒有在社區辦過事&#xff1f;想給孩子辦入學證明&#xff0c;得先跑居委會開證明&#xff0c;再去街道辦事處蓋章&#xff0c;來回幾趟不說&#xff0c;要是材料沒帶全&#xff0c;還得重新跑&#xff1b;家里水管爆了&#xff0c;半夜聯系物業&#xff0…

el-table-draggable拖拽實現表格內容排序

1、圖片2、安裝包import ElTableDraggable from "el-table-draggable";3、代碼&#xff08;html&#xff09;<el-table-draggable:data"soloTableData"input"dragInputHandlerSolo"><el-table:data"soloTableData"row-key&qu…

Linux設備模型技術路線圖

Linux設備模型涉及的技術和知識點 1. 核心架構組件 1.1 Kobject 子系統 kobject(內核對象):Linux設備模型的基礎構建塊 kset(對象集合):kobject的容器,管理相同類型的對象 ktype(對象類型):定義kobject的行為和屬性 引用計數機制:使用kref管理對象生命周期 對象層…

面試問題詳解六:元對象系統調用槽函數

Qt 的 元對象系統&#xff08;Meta-Object System&#xff09; 是 Qt 核心機制之一&#xff0c;正是它讓 C 語言具備了類似腳本語言&#xff08;如 Python&#xff09;的反射、動態綁定、屬性系統等能力。 自定義信號與槽&#xff0c;是 Qt 元對象系統最常見、最實用的體現。&a…

Scala面試題及詳細答案100道(1-10)-- 基礎語法與數據類型

《前后端面試題》專欄集合了前后端各個知識模塊的面試題,包括html,javascript,css,vue,react,java,Openlayers,leaflet,cesium,mapboxGL,threejs,nodejs,mangoDB,SQL,Linux… 。 前后端面試題-專欄總目錄 文章目錄 一、本文面試題目錄 1. 簡述Scala與Java的主要…

http請求有哪些?

TTP請求方法常見方法&#xff1a;GET&#xff1a;獲取資源&#xff0c;參數通過URL傳遞&#xff0c;可緩存到瀏覽器本地。POST&#xff1a;提交數據&#xff0c;參數通過請求體傳遞&#xff0c;不可緩存&#xff0c;常用于創建資源。PUT&#xff1a;更新資源&#xff0c;參數通…

MAPGIS6.7地質編錄

1.編錄文件excel位于D:\mapgis67\program\section&#xff0c;文件名稱&#xff1a;ZKInfoEdit.xls2生成副本&#xff0c;復制ZKInfoEdit.xls到桌面3開始編寫 04回次4開始編寫 03編錄5開始編寫 11采樣6開始編寫 06標志面7開始編寫 10鉆孔資料8 最后總結 …

輕松掌握Chrome插件開發全流程

Chrome插件開發概述介紹Chrome插件的基本概念、核心功能和應用場景&#xff0c;包括插件與瀏覽器擴展的區別、插件的主要組成部分&#xff08;如manifest文件、后臺腳本、內容腳本等&#xff09;。開發環境搭建列出開發Chrome插件所需的工具和環境配置&#xff0c;包括Chrome瀏…

智能二維碼QR\刷IC卡\人臉AI識別梯控系統功能設計需基于模塊化架構,整合物聯網、生物識別、權限控制等技術,以下是多奧分層次的系統設計框架

一、系統架構設計硬件層主控模塊&#xff1a;32位ARM嵌入式處理器&#xff0c;支持CAN/RS485/TCP/IP協議識別終端&#xff1a;支持IC卡(CPU/國密/HID)、二維碼掃碼器(動態碼)、人臉識別(活體檢測)電梯控制單元&#xff1a;繼電器矩陣控制板&#xff0c;支持20層以上電梯按鈕控制…

Kubernetes配置與密鑰管理深度指南:ConfigMap與Secret企業級實踐

目錄 專欄介紹 作者與平臺 您將學到什么&#xff1f; 學習特色 Kubernetes配置與密鑰管理深度指南&#xff1a;ConfigMap與Secret企業級實踐 一、 配置管理&#xff1a;云原生應用的基石 1.1 配置管理的演進與挑戰 1.2 ConfigMap與Secret的設計哲學 二、 ConfigMap深度…

知行社黃劍杰:金融跨界,重塑震區救援新章

曾在紐約證券交易所敲響上市鐘聲的黃劍杰&#xff0c;這位知行社的靈魂人物&#xff0c;此次在西藏震區開啟了一場震撼人心的“跨界救援”之旅。他帶著在華爾街積累的深厚金融智慧&#xff0c;毅然投身到這場與時間賽跑、與災難較量的戰斗中&#xff0c;為傳統救災模式帶來了顛…

API模型與接口棄用指南:歷史、替代方案及開發者應對策略

API模型及接口棄用&#xff08;Deprecation&#xff09;全解 概覽 在AI與API領域&#xff0c;模型的持續迭代與技術進步推動著平臺不斷優化服務。與此同時&#xff0c;隨著更安全、更強大的新模型推出&#xff0c;舊模型與接口的棄用&#xff08;Deprecation&#xff09;成為…

python3GUI--Joy音樂播放器 在線播放器 播放器 By:PyQt5(附下載地址)

文章目錄一&#xff0e;前言二&#xff0e;項目簡介三&#xff0e;詳細模塊介紹1.主界面2.歌單廣場3.歌單詳情頁4.歌手篩選5.歌手詳情頁6.專輯詳情頁7.歌曲榜單頁8.搜索結果頁9.其他1.托盤菜單2.設置四&#xff0e;核心問題回答1.軟件UI效果實現2.為什么我做不出來這么漂亮的界…

Spring Boot整合Feign實現RPC調用,并通過Hystrix實現服務降級

feign/openfeign和dubbo是常用的微服務RPC框架&#xff0c;由于feigin內部已經集成ribbon&#xff0c;自帶了負載均衡的功能&#xff0c;當有多個同名的服務注冊到注冊中心時&#xff0c;會根據ribbon默認的負載均衡算法將請求分配到不同的服務。這篇文章就簡單介紹一下怎么使用…

Java 性能優化實戰(三):并發編程的 4 個優化維度

在多核CPU時代&#xff0c;并發編程是提升Java應用性能的關鍵手段&#xff0c;但不合理的并發設計反而會導致性能下降、死鎖等問題。本文將聚焦并發編程的四個核心優化方向&#xff0c;通過真實案例和代碼對比&#xff0c;帶你掌握既能提升性能又能保證線程安全的實戰技巧。 一…

【秋招筆試】2025.08.19百度秋招機考第一套

?? 點擊直達筆試專欄 ??《大廠筆試突圍》 ?? 春秋招筆試突圍在線OJ ?? 筆試突圍在線刷題 bishipass.com 題目一:花園路徑優化問題 1??:使用棧維護必須保留的觀景點,基于三角不等式判斷 2??:貪心策略,檢查中間點是否為"轉折點" 3??:時間復雜度 …

SmartX 用戶建云實踐|某人壽保險:從開發測試、核心生產到信創轉型,按需推進企業云建設

某人壽保險自 2018 年起開始探索基于 SmartX 超融合架構搭建私有云 IaaS 資源池&#xff0c;先后部署了開發測試業務、生產業務和重要生產業務的 Oracle 數據庫&#xff08;含 RAC&#xff09;&#xff0c;并探索了基于海光芯片的信創云搭建&#xff0c;最終以基于超融合架構的…