當我第一次聽到問題陳述時,我立即知道需要什么。 但是,這次我的做法與上次有所不同。 這與我今天如何看待技術有關。 我不會涉足任何非技術方面,并且會直接跳入問題及其解決方案。 我開始研究市場上存在的東西,并發現了幾篇文章,這些文章幫助我以正確的方式傳達了我的想法。
問題陳述
我們需要一個用于批量遷移的解決方案。 我們正在將數據從系統1遷移到系統2,在此過程中,我們需要執行以下三個任務:
- 根據組從數據庫加載數據
- 處理數據
- 通過修改來更新在步驟1中加載的記錄
我們必須處理100個小組,每個小組大約有4萬條記錄。 您可以想象如果我們以同步方式執行此練習將花費多少時間。 這里的圖像有效地解釋了這個問題。
![]() |
生產者消費者:問題 |
生產者和消費者模式
首先讓我們看一下生產者消費者模式。 如果您參考上面的問題說明并查看圖片,我們會看到有太多實體準備使用其部分數據。 但是,沒有足夠的工人可以處理所有數據。 因此,隨著生產者繼續排隊,它只會繼續增長。 我們看到系統開始占用線程并花費大量時間。
中級解決方案
![]() |
生產者消費者:中級方法 |
我們確實有一個中間解決方案。 參考該圖像,您將立即注意到,生產者將他們的工作堆積在文件柜中,而工人在完成上一項任務時繼續將其撿起來。 但是,這種方法確實存在一些明顯的缺點:
- 仍然只有一名工人必須完成所有工作。 外部系統可能很高興,但是任務將繼續存在,直到工作人員完成所有任務為止
- 生產者將他們的數據堆積在隊列中,并且需要資源來保存它們。 就像在此示例中,機柜可以裝滿一樣,JVM資源也可能發生同樣的情況。 我們需要注意要在內存中放入多少數據,在某些情況下可能不會太多。
解決方案
![]() |
生產者消費者:解決方案 |
解決方案是我們每天在很多地方都能看到的,例如電影院大廳排隊,汽油泵等。有很多人來訂票,而根據進來的人數,增加了更多的人來發行票。 本質上,請參考此處的圖像,您會注意到生產者將繼續向內閣添加他們的工作,并且我們有更多的工人來處理工作量。
Java提供了并發包來解決此問題。 到現在為止,我一直在較低級別上進行線程工作,這是我第一次使用此程序包。 當我開始瀏覽網絡并閱讀其他博客作者的言論時,我遇到了一篇非常好的文章 。 它有助于非常有效地理解BlockingQueue的使用。 但是,Dhruba提供的解決方案并不能幫助我實現所需的高吞吐量。 因此,我開始探索對ArrayBlockingQueue的使用。
控制器
這是管理生產者和消費者之間的合同的第一類。 控制器將為生產者設置1個線程,為消費者設置2個線程。 根據需要,我們可以創建所需數量的線程。 甚至甚至可以從屬性中讀取數據或做一些動態魔術。 現在,我們將保持簡單。
package com.kapil.techieforever.producerconsumer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class TestProducerConsumer
{
public static void main(String args[])
{
try
{
Broker broker = new Broker();
ExecutorService threadPool = Executors.newFixedThreadPool(3);
threadPool.execute(new Consumer("1", broker));
threadPool.execute(new Consumer("2", broker));
Future producerStatus = threadPool.submit(new Producer(broker));
// this will wait for the producer to finish its execution.
producerStatus.get();
threadPool.shutdown();
}
catch (Exception e)
{
e.printStackTrace();
}
}
}
我正在使用ExecuteService創建線程池并對其進行管理。 代替使用基本的Thread實現,這是一種更有效的方法,因為它將根據需要處理退出和重新啟動線程。 您還將注意到,我正在使用Future類來獲取生產者線程的狀態。 該類非常有效,它將使我的程序停止進一步執行。 這是在線程上替換“ .join”方法的一種好方法。 注意:在這個例子中,我并不是很有效地使用Future。 因此您可能需要嘗試一些適合自己的事情。
另外,您還應注意在生產者和消費者之間用作文件柜的Broker類。 我們將在短時間內看到它的實現。
生產者
此類負責產生需要處理的數據。
package com.kapil.techieforever.producerconsumer;
public class Producer implements Runnable
{
private Broker broker;
public Producer(Broker broker)
{
this.broker = broker;
}
@Override
public void run()
{
try
{
for (Integer i = 1; i < 5 + 1; ++i)
{
System.out.println("Producer produced: " + i);
Thread.sleep(100);
broker.put(i);
}
this.broker.continueProducing = Boolean.FALSE;
System.out.println("Producer finished its job; terminating.");
}
catch (InterruptedException ex)
{
ex.printStackTrace();
}
}
}
此類正在做它所能做的最簡單的事情-向代理添加一個整數。 需要注意的一些關鍵領域是:
1. Broker上有一個屬性,生產者在完成生產后最終會對其進行更新。 這也稱為“最終”或“毒藥”條目。 消費者使用它來知道不再有數據 2.我使用Thread.sleep來模擬某些生產者可能需要更多時間來生產數據。 您可以調整此值并查看消費者的行為
消費者
此類負責從代理讀取數據并完成其工作
package com.kapil.techieforever.producerconsumer;
public class Consumer implements Runnable
{
private String name;
private Broker broker;
public Consumer(String name, Broker broker)
{
this.name = name;
this.broker = broker;
}
@Override
public void run()
{
try
{
Integer data = broker.get();
while (broker.continueProducing || data != null)
{
Thread.sleep(1000);
System.out.println("Consumer " + this.name + " processed data from broker: " + data);
data = broker.get();
}
System.out.println("Comsumer " + this.name + " finished its job; terminating.");
}
catch (InterruptedException ex)
{
ex.printStackTrace();
}
}
}
這還是一個簡單的類,它讀取Integer并將其打印在控制臺上。 但是,要注意的關鍵點是:
1.處理數據的循環是一個無限循環,它在兩種情況下運行–直到生產者消費并且經紀人有一些數據為止
2.同樣,Thread.sleep用于創建有效的不同方案
經紀人
package com.kapil.techieforever.producerconsumer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class Broker
{
public ArrayBlockingQueue queue = new ArrayBlockingQueue(100);
public Boolean continueProducing = Boolean.TRUE;
public void put(Integer data) throws InterruptedException
{
this.queue.put(data);
}
public Integer get() throws InterruptedException
{
return this.queue.poll(1, TimeUnit.SECONDS);
}
}
首先要注意的是,我們使用ArrayBlockingQueue作為數據持有人。 我不會說這是什么,而是要您在此處的JavaDocs上閱讀它。 但是,我將解釋生產者將把數據放入隊列,而使用者將以FIFO格式從隊列中獲取數據。 但是,如果生產者運行緩慢,則消費者將等待數據進入,如果陣列已滿,生產者將等待數據填滿。
另外,請注意,我使用的是“投票”功能,而不是進入隊列。 這是為了確保消費者不會一直等待,等待會在幾秒鐘后超時。 這有助于我們進行相互交流,并在處理完所有數據后殺死消費者。 (注意:嘗試用get代替poll,您將看到一些有趣的輸出)。
碼
我的代碼位于Google項目托管上 。 隨意瀏覽并從那里下載。 本質上,這是一個蝕(Spring STS)項目。 根據下載時間,您可能還會在下載時獲得其他軟件包和類。 也可以隨意查看這些內容并分享您的評論
–您可以在SVN瀏覽器中瀏覽源代碼,或者;
–您可以從項目本身下載它 。
側面解決方案
最初,我在中間發布了此解決方案,但是后來我意識到這不是做事的方法,因此我從主要內容中刪除了此內容,并將其放在最后。 最終解決方案的另一種變體是,工人/消費者一次不處理一項工作,而是一起處理多個工作,并在完成下一個工作之前先完成工作。 這種方法可以產生相似的結果,但是在某些情況下,如果我們有一些工作需要花費不同的時間才能完成,那么從本質上講,這意味著某些工人比其他工人最終會更快地結束工作,從而造成了瓶頸。 并且,如果作業是事先分配的,這意味著所有消費者將在加工之前擁有所有作業(不是生產者-消費者模式),那么這個問題可能加起來甚至更多,并導致處理邏輯的更多延遲。
相關文章
- 隊列是Devil自己的數據結構 (petewarden.typepad.com)
- 我對撒但的小幫手排隊有錯嗎? (petewarden.typepad.com)
- http://code.google.com/p/disruptor/
參考: 并發模式: JCG合作伙伴的 生產者和消費者 ? Scratch Pad博客上的Kapil Viren Ahuja。
翻譯自: https://www.javacodegeeks.com/2012/02/concurrency-pattern-producer-and.html