線程池
基本功能接口
C++11 及以后的標準中,std::packaged_task
和std::future
是并發編程中用于任務封裝和結果獲取的重要組件,它們通常與線程配合使用,實現異步操作。
std::packaged_task
std::packaged_task
:封裝可調用對象為異步任務,它是一個模板類,用于封裝任何可調用對象(包括函數、lambda
、函數對象等),并且它還需要與std::future
關聯使用,當std::packaged_task
被執行時,其中封裝的任務也會運行,結果會存儲在內部,和這個std::packaged_task
關聯的std::future
可以進行調用。
核心功能
- 異步任務封裝:將任務(在線程池中每個任務起始就是一個函數)打包起來,讓它可以異步執行
- 和
std::future
綁定:因為每一個std::packaged_task()
會對應一個std::future
,所以在std::packaged_task
執行之后,其中的任務也運行了,結果就存儲在內部,等待std::future
通過get_future()
調用. - 執行任務:可以直接通過
operator()
調用,也可傳遞給線程執行(std::thread
接收std::packaged_task()
當參數就行)
簡單函數的示例
這里先給出一個簡單的小示例,之后會結合線程池進行闡述
#include <future>
#include <thread>
#include <iostream>int add(int a, int b) {return a + b;
}int main()
{// 1、首先需要封裝任務-->這里是封裝add(),需要一個int做返回值以及兩個int參數std::packaged_task<int(int, int)> task(add);// 2、和std::future綁定 異步任務task調用get_future進行綁定std::future<int> f = task.get_future();// 3、在一個線程中執行這個封裝好的異步任務std::thread t(std::move(task), 10, 20);// 這里需要強調一下,封裝后的異步任務不可復制,只能進行移動,所以再傳給線程做參數時,只能使用std::move()// 因為這里是異步執行的,所以主線程可以執行其他任務// 現在獲取這個線程中執行的結果// future() 具有唯一性,使用get()獲取一次之后,就不能獲取第二次了,如果結果沒有就緒就阻塞等待結果就緒int res = f.get(); // 現在就可以打印獲得的這個結果了std::cout << "res = " << res << std::endl;// 在創建線程之后,必須 join或者detach,否則就會出現錯誤t.join();return 0;
}
那么接下來是關于std::future
獲取結果
std::future
用于獲取異步操作(線程、任務)執行的結果,可以理解為一種類似于“未來結果的占位符”,因為你啟動一個異步線程時,可能無法立即得到結果,但是可以使用std::future
對象在未來某個時刻獲取結果。
核心功能
- 可以通過
get()
方法獲取異步操作得到的結果(返回值),如果在調用get()
時,異步操作還未完成,那么就會阻塞當前線程等待有結果產生。 - 可以通過
valid()
判斷future
是否與一個有效的異步操作關聯成功,可以通過wait()
阻塞等待結果,也可以通過wait_for()
或wait_until()
等待指定時長之后返回狀態。
上方已經給出了示例用法,都是一樣的,這里就不給了,待會直接上線程池相關的示例。
任務入隊操作
當有新任務到來時,任務會被添加到任務隊列中,這個過程中,需要先獲取互斥鎖,保證任務隊列的線程安全,添加任務后,通過條件變量通知等待的線程有新任務到來。我這里將任務劃分成了不帶返回值的普通任務和帶返回值的任務,其中帶返回值的任務使用異步封包的方式進行封裝,分別如下:
帶返回值的異步任務提交到任務隊
步驟:
- 通過
std::bind()
和std::make_shared()
創建一個包裝了任務的std::package_task
- 獲取其對應的
std::future
用于獲取任務執行結果 - 在臨界區內(加鎖)將任務添加到任務隊列
tasks
中 - 通知一個等待的線程有新任務
以下是線程池提交帶有返回值的任務的示例過程
template<typename F, typename... Args>
auto SubmitRetTask(F&& func, Args... args) -> std::future<std::invoke_result_t<F, Args...>> {// 首先定義返回類型auto ret_type = std::invoke_result_t<F, Args...>;// 狀態判斷if (is_shuntdown_.load() || !is_available_.load()) {// 返回一個控制return std::future<ret_type>();}// 開始封裝異步任務auto bind_task = std::bind(std::forward<F>(func), std::forward<Args>(args...));auto task = std::make_shared<std::packaged_task<ret_type>()>(std::move(bind_task));std::future<ret_type> res = task.get_future();{// 在臨界區加鎖,將任務添加到任務隊列中std::lock_guard<std::mutex> lock(task_mutex_);tasks_.emplace([task](){(*task)();});}task_cv_.notify_one();return res;
}// 用到的成員變量
std::queue<std::function<void()> tasks_; // 任務隊列
std::atomic<bool> is_shutdown_; // 線程是否關閉
std::atomic<bool> is_available_; // 線程池是否還有效std::mutex task_mutex_; // 任務鎖
std::condition_variable task_cv_; // 條件變量,用于阻塞任務
不帶返回值的普通任務
template<typename F, typename... Args>
void SubmitTask(F&& func, Args... args) {// 終止條件if (is_shutdown_.load() || !is_available_.load()) {return;}// 封裝任務auto task = std::bind(std::forward<F>(func), std::forward<Args>(args...));{std::lock_guard<std::mutex> lock(task_mutex_);tasks.emplace([task](){task(); // 調用對應的任務});}// 喚醒一個阻塞中的線程task_cv_.notify_one();
}
所以可以看出,起始線程池中任務的提交過程整體思路都是一致的,只是有返回值的提交上,添加了std::packaged_task
與std::future
來做異步任務的封裝而已。
工作線程取出任務執行過程
在工作線程開啟之后,需要去任務隊列中取出任務然后執行。主要的過程是,獲取互斥鎖保證資源的互斥訪問,然后檢查任務隊列是否為空,如果為空,就需要通過條件變量阻塞,等待任務添加進來。獲取到任務之后就會執行任務,執行完畢馬上繼續獲取任務,除非線程池停止并且任務隊列為空。
主要的過程如下:
- 由于每次都會取出一個任務
task
,每個任務都是一個函數std::function<void()>
- 無限循環,一直訪問任務隊列,直到線程池停止,然后任務隊列為空
- 取出任務隊列中的任務,執行
我的取出任務的接口函數
成員變量信息:
using ThreadPtr = std::shared_ptr<std::thread>;
using Task = std::function<void()>;// 一個線程信息結構體,包含管理線程的智能指針
struct ThreadInfo {ThreadInfo();~ThreadInfo();ThreadPtr ptr{nullptr};
}// 每一個線程的信息都是有一個智能指針來管理
using ThreadInfoPtr = std::shared_ptr<ThreadInfo>;// 線程數組
std::vector<ThreadInfoPtr> work_threads_;
添加線程函數:
void ThreadPool::AddThread() {// 先從任務隊列中取出一個任務auto func = [this]() {while (true) {Task task;{// 首先獲取互斥鎖std::unique_lock<std::mutex> lock(task_mutex_);// 通過條件變量等待條件滿足task_cv_.wait(lock, [this](){return is_shutdown_.load() || !tasks.empty();});if (is_shutdown_.load() && tasks.empty()) {return;}// 取出任務task = std::move(tasks.front());tasks.pop();}task();}};// 將取出來的任務封裝到線程中添加到線程池ThreadInfoPtr thread_ptr = std::shared_ptr<std::thread>();thread_ptr->ptr = std::make_shared<ThreadInfo>(std::move(func));// 添加到線程池中work_threads_.emplace_back(std::move(thread_ptr));
}
線程池類設計
線程池類負責創建線程池、銷毀線程池以及管理線程隊列、任務隊列以及添加任務或者取出任務執行等操作。
類定義如下:
class ThreadPool{
public:explicit ThreadPool(uint32_t thread_count);// 禁止拷貝線程池ThreadPool(const ThreadPool&) = delete;ThreadPool& operator=(const ThreadPool&) = delete;~ThreadPool();bool Start(); // 啟動線程池void Stop(); // 停止線程池// 提交任務,分別有普通任務和帶返回值的任務template<typename F, typename... Args>void SubmitTask(F&& func, Args... args) {if (is_shutdown_.load() || !is_available_.load()) {return;}auto task = std::bind(std::forward<F>(func), std::forward<Args>(args...));{std::unique_lock<std::mutex> lock(task_mutex_);// 添加任務tasks.emplace([task](){task();});}// 喚醒一個等待任務的阻塞線程task_cv_.notify_one();}// 提交帶有返回值的任務template<typename F, typename... Args>auto SubmitRetTask(F&& func, Args... args) -> std::future<std::invoke_result_t<F, Args...>> {auto ret_type = std::invoke_result_t<F, Args...>;// 檢查變量判斷是否還能繼續if (is_shutdown_.load() || !is_available_.load()) {return std::future<ret_type>(); // 此時需要返回一個空對象}auto bind_task = std::bind(std::forward<F>(func), std::forward<Args>(args...));// 用packaged_task和shared_ptr封裝異步任務auto task = std::make_shared<std::packaged_task<ret_type>()>(std::move(bind_task));// 與future綁定std::future<ret_type> res = task.get_future();{std::unique_lock<std::mutex> lock(task_mutex_);tasks_.emplace([task](){(*task)();});}// 喚醒等待線程task_cv_.notify_one();return res;}private:// 增加線程函數void AddThread();// 通過智能指針來管理線程using ThreadPtr = std::shared_ptr<std::thread>;using Task = std::function<void()>;struct ThreadInfo{ThreadInfo();~ThreadInfo();ThreadInfo ptr{nullptr};}using ThreadInfoPtr = std::shared_ptr<ThreadInfo>;std::vector<ThreadInfoPtr> works_threads_; std::queue<Task> tasks_;std::mutex task_mutex_;std::condition_variable task_cv_;std::atomic<uint32_t> thread_count_;std::atomic<bool> is_shutdown_;std::atomic<bool> is_available_;
}
接口實現
構造與析構
我這里的思路是構造函數初始化一些基本的成員變量,比如thread_count_
,is_shutdown_
,is_available_
就夠了,在啟動線程池時采取初始化,并且創建線程添加到線程池中,所以構造函數如下:
explicit ThreadPool::ThreadPool(uint32_t thread_count) : thread_count_(thread_count), is_shutdown_(false), is_available(false){}
析構函數和構造函數的思路類似,里面由Stop()
這個接口來處理線程池的終止
ThreadPool::~ThreadPool() { Stop();}
Start() 啟動線程池和 Stop()終止線程池
Start()
負責啟動線程池,然后循環創建線程并且添加到容器中
bool ThreadPool::Start() {if (!is_available_.load()) {is_availeable_.store(true);uint32_t thread_count = thread_count_.load();for (uint32_t i = 0; i < thread_count; i++) {AddThread(); // 由這個添加函數完成創建線程并且綁定任務添加到容器中}return true;}return false;
}
Stop()
代表線程池停止接口,首先需要將所有相關的成員變量置為停止狀態下對應的值,然后停止所有進程,回收所有進程,保證所有進程只join()
一次
void ThreadPool::Stop() {if (!is_shotdown_.load()) {return ;}// 將對應的變量置為退出狀態is_shutdown_.store(true);is_available_.store(false);// 通知所有線程task_cv_.notify_all();// 回收所有線程for (auto& thread_info_ptr : work_threads_) {if (thread_info_ptr && thread_info_ptr->ptr) {std::thread& t = *thread_info_ptr->ptr;if (t.joinable()) {t.join();}}}// 清空所有線程容器work_threads_.clear();{// 在線程池關閉的時候,還需要將任務隊列中的所有任務popstd::lock_guard<std::mutex> lock(task_mutex_);while (!tasks_.empty()) {tasks_.pop();}}
}
取出任務綁定線程然后添加到線程函數 AddThread()
AddThread()
這個函數主要是從任務隊列中取出任務,然后將其綁定到線程,并且添加到容器中.
void ThreadPool::AddThread() {// 取出任務auto func = [this]() {while(true) {Task task;{std::unqiue_lock<std::mutex> lock(task_mutex_);task_cv_.wait(lock, [this](){return is_shutdown_.load() || !tasks.empty(); });if (is_shutdown_.load() && tasks.empty()) {return;}// 取出任務task = std::move(tasks.front());tasks.pop();}task();}}// 將其封裝為線程ThreadInfoPtr thread_ptr = std::make_shared<ThreadInfo>();thread_ptr->ptr = std::make_shared<std::thread>(std::move(func));work_threads_.emplace_back(std::move(thread_ptr));
}
線程池
基本功能接口
C++11 及以后的標準中,std::packaged_task
和std::future
是并發編程中用于任務封裝和結果獲取的重要組件,它們通常與線程配合使用,實現異步操作。
std::packaged_task
std::packaged_task
:封裝可調用對象為異步任務,它是一個模板類,用于封裝任何可調用對象(包括函數、lambda
、函數對象等),并且它還需要與std::future
關聯使用,當std::packaged_task
被執行時,其中封裝的任務也會運行,結果會存儲在內部,和這個std::packaged_task
關聯的std::future
可以進行調用。
核心功能
- 異步任務封裝:將任務(在線程池中每個任務起始就是一個函數)打包起來,讓它可以異步執行
- 和
std::future
綁定:因為每一個std::packaged_task()
會對應一個std::future
,所以在std::packaged_task
執行之后,其中的任務也運行了,結果就存儲在內部,等待std::future
通過get_future()
調用. - 執行任務:可以直接通過
operator()
調用,也可傳遞給線程執行(std::thread
接收std::packaged_task()
當參數就行)
簡單函數的示例
這里先給出一個簡單的小示例,之后會結合線程池進行闡述
#include <future>
#include <thread>
#include <iostream>int add(int a, int b) {return a + b;
}int main()
{// 1、首先需要封裝任務-->這里是封裝add(),需要一個int做返回值以及兩個int參數std::packaged_task<int(int, int)> task(add);// 2、和std::future綁定 異步任務task調用get_future進行綁定std::future<int> f = task.get_future();// 3、在一個線程中執行這個封裝好的異步任務std::thread t(std::move(task), 10, 20);// 這里需要強調一下,封裝后的異步任務不可復制,只能進行移動,所以再傳給線程做參數時,只能使用std::move()// 因為這里是異步執行的,所以主線程可以執行其他任務// 現在獲取這個線程中執行的結果// future() 具有唯一性,使用get()獲取一次之后,就不能獲取第二次了,如果結果沒有就緒就阻塞等待結果就緒int res = f.get(); // 現在就可以打印獲得的這個結果了std::cout << "res = " << res << std::endl;// 在創建線程之后,必須 join或者detach,否則就會出現錯誤t.join();return 0;
}
那么接下來是關于std::future
獲取結果
std::future
用于獲取異步操作(線程、任務)執行的結果,可以理解為一種類似于“未來結果的占位符”,因為你啟動一個異步線程時,可能無法立即得到結果,但是可以使用std::future
對象在未來某個時刻獲取結果。
核心功能
- 可以通過
get()
方法獲取異步操作得到的結果(返回值),如果在調用get()
時,異步操作還未完成,那么就會阻塞當前線程等待有結果產生。 - 可以通過
valid()
判斷future
是否與一個有效的異步操作關聯成功,可以通過wait()
阻塞等待結果,也可以通過wait_for()
或wait_until()
等待指定時長之后返回狀態。
上方已經給出了示例用法,都是一樣的,這里就不給了,待會直接上線程池相關的示例。
任務入隊操作
當有新任務到來時,任務會被添加到任務隊列中,這個過程中,需要先獲取互斥鎖,保證任務隊列的線程安全,添加任務后,通過條件變量通知等待的線程有新任務到來。我這里將任務劃分成了不帶返回值的普通任務和帶返回值的任務,其中帶返回值的任務使用異步封包的方式進行封裝,分別如下:
帶返回值的異步任務提交到任務隊
步驟:
- 通過
std::bind()
和std::make_shared()
創建一個包裝了任務的std::package_task
- 獲取其對應的
std::future
用于獲取任務執行結果 - 在臨界區內(加鎖)將任務添加到任務隊列
tasks
中 - 通知一個等待的線程有新任務
以下是線程池提交帶有返回值的任務的示例過程
template<typename F, typename... Args>
auto SubmitRetTask(F&& func, Args... args) -> std::future<std::invoke_result_t<F, Args...>> {// 首先定義返回類型auto ret_type = std::invoke_result_t<F, Args...>;// 狀態判斷if (is_shuntdown_.load() || !is_available_.load()) {// 返回一個控制return std::future<ret_type>();}// 開始封裝異步任務auto bind_task = std::bind(std::forward<F>(func), std::forward<Args>(args...));auto task = std::make_shared<std::packaged_task<ret_type>()>(std::move(bind_task));std::future<ret_type> res = task.get_future();{// 在臨界區加鎖,將任務添加到任務隊列中std::lock_guard<std::mutex> lock(task_mutex_);tasks_.emplace([task](){(*task)();});}task_cv_.notify_one();return res;
}// 用到的成員變量
std::queue<std::function<void()> tasks_; // 任務隊列
std::atomic<bool> is_shutdown_; // 線程是否關閉
std::atomic<bool> is_available_; // 線程池是否還有效std::mutex task_mutex_; // 任務鎖
std::condition_variable task_cv_; // 條件變量,用于阻塞任務
不帶返回值的普通任務
template<typename F, typename... Args>
void SubmitTask(F&& func, Args... args) {// 終止條件if (is_shutdown_.load() || !is_available_.load()) {return;}// 封裝任務auto task = std::bind(std::forward<F>(func), std::forward<Args>(args...));{std::lock_guard<std::mutex> lock(task_mutex_);tasks.emplace([task](){task(); // 調用對應的任務});}// 喚醒一個阻塞中的線程task_cv_.notify_one();
}
所以可以看出,起始線程池中任務的提交過程整體思路都是一致的,只是有返回值的提交上,添加了std::packaged_task
與std::future
來做異步任務的封裝而已。
工作線程取出任務執行過程
在工作線程開啟之后,需要去任務隊列中取出任務然后執行。主要的過程是,獲取互斥鎖保證資源的互斥訪問,然后檢查任務隊列是否為空,如果為空,就需要通過條件變量阻塞,等待任務添加進來。獲取到任務之后就會執行任務,執行完畢馬上繼續獲取任務,除非線程池停止并且任務隊列為空。
主要的過程如下:
- 由于每次都會取出一個任務
task
,每個任務都是一個函數std::function<void()>
- 無限循環,一直訪問任務隊列,直到線程池停止,然后任務隊列為空
- 取出任務隊列中的任務,執行
我的取出任務的接口函數
成員變量信息:
using ThreadPtr = std::shared_ptr<std::thread>;
using Task = std::function<void()>;// 一個線程信息結構體,包含管理線程的智能指針
struct ThreadInfo {ThreadInfo();~ThreadInfo();ThreadPtr ptr{nullptr};
}// 每一個線程的信息都是有一個智能指針來管理
using ThreadInfoPtr = std::shared_ptr<ThreadInfo>;// 線程數組
std::vector<ThreadInfoPtr> work_threads_;
添加線程函數:
void ThreadPool::AddThread() {// 先從任務隊列中取出一個任務auto func = [this]() {while (true) {Task task;{// 首先獲取互斥鎖std::unique_lock<std::mutex> lock(task_mutex_);// 通過條件變量等待條件滿足task_cv_.wait(lock, [this](){return is_shutdown_.load() || !tasks.empty();});if (is_shutdown_.load() && tasks.empty()) {return;}// 取出任務task = std::move(tasks.front());tasks.pop();}task();}};// 將取出來的任務封裝到線程中添加到線程池ThreadInfoPtr thread_ptr = std::shared_ptr<std::thread>();thread_ptr->ptr = std::make_shared<ThreadInfo>(std::move(func));// 添加到線程池中work_threads_.emplace_back(std::move(thread_ptr));
}
線程池類設計
線程池類負責創建線程池、銷毀線程池以及管理線程隊列、任務隊列以及添加任務或者取出任務執行等操作。
類定義如下:
class ThreadPool{
public:explicit ThreadPool(uint32_t thread_count);// 禁止拷貝線程池ThreadPool(const ThreadPool&) = delete;ThreadPool& operator=(const ThreadPool&) = delete;~ThreadPool();bool Start(); // 啟動線程池void Stop(); // 停止線程池// 提交任務,分別有普通任務和帶返回值的任務template<typename F, typename... Args>void SubmitTask(F&& func, Args... args) {if (is_shutdown_.load() || !is_available_.load()) {return;}auto task = std::bind(std::forward<F>(func), std::forward<Args>(args...));{std::unique_lock<std::mutex> lock(task_mutex_);// 添加任務tasks.emplace([task](){task();});}// 喚醒一個等待任務的阻塞線程task_cv_.notify_one();}// 提交帶有返回值的任務template<typename F, typename... Args>auto SubmitRetTask(F&& func, Args... args) -> std::future<std::invoke_result_t<F, Args...>> {auto ret_type = std::invoke_result_t<F, Args...>;// 檢查變量判斷是否還能繼續if (is_shutdown_.load() || !is_available_.load()) {return std::future<ret_type>(); // 此時需要返回一個空對象}auto bind_task = std::bind(std::forward<F>(func), std::forward<Args>(args...));// 用packaged_task和shared_ptr封裝異步任務auto task = std::make_shared<std::packaged_task<ret_type>()>(std::move(bind_task));// 與future綁定std::future<ret_type> res = task.get_future();{std::unique_lock<std::mutex> lock(task_mutex_);tasks_.emplace([task](){(*task)();});}// 喚醒等待線程task_cv_.notify_one();return res;}private:// 增加線程函數void AddThread();// 通過智能指針來管理線程using ThreadPtr = std::shared_ptr<std::thread>;using Task = std::function<void()>;struct ThreadInfo{ThreadInfo();~ThreadInfo();ThreadInfo ptr{nullptr};}using ThreadInfoPtr = std::shared_ptr<ThreadInfo>;std::vector<ThreadInfoPtr> works_threads_; std::queue<Task> tasks_;std::mutex task_mutex_;std::condition_variable task_cv_;std::atomic<uint32_t> thread_count_;std::atomic<bool> is_shutdown_;std::atomic<bool> is_available_;
}
接口實現
構造與析構
我這里的思路是構造函數初始化一些基本的成員變量,比如thread_count_
,is_shutdown_
,is_available_
就夠了,在啟動線程池時采取初始化,并且創建線程添加到線程池中,所以構造函數如下:
explicit ThreadPool::ThreadPool(uint32_t thread_count) : thread_count_(thread_count), is_shutdown_(false), is_available(false){}
析構函數和構造函數的思路類似,里面由Stop()
這個接口來處理線程池的終止
ThreadPool::~ThreadPool() { Stop();}
Start() 啟動線程池和 Stop()終止線程池
Start()
負責啟動線程池,然后循環創建線程并且添加到容器中
bool ThreadPool::Start() {if (!is_available_.load()) {is_availeable_.store(true);uint32_t thread_count = thread_count_.load();for (uint32_t i = 0; i < thread_count; i++) {AddThread(); // 由這個添加函數完成創建線程并且綁定任務添加到容器中}return true;}return false;
}
Stop()
代表線程池停止接口,首先需要將所有相關的成員變量置為停止狀態下對應的值,然后停止所有進程,回收所有進程,保證所有進程只join()
一次
void ThreadPool::Stop() {if (!is_shotdown_.load()) {return ;}// 將對應的變量置為退出狀態is_shutdown_.store(true);is_available_.store(false);// 通知所有線程task_cv_.notify_all();// 回收所有線程for (auto& thread_info_ptr : work_threads_) {if (thread_info_ptr && thread_info_ptr->ptr) {std::thread& t = *thread_info_ptr->ptr;if (t.joinable()) {t.join();}}}// 清空所有線程容器work_threads_.clear();{// 在線程池關閉的時候,還需要將任務隊列中的所有任務popstd::lock_guard<std::mutex> lock(task_mutex_);while (!tasks_.empty()) {tasks_.pop();}}
}
取出任務綁定線程然后添加到線程函數 AddThread()
AddThread()
這個函數主要是從任務隊列中取出任務,然后將其綁定到線程,并且添加到容器中.
void ThreadPool::AddThread() {// 取出任務auto func = [this]() {while(true) {Task task;{std::unqiue_lock<std::mutex> lock(task_mutex_);task_cv_.wait(lock, [this](){return is_shutdown_.load() || !tasks.empty(); });if (is_shutdown_.load() && tasks.empty()) {return;}// 取出任務task = std::move(tasks.front());tasks.pop();}task();}}// 將其封裝為線程ThreadInfoPtr thread_ptr = std::make_shared<ThreadInfo>();thread_ptr->ptr = std::make_shared<std::thread>(std::move(func));work_threads_.emplace_back(std::move(thread_ptr));
}
最后,希望自己繼續加油,學無止境,還請各位大佬海涵,如有錯誤請直接指出,我一定會及時修改。如果侵權,請聯系我刪除~