Seatunnel本地模式快速測驗

前言

SeaTunnel(先前稱為WaterDrop)是一個分布式、高性能、易于擴展的數據集成平臺,旨在實現海量數據的同步和轉換。它支持多種數據處理引擎,包括Apache Spark和Apache Flink,并在某個版本中引入了自主研發的Zeta引擎。SeaTunnel不僅適用于離線數據同步,還能支持CDC(Change Data Capture)實時數據同步,這使得它在處理多樣化數據集成場景時表現出色。

本節內容作為官方的一個補充測驗,快速開始體驗吧。


一、Apache Seatunnel是什么?

從官網的介紹看:
Next-generation high-performance, distributed, massive data integration tool.
通過這幾個關鍵詞你能看到它的定位:下一代,高性能,分布式,大規模數據集成工具。

那到底好不好用呢?

二、安裝

  1. 下載
https://seatunnel.apache.org/download

推薦:v2.3.5

  1. 安裝
    環境:Java8
    安裝插件:修改 config/plugin_config 以下是一些常用的,不要的暫時不裝。
--connectors-v2--
connector-cdc-mysql
connector-clickhouse
connector-file-local
connector-hive
connector-jdbc
connector-kafka
connector-redis
connector-doris
connector-fake
connector-console
connector-elasticsearch
--end--

然后執行:
? seatunnel bin/install-plugin.sh 2.3.5
等待執行完畢,就安裝完了,很簡單。

三、測試

1. 測試 local模式下的用例

修改下模板的測試用例,然后執行如下命令:

bin/seatunnel.sh --config ./config/v2.batch.config -e local任務的配置很簡單:
這里使用了FakeSource來模擬輸出兩列,通過設置并行度=2 來打印 16 條輸出數據。
2024-07-01 21:56:06,617 INFO  [o.a.s.c.s.u.ConfigBuilder     ] [main] - Parsed config file:
{"env" : {"parallelism" : 2,"job.mode" : "BATCH","checkpoint.interval" : 10000},"source" : [{"schema" : {"fields" : {"name" : "string","age" : "int"}},"row.num" : 16,"parallelism" : 2,"result_table_name" : "fake","plugin_name" : "FakeSource"}],"sink" : [{"plugin_name" : "Console"}]
}任務的輸出信息,這里的輸出組件是 Console所以打印到了控制臺
2024-07-01 21:56:07,559 INFO  [o.a.s.c.s.f.s.FakeSourceReader] [BlockingWorker-TaskGroupLocation{jobId=860156818549112833, pipelineId=1, taskGroupId=30000}] - Closed the bounded fake source
2024-07-01 21:56:07,561 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=1:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : hECbG, 520364021
2024-07-01 21:56:07,561 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=1  rowIndex=1:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : LnGDW, 105727523
2024-07-01 21:56:07,561 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=2:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : UYXBT, 1212484110
2024-07-01 21:56:07,561 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=1  rowIndex=2:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : NYiCn, 1208734703
2024-07-01 21:56:07,561 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=3:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : cSZan, 151817804
任務的統計信息:
***********************************************Job Statistic Information
***********************************************
Start Time                : 2024-07-01 21:56:06
End Time                  : 2024-07-01 21:56:08
Total Time(s)             :                   2
Total Read Count          :                  32
Total Write Count         :                  32
Total Failed Count        :                   0
***********************************************

2. 使用 Flink引擎

在上面的測試用例中可以看到如下的日志輸出:

 Discovery plugin jar for: PluginIdentifier{engineType='seatunnel', pluginType='source', pluginName='FakeSource'

這表示默認情況下它使用的是 seatunnel engine 執行的,官方稱之為 zeta 。 這一塊內容我們先看下 Flink引擎這邊是如何執行的。

  1. 下載安裝 flink1.17
    https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/

    啟動local cluster 模式

    ?  flink bin/start-cluster.sh
    Starting cluster.
    Starting standalonesession daemon on host MacBook-Pro-2.local.
    Starting taskexecutor daemon on host MacBook-Pro-2.local.
    
  2. 配置環境變量

    ?  config cat seatunnel-env.sh
    # Home directory of spark distribution.
    SPARK_HOME=${SPARK_HOME:-/Users/mac/apps/spark}
    # Home directory of flink distribution.
    FLINK_HOME=${FLINK_HOME:-/Users/mac/apps/flink}
    
  3. 修改slot插槽數量為大于等于 2
    為什么?因為默認的配置中配置了 2 個并行度,而 local啟動的默認情況下只有個插槽可供使用,因此任務無法運行。
    在這里插入圖片描述
    默認啟動后資源插槽:
    在這里插入圖片描述
    提交程序運行后,發現一直無法對 sourcez做任務切分:
    在這里插入圖片描述

    這是因為 job 的并行度是 2,如下所示:
    在這里插入圖片描述

    在這里插入圖片描述

    因此需要修改插槽數量才可以運行,官方這點可沒說清楚,需要注意下。

  4. 運行測試用例

    ?  seatunnel bin/start-seatunnel-flink-15-connector-v2.sh --config ./config/v2.streaming.conf.template
    Execute SeaTunnel Flink Job: ${FLINK_HOME}/bin/flink run -c org.apache.seatunnel.core.starter.flink.SeaTunnelFlink /Users/mac/server/apache-seatunnel-2.3.5/starter/seatunnel-flink-15-starter.jar --config ./config/v2.streaming.conf.template --name SeaTunnel
    Job has been submitted with JobID 9a949409a6f218d50b66ca22cc49b9c4
    

    現在我們修改插槽數量為 2,測試如下:
    訪問:http://localhost:8081/#/overview
    在這里插入圖片描述
    TaskManager輸出日志如下:
    在這里插入圖片描述

3. 使用 Spark引擎

  1. 提交命令
?  seatunnel bin/start-seatunnel-spark-3-connector-v2.sh \
--master 'local[4]' \
--deploy-mode client \
--config ./config/v2.streaming.conf.templateExecute SeaTunnel Spark Job: ${SPARK_HOME}/bin/spark-submit --class "org.apache.seatunnel.core.starter.spark.SeaTunnelSpark" --name "SeaTunnel" --master "local[4]" --deploy-mode "client" --jars "/Users/mac/server/seatunnel/lib/seatunnel-transforms-v2.jar,/Users/mac/server/seatunnel/lib/seatunnel-hadoop3-3.1.4-uber.jar,/Users/mac/server/seatunnel/connectors/connector-fake-2.3.5.jar,/Users/mac/server/seatunnel/connectors/connector-console-2.3.5.jar" --conf "job.mode=STREAMING" --conf "parallelism=2" --conf "checkpoint.interval=2000" /Users/mac/server/apache-seatunnel-2.3.5/starter/seatunnel-spark-3-starter.jar --config "./config/v2.streaming.conf.template" --master "local[4]" --deploy-mode "client" --name "SeaTunnel"

遇到報錯:

2024-07-01 23:25:04,610 INFO v2.V2ScanRelationPushDown:
Pushing operators to SeaTunnelSourceTable
Pushed Filters:
Post-Scan Filters:
Output: name#0, age#1Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/connector/write/Write

看樣子是缺少包導致的導致的,可以參見 issue討論https://github.com/apache/seatunnel/issues/4879 貌似需要 spark 版本 >=3.2 ,而我的是 3.1.1 因此當前這個問題暫時無解。

Since spark 3.2.0, buildForBatch and buildForStreaming have been deprecated in org.apache.spark.sql.connector.write.WriteBuilder. So you should keep spark version >= 3.2.0.

于是,我便下載了 3.2.4(spark -> spark-3.2.4-bin-without-hadoop) 測試后出現了新的問題。

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/log4j/spi/Filterat java.lang.Class.getDeclaredMethods0(Native Method)at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)at java.lang.Class.privateGetMethodRecursive(Class.java:3048)at java.lang.Class.getMethod0(Class.java:3018)at java.lang.Class.getMethod(Class.java:1784)at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:684)at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:666)
Caused by: java.lang.ClassNotFoundException: org.apache.log4j.spi.Filterat java.net.URLClassLoader.findClass(URLClassLoader.java:387)at java.lang.ClassLoader.loadClass(ClassLoader.java:418)at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:359)at java.lang.ClassLoader.loadClass(ClassLoader.java:351)... 7 more

這說的是 log4j的 jar包似乎不存在,由于我們使用的 spark 版本沒有 hadoop的依賴,因此需要在 spark-env.sh里面配置相關的屬性,如下:

export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home
export HADOOP_HOME=/Users/mac/apps/hadoop
export HADOOP_CONF_DIR=/Users/mac/apps/hadoop/etc/hadoop
export SPARK_DIST_CLASSPATH=$(/Users/mac/apps/hadoop/bin/hadoop classpath)
export SPARK_MASTER_HOST=localhost
export SPARK_MASTER_PORT=7077

再次提交測試后,結果如下:

24/07/02 13:40:19 INFO ConfigBuilder: Parsed config file:
{"env" : {"parallelism" : 2,"job.mode" : "STREAMING","checkpoint.interval" : 2000},"source" : [{"schema" : {"fields" : {"name" : "string","age" : "int"}},"row.num" : 16,"parallelism" : 2,"result_table_name" : "fake","plugin_name" : "FakeSource"}],"sink" : [{"plugin_name" : "Console"}]
}24/07/02 13:40:19 INFO SparkContext: Running Spark version 3.2.4
24/07/02 13:40:25 INFO FakeSourceReader: wait split!
24/07/02 13:40:25 INFO FakeSourceReader: wait split!
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Calculated splits for table fake successfully, the size of splits is 2.
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Calculated splits for table fake successfully, the size of splits is 2.
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Assigned [FakeSourceSplit(tableId=fake, splitId=1, rowNum=16), FakeSourceSplit(tableId=fake, splitId=0, rowNum=16)] to 2 readers.
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Calculated splits successfully, the size of splits is 2.
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Assigned [FakeSourceSplit(tableId=fake, splitId=1, rowNum=16), FakeSourceSplit(tableId=fake, splitId=0, rowNum=16)] to 2 readers.
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Assigning splits to readers 1 [FakeSourceSplit(tableId=fake, splitId=1, rowNum=16)]
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Calculated splits successfully, the size of splits is 2.
24/07/02 13:40:25 INFO FakeSourceSplitEnumerator: Assigning splits to readers 0 [FakeSourceSplit(tableId=fake, splitId=0, rowNum=16)]
24/07/02 13:40:26 INFO FakeSourceReader: 16 rows of data have been generated in split(fake_1) for table fake. Generation time: 1719898826259
24/07/02 13:40:26 INFO FakeSourceReader: 16 rows of data have been generated in split(fake_0) for table fake. Generation time: 1719898826259
24/07/02 13:40:26 INFO ConsoleSinkWriter: subtaskIndex=1  rowIndex=1:  SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : eMaly, 2131476727
24/07/02 13:40:26 INFO ConsoleSinkWriter: subtaskIndex=0  rowIndex=1:  SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : Osfqi, 257240275
24/07/02 13:40:26 INFO ConsoleSinkWriter: subtaskIndex=1  rowIndex=2:  SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : BYVKb, 730735331

看結果符合預期,也就是使用 spark 提交 seatunnl引擎的流任務,通過FakeSource模擬兩列輸出了 16 條數據。看來的確是需要 spark3.2.x版本的才能成功了。


參考

https://www.modb.pro/db/605827

總結

本節主要總結了單機模式下使用 seatunel完成官方示例程序,初步體會使用,其實使用起來還是很簡單的,模式同我之前介紹的 DataX如出一轍,可喜的是它有自己的 web頁面可以配置,
因此后面我將分享下如何在頁面中進行配置同步任務,最后時間允許的情況下,分析起優秀的源碼設計思路,千里之行始于足下,要持續學習,持續成長,然后持續分享,再會~。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/38655.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/38655.shtml
英文地址,請注明出處:http://en.pswp.cn/web/38655.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

在Python asyncio中如何識別協程是否被block了

現在asyncio在Python中的使用越來越廣泛了,但是很多人對于協程(corotine)的一些使用方式還不太熟悉。在這篇文章中,我將會介紹如何識別協程是否被block了,并以常用的HTTP網絡庫requests/httpx為例來說明如何避免協程被block的問題。 為什么協程會被block 在Python中,可…

Django學習第二天

啟動項目命令 python manage.py runserver 動態獲取當前時間 javascript實現數據動態更新代碼 <script>setInterval(function() {var currentTimeElement document.getElementById(current-time);var currentTime new Date();currentTimeElement.textContent Curren…

經典的卷積神經網絡模型 - ResNet

經典的卷積神經網絡模型 - ResNet flyfish 2015年&#xff0c;何愷明&#xff08;Kaiming He&#xff09;等人在論文《Deep Residual Learning for Image Recognition》中提出了ResNet&#xff08;Residual Network&#xff0c;殘差網絡&#xff09;。在當時&#xff0c;隨著…

【List】判斷集合相等、集合拷貝

【List】判斷集合相等、集合拷貝 【一】判斷集合是否相等【1】☆使用list中的containAll【2】使用for循環遍歷contains方法【3】將list先排序再轉為String進行比較【4】使用list.retainAll()方法【5】使用MD5加密方式【6】轉換為Java8中的新特性steam流再進行排序來進行比較 【…

AI數字人直播源碼出售價格公布!

隨著數字人行業的興起&#xff0c;以數字人直播為代表的應用場景逐漸成為人們日常生活中不可分割的一部分&#xff0c;再加上艾媒研究數據顯示&#xff0c;超五成以上的被調查群體的企業使用過虛擬人技術&#xff0c;超三成被調查群體的企業計劃使用虛擬人技術等結論的公布&…

python-圖像模糊處理(賽氪OJ)

[題目描述] 給定 n 行 m 列的圖像各像素點的灰度值&#xff0c;要求用如下方法對其進行模糊化處理&#xff1a; 1. 四周最外側的像素點灰度值不變。 2. 中間各像素點新灰度值為該像素點及其上下左右相鄰四個像素點原灰度值的平均&#xff08;四舍五入&#xff09;輸入&#xff…

【C語言】inline 關鍵字

在C語言中&#xff0c;inline關鍵字用于建議編譯器對函數進行內聯展開&#xff0c;而不是像普通函數一樣調用。內聯函數的目的是減少函數調用的開銷&#xff0c;特別是對于簡單的、頻繁調用的函數。 內聯函數的定義和使用 定義內聯函數 要定義一個內聯函數&#xff0c;需要在…

《代號鳶》國服,能否推動國乙市場重新洗牌?

靈犀互娛《如鳶》順利拿到版號&#xff0c;再次攪渾了國乙市場這潭水。 六月份游戲版號審批公布后&#xff0c;靈犀互娛運營的《如鳶》引起了關注&#xff0c;這個與《代號鳶》原名《三國志如鳶》雷同的名字&#xff0c;竟然讓《代號鳶》玩家大面積破防了。 其實目前關于《如…

for循環中list觸發fast-fail或不觸發的原理和方法

Iterable和Iterator Iterator接口位于的位置是java.util.Iterator&#xff0c;它主要有兩個抽象方法供子類實現。hasNext()用來判斷還有沒有數據可供訪問&#xff0c;next()用來訪問下一個數據。 集合Collection不是直接去實現Iterator接口&#xff0c;而是去實現Iterable接口…

【Python】字典練習

python期考練習 目錄 1. 首都名?編輯 2. 摩斯電碼 3. 登錄 4. 學生的姓名和年齡?編輯 5. 電商 6. 學生基本信息 7. 字母數 1. 首都名 初始字典 (可復制) : d{"China":"Beijing","America":"Washington","Norway":…

HCM智能人力資源系統存在命令執行漏洞Getshell

0x01 閱讀須知 技術文章僅供參考&#xff0c;此文所提供的信息只為網絡安全人員對自己所負責的網站、服務器等&#xff08;包括但不限于&#xff09;進行檢測或維護參考&#xff0c;未經授權請勿利用文章中的技術資料對任何計算機系統進行入侵操作。利用此文所提供的信息而造成…

防爆對講終端是什么?在哪些行業中應用廣泛?

防爆對講終端是一種特殊設計的通信設備&#xff0c;它具備防爆性能和可靠的通信功能&#xff0c;確保在存在爆炸性氣體或粉塵的危險環境中使用時不會引發爆炸或火災等危險情況。這種設備通過特殊的設計和防護措施&#xff0c;如采用防爆材料、防靜電、絕緣、阻燃材料等&#xf…

ABAQUS軟件天津正版代理商億達四方:創新技術,驅動產業升級

在環渤海經濟圈的核心地帶——天津&#xff0c;隨著智能制造與高新技術產業的蓬勃發展&#xff0c;對高端仿真軟件的需求日益增長。億達四方&#xff0c;作為ABAQUS在天津的官方正版代理商&#xff0c;憑借其深厚的行業經驗和卓越的服務體系&#xff0c;正為這片熱土上的科研機…

2024年度濰坊市職業技能大賽——網絡搭建(網絡與信息安全管理員)職業技能競賽樣題

2024年度濰坊市職業技能大賽 ——網絡搭建&#xff08;網絡與信息安全管理員&#xff09;職業技能競賽樣題 網絡搭建職業技能競賽組委會 2024年6月 一、項目簡介 &#xff08;一&#xff09;競賽須知 1.技能操作比賽時間150分鐘&#xff0c;你需要合理分配時間。 2.如果沒…

Hive常用的內置函數

文章目錄 聚合類1.指定列值的數目2.指定列值求和3.最大值4.最小值5.平均值6.中位數函數7.分位數函數 數值類1.取整函數Round(a)2.指定精度取整ROUND(double a,int b)3.向上取整FLOOR()4.向下取整CEIL()5.隨機數 rand()6.絕對值函數 日期類獲取當前日期獲取當前時間戳日期前后日…

C++:枚舉類的使用案例及場景

一、使用案例 在C中&#xff0c;枚舉類&#xff08;也稱為枚舉類型或enum class&#xff09;是C11及以后版本中引入的一種更加強大的枚舉類型。與傳統的枚舉&#xff08;enum&#xff09;相比&#xff0c;枚舉類提供了更好的類型安全性和作用域控制。下面是一個使用枚舉類的案…

(linux系統服務)Linux下yum源配置實戰

一、Linux下軟件包的管理 1、軟件安裝方式 ① RPM包管理&#xff08;需要單獨解決依賴問題&#xff09; ② YUM包管理&#xff08;需要有網絡及YUM倉庫的支持&#xff0c;會自動從互聯網下載軟件&#xff0c;自動解決依賴&#xff09; ③ 源碼安裝&#xff08;安裝過程比較…

總體設計在軟件設計中的意義

總體設計&#xff08;High-Level Design, HLD&#xff09;是軟件開發生命周期中的一個關鍵階段&#xff0c;旨在從宏觀層面定義系統的結構和主要組件。總體設計的目標是為詳細設計和實現提供一個清晰的框架和藍圖。 總體設計的意義 明確系統架構&#xff1a;總體設計幫助開發…

基于Java的外賣點餐系統設計與實現

作者介紹&#xff1a;計算機專業研究生&#xff0c;現企業打工人&#xff0c;從事Java全棧開發 主要內容&#xff1a;技術學習筆記、Java實戰項目、項目問題解決記錄、AI、簡歷模板、簡歷指導、技術交流、論文交流&#xff08;SCI論文兩篇&#xff09; 上點關注下點贊 生活越過…

深?理解 JVM 底層原理、垃圾回收機制,能通過mat、jstat進行JVM參數調優

深入理解JVM&#xff08;Java虛擬機&#xff09;底層原理和垃圾回收機制是Java開發者和系統管理員的重要技能&#xff0c;尤其是在性能調優方面。下面是一些關鍵點&#xff0c;幫助你更好地理解這些概念&#xff1a; ### JVM 底層原理 1. **類加載機制**&#xff1a;JVM如何加…