大綱
1.并發安全的數組列表CopyOnWriteArrayList
2.并發安全的鏈表隊列ConcurrentLinkedQueue
3.并發編程中的阻塞隊列概述
4.JUC的各種阻塞隊列介紹
5.LinkedBlockingQueue的具體實現原理
6.基于兩個隊列實現的集群同步機制
1.并發安全的數組列表CopyOnWriteArrayList
(1)CopyOnWriteArrayList的初始化
(2)基于鎖 + 寫時復制機制實現的增刪改操作
(3)使用寫時復制的原因是讀操作不加鎖 + 不使用Unsafe讀取數組元素
(4)對數組進行迭代時采用了副本快照機制
(5)核心思想是通過弱一致性提升讀并發
(6)寫時復制的總結
(1)CopyOnWriteArrayList的初始化
并發安全的HashMap是ConcurrentHashMap
并發安全的ArrayList是CopyOnWriteArrayList
并發安全的LinkedList是ConcurrentLinkedQueue
從CopyOnWriteArrayList的構造方法可知,CopyOnWriteArrayList基于Object對象數組實現。
這個Object對象數組array會使用volatile修飾,保證了多線程下的可見性。只要有一個線程修改了數組array,其他線程可以馬上讀取到最新值。
//A thread-safe variant of java.util.ArrayList in which all mutative operations
//(add, set, and so on) are implemented by making a fresh copy of the underlying array.
public class CopyOnWriteArrayList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable { ...//The lock protecting all mutatorsfinal transient ReentrantLock lock = new ReentrantLock();//The array, accessed only via getArray/setArray.private transient volatile Object[] array;//Creates an empty list.public CopyOnWriteArrayList() {setArray(new Object[0]);}//Sets the array.final void setArray(Object[] a) {array = a;}...
}
(2)基于鎖 + 寫時復制機制實現的增刪改操作
一.使用獨占鎖解決對數組的寫寫并發問題
每個CopyOnWriteArrayList都有一個Object數組 + 一個ReentrantLock鎖。在對Object數組進行增刪改時,都要先獲取鎖,保證只有一個線程增刪改。從而確保多線程增刪改CopyOnWriteArrayList的Object數組是并發安全的。注意:獲取鎖的動作需要在執行getArray()方法前執行。
但因為獲取獨占鎖,所以導致CopyOnWriteArrayList的寫并發并性能不太好。而ConcurrentHashMap由于通過CAS設置 + 分段加鎖,所以寫并發性能很高。
二.使用寫時復制機制解決對數組的讀寫并發問題
CopyOnWrite就是寫時復制。寫數據時不直接在當前數組里寫,而是先把當前數組的數據復制到新數組里。然后再在新數組里寫數據,寫完數據后再將新數組賦值給array變量。這樣原數組由于沒有了array變量的引用,很快就會被JVM回收掉。
其中會使用System.arraycopy()方法和Arrays.copyOf()方法來復制數據到新數組,從Arrays.copyOf(elements, len + 1)可知,新數組的大小比原數組大小多1。
所以CopyOnWriteArrayList不需要進行數組擴容,這與ArrayList不一樣。ArrayList會先初始化一個固定大小的數組,然后數組大小達到閾值時會擴容。
三.總結
為了解決CopyOnWriteArrayList的數組寫寫并發問題,使用了鎖。
為了解決CopyOnWriteArrayList的數組讀寫并發問題,使用了寫時復制。
所以CopyOnWriteArrayList可以保證多線程對數組寫寫 + 讀寫的并發安全。
//A thread-safe variant of java.util.ArrayList in which all mutative operations
//(add, set, and so on) are implemented by making a fresh copy of the underlying array.
public class CopyOnWriteArrayList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable { ...//The lock protecting all mutatorsfinal transient ReentrantLock lock = new ReentrantLock();//The array, accessed only via getArray/setArray.private transient volatile Object[] array;//Creates an empty list.public CopyOnWriteArrayList() {setArray(new Object[0]);}//Sets the array.final void setArray(Object[] a) {array = a;}//Gets the array. Non-private so as to also be accessible from CopyOnWriteArraySet class.final Object[] getArray() {return array;}//增:Appends the specified element to the end of this list.public boolean add(E e) {final ReentrantLock lock = this.lock;lock.lock();try {Object[] elements = getArray();int len = elements.length;Object[] newElements = Arrays.copyOf(elements, len + 1);newElements[len] = e;setArray(newElements);return true;} finally {lock.unlock();}}//刪:Removes the element at the specified position in this list.//Shifts any subsequent elements to the left (subtracts one from their indices). //Returns the element that was removed from the list.public E remove(int index) {final ReentrantLock lock = this.lock;lock.lock();try {Object[] elements = getArray();int len = elements.length;E oldValue = get(elements, index);int numMoved = len - index - 1;if (numMoved == 0) {setArray(Arrays.copyOf(elements, len - 1));} else {//先創建新數組,新數組的大小為len-1,比原數組的大小少1Object[] newElements = new Object[len - 1];//把原數組里從0開始拷貝index個元素到新數組里,并且從新數組的0位置開始放置System.arraycopy(elements, 0, newElements, 0, index);//把原數組從index+1開始拷貝numMoved個元素到新數組里,并且從新數組的index位置開始放置;System.arraycopy(elements, index + 1, newElements, index, numMoved);setArray(newElements);}return oldValue;} finally {lock.unlock();}}//改:Replaces the element at the specified position in this list with the specified element.public E set(int index, E element) {final ReentrantLock lock = this.lock;lock.lock();try {Object[] elements = getArray();E oldValue = get(elements, index);if (oldValue != element) {int len = elements.length;Object[] newElements = Arrays.copyOf(elements, len);newElements[index] = element;setArray(newElements);} else {//Not quite a no-op; ensures volatile write semanticssetArray(elements);}return oldValue;} finally {lock.unlock();}}...
}
(3)使用寫時復制的原因是讀操作不加鎖 + 不使用Unsafe讀取數組元素
CopyOnWriteArrayList的增刪改采用寫時復制的原因在于get操作不需加鎖。get操作就是先獲取array數組,然后再通過index定位返回對應位置的元素。
由于在寫數據的時候,首先更新的是復制了原數組數據的新數組。所以同一時間大量的線程讀取數組數據時,都會讀到原數組的數據,因此讀寫之間不會出現并發沖突的問題。
而且在寫數據的時候,在更新完新數組之后,才會更新volatile修飾的數組變量。所以讀操作只需要直接對volatile修飾的數組變量進行讀取,就能獲取最新的數組值。
如果不使用寫時復制機制,那么即便有寫線程先更新了array引用的數組中的元素,后續的讀線程也只是具有對使用volatile修飾的array引用的可見性,而不會具有對array引用的數組中的元素的可見性。所以此時只要array引用沒有發生改變,讀線程還是會讀到舊的元素,除非使用Unsafe.getObjectVolatile()方法來獲取array引用的數組的元素。
public class CopyOnWriteArrayList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable {...//The array, accessed only via getArray/setArray.private transient volatile Object[] array;//Gets the array. Non-private so as to also be accessible from CopyOnWriteArraySet class.final Object[] getArray() {return array;}public E get(int index) {//先通過getArray()方法獲取array數組,然后再通過get()方法定位到數組某位置的元素return get(getArray(), index);}private E get(Object[] a, int index) {return (E) a[index];}...
}
(4)對數組進行迭代時采用了副本快照機制
CopyOnWriteArrayList的Iterator迭代器里有一個快照數組snapshot,該數組指向的就是創建迭代器時CopyOnWriteArrayList的當前數組array。
所以使用CopyOnWriteArrayList的迭代器進行迭代時,會遍歷快照數組。此時如果有其他線程更新了數組array,也不會影響迭代的過程。
public class CopyOnWriteArrayListDemo {static List<String> list = new CopyOnWriteArrayList<String>();public static void main(String[] args) {list.add("k");System.out.println(list);Iterator<String> iterator = list.iterator();while (iterator.hasNext()) {System.out.println(iterator.next());}}
}public class CopyOnWriteArrayList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable {...public Iterator<E> iterator() {return new COWIterator<E>(getArray(), 0);}...static final class COWIterator<E> implements ListIterator<E> {private final Object[] snapshot;private int cursor;private COWIterator(Object[] elements, int initialCursor) {cursor = initialCursor;snapshot = elements;}...}...
}
(5)核心思想是通過最終一致性提升讀并發
CopyOnWriteArrayList的核心思想是通過弱一致性來提升讀寫并發的能力。
CopyOnWriteArrayList基于寫時復制機制存在的最大問題是最終一致性。
多個線程并發讀寫數組,寫線程已將新數組修改好,但還沒設置給array。此時其他讀線程讀到的(get或者迭代)都是數組array的數據,于是在同一時刻,讀線程和寫線程看到的數據是不一致的。這就是寫時復制機制存在的問題:最終一致性或弱一致性。
(6)寫時復制的總結
一.優點
讀讀不互斥,讀寫不互斥,寫寫互斥。同一時間只有一個線程可以寫,寫的同時允許其他線程來讀。
二.缺點
空間換時間,寫的時候內存里會出現一模一樣的副本,對內存消耗大。通過數組副本可以保證大量的讀不需要和寫互斥。如果數組很大,可能要考慮內存占用會是數組大小的幾倍。此外使用數組副本來統計數據,會存在統計數據不一致的問題。
三.使用場景
適用于讀多寫少的場景,這樣大量的讀操作不會被寫操作影響,而且不要求統計數據具有實時性。
2.并發安全的鏈表隊列ConcurrentLinkedQueue
(1)ConcurrentLinkedQueue的介紹
(2)ConcurrentLinkedQueue的構造方法
(3)ConcurrentLinkedQueue的offer()方法
(4)ConcurrentLinkedQueue的poll()方法
(5)ConcurrentLinkedQueue的peak()方法
(6)ConcurrentLinkedQueue的size()方法
(1)ConcurrentLinkedQueue的介紹
ConcurrentLinkedQueue是一種并發安全且非阻塞的鏈表隊列(無界隊列)。
ConcurrentLinkedQueue采用CAS機制來保證多線程操作隊列時的并發安全。
鏈表隊列會采用先進先出的規則來對結點進行排序。每次往鏈表隊列添加元素時,都會添加到隊列的尾部。每次需要獲取元素時,都會直接返回隊列頭部的元素。
并發安全的HashMap是ConcurrentHashMap
并發安全的ArrayList是CopyOnWriteArrayList
并發安全的LinkedList是ConcurrentLinkedQueue
(2)ConcurrentLinkedQueue的構造方法
ConcurrentLinkedQueue是基于鏈表實現的,鏈表結點為其內部類Node。
ConcurrentLinkedQueue的構造方法會初始化鏈表的頭結點和尾結點為同一個值為null的Node對象。
Node結點通過next指針指向下一個Node結點,從而組成一個單向鏈表。而ConcurrentLinkedQueue的head和tail兩個指針指向了鏈表的頭和尾結點。
public class ConcurrentLinkedQueueDemo {public static void main(String[] args) {ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();queue.offer("張三");//向隊尾添加元素queue.offer("李四");//向隊尾添加元素queue.offer("王五");//向隊尾添加元素System.out.println(queue.peek());//返回隊頭的元素不出隊System.out.println(queue.poll());//返回隊頭的元素而且出隊System.out.println(queue.peek());//返回隊頭的元素不出隊}
}//An unbounded thread-safe queue based on linked nodes.
//This queue orders elements FIFO (first-in-first-out).
//The head of the queue is that element that has been on the queue the longest time.
//The tail of the queue is that element that has been on the queue the shortest time.
//New elements are inserted at the tail of the queue,
//and the queue retrieval operations obtain elements at the head of the queue.
//A ConcurrentLinkedQueue is an appropriate choice when many threads will share access to a common collection.
//Like most other concurrent collection implementations, this class does not permit the use of null elements.
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable {...private transient volatile Node<E> head;private transient volatile Node<E> tail;//構造方法,初始化鏈表隊列的頭結點和尾結點為同一個值為null的Node對象//Creates a ConcurrentLinkedQueue that is initially empty.public ConcurrentLinkedQueue() {head = tail = new Node<E>(null);}private static class Node<E> {volatile E item;volatile Node<E> next;private static final sun.misc.Unsafe UNSAFE;private static final long itemOffset;private static final long nextOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = Node.class;itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));} catch (Exception e) {throw new Error(e);}}Node(E item) {UNSAFE.putObject(this, itemOffset, item);}boolean casItem(E cmp, E val) {return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);}void lazySetNext(Node<E> val) {UNSAFE.putOrderedObject(this, nextOffset, val);}boolean casNext(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);}}...
}
(3)ConcurrentLinkedQueue的offer()方法
其中關鍵的代碼就是"p.casNext(null, newNode))",就是把p的next指針由原來的指向空設置為指向新的結點,并且通過CAS確保同一時間只有一個線程可以成功執行這個操作。
注意:更新tail指針并不是實時更新的,而是隔一個結點再更新。這樣可以減少CAS指令的執行次數,從而降低CAS操作帶來的性能影響。
插入第一個元素后,tail指針指向倒數第二個節點。
插入第二個元素后,tail指針指向最后一個節點。
插入第三個元素后,tail指針指向倒數第二個節點。
插入第四個元素后,tail指針指向最后一個節點。
//An unbounded thread-safe queue based on linked nodes.
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable {...private transient volatile Node<E> head;private transient volatile Node<E> tail;private static final sun.misc.Unsafe UNSAFE;private static final long headOffset;private static final long tailOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = ConcurrentLinkedQueue.class;headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head"));tailOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("tail"));} catch (Exception e) {throw new Error(e);}}//構造方法,初始化鏈表隊列的頭結點和尾結點為同一個值為null的Node對象//Creates a ConcurrentLinkedQueue that is initially empty.public ConcurrentLinkedQueue() {head = tail = new Node<E>(null);}public boolean offer(E e) {checkNotNull(e);final Node<E> newNode = new Node<E>(e);//插第一個元素時, tail和head都是初始化時的空節點, p也指向該空節點, q是該空節點的next元素;//很明顯q是null, p.casNext后, p的next設為第一個元素, 此時p和t相等, tail的next是第一個元素;//由于p==t, 于是返回true, head和tail還是指向初始化時的空節點, tail指針指向的是倒數第二個節點;//插第二個元素時, q成為第一個元素,不為null了, 而且p指向tail, tail的next是第一個元素, 所以p != q;//由于此時p和t還是一樣的, 所以會將q賦值給p, 也就是p指向第一個元素了, 再次進行新一輪循環;//新一輪循環時, q指向第一個元素的next成為null, 所以會對第一個元素執行casNext操作;//也就是將第二個元素設為第一個元素的next, 設完后由于p和t不相等了, 會執行casTail設第二個元素為tail;//插入第三個元素時, 又會和插入第一個元素一樣了, 這時tail指針指向的是倒數第二個節點;//插入第四個元素時, 和插入第二個元素一樣, 這是tail指針指向的是最后一個節點;for (Node<E> t = tail, p = t;;) {Node<E> q = p.next;//p是尾結點,q是尾結點的下一個結點if (q == null) {//插入第一個元素時執行的代碼if (p.casNext(null, newNode)) {//將新結點設置為尾結點的下一個結點if (p != t) {//隔一個結點再CAS更新tail指針casTail(t, newNode);}return true;}} else if (p == q) {p = (t != (t = tail)) ? t : head;} else {//插入第二個元素時執行的代碼p = (p != t && t != (t = tail)) ? t : q;}}}private boolean casTail(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);}...
}
(4)ConcurrentLinkedQueue的poll()方法
poll()方法會將鏈表隊列的頭結點出隊。
注意:更新head指針時也不是實時更新的,而是隔一個結點再更新。這樣可以減少CAS指令的執行次數,從而降低CAS操作帶來的性能影響。
//An unbounded thread-safe queue based on linked nodes.
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable {...private transient volatile Node<E> head;private transient volatile Node<E> tail;private static final sun.misc.Unsafe UNSAFE;private static final long headOffset;private static final long tailOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = ConcurrentLinkedQueue.class;headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head"));tailOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("tail"));} catch (Exception e) {throw new Error(e);}}//構造方法,初始化鏈表隊列的頭結點和尾結點為同一個值為null的Node對象//Creates a ConcurrentLinkedQueue that is initially empty.public ConcurrentLinkedQueue() {head = tail = new Node<E>(null);}public E poll() {restartFromHead:for (;;) {for (Node<E> h = head, p = h, q;;) {E item = p.item;if (item != null && p.casItem(item, null)) {if (p != h) {//隔一個結點才CAS更新head指針updateHead(h, ((q = p.next) != null) ? q : p);}return item;} else if ((q = p.next) == null) {updateHead(h, p);return null;} else if (p == q) {continue restartFromHead;} else {p = q;}}}}final void updateHead(Node<E> h, Node<E> p) {if (h != p && casHead(h, p)) {h.lazySetNext(h);}}private boolean casHead(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);}...
}
(5)ConcurrentLinkedQueue的peak()方法
peek()方法會獲取鏈表的頭結點,但是不會出隊。
//An unbounded thread-safe queue based on linked nodes.
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable {...private transient volatile Node<E> head;public E peek() {restartFromHead:for (;;) {for (Node<E> h = head, p = h, q;;) {E item = p.item;if (item != null || (q = p.next) == null) {updateHead(h, p);return item;} else if (p == q) {continue restartFromHead;} else {p = q;}}}}final void updateHead(Node<E> h, Node<E> p) {if (h != p && casHead(h, p)) {h.lazySetNext(h);}}private boolean casHead(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);}...
}
(6)ConcurrentLinkedQueue的size()方法
size()方法主要用來返回鏈表隊列的大小,查看鏈表隊列有多少個元素。size()方法不會加鎖,會直接從頭節點開始遍歷鏈表隊列中的每個結點。
//An unbounded thread-safe queue based on linked nodes.
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable {...public int size() {int count = 0;for (Node<E> p = first(); p != null; p = succ(p))if (p.item != null) {if (++count == Integer.MAX_VALUE) {break;}}}return count;}//Returns the first live (non-deleted) node on list, or null if none.//This is yet another variant of poll/peek; here returning the first node, not element.//We could make peek() a wrapper around first(), but that would cost an extra volatile read of item,//and the need to add a retry loop to deal with the possibility of losing a race to a concurrent poll(). Node<E> first() {restartFromHead:for (;;) {for (Node<E> h = head, p = h, q;;) {boolean hasItem = (p.item != null);if (hasItem || (q = p.next) == null) {updateHead(h, p);return hasItem ? p : null;} else if (p == q) {continue restartFromHead;} else {p = q;}}}}//Returns the successor of p, or the head node if p.next has been linked to self, //which will only be true if traversing with a stale pointer that is now off the list.final Node<E> succ(Node<E> p) {Node<E> next = p.next;return (p == next) ? head : next;}...
}
如果在遍歷的過程中,有線程執行入隊或者是出隊的操作,此時會怎樣?
從隊頭開始遍歷,遍歷到一半時:如果有線程在隊列尾部進行入隊操作,此時的遍歷能及時看到新添加的元素。因為入隊操作就是設置隊列尾部節點的next指針指向新添加的結點,而入隊時設置next指針屬于volatile寫,因此遍歷時是可以看到的。如果有線程從隊列頭部進行出隊操作,此時的遍歷則無法感知有元素出隊了。
所以可以總結出這些并發安全的集合:ConcurrentHashMap、CopyOnWriteArrayList和ConcurrentLinkedQueue,為了優化多線程下的并發性能,會犧牲掉統計數據的一致性。為了保證多線程寫的高并發性能,會大量采用CAS進行無鎖化操作。同時會讓很多讀操作比如常見的size()操作,不使用鎖。因此使用這些并發安全的集合時,要考慮并發下的統計數據的不一致問題。
3.并發編程中的阻塞隊列概述
(1)什么是阻塞隊列
(2)阻塞隊列提供的方法
(3)阻塞隊列的應用
(1)什么是阻塞隊列
隊列是一種只允許在一端進行移除操作、在另一端進行插入操作的線性表,隊列中允許插入的一端稱為隊尾,允許移除的一端稱為隊頭。
阻塞隊列就是在隊列的基礎上增加了兩個操作:
一.支持阻塞插入
在隊列滿時會阻塞繼續往隊列中添加數據的線程,直到隊列中有元素被釋放。
二.支持阻塞移除
在隊列空時會阻塞從隊列中獲取元素的線程,直到隊列中添加了新的元素。
阻塞隊列其實實現了一個生產者/消費者模型:生產者往隊列中添加數據,消費者從隊列中獲取數據。隊列滿了阻塞生產者,隊列空了阻塞消費者。
阻塞隊列中的元素可能會使用數組或者鏈表等來進行存儲。一個隊列中能容納多少個元素取決于隊列的容量大小,因此阻塞隊列也分為有界隊列和無界隊列。
有界隊列指有固定大小的隊列,無界隊列指沒有固定大小的隊列。實際上無界隊列也是有大小限制的,只是大小限制為非常大,可認為無界。
注意:在無界隊列中,由于理論上不存在隊列滿的情況,所以不存在阻塞。
阻塞隊列在很多地方都會用到,比如線程池、ZooKeeper。一般使用阻塞隊列來實現生產者/消費者模型。
(2)阻塞隊列提供的方法
阻塞隊列的操作有插入、移除、檢查,在隊列滿或者空時會有不同的效果。
一.拋出異常
當隊列滿的時候通過add(e)方法添加元素,會拋出異常。
當隊列空的時候調用remove(e)方法移除元素,也會拋出異常。
二.返回特殊值
調用offer(e)方法向隊列入隊元素時,會返回添加結果true或false。
調用poll()方法從隊列出隊元素時,會從隊列取出一個元素或null。
三.一直阻塞
在隊列滿了的情況下,調用插入方法put(e)向隊列中插入元素時,隊列會阻塞插入元素的線程,直到隊列不滿或者響應中斷才退出阻塞。
在隊列空了的情況下,調用移除方法take()從隊列移除元素時,隊列會阻塞移除元素的線程,直到隊列不為空時喚醒線程。
四.超時退出
超時退出其實就是在offer()和poll()方法中增加了阻塞的等待時間。
(3)阻塞隊列的應用
阻塞隊列可以理解為線程級別的消息隊列。
消息中間件可以理解為進程級別的消息隊列。
所以可以通過阻塞隊列來緩存線程的請求,從而達到流量削峰的目的。