文章目錄
- 線程同步概念
- 生產者消費者模型
- 條件變量
- 使用條件變量
- 喚醒條件變量
- 阻塞隊列
線程同步概念
互斥能保證安全,但是僅有安全不夠,同步可以更高效的使用資源
生產者消費者模型
下面就基于生產者消費者來深入線程同步等概念:
如何理解生產消費者模型:
以函數調用為例:
兩個線程之間要進行信息交互就需要引入一段內存空間(交易場所)
線程a將數據放入緩沖區(交易場所),線程b從緩沖區進行讀取
這樣線程a完成數據存放后就繼續做自己的事情,線程b去讀取數據
這樣就能很好的實現多執行流之間的執行解耦
特點:很好的提高了處理數據的能力
支持忙閑不均
條件變量
條件變量:為了不讓消費者的每次消費為無效消費.
所以對于生產者,在每次完成自己的任務之后對條件做出改變,當條件的變量達到一定條件后,消費者才進行有效消費
無效消費過程: 消費前(加鎖)----嘗試消費(無效消費)—消費結束(解鎖)
條件變量的目的:
1.不做無效的鎖申請
2.假設消費者很多,讓他們有執行順序
相當于條件變量給各個線程在調度他之前給一個提醒
條件變量本質是數據:可以理解為:
使用條件變量
認識接口
與互斥鎖的創建和使用非常相似
pthread_cond_destroy();
//創建布局條件變量要后要進行銷毀
pthread_cond_init();
//對局部的條件變量進行初始化
pthread_cond_t;
//關鍵字 創建布局變量
全局就要提供PTHREAD_COND_INITIALIZER
的宏來進行初始化
條件變量創建的前提是有線程安全,所以條件變量的接口和互斥鎖的接口大致類似.
條件創建了還要有一個接口來等待條件成立:
pthread_cond_wait();
//等待條件成立,參數為條件變量和互斥鎖
上述所有的參數返回值都是在成功時返回0
失敗返回錯誤原因
喚醒條件變量
pthread_cond_signal();
//喚醒指定的條件變量,并喚醒一個線程
pthread_cond_broadcast();
//是條件變量成立,并喚醒所有的線程
在沒有條件變量的時候,打印信息如圖:
可以看到線程的調度是不確定的,我們想讓這個線程按照我們想要的順序(如:一次Thread-1,Thread-2,Thread-3,這樣)進行打印,那么就需要用到條件變量.
代碼:
#include <iostream>
#include <unistd.h>
#include <string>
#include <pthread.h>//創建條件變量和互斥鎖
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;void *ThreadRoutine(void* args)
{std::string name = static_cast<const char *>(args);while(true){sleep(1);pthread_mutex_lock(&mutex);pthread_cond_wait(&cond, &mutex);std::cout << "I am a new Thread, my name is " << name << std::endl;pthread_mutex_unlock(&mutex);}
}
int main()
{pthread_t t1, t2, t3;pthread_create(&t1, nullptr, ThreadRoutine, (void*)"Thread-1");pthread_create(&t2, nullptr, ThreadRoutine, (void*)"Thread-2");pthread_create(&t3, nullptr, ThreadRoutine, (void*)"Thread-3");while(true){sleep(1);pthread_cond_signal(&cond);}pthread_join(t1, nullptr);pthread_join(t2, nullptr);pthread_join(t3, nullptr);return 0;
}
我們讓代碼按照t1, t2, t3的線程順序來執行子線程的任務
換一個順序驗證也是如此
如果沒有條件變量,這個也是按照順序打印,不過是批次進行,和CPU時鐘機制有關,所有使用條件變量更好
使用pthread_cond_broadcast();
//喚醒全部線程
就跟加鎖機制一樣,不過每次是各個線程只執行一次后就會等待,并不想無鎖那樣批次打印
單純的互斥能保證線程的安全, 但不一定合理或者高效.
pthread_cond_wait();//
在等待的時候,會釋放這把鎖(等待是在臨界區內,釋放鎖是為了資源高效利用,再次加鎖是不允許在有鎖的臨界區內有無鎖的線程存在)
再被喚醒的時候,又會再次加鎖
當被喚醒的時候,重新申請也是需要參與鎖的競爭的(未解決這個問題, 看下main阻塞隊列部分的講解)
阻塞隊列
這個隊列只有為空,為滿兩種狀態
為空:消費線程不能再消費
為滿:生產線程不能在生產
這個場景也滿足上述說明的所需的321原則
(3種關系,生產–生產,消費–消費
2中角色:生產者,消費者
1個環境(這個阻塞隊列就是一個臨界區))
單生產,單消費:
基于隊列實現,阻塞隊列的操作(消費者生產者實例):
偽喚醒:
在這份代碼中,將來如果因為productor慢不滿足生產, 多個線程在一個阻塞隊列中等待,而有一個Push達到(有一個生產剛產出), 假設此時的代碼是將全部線程都喚醒,那么除了第一個線程得到條件變量的滿足和鎖的滿足,其他線程會在條件變量下的等待轉化為競爭鎖等待的情況, 假設此時若第一個線程完成pop且unlock速度快,那么這時后續的線程會在得到鎖之后直接對空隊列進行Pop操作,這是就會出現錯誤,這個狀態就是偽喚醒(條件不滿足,但是線程被喚醒了)
(雖然上述只是假設, 但是cpu的運行速度很快, 我們不防會有這樣的情況發生)
所以修改Pop內的if(IsFull)代碼和Push內的(IsEmpty)代碼,還可以使用之前的鎖封裝的代碼
此時的消費者生產者代碼:
BlockQueue.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
#include "LockGuard.hpp"int defaultcap = 5; // for testtemplate <class T>
class BlockQueue
{
public:BlockQueue(int cap = defaultcap): _capacity(cap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_p_cond, nullptr);pthread_cond_init(&_c_cond, nullptr);}bool IsEmpty(){return _q.size() == 0;//查看隊列狀態}bool IsFull(){return _q.size() == _capacity;//此時為滿,}bool Pop(T *out){LockGuard lg(&_mutex);while(IsEmpty()){//為空, 進行等待pthread_cond_wait(&_c_cond, &_mutex);}*out = _q.front();_q.pop();//可以生產//可增加水準線進行響應的操作pthread_cond_signal(&_p_cond);//pthread_mutex_unlock(&_mutex);return true;}bool Push(const T &in){// 當前變量進行加鎖LockGuard lg(&_mutex);//pthread_mutex_lock(&_mutex);while(IsFull()){// 為滿,進行阻塞等待pthread_cond_wait(&_p_cond, &_mutex);}// 進行生產等待_q.push(in);//可以進行消費pthread_cond_signal(&_c_cond);//pthread_mutex_unlock(&_mutex);return true;}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_p_cond);pthread_cond_destroy(&_c_cond);}private:std::queue<T> _q;int _capacity; // 為空時,不能再消費,為滿時,不能再生產,狀態是capacity與size進行比較pthread_mutex_t _mutex;pthread_cond_t _p_cond;pthread_cond_t _c_cond;
};
main.cc
#include "BlockQueue.hpp"
#include <pthread.h>
#include <time.h>
#include <sys/types.h>
#include <unistd.h>void *productor(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);while(true){int data = rand() % 10 + 1;//[1,10];bq->Push(data);std::cout << "consumer data: " << data << std::endl;sleep(1);}
}
void *consumer(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);while(true){int data = 0;bq->Pop(&data);std::cout << "product data: " << data << std::endl;}
}
int main()
{srand((uint64_t)time(nullptr) ^ getpid() ^ pthread_self());BlockQueue<int> * bq = new BlockQueue<int>();pthread_t c, p;//兩個線程pthread_create(&p, nullptr, productor, bq);pthread_create(&c, nullptr, consumer, bq);pthread_join(p, nullptr);pthread_join(c, nullptr);return 0;
}
利用生產者消費者模型實現分派任務的操作()
在此基礎上構建一個任務類型:
Task.hpp
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
enum //設置退出碼
{ok = 0,div_zero,mod_zero,unknow
};
const std::string opts = "+-*/%)[()]"; //設置隨機運算
class Task
{
public:Task()//無參構造函數用于生成無參臨時對象,如果有參構造是全缺省那么可以不用寫這個無參構造函數{}Task(int x, int y, char op):data_x(x), data_y(y), opt(op), result(0), code(ok){}void Run()//任務主題內容{switch(opt){case '+':result = data_x + data_y;break;case '-':result = data_x - data_y;break;case '*':result = data_x * data_y;break;case '/':{if(data_y == 0){code = div_zero;}else{result = data_x / data_y;}break;}case '%':{if(data_y == 0){code = mod_zero;}else{result = data_x % data_y;}break;}default:code = unknow;break;}}void operator()(){Run();}~Task(){}//打印任務,用于更清晰的認識std::string print_task(){std::string s;s = std::to_string(data_x) + opt + std::to_string(data_y) + "=?\n";return s;}std::string print_result(){std::string s;s = std::to_string(data_x) + opt + std::to_string(data_y) + "=" + std::to_string(result) + "[" + std::to_string(code) + "]" + "\n";return s;}
private:int data_x;int data_y;char opt;int result;int code;
};
對上述代碼的修改:
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <time.h>
#include <sys/types.h>
#include <unistd.h>void *productor(void *args)//生產者
{BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);//阻塞隊列的對象while(true){int x = rand() % 11;//[0,10];int y = rand() % 11;//[0,10];char opt = opts[rand() % opts.size()];Task t(x, y, opt);std::cout << t.print_task() << std::endl;;bq->Push(t);//放入隊列,隊列size+1//usleep(1000);}
}
void *consumer(void *args)
{BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);while(true){sleep(1);Task t;//1.拿到消費數據bq->Pop(&t);//2.執行任務t();//3.打印任務信息std::cout << t.print_result() << std::endl;;}
}
int main()
{srand((uint64_t)time(nullptr) ^ getpid() ^ pthread_self());//偽隨機種子BlockQueue<Task> * bq = new BlockQueue<Task>();//創建一個阻塞隊列pthread_t c, p;//兩個線程pthread_create(&p, nullptr, productor, bq);//兩個線程模擬消費者生產者模型pthread_create(&c, nullptr, consumer, bq);pthread_join(p, nullptr);pthread_join(c, nullptr);return 0;
}
針對上述代碼,生產者和消費者本身就是互斥的,也就是串行執行,怎么會高效呢?
探究這個問題,首先從消費者消費后去哪里?生產在生產之前從哪來?來考慮.
生產者的數據,在產生時是花費時間,消費者消費也要花時間.
在消費者處理數據時花時間的同時生產者在某個時刻剛好將數據傳給臨界區,生產者只需要保證自己完成傳送就可以做其他自己的事,消費者自己繼續處理數據
所以高效,并發不體現在同步互斥,而是在拿數據,處理數據這里.
多線程任務下的消費者生產者模型多對多:
將bq和線程名字一起封裝可以更好的觀察:
本篇結束,下篇更精彩,關注我,帶你飛~~~~