基于Flink 1.20、StarRocks與TiCDC構建高效數據處理鏈路教程

在大數據處理領域,實現高效、實時的數據處理與分析至關重要。Flink作為強大的流批一體化計算框架,結合StarRocks這一高性能的實時分析型數據庫,再搭配TiCDC(TiDB Change Data Capture)用于捕獲數據變更,能夠構建出極為高效的數據處理鏈路。本教程將詳細介紹如何利用這些技術實現從MySQL數據源抽取數據,經Flink處理后寫入StarRocks的完整流程,并對相關表結構和字段進行合理抽象與調整,以保障數據處理的通用性與安全性。

一、技術簡介

Flink 1.20

Flink 1.20是Apache Flink的一個重要版本,它進一步強化了流批一體的計算能力。在流處理方面,其能夠以低延遲處理大規模的實時數據流;而在批處理場景下,也具備高效的性能表現。Flink提供了豐富的連接器(Connector),方便與各類數據源和數據存儲系統進行對接,同時支持使用SQL進行數據處理操作,大大降低了開發成本,提升了開發效率。

StarRocks

StarRocks是一款高性能的實時分析型數據庫,采用MPP(Massively Parallel Processing)架構,能夠對海量數據進行亞秒級的查詢分析。它支持多種數據模型,包括聚合模型、主鍵模型等,適用于各類數據分析場景,如報表生成、實時看板、即席查詢等。StarRocks通過其高效的存儲和查詢引擎,以及對多種數據格式的支持,為數據的快速分析提供了有力保障。

TiCDC

TiCDC是TiDB生態中的數據變更捕獲工具,它基于TiDB的分布式事務和MVCC(Multi-Version Concurrency Control)機制,能夠實時捕獲TiDB數據庫中的數據變更,包括增、刪、改操作。TiCDC將這些變更數據以有序的方式輸出,為數據同步、實時數據處理等場景提供了可靠的數據源。在本教程中,雖然我們主要從MySQL數據源抽取數據,但TiCDC的原理和應用思路可作為擴展參考,在涉及TiDB數據源時能夠快速遷移應用。

二、環境準備

安裝與配置Flink 1.20

  1. 下載Flink 1.20.0:通過curl命令下載安裝包,執行 curl -k -O https://archive.apache.org/dist/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz
  2. 解壓文件:使用命令 tar -xzvf flink-1.20.0-bin-scala_2.12.tgz 解壓下載的壓縮包。
  3. 移動到目標目錄(可選):可將解壓后的Flink目錄移動到 /opt 或其他目標位置,例如執行 sudo mv flink-1.20.0 /opt/flink
  4. 配置環境變量:編輯 ~/.bashrc 文件,添加如下內容:
export FLINK_HOME=/opt/flink
export PATH=$FLINK_HOME/bin:$PATH

保存并退出文件后,運行 source ~/.bashrc 使修改生效。
5. 配置Flink:Flink默認已配置一些基本設置。若無需集群配置,可跳過 mastersworkers 文件的配置。如需調整參數,如內存配置或其他作業配置,可修改Flink配置文件 config.yaml,該文件位于 /opt/flink/conf 目錄下。例如,將 bind-host 設置從 localhost 改為 0.0.0.0,使Flink能夠綁定所有網絡接口,修改如下:

jobmanager:bind-host: 0.0.0.0
rpc:address: 0.0.0.0port: 6123
memory:process:size: 1600m
execution:failover-strategy: region
taskmanager:bind-host: 0.0.0.0host: 0.0.0.0numberOfTaskSlots: 1
memory:process:size: 1728m
parallelism:address: 0.0.0.0bind-address: 0.0.0.0
  1. 啟動Flink:進入Flink目錄,執行 ./bin/start-cluster.sh 啟動Flink。若要關閉Flink,執行 ./bin/stop-cluster.sh。啟動后,可通過瀏覽器訪問Flink Web UI,默認地址為 http://<your_server_ip>:8081(例如 http://192.168.1.1:8081),以查看Flink集群的狀態、提交作業等。

安裝與配置StarRocks

  1. 下載與部署:從StarRocks官方網站獲取安裝包,按照官方文檔指引進行下載與解壓操作。根據實際的生產環境需求,選擇合適的部署方式,如單節點部署用于測試環境,集群部署用于生產環境。
  2. 配置參數:在StarRocks的配置文件中,對一些關鍵參數進行設置,如FE(Frontend)節點的內存分配、BE(Backend)節點的存儲路徑等。例如,在FE節點的 fe.conf 文件中設置 query_mem_limit = 2147483648 來限制查詢內存,在BE節點的 be.conf 文件中設置 storage_root_path = /data/starrocks/be 來指定存儲路徑。
  3. 啟動服務:分別啟動FE和BE節點,確保各個節點正常運行且相互通信正常。啟動后,可通過MySQL客戶端連接到StarRocks,驗證其是否正常工作,例如執行 mysql -h <starrocks_fe_host> -P 9030 -u root -p

配置MySQL數據源

  1. 開啟Binlog:確保MySQL開啟了Binlog功能,在MySQL配置文件(通常為 my.cnfmy.ini)中,添加或修改如下配置:
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1

修改完成后,重啟MySQL服務使配置生效。
2. 創建測試表:在MySQL中創建用于測試的數據表,例如創建一個名為 example_table 的表,表結構如下:

CREATE TABLE example_table (id BIGINT NOT NULL,data_column_1 VARCHAR(255),data_column_2 INT,PRIMARY KEY (id)
);

向表中插入一些測試數據,以便后續進行數據同步與處理測試。

三、表結構設計與調整

StarRocks表結構設計

在StarRocks中創建用于存儲數據的表,以用戶標簽相關數據存儲為例,設計如下表結構:

CREATE TABLE table_demo (id BIGINT NOT NULL COMMENT '主鍵',sign CHAR(32) NOT NULL COMMENT '簽名',shop_id BIGINT NOT NULL COMMENT 'shopID',shop_type BIGINT NOT NULL COMMENT '類型',user_id BIGINT NULL COMMENT 'userID',create_time DATETIME NULL COMMENT '記錄創建時間',operation_type VARCHAR(20) COMMENT '操作類型',row_change_type VARCHAR(20) COMMENT '行變更類型'
) ENGINE=OLAP
PRIMARY KEY (id)
COMMENT '用戶商品表'
DISTRIBUTED BY HASH(`id`) BUCKETS 16
PROPERTIES ("replication_num" = "3","bloom_filter_columns" = "shop_id, user_id","in_memory" = "false","storage_format" = "DEFAULT","enable_persistent_index" = "false","compression" = "LZ4"
);

該表結構設計充分考慮了數據的存儲與查詢需求,通過主鍵約束、哈希分布以及相關屬性設置,保障數據的高效存儲與查詢性能。

Flink中MySQL CDC表結構定義

在Flink中通過MySQL CDC連接器讀取MySQL數據時,定義如下表結構:

CREATE TABLE mysql_cdc_example (id BIGINT,sign STRING COMMENT '簽名',shop_id BIGINT COMMENT 'shopID',shop_type BIGINT COMMENT '類型',user_id BIGINT COMMENT 'userID',create_time TIMESTAMP(0),operation_type STRING COMMENT '業務操作字段',operation_timestamp TIMESTAMP_LTZ(3) METADATA FROM 'operation_timestamp' VIRTUAL,row_change_type STRING METADATA FROM 'row_change_type' VIRTUAL,PRIMARY KEY (`id`) NOT ENFORCED
)
WITH 
('connector' ='mysql-cdc','hostname' = '192.168.0.1','port' = '3306','database-name' = 'your_database_name','table-name' = 'example_table','username' = 'your_username','password' = 'your_password','debezium.snapshot.mode' = 'initial'
);

該表結構定義與StarRocks中的目標表結構相對應,同時通過WITH參數配置了MySQL CDC連接器的相關信息,包括數據源地址、端口、數據庫名、表名、用戶名、密碼以及快照模式等。

Flink中StarRocks Sink表結構定義

在Flink中定義用于將處理后數據寫入StarRocks的Sink表結構如下:

CREATE TABLE starrocks_sink_example (id BIGINT PRIMARY KEY NOT ENFORCED,sign STRING,shop_id BIGINT,shop_type BIGINT,user_id bigint,create_time STRING,operation_type STRING,row_change_type STRING
) 
WITH 
('connector'='starrocks','sink.max-retries'='5','jdbc-url' = 'jdbc:mysql://192.168.0.1:9030/your_database_name?useUnicode=true&characterEncoding=utf-8&useSSL=false&connectTimeout=3000&useUnicode=true&characterEncoding=utf8&useSSL=false&rewriteBatchedStatements=true&&serverTimezone=Asia/Shanghai&sessionVariables=query_timeout=86400','load-url'='192.168.0.1:8030','table-name' = 'table_demo','username'='your_username','password'='your_password','sink.buffer-flush.interval-ms'='5000','sink.parallelism' = '2','database-name'='your_database_name'
);

此Sink表結構與StarRocks中的目標表結構一致,通過WITH參數配置了StarRocks連接器的相關信息,如JDBC URL、Load URL、表名、用戶名、密碼、緩沖刷新間隔以及并行度等,確保Flink能夠將處理后的數據準確高效地寫入StarRocks。

四、數據同步與處理流程

使用Flink SQL進行數據抽取與轉換

  1. 配置Flink SQL環境:在Flink的SQL客戶端或相關集成開發環境中,配置好Flink SQL的運行環境,確保能夠執行SQL語句對數據進行操作。
  2. 編寫數據抽取與轉換SQL:編寫SQL語句從MySQL CDC表中抽取數據,并進行必要的轉換操作,例如將時間格式進行轉換、根據業務規則對某些字段進行計算等。以下是一個簡單的示例,將 create_time 字段從 TIMESTAMP 類型轉換為字符串類型,并根據 operation_typerow_change_type 字段確定最終的操作類型:
INSERT INTOstarrocks_sink_example
SELECTid,sign,shop_id,shop_typeuser_id,cast(create_time as CHAR) as create_time,CASE WHEN operation_type = 'DELETE' THEN 'DELETE'WHEN row_change_type = '+I' THEN 'INSERT'WHEN row_change_type IN ('-U', '+U') THEN 'UPDATE'WHEN row_change_type = '-D' THEN 'DELETE'ELSE 'UNKNOWN'END AS operation_type,row_change_type 
FROMmysql_cdc_example;

該SQL語句從 mysql_cdc_example 表中讀取數據,對 create_time 字段進行類型轉換,并根據不同的變更類型確定最終的 operation_type,然后將處理后的數據插入到 starrocks_sink_example 表中。

使用Routine Load進行數據實時攝入(以Kafka數據源為例)

  1. 配置Kafka數據源:在Kafka中創建用于存儲數據變更的主題,確保數據源能夠正常向該主題發送數據。例如,創建一個名為 user_table_changes 的主題。
  2. 創建StarRocks的Routine Load任務:在StarRocks中創建Routine Load任務,用于實時消費Kafka主題中的數據并寫入到StarRocks表中。以下是一個示例:
CREATE ROUTINE LOAD your_load_job_name ON table_demo
COLUMNS (id,sign,shop_id,shop_type,user_id,create_time,operation_type,row_change_type,temp_operation_type=IF(operation_type = 'DELETE', 'DELETE', IF(operation_type = 'UPDATE', 'UPSERT', 'APPEND'))
)
PROPERTIES ("desired_concurrent_number" = "1","max_batch_interval" = "10","max_batch_rows" = "300000","max_batch_size" = "209715200","strict_mode" = "false","format" = "json"
)
FROM
KAFKA ("kafka_broker_list" = "192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092","kafka_topic" = "user_table_changes","property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

該Routine Load任務配置了從Kafka主題 user_table_changes 中讀取數據,按照指定的列映射關系寫入到 user_table_mapping 表中,并設置了相關的屬性,如期望的并發數、最大批次間隔、最大批次行數、最大批次大小、嚴格模式以及數據格式等。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/87827.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/87827.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/87827.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

便捷的Office批量轉PDF工具

軟件介紹 本文介紹的軟件是一款能實現Office批量轉換的工具&#xff0c;名為五五Excel word批量轉PDF。 軟件小巧 這款五五Excel word批量轉PDF軟件大小不到2M。 操作步驟一 使用該軟件時&#xff0c;只需把軟件和需要轉換的Word或Excel文件放在同一個文件夾里。 操作步驟…

tcp長連接與短連接

TCP連接本身是一個傳輸層協議&#xff0c;它既可以實現長連接&#xff0c;也可以實現短連接。這取決于應用層的使用方式。 短連接&#xff08;Short Connection&#xff09; 特點&#xff1a;每次請求都建立新的TCP連接&#xff0c;完成后立即關閉流程&#xff1a;建立連接 →…

llvm polly,親自測試

1&#xff09;下載并安裝 Polly - Getting Started git clone https://github.com/llvm/llvm-project.git 大概需要半個小時&#xff0c;有時候被墻掉就打不開 2&#xff09; mkdir build && cd build cmake -DLLVM_ENABLE_PROJECTSclang;polly ../llvm cmake --b…

Spring AI 項目實戰(十四):Spring Boot + Vue3 +AI + DeepSeek 實現空氣質量智能預測系統(附完整源碼)

系列文章 序號文章名稱1Spring AI 項目實戰(一):Spring AI 核心模塊入門2Spring AI 項目實戰(二):Spring Boot + AI + DeepSeek 深度實戰(附完整源碼)3Spring AI 項目實戰(三):Spring Boot + AI + DeepSeek 打造智能客服系統(附完整源碼)4

騰訊云 CDN 不支持 WebSocket 的現狀與華為云 CDN 的替代方案-優雅草卓伊凡

騰訊云 CDN 不支持 WebSocket 的現狀與華為云 CDN 的替代方案-優雅草卓伊凡 問題背景 卓伊凡今天發現&#xff0c;騰訊云 CDN 不支持 WebSocket 協議&#xff0c;而公司的部分業務&#xff08;如實時聊天、在線協作、游戲互動、股票行情推送等&#xff09;依賴長連接通信。昨…

MybatisPlus(一)擴展功能

擴展功能 一、靜態工具二、邏輯刪除三、通用枚舉1、定義枚舉2、配置枚舉處理器3、測試 四、JSON類型處理器1、定義實體2、使用類型處理器 五、分頁1、配置分頁插件2、分頁API3、示例 一、靜態工具 有的時候Service之間也會相互調用&#xff0c;為了避免出現循環依賴問題&#…

Redis哨兵模式之Sentinel模式(二)

一、多節點哨兵如何配置&#xff1f; 哨兵配置原理圖 注意&#xff1a;sentinel哨兵模式的搭建是建立在redis主從復制節點配置基礎而搭建&#xff0c;在主從配置中從庫需要配置好replicaof關聯上主庫并關閉安全模式&#xff0c;然后設置好bind端口才能關聯上機器&#xff0c;而…

基于Excel的數據分析思維與分析方法

數據分析一定要會Excel、SQL和Python&#xff1f;非常肯定地回答您&#xff0c;Python、R語言、Excel函數和VBA&#xff0c;以及高級數據分析軟件&#xff0c;都學不到&#xff0c;您將學到&#xff1a;5個有效的數據分析利器&#xff0c;以及分析思維 一、描述性統計分析 在…

計算機網絡筆記(不全)

一、計算機網絡體系結構1.計算機網絡的概念計算機網絡&#xff1a;由若干結點和連接這些結點的鏈路組成。結點可以是計算機、集線器、交換機、路由器等。互連網(internet)&#xff1a;多個計算機網絡通過路由器互相連接而成&#xff0c;可用任意協議通信。互聯網(因特網Interne…

XML Schema 復合元素

XML Schema 復合元素 引言 XML(可擴展標記語言)作為一種靈活的標記語言,廣泛應用于數據交換和存儲。XML Schema 是一種用于描述和定義 XML 文檔結構的語言,它定義了 XML 文檔的元素、屬性、類型和約束。本文將詳細介紹 XML Schema 中的復合元素,并探討其在實際應用中的重…

華為云Flexus+DeepSeek征文 | 彈性算力實戰:Flexus X實例自動擴縮容策略優化

華為云FlexusDeepSeek征文 | 彈性算力實戰&#xff1a;Flexus X實例自動擴縮容策略優化 &#x1f31f; 嗨&#xff0c;我是IRpickstars&#xff01; &#x1f30c; 總有一行代碼&#xff0c;能點亮萬千星辰。 &#x1f50d; 在技術的宇宙中&#xff0c;我愿做永不停歇的探索者…

【倉頡】運行環境配置VSCode + Win11

作者&#xff1a;大李子 團隊&#xff1a;堅果派 十年iOS&#xff0c;All in轉鴻蒙 前言 “倉頡編程語言是一款面向全場景智能的新一代編程語言&#xff0c;主打原生智能化、天生全場景、高性能、強安全。融入鴻蒙生態&#xff0c;為開發者提供良好的編程體驗。” ——摘自倉…

【K線訓練軟件研發歷程】【日常記錄向】1.K線滑動窗口

文章目錄 當前效果未來發展思路技術選型值得分享的技術點數據加載、解析的代碼echats的代碼當前效果 ??相當于有個hello world了。 未來發展思路 開源 技術選型 界面直接采用electron,等開源后,可以直接掛release,用戶下載安裝包后,一鍵安裝,一鍵運行,降低使用門檻…

抖音解析下載工具 v1.0.0:免安裝單文件,一鍵無水印保存高清視音頻

寶子們&#xff0c;今天給你們帶來一款超輕量的抖音下載神器——抖音解析下載工具 v1.0.0。 它只有單文件&#xff0c;雙擊就能用&#xff0c;免安裝、無廣告、完全免費&#xff0c;復制粘貼鏈接即可一鍵解析下載高清無水印視頻/音頻&#xff0c;簡直不要太方便&#xff01; 為…

Ingress——2

目錄 ?一. 域名重定向&#xff08;HTTP→HTTPS/舊域名跳轉&#xff09;? ?二. 前后端分離Rewrite&#xff08;路徑改寫&#xff09;? ?三. 混合配置示例&#xff08;重定向Rewrite&#xff09;? ?四. SSL/TLS配置&#xff08;HTTPS加密&#xff09;? ?五. 基本認…

12. grafana-Dashboard的Variable(過濾)使用

說明制作這樣一個選擇過濾的下拉框&#xff0c;可以選擇某個服務器的步驟1. 點擊最上面的Dashboard settings2. 選擇Variables 并點擊ADD variable3. 寫出過濾的標簽名和查詢條件&#xff08;label_values(查詢條件)&#xff09;4. 點擊 save as... 保存退出5. 出來后左上角就…

Cursor一鍵續杯pro教程,支持最新1.0系列版本

使用前檢查&#xff1a; 使用前請先看左下角&#xff0c;是否獲取到Cursor的版本號 如果沒有請先在 功能頁面 -→ 自定義Cursor路徑 選擇你Cursor的安裝的路徑&#xff0c;并開啟后重啟YCursor&#xff0c;獲取到版本后才能正常使用功能 檢查軟件左下角的權限標識是否為綠色 如…

pyhton基礎【25】面向對象進階六

目錄 十七.單例模式 實現單例模式的兩種方式 __new__方法概述 單例模式的使用場景 十七.單例模式 引入 單例模式是一種常用的軟件設計模式&#xff0c;它確保一個類只有一個實例&#xff0c;并提供一個全局訪問點來獲取這個實例。 實現單例模式的兩種方式 使用類屬性創…

后端樹形結構

案例 在后端開發中&#xff0c;樹形結構數據的查詢和處理是一個常見的需求&#xff0c;比如部門管理、分類目錄展示等場景。接下來&#xff0c;我們以一個部門管理系統為例&#xff0c;詳細介紹如何實現后端的樹查詢功能。 案例背景 假設我們正在開發一個公司的內部管理系統&am…

高效溝通04-RIDE說服模型

高效溝通專欄–組織運轉的命脈與個人成功的基石 目錄 1. RIDE模型的核心理念2. RIDE模型的應用場景3. RIDE模型使用步驟4. RIDE模型示例與練習4.1 應用RIDE模型:4.2 練習:你來試試!5. 總結RIDE模型是一種結構化的說服框架,旨在幫助你在溝通(尤其是書面溝通或需要清晰邏輯…