當我們嘗試多線程編程時,生產者-消費者問題是最常見的問題之一。 盡管不像多線程編程中的其他一些問題那樣具有挑戰性,但是錯誤地實現此問題可能會造成應用程序混亂。 生產的物品將不使用,開始的物品將被跳過,消耗量取決于生產是在消耗嘗試之前還是之后開始的,等等。此外,您可能會在異常發生后很長時間注意到異常,最重要的是,幾乎所有異常線程程序,這一程序也很難調試和復制。
因此,在這篇文章中,我認為我將嘗試借助Java出色的java.util.concurrent包及其類來解決Java中的此問題。
首先,讓我們看一下生產者-消費者問題的特征:
- 生產者生產物品。
- 消費者消費生產者生產的物品。
- 生產者完成生產,并讓消費者知道他們已經完成了。
請注意,在此生產者-消費者問題中,生產者運行在與消費者不同的線程上。 此設置在兩種情況下有意義:
- 消耗該項目的步驟獨立產生,而不依賴于其他項目。
- 處理項目的時間大于生產項目的時間。
第二點中的“較大”一詞有些寬松。 考慮以下情況:生產者從文件中讀取一行,而“消耗和處理”只是將行以特殊格式記錄回文件中,那么使用生產者消費者問題解決方案可以被認為是過度設計的情況一個解法。 但是,如果對于每行,“消耗和處理”步驟是向Web服務器發出HTTP GET / POST請求,然后將結果轉儲到某個地方,則我們應該選擇生產者-消費者解決方案。 在這種情況下,我假設行(item)本身具有執行GET / POST的所有數據,而我們不依賴于上一行/下一行。
因此,讓我們首先看一下我在下面發布的生產者-消費者問題解決方案的特征:
- 可以有多個生產者。
- 將有多個消費者。
- 一旦完成新物品的生產,生產者將告知消費者,以便消費者在消費并加工完最后一件物品后退出。
有趣的是,要在通用級別解決此問題,我們只能解決消費者方,而不能解決生產方。 這是因為項目的生產可以隨時進行,而我們以通用方式進行項目生產的控制幾乎沒有。 但是,我們可以在接受生產者提供的商品時控制消費者的行為。 制定了規則之后,讓我們看一下消費者合同:
package com.maximus.producerconsumer;public interface Consumer
{public boolean consume(Item j);public void finishConsumption();
}
在這里,可以由多個類似商品的生產者共享消費者。 類似的項目,我的意思是生產者,其生產“項目”類型的對象。 Item的定義如下:
package com.maximus.consumer;public interface Item
{public void process();
}
現在我們來看一下Consumer接口的實現:
package com.maximus.consumer;import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;public class ConsumerImpl implements Consumer
{private BlockingQueue< Item > itemQueue = new LinkedBlockingQueue<Item>();private ExecutorService executorService = Executors.newCachedThreadPool();private List<ItemProcessor> jobList = new LinkedList<ItemProcessor>();private volatile boolean shutdownCalled = false;public ConsumerImpl(int poolSize){for(int i = 0; i < poolSize; i++){ItemProcessor jobThread = new ItemProcessor(itemQueue);jobList.add(jobThread);executorService.submit(jobThread);}}public boolean consume(Item j){if(!shutdownCalled){try{itemQueue.put(j);}catch(InterruptedException ie){Thread.currentThread().interrupt();return false;}return true;}else{return false;}}public void finishConsumption(){for(ItemProcessor j : jobList){j.cancelExecution();}executorService.shutdown();}
}
現在,唯一感興趣的點是消費者內部用于處理傳入商品的ItemProcessor。 ItemProcessor的編碼如下:
package com.maximus.consumer;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;public class ItemProcessor implements Runnable
{private BlockingQueue<Item> jobQueue;private volatile boolean keepProcessing;public ItemProcessor(BlockingQueue<Item> queue){jobQueue = queue;keepProcessing = true;}public void run(){while(keepProcessing || !jobQueue.isEmpty()){try{Item j = jobQueue.poll(10, TimeUnit.SECONDS);if(j != null){j.process();}}catch(InterruptedException ie){Thread.currentThread().interrupt();return;}}}public void cancelExecution(){this.keepProcessing = false;}
}
上面唯一的挑戰是while循環中的條件。 這樣編寫while循環,即使在生產者完成生產并通知消費者生產完成之后,也可以支持項目消耗的繼續。 上面的while循環可確保在線程退出之前完成所有項目的消耗。
上面的使用者是線程安全的,可以共享多個生產者,以便每個生產者可以并發調用consumer.consume(),而不必擔心同步和其他多線程警告。 生產者只需要提交Item接口的實現,其process()方法將包含如何完成消耗的邏輯。
作為閱讀本文的獎勵,我提出了一個測試程序,演示了如何使用上述類:
package com.maximus.consumer;import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;public class Test
{public static void main(String[] args) throws Exception{Consumer consumer = new ConsumerImpl(10);BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(new File(args[0]))));String line = "";while((line = br.readLine()) != null){System.out.println("Producer producing: " + line);consumer.consume(new PrintJob(line));}consumer.finishConsumption();}
}class PrintJob implements Item
{private String line;public PrintJob(String s){line = s;}public void process(){System.out.println(Thread.currentThread().getName() + " consuming :" + line);}
}
可以通過多種不同的方式來調整上述消費者,使其更加靈活。 我們可以定義生產完成后消費者將做什么。 可能對其進行了調整,以允許批處理,但我將其留給用戶使用。 隨意使用它,并以任何想要的方式扭曲它。
編碼愉快!
參考: The Java HotSpot博客上的JCG合作伙伴 Sarma Swaranga 解決了Java中的生產者-消費者問題 。
翻譯自: https://www.javacodegeeks.com/2012/05/solving-producer-consumer-problem-in.html