Outline
1. CopyOnWriteArrayList: a thread-safe array list
2. ConcurrentLinkedQueue: a thread-safe linked queue
3. Overview of blocking queues in concurrent programming
4. The blocking queues provided by JUC
5. How LinkedBlockingQueue is implemented
6. A cluster synchronization mechanism built on two queues
4. The blocking queues provided by JUC
(1) ArrayBlockingQueue: an array-based blocking queue
(2) LinkedBlockingQueue: a linked-list-based blocking queue
(3) PriorityBlockingQueue: a priority blocking queue
(4) DelayQueue: a delayed blocking queue
(5) SynchronousQueue: a blocking queue with no internal storage
(6) LinkedTransferQueue: a hybrid blocking queue
(7) LinkedBlockingDeque: a double-ended blocking queue
(1) ArrayBlockingQueue: an array-based blocking queue
ArrayBlockingQueue is a blocking queue backed by an array. Its constructors let you specify the array capacity, whether the lock is fair or non-fair, and an initial collection of elements.
ArrayBlockingQueue uses a ReentrantLock to resolve contention between threads, and Condition objects to block and wake threads.
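The following is a minimal producer/consumer sketch (not from the JDK or the original text; class and variable names are illustrative) showing how put() blocks when the queue is full and take() blocks when it is empty:

import java.util.concurrent.ArrayBlockingQueue;

public class ArrayBlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // capacity 2, fair = false (non-fair lock)
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2, false);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    queue.put("task-" + i); // blocks while the queue is full
                    System.out.println("produced task-" + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    String task = queue.take(); // blocks while the queue is empty
                    System.out.println("consumed " + task);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}

The relevant parts of the JDK source are shown next.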
//A bounded BlockingQueue backed by an array.
//This queue orders elements FIFO (first-in-first-out).
//The head of the queue is that element that has been on the queue the longest time.
//The tail of the queue is that element that has been on the queue the shortest time.
//New elements are inserted at the tail of the queue,
//and the queue retrieval operations obtain elements at the head of the queue.
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
    //The queued items
    final Object[] items;
    //items index for next take, poll, peek or remove
    int takeIndex;
    //items index for next put, offer, or add
    int putIndex;
    //Number of elements in the queue
    int count;
    //Main lock guarding all access
    final ReentrantLock lock;
    //Condition for waiting takes
    private final Condition notEmpty;
    //Condition for waiting puts
    private final Condition notFull;
    //Creates an ArrayBlockingQueue with the given (fixed) capacity and default access policy.
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
    //Creates an ArrayBlockingQueue with the given (fixed) capacity and the specified access policy.
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0) {
            throw new IllegalArgumentException();
        }
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }
    //Inserts the specified element at the tail of this queue,
    //waiting for space to become available if the queue is full.
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                notFull.await();
            }
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    //Inserts element at current put position, advances, and signals.
    //Call only when holding lock.
    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length) {
            putIndex = 0;
        }
        count++;
        notEmpty.signal();
    }
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    //Returns the number of elements in this queue.
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
    ...
}
(2) LinkedBlockingQueue: a linked-list-based blocking queue
LinkedBlockingQueue is a blocking queue backed by a linked list. The capacity does not have to be specified; the default capacity is Integer.MAX_VALUE. Because this default is so large, LinkedBlockingQueue is often described as an unbounded queue.
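A small sketch of that difference, using only the public constructors: the no-arg constructor yields an effectively unbounded queue, while passing a capacity makes insertion block (or fail, with offer()) once the capacity is reached.

import java.util.concurrent.LinkedBlockingQueue;

public class LinkedBlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // Effectively unbounded: capacity defaults to Integer.MAX_VALUE, so put() practically never blocks
        LinkedBlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
        unbounded.put("a");

        // Bounded: capacity 1, so a second put() would block until an element is taken
        LinkedBlockingQueue<String> bounded = new LinkedBlockingQueue<>(1);
        bounded.put("b");
        boolean accepted = bounded.offer("c"); // offer() returns false instead of blocking
        System.out.println("second element accepted: " + accepted); // false
    }
}

The relevant JDK source follows.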
//An optionally-bounded BlockingQueue based on linked nodes.
//This queue orders elements FIFO (first-in-first-out).
//The head of the queue is that element that has been on the queue the longest time.
//The tail of the queue is that element that has been on the queue the shortest time.
//New elements are inserted at the tail of the queue,
//and the queue retrieval operations obtain elements at the head of the queue.
//Linked queues typically have higher throughput than array-based queues
//but less predictable performance in most concurrent applications.
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
    ...
    //The capacity bound, or Integer.MAX_VALUE if none
    private final int capacity;
    //Current number of elements
    private final AtomicInteger count = new AtomicInteger();
    //Head of linked list.
    transient Node<E> head;
    //Tail of linked list.
    private transient Node<E> last;
    //Lock held by take, poll, etc
    private final ReentrantLock takeLock = new ReentrantLock();
    //Lock held by put, offer, etc
    private final ReentrantLock putLock = new ReentrantLock();
    //Wait queue for waiting takes
    private final Condition notEmpty = takeLock.newCondition();
    //Wait queue for waiting puts
    private final Condition notFull = putLock.newCondition();
    //Creates a LinkedBlockingQueue with a capacity of Integer#MAX_VALUE.
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    //Creates a LinkedBlockingQueue with the given (fixed) capacity.
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
    //Inserts the specified element at the tail of this queue,
    //waiting if necessary for space to become available.
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity) {
                notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();
        }
    }
    //Links node at end of queue.
    private void enqueue(Node<E> node) {
        last = last.next = node;
    }
    //Signals a waiting take. Called only from put/offer (which do not otherwise ordinarily lock takeLock.)
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1) {
                notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) {
            signalNotFull();
        }
        return x;
    }
    private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }
    public int size() {
        return count.get();
    }
    ...
}
(3) PriorityBlockingQueue: a priority blocking queue
PriorityBlockingQueue is an unbounded blocking queue that supports custom element priorities. By default, elements are ordered by their natural ascending order; custom priority rules can be defined by implementing compareTo() on the element type, or by supplying a Comparator.
PriorityBlockingQueue is backed by an array that grows automatically as needed. As an application example, priority-based message middleware can be built on top of a priority blocking queue.
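A minimal sketch (using a hypothetical Message class, not from the original text) of how compareTo() controls the dequeue order; smaller values come out first:

import java.util.concurrent.PriorityBlockingQueue;

public class PriorityBlockingQueueDemo {
    // Hypothetical element type: smaller priority value = higher priority
    static class Message implements Comparable<Message> {
        final int priority;
        final String body;
        Message(int priority, String body) { this.priority = priority; this.body = body; }
        @Override
        public int compareTo(Message other) {
            return Integer.compare(this.priority, other.priority);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<Message> queue = new PriorityBlockingQueue<>();
        queue.put(new Message(5, "low"));
        queue.put(new Message(1, "high"));
        queue.put(new Message(3, "medium"));
        // Elements come out in priority order, not insertion order: high, medium, low
        System.out.println(queue.take().body);
        System.out.println(queue.take().body);
        System.out.println(queue.take().body);
    }
}

The JDK source follows.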
//An unbounded BlockingQueue that uses the same ordering rules as class PriorityQueue and supplies blocking retrieval operations.
//While this queue is logically unbounded, attempted additions may fail due to resource exhaustion (causing OutOfMemoryError).
//This class does not permit null elements.
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
    //Priority queue represented as a balanced binary heap: the two children of queue[n] are queue[2*n+1] and queue[2*(n+1)].
    //The priority queue is ordered by comparator, or by the elements' natural ordering,
    //if comparator is null: For each node n in the heap and each descendant d of n, n <= d.
    //The element with the lowest value is in queue[0], assuming the queue is nonempty.
    private transient Object[] queue;
    //The number of elements in the priority queue.
    private transient int size;
    //The comparator, or null if priority queue uses elements' natural ordering.
    private transient Comparator<? super E> comparator;
    //Lock used for all public operations
    private final ReentrantLock lock;
    //Condition for blocking when empty
    private final Condition notEmpty;
    //Spinlock for allocation, acquired via CAS.
    private transient volatile int allocationSpinLock;
    //Creates a PriorityBlockingQueue with the default initial capacity (11) that
    //orders its elements according to their Comparable natural ordering.
    public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }
    //Creates a PriorityBlockingQueue with the specified initial capacity that
    //orders its elements according to their Comparable natural ordering.
    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }
    //Creates a PriorityBlockingQueue with the specified initial capacity that orders its elements according to the specified comparator.
    public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) {
        if (initialCapacity < 1) {
            throw new IllegalArgumentException();
        }
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }
    //Inserts the specified element into this priority queue.
    //As the queue is unbounded, this method will never block.
    public void put(E e) {
        offer(e); // never need to block
    }
    //Inserts the specified element into this priority queue.
    //As the queue is unbounded, this method will never return false.
    public boolean offer(E e) {
        if (e == null) {
            throw new NullPointerException();
        }
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        while ((n = size) >= (cap = (array = queue).length)) {
            tryGrow(array, cap);
        }
        try {
            Comparator<? super E> cmp = comparator;
            if (cmp == null) {
                siftUpComparable(n, e, array);
            } else {
                siftUpUsingComparator(n, e, array, cmp);
            }
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }
    //Tries to grow array to accommodate at least one more element (but normally expand by about 50%),
    //giving up (allowing retry) on contention (which we expect to be rare). Call only while holding lock.
    private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {
            try {
                int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE) {
                        throw new OutOfMemoryError();
                    }
                    newCap = MAX_ARRAY_SIZE;
                }
                if (newCap > oldCap && queue == array) {
                    newArray = new Object[newCap];
                }
            } finally {
                allocationSpinLock = 0;
            }
        }
        if (newArray == null) { // back off if another thread is allocating
            Thread.yield();
        }
        lock.lock();
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }
    private static <T> void siftUpComparable(int k, T x, Object[] array) {
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (key.compareTo((T) e) >= 0) {
                break;
            }
            array[k] = e;
            k = parent;
        }
        array[k] = key;
    }
    private static <T> void siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (cmp.compare(x, (T) e) >= 0) {
                break;
            }
            array[k] = e;
            k = parent;
        }
        array[k] = x;
    }
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ((result = dequeue()) == null) {
                notEmpty.await();
            }
        } finally {
            lock.unlock();
        }
        return result;
    }
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return size;
        } finally {
            lock.unlock();
        }
    }
    ...
}
(4) DelayQueue: a delayed blocking queue
DelayQueue is an unbounded blocking queue that supports delayed retrieval of elements; it is implemented on top of PriorityQueue.
When elements are inserted into a DelayQueue, they are ordered by their custom delay; that is, elements in the queue are sorted by expiration time, and only elements whose remaining delay is less than or equal to zero can be taken out.
Typical use cases of DelayQueue include:
1. Automatically cancelling orders whose payment has timed out
2. Automatically discarding tasks that have timed out
3. Implementing message middleware
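As a sketch of the first use case (auto-cancelling a timed-out order), assuming a hypothetical OrderTimeoutTask class that implements Delayed:

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {
    // Hypothetical delayed element: becomes available once its deadline has passed
    static class OrderTimeoutTask implements Delayed {
        final String orderId;
        final long deadline; // absolute time in milliseconds
        OrderTimeoutTask(String orderId, long delayMillis) {
            this.orderId = orderId;
            this.deadline = System.currentTimeMillis() + delayMillis;
        }
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<OrderTimeoutTask> queue = new DelayQueue<>();
        queue.put(new OrderTimeoutTask("order-1", 1000)); // expires after 1s
        queue.put(new OrderTimeoutTask("order-2", 500));  // expires after 0.5s
        // take() blocks until the earliest deadline has expired, so order-2 comes out first
        System.out.println("cancel " + queue.take().orderId);
        System.out.println("cancel " + queue.take().orderId);
    }
}

The JDK implementation, built on an internal PriorityQueue, follows.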
//An unbounded BlockingQueue of Delayed elements, in which an element can only be taken when its delay has expired.
//The head of the queue is that Delayed element whose delay expired furthest in the past.
//If no delay has expired there is no head and poll will return null.
//Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero.
//Even though unexpired elements cannot be removed using take or poll, they are otherwise treated as normal elements.
//For example, the size method returns the count of both expired and unexpired elements.
//This queue does not permit null elements.
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    //Thread designated to wait for the element at the head of the queue.
    //When a thread becomes the leader, it waits only for the next delay to elapse, but other threads await indefinitely.
    //The leader thread must signal some other thread before returning from take() or poll(...),
    //unless some other thread becomes leader in the interim.
    //Whenever the head of the queue is replaced with an element with an earlier expiration time,
    //the leader field is invalidated by being reset to null, and some waiting thread,
    //but not necessarily the current leader, is signalled.
    //So waiting threads must be prepared to acquire and lose leadership while waiting.
    private Thread leader = null;
    //Condition signalled when a newer element becomes available at the head of the queue or a new thread may need to become leader.
    private final Condition available = lock.newCondition();
    //Creates a new {@code DelayQueue} that is initially empty.
    public DelayQueue() {}
    //Inserts the specified element into this delay queue.
    //As the queue is unbounded this method will never block.
    public void put(E e) {
        offer(e);
    }
    //Inserts the specified element into this delay queue.
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
    //Retrieves and removes the head of this queue,
    //waiting if necessary until an element with an expired delay is available on this queue.
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    available.await();
                } else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0) {
                        return q.poll();
                    }
                    first = null; // don't retain ref while waiting
                    if (leader != null) {
                        available.await();
                    } else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread) {
                                leader = null;
                            }
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null) {
                available.signal();
            }
            lock.unlock();
        }
    }
    ...
}

public class PriorityQueue<E> extends AbstractQueue<E> implements java.io.Serializable {
    //Priority queue represented as a balanced binary heap: the two children of queue[n] are queue[2*n+1] and queue[2*(n+1)].
    //The priority queue is ordered by comparator, or by the elements' natural ordering, if comparator is null:
    //For each node n in the heap and each descendant d of n, n <= d.
    //The element with the lowest value is in queue[0], assuming the queue is nonempty.
    transient Object[] queue;
    //The number of elements in the priority queue.
    private int size = 0;
    //The comparator, or null if priority queue uses elements' natural ordering.
    private final Comparator<? super E> comparator;
    public E peek() {
        return (size == 0) ? null : (E) queue[0];
    }
    //Inserts the specified element into this priority queue.
    public boolean offer(E e) {
        if (e == null) {
            throw new NullPointerException();
        }
        modCount++;
        int i = size;
        if (i >= queue.length) {
            grow(i + 1);
        }
        size = i + 1;
        if (i == 0) {
            queue[0] = e;
        } else {
            siftUp(i, e);
        }
        return true;
    }
    //Increases the capacity of the array.
    private void grow(int minCapacity) {
        int oldCapacity = queue.length;
        // Double size if small; else grow by 50%
        int newCapacity = oldCapacity + ((oldCapacity < 64) ? (oldCapacity + 2) : (oldCapacity >> 1));
        // overflow-conscious code
        if (newCapacity - MAX_ARRAY_SIZE > 0) {
            newCapacity = hugeCapacity(minCapacity);
        }
        queue = Arrays.copyOf(queue, newCapacity);
    }
    private void siftUp(int k, E x) {
        if (comparator != null) {
            siftUpUsingComparator(k, x);
        } else {
            siftUpComparable(k, x);
        }
    }
    @SuppressWarnings("unchecked")
    private void siftUpComparable(int k, E x) {
        Comparable<? super E> key = (Comparable<? super E>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = queue[parent];
            if (key.compareTo((E) e) >= 0) {
                break;
            }
            queue[k] = e;
            k = parent;
        }
        queue[k] = key;
    }
    @SuppressWarnings("unchecked")
    private void siftUpUsingComparator(int k, E x) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = queue[parent];
            if (comparator.compare(x, (E) e) >= 0) {
                break;
            }
            queue[k] = e;
            k = parent;
        }
        queue[k] = x;
    }
    ...
}
(5) SynchronousQueue: a blocking queue with no internal storage
SynchronousQueue has no internal container to store data. When a producer adds an element and no consumer is there to take it, the producer blocks; when a consumer tries to take an element and no producer is adding one, the consumer blocks as well.
In essence, SynchronousQueue exploits its zero-capacity nature to achieve immediate hand-off between a producer thread and a consumer thread, which makes it particularly suitable for passing data between two threads promptly.
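A minimal sketch of this hand-off behavior between two threads (the demo class is illustrative, not from the original text):

import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        Thread consumer = new Thread(() -> {
            try {
                // take() blocks until a producer hands over an element
                System.out.println("received: " + queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // put() blocks until the consumer is ready to take the element
        queue.put("hello");
        consumer.join();
    }
}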
Thread pools implement the producer/consumer model on top of blocking queues. When a task is submitted to a thread pool, it is first put into the blocking queue, and worker threads in the pool then take tasks from that queue and process them.
Executors.newCachedThreadPool() is built on SynchronousQueue and returns a cached thread pool. If the pool has more threads than needed for the current workload, idle threads are reclaimed flexibly; when the number of tasks grows, the pool keeps creating new worker threads to handle them.
public class Executors {
    ...
    //Creates a thread pool that creates new threads as needed,
    //but will reuse previously constructed threads when they are available.
    //These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks.
    //Calls to execute will reuse previously constructed threads if available.
    //If no existing thread is available, a new thread will be created and added to the pool.
    //Threads that have not been used for sixty seconds are terminated and removed from the cache.
    //Thus, a pool that remains idle for long enough will not consume any resources.
    //Note that pools with similar properties but different details (for example, timeout parameters)
    //may be created using ThreadPoolExecutor constructors.
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    }
    ...
}
(6) LinkedTransferQueue: a hybrid blocking queue
LinkedTransferQueue is an unbounded blocking TransferQueue backed by a linked list.
A blocking queue blocks producer or consumer threads according to the state of the queue, whereas a TransferQueue lets a producer hand an element off and not return until a consumer has received it.
LinkedTransferQueue combines the behaviors of TransferQueue and LinkedBlockingQueue. SynchronousQueue is itself built on an internal transfer mechanism (its internal TransferStack/TransferQueue implementations), so LinkedTransferQueue can be thought of as a SynchronousQueue with the capabilities of a blocking queue added on.
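A short sketch contrasting put() (returns immediately, like a normal unbounded blocking queue) with transfer() (returns only after a consumer has received the element); the demo class is illustrative:

import java.util.concurrent.LinkedTransferQueue;

public class LinkedTransferQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

        // put()/offer() behave like a normal unbounded blocking queue: they return immediately
        queue.put("buffered");

        Thread consumer = new Thread(() -> {
            try {
                System.out.println("consumed: " + queue.take()); // "buffered"
                System.out.println("consumed: " + queue.take()); // "handed-off"
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // transfer() does not return until some consumer has taken the element
        queue.transfer("handed-off");
        System.out.println("transfer() returned only after a consumer received the element");
        consumer.join();
    }
}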
(7) LinkedBlockingDeque: a double-ended blocking queue
LinkedBlockingDeque is a double-ended blocking queue backed by a linked list. Elements can be inserted and removed at both ends, which can roughly halve the contention under concurrent multi-threaded access.
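A brief sketch of operating on both ends of the deque (the class name is illustrative):

import java.util.concurrent.LinkedBlockingDeque;

public class LinkedBlockingDequeDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>(16);

        deque.putFirst("urgent");   // insert at the head
        deque.putLast("normal-1");  // insert at the tail
        deque.putLast("normal-2");

        System.out.println(deque.takeFirst()); // "urgent"  - one thread can consume from the head
        System.out.println(deque.takeLast());  // "normal-2" - another thread can consume from the tail
    }
}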
5. How LinkedBlockingQueue is implemented
(1) Design considerations for a blocking queue
(2) LinkedBlockingQueue as a bounded queue
(3) LinkedBlockingQueue's put() method
(4) LinkedBlockingQueue's take() method
(5) How LinkedBlockingQueue splits lock responsibilities across two locks
(6) LinkedBlockingQueue's size() method and iteration
(7) LinkedBlockingQueue (linked list) vs ArrayBlockingQueue (array)
(1) Design considerations for a blocking queue
The defining behavior of a blocking queue is: if the queue is empty, consumer threads are blocked; if the queue is full, producer threads are blocked.
To implement this behavior, two questions must be answered: how can a thread be blocked and woken up when a particular condition is met, and what kind of container should store the queue's data?
Blocking and waking threads can be done with wait/notify or with Condition; the data can be stored in an array or a linked list. A minimal hand-written bounded queue along these lines is sketched below.
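The following sketch (not from the original text) combines these design choices: one ReentrantLock, two Conditions, and a circular array.

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Minimal bounded blocking queue: one lock, two conditions, a circular array
public class SimpleBlockingQueue<E> {
    private final Object[] items;
    private int putIndex, takeIndex, count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public SimpleBlockingQueue(int capacity) {
        this.items = new Object[capacity];
    }

    public void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                notFull.await();            // queue full: block the producer
            }
            items[putIndex] = e;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal();              // an element is now available: wake a consumer
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                notEmpty.await();           // queue empty: block the consumer
            }
            E e = (E) items[takeIndex];
            items[takeIndex] = null;
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal();               // a slot is now free: wake a producer
            return e;
        } finally {
            lock.unlock();
        }
    }
}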
(2) LinkedBlockingQueue as a bounded queue
1. A thread-safe unbounded queue
ConcurrentLinkedQueue, for example, has no bound and no size limit. It is simply a singly linked list into which data can be put without restriction. If data keeps being added to an unbounded queue, it may eventually cause an out-of-memory error.
2. A thread-safe bounded queue
LinkedBlockingQueue, for example, can be bounded with a size limit. It is also a singly linked list, and once the limit is reached, threads adding data to the queue are blocked. This makes it possible to cap the size of an in-memory queue and keep it from growing without limit until it exhausts memory.
(3) LinkedBlockingQueue's put() method
put() is the blocking insertion method: when the queue is full, the thread adding the element is blocked.
First, the element being added is wrapped in a Node object, which represents a node in the linked list.
Then ReentrantLock.lockInterruptibly() acquires an interruptible lock. Locking guarantees that the element is added to the queue safely and that the queue never exceeds its capacity.
Next, enqueue() stores the wrapped Node at the tail of the linked list, and the AtomicInteger element count of the queue is incremented.
Finally, the value of that AtomicInteger is used to decide whether the queue has reached its capacity.
Note the important field notFull: it is a Condition object used to block and wake producer threads. If the number of elements equals the maximum capacity, notFull.await() is called to block the producer thread; if the queue still has room after the insert, notFull.signal() is called to wake another waiting producer thread.
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
    ...
    private final int capacity;//maximum capacity of the blocking queue, Integer.MAX_VALUE by default
    private final AtomicInteger count = new AtomicInteger();//number of elements in the blocking queue
    transient Node<E> head;//head node of the linked list
    private transient Node<E> last;//tail node of the linked list
    //Two locks are used so that put() and take() lock separately, improving concurrency
    private final ReentrantLock takeLock = new ReentrantLock();//lock for dequeueing
    private final ReentrantLock putLock = new ReentrantLock();//lock for enqueueing
    //Two Conditions are used to block and wake dequeueing threads and enqueueing threads respectively
    private final Condition notEmpty = takeLock.newCondition();//condition (wait queue) for dequeueing threads
    private final Condition notFull = putLock.newCondition();//condition (wait queue) for enqueueing threads
    //Creates a LinkedBlockingQueue with a capacity of Integer#MAX_VALUE.
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    //Creates a LinkedBlockingQueue with the given (fixed) capacity.
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
    //Inserts the specified element at the tail of this queue,
    //waiting if necessary for space to become available.
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        //Wrap the element being added in a Node object
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;//current number of elements in the queue
        putLock.lockInterruptibly();//acquire an interruptible lock
        try {
            //Note the important field notFull: it is a Condition object used to block and wake producer threads
            //If the number of elements equals the maximum capacity, notFull.await() blocks the producer thread
            while (count.get() == capacity) {
                notFull.await();//block the current thread and release the lock
            }
            //Store the wrapped Node object in the linked list
            enqueue(node);
            //Increment the element count via the AtomicInteger; the value is used to decide whether the queue is full
            c = count.getAndIncrement();
            //If the queue still has room after this insert, wake another waiting producer
            if (c + 1 < capacity) {
                notFull.signal();
            }
        } finally {
            putLock.unlock();//release the lock
        }
        if (c == 0) {
            signalNotEmpty();
        }
    }
    //Links node at end of queue.
    private void enqueue(Node<E> node) {
        //node first becomes the next of the current last,
        //then last is moved to point at last.next (i.e. node)
        last = last.next = node;
    }
    //Signals a waiting take. Called only from put/offer (which do not otherwise ordinarily lock takeLock.)
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }
    ...
}
(4) LinkedBlockingQueue's take() method
take() is the blocking retrieval method: when the queue is empty, the thread taking an element is blocked.
First, ReentrantLock.lockInterruptibly() acquires an interruptible lock.
Then the element count is checked; if it is 0, notEmpty.await() blocks the consumer thread.
Otherwise, dequeue() removes an element from the head of the linked list, and the AtomicInteger element count is decremented.
Finally, if the element count before the decrement was greater than 1, notEmpty.signal() is called to wake another blocked consumer thread.
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
    ...
    private final int capacity;//maximum capacity of the blocking queue, Integer.MAX_VALUE by default
    private final AtomicInteger count = new AtomicInteger();//number of elements in the blocking queue
    transient Node<E> head;//head node of the linked list
    private transient Node<E> last;//tail node of the linked list
    //Two locks are used so that put() and take() lock separately, improving concurrency
    private final ReentrantLock takeLock = new ReentrantLock();//lock for dequeueing
    private final ReentrantLock putLock = new ReentrantLock();//lock for enqueueing
    //Two Conditions are used to block and wake dequeueing threads and enqueueing threads respectively
    private final Condition notEmpty = takeLock.newCondition();//condition (wait queue) for dequeueing threads
    private final Condition notFull = putLock.newCondition();//condition (wait queue) for enqueueing threads
    //Creates a LinkedBlockingQueue with a capacity of Integer#MAX_VALUE.
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    //Creates a LinkedBlockingQueue with the given (fixed) capacity.
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        //Acquire an interruptible lock
        takeLock.lockInterruptibly();
        try {
            //Check whether the number of elements is 0
            while (count.get() == 0) {
                notEmpty.await();//block the current thread and release the lock
            }
            //Call dequeue() to remove an element from the linked list
            x = dequeue();
            //Decrement the element count via the AtomicInteger
            c = count.getAndDecrement();
            //If the queue held more than one element before the decrement,
            //call notEmpty.signal() to wake another blocked consumer thread
            if (c > 1) {
                notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) {
            signalNotFull();
        }
        return x;
    }
    //First get the head node of the linked list,
    //then take the head's next node (first),
    //then drop the old head from the queue, clear first's item, and make first the new head,
    //finally return first's original item
    private E dequeue() {
        Node<E> h = head;//h points at head
        Node<E> first = h.next;//first points at h.next
        h.next = h;// help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }
    ...
}
(5) How LinkedBlockingQueue splits lock responsibilities across two locks
Two separate exclusive locks improve concurrency, because dequeueing and enqueueing use different locks. Under concurrent access, a take and a put can therefore proceed at the same time without contending for the same lock.
This is a common lock-optimization idea: split one lock by responsibility and use different locks to control contention in different operations, thereby improving performance.
(6) LinkedBlockingQueue's size() method and iteration
1. The result of size() is still not 100% accurate
LinkedBlockingQueue's size() obtains the element count from an AtomicInteger.
Compared with ConcurrentLinkedQueue, which counts by traversing the queue, this is much more accurate.
Compared with CopyOnWriteArrayList, which counts by traversing an old snapshot of the array, it is also much more accurate.
Compared with ConcurrentHashMap, which counts via segmented CAS counters, the accuracy is about the same.
Note that LinkedBlockingQueue still cannot report a 100% accurate element count. Only by locking the whole queue, i.e. disallowing enqueue and dequeue while size() runs, would the result be exact, because the AtomicInteger is incremented or decremented only after an enqueue or dequeue has completed.
2. Iteration locks the whole queue by taking both locks
Traversing a LinkedBlockingQueue locks the entire queue: both locks are acquired first.
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
    ...
    private final AtomicInteger count = new AtomicInteger();//number of elements in the blocking queue
    transient Node<E> head;//head node of the linked list
    private transient Node<E> last;//tail node of the linked list
    //Two locks are used so that put() and take() lock separately, improving concurrency
    private final ReentrantLock takeLock = new ReentrantLock();//lock for dequeueing
    private final ReentrantLock putLock = new ReentrantLock();//lock for enqueueing
    public int size() {
        return count.get();
    }
    public Iterator<E> iterator() {
        return new Itr();
    }
    private class Itr implements Iterator<E> {
        private Node<E> current;
        private Node<E> lastRet;
        private E currentElement;
        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null) {
                    currentElement = current.item;
                }
            } finally {
                fullyUnlock();
            }
        }
        ...
    }
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }
    ...
}
(7) LinkedBlockingQueue (linked list) vs ArrayBlockingQueue (array)
1. LinkedBlockingQueue is a bounded blocking queue backed by a linked list; ArrayBlockingQueue is a bounded blocking queue backed by an array.
2. The overall implementation approach of ArrayBlockingQueue is the same as that of LinkedBlockingQueue.
3. LinkedBlockingQueue uses two exclusive locks, one for dequeueing and one for enqueueing.
4. ArrayBlockingQueue uses a single lock over the whole array, so its enqueue and dequeue cannot proceed at the same time.
5. ArrayBlockingQueue's size() acquires the exclusive lock directly.
6. Both ArrayBlockingQueue and LinkedBlockingQueue lock the data while iterating.
6. A cluster synchronization mechanism built on two queues
(1) What a service registry cluster needs to provide
(2) A cluster synchronization mechanism built on two queues
(3) Using ConcurrentLinkedQueue for the first queue
(4) Using LinkedBlockingQueue for the second queue
(5) Concrete implementation of the cluster synchronization mechanism
(1) What a service registry cluster needs to provide
When a service instance sends a register, cancel (deregister), or heartbeat request to any registry instance, that registry instance must replicate the information to the other registry instances, so that the in-memory registries of all registry instances stay consistent.
(2) A cluster synchronization mechanism built on two queues
When a registry instance receives a request from service instance A, it first applies A's request information to its local in-memory registry and writes the request information into the first in-memory queue; the registry instance can then finish handling A's request and return.
Next, a background thread on that registry instance consumes data from the first in-memory queue, packages what it consumes into batches, and writes the batches into the second in-memory queue.
Finally, another background thread on that registry instance consumes data from the second in-memory queue and replicates it to the other registry instances.
(3) Using ConcurrentLinkedQueue for the first queue
There are two kinds of queue to choose from:
1. The unbounded ConcurrentLinkedQueue, implemented with CAS, which offers very high concurrency.
2. The bounded LinkedBlockingQueue, implemented with two locks, whose concurrency is only moderate.
LinkedBlockingQueue's default capacity is Integer.MAX_VALUE, so it can be regarded as unbounded; but a reasonable capacity can also be specified so that enqueueing blocks when the queue is full, preventing memory exhaustion.
When a registry instance receives requests of all kinds, it first puts the request information into the first queue. The first queue therefore faces highly concurrent writes, and LinkedBlockingQueue is not a good fit.
Because LinkedBlockingQueue is a blocking queue, the threads in the registry instance that handle service requests would be blocked if it filled up. Its concurrency is also not very high, since an exclusive lock must be acquired before writing. So the unbounded ConcurrentLinkedQueue is the better choice for the first queue.
(4) Using LinkedBlockingQueue for the second queue
When consuming the first in-memory queue, the data can be batched by time: for example, every 500ms all data consumed so far is packaged into one batch message. The batch is then put into the second in-memory queue, so the consumer of the second queue only needs to replicate batch messages to the other cluster instances.
Since only a small number of background threads enqueue to and dequeue from the second queue, the bounded LinkedBlockingQueue can be used to implement it.
It also helps to estimate a suitable capacity for this bounded LinkedBlockingQueue. Suppose each request record to be replicated to the other cluster instances has 6 fields and takes about 30 bytes, and a batch message contains on average 100 request records, i.e. about 3000 bytes ≈ 3KB. Then 1000 batch messages occupy only about 3000KB ≈ 3MB, so the capacity of the second in-memory queue can be set to 1000.
(5) Concrete implementation of the cluster synchronization mechanism
//Cluster replication component
public class PeersReplicator {
    //Interval for generating replication batches: 500ms
    private static final long PEERS_REPLICATE_BATCH_INTERVAL = 500;
    private static final PeersReplicator instance = new PeersReplicator();
    private PeersReplicator() {
        //Start the thread that receives requests and packs them into batches
        AcceptorBatchThread acceptorBatchThread = new AcceptorBatchThread();
        acceptorBatchThread.setDaemon(true);
        acceptorBatchThread.start();
        //Start the thread that replicates batches to the other cluster instances
        PeersReplicateThread peersReplicateThread = new PeersReplicateThread();
        peersReplicateThread.setDaemon(true);
        peersReplicateThread.start();
    }
    public static PeersReplicator getInstance() {
        return instance;
    }
    //First in-memory queue: handles highly concurrent service requests, i.e. highly concurrent writes; unbounded
    private ConcurrentLinkedQueue<AbstractRequest> acceptorQueue = new ConcurrentLinkedQueue<AbstractRequest>();
    //Second in-memory queue: bounded, used to replicate batch messages to the other cluster instances
    private LinkedBlockingQueue<PeersReplicateBatch> replicateQueue = new LinkedBlockingQueue<PeersReplicateBatch>(10000);
    //Replicate a service registration request
    public void replicateRegister(RegisterRequest request) {
        request.setType(AbstractRequest.REGISTER_REQUEST);
        //Put the request message into the first in-memory queue
        acceptorQueue.offer(request);
    }
    //Replicate a service cancellation (deregistration) request
    public void replicateCancel(CancelRequest request) {
        request.setType(AbstractRequest.CANCEL_REQUEST);
        //Put the request message into the first in-memory queue
        acceptorQueue.offer(request);
    }
    //Replicate a heartbeat request
    public void replicateHeartbeat(HeartbeatRequest request) {
        request.setType(AbstractRequest.HEARTBEAT_REQUEST);
        //Put the request message into the first in-memory queue
        acceptorQueue.offer(request);
    }
    //Background thread that receives data and packs it into batches
    class AcceptorBatchThread extends Thread {
        long latestBatchGeneration = System.currentTimeMillis();
        @Override
        public void run() {
            //The batch must live across loop iterations so it can accumulate requests;
            //creating it inside the loop would discard whatever had been added to it
            PeersReplicateBatch batch = new PeersReplicateBatch();
            while (true) {
                try {
                    long now = System.currentTimeMillis();
                    if (now - latestBatchGeneration >= PEERS_REPLICATE_BATCH_INTERVAL) {
                        //The 500ms interval has elapsed:
                        //put the batch message into the second in-memory queue
                        replicateQueue.offer(batch);
                        //update latestBatchGeneration
                        latestBatchGeneration = System.currentTimeMillis();
                        //start a new batch
                        batch = new PeersReplicateBatch();
                    } else {
                        //The 500ms interval has not elapsed yet:
                        //take data from the first queue and add it to the current batch
                        AbstractRequest request = acceptorQueue.poll();
                        if (request != null) {
                            batch.add(request);
                        } else {
                            Thread.sleep(100);
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
    //Cluster replication thread
    class PeersReplicateThread extends Thread {
        @Override
        public void run() {
            while (true) {
                try {
                    PeersReplicateBatch batch = replicateQueue.take();
                    if (batch != null) {
                        //Iterate over the addresses of the other register-servers
                        //and send each register-server an HTTP request carrying the batch
                        System.out.println("Sending requests to the other register-servers to replicate the batch......");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

//Batch message used for batched replication
public class PeersReplicateBatch {
    private List<AbstractRequest> requests = new ArrayList<AbstractRequest>();
    public void add(AbstractRequest request) {
        this.requests.add(request);
    }
    public List<AbstractRequest> getRequests() {
        return requests;
    }
    public void setRequests(List<AbstractRequest> requests) {
        this.requests = requests;
    }
}

//Receives and handles the requests sent by register-client
public class RegisterServerController {
    //Service registry
    private ServiceRegistry registry = ServiceRegistry.getInstance();
    //Cache of the service registry
    private ServiceRegistryCache registryCache = ServiceRegistryCache.getInstance();
    //Cluster replication component
    private PeersReplicator peersReplicator = PeersReplicator.getInstance();
    //Service registration
    public RegisterResponse register(RegisterRequest registerRequest) {
        RegisterResponse registerResponse = new RegisterResponse();
        try {
            //Add this service instance to the registry
            ServiceInstance serviceInstance = new ServiceInstance();
            serviceInstance.setHostname(registerRequest.getHostname());
            serviceInstance.setIp(registerRequest.getIp());
            serviceInstance.setPort(registerRequest.getPort());
            serviceInstance.setServiceInstanceId(registerRequest.getServiceInstanceId());
            serviceInstance.setServiceName(registerRequest.getServiceName());
            registry.register(serviceInstance);
            //Update the thresholds of the self-protection policy
            synchronized (SelfProtectionPolicy.class) {
                SelfProtectionPolicy selfProtectionPolicy = SelfProtectionPolicy.getInstance();
                selfProtectionPolicy.setExpectedHeartbeatRate(selfProtectionPolicy.getExpectedHeartbeatRate() + 2);
                selfProtectionPolicy.setExpectedHeartbeatThreshold((long) (selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85));
            }
            //Invalidate the registry cache
            registryCache.invalidate();
            //Replicate to the cluster
            peersReplicator.replicateRegister(registerRequest);
            registerResponse.setStatus(RegisterResponse.SUCCESS);
        } catch (Exception e) {
            e.printStackTrace();
            registerResponse.setStatus(RegisterResponse.FAILURE);
        }
        return registerResponse;
    }
    //Service cancellation (deregistration)
    public void cancel(CancelRequest cancelRequest) {
        //Remove the instance from the service registry
        registry.remove(cancelRequest.getServiceName(), cancelRequest.getServiceInstanceId());
        //Update the thresholds of the self-protection policy
        synchronized (SelfProtectionPolicy.class) {
            SelfProtectionPolicy selfProtectionPolicy = SelfProtectionPolicy.getInstance();
            selfProtectionPolicy.setExpectedHeartbeatRate(selfProtectionPolicy.getExpectedHeartbeatRate() - 2);
            selfProtectionPolicy.setExpectedHeartbeatThreshold((long) (selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85));
        }
        //Invalidate the registry cache
        registryCache.invalidate();
        //Replicate to the cluster
        peersReplicator.replicateCancel(cancelRequest);
    }
    //Heartbeat
    public HeartbeatResponse heartbeat(HeartbeatRequest heartbeatRequest) {
        HeartbeatResponse heartbeatResponse = new HeartbeatResponse();
        try {
            //Look up the service instance
            ServiceInstance serviceInstance = registry.getServiceInstance(heartbeatRequest.getServiceName(), heartbeatRequest.getServiceInstanceId());
            if (serviceInstance != null) {
                serviceInstance.renew();
            }
            //Record the number of heartbeats per minute
            HeartbeatCounter heartbeatMessuredRate = HeartbeatCounter.getInstance();
            heartbeatMessuredRate.increment();
            //Replicate to the cluster
            peersReplicator.replicateHeartbeat(heartbeatRequest);
            heartbeatResponse.setStatus(HeartbeatResponse.SUCCESS);
        } catch (Exception e) {
            e.printStackTrace();
            heartbeatResponse.setStatus(HeartbeatResponse.FAILURE);
        }
        return heartbeatResponse;
    }
    //Apply a replicated batch
    public void replicateBatch(PeersReplicateBatch batch) {
        for (AbstractRequest request : batch.getRequests()) {
            if (request.getType().equals(AbstractRequest.REGISTER_REQUEST)) {
                register((RegisterRequest) request);
            } else if (request.getType().equals(AbstractRequest.CANCEL_REQUEST)) {
                cancel((CancelRequest) request);
            } else if (request.getType().equals(AbstractRequest.HEARTBEAT_REQUEST)) {
                heartbeat((HeartbeatRequest) request);
            }
        }
    }
    //Fetch the full registry
    public Applications fetchFullRegistry() {
        return (Applications) registryCache.get(CacheKey.FULL_SERVICE_REGISTRY);
    }
    //Fetch the delta registry
    public DeltaRegistry fetchDeltaRegistry() {
        return (DeltaRegistry) registryCache.get(CacheKey.DELTA_SERVICE_REGISTRY);
    }
}