聊聊并發——生產者消費者模式

在并發編程中使用生產者和消費者模式能夠解決絕大多數并發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。

為什么要使用生產者和消費者模式

在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大于生產者,那么消費者就必須等待生產者。為了解決這個問題于是引入了生產者和消費者模式。

什么是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區,平衡了生產者和消費者的處理能力。

這個阻塞隊列就是用來給生產者和消費者解耦的。縱觀大多數設計模式,都會找一個第三者出來進行解耦,如工廠模式的第三者是工廠類,模板模式的第三者是模板類。在學習一些設計模式的過程中,如果先找到這個模式的第三者,能幫助我們快速熟悉一個設計模式。

生產者消費者模式實戰

我和同事一起利用業余時間開發的Yuna工具中使用了生產者和消費者模式。首先我先介紹下Yuna工具,在阿里巴巴很多同事都喜歡通過郵件分享技術文章,因為通過郵件分享很方便,同學們在網上看到好的技術文章,復制粘貼發送就完成了分享,但是我們發現技術文章不能沉淀下來,對于新來的同學看不到以前分享的技術文章,大家也很難找到以前分享過的技術文章。為了解決這問題,我們開發了Yuna工具。Yuna取名自我喜歡的一款游戲最終幻想里的女主角。

首先我們申請了一個專門用來收集分享郵件的郵箱,比如share@alibaba.com,同學將分享的文章發送到這個郵箱,讓同學們每次都抄送到這個郵箱肯定很麻煩,所以我們的做法是將這個郵箱地址放在部門郵件列表里,所以分享的同學只需要象以前一樣向整個部門分享文章就行,Yuna工具通過讀取郵件服務器里該郵箱的郵件,把所有分享的郵件下載下來,包括郵件的附件,圖片,和郵件回復,我們可能會從這個郵箱里下載到一些非分享的文章,所以我們要求分享的郵件標題必須帶有一個關鍵字,比如[內貿技術分享],下載完郵件之后,通過confluence的web service接口,把文章插入到confluence里,這樣新同事就可以在confluence里看以前分享過的文章,并且Yuna工具還可以自動把文章進行分類和歸檔。

為了快速上線該功能,當時我們花了三天業余時間快速開發了Yuna1.0版本。在1.0版本中我并沒有使用生產者消費模式,而是使用單線程來處理,因為當時只需要處理我們一個部門的郵件,所以單線程明顯夠用,整個過程是串行執行的。在一個線程里,程序先抽取全部的郵件,轉化為文章對象,然后添加全部的文章,最后刪除抽取過的郵件。代碼如下:

public void extract() {logger.debug("開始" + getExtractorName() + "。。");//抽取郵件List<Article> articles = extractEmail();//添加文章for (Article article : articles) {addArticleOrComment(article);}//清空郵件cleanEmail();logger.debug("完成" + getExtractorName() + "。。");}

Yuna工具在推廣后,越來越多的部門使用這個工具,處理的時間越來越慢,Yuna是每隔5分鐘進行一次抽取的,而當郵件多的時候一次處理可能就花了幾分鐘,于是我在Yuna2.0版本里使用了生產者消費者模式來處理郵件,首先生產者線程按一定的規則去郵件系統里抽取郵件,然后存放在阻塞隊列里,消費者從阻塞隊列里取出文章后插入到conflunce里。代碼如下:

public class QuickEmailToWikiExtractor extends AbstractExtractor {private ThreadPoolExecutor      threadsPool;private ArticleBlockingQueue<ExchangeEmailShallowDTO> emailQueue;public QuickEmailToWikiExtractor() {emailQueue= new ArticleBlockingQueue<ExchangeEmailShallowDTO>();int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;threadsPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, 10l, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(2000));}public void extract() {logger.debug("開始" + getExtractorName() + "。。");long start = System.currentTimeMillis();//抽取所有郵件放到隊列里new ExtractEmailTask().start();// 把隊列里的文章插入到WikiinsertToWiki();long end = System.currentTimeMillis();double cost = (end - start) / 1000;logger.debug("完成" + getExtractorName() + ",花費時間:" + cost + "秒");}/*** 把隊列里的文章插入到Wiki*/private void insertToWiki() {//登錄wiki,每間隔一段時間需要登錄一次confluenceService.login(RuleFactory.USER_NAME, RuleFactory.PASSWORD);while (true) {//2秒內取不到就退出ExchangeEmailShallowDTO email = emailQueue.poll(2, TimeUnit.SECONDS);if (email == null) {break;}threadsPool.submit(new insertToWikiTask(email));}}protected List<Article> extractEmail() {List<ExchangeEmailShallowDTO> allEmails = getEmailService().queryAllEmails();if (allEmails == null) {return null;}for (ExchangeEmailShallowDTO exchangeEmailShallowDTO : allEmails) {emailQueue.offer(exchangeEmailShallowDTO);}return null;}/*** 抽取郵件任務* * @author tengfei.fangtf*/public class ExtractEmailTask extends Thread {public void run() {extractEmail();}}
}

多生產者和多消費者場景

在多核時代,多線程并發處理速度比單線程處理速度更快,所以我們可以使用多個線程來生產數據,同樣可以使用多個消費線程來消費數據。而更復雜的情況是,消費者消費的數據,有可能需要繼續處理,于是消費者處理完數據之后,它又要作為生產者把數據放在新的隊列里,交給其他消費者繼續處理。如下圖:

我們在一個長連接服務器中使用了這種模式,生產者1負責將所有客戶端發送的消息存放在阻塞隊列1里,消費者1從隊列里讀消息,然后通過消息ID進行hash得到N個隊列中的一個,然后根據編號將消息存放在到不同的隊列里,每個阻塞隊列會分配一個線程來消費阻塞隊列里的數據。如果消費者2無法消費消息,就將消息再拋回到阻塞隊列1中,交給其他消費者處理。

以下是消息總隊列的代碼;

/*** 總消息隊列管理* * @author tengfei.fangtf*/
public class MsgQueueManager implements IMsgQueue{private static final Logger              LOGGER             = LoggerFactory.getLogger(MsgQueueManager.class);/*** 消息總隊列*/public final BlockingQueue<Message> messageQueue;private MsgQueueManager() {messageQueue = new LinkedTransferQueue<Message>();}public void put(Message msg) {try {messageQueue.put(msg);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}public Message take() {try {return messageQueue.take();} catch (InterruptedException e) {Thread.currentThread().interrupt();}return null;}}

啟動一個消息分發線程。在這個線程里子隊列自動去總隊列里獲取消息。

/*** 分發消息,負責把消息從大隊列塞到小隊列里* * @author tengfei.fangtf*/static class DispatchMessageTask implements Runnable {@Overridepublic void run() {BlockingQueue<Message> subQueue;for (;;) {//如果沒有數據,則阻塞在這里Message msg = MsgQueueFactory.getMessageQueue().take();//如果為空,則表示沒有Session機器連接上來,
需要等待,直到有Session機器連接上來while ((subQueue = getInstance().getSubQueue()) == null) {try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}//把消息放到小隊列里try {subQueue.put(msg);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}}

使用Hash算法獲取一個子隊列。

/*** 均衡獲取一個子隊列。* * @return*/public BlockingQueue<Message> getSubQueue() {int errorCount = 0;for (;;) {if (subMsgQueues.isEmpty()) {return null;}int index = (int) (System.nanoTime() % subMsgQueues.size());try {return subMsgQueues.get(index);} catch (Exception e) {//出現錯誤表示,在獲取隊列大小之后,隊列進行了一次刪除操作LOGGER.error("獲取子隊列出現錯誤", e);if ((++errorCount) < 3) {continue;}}}}

使用的時候我們只需要往總隊列里發消息。

//往消息隊列里添加一條消息IMsgQueue messageQueue = MsgQueueFactory.getMessageQueue();Packet msg = Packet.createPacket(Packet64FrameType.
TYPE_DATA, "{}".getBytes(), (short) 1);messageQueue.put(msg);

小結

本章講解了生產者消費者模式,并給出了實例。讀者可以在平時的工作中思考下哪些場景可以使用生產者消費者模式,我相信這種場景應該非常之多,特別是需要處理任務時間比較長的場景,比如上傳附件并處理,用戶把文件上傳到系統后,系統把文件丟到隊列里,然后立刻返回告訴用戶上傳成功,最后消費者再去隊列里取出文件處理。比如調用一個遠程接口查詢數據,如果遠程服務接口查詢時需要幾十秒的時間,那么它可以提供一個申請查詢的接口,這個接口把要申請查詢任務放數據庫中,然后該接口立刻返回。然后服務器端用線程輪詢并獲取申請任務進行處理,處理完之后發消息給調用方,讓調用方再來調用另外一個接口拿數據。

另外Java中的線程池類其實就是一種生產者和消費者模式的實現方式,但是實現方法更高明。生產者把任務丟給線程池,線程池創建線程并處理任務,如果將要運行的任務數大于線程池的基本線程數就把任務扔到阻塞隊列里,這種做法比只使用一個阻塞隊列來實現生產者和消費者模式顯然要高明很多,因為消費者能夠處理直接就處理掉了,這樣速度更快,而生產者先存,消費者再取這種方式顯然慢一些。

我們的系統也可以使用線程池來實現多生產者消費者模式。比如創建N個不同規模的Java線程池來處理不同性質的任務,比如線程池1將數據讀到內存之后,交給線程池2里的線程繼續處理壓縮數據。線程池1主要處理IO密集型任務,線程池2主要處理CPU密集型任務。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/447731.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/447731.shtml
英文地址,請注明出處:http://en.pswp.cn/news/447731.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

多個 VUE 前端工程部署設置、nginx 代理配置

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 PS&#xff1a;早期 我只有一個 VUE 前端工程&#xff1a;gentle-vue &#xff0c;加一個 java 后端工程&#xff1a;gentle &#xff0…

FreeSql (二十六)貪婪加載 Include、IncludeMany、Dto、ToList

貪婪加載顧名思議就是把所有要加載的東西一次性讀取。 本節內容為了配合【延時加載】而誕生&#xff0c;貪婪加載和他本該在一起介紹&#xff0c;開發項目的過程中應該雙管齊下&#xff0c;才能寫出高質量的程序。 Dto 映射查詢 Select<Tag>().Limit(10).ToList(a > n…

FreeSql (二十七)將已寫好的 SQL 語句,與實體類映射進行二次查詢

有時候&#xff0c;我們希望將寫好的 sql 語句&#xff0c;甚至是存儲過程進行查詢&#xff0c;雖然效率不高&#xff08;有時候并不是效率至上&#xff09;。 巧用AsTable var sql fsql.Select<UserX>().AsTable((a, b) > "(select * from user where clicks &…

解決: 網站訪問報錯 AccessDenied (阿里云 OSS + CDN )

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 1. 使用阿里云 OSS服務 CDN 服務后&#xff0c;直接用頂級域名訪問個人站點失敗&#xff0c;報錯如下&#xff1a; <Code>Acces…

二十一世紀Windows簡史

摘要&#xff1a;Windows擁有者超過90%的消費級操作系統市場份額&#xff0c;處于龍頭老大的位置&#xff0c;那其成長的故事是怎么的&#xff1f;ZDNet總結了21世紀Windows的發展史&#xff0c;以及圍繞微軟操作系統發生的事情&#xff0c;不妨一看。 據微軟4月26日的官方通知…

FreeSql (二十八)事務

FreeSql實現了四種數據庫事務的使用方法&#xff0c;臟讀等事務相關方法暫時未提供。主要原因系這些方法各大數據庫、甚至引擎的事務級別五花八門較難統一。 事務用于處理數據的一致性&#xff0c;處于同一個事務中的操作是一個UnitOfWork&#xff0c;要么全部執行成功&#xf…

VUE 項目中引入 json 配置

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 這個寫法還是第一次見到&#xff0c;也不知道是否還有什么環境配置&#xff0c;記錄一下&#xff0c;或許以后什么時候也可以參考&#…

新手課堂之汽車燈光操作及位置

駕考科目三模擬夜間燈光操作你了解多少&#xff1f;汽車燈光該如何操作&#xff1f;下面我們隨眾悅學車網編輯一起來看看吧&#xff01; 科目三考試中&#xff0c;模擬夜間燈光使用是每個學員都要參加的一項考試&#xff0c;那么&#xff0c;汽車燈光包括些什么燈呢&#xff1f…

FreeSql (二十九)Lambda 表達式

FreeSql 支持功能豐富的表達式函數解析&#xff0c;方便程序員在不了解數據庫函數的情況下編寫代碼。這是 FreeSql 非常特色的功能之一&#xff0c;深入細化函數解析盡量做到滿意&#xff0c;所支持的類型基本都可以使用對應的表達式函數&#xff0c;例如 日期、字符串、IN查詢…

Spring注解 @Qualifier 說明、用法

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 -------------------------------------------- 我是分隔線 --------------------------------------------------- Qualifier&#xf…

科目三中模擬燈光使用考試常見的錯誤 廣州學車網光大國際駕校學車

夜間駕駛雖是 駕照考試中抽選的內容&#xff0c;但科目三中模擬燈光使用考試&#xff0c;還是要了解一下的。以下列出了考試中常見的錯誤。 1.前照燈非遠光狀態&#xff0c;聽到“請將前照燈變換成遠光”指令&#xff0c;不變換或變換錯誤的或者前照燈在遠光狀態下&#xff0c;…

FreeSql (三十)讀寫分離

FreeSql 支持數據庫讀寫分離&#xff0c;本功能是客戶端的讀寫分離行為&#xff0c;數據庫服務器該怎么配置仍然那樣配置&#xff0c;不受本功能影響&#xff0c;為了方便描術后面講到的【讀寫分離】都是指客戶端的功能支持。 各種數據庫的讀寫方案不一&#xff0c;數據庫端開啟…

把 excel 表中的數據 批量修改到指定數據庫表中、根據 excel 表中數據修改數據庫表中數據

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 1. 收到一張 excel 表&#xff0c;要求根據 “轉賬時間”一列的值批量修改數據庫表中 "放款時間"一列的值。 2. 寫出 sql 模…

科?目?三?智?能?考?試?系?統?實?際?道?路?考?試?項?目?評?判?標?準

科目三智能考試系統是將公安部實際道路考試項目評判標準加以量化&#xff0c;重點考學員的駕駛技能、安全意識及文明駕車的理念&#xff0c;并將其融入 各個考試項目之中。 二考試技術指標 1、上車準備 考試開始后&#xff0c;學員應當根據考試員發出的“上車準備”指令后…

FreeSql (三十一)分區分表

分區 分區就是把一個數據表的文件和索引分散存儲在不同的物理文件中。把一張表的數據分成N多個區塊&#xff0c;這些區塊可以在同一個磁盤上&#xff0c;也可以在不同的磁盤上&#xff0c;數據庫不同實現方式有所不同。 與分表不同&#xff0c;一張大表進行分區后&#xff0c;他…

FreeSql (三十二)Aop

FreeSql AOP 已有的功能介紹&#xff0c;未來為會根據用戶需求不斷增強。 審計 CRUD 馬云說過&#xff0c;996是修福報。對于多數程序員來說&#xff0c;加班是好事。。。起碼不是閑人&#xff0c;不會下崗。 當如果因為某個 sql 騷操作耗時很高&#xff0c;沒有一個相關的審計…

SpringMvc 注解 @InitBinder 表單多對象精準綁定接收

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 InitBinder用于在Controller中標注于方法&#xff0c;表示為當前控制器注冊一個屬性編輯器或者其他&#xff0c;只對當前的Controller有…

2014年科目三智能化考試十大必知事項

一、模擬夜考燈光使用 1&#xff0c;夜間通過急轉彎/坡道/拱橋/人行橫道/或沒有交通信號燈控制的路口&#xff1b; 燈光使用&#xff1a;交替使用遠近光(變光2次以上) 2&#xff0c;夜間在窄路窄橋與非機動車會車 燈光使用&#xff1a;近光燈 3&#xff0c;夜間在道路上發生故障…

SpringMVC注解 @initbinder 解決類型轉換問題

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 使用 SpringMVC 時&#xff0c;常遇到表單中日期字符串和 JavaBean 的 Date 類型的轉換&#xff0c;而 SpringMVC 默認不支持這個格式的…

看了就徹底明白人生!!!

出生一張紙&#xff0c;開始一輩子&#xff1b; 畢業一張紙&#xff0c;奮斗一輩子&#xff1b; 婚姻一張紙&#xff0c;折磨一輩子&#xff1b; 做官一張紙&#xff0c;斗爭一輩子&#xff1b; 金錢一張紙&#xff0c;辛苦一輩子&#xff1b; 榮譽一張紙&#xff0c;虛名一輩子…