C++和Python實現SQL Server數據庫導出數據到S3并導入Redshift數據倉庫

用C++實現高性能數據處理,Python實現操作Redshift導入數據文件。
在Visual Studio 2022中用C++和ODBC API導出SQL Server數據庫中張表中的所有表的數據為CSV文件格式的數據流,用逗號作為分隔符,用雙引號包裹每個數據,字符串類型的數據去掉前后的空格,數據中如果包含雙引號,則將一個雙引號替換為兩個雙引號,創建gzip壓縮文件,輸出數據流寫入到gzip壓縮文件包中的唯一的csv文件中,一張表對應一個gzip壓縮文件。用異步編程的方法優化數據庫表的讀取和寫入文件的過程,每個gzip壓縮文件包中的csv文件的第一行都是用逗號分割的表頭,對應數據庫表的字段,程序需要異常處理和日志文件輸出,當前路徑下唯一的日志文件中包含運行時間戳、數據庫名、schema名、表名、導出數據行數、成功或者失敗的狀態,日志文件不存在則創建。

以下是一個使用C++和ODBC API在Visual Studio 2022中實現將SQL Server數據庫表數據導出為CSV文件并進行gzip壓縮的示例代碼。代碼使用異步編程優化數據庫讀取和文件寫入過程,并包含異常處理和日志記錄功能。

#include <iostream>
#include <fstream>
#include <sstream>
#include <string>
#include <vector>
#include <future>
#include <sql.h>
#include <sqlext.h>
#include <zlib.h>
#include <ctime>
#include <iomanip>// 日志記錄函數
void logMessage(const std::string& message) {std::ofstream logFile("export_log.txt", std::ios::app);if (logFile.is_open()) {auto now = std::chrono::system_clock::now();std::time_t now_c = std::chrono::system_clock::to_time_t(now);std::tm* now_tm = std::localtime(&now_c);std::ostringstream oss;oss << std::put_time(now_tm, "%Y-%m-%d %H:%M:%S") << " " << message << std::endl;logFile << oss.str();logFile.close();}
}// 處理字符串中的雙引號
std::string escapeDoubleQuotes(const std::string& str) {std::string result = str;size_t pos = 0;while ((pos = result.find('"', pos))!= std::string::npos) {result.replace(pos, 1, 2, '"');pos += 2;}return result;
}// 從數據庫讀取表數據
std::vector<std::vector<std::string>> readTableData(SQLHSTMT hstmt) {std::vector<std::vector<std::string>> data;SQLSMALLINT columnCount = 0;SQLNumResultCols(hstmt, &columnCount);std::vector<SQLCHAR*> columns(columnCount);std::vector<SQLINTEGER> lengths(columnCount);for (SQLSMALLINT i = 0; i < columnCount; ++i) {columns[i] = new SQLCHAR[SQL_MAX_MESSAGE_LENGTH];SQLBindCol(hstmt, i + 1, SQL_C_CHAR, columns[i], SQL_MAX_MESSAGE_LENGTH, &lengths[i]);}while (SQLFetch(hstmt) == SQL_SUCCESS) {std::vector<std::string> row;for (SQLSMALLINT i = 0; i < columnCount; ++i) {std::string value(reinterpret_cast<const char*>(columns[i]));value = escapeDoubleQuotes(value);row.push_back(value);}data.push_back(row);}for (SQLSMALLINT i = 0; i < columnCount; ++i) {delete[] columns[i];}return data;
}// 將數據寫入CSV文件
void writeToCSV(const std::vector<std::vector<std::string>>& data, const std::vector<std::string>& headers, const std::string& filename) {std::ofstream csvFile(filename);if (csvFile.is_open()) {// 寫入表頭for (size_t i = 0; i < headers.size(); ++i) {csvFile << '"' << headers[i] << '"';if (i < headers.size() - 1) csvFile << ',';}csvFile << std::endl;// 寫入數據for (const auto& row : data) {for (size_t i = 0; i < row.size(); ++i) {csvFile << '"' << row[i] << '"';if (i < row.size() - 1) csvFile << ',';}csvFile << std::endl;}csvFile.close();} else {throw std::runtime_error("Failed to open CSV file for writing");}
}// 壓縮CSV文件為gzip
void compressCSV(const std::string& csvFilename, const std::string& gzipFilename) {std::ifstream csvFile(csvFilename, std::ios::binary);std::ofstream gzipFile(gzipFilename, std::ios::binary);if (csvFile.is_open() && gzipFile.is_open()) {gzFile gzOut = gzopen(gzipFilename.c_str(), "wb");if (gzOut) {char buffer[1024];while (csvFile.read(buffer, sizeof(buffer))) {gzwrite(gzOut, buffer, sizeof(buffer));}gzwrite(gzOut, buffer, csvFile.gcount());gzclose(gzOut);} else {throw std::runtime_error("Failed to open gzip file for writing");}csvFile.close();gzipFile.close();std::remove(csvFilename.c_str());} else {throw std::runtime_error("Failed to open files for compression");}
}// 導出單個表
void exportTable(const std::string& server, const std::string& database, const std::string& schema, const std::string& table) {SQLHENV henv = nullptr;SQLHDBC hdbc = nullptr;SQLHSTMT hstmt = nullptr;try {SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &henv);SQLSetEnvAttr(henv, SQL_ATTR_ODBC_VERSION, (SQLPOINTER)SQL_OV_ODBC3, 0);SQLAllocHandle(SQL_HANDLE_DBC, henv, &hdbc);std::string connectionString = "DRIVER={ODBC Driver 17 for SQL Server};SERVER=" + server + ";DATABASE=" + database + ";UID=your_username;PWD=your_password";SQLRETURN ret = SQLDriverConnect(hdbc, nullptr, (SQLCHAR*)connectionString.c_str(), SQL_NTS, nullptr, 0, nullptr, SQL_DRIVER_NOPROMPT);if (ret!= SQL_SUCCESS && ret!= SQL_SUCCESS_WITH_INFO) {throw std::runtime_error("Failed to connect to database");}std::string query = "SELECT * FROM " + schema + "." + table;SQLAllocHandle(SQL_HANDLE_STMT, hdbc, &hstmt);ret = SQLExecDirect(hstmt, (SQLCHAR*)query.c_str(), SQL_NTS);if (ret!= SQL_SUCCESS && ret!= SQL_SUCCESS_WITH_INFO) {throw std::runtime_error("Failed to execute query");}std::vector<std::vector<std::string>> data = readTableData(hstmt);std::vector<std::string> headers;SQLSMALLINT columnCount = 0;SQLNumResultCols(hstmt, &columnCount);for (SQLSMALLINT i = 0; i < columnCount; ++i) {SQLCHAR columnName[SQL_MAX_COLUMN_NAME_LEN];SQLSMALLINT nameLen;SQLDescribeCol(hstmt, i + 1, columnName, SQL_MAX_COLUMN_NAME_LEN, &nameLen, nullptr, nullptr, nullptr, nullptr);headers.push_back(reinterpret_cast<const char*>(columnName));}std::string csvFilename = table + ".csv";writeToCSV(data, headers, csvFilename);std::string gzipFilename = table + ".gz";compressCSV(csvFilename, gzipFilename);std::ostringstream oss;oss << "Database: " << database << ", Schema: " << schema << ", Table: " << table << ", Rows Exported: " << data.size() << ", Status: Success";logMessage(oss.str());} catch (const std::exception& e) {std::ostringstream oss;oss << "Database: " << database << ", Schema: " << schema << ", Table: " << table << ", Status: Failed, Error: " << e.what();logMessage(oss.str());} finally {if (hstmt) SQLFreeHandle(SQL_HANDLE_STMT, hstmt);if (hdbc) SQLDisconnect(hdbc); SQLFreeHandle(SQL_HANDLE_DBC, hdbc);if (henv) SQLFreeHandle(SQL_HANDLE_ENV, henv);}
}int main() {std::string server = "<實例名>";std::string database = "<數據庫名>";std::string schema = "<Schema名>";std::vector<std::string> tables = {"<表名1>", "<表名2>", "<表名3>"};std::vector<std::future<void>> futures;for (const auto& table : tables) {futures.push_back(std::async(std::launch::async, exportTable, server, database, schema, table));}for (auto& fut : futures) {fut.get();}return 0;
}

代碼說明:

日志記錄: logMessage 函數用于記錄操作日志,包括時間戳、數據庫名、schema名、表名、導出數據行數和操作狀態。
字符串處理: escapeDoubleQuotes 函數用于處理字符串中的雙引號,將其替換為兩個雙引號。
數據庫讀取: readTableData 函數使用ODBC API從數據庫中讀取表數據,并將其存儲在二維向量中。
CSV寫入: writeToCSV 函數將數據寫入CSV文件,包括表頭和數據行,并用雙引號包裹每個數據,使用逗號作為分隔符。
文件壓縮: compressCSV 函數將生成的CSV文件壓縮為gzip格式,并刪除原始CSV文件。
表導出: exportTable 函數負責連接數據庫、執行查詢、讀取數據、寫入CSV文件并壓縮。
主函數: main 函數定義了數據庫服務器、數據庫名、schema名和表名,并使用異步任務并行導出每個表的數據。

用Python刪除當前目錄下所有功能擴展名為gz文件,接著運行export_sqlserver.exe程序,輸出該程序的輸出內容并等待它運行完成,然后連接SQL Server數據庫和Amazon Redshift數據倉庫,從數據庫中獲取所有表和它們的字段名,然后在Redshift中創建字段名全部相同的同名表,字段長度全部為最長的varchar類型,如果表已經存在則不創建表,自動上傳當前目錄下所有功能擴展名為gz文件到S3,默認覆蓋同名的文件,然后使用COPY INTO將S3上包含csv文件的gz壓縮包導入對應創建的Redshift表中,文件數據的第一行是表頭,導入所有上傳的文件到Redshift表,程序需要異常處理和日志文件輸出,當前路徑下唯一的日志文件中包含運行時間戳、數據庫名、schema名、表名、導入數據行數、成功或者失敗的狀態,日志文件不存在則創建。

import os
import subprocess
import pyodbc
import redshift_connector
import boto3
import logging
from datetime import datetime# 配置日志記錄
logging.basicConfig(filename='operation_log.log', level=logging.INFO,format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')def delete_gz_files():try:for file in os.listdir('.'):if file.endswith('.gz'):os.remove(file)logging.info('所有.gz文件已刪除')except Exception as e:logging.error(f'刪除.gz文件時出錯: {e}')def run_export_sqlserver():try:result = subprocess.run(['export_sqlserver.exe'], capture_output=True, text=True)print(result.stdout)logging.info('export_sqlserver.exe運行成功')except Exception as e:logging.error(f'運行export_sqlserver.exe時出錯: {e}')def create_redshift_tables():# SQL Server 連接配置sqlserver_conn_str = 'DRIVER={ODBC Driver 17 for SQL Server};SERVER=your_sqlserver_server;DATABASE=your_database;UID=your_username;PWD=your_password'try:sqlserver_conn = pyodbc.connect(sqlserver_conn_str)sqlserver_cursor = sqlserver_conn.cursor()# Redshift 連接配置redshift_conn = redshift_connector.connect(host='your_redshift_host',database='your_redshift_database',user='your_redshift_user',password='your_redshift_password',port=5439)redshift_cursor = redshift_conn.cursor()sqlserver_cursor.execute("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'")tables = sqlserver_cursor.fetchall()for table in tables:table_name = table[0]sqlserver_cursor.execute(f"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{table_name}'")columns = sqlserver_cursor.fetchall()column_definitions = ', '.join([f"{column[0]} VARCHAR(MAX)" for column in columns])try:redshift_cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({column_definitions})")redshift_conn.commit()logging.info(f'在Redshift中成功創建表 {table_name}')except Exception as e:logging.error(f'在Redshift中創建表 {table_name} 時出錯: {e}')sqlserver_conn.close()redshift_conn.close()except Exception as e:logging.error(f'連接數據庫或創建表時出錯: {e}')def upload_gz_files_to_s3():s3 = boto3.client('s3')bucket_name = 'your_bucket_name'try:for file in os.listdir('.'):if file.endswith('.gz'):s3.upload_file(file, bucket_name, file)logging.info(f'成功上傳文件 {file} 到S3')except Exception as e:logging.error(f'上傳文件到S3時出錯: {e}')def copy_data_to_redshift():redshift_conn = redshift_connector.connect(host='your_redshift_host',database='your_redshift_database',user='your_redshift_user',password='your_redshift_password',port=5439)redshift_cursor = redshift_conn.cursor()bucket_name = 'your_bucket_name'try:for file in os.listdir('.'):if file.endswith('.gz') and file.endswith('.csv.gz'):table_name = file.split('.')[0]s3_path = f's3://{bucket_name}/{file}'sql = f"COPY {table_name} FROM '{s3_path}' IAM_ROLE 'your_iam_role' CSV HEADER"try:redshift_cursor.execute(sql)redshift_conn.commit()row_count = redshift_cursor.rowcountlogging.info(f'成功將數據導入表 {table_name},導入行數: {row_count}')except Exception as e:logging.error(f'將數據導入表 {table_name} 時出錯: {e}')except Exception as e:logging.error(f'連接Redshift或導入數據時出錯: {e}')finally:redshift_conn.close()if __name__ == "__main__":delete_gz_files()run_export_sqlserver()create_redshift_tables()upload_gz_files_to_s3()copy_data_to_redshift()

代碼說明:

日志記錄:使用 logging 模塊配置日志記錄,記錄操作的時間戳和操作信息到 operation_log.log 文件。
刪除.gz文件: delete_gz_files 函數刪除當前目錄下所有擴展名為 .gz 的文件。
運行export_sqlserver.exe: run_export_sqlserver 函數運行 export_sqlserver.exe 程序并輸出其內容。
創建Redshift表: create_redshift_tables 函數連接SQL Server和Redshift數據庫,獲取SQL Server中所有表和字段名,在Redshift中創建同名表,字段類型為 VARCHAR(MAX) 。
上傳.gz文件到S3: upload_gz_files_to_s3 函數上傳當前目錄下所有擴展名為 .gz 的文件到S3。
將數據從S3導入Redshift: copy_data_to_redshift 函數使用 COPY INTO 語句將S3上的CSV壓縮包數據導入對應的Redshift表中。

請根據實際的數據庫配置、S3桶名和IAM角色等信息修改代碼中的相關參數。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/67178.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/67178.shtml
英文地址,請注明出處:http://en.pswp.cn/web/67178.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

基于OpenCV實現的答題卡自動判卷系統

一、圖像預處理 ?? 二、查找答題卡輪廓 ?? 三、透視變換 ?? 四、判卷與評分 ?? 五、主函數 六、完整代碼+測試圖像集 總結 ?? 在這篇博客中,我將分享如何使用Python結合OpenCV庫開發一個答題卡自動判卷系統。這個系統能夠自動從掃描的答題卡中提取信…

Android AOP:aspectjx

加入引用 在整個項目的 build.gradle 中&#xff0c;添加 classpath "com.hujiang.aspectjx:gradle-android-plugin-aspectjx:2.0.10" 可以看到測試demo的 gradle 版本是很低的。 基于 github 上的文檔&#xff0c;可以看到原版只支持到 gradle 4.4 。后續需要使…

第84期 | GPTSecurity周報

GPTSecurity是一個涵蓋了前沿學術研究和實踐經驗分享的社區&#xff0c;集成了生成預訓練Transformer&#xff08;GPT&#xff09;、人工智能生成內容&#xff08;AIGC&#xff09;以及大語言模型&#xff08;LLM&#xff09;等安全領域應用的知識。在這里&#xff0c;您可以找…

TCP/IP 協議:互聯網通信的基石

TCP/IP 協議:互聯網通信的基石 引言 TCP/IP協議,全稱為傳輸控制協議/互聯網協議,是互聯網上應用最為廣泛的通信協議。它定義了數據如何在網絡上傳輸,是構建現代互聯網的基礎。本文將深入探討TCP/IP協議的原理、結構、應用以及其在互聯網通信中的重要性。 TCP/IP 協議概述…

蛇年特別版貪吃蛇H5小游戲

該作者的原創文章目錄: 生產制造執行MES系統的需求設計和實現 企業后勤管理系統的需求設計和實現 行政辦公管理系統的需求設計和實現 人力資源管理HR系統的需求設計和實現 企業財務管理系統的需求設計和實現 董事會辦公管理系統的需求設計和實現 公司組織架構圖設計工具 庫存管…

MapReduce,Yarn,Spark理解與執行流程

MapReduce的API理解 Mapper 如果是單詞計數&#xff1a;hello&#xff1a;1&#xff0c; hello&#xff1a;1&#xff0c; world&#xff1a;1 public void map(Object key, // 首字符偏移量Text value, // 文件的一行內容Context context) // Mapper端的上下文&#xff0c;…

如何將xps文件轉換為txt文件?xps轉為pdf,pdf轉為txt,提取pdf表格并轉為txt

文章目錄 xps轉txt方法一方法二 pdf轉txt整頁轉txt提取pdf表格&#xff0c;并轉為txt 總結另外參考XPS文件轉換為TXT文件XPS文件轉換為PDF文件PDF文件轉換為TXT文件提取PDF表格并轉為TXT示例代碼&#xff08;部分&#xff09; 本文測試代碼已上傳&#xff0c;路徑如下&#xff…

Day26-【13003】短文,什么是順序表?順序表和數組、內存地址的關系?順序表的插入、刪除操作如何實現?操作的時間復雜度是多少?

文章目錄 第二節&#xff0c;線性表的順序存儲及實現概覽什么是順序表和鏈表&#xff1f;順序存儲的叫順序表順序表和數組還有內存地址的關系&#xff1f;順序表的基本操作如何實現&#xff1f;1、插入操作如何實現&#xff1f;2、刪除操作如何實現&#xff1f;3、賦值和查找操…

【含開題報告+文檔+PPT+源碼】基于SpringBoot的校園跑腿管理系統

開題報告 本文旨在探討校園跑腿系統的設計與實現&#xff0c;通過深入研究與分析&#xff0c;實現了一套包含用戶管理、發布跑腿單、跑腿搶單、跑腿單評論、在線留言以及用戶在線充值等功能的綜合性系統。該系統以提高校園內物品跑腿與配送效率為核心目標&#xff0c;為廣大學…

zookeeper的介紹和簡單使用

1 zookerper介紹 zookeeper是一個開源的分布式協調服務&#xff0c;由Apache軟件基金會提供&#xff0c;主要用于解決分布式應用中的數據管理、狀態同步和集群協調等問題。通過提供一個高性能、高可用的協調服務&#xff0c;幫助構建可靠的分布式系統。 Zookeeper的特點和功能…

二級 二維數組3

對角線之和 題目描述 輸入一個矩陣&#xff0c;輸出右上-左下對角線上的數字和 輸入 輸入1個整數N。(N<10)表示矩陣有n行n列 輸出 對角線的和 樣例 輸入復制 4 1 2 3 4 2 3 4 5 4 5 6 7 1 2 3 4 輸出復制 14 #include<iostream> using namespace std; int main() {i…

Spring Boot MyBatis Plus 版本兼容問題(記錄)

Spring Boot & MyBatis Plus 版本兼容問題&#xff08;Invalid value type for attribute factoryBeanObjectType: java.lang.String&#xff09; 問題描述問題排查1. 檢查 MapperScan 的路徑2. 項目中沒有配置 FactoryBean3. 檢查 Spring 和 MyBatis Plus 版本兼容性 解決…

嵌入式學習筆記-雜七雜八

文章目錄 連續波光纖耦合激光器工作原理主要特點應用領域設計考慮因素 數值孔徑&#xff08;Numerical Aperture&#xff0c;簡稱NA&#xff09;數值孔徑的定義數值孔徑的意義數值孔徑的計算示例數值孔徑與光纖 四象限探測器檢測目標方法四象限劃分檢測目標的步驟1. 數據采集2.…

Java Web-Cookie與Session

會話跟蹤技術 會話跟蹤技術是一種在 Web 應用程序中跟蹤用戶會話狀態的機制&#xff0c;它允許服務器在多個請求之間識別和關聯屬于同一用戶的請求&#xff0c;以便在整個會話過程中保持用戶相關的信息。以下是幾種常見的會話跟蹤技術&#xff1a; Cookie 概念&#xff1a;Cook…

Spring Boot - 數據庫集成04 - 集成Redis

Spring boot集成Redis 文章目錄 Spring boot集成Redis一&#xff1a;redis基本集成1&#xff1a;RedisTemplate Jedis1.1&#xff1a;RedisTemplate1.2&#xff1a;實現案例1.2.1&#xff1a;依賴引入和屬性配置1.2.2&#xff1a;redisConfig配置1.2.3&#xff1a;基礎使用 2&…

STM32使用VScode開發

文章目錄 Makefile形式創建項目新建stm項目下載stm32cubemx新建項目IED makefile保存到本地arm gcc是編譯的工具鏈G++配置編譯Cmake +vscode +MSYS2方式bilibiliMSYS2 統一環境配置mingw32-make -> makewindows環境變量Cmake CmakeListnijia 編譯輸出elfCMAKE_GENERATOR查詢…

Oracle 12c 中的 CDB和PDB的啟動和關閉

一、簡介 Oracle 12c引入了多租戶架構&#xff0c;允許一個容器數據庫&#xff08;Container Database, CDB&#xff09;托管多個獨立的可插拔數據庫&#xff08;Pluggable Database, PDB&#xff09;。本文檔旨在詳細描述如何啟動和關閉CDB及PDB。 二、容器數據庫 (CDB) 2.1…

網絡仿真工具Core環境搭建

目錄 安裝依賴包 源碼下載 Core安裝 FAQ 下載源碼TLS出錯誤 問題 解決方案 找不到dbus-launch 問題 解決方案 安裝依賴包 調用以下命令安裝依賴包 apt-get install -y ca-certificates git sudo wget tzdata libpcap-dev libpcre3-dev \ libprotobuf-dev libxml2-de…

FPGA實現任意角度視頻旋轉(二)視頻90度/270度無裁剪旋轉

本文主要介紹如何基于FPGA實現視頻的90度/270度無裁剪旋轉&#xff0c;旋轉效果示意圖如下&#xff1a; 為了實時對比旋轉效果&#xff0c;采用分屏顯示進行處理&#xff0c;左邊代表旋轉前的視頻在屏幕中的位置&#xff0c;右邊代表旋轉后的視頻在屏幕中的位置。 分屏顯示的…

JavaEE:多線程進階

JavaEE&#xff1a;多線程進階 一、對比不同鎖策略之間的應用場景及其區別1. 悲觀鎖 和 樂觀鎖1.1 定義和原理1.2 應用場景1.3 示例代碼 2. 重量級鎖 和 輕量級鎖2.1 定義和原理2.2 應用場景2.3 示例代碼 3. 掛起等待鎖 和 自旋鎖3.1 定義和原理3.2 應用場景3.3 示例代碼 4. 幾…