SeaTunnel安裝教程
# ==== Installation steps ====
# Download & extract:
#   https://mirrors.aliyun.com/apache/seatunnel/2.3.8/?spm=a2c6h.25603864.0.0.2e2d3f665eBj1E
#   Reference: https://blog.csdn.net/taogumo/article/details/143608532
tar -zxvf apache-seatunnel-2.3.8-bin.tar.gz -C /opt/module/

# Rename the extracted directory (must be done inside /opt/module/).
cd /opt/module/
mv apache-seatunnel-2.3.8 seatunnel

# Install connectors into /opt/module/seatunnel/connectors/:
#   mysql, hive, hadoop connector jars
#   Download: https://pan.baidu.com/s/1Q4lTMtiBWlP5-3epmCC6jw?pwd=ejkx  (code: ejkx)

# Smoke test — if this runs, the installation succeeded.
cd /opt/module/seatunnel/
./bin/seatunnel.sh --config ./config/v2.batch.config.template -m local
模擬數據到hive-fake2hive
編輯測試腳本fake2hive.config ,source為模擬數據,sink配置hive
# fake2hive.config — generate fake rows and write them into a Hive table.
env {
  parallelism = 1
  job.mode = "BATCH"
  job.name = "HiveSinkExample"
}

source {
  # Sample in-memory data source.
  FakeSource {
    schema = {
      fields {
        id = int
        name = string
        score = double
      }
    }
    rows = [
      { kind = INSERT, fields = [1, "Alice", 90.5] },
      { kind = INSERT, fields = [2, "Bob", 85.0] },
      { kind = INSERT, fields = [3, "Charlie", 92.0] }
    ]
  }
}

sink {
  Hive {
    table_name = "default.test_hive_sink"
    metastore_uri = "thrift://hadoop1:9083"
    hdfs_site_path = "/opt/module/hadoop/etc/hadoop/hdfs-site.xml"
    hive_site_path = "/opt/module/hive/conf/hive-site.xml"
    save_mode = "append"
    file_format = "text"  # must match the Hive table's storage format
  }
}
配置hive連接,並啟動同步腳本
# Upload the required connectors:
#   connector-hive-2.3.8.jar
#   connector-file-hadoop-2.3.8.jar
# Copy hive/hadoop dependency jars into seatunnel's lib directory.
# (Local cluster: hive 3.1.3, hadoop 3.3.4, spark 3.3.1)
cp /opt/module/hive/lib/hive-metastore-3.1.3.jar /opt/module/seatunnel/lib/
cp /opt/module/hive/lib/hive-exec-3.1.3.jar /opt/module/seatunnel/lib/
cp /opt/module/hive/lib/libfb303-0.9.3.jar /opt/module/seatunnel/lib/
cp "$HADOOP_HOME"/share/hadoop/common/*.jar /opt/module/seatunnel/lib/
cp "$HADOOP_HOME"/share/hadoop/hdfs/*.jar /opt/module/seatunnel/lib/

# Start the metastore service first (foreground, or background with nohup).
hive --service metastore
nohup hive --service metastore > metastore.log 2>&1 &

# Create the test table (the config's auto-create option did not take effect).
hive -e "CREATE TABLE IF NOT EXISTS default.test_hive_sink (
  id INT,
  name STRING,
  score DOUBLE
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE;"

# Run the sync job. Dropping '-m local' requires a separately configured
# spark/flink distributed engine.
cd /opt/module/seatunnel/
./bin/seatunnel.sh --config ./config/fake2hive.config -m local

# Verify the data landed.
hive --database default -e "SELECT * FROM test_hive_sink;"
mysql2console
創建表、導入數據,DBeaver可以直接從數據庫1導入數據庫2。也可以不用創建表,直接將表及數據從數據庫1導入數據庫2。
創建配置文件,主要是source的設置
# mysql2console.config — read rows from MySQL and print them to the console.
# Defining the runtime environment
env {
  parallelism = 4
  job.mode = "BATCH"
  job.name = "MysqlExample"
}

source {
  # The block must be named Jdbc (not MYSQL) — see run notes.
  Jdbc {
    url = "jdbc:mysql://hadoop1:3306/finance?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "root"
    password = "xx"
    query = "select * from index_def limit 16"
  }
}

sink {
  Console {}
}
執行
# Put the MySQL JDBC driver into seatunnel's plugins directory:
#   /opt/module/seatunnel/plugins/mysql-connector-j-8.0.31.jar
# NOTE: the source block in the config must be named Jdbc; using MYSQL fails.
cd /opt/module/seatunnel/
./bin/seatunnel.sh --config ./config/mysql2console.config -m local
mysql2hive
在hive中創建要同步的表
先創建數據庫,CREATE DATABASE IF NOT EXISTS finance;
編輯配置腳本mysql2hive.config
# mysql2hive.config — copy the MySQL table finance.index_def into Hive.
env {
  parallelism = 1
  job.mode = "BATCH"
  job.name = "HiveSinkExample"
}

source {
  Jdbc {
    url = "jdbc:mysql://hadoop1:3306/finance?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "root"
    password = "xx"
    query = "select * from index_def"
  }
}

sink {
  Hive {
    # Fixed typo: was "finace.index_def"; the Hive database is "finance".
    table_name = "finance.index_def"
    metastore_uri = "thrift://hadoop1:9083"
    hdfs_site_path = "/opt/module/hadoop/etc/hadoop/hdfs-site.xml"
    hive_site_path = "/opt/module/hive/conf/hive-site.xml"
    save_mode = "append"
    file_format = "text"  # must match the Hive table's storage format
  }
}
啟動
# Run the mysql-to-hive sync job in local mode.
cd /opt/module/seatunnel/
./bin/seatunnel.sh --config ./config/mysql2hive.config -m local
同步多張表
# n2hive.config — sync two MySQL tables into Hive in one job.
# Each source tags its output with result_table_name; each sink picks
# its input via the matching source_table_name.
env {
  parallelism = 1
  job.mode = "BATCH"
  job.name = "HiveSinkExample"
}

source {
  Jdbc {
    name = "source1"
    url = "jdbc:mysql://hadoop1:3306/finance?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "root"
    password = "xx"
    query = "select * from index_def1"
    result_table_name = "index_def1_result"
  }
  Jdbc {
    name = "source2"
    url = "jdbc:mysql://hadoop1:3306/finance?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "root"
    password = "xx"
    query = "select * from index_def2"
    result_table_name = "index_def2_result"
  }
}

sink {
  Hive {
    name = "sink1"
    # Fixed typo: was "finace.index_def1"; the Hive database is "finance".
    table_name = "finance.index_def1"
    metastore_uri = "thrift://hadoop1:9083"
    hdfs_site_path = "/opt/module/hadoop/etc/hadoop/hdfs-site.xml"
    hive_site_path = "/opt/module/hive/conf/hive-site.xml"
    save_mode = "append"
    file_format = "text"
    source_table_name = "index_def1_result"
  }
  Hive {
    name = "sink2"
    # Fixed typo: was "finace.index_def2"; the Hive database is "finance".
    table_name = "finance.index_def2"
    metastore_uri = "thrift://hadoop1:9083"
    hdfs_site_path = "/opt/module/hadoop/etc/hadoop/hdfs-site.xml"
    hive_site_path = "/opt/module/hive/conf/hive-site.xml"
    save_mode = "append"
    file_format = "text"
    source_table_name = "index_def2_result"
  }
}
啟動
# Run the multi-table sync job in local mode.
cd /opt/module/seatunnel/
./bin/seatunnel.sh --config ./config/n2hive.config -m local