本章我們介紹兩個Java 并發包中用于線程協作的工具--LockSupport和Condition
LockSupport:
Java 并發包(java.util.concurrent.locks)
提供了基于許可(permit)的線程阻塞和喚醒機制--LockSupport
對于LockSupport是通過方法park以及unpark來對線程進行阻塞和喚醒的
public static void park() {UNSAFE.park(false, 0L);}public static void unpark(Thread var0) {if (var0 != null) {UNSAFE.unpark(var0);}}
我們可以看出park方法很簡單只是調用了Unsafe
?的方法park。Unsafe
?正如它名字而言是 Java 中一個?高度危險且未被官方正式支持?的類,位于?sun.misc
?包下(JDK 9 后移至?jdk.internal.misc
?包)。它提供了一系列?直接操作底層資源?的方法,允許開發者繞過 Java 語言的安全機制,直接訪問內存、操作線程狀態等。所以LockSupport就是基于unsafe類進行包裝后的類,將原來的不安全類封裝成了一個安全類供開發者使用。對于park方法的兩個參數一個是Boolean類型一個是long類型,分別用來表示是否為絕對時間以及阻塞的時長,對于0L則是永久阻塞。
對于unpark方法來說則是多了一個參數Thread,這個參數的作用是用來指定喚醒的線程。為什么park不需要參數而unpark需要參數呢,因為unpark喚醒的都是其他線程,當本線程進入阻塞后則無法自己喚醒自己只能通過其他線程來喚醒自己。
Condition:
Condition
?是 Java 并發包(java.util.concurrent.locks
)中的一個接口,用于替代傳統的?Object.wait()
、Object.notify()
?和?Object.notifyAll()
,提供更靈活、更強大的線程間協作機制。它通常與?Lock
?接口配合使用,實現精細化的線程等待和喚醒操作。
也就是說Condition的定位其實與Object.wait類似,都是協助鎖來實現線程的協作機制。
特性 | Condition | Object.wait()/notify() |
---|---|---|
鎖機制 | 必須與?Lock ?顯式關聯(如?ReentrantLock )。 | 必須在?synchronized ?塊中調用。 |
等待隊列 | 每個?Condition ?獨立維護一個等待隊列,可創建多個條件隊列(如?notFull 、notEmpty )。 | 每個對象只有一個等待隊列,所有線程共享。 |
喚醒方式 | signal() :喚醒一個等待線程;signalAll() :喚醒所有等待線程。 | notify() :隨機喚醒一個線程;notifyAll() :喚醒所有線程。 |
中斷支持 | await() ?可響應中斷(拋出?InterruptedException ),也支持不可中斷模式(awaitUninterruptibly() )。 | wait() ?只能響應中斷(拋出異常),無法禁用。 |
超時機制 | 支持靈活的超時等待(如?await(long, TimeUnit) )。 | 僅支持?wait(long timeout) (毫秒級)。 |
其中Condition接口的實現類則是在AQS內部中,而AQS則賦予了Condition靈魂,下面我們來看看Condition子類ConditionObject的源碼。
在ConditionObject中我們從Condition最核心的兩個方法await方法和signal方法來說起
public final void await() throws InterruptedException {if (Thread.interrupted()) {throw new InterruptedException();} else {Node var1 = this.addConditionWaiter();long var2 = AbstractQueuedLongSynchronizer.this.fullyRelease(var1);int var4 = 0;while(!AbstractQueuedLongSynchronizer.this.isOnSyncQueue(var1)) {LockSupport.park(this);if ((var4 = this.checkInterruptWhileWaiting(var1)) != 0) {break;}}if (AbstractQueuedLongSynchronizer.this.acquireQueued(var1, var2) && var4 != -1) {var4 = 1;}if (var1.nextWaiter != null) {this.unlinkCancelledWaiters();}if (var4 != 0) {this.reportInterruptAfterWait(var4);}}
}
首先則是線程中斷位的判斷如果已經中斷則直接拋出異常(由此可以看出await方法是可中斷的方法)。隨后則是調用了addConditionWaiter方法,這個方法的作用就是將當前線程加入到Condition所維持的隊列中,在我們解析addConditionWaiter方法之前先看一下Condition的屬性
public class ConditionObject implements Condition, Serializable {private static final long serialVersionUID = 1173984872572414699L;private transient Node firstWaiter;private transient Node lastWaiter;private static final int REINTERRUPT = 1;private static final int THROW_IE = -1;。。。。}
可以看出含有兩個屬性firstWaiter和lastWaiter,這兩個屬性分別表示的是Condition所維持的鏈表的表頭和表尾,由此我們又可以看出ConditionObject 則是用一個鏈表來串聯起整個隊列的。
然后我們開始進入addConditionWaiter方法中來看看是如何加入隊列的
private Node addConditionWaiter() {Node var1 = this.lastWaiter;if (var1 != null && var1.waitStatus != -2) {this.unlinkCancelledWaiters();var1 = this.lastWaiter;}Node var2 = new Node(Thread.currentThread(), -2);if (var1 == null) {this.firstWaiter = var2;} else {var1.nextWaiter = var2;}this.lastWaiter = var2;return var2;}
代碼可以看出首先將隊尾的Node節點取出并且檢查狀態,如果狀態不符合要求則會清除出隊列。
之后開始初始化本線程的Node節點,并且放在隊尾后面。好了這個方法的大致功能已經捋順接下來我們看看下面的方法
long var2 = AbstractQueuedLongSynchronizer.this.fullyRelease(var1);int var4 = 0;while(!AbstractQueuedLongSynchronizer.this.isOnSyncQueue(var1)) {LockSupport.park(this);if ((var4 = this.checkInterruptWhileWaiting(var1)) != 0) {break;}}if (AbstractQueuedLongSynchronizer.this.acquireQueued(var1, var2) && var4 != -1) {var4 = 1;}if (var1.nextWaiter != null) {this.unlinkCancelledWaiters();}if (var4 != 0) {this.reportInterruptAfterWait(var4);}
隨后就會調用AQS的方法進行鎖釋放(由于線程已經進入Condition隊列中),隨后則會一個while循環調用isOnSyncQueue進行校驗保證當前Node節點不會在AQS的同步隊列,這時候就會有疑惑了為什么要保證不會在AQS的同步隊列呢,原因就是當Condition調用signal方法的時候被喚醒的線程會從Condition隊列中移除轉而放入到AQS維護的CLH同步隊列中去。所以這里的循環查看是否在同步隊列換個意思就是保證當前Node節點沒有被喚醒。
在保證當前沒有被喚醒之后則會調用?LockSupport.park(this)來講當前的線程阻塞。由此可見LockSupport很純粹也很底層,目的就是為了將當前線程進行阻塞或者喚醒。在進入阻塞之后后續的代碼則不會執行而是等到喚醒之后才會執行。
喚醒之后首先查看當前線程的中斷位,如果被中斷則直接跳出循環。
隨后則參與鎖的競爭調用了acquireQueued方法表示來占有鎖,占有鎖成功之后則會執行后續的方法如果后面還有節點那么會遍歷清除后面的節點。
最后進行判斷如果中斷位是否開啟并且進行處理中斷位
下面舉一個常用示例
// 示例:生產者-消費者模型
lock.lock();
try {while (queue.isFull()) {notFull.await(); // 等待隊列不滿}// await() 返回后,線程已持有鎖,繼續執行queue.add(item); // 生產元素notEmpty.signal(); // 通知消費者
} finally {lock.unlock();
}
整體流程:
- 開始?→?檢查線程中斷(若已中斷,拋異常)
- 創建節點加入 Condition 隊列?→?釋放鎖
- 循環檢查節點是否在同步隊列:
- 否?→ 調用
park()
阻塞 → 等待喚醒 / 中斷 - 是?→ 跳出循環
- 否?→ 調用
- 重新競爭鎖(
acquireQueued
)→?獲取鎖后處理中斷標記 - 清理 Condition 隊列無效節點?→?根據中斷狀態處理異常或恢復標志
- 方法返回,線程持有鎖繼續執行后續邏輯
下一章節我們將把最后的signal函數源碼解析給講述完畢