生產者消費者模式就是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區,平衡了生產者和消費者的處理能力。這個阻塞隊列就是用來給生產者和消費者解耦的。
一、堵塞隊列
(1)三種關系
生產者vs生產者:互斥(加鎖)
消費者vs消費者:互斥(加鎖)
生產者vs消費者:互斥和同步(加鎖和條件變量)
(2)代碼實現
Makefile
test:Main.ccg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm -f test
BlockQueue.hpp
#include<iostream>
#include<queue>
#include<unistd.h>
#define M 10
template<class T>
class BlockQueue{
public:
BlockQueue(T cap=M)
:_capacity(M)
{pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_pcond,nullptr);pthread_cond_init(&_ccond,nullptr);}bool IsFull()
{return q.size()==_capacity;
}
bool empty()
{return q.size()==0;
}void Push( T in)
{pthread_mutex_lock(&_mutex);//if(!IsFull())//可能出現偽喚醒while(IsFull())//健壯性{pthread_cond_wait(&_pcond,&_mutex);}q.push(in);std::cout<<"push:"<<in<<std::endl;sleep(1);pthread_cond_signal(&_ccond);pthread_mutex_unlock(&_mutex);
}void Pop()
{pthread_mutex_lock(&_mutex);while(empty())//if(!empty())//如果這里使用if判斷的話,可能出現偽喚醒問題。{pthread_cond_wait(&_ccond,&_mutex);}auto n=q.front();std::cout<<"pop:"<<n<<std::endl;sleep(1);q.pop();pthread_cond_signal(&_pcond);pthread_mutex_unlock(&_mutex);}~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_pcond);
pthread_cond_destroy(&_ccond);
}private:pthread_mutex_t _mutex;pthread_cond_t _pcond;pthread_cond_t _ccond;int _capacity=M;std::queue<T> q;};
Main.cc?
#include<pthread.h>
#include"BlockQueue.hpp"
#include<iostream>
void* consumer(void* argv)
{BlockQueue<int>* q=static_cast<BlockQueue<int>*>(argv);int i=0;while(true){ q->Pop();}return nullptr;
}
void * productor(void* argv)
{BlockQueue<int>* q=static_cast<BlockQueue<int>*>(argv);while(true){int data=rand()%10+1;q->Push(data);}
return nullptr;
}int main()
{srand((unsigned)time(NULL)^getpid()^pthread_self());pthread_t c,p;BlockQueue<int>* bq=new BlockQueue<int>();pthread_create(&c,nullptr,consumer,bq);pthread_create(&p,nullptr,productor,bq);pthread_join(c,nullptr);pthread_join(p,nullptr);return 0;
}
(3)總結
以上是單生產和單消費,對于多生產和多消費也是可以的,因為是同一個隊列,同一把鎖,同一把鎖就決定了,生產者和生產者,消費者和消費者之間就是互斥的,生產者和消費者的條件變量提供了同步。
隊列中的數據也可以是任務,堵塞隊列可以實現高并發高效率,在與每個線程都拿到了自己的任務
并且處理任務,處理任務也是需要時間的,這個決定了,每個線程拿到任務都在跑自己的任務代碼,實現高并發。同時條件變量讓生產者和消費者進行同步,保證了安全性。
1.消費者和生產者調度優先級
如果消費者線程先調度,隊列為空,消費者就會在條件變量下進行等待,等生產者生產商品了,就會喚醒消費者進行消費。
2.如何控制生產消費的節奏
我們可以通過sleep控制。比如消費者消費進行休眠的話,可以給生產者足夠的時間進行生產
3.偽喚醒
如果用if來判斷隊列為空可能會出現偽喚醒,有些線程處于等待堵塞,競爭鎖的狀態,一旦隊列為空而線程競爭到了鎖就會出現隊列為空依然進行pop的現象。
while(true)可以提供檢查,if判斷可能有風險。
環形隊列
(1)POSIX信號量
posix和SystemV信號量作用相同,都是用于同步操作,達到無沖突的訪問共享資源目的。 但POSIX可以用于線程間同步.
快速認識接口:
(1)初始化
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);參數: pshared:0表示線程間共享,非零表示進程間共享value:信號量初始值
(2)銷毀
int sem_destroy(sem_t *sem);
?
(3)等待
功能:等待信號量,會將信號量的值減1
int sem_wait(sem_t *sem);//p()
?
(3)通知
功能:發布信號量,表示資源使用完畢,可以歸還資源了。將信號量值加1。int sem_post(sem_t *sem);//V()
?
三種關系
(2)對于環形隊列,生產者和消費者就有兩種場景會指向同一個位置。要么為空,要么為滿,其他情況不會指向同一個位置。
1.如果隊列為空,只能讓生產者先生產,消費者不可以消費---------互斥
2.如果隊列為滿,只能讓消費者先消費,然后到生產者生產---------同步
3.其余情況,消費者都是在生產者后面的,兩個位置不同,即使pop和push同時進行,也是安全的,這個就是多線程高并發可以進入臨界區的原因。
如何實現多線程中的生產者和生產者,消費者和消費者的互斥問題,對于循環隊列,我們要定義兩把鎖,一個是push隊列的鎖,一個是pop隊列的鎖。pv操作是原子性的,讓生產者和消費者進行同步其中又可以體現互斥。
代碼實現
makefile
ringtest:Main.ccg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm -f ringtest
Main.cc
#include<iostream>
#include<pthread.h>
#include"RingQueue.hpp"
#include<pthread.h>void* producter(void* args)
{while(true){ RingQueue<int>* rq=static_cast<RingQueue<int>*>(args);int n=1;n=rand()%3+1;rq->Push(n);}}
void* consumer(void* argv)
{while(true)
{RingQueue<int>* q=static_cast<RingQueue<int>*>(argv);int date=0;q->Pop(&date);}}int main()
{srand(time(nullptr));pthread_t p,c;
pthread_mutex_t pm,cm;LockGuard pmutex(&pm),cmutex(&cm);RingQueue<int> ringqueue(cmutex,pmutex);
pthread_create(&p,nullptr,consumer,&ringqueue);
pthread_create(&c,nullptr,producter,&ringqueue);pthread_join(p,nullptr);pthread_join(c,nullptr);return 0;
}
RingQueue.hpp?
#include<vector>
#include<iostream>
#include"LockGuard.hpp"
#include<unistd.h>
const int Size =10;
template<class T>
class RingQueue{
private:void P(sem_t& sem){sem_wait(&sem);}void V(sem_t& sem){sem_post(&sem);}
public:
RingQueue(LockGuard c,LockGuard p,int s=Size)
:size(s),pmutex(p),cmutex(c),q(s),ppose(0),cpose(0)
{
sem_init(&psem,0,size);
sem_init(&csem,0,0);}// 生產
void Push(const T& in)
{// 先加鎖,還是先申請信號量?先申請信號量,效率高。申請到資源的線程,只有競爭到鎖,就可以生產了。P(psem);{pmutex;q[ppose] = in;std::cout<<"生產:"<<in<<std::endl;ppose++;ppose %=size;}V(csem);
}
void Pop(T* out)
{P(csem);{cmutex;*out = q[cpose];sleep(1);std::cout<<"消費:"<<*out<<std::endl;cpose++;cpose %=size;}V(psem);}
~RingQueue()
{sem_destroy(&psem);sem_destroy(&csem);
}private:int size;std::vector<int> q;int ppose;//生產者位置int cpose;//消費者位置sem_t psem;//生產者信號量sem_t csem;//消費者信號量LockGuard pmutex;LockGuard cmutex;};
#pragma once
#include<pthread.h>
#include <semaphore.h>
class Mutex{
public:Mutex(pthread_mutex_t* mutex):_Mutex(mutex){pthread_mutex_init(_Mutex,nullptr);}void Lock(){pthread_mutex_lock(_Mutex);}void unlock(){pthread_mutex_unlock(_Mutex);}~Mutex(){pthread_mutex_destroy(_Mutex);}private:pthread_mutex_t* _Mutex;
};class LockGuard{
public:LockGuard(pthread_mutex_t* lock):mutex(lock){mutex.Lock();}~LockGuard(){mutex.unlock();}private:Mutex mutex;};