Linux上用C++和GCC開發程序實現不同MySQL實例下單個Schema之間的穩定高效的數據遷移

設計一個在Linux上運行的GCC C++程序,同時連接兩個不同的MySQL實例,兩個實例中分別有兩個Schema的表結構完全相同,復制一個實例中一個Schema里的所有表的數據到另一個實例中一個Schema里,使用以下快速高效的方法,加入異常處理,支持每隔固定時間重試一定次數,每張表的復制運行狀態和記錄條數,開始結束時間戳,運行時間,以及每個批次的運行狀態,開始結束時間戳,運行時間,總記錄條數都要到運行日志文件里,日志文件全部放在logs目錄下,每天一個.log擴展名的文本文件,文件名帶日期,所有運行參數寫在json配置文件中。

C++程序分步解決方案:


文件結構

├── config.json          # 配置文件
├── include/             # 頭文件目錄
│   ├── DatabaseManager.h
│   ├── Logger.h
│   └── DataReplicator.h
├── src/                # 源文件目錄
│   ├── main.cpp
│   ├── DatabaseManager.cpp
│   ├── Logger.cpp
│   └── DataReplicator.cpp
└── CMakeLists.txt      # 編譯配置

1. 配置文件 (config.json)

{"source": {"host": "127.0.0.1","port": 3306,"user": "root","password": "source_pass","schema": "source_db"},"target": {"host": "127.0.0.1","port": 3307,"user": "root","password": "target_pass","schema": "target_db"},"retry": {"max_attempts": 3,"interval_seconds": 5},"batch_size": 1000,"log_dir": "./logs"
}

2. 日志類 (Logger.h/Logger.cpp)

// Logger.h
#include <fstream>
#include <string>
#include <mutex>
#include <chrono>class Logger {
public:static Logger& getInstance();void log(const std::string& message);private:Logger();~Logger();std::ofstream logFile;std::mutex mtx;std::string getCurrentDate();std::string getCurrentTimestamp();void openNewLogFile();
};

3. 數據庫管理類 (DatabaseManager.h/DatabaseManager.cpp)

// DatabaseManager.h
#include <mysqlx/xdevapi.h>class DatabaseManager {
public:DatabaseManager(const mysqlx::SessionSettings& settings);mysqlx::Session& getSession();mysqlx::Schema getSchema(const std::string& name);private:mysqlx::Session session;
};

4. 數據復制核心類 (DataReplicator.h/DataReplicator.cpp)

// DataReplicator.h
#include <vector>
#include <string>
#include <functional>class DataReplicator {
public:struct TableStats {std::string table_name;size_t total_records = 0;std::chrono::system_clock::time_point start_time;std::chrono::system_clock::time_point end_time;bool success = false;};void replicateAllTables();private:std::vector<std::string> getSourceTables();void replicateTable(const std::string& tableName);void batchInsert(mysqlx::Table& source, mysqlx::Table& target, int batchSize);void withRetry(std::function<void()> operation, const std::string& context);
};

5. 主程序 (main.cpp)

#include "DatabaseManager.h"
#include "DataReplicator.h"
#include "Logger.h"
#include <nlohmann/json.hpp>using json = nlohmann::json;json loadConfig() {// 實現配置文件加載
}int main() {auto config = loadConfig();DatabaseManager sourceManager(createSettings(config["source"]));DatabaseManager targetManager(createSettings(config["target"]));DataReplicator replicator(sourceManager, targetManager, config);replicator.replicateAllTables();return 0;
}

6. 關鍵實現細節

  1. 批量數據復制
void DataReplicator::batchInsert(mysqlx::Table& source, mysqlx::Table& target, int batchSize) {auto result = source.select("*").execute();std::vector<mysqlx::Row> buffer;while (auto row = result.fetchOne()) {buffer.push_back(row);if (buffer.size() >= batchSize) {target.insert().values(buffer).execute();buffer.clear();}}if (!buffer.empty()) {target.insert().values(buffer).execute();}
}
  1. 帶重試的邏輯
void DataReplicator::withRetry(std::function<void()> operation, const std::string& context) {int attempts = 0;while (attempts < maxRetries) {try {operation();return;} catch (const mysqlx::Error& e) {Logger::getInstance().log("Error: " + std::string(e.what()));if (++attempts >= maxRetries) throw;std::this_thread::sleep_for(retryInterval);}}
}
  1. 日志記錄實現
void Logger::log(const std::string& message) {std::lock_guard<std::mutex> lock(mtx);if (logFile.tellp() == 0 || getCurrentDate() != currentDate) {openNewLogFile();}logFile << "[" << getCurrentTimestamp() << "] " << message << std::endl;
}

7. 編譯配置 (CMakeLists.txt)

cmake_minimum_required(VERSION 3.10)
project(DataReplicator)set(CMAKE_CXX_STANDARD 17)find_package(MySQLCppConn REQUIRED)
find_package(nlohmann_json REQUIRED)include_directories(include${MySQLCppConn_INCLUDE_DIRS}
)add_executable(data_replicatorsrc/main.cppsrc/DatabaseManager.cppsrc/Logger.cppsrc/DataReplicator.cpp
)target_link_libraries(data_replicatorPRIVATE MySQLCppConn::mysqlcppconnPRIVATE nlohmann_json::nlohmann_jsonpthread ssl crypto
)

8. 運行方式

mkdir -p build && cd build
cmake ..
make
./data_replicator

9. 日志示例

[2023-10-05 14:30:45] [users] STARTED
[2023-10-05 14:30:47] [users] BATCH 1-1000 inserted (1.2s)
[2023-10-05 14:30:49] [users] COMPLETED 1500 records (4.5s)
[2023-10-05 14:30:50] [products] FAILED: Duplicate entry 'XYZ' (Attempt 1/3)
[2023-10-05 14:30:55] [products] RETRY SUCCESS (2nd attempt)

優化建議

  1. 并行處理
std::vector<std::future<void>> futures;
for (auto& table : tables) {futures.push_back(std::async(std::launch::async, [&](){replicateTable(table);}));
}
for (auto& f : futures) f.wait();
  1. 索引優化
-- 復制前禁用索引
ALTER TABLE target_table DISABLE KEYS;-- 復制后重建索引
ALTER TABLE target_table ENABLE KEYS;
  1. 內存管理
  • 使用智能指針管理數據庫連接
  • 限制同時打開的連接數
  • 實現連接池機制

這個解決方案實現了:

  • 配置驅動的數據庫連接
  • 帶重試機制的可靠數據傳輸
  • 詳細的運行日志記錄
  • 批量處理優化
  • 異常安全保證
  • 可擴展的架構設計

完整的C++代碼實現,包括各個頭文件和源文件以及?CMakeLists.txt?的完整內容:

請確保安裝了MySQL C++ Connector和?nlohmann_json?庫,并按照運行方式中的步驟進行編譯和運行。

include/DatabaseManager.h

#include <mysqlx/xdevapi.h>class DatabaseManager {
public:DatabaseManager(const mysqlx::SessionSettings& settings);mysqlx::Session& getSession();mysqlx::Schema getSchema(const std::string& name);private:mysqlx::Session session;
};

include/Logger.h

#include <fstream>
#include <string>
#include <mutex>
#include <chrono>class Logger {
public:static Logger& getInstance();void log(const std::string& message);private:Logger();~Logger();std::ofstream logFile;std::mutex mtx;std::string getCurrentDate();std::string getCurrentTimestamp();void openNewLogFile();std::string currentDate;
};

include/DataReplicator.h

#include <vector>
#include <string>
#include <functional>
#include <mysqlx/xdevapi.h>
#include <nlohmann/json.hpp>using json = nlohmann::json;class DataReplicator {
public:struct TableStats {std::string table_name;size_t total_records = 0;std::chrono::system_clock::time_point start_time;std::chrono::system_clock::time_point end_time;bool success = false;};DataReplicator(DatabaseManager& sourceManager, DatabaseManager& targetManager, const json& config);void replicateAllTables();private:std::vector<std::string> getSourceTables();void replicateTable(const std::string& tableName);void batchInsert(mysqlx::Table& source, mysqlx::Table& target, int batchSize);void withRetry(std::function<void()> operation, const std::string& context);DatabaseManager& sourceManager;DatabaseManager& targetManager;json config;int maxRetries;std::chrono::seconds retryInterval;int batchSize;
};

src/DatabaseManager.cpp

#include "../include/DatabaseManager.h"DatabaseManager::DatabaseManager(const mysqlx::SessionSettings& settings) : session(settings) {if (!session.isOpen()) {throw std::runtime_error("Failed to connect to database");}
}mysqlx::Session& DatabaseManager::getSession() {return session;
}mysqlx::Schema DatabaseManager::getSchema(const std::string& name) {return session.getSchema(name);
}

src/Logger.cpp

#include "../include/Logger.h"
#include <iostream>
#include <iomanip>Logger::Logger() {openNewLogFile();
}Logger::~Logger() {logFile.close();
}Logger& Logger::getInstance() {static Logger instance;return instance;
}std::string Logger::getCurrentDate() {auto now = std::chrono::system_clock::now();auto in_time_t = std::chrono::system_clock::to_time_t(now);std::tm tm_info;localtime_r(&in_time_t, &tm_info);std::ostringstream oss;oss << std::put_time(&tm_info, "%Y-%m-%d");return oss.str();
}std::string Logger::getCurrentTimestamp() {auto now = std::chrono::system_clock::now();auto in_time_t = std::chrono::system_clock::to_time_t(now);std::tm tm_info;localtime_r(&in_time_t, &tm_info);std::ostringstream oss;oss << std::put_time(&tm_info, "%Y-%m-%d %H:%M:%S");return oss.str();
}void Logger::openNewLogFile() {currentDate = getCurrentDate();std::string logFileName = config["log_dir"].get<std::string>() + "/" + currentDate + ".log";logFile.open(logFileName, std::ios::app);if (!logFile.is_open()) {std::cerr << "Failed to open log file: " << logFileName << std::endl;}
}void Logger::log(const std::string& message) {std::lock_guard<std::mutex> lock(mtx);if (logFile.tellp() == 0 || getCurrentDate()!= currentDate) {openNewLogFile();}logFile << "[" << getCurrentTimestamp() << "] " << message << std::endl;
}

src/DataReplicator.cpp

#include "../include/DataReplicator.h"
#include <iostream>
#include <future>DataReplicator::DataReplicator(DatabaseManager& sourceManager, DatabaseManager& targetManager, const json& config): sourceManager(sourceManager), targetManager(targetManager), config(config) {maxRetries = config["retry"]["max_attempts"];retryInterval = std::chrono::seconds(config["retry"]["interval_seconds"]);batchSize = config["batch_size"];
}std::vector<std::string> DataReplicator::getSourceTables() {auto schema = sourceManager.getSchema(config["source"]["schema"]);auto tables = schema.getTables();std::vector<std::string> tableNames;for (auto& table : tables) {tableNames.push_back(table.getName());}return tableNames;
}void DataReplicator::replicateTable(const std::string& tableName) {auto sourceSchema = sourceManager.getSchema(config["source"]["schema"]);auto targetSchema = targetManager.getSchema(config["target"]["schema"]);auto sourceTable = sourceSchema.getTable(tableName);auto targetTable = targetSchema.getTable(tableName);TableStats stats;stats.table_name = tableName;stats.start_time = std::chrono::system_clock::now();try {withRetry([&]() {batchInsert(sourceTable, targetTable, batchSize);}, tableName);stats.success = true;} catch (const mysqlx::Error& e) {Logger::getInstance().log("Error replicating table " + tableName + ": " + std::string(e.what()));stats.success = false;}stats.end_time = std::chrono::system_clock::now();auto duration = std::chrono::duration_cast<std::chrono::seconds>(stats.end_time - stats.start_time).count();std::string status = stats.success? "COMPLETED" : "FAILED";Logger::getInstance().log("[" + tableName + "] " + status + " " + std::to_string(stats.total_records) + " records (" + std::to_string(duration) + "s)");
}void DataReplicator::batchInsert(mysqlx::Table& source, mysqlx::Table& target, int batchSize) {auto result = source.select("*").execute();std::vector<mysqlx::Row> buffer;int batchCount = 0;auto batchStartTime = std::chrono::system_clock::now();while (auto row = result.fetchOne()) {buffer.push_back(row);stats.total_records++;if (buffer.size() >= batchSize) {withRetry([&]() {target.insert().values(buffer).execute();}, stats.table_name + " batch " + std::to_string(++batchCount));auto batchEndTime = std::chrono::system_clock::now();auto batchDuration = std::chrono::duration_cast<std::chrono::seconds>(batchEndTime - batchStartTime).count();Logger::getInstance().log("[" + stats.table_name + "] BATCH " + std::to_string((batchCount - 1) * batchSize + 1) + "-" + std::to_string(batchCount * batchSize) + " inserted (" + std::to_string(batchDuration) + "s)");batchStartTime = batchEndTime;buffer.clear();}}if (!buffer.empty()) {withRetry([&]() {target.insert().values(buffer).execute();}, stats.table_name + " batch " + std::to_string(++batchCount));auto batchEndTime = std::chrono::system_clock::now();auto batchDuration = std::chrono::duration_cast<std::chrono::seconds>(batchEndTime - batchStartTime).count();Logger::getInstance().log("[" + stats.table_name + "] BATCH " + std::to_string((batchCount - 1) * batchSize + 1) + "-" + std::to_string(stats.total_records) + " inserted (" + std::to_string(batchDuration) + "s)");}
}void DataReplicator::withRetry(std::function<void()> operation, const std::string& context) {int attempts = 0;while (attempts < maxRetries) {try {operation();return;} catch (const mysqlx::Error& e) {Logger::getInstance().log("Error: " + std::string(e.what()) + " (" + context + " Attempt " + std::to_string(attempts + 1) + "/" + std::to_string(maxRetries) + ")");if (++attempts >= maxRetries) throw;std::this_thread::sleep_for(retryInterval);}}
}void DataReplicator::replicateAllTables() {auto tables = getSourceTables();std::vector<std::future<void>> futures;for (auto& table : tables) {futures.push_back(std::async(std::launch::async, [&](){replicateTable(table);}));}for (auto& f : futures) f.wait();
}

src/main.cpp

#include "../include/DatabaseManager.h"
#include "../include/DataReplicator.h"
#include "../include/Logger.h"
#include <nlohmann/json.hpp>
#include <fstream>using json = nlohmann::json;json loadConfig() {std::ifstream configFile("config.json");json config;configFile >> config;return config;
}mysqlx::SessionSettings createSettings(const json& config) {mysqlx::SessionSettings settings;settings.setHost(config["host"]);settings.setPort(config["port"]);settings.setUser(config["user"]);settings.setPassword(config["password"]);return settings;
}int main() {auto config = loadConfig();mysqlx::SessionSettings sourceSettings = createSettings(config["source"]);mysqlx::SessionSettings targetSettings = createSettings(config["target"]);DatabaseManager sourceManager(sourceSettings);DatabaseManager targetManager(targetSettings);DataReplicator replicator(sourceManager, targetManager, config);replicator.replicateAllTables();return 0;
}

CMakeLists.txt

cmake_minimum_required(VERSION 3.10)
project(DataReplicator)set(CMAKE_CXX_STANDARD 17)find_package(MySQLCppConn REQUIRED)
find_package(nlohmann_json REQUIRED)include_directories(include${MySQLCppConn_INCLUDE_DIRS}
)add_executable(data_replicatorsrc/main.cppsrc/DatabaseManager.cppsrc/Logger.cppsrc/DataReplicator.cpp
)target_link_libraries(data_replicatorPRIVATE MySQLCppConn::mysqlcppconnPRIVATE nlohmann_json::nlohmann_jsonpthread ssl crypto
)

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/70962.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/70962.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/70962.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Redis除了做緩存還能做什么?

Redis 除了作為高性能緩存外&#xff0c;還因其豐富的數據結構和功能&#xff0c;廣泛應用于多種場景。以下是 Redis 的十大核心用途及具體示例&#xff1a; 1. 分布式會話存儲 用途&#xff1a;存儲用戶會話信息&#xff08;如登錄狀態&#xff09;&#xff0c;實現多服務間共…

JBoltAI_SpringBoot如何區分DeepSeek R1深度思考和具體回答的內容(基于Ollama)?

當我們用Ollama運行DeepSeek R1模型&#xff0c;向它提問時&#xff0c;會發現它的回答里是有think標簽的 如果我們直接將Ollama的回復用于生產環境&#xff0c;肯定是不行的&#xff0c;對于不同的場景&#xff0c;前面輸出的一堆內容&#xff0c;可能并不需要在客戶端展示&a…

MySQL 使用 `WHERE` 子句時 `COUNT(*)`、`COUNT(1)` 和 `COUNT(column)` 的區別解析

文章目錄 1. COUNT() 函數的基本作用2. COUNT(*)、COUNT(1) 和 COUNT(column) 的詳細對比2.1 COUNT(*) —— 統計所有符合條件的行2.2 COUNT(1) —— 統計所有符合條件的行2.3 COUNT(column) —— 統計某一列非 NULL 的記錄數 3. 性能對比3.1 EXPLAIN 分析 4. 哪種方式更好&…

將DeepSeek接入vscode的N種方法

接入deepseek方法一:cline 步驟1:安裝 Visual Studio Code 后,左側導航欄上點擊擴展。 步驟2:搜索 cline,找到插件后點擊安裝。 步驟3:在大模型下拉菜單中找到deep seek,然后下面的輸入框輸入你在deepseek申請的api key,就可以用了 讓deepseek給我寫了一首關于天氣的…

AndroidManifest.xml文件的作用

AndroidManifest.xml文件在Android應用程序中扮演著至關重要的角色。它是應用程序的全局配置文件&#xff0c;提供了關于應用程序的所有必要信息&#xff0c;這些信息對于Android系統來說是至關重要的&#xff0c;因為它決定了應用程序的運行方式和權限要求&#xff0c;確保了應…

Mac本地部署Deep Seek R1

Mac本地部署Deep Seek R1 1.安裝本地部署大型語言模型的工具 ollama 官網&#xff1a;https://ollama.com/ 2.下載Deepseek R1模型 網址&#xff1a;https://ollama.com/library/deepseek-r1 根據電腦配置&#xff0c;選擇模型。 我的電腦&#xff1a;Mac M3 24G內存。 這…

React進階之前端業務Hooks庫(五)

前端業務Hooks庫 Hooks原理useStateuseEffect上述問題useState,useEffect 復用的能力練習:怎樣實現一套React過程中的hooks狀態 & 副作用Hooks原理 不能在循環中、條件判斷、子函數中調用,只能在函數最外層去調用useEffect 中,deps 為空,執行一次useState 使用: imp…

從像素到光線:現代Shader開發的范式演進與性能優化實踐

引言 在實時圖形渲染領域&#xff0c;Shader作為GPU程序的核心載體&#xff0c;其開發范式已從早期的固定功能管線演進為高度可編程的計算單元。本文通過解析關鍵技術案例&#xff0c;結合現代圖形API&#xff08;如Vulkan、Metal&#xff09;的特性&#xff0c;深入探討Shade…

(七)消息隊列-Kafka 序列化avro(傳遞)

&#xff08;七&#xff09;消息隊列-Kafka 序列化avro&#xff08;傳遞&#xff09; 客從遠方來&#xff0c;遺我雙鯉魚。呼兒烹鯉魚&#xff0c;中有尺素書。 ——佚名《飲馬長城窟行》 本文已同步CSDN、掘金平臺、知乎等多個平臺&#xff0c;圖片依然保持最初發布的水印&…

PXE批量網絡裝機與Kickstart自動化安裝工具

目錄 一、系統裝機的原理 1.1、系統裝機方式 1.2、系統安裝過程 二、PXE批量網絡裝機 2.1、PXE實現原理 2.2、搭建PXE實際案例 2.2.1、安裝必要軟件 2.2.2、搭建DHCP服務器 2.2.3、搭建TFTP服務器 2.2.4、掛載鏡像并拷貝引導文件到tftp服務啟動引導文件夾下 2.2.5、編…

【全棧開發】從0開始搭建一個圖書管理系統【一】框架搭建

【全棧開發】從0開始搭建一個圖書管理系統【一】框架搭建 前言 現在流行降本增笑&#xff0c;也就是不但每個人都要有事干不能閑著&#xff0c;更重要的是每個人都要通過報功的方式做到平日的各項工作異常飽和&#xff0c;實現1.5人的支出干2人的活計。單純的數據庫開發【膚淺…

部署Flink1.20.1

1、設置環境變量 export JAVA_HOME/cluster/jdk export CLASSPATH.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jarp #export HIVE_HOME/cluster/hive export MYSQL_HOME/cluster/mysql export HADOOP_HOME/cluster/hadoop3 export HADOOP_CONF_DIR$HADOOP_HOME/etc/hadoop …

【超詳細】神經網絡的可視化解釋

《------往期經典推薦------》 一、AI應用軟件開發實戰專欄【鏈接】 項目名稱項目名稱1.【人臉識別與管理系統開發】2.【車牌識別與自動收費管理系統開發】3.【手勢識別系統開發】4.【人臉面部活體檢測系統開發】5.【圖片風格快速遷移軟件開發】6.【人臉表表情識別系統】7.【…

深入了解 Python 中的 MRO(方法解析順序)

文章目錄 深入了解 Python 中的 MRO&#xff08;方法解析順序&#xff09;什么是 MRO&#xff1f;如何計算 MRO&#xff1f;C3 算法的合并規則C3 算法的合并步驟示例&#xff1a;合并過程解析 MRO 解析失敗的場景使用 mro() 方法查看 MRO示例 1&#xff1a;基本用法 菱形繼承與…

數字化賦能:制造業如何突破低效生產的瓶頸?

隨著全球經濟的快速發展與市場需求的變化&#xff0c;制造業面臨著前所未有的壓力與挑戰。生產效率、資源管理、品質控制、成本控制等方面的問題日益突出&#xff0c;尤其是低效生產成為了許多制造企業亟待解決的瓶頸。在這種背景下&#xff0c;數字化轉型成為提升制造業效率的…

Element-Plus,使用 El-form中 的 scroll-to-error 沒有效果問題記錄

因業務需要表單組件中嵌套著表格列表&#xff0c;內容比較多&#xff1b; 所以需要表單校驗不通過時&#xff0c;自動定位到不通過的節點&#xff1b; 但發現這個像是沒有起到效果一樣&#xff0c;后面就是排查的思路了&#xff1a; 容器高度問題&#xff1a;如果表單容器的高度…

基于Javase的停車場收費管理系統

基于Javase的停車場收費管理系統 停車場管理系統開發文檔 項目概述 1.1 項目背景 隨著現代化城市的不斷發展&#xff0c;車輛數量不斷增加&#xff0c;停車難問題也日益突出。為了更好地管理停車場資 源&#xff0c;提升停車效率&#xff0c;需要一個基于Java SE的停車場管理…

網絡協議 HTTP、HTTPS、HTTP/1.1、HTTP/2 對比分析

1. 基本定義 HTTP&#xff08;HyperText Transfer Protocol&#xff09; 應用層協議&#xff0c;用于客戶端與服務器之間的數據傳輸&#xff08;默認端口 80&#xff09;。 HTTP/1.0&#xff1a;早期版本&#xff0c;每個請求需單獨建立 TCP 連接&#xff0c;效率低。HTTP/1.1&…

DeepSeek掘金——調用DeepSeek API接口 實現智能數據挖掘與分析

調用DeepSeek API接口:實現智能數據挖掘與分析 在當今數據驅動的時代,企業和開發者越來越依賴高效的數據挖掘與分析工具來獲取有價值的洞察。DeepSeek作為一款先進的智能數據挖掘平臺,提供了強大的API接口,幫助用戶輕松集成其功能到自己的應用中。本文將詳細介紹如何調用D…

LabVIEW同步數據采集功能

VI通過使用數據采集&#xff08;DAQ&#xff09;硬件系統&#xff0c;進行多通道同步采集&#xff0c;實時獲取模擬信號數據。它利用外部時鐘信號觸發數據采集&#xff0c;支持連續采樣模式&#xff0c;并將采集到的數據實時顯示在波形圖上&#xff0c;方便用戶進行數據監控和分…