1.基礎BlockingQueue的生產者消費模型
1.1 BlockQueue
在多線程編程中阻塞隊列是一種常用于實現生產者和消費者模型的數據結構,它與普通的隊列區別在于,當隊列為空時,從隊列獲取元素的操作將被阻塞,直到隊列中放入了新的數據。當隊列滿的時候,往里面插入數據也會被阻塞直到有元素被取出
1.2 c++ queue模擬實現阻塞隊列的生產消費模型
為了理解我們來看一下代碼
#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__
#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>
template <typename T>
class BlockQueue
{
private:
bool IsFull()
{
return _block_queue.size() == _cap;
}
bool IsEmpty()
{
return _block_queue.empty();
}
public:
BlockQueue(int cap) : _cap(cap)
{
_productor_wait_num = 0;
_consumer_wait_num = 0;
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_product_cond, nullptr);
pthread_cond_init(&_consum_cond, nullptr);
}
void Enqueue(T &in) // ?產者?的接?
{
pthread_mutex_lock(&_mutex);
while(IsFull()) // 保證代碼的健壯性
{
// ?產線程去等待,是在臨界區中休眠的!你現在還持有鎖呢!!!
// 1. pthread_cond_wait調?是: a. 讓調?線程等待 b. ?動釋放曾經持有的
_mutex鎖 c. 當條件滿?,線程喚醒,pthread_cond_wait要求線性
// 必須重新競爭_mutex鎖,競爭成功,?可返回!!!
// 之前:安全
_productor_wait_num++;
pthread_cond_wait(&_product_cond, &_mutex); // 只要等待,必定會有喚
醒,喚醒的時候,就要繼續從這個位置向下運?!!
_productor_wait_num--;
// 之后:安全
}
// 進??產
// _block_queue.push(std::move(in));
// std::cout << in << std::endl;
_block_queue.push(in);
// 通知消費者來消費
if(_consumer_wait_num > 0)
pthread_cond_signal(&_consum_cond); // pthread_cond_broadcast
pthread_mutex_unlock(&_mutex);
}
void Pop(T *out) // 消費者?的接? --- 5個消費者
{
pthread_mutex_lock(&_mutex);
while(IsEmpty()) // 保證代碼的健壯性
{
// 消費線程去等待,是在臨界區中休眠的!你現在還持有鎖呢!!!
// 1. pthread_cond_wait調?是: a. 讓調?進程等待 b. ?動釋放曾經持有的
_mutex鎖
_consumer_wait_num++;
pthread_cond_wait(&_consum_cond, &_mutex); // 偽喚醒
_consumer_wait_num--;
}// 進?消費
*out = _block_queue.front();
_block_queue.pop();
// 通知?產者來?產
if(_productor_wait_num > 0)
pthread_cond_signal(&_product_cond);
pthread_mutex_unlock(&_mutex);
// pthread_cond_signal(&_product_cond);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_product_cond);
pthread_cond_destroy(&_consum_cond);
}
private:
std::queue<T> _block_queue; // 阻塞隊列,是被整體使?的!!!
int _cap; // 總上限
pthread_mutex_t _mutex; // 保護_block_queue的鎖
pthread_cond_t _product_cond; // 專?給?產者提供的條件變量
pthread_cond_t _consum_cond; // 專?給消費者提供的條件變量
int _productor_wait_num;
int _consumer_wait_num;
};📌 注意:這?采?模版,是想告訴我們,隊列中不僅僅可以防?內置類型,?如int, 對象也可
以作為任務來參與?產消費的過程哦.
下?附上?張代碼,?便課堂使?
#pragma once
#include <iostream>
#include <string>
#include <functional>
// 任務類型1
// class Task
// {
// public:
// Task() {}
// Task(int a, int b) : _a(a), _b(b), _result(0)
// {
// }
// void Excute()
// {
// _result = _a + _b;
// }
// std::string ResultToString()
// {
// return std::to_string(_a) + "+" + std::to_string(_b) + "=" +
std::to_string(_result);
// }
// std::string DebugToString()
// {
// return std::to_string(_a) + "+" + std::to_string(_b) + "=?";
// }
// private:
// int _a;
// int _b;
// int _result;
// };
// 任務類型2
using Task = std::function<void()>;
2-3 為什么ptread_cond_wait需要互斥量
條件等待是線程同步的一種手段,如果只有一個線程,條件不滿足,一直等待下去都不會被滿足,所以必須要有一個線程通過某些手段,去改變共享變量,使原先的條件不滿足變成滿足,然后再去友好的通知等待在該條件變量上的線程
條件不會無緣無故的突然變得滿足了,必然牽扯到共享數據的變化,所以一定要用互斥鎖來保護
// 錯誤的設計
pthread_mutex_lock(&mutex);
while (condition_is_false) {
pthread_mutex_unlock(&mutex);
//解鎖之后,等待之前,條件可能已經滿?,信號已經發出,但是該信號可能被錯過
pthread_cond_wait(&cond);
pthread_mutex_lock(&mutex);
}
pthread_mutex_unlock(&mutex);
由于解鎖和等待不是原子性,調用解鎖之后pthread_cond_wait之前,如果有其他的線程獲得互斥量摒棄條件滿?,發送了信號,那么 pthread_cond_wait 將錯過這個信 號,可能會導致線程永遠阻塞在這個 pthread_cond_wait 。所以解鎖和等待必須是?個原?操作。
2-4 條件變量的使用規范
pthread_mutex_lock(&mutex);
while (條件為假)
pthread_cond_wait(cond, mutex);
修改條件
pthread_mutex_unlock(&mutex);
給條件發現信號
pthread_mutex_lock(&mutex);
設置條件為真
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);
2-5條件變量的封裝
大家有興趣的可以看一下
test_4_7_線程池/Cond.h · liu xi peng/linux---ubuntu系統 - 碼云 - 開源中國
2-6 POSIX信號量
POSIX信號量和SystemV信號量作用相同,都是用于同步操作,達到無沖突的訪問共享資源,但是OIXIS可以用于線程間同步
初始化
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
參數:
pshared:0表?線程間共享,?零表?進程間共享
value:信號量初始值
銷毀
int sem_destroy(sem_t *sem);
等待