目錄
一、POSIX信號量:
接口:
二、基于環形隊列的生產消費者模型
環形隊列:
單生產單消費實現代碼:
RingQueue.hpp:
main.cc:
多生產多消費實現代碼:
RingQueue.hpp:
main.cc:
一、POSIX信號量:
在實現線程的同步,互斥不僅僅只有條件變量和鎖,還有POSIX信號量,這里學習的POSIX信號量和之前學習的SystemV信號量作用相同,都是用于同步操作,達到無沖突的訪問共享資源目的,但POSIX可以用于線程間同步
引入信號量:
對于共享資源,為了保證其并發性,將其分成了幾份資源,就允許幾個線程進入共享資源訪問,此時就引入了信號量來對其進行保護
信號量的本質是一個計數器
這把計數器用來描述臨界資源中資源數目的多少,實際上是對資源的預加載機制(這就像在電影院買票,買票的本質就是對電影院座位的預加載機制,當你買到票了,就一定會有位置給你,并且別人即使有票也是坐別人的座位,也不會搶了你的座位)
雖然信號量的本質是一個計數器,當一個線程申請資源成功就將計數器--,當一個線程申請資源失敗就將計數器++,但是不能用一個簡簡單單的普通變量代替信號量,因為變量的--和++操作不是原子的,所以,我們就要使用一個支持PV操作的原子的計數器------信號量
那么什么是PV操作呢?
P:代表申請資源,計數器--
V:代表釋放資源,計數器++
當將共享資源分為N份,此時信號量也就是N,這個時候就能夠申請資源,再將信號量--,當信號量為0的時候,線程就不能夠申請資源了,只能阻塞等待
如上,這是一個多元信號量sem,我們之前學習的鎖被叫做二元信號量
在使用多元信號量訪問資源的時候,要先申請信號量,只有申請成功了,才能訪問資源否則就需要進入阻塞隊列等待
接口:
初始化信號量
參數1:需要初始化信號量的地址
參數2:表示的是線程共享還是進程共享,默認為零,線程共享,非零表示進程共享
參數3:設定的信號量的初始值
返回值:初始化成功返回0,失敗返回-1,并設置錯誤碼
其中sem_t實際上是一個聯合體
銷毀信號量
參數:就是需要銷毀信號量的地址
返回值:初始化成功返回0,失敗返回-1,并設置錯誤碼
申請信號量:
其中下面用的是sem_wait,其功能就是成功將信號量-1,也就是P操作
參數:就是需要銷毀信號量的地址
返回值:初始化成功返回0,失敗返回-1,并設置錯誤碼
sem_trywait:嘗試申請,如果沒有申請到資源,就會放棄申請
sem_timedwait:每隔一段時間進行申請
釋放信號量(發布信號量)
參數:就是需要銷毀信號量的地址
返回值:初始化成功返回0,失敗返回-1,并設置錯誤碼
其表示資源使用完畢,歸還資源,成功將信號量+1,也就是V操作
二、基于環形隊列的生產消費者模型
環形隊列:
在實現生產消費者的模型中,不僅僅只有共享隊列,還有環形隊列,什么是環形隊列呢?
雖然它叫環形隊列,但是它不是隊列,而是用數組實現的,
其中head作為頭指針,當申請資源成功的時候就向后移動一位,
tail作為尾指針,當釋放資源成功的時候向后移動一位,
首先,如何讓數組成環呢?
在每次head++后都進行一次取模操作,這樣保證head的大小不會超過這個環形隊列的大小
特殊的是,當為空或者為滿的時候,頭指針和尾指針都指向同一個位置,那么如何證明此時是空還是滿呢?
這里有兩種方法:
方法一:添加一個計數器,當計數器的值為0的時候,表示當前為空,當計數器的值為容器大小的時候,表示該環形隊列為滿
判空條件:count == 0
判滿條件:count == size方法二:犧牲一個空間的大小,通過預留一個空位,避免head和tail重合時無法區分空和滿。此時隊列最大容量為size-1
判空條件:head== tail
判滿條件:(head+ 1) % size == tail在下面實現的時候采用計數器,畢竟信號量是一個天然的計數器
當數據不為空或者滿的時候,此時head指針和tail指針必定不指向同一個位置,
此時就能夠進行生產者和消費者的同時訪問,
為空的時候,只能生產者訪問,生產者只關注還剩多少空間
為滿的時候,只能消費者訪問,消費者只關注還剩多少數據
所以在使用信號量標識資源的情況下,生產者和消費者關注的資源不一樣,所以就需要兩個信號量來進行計數:
生產者的信號量:表示當前有多少可用空間
消費者的信號量:表示當前有多少可消費數據
所以以下在實現的時候,定義兩個信號量,spacesem = N 和datasem = 0
對于生產者的PV操作:P(spacesem)將空間資源-1,V(datasem)將數據資源+1
對于消費者的PV操作:P(datasem)將數據資源-1,V(spacesem)將空間資源+1
單生產單消費實現代碼:
RingQueue.hpp:
首先創建一個實現環形隊列的文件:
#pragma once
#include <vector>
#include <iostream>
#include <semaphore.h>template <class T>
class RingQueue
{
private:std::vector<T> _ringqueue; // 用vector模擬環形隊列int _maxcap; // 環形隊列的最大容量int _p_step; // 生產者下標int _c_step; // 消費者下標sem_t _pspace_sem; // 生產者關注的空間資源sem_t _cdata_sem; // 消費者關注的數據資源
};
接著依次實現其中的接口:
構造與析構
RingQueue(int maxcap = 5): _maxcap(maxcap), _ringqueue(maxcap), _p_step(0), _c_step(0){sem_init(&_pspace_sem,0,maxcap);sem_init(&_cdata_sem,0,0);pthread_mutex_init(&_p_mutex,nullptr);pthread_mutex_init(&_c_mutex,nullptr);}~RingQueue(){sem_destroy(&_pspace_sem);sem_destroy(&_cdata_sem);pthread_mutex_destroy(&_p_mutex);pthread_mutex_destroy(&_c_mutex);}
其中,構造函數的主要作用就是初始化各種變量,析構函數的主要作用就是釋放這些變量
push與pop
push的作用是從交易場所中放入數據,pop的作用是從交易場所中拿到數據
void push(const T& in){//生產數據先要申請信號量來預定資源P(_pspace_sem);_ringqueue[_p_step] = in;//將所對應的數據放入到環形隊列中_p_step++;//將生產者對應的下標向后移動一位_p_step %= _maxcap;//保證生產者不會超過環形隊列的大小V(_cdata_sem);}void pop(T *out){P(_cdata_sem);pthread_mutex_lock(&_c_mutex);*out = _ringqueue[_c_step];//將該位置的數據交給out作為輸出型參數帶出去_c_step++;//將消費者對應的下標向后移動一位_c_step %= _maxcap;//保證消費者下標不會超過環形隊列的大小pthread_mutex_unlock(&_c_mutex);V(_pspace_sem);}
生產者push后,證明環形隊列中一定有數據,所以就需要在V后傳入消費者關心的信號量,也就是需要傳遞_cdata_sem
消費者pop后,證明環形隊列中一定有空間,所以就需要在V后傳入生產者關心的信號量,也就是需要傳遞_pspace_sem
PV操作:
void P(sem_t &sem){sem_wait(&sem);}void V(sem_t &sem){sem_post(&sem);}
P操作代表申請資源,也就是semwait這個函數
V操作就是釋放了資源,比如生產者就是釋放了一個數據
這里要保證數據在為空的時候只能生產者運行,在數據為滿的時候只能消費者去運行,
所以wait是為了保持順序同步,保證即使消費者先調用,但是沒有數據,就將消費者申請資源所關注的數據信號量送去等待隊列里去等待
在封裝V操作中,post就是釋放資源,對于生產者就是給了個數據給消費者
對于消費者post就是釋放了空間,生產者就能接著生產了
那消費者一開始調用P操作,沒有數據就會阻塞,而生產者這邊V了數據,消費者這邊P就不會阻塞了可以拿到數據了
所以生產和消費這兩者的PV操作是反的
生產者V了,消費者的p就停止阻塞了因為生產者給了消費者資源了
反之同理
main.cc:
void *Productor(void *args)
{RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);while (true){int data = rand()%10+1;rq->push(data);std::cout<<"Productor : data = "<< data << std::endl;sleep(1);}return nullptr;
}void *Consumer(void *args)
{RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);while (true){int data = 0;rq->pop(&data);std::cout<<"Consumer : data = "<< data << std::endl;sleep(1);}return nullptr;
}int main()
{srand(time(nullptr)^getpid());RingQueue<Task> *rq = new RingQueue<Task>();pthread_t c, p;pthread_create(&p, nullptr, Productor, rq);pthread_create(&c, nullptr, Consumer, rq);pthread_join(p, nullptr);pthread_join(c, nullptr);delete rq;return 0;
}
或者也可以讓消費者瘋狂消費數據,生產者瘋狂生產
多生產多消費實現代碼:
RingQueue.hpp:
在多生產多消費中,需要保證生產者和生產者之間、消費者和消費者之間的互斥關系,生產者和消費者之間的互斥關系已經由信號量承擔了
所以在多生產多消費的代碼中要加上鎖
構造與析構中也要增加初始化鎖與釋放鎖
template <class T>
class RingQueue
{
public:RingQueue(int maxcap = 5): _maxcap(maxcap), _ringqueue(maxcap), _p_step(0), _c_step(0){sem_init(&_pspace_sem,0,maxcap);sem_init(&_cdata_sem,0,0);pthread_mutex_init(&_p_mutex,nullptr);pthread_mutex_init(&_c_mutex,nullptr);}~RingQueue(){sem_destroy(&_pspace_sem);sem_destroy(&_cdata_sem);pthread_mutex_destroy(&_p_mutex);pthread_mutex_destroy(&_c_mutex);}private:std::vector<T> _ringqueue; // 用vector模擬環形隊列int _maxcap; // 環形隊列的最大容量int _p_step; // 生產者下標int _c_step; // 消費者下標sem_t _pspace_sem; // 生產者關注的空間資源sem_t _cdata_sem; // 消費者關注的數據資源pthread_mutex_t _p_mutex;//保證生產者和生產者之間的互斥pthread_mutex_t _c_mutex;//保證消費者和消費者之間的互斥
};
push與pop
void push(const T& in){//生產數據先要申請信號量來預定資源P(_pspace_sem);//信號量的申請本來就是原子的,所以加鎖的時候就需要在這之后pthread_mutex_lock(&_p_mutex);_ringqueue[_p_step] = in;//將所對應的數據放入到環形隊列中_p_step++;//將生產者對應的下標向后移動一位_p_step %= _maxcap;//保證生產者下標不會超過環形隊列的大小pthread_mutex_unlock(&_p_mutex);V(_cdata_sem);}void pop(T *out){P(_cdata_sem);pthread_mutex_lock(&_c_mutex);*out = _ringqueue[_c_step];//將該位置的數據交給out作為輸出型參數帶出去_c_step++;//將消費者對應的下標向后移動一位_c_step %= _maxcap;//保證消費者下標不會超過環形隊列的大小pthread_mutex_unlock(&_c_mutex);V(_pspace_sem);}
細節:
在加鎖的時候要在申請信號量之后,這樣能夠提高并發度
如果是在申請信號量之前進行加鎖,那么申請信號量的線程永遠只有一個? 不能夠提高并發度
理解:
就像在電影院中,是先買票在進行排隊的,這樣能夠加快進場的速度,如果排隊后再買票,需要一人一人地進行操作,這相比上一種就會很慢的
申請信號量的操作是原子的,不需要加鎖保護也能保證線程安全,所以并發申請信號量,串行訪問臨界資源能夠提高并發度
main.cc:
在進行生產消費者模型中的數據問題,不僅僅是讓二者看到同一份資源,更重要的是讓消費者拿到資源并對資源進行處理,這里引入上一章的Task文件來進行數據處理
Task.hpp
#include <iostream>
#include <string>std::string opers = "+-*/%";enum
{Divzero = 1,Modzero,Unknow
};class Task
{
public:Task(){}Task(int data1, int data2, char oper): _data1(data1), _data2(data2), _oper(oper),_exitcode(0){}void run(){switch (_oper){case '+':_result = _data1 + _data2;break;case '-':_result = _data1 - _data2;break;case '*':_result = _data1 * _data2;break;case '/':if (_data2 == 0)_exitcode = Divzero;else_result = _data1 / _data2;break;case '%':if (_data2 == 0)_exitcode = Modzero;else_result = _data1 % _data2;break;default:_exitcode = Unknow;break;}}void operator()(){run();}std::string Getresult(){std::string ret = std::to_string(_data1);ret += _oper;ret += std::to_string(_data2);ret += "=";ret += std::to_string(_result);ret += "[exitcode=";ret += std::to_string(_exitcode);ret += "]";return ret;}std::string GetTask(){std::string ret = std::to_string(_data1);ret += _oper;ret += std::to_string(_data2);ret += "=?";return ret;}~Task(){}private:int _data1;int _data2;char _oper;int _exitcode;int _result;
};
接著在生產消費者的線程所執行的對應的方法中,基本和上一章類似
void *Productor(void *args)
{ThreadData *td = static_cast<ThreadData *>(args);RingQueue<Task> *rq = td->rq;std::string name = td->threadname;int len = opers.size();while (true){int data1 = rand() % 10 + 1;usleep(10);int data2 = rand() % 10;char op = opers[rand()%len];Task t(data1,data2,op);rq->push(t);std::cout<<"Productor : Task = "<< t.GetTask() << " who "<< name << std::endl;sleep(1);}return nullptr;
}void *Consumer(void *args)
{ThreadData *td = static_cast<ThreadData *>(args);RingQueue<Task> *rq = td->rq;std::string name = td->threadname;while (true){Task t;rq->pop(&t);//處理數據t();std::cout << "Consumer : Task = " << t.GetTask() << " who: " << name << " result: " << t.Getresult() << std::endl;// sleep(1);}return nullptr;
}
我們也可以創建一個結構體來存儲線程名稱與任務
struct ThreadData
{RingQueue<Task> *rq;std::string threadname;
};
int main()
{srand(time(nullptr));RingQueue<Task> *rq = new RingQueue<Task>();pthread_t c[5], p[3];for(int i = 0;i<3;i++){ThreadData *td = new ThreadData();td->rq = rq;td->threadname = "Productor-" + std::to_string(i);pthread_create(p+i, nullptr, Productor, td);usleep(10);}sleep(1);for(int i = 0;i<5;i++){ThreadData *td = new ThreadData();td->rq = rq;td->threadname = "Consumer-" + std::to_string(i);pthread_create(c+i, nullptr, Consumer, td);usleep(10);}for(int i = 0;i<3;i++){pthread_join(p[i], nullptr);}for(int i = 0;i<5;i++){pthread_join(c[i], nullptr);}return 0;
}
注意:在環形隊列中允許多個生產者線程一起進行生活數據,也允許多個消費者線程一起消費數據,多個線程一起操作并非同時操作,任務開始時間有先后,但都是在進行處理的