目錄
概念補充
條件變量
操作
例:多線程搶票
封裝
生產者消費者模型
生產者和消費者之間的關系
BlockQueue(阻塞隊列)
單生產單消費
信號量
簡介
操作
多生產者多消費者RingQueue(環形隊列)代碼
sem封裝? ?
信號量與鎖
小知識
概念補充
同步:在保證數據安全的前提下,讓線程能夠按照某種特定的順序訪問臨界資源,從而有效避免饑餓問題,叫做同步
競態條件:因為時序問題,而導致程序異常,我們稱之為競態條件.
條件變量
簡單來說,條件變量相當于一個阻塞隊列,將線程入隊列(等待),滿足條件就出隊列(喚醒)
操作
全局條件變量
和mutex相似,使用宏初始化一個全局的條件變量
局部條件變量
attr傳nullptr即可
等待
cond為條件變量,mutex為鎖
喚醒
signal為喚醒一個線程
broadcast為喚醒所有線程
例:多線程搶票
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <string>pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;int tickets = 100;
void* func(void* args)
{std::string name(static_cast<char*>(args));while(true){pthread_mutex_lock(&lock);pthread_cond_wait(&cond,&lock);if(tickets==0) {pthread_mutex_unlock(&lock);break;}std::cout<<name<<" "<<--tickets<<std::endl;pthread_mutex_unlock(&lock);}return (void*)0;
}int main()
{pthread_t t1,t2,t3;pthread_create(&t1,nullptr,func,(void*)"t1");pthread_create(&t2,nullptr,func,(void*)"t2");pthread_create(&t3,nullptr,func,(void*)"t3");while(true){pthread_cond_signal(&cond);sleep(0.3);if(tickets==0){pthread_cond_broadcast(&cond);break;}}pthread_join(t1,nullptr);pthread_join(t2,nullptr);pthread_join(t3,nullptr);return 0;
}
每個線程執行到wait時會阻塞住,
為什么wait要傳鎖,
因為wait會讓線程等待,如果線程申請到了鎖等待,那么其他線程就無法申請到鎖,所以wait會先釋放鎖再等待,等被喚醒后先申請鎖再向下執行,保證互斥.
封裝
#pragma once
#include <iostream>
#include <pthread.h>
#include "mutex.hpp"class Cond
{pthread_cond_t _cond;
public: Cond(){pthread_cond_init(&_cond,nullptr);}void Wait(LockGuard& lock){int n = pthread_cond_wait(&_cond,lock.Get());(void)n;}void Notifyone(){int n = pthread_cond_signal(&_cond);(void)n;}void Notifyall(){int n = pthread_cond_broadcast(&_cond);(void)n;}~Cond(){pthread_cond_destroy(&_cond);}
};
生產者消費者模型
和生活中的超市類似,就是讓不同線程作為生產產品者和消費產品者,臨界區作為廠家和消費者的中轉站超市.
為什么存在這樣的模型:
1.減少生產和消費過程中產生的成本
2.支持生產和消費的忙閑不均
3.維護松耦合關系
減少成本問題:? ?比如一個方便面廠家,生產方便面都是成批生產,而消費者一般只會按個按箱購買,如果兩者直接交易,廠家生產一袋方便面,消費者購買,這樣廠家的成本會很高;如果廠家生產一批,消費者也買不了一批產品,消費者成本會很高.
支持忙閑不均和松耦合關系問題:? ? 還是上面的例子,廠家可以生產一批放到超市,然后等消費者慢慢消費,生產和消費沒有直接關系
生產者和消費者之間的關系
生產者之間 : 互斥關系
消費者之間 : 互斥關系
生產者與消費者之間 : 互斥+同步關系
BlockQueue(阻塞隊列)
在多線程編程中阻塞隊列(Blocking Queue)是?種常?于實現?產者和消費者模型的數據結構。其與普通的隊列區別在于,當隊列為空時,從隊列獲取元素的操作將會被阻塞,直到隊列中被放入了元素;當隊列滿時,往隊列?存放元素的操作也會被阻塞,直到有元素被從隊列中取出(以上的操作都是基于不同的線程來說的,線程在對阻塞隊列進程操作時會被阻塞)
//<BlockQueue.hpp>
#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
#include "mutex.hpp"
#include "cond.hpp"//上面封裝的條件變量const static u_int32_t CAP = 5;template<typename T>
class BlockQueue
{std::queue<T> _q;u_int32_t _cap;pthread_mutex_t _lock;Cond _c_cond;//消費者Cond _p_cond;//生產者unsigned int _c_wait_num;//當前消費者等待的個數unsigned int _p_wait_num;//當前生產者等待的個數 為了防止喚醒次數過多或喚醒無意義bool isFull(){return _q.size()>=_cap;}bool isEmpty(){return _q.empty();}public: ? ?BlockQueue(u_int32_t cap = CAP) :_cap(cap){}void Enqueue(const T& data){{LockGuard lock(&_lock);while(isFull()) //while循環防止wait調用失敗和偽喚醒問題{_p_wait_num++;_p_cond.Wait(lock);_p_wait_num--;}_q.push(data);if(_c_wait_num>0)_c_cond.Notifyone();}}void Pop(T* retval){{LockGuard lock(&_lock);while(isEmpty()) //while循環防止wait調用失敗和偽喚醒問題{_c_wait_num++;_c_cond.Wait(lock);_c_wait_num--;}*retval = _q.front();_q.pop();if(_p_wait_num>0)_p_cond.Notifyone();}}~BlockQueue(){}};
單生產單消費
#include "blockqueue.hpp"
#include <iostream>
#include <string>
#include <unistd.h>struct Data
{BlockQueue<int>* bq;std::string name;
};void *consumer(void* args)
{Data* data = static_cast<Data*>(args);int retval = 0;while(true){data->bq->Pop(&retval);std::cout<<data->name<<"消費了 " <<retval<<std::endl; //sleep(1);} return (void*)0;
}void *productor(void* args)
{Data* data = static_cast<Data*>(args);int num = 1;while(true){data->bq->Enqueue(num);std::cout<<data->name<<"生產了 "<<num++<<std::endl;sleep(1);}return (void*)0;
}int main()
{BlockQueue<int>* bq = new BlockQueue<int>();pthread_t c,p;Data ctd = {bq,"消費者"};pthread_create(&c,nullptr,consumer,(void*)&ctd);Data ptd = {bq,"生產者"};pthread_create(&p,nullptr,productor,(void*)&ptd);pthread_join(c,nullptr);pthread_join(p,nullptr);return 0;
}
對于多生產和多消費,BlockQueue.hpp也支持
信號量
簡介
信號量針對多線程并發訪問一塊資源中的不同部分.
本質上,就是一個描述資源數量的計數器
操作
sem_init? ?初始化信號量
pshared ? 用來表示線程使用還是進程使用,0表示線程間使用
value ?信號量的初始值
sem_destroy? 銷毀信號量
sem_wait? ?信號量--
sem_post? 信號量++
多生產者多消費者RingQueue(環形隊列)代碼
生產者關注空余空間資源,消費者關注數據資源
sem封裝? ?<sem.hpp>
#pragma once
#include <iostream>
#include <semaphore.h>#define NUM 5class Sem
{sem_t _sem;int _initnum;
public:Sem(int num = NUM):_initnum(num){sem_init(&_sem,0,_initnum);}void P(){int n = sem_wait(&_sem);(void)n;}void V(){int n = sem_post(&_sem);(void)n;}~Sem(){sem_destroy(&_sem);}
};
<mutex.hpp>
#pragma once
#include <iostream>
#include <unistd.h>class LockGuard
{pthread_mutex_t* _lock;public:LockGuard(pthread_mutex_t* lock):_lock(lock){pthread_mutex_lock(_lock);} pthread_mutex_t* Get(){return _lock;}~LockGuard(){pthread_mutex_unlock(_lock);}
};
<cond.hpp>
#pragma once
#include <iostream>
#include <pthread.h>
#include "mutex.hpp"class Cond
{pthread_cond_t _cond;
public: Cond(){pthread_cond_init(&_cond,nullptr);}void Wait(LockGuard& lock){int n = pthread_cond_wait(&_cond,lock.Get());(void)n;}void Notifyone(){int n = pthread_cond_signal(&_cond);(void)n;}void Notifyall(){int n = pthread_cond_broadcast(&_cond);(void)n;}~Cond(){pthread_cond_destroy(&_cond);}
};
<RingQueue.hpp>
#pragma once
#include <iostream>
#include "sem.hpp"
#include <vector>
#include "mutex.hpp"#define CAP 10template<typename T>
class RingQueue
{std::vector<T> _rq;int _cap;Sem _data_sem;Sem _space_sem;int _c_step;int _p_step;pthread_mutex_t _p_lock;pthread_mutex_t _c_lock;
public:RingQueue(int cap = CAP):_cap(cap) ,_rq(cap) ,_data_sem(0) ,_space_sem(cap),_c_step(0),_p_step(0){pthread_mutex_init(&_p_lock,nullptr);pthread_mutex_init(&_c_lock,nullptr);}void Enqueue(const T& data){ _space_sem.P();{LockGuard lock(&_p_lock);_rq[_c_step++] = data;_c_step%=_cap;}_data_sem.V();}void Pop(T* data){_data_sem.P();{LockGuard lock(&_c_lock);*data = _rq[_p_step++];_p_step%=_cap;}_space_sem.V();}~RingQueue(){pthread_mutex_destroy(&_p_lock);pthread_mutex_destroy(&_c_lock);}
};
<RingQueue.cpp>
#include <iostream>
#include "sem.hpp"
#include "mutex.hpp"
#include "cond.hpp"
#include <pthread.h>
#include "RingQueue.hpp"RingQueue<int> rq(10);void* consumer(void* args)
{while(true){int data = 0;rq.Pop(&data);std::cout<<"消費 "<<data<<std::endl;}return (void*)0;
}void* productor(void* args)
{int data = 1;while(true){rq.Enqueue(data);std::cout<<"生產 "<<data++<<std::endl;}return (void*)0;
}int main()
{pthread_t c[2],p[2];pthread_create(c,nullptr,consumer,nullptr);pthread_create(c+1,nullptr,consumer,nullptr);pthread_create(p,nullptr,productor,nullptr);pthread_create(p+1,nullptr,productor,nullptr);pthread_join(c[0],nullptr);pthread_join(c[1],nullptr);pthread_join(p[0],nullptr);pthread_join(p[1],nullptr);return 0;
}
信號量與鎖
為什么信號量不需要加鎖,信號量為空或為滿時,信號量完成了同步和互斥動作,信號量不為空和不為滿時,生產者和消費者訪問不同位置,也不需要加鎖
二元信號量(信號量初始值為1) :就是鎖
鎖:認為資源只有一份,申請鎖相當于信號量P操作(--),釋放鎖相當于信號量V操作(++),所以鎖是信號量的一種特殊情況
小知識
1.pthread開頭的函數一般返回值為0表示成功
2.生產和消費還可以傳遞函數,用來給線程派發任務
3.生產消費模型高效在哪里:生產者生產數據和消費者處理數據是并發的,兩者是松耦合關系,可以忙閑不均,這里效率高,而二者向阻塞隊列中進行放入和拿出屬于同步和互斥,效率不高