在實現線程池之前,首先對線程池中所需要用到的互斥鎖、條件變量和信號量進行了簡單的封裝。
互斥鎖、條件變量和信號量封裝
locker.h頭文件如下(已詳細注釋)
/*
這里面對互斥鎖,條件變量和信號量進行了封裝
保證工作隊列的線程同步與數據安全
*/#ifndef LOCKER_H
#define LOCKER_H
/*
這是一個簡單的C或C++頭文件保護(header guard)機制,用于防止頭文件被多次包含(include)。
#ifndef LOCKER_H:#ifndef是預處理指令,用于檢查LOCKER_H這個宏是否已經定義。如果LOCKER_H沒有被定義,那么后面的代碼(直到#endif)會被編譯器包含(include)。
#define LOCKER_H:這行代碼定義了一個宏LOCKER_H。一旦這個宏被定義,再次遇到#ifndef LOCKER_H時,由于LOCKER_H已經被定義,所以其后的代碼不會被再次包含。
#endif:這是結束#ifndef預處理的指令。
這種機制確保了在同一個編譯單元中,頭文件只被包含一次,避免了由于多次包含同一個頭文件而可能導致的各種問題,如重復定義、多重繼承等。
*/#include <pthread.h>
#include <exception> //異常處理
#include <semaphore.h> //信號量
#include <stdexcept> //std::runtime_error 是定義在 <stdexcept> 頭文件中的一個異常類//可以使用互斥量(mutex 是 mutual exclusion的縮寫)來確保同時僅有一個線程可以訪問某項共享資源。
//任何時候,至多只有一個線程可以鎖定該互斥量。試圖對已經鎖定的某一互斥量再次加鎖,將可能阻塞線程或者報錯失敗
//一旦線程鎖定互斥量,隨即成為該互斥量的所有者,只有所有者才能給互斥量解鎖。
/*初始化互斥量后,你可以使用 pthread_mutex_lock 函數來鎖定互斥量,使用 pthread_mutex_unlock 函數來解鎖互斥量。鎖定互斥量的線程將獨占對共享資源的訪問,直到它解鎖該互斥量。其他嘗試鎖定該互斥量的線程將被阻塞,直到互斥量被解鎖。* */
//1.互斥鎖類,應該確保一個線程在訪問資源的時候,另外的線程不能同時訪問這些資源
class Locker{
public://1.1 構造函數,對互斥量進行初始話Locker(){//這段代碼確實是在檢查互斥量是否被成功初始化,并在初始化失敗時拋出異常。成功初始化返回0if(pthread_mutex_init(&m_mutux, NULL) != 0){throw std::runtime_error("Failed to initialize mutex");}}//1.2 析構函數,對互斥量進行消耗~Locker(){pthread_mutex_destroy(&m_mutux);}//1.3 上鎖函數bool lock(){return pthread_mutex_lock(&m_mutux) == 0; //上鎖成功返回0}//1.4 解鎖函數bool unlock(){return pthread_mutex_unlock(&m_mutux) == 0;}//1.5 get函數獲取互斥量pthread_mutex_t * get(){return &m_mutux;}/** 在C++中,pthread_mutex_t 是一個結構體類型,通常用于POSIX線程編程中的互斥量。* 當你通過函數返回一個 pthread_mutex_t 類型的值時,你實際上是在返回這個結構體的一個副本。* 然而,對于互斥量這樣的類型,返回其副本通常是沒有意義的,因為互斥量的狀態(如鎖定或未鎖定)不能通過簡單地復制結構體來傳遞。因此,當你想從一個函數返回一個互斥量以便在其他地方使用時,通常會返回指向互斥量的指針。這樣,調用者可以通過這個指針來操作原始的互斥量對象,而不是它的一個副本。* */
private:pthread_mutex_t m_mutux; //互斥量
};//2. 條件變量類
/*條件變量(Condition Variables)是線程同步的一種機制,它允許一個或多個線程等待某個條件成立,或者在某個條件成立后喚醒一個或多個等待的線程。條件變量通常與互斥鎖(Mutex)一起使用,以避免競爭條件和保證線程安全。條件變量的類型 pthread_cond_tint pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);int pthread_cond_destroy(pthread_cond_t *cond);int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);等待,調用了該函數,線程會阻塞。當這個函數調用阻塞等待的時候,會對互斥鎖進行解鎖,否則生產者拿不到互斥鎖。解除阻塞時,重新加鎖int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);- 等待多長時間,調用了這個函數,線程會阻塞,直到指定的時間結束。int pthread_cond_signal(pthread_cond_t *cond);- 喚醒一個或者多個等待的線程int pthread_cond_broadcast(pthread_cond_t *cond);- 喚醒所有的等待的線程*//*條件變量(Condition Variable)是操作系統提供的一種線程間同步機制,用于在多線程環境中實現線程的等待和喚醒操作。它通常與互斥鎖(Mutex)結合使用,用于實現復雜的線程同步。條件變量的原理如下:線程在進入臨界區前先獲取互斥鎖。當某個條件不滿足時,線程調用條件變量的等待(wait)函數,
并釋放之前獲取到的互斥鎖,然后進入阻塞狀態等待被喚醒。當其他線程滿足了該條件時,調用條件變量的通知或廣播(broadcast)函數來喚醒一個或多個等待中的線程。被喚醒的線程重新獲得互斥鎖,并檢查條件是否滿足。如果滿足,則繼續執行;如果不滿足,則再次進入等待狀態。條件變量的作用是用于多線程之間關于共享數據狀態變化的通信。當一個動作需要另外一個動作完成時才能進行,
即:當一個線程的行為依賴于另外一個線程對共享數據狀態的改變時,這時候就可以使用條件變量。操作系統是主動調用者,而條件變量其實是操作系統預留出的接口。
因而這里主要是去考慮記錄誰在等待、記錄誰要喚醒、如何喚醒的問題。條件變量是一種等待機制,每一個條件變量對應一個等待原因與等待隊列。* */
class Cond{
public://2.1 構造函數,初始化Cond(){if(pthread_cond_init(&m_cond, NULL) != 0){throw std::runtime_error("Failed to initialize Condition Variables");}}//2.2 析構函數~Cond(){pthread_cond_destroy(&m_cond);}//2.3 條件變量要配合互斥鎖使用,因此需要傳遞一個互斥鎖指針類型bool wait(pthread_mutex_t * mutex){return pthread_cond_wait(&m_cond, mutex) == 0;}//2.4 timewait,還要傳遞一個時間tbool timewait(pthread_mutex_t * mutex, struct timespec t){return pthread_cond_timedwait(&m_cond, mutex, &t) == 0;}//2.5 喚醒一個或者多個等待的線程bool signal(){return pthread_cond_signal(&m_cond) == 0;}//2.6 喚醒所有的等待的線程bool broadcast(){return pthread_cond_broadcast(&m_cond) == 0;}private:pthread_cond_t m_cond;
};//3. 信號量類
/*信號量(Semaphore)是一種用于控制多個線程或進程對共享資源訪問的同步機制。
它可以看作是一個計數器,用于表示可用資源的數量。信號量的主要操作包括P操作(等待)和V操作(釋放)。P操作(Wait):當一個線程或進程需要訪問共享資源時,它首先會執行P操作。這個操作會將信號量的值減1,
表示一個資源被占用。如果信號量的值大于0,表示還有可用資源,
線程或進程可以繼續執行;如果信號量的值為0,表示沒有可用資源,線程或進程將被阻塞,直到有資源可用。V操作(Signal):當一個線程或進程完成對共享資源的訪問后,它會執行V操作。這個操作會將信號量的值加1,
表示一個資源被釋放。如果有其他線程或進程正在等待該資源(即被P操作阻塞),那么它們將被喚醒并繼續執行。信號量的類型 sem_tint sem_init(sem_t *sem, int pshared, unsigned int value);- 初始化信號量- 參數:- sem : 信號量變量的地址- pshared : 0 用在線程間 ,非0 用在進程間- value : 信號量中的值,生產+1,消費-1int sem_destroy(sem_t *sem);- 釋放資源int sem_wait(sem_t *sem);- 對信號量加鎖,調用一次對信號量的值-1,如果值為0,就阻塞,直到大于0(post)int sem_trywait(sem_t *sem);int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);int sem_post(sem_t *sem);- 對信號量解鎖,調用一次對信號量的值+1int sem_getvalue(sem_t *sem, int *sval);信號量的工作原理基于兩種基本操作:P(等待)操作和V(發送信號)操作。P操作用于獲取信號量,即減少信號量值。如果信號量值大于0,表示資源可用,進程或線程可以訪問該資源,并將信號量值減1;如果信號量值等于0,表示資源已被占用,進程或線程需要等待其他進程或線程釋放資源,并將自己掛起,直到信號量值變為正數。V操作用于釋放信號量,即增加信號量值。如果有進程或線程正在等待該信號量,則喚醒其中一個進程或線程,使其繼續執行。
*/
class Sem{
public:Sem(){if(sem_init(&m_sem, 0, 0) != 0){throw std::runtime_error("Failed to initialize Semaphore");}}~Sem(){sem_destroy(&m_sem);}// int sem_wait(sem_t *sem); - 調用一次對信號量的值-1,如果值為0,就阻塞,直到大于0(post)bool wait(){return sem_wait(&m_sem) == 0;}// int sem_post(sem_t *sem); - 調用一次對信號量的值+1bool post(){return sem_post(&m_sem) == 0;}private:sem_t m_sem; //信號量
};#endif //LOCKER_H
2.線程池
線程池是一種用于管理和重用線程的并發編程技術。在軟件開發中,線程池被用來處理大量的并發任務,以提高系統性能和資源利用率。
主要的組成部分包括:
-
線程池管理器(Thread Pool Manager):負責創建、銷毀和管理線程池中的線程。它通常提供了添加任務、刪除任務、調整線程池大小等接口,用于管理線程池的狀態。
-
工作隊列(Work Queue):用于存儲需要執行的任務。當有任務需要執行時,線程從工作隊列中獲取任務并執行。工作隊列可以是有限大小的隊列,用于控制系統資源的使用。
-
線程池(Thread Pool):包含一組預先創建的線程,這些線程可以重復使用來執行任務。通過維護一組可重用的線程,線程池可以減少線程的創建和銷毀開銷,提高系統的性能和響應速度。
-
任務(Task):需要在線程池中執行的工作單元。任務可以是任意類型的計算、I/O 操作或其他類型的工作。
線程池的工作流程通常如下:
- 初始時,線程池會創建一定數量的線程,并將它們置于等待狀態。
- 當有任務需要執行時,任務被添加到工作隊列中。
- 線程池中的線程會不斷地從工作隊列中獲取任務,并執行這些任務。
- 執行完任務后,線程會再次回到等待狀態,等待下一個任務的到來。
- 當線程池不再需要時,可以銷毀線程池中的線程,釋放資源。
線程池的優勢在于:
- 降低線程創建和銷毀的開銷。通過重用線程,減少了頻繁創建和銷毀線程的性能開銷。
- 控制并發線程數量。線程池可以限制同時執行的線程數量,防止系統資源被過度占用。
- 提高系統響應速度。通過并發執行多個任務,可以提高系統的并發處理能力和響應速度。
在C++中,
this
?是一個特殊的指針,它指向調用成員函數的對象。當你在一個類的非靜態成員函數中使用?this
?時,它實際上指向調用該函數的實例。
this
?指針允許你訪問對象的所有成員,包括私有(private)和保護(protected)成員。以下是?
this
?指針的一些關鍵點:
隱含傳遞:當你調用一個類的非靜態成員函數時,
this
?指針會自動作為第一個參數傳遞給該函數。雖然你不需要顯式地傳遞它,但在函數內部,你可以使用?this
?來引用調用該函數的對象。類型:
this
?指針的類型是指向類類型的指針。例如,如果你有一個名為?MyClass
?的類,那么?this
?的類型就是?MyClass*
。使用場景:
this
?指針通常用于以下情況:
- 當成員函數的參數名和類的成員變量名相同時,為了避免歧義,可以使用?
this
?指針來明確指代類的成員變量。- 當你想在成員函數中返回對象本身(通常用于鏈式操作)時,可以使用?
return *this;
。- 在某些情況下,你可能想將?
this
?指針傳遞給其他函數或方法。class MyClass { public: int value; MyClass(int val) : value(val) {} // 使用 this 指針來訪問和修改成員變量 void setValue(int newVal) { this->value = newVal; // this-> 是可選的,但在某些情況下可以幫助提高代碼的可讀性 } // 返回對象本身,以便進行鏈式操作 MyClass* incrementValue() { this->value++; return this; } }; int main() { MyClass obj(10); obj.setValue(20); obj.incrementValue()->incrementValue(); // 鏈式操作 return 0; }
線程池代碼threadpool.h
//線程池的實現
#ifndef THREADPOOL_H
#define THREADPOOL_H#include <pthread.h>
#include <list>
#include "locker.h"
#include <exception>
#include <cstdio>
#include <stdexcept>
#include <iostream>
using namespace std;//定義成模板類,是為了代碼的復用,
//任務可能是不同的,T就是任務類
template<typename T>
class Threadpool{
public://1 構造函數,初始化線程數量, 請求隊列中最多允許的,等待處理的請求數量Threadpool(int thread_number = 8, int max_requests = 10000);//2 析構函數~Threadpool();//3 向工作隊列中去添加任務,append方法,類型為Tbool append(T * request);private://靜態函數,不能訪問非靜態的成員變量,線程所要執行實現的功能static void* worker(void* arg);/*** */void run();private://1 線程的數量int m_thread_number;//2 線程池數組,存儲創建線程的pid,大小與線程數量一致pthread_t * m_threads;//3 工作隊列中最多允許的,等待處理的請求數量int m_max_requests;//4 工作隊列std::list<T*> m_workqueue;/** 內存管理:使用指針允許你更靈活地管理內存。例如,如果你有一個大型對象或動態分配的對象,* 將其存儲在std::list<T>中可能會導致不必要的內存復制,因為std::list在插入和刪除元素時可能需要重新分配內存。* 使用指針可以避免這種復制,因為實際上你只是在復制指針(一個小的內存地址),而不是整個對象。* *///5 互斥鎖Locker m_queue_mutex;//6 信號量用來判斷是否有任務需要處理Sem m_queue_sem;//7 是否結束線程bool m_stop;
};//1 構造函數的類外初始化,在這個里面要創建出來線程
template<typename T>
Threadpool<T>::Threadpool(int thread_number, int max_requests) : m_thread_number(thread_number), m_max_requests(max_requests),
m_stop(false), m_threads(nullptr)
{//1. 參數是否正確的判斷if(thread_number <= 0 || max_requests <= 0){throw std::runtime_error("Failed to initialize Threadpool");}//2. 根據線程的數量創建出線程池數組,存儲創建線程的pid,析構的時候需要銷毀m_threads = new pthread_t[thread_number];if(!m_threads){throw std::runtime_error("m_threads Error");}//3. 創建thread_number,線程pid存儲在m_threads中,并設置為線程分離for(int i = 0; i < thread_number; i++){cout<<"create" << i << " th thread"<<endl;//線程執行的代碼在worker中,是個靜態函數,創建的時候,并沒有顯示指定存儲子線程的tid的變量,而是直接放在數組中if(pthread_create(m_threads + i, NULL, worker, this) != 0){//子線程創建失敗,刪掉這個數組m_threadsdelete[] m_threads;/** 在C++中,delete 和 delete[] 是用于釋放動態分配的內存的運算符,但它們的使用場景有所不同。* delete:用于釋放通過 new 運算符單個分配的對象。delete[]:用于釋放通過 new[] 運算符分配的對象數組。* */throw std::runtime_error("pthread_create Error");}//設置線程分離if(pthread_detach(m_threads[i]) != 0){delete[] m_threads;throw std::runtime_error("pthread_detach Error");}}
}//2 析構函數
template<typename T>
Threadpool<T>::~Threadpool()
{delete[] m_threads;m_stop = true;
}//3 向工作隊列中去添加任務,append方法,類型為T,并且需要確保線程同步
template<typename T>
bool Threadpool<T>::append(T *request)
{//1 上鎖m_queue_mutex.lock();//2 如果工作隊列中的大小大于最大的工作隊列中最多允許的,等待處理的請求數量,解鎖,返回錯誤,處理不了了if(m_workqueue.size() > m_max_requests){m_queue_mutex.unlock();return false;}//3 向工作隊列中添加m_workqueue.push_back(request);m_queue_mutex.unlock(); //解鎖m_queue_sem.post(); //信號量增加,說明工作隊列中有了新任務
}//4 線程執行的代碼在worker中,是個靜態函數
/*
靜態函數,它不能訪問非靜態的成員函數
if (pthread_create(m_threads + i, NULL, worker, NULL) != 0);
在創建線程的時候
if (pthread_create(m_threads + i, NULL, worker,this) != 0); this代表本類對象,是Threadpool類型對象* */
template<typename T>
void* Threadpool<T>::worker(void * arg)
{Threadpool * pool = (Threadpool *) arg;pool->run();return pool;
}//運行函數run,在工作隊列中去任務,做任務
template<typename T>
void Threadpool<T>::run()
{while(!m_stop){//工作隊列中的信號量-1,如果為0則阻塞在這m_queue_sem.wait();//加鎖m_queue_mutex.lock();//如果工作隊列為空就解鎖if(m_workqueue.empty()){m_queue_mutex.unlock();continue;}//取第一個任務T* request = m_workqueue.front();m_workqueue.pop_front();m_queue_mutex.unlock();if(!request){continue;}request->process(); //調用process函數執行任務}}#endif //THREADPOOL_H/** C++中的靜態函數* 在C++中,靜態成員函數(Static Member Functions)是類的一部分,但它們與類的實例(對象)無關。* 與類關聯,而非對象關聯:靜態成員函數屬于類本身,而不是類的某個特定對象。因此,它們可以在沒有創建類對象的情況下被調用。* 訪問限制:靜態成員函數只能直接訪問靜態成員變量和其他靜態成員函數,不能訪問類的非靜態成員變量和非靜態成員函數,除非通過類的實例或指針/引用。* 不隱藏this指針:靜態成員函數不接收this指針,因此它們不能訪問類的非靜態成員,因為這些成員需要通過this指針來訪問。* 調用方式:可以通過類名和作用域解析運算符::來調用靜態成員函數,也可以通過類的對象來調用(盡管這樣做并不常見)。
靜態成員函數不能直接訪問類的非靜態成員變量,因為靜態成員函數不與類的任何特定實例關聯,而非靜態成員變量是與類的實例關聯的。但是,有一些方法可以間接地訪問非靜態成員變量:通過參數傳遞:你可以將非靜態成員變量的引用或指針作為參數傳遞給靜態成員函數。
這樣,靜態成員函數就可以通過這個參數來訪問和修改非靜態成員變量。通過類的實例:如果靜態成員函數能夠獲得類的某個實例的引用或指針,那么它可以通過這個實例來訪問非靜態成員變量。
這通常是通過將實例作為參數傳遞給靜態成員函數來實現的。* */