🔥個人主頁🔥:孤寂大仙V
🌈收錄專欄🌈:Linux
🌹往期回顧🌹: 【Linux筆記】——簡單實習一個日志項目
🔖流水不爭,爭的是滔滔不息
- 一、線程池設計
- 二、線程池代碼
- 三、線程安全的單例模式
- 四、線程安全和重入問題
一、線程池設計
線程池
一種線程使用模式。線程過多會帶來調度開銷,進而影響緩存局部性和整體性能。而線程池維護著多個線程,等待著監督管理者分配可并發執行的任務。這避免了在處理短時間任務時創建與銷毀線程的代價。線程池不僅能夠保證內核的充分利用,還能防止過分調度,可用線程數量應該取決于可用的并發處理器、處理器內核、內存、網絡sockets等的數量。線程池的主要優點是減少在創建和銷毀線程上所花的時間以及系統資源的開銷。通過重用已存在的線程,線程池可以顯著提高系統性能,特別是在需要處理大量短生命周期任務的場景中。
使用場景
需要大量的線程來完成任務,且完成任務的時間比較短。比如WEB服務器完成網頁請求這樣的任務,使用線程池技術是非常合適的。因為單個任務小,而任務數量巨大,你可以想象一個熱門網站的點擊次數。 但對于長時間的任務,比如一個Telnet連接請求,線程池的優點就不明顯了。因為Telnet會話時間比線程的創建時間大多了。
對性能要求苛刻的應用,比如要求服務器迅速響應客戶的請求。
接受突發性的大量請求,但不至于使服務器因此產生大量線程的應用。突發性大量客戶請求,在沒有線程池情況下,將產生大量線程,雖然理論上大部分操作系統線程數目最大值不是問題,短時間內產生大量線程可能使內存到達極限,出現錯誤。
二、線程池代碼
ThreadPool.hpp線程池主體邏輯
#pragma once#include <iostream>
#include <vector>
#include <queue>
#include <functional>
#include "Log.hpp"
#include "Thread.hpp"
#include "Mutex.hpp"
#include "Cond.hpp"
using namespace std;
using namespace MutexModule;
using namespace CondModule;
using namespace ThreadModule;
using namespace LogModule;const int gnum=5;
namespace ThreadPoolModule
{template<class T>class ThreadPool{public:ThreadPool(int num=gnum):_num(num),_isrunning(false),_sleepnum(0){for(int i=0;i<_num;i++){_thread.emplace_back([this](){HandlerTask();});}}void Threadone(){_cond.Signal();LOG(LogLevel::INFO) << "喚醒一個休眠線程";}void Threadall(){LockGuard lockguard (_mutex);if(_sleepnum>0){_cond.Broadcast();}LOG(LogLevel :: INFO)<<"喚醒所有休眠線程";}void Start(){if(_isrunning) return;_isrunning=true;for(auto &thread :_thread){thread.Start();}LOG(LogLevel :: INFO)<<"開始創建線程池";}void Stop(){if(!_isrunning) return;_isrunning =false;Threadall();//讓等待的進程全部啟動}void Join(){for(auto &thread :_thread){thread.Join();}LOG(LogLevel :: INFO)<<"線程回收";}void HandlerTask(){char name[128];pthread_getname_np(pthread_self(), name, sizeof(name));while(true){T t;{LockGuard lockguard (_mutex);if(_taskq.empty() && _isrunning) //把全部休眠的線程啟動,必須保證進程池已經退出狀態,要不就陷入死循環{_sleepnum++;//如果等待計數++_cond.Wait(_mutex);_sleepnum--;//退出等待計數--}if(_taskq.empty() && !_isrunning) //等待后喚醒,必須是任務隊列為空 進程池退出{LOG(LogLevel :: INFO)<<name<<"退出了,任務隊列為空,進程池退出";break;}t=_taskq.front();_taskq.pop();}t();//執行任務}}bool Enqueue (const T& in){if(_isrunning){LockGuard LockGuard (_mutex);_taskq.push(in);if(_sleepnum==_thread.size()){Threadone();}return true;}return false; }~ThreadPool(){}private:vector<Thread> _thread;queue<T> _taskq;int _num;Mutex _mutex;Cond _cond;bool _isrunning;int _sleepnum;};
}
這里用到了之前封裝好的線程、條件變量互斥與同步、日志。
私有成員變量
_thread我們用vector數組充當線程池,_taskq任務隊列用的是queue隊列,_num是線程池中的線程的個數(我們寫的是固定線程的線程池),_isrunning判斷線程是否運行,_sleepnum是線程等待的個數。
ThreadPool(int num=gnum):_num(num),_isrunning(false),_sleepnum(0){for(int i=0;i<_num;i++){_thread.emplace_back([this](){HandlerTask();});}}void HandlerTask(){char name[128];pthread_getname_np(pthread_self(), name, sizeof(name));while(true){T t;{LockGuard lockguard (_mutex);if(_taskq.empty() && _isrunning) //把全部休眠的線程啟動,必須保證進程池已經退出狀態,要不就陷入死循環{_sleepnum++;//如果等待計數++_cond.Wait(_mutex);_sleepnum--;//退出等待計數--}if(_taskq.empty() && !_isrunning) //等待后喚醒,必須是任務隊列為空 進程池退出{LOG(LogLevel :: INFO)<<name<<"退出了,任務隊列為空,進程池退出";break;}t=_taskq.front();_taskq.pop();}t();//執行任務}
上面的代碼是構造函數構造線程池,這里主要闡述,構造函數創建線程池與線程去執行任務的函數的關系。
構造函數通過一個for循環,我們創建了_num個Thread對象,每個對象都綁定一個lambda,lambda里面調用的是線程池的成員函數HandlerTask()(在類內調用類內成員函數用lambda),這些lambda是“線程的入口函數”,它們一啟動就跑進HandlerTask()中并一直在那里循環干活。
void HandlerTask(){char name[128];pthread_getname_np(pthread_self(), name, sizeof(name));while(true){T t;{LockGuard lockguard (_mutex);if(_taskq.empty() && _isrunning) //把全部休眠的線程啟動,必須保證進程池已經退出狀態,要不就陷入死循環{_sleepnum++;//如果等待計數++_cond.Wait(_mutex);_sleepnum--;//退出等待計數--}if(_taskq.empty() && !_isrunning) //等待后喚醒,必須是任務隊列為空 進程池退出{LOG(LogLevel :: INFO)<<name<<"退出了,任務隊列為空,進程池退出";break;}t=_taskq.front();_taskq.pop();}t();//執行任務}
上面這段代碼。一啟動,線程池的每個線程都會進去,所以加鎖。如果任務隊列是空的并且這個線程池已經啟動了,那么線程就要進入等待隊列(條件變量同步),注意要寫一個計數的變量需要進入等待隊列就要++出來就–(這里+±-就是為了喚醒休眠線程的時候進行判斷,顆粒度更細)。如果任務隊列為空并且線程池也不在運行了,直接break退出回收線程。最后滿足線程能拿到任務,取出隊首的任務,執行任務。
//喚醒單個線程void Threadone(){_cond.Signal();LOG(LogLevel::INFO) << "喚醒一個休眠線程";}//喚醒所有進程void Threadall(){LockGuard lockguard (_mutex);if(_sleepnum>0){_cond.Broadcast();}LOG(LogLevel :: INFO)<<"喚醒所有休眠線程";}//創建線程池void Start(){if(_isrunning) return;_isrunning=true;for(auto &thread :_thread){thread.Start();}LOG(LogLevel :: INFO)<<"開始創建線程池";}//終止線程void Stop(){if(!_isrunning) return;_isrunning =false;Threadall();//讓等待的進程全部啟動}//回收線程void Join(){for(auto &thread :_thread){thread.Join();}LOG(LogLevel :: INFO)<<"線程回收";}//任務隊列中放任務bool Enqueue (const T& in){if(_isrunning){LockGuard LockGuard (_mutex);_taskq.push(in);if(_sleepnum==_thread.size()){Threadone();}return true;}return false; }
喚醒單個線程直接調用之前封裝好的條件變量同步,喚醒所有休眠的線程,如果之前計數的_sleepnum>0就要喚醒所有的休眠線程了。
啟動線程就是創建vector數組里的線程,創建線程池。
進程終止,調用之前封裝的條件變量。
回收線程,調用之前封裝的條件變量。
Enqueue就是往任務隊列里放任務,if(_sleepnum==_thread.size()) 判斷是否所有線程都在休眠,避免喚醒線程池中已經在忙的線程(節省上下文切換開銷)如果不判斷 _sleepnum,直接 Threadone() 會怎樣?可能會導致重復喚醒甚至無意義的上下文切換。比如:有5個線程,其中2個在處理任務,3個在睡覺;你來了個新任務,就喚醒1個線程;但其實可能原本某個線程馬上就處理完會搶新任務;你提前喚醒一個,就多了一次線程上下文切換(白喚醒了)。
main.cc
#include "Log.hpp"
#include "ThreadPool.hpp"
#include "Task.hpp"using namespace LogModule;
using namespace ThreadPoolModule;int main()
{Enable_Console_Log_Strategy();int cnt=5;ThreadPool<task_t>* tp=new ThreadPool <task_t> ();tp->Start();while(cnt){sleep(1);tp->Enqueue(Download);cnt--;}tp->Stop();tp->Join();return 0;
}
Task.hpp
#pragma once
#include <iostream>
#include <functional>
#include "Log.hpp"using namespace LogModule;using task_t = std::function<void()>;void Download()
{LOG(LogLevel::DEBUG) << "我是一個下載任務...";
}
這里的function就簡單寫了一下,沒有放具體的任務,但是要走到這里用的是function的語法
線程池就是通過一個vector數組(這里這么寫的也可以是別的)里面創建線程,其實就是把線程準備好放在vector數組中,任務來了線程直接就能用,大大提高了效率。線程池就是提前創建一批線程放在池子里反復復用,避免任務來了才臨時創建/銷毀線程造成的高開銷。
常規線程池:源碼
三、線程安全的單例模式
單例模式
單例模式是一種設計模式,確保一個類只有一個實例(對象),并提供一個全局訪問點來獲取該實例。這種模式通常用于控制資源的訪問,例如數據庫連接、日志記錄器等,以避免創建多個實例導致資源浪費或沖突。
餓漢模式實現單例
一開始就創建好單例對象。程序一啟動,就創建好對象了,餓的不行
懶漢模式實現單例
什么時候用什么時候創建單例對象。防止創建多個對象。第一次用對象是才創建,是不是很懶。
線程安全的懶漢式單例模式
禁用拷貝構造和賦值重載
ThreadPool(const ThreadPool<T> &) = delete; // 把拷貝構造給禁用 沒辦法直接創建對象
ThreadPool<T> &operator=(const ThreadPool<T> &) = delete; // 把賦值重載禁用
顯示的禁掉拷貝構造和賦值重載,單例模式的最大的特征以及核心就是,保證全程只有一個對象。如果不禁掉拷貝構造和賦值重載就會出現多個對象,完全違背了單例迷失的初衷。
沒有禁用拷貝構造和賦值重載,單例根本不單例。
類外初始化單例指針
template <class T>
class ThreadPool {
public:static ThreadPool<T>* inc;// 其他成員...
};template <class T>
ThreadPool<T>* ThreadPool<T>::inc = nullptr;// 類外初始化template <class T>
Mutex ThreadPool<T>::_lock; // 類外初始化
類內聲明的靜態成員變量,必須類外初始化。 static ThreadPool* inc這個靜態成員變量必須類外初始化。原因是:靜態成員變量屬于類本身,而不是某個對象。類聲明只是告訴編譯器"這里有這么個靜態變量",但不分配內存。只有在類外定義后,編譯器才會給它分配內存空間。
為什么要加static成為靜態成員變量?
創建單例對象
static ThreadPool<T> *GetInstance(){if (inc == nullptr) // 多加一層多一層保護 {LockGuard lockguard(_lock); // 為防止多線程訪問,加鎖if (inc == nullptr) //第一次用這個inc單例指針對象{LOG(LogLevel::DEBUG) << "首次使用單例, 創建之....";inc = new ThreadPool<T>(); //創建單例指針對象}}return inc;}
這里主要就是創建單例指針對象,如果這里不加static如果想調用這個函數是不是就要創建這個類的對象就違背了單例的初衷。所以這里要讓它成為靜態成員函數。這也回答了上面為什么要用static成員變量,在這個靜態成員函數中創建單例指針對象,要使用靜態成員變量。
內層中的if (inc == nullptr)是判斷是否是第一次用這個inc單例指針對象,如果沒有創建單例指針對象。加鎖是為了防止多線程訪問。在外層的if (inc == nullptr),首先不加外層的這個if (inc == nullptr),線程來了訪問互斥鎖,拿到鎖的線程進去創建單例指針對象,那么多線程是不是每次不管是否已經創建了單例指針對象都要拿鎖然后進去轉一圈在返回這個單例指針對象,是不是效率就會降低。加上最外側的if (inc == nullptr) 每次線程來了,如果已經有單例指針對象了就不要進去在轉一圈,直接返回對象就完了,大大提高了效率。
Main.cc
#include "Log.hpp"
#include "ThreadPool.hpp"
#include "Task.hpp"using namespace LogModule;
using namespace ThreadPoolModule;int main()
{Enable_Console_Log_Strategy();//int cnt=5;// ThreadPool<task_t>* tp=new ThreadPool <task_t> ();ThreadPool<task_t> ::GetInstance()->Start();while(cnt){sleep(1);ThreadPool<task_t> ::GetInstance()->Enqueue(Download);cnt--;}ThreadPool<task_t> ::GetInstance()->Stop();ThreadPool<task_t> ::GetInstance()->Join();return 0;
}
單例模式如何訪問類內成員函數呢?不用創建類的對象,直接用作用域解釋符,調用靜態成員函數。這樣獲取進程池的唯一對象,然后通過指針調用方法創建線程池。
懶漢式單例模式線程池
#pragma once#include <iostream>
#include <vector>
#include <queue>
#include <functional>
#include "Log.hpp"
#include "Thread.hpp"
#include "Mutex.hpp"
#include "Cond.hpp"
using namespace std;
using namespace MutexModule;
using namespace CondModule;
using namespace ThreadModule;
using namespace LogModule;const int gnum = 5;
namespace ThreadPoolModule
{template <class T>class ThreadPool{private:ThreadPool(int num = gnum): _num(num), _isrunning(false), _sleepnum(0){for (int i = 0; i < _num; i++){_thread.emplace_back([this](){ HandlerTask(); });}}void Threadone(){_cond.Signal();LOG(LogLevel::INFO) << "喚醒一個休眠線程";}void Threadall(){LockGuard lockguard(_mutex);if (_sleepnum > 0){_cond.Broadcast();}LOG(LogLevel ::INFO) << "喚醒所有休眠線程";}ThreadPool(const ThreadPool<T> &) = delete; // 把拷貝構造給禁用 沒辦法直接創建對象ThreadPool<T> &operator=(const ThreadPool<T> &) = delete; // 把賦值重載禁用public:static ThreadPool<T> *GetInstance(){if (inc == nullptr) // 多加一層多一層保護 {LockGuard lockguard(_lock); // 為防止多線程訪問,加鎖if (inc == nullptr) //第一次用這個inc單例指針對象{LOG(LogLevel::DEBUG) << "首次使用單例, 創建之....";inc = new ThreadPool<T>(); //創建單例指針對象}}return inc;}void Start(){if (_isrunning)return;_isrunning = true;for (auto &thread : _thread){thread.Start();}LOG(LogLevel ::INFO) << "開始創建線程池";}void Stop(){if (!_isrunning)return;_isrunning = false;Threadall(); // 讓等待的進程全部啟動}void Join(){for (auto &thread : _thread){thread.Join();}LOG(LogLevel ::INFO) << "線程回收";}void HandlerTask(){char name[128];pthread_getname_np(pthread_self(), name, sizeof(name));while (true){T t;{LockGuard lockguard(_mutex);if (_taskq.empty() && _isrunning) // 把全部休眠的線程啟動,必須保證進程池已經退出狀態,要不就陷入死循環{_sleepnum++; // 如果等待計數++_cond.Wait(_mutex);_sleepnum--; // 退出等待計數--}if (_taskq.empty() && !_isrunning) // 等待后喚醒,必須是任務隊列為空 進程池退出{LOG(LogLevel ::INFO) << name << "退出了,任務隊列為空,進程池退出";break;}t = _taskq.front();_taskq.pop();}t(); // 執行任務}}bool Enqueue(const T &in){if (_isrunning){LockGuard LockGuard(_mutex);_taskq.push(in);if (_sleepnum == _thread.size()){Threadone();}return true;}return false;}~ThreadPool(){}private:vector<Thread> _thread;queue<T> _taskq;int _num;Mutex _mutex;Cond _cond;bool _isrunning;int _sleepnum;static ThreadPool<T> *inc;static Mutex _lock;};template <class T>ThreadPool<T> *ThreadPool<T>::inc = nullptr; // 類外初始化template <class T>Mutex ThreadPool<T>::_lock; // 類外初始化
}
運行結果
懶漢式單例模式線程池:源碼
四、線程安全和重入問題
線程安全:就是多個線程在訪問共享資源時,能夠正確地執行,不會相互干擾或破壞彼此的執行結果。?般而言,多個線程并發同一段只有局部變量的代碼時,不會出現不同的結果。但是對全局變量或者靜態變量進行操作,并且沒有鎖保護的情況下,容易出現該問題。
重入:同?個函數被不同的執行流調用,當前?個流程還沒有執行完,就有其他的執行流再次進入,我們稱之為重入。?個函數在重?的情況下,運行結果不會出現任何不同或者任何問題,則該函數被稱為可重入函數,否則,是不可重入函數。學到現在,其實我們已經能理解重入其實可以分為兩種情況1.多線程重入函數2.信號導致?個執行流重復進入函數
可重入與線程安全聯系
- 函數是可重入的,那就是線程安全的(其實知道這?句話就夠了)
- 函數是不可重入的,那就不能由多個線程使用,有可能引發線程安全問題
- 如果?個函數中有全局變量,那么這個函數既不是線程安全也不是可重人的。
可重入與線程安全區別
- 可重入函數是線程安全函數的?種
- 線程安全不一定是可重入的,而可重入函數則?定是線程安全的。
- 如果將對臨界資源的訪問加上鎖,則這個函數是線程安全的,但如果這個重入函數若鎖還未釋放則會產生死鎖,因此是不可重入的。
如果不考慮信號導致一個執行流重復進入函數這種重入情況,線程安全和重入在安全角度不做區分。
但是線程安全側重說明線程訪問公共資源的安全情況,表現的是并發線程的特點。
可重入描述的是一個函數是否被重復進入,表示的是函數特點。