這是一個非常關鍵且設計精巧的 定時任務與超時管理組件 —— GridTimeoutProcessor
,它是 Apache Ignite 內核中負責 統一調度和處理所有異步超時事件的核心模塊。
🎯 一、核心職責
統一管理所有需要“在某個時間點觸發”的任務或超時邏輯。
它相當于 Ignite 內部的 “鬧鐘中心”,用于:
- 執行延遲任務(
schedule(...)
) - 監聽
Future
超時(waitAsync(...)
) - 處理各種協議級別的超時(如通信超時、鎖等待超時等)
所有實現了 GridTimeoutObject
接口的對象都可以被它管理。
🧱 二、關鍵字段解析
字段 | 類型 | 作用 |
---|---|---|
timeoutWorker | TimeoutWorker | 后臺線程,負責輪詢并觸發到期任務 |
timeoutObjs | GridConcurrentSkipListSet<...> | 按結束時間排序的超時對象集合(核心數據結構) |
mux | Object | 鎖對象,用于 timeoutWorker 線程的等待/喚醒機制 |
🔗 三、核心數據結構:GridConcurrentSkipListSet
private final GridConcurrentSkipListSet<GridTimeoutObject> timeoutObjs = ...
- 是一個 線程安全的跳表(Skip List)實現
- 按
endTime()
升序排序 - 支持高效的:
- 插入(O(log n))
- 刪除(O(log n))
- 查找最早到期任務(
firstx()
,接近 O(1))
💡 類似 Java 標準庫中的
ConcurrentSkipListSet
,但可能是 Ignite 自定義優化版本。
排序規則:
Comparator<GridTimeoutObject> {int res = Long.compare(o1.endTime(), o2.endTime());if (res != 0) return res;return o1.timeoutId().compareTo(o2.timeoutId());
}
- 先按 到期時間 排序
- 時間相同時,用
timeoutId()
保證順序唯一(避免equals/hashCode
問題)
🧩 四、核心線程:TimeoutWorker
private final TimeoutWorker timeoutWorker = new TimeoutWorker();
這是一個 無限循環的工作線程,它的邏輯大致如下:
class TimeoutWorker implements Runnable {public void run() {while (!isCancelled) {GridTimeoutObject first = timeoutObjs.firstx();if (first == null) {// 沒有任務,等待synchronized (mux) {mux.wait();}}else {long now = U.currentTimeMillis();long waitTime = first.endTime() - now;if (waitTime <= 0) {// 已到期,移除并執行if (timeoutObjs.remove(first))first.onTimeout();}else {// 未到期,等待一段時間synchronized (mux) {mux.wait(waitTime);}}}}}
}
? 它是一個典型的 “時間輪”簡化版 或 “最小堆調度器”。
📥 五、核心方法詳解
? 1. addTimeoutObject(...)
:注冊一個超時對象
public boolean addTimeoutObject(GridTimeoutObject timeoutObj)
流程:
- 檢查
endTime
是否合法(不能是0
或Long.MAX_VALUE
) - 添加到
timeoutObjs
中 - 如果它是 第一個(最早到期),則
notify()
喚醒timeoutWorker
線程
?? 為什么只
notify()
而不是notifyAll()
?
- 因為只有一個消費者線程(
timeoutWorker
),所以notify()
足夠且更高效
? 2. schedule(...)
:調度一個延遲/周期任務
public CancelableTask schedule(Runnable task, long delay, long period)
delay
:首次執行延遲(毫秒)period
:周期(毫秒),-1 表示只執行一次- 返回
CancelableTask
:可取消任務
內部邏輯:
- 創建
CancelableTask
(實現了GridTimeoutObject
) - 計算
endTime = now + delay
- 調用
addTimeoutObject(...)
當任務到期時,onTimeout()
會被調用,執行 task.run()
,如果是周期任務,還會重新調度下一次。
? 3. waitAsync(...)
:帶超時的 Future 等待
public void waitAsync(final IgniteInternalFuture<?> fut, long timeout, IgniteBiInClosure<...> clo)
這是 異步編程中非常常見的模式:等待一個 Future
完成,但最多等 timeout
毫秒。
分情況處理:
timeout 值 | 行為 |
---|---|
-1 | 立即超時 → 直接調用 clo.apply(null, true) |
0 | 無限等待 → 直接監聽 fut |
>0 | 創建 WaitFutureTimeoutObject 并注冊 |
關鍵設計:
- 雙重監聽機制:
- 如果
Future
先完成 → 移除超時對象,回調clo
- 如果超時先發生 → 回調
clo
并標記timedOut=true
- 如果
- 使用
AtomicBoolean finishGuard
防止重復執行回調
🔁 這是典型的 “競態條件保護” 設計。
? 4. removeTimeoutObject(...)
:取消一個超時任務
public boolean removeTimeoutObject(GridTimeoutObject timeoutObj)
- 用于取消尚未觸發的任務
- 例如:
Future
已提前完成,無需再等待超時
🚀 六、啟動與停止流程
start()
:
new IgniteThread(timeoutWorker).start();
- 啟動后臺線程,開始監聽超時事件
stop(boolean cancel)
:
timeoutWorker.cancel();
U.join(timeoutWorker); // 等待線程結束
- 安全關閉,避免資源泄漏
🎨 七、設計亮點總結
特性 | 說明 |
---|---|
統一調度中心 | 所有超時邏輯集中管理,避免重復創建 Timer |
高并發安全 | 使用 ConcurrentSkipListSet ,無鎖讀寫 |
低延遲喚醒 | 最早任務變化時立即喚醒 worker |
精準定時 | 基于系統時間,支持毫秒級精度 |
可取消性 | 所有任務都支持動態取消 |
異步友好 | 支持 Future + Timeout 模式,避免阻塞線程 |
🧩 八、GridTimeoutObject
是什么?
這是一個接口,表示“一個會在未來某個時間點觸發的對象”:
interface GridTimeoutObject {long endTime(); // 到期時間(絕對時間戳)UUID timeoutId(); // 唯一ID,用于排序去重void onTimeout(); // 到期時執行的邏輯
}
常見實現:
CancelableTask
:周期/延遲任務WaitFutureTimeoutObject
:Future 超時監聽MessageTimeoutObject
:通信消息超時LockTimeoutObject
:分布式鎖等待超時
📊 九、典型使用場景
場景 1:延遲執行任務
timeoutProcessor.schedule(() -> {System.out.println("3秒后執行");
}, 3000, -1);
場景 2:周期任務(心跳)
timeoutProcessor.schedule(heartbeatTask, 0, 1000); // 每秒執行
場景 3:Future 超時控制
timeoutProcessor.waitAsync(someFuture, 5000, (err, timedOut) -> {if (timedOut) {System.out.println("請求超時");} else if (err != null) {System.out.println("請求失敗: " + err);} else {System.out.println("請求成功");}
});
🏁 十、總結
GridTimeoutProcessor
是 Ignite 的 “定時中樞”,它通過一個后臺線程 + 有序集合的方式,高效、安全地管理了所有異步超時事件。
它的設計體現了:
- 資源復用:一個線程處理所有定時任務
- 線程安全:無鎖數據結構 + 最小同步塊
- 響應及時:到期立即觸發,支持喚醒機制
- 擴展性強:任何對象只要實現
GridTimeoutObject
就可接入
🔔 一句話理解:
它是一個 輕量級、高性能、集中式的超時調度器,是構建可靠分布式系統不可或缺的基礎設施組件。