1、 Linux線程概念
1.1、什么是線程
- 在一個程序里的一個執行路線就叫做線程(thread)更準確的定義是:線程是“一個進程內部的控制序列”
- 一切進程至少都有一個執行線程
- 線程在進程內部運行,本質是在進程地址空間內運行
- 在Linux系統中,在CPU眼中,看到的PCB都要比傳統的進程更加輕量化
- 透過進程虛擬地址空間,可以看到進程的大部分資源,將進程資源合理分配給每個執行流,就形成了線程執行流
?線程與進程:
- 進程是一個執行起來的程序,進程 = 內核數據結構 + 代碼和數據;進程是承擔分配系統資源的基本實體
- 線程是一個進程內的執行流,執行粒度比進程要更細,是進程內部的一個執行分支;線程是OS調度的基本單位
- Linux下的線程,是用進程模擬實現的,復用了進程的歷史代碼
- Linux執行流,統一稱為輕量級進程(LWP),Linux中沒有真正意義上的線程,線程使用LWP進行模擬實現的;所以一個進程是由多個PCB構成的,而不是只有一個PCB。
1.2 線程的優點
- 創建一個新線程的代價要比創建一個新進程小得多
- 與進程之間的切換相比,線程之間的切換需要操作系統做的工作要少很多
最主要的區別是線程的切換虛擬內存空間依然是相同的,但是進程切換是不同的。這兩種上下文切換的處理都是通過操作系統內核來完成的。內核的這種切換過程伴隨的最顯著的性能損耗是將寄存器中的內容切換出
另外一個隱藏的損耗是上下文的切換會擾亂處理器的緩存機制。簡單的說,一旦去切換上下文,處理器中所有已經緩存的內存地址一瞬間都作廢了。還有一個顯著的區別是當你改變虛擬內存空間的時候,處理的頁表緩沖TLB(快表)會被全部刷新,這將導致內存的訪問在一段時間內相當的低效。但是在線程的切換中,不會出現這個問題,當然還有硬件cache - 線程占用的資源要比進程很少
- 能充分利用多處理器的可并行數量
- 在等待慢速I/O操作結束的同時,程序可執行其他的計算任務
- 計算密集型應用,為了能在多處理器系統上運行,將計算分解到多個線程中實現
- I/O密集型應用,為了提高性能,將I/O操作重疊。線程可以同時等待不同的I/O操作
1.3 線程的缺點
- 性能損失:一個很少被外部事件阻塞的計算密集型線程往往無法與其它線程共享同一個處理器。如果計算密集型線程的數量比可用的處理器多,那么可能會有較大的性能損失,這里的性能損失指的是增加了額外的同步和調度開銷,而可用的資源不變。
- 健壯性降低:編寫多線程需要更全面更深入的考慮,在一個多線程程序里,因時間分配上的細微偏差或者因共享了不該共享的變量而造成不良影響的可能性是很大的,換句話說線程之間是缺乏保護的。
- 缺乏訪問控制:進程是訪問控制的基本粒度,在一個線程中調用某些OS函數會對整個進程造成影響。
- 編程難度提高:編寫與調試一個多線程程序比單線程程序困難得多
1.4 線程異常
- 單個線程如果出現除零,野指針問題導致線程奔潰,進程也會隨著奔潰
- 線程是進程的執行分支,線程出異常,就類似進程出異常,進而觸發信號機制,終止進程,進程終止,該進程內的所有線程也就隨即退出
1.5 線程用途
- 合理的使用多線程,能提高CPU密集型程序的執行效率
- 合理的使用多線程,能提高IO密集型程序的用戶體驗(如生活中我們一邊寫代碼一邊下載開發工具,就是多線程運行的一種表現)
2、Linux線程控制
2.1 進程和線程
- 進程是資源分配的基本單位
- 線程是調度的基本單位
- 線程共享進程數據,但也擁有自己的一部分數據:線程ID、一組寄存器、棧、errno、信號屏蔽字、調度優先級
進程的多個線程共享同一個地址空間,因此Text Segment、Data Segment等都是共享的,如果定義一個函數,在各線程中都可以調用,如果定義一個全局變量,在各線程中都可以訪問到,同時各個線程還共享以下進程資源和環境:文件描述符、每種信號的處理方式(SIG_IGN、SIG_DFL或者自定義的信號處理函數)、當前工作目錄、用戶id和組id。
2.2 POSIX線程庫?
POSIX線程庫:
- 與線程有關的函數構成了一個完整的系列,絕大多數函數的名字都是以“pthread_”打頭的
- 要使用這些函數庫,要通過引入頭文<pthread,h>
- 鏈接這些線程函數庫時要使用編譯器命令的“-lpthread”選項?
線程創建
int pthread_create(pthread_t *thread, const pthread_attr_t *attr,void *(*start_routine)(void *), void *arg);
功能:創建新線程
參數:
thread
:指向線程標識符的指針
attr
:線程屬性,NULL表示默認屬性
start_routine
:線程執行的函數
arg
:傳遞給線程函數的參數返回值:成功返回0,失敗返回錯誤碼
例子:
#include <iostream> #include <unistd.h> #include <pthread.h> #include <string.h> #include <stdlib.h>void* handler(void* arg) {int* i = static_cast<int*>(arg);std::cout << *i << std::endl;for(;;){printf("I am thread 1\n");sleep(1);} }int main() {pthread_t id;int ret;int arg = 10;if((ret = pthread_create(&id,nullptr,handler,&arg)) != 0){fprintf(stderr,"pthread_create: %s\n",strerror(ret));exit(EXIT_FAILURE);}int i;for(;;){printf("I am main thread\n");sleep(1);}return 0; }
獲取線程ID
#include <pthread.h>//獲取線程ID pthread_t pthread_self(void);
功能:返回一個pthread_t類型的變量,指代的是調用pthread_self函數的線程的"ID"。
#include <iostream> #include <unistd.h> #include <pthread.h> #include <string.h> #include <stdlib.h>void* handler(void* arg) {printf("thread1 ID:%0lx\n",pthread_self());return nullptr; }int main() {pthread_t id;int ret;int arg = 10;if((ret = pthread_create(&id,nullptr,handler,&arg)) != 0){fprintf(stderr,"pthread_create: %s\n",strerror(ret));exit(EXIT_FAILURE);}printf("thread1 ID:%0lx\n",id);printf("mainthread ID:%0lx\n",pthread_self());return 0; }
這個"ID"是當前調用線程的線程標識符(thread ID),該標識符由線程庫分配,用于唯一標識線程。線程 ID 僅在進程內唯一,不同進程的線程 ID 可能重復。這個pthread_t類型的ID實際上是一個在虛擬地址空間上的一個地址,通過這個地址可以找到關于這個線程的基本信息,包括線程ID,線程棧,寄存器等屬性。pthread庫是用戶空間的線程庫,但底層依賴內核提供的系統調用(如
clone
)實現線程創建和管理。內核為每個線程分配全局唯一的線程ID(TID),而pthread庫在用戶層也會維護獨立的線程標識符(pthread_t)。
?內核分配的線程ID是系統全局唯一的,通過
gettid()
系統調用可獲取。此ID用于內核調度和資源管理,不同于用戶層的pthread_t
。例如:#include <unistd.h> #include <sys/syscall.h>pid_t tid = syscall(SYS_gettid);
pthread_t與內核TID的區別:
pthread_t
是pthread庫的用戶層標識符,類型由實現決定(可能為整數或結構體),兩者關聯性取決于實現,Linux中pthread_t
通常直接映射到內核TID。
使用PS命令查看線程信息(-L選項):ps -aL
LWP的ID就是真正的線程ID,即為TID。PID和LWP相同的是主線程,主線程的棧在虛擬地址空間的棧上,而其他線程的棧是在共享區(堆棧之間),因為pthread系列函數都是pthread庫提供給我們的,而pthread庫在共享區,所以除了主線程之外的其他線程的棧都在共享區。
線程終止
線程終止有三種基本方法:return返回(對主線程不適用);線程調用pthread_exit終止自己;線程調用pthread_cancel終止同一進程中的另一個線程。
pthread_exit函數void pthread_exit(void *retval);
功能:終止調用線程
參數:
retval
:線程返回值,可由其他線程通過pthread_join
獲取pthead_cancel函數
int pthread_cancel(pthread_t thread);
- 功能:取消一個指定的執行中的線程(一定要在執行中)
- 參數:thread線程ID
- 返回值:成功返回0;失敗返回錯誤碼
線程等待
int pthread_join(pthread_t thread, void **retval);
功能:等待指定線程終止
參數:
thread
:要等待的線程ID
retval
:存儲被等待線程的返回值返回值:成功返回0,失敗返回錯誤碼
為什么需要線程等待:因為退出的線程,其空間沒有被釋放,仍然在進程的地址空間內;并且創建新的線程不會復用剛才退出線程的地址空間
調用該函數的線程將掛起等待,直到id為thread的線程終止;thread線程以不同的方法終止,通過pthread_join得到的終止狀態是不同的:
1、如果thread線程通過return返回,retval所指向的單元里存放的是thread線程函數的返回值。
2、如果thread線程被別的線程調用pthread_cancel異常終掉,retval所指向的單元里存放的是常數PTHREAD_CANCELED。
3、如果thread線程是自己調用pthread_exit終止的,retval所指向的單元存放的是傳給pthread_exit的參數。
4、如果對thread線程的終止狀態不感興趣,可以傳NULL給retval參數。例子:
#include <iostream> #include <unistd.h> #include <pthread.h> #include <cstring> #include <cstdlib>void *thread1(void *arc) {printf("thread1 returning ... \n");int *p = (int*)malloc(sizeof(int));*p = 1111;return (void*)p; }void *thread2(void *arc) {printf("thread2 returning ... \n");int *p = (int*)malloc(sizeof(int));*p = 22222;pthread_exit((void*)p); }void *thread3(void *arc) {while(1){std::cout << "thread3 is running ... \n" << std::endl;sleep(1);}return nullptr; }int main() {pthread_t id;void *ret;pthread_create(&id, nullptr, thread1, nullptr);pthread_join(id,&ret);printf("thread return,thread id:%lX,return code:%d\n",id,*(int*)ret);free(ret);pthread_create(&id, nullptr, thread2, nullptr);pthread_join(id,&ret);printf("thread return,thread id:%lX,return code:%d\n",id,*(int*)ret);free(ret);pthread_create(&id, nullptr, thread3, nullptr);sleep(3);pthread_cancel(id);pthread_join(id,&ret);if(ret == PTHREAD_CANCELED)printf("thread return, thread id %lX, return code:PTHREAD_CANCELED",id);elseprintf("thread return,thread id %lX, return code:NULL\n",id);return 0; }
線程分離?
- 默認情況下,新創建的線程是joinable的,線程退出后,需要對其進行pthread_join操作,否則無法釋放資源,從而造成系統泄漏。
- 如果不關心線程的返回值,join是一種負擔,這個時候,我們可以告訴系統,當線程退出時,自動釋放線程資源。
int pthread_detach(pthread_t thread);
功能:將線程標記為分離狀態,線程終止時自動釋放資源
參數:
thread
:要分離的線程ID返回值:成功返回0,失敗返回錯誤碼
可以是線程組內其他線程對目標線程分離,也可以是線程對自己分離:
pthread_detach(pthread_self());
例子:
#include <iostream> #include <unistd.h> #include <pthread.h> #include <cstring> #include <cstdlib>void *handler(void *arc) {for(int i = 0; i < 5; i++){std::cout << "thread is running...\n" << std::endl;sleep(2);}std::cout << "thread is return" << std::endl;return nullptr; }int main() {pthread_t id;pthread_create(&id,nullptr,handler,nullptr);pthread_detach(id);for(int i = 0; i < 5; i++){std::cout << "main is running...\n" << std::endl;sleep(3);}std::cout << "main is return" << std::endl;return 0; }
3、線程ID和進程地址空間布局
- pthread_create函數會產生一個線程ID,存放在第一個參數指向的地址中。
- 前面講的線程ID屬于進程調度的范疇。因為線程是輕量級進程,是操作系統調度器的最小單位,所以需要一個數值來唯一表示該線程。
- pthread_create函數第一個參數指向一個虛擬內存單元,該內存單元的地址即為新創建線程的線程ID,屬于NPTL線程庫的范疇。線程庫的后續操作,就是根據該線程ID來操作線程的。
- pthread_t 本質是一個進程地址空間上的一個地址:
4、線程封裝
這是傳入函數無參版本的:
//Thread.hpp#include <iostream>
#include <cstring>
#include <cstdlib>
#include <stdlib.h>
#include <string>
#include <pthread.h>
#include <functional>
#include <atomic>
#include <unistd.h>namespace ThreadModule
{std::atomic<std::uint32_t> cnt(1); // 原子計數器,用于形成線程編號using func_t = std::function<void()>;enum class TSTATUS{NEW,RUNNING,STOP};//封裝自己的線程類class Thread{private:static void *Routine(void* args){Thread* t = static_cast<Thread*>(args);t->_func();return nullptr;}public:Thread(func_t func):_func(func),_status(TSTATUS::NEW),_joinable(true){_name = "Thread-" + std::to_string(cnt++);_pid = getpid();}bool Start(){//避免多次啟動if(_status == TSTATUS::RUNNING){std::cout << _name << "啟動失敗,當前線程已經執行" << std::endl;return false;}else{if(_status == TSTATUS::STOP)std::cout << "正在重新啟動線程" << std::endl;int ret;std::cout << _name << "已啟動" << std::endl;_status = TSTATUS::RUNNING;if((ret = pthread_create(&_tid,nullptr,Routine,this)) != 0){fprintf(stderr,"phread_create:%s\n",strerror(ret));return false;}return true;}return false;}bool Stop(){if(_status == TSTATUS::RUNNING){int ret;if((ret = pthread_cancel(_tid)) != 0){fprintf(stderr,"pthread_cancel:%s\n",strerror(ret));return false;}_status = TSTATUS::STOP;std::cout << _name << "已停止" << std::endl;return true;}return false;}bool Join(){if(_joinable){int ret;if((ret = pthread_join(_tid,nullptr)) != 0){fprintf(stderr,"pthread_join:%s\n",strerror(ret));return false;}_status = TSTATUS::STOP;std::cout << _name << "資源已被回收" << std::endl;return true;}return false;}void Detach(){_joinable = false;pthread_detach(_tid);}bool IsJoinable(){return _joinable; }std::string Name(){return _name;}~Thread(){}public:std::string _name;//線程名pthread_t _tid;//線程idpid_t _pid;//進程idbool _joinable;//是否可以分離func_t _func;//函數TSTATUS _status;//狀態};
}
//main.cpp#include "Thread.hpp"using namespace ThreadModule;void func()
{int cnt = 5;while(cnt--){std::cout << "pthread running" << std::endl;sleep(1);}return;
}int main()
{Thread thread(func);thread.Start();thread.Start();thread.Join();thread.Start();thread.Detach();for(int i = 1; i <= 10; i++){std::cout << "main is running" << std::endl;sleep(1);}return 0;
}
這是傳入函數可變參數版本的:
//Thread.hpp#include <iostream>
#include <cstring>
#include <cstdlib>
#include <stdlib.h>
#include <string>
#include <pthread.h>
#include <functional>
#include <atomic>
#include <unistd.h>namespace ThreadModule
{std::atomic<std::uint32_t> cnt(1); // 原子計數器,用于形成線程編號enum class TSTATUS{NEW,RUNNING,STOP};//封裝自己的線程類template <typename T>class Thread{using func_t = std::function<void(T)>;private:static void *Routine(void* args){Thread<T>* t = static_cast<Thread<T>*>(args);t->_func(t->_data);return nullptr;}public:Thread(func_t func, T data):_func(func),_data(data),_status(TSTATUS::NEW),_joinable(true){_name = "Thread-" + std::to_string(cnt++);_pid = getpid();}bool Start(){//避免多次啟動if(_status == TSTATUS::RUNNING){std::cout << _name << "啟動失敗,當前線程已經執行" << std::endl;return false;}else{if(_status == TSTATUS::STOP)std::cout << "正在重新啟動線程" << std::endl;int ret;std::cout << _name << "已啟動" << std::endl;_status = TSTATUS::RUNNING;if((ret = pthread_create(&_tid,nullptr,Routine,this)) != 0){fprintf(stderr,"phread_create:%s\n",strerror(ret));return false;}return true;}return false;}bool Stop(){if(_status == TSTATUS::RUNNING){int ret;if((ret = pthread_cancel(_tid)) != 0){fprintf(stderr,"pthread_cancel:%s\n",strerror(ret));return false;}_status = TSTATUS::STOP;std::cout << _name << "已停止" << std::endl;return true;}return false;}bool Join(){if(_joinable){int ret;if((ret = pthread_join(_tid,nullptr)) != 0){fprintf(stderr,"pthread_join:%s\n",strerror(ret));return false;}_status = TSTATUS::STOP;std::cout << _name << "資源已被回收" << std::endl;return true;}return false;}void Detach(){_joinable = false;pthread_detach(_tid);}bool IsJoinable(){return _joinable; }std::string Name(){return _name;}~Thread(){}public:std::string _name;//線程名pthread_t _tid;//線程idpid_t _pid;//進程idbool _joinable;//是否可以分離func_t _func;//函數TSTATUS _status;//狀態T _data;};
}
//main.cpp#include "Thread.hpp"using namespace ThreadModule;void func(int a)
{int cnt = 5;std::cout << a << std::endl;while(cnt--){std::cout << "pthread running" << std::endl;sleep(1);}return;
}int main()
{int a = 888;Thread<int> thread(func,a);thread.Start();thread.Start();thread.Join();thread.Start();thread.Detach();for(int i = 1; i <= 10; i++){std::cout << "main is running" << std::endl;sleep(1);}return 0;
}
5、線程互斥
相關概念
- 臨界資源:多線程執行流共享的資源叫做臨界資源
- 臨界區:每個線程內部,訪問臨界資源的代碼,叫做臨界區
- 互斥:任何時刻,互斥保證有且只有一個執行流進入臨界區,訪問臨界資源,通常對臨界資源起保護作用
- 原子性:不會被任何調度機制打斷的操作,該操作只有兩態,要么完成,要么未完成
互斥量mutex
大部分情況下,線程使用的數據都是局部變量,變量的地址空間在線程棧空間內,這種情況下變量歸屬于單個線程,其他線程無法獲得這種變量。但有時候,很多變量都需要在線程間共享,這樣的變量稱為共享變量,可以通過數據的共享,完成線程之間的交互。多個線程并發的操作共享變量,會帶來一些問題。
比如下面的搶票代碼:
#include <iostream> #include <unistd.h> #include <string> #include <cstring> #include <pthread.h> #include <cstdlib>int ticket = 100;void *route(void *argc) {char *id = (char*)argc;while(1){if(ticket > 0){sleep(1);std::cout << id << "sells ticket:" << ticket-- << std::endl;}elsebreak;}return nullptr; } int main() {pthread_t t1,t2,t3,t4;pthread_create(&t1,nullptr,route,(void*)"thread1");pthread_create(&t2,nullptr,route,(void*)"thread2");pthread_create(&t3,nullptr,route,(void*)"thread3");pthread_create(&t4,nullptr,route,(void*)"thread4");pthread_join(t1,nullptr);pthread_join(t2,nullptr);pthread_join(t3,nullptr);pthread_join(t4,nullptr);return 0; }
為什么票數可能減到-2呢?原因如下:
- if語句判斷條件為真以后,代碼可以并發得切換到其他線程
- sleep這個模擬漫長業務的過程,在這個漫長的業務過程中,可能有很多個線程會進入該代碼段
- --ticket操作本身就不是一個原子操作,而是對應三條匯編指令:
1、load:將共享變量ticket從內存加載到寄存器中
2、update:更新寄存器里面的值,執行-1操作
3、store:將新值從寄存器寫回到共享變量ticket的內存地址
而大部分情況下,一條指令就是原子的(當然也有一些多條指令是原子的)要解決以上問題,需要做到三點:
- 代碼必須實現互斥機制:當某個線程進入臨界區執行時,其他線程不得進入同一臨界區。
- 在多線程環境中,若臨界區空閑且多個線程同時請求執行,系統只允許其中一個線程進入臨界區。
- 非臨界區執行的線程,不得妨礙其他線程正常進入臨界區。
要做到這三點,本質上就是需要一把鎖,Linux上提供的這把鎖叫互斥鎖:
互斥量/鎖的接口
1、初始化互斥量:
方法1:靜態分配:phread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
方法2:動態分配:
#include <pthread.h> int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
- mutex: 指向待初始化的互斥鎖對象的指針。
- attr: 互斥鎖屬性對象指針,若為?
NULL
?則使用默認屬性。- 成功返回?
0
,失敗返回錯誤碼(非?errno
)。2、銷毀互斥量:
- 使用PTHREAD_MUTEX_INITIALIZER初始化的互斥量不需要銷毀
- 不要銷毀一個已經加鎖的互斥量
- 已經銷毀的互斥量,要確保后面不會有線程再嘗試加鎖
int pthread_mutex_destroy(pthread_mutex_t *mutex);
3、互斥量加鎖和解鎖:
int pthread_mutex_lock(pthread_mutex_t *mutex); //加鎖 int pthread_mutex_ulock(pthread_mutex_t *mutex); //解鎖返回值:成功返回0,失敗返回錯誤號
- 互斥量處于未鎖狀態,該函數會將互斥量鎖定,同時返回成功
- 發起函數調用時,其他線程已經鎖定互斥量,或者存在其他線程同時申請互斥量,但沒有競爭到互斥量,那么pthread_lock調用就會陷入阻塞(執行流被掛起),等待互斥量解鎖。
使用鎖來改進上面的模擬搶票系統:
#include <iostream> #include <unistd.h> #include <string> #include <cstring> #include <pthread.h> #include <cstdlib>int ticket = 100; pthread_mutex_t mutex;void *route(void *argc) {char *id = (char*)argc;while(1){pthread_mutex_lock(&mutex);if(ticket > 0){usleep(1000);std::cout << id << "sells ticket:" << ticket-- << std::endl;pthread_mutex_unlock(&mutex);}else{pthread_mutex_unlock(&mutex);break;}}return nullptr; } int main() {pthread_t t1,t2,t3,t4;pthread_mutex_init(&mutex,nullptr);pthread_create(&t1,nullptr,route,(void*)"thread1");pthread_create(&t2,nullptr,route,(void*)"thread2");pthread_create(&t3,nullptr,route,(void*)"thread3");pthread_create(&t4,nullptr,route,(void*)"thread4");pthread_join(t1,nullptr);pthread_join(t2,nullptr);pthread_join(t3,nullptr);pthread_join(t4,nullptr);pthread_mutex_destroy(&mutex);return 0; }
結果符合預期。
互斥量實現原理
單純的i++或者++i都不是原子的,有可能會有數據一致性問題;為了實現互斥鎖操作,大多數體系結構都提供了swap或exchange指令,該指令的作用是把寄存器和內存單元的數據相交換,由于只有一條指令,保證了原子性,即使是多處理器平臺,訪問內存的總線周期也有先后,一個處理器上的交換指令執行時另一個處理器的交換指令只能等待總線周期。lock和unlock的偽代碼改一下:
互斥量的封裝 -- 使用RAII風格管理鎖
#pragma once#include <iostream> #include <pthread.h> #include <string>namespace LockModule {class Mutex{public:Mutex(const Mutex&) = delete;const Mutex& operator=(const Mutex&) = delete;Mutex(){int ret;if((ret = pthread_mutex_init(&_mutex, nullptr)) != 0){std::cout << "Mutex init error" << std::endl;exit(EXIT_FAILURE);}}void Lock(){int ret;if((ret = pthread_mutex_lock(&_mutex)) != 0){std::cout << "Mutex lock error" << std::endl;exit(EXIT_FAILURE);}}void Unlock(){int ret;if((ret = pthread_mutex_unlock(&_mutex)) != 0){std::cout << "Mutex unlock error" << std::endl;exit(EXIT_FAILURE);}}~Mutex(){int ret;if((ret = pthread_mutex_destroy(&_mutex)) != 0){std::cout << "Mutex destroy error" << std::endl;exit(EXIT_FAILURE);}}pthread_mutex_t* GetMutex(){return &_mutex;}private:pthread_mutex_t _mutex;};//采用RAII風格,進行鎖管理class LockGuard{public:LockGuard(Mutex &mutex):_mutex(mutex){_mutex.Lock();}//析構函數中解鎖~LockGuard(){_mutex.Unlock();}private:Mutex &_mutex;}; }
6、線程同步
相關概念
條件變量:
- 當一個線程互斥地訪問某個變量時,它可能發現在其它線程改變狀態之前,它什么也做不了。
- 例如一個線程訪問隊列時,發現隊列為空,它只能等待,直到線程將一個節點添加到隊列中。這種情況就需要用到條件變量。
同步概念與競態條件
- 同步:在保證數據安全的前提下,讓線程能夠按照某種特定的順序訪問臨界資源,從而有效避免饑餓問題,叫做同步
- 競態條件:因為時序問題,而導致程序異常,我們稱之為競態條件。
條件變量相關接口
條件變量(Condition Variable)是線程同步的一種機制,通常與互斥鎖(Mutex)配合使用,用于阻塞線程直到特定條件滿足。Linux通過POSIX線程庫(pthread)提供相關接口,主要包括以下核心函數:
初始化條件變量
函數原型:
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
參數說明:
cond
:指向條件變量的指針。attr
:屬性參數,通常設為NULL
表示默認屬性。靜態初始化:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
銷毀條件變量
函數原型:
int pthread_cond_destroy(pthread_cond_t *cond);
注意事項:
- 必須在沒有線程等待該條件變量時調用。
- 靜態初始化的條件變量無需銷毀。
等待條件變量
函數原型:
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
行為:
- 原子性地釋放
mutex
并阻塞當前線程。- 被喚醒后重新獲取
mutex
并返回。超時等待:
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);
abstime
:絕對時間(從1970年1月1日起的秒和納秒)。
喚醒等待線程
喚醒單個線程:
int pthread_cond_signal(pthread_cond_t *cond);
喚醒所有線程:
int pthread_cond_broadcast(pthread_cond_t *cond);
使用示例
#include <iostream> #include <string> #include <pthread.h> #include <unistd.h> #include <cstring> #include <cstdlib>pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t cond = PTHREAD_COND_INITIALIZER;void *handler(void *argc) {std::string name = (char*)argc;while(1){pthread_mutex_lock(&mutex);pthread_cond_wait(&cond,&mutex);std::cout << name << " is running" << std::endl;pthread_mutex_unlock(&mutex);//sleep(1);} }int main() {pthread_t tid1,tid2;int ret;if(ret = (pthread_create(&tid1,nullptr,handler,(void*)"thread-1")) != 0){std::cerr << "pthread_create error: " << strerror(ret) << std::endl;return -1;}if(ret = (pthread_create(&tid2,nullptr,handler,(void*)"thread-2")) != 0){std::cerr << "pthread_create error: " << strerror(ret) << std::endl;return -1;}while(1){pthread_cond_signal(&cond);sleep(1);}if(ret = (pthread_join(tid1,nullptr)) != 0){std::cerr << "pthread_join error: " << strerror(ret) << std::endl;return -1;}if(ret = (pthread_join(tid2,nullptr)) != 0){std::cerr << "pthread_join error: " << strerror(ret) << std::endl;return -1;}return 0; }
注意事項
- 虛假喚醒:即使未調用喚醒函數,線程也可能被喚醒。通常需在循環中檢查條件。
- 死鎖風險:確保在調用
pthread_cond_wait
前持有正確的互斥鎖。- 性能優化:頻繁喚醒可能影響性能,建議結合事件驅動模型優化。
為什么pthread_cond_wait需要互斥量?
- 條件等待是線程間同步的一種手段,如果只有一個線程,條件不滿足,一直等下去都不會滿足,所以必須要有一個線程通過某些操作,改變共享變量,使原先不滿足的條件變得滿足,并且友好的通知等待在條件變量上的線程。
- 條件不會無緣無故的突然變得滿足了,必然會牽扯到共享數據的變化。所以一定要用互斥鎖來保護。沒有互斥鎖就無法安全的獲取和修改共享數據。
- 舉一個錯誤的設計:先上鎖,發現條件不滿足,直接解鎖,然后在等待條件變量?(錯誤的)例如:
由于解鎖和等待不是原子操作。調用解鎖之后,pthread_cond_wait之前,如果已經有其他線程獲取到互斥量,摒棄條件滿足,發送了信號,那么pthread_cond_wait將錯過這個信號,可能會導致線程永遠阻塞在這個pthread_cond_wait。所以解鎖和等待必須是一個原子操作。// 錯誤的設計 pthread_mutex_lock(&mutex); while (condition_is_false) {pthread_mutex_unlock(&mutex);//解鎖之后,等待之前,條件可能已經滿?,信號已經發出,但是該信號可能被錯過pthread_cond_wait(&cond);pthread_mutex_lock(&mutex); } pthread_mutex_unlock(&mutex);
int pthread_cond_wait(pthread_cond_ t *cond,pthread_mutex_ t *mutex);進入該函數后,會去看條件量等于0不?等于,就把互斥量變成1,直到cond_wait返回,把條件量改成1,把互斥量恢復成原樣。
條件變量使用規范
等待條件代碼:
pthread_mutex_lock(&mutex); while (條件為假)pthread_cond_wait(cond, mutex); //在這里等待,被喚醒后還需要通過while再次檢測條件是否為真,為真繼續向下執行,為假繼續等待 修改條件 pthread_mutex_unlock(&mutex);
給條件發送信號代碼:
pthread_mutex_lock(&mutex); 設置條件為真 pthread_cond_signal(cond); pthread_mutex_unlock(&mutex);
條件變量的封裝
鎖使用的是上面封裝的鎖
#ifndef __MYCOND__HPP__ #define __MYCOND__HPP__#include <iostream> #include <pthread.h> #include <cstring> #include "Lock.hpp"namespace CondModule {using namespace LockModule;class Cond{public:Cond(){int ret = pthread_cond_init(&_cond,nullptr);if(ret != 0){fprintf(stderr,"pthread_cond_init:%s\n",strerror(ret));exit(EXIT_FAILURE);}}void Wait(Mutex &mutex){int ret = pthread_cond_wait(&_cond,mutex.GetMutex());if(ret != 0){fprintf(stderr,"pthread_cond_wait:%s\n",strerror(ret));exit(EXIT_FAILURE);}}void Notify(){int ret = pthread_cond_signal(&_cond);if(ret != 0){fprintf(stderr,"pthread_cond_signal:%s\n",strerror(ret));exit(EXIT_FAILURE);}}void NotifyAll(){int ret = pthread_cond_broadcast(&_cond);if(ret != 0){fprintf(stderr,"pthread_cond_broadcast:%s\n",strerror(ret));exit(EXIT_FAILURE);}}~Cond(){int ret = pthread_cond_destroy(&_cond);if(ret != 0){fprintf(stderr,"pthread_cond_destroy:%s\n",strerror(ret));exit(EXIT_FAILURE);}}private:pthread_cond_t _cond;}; }#endif
7、生產者消費者模型?
概念
生產者消費者模型是一種經典的并發編程模式,用于解決多線程或多進程環境中生產者和消費者之間的協同問題。生產者負責生成數據或任務,消費者負責處理這些數據或任務,兩者通過共享的緩沖區(如隊列)進行通信。該模型的核心目標是解耦生產與消費過程,避免兩者因速度不匹配導致的資源浪費或阻塞。
簡單來說就是:
- 3種關系 -- 生產者與生產者:互斥;消費者和消費者:互斥;生產者和消費者:互斥且同步;
- 2種角色 -- 生產者和消費者;
- 1個交易場所 -- 共享緩沖區;
生產者消費者模型的優點:
- 解耦
- 支持并發
- 支持忙先不均(生產者消費者兩者速度不均)
基于BlockingQueue的生產者消費者模型
概念:在多線程編程中阻塞隊列(BlockingQueue)是一種常用于實現生產者和消費者模型的數據結構。其與普通的隊列區別在于,當隊列為空時,從隊列獲取元素的操作將會被阻塞,直到隊列中被放入了元素;當隊列滿時,往隊列里存放元素的操作也會被阻塞,直到有元素被從隊列中取出(以上的操作都是基于不同的線程來說的,線程在對阻塞隊列進程操作時會被阻塞)
c++ queue模擬阻塞隊列的單生產單消費模型
//BlockQueue.hpp#ifndef __BLOCKQUEUE__ #define __BLOCKQUEUE__#include <iostream> #include <queue> #include <pthread.h> #include <string>namespace myBlockQueue {template<typename T>class BlockQueue{private:bool isFull(){return _block_queue.size() >= _cap;}bool isEmpty(){return _block_queue.empty();}public:BlockQueue(int cap){_cap = cap;_product_wait_num = 0;_consum_wait_num = 0;pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_product_cond,nullptr);pthread_cond_init(&_consum_cond,nullptr);}//生產者調用接口bool Enqueue(const T &in){pthread_mutex_lock(&_mutex);//判斷是否為滿,滿了就等待//if(isFull()) //這里使用while(isFull())而不是使用if(isFull())的原因\避免造成偽喚醒,即線程被喚醒但條件并不滿足\例如:此時有五個生產者在等待,但此時生產者最多生產的數據只能是一個,\如果此時同時喚醒五個生產者,那必然只有一個生產者此時可以生產數據(生產一個后隊列就滿了),另外四個還不可以生產\但這四個也被喚醒了,也會向下執行代碼,所以需要while再次循環判斷是否滿足條件,而不是使用if讓執行流向下執行while(isFull()) {_product_wait_num++;pthread_cond_wait(&_product_cond,&_mutex);_product_wait_num--;}//生產數據_block_queue.push(in);//此時隊列不為空了,判斷是否有消費者在等待,通知消費者來消費if(_consum_wait_num > 0){pthread_cond_signal(&_consum_cond);}pthread_mutex_unlock(&_mutex);return true;}//消費者調用接口bool Pop(T *out){pthread_mutex_lock(&_mutex);//這里使用while循環判斷條件是否滿足,同樣是避免造成偽喚醒的問題while(isEmpty()){_consum_wait_num++;pthread_cond_wait(&_consum_cond,&_mutex);_consum_wait_num--;}//消費數據*out = _block_queue.front();_block_queue.pop();//判斷是否有生產者在等待,通知生產者生產數據if(_product_wait_num > 0){pthread_cond_signal(&_product_cond);}pthread_mutex_unlock(&_mutex);return true;}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_product_cond);pthread_cond_destroy(&_consum_cond);}private:std::queue<T> _block_queue;int _cap;pthread_mutex_t _mutex;pthread_cond_t _product_cond;pthread_cond_t _consum_cond;int _product_wait_num;int _consum_wait_num;}; }#endif
//main.cpp#include "BlockQueue.hpp" #include <unistd.h> #include <cstdlib>using namespace myBlockQueue;void* ProductStart(void* args) {BlockQueue<int>* bq = (BlockQueue<int>*)args;while(true){int data = rand() % 100 + 1;bq->Enqueue(data);printf("product data:%d\n",data);sleep(2);} }void* ConsumStart(void* args) {BlockQueue<int>* bq = (BlockQueue<int>*)args;while(true){int data;bq->Pop(&data);printf("consum data:%d\n",data);sleep(4);} }int main() {BlockQueue<int> bq(10);pthread_t product,consum;pthread_create(&product,nullptr,ProductStart,&bq);pthread_create(&consum,nullptr,ConsumStart,&bq);pthread_join(product,nullptr);pthread_join(consum,nullptr);return 0; }
執行結果:
c++ queue模擬阻塞隊列的多生產多消費模型
在上述的單生產、單消費中已有消費者和生產者的關系,需要新加消費者和消費者的關系、生產者和生產者的關系。 但上述代碼都是用同一個鎖,所以天然形成了上面三個關系。所以上面的代碼同樣適用于多生產多消費模型。
改為多生產多消費模型+封裝的互斥鎖和信號量:
//BlockQueue.hpp #ifndef __BLOCKQUEUE__ #define __BLOCKQUEUE__#include <iostream> #include <queue> #include <pthread.h> #include <string> #include "Cond.hpp" #include "Lock.hpp"namespace myBlockQueue {using namespace LockModule;using namespace CondModule;template <typename T>class BlockQueue{private:bool isFull(){return _block_queue.size() >= _cap;}bool isEmpty(){return _block_queue.empty();}public:BlockQueue(int cap){_cap = cap;_product_wait_num = 0;_consum_wait_num = 0;}// 生產者調用接口bool Enqueue(const T &in){// RAII思想,自動加鎖解鎖LockGuard lock(_mutex);// 判斷是否為滿,滿了就等待// if(isFull())// 這里使用while(isFull())而不是使用if(isFull())的原因\避免造成偽喚醒,即線程被喚醒但條件并不滿足\例如:此時有五個生產者在等待,但此時生產者最多生產的數據只能是一個,\如果此時同時喚醒五個生產者,那必然只有一個生產者此時可以生產數據(生產一個后隊列就滿了),另外四個還不可以生產\但這四個也被喚醒了,也會向下執行代碼,所以需要while再次循環判斷是否滿足條件,而不是使用if讓執行流向下執行while (isFull()){_product_wait_num++;_product_cond.Wait(_mutex);_product_wait_num--;}// 生產數據_block_queue.push(in);// 此時隊列不為空了,判斷是否有消費者在等待,通知消費者來消費if (_consum_wait_num > 0){_consum_cond.Notify();}return true;}// 消費者調用接口bool Pop(T *out){LockGuard lock(_mutex);// 這里使用while循環判斷條件是否滿足,同樣是避免造成偽喚醒的問題while (isEmpty()){_consum_wait_num++;_consum_cond.Wait(_mutex);_consum_wait_num--;}// 消費數據*out = _block_queue.front();_block_queue.pop();// 判斷是否有生產者在等待,通知生產者生產數據if (_product_wait_num > 0){_product_cond.Notify();}return true;}~BlockQueue(){// 不用自己調用析構//_mutex.Destroy();//_product_cond.Destroy();//_consum_cond.Destroy();}private:std::queue<T> _block_queue;int _cap;Mutex _mutex;Cond _product_cond;Cond _consum_cond;int _product_wait_num;int _consum_wait_num;}; }#endif
//main.cpp#include "BlockQueue.hpp" #include <unistd.h> #include <cstdlib>using namespace myBlockQueue;template <typename T> class Data { public:Data(BlockQueue<T>* bq,const std::string name): _bq(bq),_name(name){}BlockQueue<T>* GetBlockQueue(){return _bq;}std::string GetName(){return _name;} private:BlockQueue<T>* _bq;std::string _name; };void* ProductStart(void* args) {Data<int> *da = (Data<int>*)args;BlockQueue<int>* bq = da->GetBlockQueue();std::string name = da->GetName();while(true){int val = rand() % 100 + 1;bq->Enqueue(val);printf("%s data:%d\n",name.c_str(),val);}return nullptr; }void* ConsumStart(void* args) {Data<int> *da = (Data<int>*)args;BlockQueue<int>* bq = da->GetBlockQueue();std::string name = da->GetName();while(true){int val;bq->Pop(&val);printf("%s data:%d\n",name.c_str(),val);}return nullptr; }int main() {BlockQueue<int> bq(5);pthread_t product1,product2,product3,product4,product5;pthread_t consum1,consum2,consum3,consum4,consum5;Data<int> data1(&bq,"product-1");Data<int> data2(&bq,"product-2");Data<int> data3(&bq,"product-3");Data<int> data4(&bq,"product-4");Data<int> data5(&bq,"product-5");Data<int> data6(&bq,"consum-1");Data<int> data7(&bq,"consum-2");Data<int> data8(&bq,"consum-3");Data<int> data9(&bq,"consum-4");Data<int> data10(&bq,"consum-5");pthread_create(&product1,nullptr,ProductStart,&data1);pthread_create(&product2,nullptr,ProductStart,&data2);pthread_create(&product3,nullptr,ProductStart,&data3);pthread_create(&product4,nullptr,ProductStart,&data4);pthread_create(&product5,nullptr,ProductStart,&data5);pthread_create(&consum1,nullptr,ConsumStart,&data6);pthread_create(&consum2,nullptr,ConsumStart,&data7);pthread_create(&consum3,nullptr,ConsumStart,&data8);pthread_create(&consum4,nullptr,ConsumStart,&data9);pthread_create(&consum5,nullptr,ConsumStart,&data10);pthread_join(product1,nullptr);pthread_join(product2,nullptr);pthread_join(product3,nullptr);pthread_join(product4,nullptr);pthread_join(product5,nullptr);pthread_join(consum1,nullptr);pthread_join(consum2,nullptr);pthread_join(consum3,nullptr);pthread_join(consum4,nullptr);pthread_join(consum5,nullptr);return 0; }
基于環形隊列的生產消費模型
POSIX信號量
POSIX信號量是多線程或多進程同步機制的重要工具,分為命名信號量和未命名信號量(基于內存的信號量)。命名信號量通過文件系統路徑名標識,可用于不相關進程間同步;未命名信號量通常用于同一進程內的線程間或相關進程間的同步。
主要接口函數:
sem_init 初始化未命名信號量。原型:
int sem_init(sem_t *sem, int pshared, unsigned int value);
pshared
為0表示線程間共享,非0表示進程間共享;value
為信號量初始值。成功返回0,失敗返回-1。sem_open 創建或打開命名信號量。原型:
sem_t *sem_open(const char *name, int oflag, mode_t mode, unsigned int value);
name
為信號量路徑名,oflag
指定打開方式(如O_CREAT),mode
設置權限,value
為初始值。成功返回信號量指針,失敗返回SEM_FAILED。sem_wait 信號量P操作(減一)。原型:
int sem_wait(sem_t *sem);
若信號量值為0則阻塞,直到信號量值變為正。成功返回0,失敗返回-1。
sem_post 信號量V操作(加一)。原型:
int sem_post(sem_t *sem);
喚醒等待該信號量的線程/進程。成功返回0,失敗返回-1。
sem_getvalue 獲取信號量當前值。原型:
int sem_getvalue(sem_t *sem, int *sval);
成功時
sval
存儲信號量值并返回0,失敗返回-1。sem_close 關閉命名信號量。原型:
int sem_close(sem_t *sem);
釋放進程關聯的資源,但不銷毀信號量。成功返回0,失敗返回-1。
sem_unlink 刪除命名信號量。原型:
int sem_unlink(const char *name);
當所有進程都關閉信號量后,系統自動銷毀它。成功返回0,失敗返回-1。
sem_destroy 銷毀未命名信號量。原型:
int sem_destroy(sem_t *sem);
成功返回0,失敗返回-1。
命名信號量(進程間同步):
sem_t *sem = sem_open("/my_sem", O_CREAT, 0644, 1); sem_wait(sem); // 臨界區操作 sem_post(sem); sem_close(sem); sem_unlink("/my_sem");
未命名信號量(線程間同步):
sem_t sem; sem_init(&sem, 0, 1); sem_wait(&sem); // 臨界區操作 sem_post(&sem); sem_destroy(&sem);
注意事項
- 命名信號量的名稱通常以斜杠開頭(如
/sem_name
),且長度受系統限制。sem_destroy
調用前需確保沒有線程/進程在等待該信號量。- 多進程共享未命名信號量時,信號量需位于共享內存區域。
- 信號量操作是原子性的,但需注意錯誤處理(如檢查返回值)。
信號量封裝
#ifndef __MYSEM__ #define __MYSEM__#include <iostream> #include <semaphore.h>namespace SemModule {class Sem{public:Sem(int n) : _num(n){// 0表示線程間共享sem_init(&_sem, 0, _num);}// P操作void P(){sem_wait(&_sem);}// V操作void V(){sem_post(&_sem);}~Sem(){sem_destroy(&_sem);}int GetNum(){return _num;}private:sem_t _sem;int _num;}; }#endif
基于環形隊列的生產消費模型環形隊列采用數組模擬,用模運算來模擬環狀特性:
環形結構起始狀態和結束狀態都是一樣的,不好判斷為空或者為滿,所以可以通過加計數器或者標記位來判斷滿或者空。另外也可以預留一個空的位置,作為滿的狀態。
現在使用信號量這個計數器,就很簡單的進行多線程間的同步過程:
- 我們采用數組模擬,模運算來模擬環狀特性,為空或為滿時都指向同一個位置。
- 對于這個環形隊列起始時有空間N個,數據0個,對于生產者來說關注的是剩余空間,對于消費者來說關注的是剩余數據,任何人訪問臨界資源之前都必須申請信號量。
//生產: int tail = 0; P(空間); ring[tail] = data; tail++; V(數據);
//消費: int head = 0; P(數據); int data = ring[head]; head++; V(空間);
當生產消費同時訪問同一個位置時,只能是隊列為空或為滿:
如果隊列為空,此時需要保證生產者,原子性先生產 -- 此時數據信號量為0,空間信號量為N,數據信號量阻塞,這樣就形成生產消費同一位置互斥且同步,生產優先。
如果隊列為滿,此時需要保證消費者,原子性先消費 -- 此時空間信號量為0,數據信號量為N,空間信號量阻塞,這樣就形成生產消費同一位置互斥且同步,消費優先。當生產消費不是同一個位置時,即除了隊列為空或為滿時,其他情況都是生產消費不在同一個位置,生產者和消費者,可以同時進行并發訪問。
總結:為空為滿時,通過信號量實現互斥同步;其他情況,生產者和消費者并發進行。
三種關系:生產者之間互斥;消費者之間互斥;生產和消費互斥和同步。
需要鎖兩個:生產者與生產者之間加鎖;消費者與消費者之間加鎖。
需要信號量兩個:一個表示空間;一個表示數據。
代碼:
//RingQueue.hpp#ifndef __RINGQUEUE__ #define __RINGQUEUE__#include <iostream> #include <vector> #include <semaphore.h> #include <pthread.h> #include <string> #include <cstdlib> #include <unistd.h> #include "Sem.hpp" #include "Lock.hpp"namespace myRingQueue {using namespace LockModule;using namespace SemModule;template <typename T>class RingQueue{public:RingQueue(int cap) : _ring_queue(cap), _cap(cap), _productor_step(0), _consumer_step(0), _room_sem(cap), _data_sem(0){}// 生產者入隊void Enqueue(const T &in){_room_sem.P(); //這里申請信號量和上鎖位置可以互換但是存在區別:先上鎖則是訪問臨界資源和申請信號量串行處理;如果后上鎖則是訪問臨界資源和申請信號量并行處理,提高效率。{LockGuard lock(_productor_mutex);// 能進入這里此時一定有空間_ring_queue[_productor_step++] = in;_productor_step %= _cap;}_data_sem.V();}// 消費者出隊void Pop(T *out){_data_sem.P();{LockGuard lock(_consumer_mutex);*out = _ring_queue[_consumer_step++];_consumer_step %= _cap;}_room_sem.V();}~RingQueue(){}private:std::vector<T> _ring_queue; // 環形隊列int _cap; // 上線容量// 生產消費的下標int _productor_step;int _consumer_step;// 定義信號量Sem _room_sem; // 生產者關心Sem _data_sem; // 消費者關心// 鎖Mutex _productor_mutex;Mutex _consumer_mutex;}; }#endif
//Sem.hpp#ifndef __MYSEM__ #define __MYSEM__#include <iostream> #include <semaphore.h>namespace SemModule {class Sem{public:Sem(int n) : _num(n){// 0表示線程間共享sem_init(&_sem, 0, _num);}// P操作void P(){sem_wait(&_sem);}// V操作void V(){sem_post(&_sem);}~Sem(){sem_destroy(&_sem);}int GetNum(){return _num;}private:sem_t _sem;int _num;}; }#endif
//Lock.hpp#ifndef __LOCK__ #define __LOCK__#include <iostream> #include <pthread.h> #include <string>namespace LockModule {class Mutex{public:Mutex(const Mutex&) = delete;const Mutex& operator=(const Mutex&) = delete;Mutex(){int ret;if(ret = (pthread_mutex_init(&_mutex, nullptr)) != 0){std::cout << "Mutex init error" << std::endl;exit(EXIT_FAILURE);}}void Lock(){int ret;if(ret = (pthread_mutex_lock(&_mutex)) != 0){std::cout << "Mutex lock error" << std::endl;exit(EXIT_FAILURE);}}void Unlock(){int ret;if(ret = (pthread_mutex_unlock(&_mutex)) != 0){std::cout << "Mutex unlock error" << std::endl;exit(EXIT_FAILURE);}}~Mutex(){int ret;if(ret = (pthread_mutex_destroy(&_mutex)) != 0){std::cout << "Mutex destroy error" << std::endl;exit(EXIT_FAILURE);}}pthread_mutex_t* GetMutex(){return &_mutex;}private:pthread_mutex_t _mutex;};//采用RAII風格,進行鎖管理class LockGuard{public:LockGuard(Mutex &mutex):_mutex(mutex){_mutex.Lock();}//析構函數中解鎖~LockGuard(){_mutex.Unlock();}private:Mutex &_mutex;}; }#endif
//main.cpp#include "RingQueue.hpp"using namespace LockModule; using namespace myRingQueue; using namespace SemModule;template <typename T> class Data { public:Data(RingQueue<T>* rq,const std::string name): _rq(rq),_name(name){}RingQueue<T>* GetRingQueue(){return _rq;}std::string GetName(){return _name;} private:RingQueue<T>* _rq;std::string _name; };void *product(void *argc) {sleep(3);Data<int> *data = static_cast<Data<int>*>(argc);RingQueue<int> *rq = data->GetRingQueue();std::string name = data->GetName();int val = 1;while(true){rq->Enqueue(val++);std::cout << name << " product: " << val << std::endl;sleep(3);}return nullptr; }void *consum(void *argc) {sleep(10);Data<int> *data = static_cast<Data<int>*>(argc);RingQueue<int> *rq = data->GetRingQueue();std::string name = data->GetName();int val;while(true){rq->Pop(&val);std::cout << name << " consume: " << val << std::endl;sleep(1);}return nullptr; }int main() {RingQueue<int> rq(4);pthread_t product1,product2,product3,product4,product5;pthread_t consum1,consum2,consum3,consum4,consum5;Data<int> data1(&rq,"product-1");Data<int> data2(&rq,"product-2");Data<int> data3(&rq,"product-3");Data<int> data4(&rq,"product-4");Data<int> data5(&rq,"product-5");Data<int> data6(&rq,"consumer-1");Data<int> data7(&rq,"consumer-2");Data<int> data8(&rq,"consumer-3");Data<int> data9(&rq,"consumer-4");Data<int> data10(&rq,"consumer-5");pthread_create(&product1, nullptr, product, &data1);pthread_create(&product2, nullptr, product, &data2);pthread_create(&product3, nullptr, product, &data3);pthread_create(&product4, nullptr, product, &data4);pthread_create(&product5, nullptr, product, &data5);pthread_create(&consum1, nullptr, consum, &data6);pthread_create(&consum2, nullptr, consum, &data7);pthread_create(&consum3, nullptr, consum, &data8);pthread_create(&consum4, nullptr, consum, &data9);pthread_create(&consum5, nullptr, consum, &data10);pthread_join(product1, nullptr);pthread_join(product2, nullptr);pthread_join(product3, nullptr);pthread_join(product4, nullptr);pthread_join(product5, nullptr);pthread_join(consum1, nullptr);pthread_join(consum2, nullptr);pthread_join(consum3, nullptr);pthread_join(consum4, nullptr);pthread_join(consum5, nullptr);return 0; }
#makefilebin=ringbuffer_cp cc=g++ src=$(wildcard *.cpp) obj=$(src:.cc=.o)$(bin):$(obj)$(cc) -o $@ $^ -lpthread %.o:%.cc$(cc) -c $< -std=c++17.PHONY:clean clean:rm -f $(bin).PHONY:test test:echo $(src)echo $(obj)
8、策略模式 和 日志
日志:日志是系統、應用程序或設備在運行過程中自動生成的記錄文件,用于追蹤事件、狀態變化、錯誤信息或用戶操作。日志通常以時間順序存儲,包含時間戳、日志內容、日志等級等關鍵信息,幫助分析系統行為或排查問題。
日志格式必須有的指標:時間戳、日志等級、日志內容;
可選指標:文件名行號、進程線程相關id信息;
?日志有現成的解決方案,如:spdlog、glog、Boost.Log、Log4cxx等等,我們依舊采用自定義日志的方式。
采用日志格式:
[可讀性很好的時間] [日志等級] [進程pid] [打印對應的文件名] [行號] - 消息內容支持可變參數例如:[2011-11-11?11:11:11] [DEBUG] [111111] [main.cc] [11] - hello world
這里我們采用設計模式--策略模式來進行日志的設計。
?策略模式:策略模式(Strategy Pattern)屬于行為型設計模式,允許在運行時動態選擇算法或行為。它將算法封裝成獨立的類,使得算法可以獨立于使用它的客戶端變化。
核心思想:
- 定義策略接口:聲明所有具體策略類必須實現的方法。
- 具體策略類:實現策略接口,提供具體的算法實現。
- 上下文類(Context):持有一個策略對象的引用,通過委托調用具體策略。
基于策略模式實現的日志
策略模式實現結構圖:
日志記錄流程時序圖:
//Log.hpp#ifndef __LOG__ #define __LOG__#include <iostream> #include <string> #include <cstring> #include <cstdlib> #include <fstream> #include <sstream> #include <memory> #include <filesystem> #include <unistd.h> #include <time.h> #include "Lock.hpp"namespace LogModule {using namespace LockModule;//獲取一下當前系統時間std::string CurrentTime(){//時間戳time_t time_stamp = time(nullptr);struct tm curr;localtime_r(&time_stamp, &curr); //將時間戳轉化成可讀性較強的時間信息char buffer[1024];snprintf(buffer,sizeof(buffer),"%4d-%02d-%02d %02d:%02d:%02d",curr.tm_year + 1900,curr.tm_mon + 1,curr.tm_mday,curr.tm_hour,curr.tm_min,curr.tm_sec);return buffer;}const std::string defaultlogpath = "./log/";const std::string defaultlogname = "log.txt";enum class LogLevel{DEBUG = 1, INFO,WARNING,ERROR,FATAL};std::string LevelToString(LogLevel level){switch(level){case LogLevel::DEBUG:return "DEBUG";case LogLevel::INFO:return "INFO";case LogLevel::WARNING:return "WARNING";case LogLevel::ERROR:return "ERROR";case LogLevel::FATAL:return "FATAL";default:return "None";}}//刷新策略class LogStrategy{public:virtual ~LogStrategy() = default;virtual void SyncLog(const std::string &message) = 0;};//控制臺策略class ConsoleLogStrategy:public LogStrategy{public:ConsoleLogStrategy(){}~ConsoleLogStrategy(){}void SyncLog(const std::string &message){LockGuard lockGuard(_lock);std::cout << message << std::endl;}private:Mutex _lock;};//磁盤策略class FileLogStrategy:public LogStrategy{public:FileLogStrategy(const std::string &logpath = defaultlogpath, const std::string &logname = defaultlogname):_logpath(logpath),_logname(logname){//確定_logpath是否存在LockGuard lockGuard(_lock);if(std::filesystem::exists(_logpath)){return;}//不存在就嘗試創建路徑try{std::filesystem::create_directories(_logpath);}catch(std::filesystem::filesystem_error &e){std::cerr << e.what() << "\n";}}~FileLogStrategy(){}void SyncLog(const std::string &message){LockGuard lockGuard(_lock);std::string log = _logpath + _logname; //./log/log.txtstd::ofstream out(log, std::ios::app); //追加方式寫入//判斷是否打開if(!out.is_open()){return;}out << message << "\n";out.close();}private:std::string _logpath;std::string _logname;Mutex _lock;};//日志類:構建日志字符串,根據策略,進行刷新class Logger{public:Logger(){//默認采用控制臺策略_strategy = std::make_shared<ConsoleLogStrategy>();}//啟動控制臺策略void EnableConsoleLog(){_strategy = std::make_shared<ConsoleLogStrategy>();}//啟動磁盤策略void EnableFileLog(){_strategy = std::make_shared<FileLogStrategy>();}~Logger(){}//內部類:構建字符串class LogMessage{public:LogMessage(LogLevel level,const std::string &filename,int line,Logger &logger):_level(level),_currtime(CurrentTime()),_pid(getpid()),_filename(filename),_line(line),_logger(logger){std::stringstream ssbuffer;ssbuffer << "[" << _currtime << "] "<< "[" << LevelToString(_level) << "] "<< "[" << _pid << "] "<< "[" << _filename << "] "<< "[" << _line << "] - ";_loginfo = ssbuffer.str();}template <typename T>LogMessage &operator<<(const T &info){std::stringstream ss;ss << info;_loginfo += ss.str();return *this;}~LogMessage(){if(_logger._strategy)_logger._strategy->SyncLog(_loginfo);}private:std::string _currtime;//當前日志的時間LogLevel _level; //日志等級pid_t _pid; //進程pidstd::string _filename;//源文件名稱int _line; //日志所在的行號Logger &_logger; //負責根據不同的策略進行刷新std::string _loginfo; //一條完整的日志記錄};LogMessage operator()(LogLevel level,const std::string &filename, int line){return LogMessage(level,filename,line,*this);}private: std::shared_ptr<LogStrategy> _strategy; //日志刷新的策略方案};//全局一個logger對象Logger logger;//__FILE__:表示當前文件名,__LINE__:表示當前調用所在行號#define LOG(level) logger(level,__FILE__,__LINE__)#define ENABLE_CONSOLE_LOG() logger.EnableConsoleLog()#define ENABLE_FILE_LOG() logger.EnableFileLog() }#endif
//Lock.hpp#pragma once#include <iostream> #include <pthread.h> #include <string>namespace LockModule {class Mutex{public:Mutex(const Mutex&) = delete;const Mutex& operator=(const Mutex&) = delete;Mutex(){int ret;if(ret = (pthread_mutex_init(&_mutex, nullptr)) != 0){std::cout << "Mutex init error" << std::endl;exit(EXIT_FAILURE);}}void Lock(){int ret;if(ret = (pthread_mutex_lock(&_mutex)) != 0){std::cout << "Mutex lock error" << std::endl;exit(EXIT_FAILURE);}}void Unlock(){int ret;if(ret = (pthread_mutex_unlock(&_mutex)) != 0){std::cout << "Mutex unlock error" << std::endl;exit(EXIT_FAILURE);}}~Mutex(){int ret;if(ret = (pthread_mutex_destroy(&_mutex)) != 0){std::cout << "Mutex destroy error" << std::endl;exit(EXIT_FAILURE);}}pthread_mutex_t* GetMutex(){return &_mutex;}private:pthread_mutex_t _mutex;};//采用RAII風格,進行鎖管理class LockGuard{public:LockGuard(Mutex &mutex):_mutex(mutex){_mutex.Lock();}//析構函數中解鎖~LockGuard(){_mutex.Unlock();}private:Mutex &_mutex;}; }
//main.cpp#include "Log.hpp"using namespace LogModule;int main() {logger.EnableFileLog();LOG(LogLevel::DEBUG) << "Hello Chen Sun";logger.EnableConsoleLog();LOG(LogLevel::INFO) << "Hello CHEN SUN";return 0; }
9、線程池
概念
線程池:線程池是一種多線程處理形式,通過預先創建并管理一組線程,避免頻繁創建和銷毀線程的開銷。線程池的核心思想是將任務提交到隊列中,由池中的線程自動執行,提高系統資源利用率和響應速度。
核心組件:
- 任務隊列:存放待執行的任務,通常采用阻塞隊列實現。
- 工作線程:池中實際執行任務的線程,循環從隊列中獲取任務并運行。
- 線程池管理器:負責創建、銷毀線程,以及動態調整池的大小。
線程池的應用場景:
- 需要大量的線程來完成任務,且完成任務的時間比較短。比如WEB服務器完成網頁請求這樣的任務,使用線程池技術是非常合適的。因為單個任務小,而任務數量巨大,你可以想象一個熱門網站的點擊次數。但對于長時間的任務,比如一個Telnet連接請求,線程池的優點就不明顯了。因為Telnet會話時間比線程的創建時間大多了。
- 對性能要求苛刻的應用,比如要求服務器迅速響應客戶請求。
- 接受突發性的大量請求,但不至于使服務器因此產生大量線程的應用。突發性大量客戶請求,在沒有線程池情況下,將產生大量線程,雖然理論上大部分操作系統線程數目最大值不是問題,短時間內產生大量線程可能使內存到達極限,出現錯誤。
線程池的種類:
- 固定大小線程池(FixedThreadPool):創建固定數量線程池,循環從任務隊列中獲取任務對象,獲取到任務對象后,執行任務對象中的任務接口
- 緩存線程池(CachedThreadPool):線程數量動態調整,空閑線程會被回收。適用于短期異步任務或負載波動大的場景,但可能因任務暴增耗盡資源。
線程池的優勢:
- 降低資源消耗:復用已創建的線程,減少線程創建和銷毀的開銷。
- 提高響應速度:任務到達時直接由空閑線程執行,無需等待線程創建。
- 增強可管理性:統一監控和調優線程數量,避免無限制創建線程導致系統崩潰。
?線程池設計
實現固定大小線程池,預先創建若干個工作線程;同時創建一個阻塞隊列作為任務隊列,存放待執行的任務;循環從隊列中獲取任務并讓線程池中某個線程運行。
//ThreadPool.hpp#ifndef __THREADPOOL__ #define __THREADPOOL__#include <iostream> #include <string> #include <cstring> #include <queue> #include <vector> #include <unistd.h> #include <memory> #include "Cond.hpp" #include "Lock.hpp" #include "Log.hpp" #include "Task.hpp" #include "Thread.hpp"namespace ThreadPoolModule {using namespace LogModule;using namespace LockModule;using namespace CondModule;using namespace ThreadModule; //用于接收單個線程的指針using thread_t = std::shared_ptr<Thread<std::string>>;const static int defaultnum = 5;template<typename T>class ThreadPool{private://判斷任務隊列是否為空bool IsEmpty(){return _taskq.empty();}//線程的執行邏輯 -- 1、拿任務;2、執行任務void HandlerTask(std::string name){LOG(LogLevel::INFO) << "Thread:" << name << " 開始運行";while(true){//1、拿任務T t;{LockGuard lockguard(_lock);//任務隊列如果為空且線程池運行(如果線程池停止,就不能繼續等待)while(IsEmpty() && _isrunning){_wait_num++;_cond.Wait(_lock);_wait_num--;}//如果線程池停止了,此時存在兩個情況:\1、任務隊列不為空--繼續執行任務隊列的任務;2、任務隊列為空--直接退出即可if(!_isrunning && IsEmpty())break;t = _taskq.front();_taskq.pop();}//2、處理任務 -- 這里不需要加鎖,可以并發執行任務t(name);}LOG(LogLevel::INFO) << "thread:" << name << "退出";}public:ThreadPool(int num_thread = defaultnum):_thread_num(num_thread),_wait_num(0),_isrunning(false){//創建自定義線程類對象for(int i = 0; i < _thread_num; i++){_threads.push_back(std::make_shared<Thread<std::string>>(std::bind(&ThreadPool::HandlerTask,this,std::placeholders::_1),"Thread-" + std::to_string(i + 1)));LOG(LogLevel::INFO) << "構建線程" << _threads.back()->Name() << "對象...成功";}}//任務加入任務隊列void Equeue(T &&in){LockGuard lockguard(_lock);//如果線程池停止,不能繼續加入任務if(!_isrunning) return;_taskq.push(std::move(in));//通知線程隊列已有任務,可以獲取if(_wait_num > 0)_cond.Notify();}void Start(){if(_isrunning) return;LockGuard lockguard(_lock);_isrunning = true;for(auto &thread_ptr:_threads){LOG(LogLevel::INFO) << "啟動線程" << thread_ptr->Name() << "...成功";thread_ptr->Start(); //啟動線程}}//執行剩下所有任務,停止線程池void Stop(){ LockGuard lockguard(_lock);if(_isrunning){_isrunning = false;//喚醒所有線程去執行剩下的任務if(_wait_num > 0)_cond.NotifyAll();}}//回收線程void Wait(){for(auto &thread_ptr:_threads){thread_ptr->Join();LOG(LogLevel::INFO) << "回收線程" << thread_ptr->Name() << "...成功";}}~ThreadPool(){//在析構函數中調用Stop和WaitStop();sleep(3);Wait();}private:std::vector<thread_t> _threads; //線程數組int _thread_num; //線程數量std::queue<T> _taskq; //任務隊列int _wait_num; //任務等待數量Mutex _lock;Cond _cond;bool _isrunning;}; }#endif
//ThreadPool.cpp#include "ThreadPool.hpp" #include "Task.hpp" #include <memory>using namespace ThreadPoolModule;int main() {//ENABLE_CONSOLE_LOG();ENABLE_FILE_LOG();std::unique_ptr<ThreadPool<task_t>> tp = std::make_unique<ThreadPool<task_t>>();tp->Start();int cnt = 10;while (cnt){tp->Equeue(PushTask);cnt--;sleep(1);}return 0; }
//Thread.hpp#include <iostream> #include <cstring> #include <cstdlib> #include <stdlib.h> #include <string> #include <pthread.h> #include <functional> #include <atomic> #include <unistd.h>namespace ThreadModule {std::atomic<std::uint32_t> cnt(1); // 原子計數器,用于形成線程編號enum class TSTATUS{NEW,RUNNING,STOP};//封裝自己的線程類template <typename T>class Thread{using func_t = std::function<void(T)>;private:static void *Routine(void* args){Thread<T>* t = static_cast<Thread<T>*>(args);t->_func(t->_data);return nullptr;}public:Thread(func_t func, T data):_func(func),_data(data),_status(TSTATUS::NEW),_joinable(true){_name = "Thread-" + std::to_string(cnt++);_pid = getpid();}bool Start(){//避免多次啟動if(_status == TSTATUS::RUNNING){std::cout << _name << "啟動失敗,當前線程已經執行" << std::endl;return false;}else{if(_status == TSTATUS::STOP)std::cout << "正在重新啟動線程" << std::endl;int ret;std::cout << _name << "已啟動" << std::endl;_status = TSTATUS::RUNNING;if((ret = pthread_create(&_tid,nullptr,Routine,this)) != 0){fprintf(stderr,"phread_create:%s\n",strerror(ret));return false;}return true;}return false;}bool Stop(){if(_status == TSTATUS::RUNNING){int ret;if((ret = pthread_cancel(_tid)) != 0){fprintf(stderr,"pthread_cancel:%s\n",strerror(ret));return false;}_status = TSTATUS::STOP;std::cout << _name << "已停止" << std::endl;return true;}return false;}bool Join(){if(_joinable){int ret;if((ret = pthread_join(_tid,nullptr)) != 0){fprintf(stderr,"pthread_join:%s\n",strerror(ret));return false;}_status = TSTATUS::STOP;std::cout << _name << "資源已被回收" << std::endl;return true;}return false;}void Detach(){_joinable = false;pthread_detach(_tid);}bool IsJoinable(){return _joinable; }std::string Name(){return _name;}~Thread(){}public:std::string _name;//線程名pthread_t _tid;//線程idpid_t _pid;//進程idbool _joinable;//是否可以分離func_t _func;//函數TSTATUS _status;//狀態T _data;}; }
//Task.hpp#ifndef __TASK__ #define __TASK__#include <iostream> #include <string> #include <functional> #include "Log.hpp"using namespace LogModule;using task_t = std::function<void(std::string name)>;void PushTask(std::string name) {LOG(LogLevel::DEBUG) << "我是一個推送數據到服務器的一個任務,我正在被執行" << '[' << name << ']'; }#endif
//Log.hpp#ifndef __LOG__ #define __LOG__#include <iostream> #include <string> #include <cstring> #include <cstdlib> #include <fstream> #include <sstream> #include <memory> #include <filesystem> #include <unistd.h> #include <time.h> #include "Lock.hpp"namespace LogModule {using namespace LockModule;//獲取一下當前系統時間std::string CurrentTime(){//時間戳time_t time_stamp = time(nullptr);struct tm curr;localtime_r(&time_stamp, &curr); //將時間戳轉化成可讀性較強的時間信息char buffer[1024];snprintf(buffer,sizeof(buffer),"%4d-%02d-%02d %02d:%02d:%02d",curr.tm_year + 1900,curr.tm_mon + 1,curr.tm_mday,curr.tm_hour,curr.tm_min,curr.tm_sec);return buffer;}const std::string defaultlogpath = "./log/";const std::string defaultlogname = "log.txt";enum class LogLevel{DEBUG = 1, INFO,WARNING,ERROR,FATAL};std::string LevelToString(LogLevel level){switch(level){case LogLevel::DEBUG:return "DEBUG";case LogLevel::INFO:return "INFO";case LogLevel::WARNING:return "WARNING";case LogLevel::ERROR:return "ERROR";case LogLevel::FATAL:return "FATAL";default:return "None";}}//刷新策略class LogStrategy{public:virtual ~LogStrategy() = default;virtual void SyncLog(const std::string &message) = 0;};//控制臺策略class ConsoleLogStrategy:public LogStrategy{public:ConsoleLogStrategy(){}~ConsoleLogStrategy(){}void SyncLog(const std::string &message){LockGuard lockGuard(_lock);std::cout << message << std::endl;}private:Mutex _lock;};//磁盤策略class FileLogStrategy:public LogStrategy{public:FileLogStrategy(const std::string &logpath = defaultlogpath, const std::string &logname = defaultlogname):_logpath(logpath),_logname(logname){//確定_logpath是否存在LockGuard lockGuard(_lock);if(std::filesystem::exists(_logpath)){return;}//不存在就嘗試創建路徑try{std::filesystem::create_directories(_logpath);}catch(std::filesystem::filesystem_error &e){std::cerr << e.what() << "\n";}}~FileLogStrategy(){}void SyncLog(const std::string &message){LockGuard lockGuard(_lock);std::string log = _logpath + _logname; //./log/log.txtstd::ofstream out(log, std::ios::app); //追加方式寫入//判斷是否打開if(!out.is_open()){return;}out << message << "\n";out.close();}private:std::string _logpath;std::string _logname;Mutex _lock;};//日志類:構建日志字符串,根據策略,進行刷新class Logger{public:Logger(){//默認采用控制臺策略_strategy = std::make_shared<ConsoleLogStrategy>();}//啟動控制臺策略void EnableConsoleLog(){_strategy = std::make_shared<ConsoleLogStrategy>();}//啟動磁盤策略void EnableFileLog(){_strategy = std::make_shared<FileLogStrategy>();}~Logger(){}//內部類:構建字符串class LogMessage{public:LogMessage(LogLevel level,const std::string &filename,int line,Logger &logger):_level(level),_currtime(CurrentTime()),_pid(getpid()),_filename(filename),_line(line),_logger(logger){std::stringstream ssbuffer;ssbuffer << "[" << _currtime << "] "<< "[" << LevelToString(_level) << "] "<< "[" << _pid << "] "<< "[" << _filename << "] "<< "[" << _line << "] - ";_loginfo = ssbuffer.str();}template <typename T>LogMessage &operator<<(const T &info){std::stringstream ss;ss << info;_loginfo += ss.str();return *this;}~LogMessage(){if(_logger._strategy)_logger._strategy->SyncLog(_loginfo);}private:std::string _currtime;//當前日志的時間LogLevel _level; //日志等級pid_t _pid; //進程pidstd::string _filename;//源文件名稱int _line; //日志所在的行號Logger &_logger; //負責根據不同的策略進行刷新std::string _loginfo; //一條完整的日志記錄};LogMessage operator()(LogLevel level,const std::string &filename, int line){return LogMessage(level,filename,line,*this);}private: std::shared_ptr<LogStrategy> _strategy; //日志刷新的策略方案};//全局一個logger對象Logger logger;//__FILE__:表示當前文件名,__LINE__:表示當前調用所在行號#define LOG(level) logger(level,__FILE__,__LINE__)#define ENABLE_CONSOLE_LOG() logger.EnableConsoleLog()#define ENABLE_FILE_LOG() logger.EnableFileLog() }#endif
//Lock.hpp#pragma once#include <iostream> #include <pthread.h> #include <string>namespace LockModule {class Mutex{public:Mutex(const Mutex&) = delete;const Mutex& operator=(const Mutex&) = delete;Mutex(){int ret;if(ret = (pthread_mutex_init(&_mutex, nullptr)) != 0){std::cout << "Mutex init error" << std::endl;exit(EXIT_FAILURE);}}void Lock(){int ret;if(ret = (pthread_mutex_lock(&_mutex)) != 0){std::cout << "Mutex lock error" << std::endl;exit(EXIT_FAILURE);}}void Unlock(){int ret;if(ret = (pthread_mutex_unlock(&_mutex)) != 0){std::cout << "Mutex unlock error" << std::endl;exit(EXIT_FAILURE);}}~Mutex(){int ret;if(ret = (pthread_mutex_destroy(&_mutex)) != 0){std::cout << "Mutex destroy error" << std::endl;exit(EXIT_FAILURE);}}pthread_mutex_t* GetMutex(){return &_mutex;}private:pthread_mutex_t _mutex;};//采用RAII風格,進行鎖管理class LockGuard{public:LockGuard(Mutex &mutex):_mutex(mutex){_mutex.Lock();}//析構函數中解鎖~LockGuard(){_mutex.Unlock();}private:Mutex &_mutex;}; }
//Cond.hpp#ifndef __MYCOND__HPP__ #define __MYCOND__HPP__#include <iostream> #include <pthread.h> #include <cstring> #include "Lock.hpp"namespace CondModule {using namespace LockModule;class Cond{public:Cond(){int ret = pthread_cond_init(&_cond,nullptr);if(ret != 0){fprintf(stderr,"pthread_cond_init:%s\n",strerror(ret));exit(EXIT_FAILURE);}}void Wait(Mutex &mutex){int ret = pthread_cond_wait(&_cond,mutex.GetMutex());if(ret != 0){fprintf(stderr,"pthread_cond_wait:%s\n",strerror(ret));exit(EXIT_FAILURE);}}void Notify(){int ret = pthread_cond_signal(&_cond);if(ret != 0){fprintf(stderr,"pthread_cond_signal:%s\n",strerror(ret));exit(EXIT_FAILURE);}}void NotifyAll(){int ret = pthread_cond_broadcast(&_cond);if(ret != 0){fprintf(stderr,"pthread_cond_broadcast:%s\n",strerror(ret));exit(EXIT_FAILURE);}}~Cond(){int ret = pthread_cond_destroy(&_cond);if(ret != 0){fprintf(stderr,"pthread_cond_destroy:%s\n",strerror(ret));exit(EXIT_FAILURE);}}private:pthread_cond_t _cond;}; }#endif
#makefilebin=thread_pool cc=g++ src=$(wildcard *.cpp) obj=$(src:.cc=.o)$(bin):$(obj)$(cc) -o $@ $^ -lpthread %.o:%.cc$(cc) -c $< -std=c++17.PHONY:clean clean:rm -f $(bin) $(obj).PHONY:test test:echo $(src)echo $(obj)
[2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [75] - 構建線程Thread-1對象...成功 [2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [75] - 構建線程Thread-2對象...成功 [2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [75] - 構建線程Thread-3對象...成功 [2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [75] - 構建線程Thread-4對象...成功 [2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [75] - 構建線程Thread-5對象...成功 [2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [96] - 啟動線程Thread-1...成功 [2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [96] - 啟動線程Thread-2...成功 [2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [41] - Thread:Thread-1 開始運行 [2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [96] - 啟動線程Thread-3...成功 [2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [96] - 啟動線程Thread-4...成功 [2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [41] - Thread:Thread-3 開始運行 [2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [41] - Thread:Thread-2 開始運行 [2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [96] - 啟動線程Thread-5...成功 [2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [41] - Thread:Thread-5 開始運行 [2025-06-17 16:56:51] [DEBUG] [11927] [Task.hpp] [15] - 我是一個推送數據到服務器的一個任務,我正在被執行[Thread-5] [2025-06-17 16:56:51] [INFO] [11927] [ThreadPool.hpp] [41] - Thread:Thread-4 開始運行 [2025-06-17 16:56:52] [DEBUG] [11927] [Task.hpp] [15] - 我是一個推送數據到服務器的一個任務,我正在被執行[Thread-5] [2025-06-17 16:56:53] [DEBUG] [11927] [Task.hpp] [15] - 我是一個推送數據到服務器的一個任務,我正在被執行[Thread-1] [2025-06-17 16:56:54] [DEBUG] [11927] [Task.hpp] [15] - 我是一個推送數據到服務器的一個任務,我正在被執行[Thread-3] [2025-06-17 16:56:55] [DEBUG] [11927] [Task.hpp] [15] - 我是一個推送數據到服務器的一個任務,我正在被執行[Thread-4] [2025-06-17 16:56:56] [DEBUG] [11927] [Task.hpp] [15] - 我是一個推送數據到服務器的一個任務,我正在被執行[Thread-2] [2025-06-17 16:56:57] [DEBUG] [11927] [Task.hpp] [15] - 我是一個推送數據到服務器的一個任務,我正在被執行[Thread-5] [2025-06-17 16:56:58] [DEBUG] [11927] [Task.hpp] [15] - 我是一個推送數據到服務器的一個任務,我正在被執行[Thread-1] [2025-06-17 16:56:59] [DEBUG] [11927] [Task.hpp] [15] - 我是一個推送數據到服務器的一個任務,我正在被執行[Thread-3] [2025-06-17 16:57:00] [DEBUG] [11927] [Task.hpp] [15] - 我是一個推送數據到服務器的一個任務,我正在被執行[Thread-4] [2025-06-17 16:57:01] [INFO] [11927] [ThreadPool.hpp] [65] - thread:Thread-4退出 [2025-06-17 16:57:01] [INFO] [11927] [ThreadPool.hpp] [65] - thread:Thread-2退出 [2025-06-17 16:57:01] [INFO] [11927] [ThreadPool.hpp] [65] - thread:Thread-5退出 [2025-06-17 16:57:01] [INFO] [11927] [ThreadPool.hpp] [65] - thread:Thread-1退出 [2025-06-17 16:57:01] [INFO] [11927] [ThreadPool.hpp] [65] - thread:Thread-3退出 [2025-06-17 16:57:04] [INFO] [11927] [ThreadPool.hpp] [118] - 回收線程Thread-1...成功 [2025-06-17 16:57:04] [INFO] [11927] [ThreadPool.hpp] [118] - 回收線程Thread-2...成功 [2025-06-17 16:57:04] [INFO] [11927] [ThreadPool.hpp] [118] - 回收線程Thread-3...成功 [2025-06-17 16:57:04] [INFO] [11927] [ThreadPool.hpp] [118] - 回收線程Thread-4...成功 [2025-06-17 16:57:04] [INFO] [11927] [ThreadPool.hpp] [118] - 回收線程Thread-5...成功
10、線程安全的單例模式?
單例模式:某些類,只應該具有一個對象(實例),就稱之為單例;在很多服務器開發場景中,經常需要讓服務器加載很多的數據到內存中,此時往往要用一個單例的類來管理這些數據。
餓漢實現方式和懶漢實現方式:?
- 餓漢方式在類加載時就完成單例對象的初始化,利用類加載機制保證線程安全。
template<typename T> class Singleton {static T data; public:static T* GetInstance(){return &data;} };//只通過Singleton這個包裝類來使用T對象,則一個進程中只有一個T對象的實例。
- 懶漢模式延遲單例對象的初始化,僅在首次調用時創建實例。
template <typename T> class Singleton {static T* inst; public:static T* GetInstance(){if(inst == nullptr)inst = new T();return inst;} }; //但這個懶漢模式線程不安全,可能存在兩個線程同時調用,可能會創建出兩份T對象的實例。//懶漢模式,線程安全版本 template <typename T> class Singleton {volatile static T *inst; //需要設置volatile關鍵字,否則可能被編譯器優化。static std::mutex lock; //上面自定義的鎖 public:static T *GetInstance(){if(inst == nullptr){lock.lock();if(inst == nullptr)//雙重判斷是為了不必要的鎖競爭inst = new T();lock.unlock();}return inst;} };
單例式線程池:
//ThreadPool.hpp#ifndef __THREADPOOL__
#define __THREADPOOL__#include <iostream>
#include <string>
#include <cstring>
#include <queue>
#include <vector>
#include <unistd.h>
#include <memory>
#include "Cond.hpp"
#include "Lock.hpp"
#include "Log.hpp"
#include "Task.hpp"
#include "Thread.hpp"namespace ThreadPoolModule
{using namespace LogModule;using namespace LockModule;using namespace CondModule;using namespace ThreadModule;// 用于接收單個線程的指針using thread_t = std::shared_ptr<Thread<std::string>>;const static int defaultnum = 5;template <typename T>class ThreadPool{private://復制拷貝禁用ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;ThreadPool(const Thread<T>&) = delete;//構造設置為私有ThreadPool(int num_thread = defaultnum): _thread_num(num_thread), _wait_num(0), _isrunning(false){// 創建自定義線程類對象for (int i = 0; i < _thread_num; i++){_threads.push_back(std::make_shared<Thread<std::string>>(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1), "Thread-" + std::to_string(i + 1)));LOG(LogLevel::INFO) << "構建線程" << _threads.back()->Name() << "對象...成功";}}// 判斷任務隊列是否為空bool IsEmpty(){return _taskq.empty();}// 線程的執行邏輯 -- 1、拿任務;2、執行任務void HandlerTask(std::string name){LOG(LogLevel::INFO) << "Thread:" << name << " 開始運行";while (true){// 1、拿任務T t;{LockGuard lockguard(_lock);// 任務隊列如果為空且線程池運行(如果線程池停止,就不能繼續等待)while (IsEmpty() && _isrunning){_wait_num++;_cond.Wait(_lock);_wait_num--;}//如果線程池停止了,此時存在兩個情況:\1、任務隊列不為空--繼續執行任務隊列的任務;2、任務隊列為空--直接退出即可if (!_isrunning && IsEmpty())break;t = _taskq.front();_taskq.pop();}// 2、處理任務 -- 這里不需要加鎖,可以并發執行任務t(name);}LOG(LogLevel::INFO) << "thread:" << name << "退出";}public:static ThreadPool<T> *GetInstace(int num = defaultnum){//懶漢方式if(_instace == nullptr){LockGuard lockguard(_mutex);if(_instace == nullptr) //雙重判斷避免不必要的鎖爭搶問題{_instace = new ThreadPool<T>(num);LOG(LogLevel::DEBUG) << "創建線程池單例";return _instace;}}LOG(LogLevel::DEBUG) << "獲取線程池單例";return _instace;}// 任務加入任務隊列void Equeue(T &&in){LockGuard lockguard(_lock);// 如果線程池停止,不能繼續加入任務if (!_isrunning)return;_taskq.push(std::move(in));// 通知線程隊列已有任務,可以獲取if (_wait_num > 0)_cond.Notify();}void Start(){if (_isrunning)return;LockGuard lockguard(_lock);_isrunning = true;for (auto &thread_ptr : _threads){LOG(LogLevel::INFO) << "啟動線程" << thread_ptr->Name() << "...成功";thread_ptr->Start(); // 啟動線程}}// 執行剩下所有任務,停止線程池void Stop(){LockGuard lockguard(_lock);if (_isrunning){_isrunning = false;// 喚醒所有線程去執行剩下的任務if (_wait_num > 0)_cond.NotifyAll();}}// 回收線程void Wait(){for (auto &thread_ptr : _threads){thread_ptr->Join();LOG(LogLevel::INFO) << "回收線程" << thread_ptr->Name() << "...成功";}}~ThreadPool(){}private:std::vector<thread_t> _threads; // 線程數組int _thread_num; // 線程數量std::queue<T> _taskq; // 任務隊列int _wait_num; // 任務等待數量Mutex _lock;Cond _cond;bool _isrunning;//添加單例模式static ThreadPool<T> *_instace;static Mutex _mutex;};//初始化template<typename T>ThreadPool<T> *ThreadPool<T>::_instace = nullptr;template<typename T>Mutex ThreadPool<T>::_mutex;
}#endif
//ThreadPool.cpp#include "ThreadPool.hpp"
#include "Task.hpp"
#include <memory>using namespace ThreadPoolModule;int main()
{ENABLE_CONSOLE_LOG();//ENABLE_FILE_LOG();int cnt = 20;//首次調用創建唯一對象ThreadPool<task_t>::GetInstace(10)->Start();while (cnt){ThreadPool<task_t>::GetInstace()->Equeue(PushTask);cnt--;sleep(1);}ThreadPool<task_t>::GetInstace()->Stop();sleep(3);ThreadPool<task_t>::GetInstace()->Wait();return 0;
}
11、線程安全和重入問題
線程安全:就是多個線程在訪問共享資源時,能夠正確地執行,不會相互干擾或破壞彼此的執行結
果。一般而言,多個線程并發同一段只有局部變量的代碼時,不會出現不同的結果。但是對全局變量或者靜態變量進行操作,并且沒有鎖保護的情況下,容易出現該問題。
重入:同一個函數被不同的執行流調用,當前一個流程還沒有執行完,就有其他的執行流再次進入,我們稱之為重入。一個函數在重入的情況下,運行結果不會出現任何不同或者任何問題,則該函數被稱為可重入函數,否則,是不可重入函數。
重入可以分為兩種情況:
- 多線程重入函數
- 信號導致一個執行流重復進入函數
常見的線程不安全的情況:
- 不保護共享變量的函數
- 函數狀態隨著被調用,狀態發生變化的函數
- 返回指向靜態變量指針的函數
- 調用線程不安全函數的函數
常見的線程安全的情況:
- 每個線程對全局變量或者靜態變量只有讀取的權限,而沒有寫入的權限,一般來說這些線程是安全的
- 類或者接口對于線程來說都是原子操作
- 多個線程之間的切換不會導致該接口的執行結果存在二義性
常見不可重入的情況:
- 調用了malloc/free函數,因為malloc函數就是用全局鏈表來管理堆的
- 調用了標準I/O庫函數,標準I/O庫的很多實現都以不可重入的方式使用全局數據結構
- 可重入函數體內使用了靜態的數據結構
常見可重入的情況:
- 不使用全局變量或靜態變量
- 不使用malloc或者new開辟出的空間
- 不調用不可重入函數
- 不返回靜態或全局數據,所有數據都有函數的調用者提供
- 使用本地數據,或者通過制作全局數據的本地拷貝來保護全局數據
可重入與線程安全聯系:
- 函數是可重入的,那就是線程安全的
- 函數是不可重入的,那就不能由多個線程使用,有可能引發線程安全問題
- 如果一個函數中有全局變量,那么這個函數既不是線程安全也不是可重入的
可重入與線程安全區別:
- 可重入函數是線程安全函數的一種
- 線程安全不一定是可重入的,而可重入函數則一定是線程安全的
- 如果將對臨界資源的訪問加上鎖,則這個函數是線程安全的,但如果這個重入函數若鎖還未釋放則會產生死鎖,因此是不可重入
12、死鎖
1、死鎖概念
- 死鎖是指在一組進程中的各個進程均占有不會釋放的資源,但因互相申請被其他進程所占用不會釋放的資源而處于的一種永久等待狀態。
- 為了方便表述,假設現在線程A,線程B,一個線程必須同時持有鎖1和鎖2,才能進行后續資源的訪問:
申請一把鎖是原子的,但申請兩把鎖就不一定是原子的了
可能造成結果就是:
- 死鎖的四個必要條件:
1、互斥條件:一個資源每次只能被一個執行流使用
2、請求與保持條件:一個執行流因請求資源而阻塞時,對以獲得的資源保持不放
3、不剝奪條件:一個執行流已獲得的資源,在未使用完之前,不能強行剝奪
4、循環等待條件:若干執行流之間形成一種頭尾相接的循環等待資源的關系
2、破壞死鎖?
死鎖的產生需要滿足四個必要條件:互斥條件、占有并等待、非搶占條件和循環等待。破壞其中任何一個條件即可避免死鎖。
- 破壞互斥條件:互斥條件指資源一次只能被一個進程占用。破壞互斥條件可以通過允許多個進程共享資源,但某些資源(如打印機)無法共享,因此該方法適用性有限。
- 破壞占有并等待條件:
一次性申請所有資源:進程在運行前申請所有所需資源,否則不執行;
釋放已占有資源:若進程無法獲得新資源,則釋放已占有資源并重新申請。 - 破壞非搶占條件:
若進程無法獲得資源,則釋放已占有資源,等待一段時間后重新申請。
操作系統強制回收資源,可能導致進程回滾。 - 破壞循環等待條件:
資源有序分配法:為資源編號,進程只能按編號順序申請資源。
超時機制。
銀行家算法:動態檢查資源分配狀態,避免進入不安全狀態。
2.1 銀行家算法
具體參考:操作系統——銀行家算法(Banker's Algorithm) - 王陸 - 博客園
13、讀寫鎖
讀者寫者問題:
- 三種關系:讀者和讀者--并發關系;讀者和寫者--互斥&&同步;寫者和寫者--互斥;
- 兩種角色:讀者和寫者;
- 一個交易場所:緩沖區
偽代碼:
uint32_t reader_count = 0; lock_t count_lock; lock_t writer_lock;//Reader //加鎖 lock(count_lock); if(read_count == 0)lock(writer_lock); //只需要第一個讀者上鎖 ++reader_count; ulock(count_lock); //read lock(count_lock) --reader_count; if(reader_count == 0)unlock(writer_lock); ulock(count_lock);//Writer lock(writer_lock); //write unlock(writer_lock);
讀寫鎖:
在編寫多線程的時候,有一種情況是十分常見的。那就是,有些公共數據修改的機會比較少。相比較改寫,它們讀的機會反而高的多。通常而言,在讀的過程中,往往伴隨著查找的操作,中間耗時很長。給這種代碼段加鎖,會極大地降低我們程序的效率;讀寫鎖就是專門處理這種多讀少寫的情況。
|
讀者優先(Reader-Preference):在這種策略中,系統會盡可能多地允許多個讀者同時訪問資源(比如共享文件或數據),而不會優先考慮寫者。這意味著當有讀者正在讀取時,新到達的讀者會立即被允許進入讀取區,而寫者則會被阻塞,直到所有讀者都離開讀取區。讀者優先策略可能會導致寫者饑餓(即寫者長時間無法獲得寫入權限),特別是當讀者頻繁到達時。
?寫者優先(Writer-Preference):在這種策略中,系統會優先考慮寫者。當寫者請求寫入權限時,系統會盡快地讓寫者進入寫入區,即使此時有讀者正在讀取。這通常意味著一旦有寫者到達,所有后續的讀者都會被阻塞,直到寫者完成寫入并離開寫入區。寫者優先策略可以減少寫者等待的時間,但可能會導致讀者饑餓(即讀者長時間無法獲得讀取權限),特別是當寫者頻繁到達時。
讀寫鎖接口:讀寫鎖(Reader-Writer Lock)接口,用于實現多線程環境下的高效共享資源訪問。讀寫鎖允許并發讀操作,但寫操作必須獨占訪問。
Linux讀寫鎖接口
Linux提供了讀寫鎖(Reader-Writer Lock)的接口,用于實現多線程環境下的高效共享資源訪問。讀寫鎖允許并發讀操作,但寫操作必須獨占訪問。以下是主要接口和用法:
初始化讀寫鎖:
讀寫鎖的類型為
pthread_rwlock_t
,初始化方式有兩種:靜態初始化:
pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;
動態初始化(需銷毀):
pthread_rwlock_init(&rwlock, NULL);
設置讀寫優先:
int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t *attr, int pref); /* pref 共有 3 種選擇 PTHREAD_RWLOCK_PREFER_READER_NP (默認設置) 讀者優先, 可能會導致寫者饑餓情況 PTHREAD_RWLOCK_PREFER_WRITER_NP 寫者優先, 目前有 BUG, 導致表現行為和PTHREAD_RWLOCK_PREFER_READER_NP 一致 PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP 寫者優先, 但寫者不能遞歸加鎖 */
獲取讀鎖(共享鎖):
- 阻塞方式獲取讀鎖:
pthread_rwlock_rdlock(&rwlock);
- 非阻塞方式嘗試獲取讀鎖:
int ret = pthread_rwlock_tryrdlock(&rwlock); if (ret != 0) {// 獲取鎖失敗處理 }
- 獲取寫鎖(獨占鎖):
- 阻塞方式獲取寫鎖:
pthread_rwlock_wrlock(&rwlock);
- 非阻塞方式嘗試獲取寫鎖:
int ret = pthread_rwlock_trywrlock(&rwlock); if (ret != 0) {// 獲取鎖失敗處理 }
- 釋放讀寫鎖:
無論讀鎖還是寫鎖,統一使用:
pthread_rwlock_unlock(&rwlock);
銷毀讀寫鎖:
動態初始化的讀寫鎖需要銷毀:
pthread_rwlock_destroy(&rwlock);
示例代碼:
#include <pthread.h> #include <stdio.h>pthread_rwlock_t rwlock; int shared_data = 0;void* reader(void* arg) {pthread_rwlock_rdlock(&rwlock);printf("Reader: %d\n", shared_data);pthread_rwlock_unlock(&rwlock);return NULL; }void* writer(void* arg) {pthread_rwlock_wrlock(&rwlock);shared_data++;printf("Writer: %d\n", shared_data);pthread_rwlock_unlock(&rwlock);return NULL; }int main() {pthread_rwlock_init(&rwlock, NULL);pthread_t threads[10];for (int i = 0; i < 5; i++) {pthread_create(&threads[i], NULL, writer, NULL);}for (int i = 5; i < 10; i++) {pthread_create(&threads[i], NULL, reader, NULL);}for (int i = 0; i < 10; i++) {pthread_join(threads[i], NULL);}pthread_rwlock_destroy(&rwlock);return 0; }
注意事項:
- 讀寫鎖可能導致寫者饑餓問題(讀者持續獲取鎖導致寫者無法執行)
- 適合讀多寫少的場景,寫多讀少時性能可能不如互斥鎖
- 銷毀已鎖定的讀寫鎖會導致未定義行為
- 同一線程重復獲取寫鎖可能導致死鎖(除非使用遞歸鎖屬性)