Flink-cdc更好的流式數據集成工具

What’s Flink-cdc?

在這里插入圖片描述

Flink CDC 是基于Apache Flink的一種數據變更捕獲技術,用于從數據源(如數據庫)中捕獲和處理數據的變更事件。CDC技術允許實時地捕獲數據庫中的增、刪、改操作,將這些變更事件轉化為流式數據,并能夠對這些事件進行實時處理和分析。

Flink CDC提供了與各種數據源集成的功能,包括常見的關系型數據庫(如MySQL、PostgreSQL、Oracle等)以及NoSQL數據庫(如MongoDB、HBase等)。它通過監控數據庫的日志或輪詢方式來捕獲數據變更,并將變更事件作為數據流發送到Flink的任務中進行處理。

Flink CDC 深度集成并由 Apache Flink 驅動,提供以下核心功能:

? 端到端的數據集成框架
? 為數據集成的用戶提供了易于構建作業的 API
? 支持在 Source 和 Sink 中處理多個表
? 整庫同步
?具備表結構變更自動同步的能力(Schema Evolution)

在使用者的角度,就是Flink-cdc可以簡化流處理的流程:

  • 引入Flink-cdc之前流處理流程
    ![在這里插入圖片描述](https://img-blog.csdnimg.cn/direct/449d813da3f945cc9974baba563f6424.png

  • 引入Flink-cdc之后后流處理流程
    在這里插入圖片描述

如上所示,在flink-cdc被引入后大大簡化了流處理流程

Flink-cdc支持的鏈接及對應的版本

Pipeline Connectors
在這里插入圖片描述
Source Connectors
在這里插入圖片描述截止目前(2024-05-23)

Flink-cdc與Flink對應對影版本的關系

在這里插入圖片描述截止目前(2024-05-23)

flink-connector-mysql-cdc 實例分析

示例代碼

demo代碼:

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class MySqlSourceDemo {public static void main(String[] args) throws Exception {MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("mysql-server-host").port(3306).databaseList("mydb") // 設置捕獲的數據庫.tableList("mydb.products") // 設置捕獲的表,如果需要同步整個數據庫,請將 tableList 設置為 ".*".
//                .tableList(".*") // 捕獲整個數據庫的表
//                .tableList("^(?!mysql|information_schema|performance_schema).*") // 設置捕獲的表,排除系統庫
//                .tableList("mydb.(?!products|orders).*") // 同步排除products和orders表之外的整個my_db庫.username("flink-cdc").password("xxx").serverId("5400-5405").deserializer(new JsonDebeziumDeserializationSchema()) // 將 SourceRecord 轉換為 JSON 字符串.serverTimeZone("Asia/Shanghai") // 設置時區.startupOptions(StartupOptions.initial()).scanNewlyAddedTableEnabled(true) // 啟用掃描新添加的表功能
//                .includeSchemaChanges(true) // 包括 schema 變更.build();org.apache.flink.configuration.Configuration config = new org.apache.flink.configuration.Configuration();config.setString("rest.port", "8081");
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(config); //本地環境,調試用StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 設置 3s 的 checkpoint 間隔env.enableCheckpointing(3000);env.setStateBackend(new HashMapStateBackend());env.getCheckpointConfig().setCheckpointStorage("file:///tmp/ck");//本地文件系統
//        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); 1.14.0 版本開始支持env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")// 設置 source 節點的并行度為 4.setParallelism(5).print().setParallelism(1); // 設置 sink 節點并行度為 1env.execute("Print MySQL Snapshot + Binlog");}
}

maven依賴:

<properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.14.5</flink.version><scala.binary.version>2.12</scala.binary.version></properties><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency><!-- 將 Apache Flink 的 Web 運行時模塊添加到項目中 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope> <!--provided生命周期在test模式才可以運行,在main模式會找不到包--></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.3.0</version><scope>compile</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>compile</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version><scope>provided</scope></dependency></dependencies>

日志配置文件:
log4j.properties

log4j.rootCategory=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p %c{1}:%L - %m%n

啟動standalone Flink級群

# jobmanager
docker run -d \
--name flink-jm \
--hostname flink-jm \
-p 8082:8081 \
--env FLINK_PROPERTIES="jobmanager.rpc.address: flink-jm" \
--network flink-network-standalone \
ponylee/flink:1.15.0-java8  \
jobmanager# taskmanager
docker run -d \
--name flink-tm \
--hostname flink-tm \
--env FLINK_PROPERTIES="jobmanager.rpc.address: flink-jm" \
--network flink-network-standalone \
ponylee/flink:1.15.0-java8 \
taskmanager \
-Dtaskmanager.memory.process.size=1024m \
-Dtaskmanager.numberOfTaskSlots=5 \
-Drest.flamegraph.enabled=true

分析說明

為每個 Reader 設置不同的 Server id

每個用于讀取 binlog 的 MySQL 數據庫客戶端都應該有一個唯一的 id,稱為 Server id。 MySQL 服務器將使用此 id 來維護網絡連接和 binlog 位置。 因此,如果不同的作業共享相同的 Server id, 則可能導致從錯誤的 binlog 位置讀取數據。 因此,建議通過為每個 Reader 設置不同的 Server id , 假設 Source 并行度為 4,server id 配置必須:serverId(“5400-5405”),5405-5400=5 >= 4。來為 4 個 Source readers 中的每一個分配唯一的 Server id。

查看mysql鏈接發現
select * from information_schema.processlist where user = ‘flink-cdc’;
在這里插入圖片描述Flink-cdc對mysql的影響
正常情況下,Flink-cdc是No-lock Read,主庫可以繼續處理事務和查詢,而不會導致主庫進程阻塞,不會對主庫產生直接影響。但是,在某些情況下數據同步的過程中可能會對主庫產生一些間接影響,比如:網絡、IO、CPU負載以及mysql的并發連接數等資源消耗。但這些對主庫的開銷影響相對較小(全量同步階段可能比較耗能,但時間相對比較短)。

斷點續傳

通過從checkpoint/savepoint 恢復,flink-cdc可以保證斷點續傳。

  • 從checkpoint/savepoint恢復,縮小同步范圍,例如:從tableList(“mydb.products,mydb.orders”)或tableList(“.*”) 縮小到 tableList(“mydb.products”),應用更新生效。

  • 應用從checkpoint/savepoint恢復,擴大同步范圍的部分不會生效,例如:從tableList(“mydb.products”) 到 tableList(“mydb.products,mydb.orders”)或tableList(“.*”),應用更新不生效生效。若想使動態加表生效,可以顯示制定scanNewlyAddedTableEnabled(true) ,來啟用掃描新添加的表功能。如沒有特殊情況,建議在開發環境開啟此配置。

flink-cdc包名變更

Flink CDC 項目 從 2.0.0 版本將 group id 從com.alibaba.ververica 改成 com.ververica, 自 3.1 版本從將 group id 從 com.ververica 改成 org.apache.flink。 這是為了讓項目更加社區中立,讓各個公司的開發者共建時更方便。所以在maven倉庫找 2.x 的包時,路徑是 /com/ververica;找3.1及以上版本的包時,路徑是/org/apache/flink

參考:
flink-cdc
flink-cdc docs

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/16141.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/16141.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/16141.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Windows平臺C#版RTSP轉RTMP直播推送定制版

技術背景 前幾年我們發布了C版的多路RTMP/RTSP轉RTMP轉發官方定制版。在秉承低延遲、靈活穩定、低資源占用的前提下&#xff0c;客戶無需關注開發細節&#xff0c;只需圖形化配置轉發等各類參數&#xff0c;實現產品快速上線目的。 如監控類攝像機、NVR等&#xff0c;通過廠商…

【啟程Golang之旅】深入解析函數的奧秘與技巧

歡迎來到Golang的世界&#xff01;在當今快節奏的軟件開發領域&#xff0c;選擇一種高效、簡潔的編程語言至關重要。而在這方面&#xff0c;Golang&#xff08;又稱Go&#xff09;無疑是一個備受矚目的選擇。在本文中&#xff0c;帶領您探索Golang的世界&#xff0c;一步步地了…

【全開源】海報在線制作系統源碼(ThinkPHP+FastAdmin+UniApp)

打造個性化創意海報的利器 引言 在數字化時代&#xff0c;海報作為一種重要的宣傳媒介&#xff0c;其設計質量和效率直接影響著宣傳效果。為了滿足廣大用戶對于個性化、高效制作海報的需求&#xff0c;海報在線制作系統源碼應運而生。本文將詳細介紹海報在線制作系統源碼的特…

AbMole - 腫瘤發展與免疫器官的“舞蹈”:一場細胞層面的時間賽跑

在生物醫學領域&#xff0c;腫瘤與免疫系統之間的相互作用一直是研究的熱點話題。腫瘤細胞不是孤立存在的&#xff0c;它們與宿主的免疫系統進行著一場復雜的“舞蹈”。 最近&#xff0c;一項發表在《Molecular & Cellular Proteomics》雜志上的研究&#xff0c;為我們揭開…

【C++】二分查找算法

1.題目 2.算法思路 暴力解法&#xff1a;可以將數組遍歷一遍&#xff0c;就可以找到。時間復雜度為O(n)。不算太差&#xff0c;可以接受。 但是有更優秀的解法&#xff1a; 就是二分查找算法。 算法的特點&#xff1a;我們所查找的“數組”具有二段性。這里的二段性不一定有…

頭歌OpenGauss數據庫-L.應用開發(Python)-選做

第1關:簡單查詢 編程要求 正確使用 psycopg2 ,查詢金融應用場景數據庫 finance 的 client 表(客戶表)中郵箱不為空的客戶信息,列出客戶姓名,郵箱和電話.一個展示結果的示例如下(字體顏色不是編程要求): 注意:你要連接到finance數據庫上(后面第2-6關也是連接這個數據庫)…

【C/C++】詳解關聯容器map的使用

&#x1f517; 運行環境&#xff1a;Matlab &#x1f6a9; 撰寫作者&#xff1a;左手の明天 &#x1f947; 精選專欄&#xff1a;《python》 &#x1f525; 推薦專欄&#xff1a;《算法研究》 &#x1f510;#### 防偽水印——左手の明天 ####&#x1f510; &#x1f497; 大家…

mpv常用快捷鍵

1 mpv mpv是Linux下的一個開源視頻播放器&#xff0c;使用Manjaro的話安裝方式如下&#xff1a; paru -S mpv2 常用快捷鍵 q&#xff1a;推出w/e&#xff1a;視頻縮放r/t&#xff1a;調整字幕位置u&#xff1a;開啟/關閉ass/ssa字幕覆蓋i&#xff1a;顯示當前播放的視頻信息…

Oracle 并行和 session 數量的

這也就是為什么我們指定parallel為4&#xff0c;而實際并行度為8的原因。 insert create index&#xff0c;發現并行數都是加倍的 Indexes seem always created with parallel degree 1 during import as seen from a sqlfile. The sql file shows content like: CREATE INDE…

求平方數 1 到 N 之間所有正整數的平方數

概念&#xff1a; 平方數的概念&#xff1a; 平方數是指一個數的平方等于另一個數的數&#xff0c;具有正平方數和負平方數&#xff0c;其性質和運用在多領域中具有重要意義&#xff0c;如幾何、自然科學、計算機科學和物理學。平方數的計算和運用在多領域中常見&#xff0c;例…

滑不動窗口的秘密—— “滑動窗口“算法 (Java版)

本篇會加入個人的所謂魚式瘋言 ??????魚式瘋言:??????此瘋言非彼瘋言 而是理解過并總結出來通俗易懂的大白話, 小編會盡可能的在每個概念后插入魚式瘋言,幫助大家理解的. &#x1f92d;&#x1f92d;&#x1f92d;可能說的不是那么嚴謹.但小編初心是能讓更多人能接…

《python編程從入門到實踐》day39

# 昨日知識點回顧 創建主頁、繼承模版、顯示特定主題頁面 # view.py from django.shortcuts import render# 導入所需數據相關聯的模型 from .models import Topic# Create your views here. def index(request):"""學習筆記的主頁"""#…

Java進階學習筆記13——抽象類

認識抽象類&#xff1a; 當我們在做子類共性功能抽取的時候&#xff0c;有些方法在父類中并沒有具體的體現&#xff0c;這個時候就需要抽象類了。在Java中&#xff0c;一個沒有方法體的方法應該定義為抽象方法&#xff0c;而類中如果有抽象方法&#xff0c;該類就定義為抽象類…

ISCC2024個人挑戰賽WP-迷失之門

&#xff08;非官方解&#xff0c;以下內容均互聯網收集的信息和個人思路&#xff0c;僅供學習參考&#xff09; 迷失之門 方法一&#xff1a; IDA看一下 check函數邏輯 進入到check2函數 R鍵將ascii碼轉字符&#xff0c;寫出逆向腳本 #include <stdio.h> #include &l…

嵌入式0基礎開始學習 Ⅱ 數據結構(7)小結練習

1,如果使用比較高效的算法判斷單鏈表有沒有環的算法中&#xff0c;至少需要幾個指針&#xff1f; A,1 B,2 C,3 D,4 2&#xff0c;以鏈接方式存儲的線性表(X1,X2,...,Xn),當訪問第i個元素的時間復雜度為? A,o(1) B,o(n) C,o(logn) Do(n) 3,下列鏈表中&…

Linux C++ Socket 套接字、select、poll、epoll 實例

文章目錄 1. 概述2. TCP 網絡編程實例2.1 服務器端2.2 客戶端2.3 運行截圖 3. I/O 模型3.1 阻塞式I/O模型3.2 非阻塞I/O模型3.3 I/O 復用模型3.4 信號驅動式I/O3.5 異步I/O模型 4. I/O復用之 select4.1 select 函數描述4.2 服務端代碼4.3 客戶端代碼4.4 運行截圖 5. I/O復用之 …

RocketMq局部順序消息

package com.ldj.rocketmq.producer;import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message;import java.nio.charset.StandardCharsets;/*** User: ldj* Date: 2024/5/26* Time: 15:09* Description: 局部順序消…

【Linux】$()中的內容與不加$()時有什么區別

$()中的內容與不加$()有什么區別&#xff0c;例如$(/usr/local/hadoop/bin/hadoop classpath)與/usr/local/hadoop/bin/hadoop classpath兩者有何區別&#xff1f;&#xff1f;&#xff1f; 關于這個問題&#xff0c;筆者建議可以參考如下文章&#xff1a; Linux—shell中$((…

css卡片翻轉 父元素翻轉子元素不翻轉效果

css卡片翻轉 父元素翻轉子元素不翻轉效果 vue <div class"moduleBox"><div class"headTitle"><span class"headName">大額案例</span></div><div class"moduleItem"><span class"module…

three.js判斷物體在人的前面,還是后面

three.js判斷物體在人的前面&#xff0c;還是后面 const player new THREE.Vectors(10, 0, 5); const mesh new THREE.Vectors(15, 0, 6);上面&#xff0c;兩個變量分別表示&#xff0c;玩家的位置&#xff0c;物體的位置。 從這發現&#xff0c;當玩家和物體的角度關系 小…