生產者消費者模型(Producer-Consumer Model)是計算機科學中一個經典的并發編程模型,用于解決多線程/多進程環境下的協作問題。
基本概念
生產者:負責生成數據或任務的實體
消費者:負責處理數據或執行任務的實體
緩沖區:生產者與消費者之間共享的數據存儲區域
模型特點
生產者與消費者以不同的速度運行
兩者通過共享的緩沖區進行通信
緩沖區有大小限制,可能滿或空
需要解決的問題
同步問題:
當緩沖區滿時,生產者需要等待
當緩沖區空時,消費者需要等待
互斥問題:
對緩沖區的訪問必須是互斥的,防止數據競爭
常見實現方式
使用信號量(Semaphore):空緩沖區信號量滿緩沖區信號量互斥信號量
使用條件變量(Condition Variable)和互斥鎖(Mutex)
使用阻塞隊列(高級語言中常用)
本篇我們使用互斥鎖和阻塞隊列來解決這個問題
在多線程的鎖中我們首先要避免的就是死鎖這個問題,在 Java 中,Lock 接口及其實現類(如 ReentrantLock)是在 JDK 5(Java 5) 引入的,屬于java.util.concurrent.locks 包的一部分。我們這里使用Java的synchronized實現
首先來分析問題,我們可以抽象的將生產者消費者問題想象為,廚師和顧客的問題:
顧客:
- 判斷桌子上是否有食物
- 如果沒有就等待
- 如果有就直接吃掉
- 吃完食物之后,通知廚師繼續做食物
廚師:
- 判斷桌子上是否有食物
- 如果桌子上有食物的話就等待
- 如果桌子上沒有食物的話就制作食物
- 將食物放置在桌子上
- 喚醒等待的顧客開始吃
分析完畢,首先我們應該先新建三個類分別為:Cook(廚師類),Customer(顧客類),Desk(桌子類)
初始化桌子:
- 初始化桌子上食物的標志,
0為無食物,1為有食物
- 初始化顧客的上限,例如顧客最多吃10份食物
package Thread.Producer_Consumer;public class Desk {public static int FoodFlag=0;//食物當前的狀態表示當前桌子上是否有食物public static int count=10;//消費者最多可以吃10個食物public static Object lock=new Object();////創建一個鎖對象,用于生產者和消費者線程間的同步
}//由于是所有線程共同的變量所以我們使用static關鍵字修飾
接下來開始完成顧客線程
package Thread.Producer_Consumer;public class Customer extends Thread{//消費者線程@Overridepublic void run() {while( true){synchronized(Desk.lock){// 檢查是否達到食物上限(count=0表示不能再吃)if(Desk.count==0){break;}else {// 檢查桌子上是否有食物(FoodFlag=1表示有食物)if (Desk.FoodFlag == 1) {System.out.println("顧客吃掉食物");Desk.FoodFlag = 0;//表示沒有食物Desk.count--;//剩余可吃食物數量減1System.out.println("顧客還可以吃"+Desk.count);Desk.lock.notifyAll();//喚醒lock鎖中所有等待的線程} else {//桌上沒有食物,則等待try {Desk.lock.wait();//釋放鎖,并進入等待狀態} catch (InterruptedException e) {throw new RuntimeException(e);}}}}}System.out.println("顧客吃飽了,結束消費!");}
}
廚師線程
package Thread.Producer_Consumer;public class Cook extends Thread{@Overridepublic void run() {while (true) {synchronized (Desk.lock){//如果消費者可以吃的食物的數量已經達到最大,那么則直接退出if (Desk.count==0) {break;}else{//如果桌子有食物,等待消費者進程if(Desk.FoodFlag==1){try {Desk.lock.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}else{//如果桌子沒有食物System.out.println("生產者正在生產食物...");//設置桌子有食物Desk.FoodFlag=1;//喚醒消費者線程Desk.lock.notifyAll();}}}}}
}
最后創建一個測試類
package Thread.Producer_Consumer;public class Test {public static void main(String[] args) {Desk desk=new Desk();Customer f1=new Customer();Cook c1=new Cook();f1.setName("消費者1");c1.setName("生產者1");f1.start();c1.start();}
}
下來我們使用阻塞隊列來實現一下:
桌子類:
package Thread.Producer_Consumer_2;public class Desk {public static int count=10;//消費者最多可以吃10個食物public static int Food_max=10;
}
廚師類:
package Thread.Producer_Consumer_2;import java.util.concurrent.ArrayBlockingQueue;public class Cook extends Thread{ArrayBlockingQueue<String> queue;public Cook(ArrayBlockingQueue<String> queue){this.queue=queue;}//構造方法,創建一個阻塞隊列@Overridepublic void run() {//廚師不斷將食物放進隊列中while(true){if(Desk.Food_max<=0){break;}else{try {queue.put("食物");System.out.println("廚師放了一個食物");Desk.Food_max--;} catch (InterruptedException e) {throw new RuntimeException(e);}}}}
}
顧客類:
package Thread.Producer_Consumer_2;import Thread.Producer_Consumer.Desk;import java.util.concurrent.ArrayBlockingQueue;public class Customer extends Thread{//消費者線程ArrayBlockingQueue<String> queue;public Customer(ArrayBlockingQueue<String> queue){this.queue=queue;}@Overridepublic void run() {while( true){if(Desk.count==0){System.out.println("顧客吃到上限了");break;}else{try {queue.take();System.out.println("消費者吃掉了一個食物");Desk.count--;} catch (InterruptedException e) {throw new RuntimeException(e);}}}}
}
測試類:
package Thread.Producer_Consumer_2;import Thread.Producer_Consumer.Desk;import java.util.concurrent.ArrayBlockingQueue;public class Test {public static void main(String[] args) {ArrayBlockingQueue<String> list=new ArrayBlockingQueue<>(5);//創建一個阻塞隊列Customer f1=new Customer(list);Cook c1=new Cook(list);f1.setName("消費者1");c1.setName("生產者1");f1.start();c1.start();}
}
阻塞隊列總結:
在創建阻塞隊列的時候,需要創建實現類的對象,我們可以通過查看源碼的形式來看一下
ArrayBlockingQueue實現了BlockingQueue這個接口
BlockingQueue又繼承于Queue
Queue又繼承于Collection
Collection又繼承于Iterable,由此我們也就可以得出,阻塞隊列是可以通過for_each循環遍歷的
回歸主題,在ArrayBlockingQueue中,有一個帶參的構造方法,由此來創建阻塞隊列,capacity代表阻塞隊列的容量(不要忘記了泛型代表了阻塞隊列的參數為哪種類型)
同時,LinkBlockingQueue也實現了BlockingQueue這個接口
由此可以得出一個圖:
由于put()和take()方法都是自帶鎖的,所以我們并不用手動設置鎖,同時,由于我們代碼中的輸出語句在鎖的外面
這導致了輸出時偶爾并不能按照我們的想法進行輸出,但是執行時一定是正確的