一、狀態依賴性管理
- 對于單線程程序,某個條件為假,那么這個條件將永遠無法成真
- 在并發程序中,基于狀態的條件可能會由于其他線程的操作而改變
1 可阻塞的狀態依賴操作的結構 2 3 acquire lock on object state 4 while (precondition does not hold) 5 { 6 release lock 7 wait until precondition might hold 8 optionally fail if interrupted or timeout expires 9 reacquire lock 10 } 11 perform action 12 release lock
?
1 //有界緩存實現的基類2 public abstract class BaseBoundedBuffer<V> {3 private final V[] buf;4 private int tail;5 private int head;6 private int count;7 8 protected BaseBoundedBuffer(int capacity){9 this.buf = (V[]) new Object[capacity]; 10 } 11 12 protected synchronized final void doPut(V v){ 13 buf[tail] = v; 14 if (++tail == buf.length){ 15 tail = 0; 16 } 17 ++count; 18 } 19 20 protected synchronized final V doTake(){ 21 V v = buf[head]; 22 buf[head] = null; //let gc collect 23 if (++head == buf.length){ 24 head = 0; 25 } 26 --count; 27 return v; 28 } 29 30 public synchronized final boolean isFull(){ 31 return count == buf.length; 32 } 33 34 public synchronized final boolean isEmpty(){ 35 return count == 0; 36 } 37 }
1、示例:將前提條件的失敗傳遞給調用者?
1 public class GrumyBoundedBuffer<V> extends BaseBoundedBuffer<V> { 2 public GrumyBoundedBuffer(int size){ 3 super(size); 4 } 5 6 public synchronized void put(V v){ 7 if (isFull()){ 8 throw new BufferFullException(); 9 } 10 doPut(v); 11 } 12 13 public synchronized V take(){ 14 if (isEmpty()) 15 throw new BufferEmptyExeption(); 16 return doTake(); 17 } 18 } 19 20 當不滿足前提條件時,有界緩存不會執行相應的操作
?
缺點:已滿情況不應為異常;調用者自行處理失敗;sleep:降低響應性;自旋等待:浪費CPU;yield讓出CPU
2、示例:通過輪詢與休眠來實現簡單的阻塞
1 public class SleepyBounedBuffer<V> extends BaseBoundedBuffer<V> { 2 private static long SLEEP_TIME; 3 public SleepyBounedBuffer(int size) { 4 super(size); 5 } 6 7 public void put(V v) throws InterruptedException{ 8 while (true){ 9 synchronized(this){ 10 if (!isFull()){ 11 doPut(v); 12 return; 13 } 14 } 15 Thread.sleep(SLEEP_TIME); 16 } 17 } 18 19 public V take() throws InterruptedException{ 20 while (true){ 21 synchronized(this){ 22 if (!isEmpty()){ 23 return doTake(); 24 } 25 } 26 Thread.sleep(SLEEP_TIME); 27 } 28 } 29 } 30 31 “輪詢與休眠“重試機制
?
優點:對于調用者,無需處理失敗與異常,操作可阻塞,可中斷(休眠時候不要持有鎖)
缺點:對于休眠時間設置的權衡(響應性與CPU資源)
3、條件隊列——使得一組線程(稱之為等待線程集合)能夠通過某種方式來等待特定的條件變成真(元素是一個個正在等待相關條件的線程)
- 每個對象都可以作為一個條件隊列(API:wait、notify和notifyAll)
- Object.wait會自動釋放鎖,并請求操作系統掛起當前線程,從而使其他線程能夠獲得這個鎖并且修改對象的狀態
- Object.notify/notifyAll通知被掛起的線程可以重新請求資源執行
- 只有能對狀態進行檢查時,才能在某個條件上等待,并且只有能修改狀態時,才能從條件等待中釋放另一個線程
- 條件隊列在CPU效率、上下文切換開銷和響應性等進行了優化
- 如果某個功能無法通過“輪詢和休眠”來實現,那么使用條件隊列也無法實現
1 public class BoundedBuffer<V> extends BaseBoundedBuffer<V> {2 3 public BoundedBuffer(int capacity) {4 super(capacity);5 }6 7 public synchronized void put(V v) throws InterruptedException{8 while (isFull()){9 wait(); 10 } 11 doPut(v); 12 notifyAll(); 13 } 14 15 public synchronized V take() throws InterruptedException{ 16 while (isEmpty()){ 17 wait(); 18 } 19 V v = doTake(); 20 notifyAll(); 21 return v; 22 } 23 }
?
二、使用條件隊列
1、條件謂詞
- 條件等待中存在一種重要的三元關系,包括加鎖、wait方法和一個條件謂詞
- 條件謂詞是由類中各個狀態變量構成的表達式(while)
- 在測試條件謂詞之前必須先持有這個鎖
- 鎖對象與條件隊列對象(即調用wait和notify等方法所在的對象)必須是同一個對象
- wait被喚醒后需要重新獲得鎖,并重新檢查條件謂詞
2、過早喚醒——一個條件隊列與多個條件謂詞相關時,wait方法返回不一定線程所等待的條件謂詞就變為真了
1 void stateDependentMethod() throws InterruptedException 2 { 3 synchronized(lock) // 必須通過一個鎖來保護條件謂詞 4 { 5 while(!condietionPredicate()) 6 lock.wait(); 7 } 8 }
當使用條件等待時(如Object.wait(), 或Condition.await()):
- 通常都有一個條件謂詞--包括一些對象狀態的測試,線程在執行前必須首先通過這些測試
- 在調用wait之前測試條件謂詞,并且從wait中返回時再次進行測試
- 在一個循環中調用wait
- 確保使用與條件隊列相關的鎖來保護構成條件謂詞的各個狀態變量
- 當調用wait,?notify或notifyAll等方法時,一定要持有與條件隊列相關的鎖
- 在檢查條件謂詞之后以及開始執行相應的操作之前,不要釋放鎖。
3、丟失信號量——線程必須等待一個已經為真的條件,但在開始等待之前沒有檢查條件謂詞
如果線程A通知了一個條件隊列,而線程B隨后在這個條件隊列上等待,那么線程B將不會立即醒來,而是需要另一個通知來喚醒它(導致活躍性下降)
4、通知——確保在條件謂詞變為真時通過某種方式發出通知掛起的線程
- 發出通知的線程持有鎖調用notify和notifyAll,發出通知后應盡快釋放鎖
- 多個線程可以基于不同的條件謂詞在同一個條件隊列上等待,使用notify單一的通知很容易導致類似于信號丟失的問題
- 可以使用notify:同一條件謂詞并且單進單出
使用notifyAll有時是低效的:喚醒的所有線程都需要競爭鎖,并重新檢驗,而有時最終只有一個線程能執行
優化:條件通知
1 public synchronized void put(V v) throws InterruptedException 2 { 3 while(isFull()) 4 wait(); 5 boolean wasEmpty = isEmpty(); 6 doPut(v); 7 if(wasEmpty) 8 notifyAll(); 9 }
?5、示例:閥門類
1 public class ThreadGate { 2 private boolean isOpen; 3 private int generation; 4 5 public synchronized void close() { 6 isOpen = false; 7 } 8 9 public synchronized void open() { 10 ++generation; 11 isOpen = true; 12 notifyAll(); 13 } 14 15 public synchronized void await() throws InterruptedException { 16 int arrivalGeneration = generation; 17 while (!isOpen && arrivalGeneration == generation) 18 wait(); 19 } 20 } 21 22 可重新關閉的閥門
arrivalGeneration == generation為了保證在閥門打開時又立即關閉時,在打開時通知的線程都可以通過閥門
6、子類的安全問題
- 如果在實施子類化時違背了條件通知或單詞通知的某個需求,那么在子類中可以增加合適的通知機制來代表基類
- 對于狀態依賴的類,要么將其等待和通知等協議完全向子類公開(并且寫入正式文檔),要么完全阻止子類參與到等待和通知等過程中
- 完全禁止子類化
7、封裝條件隊列
8、入口協議和出口協議
- 入口協議:該操作的條件謂詞
- 出口協議:檢查被該操作修改的所有狀態變量,并確認它們是否使某個其他的條件謂詞變為真,如果是,則通知相關的條件隊列
?
三、顯示的Condition對象
內置條件隊列的缺點:每個內置鎖都只能有一個相關聯的條件隊列,而多個線程可能在同一條件隊列上等待不同的條件謂詞,調用notifyAll通知的線程非等待同意謂詞
Condition <-> Lock,內置條件隊列 <-> 內置鎖
- Lock.newCondition()
- 在每個鎖上可存在多個等待、條件等待可以是可中斷的或不可中斷的、基于時限的等待,以及公平的或非公平的隊列操作
- Condition對象繼承了相關的Lock對象的公平性
- 與wait、notify和notifyAll方法對應的分別是await、signal和signalAll
- 將多個條件謂詞分開并放到多個等待線程集,Condition使其更容易滿足單次通知的需求(signal比signalAll更高效)
- 鎖、條件謂詞和條件變量:件謂詞中包含的變量必須由Lock來保護,并且在檢查條件謂詞以及調用await和signal時,必須持有Lock對象
?
1 public class ConditionBoundedBuffer<T> {2 protected final Lock lock = new ReentrantLock();3 private final Condition notFull = lock.newCondition();//條件:count < items.length4 private final Condition notEmpty = lock.newCondition();//條件:count > 05 private final T[] items = (T[]) new Object[100];6 private int tail, head, count;7 8 public void put(T x) throws InterruptedException {9 lock.lock(); 10 try { 11 while (count == items.length) 12 notFull.await();//等到條件count < items.length滿足 13 items[tail] = x; 14 if (++tail == items.length) 15 tail = 0; 16 ++count; 17 notEmpty.signal();//通知讀取等待線程 18 } finally { 19 lock.unlock(); 20 } 21 } 22 23 public T take() throws InterruptedException { 24 lock.lock(); 25 try { 26 while (count == 0) 27 notEmpty.await();//等到條件count > 0滿足 28 T x = items[head]; 29 items[head] = null; 30 if (++head == items.length) 31 head = 0; 32 --count; 33 notFull.signal();//通知寫入等待線程 34 return x; 35 } finally { 36 lock.unlock(); 37 } 38 } 39 }
??
四、Synchronizer解析
在ReentrantLock和Semaphore這兩個接口之間存在許多共同點。兩個類都可以用作一個”閥門“,即每次只允許一定數量的線程通過,并當線程到達閥門時,可以通過(在調用lock或acquire時成功返回),也可以等待(在調用lock或acquire時阻塞),還可以取消(在調用tryLock或tryAcquire時返回”假“,表示在指定的時間內鎖是不可用的或者無法獲取許可)。而且,這兩個接口都支持中斷、不可中斷的以及限時的獲取操作,并且也都支持等待線程執行公平或非公平的隊列操作。
原因:都實現了同一個基類AbstractQueuedSynchronizer(AQS)
?
1 public class SemaphoreOnLock {//基于Lock的Semaphore實現 2 private final Lock lock = new ReentrantLock(); 3 //條件:permits > 0 4 private final Condition permitsAvailable = lock.newCondition(); 5 private int permits;//許可數 6 7 SemaphoreOnLock(int initialPermits) { 8 lock.lock(); 9 try { 10 permits = initialPermits; 11 } finally { 12 lock.unlock(); 13 } 14 } 15 16 //頒發許可,條件是:permits > 0 17 public void acquire() throws InterruptedException { 18 lock.lock(); 19 try { 20 while (permits <= 0)//如果沒有許可,則等待 21 permitsAvailable.await(); 22 --permits;//用一個少一個 23 } finally { 24 lock.unlock(); 25 } 26 } 27 28 //歸還許可 29 public void release() { 30 lock.lock(); 31 try { 32 ++permits; 33 permitsAvailable.signal(); 34 } finally { 35 lock.unlock(); 36 } 37 } 38 } 39 40 使用Lock實現信號量
?
1 public class LockOnSemaphore {//基于Semaphore的Lock實現 2 //具有一個信號量的Semaphore就相當于Lock 3 private final Semaphore s = new Semaphore(1); 4 5 //獲取鎖 6 public void lock() throws InterruptedException { 7 s.acquire(); 8 } 9 10 //釋放鎖 11 public void unLock() { 12 s.release(); 13 } 14 } 15 16 使用信號量實現Lock
?
五、AbstractQueuedSynchronizer
最基本的操作:
- 獲取操作是一種依賴狀態的操作,并且通常會阻塞(同步器判斷當前狀態是否允許獲得操作,更新同步器的狀態)
- 釋放并不是一個可阻塞的操作時,當執行“釋放”操作時,所有在請求時被阻塞的線程都會開始執行
狀態管理(一個整數狀態):
- 通過getState,setState以及compareAndSetState等protected類型方法來進行操作
- 這個整數在不同子類表示任意狀態。例:剩余的許可數量,任務狀態
- 子類可以添加額外狀態
?
六、java.util.concurrent 同步器類中的AQS
1、ReentrantLock
ReentrantLock只支持獨占方式的獲取操作,因此它實現了tryAcquire、tryRelease和isHeldExclusively
ReentrantLock將同步狀態用于保存鎖獲取操作的次數,或者正要釋放鎖的時候,才會修改這個變量
2、Semaphore與CountDownLatch
Semaphore將AQS的同步狀態用于保存當前可用許可的數量;CountDownLatch使用AQS的方式與Semaphore很相似,在同步狀態中保存的是當前的計數值
3、FutureTask
在FutureTask中,AQS同步狀態被用來保存任務的狀態
FutureTask還維護一些額外的狀態變量,用來保存計算結果或者拋出的異常
4、ReentrantReadWriteLock
- 單個AQS子類將同時管理讀取加鎖和寫入加鎖
- ReentrantReadWriteLock使用了一個16位的狀態來表示寫入鎖的計數,并且使用了另一個16位的狀態來表示讀取鎖的計數
- 在讀取鎖上的操作將使用共享的獲取方法與釋放方法,在寫入鎖上的操作將使用獨占的獲取方法與釋放方法
- AQS在內部維護了一個等待線程隊列,其中記錄了某個線程請求的是獨占訪問還是共享訪問:寫操作獨占獲取;讀操作可使第一個寫之前的讀都獲取