java 多線程阻塞隊列 與 阻塞方法與和非阻塞方法

Queue是什么

隊列,是一種數據結構。除了優先級隊列和LIFO隊列外,隊列都是以FIFO(先進先出)的方式對各個元素進行排序的。無論使用哪種排序方式,隊列的頭都是調用remove()或poll()移除元素的。在FIFO隊列中,所有新元素都插入隊列的末尾。隊列都是線程安全的,內部已經實現安全措施,不用我們擔心

?

Queue中的方法

Queue中的方法不難理解,6個,每2對是一個也就是總共3對。看一下JDK API就知道了:

注意一點就好,Queue通常不允許插入Null,盡管某些實現(比如LinkedList)是允許的,但是也不建議。



使用非阻塞隊列的時候有一個很大問題就是:它不會對當前線程產生阻塞,那么在面對類似消費者-生產者的模型時,就必須額外地實現同步策略以及線程間喚醒策略,這個實現起來就非常麻煩。但是有了阻塞隊列就不一樣了,它會對當前線程產生阻塞,比如一個線程從一個空的阻塞隊列中取元素,此時線程會被阻塞直到阻塞隊列中有了元素。當隊列中有元素后,被阻塞的線程會自動被喚醒(不需要我們編寫代碼去喚醒)。這樣提供了極大的方便性。

一.幾種主要的阻塞隊列

自從Java 1.5之后,在java.util.concurrent包下提供了若干個阻塞隊列,主要有以下幾個:

  ArrayBlockingQueue:基于數組實現的一個阻塞隊列,在創建ArrayBlockingQueue對象時必須制定容量大小。并且可以指定公平性與非公平性,默認情況下為非公平的,即不保證等待時間最長的隊列最優先能夠訪問隊列。

  LinkedBlockingQueue:基于鏈表實現的一個阻塞隊列,在創建LinkedBlockingQueue對象時如果不指定容量大小,則默認大小為Integer.MAX_VALUE。

  PriorityBlockingQueue:以上2種隊列都是先進先出隊列,而PriorityBlockingQueue卻不是,它會按照元素的優先級對元素進行排序,按照優先級順序出隊,每次出隊的元素都是優先級最高的元素。注意,此阻塞隊列為無界阻塞隊列,即容量沒有上限(通過源碼就可以知道,它沒有容器滿的信號標志),前面2種都是有界隊列。

  DelayQueue:基于PriorityQueue,一種延時阻塞隊列,DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。DelayQueue也是一個無界隊列,因此往隊列中插入數據的操作(生產者)永遠不會被阻塞,而只有獲取數據的操作(消費者)才會被阻塞。


二.阻塞隊列中的方法 VS 非阻塞隊列中的方法
1.非阻塞隊列中的幾個主要方法:
add(E e):將元素e插入到隊列末尾,如果插入成功,則返回true;如果插入失敗(即隊列已滿),則會拋出異常;
remove():移除隊首元素,若移除成功,則返回true;如果移除失敗(隊列為空),則會拋出異常;
offer(E e):將元素e插入到隊列末尾,如果插入成功,則返回true;如果插入失敗(即隊列已滿),則返回false;
poll():移除并獲取隊首元素,若成功,則返回隊首元素;否則返回null;
peek():獲取隊首元素,若成功,則返回隊首元素;否則返回null

對于非阻塞隊列,一般情況下建議使用offer、poll和peek三個方法,不建議使用add和remove方法。因為使用offer、poll和peek三個方法可以通過返回值判斷操作成功與否,而使用add和remove方法卻不能達到這樣的效果。注意,非阻塞隊列中的方法都沒有進行同步措施。

//在這篇筆記中沒有介紹非阻塞隊列,大部分阻塞隊列都可以有非阻塞方法和阻塞方法
2.阻塞隊列中的幾個主要方法:
阻塞隊列包括了非阻塞隊列中的大部分方法,上面列舉的5個方法在阻塞隊列中都存在,但是要注意這5個方法在阻塞隊列中都進行了同步措施。除此之外,阻塞隊列提供了另外4個非常有用的方法:
put(E e)
take()
offer(E e,long timeout, TimeUnit unit)
poll(long timeout, TimeUnit unit)
put方法用來向隊尾存入元素,如果隊列滿,則等待;
take方法用來從隊首取元素,如果隊列為空,則等待;
offer方法用來向隊尾存入元素,如果隊列滿,則等待一定的時間,當時間期限達到時,如果還沒有插入成功,則返回false;否則返回true;
poll方法用來從隊首取元素,如果隊列空,則等待一定的時間,當時間期限達到時,如果取到,則返回null;否則返回取得的元素;
?

注意:

1、必須要使用take()方法在獲取的時候達成阻塞結果
2、使用poll()方法將產生非阻塞效果


三.阻塞隊列的實現原理
前面談到了非阻塞隊列和阻塞隊列中常用的方法,下面來探討阻塞隊列的實現原理,本文以ArrayBlockingQueue為例,其他阻塞隊列實現原理可能和ArrayBlockingQueue有一些差別,但是大體思路應該類似,有興趣的朋友可自行查看其他阻塞隊列的實現源碼。
首先看一下ArrayBlockingQueue類中的幾個成員變量:

  1. public class ArrayBlockingQueue<E> extends AbstractQueue<E>
  2. implements BlockingQueue<E>, java.io.Serializable {
  3. private static final long serialVersionUID = -817911632652898426L;
  4. /** The queued items */
  5. private final E[] items;
  6. /** items index for next take, poll or remove */
  7. private int takeIndex;
  8. /** items index for next put, offer, or add. */
  9. private int putIndex;
  10. /** Number of items in the queue */
  11. private int count;
  12. /*
  13. * Concurrency control uses the classic two-condition algorithm
  14. * found in any textbook.
  15. */
  16. /** Main lock guarding all access */
  17. private final ReentrantLock lock;
  18. /** Condition for waiting takes */
  19. private final Condition notEmpty;
  20. /** Condition for waiting puts */
  21. private final Condition notFull;
  22. }

可以看出,ArrayBlockingQueue中用來存儲元素的實際上是一個數組,takeIndex和putIndex分別表示隊首元素和隊尾元素的下標,count表示隊列中元素的個數。
lock是一個可重入鎖,notEmpty和notFull是等待條件。
下面看一下ArrayBlockingQueue的構造器,構造器有三個重載版本:
  1. public ArrayBlockingQueue(int capacity) {
  2. }
  3. public ArrayBlockingQueue(int capacity, boolean fair) {
  4. ?
  5. }
  6. public ArrayBlockingQueue(int capacity, boolean fair,
  7. ????????????????????????? Collection<? extends E> c) {
  8. }

第一個構造器只有一個參數用來指定容量,第二個構造器可以指定容量和公平性,第三個構造器可以指定容量、公平性以及用另外一個集合進行初始化。
然后看它的兩個關鍵方法的實現:put()和take():
  1. public void put(E e) throws InterruptedException {
  2. ??? if (e == null) throw new NullPointerException();
  3. ??? final E[] items = this.items;
  4. ??? final ReentrantLock lock = this.lock;
  5. ??? lock.lockInterruptibly();
  6. ??? try {
  7. ??????? try {
  8. ??????????? while (count == items.length)
  9. ??????????????? notFull.await();
  10. ??????? } catch (InterruptedException ie) {
  11. ??????????? notFull.signal(); // propagate to non-interrupted thread
  12. ??????????? throw ie;
  13. ??????? }
  14. ??????? insert(e);
  15. ??? } finally {
  16. ??????? lock.unlock();
  17. ??? }
  18. }

從put方法的實現可以看出,它先獲取了鎖,并且獲取的是可中斷鎖,然后判斷當前元素個數是否等于數組的長度,如果相等,則調用notFull.await()進行等待,如果捕獲到中斷異常,則喚醒線程并拋出異常。
當被其他線程喚醒時,通過insert(e)方法插入元素,最后解鎖。
我們看一下insert方法的實現:
  1. private void insert(E x) {
  2. ??? items[putIndex] = x;
  3. ??? putIndex = inc(putIndex);
  4. ??? ++count;
  5. ??? notEmpty.signal();
  6. }

它是一個private方法,插入成功后,通過notEmpty喚醒正在等待取元素的線程。
下面是take()方法的實現:
  1. public E take() throws InterruptedException {
  2. ??? final ReentrantLock lock = this.lock;
  3. ??? lock.lockInterruptibly();
  4. ??? try {
  5. ??????? try {
  6. ??????????? while (count == 0)
  7. ??????????????? notEmpty.await();
  8. ??????? } catch (InterruptedException ie) {
  9. ??????????? notEmpty.signal(); // propagate to non-interrupted thread
  10. ??????????? throw ie;
  11. ??????? }
  12. ??????? E x = extract();
  13. ??????? return x;
  14. ??? } finally {
  15. ??????? lock.unlock();
  16. ??? }
  17. }

跟put方法實現很類似,只不過put方法等待的是notFull信號,而take方法等待的是notEmpty信號。在take方法中,如果可以取元素,則通過extract方法取得元素,下面是extract方法的實現:
  1. private E extract() {
  2. ??? final E[] items = this.items;
  3. ??? E x = items[takeIndex];
  4. ??? items[takeIndex] = null;
  5. ??? takeIndex = inc(takeIndex);
  6. ??? --count;
  7. ??? notFull.signal();
  8. ??? return x;
  9. }

跟insert方法也很類似。
其實從這里大家應該明白了阻塞隊列的實現原理,事實它和我們用Object.wait()、Object.notify()和非阻塞隊列實現生產者-消費者的思路類似,只不過它把這些工作一起集成到了阻塞隊列中實現。


四.示例和使用場景
下面先使用Object.wait()和Object.notify()、非阻塞隊列實現生產者-消費者模式:
  1. public class Test {
  2. ??? private int queueSize = 10;
  3. ??? private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
  4. ?
  5. ??? public static void main(String[] args)? {
  6. ??????? Test test = new Test();
  7. ??????? Producer producer = test.new Producer();
  8. ??????? Consumer consumer = test.new Consumer();
  9. ?
  10. ??????? producer.start();
  11. ??????? consumer.start();
  12. ??? }
  13. ?
  14. ??? class Consumer extends Thread{
  15. ?
  16. ??????? @Override
  17. ??????? public void run() {
  18. ??????????? consume();
  19. ??????? }
  20. ?
  21. ??????? private void consume() {
  22. ??????????? while(true){
  23. ??????????????? synchronized (queue) {
  24. ??????????????????? while(queue.size() == 0){
  25. ??????????????????????? try {
  26. ??????????????????????????? System.out.println("隊列空,等待數據");
  27. ??????????????????????????? queue.wait();
  28. ??????????????????????? } catch (InterruptedException e) {
  29. ??????????????????????????? e.printStackTrace();
  30. ??????????????????????????? queue.notify();
  31. ??????????????????????? }
  32. ??????????????????? }
  33. ??????????????????? queue.poll();????????? //每次移走隊首元素
  34. ??????????????????? queue.notify();
  35. ??????????????????? System.out.println("從隊列取走一個元素,隊列剩余"+queue.size()+"個元素");
  36. ??????????????? }
  37. ??????????? }
  38. ??????? }
  39. ??? }
  40. ?
  41. ??? class Producer extends Thread{
  42. ?
  43. ??????? @Override
  44. ??????? public void run() {
  45. ??????????? produce();
  46. ??????? }
  47. ?
  48. ??????? private void produce() {
  49. ??????????? while(true){
  50. ??????????????? synchronized (queue) {
  51. ??????????????????? while(queue.size() == queueSize){
  52. ??????????????????????? try {
  53. ??????????????????????????? System.out.println("隊列滿,等待有空余空間");
  54. ??????????????????????????? queue.wait();
  55. ??????????????????????? } catch (InterruptedException e) {
  56. ??????????????????????????? e.printStackTrace();
  57. ??????????????????????????? queue.notify();
  58. ??????????????????????? }
  59. ??????????????????? }
  60. ??????????????????? queue.offer(1);??????? //每次插入一個元素
  61. ??????????????????? queue.notify();
  62. ??????????????????? System.out.println("向隊列取中插入一個元素,隊列剩余空間:"+(queueSize-queue.size()));
  63. ??????????????? }
  64. ??????????? }
  65. ??????? }
  66. ??? }
  67. }

這個是經典的生產者-消費者模式,通過阻塞隊列和Object.wait()和Object.notify()實現,wait()和notify()主要用來實現線程間通信。
具體的線程間通信方式(wait和notify的使用)在后續問章中會講述到。
下面是使用阻塞隊列實現的生產者-消費者模式:

  1. public class Test {
  2. private int queueSize = 10;
  3. private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);
  4. public static void main(String[] args) {
  5. Test test = new Test();
  6. Producer producer = test.new Producer();
  7. Consumer consumer = test.new Consumer();
  8. producer.start();
  9. consumer.start();
  10. }
  11. class Consumer extends Thread{
  12. @Override
  13. public void run() {
  14. consume();
  15. }
  16. private void consume() {
  17. while(true){
  18. try {
  19. queue.take();
  20. System.out.println("從隊列取走一個元素,隊列剩余"+queue.size()+"個元素");
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. }
  26. }
  27. class Producer extends Thread{
  28. @Override
  29. public void run() {
  30. produce();
  31. }
  32. private void produce() {
  33. while(true){
  34. try {
  35. queue.put(1);
  36. System.out.println("向隊列取中插入一個元素,隊列剩余空間:"+(queueSize-queue.size()));
  37. } catch (InterruptedException e) {
  38. e.printStackTrace();
  39. }
  40. }
  41. }
  42. }
  43. }


 有沒有發現,使用阻塞隊列代碼要簡單得多,不需要再單獨考慮同步和線程間通信的問題。
在并發編程中,一般推薦使用阻塞隊列,這樣實現可以盡量地避免程序出現意外的錯誤。
阻塞隊列使用最經典的場景就是socket客戶端數據的讀取和解析,讀取數據的線程不斷將數據放入隊列,然后解析線程不斷從隊列取數據解析。還有其他類似的場景,只要符合生產者-消費者模型的都可以使用阻塞隊列。


轉載于:https://www.cnblogs.com/signheart/p/6606475.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/393399.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/393399.shtml
英文地址,請注明出處:http://en.pswp.cn/news/393399.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

批量移動AD用戶到指定OU

作為域管理員&#xff0c;在日常工作中使用ADUC&#xff08;AD用戶和計算機&#xff09;工具在圖形界面中進行賬號管理操作可謂是家常便飯了。然而一個個增加、移動、刪除用戶&#xff0c;這樣操作有時真的夠煩&#xff0c;當管理大批量的賬戶時&#xff0c;重復操作浪費的時間…

vs 編譯說明

靜態編譯/MT&#xff0c;/MTD 是指使用libc和msvc相關的靜態庫(lib)。動態編譯&#xff0c;/MD&#xff0c;/MDd是指用相應的DLL版本編譯。其中字母含義 d&#xff1a;debug m&#xff1a;multi-threading(多線程) t&#xff1a;text代碼 d&#xff1a;dynamic(動態)…

python numeric_Python pandas.to_numeric函數方法的使用

pandas.to_numeric(arg, errorsraise, downcastNone) [source]將參數轉換為數字類型。默認返回dtype為float64或int64&#xff0c; 具體取決于提供的數據。使用downcast參數獲取其他dtype。請注意&#xff0c;如果傳入非常大的數字&#xff0c;則可能會導致精度損失。由…

javascript 分號_讓我們談談JavaScript中的分號

javascript 分號要使用它們&#xff0c;還是不使用它們… (To use them, or not to use them…) Semicolons in JavaScript divide the community. Some prefer to use them always, no matter what. Others like to avoid them.JavaScript中的分號分隔社區。 有些人更喜歡始終…

leetcode436. 尋找右區間(二分法)

給定一組區間&#xff0c;對于每一個區間 i&#xff0c;檢查是否存在一個區間 j&#xff0c;它的起始點大于或等于區間 i 的終點&#xff0c;這可以稱為 j 在 i 的“右側”。 對于任何區間&#xff0c;你需要存儲的滿足條件的區間 j 的最小索引&#xff0c;這意味著區間 j 有最…

python篇第6天【數據類型】

Python有五個標準的數據類型&#xff1a;Numbers&#xff08;數字&#xff09;String&#xff08;字符串&#xff09;List&#xff08;列表&#xff09;Tuple&#xff08;元組&#xff09;Dictionary&#xff08;字典&#xff09;Python數字數字數據類型用于存儲數值。他們是不…

如何確定Ionic是否適合您的項目

by Simon Grimm西蒙格里姆(Simon Grimm) 如何確定Ionic是否適合您的項目 (How to find out if Ionic is the right choice for your project) Ionic has been around for quite some years. With the latest release of version 4, it has become an even better option for d…

二維數組的查找 java_查找二維數組java的總和

我正在一個項目中&#xff0c;我必須讀取文件并將內容輸入2D數組。然后&#xff0c;我必須對每一行&#xff0c;每一列和矩陣的周長求和。到目前為止&#xff0c;除外圍功能外&#xff0c;我一切正常。我正在嘗試為兩個外部列的頂行&#xff0c;底行和中間創建單獨的for循環。矩…

遞歸法解決兔子問題

記得以前過相似問題&#xff0c;今天有同事問道&#xff0c;竟然不知所答&#xff0c;故寫篇文章以記之。 一般而言&#xff0c;兔子在出生兩個月后&#xff0c;就有繁殖能力&#xff0c;一對兔子每個月能生出一對小兔子來。如果所有兔子都不死&#xff0c;那么若干月以后可以繁…

mysql本地連接錯誤解決辦法

今天公司同事在測試服務器上死活不能用一個賬號在本地登陸,但是遠程就可以,于是我幫忙看了下,測試服務器的IP是10.10.2.226,錯誤如下:linux-0fdr:/home1/mysql_data # mysql -h 10.10.2.226 -u jxq2 -pjxq2ERROR 1045 (28000): Access denied for user jxq2linux-0fdr (using p…

leetcode546. 移除盒子(dp)

給出一些不同顏色的盒子&#xff0c;盒子的顏色由數字表示&#xff0c;即不同的數字表示不同的顏色。 你將經過若干輪操作去去掉盒子&#xff0c;直到所有的盒子都去掉為止。每一輪你可以移除具有相同顏色的連續 k 個盒子&#xff08;k > 1&#xff09;&#xff0c;這樣一輪…

408. Valid Word Abbreviation

題目&#xff1a; Given a non-empty string s and an abbreviation abbr, return whether the string matches with the given abbreviation. A string such as "word" contains only the following valid abbreviations: ["word", "1ord", &qu…

oracle常用操作指令

登錄oracle用戶: sqlplus 用戶名/密碼 創建用戶&#xff1a;create user 要創建的用戶名 identified by 當前用戶名; 授權&#xff1a;grant resource,connect to 要授權的用戶名; 刪除用戶&#xff1a;drop user 用戶名 創建表&#xff1a; create table student( id n…

java接收二進制數據_java-從套接字讀取二進制數據

我正在嘗試連接到服務器,然后向其發送HTTP請求(在這種情況下為GET).這個想法是請求一個文件,然后從服務器接收它.它應同時適用于文本文件和二進制文件(例如imgs).我對文本文件沒有任何問題,它可以完美工作,但是對二進制文件有一些麻煩.首先,我聲明一個BufferedReader(用于讀取標…

web開發入門_Web開發人員和設計師的自由職業入門

web開發入門Learn how to get started with freelancing as a web developer and designer. Cara Bell shares lessons and tips she has learned from her years as a freelancer.了解如何以網絡開發人員和設計師的身份開始自由職業。 卡拉貝爾(Cara Bell)分享了她從自由職業者…

leetcode1343. 大小為 K 且平均值大于等于閾值的子數組數目(隊列)

給你一個整數數組 arr 和兩個整數 k 和 threshold 。 請你返回長度為 k 且平均值大于等于 threshold 的子數組數目。 示例 1&#xff1a; 輸入&#xff1a;arr [2,2,2,2,5,5,5,8], k 3, threshold 4 輸出&#xff1a;3 解釋&#xff1a;子數組 [2,5,5],[5,5,5] 和 [5,5,8…

二分查找遞歸和非遞歸方法分析

遞歸實現&#xff1a; 自己寫的遞歸&#xff1a;多一個賦值操作&#xff0c;雖然可以得到正確的結果。但是比較難以理解。 問題&#xff1a;沒有深刻理解遞歸返回值。return會在遞歸調用到最后&#xff0c;在遞歸結束的地方&#xff0c;會將返回值一層一層返回給方法&#xff0…

BaseYii_autoload

BaseYii_autoload 判斷是否是classMap還是命名空間的 然后 轉換成 絕對路徑 include 文件  public static function autoload($className){      //classMap 一般都是類庫 官方 或者自定義類映射 if (isset(static::$classMap[$className])) {$classFile static::$cla…

sasl java_javaSASL_SSL帳號密碼方式訪問kafka

java SASL_SSL帳號密碼 方式訪問 kafkaProducer Java Sample java生產者:Properties props new Properties();props.put("bootstrap.servers","*******:9092,*******:9092");props.put("acks", "all");//props.put("retries&quo…

RedHat5.2下Linux Oracle 10g ASM 安裝詳細實錄-第二篇-ASM安裝

五、安裝ASM 1、在oracle網站下載支持包&#xff1a;http://www.oracle.com/technology ... x/asmlib/rhel5.html 2、根據linux內核下載相應的asm安裝包:根據uname –a查看內核&#xff08;黃底紅字為內核&#xff09;&#xff1a;$ uname -aLinux L-DB-3-6 2.6.18-92.el5 #1 S…