文章目錄
- 七、調度與線程模型
- 7.1 概述
- 7.2 Scheduler: Reactor 的線程調度器
- 7.3 兩大核心操作符:subscribeOn vs publishOn
- 7.4 示例詳解
- 7.4.1 subscribeOn()的全局影響
- 7.4.2 publishOn() 的局部切換
- 7.4.3 多個publishOn切換
- 7.4.4 線程切換時序圖
- 7.5 核心調度器
- 7.5.1 BoundedElastic:IO 密集型任務首選
- 7.5.2 Parallel:CPU 密集型任務首選
- 7.5.3 Single:串行任務專用
- 7.5.4 Schedulers.immediate()
- 7.5.5 Schedulers.elastic()
- 7.5.6 Schedulers.fromExecutorService(ExecutorService)
- 7.5.7 Schedulers.new() 工廠方法
- 7.5.8 調度器使用最佳實踐
- 7.6 線程模型實戰: 典型場景
- 7.6.1 I/O密集型任務
- 7.6.2 場景 2:CPU 密集型任務
- 7.6.3 混合任務(I/O + CPU)
- 7.7 綜合示例
- 7.8 高級特性
- 7.8.1 調用器生命周期管理
- 7.8.2 自定義線程命名
- 7.8.3 在操作符中使用調度器
- 7.9 最佳實踐與陷阱
七、調度與線程模型
? 核心作用
- 線程抽象:將底層線程管理與響應式流解耦,提供統一的 API 控制執行上下文。
- 異步執行:支持非阻塞操作,避免阻塞主線程,提升系統吞吐量。
- 并發控制:通過不同類型的調度器,適配不同的并發場景(如 IO 密集型、CPU 密集型)。
🌺 關鍵概念
- 調度器(Scheduler):負責提供執行任務的線程,是 Reactor 中線程池的抽象。
- 調度器工作線程(Worker):
Scheduler
創建的輕量級工作單元,負責執行具體任務。 - publishOn () 與 subscribeOn ():用于切換執行上下文的操作符。
subscribeOn()
:指定訂閱操作(包括上游數據生成)的執行線程。publishOn()
:指定下游操作符鏈的執行線程
7.1 概述
Reactor 與 RxJava 類似,可以被認為是并發無關的 。也就是說,它不強制執行并發模型。相反,它把控制權交給開發者自己。然而,這并不妨礙該庫幫助你處理并發問題。
獲得 Flux
或 Mono
并不一定意味著它在專用的 Thread
,大多數操作符會在前一個操作符執行的 Thread
中繼續工作。除非另有說明,最頂層的操作符(源操作符)本身會在調用 subscribe()
Thread
中運行。以下示例在新線程中運行 Mono
:
public static void main(String[] args) throws InterruptedException {final Mono<String> mono = Mono.just("hello "); // 🥇 Mono<String> 在線程 main 中組裝。Thread t = new Thread(() -> mono.map(msg -> msg + "thread ").subscribe(v -> // 🥈 它是在線程 Thread-0 中訂閱的。System.out.println(v + Thread.currentThread().getName()) // map 和 onNext 回調實際上都在 Thread-0 中運行));t.start();t.join();}
7.2 Scheduler: Reactor 的線程調度器
Scheduler
是 Reactor 的線程抽象,類似于 Java 的 ExecutorService
,但專為響應式流設計。
? 核心作用:控制
Publisher
在哪個線程上執行。
Reactor 提供了多種內置 Scheduler
:
Scheduler | 用途 | 線程模型 |
---|---|---|
Schedulers.immediate() | 當前線程執行 | ? 不推薦用于生產 |
Schedulers.single() | 共享的單線程 | 1 個線程,復用 |
Schedulers.parallel() | CPU 密集型任務 | 固定線程數(CPU 核數) |
Schedulers.boundedElastic() | I/O 阻塞任務 | 彈性線程池(默認 10萬線程上限) |
Schedulers.newXXX() | 自定義線程池 | 如 newParallel() |
7.3 兩大核心操作符:subscribeOn vs publishOn
這是理解 Reactor 線程模型的重中之重!
🔑 核心區別
操作符 | 作用 | 影響范圍 |
---|---|---|
subscribeOn() | 指定 Publisher 的創建和上游執行線程 | 影響整個鏈的上游(從源頭到當前位置) |
publishOn() | 指定下游操作的執行線程 | 只影響其后的下游操作(當前位置到 subscribe ) |
🎯 記憶口訣:
- subscribeOn:從哪里開始(影響源頭)
- publishOn:從哪里切換(影響后續)
7.4 示例詳解
7.4.1 subscribeOn()的全局影響
package cn.tcmeta.scheduler;import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;import java.util.concurrent.TimeUnit;/*** @author: laoren* @description: subscribeOn的全局影響* @version: 1.0.0*/
public class SubscribeOnExample {public static void main(String[] args) {Flux.just("A", "B", "C").map(data -> {System.out.println("1?? Map1 線程: " + Thread.currentThread().getName());return data + "-1";}).subscribeOn(Schedulers.parallel()).map(data -> {System.out.println("2?? Map2 線程: " + Thread.currentThread().getName());return data + "-2";}).subscribe(data -> {System.out.println("📩 訂閱線程: " + Thread.currentThread().getName() + ", 數據: " + data);});try {TimeUnit.MILLISECONDS.sleep(3000);}catch (InterruptedException e){e.printStackTrace();}}
}
1?? Map1 線程: parallel-1
2?? Map2 線程: parallel-1
📩 訂閱線程: parallel-1, 數據: A-1-2
1?? Map1 線程: parallel-1
2?? Map2 線程: parallel-1
📩 訂閱線程: parallel-1, 數據: B-1-2
1?? Map1 線程: parallel-1
2?? Map2 線程: parallel-1
📩 訂閱線程: parallel-1, 數據: C-1-2
? 結論:subscribeOn(Schedulers.parallel())
即使放在中間,也使 just()
和兩個 map()
都在 parallel
線程執行。
subscribeOn
影響范圍:
7.4.2 publishOn() 的局部切換
package cn.tcmeta.scheduler;import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;/*** @author: laoren* @description: publishOn()局部切換* @version: 1.0.0*/
public class PublishOnExample {public static void main(String[] args) throws InterruptedException {Flux.just("A", "B").map(data -> {System.out.println("📍 上游 Map 線程: " + Thread.currentThread().getName());return data + "-up";})// ? publishOn 切換下游線程.publishOn(Schedulers.boundedElastic()).map(data -> {System.out.println("📍 下游 Map 線程: " + Thread.currentThread().getName());return data + "-down";}).subscribe(data ->System.out.println("📩 訂閱線程: " + Thread.currentThread().getName() + ", 數據: " + data));Thread.sleep(1000);}
}
📍 上游 Map 線程: main
📍 上游 Map 線程: main
📍 下游 Map 線程: boundedElastic-1
📩 訂閱線程: boundedElastic-1, 數據: A-up-down
📍 下游 Map 線程: boundedElastic-1
📩 訂閱線程: boundedElastic-1, 數據: B-up-down
? 結論:publishOn
之后的所有操作(包括 subscribe
)都在 boundedElastic
線程執行。
publishOn() 影響范圍:
🔴 紅色部分(下游)在 elastic
線程執行,just
和 map1
在主線程。
7.4.3 多個publishOn切換
package cn.tcmeta.scheduler;import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;public class MultiPublishOnExample {public static void main(String[] args) throws InterruptedException {Flux.just("Hello").publishOn(Schedulers.parallel()) // 切到 parallel.map(s -> {System.out.println("ParallelGroup: " + Thread.currentThread().getName());return s + "-1";}).publishOn(Schedulers.boundedElastic()) // 再 切到 boundedElastic.map(s -> {System.out.println("ElasticGroup: " + Thread.currentThread().getName());return s + "-2";}).subscribe(data ->System.out.println("Final: " + Thread.currentThread().getName() + " => " + data));Thread.sleep(1000);}
}
? 每個 publishOn
都會切換其后操作的執行線程。
7.4.4 線程切換時序圖
7.5 核心調度器
7.5.1 BoundedElastic:IO 密集型任務首選
- 設計背景:替代已過時的
ElasticScheduler
(無界線程池,可能導致 OOM),通過有界緩沖隊列和動態線程數(空閑線程會回收)避免資源耗盡。 - 適用場景:數據庫查詢、HTTP 請求、文件 IO 等阻塞且耗時的操作(允許線程阻塞,通過動態擴縮容應對并發)
7.5.2 Parallel:CPU 密集型任務首選
- 線程特性:線程數固定為 CPU 核心數(
Runtime.getRuntime().availableProcessors()
),無空閑線程回收(保持計算能力)。 - 適用場景:數據計算、序列化 / 反序列化、復雜集合處理等非阻塞但耗 CPU的操作(充分利用多核性能)。
7.5.3 Single:串行任務專用
- 線程特性:全局唯一單線程(所有
Schedulers.single()
調用共享),任務按提交順序執行。 - 注意:若需多個獨立串行線程,使用
Schedulers.newSingle()
創建私有單線程調度器。
7.5.4 Schedulers.immediate()
- 特性:在當前線程直接執行,不開啟新線程。
- 適用場景:測試或不需要異步執行的場景。
7.5.5 Schedulers.elastic()
- 特性:彈性線程池,按需創建線程,空閑線程會在 60s 后回收。
- 適用場景:IO 密集型任務(如網絡調用、文件操作)。
- 注意:已被棄用,推薦使用
boundedElastic
。
7.5.6 Schedulers.fromExecutorService(ExecutorService)
- 特性:適配自定義的
ExecutorService
,靈活集成現有線程池。
7.5.7 Schedulers.new() 工廠方法
- 特性:創建獨立的新調度器實例(如
newSingle()
、newParallel()
),避免共享資源。
7.5.8 調度器使用最佳實踐
按任務類型選擇調度器
- IO 密集型(數據庫、網絡、文件)→
boundedElastic
(允許阻塞,動態擴縮容); - CPU 密集型(計算、排序、序列化)→
parallel
(固定線程數,避免線程切換開銷); - 串行任務(狀態依賴操作)→
single
或newSingle()
(保證順序執行); - 同步操作(無阻塞)→
immediate
(無需線程切換,減少開銷)。
避免線程阻塞濫用
- 禁止在
parallel
線程中執行阻塞操作(會浪費 CPU 核心,降低計算效率); - 阻塞操作必須放在
boundedElastic
線程(其線程設計允許阻塞);
// 錯誤:在parallel線程執行阻塞操作
Flux.range(1, 10).publishOn(Schedulers.parallel()).doOnNext(num -> {Thread.sleep(1000); // 阻塞CPU線程,浪費計算資源});// 正確:阻塞操作放在boundedElastic
Flux.range(1, 10).publishOn(Schedulers.boundedElastic()).doOnNext(num -> Thread.sleep(1000)); // 安全
控制boundedElastic
的資源上限
默認配置可能不適合高并發場景,可通過系統屬性調整:
// JVM啟動參數:調整boundedElastic的線程和隊列上限
-Dreactor.schedulers.boundedElastic.maxThreads=100
-Dreactor.schedulers.boundedElastic.queuesize=1024
減少不必要的線程切換
// 優化前:多次不必要的線程切換
flux.publishOn(A).map(...).publishOn(B).filter(...).publishOn(C)// 優化后:合并操作,減少切換
flux.map(...).filter(...).publishOn(C); // 一次切換即可
7.6 線程模型實戰: 典型場景
7.6.1 I/O密集型任務
如數據庫、HTTP 調用
// 假設這是調用外部 HTTP 服務
Mono<String> callExternalApi() {return Mono.fromCallable(() -> {// 模擬阻塞調用Thread.sleep(1000);return "API Result";}).subscribeOn(Schedulers.boundedElastic()); // ? 使用彈性線程池
}// 使用
callExternalApi().map(result -> processResult(result)) // 可在主線程或其他線程處理.subscribe(System.out::println);
? 原則:I/O 操作必須用 boundedElastic()
,防止阻塞 CPU線程。
7.6.2 場景 2:CPU 密集型任務
Flux.range(1, 1000).publishOn(Schedulers.parallel()) // ? 切到并行線程池.map(i -> heavyComputation(i)) // 耗時計算.subscribe(System.out::println);
? 原則:CPU 密集型用 parallel()
,避免創建過多線程。
7.6.3 混合任務(I/O + CPU)
externalServiceCall() // I/O: boundedElastic.publishOn(Schedulers.parallel()) // 切到 CPU 線程池.map(data -> compute(data)) // CPU 密集型計算.publishOn(Schedulers.boundedElastic()) // 再切回 I/O 線程.flatMap(result -> saveToDB(result)) // 再次 I/O 操作.subscribe();
? 原則:根據操作類型動態切換線程池。
7.7 綜合示例
package cn.tcmeta.scheduler;import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;import java.time.Duration;
import java.util.concurrent.TimeUnit;public class SchedulerExamples {public static void main(String[] args) {SchedulerExamples examples = new SchedulerExamples();examples.schedulerTypes();System.out.println("-------------------------------------");examples.publishOnVsSubscribeOn();System.out.println("-------------------------------------");examples.parallelProcessing();System.out.println("-------------------------------------");examples.timeoutWithScheduler();}public void schedulerTypes() {// 1. 立即調度 (當前線程)Flux.just("A", "B", "C").subscribeOn(Schedulers.immediate()).subscribe(System.out::println);// 2. 單一線程調度Flux.range(1, 3).subscribeOn(Schedulers.single()).subscribe(i -> System.out.println(Thread.currentThread().getName() + ": " + i));// 3. 彈性線程池 (適合IO密集型任務)Flux.range(1, 3).subscribeOn(Schedulers.boundedElastic()).subscribe(i -> System.out.println(Thread.currentThread().getName() + ": " + i));// 4. 并行調度 (適合CPU密集型任務)Flux.range(1, 3).subscribeOn(Schedulers.parallel()).subscribe(i -> System.out.println(Thread.currentThread().getName() + ": " + i));}public void publishOnVsSubscribeOn() {// subscribeOn - 影響整個鏈的訂閱上下文Mono.fromCallable(() -> {System.out.println("Callable on: " + Thread.currentThread().getName());return "Result";}).subscribeOn(Schedulers.boundedElastic()).subscribe(result ->System.out.println("Subscribe on: " + Thread.currentThread().getName()));// publishOn - 影響后續操作的執行上下文Flux.range(1, 3).map(i -> {System.out.println("Map1 on: " + Thread.currentThread().getName());return i * 2;}).publishOn(Schedulers.parallel()).map(i -> {System.out.println("Map2 on: " + Thread.currentThread().getName());return i + 1;}).subscribe();}public void parallelProcessing() {// 并行處理流Flux.range(1, 10).parallel(4) // 分成4個并行流.runOn(Schedulers.parallel()).map(i -> i * i).sequential() // 合并回順序流.subscribe(System.out::println);}public void timeoutWithScheduler() {// 使用調度器實現超時Mono.delay(Duration.ofSeconds(3)).timeout(Duration.ofSeconds(1), Schedulers.parallel()).subscribe(System.out::println,error -> System.out.println("Timeout: " + error));try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}
7.8 高級特性
7.8.1 調用器生命周期管理
// 創建獨立的調度器實例
Scheduler customScheduler = Schedulers.newBoundedElastic(10, 100, "custom");// 使用自定義調度器
Flux.just(1, 2, 3).subscribeOn(customScheduler).subscribe();// 使用完畢后關閉調度器(重要!避免資源泄漏)
customScheduler.dispose();
7.8.2 自定義線程命名
Scheduler namedScheduler = Schedulers.newParallel("my-thread", 4);
Flux.just("A", "B").subscribeOn(namedScheduler).subscribe(value -> {System.out.println("Running on: " + Thread.currentThread().getName());});// 輸出:Running on: my-thread-1
7.8.3 在操作符中使用調度器
// 使用 subscribeOn 在 flatMap 中為每個內部流指定調度器
Flux.just(1, 2, 3).flatMap(num -> Mono.just(num * 2).subscribeOn(Schedulers.parallel()) // 為每個元素創建獨立的執行上下文).subscribe();
7.9 最佳實踐與陷阱
? 最佳實踐
- I/O 操作 →
Schedulers.boundedElastic()
- CPU 計算 →
Schedulers.parallel()
- 避免在
map()
中阻塞 - 合理使用
publishOn
切換線程 subscribeOn
通常放在鏈的開頭或中間,效果相同
? 常見陷阱
// ? 錯誤:在 parallel 線程中執行阻塞 I/O
Flux.range(1, 10).publishOn(Schedulers.parallel()).map(i -> blockingIoCall(i)) // 阻塞調用!會耗盡 parallel 線程池.subscribe();// ? 正確:使用 boundedElastic
Flux.range(1, 10).flatMap(i -> Mono.fromCallable(() -> blockingIoCall(i)).subscribeOn(Schedulers.boundedElastic())).subscribe();
概念 | 關鍵點 |
---|---|
Scheduler | 線程執行的“容器”,選擇合適的類型至關重要 |
subscribeOn() | 影響上游,決定 Publisher 在哪個線程啟動 |
publishOn() | 影響下游,用于在鏈中切換執行線程 |
線程選擇 | I/O → boundedElastic ,CPU → parallel |
背壓與線程 | 背壓控制數據流,線程控制執行位置,二者協同工作 |
🚀 掌握調度,就掌握了 Reactor 的“方向盤”。合理使用 subscribeOn
和 publishOn
,結合正確的 Scheduler
,你就能構建出高效、穩定、可擴展的響應式系統。