Java多線程-工具篇-BlockingQueue

Java多線程-工具篇-BlockingQueue

?

轉載?http://www.cnblogs.com/jackyuj/archive/2010/11/24/1886553.html

?

  • ? 這也是我們在多線程環境下,為什么需要BlockingQueue的原因。作為BlockingQueue的使用者,我們再也不需要關心什么時候需要阻塞線程,什么時候需要喚醒線程,因為這一切BlockingQueue都給你一手包辦了。既然BlockingQueue如此神通廣大,讓我們一起來見識下它的常用方法:
    BlockingQueue的核心方法
    放入數據:
      offer(anObject):表示如果可能的話,將anObject加到BlockingQueue里,即如果BlockingQueue可以容納,
        則返回true,否則返回false.(本方法不阻塞當前執行方法的線程)
      offer(E o, long timeout, TimeUnit unit),可以設定等待的時間,如果在指定的時間內,還不能往隊列中
        加入BlockingQueue,則返回失敗。
      put(anObject):把anObject加到BlockingQueue里,如果BlockQueue沒有空間,則調用此方法的線程被阻斷
        直到BlockingQueue里面有空間再繼續.
    獲取數據:
      poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數規定的時間,
        取不到時返回null;
      poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的對象,如果在指定時間內,
        隊列一旦有數據可取,則立即返回隊列中的數據。否則知道時間超時還沒有數據可取,返回失敗。
      take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到
        BlockingQueue有新的數據被加入;?
      drainTo():一次性從BlockingQueue獲取所有可用的數據對象(還可以指定獲取數據的個數),?
        通過該方法,可以提升獲取數據效率;不需要多次分批加鎖或釋放鎖。
  • 常見BlockingQueue
    在了解了BlockingQueue的基本功能后,讓我們來看看BlockingQueue家庭大致有哪些成員??
    ?
  • BlockingQueue成員詳細介紹
    1. ArrayBlockingQueue

    ?? ? ?基于數組的阻塞隊列實現,在ArrayBlockingQueue內部,維護了一個定長數組,以便緩存隊列中的數據對象,這是一個常用的阻塞隊列,除了一個定長數組外,ArrayBlockingQueue內部還保存著兩個整形變量,分別標識著隊列的頭部和尾部在數組中的位置。
      ArrayBlockingQueue在生產者放入數據和消費者獲取數據,都是共用同一個鎖對象,由此也意味著兩者無法真正并行運行,這點尤其不同于LinkedBlockingQueue;按照實現原理來分析,ArrayBlockingQueue完全可以采用分離鎖,從而實現生產者和消費者操作的完全并行運行。Doug Lea之所以沒這樣去做,也許是因為ArrayBlockingQueue的數據寫入和獲取操作已經足夠輕巧,以至于引入獨立的鎖機制,除了給代碼帶來額外的復雜性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在于,前者在插入或刪除元素時不會產生或銷毀任何額外的對象實例,而后者則會生成一個額外的Node對象。這在長時間內需要高效并發地處理大批量數據的系統中,其對于GC的影響還是存在一定的區別。而在創建ArrayBlockingQueue時,我們還可以控制對象的內部鎖是否采用公平鎖,默認采用非公平鎖。

    2. LinkedBlockingQueue
    ?? ? ?基于鏈表的阻塞隊列,同ArrayListBlockingQueue類似,其內部也維持著一個數據緩沖隊列(該隊列由一個鏈表構成),當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,并緩存在隊列內部,而生產者立即返回;只有當隊列緩沖區達到最大值緩存容量時(LinkedBlockingQueue可以通過構造函數指定該值),才會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生產者線程會被喚醒,反之對于消費者這端的處理也基于同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理并發數據,還因為其對于生產者端和消費者端分別采用了獨立的鎖來控制數據同步,這也意味著在高并發的情況下生產者和消費者可以并行地操作隊列中的數據,以此來提高整個隊列的并發性能。
    作為開發者,我們需要注意的是,如果構造一個LinkedBlockingQueue對象,而沒有指定其容量大小,LinkedBlockingQueue會默認一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產者的速度一旦大于消費者的速度,也許還沒有等到隊列滿阻塞產生,系統內存就有可能已被消耗殆盡了。

    ArrayBlockingQueue和LinkedBlockingQueue是兩個最普通也是最常用的阻塞隊列,一般情況下,在處理多線程間的生產者消費者問題,使用這兩個類足以。

    下面的代碼演示了如何使用BlockingQueue:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    /**
    ?* @author jackyuj
    ?*/
    public class BlockingQueueTest {
    ????public static void main(String[] args) throws InterruptedException {
    ????????// 聲明一個容量為10的緩存隊列
    ????????BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
    ????????Producer producer1 = new Producer(queue);
    ????????Producer producer2 = new Producer(queue);
    ????????Producer producer3 = new Producer(queue);
    ????????Consumer consumer = new Consumer(queue);
    ????????// 借助Executors
    ????????ExecutorService service = Executors.newCachedThreadPool();
    ????????// 啟動線程
    ????????service.execute(producer1);
    ????????service.execute(producer2);
    ????????service.execute(producer3);
    ????????service.execute(consumer);
    ????????// 執行10s
    ????????Thread.sleep(10 * 1000);
    ????????producer1.stop();
    ????????producer2.stop();
    ????????producer3.stop();
    ????????Thread.sleep(2000);
    ????????// 退出Executor
    ????????service.shutdown();
    ????}
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    /**
    ?* 消費者線程
    ?*
    ?* @author jackyuj
    ?*/
    public class Consumer implements Runnable {
    ????public Consumer(BlockingQueue<String> queue) {
    ????????this.queue = queue;
    ????}
    ????public void run() {
    ????????System.out.println("啟動消費者線程!");
    ????????Random r = new Random();
    ????????boolean isRunning = true;
    ????????try {
    ????????????while (isRunning) {
    ????????????????System.out.println("正從隊列獲取數據...");
    ????????????????String data = queue.poll(2, TimeUnit.SECONDS);
    ????????????????if (null != data) {
    ????????????????????System.out.println("拿到數據:" + data);
    ????????????????????System.out.println("正在消費數據:" + data);
    ????????????????????Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
    ????????????????} else {
    ????????????????????// 超過2s還沒數據,認為所有生產線程都已經退出,自動退出消費線程。
    ????????????????????isRunning = false;
    ????????????????}
    ????????????}
    ????????} catch (InterruptedException e) {
    ????????????e.printStackTrace();
    ????????????Thread.currentThread().interrupt();
    ????????} finally {
    ????????????System.out.println("退出消費者線程!");
    ????????}
    ????}
    ????private BlockingQueue<String> queue;
    ????private static final int????? DEFAULT_RANGE_FOR_SLEEP = 1000;
    }
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    /**
    ?* 生產者線程
    ?*
    ?* @author jackyuj
    ?*/
    public class Producer implements Runnable {
    ????public Producer(BlockingQueue queue) {
    ????????this.queue = queue;
    ????}
    ????public void run() {
    ????????String data = null;
    ????????Random r = new Random();
    ????????System.out.println("啟動生產者線程!");
    ????????try {
    ????????????while (isRunning) {
    ????????????????System.out.println("正在生產數據...");
    ????????????????Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
    ????????????????data = "data:" + count.incrementAndGet();
    ????????????????System.out.println("將數據:" + data + "放入隊列...");
    ????????????????if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
    ????????????????????System.out.println("放入數據失敗:" + data);
    ????????????????}
    ????????????}
    ????????} catch (InterruptedException e) {
    ????????????e.printStackTrace();
    ????????????Thread.currentThread().interrupt();
    ????????} finally {
    ????????????System.out.println("退出生產者線程!");
    ????????}
    ????}
    ????public void stop() {
    ????????isRunning = false;
    ????}
    ????private volatile boolean????? isRunning?????????????? = true;
    ????private BlockingQueue queue;
    ????private static AtomicInteger? count?????????????????? = new AtomicInteger();
    ????private static final int????? DEFAULT_RANGE_FOR_SLEEP = 1000;
    }
  • 3. DelayQueue
    ?? ? ?DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。DelayQueue是一個沒有大小限制的隊列,因此往隊列中插入數據的操作(生產者)永遠不會被阻塞,而只有獲取數據的操作(消費者)才會被阻塞。
    使用場景:
      DelayQueue使用場景較少,但都相當巧妙,常見的例子比如使用一個DelayQueue來管理一個超時未響應的連接隊列。

    4. PriorityBlockingQueue
    ?? ? ?基于優先級的阻塞隊列(優先級的判斷通過構造函數傳入的Compator對象來決定),但需要注意的是PriorityBlockingQueue并不會阻塞數據生產者,而只會在沒有可消費的數據時,阻塞數據的消費者。因此使用的時候要特別注意,生產者生產數據的速度絕對不能快于消費者消費數據的速度,否則時間一長,會最終耗盡所有的可用堆內存空間。在實現PriorityBlockingQueue時,內部控制線程同步的鎖采用的是公平鎖。

    5. SynchronousQueue
    ?? ? ?一種無緩沖的等待隊列,類似于無中介的直接交易,有點像原始社會中的生產者和消費者,生產者拿著產品去集市銷售給產品的最終消費者,而消費者必須親自去集市找到所要商品的直接生產者,如果一方沒有找到合適的目標,那么對不起,大家都在集市等待。相對于有緩沖的BlockingQueue來說,少了一個中間經銷商的環節(緩沖區),如果有經銷商,生產者直接把產品批發給經銷商,而無需在意經銷商最終會將這些產品賣給那些消費者,由于經銷商可以庫存一部分商品,因此相對于直接交易模式,總體來說采用中間經銷商的模式會吞吐量高一些(可以批量買賣);但另一方面,又因為經銷商的引入,使得產品從生產者到消費者中間增加了額外的交易環節,單個產品的及時響應性能可能會降低。
      聲明一個SynchronousQueue有兩種不同的方式,它們之間有著不太一樣的行為。公平模式和非公平模式的區別:
      如果采用公平模式:SynchronousQueue會采用公平鎖,并配合一個FIFO隊列來阻塞多余的生產者和消費者,從而體系整體的公平策略;
      但如果是非公平模式(SynchronousQueue默認):SynchronousQueue采用非公平鎖,同時配合一個LIFO隊列來管理多余的生產者和消費者,而后一種模式,如果生產者和消費者的處理速度有差距,則很容易出現饑渴的情況,即可能有某些生產者或者是消費者的數據永遠都得不到處理。
    • 小結
        BlockingQueue不光實現了一個完整隊列所具有的基本功能,同時在多線程環境下,他還自動管理了多線間的自動等待于喚醒功能,從而使得程序員可以忽略這些細節,關注更高級的功能。?
posted on 2017-06-04 19:59 CanntBelieve 閱讀(...) 評論(...) 編輯 收藏

轉載于:https://www.cnblogs.com/FlyAway2013/p/6941668.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/392561.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/392561.shtml
英文地址,請注明出處:http://en.pswp.cn/news/392561.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

leetcode 204. 計數質數

統計所有小于非負整數 n 的質數的數量。 示例 1&#xff1a; 輸入&#xff1a;n 10 輸出&#xff1a;4 解釋&#xff1a;小于 10 的質數一共有 4 個, 它們是 2, 3, 5, 7 。 解題思路 大于等于5的質數一定和6的倍數相鄰。例如5和7&#xff0c;11和13,17和19等等&#xff1b…

JAVA 網絡編程小記

在進行JAVA網絡編程時&#xff0c;發現寫入的數據對方等200ms左右才會收到。起初認為是JAVA自已進行了 Cache。進行flush也沒有效果。查看JDK代碼&#xff0c;Write操作直接調用的native方法&#xff0c;說明JAVA層面并沒有緩存。再看flush&#xff0c;只是一個空方法. FileOut…

vue生成靜態js文件_如何立即使用Vue.js生成靜態網站

vue生成靜態js文件by Ond?ej Polesn通過Ond?ejPolesn 如何立即使用Vue.js生成靜態網站 (How to generate a static website with Vue.js in no time) You have decided to build a static site, but where do you start? How do you select the right tool for the job wit…

查看文件夾大小的4種方法,總有一種是你喜歡的

有必要檢查文件夾的大小,以確定它們是否占用了太多的存儲空間。此外,如果你通過互聯網或其他存儲設備傳輸文件夾,還需要查看文件夾大小。 幸運的是,在Windows設備上查看文件夾大小非常容易。窗口中提供了圖形化和基于命令行的應用程序,為你提供了多種方法。 如何在Windo…

Python 獲取服務器的CPU個數

在使用gunicorn時&#xff0c;需要設置workers&#xff0c; 例如&#xff1a; gunicorn --workers3 app:app -b 0.0.0.0:9000 其中&#xff0c;worker的數量并不是越多越好&#xff0c;推薦值是CPU的個數x21&#xff0c; CPU個數使用如下的方式獲取&#xff1a; python -c impo…

多種數據庫連接工具_20多種熱門數據工具及其不具備的功能

多種數據庫連接工具In the past few months, the data ecosystem has continued to burgeon as some parts of the stack consolidate and as new challenges arise. Our first attempt to help stakeholders navigate this ecosystem highlighted 25 Hot New Data Tools and W…

怎么連接 mysql_怎樣連接連接數據庫

這個博客是為了說明怎么連接數據庫第一步&#xff1a;肯定是要下載數據庫&#xff0c;本人用的SqlServer2008&#xff0c;是從別人的U盤中拷來的。第二步&#xff1a;數據庫的登錄方式設置為混合登錄&#xff0c;步驟如下&#xff1a;1.打開數據庫這是數據庫界面&#xff0c;要…

webstorm環境安裝配置(less+autoprefixer)

node安裝&#xff1a; 參考地址&#xff1a;http://www.runoob.com/nodejs/nodejs-install-setup.html 1.下載node安裝包并完成安裝 2.在開始菜單打開node 3.查看是否安裝完成&#xff08;npm是node自帶安裝的&#xff09; 命令&#xff1a;node -v npm -v less安裝&#xff1a…

leetcode 659. 分割數組為連續子序列(貪心算法)

給你一個按升序排序的整數數組 num&#xff08;可能包含重復數字&#xff09;&#xff0c;請你將它們分割成一個或多個子序列&#xff0c;其中每個子序列都由連續整數組成且長度至少為 3 。 如果可以完成上述分割&#xff0c;則返回 true &#xff1b;否則&#xff0c;返回 fa…

將JAVA編譯為EXE的幾種方法

< DOCTYPE html PUBLIC -WCDTD XHTML StrictEN httpwwwworgTRxhtmlDTDxhtml-strictdtd> 將JAVA編譯為EXE的幾種方法 -------------------------------------------------------------------------------- 將Java應用程序本地編譯為EXE的幾種方法(建議使用JOVE和JET)  a.…

文本訓練集_訓練文本中的不穩定性

文本訓練集介紹 (Introduction) In text generation, conventionally, maximum likelihood estimation is used to train a model to generate a text one token at a time. Each generated token will be compared against the ground-truth data. If any token is different …

山東省賽 傳遞閉包

https://vjudge.net/contest/311348#problem/A 思路&#xff1a;用floyd傳遞閉包處理點與點之間的關系&#xff0c;之后開數組記錄每個數字比它大的個數和小的個數&#xff0c;如果這個個數超過n/2那么它不可能作為中位數&#xff0c;其他的都有可能。 #include<bits/stdc.h…

如何使用動態工具提示構建React Native圖表

by Vikrant Negi通過Vikrant Negi 如何使用動態工具提示構建React Native圖表 (How to build React Native charts with dynamic tooltips) Creating charts, be it on the web or on mobile apps, has always been an interesting and challenging task especially in React …

如何解決ajax跨域問題(轉)

由 于此前很少寫前端的代碼(哈哈&#xff0c;不合格的程序員啊)&#xff0c;最近項目中用到json作為系統間交互的手段&#xff0c;自然就伴隨著眾多ajax請求&#xff0c;隨之而來的就是要解決 ajax的跨域問題。本篇將講述一個小白從遇到跨域不知道是跨域問題&#xff0c;到知道…

mysql并發錯誤_又談php+mysql并發數據出錯問題

最近&#xff0c;項目中的所有crond定時盡量取消&#xff0c;改成觸發式。比如每日6點清理數據。原來的邏輯&#xff0c;寫一個crond定時搞定現在改為觸發式6點之后第一個玩家/用戶 進入&#xff0c;才開始清理數據。出現了一個問題1 如何確保第一個玩家觸發&#xff1f;updat…

leetcode 621. 任務調度器(貪心算法)

給你一個用字符數組 tasks 表示的 CPU 需要執行的任務列表。其中每個字母表示一種不同種類的任務。任務可以以任意順序執行&#xff0c;并且每個任務都可以在 1 個單位時間內執行完。在任何一個單位時間&#xff0c;CPU 可以完成一個任務&#xff0c;或者處于待命狀態。 然而&…

英國腦科學領域_來自英國A級算法崩潰的數據科學家的4課

英國腦科學領域In the UK, families, educators, and government officials are in an uproar about the effects of a new algorithm for scoring “A-levels,” the advanced level qualifications used to evaluate students’ knowledge of specific subjects in preparati…

MVC發布后項目存在于根目錄中的子目錄中時的css與js、圖片路徑問題

加載固定資源js與css <script src"Url.Content("~/Scripts/js/jquery.min.js")" type"text/javascript"></script> <link href"Url.Content("~/Content/css/shop.css")" rel"stylesheet" type&quo…

telegram 機器人_學習使用Python在Telegram中構建您的第一個機器人

telegram 機器人Imagine this, there is a message bot that will send you a random cute dog image whenever you want, sounds cool right? Let’s make one!想象一下&#xff0c;有一個消息機器人可以隨時隨地向您發送隨機的可愛狗圖像&#xff0c;聽起來很酷吧&#xff1…

判斷輸入的字符串是否為回文_刷題之路(九)--判斷數字是否回文

Palindrome Number問題簡介&#xff1a;判斷輸入數字是否是回文,不是返回0,負數返回0舉例:1:輸入: 121輸出: true2:輸入: -121輸出: false解釋: 回文為121-&#xff0c;所以負數都不符合3:輸入: 10輸出: false解釋: 倒序為01&#xff0c;不符合要求解法一&#xff1a;這道題比較…