http://blog.csdn.net/gebushuaidanhenhuai/article/details/74011636
基本概念
提到生產者和消費者,我們最有可能想到的是商店賣東西,顧客在貨架上(緩沖區)買東西。?
生產者消費者問題,其實是一個多線程同步問題的經典案例。該問題描述了兩個共享固定大小緩沖區的線程—即所謂的“生產者”和“消費者”–在實際運行時會發生的問題。生產者的主要作用是生成一定量的數據放在緩沖區中,消費者在緩沖區消耗這些數據。但是,要保證生產者不會在緩沖區滿時還往緩沖區寫數據,消費者也不會在緩沖區為空時讀數據。?

三種關系
- 生產者與消費者之間是供求關系(互斥和同步)
- 生產者與生產者之間是競爭關系(互斥)
- 消費者與消費者之間是競爭關系(互斥)
我們簡單解釋一下三種關系。假如我們現在在一家超市,我們們想要買一箱牛奶。牛奶生產商(生產者)生產了牛奶,經超市工作人員把牛奶擺放在了貨架上,在這個過程過我們(消費者)不能買牛奶,要等待工作人員擺好貨物,所以此時生產者與消費者是互斥關系。工作人員擺好貨物后,我們(消費者)去購買,此時生產者與消費者是同步關系。?
一個貨架上只能擺一個品牌的貨物,怒能擺其他的,此時生產者與生產者之間是互斥關系。?
兩個或多個顧客不能同時買一個貨物,此時消費者與消費者之間是互斥關系。
我們可以用兩種方法實現生產者與消費者模型。
基于單鏈表的生產者消費者模型
我們用兩個線程分別表示生產者與消費者,用單鏈表表示緩沖區。?
生產者生產數據,插入到單鏈表的頭部。?
消費者消費數據,從單鏈表的頭部讀數據。
條件變量
條件變量是利用線程間共享的全局變量進行同步的一種機制,只要包括兩個動作:一個線程等待”條件變量的條件成立”而掛起;另一個線程使”條件成立(給出條件成立信號)。?
為了放置競爭,條件變量的使用總和一個互斥鎖結合在一起。
條件變量的類型為 pthread_cond_t.?
條件變量的初始化:
1.直接定義一個全局的條件變量,并利用宏PTHREAD_COND_INITIALIZER進行值得初始化。
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
2.調用函數pthread_cond_init
#include<pthread.h>
pthread_cond_init (pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
第一個參數即為我們動態分配的條件變量cond,除非創建一個非默認屬性的條件變量,否則第二個參數attr始終為NULL;?
注意:若不想講條件變量定義成全局的,必須以動態分配的方式創建。
pthread_cond_t *cond = (pthread_cond_t*)malloc(sizeof(pthread_cond_t));
注意:使用此種方式,先destroy條件變量,再free這塊空間。
3.銷毀條件變量
int pthread_cond_destroy(pthread_cond_t* cond);
參數cond指針即指向我們創建的條件變量。?
4.等待?
我們使用pthread_cond_wait或pthread_cond_timewait函數等待條件變量變為真。
int pthread_cond_timedwait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex,const struct timespec *restrict abstime);
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
pthread _cond_wait,其第一個參數是指向條件變量的指針,第二個參數是一個指向互斥鎖的指針。在上面提到過,條件變量總是和一把互斥鎖協同使用的。目的是為了防止資源的競爭。?
生產者與消費者之間是同步互斥關系的,他們不能同時訪問緩沖區,所以我們需要一把鎖來約束他們。?
假如我們此時有兩個消費者A,B在等待資源,生產者申請到了”鎖“,并且生產了一個產品,釋放鎖。并發送信號告訴消費者你們可以來消費了。?
假如消費者A 率先搶到鎖,買走了產品。B再申請到鎖時,發現已經沒有產品了,只能等待條件變量為真時,買產品。此時鎖在B身上,如果B一直在等待,一直不釋放鎖時,會造成生產者申請不到鎖而造成“死鎖”。所以wait的第二個參數就是當消費者在申請到鎖時,條件變量為假時,及時的釋放鎖資源。?
wait函數是無條件等待。在條件變量為假時,會一直等下去。timedwait是有條件等待,它多定義了一個超時,超時值定義了我們愿意等待多長時間。它通過timespec決定。
5.發送信號
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
當生產者生產完畢后需要通知消費者。發送信號有兩種方式。signal是根據某種優先級喚醒一個等待者。broadcast是在資源充足的情況下進行廣播,喚醒所有等待者。
代碼實現:
#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>typedef struct _list
{struct _list *next;int _val;
}product_list;product_list *head = NULL;
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t need_product = PTHREAD_COND_INITIALIZER;void Init_list(product_list* list)
{if(list != NULL){list -> next = NULL;list -> _val = 0;}
}void* Consumer(void* _val)
{product_list *p = NULL;for(;;){pthread_mutex_lock(&lock);while(head == NULL){pthread_cond_wait(&need_product,&lock);}p = head;head = head -> next;p -> next = NULL;pthread_mutex_unlock(&lock);printf("Consum success,val is:%d\n",p -> _val);free(p);}return NULL;
}void* Product(void* _val)
{for(;;){sleep(rand() % 2);product_list* p =malloc(sizeof(product_list));pthread_mutex_lock(&lock);Init_list(p);p -> _val = rand() % 1000;p -> next = head;head = p;pthread_mutex_unlock(&lock);printf("Call consumer! Product has producted,val is:%d\n",p->_val);pthread_cond_signal(&need_product);}
}int main()
{pthread_t t_product;pthread_t t_consumer;pthread_create(&t_product,NULL,Product,NULL);pthread_create(&t_consumer,NULL,Consumer,NULL);pthread_join(t_product,NULL);pthread_join(t_consumer,NULL);return 0;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71

基于環形隊列的生產者消費者模型
除了基于單鏈表的生產者與消費者模型,我們還可以利用信號量實現生產者消費者模型。
原理
生產者在空格子上生產數據。?
消費者在有商品的格子上消費數據。?
注意:
- 生產者先進行生產。
- 當消費者沒有數據要消費時,需等待生產者生產。
- 當生產者把緩沖區充滿時,需等待消費者消費,出現空格子時在生產。?

操作函數
int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_destroy(sem_t *sem);
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_post(sem_t *sem);
初始化信號量sem_init,參數value為信號量的值,參數pshared一般設為0,表示信號量用于同一進程內線程間同步。摧毀信號量sem_destroy。P操作(申請資源)sem_wait,使信號量的值-1。V操作(釋放資源)sem_post,使信號量的值+1。sem_trywait是嘗試申請資源。
代碼實現
#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>
#include<semaphore.h>#define _SIZE_ 5
sem_t blanks;
sem_t datas;
int buf[_SIZE_] ={ 0 };
void* product(void* arg)
{int i = 0;while(1){usleep(500000);sem_wait(&blanks); int data = rand()%1000;buf[i] = data;printf("Product is:%d\n",data);sem_post(&datas); ++i;i %= _SIZE_;}
}
void* consumer(void* arg)
{int i = 0;while(1){usleep(500000); sem_wait(&datas); printf("Consumer is%d\n",buf[i]);sem_post(&blanks); ++i;i %= _SIZE_;}
}int main()
{sem_init(&blanks,0,_SIZE_);sem_init(&datas,0,0);pthread_t _consumer;pthread_t _product;pthread_create(&_consumer,NULL,consumer,NULL);pthread_create(&_product,NULL,product,NULL);pthread_join(_consumer,NULL);pthread_join(_product,NULL);sem_destroy(&blanks);sem_destroy(&datas);return 0;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
