Java 中斷(Interrupt)機制詳解
Java 的中斷機制是一種協作式的線程間通信機制,用于請求另一個線程停止當前正在執行的操作。
Thread thread = Thread.currentThread();
thread.interrupt(); // 設置當前線程的中斷狀態
檢查中斷狀態
// 檢查中斷狀態
boolean isInterrupted = Thread.currentThread().isInterrupted();// 檢查并清除中斷狀態
boolean wasInterrupted = Thread.interrupted();
中斷產生的后果
對阻塞方法的影響
當線程在以下阻塞方法中被中斷時,會拋出 InterruptedException
:
try {Thread.sleep(1000); // sleepobject.wait(); // waitthread.join(); // joinLockSupport.park(); // park// 以及各種 I/O 操作和同步器
} catch (InterruptedException e) {// 中斷異常處理Thread.currentThread().interrupt(); // 恢復中斷狀態
}
對非阻塞代碼的影響
對于非阻塞代碼,中斷不會自動產生異常,需要手動檢查:
while (!Thread.currentThread().isInterrupted()) {// 執行任務System.out.println("Working...");// 模擬工作try {Thread.sleep(100);} catch (InterruptedException e) {// 睡眠時被中斷Thread.currentThread().interrupt(); // 重新設置中斷狀態break;}
}
System.out.println("線程被中斷,優雅退出");
正確的中斷處理模式
1. 傳播中斷狀態
public void task() {try {while (true) {// 執行工作if (Thread.currentThread().isInterrupted()) {throw new InterruptedException();}// 或者檢查中斷的阻塞操作Thread.sleep(100);}} catch (InterruptedException e) {// 恢復中斷狀態并退出Thread.currentThread().interrupt();}
}
2. 不可中斷任務的處理
public void nonInterruptibleTask() {while (true) {if (Thread.currentThread().isInterrupted()) {// 執行清理操作System.out.println("收到中斷請求,執行清理后退出");break;}// 執行不可中斷的工作}
}
正確使用中斷機制可以實現線程的安全、協作式停止,避免使用已廢棄的 stop()
方法。
sleep為什么拋出異常
調用本地方法:
private static native void sleepNanos0(long nanos) throws InterruptedException;
jvm.cpp中注冊
JVM_ENTRY(void, JVM_SleepNanos(JNIEnv* env, jclass threadClass, jlong nanos))if (nanos < 0) {THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "nanosecond timeout value out of range");}if (thread->is_interrupted(true) && !HAS_PENDING_EXCEPTION) {THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");}// Save current thread state and restore it at the end of this block.// And set new thread state to SLEEPING.JavaThreadSleepState jtss(thread);HOTSPOT_THREAD_SLEEP_BEGIN(nanos / NANOSECS_PER_MILLISEC);if (nanos == 0) {os::naked_yield();} else {ThreadState old_state = thread->osthread()->get_state();thread->osthread()->set_state(SLEEPING);if (!thread->sleep_nanos(nanos)) { // interrupted// An asynchronous exception could have been thrown on// us while we were sleeping. We do not overwrite those.if (!HAS_PENDING_EXCEPTION) {HOTSPOT_THREAD_SLEEP_END(1);// TODO-FIXME: THROW_MSG returns which means we will not call set_state()// to properly restore the thread state. That's likely wrong.THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");}}thread->osthread()->set_state(old_state);}HOTSPOT_THREAD_SLEEP_END(0);
JVM_END
JVM_SleepNanos
這個函數是sleep拋出異常的地方。以下是該函數的核心邏輯:
-
首先檢查
nanos
參數是否小于0,如果是則拋出IllegalArgumentException
異常(實際上Java層面已經檢查了,所以c++層面不會報異常): -
if (nanos < 0) {THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "nanosecond timeout value out of range"); }
-
檢查線程是否已被中斷,如果是則拋出
InterruptedException
異常:if (thread->is_interrupted(true) && !HAS_PENDING_EXCEPTION) {THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted"); }
-
在實際睡眠過程中,如果線程被中斷,也會拋出
InterruptedException
異常:if (!thread->sleep_nanos(nanos)) { // interrupted// An asynchronous exception could have been thrown on// us while we were sleeping. We do not overwrite those.if (!HAS_PENDING_EXCEPTION) {// TODO-FIXME: THROW_MSG returns which means we will not call set_state()// to properly restore the thread state. That's likely wrong.THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");} }
Object.wait異常
同樣調用本地方法
private final native void wait0(long timeoutMillis) throws InterruptedException;
cpp實現在ObjectMonitor.cpp,這個函數很長,簡化如下:
// ObjectMonitor的等待方法:處理線程等待、中斷和超時邏輯
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {JavaThread* current = THREAD;assert(InitDone, "未初始化");CHECK_OWNER(); // 檢查所有者,非所有者拋出異常EventJavaMonitorWait wait_event;EventVirtualThreadPinned vthread_pinned_event;// 檢查中斷狀態:若可中斷且已中斷,直接拋出異常if (interruptible && current->is_interrupted(true) && !HAS_PENDING_EXCEPTION) {JavaThreadInObjectWaitState jtiows(current, millis != 0, interruptible);// 發送JVMTI監控等待事件if (JvmtiExport::should_post_monitor_wait()) {JvmtiExport::post_monitor_wait(current, object(), millis);}if (JvmtiExport::should_post_monitor_waited()) {JvmtiExport::post_monitor_waited(current, this, false);}if (wait_event.should_commit()) {post_monitor_wait_event(&wait_event, this, 0, millis, false);}THROW(vmSymbols::java_lang_InterruptedException());return;}// 虛擬線程處理:嘗試掛起虛擬線程freeze_result result;ContinuationEntry* ce = current->last_continuation();bool is_virtual = ce != nullptr && ce->is_virtual_thread();if (is_virtual) {if (interruptible && JvmtiExport::should_post_monitor_wait()) {JvmtiExport::post_monitor_wait(current, object(), millis);}current->set_current_waiting_monitor(this);result = Continuation::try_preempt(current, ce->cont_oop(current));if (result == freeze_ok) {vthread_wait(current, millis);current->set_current_waiting_monitor(nullptr);return;}}// 進入等待狀態JavaThreadInObjectWaitState jtiows(current, millis != 0, interruptible);if (!is_virtual) {if (interruptible && JvmtiExport::should_post_monitor_wait()) {JvmtiExport::post_monitor_wait(current, object(), millis);}current->set_current_waiting_monitor(this);}// 創建等待節點并加入等待隊列ObjectWaiter node(current);node.TState = ObjectWaiter::TS_WAIT;current->_ParkEvent->reset();OrderAccess::fence();Thread::SpinAcquire(&_wait_set_lock);add_waiter(&node);Thread::SpinRelease(&_wait_set_lock);// 退出監控器并準備掛起intx save = _recursions;_waiters++;_recursions = 0;exit(current);guarantee(!has_owner(current), "invariant");// 掛起線程:檢查中斷或超時int ret = OS_OK;int WasNotified = 0;bool interrupted = interruptible && current->is_interrupted(false);{OSThread* osthread = current->osthread();OSThreadWaitState osts(osthread, true);assert(current->thread_state() == _thread_in_vm, "invariant");{ClearSuccOnSuspend csos(this);ThreadBlockInVMPreprocess<ClearSuccOnSuspend> tbivs(current, csos, true);if (interrupted || HAS_PENDING_EXCEPTION) {// 空處理:已有中斷或異常} else if (!node._notified) {if (millis <= 0) {current->_ParkEvent->park();} else {ret = current->_ParkEvent->park(millis);}}}// 從等待隊列移除節點(如果需要)if (node.TState == ObjectWaiter::TS_WAIT) {Thread::SpinAcquire(&_wait_set_lock);if (node.TState == ObjectWaiter::TS_WAIT) {dequeue_specific_waiter(&node);assert(!node._notified, "invariant");node.TState = ObjectWaiter::TS_RUN;}Thread::SpinRelease(&_wait_set_lock);}guarantee(node.TState != ObjectWaiter::TS_WAIT, "invariant");OrderAccess::loadload();if (has_successor(current)) clear_successor();WasNotified = node._notified;// 發送JVMTI監控等待完成事件if (JvmtiExport::should_post_monitor_waited()) {JvmtiExport::post_monitor_waited(current, this, ret == OS_TIMEOUT);if (node._notified && has_successor(current)) {current->_ParkEvent->unpark();}}if (wait_event.should_commit()) {post_monitor_wait_event(&wait_event, this, node._notifier_tid, millis, ret == OS_TIMEOUT);}OrderAccess::fence();// 重新獲取監控器鎖assert(!has_owner(current), "invariant");ObjectWaiter::TStates v = node.TState;if (v == ObjectWaiter::TS_RUN) {NoPreemptMark npm(current);enter(current);} else {guarantee(v == ObjectWaiter::TS_ENTER, "invariant");reenter_internal(current, &node);node.wait_reenter_end(this);}guarantee(node.TState == ObjectWaiter::TS_RUN, "invariant");assert(has_owner(current), "invariant");assert(!has_successor(current), "invariant");}// 清理狀態:重置等待監控器引用current->set_current_waiting_monitor(nullptr);// 恢復遞歸計數guarantee(_recursions == 0, "invariant");int relock_count = JvmtiDeferredUpdates::get_and_reset_relock_count_after_wait(current);_recursions = save + relock_count;current->inc_held_monitor_count(relock_count);_waiters--;// 驗證后置條件assert(has_owner(current), "invariant");assert(!has_successor(current), "invariant");assert_mark_word_consistency();// 虛擬線程事件記錄if (ce != nullptr && ce->is_virtual_thread()) {current->post_vthread_pinned_event(&vthread_pinned_event, "Object.wait", result);}// 檢查通知結果:未通知可能是超時或中斷if (!WasNotified) {if (interruptible && current->is_interrupted(true) && !HAS_PENDING_EXCEPTION) {THROW(vmSymbols::java_lang_InterruptedException());}}
}
ObjectMonitor::wait
?函數是 Java 對象監視器(Object Monitor)中實現?Object.wait()
?方法的核心函數。它允許線程在對象上等待,直到其他線程調用該對象的?notify()?或?notifyAll()?方法。
1. 初始化和檢查
-
獲取當前線程信息并驗證初始化狀態
-
使用?
CHECK_OWNER()
?檢查當前線程是否為監視器的所有者,如果不是則拋出?IllegalMonitorStateException
2. 事件初始化
-
初始化?
EventJavaMonitorWait
?和?EventVirtualThreadPinned
?事件,用于監控和追蹤目的
3. 中斷檢查
-
如果線程可中斷(
interruptible
?為 true)且已被中斷,則在進入等待前拋出?InterruptedException
4. 虛擬線程處理
-
如果是虛擬線程(virtual thread),則調用?vthread_wait?方法進行特殊處理,包括:
-
創建?ObjectWaiter?節點
-
將節點添加到等待隊列
-
處理虛擬線程的掛起和狀態轉換
-
5. 常規等待邏輯
對于非虛擬線程或虛擬線程的后續步驟:
-
創建?JavaThreadInObjectWaitState?對象來管理線程等待狀態
-
如果啟用了 JVMTI,則發布 monitor wait 事件
-
保存當前的遞歸計數并重置為 0
-
退出監視器(調用?exit?方法)
-
增加等待者計數
6. 等待階段
-
調用操作系統相關的 park 函數使線程進入等待狀態
-
等待可能因超時、中斷或 notify 調用而結束
7. 喚醒后處理
-
檢查喚醒原因(通知、超時或中斷)
-
如果是中斷且線程可中斷,則拋出?
InterruptedException
-
重新獲取監視器鎖
-
恢復遞歸計數
-
發布 monitor waited 事件
8. 清理工作
- 減少等待者計數
- 清理 successor(如果存在)
- 刪除 ObjectWaiter 節點
這個函數實現了 Java 中對象等待機制的核心邏輯,處理了線程狀態管理、同步控制、事件追蹤和異常處理等多個方面,確保線程安全地等待和被喚醒。
Thread.join
A線程調用ThreadB.join,實際上就只是調用對方的wait。當線程B結束會喚醒所有的等待者。
/?**?等待該線程終止,最多等待 {@code millis} 毫秒。如果超時時間設為 {@code 0} 則表示無限期等待(直到線程終止)。?如果該線程尚未被 {@link #start() 啟動},則此方法會立即返回,無需等待。??@implNote 實現說明:?對于平臺線程,該實現采用循環調用 {@code this.wait} 方法,并在 {@code this.isAlive} 條件滿足時持續等待。?當線程終止時,會調用 {@code this.notifyAll} 方法喚醒等待的線程。?建議應用程序不要在 {@code Thread} 實例上使用 {@code wait}、{@code notify} 或 {@code notifyAll} 方法,?以避免與線程同步機制發生沖突或造成意外行為。?@throws InterruptedException?如果任何線程中斷了當前線程。當拋出此異常時,當前線程的<i>中斷狀態</i>將被清除。*/public final void join(long millis) throws InterruptedException {if (millis < 0)throw new IllegalArgumentException("timeout value is negative");if (this instanceof VirtualThread vthread) {if (isAlive()) {long nanos = MILLISECONDS.toNanos(millis);vthread.joinNanos(nanos);}return;}synchronized (this) {if (millis > 0) {if (isAlive()) {final long startTime = System.nanoTime();long delay = millis;do {wait(delay);} while (isAlive() && (delay = millis -NANOSECONDS.toMillis(System.nanoTime() - startTime)) > 0);}} else {while (isAlive()) {wait(0);}}}}