三. CachedThreadPool 的實現
3.1 需求:
動態調整線程數量:與 FixedThreadPool 不同,CachedThreadPool 的線程數量是動態調整的。當有新任務提交時,如果線程池中有空閑的線程,則會立即使用空閑線程執行任務;如果線程池中沒有空閑線程,則會創建一個新的線程來執行任務。當線程空閑一段時間后,超過一定的時間(默認為 60 秒),會被回收銷毀。
3.2 SyncQueue 同步隊列的設計和實現
#ifndef SYNCQUEUE3_HPP
#define SYNCQUEUE3_HPP

#include<list>
#include<mutex>
#include<condition_variable>
#include<chrono>   // std::chrono::seconds used by the timed waits below
#include<iostream>

using namespace std;

// Bounded, thread-safe producer/consumer queue used by CachedThreadPool.
// Put() blocks (with timeout) while full; Take() blocks (with timeout)
// while empty; Stop() drains the queue and then rejects all further work.
template<class T>
class SyncQueue
{
private:
    std::list<T> m_queue;               // task buffer
    mutable std::mutex m_mutex;         // protects every member below
    std::condition_variable m_notEmpty; // consumer side (C): signaled on push
    std::condition_variable m_notFull;  // producer side (P): signaled on pop
    size_t m_waitTime;                  // timeout (seconds) for full/empty waits
    int m_maxSize;                      // capacity upper bound
    bool m_needStop;                    // true: queue no longer accepts new tasks

    // Precondition: m_mutex is held by the caller.
    bool IsFull() const
    {
        // cast avoids signed/unsigned comparison (m_maxSize is int)
        bool full = m_queue.size() >= static_cast<size_t>(m_maxSize);
        if (full)
        {
            printf("m_queue 已經滿了,需要等待....\n");
        }
        return full;
    }

    // Precondition: m_mutex is held by the caller.
    bool IsEmpty() const
    {
        bool empty = m_queue.empty();
        if (empty)
        {
            printf("m_queue 已經空了,需要等待....\n");
        }
        return empty;
    }

    // Enqueue one task.
    // return 0: success; 1: timed out while full; 2: queue stopped.
    template<class F>
    int Add(F&& x)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        if (!m_notFull.wait_for(locker, std::chrono::seconds(m_waitTime),
            [this] { return m_needStop || !IsFull(); }))
        {
            cout << "task queue full return 1" << endl;
            return 1;
        }
        if (m_needStop)
        {
            cout << "同步隊列停止工作..." << endl;
            return 2;
        }
        m_queue.push_back(std::forward<F>(x));
        m_notEmpty.notify_one();
        return 0;
    }

public:
    // maxsize: capacity; timeout: seconds to wait when full/empty.
    // Init list ordered to match member declaration order (-Wreorder).
    SyncQueue(int maxsize = 100, int timeout = 1)
        : m_waitTime(timeout)
        , m_maxSize(maxsize)
        , m_needStop(false) // the queue starts in the working state
    {}

    int Put(const T& x) { return Add(x); }
    int Put(T&& x)      { return Add(std::forward<T>(x)); }

    // Returns 1 when no task arrived within m_waitTime seconds, 0 otherwise.
    // BUGFIX: the original wrote "!wait_for(...) == std::cv_status::timeout";
    // operator! is ill-formed on the scoped enum std::cv_status and the
    // precedence was wrong — the intended test is the plain comparison.
    int notTask()
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        if (m_notEmpty.wait_for(locker, std::chrono::seconds(m_waitTime))
            == std::cv_status::timeout)
        {
            return 1;
        }
        return 0;
    }

    // Drain every queued task into `list` in one shot.
    // Blocks (without timeout) until a task arrives or Stop() is called.
    void Take(std::list<T>& list)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        while (!m_needStop && IsEmpty())
        {
            m_notEmpty.wait(locker);
        }
        if (m_needStop)
        {
            cout << "同步隊列停止工作..." << endl;
            return;
        }
        list = std::move(m_queue);
        m_notFull.notify_one();
    }

    // Take a single task.
    // return 0: success; 1: timed out while empty; 2: queue stopped.
    int Take(T& t)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        if (!m_notEmpty.wait_for(locker, std::chrono::seconds(m_waitTime),
            [this] { return m_needStop || !IsEmpty(); }))
        {
            return 1; // queue still empty after the timeout
        }
        if (m_needStop)
        {
            cout << "同步隊列停止工作..." << endl;
            return 2;
        }
        t = m_queue.front();
        m_queue.pop_front();
        m_notFull.notify_one();
        return 0;
    }

    // Wait until consumers drain the queue, then mark it stopped and
    // wake every waiter so blocked Put()/Take() calls can return.
    void Stop()
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        while (!IsEmpty())
        {
            m_notFull.wait(locker);
        }
        m_needStop = true;
        m_notFull.notify_all();
        m_notEmpty.notify_all();
    }

    bool Empty() const
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        return m_queue.empty();
    }

    bool Full() const
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        return m_queue.size() >= static_cast<size_t>(m_maxSize);
    }

    size_t size() const
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        return m_queue.size();
    }
};

#endif
3.3 CachedThreadPool 線程池的設計和實現
#ifndef CACHEDTHREADPOOL_HPP
#define CACHEDTHREADPOOL_HPP#include"SyncQueue3.hpp"
#include<functional>
#include<unordered_map>
#include<map>
#include<future>using namespace std;int MaxTaskCount = 2;
const int KeepAliveTime = 10; //線程最大存活時間 60 ,為測試改為10class CachedThreadPool
{
public:using Task = std::function<void(void)>;private:std::unordered_map<std::thread::id, std::shared_ptr<std::thread>> m_threadgroup;int m_coreThreadSize; // 核心的線程數量,下限閾值 2int m_maxThreadSize; // 最大的線程數量,上限閾值std::atomic_int m_idleThreadSize; // 空閑線程的數量std::atomic_int m_curThreadSize; // 當前線程池里面的線程總數量mutable std::mutex m_mutex; // SyncQueue<Task> m_queue;std::atomic_bool m_running; // true ; false stop;std::once_flag m_flag;void Start(int numthreads){m_running = true;m_curThreadSize = numthreads;for (int i = 0; i < numthreads; ++i){auto tha = std::make_shared<std::thread>(std::thread(&CachedThreadPool::RunInThread, this));std::thread::id tid = tha->get_id();m_threadgroup.emplace(tid, std::move(tha));m_idleThreadSize++;}}void RunInThread(){auto tid = std::this_thread::get_id();auto startTime = std::chrono::high_resolution_clock().now();while (m_running){Task task;if (m_queue.size() == 0 && m_queue.notTask()){auto now = std::chrono::high_resolution_clock().now();auto intervalTime = std::chrono::duration_cast<std::chrono::seconds>(now - startTime);std::lock_guard<std::mutex> lock(m_mutex);if (intervalTime.count() >= KeepAliveTime &&m_curThreadSize > m_coreThreadSize){m_threadgroup.find(tid)->second->detach();m_threadgroup.erase(tid);m_curThreadSize--;m_idleThreadSize--;cout << "空閑線程銷毀" << m_curThreadSize << " " << m_coreThreadSize << endl;return;}}if (!m_queue.Take(task) && m_running){m_idleThreadSize--;task();m_idleThreadSize++;startTime = std::chrono::high_resolution_clock().now();}}}void StopThreadGroup(){m_queue.Stop();m_running = false;for (auto& thread : m_threadgroup){thread.second->join();}m_threadgroup.clear();}public:CachedThreadPool(int initNumThreads = 8, int taskPoolSize = MaxTaskCount):m_coreThreadSize(initNumThreads),m_maxThreadSize(2 * std::thread::hardware_concurrency() + 1),m_idleThreadSize(0),m_curThreadSize(0),m_queue(taskPoolSize),m_running(false){Start(m_coreThreadSize);}~CachedThreadPool(){StopThreadGroup();}template<class Func, class... 
Args>auto submit(Func&& func, Args&&... args) -> std::future<decltype(func(args...))>{auto task = std::make_shared<std::packaged_task<decltype(func(args...))()>>(std::bind(std::forward<Func>(func), std::forward<Args>(args)...));std::future<decltype(func(args...))> result = task->get_future();if (m_queue.Put([task]() { (*task)(); }) != 0){cout << "調用者運行策略" << endl;(*task)();}if (m_idleThreadSize <= 0 && m_curThreadSize < m_maxThreadSize){std::lock_guard<std::mutex> lock(m_mutex);auto tha = std::make_shared<std::thread>(std::thread(&CachedThreadPool::RunInThread, this));std::thread::id tid = tha->get_id();m_threadgroup.emplace(tid, std::move(tha));m_curThreadSize++;m_idleThreadSize++;}return result;}template<class Func, class... Args>void execute(Func&& func, Args&&... args){submit(std::forward<Func>(func), std::forward<Args>(args)...);}
};#endif
3.4 測試
///
void func(int index)
{static int num = 0;cout << "func_" << index << " num: " << ++num << endl;
}int add(int a, int b)
{return a + b;
}int main()
{CachedThreadPool mypool;for (int i = 0; i < 1000; ++i){if (i % 2 == 0){auto pa = mypool.submit(add, i, i + 1);cout << pa.get() << endl;}else{mypool.execute(func, i);}}CachedThreadPool pool(2);int add(int a, int b, int s){std::this_thread::sleep_for(std::chrono::seconds(s));int c = a + b;cout << "add begin ..." << endl;return c;}void add_a(){auto r = pool.submit(add, 10, 20, 4);cout << "add_a: " << r.get() << endl;}void add_b(){auto r = pool.submit(add, 20, 30, 6);cout << "add_b: " << r.get() << endl;}void add_c(){auto r = pool.submit(add, 30, 40, 1);cout << "add_c: " << r.get() << endl;}void add_d(){auto r = pool.submit(add, 10, 40, 9);cout << "add_d: " << r.get() << endl;}int main(){std::thread tha(add_a);std::thread thb(add_b);std::thread thc(add_c);std::thread thd(add_d);tha.join();thb.join();thc.join();thd.join();std::this_thread::sleep_for(std::chrono::seconds(20));std::thread the(add_a);std::thread thf(add_b);the.join();thf.join();return 0;}
}
3.5 FixedThreadPool 與 CachedThreadPool 特性對比
| 特性 | FixedThreadPool | CachedThreadPool |
|---|---|---|
重用 | 能 reuse 就用,但不能隨時建新的線程 | 先查看池中有無以前建立的線程,有就 reuse;沒有就建新線程加入池中 |
池大小 | 可指定 nThreads,固定數量 | 可增長,最大值 Integer.MAX_VALUE |
隊列大小 | 無限制 | 無限制 |
超時 | 無 IDLE | 默認 60 秒 IDLE |
使用場景 | 針對穩定固定的正規并發線程,用于服務器,執行負載重、CPU 使用率高的任務,防止線程頻繁切換得不償失 | 處理大量短生命周期異步任務,執行大量并發短期異步任務,任務負載要輕 |
結束 | 不會自動銷毀 | 放入的線程超過 TIMEOUT 不活動會自動被終止 |
3.6 最佳實踐
FixedThreadPool 和 CachedThreadPool 對高負載應用都不特別友好,CachedThreadPool 更危險。若應用要求高負載、低延遲,最好不選這兩種,推薦使用 ThreadPoolExecutor ,可進行細粒度控制:
- 將任務隊列設置成有邊界的隊列
- 使用合適的 RejectionHandler 拒絕處理程序
- 若任務完成前后需執行操作,可重載 beforeExecute(Thread, Runnable)、afterExecute(Runnable, Throwable)
- 重載 ThreadFactory ,若有線程定制化需求
- 運行時動態控制線程池大小(Dynamic Thread Pool)
3.7 使用場景
適用于以下場景:
- 大量短期任務:適合處理大量短期任務,任務到來時盡可能創建新線程執行,有空閑線程則復用,避免頻繁創建銷毀線程的額外開銷。
- 任務響應快速:可根據任務到來快速創建啟動新線程執行,減少任務等待時間。
- 不需要限制線程數量:最大線程數不限,只要內存足夠,可根據任務動態創建新線程。
- 短期性任務的高并發性:可動態創建線程,適合處理需高并發性的短期任務,任務處理完后保持一定空閑線程用于下一批任務。
需注意,CachedThreadPool 線程數量不受限,任務過多可能導致線程數量過多、系統資源過度消耗,使用時需靈活調整線程數量或用其他線程池控制資源。