Apache SeaTunnel腳本升級及參數調優實戰

最近作者針對實時數倉的Apache SeaTunnel同步鏈路,完成了雙引擎架構升級與全鏈路參數深度調優,希望本文能夠給大家有所啟發,歡迎批評指正!

在這里插入圖片描述

Apache SeaTunnel 版本 :2.3.9

Doris版本:2.0.6

MySQL JDBC Connector : 8.0.28

架構升級

  • 批處理鏈路:JDBC并行度進行提升,基于ID分區實現分片讀取,結合批量參數(fetch_size=10000+batch_size=5000)使全量同步吞吐量大幅增加

  • 實時增量鏈路:引入MySQL-CDC組件,通過initial快照模式+chunk.size.rows=8096實現全量/增量平滑切換,事件延遲壓降至500ms內

穩定性增強

  • 資源管控:JDBC連接池動態擴容(max_size=20)+ CDC限流策略(rows_per_second=1000),源庫CPU峰值負載下降40%

  • 容錯機制:Doris兩階段提交(enable-2pc=true)配合檢查點(checkpoint.interval=10s),故障恢復時間縮短80%

寫入優化

  • 緩沖區三級聯控(buffer-size=10000+buffer-count=3+flush.interval=5s)提升Doris寫入批次質量

  • Tablet粒度控制(request_tablet_size=5)使BE節點負載均衡度提升

實戰演示

同步之前創建Doris表

在這里插入圖片描述

-- DROP TABLE IF EXISTS ods.ods_activity_info_full;
CREATE TABLE ods.ods_activity_info_full
(`id`            VARCHAR(255) COMMENT '活動id',`k1`            DATE NOT NULL   COMMENT '分區字段',`activity_name` STRING COMMENT '活動名稱',`activity_type` STRING COMMENT '活動類型',`activity_desc` STRING COMMENT '活動描述',`start_time`    STRING COMMENT '開始時間',`end_time`      STRING COMMENT '結束時間',`create_time`   STRING COMMENT '創建時間'
)ENGINE=OLAP  -- 使用Doris的OLAP引擎,適用于高并發分析場景UNIQUE KEY(`id`,`k1`)  -- 唯一鍵約束,保證(id, k1)組合的唯一性(Doris聚合模型特性)
COMMENT '活動信息全量表'
PARTITION BY RANGE(`k1`) ()  -- 按日期范圍分區(具體分區規則由動態分區配置決定)
DISTRIBUTED BY HASH(`id`)  -- 按id哈希分桶,保證相同id的數據分布在同一節點
PROPERTIES
("replication_allocation" = "tag.location.default: 1",  -- 副本分配策略:默認標簽分配1個副本"is_being_synced" = "false",          -- 是否處于同步狀態(通常保持false)"storage_format" = "V2",             -- 存儲格式版本(V2支持更高效壓縮和索引)"light_schema_change" = "true",      -- 啟用輕量級schema變更(僅修改元數據,無需數據重寫)"disable_auto_compaction" = "false", -- 啟用自動壓縮(合并小文件提升查詢性能)"enable_single_replica_compaction" = "false", -- 禁用單副本壓縮(多副本時保持數據一致性)"dynamic_partition.enable" = "true",            -- 啟用動態分區"dynamic_partition.time_unit" = "DAY",          -- 按天創建分區"dynamic_partition.start" = "-60",             -- 保留最近60天的歷史分區"dynamic_partition.end" = "3",                 -- 預先創建未來3天的分區"dynamic_partition.prefix" = "p",              -- 分區名前綴(如p20240101)"dynamic_partition.buckets" = "32",            -- 每個分區的分桶數(影響并行度)"dynamic_partition.create_history_partition" = "true", -- 自動創建缺失的歷史分區"bloom_filter_columns" = "id,activity_name",  -- 為高頻過濾字段(id/名稱)創建布隆過濾器,加速WHERE查詢"compaction_policy" = "time_series",          -- 按時間序合并策略優化時序數據(適合活動時間字段)"enable_unique_key_merge_on_write" = "true",  -- 唯一鍵寫時合并(實時更新場景減少讀放大)"in_memory" = "false"                        -- 關閉全內存存儲(僅小表可開啟)
);
配置SeaTunnel JDBC同步腳本

在這里插入圖片描述

env {# 環境配置parallelism = 8                     # 增加并行度以提高吞吐量job.mode = "STREAMING"              # 使用流式處理模式進行實時同步checkpoint.interval = 10000         # 檢查點間隔,單位毫秒# 限流配置 - 避免對源數據庫造成過大壓力read_limit.bytes_per_second = 10000000  # 每秒讀取字節數限制,約10MB/sread_limit.rows_per_second = 1000       # 每秒讀取行數限制# 本地檢查點配置execution.checkpoint.data-uri = "file:///opt/seatunnel/checkpoints"execution.checkpoint.max-concurrent = 1  # 最大并發檢查點數# 性能優化參數execution.buffer-timeout = 5000          # 緩沖超時時間(毫秒)execution.jvm-options = "-Xms4g -Xmx8g -XX:+UseG1GC -XX:MaxGCPauseMillis=100"
}source {MySQL-CDC {# 基本連接配置# server-id = 5652-5657             # MySQL復制客戶端的唯一ID范圍username = "root"                # 數據庫用戶名password = ""                # 數據庫密碼table-names = ["gmall.activity_info"]  # 要同步的表base-url = "jdbc:mysql://192.168.241.128:3306/gmall?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"# CDC 特有配置schema-changes.enabled = true     # 啟用架構變更捕獲server-time-zone = "Asia/Shanghai"  # 服務器時區# 性能優化配置snapshot.mode = "initial"         # 初始快照模式snapshot.fetch.size = 10000       # 快照獲取大小chunk.size.rows = 8096            # 分塊大小,用于并行快照connection.pool.size = 10         # 連接池大小# 高級配置include.schema.changes = true     # 包含架構變更事件scan.startup.mode = "initial"     # 啟動模式:initial(全量+增量)scan.incremental.snapshot.chunk.size = 8096  # 增量快照分塊大小debezium.min.row.count.to.stream.results = 1000  # 流式結果的最小行數# 容錯配置connect.timeout = 30000           # 連接超時時間(毫秒)connect.max-retries = 3           # 最大重試次數# 輸出表名result_table_name = "mysql_cdc_source"}
}# 可選的轉換邏輯,如果需要對數據進行處理
transform {Sql {source_table_name = "mysql_cdc_source"result_table_name = "doris_sink_data"# 根據需要轉換字段,這里添加了一個分區字段k1query = """selectid,formatdatetime(create_time,'yyyy-MM-dd') as k1,activity_name,activity_type,activity_desc,start_time,end_time,create_timefrom mysql_cdc_source"""}
}sink {Doris {# 基本連接配置source_table_name = "doris_sink_data"  # 或直接使用 "mysql_cdc_source"fenodes = "192.168.241.128:8030"username = "root"password = ""table.identifier = "ods.ods_activity_info_full"  # Doris目標表# 事務和標簽配置sink.enable-2pc = "true"          # 啟用兩階段提交,確保一致性sink.label-prefix = "cdc_sync"    # 導入標簽前綴# 寫入模式配置sink.properties {format = "json"read_json_by_line = "true"column_separator = "\t"         # 列分隔符line_delimiter = "\n"           # 行分隔符max_filter_ratio = "0.1"        # 允許的最大錯誤率# CDC特有配置 - 處理不同操作類型# 使用Doris的UPSERT模式處理CDC事件merge_type = "MERGE"            # 合并類型:APPEND或MERGEdelete_enable = "true"          # 啟用刪除操作}# 性能優化配置sink.buffer-size = 10000          # 緩沖區大小sink.buffer-count = 3             # 緩沖區數量sink.flush.interval-ms = 5000     # 刷新間隔sink.max-retries = 3              # 最大重試次數sink.parallelism = 8              # 寫入并行度# Doris連接優化doris.config = {format = "json"read_json_by_line = "true"request_connect_timeout_ms = "5000"  # 連接超時request_timeout_ms = "30000"         # 請求超時request_tablet_size = "5"            # 每個請求的tablet數量}}
}
配置SeaTunnel MySQLCDC 同步腳本

在這里插入圖片描述

env {parallelism = 8job.mode = "BATCH"checkpoint.interval = 30000# 本地文件系統檢查點execution.checkpoint.data-uri = "file:///opt/seatunnel/checkpoints"execution.buffer-timeout = 5000# JVM 參數優化execution.jvm-options = "-Xms4g -Xmx8g -XX:+UseG1GC -XX:MaxGCPauseMillis=100"
}source {Jdbc {result_table_name = "mysql_seatunnel"url = "jdbc:mysql://192.168.241.128:3306/gmall?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false&rewriteBatchedStatements=true&useServerPrepStmts=true&cachePrepStmts=true"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 30user = "gmall"password = "gmall"# 使用分區并行讀取query = "select id, activity_name, activity_type, activity_desc, start_time, end_time, create_time from gmall.activity_info"partition_column = "id"partition_num = 8# 連接池配置connection_pool {max_size = 20min_idle = 5max_idle_ms = 60000}# 批處理配置fetch_size = 10000batch_size = 5000is_exactly_once = true}
}transform {Sql {source_table_name = "mysql_seatunnel"result_table_name = "seatunnel_doris"query = """select id, formatdatetime(create_time,'yyyy-MM-dd') as k1,  activity_name, activity_type, activity_desc, start_time, end_time, create_time from mysql_seatunnel"""}
}sink {Doris {source_table_name = "seatunnel_doris"fenodes = "192.168.241.128:8030"username = "root"password = ""table.identifier = "ods.ods_activity_info_full"sink.enable-2pc = "true"sink.label-prefix = "test_json"# 優化Doris寫入配置sink.properties {format = "json"read_json_by_line = "true"column_separator = "\t"line_delimiter = "\n"max_filter_ratio = "0.1"}# 批量寫入配置sink.buffer-size = 10000sink.buffer-count = 3sink.flush.interval-ms = 5000sink.max-retries = 3sink.parallelism = 8doris.config = {format = "json"read_json_by_line = "true"request_connect_timeout_ms = "5000"request_timeout_ms = "30000"request_tablet_size = "5"}}
}

最終Apache Doris數據:
在這里插入圖片描述

本文完!

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/74220.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/74220.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/74220.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

C++ 時間操作:獲取有史以來的天數與文件計數器

C 時間操作:獲取有史以來的天數與文件計數器 在C中,時間操作是一個非常重要的功能,尤其是在需要處理日期、時間戳或定時任務時。本文將介紹如何利用C的時間操作功能,實現以下兩個目標: 獲取從Unix紀元時間&#xff0…

Python Bug修復案例分析:Python 中常見的 IndentationError 錯誤 bug 的修復

在 Python 編程的世界里,代碼的可讀性和規范性至關重要。Python 通過強制使用縮進來表示代碼塊的層次結構,這一獨特的設計理念使得代碼更加清晰易讀。然而,正是這種對縮進的嚴格要求,導致開發者在編寫代碼時,稍有不慎就…

【論文筆記】Transformer

Transformer 2017 年,谷歌團隊提出 Transformer 結構,Transformer 首先應用在自然語言處理領域中的機器翻譯任務上,Transformer 結構完全構建于注意力機制,完全丟棄遞歸和卷積的結構,這使得 Transformer 結構效率更高…

CI/CD(三) 安裝nfs并指定k8s默認storageClass

一、NFS 服務端安裝(主節點 10.60.0.20) 1. 安裝 NFS 服務端 sudo apt update sudo apt install -y nfs-kernel-server 2. 創建共享目錄并配置權限 sudo mkdir -p /data/k8s sudo chown nobody:nogroup /data/k8s # 允許匿名訪問 sudo chmod 777 /dat…

【QA】單件模式在Qt中有哪些應用?

單例設計模式確保一個類僅有一個實例,并提供一個全局訪問點來獲取該實例。在 Qt 框架中,有不少類的設計采用了單例模式,以下為你詳細介紹并給出相應代碼示例。 1. QApplication QApplication 是 Qt GUI 應用程序的核心類,每個 Q…

存儲過程觸發器習題整理1

46、{blank}設有商品表(商品號,商品名,單價)和銷售表(銷售單據號,商品號,銷售時間,銷售數量,銷售單價)。其中,商品號代表一類商品,商品號、單價、銷售數量和銷售單價均為整型。請編寫…

基于ChatGPT、GIS與Python機器學習的地質災害風險評估、易發性分析、信息化建庫及災后重建高級實踐

第一章、ChatGPT、DeepSeek大語言模型提示詞與地質災害基礎及平臺介紹【基礎實踐篇】 1、什么是大模型? 大模型(Large Language Model, LLM)是一種基于深度學習技術的大規模自然語言處理模型。 代表性大模型:GPT-4、BERT、T5、Ch…

單表達式倒計時工具:datetime的極度優雅(智普清言)

一個簡單表達式,也可以優雅自成工具。 筆記模板由python腳本于2025-03-22 20:25:49創建,本篇筆記適合任意喜歡學習的coder翻閱。 【學習的細節是歡悅的歷程】 博客的核心價值:在于輸出思考與經驗,而不僅僅是知識的簡單復述。 Pyth…

最優編碼樹的雙子性

現在看一些書,不太愿意在書上面做一些標記,也沒啥特殊的原因。。哈哈。 樹的定義 無環連通圖,極小連通圖,極大無環圖。 度 某個節點,描述它的度,一般默認是出度,分叉的邊的條數。或者說孩子…

MiB和MB

本文來自騰訊元寶 MiB 和 ?MB 有區別,盡管它們都用于表示數據存儲的單位,但它們的計算方式不同,分別基于不同的進制系統。 1. ?MiB(Mebibyte)? ?MiB 是基于二進制的單位,使用1024作為基數。1 MiB 102…

Labview和C#調用KNX API 相關東西

敘述:完全沒有聽說過KNX這個協議...................我這次項目中也是簡單的用了一下沒有過多的去研究 C#調用示例工程鏈接(labview調用示例在 DEBUG文件夾里面) 通過網盤分享的文件:KNX調用示例.zip 鏈接: https://pan.baidu.com/s/1NQUEYM11HID0M4ksetrTyg?pwd…

損失函數理解(二)——交叉熵損失

損失函數的目的是為了定量描述不同模型(例如神經網絡模型和人腦模型)的差異。 交叉熵,顧名思義,與熵有關,先把模型換成熵這么一個數值,然后用這個數值比較不同模型之間的差異。 為什么要做這一步轉換&…

Kubernetes的Replica Set和ReplicaController有什么區別

ReplicaSet 和 ReplicationController 是 Kubernetes 中用于管理應用程序副本的兩種資源,它們有類似的功能,但 ReplicaSet 是 ReplicationController 的增強版本。 以下是它們的主要區別: 1. 功能的演進 ReplicationController 是 Kubernete…

信息系統運行管理員教程3--信息系統設施運維

第3章 信息系統設施運維 信息系統設施是支撐信息系統業務活動的信息系統軟硬件資產及環境。 第1節 信息系統設施運維的管理體系 信息系統設施運維的范圍包含信息系統涉及的所有設備及環境,主要包括基礎環境、硬件設備、網絡設備、基礎軟件等。 信息系統設施運維…

如何通過Python實現自動化任務:從入門到實踐

在當今快節奏的數字化時代,自動化技術正逐漸成為提高工作效率的利器。無論是處理重復性任務,還是管理復雜的工作流程,自動化都能為我們節省大量時間和精力。本文將以Python為例,帶你從零開始學習如何實現自動化任務,并通過一個實際案例展示其強大功能。 一、為什么選擇Pyt…

Spring Boot 與 MyBatis Plus 整合 KWDB 實現 JDBC 數據訪問

? 引言 本文主要介紹如何在 IDEA 中搭建一個使用 Maven 管理的 Spring Boot 應用項目工程,并結合在本地搭建的 KWDB 數據庫(版本為:2.0.3)來演示 Spring Boot 與 MyBatis Plus 的集成,以及對 KWDB 數據庫的數據操作…

Java鎖等待喚醒機制

在 Java 并發編程中,鎖的等待和喚醒機制至關重要,通常使用 wait()、notify() 和 notifyAll() 來實現線程間的協調。本文將詳細介紹這些方法的用法,并通過示例代碼加以說明。 1. wait()、notify() 與 notifyAll() 在 Java 中,Obj…

? UNIX網絡編程筆記:TCP客戶/服務器程序示例

服務器實例 有個著名的項目&#xff0c;tiny web&#xff0c;本項目將其改到windows下&#xff0c;并使用RAII重構&#xff0c;編寫過程中對于內存泄漏確實很頭疼&#xff0c;還沒寫完&#xff0c;后面會繼續更&#xff1a; #include <iostream> #include <vector&g…

AI Agent開發大全第四課-提示語工程:從簡單命令到AI對話的“魔法”公式

什么是提示語工程&#xff1f;一個讓AI“聽話”的秘密 如果你曾經嘗試過用ChatGPT或者其他大語言模型完成任務&#xff0c;那么你一定遇到過這樣的情況&#xff1a;明明你的問題是清晰的&#xff0c;但答案卻離題萬里&#xff1b;或者你認為自己提供的信息足夠詳盡&#xff0c…

系統架構設計知識體系總結

1.技術選型 1.什么是技術選型&#xff1f; 技術選型是指評估和選擇在項目或系統開發中使用的最合適的技術和工具的過程。這涉及考慮基于其能力、特性、與項目需求的兼容性、可擴展性、性能、維護和其他因素的各種可用選項。技術選型的目標是確定與項目目標相符合、能夠有效解…