條件變量
條件變量本身不是鎖!但它也可以造成線程阻塞。通常與互斥鎖配合使用。給多線程提供一個會合的場所。
主要應用函數:
- pthread_cond_init 函數
- pthread_cond_destroy 函數
- pthread_cond_wait 函數
- pthread_cond_timedwait 函數
- pthread_cond_signal 函數
- pthread_cond_broadcast 函數
- 以上 6 個函數的返回值都是:成功返回 0, 失敗直接返回錯誤號。
- pthread_cond_t 類型 用于定義條件變量
- pthread_cond_tcond;
pthread_cond_init 函數
初始化一個條件變量
int pthread_cond_init(pthread_cond_t *restrictcond,const pthread_condattr_t *restrictattr);
參 2:attr 表條件變量屬性,通常為默認值,傳 NULL 即可
也可以使用靜態初始化的方法,初始化條件變量:
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;
pthread_cond_destroy 函數
銷毀一個條件變量
int pthread_cond_destroy(pthread_cond_t *cond);
pthread_cond_wait 函數
阻塞等待一個條件變量
int pthread_cond_wait(pthread_cond_t *restrictcond,pthread_mutex_t *restrictmutex);
函數作用:
- 阻塞等待條件變量 cond(參 1)滿足
- 釋放已掌握的互斥鎖(解鎖互斥量)相當于 pthread_mutex_unlock(&mutex);
- 1.2.兩步為一個原子操作。
- 當被喚醒,pthread_cond_wait 函數返回時,解除阻塞并重新申請獲取互斥鎖 pthread_mutex_lock(&mutex);
pthread_cond_timedwait 函數
限時等待一個條件變量
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrictabstime);
參 3: 參看 mansem_timedwait 函數,查看 struct timespec 結構體。
struct timespec{time_t tv_sec; /*seconds*/ 秒long tv_nsec; /*nanosecondes*/ 納秒 }
形參 abstime:絕對時間。
如:time(NULL)返回的就是絕對時間。
而 alarm(1)是相對時間,相對當前時間定時 1 秒鐘。
struct timespect={1,0};
pthread_cond_timedwait(&cond,&mutex,&t); 只能定時到 1970 年 1 月 1 日 00:00:01 秒(早已經過去)
正確用法:
time_tcur=time(NULL); 獲取當前時間。structtimespect; 定義 timespec 結構體變量 tt.tv_sec=cur+1; 定時 1 秒pthread_cond_timedwait(&cond,&mutex,&t); 傳參
setitimer 函數還有另外一種時間類型:
struct timeval{time_t tv_sec; /*seconds*/ 秒 suseconds_ttv_usec; /*microseconds*/ 微秒};
pthread_cond_signal 函數
喚醒至少一個阻塞在條件變量上的線程
int pthread_cond_signal(pthread_cond_t*cond);
pthread_cond_broadcast 函數
喚醒全部阻塞在條件變量上的線程
int pthread_cond_broadcast(pthread_cond_t*cond);
生產者消費者條件變量模型
線程同步典型的案例即為生產者消費者模型,而借助條件變量來實現這一模型,是比較常見的一種方法。假定 有兩個線程,一個模擬生產者行為,一個模擬消費者行為。兩個線程同時操作一個共享資源(一般稱之為匯聚), 生產向其中添加產品,消費者從中消費掉產品。
/*借助條件變量模擬 生產者--消費者問題*/
#include<stdio.h>
#include<unistd.h>
#include<pthread.h>
#include<stdio.h>
#include<string.h>
/*鏈表作為共享數據,需要被互斥量保護*/
struct msg{struct msg *next;int num;
};struct msg *head;
struct msg *mp;/*靜態初始化 一個條件變量 和一個互斥量*/
pthread_cond_t has_product = PTHREAD_COND_INITIALIZER;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;void *consumer(void *p)
{for(;;){pthread_mutex_lock(&lock); //頭指針為空,說明沒有結點 while(head == NULL){pthread_cond_wait(&has_product,&lock); //判斷條件變量是否滿足}mp = head;head = mp->next; //模擬消費掉一個產品pthread_mutex_unlock(&lock);printf("---Consume ---%d\n",mp->num);free(mp);sleep(rand() % 5);}
}
void *producer()
{for(;;){mp = malloc(sizeof(struct msg));mp->num = rand() % 1000 + 1; //模擬生產一個產品printf("--Produce ---%d\n",mp->num);pthread_mutex_lock(&lock);mp->next=head; //頭插法head = mp;pthread_mutex_unlock(&lock); //釋放pthread_cond_signal(&has_product); //將等待在該條件變量上的一個線程>喚醒sleep(rand() % 5);}
}int main(int argc,char *argv[])
{pthread_t pid,cid; //pid生產者ID cid消費者IDsrand(time(NULL));pthread_create(&pid,NULL,producer,NULL); //生產者pthread_create(&cid,NULL,consumer,NULL); //消費者pthread_join(pid,NULL);pthread_join(cid,NULL);return 0;
}
條件變量是搭配互斥鎖一起使用的
- 因為條件變量實現同步只提供等待與喚醒功能,并沒有提供條件判斷的功能,因此條件判斷需要用戶實現,但是條件的操作是一個臨界資源的操作,因此需要受保護,需要在條件判斷之前加鎖
- 如果加鎖成功后,因為條件不滿足而陷入休眠,就會導致卡死(因為另一方因為無法獲取鎖,而導致無法促使條件滿足),因此需要在休眠之前解鎖;并且解鎖與休眠必須是原子操作
- 被喚醒之后,即將對臨界資源進行操作,但是被操作前還要進行保護加鎖
- 所以pthread_cond_wait集合了三步原子操作:解鎖–>等待–>被喚醒后加鎖
條件變量的優點
- 相較于 mutex 而言,條件變量可以減少競爭。
- 如直接使用 mutex,除了生產者、消費者之間要競爭互斥量以外,消費者之間也需要競爭互斥量,但如果匯聚 (鏈表)中沒有數據,消費者之間競爭互斥鎖是無意義的。有了條件變量機制以后,只有生產者完成生產,才會引 起消費者之間的競爭。提高了程序效率。
生產者與消費者模型(線程安全隊列)
一個場所,兩種角色,三種關系
功能:
- 解耦和(兩個關系之間緊密)
- 支持忙閑不均
- 支持并發
三者關系
生產者–生產者:互斥
消費者–消費者:互斥
生產者–消費者:同步+互斥
/*生產者與消費者模型隊列實現 * 1.實現線程安全的隊列,對外提供線程安全的數據入隊和出隊操作* 2.創建線程,分別作為生產者與消費者數據入隊或數據出隊*/#include<iostream>
#include<queue>
#include<pthread.h>#define MAX_QUEUE 10
class BlockQueue
{public:BlockQueue(int cap = MAX_QUEUE):_capacity(cap){//初始化隊列pthread_mutex_init(&_mutex,NULL);pthread_cond_init(&_cond_con,NULL);pthread_cond_init(&_cond_pro,NULL);} ~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond_con);pthread_cond_destroy(&_cond_pro);} //入隊void QueuePush(int data){QueueLock();while(QueueIsFull()){ //隊列滿了ProWait(); //生產者等待} _queue.push(data);ConWakeUp();QueueUnLock();} void QueuePop(int *data){QueueLock();while(QueueIsEmpty()){ConWait();} *data = _queue.front();//獲取隊列頭結點_queue.pop();//結點出隊ProWakeUp();QueueUnLock();}private://隊列加鎖void QueueLock(){pthread_mutex_lock(&_mutex);}//隊列解鎖void QueueUnLock(){pthread_mutex_unlock(&_mutex);}//消費者等待void ConWait(){pthread_cond_wait(&_cond_con,&_mutex);}//消費者喚醒void ConWakeUp(){pthread_cond_signal(&_cond_con);}//生產者等待void ProWait(){pthread_cond_wait(&_cond_pro,&_mutex);}//生產者喚醒void ProWakeUp(){pthread_cond_signal(&_cond_pro);}//判斷隊列是否為空bool QueueIsFull(){return (_capacity == _queue.size());}//隊列是否是滿的bool QueueIsEmpty(){return _queue.empty();}private:std::queue<int>_queue;//創建隊列int _capacity;//隊列結點最大數量 //線程安全實現成員pthread_mutex_t _mutex;pthread_cond_t _cond_pro;pthread_cond_t _cond_con;
};void *thr_consumer(void *arg){BlockQueue *q = (BlockQueue *)arg;while(1){int data;q->QueuePop(&data);std::cout<<"consumer"<<pthread_self() <<" get data:"<< data <<std::endl;}return NULL;
}int i = 0; //必須受保護
pthread_mutex_t mutex;void *thr_productor(void *arg){BlockQueue *q = (BlockQueue *)arg;while(1){pthread_mutex_lock(&mutex);q->QueuePush(i++);pthread_mutex_unlock(&mutex);std::cout<<"productor:" <<pthread_self() <<"put data:"<< i <<std::endl;}return NULL;
}int main(int argc,char *argv[])
{BlockQueue q;pthread_t ctid[4],ptid[4];int i,ret;pthread_mutex_init(&mutex,NULL);for(i = 0;i < 4; i++){ ret = pthread_create(&ctid[i],NULL,thr_consumer,(void *)&q);if(ret != 0){std::cout<<"pthread create error\n";return -1;}}for(i = 0;i < 4; i++){ret = pthread_create(&ptid[i],NULL,thr_productor,(void *)&q);if(ret != 0){std::cout<<"pthread create error\n";return -1;}}for(i = 0;i < 4; i++){pthread_join(ctid[i],NULL);} for(i = 0; i < 4;i++){pthread_join(ptid[i],NULL);}return 0;
}
信號量
進化版的互斥鎖(1–>N)
由于互斥鎖的粒度比較大,如果我們希望在多個線程間對某一對象的部分數據進行共享,使用互斥鎖是沒有辦 法實現的,只能將整個數據對象鎖住。這樣雖然達到了多線程操作共享數據時保證數據正確性的目的,卻無形中導 致線程的并發性下降。線程從并行執行,變成了串行執行。與直接使用單進程無異。
信號量,是相對折中的一種處理方式,既能保證同步,數據不混亂,又能提高線程并發。
計數器+等待隊列+等待與喚醒功能
- 通過自身的計數器實現條件判斷,當前條件滿足時則直接返回并且計數-1.當條件并不滿足時則阻塞
- 當產生資源后,通過信號量的喚醒功能喚醒等待并且計數+1
信號量和條件變量實現同步的區別
- 信號量的條件判斷由自身來完成,而條件變量的條件判斷由用戶完成
- 信號量并不搭配互斥鎖使用,而條件變量需要搭配互斥鎖一起使用保護條件的改變
sem_init 函數
初始化一個信號量
int sem_init(sem_t *sem,int pshared,unsigned int value);
參 1:sem 信號量
參 2:pshared 取 0 用于線程間;取非 0(一般為 1)用于進程間
參 3:value 指定信號量初值
sem_destroy 函數
銷毀一個信號量
int sem_destroy(sem_t *sem);
sem_wait 函數
給信號量加鎖 ,對計數進行判斷,計數<=0則阻塞;否則立即返回流程繼續,計數-1
int sem_wait(sem_t *sem);
sem_post 函數
給信號量解鎖 ,對計數進行+1,并且喚醒等到的線程
int sem_post(sem_t *sem);
sem_trywait 函數
嘗試對信號量加鎖
(與 sem_wait 的區別類比 lock 和 trylock)
int sem_trywait(sem_t *sem);
sem_timedwait 函數
限時嘗試對信號量加鎖
int sem_timedwait(sem_t *sem,const struct timespec *abs_timeout);
參 2:abs_timeout 采用的是絕對時間。
定時 1 秒:
time_tcur=time(NULL); 獲取當前時間。
structtimespect; 定義 timespec 結構體變量 t
t.tv_sec=cur+1; 定時 1 秒
t.tv_nsec=t.tv_sec+100;
sem_timedwait(&sem,&t); 傳參
使用信號量實現生產者與消費者模型
/*使用信號量實現生產者與消費者模型**/#include<iostream>
#include<queue>
#include<pthread.h>
#include<semaphore.h>class RingQueue
{public:RingQueue(int cap = 10):_capacity(cap),_queue(cap){//1.信號量變量//2.參數取值 0:用于線程間同步與互斥// 非0:用于進程間同步與互斥//3.信號量初值sem_init(&_sem_lock,0,1);//互斥鎖初始值只給1sem_init(&_sem_data,0,0);//初始數據資源數據為0sem_init(&_sem_space,0,cap);//初始空閑空間計數} ~RingQueue(){sem_destroy(&_sem_lock);sem_destroy(&_sem_data);sem_destroy(&_sem_space);} void QueuePush(int data){// ProWait();//空閑空間計數判斷是否有空閑空間,若有返回,否則等待// 因為已經通過_sem_space的空閑空間計數知道是否有空閑空間sem_wait(&_sem_space);//添加數據之后,空閑空間計數-1sem_wait(&_sem_lock);//鎖計數初始為1,一旦進入-1加鎖_queue[_step_write]=data; _step_write = ( _step_write + 1) % _capacity;sem_post(&_sem_lock);//數據添加完畢后解鎖,數據資源計數+1sem_post(&_sem_data);//數據添加完畢后,數據資源計數+1//ConWakeUp();} void QueuePop(int *data){sem_wait(&_sem_data);//取數據的時候,數據資源計數-1sem_wait(&_sem_lock);//鎖最好僅僅保護臨界區*data = _queue[_step_read];_step_read = (_step_read + 1) % _capacity;sem_post(&_sem_lock);sem_post(&_sem_space);//取數據之后,空閑空間計數+!}private:std::vector<int>_queue;int _capacity; //隊列最大數量int _step_write;//當前寫到哪里的下標int _step_read;//當前讀到哪里了的下標sem_t _sem_lock;//實現互斥鎖sem_t _sem_space;//空閑空間計數sem_t _sem_data;//數據資源計數/*//隊列加鎖void QueueLock(){pthread_mutex_lock(&_mutex);}//隊列解鎖void QueueUnLock(){pthread_mutex_unlock(&_mutex);}*/
};
void *thr_productor(void *arg){ RingQueue *q = (RingQueue*)arg;int i=0;while(1){q->QueuePush(i);std::cout<<"thread:"<<pthread_self()<<"put data"<<i++<<"\n";}return NULL;
}void *thr_consumer(void *arg){RingQueue *q = (RingQueue*)arg;while(1){int data;q->QueuePop(&data);std::cout<<"thread:"<<pthread_self()<<"get data"<<data<<"\n";}return NULL;
}int main(int argc,char *argv[])
{RingQueue q;pthread_t ptid,ctid[4];int i ,ret;ret = pthread_create(&ptid,NULL,thr_productor,(void *)&q);if(ret != 0){std::cout<<"thread create error\n";return -1;}for(i = 0;i < 4;i++){ret = pthread_create(&ctid[i],NULL,thr_consumer,(void *)&q);if(ret != 0){std::cout<<"thread create error\n";return -1;}}for(i = 0; i < 4; i++){pthread_join(ctid[i],NULL);}pthread_join(ptid,NULL);return 0;
}