目錄
一 生產消費者模型
1. 概念:
2. 基于阻塞隊列的生產消費者模型:
1. 對鎖封裝
2. 對條件變量封裝
二 信號量(posix)
1. 概念
2. API
3. 基于環形隊列的生產消費者模型
三 線程池
1. 概念
2. 示例
四 補充字段
1. 可重入函數 VS 線程安全
2. 死鎖的4個特征
一 生產消費者模型
1. 概念:
在現實生活中,工廠/超市/人,工廠給超市供貨,超市緩存貨物,人來取貨物,避免了人與工廠直接交互,工廠和人只需要與超市進行交互,也就是變相的解耦。
這種模型典型的特征:
- 1種交易場所:超市
????????超市作為人和工廠進行交互的對象,人拿貨物,工廠繼續生產,工廠出貨物,人處理貨物,不存在工廠生產的時候,人去工廠拿貨物,工廠停下生產提供貨物,反向也是,所以超市是用來提高效率的。
- 2種角色:人和工廠
? ? ? ? 人就是消費者,工廠就是生產者。
- 三種關系:
? ? ? ? 工廠和工廠的關系:如果超市還差一個貨物滿了,工廠與工廠之間同時出貨物,超市只能處理一個,剩下處理不了,所以工廠和工廠是互斥關系。
? ? ? ? 人和人的關系:同理上面,超市只剩一個貨物,人與人同時拿,必定有一個拿不到,所以也是互斥關系。
? ? ? ? 工廠和人:工廠在提供貨物的時候,提供到一半,人就拿走了,導致數據不一致問題,所以他們也是互斥關系。
? ? ? ? 超市有沒有貨物人怎么知道:生產者知道,生產者生產數據就會告訴消費者。
????????超市貨物滿沒滿生產者怎么知道:人知道,人拿貨物就會告訴消費者。
? ? ? ? 所以工廠和人也有同步關系:沒有數據人就等,告訴工廠去生產,反向一樣。
2. 基于阻塞隊列的生產消費者模型:
首先消費者和消費者之間需要互斥(生產者一樣),消費者和生產者之間也需要互斥,交易場所沒數據需要生產者通知消費者,反之數據滿了消費者通知生產者生產數據。
1. 對鎖封裝
#include <pthread.h>class mymutex
{
public:// 初始化互斥鎖mymutex(){pthread_mutex_init(&_mymutex, nullptr);}// 加鎖void lock() { pthread_mutex_lock(&_mymutex); }// 解鎖void unlock() { pthread_mutex_unlock(&_mymutex); }// 返回鎖地址pthread_mutex_t *getrefmutex() { return &_mymutex; }// 釋放鎖~mymutex(){pthread_mutex_destroy(&_mymutex);}private:pthread_mutex_t _mymutex;private:// 鎖不能拷貝mymutex(const mymutex ©) = delete;mymutex &operator=(const mymutex ©) = delete;
};// 讓另一個對象來管理鎖的初始化和釋放
class mymutexguard
{
public:// 初始化/獲取鎖mymutexguard(mymutex *mtu) : _mtu(mtu){_mtu->lock();}// 解鎖~mymutexguard() { _mtu->unlock(); }
private:mymutex *_mtu;
};
2. 對條件變量封裝
#include <pthread.h>class mycond
{
public:// 初始化 條件變量mycond(){pthread_cond_init(&_mycond, nullptr);}// 喚醒條件變量隊頭的線程void signal_one(){pthread_cond_signal(&_mycond);}// 喚醒條件變量全部線程void signal_all(){pthread_cond_broadcast(&_mycond);}// 線程去條件變量隊列中等待void wait(pthread_mutex_t *mymtu){pthread_cond_wait(&_mycond, mymtu);}// 釋放條件變量~mycond(){pthread_cond_destroy(&_mycond);}private:pthread_cond_t _mycond;
};
3. 交易場所
#include "mycond.hpp"
#include "mymutex.hpp"// 交易場所
template <class T>
class Pro_co
{
public:// 隊列大小為 sizePro_co(int size = 5) : _size(size) {}~Pro_co() {}// 生產數據void mypush(const T &val){// 自動管理鎖的加鎖和解鎖,局部變量初始化自動加鎖,出作用域自動調用析構解鎖mymutexguard mg(&_mymutex);//_mymutex.lock();// 如果隊列滿了,去生產者的條件變量等待,等待前自動解鎖,喚醒重新申請鎖// 注意這里要 循環 判斷// 如果采用 if() 如果數據還差一個滿了,消費者喚醒一個生產者,新來的生產者比喚醒的先搶到鎖,并放入數據,此時隊列數據滿,// 后續的消費者申請鎖,如果被被喚醒的這個生產者也先搶到鎖,生產數據,但數據已經滿了,出問題// 如果同時喚醒,數據還差一個滿了,生產數據出問題,同理上面while (isfull()){// 入生產者隊列_mypro.wait(_mymutex.getrefmutex());}// 放入數據_q.push(val);std::cout << "生產了 :" << val << std::endl;//_mymutex.unlock();// 喚醒一個消費者_myco.signal_one();// 出作用域自動解鎖}// 消費數據void mypop(T *val){mymutexguard mg(&_mymutex);// 如果隊列滿了,去消費者的條件變量等待,等待前自動解鎖,喚醒重新申請鎖// 注意這里要 循環 判斷// 如果采用 if() 如果數據還有1個,條件變量里有多個,如果喚醒一個,鎖被新來的消費者搶到,消費完數據變成1,// 此時被喚醒的消費者也申請到鎖,比生產者先搶到(此時不在條件變量等待隊列里,不管數據還有沒有),都會執行消費數據,但數據為空,出問題// 如果全部喚醒,數據只有一個,同時消費數據,也會出問題while (empty()){// 去消費者隊列等待_myco.wait(_mymutex.getrefmutex());}// 處理數據*val = _q.front();_q.pop();std::cout << "消費了 :" << *val << std::endl;// 喚醒一個生產者_mypro.signal_one();// 出作用域自動解鎖}// 隊列是否為空bool empty() { return _q.empty(); }// 隊列是否為滿bool isfull() { return _q.size() == _size; }private:mymutex _mymutex;mycond _mypro;mycond _myco;std::queue<T> _q;int _size;
};
當訪問交易場所的時候,只能有一個線程能訪問,純串行訪問,效率何在?
這里的效率指的是生產者生產數據和消費者消費數據都需要時間,僅放/拿數據是互斥的,放/拿數據和他們生產任務/處理任務不和交易場所直接關聯,也就是通過交易場所進行解耦,互相不沖突達到并行效果,所以總體來說效率高,并且上面是把交易場所當整體使用了,也可以比如交易場所有10個數據,同時放10個消費者取數據,維護他們之間的互斥和同步,也能達到同時消費,進而提高效率,所以下面引入信號量。
二 信號量(posix)
1. 概念
信號量本質是一個計數器,自帶同步互斥機制,P/V操作對應--/++,表示資源的數量,一般用來輔助進程/線程同步的。
2. API
????????
#include <semaphore.h>int sem_init(sem_t *sem, // 信號量 int pshared, // 信號量屬性unsigned int value // 初始值);
// P--操作
int sem_wait(sem_t *sem);// V++操作
int sem_post(sem_t *sem);// 釋放信號量
int sem_destroy(sem_t *sem);
3. 基于環形隊列的生產消費者模型
還是和之前一樣,生產和消費者之間是互斥關系,但這里不一樣,如果生產和消費者指向環形隊列的同一個位置,表示隊列為空或者滿了,這時候只能有一個角色可以進入隊列,互斥,但如果不空或者不滿,他們指向的位置一定是不一樣的,所以可以并行訪問隊列。
生產和生產者之間是互斥關系,因為STL容器本身不是線程安全的,多個生產者對同一個下標進行操作,有線程安全問題,消費者也是,所以需要鎖同步。
1. 封裝信號量
#include <semaphore.h>
#include <iostream>// 封裝信號量
class mysem
{public:// 初始化信號量值mysem(int size){sem_init(&_mysem,0,size);}// 釋放信號量~mysem(){sem_destroy(&_mysem);}// --操作void P(){sem_wait(&_mysem);}// ++操作void V(){sem_post(&_mysem);}private:sem_t _mysem;
};
2. 環形隊列
#include "mymutex.hpp"
#include "sem.hpp"
#include <vector>
#include <iostream>// 環形隊列
template <class T>
class Circular_queue
{
public:// 初始化信號量,緩沖區大小Circular_queue(int size): _v(size), _cap(size), _prosem(size), _co(0),_head(0),_tail(0){}~Circular_queue() {}// 生產任務void mypush(const T &val){// 信號量自帶原子操作,資源不足自動阻塞_prosem.P();{// 生產者和生產者互斥關系mymutexguard guard(&_promutex);_v[_tail++] = val;// 保證環形隊列的性質_tail %= _cap;}// 生產數據,告訴消費者有數據_co.V();}// 拿任務void mypop(T *val){// 信號量自帶原子操作,資源不足自動阻塞_co.P();{// 消費者和消費者互斥關系mymutexguard guard(&_comutex);*val = _v[_head++];// 保證環形隊列的性質_head = _head % _cap;}// 拿走數據,告訴生產者有空間_prosem.V();}private:// 緩沖區/大小std::vector<T> _v;int _cap;// 生產者的鎖和信號量和下標mysem _prosem;mymutex _promutex;int _tail;// 消費者的鎖和信號量和下標mysem _co;mymutex _comutex;int _head;
};
三 線程池
1. 概念
什么是線程池,顧名思義,池子就是預先預留一塊對象,比如內存塊,進程/線程....等,先創建出來,想要用直接用,省去自己創建的麻煩,也是提高效率的設計,比如Linux中的按需分配,寫時拷貝,CPU緩存.....等等都是對效率有很大的提升。
而線程池就是預先創建一批線程,進行后續合理分配任務。
2. 示例
下面基于生產消費者模型實現的線程池:
#include <iostream>
#include <vector>
#include "thread.hpp"
#include <queue>
#include "mycond.hpp"
#include "mymutex.hpp"
#include <functional>template <class T>
class thread_pool
{
public:// 線程池線程個數thread_pool(int nums) : _nums(nums), _isrunning(false), _threads_nums(0){for (int i = 0; i < _nums; i++){// 創建線程池_v.push_back(mythread(std::to_string(i), std::bind(&thread_pool::mypop, this)));}}// 釋放線程~thread_pool(){// for (auto &e : _v)// e.join();}// 啟動線程void start(){// 如果線程池在運行直接返回if (_isrunning)return;_isrunning = true;// 啟動線程for (auto &e : _v)e.start();}// 緩沖區是否為空bool empty() { return _q.empty(); }// 推送數據void mypush(const T &val){// 生產和消費者保持互斥關系。加鎖mymutexguard guard(&_mtu);_q.push(val);// 推送任務,如果有線程在等待則喚醒if (_threads_nums > 0)_cond.signal_one();}// 線程拿任務void mypop(){while (true){T val;{// 消費和消費者是互斥關系,加鎖mymutexguard guard(&_mtu);// 如果緩沖區為空并且線程池不停止進行等待while (empty() && _isrunning==true){_threads_nums++;_cond.wait(_mtu.getrefmutex());_threads_nums--;}// 如果緩沖區為空并且線程池停止,則退出if (empty() && !_isrunning){std::cout<<"線程退出"<<std::endl;return;}// 拿走數據val = _q.front();_q.pop();}// 處理任務val();}}// 停止線程void stop(){mymutexguard guard(&_mtu);if (!_isrunning)return;_isrunning=false;std::cout<<"cnt :"<<_threads_nums<<std::endl;_isrunning == false;if (_threads_nums > 0)_cond.signal_all();std::cout << "主線程停止" << std::endl;}private: // 線程池個數int _nums;std::vector<mythread> _v;// 緩沖區std::queue<T> _q;// 條件鎖mycond _cond;// 互斥鎖mymutex _mtu;// 線程池狀態bool _isrunning;// 線程在條件變量等待的個數int _threads_nums;
};
四 補充字段
1. 可重入函數 VS 線程安全
可重入函數:
- 函數被多個執行流調用,出現問題,函數稱為不可重入,反之可重入。
- 一般全局對象被多執行流共享,在函數內修改可能出現問題。
線程安全:
- 多個線程訪問數據資源,可能因為相互影響造成數據不一致等問題,稱為線程安全問題。
- 多個線程對同一個共享對象進行操作,可能異常。
1. 可重入函數一定是線程安全的,因為函數可以被多個執行流進入且不會出現問題。
2. 線程安全不一定可重入,加了鎖再次進入該函數,比如遞歸,就死鎖了。
3. 可重入函數強調的是函數有沒有問題,線程安全強調的是線程之間會不會互相影響。
2. 死鎖的4個特征
互斥條件:一個鎖只能一個人使用。
請求與保持條件:我干任何事情(包括申請別人持有的鎖),鎖還是我的。
不可剝奪條件:你不能搶我的鎖。
循環等待條件:我想要一個鎖,但這個鎖被你拿了,你想要一個鎖,但這個鎖被我拿了,互相卡住不放,其實是對請求與保持的補充。
避免死鎖只需要破壞上述一個條件即可。