引言
今天,我們繼續學習Linux線程本分,在Linux條件變量中,我們對條件變量的做了詳細的說明,今天我們要利用條件變量來引出我們的另一個話題——信號量內容的學習。
1.復習條件變量
在上一期博客中,我們沒有對條件變量做具體的使用,所以,這里我們通過一份代碼來復習一下,接下來,我們實現基于BlockingQueue的生產者消費者模型。
1.1何為基于BlockingQueue的生產者消費者模型
BlockingQueue在多線程編程中阻塞隊列(Blocking Queue)是一種常用于實現生產者和消費者模型的數據結構。其與普通的隊列區別在于,當隊列為空時,從隊列獲取元素的操作將會被阻塞,直到隊列中被放入了元素;當隊列滿時,往隊列里存放元素的操作也會被阻塞,直到有元素被從隊列中取出(以上的操作都是基于不同的線程來說的,線程在對阻塞隊列進程操作時會被阻塞)
如圖:
1.2分析該模型
這里我想寫多個生產線程和多個消費線程的模型
我們來分析一下。
- 首先生產任務的過程和消費任務的過程必須是互斥關系,不可以同時訪問該隊列(此時,這個隊列是共享資源)。
- 當隊列滿時,生產線程就不能再生產任務,必須在特定的條件變量下等待;同理當隊列為空時,消費線程就不能再消費任務,也必須在特定的條件變量下等待。
所以,類應這樣設計:
template<class T>
class BlockQueue
{
public:BlockQueue(const int &maxcap=gmaxcap):_maxcap(maxcap){pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_pcond,nullptr);pthread_cond_init(&_ccond,nullptr);}void push(const T&in)//輸入型參數,const &{pthread_mutex_lock(&_mutex);while(is_full()){pthread_cond_wait(&_pcond,&_mutex);}_q.push(in);pthread_cond_signal(&_ccond);pthread_mutex_unlock(&_mutex);}void pop(T*out){pthread_mutex_lock(&_mutex);while(is_empty()){pthread_cond_wait(&_ccond,&_mutex);}*out=_q.front();_q.pop();pthread_cond_signal(&_pcond);pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_ccond);pthread_cond_destroy(&_pcond);}
private:bool is_empty(){return _q.empty();}bool is_full(){return _q.size()==_maxcap;}
private:std::queue<T> _q;int _maxcap; //隊列中元素的上線pthread_mutex_t _mutex;pthread_cond_t _pcond; //生產者對應的條件變量pthread_cond_t _ccond;
};
由于我們不知道存儲的數據類型,所以這里我們選擇使用泛型編程的方式。
接下來就是要生產任務,為了可以觀察到整個生產和消費任務的過程,我們可以生成兩個隨機數,然后進行運算。代碼如下:
class CalTask
{using func_t = function<int(int, int, char)>;public:CalTask() {}CalTask(int x, int y, char op, func_t func) :_x(x),_y(y),_op(op),_callback(func){}string operator()(){int result=_callback(_x,_y,_op);char buffer[1024];snprintf(buffer,sizeof buffer,"%d %c %d=%d",_x,_op,_y,result);return buffer;}string toTaskstring(){char buffer[1024];snprintf(buffer,sizeof buffer,"%d %c %d=?",_x,_op,_y);return buffer;}
private:int _x;int _y;char _op;func_t _callback;
};
const char*oper="+-*/%";
int mymath(int x,int y,char op)
{int result=0;switch(op){case '+':result=x+y;break;case '-':result=x-y;break;case '*':result=x*y;break;case '/':if(y==0){cerr<<"div zero error"<<endl;result=-1;}else{result=x/y;}break;case '%':if(y==0){cerr<<"mod zero error"<<endl;result=-1;}else{result=x%y;}default:break;}return result;
}
接下來,我們來寫整體的代碼。
1.3完整代碼
我們要創建三個文件:BlockQueue.hpp Task.hpp Main.cc各文件內容如下所示:
BlockQueue.hpp
#pragma once
#include<iostream>
#include<pthread.h>
#include<cstring>
#include<unistd.h>
#include<cassert>
#include<queue>
using namespace std;
const int gmaxcap=100;
template<class T>
class BlockQueue
{
public:BlockQueue(const int &maxcap=gmaxcap):_maxcap(maxcap){pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_pcond,nullptr);pthread_cond_init(&_ccond,nullptr);}void push(const T&in)//輸入型參數,const &{pthread_mutex_lock(&_mutex);while(is_full()){pthread_cond_wait(&_pcond,&_mutex);}_q.push(in);pthread_cond_signal(&_ccond);pthread_mutex_unlock(&_mutex);}void pop(T*out){pthread_mutex_lock(&_mutex);while(is_empty()){pthread_cond_wait(&_ccond,&_mutex);}*out=_q.front();_q.pop();pthread_cond_signal(&_pcond);pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_ccond);pthread_cond_destroy(&_pcond);}
private:bool is_empty(){return _q.empty();}bool is_full(){return _q.size()==_maxcap;}
private:std::queue<T> _q;int _maxcap; //隊列中元素的上線pthread_mutex_t _mutex;pthread_cond_t _pcond; //生產者對應的條件變量pthread_cond_t _ccond;
};
Task.hpp
#pragma once
#include <iostream>
#include <string>
#include <cstdio>
#include<string>
#include <functional>
using namespace std;
class CalTask
{using func_t = function<int(int, int, char)>;public:CalTask() {}CalTask(int x, int y, char op, func_t func) :_x(x),_y(y),_op(op),_callback(func){}string operator()(){int result=_callback(_x,_y,_op);char buffer[1024];snprintf(buffer,sizeof buffer,"%d %c %d=%d",_x,_op,_y,result);return buffer;}string toTaskstring(){char buffer[1024];snprintf(buffer,sizeof buffer,"%d %c %d=?",_x,_op,_y);return buffer;}
private:int _x;int _y;char _op;func_t _callback;
};
const char*oper="+-*/%";
int mymath(int x,int y,char op)
{int result=0;switch(op){case '+':result=x+y;break;case '-':result=x-y;break;case '*':result=x*y;break;case '/':if(y==0){cerr<<"div zero error"<<endl;result=-1;}elseresult=x/y;}break;case '%':if(y==0){cerr<<"mod zero error"<<endl;result=-1;}else{result=x%y;}default:break;}return result;
}
Main.cc
include "BlockQueue.hpp"
#include "Task.hpp"
#include<sys/types.h>
#include<unistd.h>
#include<ctime>
#include<iostream>
using namespace std;void *productor(void *bqs_)
{BlockQueue<CalTask> *bqs=static_cast<BlockQueue<CalTask>*>(bqs_);while(true){int x=rand()%10+1;int y=rand()%5+1;int opercode=rand()%(sizeof(oper));CalTask T(x,y,oper[opercode],mymath);bqs->push(T);cout<<"生產任務: ";cout<<T.toTaskstring()<<endl;sleep(1);}
}
void *consumer(void *bqs_)
{BlockQueue<CalTask>*bqs=static_cast<BlockQueue<CalTask>*>(bqs_);while(true){CalTask T;bqs->pop(&T);cout<<"消費任務: ";cout<<T()<<endl;}
}
int main()
{BlockQueue<CalTask> bqs;pthread_t p[5];pthread_t c[5];for(int i=0;i<5;i++){pthread_create(&p[i],nullptr,productor,&bqs);pthread_create(&c[i],nullptr,consumer,&bqs);}for(int i=0;i<5;i++){pthread_join(p[i],nullptr);pthread_join(c[i],nullptr);}
}
在代碼中,有幾個點需要注意一下:
第一點:
pthread_cond_wait的第二個參數一定是我們正在使用的互斥鎖,這個函數在被運行時,會以原子性的方式將鎖釋放,然后將自己掛起,等待被條件變量喚醒。該函數在被喚醒時,會自動重新獲取持有的鎖,然后繼續向下執行。
假如數個生產者線程一起被喚醒,然后先后持有鎖,接著繼續生產任務,當隊列剩余的空間小于這些生產者生產的任務時,就會出現問題,所以讓所有被喚醒的線程先通過while循環,如果有剩余的空間,再進行任務的生產活動。
生產線程這樣處理,消費線程也要這樣處理
大家可以在自己試這敲一下,有問題可以在評論區和我交流。
接下來,我們來查找一下這些代碼有哪些"不足的地方"
2.代碼中的“不足”
一個線程在操作臨界資源時,臨界資源必須是滿足條件的,然后線程才能對臨界資源進行操作。比如:在如上代碼中,生產者線程只有在隊列(臨界資源)有剩余空間的條件下,才能進行下一步操作。
可是,臨界資源是否滿足生產和消費的條件,我們不能事前得知,只等進入臨界資源后,再進行進一步的檢測。
所以,一般訪問臨界資源的過程為:先加鎖,再檢測,如果條件滿足,就進行下一步的操作;反之,就將該線程掛起,釋放鎖,然后掛起等待,等到條件滿足時,重新獲得鎖,接著進行下一步操作。
因為不可能事先得知是否滿足條件,所以我們只能先加鎖,進入臨界資源內部進行檢測。
只要我們申請了信號量,就默認對這部分資源的整體使用,但通常情況下,我們使用的僅僅是臨界資源的一小部分。
實際情況中,有沒有可能不同的線程訪問臨界資源不同部分的情況,有可能。所以,前輩大佬們給出了一種解決方案——信號量。
3.信號量
3.1什么是信號量
信號量的本質是一把計數器,一把衡量臨界資源多少的計數器。只要擁有信號量,就在未來一定能夠擁有臨界資源的一部分。
申請信號量的本質:就是對臨界資源的預定機制。
比如:我想去看電影,首先我要買票。我一旦買到票,無論我去不去看電影,都會有一個位置屬于我。買票的過程==申請信號信號量的過程。
所以,在訪問臨界資源之前,我們可以申請信號量。通過申請信號量,我們就可以獲知臨界資源的使用情況。①只要申請成功,就一定有我可以訪問的資源。②只要申請失敗,說明條件不就緒,只能等待。如此,就不需要進入臨界資源再進行檢測了。
3.2信號量的相關接口
如上這些借口如果調用成功的話,返回0;調用失敗的話,返回-1,并且錯誤原因被設置。
我們知道信號量的本質是一把計數器,所以信號量必須可以進行遞增和遞減的操作。
- 信號量-1:申請資源,其過程必須是原子性的。簡稱P操作。
- 信號量+1:歸還資源,其過程必須是原子性的。簡稱V操作。
所以,信號量的核心操作:PV原語。
接下來,我們就使用信號量來完成我們的基于環形隊列的生產消費模型。
3.3用信號量來實現基于環形隊列的生產消費模型
3.3.1對環形隊列的簡單介紹
相信大家在C++學習期間到都模擬實現過環形隊列隊列。如圖:
環形隊列的邏輯結構為環形,但其存儲結構實際上就是隊列,其實就是一個數組,只不過用下標不斷的%上隊列的長度。
大家在模擬實現環形隊列時,大家必定遇到的問題是:當rear==front時,究竟是環形隊列已滿還是環形隊列為空呢?其實,這個問題有多種處理方式,今天就不講了。
今天,我們的基于環形隊列的生產消費模型必須遵守哪些規則呢?
我們來講一個故事:
張三和李四在一個房間里做游戲,這個房間里有一張大圓桌,桌子上有很多的盤子。規定張三往每個盤子里放一個桃子🍑,然后李四在后邊吃桃子🍑,由于李四還要吃桃子,所以速度一定比張三放的速度滿。
總結一下,我們發現張三和李四必須滿足這些規律:
- 李四不可以超過張三——消費者不可以超過生產者。
- 張三不可以把李四套一個圈——生產者不可以把消費者套一個圈。
- 張三和李四什么時候在一起?①盤子全為空,張三和李四在一起,張三先運行(生產者先運行)。②盤子全為滿,張三和李四在一起,李四先運行(消費者先運行)。③其他情況,張三和李四指向不同的位置。
我們將這些規則遷移到環形隊列的生產消費模型,就是生產消費模型應該遵守的規則:
①消費者不能超過生產者。②生產者不能把消費者套一個圈。③生產者和消費者什么情況下會在一起呢?空的時候和滿的時候,對應不同的處理方式。④只要生產者和消費者指向不同的位置,就可以實現生產者和消費者的并發執行。只有在為空和為 滿時,才會出現同步和互斥問題。
那這些規則由什么來保證呢?信號量。信號量是表征臨界資源中資源數目的。
1.對于生產者而言,看中的是隊列中的剩余空間——空間資源定義一個信號量。
2.對于消費者而言,看中的是隊列中的數據——數據資源定義一個信號量。
接下來,我們基于這份偽代碼來理解一下,看看能否滿足我們的規則。
生產者關注的是隊列里的剩余空間,在隊列為空時剩余空間為10,所以生產者可以順利申請到信號量。但是由于空間中這部分資源已經被占用,所以無法歸還。但是消費者所關注的隊列中的數據資源不知不覺中已經多了一份。所以對消費者信號量應進行V操作。
消費者關注的是隊列中的數據資源,隊列剛開始為空時,數據資源為0,消費者申請失敗。等到生產者申請神域空間成功后,生產了數據。所以消費者可以成功申請到數據資源信號量,然后消費數據。但不知不覺,隊列中的剩余空間多了一份,所以應對剩余空間資源的信號量進行V操作。
若隊列滿時,剩余空間信號量為0,生產者申請信號量失敗。此時,數據資源信號量為滿,消費者可以申請到信號量,從而進行操作。所以必須消費者先運行。
若隊列空時,數據資源信號量為0,消費者申請信號量失敗。此時,剩余空間信號量為滿,生產者可以申請到信號量,從而進行操作。所以必須生產者先運行。
所以,這偽代碼完全符合我們的規則。接下來,我們編寫單生產進程和單消費進程的代碼。
編寫代碼
我們創建三個源文件:RingQueue.hpp main.cc Task.hpp
Ringqueue.hpp:
#pragma once#include <iostream>
#include <vector>
#include <cassert>
#include <semaphore.h>
#include <pthread.h>static const int gcap = 5;template<class T>
class RingQueue
{
private:void P(sem_t &sem){int n = sem_wait(&sem);assert(n == 0); // if(void)n;}void V(sem_t &sem){int n = sem_post(&sem);assert(n == 0);(void)n;}
public:RingQueue(const int &cap = gcap): _queue(cap), _cap(cap){int n = sem_init(&_spaceSem, 0, _cap);assert(n == 0);n = sem_init(&_dataSem, 0, 0);assert(n == 0);_productorStep = _consumerStep = 0;pthread_mutex_init(&_pmutex, nullptr);pthread_mutex_init(&_cmutex, nullptr);}// 生產者void Push(const T &in){// ?: 這個代碼 有沒有優化的可能// 你認為:現加鎖,后申請信號量,還是現申請信號量,在加鎖?P(_spaceSem); // 申請到了空間信號量,意味著,我一定能進行正常的生產pthread_mutex_lock(&_pmutex); _queue[_productorStep++] = in;_productorStep %= _cap;pthread_mutex_unlock(&_pmutex);V(_dataSem);}// 消費者void Pop(T *out){// 你認為:現加鎖,后申請信號量,還是現申請信號量,在加鎖?P(_dataSem);pthread_mutex_lock(&_cmutex);*out = _queue[_consumerStep++];_consumerStep %= _cap;pthread_mutex_unlock(&_cmutex);V(_spaceSem);}~RingQueue(){sem_destroy(&_spaceSem);sem_destroy(&_dataSem);pthread_mutex_destroy(&_pmutex);pthread_mutex_destroy(&_cmutex);}
private:std::vector<T> _queue;int _cap;sem_t _spaceSem; // 生產者 想生產,看中的是什么資源呢? 空間資源sem_t _dataSem; // 消費者 想消費,看中的是什么資源呢? 數據資源int _productorStep;int _consumerStep;pthread_mutex_t _pmutex;pthread_mutex_t _cmutex;
};
Task.hpp
#pragma once#include <iostream>
#include <string>
#include <cstdio>
#include <functional>class Task
{using func_t = std::function<int(int,int,char)>;// typedef std::function<int(int,int)> func_t;
public:Task(){}Task(int x, int y, char op, func_t func):_x(x), _y(y), _op(op), _callback(func){}std::string operator()(){int result = _callback(_x, _y, _op);char buffer[1024];snprintf(buffer, sizeof buffer, "%d %c %d = %d", _x, _op, _y, result);return buffer;}std::string toTaskString(){char buffer[1024];snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);return buffer;}
private:int _x;int _y;char _op;func_t _callback;
};const std::string oper = "+-*/%";int mymath(int x, int y, char op)
{int result = 0;switch (op){case '+':result = x + y;break;case '-':result = x - y;break;case '*':result = x * y;break;case '/':{if (y == 0){std::cerr << "div zero error!" << std::endl;result = -1;}elseresult = x / y;}break;case '%':{if (y == 0){std::cerr << "mod zero error!" << std::endl;result = -1;}elseresult = x % y;}break;default:// do nothingbreak;}return result;
}
main.cc
#include "RingQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>std::string SelfName()
{char name[128];snprintf(name, sizeof(name), "thread[0x%x]", pthread_self());return name;
}void *ProductorRoutine(void *rq)
{// RingQueue<int> *ringqueue = static_cast<RingQueue<int> *>(rq);RingQueue<Task> *ringqueue = static_cast<RingQueue<Task> *>(rq);while(true){// version1// int data = rand() % 10 + 1;// ringqueue->Push(data);// std::cout << "生產完成,生產的數據是:" << data << std::endl;// version2// 構建or獲取任務 --- 這個是要花時間的!int x = rand() % 10;int y = rand() % 5;char op = oper[rand()%oper.size()];Task t(x, y, op, mymath);// 生產任務ringqueue->Push(t);// 輸出提示std::cout << SelfName() << ", 生產者派發了一個任務: " << t.toTaskString() << std::endl;// sleep(1);}
}void *ConsumerRoutine(void *rq)
{// RingQueue<int> *ringqueue = static_cast<RingQueue<int> *>(rq);RingQueue<Task> *ringqueue = static_cast<RingQueue<Task> *>(rq);while(true){//version1// int data;// ringqueue->Pop(&data);// std::cout << "消費完成,消費的數據是:" << data << std::endl;// sleep(1);// version2Task t;//消費任務ringqueue->Pop(&t);std::string result = t(); // 消費也是要花時間的!std::cout << SelfName() << ", 消費者消費了一個任務: " << result << std::endl;// sleep(1);}
}int main()
{srand((unsigned int)time(nullptr) ^ getpid() ^ pthread_self() ^ 0x71727374);// RingQueue<int> *rq = new RingQueue<int>();RingQueue<Task> *rq = new RingQueue<Task>();// 單生產,單消費,多生產,多消費 --> 只要保證,最終進入臨界區的是一個生產,一個消費就行!// 多生產,多消費的意義??pthread_t p[4], c[8];for(int i = 0; i < 4; i++) pthread_create(p+i, nullptr, ProductorRoutine, rq);for(int i = 0; i < 8; i++) pthread_create(c+i, nullptr, ConsumerRoutine, rq);for(int i = 0; i < 4; i++) pthread_join(p[i], nullptr);for(int i = 0; i < 8; i++) pthread_join(c[i], nullptr);delete rq;return 0;
}
大家可以自己敲一敲,試一下。
寫到這里,這篇博客就結束了,下篇博客我們再見。