Program Structure
├── main.cpp
├── config.json
├── hive_export/
├── parquet_data/
├── sql_scripts/
└── logs/
Core Code Implementation (main.cpp)
#include <iostream>
#include <fstream>
#include <vector>
#include <thread>
#include <mutex>
#include <queue>
#include <cstdlib>
#include <ctime>
#include <filesystem>
#include <nlohmann/json.hpp>
#include <unistd.h>
#include <iomanip>     // std::put_time
#include <functional>  // std::ref / std::cref
#include <utility>     // std::move
#include <string>

namespace fs = std::filesystem;
using json = nlohmann::json;
using namespace std;

// Global mutexes for log and queue synchronization
mutex log_mutex, queue_mutex;

// Configuration file structure
struct Config {
    string hive_jdbc;
    string hql_output_dir;
    string parquet_output_dir;
    string sql_script_dir;
    string snowflake_cfg;
    int export_threads;
    int import_threads;
};

// Thread-safe log writer
void log_message(const string& message, const string& log_path) {
    lock_guard<mutex> guard(log_mutex);
    ofstream log_file(log_path, ios::app);
    if (log_file) {
        time_t now = time(nullptr);
        log_file << "[" << put_time(localtime(&now), "%F %T") << "] "
                 << message << endl;
    }
}

// Parse the configuration file
Config load_config(const string& config_path) {
    ifstream config_file(config_path);
    if (!config_file) throw runtime_error("Config file not found");
    json j;
    config_file >> j;
    return {
        j["hive_jdbc"].get<string>(),
        j["directories"]["hql_output"].get<string>(),
        j["directories"]["parquet_output"].get<string>(),
        j["directories"]["sql_scripts"].get<string>(),
        j["snowflake"]["config_path"].get<string>(),
        j["threads"]["export"].get<int>(),
        j["threads"]["import"].get<int>()
    };
}

// Export Hive CREATE TABLE statements
void export_hql(const Config& cfg, const string& log_path) {
    // List all databases (tail strips the csv2 header row)
    string cmd = "beeline -u '" + cfg.hive_jdbc + "' --silent=true --outputformat=csv2 "
                 "-e 'SHOW DATABASES;' | tail -n +2 > databases.txt";
    system(cmd.c_str());

    ifstream db_file("databases.txt");
    string db;
    while (getline(db_file, db)) {
        // List all tables of the current database
        cmd = "beeline -u '" + cfg.hive_jdbc + "' --silent=true --outputformat=csv2 "
              "-e 'USE " + db + "; SHOW TABLES;' | tail -n +2 > " + db + "_tables.txt";
        system(cmd.c_str());

        ifstream table_file(db + "_tables.txt");
        string table;
        while (getline(table_file, table)) {
            fs::path dir = fs::path(cfg.hql_output_dir) / db;
            fs::create_directories(dir);
            string hql_path = (dir / (table + ".hql")).string();

            // Dump SHOW CREATE TABLE output, trimming beeline's framing lines
            cmd = "beeline -u '" + cfg.hive_jdbc + "' --silent=true --outputformat=csv2 "
                  "-e 'USE " + db + "; SHOW CREATE TABLE " + table + ";' | "
                  "awk 'NR>2' | head -n -1 > " + hql_path;
            if (system(cmd.c_str()) == 0) {
                log_message("SUCCESS: Exported HQL for " + db + "." + table, log_path);
            } else {
                log_message("ERROR: Failed to export HQL for " + db + "." + table, log_path);
            }
        }
    }
}

// Export Parquet data (worker-thread task)
void export_worker(queue<string>& tasks, const Config& cfg, const string& log_path) {
    while (true) {
        string task;
        {
            // Pop the next "db.table" task from the shared queue
            lock_guard<mutex> guard(queue_mutex);
            if (tasks.empty()) return;
            task = move(tasks.front());
            tasks.pop();
        }
        size_t pos = task.find('.');
        string db = task.substr(0, pos);
        string table = task.substr(pos + 1);

        fs::path out_dir = fs::path(cfg.parquet_output_dir) / db / table;
        fs::create_directories(out_dir);

        string cmd = "hive -e \"SET hive.exec.compress.output=false; "
                     "INSERT OVERWRITE DIRECTORY '" + out_dir.string() + "' "
                     "STORED AS PARQUET SELECT * FROM " + task + ";\"";
        if (system(cmd.c_str()) == 0) {
            log_message("SUCCESS: Exported Parquet for " + task, log_path);
        } else {
            log_message("ERROR: Failed to export Parquet for " + task, log_path);
        }
    }
}

// Multi-threaded Parquet export
void export_parquet(const Config& cfg, const string& log_path) {
    ifstream db_file("databases.txt");
    queue<string> tasks;
    string db;
    while (getline(db_file, db)) {
        ifstream table_file(db + "_tables.txt");
        string table;
        while (getline(table_file, table)) {
            tasks.push(db + "." + table);
        }
    }
    vector<thread> threads;
    for (int i = 0; i < cfg.export_threads; ++i) {
        // All workers share one task queue, so each table is exported exactly once
        threads.emplace_back(export_worker, ref(tasks), cref(cfg), cref(log_path));
    }
    for (auto& t : threads) t.join();
}

// Execute SnowSQL scripts
void run_snowsql(const Config& cfg, const string& log_path) {
    for (const auto& entry : fs::directory_iterator(cfg.sql_script_dir)) {
        if (entry.path().extension() == ".sql") {
            string cmd = "snowsql -c " + cfg.snowflake_cfg + " -f " + entry.path().string();
            if (system(cmd.c_str()) == 0) {
                log_message("SUCCESS: Executed SQL " + entry.path().filename().string(), log_path);
            } else {
                log_message("ERROR: Failed to execute SQL " + entry.path().filename().string(), log_path);
            }
        }
    }
}

// Import Parquet into Snowflake (worker-thread task)
void import_worker(queue<fs::path>& tasks, const Config& cfg, const string& log_path) {
    while (true) {
        fs::path task;
        {
            // Pop the next table directory from the shared queue
            lock_guard<mutex> guard(queue_mutex);
            if (tasks.empty()) return;
            task = move(tasks.front());
            tasks.pop();
        }
        string db = task.parent_path().filename().string();
        string table = task.stem().string();

        string cmd = "snowsql -c " + cfg.snowflake_cfg + " -q \""
                     "COPY INTO " + db + "." + table + " "
                     "FROM @" + cfg.parquet_output_dir + "/" + db + "/" + table + " "
                     "FILE_FORMAT = (TYPE = PARQUET);\"";
        if (system(cmd.c_str()) == 0) {
            log_message("SUCCESS: Imported Parquet to " + db + "." + table, log_path);
        } else {
            log_message("ERROR: Failed to import Parquet to " + db + "." + table, log_path);
        }
    }
}

// Multi-threaded Parquet import
void import_parquet(const Config& cfg, const string& log_path) {
    queue<fs::path> tasks;
    for (const auto& db_entry : fs::directory_iterator(cfg.parquet_output_dir)) {
        for (const auto& table_entry : fs::directory_iterator(db_entry.path())) {
            tasks.push(table_entry.path());
        }
    }
    vector<thread> threads;
    for (int i = 0; i < cfg.import_threads; ++i) {
        threads.emplace_back(import_worker, ref(tasks), cref(cfg), cref(log_path));
    }
    for (auto& t : threads) t.join();
}

int main() {
    try {
        // Initialize configuration and logging
        Config cfg = load_config("config.json");
        fs::create_directories("logs");
        string log_path = "logs/transfer_" + to_string(time(nullptr)) + ".log";

        // Run the full pipeline
        export_hql(cfg, log_path);
        export_parquet(cfg, log_path);
        run_snowsql(cfg, log_path);
        import_parquet(cfg, log_path);

        log_message("ALL OPERATIONS COMPLETED", log_path);
    } catch (const exception& e) {
        cerr << "CRITICAL ERROR: " << e.what() << endl;
        return 1;
    }
    return 0;
}
Configuration File Example (config.json)
{
  "hive_jdbc": "jdbc:hive2://hive-server:10000",
  "directories": {
    "hql_output": "hive_export",
    "parquet_output": "parquet_data",
    "sql_scripts": "sql_scripts"
  },
  "snowflake": {
    "config_path": "~/.snowsql/config"
  },
  "threads": {
    "export": 8,
    "import": 8
  }
}
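If the configuration file is edited by hand, it can help to fail fast on missing keys before any export starts. A minimal sketch of such a check (the validate_config helper below is illustrative and not part of the program above):

#include <stdexcept>
#include <string>
#include <nlohmann/json.hpp>

// Illustrative helper (assumption, not in the original program): verify that
// every key load_config() reads is present, so a typo in config.json fails
// at startup instead of halfway through the migration.
inline void validate_config(const nlohmann::json& j) {
    for (const char* key : {"hive_jdbc", "directories", "snowflake", "threads"}) {
        if (!j.contains(key))
            throw std::runtime_error(std::string("config.json missing key: ") + key);
    }
    for (const char* key : {"hql_output", "parquet_output", "sql_scripts"}) {
        if (!j["directories"].contains(key))
            throw std::runtime_error(std::string("config.json missing directories.") + key);
    }
    if (!j["snowflake"].contains("config_path") ||
        !j["threads"].contains("export") || !j["threads"].contains("import"))
        throw std::runtime_error("config.json missing snowflake/threads settings");
}

Such a check could be called from load_config right after config_file >> j.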
Key Features
- HQL Export:
  - Uses beeline to connect to Hive and enumerate all databases and tables
  - Stores each CREATE TABLE statement as database/table.hql
  - Trims beeline's output framing (header and footer lines) via tail and awk
- Parquet Export:
  - Uses Hive's INSERT OVERWRITE DIRECTORY to write tables out as Parquet
  - Processes different tables across worker threads (thread count set in the configuration; see the shared-queue sketch after this list)
  - Output path: parquet_data/database/table/
- SnowSQL Execution:
  - Iterates over every .sql file in the configured script directory
  - Runs each script with snowsql -c using the connection from the configuration
  - Relies on SnowSQL's authentication file being set up in advance
- Parquet Import:
  - Uses Snowflake's COPY INTO command
  - Imports different tables concurrently across worker threads
  - Maps the directory structure (database/table) to target table names automatically
- Logging:
  - One log file per run, with a timestamp in the file name
  - Records operation type, status, and time
  - Thread-safe log writes
- Error Handling:
  - Detects a missing configuration file
  - Checks the exit status of each external command
  - Surfaces directory-creation failures
  - Catches JSON parsing exceptions
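Both the Parquet export and the Parquet import follow the same shared-queue worker pattern: every thread pops from a single mutex-guarded queue, so each table is handled exactly once. A stripped-down, self-contained sketch of that pattern (the task names and thread count here are illustrative):

#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Stand-alone demo of the shared-queue worker pattern used by
// export_parquet/import_parquet: all threads pop from ONE mutex-guarded
// queue, so every task is processed exactly once.
int main() {
    std::queue<std::string> tasks;
    std::mutex queue_mutex;
    for (int i = 0; i < 20; ++i) tasks.push("db.table_" + std::to_string(i));

    auto worker = [&]() {
        while (true) {
            std::string task;
            {
                std::lock_guard<std::mutex> guard(queue_mutex);
                if (tasks.empty()) return;
                task = std::move(tasks.front());
                tasks.pop();
            }
            // The real workers shell out to hive/snowsql here.
            std::cout << "processing " << task << "\n";
        }
    };

    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i) threads.emplace_back(worker);
    for (auto& t : threads) t.join();
    return 0;
}

This is also why the workers must take the queue by reference: if each thread received its own copy of the queue, every table would be exported or imported once per thread.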
Build and Run
- Install dependencies:
  sudo apt-get install nlohmann-json3-dev
- Compile the program (the code uses std::filesystem, so Boost is not required; on GCC 8 add -lstdc++fs):
  g++ -std=c++17 -o hive2snowflake main.cpp -lpthread
- Run the program:
  ./hive2snowflake
Notes
- Must be configured in advance:
  - Hive's beeline client
  - SnowSQL and its authentication configuration
  - Access permissions on the Hive tables
  - Matching table definitions in Snowflake
- Performance tuning:
  - Adjust the thread counts via config.json
  - Handle very large tables separately
  - A retry mechanism can be added to cope with network fluctuations (see the sketch after this list)
- Security hardening suggestions:
  - Encrypt the configuration file and decrypt it at load time
  - Keep sensitive information (credentials, connection strings) in environment variables
  - Add an operation audit log
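The retry mechanism suggested under performance tuning could be a thin wrapper around system(). A minimal sketch (run_with_retry is an illustrative name, not part of the program above):

#include <chrono>
#include <cstdlib>
#include <string>
#include <thread>

// Illustrative retry wrapper for network-bound shell commands (beeline/snowsql):
// retries a fixed number of times with a delay between attempts.
inline bool run_with_retry(const std::string& cmd, int max_attempts = 3,
                           std::chrono::seconds delay = std::chrono::seconds(10)) {
    for (int attempt = 1; attempt <= max_attempts; ++attempt) {
        if (std::system(cmd.c_str()) == 0) return true;
        if (attempt < max_attempts) std::this_thread::sleep_for(delay);
    }
    return false;
}

The workers could then call run_with_retry(cmd) instead of system(cmd.c_str()) and log a failure only after the final attempt.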