速通FlinkCDC3.0

1.FlinkCDC概述

1.1FlinkCDC是什么?

? ? ? ? FlinkCDC(Flink Change Data Capture)是一個用于實時捕獲數據庫變更日志的工具,它可以將數據庫的變更實時同步到ApacheFlink系統中。

1.2 FlinkCDC的三個版本?

? ? ? ? 1.x 這個版本的FlinkCDC的提供了DataStream以及FlinkSQL的方式實現數據的動態獲取

? ? ? ? 存在的問題:

? ? ? ? 就是在生產環境中,我們在同步環境的時候,萬一前腳剛同步了數據,后腳就被修改了怎么辦呢?這樣我們不就是讀到了,不正確的數據了。因此,在FlinkCDC1.x的版本中的解決方案是,在讀表的過程中,鎖住整張表,這時候不會有新的數據寫入了。但是由此又帶來了一個新的問題,生產環境中時時刻刻就是會有新的數據寫入的,如果鎖住整張表,就會對線上產生很多問題。所以迎來了2.x版本。

? ? ? ? 2.x 這個版本提供了豐富的數據庫對接以及增加全量的同步鎖表的解決問題的解決方案。

? ? ? ? 提供API或者FlinkSql去進行操作,打包代碼上傳到集群去進行操作,但是我們的本身任務并不復雜,就是一個導數據的任務,所以需要更簡單的方法去實現數據導入的作用。

? ? ? ? 3.x 這個版本提供了StreamingETL方式導入數據方案。?

? ? ? ? FlinkCDC在這個版本形成了自己的框架,可以像平時的那些框架文件如果spark,hadoop一樣又bin,conf等文件夾,所以我們在使用FlinkCDC的時候就是可以直接在Conf中配置Resource(要導入的數據庫)sink(目標文件),可以通過命令啟動來進行同步。

順帶提一下兩種CDC的同步方式:

? ? ? ? CDC一種是通過查詢的方式和通過Binlog兩種方式,簡單說一下兩種的不同

cdc對比
基于查詢的CDC基于binlog的cdc
產品

Sqoop、DataX

Canal
執行模式BatchStreaming
是否可以檢測到所有變化否(同步最終態)
延遲性延遲高(按天進行同步)低延遲
增加數據庫壓力

2.flinkCDC 同步mysql數據庫數據到doris

? ? ? ? 2.1 環境準備

? ? ? ? ? ?1)安裝FlinkCDC

? ? ? ? ? ? ? ?flinkCDC下載地址?https://pan.baidu.com/s/1_BKPxommK5dsY3hD7rYVUA 提取碼: pisv?

tar -zxvf flink-cdc-3.0.0-bin.tar.gz -C /opt/module/

? ? ? ? 2)向FlinkCDC的目錄下的lib目錄下傳入Mysql以及Doris 的依賴包

? ? ? ? doris的jar包
????????https://pan.baidu.com/s/1pgtsYT9VyXD1U4RbjYA6rg 提取碼: kx2q?
? ? ? ? mysql的jar包
????????https://pan.baidu.com/s/1pxCy0-iSutqN9YjdzGAfZw 提取碼: p65d?
? ? ? ? 還需要一個mysql-connector的jar
????????https://pan.baidu.com/s/1lzJuQRPL3KtDqDXaoGBSMQ 提取碼: dwnw?

? ? ? ? 為啥還需要已經有了mysql的jar包了還需要一個mysql-connector?

????????是因為mysql的jar包依賴于mysql-connector,

? ? ? ? 為啥不封裝在一起?

? ? ? ? 首先來說就是MySQL的jar相當于對數據庫一個能力的封裝底層可以用別的connector,也是為了解耦,還有一個原因就是兩個包所用的協議不一樣,上面的這個msyql的jar包是用的apache的協議。??

? ? ? ? 為啥用了mysql的驅動包,不用doris的驅動包?

? ? ? ? 因為doris兼容mysql的協議。

? ? ? ? 2.2 同步變更

? ? ? ? 編寫 MySQL同步到doris的配置文件

? ? ? ? 可以選擇在FlinkCDC中創建一個單獨的文件夾寫配置文件,也可以寫在conf的目錄下。

vim mysql-to-doris.yaml

source:

?#數據源的數據庫類型

??type: mysql

?#地址/主機名稱

??hostname: hadoop103

?#端口號

??port: 3306

?#數據庫用戶名

??username: root

? #數據庫密碼

??password: "000000"

? #要同步的表名

? tables: test.\.*

? #ServerID 下面詳細解釋

??server-id: 5400-5404

?#時區

??server-time-zone: UTC+8

sink:

? #目標數據庫類型

??type: doris

? #目標數據庫物理存儲主機名加端口號

??fenodes: hadoop102:7030

? #數據庫用戶名

??username: root

??#數據庫密碼

??password: "000000"

?#是否同步表的初始變化,就是類似新增字段之類的

??table.create.properties.light_schema_change: true

?#副本數

??table.create.properties.replication_num: 1

pipeline:

? #任務名稱

??name: Sync MySQL Database to Doris

? #并行任務數量

??parallelism: 1

server-id 的作用:
MySQL復制標識:在MySQL主從復制中,每個從庫必須有一個唯一的server-id來標識自己。同樣,當你的CDC工具連接MySQL時,它實際上扮演了一個MySQL從庫的角色,通過binlog來獲取數據變更。

避免沖突:如果你有多個CDC工具或從庫連接同一個MySQL主庫,每個實例必須有不同的server-id,否則會導致沖突和數據不一致。

????????這種配置方式通常在分布式或并行環境中使用,允許多個任務實例使用不同的server-id(在你的配置中parallelism: 4表示并行度為4,所以需要4個不同的server-id)。

? ? ? ? 啟動環境

? ? ? ? 1)開啟Flink集群

? ? ? ? 首先要添加如下配置

vim conf/flink-conf.yaml

添加如下配置信息

execution.checkpointing.interval: 5000

#啟動集群
bin/start-cluster.sh

? ? ? ? 2)開啟doris的FE

bin/start_fe.sh

? ? ? ? 3)? 開啟Doris的BE

bin/start_be.sh

? ? ? ? 4)啟動FlinkCDC同步變更任務

? ? ? ? 尚硅谷給的是這個命令,但是我用這個命令不行

flink-cdc-3.0.0]$ bin/flink-cdc.sh job/mysql-to-doris.yaml

? ? ? ? 我用的這個可以

bin/flink-cdc.sh config/你的配置文件 --jar lib/mysql....

? ? ? ? 然后刷新數據庫觀察結果

? ? ? ? 以上情況適用于就是我們的主庫mysql的數據庫名.表名,在doris中的數據庫名.表名是一樣。如果doris中的表名不一樣就用到下面的路由變更。

2.3路由變更

ource:

??type: mysql

??hostname: hadoop103

??port: 3306

??username: root

??password: "000000"

??tables: test_route.\.*

??server-id: 5400-5404

??server-time-zone: UTC+8

sink:

??type: doris

??fenodes: hadoop102:7030

??benodes: hadoop102:7040

??username: root

??password: "000000"

??table.create.properties.light_schema_change: true

??table.create.properties.replication_num: 1

#增加了路由規則

route:

??- source-table: test_route.t1

????sink-table: doris_test_route.doris_t1

??- source-table: test_route.t2

????sink-table: doris_test_route.doris_t1

??- source-table: test_route.t3

????sink-table: doris_test_route.doris_t3

pipeline:

??name: Sync MySQL Database to Doris

??parallelism: 1

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/77677.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/77677.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/77677.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

B+樹節點與插入操作

B樹節點與插入操作 設計B樹節點 在設計B樹的數據結構時,我們首先需要定義節點的格式,這將幫助我們理解如何進行插入、刪除以及分裂和合并操作。以下是對B樹節點設計的詳細說明。 節點格式概述 所有的B樹節點大小相同,這是為了后續使用自由…

C# 檢查字符串是否包含在另一個字符串中

string shopList "我是大浪,你的小狼"; this.ShopId"你的小狼"; bool existsShopId false; if (!string.IsNullOrEmpty(shopList)) {existsShopId shopList.Split(,).Any(part > part.Trim() this.ShopId); }檢查 goodsIdSet 中的每個元素是否都在 …

珈和科技遙感賦能農業保險創新 入選省級衛星應用示范標桿

為促進空天信息與數字經濟深度融合,拓展衛星數據應用場景價值,提升衛星數據應用效能和用戶體驗,加速衛星遙感技術向民生領域轉化應用,近日,湖北省國防科工辦組織開展了2024年湖北省衛星應用示范項目遴選工作。 經多渠…

深入理解 React 組件的生命周期:從創建到銷毀的全過程

React 作為當今最流行的前端框架之一,其組件生命周期是每個 React 開發者必須掌握的核心概念。本文將全面剖析 React 組件的生命周期,包括類組件的各個生命周期方法和函數組件如何使用 Hooks 模擬生命周期行為,幫助開發者編寫更高效、更健壯的…

緩存 --- Redis性能瓶頸和大Key問題

緩存 --- Redis性能瓶頸和大Key問題 內存瓶頸網絡瓶頸CPU 瓶頸持久化瓶頸大key問題優化方案 Redis 是一個高性能的內存數據庫,但在實際使用中,可能會在內存、網絡、CPU、持久化、大鍵值對等方面遇到性能瓶頸。下面從這些方面詳細分析 Redis 的性能瓶頸&a…

Python爬蟲與代理IP:高效抓取數據的實戰指南

目錄 一、基礎概念解析 1.1 爬蟲的工作原理 1.2 代理IP的作用 二、環境搭建與工具選擇 2.1 Python庫準備 2.2 代理IP選擇技巧 三、實戰步驟分解 3.1 基礎版:單線程免費代理 3.2 進階版:多線程付費代理池 3.3 終極版:Scrapy框架自動…

Nginx HTTP 414 與“大面積”式洪水攻擊聯合防御實戰

一、引言 在大規模分布式應用中,Nginx 常作為前端負載均衡和反向代理服務器。攻擊者若結合超長 URI/頭部攻擊(觸發 HTTP 414)與海量洪水攻擊,可在網絡層與應用層形成雙重打擊:一方面耗盡緩沖區和內存,另一…

【上位機——MFC】運行時類信息機制

運行時類信息機制的使用 類必須派生自CObject類內必須添加聲明宏DECLARE_DYNAMIC(theClass)3.類外必須添加實現宏 IMPLEMENT_DYNAMIC(theClass,baseClass) 具備上述三個條件后&#xff0c;CObject::IsKindOf函數就可以正確判斷對象是否屬于某個類。 代碼示例 #include <…

Maven插件管理的基本原理

&#x1f9d1; 博主簡介&#xff1a;CSDN博客專家&#xff0c;歷代文學網&#xff08;PC端可以訪問&#xff1a;https://literature.sinhy.com/#/?__c1000&#xff0c;移動端可微信小程序搜索“歷代文學”&#xff09;總架構師&#xff0c;15年工作經驗&#xff0c;精通Java編…

卷積神經網絡--手寫數字識別

本文我們通過搭建卷積神經網絡模型&#xff0c;實現手寫數字識別。 pytorch中提供了手寫數字的數據集 &#xff0c;我們可以直接從pytorch中下載 MNIST中包含70000張手寫數字圖像&#xff1a;60000張用于訓練&#xff0c;10000張用于測試 圖像是灰度的&#xff0c;28x28像素 …

大文件分片上傳進階版(新增md5校驗、上傳進度展示、并行控制,智能分片、加密上傳、斷點續傳、自動重試),實現四位一體的網絡感知型大文件傳輸系統?

上篇文章我們總結了大文件分片上傳的主要核心&#xff0c;但是我對md5校驗和上傳進度展示這塊也比較感興趣&#xff0c;所以在deepseek的幫助下&#xff0c;擴展了一下我們的代碼&#xff0c;如果有任何問題和想法&#xff0c;非常歡迎大家在評論區與我交流&#xff0c;我需要學…

C# 點擊導入,將需要的參數傳遞到彈窗的頁面

點擊導入按鈕&#xff0c;獲取本頁面的datagridview標題的結構&#xff0c;并傳遞到導入界面。 新增一個datatable用于存儲datagridview的caption和name&#xff0c;這里用的是devexpress組件中的gridview。 DataTable dt new DataTable(); DataColumn CAPTION …

android的 framework 是什么

Android的Framework&#xff08;框架&#xff09;是Android系統的核心組成部分&#xff0c;它為開發者提供了一系列的API&#xff08;應用程序編程接口&#xff09;&#xff0c;使得開發者能夠方便地創建各種Android應用。以下是關于它的詳細介紹&#xff1a; 位置與架構 在A…

【MySQL】表的約束(主鍵、唯一鍵、外鍵等約束類型詳解)、表的設計

目錄 1.數據庫約束 1.1 約束類型 1.2 null約束 — not null 1.3 unique — 唯一約束 1.4 default — 設置默認值 1.5 primary key — 主鍵約束 自增主鍵 自增主鍵的局限性&#xff1a;經典面試問題&#xff08;進階問題&#xff09; 1.6 foreign key — 外鍵約束 1.7…

數據結構-C語言版本(三)棧

數據結構中的棧&#xff1a;概念、操作與實戰 第一部分 棧分類及常見形式 棧是一種遵循后進先出(LIFO, Last In First Out)原則的線性數據結構。棧主要有以下幾種實現形式&#xff1a; 1. 數組實現的棧&#xff08;順序棧&#xff09; #define MAX_SIZE 100typedef struct …

如何以特殊工藝攻克超薄電路板制造難題?

一、超薄PCB的行業定義與核心挑戰 超薄PCB通常指厚度低于1.0毫米的電路板&#xff0c;而高端產品可進一步壓縮至0.4毫米甚至0.2毫米以下。這類電路板因體積小、重量輕、熱傳導性能優異&#xff0c;被廣泛應用于折疊屏手機、智能穿戴設備、醫療植入器械及新能源汽車等領域。然而…

AI 賦能 3D 創作!Tripo3D 全功能深度解析與實操教程

大家好&#xff0c;歡迎來到本期科技工具分享&#xff01; 今天要給大家帶來一款革命性的 AI 3D 模型生成平臺 ——Tripo3D。 無論你是游戲開發者、設計師&#xff0c;還是 3D 建模愛好者&#xff0c;只要想降低創作門檻、提升效率&#xff0c;這款工具都值得深入了解。 接下…

如何理解抽象且不易理解的華為云 API?

API的概念在華為云的使用中非常抽象&#xff0c;且不容易理解&#xff0c;用通俗的語言 形象的比喻來講清楚——什么是華為云 API&#xff0c;怎么用&#xff0c;背后原理&#xff0c;以及主要元素有哪些&#xff0c;盡量讓新手也能明白。 &#x1f9e0; 一句話先理解&#xf…

第 7 篇:總結與展望 - 時間序列學習的下一步

第 7 篇&#xff1a;總結與展望 - 時間序列學習的下一步 (圖片來源: Guillaume Hankenne on Pexels) 恭喜你&#xff01;如果你一路跟隨這個系列走到了這里&#xff0c;那么你已經成功地完成了時間序列分析的入門之旅。我們從零開始&#xff0c;一起探索了時間數據的基本概念、…

PPT無法編輯怎么辦?原因及解決方法全解析

在日常辦公中&#xff0c;我們經常會遇到需要編輯PPT的情況。然而&#xff0c;有時我們會發現PPT文件無法編輯&#xff0c;這可能由多種原因引起。今天我們來看看PPT無法編輯的幾種常見原因&#xff0c;并提供實用的解決方法&#xff0c;幫助你輕松應對。 原因1&#xff1a;文…