文章作者:雷寶鑫
整理排版:白鯨開源 曾輝
Apache SeaTunnel官網鏈接: https://seatunnel.apache.org/
Apache SeaTunnel(以下簡稱SeaTunnel)是一款新一代高性能、分布式的數據集成同步工具,正受到業界廣泛關注和應用。SeaTunnel支持三種部署模式:本地模式(Local)、混合集群模式(Hybrid Cluster Mode)和分離集群模式(Separated Cluster Mode)。
本文嘗試介紹如何在K8s上以分離集群模式部署SeaTunnel,為有相關需求的伙伴提供完整的部署流程和配置案例參考。
前期準備
在開始部署之前,需要確保以下環境和組件已經準備就緒:
- Kubernetes集群環境
- kubectl命令行工具
- docker
- helm (option)
對于熟悉和具有Helm環境的部署,可以直接參考官網中使用Helm部署教程:
- https://seatunnel.apache.org/docs/2.3.10/start-v2/kubernetes/helm
- https://github.com/apache/seatunnel/tree/dev/deploy/kubernetes/seatunnel
本文主要介紹基于Kubernetes
環境和kubectl
工具的方式實現部署。
構建SeaTunnel Docker鏡像
目前官方已提供各版本的Docker鏡像,可直接拉取,詳細信息可參考官方文檔:Set Up With Docker。
docker pull apache/seatunnel:<version_tag>
由于我們需要部署的是集群模式,接下來需要配置集群間的網絡通信。SeaTunnel集群的網絡服務是通過Hazelcast實現的,所以接下來對這部分內容進行配置。
Hazelcast集群相關配置
Headless Service配置
Hazelcast 集群是由運行 Hazelcast 的集群成員組成的網絡,集群成員自動聯合起來形成一個集群,這種自動加入是通過集群成員用于查找彼此的各種發現機制實現的。
Hazelcast 支持以下發現機制:
- 自動發現機制,支持以下環境:
- AWS
- Azure
- GCP
- Kubernetes
- TCP
- Multicast
- Eureka
- Zookeeper
在本文的集群部署中,我們基于Hazelcast
的Kubernetes
自動發現機制來配置文件,詳細的原理可以參考官網文檔:Kubernetes Auto Discovery。
Hazelcast的k8s自動發現機制(DNS Lookup mode)需要借助于k8s的Headless Service
功能來實現。
Headless Service
在查詢服務域名時,會將域名解析為所有匹配Pod
的IP
地址列表,以此來實現Hazelcast集群成員互相發現彼此。
為此,首先我們創建K8s Headless Service
服務:
# use for hazelcast cluster join
apiVersion: v1
kind: Service
metadata:name: seatunnel-cluster
spec:type: ClusterIPclusterIP: Noneselector:app.kubernetes.io/instance: seatunnel-cluster-appapp.kubernetes.io/version: 2.3.10ports:- port: 5801name: hazelcast
上述配置中的關鍵部分:
metadata.name: seatunnel-cluster
: 服務名稱,Hazelcast 客戶端/節點將通過該名稱發現集群spec.clusterIP: None
:關鍵配置,聲明為 Headless Service,不分配虛擬 IPspec.selector
: 選擇器匹配的 Pod 標簽,包含相應標簽的pod會被該Service識別和代理spec.port
:Hazelcast的暴露端口
同時,為了能從系統外部利用rest api
訪問集群,我們定義另一個Service來包含Master的節點pod
:
# use for access seatunnel from outside system via rest api
apiVersion: v1
kind: Service
metadata:name: seatunnel-cluster-master
spec:type: ClusterIPclusterIP: Noneselector:app.kubernetes.io/instance: seatunnel-cluster-appapp.kubernetes.io/version: 2.3.10app.kubernetes.io/name: seatunnel-cluster-masterapp.kubernetes.io/component: masterports:- port: 8080name: "master-port"targetPort: 8080protocol: TCP
定義好上述K8s的Service服務后,接下來根據Hazelcast的k8s發現機制來配置hazelcast-master.yaml
和hazelcast-worker.yaml
文件。
Hazelcast master和worker的yaml配置
對于SeaTunnel分離集群模式來說,所有網絡相關的配置都在hazelcast-master.yaml
和hazelcast-worker.yaml
文件中。
hazelcast-master.yaml
的配置如下所示:
hazelcast:cluster-name: seatunnel-clusternetwork:rest-api:enabled: trueendpoint-groups:CLUSTER_WRITE:enabled: trueDATA:enabled: truejoin:kubernetes:enabled: trueservice-dns: seatunnel-cluster.bigdata.svc.cluster.localservice-port: 5801port:auto-increment: falseport: 5801properties:hazelcast.invocation.max.retry.count: 20hazelcast.tcp.join.port.try.count: 30hazelcast.logging.type: log4j2hazelcast.operation.generic.thread.count: 50hazelcast.heartbeat.failuredetector.type: phi-accrualhazelcast.heartbeat.interval.seconds: 30hazelcast.max.no.heartbeat.seconds: 300hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 15hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 200
上述配置文件中的關鍵配置項如下:
cluster-name
該配置用于確定多個節點是否屬于同一個集群,即只有相同cluster-name的節點才會屬于同一個集群。如果兩個節點之間的cluster-name名稱不同,Hazelcast 將會拒絕服務請求。
網絡配置
- rest-api.enabled:在ST 2.3.10版本中Hazelcast REST 服務默認在配置中禁用,需要手動顯式指定開啟。
- service-dns(必填):Headless Service 的完整域名,通常為 S E R V I C E ? N A M E . {SERVICE-NAME}. SERVICE?NAME.{NAMESPACE}.svc.cluster.local。
- service-port(可選):Hazelcast 端口;如果指定的值大于 0,則覆蓋默認值(默認端口 = 5701)
使用上述基于k8s的join機制,在Hazelcast Pod啟動時會解析service-dns,獲取所有成員pod的IP列表(通過Headless Service
),然后成員之間通過5801
端口嘗試建立TCP連接。
同樣的,對于hazelcast-worker.yaml
配置文件如下所示:
hazelcast:cluster-name: seatunnel-clusternetwork:rest-api:enabled: trueendpoint-groups:CLUSTER_WRITE:enabled: trueDATA:enabled: truejoin:kubernetes:enabled: trueservice-dns: seatunnel-cluster.bigdata.svc.cluster.localservice-port: 5801port:auto-increment: falseport: 5801properties:hazelcast.invocation.max.retry.count: 20hazelcast.tcp.join.port.try.count: 30hazelcast.logging.type: log4j2hazelcast.operation.generic.thread.count: 50hazelcast.heartbeat.failuredetector.type: phi-accrualhazelcast.heartbeat.interval.seconds: 30hazelcast.max.no.heartbeat.seconds: 300hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 15hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 200member-attributes:rule:type: stringvalue: worker
通過上述流程,我們就創建好了與Hazelcast集群相關的配置和服務,實現了Hazecast基于Kubernetes的集群成員發現。
接下來,繼續完成有關SeaTunnel引擎的相關配置。
配置SeaTunnel引擎
SeaTunnel引擎的相關配置都在seatunnel.yaml
文件中,下面給出seatunnel.yaml
配置示例以供參考:
seatunnel:engine:history-job-expire-minutes: 1440backup-count: 1queue-type: blockingqueueprint-execution-info-interval: 60print-job-metrics-info-interval: 60classloader-cache-mode: truehttp:enable-http: trueport: 8080enable-dynamic-port: falseport-range: 100slot-service:dynamic-slot: truecheckpoint:interval: 300000timeout: 60000storage:type: hdfsmax-retained: 3plugin-config:namespace: /tmp/seatunnel/checkpoint_snapshotstorage.type: hdfsfs.defaultFS: hdfs://xxx:8020 # Ensure that the directory has written permissiontelemetry:metric:enabled: true
包含以下配置信息:
history-job-expire-minutes
:任務歷史記錄保留時長為 24 小時(1440 分鐘),超時自動清理。backup-count: 1
:任務狀態備份副本數為 1。queue-type: blockingqueue
:使用阻塞隊列管理任務,避免資源耗盡。print-execution-info-interval: 60
:每分鐘打印一次任務執行狀態。print-job-metrics-info-interval: 60
:每分鐘輸出一次任務指標(如吞吐量、延遲)。classloader-cache-mode: true
:啟用類加載緩存,減少重復加載開銷,提升性能。dynamic-slot: true
:允許根據負載動態調整任務槽(Slot)數量,優化資源利用率。checkpoint.interval: 300000
:每 5 分鐘觸發一次檢查點(Checkpoint)。checkpoint.timeout: 60000
:檢查點超時時間為 1 分鐘。telemetry.metric.enabled: true
:啟用任務運行指標采集(如延遲、吞吐量),便于監控。
創建k8s yaml文件部署應用
在完成上面的工作流程后,我們就可以進入到最后一步:創建Master和Worker節點的k8s yaml文件定義部署的相關配置。
為了將配置文件與應用程序解耦,我們將上文中列出的配置文件合并到一個ConfigMap中,并掛載到容器的配置路徑下,便于對配置文件的統一管理和更新。
以下是針對 seatunnel-cluster-master.yaml
和 seatunnel-cluster-worker.yaml
的配置示例,涵蓋了配置 ConfigMap
掛載、容器啟動命令以及部署資源定義等相關內容。
seatunnel-cluster-master.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:name: seatunnel-cluster-master
spec:replicas: 2 # modify replicas according to your casestrategy:type: RollingUpdaterollingUpdate:maxUnavailable: 25%maxSurge: 50%selector:matchLabels:app.kubernetes.io/instance: seatunnel-cluster-appapp.kubernetes.io/version: 2.3.10app.kubernetes.io/name: seatunnel-cluster-masterapp.kubernetes.io/component: mastertemplate:metadata:annotations:prometheus.io/path: /hazelcast/rest/instance/metricsprometheus.io/port: "5801"prometheus.io/scrape: "true"prometheus.io/role: "seatunnel-master"labels:app.kubernetes.io/instance: seatunnel-cluster-appapp.kubernetes.io/version: 2.3.10app.kubernetes.io/name: seatunnel-cluster-masterapp.kubernetes.io/component: masterspec:affinity:nodeAffinity:requiredDuringSchedulingIgnoredDuringExecution:nodeSelectorTerms:- matchExpressions:- key: nodeAffinity-keyoperator: Existscontainers:- name: seatunnel-masterimage: seatunnel:2.3.10imagePullPolicy: IfNotPresentports:- containerPort: 5801name: hazelcast- containerPort: 8080name: "master-port"command:- /opt/seatunnel/bin/seatunnel-cluster.sh- -r- masterresources:requests:cpu: "1"memory: 4GvolumeMounts:- mountPath: "/opt/seatunnel/config/hazelcast-master.yaml"name: seatunnel-configssubPath: hazelcast-master.yaml- mountPath: "/opt/seatunnel/config/hazelcast-worker.yaml"name: seatunnel-configssubPath: hazelcast-worker.yaml- mountPath: "/opt/seatunnel/config/seatunnel.yaml"name: seatunnel-configssubPath: seatunnel.yaml- mountPath: "/opt/seatunnel/config/hazelcast-client.yaml"name: seatunnel-configssubPath: hazelcast-client.yaml- mountPath: "/opt/seatunnel/config/log4j2_client.properties"name: seatunnel-configssubPath: log4j2_client.properties- mountPath: "/opt/seatunnel/config/log4j2.properties"name: seatunnel-configssubPath: log4j2.propertiesvolumes:- name: seatunnel-configsconfigMap:name: seatunnel-cluster-configs
部署策略
- 采用多副本(replicas=2)部署確保服務高可用
- 滾動更新策略(RollingUpdate)實現零停機部署:
maxUnavailable: 25%
:保證更新期間至少75%的Pod保持運行maxSurge: 50%
:允許臨時增加50%的Pod資源用于平滑過渡
標簽選擇器
- 采用Kubernetes推薦的標準標簽體系
spec.selector.matchLabels:
根據標簽定義Deployment管理Pod的范圍spec.template.labels
: 定義新創建Pod的標簽,標識Pod的元數據。
節點親和性
- 配置
affinity
屬性指定Pod調度的節點,需要根據自己k8s環境的節點標簽進行替換。
配置文件掛載
- 核心配置文件統一管理在
ConfigMap
中,便于管理以及與應用程序解耦 - 通過subPath指定掛載的單個文件
seatunnel-cluster-worker.yaml
配置文件如下:
apiVersion: apps/v1
kind: Deployment
metadata:name: seatunnel-cluster-worker
spec:replicas: 3 # modify replicas according to your casestrategy:type: RollingUpdaterollingUpdate:maxUnavailable: 25%maxSurge: 50%selector:matchLabels:app.kubernetes.io/instance: seatunnel-cluster-appapp.kubernetes.io/version: 2.3.10app.kubernetes.io/name: seatunnel-cluster-workerapp.kubernetes.io/component: workertemplate:metadata:annotations:prometheus.io/path: /hazelcast/rest/instance/metricsprometheus.io/port: "5801"prometheus.io/scrape: "true"prometheus.io/role: "seatunnel-worker"labels:app.kubernetes.io/instance: seatunnel-cluster-appapp.kubernetes.io/version: 2.3.10app.kubernetes.io/name: seatunnel-cluster-workerapp.kubernetes.io/component: workerspec:affinity:nodeAffinity:requiredDuringSchedulingIgnoredDuringExecution:nodeSelectorTerms:- matchExpressions:- key: nodeAffinity-keyoperator: Existscontainers:- name: seatunnel-workerimage: seatunnel:2.3.10imagePullPolicy: IfNotPresentports:- containerPort: 5801name: hazelcastcommand:- /opt/seatunnel/bin/seatunnel-cluster.sh- -r- workerresources:requests:cpu: "1"memory: 10GvolumeMounts:- mountPath: "/opt/seatunnel/config/hazelcast-master.yaml"name: seatunnel-configssubPath: hazelcast-master.yaml- mountPath: "/opt/seatunnel/config/hazelcast-worker.yaml"name: seatunnel-configssubPath: hazelcast-worker.yaml- mountPath: "/opt/seatunnel/config/seatunnel.yaml"name: seatunnel-configssubPath: seatunnel.yaml- mountPath: "/opt/seatunnel/config/hazelcast-client.yaml"name: seatunnel-configssubPath: hazelcast-client.yaml- mountPath: "/opt/seatunnel/config/log4j2_client.properties"name: seatunnel-configssubPath: log4j2_client.properties- mountPath: "/opt/seatunnel/config/log4j2.properties"name: seatunnel-configssubPath: log4j2.propertiesvolumes:- name: seatunnel-configsconfigMap:name: seatunnel-cluster-configs
定義好上述master和worker的yaml文件后,就可以執行以下命令進行部署到k8s集群了:
kubectl apply -f seatunnel-cluster-master.yaml
kubectl apply -f seatunnel-cluster-worker.yaml
正常情況下會看到SeaTunnel集群中共有2個master節點和3個worker節點:
$ kubectl get pods | grep seatunnel-clusterseatunnel-cluster-master-6989898f66-6fjz8 1/1 Running 0 156m
seatunnel-cluster-master-6989898f66-hbtdn 1/1 Running 0 155m
seatunnel-cluster-worker-87fb469f7-5c96x 1/1 Running 0 156m
seatunnel-cluster-worker-87fb469f7-7kt2h 1/1 Running 0 155m
seatunnel-cluster-worker-87fb469f7-drm9r 1/1 Running 0 156m
至此,我們已成功在Kubernetes環境中以分離集群模式部署了SeaTunnel集群。
如今,集群已就緒,如何在客戶端向其提交任務呢?
客戶端提交任務到集群
使用命令行工具提交任務
有關SeaTunnel客戶端的配置都在hazelcast-client.yaml文件中。
首先需要在客戶端本地下載二進制安裝包(包含bin、config文件),并保證SeaTunnel的安裝路徑與服務端一致,這也就是官網中所說的:Setting the?SEATUNNEL_HOME?the same as the server
,否則,可能會導致出現諸如無法在服務器端找到連接器插件路徑等錯誤(因為服務端插件路徑與客戶端路徑不一致)。
進入安裝路徑下,只需要修改config/hazelcast-client.yaml
文件,配置指向剛剛創建的Headless Service
服務地址即可:
hazelcast-client:cluster-name: seatunnel-clusterproperties:hazelcast.logging.type: log4j2connection-strategy:connection-retry:cluster-connect-timeout-millis: 3000network:cluster-members:- seatunnel-cluster.bigdata.svc.cluster.local:5801
客戶端配置完成后,即可將任務提交至集群執行。任務提交時的JVM參數配置方式主要有兩種:
-
在
config/jvm_client_options
文件中配置任務提交時的JVM參數此方法配置的JVM參數將應用于所有通過
seatunnel.sh
提交的任務,無論運行于本地模式還是集群模式。所有提交的任務都將共享相同的JVM參數配置。 -
在提交任務的命令行中指定JVM參數。
使用
seatunnel.sh
提交任務時,可在命令行中直接指定JVM參數,例如:sh bin/seatunnel.sh --config $SEATUNNEL_HOME/config/v2.batch.config.template -DJvmOption=-Xms2G -Xmx2G
。此方法允許為每個提交的任務獨立配置JVM參數。
接下來通過一個案例來演示客戶端提交任務至集群執行的完整流程:
env {parallelism = 2job.mode = "STREAMING"checkpoint.interval = 2000
}source {FakeSource {parallelism = 2plugin_output = "fake"row.num = 16schema = {fields {name = "string"age = "int"}}}
}sink {Console {}
}
在客戶端使用以下命令提交任務:
sh bin/seatunnel.sh --config config/v2.streaming.example.template -m cluster -n st.example.template -DJvmOption="-Xms2G -Xmx2G"
在Master節點,使用如下命令列出正在運行的任務列表:
$ sh bin/seatunnel.sh -lJob ID Job Name Job Status Submit Time Finished Time
------------------ ------------------- ---------- ----------------------- -----------------------
964354250769432580 st.example.template RUNNING 2025-04-15 10:39:30.588
可以看到,我們剛剛向集群中提交的st.example.template
任務已經處于RUNNING狀態了。現在我們可以在Worker節點日志中看到如下日志打印:
2025-04-15 10:34:41,998 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=1: SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : bdaUB, 110348049
2025-04-15 10:34:41,998 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=1 rowIndex=1: SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : mOifY, 1974539087
2025-04-15 10:34:41,999 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=2: SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : jKFrR, 1828047742
2025-04-15 10:34:41,999 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=1 rowIndex=2: SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : gDiqR, 1177544796
2025-04-15 10:34:41,999 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=3: SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : bCVxc, 909343602
...
說明我們的任務成功提交至所創建的SeaTunnel集群,并且確認其正常運行。
使用Rest Api接口提交任務
SeaTunnel提供了通過Rest Api接口的方式來查詢運行作業的狀態和統計信息,以及提交/停止作業等操作。
在上文中我們配置了只包含Master節點的Headless Service,并指定暴露的端口為8080
。因此,我們就可以在客戶端使用Rest API
接口的方式來實現任務的提交。
SeaTunnel Rest API
接口提供了通過上傳配置文件來提交任務,命令如下:
$ curl 'http://seatunnel-cluster-master.bigdata.svc.cluster.local:8080/submit-job/upload' --form 'config_file=@"/opt/seatunnel/config/v2.streaming.example.template"' --form 'jobName=st.example.template'{"jobId":"964553575034257409","jobName":"st.example.template"}
如果作業提交成功,會返回jobId
和jobName
,如上所示。
接下來,通過Rest API
接口獲取集群正在運行的所有任務,觀察剛剛提交的任務信息:
curl 'http://seatunnel-cluster-master.bigdata.svc.colo.gzgalocal:8080/running-jobs'
[{"jobId":"964553575034257409","jobName":"st.example.template","jobStatus":"RUNNING","envOptions":{"job.mode":"STREAMING","checkpoint.interval":"2000","parallelism":"2"}, ...]
可以看到接口返回顯示了任務狀態和其他額外的元數據信息,說明我們通過Rest Api接口提交任務的方式也成功執行。更多Rest Api接口介紹可以參考官網:RESTful API V2
總結
本文著重介紹了如何以推薦的分離集群模式(Separated Cluster Mode)部署k8s集群的實踐,總結下來,部署過程主要包含以下步驟:
-
準備 Kubernetes 環境
確保已搭建并運行一個可用的 Kubernetes 集群,并安裝所有必要的組件。
-
構建 SeaTunnel Docker 鏡像
如果沒有二次開發需求,可直接使用官方提供的鏡像。否則,在本地編譯打包后,編寫 Dockerfile 并構建 SeaTunnel 鏡像。
-
配置Headless Service和Hazelcast集群
Hazelcast的k8s自動發現機制的DNS Lookup模式是基于k8s的Headless Service功能來實現的,因此首先創建Headless Service服務,并在hazelcast的yaml配置文件中通過service-dns來指定服務地址。
Headless Service會在域名解析時解析成所包含pod的IP地址集合,以此實現hazelcast集群成員之間的彼此發現。
-
配置 SeaTunnel 引擎
修改seatunnel.yaml文件,配置SeaTunnel引擎參數。
-
創建k8s yaml部署文件
分別創建Master和Worker的k8s yaml文件,配置節點標簽、啟動命令、資源和數據卷掛載等內容,最終將其部署到k8s集群。
-
配置 SeaTunnel 客戶端
在客戶端安裝SeaTunnel,并確保客戶端的安裝路徑 (
SEATUNNEL_HOME
) 與服務端一致。修改hazelcast-client.yaml
文件,配置客戶端連接到集群Service服務的地址。 -
任務提交與執行:
完成以上步驟后,即可在客戶端提交任務并由 SeaTunnel 集群執行。
本文上述配置案例僅供參考,可能仍有很多配置項和配置內容未涉及,歡迎各位補充與討論,希望有各位有所幫助!