github看到一個項目(GitHub - markparticle/WebServer: C++ Linux WebServer服務器),內部使用的一個線程池看著不錯,拿來學習一下。
/** @Author : mark* @Date : 2020-06-15* @copyleft Apache 2.0*/ #ifndef THREADPOOL_H
#define THREADPOOL_H#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
#include <functional>
class ThreadPool {
public:explicit ThreadPool(size_t threadCount = 8): pool_(std::make_shared<Pool>()) {assert(threadCount > 0);for(size_t i = 0; i < threadCount; i++) {std::thread([pool = pool_] {std::unique_lock<std::mutex> locker(pool->mtx);while(true) {if(!pool->tasks.empty()) {auto task = std::move(pool->tasks.front());pool->tasks.pop();locker.unlock();task();locker.lock();} else if(pool->isClosed) break;else pool->cond.wait(locker);}}).detach();}}ThreadPool() = default;ThreadPool(ThreadPool&&) = default;~ThreadPool() {if(static_cast<bool>(pool_)) {{std::lock_guard<std::mutex> locker(pool_->mtx);pool_->isClosed = true;}pool_->cond.notify_all();}}template<class F>void AddTask(F&& task) {{std::lock_guard<std::mutex> locker(pool_->mtx);pool_->tasks.emplace(std::forward<F>(task));}pool_->cond.notify_one();}private:struct Pool {std::mutex mtx;std::condition_variable cond;bool isClosed;std::queue<std::function<void()>> tasks;};std::shared_ptr<Pool> pool_;
};#endif //THREADPOOL_H
1,先看下 私有變量。
struct Pool {std::mutex mtx;std::condition_variable cond;bool isClosed;std::queue<std::function<void()>> tasks;};
std::shared_ptr<Pool> pool_;
一個結構體 Pool 將用到的變量,全部包含進去。
mtx 一個鎖,用于加入,取出任務。
cond 信號量,用于線程間同步。
isClosed 線程池退出的標志。
tasks 任務隊列,存放所有需要在線程中執行的任務。
pool_ 智能指針,管理所有資源。
2,構造函數
(1),默認開啟8個線程(size_t threadCount = 8),并在初始化列表中? 初始化pool_對象(std::make_shared<Pool>())
(2),循環啟動每個線程
for(){std::thread({//todo}).detach();
}
(3),加鎖? 確保 ,各個線程對 任務隊列(tasks) 不產生競爭,因為添加任務,取任務都要操作這個隊列。
std::unique_lock<std::mutex> locker(pool->mtx);
(4),單獨看一個線程。
首先判斷任務隊列是否為空,為空 則利用信號量阻塞當前線程。
else pool->cond.wait(locker);
如果線程池是退出狀態,則跳出當前循環,當前線程也會退出。
else if(pool->isClosed) break;
如果任務隊列不為空,則取出第一個任務,隊列減1,然后解鎖,執行任務,再加鎖。
auto task = std::move(pool->tasks.front());
pool->tasks.pop();
locker.unlock();
task();
locker.lock();
std::move 用于移動語義,允許在不復制內存的情況下轉移資源的所有權。這段代碼之后,相當于8個線程,從任務隊列tasks 中搶任務,搶到一個執行一個。比如A線程 搶到了任務1,任務1 在執行中過程中,由于隊列處于未加鎖狀態,那B線程,就可以繼續搶任務2。
3,添加任務
std::forward ,在一個函數中將參數以原始的形式傳遞給另一個函數,同時保持其值類別(lvalue 或 rvalue)和 const 修飾符。
這里將task 添加到 任務隊列?tasks中。并且利用locker 進行保護。這里添加完成之后,上述的8個線程就會從這個任務隊列中搶任務,然后執行。
template<class F>void AddTask(F&& task) {{std::lock_guard<std::mutex> locker(pool_->mtx);pool_->tasks.emplace(std::forward<F>(task));}pool_->cond.notify_one();}
3,銷毀
將isClosed 標志置為false,并通知所有線程繼續執行,防止因為任務隊列為空,造成阻塞無法退出。在上述循環中,檢測到isClosed 為fasel,則while循環退出,線程退出。
~ThreadPool() {if(static_cast<bool>(pool_)) {{std::lock_guard<std::mutex> locker(pool_->mtx);pool_->isClosed = true;}pool_->cond.notify_all();}}
5,使用
構造完ThreadPool 之后,直接調用AddTask ,將任務傳入其中即可。
std::bind? 用于創建一個可調用對象,將其與特定的參數綁定在一起。
?在C++11 之下,封裝一個線程池 還是很優雅的。