BlockingQueue詳解

前言:

?

?? ? 在新增的Concurrent包中,BlockingQueue很好的解決了多線程中,如何高效安全“傳輸”數據的問題。通過這些高效并且線程安全的隊列類,為我們快速搭建高質量的多線程程序帶來極大的便利。本文詳細介紹了BlockingQueue家庭中的所有成員,包括他們各自的功能以及常見使用場景。

  • 認識BlockingQueue
    阻塞隊列,顧名思義,首先它是一個隊列,而一個隊列在數據結構中所起的作用大致如下圖所示:

    從上圖我們可以很清楚看到,通過一個共享的隊列,可以使得數據由隊列的一端輸入,從另外一端輸出;
    常用的隊列主要有以下兩種:(當然通過不同的實現方式,還可以延伸出很多不同類型的隊列,DelayQueue就是其中的一種)
      先進先出(FIFO):先插入的隊列的元素也最先出隊列,類似于排隊的功能。從某種程度上來說這種隊列也體現了一種公平性。
      后進先出(LIFO):后插入隊列的元素最先出隊列,這種隊列優先處理最近發生的事件。

    ?? ? ?多線程環境中,通過隊列可以很容易實現數據共享,比如經典的“生產者”和“消費者”模型中,通過隊列可以很便利地實現兩者之間的數據共享。假設我們有若干生產者線程,另外又有若干個消費者線程。如果生產者線程需要把準備好的數據共享給消費者線程,利用隊列的方式來傳遞數據,就可以很方便地解決他們之間的數據共享問題。但如果生產者和消費者在某個時間段內,萬一發生數據處理速度不匹配的情況呢?理想情況下,如果生產者產出數據的速度大于消費者消費的速度,并且當生產出來的數據累積到一定程度的時候,那么生產者必須暫停等待一下(阻塞生產者線程),以便等待消費者線程把累積的數據處理完畢,反之亦然。然而,在concurrent包發布以前,在多線程環境下,我們每個程序員都必須去自己控制這些細節,尤其還要兼顧效率和線程安全,而這會給我們的程序帶來不小的復雜度。好在此時,強大的concurrent包橫空出世了,而他也給我們帶來了強大的BlockingQueue。(在多線程領域:所謂阻塞,在某些情況下會掛起線程(即阻塞),一旦條件滿足,被掛起的線程又會自動被喚醒)
    下面兩幅圖演示了BlockingQueue的兩個常見阻塞場景:
           如上圖所示:當隊列中沒有數據的情況下,消費者端的所有線程都會被自動阻塞(掛起),直到有數據放入隊列。


       如上圖所示:當隊列中填滿數據的情況下,生產者端的所有線程都會被自動阻塞(掛起),直到隊列中有空的位置,線程被自動喚醒。
    ?? ? 這也是我們在多線程環境下,為什么需要BlockingQueue的原因。作為BlockingQueue的使用者,我們再也不需要關心什么時候需要阻塞線程,什么時候需要喚醒線程,因為這一切BlockingQueue都給你一手包辦了。既然BlockingQueue如此神通廣大,讓我們一起來見識下它的常用方法:
    BlockingQueue的核心方法:
    放入數據:
      offer(anObject):表示如果可能的話,將anObject加到BlockingQueue里,即如果BlockingQueue可以容納,
        則返回true,否則返回false.(本方法不阻塞當前執行方法的線程)
      offer(E o, long timeout, TimeUnit unit),可以設定等待的時間,如果在指定的時間內,還不能往隊列中
        加入BlockingQueue,則返回失敗。
      put(anObject):把anObject加到BlockingQueue里,如果BlockQueue沒有空間,則調用此方法的線程被阻斷
        直到BlockingQueue里面有空間再繼續.
    獲取數據:
      poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數規定的時間,
        取不到時返回null;
      poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的對象,如果在指定時間內,
        隊列一旦有數據可取,則立即返回隊列中的數據。否則知道時間超時還沒有數據可取,返回失敗。
      take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到
        BlockingQueue有新的數據被加入;?
      drainTo():一次性從BlockingQueue獲取所有可用的數據對象(還可以指定獲取數據的個數),?
        通過該方法,可以提升獲取數據效率;不需要多次分批加鎖或釋放鎖。
  • 常見BlockingQueue
    在了解了BlockingQueue的基本功能后,讓我們來看看BlockingQueue家庭大致有哪些成員??
    ?
  • BlockingQueue成員詳細介紹
    1. ArrayBlockingQueue
    ?? ? ?基于數組的阻塞隊列實現,在ArrayBlockingQueue內部,維護了一個定長數組,以便緩存隊列中的數據對象,這是一個常用的阻塞隊列,除了一個定長數組外,ArrayBlockingQueue內部還保存著兩個整形變量,分別標識著隊列的頭部和尾部在數組中的位置。
      ArrayBlockingQueue在生產者放入數據和消費者獲取數據,都是共用同一個鎖對象,由此也意味著兩者無法真正并行運行,這點尤其不同于LinkedBlockingQueue;按照實現原理來分析,ArrayBlockingQueue完全可以采用分離鎖,從而實現生產者和消費者操作的完全并行運行。Doug Lea之所以沒這樣去做,也許是因為ArrayBlockingQueue的數據寫入和獲取操作已經足夠輕巧,以至于引入獨立的鎖機制,除了給代碼帶來額外的復雜性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在于,前者在插入或刪除元素時不會產生或銷毀任何額外的對象實例,而后者則會生成一個額外的Node對象。這在長時間內需要高效并發地處理大批量數據的系統中,其對于GC的影響還是存在一定的區別。而在創建ArrayBlockingQueue時,我們還可以控制對象的內部鎖是否采用公平鎖,默認采用非公平鎖。

    2. LinkedBlockingQueue
    ?? ? ?基于鏈表的阻塞隊列,同ArrayListBlockingQueue類似,其內部也維持著一個數據緩沖隊列(該隊列由一個鏈表構成),當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,并緩存在隊列內部,而生產者立即返回;只有當隊列緩沖區達到最大值緩存容量時(LinkedBlockingQueue可以通過構造函數指定該值),才會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生產者線程會被喚醒,反之對于消費者這端的處理也基于同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理并發數據,還因為其對于生產者端和消費者端分別采用了獨立的鎖來控制數據同步,這也意味著在高并發的情況下生產者和消費者可以并行地操作隊列中的數據,以此來提高整個隊列的并發性能。
    作為開發者,我們需要注意的是,如果構造一個LinkedBlockingQueue對象,而沒有指定其容量大小,LinkedBlockingQueue會默認一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產者的速度一旦大于消費者的速度,也許還沒有等到隊列滿阻塞產生,系統內存就有可能已被消耗殆盡了。

    ArrayBlockingQueue和LinkedBlockingQueue是兩個最普通也是最常用的阻塞隊列,一般情況下,在處理多線程間的生產者消費者問題,使用這兩個類足以。

    下面的代碼演示了如何使用BlockingQueue:
    ?
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    import?java.util.concurrent.BlockingQueue;
    import?java.util.concurrent.ExecutorService;
    import?java.util.concurrent.Executors;
    import?java.util.concurrent.LinkedBlockingQueue;
    /**
    ?* @author jackyuj
    ?*/
    public?class?BlockingQueueTest {
    ????public?static?void?main(String[] args) throws?InterruptedException {
    ????????// 聲明一個容量為10的緩存隊列
    ????????BlockingQueue<String> queue = new?LinkedBlockingQueue<String>(10);
    ????????Producer producer1 = new?Producer(queue);
    ????????Producer producer2 = new?Producer(queue);
    ????????Producer producer3 = new?Producer(queue);
    ????????Consumer consumer = new?Consumer(queue);
    ????????// 借助Executors
    ????????ExecutorService service = Executors.newCachedThreadPool();
    ????????// 啟動線程
    ????????service.execute(producer1);
    ????????service.execute(producer2);
    ????????service.execute(producer3);
    ????????service.execute(consumer);
    ????????// 執行10s
    ????????Thread.sleep(10?* 1000);
    ????????producer1.stop();
    ????????producer2.stop();
    ????????producer3.stop();
    ????????Thread.sleep(2000);
    ????????// 退出Executor
    ????????service.shutdown();
    ????}
    }
    ?
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    import?java.util.Random;
    import?java.util.concurrent.BlockingQueue;
    import?java.util.concurrent.TimeUnit;
    /**
    ?* 消費者線程
    ?*
    ?* @author jackyuj
    ?*/
    public?class?Consumer implements?Runnable {
    ????public?Consumer(BlockingQueue<String> queue) {
    ????????this.queue = queue;
    ????}
    ????public?void?run() {
    ????????System.out.println("啟動消費者線程!");
    ????????Random r = new?Random();
    ????????boolean?isRunning = true;
    ????????try?{
    ????????????while?(isRunning) {
    ????????????????System.out.println("正從隊列獲取數據...");
    ????????????????String data = queue.poll(2, TimeUnit.SECONDS);
    ????????????????if?(null?!= data) {
    ????????????????????System.out.println("拿到數據:"?+ data);
    ????????????????????System.out.println("正在消費數據:"?+ data);
    ????????????????????Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
    ????????????????} else?{
    ????????????????????// 超過2s還沒數據,認為所有生產線程都已經退出,自動退出消費線程。
    ????????????????????isRunning = false;
    ????????????????}
    ????????????}
    ????????} catch?(InterruptedException e) {
    ????????????e.printStackTrace();
    ????????????Thread.currentThread().interrupt();
    ????????} finally?{
    ????????????System.out.println("退出消費者線程!");
    ????????}
    ????}
    ????private?BlockingQueue<String> queue;
    ????private?static?final?int??????DEFAULT_RANGE_FOR_SLEEP = 1000;
    }
    import?java.util.Random;
    import?java.util.concurrent.BlockingQueue;
    import?java.util.concurrent.TimeUnit;
    import?java.util.concurrent.atomic.AtomicInteger;
    /**
    ?* 生產者線程
    ?*
    ?* @author jackyuj
    ?*/
    public?class?Producer implements?Runnable {
    ????public?Producer(BlockingQueue queue) {
    ????????this.queue = queue;
    ????}
    ????public?void?run() {
    ????????String data = null;
    ????????Random r = new?Random();
    ????????System.out.println("啟動生產者線程!");
    ????????try?{
    ????????????while?(isRunning) {
    ????????????????System.out.println("正在生產數據...");
    ????????????????Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
    ????????????????data = "data:"?+ count.incrementAndGet();
    ????????????????System.out.println("將數據:"?+ data + "放入隊列...");
    ????????????????if?(!queue.offer(data, 2, TimeUnit.SECONDS)) {
    ????????????????????System.out.println("放入數據失敗:"?+ data);
    ????????????????}
    ????????????}
    ????????} catch?(InterruptedException e) {
    ????????????e.printStackTrace();
    ????????????Thread.currentThread().interrupt();
    ????????} finally?{
    ????????????System.out.println("退出生產者線程!");
    ????????}
    ????}
    ????public?void?stop() {
    ????????isRunning = false;
    ????}
    ????private?volatile?boolean??????isRunning?????????????? = true;
    ????private?BlockingQueue queue;
    ????private?static?AtomicInteger? count?????????????????? = new?AtomicInteger();
    ????private?static?final?int??????DEFAULT_RANGE_FOR_SLEEP = 1000;
    }
  • 3. DelayQueue
    ?? ? ?DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。DelayQueue是一個沒有大小限制的隊列,因此往隊列中插入數據的操作(生產者)永遠不會被阻塞,而只有獲取數據的操作(消費者)才會被阻塞。
    使用場景:
      DelayQueue使用場景較少,但都相當巧妙,常見的例子比如使用一個DelayQueue來管理一個超時未響應的連接隊列。

    4. PriorityBlockingQueue
    ?? ? ?基于優先級的阻塞隊列(優先級的判斷通過構造函數傳入的Compator對象來決定),但需要注意的是PriorityBlockingQueue并不會阻塞數據生產者,而只會在沒有可消費的數據時,阻塞數據的消費者。因此使用的時候要特別注意,生產者生產數據的速度絕對不能快于消費者消費數據的速度,否則時間一長,會最終耗盡所有的可用堆內存空間。在實現PriorityBlockingQueue時,內部控制線程同步的鎖采用的是公平鎖。

    5. SynchronousQueue
    ?? ? ?一種無緩沖的等待隊列,類似于無中介的直接交易,有點像原始社會中的生產者和消費者,生產者拿著產品去集市銷售給產品的最終消費者,而消費者必須親自去集市找到所要商品的直接生產者,如果一方沒有找到合適的目標,那么對不起,大家都在集市等待。相對于有緩沖的BlockingQueue來說,少了一個中間經銷商的環節(緩沖區),如果有經銷商,生產者直接把產品批發給經銷商,而無需在意經銷商最終會將這些產品賣給那些消費者,由于經銷商可以庫存一部分商品,因此相對于直接交易模式,總體來說采用中間經銷商的模式會吞吐量高一些(可以批量買賣);但另一方面,又因為經銷商的引入,使得產品從生產者到消費者中間增加了額外的交易環節,單個產品的及時響應性能可能會降低。
      聲明一個SynchronousQueue有兩種不同的方式,它們之間有著不太一樣的行為。公平模式和非公平模式的區別:
      如果采用公平模式:SynchronousQueue會采用公平鎖,并配合一個FIFO隊列來阻塞多余的生產者和消費者,從而體系整體的公平策略;
      但如果是非公平模式(SynchronousQueue默認):SynchronousQueue采用非公平鎖,同時配合一個LIFO隊列來管理多余的生產者和消費者,而后一種模式,如果生產者和消費者的處理速度有差距,則很容易出現饑渴的情況,即可能有某些生產者或者是消費者的數據永遠都得不到處理。
  • 小結
      BlockingQueue不光實現了一個完整隊列所具有的基本功能,同時在多線程環境下,他還自動管理了多線間的自動等待于喚醒功能,從而使得程序員可以忽略這些細節,關注更高級的功能。?

轉載于:https://www.cnblogs.com/xuxiuxiu/p/6830485.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/541406.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/541406.shtml
英文地址,請注明出處:http://en.pswp.cn/news/541406.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

java isempty_Java ArrayDeque isEmpty()方法與示例

java isemptyArrayDeque類isEmpty()方法 (ArrayDeque Class isEmpty() method) isEmpty() Method is available in java.lang package. isEmpty()方法在java.lang包中可用。 isEmpty() Method is used to check whether this deque is "empty" or "non-empty&qu…

[QGLViewer]3D場景鼠標點擊位置

重載鼠標事件&#xff1a; void AxMapControl::mousePressEvent(QMouseEvent* e) {switch(currentTool){case AX_DRAW_DIRECTION:{if (e->button() Qt::LeftButton) {QPoint screenPte->pos();qglviewer::Vec orig1, dir1;camera()->convertClickToLine(screenPt, or…

elispce導入java項目_emacs的java編程環境設置(jdee,lib,cedet,ecb

1&#xff1a;下載jdee,lib,ecb。(已安裝cedet就不用再安了)2&#xff1a;解壓縮放入load-path目錄。然后load&#xff0c;require。(add-to-list load-path "~/.emacs.d/lisp/jdee-2.4.0.1/lisp")(add-to-list load-path "~/.emacs.d/lisp/elib-1.0")(add…

element 項目 示例_Java ArrayDeque element()方法與示例

element 項目 示例ArrayDeque類element()方法 (ArrayDeque Class element() method) element() Method is available in java.lang package. element()方法在java.lang包中可用。 element() Method is used to retrieve the first element of the deque but without removing t…

can收發器 rx_CANOpen系列教程03 _CAN收發器功能、原理及作用

1寫在前面前面文章是從大方向介紹了CAN網絡&#xff0c;讓大家對CAN網絡有一定的認識。本文將范圍縮小&#xff0c;講述整個CAN網絡其中的一個CAN收發器。如下圖標記出來的部分&#xff1a;本文結合眾多初學者容易產生的疑問來講述CAN收發器相關的知識點&#xff0c;大概有如下…

操作系統文件分配策略_操作系統中的文件分配方法

操作系統文件分配策略分配方法 (Allocation Method) The allocation method defines how the files are stored in the disk blocks. The direct access nature of the disks gives us the flexibility to implement the files. In many cases, different files or many files …

簡述container與container-fluid的區別

在bootstrap中的布局容器一欄中&#xff0c;提供了container與container-fluid兩種容器&#xff0c;其官方解釋為&#xff1a; .container 類用于固定寬度并支持響應式布局的容器。 .container-fluid 類用于 100% 寬度&#xff0c;占據全部視口&#xff08;viewport&#xff09…

centos php fpm 停止_如何關閉php-fpm進程?

因為你是編譯的&#xff0c;可以在源碼中復制php-fpm的init文件到系統中&#xff1a;cp -f sapi/fpm/init.d.php-fpm /etc/init.d/php-fpm然后就可以使用以下命令啟動、停止、重啟和重新加載php-fpm了&#xff1a;service php-fpm startservice php-fpm restartservice php-fpm…

minus_Java Duration類| minus()方法與示例

minus持續時間類minus()方法 (Duration Class minus() method) Syntax: 句法&#xff1a; public Duration minus(Duration d);public Duration minus(long amt, TemporalUnit t_unit);minus() method is available in java.time package. minus()方法在java.time包中可用。 m…

Mongodb聚合函數

插入 測試數據 for(var j1;j<3;j){ for(var i1;i<3;i){ var person{Name:"jack"i,Age:i,Address:["henan","wuhan"],Course:[{Name:"shuxue",Score:i},{Name:"wuli",Score:i}]}db.DemoTest.Person.insert(pers…

php rename函數_php rename函數怎么用

PHP rename()函數用于重命名文件或目錄&#xff0c;語法“rename(文件舊名稱,新名稱,句柄環境)”&#xff0c;使用用戶指定的新名稱更改文件或目錄的舊名稱&#xff0c;并且可以根據需要在目錄之間移動&#xff1b;成功時返回True&#xff0c;失敗時返回False。php rename()函數…

Java BigInteger類| xor()方法與示例

BigInteger類的xor()方法 (BigInteger Class xor() method) xor() method is available in java.math package. xor()方法在java.math包中可用。 xor() method is used to perform xor operation between this BigInteger and the given BigInteger and we all know when we pe…

Spring Data Redis實戰之提供RedisTemplate

為什么80%的碼農都做不了架構師&#xff1f;>>> 參考&#xff1a; http://www.cnblogs.com/edwinchen/p/3816938.html 本項目創建的是Maven項目 一、pom.xml引入dependencies <dependency><groupId>org.springframework.data</groupId><artif…

Java BigInteger類| and()方法與示例

BigInteger類和()方法 (BigInteger Class and() method) and() method is available in java.math package. and()方法在java.math包中可用。 and() method is used to perform and operation between this BigInteger and the given BigInteger (val) [i.e. (this BigInteger)…

php映射,PHP實現路由映射到指定控制器

自定義路由的功能&#xff0c;指定到pathinfo的url上,再次升級之前的腳本SimpleLoader.phpclass SimpleLoader{public static function run($rulesarray()){header("content-type:text/html;charsetutf-8");self::register();self::commandLine();self::router($rule…

stl vector 函數_vector :: clear()函數,以及C ++ STL中的示例

stl vector 函數C vector :: clear()函數 (C vector::clear() function) vector::clear() is a library function of "vector" header, it is used to remove/clear all elements of the vector, it makes the 0 sized vector after removing all elements. vector …

Commonjs規范及Node模塊實現

前面的話 Node在實現中并非完全按照CommonJS規范實現&#xff0c;而是對模塊規范進行了一定的取舍&#xff0c;同時也增加了少許自身需要的特性。本文將詳細介紹NodeJS的模塊實現 引入 nodejs是區別于javascript的&#xff0c;在javascript中的頂層對象是window&#xff0c;而在…

thinkphp3 php jwt,ThinkPHP5 使用 JWT 進行加密

- 使用 Composer安裝此擴展- 代碼示例<?php /*** [InterCommon-接口公用]* Author RainCyan* DateTime 2019-08-12T16:38:080800*/namespace app\hladmin\controller;use think\Controller;use \Firebase\JWT\JWT;class InterCommonController extends Controller {private…

數據管理與商業智能_商業智能與數據科學

數據管理與商業智能In this heavily jargonized trade, the words typically overlap one another, leading to a scarcity of understanding or a state of confusion around these ideas. whereas big data vs analytics or computing vs machine learning vs cognitive inte…

JavaWeb網上圖書商城完整項目--day02-14.登錄功能的login頁面處理

1、現在注冊成功之后&#xff0c;我們來到登錄頁面&#xff0c;登錄頁面在于 在登錄頁面。我們也需要向注冊頁面一樣對登錄的用戶名、密碼 驗證碼等在jsp頁面中進行校驗&#xff0c;校驗我們單獨放置一個login.js文件中進行處理&#xff0c;然后login.jsp加載該js文件 我們來看…