C++實現線程池(3)緩存線程池

三. CachedThreadPool 的實現

3.1 需求:

動態調整線程數量:與 FixedThreadPool 不同,CachedThreadPool 的線程數量是動態調整的。當有新任務提交時,如果線程池中有空閑的線程,則會立即使用空閑線程執行任務;如果線程池中沒有空閑線程,則會創建一個新的線程來執行任務。當線程空閑一段時間后,超過一定的時間(默認為 60 秒),會被回收銷毀。

3.2 SyncQueue 同步隊列的設計和實現

#ifndef SYNCQUEUE3_HPP
#define SYNCQUEUE3_HPP#include<list>
#include<mutex>
#include<condition_variable>
#include<iostream>using namespace std;template<class T>
class SyncQueue
{
private:std::list<T> m_queue; // 任務緩沖區mutable std::mutex m_mutex;std::condition_variable m_notEmpty; // Cstd::condition_variable m_notFull;  // Psize_t m_waitTime;                  //任務隊列滿等待時間sint m_maxSize;                      // bool m_needStop;    // true 同步隊列不在接受新的任務//bool IsFull() const{bool full = m_queue.size() >= m_maxSize;if (full){printf("m_queue 已經滿了,需要等待....\n");}return full;}bool IsEmpty() const{bool empty = m_queue.empty(); //if (empty){printf("m_queue 已經空了,需要等待....\n");}return empty;}// return 0; 成功// 1 ;// full// 2 ;// stop;template<class F>int Add(F&& x){std::unique_lock<std::mutex> locker(m_mutex);if (!m_notFull.wait_for(locker, std::chrono::seconds(m_waitTime),[this] { return m_needStop || !IsFull(); })){cout << "task queue full return 1" << endl;return 1;}if (m_needStop){cout << "同步隊列停止工作..." << endl;return 2;}m_queue.push_back(std::forward<F>(x));m_notEmpty.notify_one();return 0;}public:SyncQueue(int maxsize = 100, int timeout = 1):m_maxSize(maxsize),m_waitTime(timeout),m_needStop(false) // 同步隊列開始工作{}int Put(const T& x){return Add(x);}int Put(T&& x){return Add(std::forward<T>(x));}int notTask(){std::unique_lock<std::mutex> locker(m_mutex);if (!m_notEmpty.wait_for(locker, std::chrono::seconds(m_waitTime))== std::cv_status::timeout){return 1;}return 0;}void Take(std::list<T>& list) //{std::unique_lock<std::mutex> locker(m_mutex);while (!m_needStop && IsEmpty()){m_notEmpty.wait(locker);}if (m_needStop){cout << "同步隊列停止工作..." << endl;return;}list = std::move(m_queue);m_notFull.notify_one();}//T& GetTake();// return 0; 成功// 1 ;// empty// 2 ;// stop;int Take(T& t) // 1{std::unique_lock<std::mutex> locker(m_mutex);if (!m_notEmpty.wait_for(locker, std::chrono::seconds(m_waitTime),[this] { return m_needStop || !IsEmpty(); })){return 1;   // 隊列空}if (m_needStop){cout << "同步隊列停止工作..." << endl;return 2;}t = m_queue.front();m_queue.pop_front();m_notFull.notify_one();return 0;}void Stop(){std::unique_lock<std::mutex> locker(m_mutex);while (!IsEmpty()){m_notFull.wait(locker);}m_needStop = true;m_notFull.notify_all();m_notEmpty.notify_all();}bool Empty() const{std::unique_lock<std::mutex> locker(m_mutex);return m_queue.empty();}bool Full() const{std::unique_lock<std::mutex> locker(m_mutex);return m_queue.size() >= m_maxSize;}size_t size() const{std::unique_lock<std::mutex> locker(m_mutex);return m_queue.size();}
};#endif

3.3 CachedThreadPool 線程池的設計和實現

#ifndef CACHEDTHREADPOOL_HPP
#define CACHEDTHREADPOOL_HPP#include"SyncQueue3.hpp"
#include<functional>
#include<unordered_map>
#include<map>
#include<future>using namespace std;int MaxTaskCount = 2;
const int KeepAliveTime = 10;   //線程最大存活時間 60 ,為測試改為10class CachedThreadPool
{
public:using Task = std::function<void(void)>;private:std::unordered_map<std::thread::id, std::shared_ptr<std::thread>> m_threadgroup;int m_coreThreadSize;                 // 核心的線程數量,下限閾值 2int m_maxThreadSize;                  // 最大的線程數量,上限閾值std::atomic_int m_idleThreadSize;     // 空閑線程的數量std::atomic_int m_curThreadSize;      // 當前線程池里面的線程總數量mutable std::mutex m_mutex;           // SyncQueue<Task> m_queue;std::atomic_bool m_running; // true ; false stop;std::once_flag m_flag;void Start(int numthreads){m_running = true;m_curThreadSize = numthreads;for (int i = 0; i < numthreads; ++i){auto tha = std::make_shared<std::thread>(std::thread(&CachedThreadPool::RunInThread, this));std::thread::id tid = tha->get_id();m_threadgroup.emplace(tid, std::move(tha));m_idleThreadSize++;}}void RunInThread(){auto tid = std::this_thread::get_id();auto startTime = std::chrono::high_resolution_clock().now();while (m_running){Task task;if (m_queue.size() == 0 && m_queue.notTask()){auto now = std::chrono::high_resolution_clock().now();auto intervalTime = std::chrono::duration_cast<std::chrono::seconds>(now - startTime);std::lock_guard<std::mutex> lock(m_mutex);if (intervalTime.count() >= KeepAliveTime &&m_curThreadSize > m_coreThreadSize){m_threadgroup.find(tid)->second->detach();m_threadgroup.erase(tid);m_curThreadSize--;m_idleThreadSize--;cout << "空閑線程銷毀" << m_curThreadSize << " " << m_coreThreadSize << endl;return;}}if (!m_queue.Take(task) && m_running){m_idleThreadSize--;task();m_idleThreadSize++;startTime = std::chrono::high_resolution_clock().now();}}}void StopThreadGroup(){m_queue.Stop();m_running = false;for (auto& thread : m_threadgroup){thread.second->join();}m_threadgroup.clear();}public:CachedThreadPool(int initNumThreads = 8, int taskPoolSize = MaxTaskCount):m_coreThreadSize(initNumThreads),m_maxThreadSize(2 * std::thread::hardware_concurrency() + 1),m_idleThreadSize(0),m_curThreadSize(0),m_queue(taskPoolSize),m_running(false){Start(m_coreThreadSize);}~CachedThreadPool(){StopThreadGroup();}template<class Func, class... Args>auto submit(Func&& func, Args&&... args) -> std::future<decltype(func(args...))>{auto task = std::make_shared<std::packaged_task<decltype(func(args...))()>>(std::bind(std::forward<Func>(func), std::forward<Args>(args)...));std::future<decltype(func(args...))> result = task->get_future();if (m_queue.Put([task]() { (*task)(); }) != 0){cout << "調用者運行策略" << endl;(*task)();}if (m_idleThreadSize <= 0 && m_curThreadSize < m_maxThreadSize){std::lock_guard<std::mutex> lock(m_mutex);auto tha = std::make_shared<std::thread>(std::thread(&CachedThreadPool::RunInThread, this));std::thread::id tid = tha->get_id();m_threadgroup.emplace(tid, std::move(tha));m_curThreadSize++;m_idleThreadSize++;}return result;}template<class Func, class... Args>void execute(Func&& func, Args&&... args){submit(std::forward<Func>(func), std::forward<Args>(args)...);}
};#endif

3.4 測試

///
void func(int index)
{static int num = 0;cout << "func_" << index << " num: " << ++num << endl;
}int add(int a, int b)
{return a + b;
}int main()
{CachedThreadPool mypool;for (int i = 0; i < 1000; ++i){if (i % 2 == 0){auto pa = mypool.submit(add, i, i + 1);cout << pa.get() << endl;}else{mypool.execute(func, i);}}CachedThreadPool pool(2);int add(int a, int b, int s){std::this_thread::sleep_for(std::chrono::seconds(s));int c = a + b;cout << "add begin ..." << endl;return c;}void add_a(){auto r = pool.submit(add, 10, 20, 4);cout << "add_a: " << r.get() << endl;}void add_b(){auto r = pool.submit(add, 20, 30, 6);cout << "add_b: " << r.get() << endl;}void add_c(){auto r = pool.submit(add, 30, 40, 1);cout << "add_c: " << r.get() << endl;}void add_d(){auto r = pool.submit(add, 10, 40, 9);cout << "add_d: " << r.get() << endl;}int main(){std::thread tha(add_a);std::thread thb(add_b);std::thread thc(add_c);std::thread thd(add_d);tha.join();thb.join();thc.join();thd.join();std::this_thread::sleep_for(std::chrono::seconds(20));std::thread the(add_a);std::thread thf(add_b);the.join();thf.join();return 0;}
}

3.5 FixedThreadPool 與 CachedThreadPool 特性對比

特性FixedThreadPoolCachedThreadPool
重用能 reuse 就用,但不能隨時建新的線程先查看池中有無以前建立的線程,有就 reuse;沒有就建新線程加入池中
池大小可指定 nThreads,固定數量可增長,最大值 Integer.MAX_VALUE
隊列大小無限制無限制
超時無 IDLE默認 60 秒 IDLE
使用場景針對穩定固定的正規并發線程,用于服務器,執行負載重、CPU 使用率高的任務,防止線程頻繁切換得不償失處理大量短生命周期異步任務,執行大量并發短期異步任務,任務負載要輕
結束不會自動銷毀放入的線程超過 TIMEOUT 不活動會自動被終止

3.6 最佳實踐

FixedThreadPool 和 CachedThreadPool 對高負載應用都不特別友好,CachedThreadPool 更危險。若應用要求高負載、低延遲,最好不選這兩種,推薦使用 ThreadPoolExecutor ,可進行細粒度控制:

  1. 將任務隊列設置成有邊界的隊列
  2. 使用合適的 RejectionHandler 拒絕處理程序
  3. 若任務完成前后需執行操作,可重載?beforeExecute(Thread, Runnable)afterExecute(Runnable, Throwable)
  4. 重載 ThreadFactory ,若有線程定制化需求
  5. 運行時動態控制線程池大小(Dynamic Thread Pool)

3.7 使用場景

適用于以下場景:

  1. 大量短期任務:適合處理大量短期任務,任務到來時盡可能創建新線程執行,有空閑線程則復用,避免頻繁創建銷毀線程的額外開銷。
  2. 任務響應快速:可根據任務到來快速創建啟動新線程執行,減少任務等待時間。
  3. 不需要限制線程數量:最大線程數不限,只要內存足夠,可根據任務動態創建新線程。
  4. 短期性任務的高并發性:可動態創建線程,適合處理需高并發性的短期任務,任務處理完后保持一定空閑線程用于下一批任務。

需注意,CachedThreadPool 線程數量不受限,任務過多可能導致線程數量過多、系統資源過度消耗,使用時需靈活調整線程數量或用其他線程池控制資源。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/94516.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/94516.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/94516.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

WMS+自動化立庫:無人倉的現在進行時

傳統倉庫正面臨嚴峻挑戰&#xff1a;效率瓶頸日益凸顯&#xff0c;人力成本持續攀升&#xff0c;空間利用率逼近極限&#xff0c;而訂單響應速度卻難以滿足市場需求。如何破局&#xff1f;WMS&#xff08;倉庫管理系統&#xff09;與自動化立體庫&#xff08;AS/RS&#xff09;…

多模態大模型研究每日簡報【2025-08-05】

訓練數據相關 EditGarment: An Instruction-Based Garment Editing Dataset Constructed with Automated MLLM Synthesis and Semantic-Aware Evaluation (https://arxiv.org/abs/2508.03497)&#xff1a;提出了一種自動化的流程&#xff0c;用于構建服裝編輯數據集EditGarmen…

4、docker數據卷管理命令 | docker volume

1、命令總覽命令作用出現頻率備注★ docker volume create新建卷高-d 指定驅動&#xff0c;-o 指定驅動選項★ docker volume ls列出卷高--filter danglingtrue 查孤兒卷★ docker volume inspect查看卷詳情高輸出 JSON&#xff0c;可加 --format★ docker volume rm刪除卷高只…

計數組合學7.14(對偶 RSK 算法)

7.14 對偶 RSK 算法 存在 RSK 算法的一種變體&#xff0c;其與乘積 ∏i,j(1xiyj)\prod_{i,j}(1 x_{i}y_{j})∏i,j?(1xi?yj?) 的關系類似于 RSK 算法本身與 ∏i,j(1?xiyj)?1\prod_{i,j}(1 - x_{i}y_{j})^{-1}∏i,j?(1?xi?yj?)?1 的關系。我們稱此變體為對偶 RSK 算法…

C語言中的進程、線程與進程間通信詳解

目錄 引言 基本概念 1. 進程&#xff08;Process&#xff09; 2. 線程&#xff08;Thread&#xff09; 線程編程實戰 1. 常見線程庫 2. 合理設置線程數 3. pthread 創建線程 線程同步機制 1. 互斥鎖 pthread_mutex_t 2. 條件變量 pthread_cond_t 3. 讀寫鎖 pthread…

[假面騎士] 555淺談

假面騎士555(faiz)是我最先接觸的一部平成系列的假面騎士&#xff0c;同時也是我個人最喜歡的一部假面騎士。一、大綱簡介震驚&#xff0c;人類最新的進化形態——奧菲一諾&#xff0c;橫空出世&#xff01;日本的頂級財團&#xff0c;Smart Brain&#xff0c;的前任社長&#…

Vue Router 路由的創建和基本使用(超詳細)

一、路由的基本概念 你是否好奇單頁應用&#xff08;SPA&#xff09;是如何在不刷新頁面的情況下實現頁面切換的&#xff1f;這就離不開路由的功勞。 路由&#xff1a;本質是一組 key-value 的對應關系&#xff0c;在前端領域中&#xff0c;key 通常是路徑&#xff0c;value …

深入理解設計模式:策略模式的藝術與實踐

在軟件開發中&#xff0c;我們經常會遇到需要根據不同情況選擇不同算法或行為的場景。傳統的做法可能是使用大量的條件語句&#xff08;if-else或switch-case&#xff09;&#xff0c;但隨著需求的增加和變化&#xff0c;這種硬編碼的方式會導致代碼難以維護和擴展。策略模式&a…

概率/期望 DP llya and Escalator

題目鏈接&#xff1a;Problem - D - Codeforces 看了這篇文章來的&#xff1a;【算法學習筆記】概率與期望DP - RioTian - 博客園 這篇博客寫得挺好的&#xff0c;講了一些常見方法&#xff0c;概率 / 期望的題多練練就上手了。 題目大意&#xff1a; n 個人排隊上電梯&…

大陸電子MBDS開發平臺轉到其他國產控制器平臺產生的問題記錄

u8_StComLowSpdGearSwt變量為例&#xff0c;之前用的時候只有輸入&#xff0c;沒什么實際意義&#xff0c;導致新環境下編譯報錯&#xff0c;缺少聲明&#xff0c;解決辦法&#xff1a;注釋掉輸入模塊。今天解決的另一個比較大的問題&#xff0c;不同模型函數公用函數模塊生成代…

機器學習模型調優實戰指南

文章目錄模型選擇與調優&#xff1a;從理論到實戰1. 引言2. 模型評估&#xff1a;為選擇提供依據2.1 偏差-方差權衡2.2 數據集劃分與分層抽樣2.3 交叉驗證&#xff08;Cross-Validation&#xff09;2.4 信息準則&#xff08;AIC / BIC&#xff09;3. 超參數調優&#xff1a;讓模…

【教程】Unity CI/CD流程

測試機&#xff1a;紅帽 Linux8 源碼倉庫&#xff1a;Gitee - MrRiver/Unity Example ? 系統環境準備 1&#xff09;yum 源 sudo curl -o /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-8.repo sudo sed -i s/\$releasever/8/g /etc/yum.repos…

文獻閱讀 | Briefings in Bioinformatics | Hiplot:全面且易于使用的生物醫學可視化分析平臺

文獻介紹文獻題目&#xff1a; Hiplot&#xff1a;一個綜合且易于使用的 Web 服務&#xff0c;用于增強出版物準備的生物醫學數據可視化 研究團隊&#xff1a; Openbiox/Hiplot 社區 發表時間&#xff1a; 2022-07-05 發表期刊&#xff1a; Briefings in Bioinformatics 影響因…

【數字圖像處理系列筆記】Ch04:灰度變換與空間域圖像增強(2)

目錄 一、空域濾波基礎 一、空域濾波的基本概念 二、空域濾波的數學原理 三、空域濾波器的分類與典型示例 &#xff08;一&#xff09;線性濾波器&#xff08;Linear Filter&#xff09; &#xff08;二&#xff09;非線性濾波器&#xff08;Non-linear Filter&#xff0…

AI浪潮下,FPGA如何實現自我重塑與行業變革

引言&#xff1a;AI 與 FPGA&#xff0c;新時代的碰撞 2025 年&#xff0c;人工智能技術迎來爆發式增長&#xff0c;大模型、生成式 AI 和多模態技術持續突破&#xff0c;人形機器人量產元年正式開啟&#xff0c;自動駕駛商業化進程加速&#xff0c;工業數字化轉型全面鋪開(1)…

系統集成項目管理工程師【第十一章 規劃過程組】定義范圍、創建WBS、規劃進度管理和定義活動篇

系統集成項目管理工程師【第十一章 規劃過程組】定義范圍、創建WBS、規劃進度管理和定義活動篇 一、定義范圍&#xff1a;給項目畫好"邊界線" 定義范圍是明確項目和產品"做什么、不做什么"的過程&#xff0c;直接影響后續所有工作的方向。 1. 核心概念與作…

Spring Boot 參數校驗全指南

Spring Boot 參數校驗全指南 在 Web 開發中&#xff0c;參數校驗是保障接口安全性和數據合法性的關鍵環節。手動編寫校驗邏輯不僅繁瑣&#xff0c;還容易遺漏邊界情況。Spring Boot 整合了 validation 工具&#xff0c;提供了一套簡潔高效的參數校驗方案&#xff0c;可快速實現…

常用技術資料鏈接

1.team技術 https://zhuanlan.zhihu.com/p/11389323664 https://blog.csdn.net/Lucky_Lu0/article/details/121697151 2.bond切換主備 https://www.xgss.net/3306.html 3.ssh詳解&#xff1a; https://cloud.tencent.com/developer/news/105165 https://blog.huochengrm.c…

【Spring Cloud】-- 注冊中心

文章目錄1. 什么是注冊中心2. CPA理論1. 什么是注冊中心 注冊中心有三種角色&#xff1a; 服務提供者&#xff08;Server&#xff09; &#xff1a;提供接口給其他微服務的程序。服務消費者&#xff08;Client&#xff09;&#xff1a;調用其他微服務提供的接口。**服務注冊中…

go-zero 詳解

go-zero 詳解 go-zero 是一個基于 Go 語言的微服務框架&#xff0c;由字節跳動團隊開發并開源&#xff0c;旨在幫助開發者快速構建高可用、高性能的微服務架構。它集成了豐富的組件&#xff0c;簡化了微服務開發中的常見問題&#xff08;如服務注冊發現、配置管理、限流熔斷等&…