概述
java.util.concurrent 包是專為 Java并發編程
而設計的包。包下的所有類可以分為如下幾大類:
- locks部分:顯式鎖(互斥鎖和速寫鎖)相關;
- atomic部分:原子變量類相關,是構建非阻塞算法的基礎;
- executor部分:線程池相關;
- collections部分:并發容器相關;
- tools部分:同步工具相關,如信號量、閉鎖、柵欄等功能;
類圖結構:
腦圖地址: http://www.xmind.net/m/tJy5,感謝深入淺出 Java Concurrency ,此腦圖在這篇基礎上修改而來。
BlockingQueue
此接口是一個線程安全
的 存取實例
的隊列。
使用場景
BlockingQueue通常用于一個線程生產對象,而另外一個線程消費這些對象的場景。
注意事項:
- 此隊列是
有限的
,如果隊列到達臨界點,Thread1
就會阻塞,直到Thread2
從隊列中拿走一個對象。 - 若果隊列是空的,
Thread2
會阻塞,直到Thread1
把一個對象丟進隊列。
相關方法
BlockingQueue中包含了如下操作方法:
Throws Exception | Special Value | Blocks | Times Out | |
---|---|---|---|---|
Insert | add(o) | offer(o) | put(o) | offer(o, timeout, timeunit) |
Remove | remove(o) | poll() | take() | poll(timeout, timeunit) |
Examine | element() | peek() |
名詞解釋:
- Throws Exception: 如果試圖的操作無法立即執行,拋一個異常。
- Special Value: 如果試圖的操作無法立即執行,返回一個特定的值(常常是 true / false)。
- Blocks: 如果試圖的操作無法立即執行,該方法調用將會發生阻塞,直到能夠執行。
- Times Out: 如果試圖的操作無法立即執行,該方法調用將會發生阻塞,直到能夠執行,但等待時間不會超過給定值。返回一個特定值以告知該操作是否成功(典型的是 true / false)。
注意事項:
- 無法插入 null,否則會拋出一個 NullPointerException。
- 隊列這種數據結構,導致除了獲取開始和結尾位置的其他對象的效率都不高,雖然可通過
remove(o)
來移除任一對象。
實現類
因為是一個接口,所以我們必須使用一個實現類來使用它,有如下實現類:
- ArrayBlockingQueue: 數組阻塞隊列
- DelayQueue: 延遲隊列
- LinkedBlockingQueue: 鏈阻塞隊列
- PriorityBlockingQueue: 具有優先級的阻塞隊列
- SynchronousQueue: 同步隊列
使用示例:
見: BlockingQueue
ArrayBlockingQueue
ArrayBlockingQueue 是一個有界的阻塞隊列
- 內部實現是將對象放到一個數組里。數組有個特性:一旦初始化,大小就無法修改。因此無法修改
ArrayBlockingQueue
初始化時的上限。 ArrayBlockingQueue
內部以FIFO(先進先出)
的順序對元素進行存儲。隊列中的頭元素在所有元素之中是放入時間最久的那個,而尾元素則是最短的那個。
DelayQueue
DelayQueue 對元素進行持有直到一個特定的延遲到期。注入其中的元素必須實現 java.util.concurrent.Delayed 接口:
public interface Delayed extends Comparable<Delayed< { public long getDelay(TimeUnit timeUnit); // 返回將要延遲的時間段
}
- 1
- 2
- 3
- 在每個元素的 getDelay() 方法返回的值的時間段之后才釋放掉該元素。如果返回的是 0 或者負值,延遲將被認為過期,該元素將會在 DelayQueue 的下一次 take 被調用的時候被釋放掉。
Delayed
接口也繼承了java.lang.Comparable
接口,Delayed
對象之間可以進行對比。這對DelayQueue
隊列中的元素進行排序時有用,因此它們可以根據過期時間進行有序釋放。
LinkedBlockingQueue
內部以一個鏈式結構(鏈接節點)對其元素進行存儲 。
- 可以選擇一個上限。如果沒有定義上限,將使用 Integer.MAX_VALUE 作為上限。
- 內部以 FIFO(先進先出)的順序對元素進行存儲。
PriorityBlockingQueue
一個無界的并發隊列,它使用了和類 java.util.PriorityQueue 一樣的排序規則。
- 無法向這個隊列中插入 null 值。
- 插入到 其中的元素必須實現 java.lang.Comparable 接口。
- 對于具有相等優先級(compare() == 0)的元素并不強制任何特定行為。
- 從一個 PriorityBlockingQueue 獲得一個 Iterator 的話,該 Iterator 并不能保證它對元素的遍歷是以優先級為序的。
SynchronousQueue
一個特殊的隊列,它的內部同時只能夠容納單個元素。
- 如果該隊列已有一元素的話,試圖向隊列中插入一個新元素的線程將會阻塞,直到另一個線程將該元素從隊列中抽走。
- 如果該隊列為空,試圖向隊列中抽取一個元素的線程將會阻塞,直到另一個線程向隊列中插入了一條新的元素。
BlockingDeque
此接口表示一個線程安全
放入和提取實例的雙端隊列
。
使用場景
通常用在一個線程既是生產者又是消費者的時候。
注意事項
- 如果雙端隊列已滿,插入線程將被阻塞,直到一個移除線程從該隊列中移出了一個元素。
- 如果雙端隊列為空,移除線程將被阻塞,直到一個插入線程向該隊列插入了一個新元素。
相關方法
Throws Exception | Special Value | Blocks | Times Out | |
---|---|---|---|---|
Insert | addFirst(o) | offerFirst(o) | putFirst(o) | offerFirst(o, timeout, timeunit) |
Remove | removeFirst(o) | pollFirst(o) | takeFirst(o) | pollFirst(timeout, timeunit) |
Examine | getFirst(o) | peekFirst(o) |
Throws Exception | Special Value | Blocks | Times Out | |
---|---|---|---|---|
Insert | addLast(o) | offerLast(o) | putLast(o) | offerLast(o, timeout, timeunit) |
Remove | removeLast(o) | pollLast(o) | takeLast(o) | pollLast(timeout, timeunit) |
Examine | getLast(o) | peekLast(o) |
注意事項
- 關于方法的處理方式和上節一樣。
- BlockingDeque 接口繼承自 BlockingQueue 接口,可以用其中定義的方法。
實現類
- LinkedBlockingDeque : 鏈阻塞雙端隊列
LinkedBlockingDeque
LinkedBlockingDeque 是一個雙端隊列,可以從任意一端插入或者抽取元素的隊列。
- 在它為空的時候,一個試圖從中抽取數據的線程將會阻塞,無論該線程是試圖從哪一端抽取數據。
ConcurrentMap
一個能夠對別人的訪問(插入和提取)進行并發處理的 java.util.Map接口。
ConcurrentMap 除了從其父接口 java.util.Map 繼承來的方法之外還有一些額外的原子性方法。
實現類
因為是接口,必須用實現類來使用它,其實現類為
- ConcurrentHashMap
ConcurrentHashMap與HashTable比較
- 更好的并發性能,在你從中讀取對象的時候 ConcurrentHashMap 并不會把整個 Map 鎖住,只是把 Map 中正在被寫入的部分進行鎖定。
- 在被遍歷的時候,即使是 ConcurrentHashMap 被改動,它也不會拋 ConcurrentModificationException。
ConcurrentNavigableMap
一個支持并發訪問的 java.util.NavigableMap,它還能讓它的子 map 具備并發訪問的能力。
headMap
headMap(T toKey) 方法返回一個包含了小于給定 toKey 的 key 的子 map。
tailMap
tailMap(T fromKey) 方法返回一個包含了不小于給定 fromKey 的 key 的子 map。
subMap
subMap() 方法返回原始 map 中,鍵介于 from(包含) 和 to (不包含) 之間的子 map。
更多方法
- descendingKeySet()
- descendingMap()
- navigableKeySet()
CountDownLatch
CountDownLatch 是一個并發構造,它允許一個或多個線程等待一系列指定操作的完成。
- CountDownLatch 以一個給定的數量初始化。countDown() 每被調用一次,這一數量就減一。
- 通過調用 await() 方法之一,線程可以阻塞等待這一數量到達零。
CyclicBarrier
CyclicBarrier 類是一種同步機制,它能夠對處理一些算法的線程實現同步。
更多實例參考: CyclicBarrier
Exchanger
Exchanger 類表示一種兩個線程可以進行互相交換對象的會和點。
更多實例參考: Exchanger
Semaphore
Semaphore 類是一個計數信號量。具備兩個主要方法:
- acquire()
- release()
- 每調用一次 acquire(),一個許可會被調用線程取走。
- 每調用一次 release(),一個許可會被返還給信號量。
Semaphore 用法
- 保護一個重要(代碼)部分防止一次超過 N 個線程進入。
- 在兩個線程之間發送信號。
保護重要部分
如果你將信號量用于保護一個重要部分,試圖進入這一部分的代碼通常會首先嘗試獲得一個許可,然后才能進入重要部分(代碼塊),執行完之后,再把許可釋放掉。
Semaphore semaphore = new Semaphore(1);
//critical section
semaphore.acquire();
...
semaphore.release();
- 1
- 2
- 3
- 4
- 5
在線程之間發送信號
如果你將一個信號量用于在兩個線程之間傳送信號,通常你應該用一個線程調用 acquire() 方法,而另一個線程調用 release() 方法。
- 如果沒有可用的許可,acquire() 調用將會阻塞,直到一個許可被另一個線程釋放出來。
- 如果無法往信號量釋放更多許可時,一個 release() 調用也會阻塞。
公平性
無法擔保掉第一個調用 acquire() 的線程會是第一個獲得一個許可的線程。
可以通過如下來強制公平:
Semaphore semaphore = new Semaphore(1, true);
- 1
- 需要注意,強制公平會影響到并發性能,建議不使用。
ExecutorService
這里之前有過簡單的總結: Java 中幾種常用的線程池
存在于 java.util.concurrent 包里的 ExecutorService 實現就是一個線程池實現。
實現類
此接口實現類包括:
- ScheduledThreadPoolExecutor : 通過
Executors.newScheduledThreadPool(10)
創建的 - ThreadPoolExecutor: 除了第一種的
其他三種方式
創建的
相關方法
- execute(Runnable):
無法得知被執行的 Runnable 的執行結果 - submit(Runnable):
返回一個 Future 對象,可以知道Runnable 是否執行完畢。 - submit(Callable):
Callable 實例除了它的 call() 方法能夠返回一個結果,通過Future可以獲取。 - invokeAny(…):
傳入一系列的 Callable 或者其子接口的實例對象,無法保證返回的是哪個 Callable 的結果 ,只能表明其中一個已執行結束。
如果其中一個任務執行結束(或者拋了一個異常),其他 Callable 將被取消。 - invokeAll(…):
返回一系列的 Future 對象,通過它們你可以獲取每個 Callable 的執行結果。
關閉ExecutorService
- shutdown() : 不會立即關閉,但它將不再接受新的任務
- shutdownNow(): 立即關閉
ThreadPoolExecutor
- ThreadPoolExecutor 使用其內部池中的線程執行給定任務(Callable 或者 Runnable)。
ScheduledExecutorService(接口,其實現類為ScheduledThreadPoolExecutor)
- ScheduledExecutorService能夠將任務延后執行,或者間隔固定時間多次執行。
- ScheduledExecutorService中的 任務由一個工作者線程異步執行,而不是由提交任務給 ScheduledExecutorService 的那個線程執行。
相關方法
- schedule (Callable task, long delay, TimeUnit timeunit):
Callable 在給定的延遲之后執行,并返回結果。 - schedule (Runnable task, long delay, TimeUnit timeunit)
除了 Runnable 無法返回一個結果之外,和第一個方法類似。 - scheduleAtFixedRate (Runnable, long initialDelay, long period, TimeUnit timeunit)
這一方法規劃一個任務將被定期執行。該任務將會在首個 initialDelay 之后得到執行,然后每個 period 時間之后重復執行。
period 被解釋為前一個執行的開始和下一個執行的開始之間的間隔時間。 - scheduleWithFixedDelay (Runnable, long initialDelay, long period, TimeUnit timeunit)
和上一個方法類似,只是period 則被解釋為前一個執行的結束和下一個執行的結束之間的間隔。
ForkJoinPool
ForkJoinPool 在 Java 7 中被引入。它和 ExecutorService 很相似,除了一點不同。ForkJoinPool 讓我們可以很方便地把任務分裂成幾個更小的任務,這些分裂出來的任務也將會提交給 ForkJoinPool。
用法參考:Java Fork and Join using ForkJoinPool
Lock
Lock 是一個類似于 synchronized 塊的線程同步機制。但是 Lock 比 synchronized 塊更加靈活、精細。
實現類
Lock是一個接口,其實現類包括:
- ReentrantLock
示例
Lock lock = new ReentrantLock();
lock.lock();
//critical section
lock.unlock();
- 1
- 2
- 3
- 4
- 調用
lock() 方法
之后,這個 lock 實例就被鎖住啦。 - 當lock示例被鎖后,任何其他再過來調用 lock() 方法的線程將會被阻塞住,直到調用了unlock() 方法。
- unlock() 被調用了,lock 對象解鎖了,其他線程可以對它進行鎖定了。
Lock 和 synchronized區別
- synchronized 代碼塊不能夠保證進入訪問等待的線程的先后順序。
- 你不能夠傳遞任何參數給一個 synchronized 代碼塊的入口。因此,對于 synchronized 代碼塊的訪問等待設置超時時間是不可能的事情。
- synchronized 塊必須被完整地包含在單個方法里。而一個 Lock 對象可以把它的 lock() 和 unlock() 方法的調用放在不同的方法里。
ReadWriteLock
讀寫鎖一種先進的線程鎖機制。
- 允許多個線程在同一時間對某特定資源進行讀取,
- 但同一時間內只能有一個線程對其進行寫入。
實現類
- ReentrantReadWriteLock
規則
- 如果沒有任何寫操作鎖定,那么可以有多個讀操作鎖定該鎖
- 如果沒有任何讀操作或者寫操作,只能有一個寫線程對該鎖進行鎖定。
示例:
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
readWriteLock.readLock().lock(); // multiple readers can enter this section // if not locked for writing, and not writers waiting // to lock for writing.
readWriteLock.readLock().unlock();
readWriteLock.writeLock().lock(); // only one writer can enter this section, // and only if no threads are currently reading.
readWriteLock.writeLock().unlock();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
更多原子性包裝類
位于 atomic
包下,包含一系列原子性變量。
- AtomicBoolean
- AtomicInteger
- AtomicLong
- AtomicReference
…
參考資料: Java 并發工具包 java.util.concurrent 用戶指南
java.util.concurrency - Java Concurrency Utilities
擴展閱讀:
深入淺出 Java Concurrency : 講解的很詳細