seatunnel配置mysql2hive

SeaTunnel安裝教程

# ====執行流程
# 下載,解壓
# https://mirrors.aliyun.com/apache/seatunnel/2.3.8/?spm=a2c6h.25603864.0.0.2e2d3f665eBj1E
# https://blog.csdn.net/taogumo/article/details/143608532
tar -zxvf apache-seatunnel-2.3.8-bin.tar.gz -C /opt/module/ 
# 改名
mv apache-seatunnel-2.3.8 seatunnel
# 導入連接器 /seatunnel/connectors/
# 鏈接: https://pan.baidu.com/s/1Q4lTMtiBWlP5-3epmCC6jw?pwd=ejkx 提取碼: ejkx 
mysql hive hdoop
# 測試,可以正常執行,說明安裝成功
cd /opt/module/seatunnel/ 
./bin/seatunnel.sh 
--config ./config/v2.batch.config.template 
-m local

模擬數據到hive-fake2hive

編輯測試腳本fake2hive.config ,source為模擬數據,sink配置hive

env {parallelism = 1job.mode = "BATCH"job.name = "HiveSinkExample"
}
source {FakeSource {  # 示例數據源schema = {fields {id = intname = stringscore = double}}rows = [{ kind = INSERT, fields = [1, "Alice", 90.5] },{ kind = INSERT, fields = [2, "Bob", 85.0] },{ kind = INSERT, fields = [3, "Charlie", 92.0] }]}
}
sink {Hive {table_name = "default.test_hive_sink"metastore_uri = "thrift://hadoop1:9083"hdfs_site_path = "/opt/module/hadoop/etc/hadoop/hdfs-site.xml"hive_site_path = "/opt/module/hive/conf/hive-site.xml"save_mode = "append"file_format = "text"                  # 必須與Hive表存儲格式一致}
}

配置hive連接,并啟動同步腳本

# 上傳對應連接器
connector-hive-2.3.8.jar
connector-file-hadoop-2.3.8.jar
# 將hive和hadoop的相關依賴包復制到seatunnel的lib下(本地集群hive為3.1.3版本,hadoop為3.3.4,spark為3.3.1)
cp /opt/module/hive/lib/hive-metastore-3.1.3.jar /opt/module/seatunnel/lib/
cp /opt/module/hive/lib/hive-exec-3.1.3.jar /opt/module/seatunnel/lib/
cp /opt/module/hive/lib/libfb303-0.9.3.jar /opt/module/seatunnel/lib/
cp $HADOOP_HOME/share/hadoop/common/*.jar /opt/module/seatunnel/lib/
cp $HADOOP_HOME/share/hadoop/hdfs/*.jar /opt/module/seatunnel/lib/
# 先啟動metastore服務,前后臺啟動命令
hive --service metastore
nohup hive --service metastore > metastore.log 2>&1 &
# 在hive cli中執行建表語句,創建測試表,配置中設置了自動建表但沒生效
CREATE TABLE IF NOT EXISTS default.test_hive_sink (id INT,name STRING,score DOUBLE
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','  
STORED AS TEXTFILE;  
# 執行數據同步命令
cd /opt/module/seatunnel/ 
./bin/seatunnel.sh 
--config ./config/fake2hive.config 
-m local #如果去掉,需要單獨配置spark或flink分布式引擎
# 驗證數據
hive --database default -e "SELECT * FROM test_hive_sink;"

mysql2console

創建表、導入數據,dbeaver可以直接從數據庫1導入數據庫2。也可以不用創建表,直接將表及數據從數據庫1導入數據庫2.

創建配置文件,主要是source的設置

# Defining the runtime environment
env {parallelism = 4job.mode = "BATCH"job.name = "MysqlExample"
}
source{Jdbc {url = "jdbc:mysql://hadoop1:3306/finance?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "root"password = "xx"query = "select * from index_def limit 16"}
}
sink {Console {}
}

執行

# 導入mysql引擎到seatunnel的plugin文件下
# /opt/module/seatunnel/plugins
mysql-connector-j-8.0.31.jar
# 啟動,配置的source的前面要用Jdbc,MYSQL報錯
cd /opt/module/seatunnel/ 
./bin/seatunnel.sh 
--config ./config/mysql2console.config
-m local

mysql2hive

在hive中創建要同步的表

先創建數據庫,CREATE DATABASE IF NOT EXISTS finance;

編輯配置腳本mysql2hive

env {parallelism = 1job.mode = "BATCH"job.name = "HiveSinkExample"
}
source{Jdbc {url = "jdbc:mysql://hadoop1:3306/finance?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "root"password = "xx"query = "select * from index_def"}
}
sink {Hive {table_name = "finace.index_def"metastore_uri = "thrift://hadoop1:9083"hdfs_site_path = "/opt/module/hadoop/etc/hadoop/hdfs-site.xml"hive_site_path = "/opt/module/hive/conf/hive-site.xml"save_mode = "append"file_format = "text"                  # 必須與Hive表存儲格式一致}
}

啟動

cd /opt/module/seatunnel/ 
./bin/seatunnel.sh 
--config ./config/mysql2hive.config
-m local

同步多張表

env {parallelism = 1job.mode = "BATCH"job.name = "HiveSinkExample"
}
source{Jdbc {name = "source1"url = "jdbc:mysql://hadoop1:3306/finance?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "root"password = "xx"query = "select * from index_def1"result_table_name = "index_def1_result"}Jdbc {name = "source2"url = "jdbc:mysql://hadoop1:3306/finance?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "root"password = "xx"query = "select * from index_def2"result_table_name = "index_def2_result"}    
}
sink {Hive {name = "sink1"table_name = "finace.index_def1"metastore_uri = "thrift://hadoop1:9083"hdfs_site_path = "/opt/module/hadoop/etc/hadoop/hdfs-site.xml"hive_site_path = "/opt/module/hive/conf/hive-site.xml"save_mode = "append"file_format = "text"                 source_table_name = "index_def1_result" }Hive {name = "sink2"table_name = "finace.index_def2"metastore_uri = "thrift://hadoop1:9083"hdfs_site_path = "/opt/module/hadoop/etc/hadoop/hdfs-site.xml"hive_site_path = "/opt/module/hive/conf/hive-site.xml"save_mode = "append"file_format = "text"        source_table_name = "index_def2_result" 
}
}

啟動

cd /opt/module/seatunnel/ 
./bin/seatunnel.sh 
--config ./config/n2hive.config
-m local

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/74758.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/74758.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/74758.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

SSH項目負載均衡中的Session一致性解決方案?

SSH項目負載均衡中的Session一致性解決方案? 1. 粘性會話(Session Sticky)?2. Session復制(集群同步)?3. 集中式Session存儲?4. 客戶端存儲(Cookie加密)?方案選型建議?注意事項? 1. 粘性會話&#x…

MySQL 表連接(內連接與外連接)

🏝?專欄:Mysql_貓咪-9527的博客-CSDN博客 🌅主頁:貓咪-9527-CSDN博客 “欲窮千里目,更上一層樓。會當凌絕頂,一覽眾山小。” 目錄 1、表連接的核心概念 1.1 為什么需要表連接? 2、內連接&a…

解鎖Spring Boot異步編程:讓你的應用“飛“起來!

引言:從點餐說起 🍔 想象你在快餐店點餐: 同步模式:排隊等餐,隊伍越來越長(就像卡死的服務器)異步模式:拿號后去旁邊坐著等(服務員喊號通知你) 今天我們就…

做一個有天有地的css及html畫的旋轉陰陽魚

<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>天地陰陽</title><style>/* 重置默認樣…

ngx_http_core_main_conf_t

定義在 src\http\ngx_http_core_module.h typedef struct {ngx_array_t servers; /* ngx_http_core_srv_conf_t */ngx_http_phase_engine_t phase_engine;ngx_hash_t headers_in_hash;ngx_hash_t variables_hash;…

計算機二級(C語言)考試高頻考點總匯(二)—— 控制流、函數、數組和指針

目錄 六、控制流 七、函數 八、數組和指針 六、控制流 76. if 語句可以&#xff08;嵌套&#xff09;&#xff0c; if 語句可以嵌套在另一個 if 語句內部&#xff0c;形成&#xff08;嵌套的條件判斷結構&#xff09;&#xff0c;用于處理更復雜的條件判斷邏輯。 77. els…

WebRTC協議全面教程:原理、應用與優化指南

一、WebRTC協議概述 **WebRTC&#xff08;Web Real-Time Communication&#xff09;**是一種開源的實時通信協議&#xff0c;支持瀏覽器和移動應用直接進行音頻、視頻及數據傳輸&#xff0c;無需插件或第三方軟件。其核心特性包括&#xff1a; P2P傳輸&#xff1a;點對點直連…

使用 WSL + Ubuntu + Go + GoLand(VSCode) 開發環境配置指南

1. 安裝和配置 WSL 與 Ubuntu 啟用 WSL 功能(以管理員身份運行 PowerShell): wsl --install 或手動啟用: dism.exe /online /enable-feature /featurename:Microsoft-Windows-Subsystem-Linux /all /norestart dism.exe /online /enable-feature /featurename:VirtualMachi…

element-plus中,Tour 漫游式引導組件的使用

目錄 一.Tour 漫游式引導組件的簡單介紹 1.作用 2.基本使用 3.展示效果 二.實戰1&#xff1a;介紹患者病歷表單 1.要求 2.實現步驟 3.展示效果 結語 一.Tour 漫游式引導組件的簡單介紹 1.作用 快速了解一個功能/產品。 2.基本使用 從官網復制如下代碼&#xff1a; &…

39-Ajax工作原理

1. 簡明定義開場 “AJAX(Asynchronous JavaScript and XML)是一種允許網頁在不重新加載整個頁面的情況下&#xff0c;與服務器交換數據并更新部分網頁內容的技術。它通過JavaScript的XMLHttpRequest對象或現代的Fetch API實現異步通信。” 2. 核心工作原理 "AJAX的工作…

Python 爬蟲案例

以下是一些常見的 Python 爬蟲案例&#xff0c;涵蓋了不同的應用場景和技術點&#xff1a; 1. 簡單網頁內容爬取 案例&#xff1a;爬取網頁標題和簡介 import requests from bs4 import BeautifulSoup url "https://www.runoob.com/" response requests.get(url) …

【C++進階】函數:深度解析 C++ 函數的 12 大進化特性

目錄 一、函數基礎 1.1 函數定義與聲明 1.2 函數調用 1.3 引用參數 二、函數重載&#xff1a;同名函數的「多態魔法」&#xff08;C 特有&#xff09; 2.1 基礎實現 2.2 重載決議流程圖 2.3 與 C 語言的本質區別 2.4 實戰陷阱 三、默認參數&#xff1a;接口的「彈性設…

Redis的基礎,經典,高級問題解答篇

目錄 一&#xff0c;基礎 二&#xff0c;經典 緩存雪崩&#xff1a; 1. Redis事務的原子性 2. 與MySQL事務的區別 1. 主從復制原理 2. 哨兵模式故障轉移流程 3. 客戶端感知故障轉移 三&#xff0c;高級 一&#xff0c;基礎 Redis的5種基礎數據類型及使用場景&#xf…

【藍橋杯】好數

好數 問題描述代碼解釋代碼 好數 問題描述 一個整數如果按從低位到高位的順序&#xff0c;奇數位 (個位、百位、萬位 ? ) 上的數字是奇數&#xff0c;偶數位 (十位、千位、十萬位 ? ) 上的數字是偶數&#xff0c;我們就稱之為 “好數”。 給定一個正整數 N&#xff0c;請計算…

利用 Patroni + etcd + HAProxy 搭建高可用 PostgreSQL 集群

在生產環境中&#xff0c;數據庫的高可用性是系統穩定運行的關鍵。本文將詳細講解如何利用 Docker 部署一個由 etcd、Patroni 和 HAProxy 組成的 PostgreSQL 高可用集群&#xff0c;實現自動故障轉移和負載均衡。 架構概述 本架構主要包括三部分&#xff1a; etcd 集群 etcd …

bash 和 pip 是兩種完全不同用途的命令,分別用于[系統終端操作]和[Python 包管理]

bash 和 pip 是兩種完全不同用途的命令&#xff0c;分別用于 系統終端操作 和 Python 包管理。以下是它們的核心區別、用法及常見場景對比&#xff1a; 1. 本質區別 特性bashpip類型Shell 命令解釋器&#xff08;一種腳本語言&#xff09;Python 包管理工具作用執行系統命令、…

分布式系統的CAP理論、事務和鎖實現

分布式系統核心概念 1. CAP理論 CAP理論指出&#xff0c;分布式系統最多同時滿足以下三項中的兩項&#xff1a; 一致性&#xff08;CC&#xff09;&#xff1a;所有節點訪問同一份最新數據。可用性&#xff08;AA&#xff09;&#xff1a;每個請求都能在合理時間內獲得非錯誤…

鴻蒙UI開發

鴻蒙UI開發 本文旨在分享一些鴻蒙UI布局開發上的一些建議&#xff0c;特別是對屏幕寬高比發生變化時的應對思路和好的實踐。 折疊屏適配 一般情況&#xff08;自適應布局/響應式布局&#xff09; 1.自適應布局 1.1自適應拉伸 左右組件定寬 TypeScript //左右定寬 Row() { …

FreeRTOS 五種內存管理算法深度對比分析

FreeRTOS 提供了五種動態內存管理算法&#xff08;heap_1 至 heap_5&#xff09;&#xff0c;針對不同應用場景在實時性、內存效率、碎片控制等方面進行了差異化設計。以下從實現原理、性能指標及適用場景進行全面對比&#xff1a; 一、Heap_1&#xff1a;靜態分配優先 ?核心…

基于EFISH-SBC-RK3576的無人機智能飛控與數據存儲方案

一、方案背景 民用無人機在電力巡檢、農業植保、應急救援等領域快速普及&#xff0c;但傳統方案面臨?多協議設備兼容性差?、?野外環境數據易丟失?、?復雜電磁干擾?三大痛點。 電魚智能推出?EFISH-SBC-RK3576?&#xff0c;可集成雙冗余總線接口與工業級加固存儲&#x…