1. 什么是線程同步
為什么會有線程同步,那一定是有了新問題。互斥可以解決臨界資源被同時訪問的問題,但是純互斥也會帶來新的問題。由于當前被執行的線程離cpu最近【其他線程被阻塞掛起還要被喚醒】,所以,當前進程對于競爭鎖天然就有極大的優勢。這勢必會導致當前線程重復申請和釋放鎖,其他線程很難拿到鎖,也就會造成線程饑餓的問題。純粹的互斥,不高效,也不公平!
因此,我們提出新的要求,當前線程一旦釋放鎖就不能立即申請鎖,外面的線程也不能亂作一團的競爭鎖,必須排隊申請鎖。而剛釋放鎖的線程要想再次申請鎖,就必須排到隊列的末尾!!
我們把多個執行流在臨界區安全的前提下,按照順序依次訪問臨界資源叫做線程同步!
?2. 條件變量
> 理解條件變量
? 當?個線程互斥地訪問某個變量時,它可能發現在其它線程改變狀態之前,它什么也做不了。
? 例如?個線程訪問隊列時,發現隊列為空,它只能等待,直到其它線程將?個節點添加到隊列 中。這種情況就需要用到條件變量。
為什么需要用到條件變量呢??因為線程a需要訪問隊列,線程b添加節點。這兩個線程互相競爭鎖,但是,只有隊列中添加節點后【即滿足一定的條件】,線程a拿到鎖才能做有效動作。因此,我們引入了條件變量。
當每個線程a訪問隊列時發現并不滿足條件,那么就掛起到隊列中等待。當線程b將節點添加之后,滿足了條件變量,此時再喚醒等待隊列中的一個或全部線程讓他們來訪問臨界資源。這樣做就高效多了!
> 條件變量接口
定義和初始化條件變量和互斥鎖的使用和互斥鎖一樣。
?這一批接口就是線程在指定條件變量下等待,很好理解。
喚醒在指定條件變量下等待的一個或所有線程。
> 使用條件變量接口demon?
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <vector>
#include <string>const int num = 5; // 創建5個線程
int cnt = 0;// 定義初始化鎖和條件變量
pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t gcond = PTHREAD_COND_INITIALIZER;void *routine(void *args)
{std::string name = static_cast<char *>(args);while (true){// 加鎖pthread_mutex_lock(&glock);// 使用條件變量等待pthread_cond_wait(&gcond, &glock);std::cout << name << "計算" << cnt << std::endl;cnt++;// 解鎖pthread_mutex_unlock(&glock);}return nullptr;
}int main()
{std::vector<pthread_t> tids;// 創建5個線程for (int i = 0; i < num; i++){char *name = new char[64];snprintf(name, 64, "線程%d", i);pthread_t tid;pthread_create(&tid, nullptr, routine, name);tids.push_back(tid);// 每個1秒創建一個線程sleep(1);}// 主線程每隔一秒主動喚醒線程while (true){std::cout << "喚醒線程" << std::endl;// pthread_cond_signal(&gcond);//每隔一秒主動喚醒一個線程pthread_cond_broadcast(&gcond); // 每隔一秒主動喚醒所有線程sleep(1);}for (auto &e : tids){pthread_join(e, nullptr);}return 0;
}
每次喚醒一個線程:?線程依次被喚醒在隊列中同步串行運行。
?每次喚醒所有線程:所有線程競爭鎖。
3. 生產者消費者模型?
在現實生活中,我們不是直接去工廠購物,而是在超市中購物,因為我們直接去工廠購物成本高,效率低。?
在生產者消費者模型中,一共有3種關系,2種角色,1個交易場所。【321原則】
三種關系:消費者和消費者之間【互斥競爭關系】,生產者和生產者之間【互斥競爭關系】,生產者和消費者之間【互斥和同步關系】。
兩種角色:生產者和消費者角色【由線程承擔】。
一個交易場所:以特定結構構成的一種”內存“空間。
為什么要有生產者消費者模型呢???
1. 生產過程和消費過程解耦。【由于互斥和同步機制,生產和消費之間是不互相干擾的】
2. 支持忙閑不均。【即便消費者線程不工作,生產者線程也可以不斷執行。反之也是如此】
3. 提高效率。【獲取任務和處理任務是并發的!!】
4. 基于阻塞隊列的生產者消費者模型
在多線程編程中阻塞隊列(Blocking Queue)是?種常用于實現生產者和消費者模型的數據結構。其與普通的隊列區別在于,當隊列為空時,從隊列獲取元素的操作將會被阻塞,直到隊列中被放入了元素;當隊列滿時,往隊列里存放元素的操作也會被阻塞,直到有元素被從隊列中取出(以上的操作都是基于不同的線程來說的,線程在對阻塞隊列進程操作時會被阻塞)
block_queue.hpp
#pragma once#include <iostream>
#include <pthread.h>
#include <queue>const int defaultcap = 5;template <typename T>
class block_queue
{bool is_empty(){return _q.size() <= 0;}bool is_full(){return _q.size() >= _cap;}public:block_queue(int num = defaultcap): _cap(num), _consum_sleep_num(0), _product_sleep_num(0){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_full_cond, nullptr);pthread_cond_init(&_empty_cond, nullptr);}// 消費T &consum(){pthread_mutex_lock(&_mutex);while (is_empty()){// 空了就在阻塞隊列等待被喚醒_consum_sleep_num++;pthread_cond_wait(&_empty_cond, &_mutex);_consum_sleep_num--; // 被喚醒}T &out = _q.front();_q.pop();// 隊列一定不滿,喚醒生產者if (_product_sleep_num != 0){std::cout << "喚醒生產者……" << std::endl;pthread_cond_signal(&_full_cond);}// 解鎖pthread_mutex_unlock(&_mutex);return out;}// 生產void product(const T &in){pthread_mutex_lock(&_mutex);while (is_full()){// 滿了就在阻塞隊列等待被喚醒_product_sleep_num++;pthread_cond_wait(&_full_cond, &_mutex);_product_sleep_num--; // 被喚醒}_q.push(in);// 隊列一定不空,喚醒消費者if (_consum_sleep_num != 0){std::cout << "喚醒消費者……" << std::endl;pthread_cond_signal(&_empty_cond);}// 解鎖pthread_mutex_unlock(&_mutex);}~block_queue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_full_cond);pthread_cond_destroy(&_empty_cond);}private:std::queue<T> _q;int _cap; // 隊列容量上限pthread_mutex_t _mutex;pthread_cond_t _full_cond;pthread_cond_t _empty_cond;int _consum_sleep_num; // 消費者休眠個數int _product_sleep_num; // 生產者休眠個數
};
消費者這里為什么要寫成循環呢???假設生產者只生產了一個商品,但是生產者卻喚醒了一批消費者!!那么消費者就全部來競爭鎖,一個消費者消費完一個商品后釋放鎖。其他消費者并不在阻塞隊列中等待,而是在鎖上等待。如果消費者立即拿到鎖,生產者還沒有來得及生產,那么消費者就沒有商品可以消費了,但是此時消費者已近持有鎖區消費空隊列了,也就是_q.pop()。這樣在代碼層面就出問題了!!我們把這種情況稱為偽喚醒!所以我們要在判斷上加上循環,即便被喚醒了,也要再次檢查隊列中商品是否為空!
main.cc
#include "block_queue.hpp"
#include <unistd.h>// 生產者
void *product(void *args)
{int cnt = 1;block_queue<int> *bq = static_cast<block_queue<int> *>(args);while (true){sleep(1);bq->product(cnt);std::cout << "生產者生產了:" << cnt << std::endl;cnt++;}
}// 消費者
void *consum(void *args)
{block_queue<int> *bq = static_cast<block_queue<int> *>(args);while (true){//sleep(1);int t = bq->consum();std::cout << "生產者消費了:" << t << std::endl;}
}int main()
{// 申請阻塞隊列block_queue<int> *bq = new block_queue<int>();// 構建生產者和消費者pthread_t p, c;pthread_create(&p, nullptr, product, bq);pthread_create(&p, nullptr, consum, bq);pthread_join(p, nullptr);pthread_join(c, nullptr);return 0;
}
5. 封裝條件變量?
mutex.hpp
#pragma once
#include <iostream>
#include <pthread.h>
namespace mutex_module
{class mutex{public:mutex(){int n = pthread_mutex_init(&_lock, nullptr);(void)n;}void lock(){pthread_mutex_lock(&_lock);}void unlock(){pthread_mutex_unlock(&_lock);}~mutex(){int n = pthread_mutex_destroy(&_lock);}pthread_mutex_t* get(){return &_lock;}private:pthread_mutex_t _lock;};// 采?RAII?格,進?鎖管理class lock_guard{public:lock_guard(mutex &m) : _m(m){_m.lock();}~lock_guard(){_m.unlock();}private:mutex &_m;};
}
cond.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include "mutex.hpp"using namespace mutex_module;namespace cond_module
{class cond{public:cond(){pthread_cond_init(&_cond, nullptr);}void wait(mutex &lock){pthread_cond_wait(&_cond, lock.get());}void signal(){pthread_cond_signal(&_cond);}void broadcast(){pthread_cond_broadcast(&_cond);}~cond(){pthread_cond_destroy(&_cond);}private:pthread_cond_t _cond;};
}