線程互斥
競態條件
當多個線程(或進程)并發訪問和操作同一個共享資源(如變量、文件、數據庫記錄等)時,最終的結果依賴于這些線程執行的相對時序(即誰在什么時候執行了哪條指令)。?由于操作系統調度線程執行的順序具有不確定性,這種依賴時序的行為會導致程序的行為變得不可預測、不一致,甚至完全錯誤。這種情形叫競態條件(Race Condition)
為了避免這種情況,需要讓線程互斥地訪問共享資源。
為此,引入了以下概念:
臨界資源:多線程執行流共享的資源就叫做臨界資源
臨界區:每個線程內部,訪問臨界資源的代碼,就叫做臨界區
互斥:任何時刻,互斥保證有且只有一個執行流進入臨界區,訪問臨界資源,通常對臨界資源起保護作用
原子性:不會被任何調度機制打斷的操作,該操作只有兩態,要么完成,要么未完成。單獨的一句匯編語句被認為是原子性的。
互斥量mutex
局部變量在線程的線程棧上,無法被其他線程直接訪問,因此不會產生競態條件,而全局變量被所有線程共享,因此會產生競態條件,我們以一個搶票程序為例,展示通過互斥量來實現線程互斥:
五個線程對共享資源ticket進行--操作,當ticket==0時停止(模擬搶票)
#include<unistd.h>
#include<pthread.h>
#include<stdio.h>int ticket=50;void* GetTicket(void* arg)
{while(true){if(ticket>0){usleep(800);printf("第%lld號線程搶到到第%d張票\n",(long long)arg,ticket);--ticket;}else{break;}}return 0;
}int main()
{pthread_t t1,t2,t3,t4,t5; pthread_create(&t1,NULL,GetTicket,(void*)1);pthread_create(&t2,NULL,GetTicket,(void*)2);pthread_create(&t3,NULL,GetTicket,(void*)3);pthread_create(&t4,NULL,GetTicket,(void*)4);pthread_create(&t5,NULL,GetTicket,(void*)5);pthread_join(t1,NULL);pthread_join(t2,NULL);pthread_join(t3,NULL);pthread_join(t4,NULL);pthread_join(t5,NULL);return 0;
}
可以看到運行結果并不理想:當ticket小于0時還在輸出
其原因在于,if(ticket>0)到打印ticket的值這一段并不是一個原子性操作,當線程在ticket>0時進行了if判斷,隨后可能切換到其他線程執行--ticket操作,等到線程切換回來時就會打印出負數。
概括的講,ticket全局變量作為線程間的共享資源,線程應該互斥地對其進行修改(或者說線程間對其進行的修改操作應該是原子性的),否則就會因代碼執行順序造成各種問題
為了解決這個問題,我們使用互斥量mutex實現線程互斥:
#include<unistd.h>
#include<pthread.h>
#include<stdio.h>int ticket=50;
pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER;void* GetTicket(void* arg)
{while(true){pthread_mutex_lock(&mutex);if(ticket>0){printf("第%lld號線程搶到到第%d張票\n",(long long)arg,ticket);--ticket;pthread_mutex_unlock(&mutex);}else{pthread_mutex_unlock(&mutex);break;}usleep(1000);}return 0;
}int main()
{pthread_t t1,t2,t3,t4,t5; pthread_create(&t1,NULL,GetTicket,(void*)1);pthread_create(&t2,NULL,GetTicket,(void*)2);pthread_create(&t3,NULL,GetTicket,(void*)3);pthread_create(&t4,NULL,GetTicket,(void*)4);pthread_create(&t5,NULL,GetTicket,(void*)5);pthread_join(t1,NULL);pthread_join(t2,NULL);pthread_join(t3,NULL);pthread_join(t4,NULL);pthread_join(t5,NULL);return 0;
}
搶票程序運行結果正確:
下面正式解釋mutex相關接口和實現原理:
相關接口
Linux下mutex的數據類型為pthread_mutex_t,使用前需要初始化,使用完畢需要銷毀
初始化:
兩種方式:
定義時初始化:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
調用函數完成初始化:
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const
pthread_mutexattr_t *restrict attr);
mutex:要初始化的互斥量
attr:設為NULL即可
銷毀:
int pthread_mutex_destroy(pthread_mutex_t *mutex);
注意:
使? PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要銷毀
不要銷毀?個已經加鎖的互斥量
已經銷毀的互斥量,要確保后?不會有線程再嘗試加鎖
互斥量加鎖和解鎖
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
調? pthread_ lock 時,可能有以下兩種情況:
情況1:互斥量處于未鎖狀態,該函數會將互斥量鎖定,繼續執行后面的代碼
情況2:其他線程已經鎖定互斥量,或者存在其他線程同時申請互斥量,但沒有競爭到互斥量,那么線程會陷?阻塞狀態,等待互斥量解鎖后再次試圖申請互斥量。
總結使用mutex的方法:
首先定義一個能被多線程共享的mutex變量(全局變量或靜態變量),當訪問共享資源時加鎖,訪問結束后解鎖
實現原理
了解了mutex的使用方法后,容易產生一個疑惑,對于全局變量這種線程間共享資源,我們需要定義一個mutex對其進行保護,實現原子性的訪問,可是mutex本身同樣是一個全局變量,多線程同樣要對其進行共享訪問,這就意味著其加鎖解鎖操作本身也必須是原子性的(不會被操作系統的調度機制打斷),而這又是怎么實現的?
這就需要探究其實現原理:
可以看到,lock函數首先將0賦值給了一個寄存器,該寄存器內容屬于線程上下文,不會被其他線程訪問到,隨后執行xchgb操作,該操作是一個硬件指令,含義為交換后面的兩個操作數,也就是交換寄存器中的值(0)和mutex(mutex是一個共享內容),若寄存器中的內容>0則執行完畢,否則將線程掛起等待,到被喚醒時再次回到lock開頭。
整個lock函數每次執行只訪問了一次mutex,因此是原子性的。
同理unlock中也只訪問了一次mutex,因此是原子性的
總的來說,lock和unlock的實現思路就是只訪問一次內存中的mutex變量,其余操作(如判斷)則通過寄存器進行,而寄存器屬于線程上下文,不會受到線程調度的影響,因此整體來看lock和unlock函數是原子性的
而從線程lock函數后到unlock函數之前,這期間該線程不會訪問mutex,此時當其他線程執行lock函數時,只能將其賦值為0(此前mutex已經是0),相當于其他線程對mutex是只讀不寫的,這樣總體看來,在一個線程執行lock函數到unlock函數期間,沒有任何一個線程對mutex進行修改操作,這樣整個操作就是原子性的了
RALL風格的鎖
在理解了mutex的原理后,我們在使用鎖時可能會覺得操作太繁瑣,要手動的初始化,銷毀,加鎖,解鎖。有沒有一種方式,使得我們不用手動的初始化和銷毀;只需要手動加鎖,即可自動解鎖呢。
這就不得不提的RAII風格(獲取即是初始化)了。其實現思路在于通過管理對象的生命周期來完成相應的操作。
這里給出RAII風格的互斥鎖的實現:
LockGuard.h
#pragma once
#include<pthread.h>class Mutex
{
public:Mutex(const Mutex&)=delete;const Mutex& operator=(const Mutex&)=delete;Mutex(){pthread_mutex_init(&_mutex,NULL);}~Mutex(){pthread_mutex_destroy(&_mutex);}void Lock(){pthread_mutex_lock(&_mutex);}void Unlock(){pthread_mutex_unlock(&_mutex);}pthread_mutex_t* GetMutex(){return &_mutex;}
private:pthread_mutex_t _mutex;
};class LockGuard
{
public:LockGuard(Mutex& mutex):_mutex(mutex){_mutex.Lock();}~LockGuard(){_mutex.Unlock();}
private:Mutex& _mutex;
};
使用時,先定義mutex,加鎖時定義Lockguard對象,該對象析構時解鎖
線程同步
我們已經了解了線程如何互斥地運行,但很多時候,線程的運行不僅應該是互斥地,還應符合一定的先后順序,這就是線程同步。
實現線程同步有兩種方式:使用條件變量/信號量,為了更好的展示其作用,我們稍后會引入一個具體場景
條件變量cond
條件變量的數據類型為pthread_cond_t,與mutex一樣需要進行初始化和銷毀
初始化:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
或者:
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t
*restrict attr);
cond:要初始化的條件變量
attr:設為NULL即可
銷毀:
int pthread_cond_destroy(pthread_cond_t *cond)
等待:
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict
mutex);
?cond:要在這個條件變量上等待
mutex:等待期間釋放該互斥量,恢復后嘗試獲取該互斥量
調用該函數會釋放指定的互斥鎖,并使當前線程阻塞,直到其他線程通過 pthread_cond_signal 或pthread_cond_broadcast函數喚醒。當線程喚醒后,pthread_cond_wait函數會再次獲取互斥鎖。 ? ? ??
喚醒該條件變量上的所有線程/一個線程:
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
條件變量簡單封裝
#pragma once
#include"LockGuard.h"class Cond
{
public:Cond(){pthread_cond_init(&_cond,NULL);}~Cond(){pthread_cond_destroy(&_cond);}void Wait(Mutex& mutex){pthread_cond_wait(&_cond,mutex.GetMutex());}void Notify(){pthread_cond_signal(&_cond);}void NotifyAll(){pthread_cond_broadcast(&_cond);}
private:pthread_cond_t _cond;
};
信號量
依賴頭文件:
#include<semaphore.h>
信號量數據類型為sem_t,同樣需要初始化和銷毀:
初始化
int sem_init(sem_t *sem, int pshared, unsigned int value);
pshared:0表?線程間共享,?零表?進程間共享
value:信號量初始值
銷毀
int sem_destroy(sem_t *sem);
等待信號量,會將信號量的值減1當信號量為0時陷入阻塞狀態,直到信號量大于0時恢復并再次嘗試對信號量減一
int sem_wait(sem_t *sem);
發布信號量,將信號量值加1。
int sem_post(sem_t *sem);
生產者與消費者模型
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區,緩解了生產者和消費者忙先不均的問題,具體規則如下:
生產者向阻塞隊列里放數據,消費者從阻塞隊列中取數據
如果緩沖區已經滿了,則生產者線程阻塞;
如果緩沖區為空,那么消費者線程阻塞。
思考一下,阻塞隊列作為一個臨界資源,被生產者線程和消費者線程共享,因此需要互斥訪問,但同時消費者和生產者還需要按照一定的順序來訪問:隊列為空則只能生產者訪問,隊列未滿則只能消費者訪問,這就需要線程同步了。
下面我們分別用條件變量和信號量來實現生產者消費者模型:
條件變量+互斥鎖
#include<pthread.h>
#include<unistd.h>
#include<stdlib.h>
#include<stdio.h>
#include<queue>int count=0;template<class T>
class BlockQueue
{
private:BlockQueue(int cap=defaultnum):_cap(cap){pthread_mutex_init(&mutex,NULL);pthread_cond_init(&cond_c,NULL);pthread_cond_init(&cond_p,NULL);}
public:~BlockQueue(){pthread_mutex_destroy(&mutex);pthread_cond_destroy(&cond_c);pthread_cond_destroy(&cond_p);}static BlockQueue& instance(){static BlockQueue bq;return bq;}void Push(const T& in){pthread_mutex_lock(&mutex);while(isfull()){pthread_cond_wait(&cond_p,&mutex);}_q.emplace(in);printf("生產數據%c剩余%ld個\n",in,_q.size());pthread_cond_broadcast(&cond_c); pthread_mutex_unlock(&mutex); }void Pop(T& out){pthread_mutex_lock(&mutex);while(isempty()){pthread_cond_wait(&cond_c,&mutex);}out=_q.front();_q.pop();printf("消費數據%c剩余%ld個\n",out,_q.size());pthread_cond_broadcast(&cond_p);pthread_mutex_unlock(&mutex); }bool isfull()const {return _q.size()>=_cap;}bool isempty()const {return _q.size()<=0;}
private:static const int defaultnum=10;std::queue<T> _q;int _cap;pthread_mutex_t mutex;pthread_cond_t cond_c;pthread_cond_t cond_p;
};#define Instance() BlockQueue<char>::instance()void* Producer(void* arg)
{while(1){Instance().Push((char)(rand()%26+'a'));sleep(1);}return nullptr;
}void* Consumer(void* arg)
{while(1){char tmp;Instance().Pop(tmp);sleep(2);}return nullptr;
}int main()
{srand(time(NULL));pthread_t p[3],c[5];for(int i=0;i<3;++i){pthread_create(p+i,NULL,Producer,NULL);}for(int i=0;i<5;++i){pthread_create(c+i,NULL,Consumer,NULL);}for(int i=0;i<3;++i){pthread_join(p[i],NULL);}for(int i=0;i<5;++i){pthread_join(c[i],NULL);}return 0;
}
信號量+互斥鎖
#include<pthread.h>
#include<semaphore.h>
#include<unistd.h>
#include<stdlib.h>
#include<stdio.h>
#include<queue>int count=0;template<class T>
class BlockQueue
{
private:BlockQueue(int cap=defaultnum):_cap(cap){pthread_mutex_init(&mutex,NULL);sem_init(&full,0,0);sem_init(&empty,0,10);}
public:~BlockQueue(){pthread_mutex_destroy(&mutex);sem_destroy(&full);sem_destroy(&empty);}static BlockQueue& instance(){static BlockQueue bq;return bq;}void Push(const T& in){sem_wait(&empty);pthread_mutex_lock(&mutex);_q.emplace(in);printf("生產數據%c剩余%ld個\n",in,_q.size()); pthread_mutex_unlock(&mutex); sem_post(&full);}void Pop(T& out){sem_wait(&full);pthread_mutex_lock(&mutex);out=_q.front();_q.pop();printf("消費數據%c剩余%ld個\n",out,_q.size());pthread_mutex_unlock(&mutex); sem_post(&empty);}bool isfull()const {return _q.size()>=_cap;}bool isempty()const {return _q.size()<=0;}
private:static const int defaultnum=10;std::queue<T> _q;int _cap;pthread_mutex_t mutex;sem_t full;sem_t empty;
};#define Instance() BlockQueue<char>::instance()void* Producer(void* arg)
{while(1){Instance().Push((char)(rand()%26+'a'));sleep(1);}return nullptr;
}void* Consumer(void* arg)
{while(1){char tmp;Instance().Pop(tmp);sleep(2);}return nullptr;
}int main()
{srand(time(NULL));pthread_t p[3],c[5];for(int i=0;i<3;++i){pthread_create(p+i,NULL,Producer,NULL);}for(int i=0;i<5;++i){pthread_create(c+i,NULL,Consumer,NULL);}for(int i=0;i<3;++i){pthread_join(p[i],NULL);}for(int i=0;i<5;++i){pthread_join(c[i],NULL);}return 0;
}
線程池
進行了線程互斥和線程同步的基本實踐后,我們接下來設計一個線程池
該線程池支持日志,工作模式為:向任務隊列注入任務->喚醒線程池中的線程->執行任務
(其實也是一個生產者消費者模型),下面進行分段設計,并引入一些設計模式
日志
計算機中的日志是記錄系統和軟件運行中發生事件的文件,主要作用是監控運行狀態、記錄一場信息,幫助快速定位問題并支持程序員進行問題修復。它是系統維護、故障排查和安全管理的重要工具。
日志內容包括:
時間戳
日志等級
日志內容
文件名行號
進程,線程相關id信息等。
這里,我們規定日志格式為:
[時間戳][日志等級][進程id][文件名][行號]-支持可變參數的消息內容
同時,我們還希望提供兩種輸出日志的方案:向控制臺輸出和向指定文件輸出
對此我們通過策略模式來實現:
創建一個策略基類,并基于此創建策略派生類,當要調用指定策略的接口時,就用智能指針創建該對象并調用相應接口
Log.h:
#pragma once
#include<unistd.h>
#include<ctime>
#include<string>
#include<memory>
#include<filesystem>
#include<fstream>
#include<iostream>
#include"LockGuard.h"enum class LogLevel
{DEBUG,INFO,WARNING,ERROR,FATAL,
};class LogStrategy
{
public:LogStrategy()=default;virtual ~LogStrategy()=default;virtual void SyncLog(const std::string &message)=0;
};class ConsoleLogStrategy:public LogStrategy
{
public:~ConsoleLogStrategy(){}void SyncLog(const std::string &message){std::cerr<<message<<'\n';}
};class FileLogStrategy:public LogStrategy
{
public:FileLogStrategy(const std::string& dir,const std::string& filename):_dir_path_name(dir),_file_name(filename){try{std::filesystem::create_directory(_dir_path_name);}catch(const std::exception& e){std::cerr << e.what() << '\n';}}~FileLogStrategy(){}void SyncLog(const std::string &message){std::string target=_dir_path_name+'/'+_file_name;std::ofstream out(target.c_str(),std::ios::app);if(!out.is_open())return;out<<message<<'\n';out.close();}
private:std::string _dir_path_name;std::string _file_name;
};class Logger
{
private:class LogMessage{public:LogMessage(LogLevel type,const std::string& file_name,int line,Logger& logger):_current_time(GetCurrentTime()),_type(type),_pid(getpid()),_file_name(file_name),_line(line),_logger(logger){std::stringstream stringbuffer;stringbuffer<<"["<<_current_time<<"]"<<"["<<LogLevelToString(_type)<<"]"<<"["<<_pid<<"]"<<"["<<_file_name<<"]"<<"["<<_line<<"]"<<"-";_loginfo=stringbuffer.str();}~LogMessage(){LockGuard lockguard(_logger._sync_lock);if(_logger._strategy)_logger._strategy->SyncLog(_loginfo);}template<class T>LogMessage& operator<<(const T& info){std::stringstream stringbuffer;stringbuffer<<info;_loginfo+=stringbuffer.str();return *this;}private:std::string _current_time;LogLevel _type;pid_t _pid;std::string _file_name;int _line;std::string _loginfo;Logger& _logger;};
public:Logger()=default;~Logger()=default;void UseConsoleLogStrategy(){if(dynamic_cast<ConsoleLogStrategy*>(_strategy.get())!=nullptr)return;_strategy=std::make_unique<ConsoleLogStrategy>();}void UseFileLogStrategy(const std::string& dir,const std::string& file_name){if(dynamic_cast<FileLogStrategy*>(_strategy.get())!=nullptr)return;_strategy=std::make_unique<FileLogStrategy>(dir,file_name);}LogMessage operator()(LogLevel type,const std::string& file_name,int line){return LogMessage(type,file_name,line,*this);}
private:static std::string LogLevelToString(LogLevel level){switch(level){case LogLevel::DEBUG:return "Debug";case LogLevel::INFO:return "Info";case LogLevel::WARNING:return "Warning";case LogLevel::ERROR:return "Error";case LogLevel::FATAL:return "Fatal";default:return "None";}}static std::string GetCurrentTime(){time_t current_time=time(NULL);struct tm current_tm;localtime_r(¤t_time,¤t_tm);char timebuffer[64];snprintf(timebuffer,sizeof(timebuffer),"%4d-%02d-%02d-%02d-%02d-%02d",current_tm.tm_year+1900,current_tm.tm_mon+1,current_tm.tm_mday,current_tm.tm_hour,current_tm.tm_min,current_tm.tm_sec);return timebuffer;}
private:std::unique_ptr<LogStrategy> _strategy=std::make_unique<ConsoleLogStrategy>();Mutex _sync_lock;
};static Logger logger;#define LOG(type) logger(type,__FILE__,__LINE__)
inline void ENABLE_CONSOLE_LOG_STRATEGY()
{logger.UseConsoleLogStrategy();
}
inline void ENABLE_FILE_LOG_STRATEGY(const std::string& dir=std::filesystem::current_path(),const std::string& file_name="log.txt")
{logger.UseFileLogStrategy(dir,file_name);
}
執行測試程序,得到結果:
#include"Log.h"int main()
{LOG(LogLevel::INFO)<<"hello,world"<<'-'<<123456<<'-'<<3.14<<'\n';return 0;
}
單例模式
下面我們正式設計線程池,為了方便線程池接口的調用,我們提供了一個接口,返回一個靜態線程池對象的引用:
static ThreadPool& instance(){static ThreadPool instance;return instance;}
同時,將線程池的構造函數設為private,這樣就不能外部構造線程池對象了,確保一個進程最多只有一個線程池對象,當然單例模式還有其他實現方式,比如這種:
class ThreadPool
{
private:static ThreadPool* p;
public:static ThreadPool* GetInstance() {if(p==nullptr)p=new ThreadPool();return p;}
};
但這種實現存在嚴重的線程安全問題,簡單的來說就是,線程池指針本身也是一個共享資源,如果不加鎖保護,可能new多個threadpool對象,造成內存泄漏,當然可以直接加鎖解決,但由于調用任何線程池的接口都要先調用該接口獲取線程池對象,因此該接口應設計的盡可能高效,由此得到以下版本:
只需在第一次判斷指針為空時加鎖new線程池對象,其余時刻只需一次判斷,無需加鎖。
class ThreadPool
{
private:static ThreadPool* inst;static std::mutex lock;
public:static ThreadPool* GetInstance() {if (inst == NULL) {lock.lock();if (inst == NULL) {inst = new ThreadPool();}lock.unlock();}return inst;}
};
看起來十分美好,但實際上由于編譯器優化原因,很多時候編譯結果并不是所見即所得,對于inst這種頻繁訪問的指針,在編譯器優化時可能會將其放在寄存器中,每次直接從寄存器中讀取,而不是內存。
而由于寄存器屬于線程上下文,一個線程new了一個對象,修改了寄存器,并不會影響其他線程,這樣其他線程也會new對象,造成內存泄漏。
為了解決此問題,我們又不得不用volatile關鍵字來禁止編譯器對該變量進行優化,然而我們即使避開了寄存器,還有高速緩存造成的緩存一致性問題,這個問題與上述問題類似,同樣是不太好解決的,事實上,這種雙重檢定的模式(DCLP)無論如何都存在安全風險,已經被淘汰使用。對于實踐而言,用第一種實現是最好的。
?
class ThreadPool
{
private:volatile static ThreadPool* inst;static std::mutex lock;
public:static ThreadPool* GetInstance() {if (inst == NULL) {lock.lock();if (inst == NULL) {inst = new ThreadPool();}lock.unlock();}return inst;}
};?
最后我們給出線程池的全部代碼:
Thread.h
#pragma once
#include<unistd.h>
#include<pthread.h>
#include<functional>
#include"LockGuard.h"using thread_func=std::function<void()>;std::uint32_t thread_name_count=0;
Mutex _thread_name_count_lock;class Thread
{
private:enum class ThreadStatus{THREAD_NEW,THREAD_RUNNING,THREAD_STOP};public:Thread(thread_func func):_func(func){SetName();}~Thread()=default;void SetISDetached(bool flag){is_detached=_status==ThreadStatus::THREAD_NEW&&flag;}bool Start(){int ret=pthread_create(&_id,NULL,run,this);return ret==0;}bool Join(){int ret=pthread_join(_id,NULL);return !is_detached&&ret==0;}
private:static void* run(void *obj){auto self=static_cast<Thread*>(obj);self->_status=ThreadStatus::THREAD_RUNNING;pthread_setname_np(pthread_self(),self->_name.c_str());if(self->is_detached)pthread_detach(pthread_self());if(self->_func)self->_func();return nullptr;}void SetName(){LockGuard lockguard(_thread_name_count_lock);_name="Thread-"+std::to_string(++thread_name_count);}
private:std::string _name;pthread_t _id;ThreadStatus _status=ThreadStatus::THREAD_NEW;thread_func _func=nullptr;bool is_detached=false;
};
Thread_Pool.h
#pragma once
#include<vector>
#include<queue>
#include"Log.h"
#include"Thread.h"
#include"Cond.h"using Task=std::function<void()>;class ThreadPool
{
public:static ThreadPool& instance(){static ThreadPool instance;return instance;}void Start(){is_running=true;for(auto& thread:_threads){thread.Start();LOG(LogLevel::INFO)<<"thread start";}}void Stop(){LockGuard lockguard(_mutex);is_running=false;_cond.NotifyAll();}void Wait(){for(auto& thread:_threads){thread.Join();LOG(LogLevel::INFO)<<"thread quit";}}bool Enqueue(const Task &task){bool ret=false;LockGuard lockguard(_mutex);if(is_running){_task_queue.emplace(task);if(_wait_num>0)_cond.Notify();ret=true;LOG(LogLevel::INFO)<<"添加任務";}return ret;}
private:ThreadPool(int thread_num=5):_thread_num(thread_num){for(int i=0;i<_thread_num;++i){_threads.emplace_back([this](){//線程池不運行且無任務->直接結束//線程池不運行且有任務->完成剩余任務后結束//線程池運行且無任務->休眠//線程池運行且有任務->做任務while(true){_mutex.Lock();while(is_running&&_task_queue.empty()){++_wait_num;_cond.Wait(_mutex);--_wait_num;}if(!is_running&&_task_queue.empty()){_mutex.Unlock();break;}Task task=_task_queue.front();_task_queue.pop();_mutex.Unlock();task();}});}}ThreadPool(const ThreadPool&)=delete;ThreadPool operator=(const ThreadPool&)=delete;~ThreadPool()=default;
private:int _wait_num=0;bool is_running=false;int _thread_num;std::vector<Thread> _threads;std::queue<Task> _task_queue;Mutex _mutex;//讓線程有序訪問任務隊列Cond _cond;//任務條件變量
};#define Instance() ThreadPool::instance()
測試程序:
打印1到10
#include"Log.h"
#include"ThreadPool.h"Mutex mutex;
int main()
{Instance().Start();for(int i=0;i<10;++i){Instance().Enqueue([](){LockGuard lockguard(mutex);static int count=1;std::cout<<count<<'\n';++count;});}Instance().Stop();Instance().Wait();return 0;
}
執行結果: