I recently completed a dual-engine architecture upgrade and end-to-end parameter tuning of the Apache SeaTunnel sync pipeline feeding our real-time data warehouse. I hope this write-up gives you some useful ideas; feedback and corrections are welcome!
Apache SeaTunnel version: 2.3.9
Doris version: 2.0.6
MySQL JDBC Connector: 8.0.28
Architecture Upgrade
- Batch pipeline: raised JDBC source parallelism and enabled ID-based partitioned (sharded) reads; combined with the batch parameters (fetch_size=10000, batch_size=5000), full-load throughput increased substantially (see the sketch after this list).
- Real-time incremental pipeline: introduced the MySQL-CDC connector; the initial snapshot mode together with chunk.size.rows=8096 gives a smooth switch from full load to incremental sync, and event latency dropped below 500 ms.
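To make the ID-based sharding concrete, here is a rough illustration of the range reads the JDBC source performs when partition_column = "id" and partition_num = 8 are set. The boundary values below are hypothetical; SeaTunnel derives the actual split points from the observed minimum and maximum of the id column, and this sketch assumes id is numeric and reasonably evenly distributed.

-- Illustration only: each parallel reader scans one id range instead of the whole table.
SELECT id, activity_name, activity_type, activity_desc, start_time, end_time, create_time
FROM gmall.activity_info
WHERE id >= 1 AND id < 500;       -- slice 1 of 8 (hypothetical boundaries)

SELECT id, activity_name, activity_type, activity_desc, start_time, end_time, create_time
FROM gmall.activity_info
WHERE id >= 500 AND id < 1000;    -- slice 2 of 8, and so on for the remaining slices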
Stability Enhancements
- Resource control: dynamic JDBC connection-pool sizing (max_size=20) plus CDC rate limiting (rows_per_second=1000) cut peak CPU load on the source database by 40% (a quick way to observe source-side pressure is sketched after this list).
- Fault tolerance: Doris two-phase commit (enable-2pc=true) combined with checkpointing (checkpoint.interval=10s) shortened failure-recovery time by 80%.
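One way to sanity-check source-side pressure while the rate-limited job runs is a quick look at MySQL's own counters. These are plain MySQL statements, nothing SeaTunnel-specific, and only illustrate how such an observation can be made.

-- Run on the source MySQL while the sync job is active.
SHOW GLOBAL STATUS LIKE 'Threads_running';    -- threads actively executing statements
SHOW GLOBAL STATUS LIKE 'Threads_connected';  -- open connections; should stay within the pool's max_size = 20
SHOW PROCESSLIST;                             -- confirm what the SeaTunnel reader sessions are doing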
Write Optimization
- Three-level buffer coordination (buffer-size=10000, buffer-count=3, flush.interval=5s) improves the quality of Doris write batches.
- Tablet-granularity control (request_tablet_size=5) improves load balancing across BE nodes (a quick verification check follows this list).
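Whether the tablet-level settings actually even out BE load can be checked from Doris itself. A minimal check is sketched below; the statement names are as I recall them from Doris 2.x, so verify them against your version.

-- Run in Doris: how replicas of the target table are distributed across BE nodes.
ADMIN SHOW REPLICA DISTRIBUTION FROM ods.ods_activity_info_full;
-- Per-tablet detail, including which backend hosts each tablet:
SHOW TABLETS FROM ods.ods_activity_info_full;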
Hands-on Demo
Create the Doris table before starting the sync
-- DROP TABLE IF EXISTS ods.ods_activity_info_full;
CREATE TABLE ods.ods_activity_info_full
(
    `id`            VARCHAR(255) COMMENT 'Activity ID',
    `k1`            DATE NOT NULL COMMENT 'Partition column',
    `activity_name` STRING COMMENT 'Activity name',
    `activity_type` STRING COMMENT 'Activity type',
    `activity_desc` STRING COMMENT 'Activity description',
    `start_time`    STRING COMMENT 'Start time',
    `end_time`      STRING COMMENT 'End time',
    `create_time`   STRING COMMENT 'Creation time'
)
ENGINE = OLAP                 -- Doris OLAP engine, suited to high-concurrency analytics
UNIQUE KEY(`id`, `k1`)        -- Unique-key model: enforces uniqueness of the (id, k1) combination
COMMENT 'Activity information full table'
PARTITION BY RANGE(`k1`) ()   -- Range partitioning by date (actual partitions come from the dynamic-partition settings)
DISTRIBUTED BY HASH(`id`)     -- Hash bucketing by id so rows with the same id land on the same node
PROPERTIES
(
    "replication_allocation" = "tag.location.default: 1",  -- Replica allocation: 1 replica under the default tag
    "is_being_synced" = "false",                            -- Whether the table is being synced (normally false)
    "storage_format" = "V2",                                -- Storage format version (V2 supports more efficient compression and indexing)
    "light_schema_change" = "true",                         -- Lightweight schema change (metadata-only, no data rewrite)
    "disable_auto_compaction" = "false",                    -- Keep automatic compaction on (merges small files for query performance)
    "enable_single_replica_compaction" = "false",           -- Disable single-replica compaction (keeps replicas consistent)
    "dynamic_partition.enable" = "true",                    -- Enable dynamic partitioning
    "dynamic_partition.time_unit" = "DAY",                  -- Create partitions per day
    "dynamic_partition.start" = "-60",                      -- Keep the last 60 days of history partitions
    "dynamic_partition.end" = "3",                          -- Pre-create partitions 3 days ahead
    "dynamic_partition.prefix" = "p",                       -- Partition name prefix (e.g. p20240101)
    "dynamic_partition.buckets" = "32",                     -- Buckets per partition (affects parallelism)
    "dynamic_partition.create_history_partition" = "true",  -- Automatically create missing history partitions
    "bloom_filter_columns" = "id,activity_name",            -- Bloom filters on hot filter columns (id, name) to speed up WHERE queries
    "compaction_policy" = "time_series",                    -- Time-series compaction policy (fits the time-based activity columns)
    "enable_unique_key_merge_on_write" = "true",            -- Merge-on-write for the unique key (less read amplification for real-time updates)
    "in_memory" = "false"                                   -- No full in-memory storage (only suitable for small tables)
);
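The DDL above deliberately leaves the partition list empty and relies on the dynamic-partition scheduler, so before loading data it is worth confirming that the daily partitions were actually created. A quick check, using statement names as documented for Doris 2.0:

-- Confirm the scheduler created the p-prefixed daily partitions (60 days back, 3 days ahead).
SHOW DYNAMIC PARTITION TABLES FROM ods;
SHOW PARTITIONS FROM ods.ods_activity_info_full;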
Configure the SeaTunnel MySQL-CDC sync script (streaming)
env {
    # Environment settings
    parallelism = 8                           # Higher parallelism for better throughput
    job.mode = "STREAMING"                    # Streaming mode for real-time sync
    checkpoint.interval = 10000               # Checkpoint interval in milliseconds

    # Rate limiting - avoid putting excessive pressure on the source database
    read_limit.bytes_per_second = 10000000    # Read limit in bytes per second, roughly 10 MB/s
    read_limit.rows_per_second = 1000         # Read limit in rows per second

    # Local checkpoint storage
    execution.checkpoint.data-uri = "file:///opt/seatunnel/checkpoints"
    execution.checkpoint.max-concurrent = 1   # Maximum number of concurrent checkpoints

    # Performance tuning
    execution.buffer-timeout = 5000           # Buffer timeout in milliseconds
    execution.jvm-options = "-Xms4g -Xmx8g -XX:+UseG1GC -XX:MaxGCPauseMillis=100"
}

source {
    MySQL-CDC {
        # Basic connection settings
        # server-id = 5652-5657               # Unique ID range for the MySQL replication client
        username = "root"                     # Database user
        password = ""                         # Database password
        table-names = ["gmall.activity_info"] # Tables to sync
        base-url = "jdbc:mysql://192.168.241.128:3306/gmall?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"

        # CDC-specific settings
        schema-changes.enabled = true         # Capture schema changes
        server-time-zone = "Asia/Shanghai"    # Server time zone

        # Performance tuning
        snapshot.mode = "initial"             # Initial snapshot mode
        snapshot.fetch.size = 10000           # Snapshot fetch size
        chunk.size.rows = 8096                # Chunk size for parallel snapshotting
        connection.pool.size = 10             # Connection pool size

        # Advanced settings
        include.schema.changes = true         # Include schema-change events
        scan.startup.mode = "initial"         # Startup mode: initial (full load + incremental)
        scan.incremental.snapshot.chunk.size = 8096       # Incremental snapshot chunk size
        debezium.min.row.count.to.stream.results = 1000   # Minimum row count before streaming results

        # Fault tolerance
        connect.timeout = 30000               # Connection timeout in milliseconds
        connect.max-retries = 3               # Maximum number of retries

        # Output table name
        result_table_name = "mysql_cdc_source"
    }
}

# Optional transform logic, in case the data needs processing
transform {
    Sql {
        source_table_name = "mysql_cdc_source"
        result_table_name = "doris_sink_data"
        # Transform fields as needed; here a partition column k1 is derived
        query = """
            select
                id,
                formatdatetime(create_time, 'yyyy-MM-dd') as k1,
                activity_name,
                activity_type,
                activity_desc,
                start_time,
                end_time,
                create_time
            from mysql_cdc_source
        """
    }
}

sink {
    Doris {
        # Basic connection settings
        source_table_name = "doris_sink_data"   # Or use "mysql_cdc_source" directly
        fenodes = "192.168.241.128:8030"
        username = "root"
        password = ""
        table.identifier = "ods.ods_activity_info_full"   # Doris target table

        # Transaction and label settings
        sink.enable-2pc = "true"                # Two-phase commit for consistency
        sink.label-prefix = "cdc_sync"          # Load label prefix

        # Write-mode settings
        sink.properties {
            format = "json"
            read_json_by_line = "true"
            column_separator = "\t"             # Column separator
            line_delimiter = "\n"               # Line delimiter
            max_filter_ratio = "0.1"            # Maximum tolerated error ratio
            # CDC-specific settings - handle the different operation types
            # Use Doris upsert semantics to apply CDC events
            merge_type = "MERGE"                # Merge type: APPEND or MERGE
            delete_enable = "true"              # Enable delete operations
        }

        # Performance tuning
        sink.buffer-size = 10000                # Buffer size
        sink.buffer-count = 3                   # Number of buffers
        sink.flush.interval-ms = 5000           # Flush interval
        sink.max-retries = 3                    # Maximum number of retries
        sink.parallelism = 8                    # Write parallelism

        # Doris connection tuning
        doris.config = {
            format = "json"
            read_json_by_line = "true"
            request_connect_timeout_ms = "5000" # Connection timeout
            request_timeout_ms = "30000"        # Request timeout
            request_tablet_size = "5"           # Number of tablets per request
        }
    }
}
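The MySQL-CDC source reads the MySQL binlog, so the streaming job only works if binary logging is enabled in row format on the source instance. A quick prerequisite check on the MySQL side:

-- Run on the source MySQL before starting the streaming job.
SHOW VARIABLES LIKE 'log_bin';           -- expect ON
SHOW VARIABLES LIKE 'binlog_format';     -- expect ROW
SHOW VARIABLES LIKE 'binlog_row_image';  -- FULL is recommended so change events carry complete row images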
Configure the SeaTunnel JDBC sync script (batch)
env {
    parallelism = 8
    job.mode = "BATCH"
    checkpoint.interval = 30000

    # Local filesystem checkpoint storage
    execution.checkpoint.data-uri = "file:///opt/seatunnel/checkpoints"
    execution.buffer-timeout = 5000

    # JVM tuning
    execution.jvm-options = "-Xms4g -Xmx8g -XX:+UseG1GC -XX:MaxGCPauseMillis=100"
}

source {
    Jdbc {
        result_table_name = "mysql_seatunnel"
        url = "jdbc:mysql://192.168.241.128:3306/gmall?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false&rewriteBatchedStatements=true&useServerPrepStmts=true&cachePrepStmts=true"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 30
        user = "gmall"
        password = "gmall"

        # Partitioned parallel read
        query = "select id, activity_name, activity_type, activity_desc, start_time, end_time, create_time from gmall.activity_info"
        partition_column = "id"
        partition_num = 8

        # Connection pool settings
        connection_pool {
            max_size = 20
            min_idle = 5
            max_idle_ms = 60000
        }

        # Batch settings
        fetch_size = 10000
        batch_size = 5000
        is_exactly_once = true
    }
}

transform {
    Sql {
        source_table_name = "mysql_seatunnel"
        result_table_name = "seatunnel_doris"
        query = """
            select
                id,
                formatdatetime(create_time, 'yyyy-MM-dd') as k1,
                activity_name,
                activity_type,
                activity_desc,
                start_time,
                end_time,
                create_time
            from mysql_seatunnel
        """
    }
}

sink {
    Doris {
        source_table_name = "seatunnel_doris"
        fenodes = "192.168.241.128:8030"
        username = "root"
        password = ""
        table.identifier = "ods.ods_activity_info_full"
        sink.enable-2pc = "true"
        sink.label-prefix = "test_json"

        # Tuned Doris write settings
        sink.properties {
            format = "json"
            read_json_by_line = "true"
            column_separator = "\t"
            line_delimiter = "\n"
            max_filter_ratio = "0.1"
        }

        # Batch write settings
        sink.buffer-size = 10000
        sink.buffer-count = 3
        sink.flush.interval-ms = 5000
        sink.max-retries = 3
        sink.parallelism = 8

        doris.config = {
            format = "json"
            read_json_by_line = "true"
            request_connect_timeout_ms = "5000"
            request_timeout_ms = "30000"
            request_tablet_size = "5"
        }
    }
}
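After the batch job finishes, a simple sanity check is to compare row counts between source and target. Run each statement on its own system; since the Doris table is a unique-key model on (id, k1), the counts should match as long as every source row maps to a single (id, k1) pair.

-- On the source MySQL:
SELECT COUNT(*) FROM gmall.activity_info;

-- On Doris:
SELECT COUNT(*) FROM ods.ods_activity_info_full;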
Final data in Apache Doris:
That's all for this article!