TiDB數據庫從入門到精通系列之六:使用 TiCDC 將 TiDB 的數據同步到 Apache Kafka

TiDB數據庫從入門到精通系列之六:使用 TiCDC 將 TiDB 的數據同步到 Apache Kafka

  • 一、技術流程
  • 二、搭建環境
  • 三、創建Kafka changefeed
  • 四、寫入數據以產生變更日志
  • 五、配置 Flink 消費 Kafka 數據

一、技術流程

  • 快速搭建 TiCDC 集群、Kafka 集群和 Flink 集群
  • 創建 changefeed,將 TiDB 增量數據輸出至 Kafka
  • 使用 go-tpc 寫入數據到上游 TiDB
  • 使用 Kafka console consumer 觀察數據被寫入到指定的 Topic
  • (可選)配置 Flink 集群消費 Kafka 內數據

二、搭建環境

部署包含 TiCDC 的 TiDB 集群

在實驗或測試環境中,可以使用 TiUP Playground 功能,快速部署 TiCDC,命令如下:

tiup playground --host 0.0.0.0 --db 1 --pd 1 --kv 1 --tiflash 0 --ticdc 1
# 查看集群狀態
tiup status

三、創建Kafka changefeed

1.創建 changefeed 配置文件

根據 Flink 的要求和規范,每張表的增量數據需要發送到獨立的 Topic 中,并且每個事件需要按照主鍵值分發 Partition。因此,需要創建一個名為 changefeed.conf 的配置文件,填寫如下內容:

[sink]
dispatchers = [
{matcher = ['*.*'], topic = "tidb_{schema}_{table}", partition="index-value"},
]

2.創建一個 changefeed,將增量數據輸出到 Kafka

tiup ctl:v<CLUSTER_VERSION> cdc changefeed 
create --server="http://127.0.0.1:8300" 
--sink-uri="kafka://127.0.0.1:9092/kafka-topic-name?protocol=canal-json" 
--changefeed-id="kafka-changefeed" 
--config="changefeed.conf"

如果命令執行成功,將會返回被創建的 changefeed 的相關信息,包含被創建的 changefeed 的 ID 以及相關信息,內容如下:

Create changefeed successfully!
ID: kafka-changefeed
Info: {... changfeed info json struct ...}

如果命令長時間沒有返回,你需要檢查當前執行命令所在服務器到 sink-uri 中指定的 Kafka 機器的網絡可達性,保證二者之間的網絡連接正常。

生產環境下 Kafka 集群通常有多個 broker 節點,你可以在 sink-uri 中配置多個 broker 的訪問地址,這有助于提升 changefeed 到 Kafka 集群訪問的穩定性,當部分被配置的 Kafka 節點故障的時候,changefeed 依舊可以正常工作。假設 Kafka 集群中有 3 個 broker 節點,地址分別為 127.0.0.1:9092 / 127.0.0.2:9092 / 127.0.0.3:9092,可以參考如下 sink-uri 創建 changefeed:

tiup ctl:v<CLUSTER_VERSION> cdc changefeed create 
--server="http://127.0.0.1:8300" 
--sink-uri="kafka://127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092/kafka-topic-name?protocol=canal-json&partition-num=3&replication-factor=1&max-message-bytes=1048576" 
--config="changefeed.conf"

3.Changefeed 創建成功后,執行如下命令,查看 changefeed 的狀態

tiup ctl:v<CLUSTER_VERSION> cdc changefeed list --server="http://127.0.0.1:8300"

四、寫入數據以產生變更日志

完成以上步驟后,TiCDC 會將上游 TiDB 的增量數據變更日志發送到 Kafka,下面對 TiDB 寫入數據,以產生增量數據變更日志。

1.模擬業務負載

在測試實驗環境下,可以使用 go-tpc 向上游 TiDB 集群寫入數據,以讓 TiDB 產生事件變更數據。如下命令,首先在上游 TiDB 創建名為 tpcc 的數據庫,然后使用 TiUP bench 寫入數據到這個數據庫中。

tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 prepare
tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 run --time 300s

2.消費 Kafka Topic 中的數據

changefeed 正常運行時,會向 Kafka Topic 寫入數據,你可以通過由 Kafka 提供的 kafka-console-consumer.sh,觀測到數據成功被寫入到 Kafka Topic 中:

./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic `${topic-name}`

至此,TiDB 的增量數據變更日志就實時地復制到了 Kafka。下一步,你可以使用 Flink 消費 Kafka 數據。當然,你也可以自行開發適用于業務場景的 Kafka 消費端。

五、配置 Flink 消費 Kafka 數據

1.安裝 Flink Kafka Connector

在 Flink 生態中,Flink Kafka Connector 用于消費 Kafka 中的數據并輸出到 Flink 中。Flink Kafka Connector 并不是內建的,因此在 Flink 安裝完畢后,還需要將 Flink Kafka Connector 及其依賴項添加到 Flink 安裝目錄中。下載下列 jar 文件至 Flink 安裝目錄下的 lib 目錄中,如果你已經運行了 Flink 集群,請重啟集群以加載新的插件。

  • flink-connector-kafka-1.17.1.jar
  • flink-sql-connector-kafka-1.17.1.jar
  • kafka-clients-3.5.1.jar

2.創建一個表

可以在 Flink 的安裝目錄執行如下命令,啟動 Flink SQL 交互式客戶端:

[root@flink flink-1.15.0]# ./bin/sql-client.sh

隨后,執行如下語句創建一個名為 tpcc_orders 的表:

CREATE TABLE tpcc_orders (o_id INTEGER,o_d_id INTEGER,o_w_id INTEGER,o_c_id INTEGER,o_entry_d STRING,o_carrier_id INTEGER,o_ol_cnt INTEGER,o_all_local INTEGER
) WITH (
'connector' = 'kafka',
'topic' = 'tidb_tpcc_orders',
'properties.bootstrap.servers' = '127.0.0.1:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json',
'scan.startup.mode' = 'earliest-offset',
'properties.auto.offset.reset' = 'earliest'
)

請將 topic 和 properties.bootstrap.servers 參數替換為環境中的實際值。

3.查詢表內容

執行如下命令,查詢 tpcc_orders 表中的數據:

SELECT * FROM tpcc_orders;

執行成功后,可以觀察到有數據輸出,如下圖

在這里插入圖片描述
至此,就完成了 TiDB 與 Flink 的數據集成。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/40019.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/40019.shtml
英文地址,請注明出處:http://en.pswp.cn/news/40019.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【網絡編程系列】網絡編程實戰

&#x1f49d;&#x1f49d;&#x1f49d;歡迎來到我的博客&#xff0c;很高興能夠在這里和您見面&#xff01;希望您在這里可以感受到一份輕松愉快的氛圍&#xff0c;不僅可以獲得有趣的內容和知識&#xff0c;也可以暢所欲言、分享您的想法和見解。 推薦:kuan 的首頁,持續學…

使用Vue.js框架的指令和事件綁定實現一個購物車的頁面布局

使用了v-model指令來實現全選/全不選的功能&#xff0c;當全選框被點擊時&#xff0c;isAllChecked的值會被改變。使用了v-if指令來判斷購物車中是否有商品&#xff0c;如果有商品則渲染商品列表&#xff0c;否則顯示購物車為空的提示。使用了v-for指令來遍歷datalist數組&…

jvm內存溢出排查(使用idea自帶的內存泄漏分析工具)

文章目錄 1.確保生成內存溢出文件2.使用idea自帶的內存泄漏分析工具3.具體實驗一下 1.確保生成內存溢出文件 想分析堆內存溢出&#xff0c;一定在運行jar包時就寫上參數-XX:HeapDumpOnOutOfMemoryError&#xff0c;可以看我之前關于如何運行jar包的文章。若你沒有寫。可以寫上…

Keepalived入門指南:實現故障轉移和負載均衡

文章目錄 一、簡介1. Keepalived概述2. 高可用性和負載均衡的重要性 二、故障轉移1. 什么是故障轉移2. Keepalived的故障轉移原理a) VRRP協議b) 虛擬路由器ID和優先級 3. 配置Keepalived實現故障轉移a) 主備服務器的設置b) 監控網絡接口c) 虛擬IP的配置d) 備份服務器接管流程 三…

Python學習筆記_基礎篇(九)_面向對象編程

本篇內容: 1、反射2、面向對象編程3、面向對象三大特性4、類成員5、類成員修飾符6、類的特殊成員7、單例模式 反射 python中的反射功能是由以下四個內置函數提供&#xff1a;hasattr、getattr、setattr、delattr&#xff0c;改四個函數分別用于對對象內部執行&#xff1a;檢…

el-form自定義校驗規則

Vue 的 el-form 組件可以使用自定義校驗規則進行表單驗證。自定義校驗規則可以通過傳遞一個函數來實現&#xff0c;該函數接受要校驗的字段的值作為參數&#xff0c;并返回一個布爾值或一個 Promise 對象。 下面是一個示例&#xff0c;演示如何在 el-form 中使用自定義校驗規則…

若依前端npm run dev啟動時報錯

本文主要解決問題:若依前端npm run dev啟動時報錯,解決辦法。 目錄 1、第1種解決方案(親測有效) 2、第2種解決方案(親測有效) Error: error:0308010C:digital envelope routines::unsupportedat new Hash (node:internal/crypto/hash:67:19)at Object.createHash (node…

解決 adb install 錯誤INSTALL_FAILED_UPDATE_INCOMPATIBLE

最近給游戲出包&#xff0c;平臺要求 v1 簽名吧&#xff0c;AS 打包后&#xff0c;adb 執行安裝到手機&#xff0c;我用的設備是google pixel6 , android 系統 13&#xff0c; 提示如下&#xff1a; adb install -r v5_android_202308161046.apk Performing Streamed Install a…

centos 安裝.net 6 sdk

按照以下步驟在 CentOS 上安裝 .NET 6 SDK&#xff1a; 更新系統&#xff1a; sudo yum update安裝依賴項&#xff1a; sudo yum install -y curl libunwind libicu下載并添加 Microsoft 的軟件包存儲庫密鑰&#xff1a; sudo rpm -Uvh https://packages.microsoft.com/config/…

單片機第一季:零基礎13——AD和DA轉換

1&#xff0c;AD轉換基本概念 51 單片機系統內部運算時用的全部是數字量&#xff0c;即0 和1&#xff0c;因此對單片機系統而言&#xff0c;無法直接操作模擬量&#xff0c;必須將模擬量轉換成數字量。所謂數字量&#xff0c;就是用一系列0 和1 組成的二進制代碼表示某個信號大…

Linux -- 進階 Autofs自動掛載服務 實驗詳解

服務端創建共享目錄&#xff0c; 客戶端實現自動掛載 第一步 &#xff1a; 客戶端&#xff0c;服務端 均關閉安全軟件 [rootserver ~]# setenforce 0 [rootserver ~]# systemctl stop firewalld [rootnode1 ~]# setenforce 0 [rootnode1 ~]# systemctl stop firewalld 第二…

在K8s上處理nginx

基本說明 創建一個名為ssl的TLS類型的Secret對象&#xff0c;用于存儲證書和密鑰信息。 kubectl create secret tls ssl --certserver.crt --keyserver.key配置Nginx的events塊&#xff0c;設置worker連接數為1024。 events {worker_connections 1024; }配置Nginx的http塊&a…

MyBaits(單獨使用,與整合無關)小白版

文章目錄 概述比較配置寫xml加載上面配置并執行加載配置的方法方式一 執行方法方式一方式二(MyBatis映射器) 寫配置文件的映射文件設置對象的別名&#xff08;簡寫&#xff09;獲取自動生成的主鍵 查詢結果和java的映射規則基本類型映射&#xff1a;簡單對象映射&#xff1a;嵌…

加鹽加密算法

MD5加密加鹽加密項目密碼升級 MD5加密 MD5一系列公式進行復雜數學運算&#xff1b;特點&#xff1a;&#xff08;用途校驗和、計算hash值方式、加密&#xff09; 1&#xff1a;定長&#xff1b;無論原始數據多長&#xff1b;算出的結果都是4或者8字節的版本。 2&#xff1a;沖…

Java多線程實戰

Java多線程實戰 java多線程&#xff08;超詳細&#xff09; java自定義線程池總結 Java創建線程方式 方法1&#xff0c;繼承Thread類 方法2&#xff0c;實現Runable接口 方法2-2&#xff0c;匿名內部類形式lambda表達式 方法3&#xff0c;實現Callable接口&#xff0c;允許…

【深入理解Linux內核鎖】三、原子操作

我的圈子: 高級工程師聚集地 我是董哥,高級嵌入式軟件開發工程師,從事嵌入式Linux驅動開發和系統開發,曾就職于世界500強企業! 創作理念:專注分享高質量嵌入式文章,讓大家讀有所得! 文章目錄 1、原子操作思想2、整型變量原子操作2.1 API接口2.2 API實現2.2.1 原子變量結…

Shell 函數

Shell 函數 linux shell 可以用戶定義函數&#xff0c;然后在shell腳本中可以隨便調用。 shell中函數的定義格式如下&#xff1a; [ function ] funname [()] { action; [return int;] } 說明&#xff1a; 1、可以帶function fun() 定義&#xff0c;也可以直接fun…

log4j:WARN No appenders could be found for logger問題

本文將idea場景下的使用。 IDEA中&#xff0c;將配置文件命名為log4j.properties&#xff08;該命名才會被自動加載&#xff09;&#xff0c; 并放到某個目錄下&#xff08;通常放到resources目錄&#xff09;&#xff0c;并在resources上右鍵&#xff0c;找到Mark Directory a…

微信程序 自定義遮罩層遮不住底部tabbar解決

一、先上效果 二 方法 1、自定義底部tabbar 實現&#xff1a; https://developers.weixin.qq.com/miniprogram/dev/framework/ability/custom-tabbar.html 官網去抄 簡單寫下&#xff1a;在代碼根目錄下添加入口文件 除了js 文件的list 需要調整 其他原封不動 代碼&#xf…

Hadoop數據遷移distcp

Hadoop數據遷移distcp 準備工作 確認源集群&#xff08;a&#xff09;,目標集群&#xff08;b&#xff09;確認a集群的主節點和b集群的主節點確認兩個集群的網絡相通確認遷移模式&#xff08;全量遷移還是增量遷移&#xff09;&#xff0c;這里選擇全量遷移 遷移文件 遷移t…