并發模式:生產者和消費者

在我15年的職業生涯中,生產者和消費者的問題是我僅遇到過幾次。 在大多數編程情況下,我們正在做的事情是以同步方式執行功能,其中JVM或Web容器自行處理多線程的復雜性。 但是,在編寫某些需要的用例時。 上周,我遇到了一個這樣的用例,使我在上一次這樣做的時候回溯了三年。 但是,上次完成的方式卻大不相同。

當我第一次聽到問題陳述時,我立即知道需要什么。 但是,這次我的做法與上次有所不同。 這與我今天如何看待技術有關。 我不會涉足任何非技術方面,并且會直接跳入問題及其解決方案。 我開始研究市場上存在的東西,并發現了幾篇文章,這些文章幫助我以正確的方式傳達了我的想法。

問題陳述

我們需要一個用于批量遷移的解決方案。 我們正在將數據從系統1遷移到系統2,在此過程中,我們需要執行以下三個任務:

  • 根據組從數據庫加載數據
  • 處理數據
  • 通過修改來更新在步驟1中加載的記錄

我們必須處理100個小組,每個小組大約有4萬條記錄。 您可以想象如果我們以同步方式執行此練習將花費多少時間。 這里的圖像有效地解釋了這個問題。

生產者消費者:問題

生產者和消費者模式

首先讓我們看一下生產者消費者模式。 如果您參考上面的問題說明并查看圖片,我們會看到有太多實體準備使用其部分數據。 但是,沒有足夠的工人可以處理所有數據。 因此,隨著生產者繼續排隊,它只會繼續增長。 我們看到系統開始占用線程并花費大量時間。

中級解決方案

生產者消費者:中級方法

我們確實有一個中間解決方案。 參考該圖像,您將立即注意到,生產者將他們的工作堆積在文件柜中,而工人在完成上一項任務時繼續將其撿起來。 但是,這種方法確實存在一些明顯的缺點:

  1. 仍然只有一名工人必須完成所有工作。 外部系統可能很高興,但是任務將繼續存在,直到工作人員完成所有任務為止
  2. 生產者將他們的數據堆積在隊列中,并且需要資源來保存它們。 就像在此示例中,機柜可以裝滿一樣,JVM資源也可能發生同樣的情況。 我們需要注意要在內存中放入多少數據,在某些情況下可能不會太多。

解決方案

生產者消費者:解決方案

解決方案是我們每天在很多地方都能看到的,例如電影院大廳排隊,汽油泵等。有很多人來訂票,而根據進來的人數,增加了更多的人來發行票。 本質上,請參考此處的圖像,您會注意到生產者將繼續向內閣添加他們的工作,并且我們有更多的工人來處理工作量。

Java提供了并發包來解決此問題。 到現在為止,我一直在較低級別上進行線程工作,這是我第一次使用此程序包。 當我開始瀏覽網絡并閱讀其他博客作者的言論時,我遇到了一篇非常好的文章 。 它有助于非常有效地理解BlockingQueue的使用。 但是,Dhruba提供的解決方案并不能幫助我實現所需的高吞吐量。 因此,我開始探索對ArrayBlockingQueue的使用。

控制器

這是管理生產者和消費者之間的合同的第一類。 控制器將為生產者設置1個線程,為消費者設置2個線程。 根據需要,我們可以創建所需數量的線程。 甚至甚至可以從屬性中讀取數據或做一些動態魔術。 現在,我們將保持簡單。

package com.kapil.techieforever.producerconsumer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class TestProducerConsumer
{
public static void main(String args[])
{
try
{
Broker broker = new Broker();
ExecutorService threadPool = Executors.newFixedThreadPool(3);
threadPool.execute(new Consumer("1", broker));
threadPool.execute(new Consumer("2", broker));
Future producerStatus = threadPool.submit(new Producer(broker));
// this will wait for the producer to finish its execution.
producerStatus.get();
threadPool.shutdown();
}
catch (Exception e)
{
e.printStackTrace();
}
}
}

我正在使用ExecuteService創建線程池并對其進行管理。 代替使用基本的Thread實現,這是一種更有效的方法,因為它將根據需要處理退出和重新啟動線程。 您還將注意到,我正在使用Future類來獲取生產者線程的狀態。 該類非常有效,它將使我的程序停止進一步執行。 這是在線程上替換“ .join”方法的一種好方法。 注意:在這個例子中,我并不是很有效地使用Future。 因此您可能需要嘗試一些適合自己的事情。
另外,您還應注意在生產者和消費者之間用作文件柜的Broker類。 我們將在短時間內看到它的實現。

生產者

此類負責產生需要處理的數據。

package com.kapil.techieforever.producerconsumer;
public class Producer implements Runnable
{
private Broker broker;
public Producer(Broker broker)
{
this.broker = broker;
}
@Override
public void run()
{
try
{
for (Integer i = 1; i < 5 + 1; ++i)
{
System.out.println("Producer produced: " + i);
Thread.sleep(100);
broker.put(i);
}
this.broker.continueProducing = Boolean.FALSE;
System.out.println("Producer finished its job; terminating.");
}
catch (InterruptedException ex)
{
ex.printStackTrace();
}
}
}

此類正在做它所能做的最簡單的事情-向代理添加一個整數。 需要注意的一些關鍵領域是:
1. Broker上有一個屬性,生產者在完成生產后最終會對其進行更新。 這也稱為“最終”或“毒藥”條目。 消費者使用它來知道不再有數據 2.我使用Thread.sleep來模擬某些生產者可能需要更多時間來生產數據。 您可以調整此值并查看消費者的行為

消費者

此類負責從代理讀取數據并完成其工作

package com.kapil.techieforever.producerconsumer;
public class Consumer implements Runnable
{
private String name;
private Broker broker;
public Consumer(String name, Broker broker)
{
this.name = name;
this.broker = broker;
}
@Override
public void run()
{
try
{
Integer data = broker.get();
while (broker.continueProducing || data != null)
{
Thread.sleep(1000);
System.out.println("Consumer " + this.name + " processed data from broker: " + data);
data = broker.get();
}
System.out.println("Comsumer " + this.name + " finished its job; terminating.");
}
catch (InterruptedException ex)
{
ex.printStackTrace();
}
}
}

這還是一個簡單的類,它讀取Integer并將其打印在控制臺上。 但是,要注意的關鍵點是:
1.處理數據的循環是一個無限循環,它在兩種情況下運行–直到生產者消費并且經紀人有一些數據為止
2.同樣,Thread.sleep用于創建有效的不同方案

經紀人

package com.kapil.techieforever.producerconsumer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class Broker
{
public ArrayBlockingQueue queue = new ArrayBlockingQueue(100);
public Boolean continueProducing = Boolean.TRUE;
public void put(Integer data) throws InterruptedException
{
this.queue.put(data);
}
public Integer get() throws InterruptedException
{
return this.queue.poll(1, TimeUnit.SECONDS);
}
}

首先要注意的是,我們使用ArrayBlockingQueue作為數據持有人。 我不會說這是什么,而是要您在此處的JavaDocs上閱讀它。 但是,我將解釋生產者將把數據放入隊列,而使用者將以FIFO格式從隊列中獲取數據。 但是,如果生產者運行緩慢,則消費者將等待數據進入,如果陣列已滿,生產者將等待數據填滿。

另外,請注意,我使用的是“投票”功能,而不是進入隊列。 這是為了確保消費者不會一直等待,等待會在幾秒鐘后超時。 這有助于我們進行相互交流,并在處理完所有數據后殺死消費者。 (注意:嘗試用get代替poll,您將看到一些有趣的輸出)。

我的代碼位于Google項目托管上 。 隨意瀏覽并從那里下載。 本質上,這是一個蝕(Spring STS)項目。 根據下載時間,您可能還會在下載時獲得其他軟件包和類。 也可以隨意查看這些內容并分享您的評論
–您可以在SVN瀏覽器中瀏覽源代碼,或者;
–您可以從項目本身下載它 。

側面解決方案

最初,我在中間發布了此解決方案,但是后來我意識到這不是做事的方法,因此我從主要內容中刪除了此內容,并將其放在最后。 最終解決方案的另一種變體是,工人/消費者一次不處理一項工作,而是一起處理多個工作,并在完成下一個工作之前先完成工作。 這種方法可以產生相似的結果,但是在某些情況下,如果我們有一些工作需要花費不同的時間才能完成,那么從本質上講,這意味著某些工人比其他工人最終會更快地結束工作,從而造成了瓶頸。 并且,如果作業是事先分配的,這意味著所有消費者將在加工之前擁有所有作業(不是生產者-消費者模式),那么這個問題可能加起來甚至更多,并導致處理邏輯的更多延遲。

相關文章

  • 隊列是Devil自己的數據結構 (petewarden.typepad.com)
  • 我對撒但的小幫手排隊有錯嗎? (petewarden.typepad.com)
  • http://code.google.com/p/disruptor/

參考: 并發模式: JCG合作伙伴的 生產者和消費者 ? Scratch Pad博客上的Kapil Viren Ahuja。


翻譯自: https://www.javacodegeeks.com/2012/02/concurrency-pattern-producer-and.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/373378.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/373378.shtml
英文地址,請注明出處:http://en.pswp.cn/news/373378.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

POJ 1006 - Biorhythms (中國剩余定理)

B - BiorhythmsTime Limit:1000MS Memory Limit:10000KB 64bit IO Format:%I64d & %I64u Submit Status Practice POJ 1006Description 人生來就有三個生理周期&#xff0c;分別為體力、感情和智力周期&#xff0c;它們的周期長度為23天、28天和33天。每一個周期中…

子線程中更新UI線程的三個方法

1、通過handler方式&#xff0c;sendmessage。 多個類間傳遞比較麻煩&#xff0c;也懶的寫... 2、線程中通過runOnUiThread&#xff08;&#xff09; new Thread() { public void run() { //這兒是耗時操作&#xff0c;完成之后更新UI&#xff1b; runOnUiThread(new Runnab…

mysql limit acs_mysql查詢操作

簡單查詢&#xff1a;select * from 表名;避免重復&#xff1a;select distinct 字段 from 表名;條件查詢&#xff1a;select 字段,字段 from 表名 where id<5(條件);四則運算查詢&#xff1a;select id,dep_id,id*dep_id from company.employee5 where id<5;定義顯示格式…

作業管理系統數據字典

轉載于:https://www.cnblogs.com/heyangcan/p/5312394.html

使用Hive和iReport進行大數據分析

每個JJ Abrams的電視連續劇疑犯追蹤從主要人物芬奇先生一個下列敘述情節開始&#xff1a;“ 你是被監視。 政府擁有一個秘密系統-每天每天每小時都會對您進行監視的機器。 我知道是因為...我建造了它。 “當然&#xff0c;我們的技術人員知道得更多。 龐大的電氣和軟件工程師團…

docker集群管理

docker集群管理 ps&#xff1a;docker machine docker swarm docker compose 在Docker Machine發布之前&#xff0c;你可能會遇到以下問題&#xff1a; 你需要登錄主機&#xff0c;按照主機及操作系統特有的安裝以及配置步驟安裝Docker&#xff0c;使其能運行Docker…

從0學java_從零開始學JAVA(一.Java的基礎語法)

基本語法編寫 Java 程序時&#xff0c;應注意以下幾點&#xff1a;大小寫敏感&#xff1a;Java 是大小寫敏感的&#xff0c;這就意味著標識符 Hello 與 hello 是不同的。類名&#xff1a;對于所有的類來說&#xff0c;類名的首字母應該大寫。如果類名由若干單詞組成&#xff0c…

linux mount (掛載命令)詳解

掛接命令(mount) 首先&#xff0c;介紹一下掛接(mount)命令的使用方法&#xff0c;mount命令參數非常多&#xff0c;這里主要講一下今天我們要用到的。 命令格式&#xff1a;mount [-t vfstype] [-o options] device dir 其中&#xff1a; 1.-t vfstype 指定文件系統的類型&…

Android官方培訓課程中文版(v0.9.5)

http://hukai.me/android-training-course-in-chinese/index.html轉載于:https://www.cnblogs.com/xiaoyao095/p/6125715.html

使用SaxParser和完整代碼進行XML解析

SAX解析器使用回調函數&#xff08;org.xml.sax.helpers.DefaultHandler&#xff09;通知客戶端XML文檔結構。 您應該擴展DefaultHandler并重寫一些方法來實現xml解析。 覆蓋的方法是 startDocument&#xff08;&#xff09;和endDocument&#xff08;&#xff09;–在XML文檔…

mysql添加字符串日期時間_mysql學習筆記--- 字符串函數、日期時間函數

一、常見字符串函數&#xff1a;1、CHAR_LENGTH 獲取長度(字符為單位)2、FORMAT 格式化3、INSERT 替換的方式插入4、INSTR 獲取位置5、LEFT/RIGHT 取左、取右6、LENGTH 獲取長度(字節為單位)7、LTRIM/RTRIM/TRIM 去空格(左/右/自定義)8、STRCMP 字符串比較9、CONCAT 字…

Android異常和工具使用筆記

Android異常和工具使用筆記 1、r文件找不到去你的工程目錄下&#xff0c;手動的把gen刪掉&#xff0c;然后去project中刷新一下&#xff0c;在編譯看看。以前遇到過類似的問題&#xff0c;實在不行就把你的eclispe,adt升級到最新的版本吧 抓住那么一點點線索&#xff0c;就要去…

ADO.NET 核心對象簡介

ADO.NET ADO.NET是.NET中一組用于和數據源進行交互的面向對象類庫&#xff0c;提供了數據訪問的高層接口。 ADO.NET類庫在System.Data命名空間內&#xff0c;根據我們訪問的不同數據庫選擇命名空間&#xff0c;System.Data.SqlClient。 ADO.NET類最重要的優點是支持數據庫以斷開…

MongoDB與Spring Data項目

如今&#xff0c;我們所有人都在觀察NoSql解決方案的爆炸式增長。 我已經習慣了RDBMS&#xff0c;但這些并不是您可能遇到的所有挑戰的解決方案。 根據最近的經驗&#xff0c;我有機會使用MongoDB –文檔數據庫。 在本文中&#xff0c;我打算介紹將MongoDB與Spring Data項目一起…

java轉換為字符串_java – 如何從int轉換為字符串?

正常方式是Integer.toString(i)或String.valueOf(i)。串聯將工作&#xff0c;但它是非常規的&#xff0c;可能是一個難聞的氣味&#xff0c;因為它暗示作者不知道上述兩種方法(他們不知道什么&#xff1f;)。Java在使用字符串(見the documentation)時對操作符提供了特殊的支持&…

簡學LINGO(三)——實例篇

1. 裝配線平衡模型 一個裝配線含有一系列的工作站。在終于產品的加工過程中每一個工作站運行一種或者是幾種特定的任務。裝配線周期是指全部工作站完畢分配給他們各自任務所花費時間的最大值。平衡裝配線的目標是為每一個工作站分配加工任務。盡可能使每一個工作站運行同樣數量…

Hibernate緩存級別教程

開始使用Hibernate的人們常見的問題之一就是性能&#xff0c;如果您沒有太多的Hibernate經驗&#xff0c;您會發現應用程序變慢的速度。 如果啟用sql跟蹤&#xff0c;您將看到有多少查詢被發送到數據庫&#xff0c;而這些查詢幾乎不需要Hibernate知識就可以避免。 在當前文章中…

java方法執行的時間_計算Java中任意一個方法的執行時間的工具類

1 packagealgorithm.study.utils;23 importjava.lang.reflect.Method;45 /**6 * This class is getting a method execute time and provide some other functions.7 *8 *authorygh 2017年2月24日9 */10 public classMethodExecuteTimeUtils {1112 /**13 * Get a method execut…

如何在 IIS 中設置 HTTPS 服務

Windows Server2008、IIS7啟用CA認證及證書制作完整過程 這篇文章介紹了如何安裝證書申請工具&#xff1b; 如何在iis創建證書申請&#xff1b; 如何使用iis申請證書生成的txt文件&#xff0c;在工具中開始申請證書&#xff1b; 如何導出證書&#xff1b; 以及在網站中開始使用…

Android之衛星菜單的實現

衛星菜單是現在一個非常受歡迎的“控件”&#xff0c;很多Android程序員都趨之若鶩&#xff0c;預覽如下圖。傳統的衛星菜單是用Animation實現的&#xff0c;需要大量的代碼&#xff0c;而且算法極多&#xff0c;一不小心就要通宵Debug。本帖貼出用屬性動畫Animator來實現衛星菜…