目錄
實現一個無返回的線程池
完全代碼實現
Reference
實現一個無返回的線程池
實現一個簡單的線程池非常簡單,我們首先聊一聊線程池的定義:
線程池(Thread Pool) 是一種并發編程的設計模式,用于管理和復用多個線程,以提高程序的性能和資源利用率。它的核心思想是預先創建一組線程,并將任務分配給這些線程執行,而不是為每個任務單獨創建和銷毀線程。線程池廣泛應用于需要處理大量短期任務的場景,例如 Web 服務器、數據庫連接池、任務調度系統等。換而言之,線程池說白了就是一種餓漢思維——直接預先提供若干的線程,由線程池內部控制調度,確保我們可以只關心任務的提交以及完成。
我們下面要做的是設計一個任務是不返回的線程池。所以,我們約束我們的函數是:
using supportive_task_type = std::function<void()>;
下一步,就是構造我們的線程池的線程。注意的是——線程和任務是解耦合的,意味著我們需要一個中間函數解耦合任務派發。筆者決定,將任務派發分到一個私有函數完成:
? CCThreadPool(const int workers_num) {for(int i = 0; i < workers_num; i++){internal_threads.emplace_back([this](){__scheduled();});}}
上面這個代碼很簡單,就是將每一個線程都分配一個調度函數,這個調度函數來委派分發任務,辦法說簡單也很簡單:
void __scheduled(){while(1){// sources protectionsstd::unique_lock<std::mutex> locker(internal_mutex);// waiting for the access of the task resourcescontrolling_cv.wait(locker, [this]{return thread_pool_status || !tasks_queue.empty();});// quit if requriedif(thread_pool_status && tasks_queue.empty()){return;}// 現在我們可以取到任務執行了supportive_task_type task(std::move(tasks_queue.front()));tasks_queue.pop();locker.unlock();task();}}
當析構的時候,我們也要通知所有線程的cv不要睡眠了,由于設置了thread_pool_status
是true,直接線程跳出來結束全文。
? ~CCThreadPool(){thread_pool_status = true;controlling_cv.notify_all();for(auto& thread : internal_threads){thread.join();}}
完全代碼實現
#include <condition_variable>
#include <functional>
#include <mutex>
#include <print>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
?
class CCThreadPool {public:CCThreadPool() ? ? ? ? ? ? ? ? ? ? ? ? = delete;CCThreadPool(const CCThreadPool &) ? ? = delete;CCThreadPool &operator=(CCThreadPool &) = delete;
?CCThreadPool(const int workers_num) {for(int i = 0; i < workers_num; i++){internal_threads.emplace_back([this](){__scheduled();});}}
?~CCThreadPool(){thread_pool_status = true;controlling_cv.notify_all();for(auto& thread : internal_threads){thread.join();}}
?template<typename F, typename... Args>void enTask(F&& f, Args&&... args){supportive_task_type task(std::bind(std::forward<F&&>(f), std::forward<Args&&>(args)...));{std::unique_lock<std::mutex> locker(internal_mutex);tasks_queue.emplace(std::move(task));}controlling_cv.notify_one();}
?private:void __scheduled(){while(1){std::unique_lock<std::mutex> locker(internal_mutex);controlling_cv.wait(locker, [this]{return thread_pool_status || !tasks_queue.empty();});// quitif(thread_pool_status && tasks_queue.empty()){return;}supportive_task_type task(std::move(tasks_queue.front()));tasks_queue.pop();locker.unlock();task();}}
?using supportive_task_type = std::function<void()>;std::vector<std::thread> internal_threads;std::queue<supportive_task_type> tasks_queue;std::mutex internal_mutex;std::condition_variable controlling_cv;bool thread_pool_status = false;
};
?
?
int main()
{std::println("Task start");CCThreadPool pool(4);for (int i = 0; i < 8; ++i) {pool.enTask([i] {std::println("Task {} is started at thread with id {}", i, std::this_thread::get_id());std::this_thread::sleep_for(std::chrono::seconds(1));std::println("Task {} is done", i);});}return 0;
}
Reference
8. C++11 跨平臺線程池-See的編程日記 (seestudy.cn)