這是一個 Apache Ignite 中非常核心的組件 —— GridClosureProcessor
,它是 分布式閉包(Closure)執行的調度中樞,負責在集群節點上異步執行用戶提交的任務(如 Runnable
、Closure
)。
我們來逐層深入理解它的設計思想、關鍵機制和代碼邏輯。
🧱 一、類概覽:GridClosureProcessor
public class GridClosureProcessor extends GridProcessorAdapter
- 職責:處理所有基于閉包(函數式)的遠程執行請求
- 常見用途:
compute().run(Runnable)
compute().call(Closure)
compute().broadcast(Closure)
cache().affinity().run(...)
- 它是
ComputeTask
的底層支撐模塊
🔩 二、關鍵字段解析
字段 | 類型 | 作用 |
---|---|---|
pools | PoolProcessor | 線程池管理器,用于獲取執行任務的線程池 |
busyLock | GridSpinReadWriteLock | 控制組件在 停止期間不接受新任務 |
stopping | boolean | 標記當前處理器是否正在停止 |
?? 這三個字段共同實現了 “優雅關閉” 的核心邏輯。
🔒 三、busyLock
:優雅關閉的關鍵機制
1. 什么是 GridSpinReadWriteLock
?
- Ignite 自定義的 自旋讀寫鎖
- 特點:
- 讀鎖可重入、允許多個線程同時持有
- 寫鎖獨占,用于“停止”階段
- 使用 自旋 + sleep 避免線程頻繁阻塞喚醒
2. 讀鎖(readLock()
):
- 所有任務提交方法(
runAsync
,callAsync
,broadcast
)都先獲取讀鎖 - 表示:“我正在使用這個處理器”
- 允許多個線程并發提交任務
3. 寫鎖(tryWriteLock(...)
):
- 在
onKernalStop(...)
中使用 - 目的:阻止任何新任務提交,并標記為“停止中”
🛑 四、onKernalStop(...)
:優雅關閉流程
@Override
public void onKernalStop(boolean cancel) {boolean interrupted = false;while (true) {try {if (busyLock.tryWriteLock(200, TimeUnit.MILLISECONDS))break;elseThread.sleep(200);}catch (InterruptedException ignore) {interrupted = true;}}try {if (interrupted)Thread.currentThread().interrupt();stopping = true; // 標記為停止狀態}finally {busyLock.writeUnlock();}
}
🔍 流程詳解:
-
嘗試獲取寫鎖:
tryWriteLock(200ms)
:嘗試在 200ms 內獲取寫鎖- 如果有線程持有讀鎖(即正在提交任務),則失敗
- 失敗后
Thread.sleep(200)
,然后重試
-
為什么是“Busy Wait”?
- 注解
@SuppressWarnings("BusyWait")
表示這是有意為之的忙等待 - 目的:盡快完成關閉,避免長時間阻塞
- 每 200ms 嘗試一次,不會過度消耗 CPU
- 注解
-
處理中斷:
- 如果等待期間被中斷,記錄
interrupted = true
- 最后恢復中斷狀態(線程安全最佳實踐)
- 如果等待期間被中斷,記錄
-
設置
stopping = true
- 獲取寫鎖后,設置標志位
- 之后所有
runAsync
等調用都會被拒絕
-
釋放寫鎖
- 即使發生異常,也確保釋放鎖
? 這是一個典型的 “關閉守衛”模式:先阻止新請求,再清理資源。
🚀 五、任務提交方法分析
所有任務提交方法都遵循統一模式:
busyLock.readLock();
try {if (stopping) reject();// 提交任務
} finally {busyLock.readUnlock();
}
我們以 runAsync(...)
為例:
? runAsync(...)
:運行一批 Runnable
public ComputeTaskInternalFuture<?> runAsync(...) {assert mode != null;assert !F.isEmpty(jobs);busyLock.readLock(); // 獲取讀鎖try {if (stopping) {return finishedFuture(new IgniteCheckedException("Closure processor cannot be used on stopped grid"));}if (F.isEmpty(nodes))return finishedFuture(U.emptyTopologyException());ctx.task().setThreadContext(TC_SUBGRID, nodes);return ctx.task().execute(new T1(mode, jobs), null, sys, execName);}finally {busyLock.readUnlock(); // 釋放讀鎖}
}
關鍵點:
stopping
檢查:如果正在停止,直接返回失敗 futurenodes
檢查:拓撲為空則返回空拓撲異常ctx.task().execute(...)
:交給TaskProcessor
執行(T1 是一個內部任務類型)- 使用
sys
參數決定使用 系統線程池 還是 公共線程池
? callAsync(...)
:遠程調用 Closure
public <T, R> ComputeTaskInternalFuture<R> callAsync(IgniteClosure<T, R> job, T arg, ...)
- 執行一個帶返回值的函數(
Closure<T,R>
) - 返回
ComputeTaskInternalFuture<R>
,可獲取結果
? broadcast(...)
:廣播到所有節點
public <T, R> IgniteInternalFuture<Collection<R>> broadcast(...)
- 在
nodes
列表中的每個節點上執行job
- 返回一個
Future<Collection<R>>
,包含所有節點的返回值
? affinityRun(...)
:基于數據親和性執行
public ComputeTaskInternalFuture<?> affinityRun(...)
- 關鍵用途:將任務發送到 特定緩存分區(partition)的主節點
- 流程:
- 獲取當前拓撲版本
readyAffinityVersion()
- 使用
ctx.affinity().mapPartitionToNode(...)
找到負責該分區的節點 - 只在那個節點上執行任務
- 獲取當前拓撲版本
- 優勢:本地化執行,避免數據移動,性能極高
💡 這是 Ignite 實現“移動計算而非數據”的核心機制之一。
🧩 六、T1
, T8
, T11
, T4
是什么?
這些是 內部任務類(定義在 GridTaskInternalFuture
或內部類中),用于包裝用戶任務:
任務類 | 包裝的任務類型 |
---|---|
T1 | GridClosureCallMode + Collection<Runnable> |
T8 | IgniteClosure<T,R> |
T11 | Broadcast 任務 |
T4 | Affinity 任務 |
它們都繼承自 ComputeTaskAdapter
,由 TaskProcessor
調度執行。
🎯 七、整體架構圖(簡化)
+---------------------+
| User Code |
| compute().run(...) |
+----------+----------+|v
+---------------------+
| GridClosureProcessor|
| - busyLock |
| - stopping |
+----------+----------+|v
+---------------------+
| TaskProcessor |
| execute(Task) |
+----------+----------+|v
+---------------------+
| PoolProcessor |
| 系統/公共線程池 |
+---------------------+
? 八、設計亮點總結
特性 | 說明 |
---|---|
讀寫鎖控制關閉 | 讀鎖允許多任務并發提交,寫鎖確保關閉時原子性 |
優雅拒絕新任務 | stopping 標志 + finishedFuture 快速失敗 |
支持多種執行模式 | 單節點、廣播、親和性執行 |
與 Task 子系統集成 | 復用 TaskProcessor 的調度能力 |
線程安全 | 所有提交路徑都受鎖保護 |
可觀測性 | 調試日志、異常信息清晰 |
📌 九、一句話總結
GridClosureProcessor
是 Ignite 的 分布式任務調度入口,它通過 讀寫鎖機制 實現了 高并發提交 + 優雅關閉,并支持 普通執行、廣播、數據親和性執行 等多種模式,是Compute
子系統的核心引擎。
💡 十、你可以借鑒的設計模式
1. 關閉守衛模式(Shutdown Guard)
private final ReadWriteLock shutdownLock = new ReentrantReadWriteLock();
private volatile boolean shuttingDown = false;public void submit(Runnable task) {shutdownLock.readLock().lock();try {if (shuttingDown) throw new RejectedExecutionException();// 執行任務} finally {shutdownLock.readLock().unlock();}
}public void shutdown() {shutdownLock.writeLock().lock();try {shuttingDown = true;} finally {shutdownLock.writeLock().unlock();}
}
2. 快速失敗(Fail-Fast)
- 不讓任務進入隊列,而是在入口就拒絕
- 返回一個“已完成的失敗 Future”,避免資源浪費
🏁 結語
GridClosureProcessor
雖然代碼量不大,但它體現了分布式系統中 資源管理、并發控制、生命周期管理 的最佳實踐。理解它,有助于你設計自己的 高可用、可擴展的任務調度系統。