SeaTunnel?是一個非常易用、超高性能的分布式數據集成平臺,支持實時海量數據同步。 每天可穩定高效地同步數百億數據,已被近百家企業應用于生產,在國內較為普及。
Databend?是一款開源、彈性、低成本,基于對象存儲也可以做實時分析的云原生湖倉。
SeaTunnel 架構
SeaTunnel 整體架構:
本文將使用 SeaTunnel 建立從 MySQL 到 Databend 的數據同步管道,實現從 MySQL 數據源同步數據到 Databend 目標表的目的。
SeaTunnel MySQL-CDC 和 Databend Sink Connector
SeaTunnel 的 MySQL CDC 連接器允許從 MySQL 數據庫中讀取快照數據和增量數據,其實現的原理是基于 debezium-mysql-connector 。
而 Databend 在 PR?[Feature][Connector-V2] Support databend source/sink connector?之后也同時在 SeaTunnel 中支持了 Databend 作為 Source 和 Sink Connector。這里我們使用 SeaTunnel 的 MySQL-CDC Source Connector 和 Databend Sink Connector 來搭建數據同步管道。
編譯 SeaTunnel
由于上述 Databend Connector 的 PR 剛合并入 SeaTunnel 的 dev 分支,還沒有正式 release,所以目前要使用 Databend Connector 的話,需要基于源碼對 SeaTunnel 進行構建。
Clone 源碼
首先我們需要從?GitHub?克隆 SeaTunnel 源代碼。
git clone git@github.com:apache/seatunnel.git
本地安裝子項目
在克隆源代碼之后,需要運行?./mvnw
?命令將子項目安裝到 maven 本地存儲庫。否則代碼無法在 JetBrains IntelliJ IDEA 中正確啟動。
./mvnw install -Dmaven.test.skip
構建 SeaTunnel
安裝 maven 后,可以使用以下命令進行編譯和打包。
mvn clean package -pl seatunnel-dist -am -Dmaven.test.skip=true
構建后的內容在?seatunnel/seatunnel-dist/target
?中,我們需要解壓?apache-seatunnel-2.3.12-SNAPSHOT-src.tar.gz
,得到如下目錄:?
bin
?下面是可以直接運行的 shell 腳本,能夠一鍵啟動 SeaTunnel;
config
?中是 jvm options 相關的配置文件;
lib
中是運行 SeaTunnel 或者 connector 相關的 jar 包。
創建 connector 配置文件
我們的任務設定是通過 SeaTunnel 從 MySQL 中同步 mydb.t1 表。 配置文件 為 mysql-to-databend.conf:
env{parallelism = 1job.mode = "STREAMING"checkpoint.interval = 2000
}source {MySQL-CDC {base-url="jdbc:mysql://127.0.0.1:3306/mydb"username="root"password="123456"table-names=["mydb.t1"]startup.mode="initial"}
}
sink {Databend {url = "jdbc:databend://127.0.0.1:8000?presigned_url_disabled=true"database = "default"table = "t1"username = "databend"password = "databend"# 批量操作設置batch_size = 2# 如果目標表不存在,是否自動創建auto_create = true}
}
相關的參數設定可以參考?seatunnel MySQL文檔?和?seatunnel Databend Connector。
本地啟動 MySQL 與 Databend
啟動并初始化 MySQL 表數據
本地啟動 MySQL 后,創建一個數據庫?mydb
,在 mydb 中新建一張表并插入 10 條數據:
create database mydb;
use mydb;
create table t1 (a int, b varchar(100));
insert into t1 values(1,'aa')
...
insert into t1 values(10,'bb')
本地啟動 Databend
version: '3'
services:databend:image: datafuselabs/databend:v1.2.754-nightlyplatform: linux/arm64ports:- "8000:8000"environment:- QUERY_DEFAULT_USER=databend- QUERY_DEFAULT_PASSWORD=databend- MINIO_ENABLED=truevolumes:- ./data:/var/lib/miniohealthcheck:test: "curl -f localhost:8080/v1/health || exit 1"interval: 2sretries: 10start_period: 2stimeout: 1s
直接?docker-compose up
?即可啟動 Databend 服務。
啟動 SeaTunnel
./bin/seatunnel.sh --config ./bin/mysql-to-databend.conf -m local
啟動后 Databend Sink Connector 會首先將 MySQL 表中的全量數據同步過來:
接下來我們往 MySQL 中插入幾條數據,就會同步 MySQL 中增量的數據:
可以看到 SeaTunnel 在終端輸出的日志:?
以及 Databend 中查詢到數據:
說明數據已經及時同步過來了。
目前 Databend Sink Connector 還只支持 Append Only 模式,對于 update、delete 的數據沒做處理,會在下一個 seatunnel 的 PR 中實現完整的 CDC 功能。
結論
通過本文我們成功實現了從 MySQL 到 Databend 的實時數據同步管道。這個解決方案具有以下優勢:
- 簡單易用:SeaTunnel 提供了簡潔的配置方式,只需少量配置即可建立高效的數據同步管道。
- 實時性強:基于 CDC 技術,能夠實時捕獲 MySQL 的數據變更并同步到 Databend。
- 可擴展性好:SeaTunnel 的分布式架構使其能夠處理海量數據同步需求。
- 低開發成本:無需編寫復雜的 ETL 代碼,通過配置文件即可完成數據集成任務。
需要注意的是,目前 Databend Sink Connector 還只支持 Append Only 模式,對于 update、delete 的數據沒做處理,完整的 CDC 功能將在后續的 PR 中實現。這個方案特別適合需要將 MySQL 數據實時同步到 Databend 進行分析的場景,幫助企業構建實時數據湖倉架構。
關于 Databend
Databend 是一款開源、彈性、低成本,基于對象存儲也可以做實時分析的新式湖倉。期待您的關注,一起探索云原生數倉解決方案,打造新一代開源 Data Cloud。
👨?💻? Databend Cloud:databend.cn
📖 Databend 文檔:docs.databend.cn
💻 Wechat:Databend
? GitHub:github.com/databendlab...