1.初識線程池
問:線程池是什么?
答:維持管理一定數量的線程的池式結構。(維持:線程復用? 。 管理:沒有收到任務的線程處于阻塞休眠狀態不參與cpu調度 。一定數量:數量太多的線程會給操作系統帶來線程頻繁切換的開銷)
問:線程池解決的問題是什么?
答:異步執行耗時任務,充分利用多核,不過度占用核心業務線程。(耗時:耗時等待(io),耗時計算,不耗時的任務不會啟用線程池解放核心線程的壓力)
問:線程池是如何解決問題的?(工作原理)
答:生產者(核心線程)來使用線程,拋出任務到任務隊列,消費者(線程池線程)從隊列中取出任務進行執行。線程為空要阻塞線程池線程,避免輪詢隊列增加操作系統負擔。
問:為什么線程池總是使用隊列?
答:生產者對應push,消費者對應pop,操作隊列時間為O(1),占用鎖的時間短,可以使鎖的使用更加靈活。
2.用C++代碼實現:
1.線程池的類:
對外提供三個接口,內部兩個核心數據結構與一個核心工作函數
#pragma once#include <thread>
#include <functional>
#include <vector>// blockingqueue 僅僅只能用作指針或引用 不直接引入頭文件,避免循環依賴
template <typename T>
class BlockingQueue;class ThreadPool {
public:// 初始化線程池 explicit作用:避免ThreadPool tp = 3的賦值情況explicit ThreadPool(int threads_num);// 停止線程池~ThreadPool();// 發布任務到線程池void Post(std::function<void()> task); //function<void()>表示一個沒有參數無返回值的可調用對象private:void Worker();std::unique_ptr<BlockingQueue<std::function<void()>>> task_queue_; //unique_ptr作用:線程池獨占該任務隊列,若調用者輕易淺拷貝,提供報錯;在Pool離開作用域的時候,自動調用析構函數刪除隊列std::vector<std::thread> workers_;
};
2.對外三個接口以及核心工作函數的實現
#include "blockingqueue.h"
#include <memory>
#include "threadpool.h"ThreadPool::ThreadPool(int threads_num) {task_queue_ = std::make_unique<BlockingQueue<std::function<void()>>>();for (size_t i = 0; i < threads_num; ++i) {workers_.emplace_back([this] {Worker(); }); //this捕獲當前 調用ThreadPool的pool對象 ;worker為線程執行函數}
}// 停止線程池
ThreadPool::~ThreadPool() {task_queue_->Cancel();for (auto& worker : workers_) {if (worker.joinable())worker.join();}
}void ThreadPool::Post(std::function<void()> task) {task_queue_->Push(task);
}void ThreadPool::Worker() {while (true) {std::function<void()> task;if (!task_queue_->Pop(task)) {break;}task();}
}
3.核心數據結構:自定義隊列類的實現
class BlockingQueuePro {
public:BlockingQueuePro(bool nonblock = false) : nonblock_(nonblock) {}//提供給生產者(主)線程使用void Push(const T& value) {std::lock_guard<std::mutex> lock(prod_mutex_);prod_queue_.push(value);not_empty_.notify_one();}//提供給消費者線程使用bool Pop(T& value) {std::unique_lock<std::mutex> lock(cons_mutex_);if (cons_queue_.empty() && SwapQueue_() == 0) { return false;//隊列已關閉}value = cons_queue_.front();cons_queue_.pop();return true;}//提供給生產者(主)線程使用 銷毀線程池時調用void Cancel() {std::lock_guard<std::mutex> lock(prod_mutex_);nonblock_ = true;not_empty_.notify_all();}private:int SwapQueue_() {std::unique_lock<std::mutex> lock(prod_mutex_);//所有消費者進程阻塞在下面一行等待被喚醒 (正常喚醒 or 異常喚醒(線程池銷毀))not_empty_.wait(lock, [this] {return !prod_queue_.empty() || nonblock_; });std::swap(prod_queue_, cons_queue_);return cons_queue_.size();}bool nonblock_;//設置阻塞隊列的狀態,構造置0表示默認有阻塞,cancel中置為1表示無阻塞std::queue<T> prod_queue_;//STL里的隊列std::queue<T> cons_queue_;std::mutex prod_mutex_;std::mutex cons_mutex_;std::condition_variable not_empty_;//條件變量cond 用于線程間通信: 消費者wait 生產者喚醒
};
3.工作流程
初始化線程池,創建線程。線程在執行task()之前,阻塞在任務隊列的Pop,等待主業務線程將task 通過post加入到任務隊列后將某一線程喚醒(或者要銷毀任務隊列了喚醒所有線程)。所有任務執行完畢后,銷毀線程池(可選)。