原文地址:http://benjaminwhx.com/2018/05/11/%E3%80%90%E7%BB%86%E8%B0%88Java%E5%B9%B6%E5%8F%91%E3%80%91%E8%B0%88%E8%B0%88LinkedBlockingQueue/
在集合框架里,想必大家都用過ArrayList和LinkedList,也經常在面試中問到他們之間的區別。ArrayList和ArrayBlockingQueue一樣,內部基于數組來存放元素,而LinkedBlockingQueue則和LinkedList一樣,內部基于鏈表來存放元素。
LinkedBlockingQueue實現了BlockingQueue接口,這里放一張類的繼承關系圖(圖片來自之前的文章:說說隊列Queue)
LinkedBlockingQueue不同于ArrayBlockingQueue,它如果不指定容量,默認為Integer.MAX_VALUE
,也就是無界隊列。所以為了避免隊列過大造成機器負載或者內存爆滿的情況出現,我們在使用的時候建議手動傳一個隊列的大小。
2、源碼分析
2.1 屬性
/*** 節點類,用于存儲數據*/ static class Node<E> {E item;Node<E> next;Node(E x) { item = x; } }/** 阻塞隊列的大小,默認為Integer.MAX_VALUE */ private final int capacity;/** 當前阻塞隊列中的元素個數 */ private final AtomicInteger count = new AtomicInteger();/*** 阻塞隊列的頭結點*/ transient Node<E> head;/*** 阻塞隊列的尾節點*/ private transient Node<E> last;/** 獲取并移除元素時使用的鎖,如take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock();/** notEmpty條件對象,當隊列沒有數據時用于掛起執行刪除的線程 */ private final Condition notEmpty = takeLock.newCondition();/** 添加元素時使用的鎖如 put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock();/** notFull條件對象,當隊列數據已滿時用于掛起執行添加的線程 */ private final Condition notFull = putLock.newCondition();
從上面的屬性我們知道,每個添加到LinkedBlockingQueue隊列中的數據都將被封裝成Node節點,添加的鏈表隊列中,其中head和last分別指向隊列的頭結點和尾結點。與ArrayBlockingQueue不同的是,LinkedBlockingQueue內部分別使用了takeLock 和 putLock 對并發進行控制,也就是說,添加和刪除操作并不是互斥操作,可以同時進行,這樣也就可以大大提高吞吐量。
這里如果不指定隊列的容量大小,也就是使用默認的Integer.MAX_VALUE,如果存在添加速度大于刪除速度時候,有可能會內存溢出,這點在使用前希望慎重考慮。
另外,LinkedBlockingQueue對每一個lock鎖都提供了一個Condition用來掛起和喚醒其他線程。
構造函數
public LinkedBlockingQueue() {// 默認大小為Integer.MAX_VALUEthis(Integer.MAX_VALUE); }public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null); }public LinkedBlockingQueue(Collection<? extends E> c) {this(Integer.MAX_VALUE);final ReentrantLock putLock = this.putLock;putLock.lock();try {int n = 0;for (E e : c) {if (e == null)throw new NullPointerException();if (n == capacity)throw new IllegalStateException("Queue full");enqueue(new Node<E>(e));++n;}count.set(n);} finally {putLock.unlock();} }
默認的構造函數和最后一個構造函數創建的隊列大小都為Integer.MAX_VALUE,只有第二個構造函數用戶可以指定隊列的大小。第二個構造函數最后初始化了last和head節點,讓它們都指向了一個元素為null的節點。
方法
同樣,LinkedBlockingQueue也有著和ArrayBlockingQueue一樣的方法,我們先來看看入隊列的方法。
2.3.1、入隊方法
LinkedBlockingQueue提供了多種入隊操作的實現來滿足不同情況下的需求,入隊操作有如下幾種:
- void put(E e);
- boolean offer(E e);
- boolean offer(E e, long timeout, TimeUnit unit)。
put(E e)
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;// 獲取鎖中斷 putLock.lockInterruptibly();try {//判斷隊列是否已滿,如果已滿阻塞等待while (count.get() == capacity) {notFull.await();}// 把node放入隊列中 enqueue(node);c = count.getAndIncrement();// 再次判斷隊列是否有可用空間,如果有喚醒下一個線程進行添加操作if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}// 如果隊列中有一條數據,喚醒消費線程進行消費if (c == 0)signalNotEmpty(); }
小結put方法來看,它總共做了以下情況的考慮:
- 隊列已滿,阻塞等待。
- 隊列未滿,創建一個node節點放入隊列中,如果放完以后隊列還有剩余空間,繼續喚醒下一個添加線程進行添加。如果放之前隊列中沒有元素,放完以后要喚醒消費線程進行消費。
offer(E e)
public boolean offer(E e) {if (e == null) throw new NullPointerException();final AtomicInteger count = this.count;if (count.get() == capacity)return false;int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;putLock.lock();try {// 隊列有可用空間,放入node節點,判斷放入元素后是否還有可用空間,// 如果有,喚醒下一個添加線程進行添加操作。if (count.get() < capacity) {enqueue(node);c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();}} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return c >= 0; }
可以看到offer僅僅對put方法改動了一點點,當隊列沒有可用元素的時候,不同于put方法的阻塞等待,offer方法直接方法false。
offer(E e, long timeout, TimeUnit unit)
public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {if (e == null) throw new NullPointerException();long nanos = unit.toNanos(timeout);int c = -1;final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {// 等待超時時間nanos,超時時間到了返回falsewhile (count.get() == capacity) {if (nanos <= 0)return false;nanos = notFull.awaitNanos(nanos);}enqueue(new Node<E>(e));c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return true; }
該方法只是對offer方法進行了阻塞超時處理,使用了Condition的awaitNanos來進行超時等待,這里為什么要用while循環?因為awaitNanos方法是可中斷的,為了防止在等待過程中線程被中斷,這里使用while循環進行等待過程中中斷的處理,繼續等待剩下需等待的時間。