目錄
線程池
介紹
分類
實現
工作原理
核心組成
拒絕策略?
固態線程池
功能
std::future
實現
拒絕策略支持
提交任務
超時取消
用戶檢測取消
安全銷毀
代碼
測試
線程池
介紹
線程池(圖解,本質,模擬實現代碼),添加單例模式(懶漢思路+代碼)_線程池單例-CSDN博客
- 包括線程池介紹+使用pthread庫接口
這里我們就使用c++11中的thread庫來實現一下,并且引入更多特性
分類
固定大小線程池(Fixed-size Thread Pool):
線程池中的線程數在初始化時被設定并保持固定,不會根據負載自動擴展或收縮。
適用于負載較為平穩的場景,例如Web服務器、數據庫連接池等。
動態線程池(DynamicThread Pool):
線程池的線程數是動態變化的,根據任務的數量來增加或減少線程數。
當線程池中沒有任務時,線程會被回收(通常有一個最大線程數限制)。
適用于任務量不穩定、并發變化較大的場景,如文件處理、短時間的批量任務等。
單線程化線程池(Single-threaded Thread Pool):
線程池中只有一個線程,所有任務都會按順序提交給這個線程執行。
適用于串行化任務,如日志記錄、事件驅動任務等。
調度線程池(Scheduled Thread Pool):
該線程池支持任務的延遲執行和周期性執行。
通常用于定時任務或周期性任務,例如定時清理緩存等。
實現
工作原理
- 線程池初始化:線程池初始化時創建一定數量的工作線程,并使其處于“等待”狀態,準備執行任務
- 任務提交:當有新任務提交時,線程池將任務放入任務隊列中
- 任務執行:線程池中的空閑線程從任務隊列中取出任務并執行
- 任務完成:任務執行完后,線程回到等待狀態,準備接收新的任務
- 線程銷毀:當線程池中的線程閑置超過一定時間,線程池可以銷毀一些線程以釋放系統資源(適用于動態線程池)
核心組成
一個線程池一般由以下幾個核心組件組成:
線程工作線程集合
- 一組預先創建的固定數量或動態伸縮(包括核心線程和臨時創建)的線程
- 每個線程循環從任務隊列中獲取任務執行
任務隊列
- 用于存放等待執行的任務,一般為線程安全的隊列(如
std::queue + mutex
),支持任務入隊/出隊任務提交接口
- 對外暴露的函數接口,用于將任務提交到線程池,如
submit()
同步機制
- 用于保護共享資源和協調線程間關系(每個線程都要訪問任務隊列)
- 常用
mutex
,condition_variable
,atomic
等任務拒絕策略(Rejection Policy)
- 當隊列已滿時,決定如何處理新任務(下面介紹)
生命周期管理(Shutdown & Destruction)
- 控制線程池的啟動、停止、銷毀,確保線程安全退出并釋放資源
可選的任務取消機制
- 支持任務在執行中被取消(比如因當前任務執行超時等原因)
可選的任務超時機制
- 為每個任務設置執行超時,到期未完成的任務自動取消或通知中斷
拒絕策略?
拋出異常,拒絕執行(默認策略)
- 拋出
RejectedExecutionException
異常,告知任務無法執行- 適合你希望及時發現問題并中止提交任務
由提交任務的線程執行
- 將任務回退給調用者,即由提交任務的線程執行任務,而不是交給線程池中的線程處理
可以起到“削峰”作用,但影響主線程性能(會阻塞提交線程)
靜默丟棄
- 丟棄無法執行的任務,不拋出異常
- 適合對任務可丟棄、無嚴重后果的場景(如日志)
丟棄隊列中最舊的任務,然后嘗試重新提交當前任務
- 適合任務有時效性(如更新 UI、股票報價)
固態線程池
功能
支持:
固定線程數: 構造時指定線程數,固定數量線程常駐執行任務
有界隊列: 支持設置任務隊列最大容量,避免過載
拒絕策略支持: 支持 BLOCK(等待)、DISCARD(丟棄)、THROW(拋異常)三種策略
提交任務: 提交
void(exec_context)
類型任務,通過適配器轉換為 void() 并存入任務隊列,最終返回std::future<void>
超時取消: 自動按任務類型 (因為我這里是把自己寫的搜索引擎項目中增設的線程池作為例子,所以這里的類型就分為建立索引,搜索,搜索聯想) , 設置超時時間,到時通知任務取消
用戶檢測取消: 任務內部可用?
canceled()
?檢測取消請求并安全退出安全銷毀: 析構時安全停止所有線程并等待退出
線程安全: 所有關鍵資源由
mutex
和condition_variable
保護,支持多線程并發提交任務
std::future
std::future
<T>
是 C++11 引入的標準庫類型
- 可以異步獲取一個任務的執行結果
- 作用 -- 當你把一個任務提交給線程池時,這個任務可能要等一會兒才能執行(畢竟線程有限,排隊中),那么你就需要一個東西來 “將來拿到結果”
實現
拒絕策略支持
定義了一個枚舉類
enum class RejectionPolicy{BLOCK,DISCARD,THROW};
當任務提交后,根據當前的任務隊列使用情況和拒絕策略的設置,決定對任務的處理方式
{std::unique_lock<std::mutex> lock(mtx_);// 阻塞策略:等待有空間if (tasks_.size() >= max_queue_size_ && !stop_){if (reject_policy_ == RejectionPolicy::BLOCK){//因為這里是帶謂詞的wait,所以即使虛假喚醒也會在條件為真后返回cond_.wait(lock, [this]{ return tasks_.size() < max_queue_size_ || stop_; });}else if (reject_policy_ == RejectionPolicy::DISCARD){throw std::runtime_error("Task queue is full. Task was discarded.");}else if (reject_policy_ == RejectionPolicy::THROW){throw std::runtime_error("Task queue is full.");}}if (stop_){throw std::runtime_error("ThreadPool is stopping. Cannot accept new tasks.");}tasks_.emplace([taskWrapper](){ (*taskWrapper)(); });}
這里的靜默丟棄
- 雖然定義上是不拋出異常,但為了調用方知道任務沒被接收,還是加上了
這里的while循環是為了防止虛假喚醒:
提交任務
// 傳入void(exec_context) ,因為內部也需要配合template <class F>std::future<void> submit(const std::string &taskType, F &&f){auto timeout = getTimeoutForTask(taskType);auto cancellableTask = std::make_shared<CancellableTask>();cancellableTask->func = std::forward<F>(f);// 管理任務執行結果auto taskWrapper = std::make_shared<std::packaged_task<void()>>([cancellableTask](){ (*cancellableTask)(); });// 從packaged_task中獲取一個關聯的future對象std::future<void> taskFuture = taskWrapper->get_future();{std::unique_lock<std::mutex> lock(mtx_);// 阻塞策略:等待有空間if (tasks_.size() >= max_queue_size_ && !stop_){if (reject_policy_ == RejectionPolicy::BLOCK){cond_.wait(lock, [this]{ return tasks_.size() < max_queue_size_ || stop_; });}else if (reject_policy_ == RejectionPolicy::DISCARD){throw std::runtime_error("Task queue is full. Task was discarded.");}else if (reject_policy_ == RejectionPolicy::THROW){throw std::runtime_error("Task queue is full.");}}if (stop_){throw std::runtime_error("ThreadPool is stopping. Cannot accept new tasks.");}tasks_.emplace([taskWrapper](){ (*taskWrapper)(); });}cond_.notify_one();// 啟動一個后臺線程監控超時取消std::thread([taskFuture = std::move(taskFuture),controller = cancellableTask->controller, timeout]() mutable{if (taskFuture.wait_for(timeout) == std::future_status::timeout) {controller->notify_cancel();std::cerr << "[Timeout] Task exceeded time limit and was cancelled.\n";} }).detach();return taskFuture;}
這里因為任務隊列中使用的void()類型,而外部傳入的是void(exec_context) (因為需要內部配合停止)
- 所以需要對類型進行一個轉換
- 也就是通過cancellableTask
這個
可取消的任務封裝器,實際是一個無參可調用對象,內部實例化一個exec_context ,然后調用帶參數版本的函數- 最后將 (*taskWrapper)() 放入隊列,實際就是調用了cancellableTask的operator()()
- 于是實現了將void(exec_context) ->?void()
超時取消
首先,是定義了兩個模塊,分別的作用是 --? 取消狀態的設置 和 作為對外接口
// 控制標識符 struct exec_controller {bool notify_cancel() { return _should_cancel.exchange(true); }bool should_cancel() const { return _should_cancel; }private:std::atomic<bool> _should_cancel{false}; }; // 判斷是否被取消 struct exec_context {exec_context(std::shared_ptr<exec_controller> impl): _impl(std::move(impl)) {}bool canceled() const { return _impl->should_cancel(); }private:std::shared_ptr<exec_controller> _impl; };
- 使用原子變量避免競態條件
- 讀取控制器可以被多個任務共享 (通過智能指針控制生命周期)
其次,定義了一個可取消的任務封裝器
struct CancellableTask {// 封裝一個取消控制器std::shared_ptr<exec_controller> controller =std::make_shared<exec_controller>();// 將取消上下文封裝進普通任務中std::function<void(exec_context)> func;void operator()(){exec_context ctx{controller};func(ctx);} };
然后,還定義了根據不同任務類型,返回對應超時時間的函數
static std::chrono::millisecondsgetTimeoutForTask(const std::string &taskType){if (taskType == ns_helper::TASK_TYPE_BUILD_INDEX){return std::chrono::seconds(120);}else if (taskType == ns_helper::TASK_TYPE_PERSIST_INDEX){return std::chrono::seconds(4 * 3600);}else if (taskType == ns_helper::TASK_TYPE_SEARCH ||taskType == ns_helper::TASK_TYPE_AUTOCOMPLETE){return std::chrono::seconds(1);}else{return std::chrono::seconds(1);}}
- 我這里因為是搜索引擎,并且沒有做性能優化,所以設置的時間比較長(跪)
并且,啟動了一個后臺線程去監控任務狀態
// 啟動一個后臺線程監控超時取消std::thread([taskFuture = std::move(taskFuture),controller = cancellableTask->controller, timeout]() mutable{if (taskFuture.wait_for(timeout) == std::future_status::timeout) {controller->notify_cancel();std::cerr << "[Timeout] Task exceeded time limit and was cancelled.\n";} }).detach();
- 這里就只講一下lambda內部邏輯,傳參會在下面的核心邏輯介紹
- 總之,這個線程會等待任務完成timeout秒,如果未完成,則任務超時 -> 設置取消控制器的標識符
- 這里還設置了調用了datach,讓它和主線程分離(也就是后臺運行)
最后,就是核心邏輯了
// 傳入void(exec_context) ,因為內部也需要配合template <class F>std::future<void> submit(const std::string &taskType, F &&f){auto timeout = getTimeoutForTask(taskType);auto cancellableTask = std::make_shared<CancellableTask>();cancellableTask->func = std::forward<F>(f);// 管理任務執行結果auto taskWrapper = std::make_shared<std::packaged_task<void()>>([cancellableTask](){ (*cancellableTask)(); });// 從packaged_task中獲取一個關聯的future對象std::future<void> taskFuture = taskWrapper->get_future();{std::unique_lock<std::mutex> lock(mtx_);// 阻塞策略:等待有空間if (tasks_.size() >= max_queue_size_ && !stop_){if (reject_policy_ == RejectionPolicy::BLOCK){cond_.wait(lock, [this]{ return tasks_.size() < max_queue_size_ || stop_; });}else if (reject_policy_ == RejectionPolicy::DISCARD){throw std::runtime_error("Task queue is full. Task was discarded.");}else if (reject_policy_ == RejectionPolicy::THROW){throw std::runtime_error("Task queue is full.");}}if (stop_){throw std::runtime_error("ThreadPool is stopping. Cannot accept new tasks.");}tasks_.emplace([taskWrapper](){ (*taskWrapper)(); });}cond_.notify_one();// 啟動一個后臺線程監控超時取消std::thread([taskFuture = std::move(taskFuture),controller = cancellableTask->controller, timeout]() mutable{if (taskFuture.wait_for(timeout) == std::future_status::timeout) {controller->notify_cancel();std::cerr << "[Timeout] Task exceeded time limit and was cancelled.\n";} }).detach();return taskFuture;}
- 其實大頭設計就是那些組件,把它們組合起來使用+任務代碼中顯式檢測取消標識,就能實現超時取消機制? -- 在后臺單獨啟動一個線程,監控提交的任務是否在規定的超時時間內完成,如果超時則通知取消
用戶檢測取消
傳入的任務也需要配合(檢測取消標識符,為真時結束執行)
void test_cancel_task(exec_context ctx) {std::cout << "Task started.\n";for (int i = 0; i < 50; ++i){if (ctx.canceled()){std::cout << "Task detected cancellation, exiting early.\n";return;}std::this_thread::sleep_for(std::chrono::milliseconds(100));std::cout << "Task working... step " << i << "\n";}std::cout << "Task completed normally.\n"; }
安全銷毀
explicit FixedThreadPool(size_t thread_count, size_t max_queue_size = 1000,RejectionPolicy policy = RejectionPolicy::BLOCK): max_queue_size_(max_queue_size), reject_policy_(policy), stop_(false){for (size_t i = 0; i < thread_count; ++i){workers_.emplace_back([this]{while (true) {std::function<void()> task;{std::unique_lock<std::mutex> lock(mtx_);cond_.wait(lock, [this] { return stop_ || !tasks_.empty(); });if (stop_ && tasks_.empty())return;task = std::move(tasks_.front());tasks_.pop();}task();} });}}
~FixedThreadPool(){{std::unique_lock<std::mutex> lock(mtx_);stop_ = true;}cond_.notify_all();for (std::thread &worker : workers_){if (worker.joinable()){worker.join();}}}
- 釋放資源前,先設置停止標志,并喚醒所有線程,讓線程檢測到停止標識后,安全地將隊列中的遺留任務處理完畢,然后釋放線程資源
代碼
#include <atomic> #include <chrono> #include <condition_variable> #include <exception> #include <functional> #include <future> #include <iostream> #include <mutex> #include <optional> #include <queue> #include <stdexcept> #include <string> #include <thread> #include <vector>#include "../code/assistance.hpp"// --- exec control logic ---struct exec_controller {bool notify_cancel() { return _should_cancel.exchange(true); }bool should_cancel() const { return _should_cancel; }private:std::atomic<bool> _should_cancel{false}; };struct exec_context {exec_context(std::shared_ptr<exec_controller> impl): _impl(std::move(impl)) {}bool canceled() const { return _impl->should_cancel(); }private:std::shared_ptr<exec_controller> _impl; };// --- CancellableTask ---struct CancellableTask {std::shared_ptr<exec_controller> controller =std::make_shared<exec_controller>();std::function<void(exec_context)> func;void operator()(){exec_context ctx{controller};func(ctx);} };// --- FixedThreadPool ---class FixedThreadPool { public:enum class RejectionPolicy{BLOCK,DISCARD,THROW};explicit FixedThreadPool(size_t thread_count, size_t max_queue_size = 1000,RejectionPolicy policy = RejectionPolicy::BLOCK): max_queue_size_(max_queue_size), reject_policy_(policy), stop_(false){for (size_t i = 0; i < thread_count; ++i){workers_.emplace_back([this]{while (true) {std::function<void()> task;{std::unique_lock<std::mutex> lock(mtx_);cond_.wait(lock, [this] { return stop_ || !tasks_.empty(); });if (stop_ && tasks_.empty())return;task = std::move(tasks_.front());tasks_.pop();}task();} });}}~FixedThreadPool(){{std::unique_lock<std::mutex> lock(mtx_);stop_ = true;}cond_.notify_all();for (std::thread &worker : workers_){if (worker.joinable()){worker.join();}}}template <class F>std::future<void> submit(const std::string &taskType, F &&f){auto timeout = getTimeoutForTask(taskType);auto cancellableTask = std::make_shared<CancellableTask>();cancellableTask->func = std::forward<F>(f);auto taskWrapper = std::make_shared<std::packaged_task<void()>>([cancellableTask](){ (*cancellableTask)(); });std::future<void> taskFuture = taskWrapper->get_future();{std::unique_lock<std::mutex> lock(mtx_);// 阻塞策略:等待有空間if (reject_policy_ == RejectionPolicy::BLOCK){cond_.wait(lock,[this]{ return tasks_.size() < max_queue_size_ || stop_; });}// 丟棄策略:拋出異常(不再返回默認構造的 future)else if (reject_policy_ == RejectionPolicy::DISCARD){if (tasks_.size() >= max_queue_size_){throw std::runtime_error("Task queue is full. Task was discarded.");}}// 異常策略:同樣拋出異常else if (reject_policy_ == RejectionPolicy::THROW){if (tasks_.size() >= max_queue_size_){throw std::runtime_error("Task queue is full.");}}if (stop_){throw std::runtime_error("ThreadPool is stopping. Cannot accept new tasks.");}tasks_.emplace([taskWrapper](){ (*taskWrapper)(); });}cond_.notify_one();// 啟動一個后臺線程監控超時取消std::thread([taskFuture = std::move(taskFuture),controller = cancellableTask->controller, timeout]() mutable{if (taskFuture.wait_for(timeout) == std::future_status::timeout) {controller->notify_cancel();std::cerr << "[Timeout] Task exceeded time limit and was cancelled.\n";} }).detach();return taskFuture;}private:static std::chrono::millisecondsgetTimeoutForTask(const std::string &taskType){if (taskType == ns_helper::TASK_TYPE_BUILD_INDEX){return std::chrono::seconds(120);}else if (taskType == ns_helper::TASK_TYPE_PERSIST_INDEX) // ? 修復這里{return std::chrono::seconds(4 * 3600);}else if (taskType == ns_helper::TASK_TYPE_SEARCH ||taskType == ns_helper::TASK_TYPE_AUTOCOMPLETE){return std::chrono::seconds(1);}else{return std::chrono::seconds(1);}}std::vector<std::thread> workers_;std::queue<std::function<void()>> tasks_;std::mutex mtx_;std::condition_variable cond_;bool stop_;size_t max_queue_size_;RejectionPolicy reject_policy_; };
測試
void test_cancel_task(exec_context ctx) {std::cout << "Task started.\n";for (int i = 0; i < 50; ++i){if (ctx.canceled()){std::cout << "Task detected cancellation, exiting early.\n";return;}std::this_thread::sleep_for(std::chrono::milliseconds(100));std::cout << "Task working... step " << i << "\n";}std::cout << "Task completed normally.\n"; }int main() {FixedThreadPool fp(2, 10, FixedThreadPool::RejectionPolicy::BLOCK);auto future = fp.submit("search", test_cancel_task);try{if (future.valid()){future.get();}}catch (const std::exception &e){std::cout << "Task threw exception: " << e.what() << "\n";}return 0; }
動態線程池會在另一篇講(我還沒寫 1 - 1) ,大家也可以幫我糾一糾錯(跪倒)