Linux下GCC的C++實現Hive到Snowflake數據遷移

程序結構

├── main.cpp
├── config.json
├── hive_export/
├── parquet_data/
├── sql_scripts/
└── logs/

核心代碼實現 (main.cpp)

#include <iostream>
#include <fstream>
#include <vector>
#include <thread>
#include <mutex>
#include <queue>
#include <cstdlib>
#include <ctime>
#include <filesystem>
#include <nlohmann/json.hpp>
#include <unistd.h>namespace fs = std::filesystem;
using json = nlohmann::json;
using namespace std;// 全局鎖用于日志和隊列同步
mutex log_mutex, queue_mutex;// 配置文件結構
struct Config {string hive_jdbc;string hql_output_dir;string parquet_output_dir;string sql_script_dir;string snowflake_cfg;int export_threads;int import_threads;
};// 日志記錄函數
void log_message(const string& message, const string& log_path) {lock_guard<mutex> guard(log_mutex);ofstream log_file(log_path, ios::app);if (log_file) {time_t now = time(nullptr);log_file << "[" << put_time(localtime(&now), "%F %T") << "] " << message << endl;}
}// 解析配置文件
Config load_config(const string& config_path) {ifstream config_file(config_path);if (!config_file) throw runtime_error("Config file not found");json j;config_file >> j;return {j["hive_jdbc"],j["directories"]["hql_output"],j["directories"]["parquet_output"],j["directories"]["sql_scripts"],j["snowflake"]["config_path"],j["threads"]["export"],j["threads"]["import"]};
}// 導出Hive建表語句
void export_hql(const Config& cfg, const string& log_path) {string cmd = "beeline -u '" + cfg.hive_jdbc + "' --silent=true --outputformat=csv2 ""-e 'SHOW DATABASES;' | tail -n +2 > databases.txt";system(cmd.c_str());ifstream db_file("databases.txt");string db;while (getline(db_file, db)) {cmd = "beeline -u '" + cfg.hive_jdbc + "' --silent=true --outputformat=csv2 ""-e 'USE " + db + "; SHOW TABLES;' | tail -n +2 > " + db + "_tables.txt";system(cmd.c_str());ifstream table_file(db + "_tables.txt");string table;while (getline(table_file, table)) {fs::path dir = fs::path(cfg.hql_output_dir) / db;fs::create_directories(dir);string hql_path = (dir / (table + ".hql")).string();cmd = "beeline -u '" + cfg.hive_jdbc + "' --silent=true --outputformat=csv2 ""-e 'USE " + db + "; SHOW CREATE TABLE " + table + ";' | ""awk 'NR>2' | head -n -1 > " + hql_path;if (system(cmd.c_str()) == 0) {log_message("SUCCESS: Exported HQL for " + db + "." + table, log_path);} else {log_message("ERROR: Failed to export HQL for " + db + "." + table, log_path);}}}
}// 導出Parquet數據(線程任務)
void export_worker(queue<string> tasks, const Config& cfg, const string& log_path) {while (true) {string task;{lock_guard<mutex> guard(queue_mutex);if (tasks.empty()) return;task = move(tasks.front());tasks.pop();}size_t pos = task.find('.');string db = task.substr(0, pos);string table = task.substr(pos + 1);fs::path out_dir = fs::path(cfg.parquet_output_dir) / db / table;fs::create_directories(out_dir);string cmd = "hive -e \"SET hive.exec.compress.output=false; ""INSERT OVERWRITE DIRECTORY '" + out_dir.string() + "' ""STORED AS PARQUET SELECT * FROM " + task + ";\"";if (system(cmd.c_str()) == 0) {log_message("SUCCESS: Exported Parquet for " + task, log_path);} else {log_message("ERROR: Failed to export Parquet for " + task, log_path);}}
}// 多線程導出Parquet
void export_parquet(const Config& cfg, const string& log_path) {ifstream db_file("databases.txt");queue<string> tasks;string db;while (getline(db_file, db)) {ifstream table_file(db + "_tables.txt");string table;while (getline(table_file, table)) {tasks.push(db + "." + table);}}vector<thread> threads;for (int i = 0; i < cfg.export_threads; ++i) {threads.emplace_back(export_worker, tasks, ref(cfg), ref(log_path));}for (auto& t : threads) t.join();
}// 執行SnowSQL腳本
void run_snowsql(const Config& cfg, const string& log_path) {for (const auto& entry : fs::directory_iterator(cfg.sql_script_dir)) {if (entry.path().extension() == ".sql") {string cmd = "snowsql -c " + cfg.snowflake_cfg + " -f " + entry.path().string();if (system(cmd.c_str()) == 0) {log_message("SUCCESS: Executed SQL " + entry.path().filename().string(), log_path);} else {log_message("ERROR: Failed to execute SQL " + entry.path().filename().string(), log_path);}}}
}// 導入Parquet到Snowflake(線程任務)
void import_worker(queue<fs::path> tasks, const Config& cfg, const string& log_path) {while (true) {fs::path task;{lock_guard<mutex> guard(queue_mutex);if (tasks.empty()) return;task = move(tasks.front());tasks.pop();}string db = task.parent_path().filename();string table = task.stem();string cmd = "snowsql -c " + cfg.snowflake_cfg + " -q \"""COPY INTO " + db + "." + table + " ""FROM @" + cfg.parquet_output_dir + "/" + db + "/" + table + " ""FILE_FORMAT = (TYPE = PARQUET);\"";if (system(cmd.c_str()) == 0) {log_message("SUCCESS: Imported Parquet to " + db + "." + table, log_path);} else {log_message("ERROR: Failed to import Parquet to " + db + "." + table, log_path);}}
}// 多線程導入Parquet
void import_parquet(const Config& cfg, const string& log_path) {queue<fs::path> tasks;for (const auto& db_entry : fs::directory_iterator(cfg.parquet_output_dir)) {for (const auto& table_entry : fs::directory_iterator(db_entry.path())) {tasks.push(table_entry.path());}}vector<thread> threads;for (int i = 0; i < cfg.import_threads; ++i) {threads.emplace_back(import_worker, tasks, ref(cfg), ref(log_path));}for (auto& t : threads) t.join();
}int main() {try {// 初始化配置和日志Config cfg = load_config("config.json");string log_path = "logs/transfer_" + to_string(time(nullptr)) + ".log";fs::create_directories("logs");// 執行全流程export_hql(cfg, log_path);export_parquet(cfg, log_path);run_snowsql(cfg, log_path);import_parquet(cfg, log_path);log_message("ALL OPERATIONS COMPLETED", log_path);} catch (const exception& e) {cerr << "CRITICAL ERROR: " << e.what() << endl;return 1;}return 0;
}

配置文件示例 (config.json)

{"hive_jdbc": "jdbc:hive2://hive-server:10000","directories": {"hql_output": "hive_export","parquet_output": "parquet_data","sql_scripts": "sql_scripts"},"snowflake": {"config_path": "~/.snowsql/config"},"threads": {"export": 8,"import": 8}
}

關鍵功能說明

  1. HQL導出

    • 使用beeline連接Hive獲取所有數據庫和表
    • 數據庫/表名.hql格式存儲建表語句
    • 自動跳過系統表(通過tailawk過濾)
  2. Parquet導出

    • 使用Hive的INSERT OVERWRITE DIRECTORY導出為Parquet格式
    • 多線程處理不同表(線程數由配置控制)
    • 輸出路徑:parquet_data/數據庫/表名/
  3. SnowSQL執行

    • 遍歷指定目錄的所有.sql文件
    • 使用snowsql -c執行配置文件中的連接
    • 支持認證文件自動加載(需預先配置)
  4. Parquet導入

    • 使用Snowflake的COPY INTO命令
    • 多線程并發導入不同表
    • 自動匹配目錄結構與表名
  5. 日志系統

    • 按天分割日志文件(文件名含時間戳)
    • 記錄操作類型、狀態和時間
    • 線程安全的日志寫入
  6. 異常處理

    • 配置文件缺失檢測
    • 命令執行狀態碼檢查
    • 目錄創建失敗處理
    • JSON解析異常捕獲

編譯與運行

  1. 安裝依賴
sudo apt-get install libboost-filesystem-dev nlohmann-json3-dev
  1. 編譯程序
g++ -std=c++17 -o hive2snowflake main.cpp -lboost_filesystem -lpthread
  1. 運行程序
./hive2snowflake

注意事項

  1. 需要預先配置:

    • Hive的beeline客戶端
    • SnowSQL及認證配置
    • Hive表訪問權限
    • Snowflake表結構匹配
  2. 性能調整:

    • 通過config.json調整線程數
    • 大表建議單獨處理
    • 可添加重試機制應對網絡波動
  3. 安全增強建議:

    • 配置文件加密(如使用jq解密)
    • 敏感信息使用環境變量
    • 添加操作審計日志

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/92682.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/92682.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/92682.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

drippingblues靶機教程

一、信息搜集首先將其在VirtualBOX中安裝&#xff0c;并將kali與靶機都設置為橋接模式緊接著我們掃描IP&#xff0c;來發現靶機地址&#xff0c;經過搜集&#xff0c;發現IP是192.168.1.9&#xff0c;我們去訪問一下緊接著我們掃一下開放了哪些端口。發現開放了21、22以及80端口…

39.【.NET8 實戰--孢子記賬--從單體到微服務--轉向微服務】--擴展功能--調整發布腳本

這篇文章&#xff0c;我們要調整發布腳本。之所以要調整發布腳本&#xff0c;是因為現在我們的項目有三個環境&#xff1a;本地&#xff08;Local&#xff09;、開發&#xff08;Development&#xff09;、生產&#xff08;Production&#xff09;。Tip&#xff1a;我們的項目雖…

商品、股指及ETF期權五檔盤口Tick級與分鐘級歷史行情數據多維解析

在金融數據分析領域&#xff0c;本地CSV文件是存儲高頻與低頻數據的常用載體。本文以期權市場數據為例&#xff0c;探討如何基于CSV格式處理分鐘級行情、高頻Tick數據、日頻數據、逐筆委托記錄、五檔訂單簿及歷史行情數據&#xff0c;并提供專業的技術實現方案。以下將從數據預…

云端軟件工程智能代理:任務委托與自動化實踐全解

云端軟件工程智能代理&#xff1a;任務委托與自動化實踐全解 背景與未來趨勢 隨著軟件工程復雜度不斷提升&#xff0c;開發者對自動化工具的依賴也日益增強。我們正進入一個“人機協作”的新時代&#xff0c;開發者可以專注于核心創新&#xff0c;將重復性、繁瑣的任務委托給智…

making stb style lib(1): do color print in console

col.h: see origin repo // origin repo: https://github.com/resyfer/libcol #ifndef _COL_HOL_H_ #define _COL_HOL_H_#include <stdlib.h> #include <stdio.h> #include <stdbool.h> #include <string.h> #include <math.h> // 新增&#xf…

llm本地部署+web訪問+交互

要實現基于llm的web訪問和交互&#xff0c;需支持對llm的訪問和對網絡搜索的調用。 這里使用ollama llm兼容openai sdk訪問&#xff1b;使用proxyless-llm-websearch模擬網絡搜索。 1 ollama本地部署 假設ollama已經部署&#xff0c;具體過程參考 在mac m1基于ollama運行dee…

自動駕駛數據閉環

自動駕駛的數據閉環是支撐算法持續迭代的核心機制&#xff0c;其本質是通過“數據采集-處理-訓練-部署-反饋”的循環&#xff0c;不斷優化模型對復雜場景的適應性。由于自動駕駛數據量極大&#xff08;單車日均TB級&#xff09;、場景多樣&#xff08;從常規道路到極端邊緣場景…

二十、MySQL-DQL-條件查詢

DQL-條件查詢代碼&#xff1a; DQL-條件查詢 -- 1.查詢 姓名 為 楊逍 的員工 select * from tb_emp where name 楊逍; -- 2.查詢 id小于等于5 的員工信息 select * from tb_emp where id < 5; -- 3.查詢 沒有分配職位 的員工信息 select * from tb_emp where job is null; …

Mac下安裝Conda虛擬環境管理器

Conda 是一個開源的包、環境管理器&#xff0c;可以用于在同一個機器上創建不同的虛擬環境&#xff0c;安裝不同Python 版本的軟件包及其依賴&#xff0c;并能夠在不同的虛擬環境之間切換 Conda常通過安裝Anaconda/Miniconda來進行使用。一般使用Miniconda就夠了。Miniconda 是…

Android 中解決 Button 按鈕背景色設置無效的問題

1、問題描述 在布局文件中有兩個 Button 按鈕&#xff0c;為每個按鈕設置不同的背景色&#xff0c;但是顯示出來的效果都是紫色的&#xff0c;跟設置的顏色不同&#xff0c;布局文件如下所示&#xff1a;<Buttonandroid:id"id/button_cancel"android:layout_width…

云服務器--阿里云OSS(2)【Springboot使用阿里云OSS】

&#x1f4d2; 阿里云 OSS Spring Boot 異步任務&#xff08;直接存 OSS&#xff09; 1. 項目結構 src/main/java/com/example/demo├── controller│ └── UploadController.java // 接收上傳請求├── service│ ├── AsyncUploadService.java // 異步上傳…

get請求中文字符參數亂碼問題

第一種方法 服務器默認的傳參編碼格式是ISO8859-1,所以前端直接原樣字符串請求&#xff0c;到后端解析一下就得到正確字符 String fileName request.getParameter("fileName"); fileName new String(fileName.getBytes("ISO8859-1"),"UTF-8");…

C語言(10)——結構體、聯合體、枚舉

關于C語言零基礎學習知識&#xff0c;小編有話說&#xff0c;各位看官敬請入下面的專欄世界&#xff1a;打怪升級之路——C語言之路_ankleless的博客-CSDN博客 Hi&#xff01;冒險者&#x1f60e;&#xff0c;歡迎闖入 C 語言的奇幻異世界&#x1f30c;&#xff01; 我是 Ankle…

海康威視攝像頭實時推流到阿里云公網服務器(Windows + FFmpeg + nginx-rtmp)

海康威視攝像頭實時推流到阿里云公網服務器&#xff08;Windows FFmpeg nginx-rtmp1. 步驟總覽2. 阿里云 ECS&#xff08;Linux&#xff09;配置2.1 開放端口2.2 安裝 nginx-rtmp3. Windows 電腦端配置3.1 安裝 FFmpeg3.1.1 官網/鏡像下載&#xff1a;3.1.2 解壓后將 bin 目錄…

基礎網絡網路層——IPV4地址

在IP網絡上&#xff0c;如果用戶要將一臺計算機連接到Internet上&#xff0c;就需要向因特網服務提供方ISP&#xff08;Internet Service Provider&#xff09;申請一個IP地址。IP地址是在計算機網絡中被用來唯一標識一臺設備的一組數字。IPv4地址由32位二進制數值組成&#xf…

技術速遞|GPT-5 正式上線 Azure AI Foundry

AI 應用正在經歷一場深刻變革——對企業來說&#xff0c;僅僅“能聊天”早已不夠&#xff0c;生成內容、邏輯推理、落地生產&#xff0c;這些才是新時代對 AI 能力的真正考驗。 今天&#xff0c;我們非常激動地宣布&#xff0c;OpenAI 最新旗艦大模型 GPT-5 正式上線 Azure AI …

Logistic Regression|邏輯回歸

----------------------------------------------------------------------------------------------- 這是我在我的網站中截取的文章&#xff0c;有更多的文章歡迎來訪問我自己的博客網站rn.berlinlian.cn&#xff0c;這里還有很多有關計算機的知識&#xff0c;歡迎進行留言或…

三極管在電路中的應用

1、信號放大&#xff08;電壓放大&#xff09; 應用場景 &#xff1a;麥克風聲音放大、耳機驅動、廣播信號接收等音頻設備 原理解析 &#xff1a; 想象三極管如同一個精準的水龍頭&#xff1a; 基極&#xff08;B&#xff09;電流如同擰動閥門的微弱力量&#xff08;輸入信號&a…

Redis 事務機制

文章目錄一、什么是事務&#xff1f;二、事務相關操作總體認識基本操作流程watch 操作演示watch 原理一、什么是事務&#xff1f; Redis 的事務和 MySQL 的事務概念上是類似的. 都是把?系列操作綁定成?組. 讓這?組能夠批量執?. Redis 的事務和 MySQL 事務的區別&#xff1…

Mybatis學習之自定義映射resultMap(七)

這里寫目錄標題一、準備工作1、新建maven工程2、準備兩張表3、建立mapper、pojo、映射文件mapper接口pojoxxxMapper.xml二、resultMap處理字段和屬性的映射關系1、用起別名的方式保證字段名與屬性名一致2、逐一設置resultMap映射關系3、配置mapUnderscoreToCamelCase三、多對一…