在大數據處理領域,實現高效、實時的數據處理與分析至關重要。Flink作為強大的流批一體化計算框架,結合StarRocks這一高性能的實時分析型數據庫,再搭配TiCDC(TiDB Change Data Capture)用于捕獲數據變更,能夠構建出極為高效的數據處理鏈路。本教程將詳細介紹如何利用這些技術實現從MySQL數據源抽取數據,經Flink處理后寫入StarRocks的完整流程,并對相關表結構和字段進行合理抽象與調整,以保障數據處理的通用性與安全性。
一、技術簡介
Flink 1.20
Flink 1.20是Apache Flink的一個重要版本,它進一步強化了流批一體的計算能力。在流處理方面,其能夠以低延遲處理大規模的實時數據流;而在批處理場景下,也具備高效的性能表現。Flink提供了豐富的連接器(Connector),方便與各類數據源和數據存儲系統進行對接,同時支持使用SQL進行數據處理操作,大大降低了開發成本,提升了開發效率。
StarRocks
StarRocks是一款高性能的實時分析型數據庫,采用MPP(Massively Parallel Processing)架構,能夠對海量數據進行亞秒級的查詢分析。它支持多種數據模型,包括聚合模型、主鍵模型等,適用于各類數據分析場景,如報表生成、實時看板、即席查詢等。StarRocks通過其高效的存儲和查詢引擎,以及對多種數據格式的支持,為數據的快速分析提供了有力保障。
TiCDC
TiCDC是TiDB生態中的數據變更捕獲工具,它基于TiDB的分布式事務和MVCC(Multi-Version Concurrency Control)機制,能夠實時捕獲TiDB數據庫中的數據變更,包括增、刪、改操作。TiCDC將這些變更數據以有序的方式輸出,為數據同步、實時數據處理等場景提供了可靠的數據源。在本教程中,雖然我們主要從MySQL數據源抽取數據,但TiCDC的原理和應用思路可作為擴展參考,在涉及TiDB數據源時能夠快速遷移應用。
二、環境準備
安裝與配置Flink 1.20
- 下載Flink 1.20.0:通過curl命令下載安裝包,執行
curl -k -O https://archive.apache.org/dist/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz
。 - 解壓文件:使用命令
tar -xzvf flink-1.20.0-bin-scala_2.12.tgz
解壓下載的壓縮包。 - 移動到目標目錄(可選):可將解壓后的Flink目錄移動到
/opt
或其他目標位置,例如執行sudo mv flink-1.20.0 /opt/flink
。 - 配置環境變量:編輯
~/.bashrc
文件,添加如下內容:
export FLINK_HOME=/opt/flink
export PATH=$FLINK_HOME/bin:$PATH
保存并退出文件后,運行 source ~/.bashrc
使修改生效。
5. 配置Flink:Flink默認已配置一些基本設置。若無需集群配置,可跳過 masters
和 workers
文件的配置。如需調整參數,如內存配置或其他作業配置,可修改Flink配置文件 config.yaml
,該文件位于 /opt/flink/conf
目錄下。例如,將 bind-host
設置從 localhost
改為 0.0.0.0
,使Flink能夠綁定所有網絡接口,修改如下:
jobmanager:bind-host: 0.0.0.0
rpc:address: 0.0.0.0port: 6123
memory:process:size: 1600m
execution:failover-strategy: region
taskmanager:bind-host: 0.0.0.0host: 0.0.0.0numberOfTaskSlots: 1
memory:process:size: 1728m
parallelism:address: 0.0.0.0bind-address: 0.0.0.0
- 啟動Flink:進入Flink目錄,執行
./bin/start-cluster.sh
啟動Flink。若要關閉Flink,執行./bin/stop-cluster.sh
。啟動后,可通過瀏覽器訪問Flink Web UI,默認地址為http://<your_server_ip>:8081
(例如http://192.168.1.1:8081
),以查看Flink集群的狀態、提交作業等。
安裝與配置StarRocks
- 下載與部署:從StarRocks官方網站獲取安裝包,按照官方文檔指引進行下載與解壓操作。根據實際的生產環境需求,選擇合適的部署方式,如單節點部署用于測試環境,集群部署用于生產環境。
- 配置參數:在StarRocks的配置文件中,對一些關鍵參數進行設置,如FE(Frontend)節點的內存分配、BE(Backend)節點的存儲路徑等。例如,在FE節點的
fe.conf
文件中設置query_mem_limit = 2147483648
來限制查詢內存,在BE節點的be.conf
文件中設置storage_root_path = /data/starrocks/be
來指定存儲路徑。 - 啟動服務:分別啟動FE和BE節點,確保各個節點正常運行且相互通信正常。啟動后,可通過MySQL客戶端連接到StarRocks,驗證其是否正常工作,例如執行
mysql -h <starrocks_fe_host> -P 9030 -u root -p
。
配置MySQL數據源
- 開啟Binlog:確保MySQL開啟了Binlog功能,在MySQL配置文件(通常為
my.cnf
或my.ini
)中,添加或修改如下配置:
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1
修改完成后,重啟MySQL服務使配置生效。
2. 創建測試表:在MySQL中創建用于測試的數據表,例如創建一個名為 example_table
的表,表結構如下:
CREATE TABLE example_table (id BIGINT NOT NULL,data_column_1 VARCHAR(255),data_column_2 INT,PRIMARY KEY (id)
);
向表中插入一些測試數據,以便后續進行數據同步與處理測試。
三、表結構設計與調整
StarRocks表結構設計
在StarRocks中創建用于存儲數據的表,以用戶標簽相關數據存儲為例,設計如下表結構:
CREATE TABLE table_demo (id BIGINT NOT NULL COMMENT '主鍵',sign CHAR(32) NOT NULL COMMENT '簽名',shop_id BIGINT NOT NULL COMMENT 'shopID',shop_type BIGINT NOT NULL COMMENT '類型',user_id BIGINT NULL COMMENT 'userID',create_time DATETIME NULL COMMENT '記錄創建時間',operation_type VARCHAR(20) COMMENT '操作類型',row_change_type VARCHAR(20) COMMENT '行變更類型'
) ENGINE=OLAP
PRIMARY KEY (id)
COMMENT '用戶商品表'
DISTRIBUTED BY HASH(`id`) BUCKETS 16
PROPERTIES ("replication_num" = "3","bloom_filter_columns" = "shop_id, user_id","in_memory" = "false","storage_format" = "DEFAULT","enable_persistent_index" = "false","compression" = "LZ4"
);
該表結構設計充分考慮了數據的存儲與查詢需求,通過主鍵約束、哈希分布以及相關屬性設置,保障數據的高效存儲與查詢性能。
Flink中MySQL CDC表結構定義
在Flink中通過MySQL CDC連接器讀取MySQL數據時,定義如下表結構:
CREATE TABLE mysql_cdc_example (id BIGINT,sign STRING COMMENT '簽名',shop_id BIGINT COMMENT 'shopID',shop_type BIGINT COMMENT '類型',user_id BIGINT COMMENT 'userID',create_time TIMESTAMP(0),operation_type STRING COMMENT '業務操作字段',operation_timestamp TIMESTAMP_LTZ(3) METADATA FROM 'operation_timestamp' VIRTUAL,row_change_type STRING METADATA FROM 'row_change_type' VIRTUAL,PRIMARY KEY (`id`) NOT ENFORCED
)
WITH
('connector' ='mysql-cdc','hostname' = '192.168.0.1','port' = '3306','database-name' = 'your_database_name','table-name' = 'example_table','username' = 'your_username','password' = 'your_password','debezium.snapshot.mode' = 'initial'
);
該表結構定義與StarRocks中的目標表結構相對應,同時通過WITH參數配置了MySQL CDC連接器的相關信息,包括數據源地址、端口、數據庫名、表名、用戶名、密碼以及快照模式等。
Flink中StarRocks Sink表結構定義
在Flink中定義用于將處理后數據寫入StarRocks的Sink表結構如下:
CREATE TABLE starrocks_sink_example (id BIGINT PRIMARY KEY NOT ENFORCED,sign STRING,shop_id BIGINT,shop_type BIGINT,user_id bigint,create_time STRING,operation_type STRING,row_change_type STRING
)
WITH
('connector'='starrocks','sink.max-retries'='5','jdbc-url' = 'jdbc:mysql://192.168.0.1:9030/your_database_name?useUnicode=true&characterEncoding=utf-8&useSSL=false&connectTimeout=3000&useUnicode=true&characterEncoding=utf8&useSSL=false&rewriteBatchedStatements=true&&serverTimezone=Asia/Shanghai&sessionVariables=query_timeout=86400','load-url'='192.168.0.1:8030','table-name' = 'table_demo','username'='your_username','password'='your_password','sink.buffer-flush.interval-ms'='5000','sink.parallelism' = '2','database-name'='your_database_name'
);
此Sink表結構與StarRocks中的目標表結構一致,通過WITH參數配置了StarRocks連接器的相關信息,如JDBC URL、Load URL、表名、用戶名、密碼、緩沖刷新間隔以及并行度等,確保Flink能夠將處理后的數據準確高效地寫入StarRocks。
四、數據同步與處理流程
使用Flink SQL進行數據抽取與轉換
- 配置Flink SQL環境:在Flink的SQL客戶端或相關集成開發環境中,配置好Flink SQL的運行環境,確保能夠執行SQL語句對數據進行操作。
- 編寫數據抽取與轉換SQL:編寫SQL語句從MySQL CDC表中抽取數據,并進行必要的轉換操作,例如將時間格式進行轉換、根據業務規則對某些字段進行計算等。以下是一個簡單的示例,將
create_time
字段從TIMESTAMP
類型轉換為字符串類型,并根據operation_type
和row_change_type
字段確定最終的操作類型:
INSERT INTOstarrocks_sink_example
SELECTid,sign,shop_id,shop_typeuser_id,cast(create_time as CHAR) as create_time,CASE WHEN operation_type = 'DELETE' THEN 'DELETE'WHEN row_change_type = '+I' THEN 'INSERT'WHEN row_change_type IN ('-U', '+U') THEN 'UPDATE'WHEN row_change_type = '-D' THEN 'DELETE'ELSE 'UNKNOWN'END AS operation_type,row_change_type
FROMmysql_cdc_example;
該SQL語句從 mysql_cdc_example
表中讀取數據,對 create_time
字段進行類型轉換,并根據不同的變更類型確定最終的 operation_type
,然后將處理后的數據插入到 starrocks_sink_example
表中。
使用Routine Load進行數據實時攝入(以Kafka數據源為例)
- 配置Kafka數據源:在Kafka中創建用于存儲數據變更的主題,確保數據源能夠正常向該主題發送數據。例如,創建一個名為
user_table_changes
的主題。 - 創建StarRocks的Routine Load任務:在StarRocks中創建Routine Load任務,用于實時消費Kafka主題中的數據并寫入到StarRocks表中。以下是一個示例:
CREATE ROUTINE LOAD your_load_job_name ON table_demo
COLUMNS (id,sign,shop_id,shop_type,user_id,create_time,operation_type,row_change_type,temp_operation_type=IF(operation_type = 'DELETE', 'DELETE', IF(operation_type = 'UPDATE', 'UPSERT', 'APPEND'))
)
PROPERTIES ("desired_concurrent_number" = "1","max_batch_interval" = "10","max_batch_rows" = "300000","max_batch_size" = "209715200","strict_mode" = "false","format" = "json"
)
FROM
KAFKA ("kafka_broker_list" = "192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092","kafka_topic" = "user_table_changes","property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
該Routine Load任務配置了從Kafka主題 user_table_changes
中讀取數據,按照指定的列映射關系寫入到 user_table_mapping
表中,并設置了相關的屬性,如期望的并發數、最大批次間隔、最大批次行數、最大批次大小、嚴格模式以及數據格式等。