文章目錄
- 一、Reactor 框架概述與理論基礎
- 1.1 響應式編程(Reactive Programming)是什么?
- 1.2 Reactive Streams 規范
- 1.3 響應式編程與 Reactor 的誕生
- 1.4 Reactor核心特性
- 1.5 Reactor與其它響應式框架比較
- 二、Reactor核心類型
- 2.1 Reactor 核心概念
- 2.2 核心類型
- 2.3 Mono【0個或者1個元素的流】
- 2.4 Flux【0到N個元素的流】
- 2.5 數據流生命周期
- 2.6 Reactor數據流模型
- 2.7 操作符鏈式調用
- 2.8 線程切換時序圖
- 三、基礎應用
- 3.1 基礎Mono使用
- 3.2 基礎Flux使用
- 3.3 異步與線程切換
- 3.4 背壓(Backpressure)演示
- 3.5 錯誤處理
一、Reactor 框架概述與理論基礎
官方文檔
:
Project Reactor官網
Getting Started :: Reactor Core Reference Guide
https://www.reactive-streams.org/
https://www.reactive-streams.org/
https://projectreactor.io/
https://projectreactor.io/docs/core/release/reference/gettingStarted.html
1.1 響應式編程(Reactive Programming)是什么?
響應式編程
是一種面向數據流和變化傳播的編程范式。它允許你聲明式地定義數據流的轉換、組合和處理邏輯,系統自動處理異步、背壓、錯誤傳播等復雜問題。
[!tip]
? 核心思想:數據流是第一公民,一切皆流(Everything is a Stream)。
1.2 Reactive Streams 規范
Reactor 實現了 Reactive Streams 規范,該規范定義了四個核心接口:
Publisher<T>
:發布者Subscriber<T>
:訂閱者Subscription
:訂閱關系(支持背壓)Processor<T,R>
:處理器
有興趣參照網址查看: reactive-streams.org
[!note]
🔗 Reactor 是 Project Reactor 的簡稱,由 Pivotal(現 VMware)開發,是 Spring WebFlux 的底層引擎。
1.3 響應式編程與 Reactor 的誕生
響應式編程(Reactive Programming) 是一種面向數據流和變化傳播的編程范式,其核心思想是:將程序視為數據流的處理管道,通過異步非阻塞的方式傳遞和處理數據,并通過背壓(Backpressure) 機制平衡生產者和消費者的速度差異。
在 Java 生態中,Reactor 框架是 Reactive Streams 規范的優秀實現,由 Pivotal 公司開發(與 Spring 同屬一個團隊),于 2013 年首次發布。它的誕生解決了以下核心問題:
- 傳統同步阻塞 IO 在高并發場景下的性能瓶頸
- 異步編程中的 “回調地獄” 問題
- 缺乏標準化的背壓機制導致的資源失控
- 與 Spring 生態(如 Spring WebFlux、Spring Cloud)的深度集成需求
Reactor 的核心理念是:“以聲明式的方式處理異步數據流,同時保持代碼的可讀性和可維護性”。
1.4 Reactor核心特性
特性 | 說明 |
---|---|
異步非阻塞 | 基于事件驅動模型,避免線程阻塞,提高系統吞吐量 |
背壓支持 | 消費者可主動告知生產者自己的處理能力,防止數據積壓 |
聲明式編程 | 通過操作符組合描述 “做什么”,而非 “怎么做” |
數據流組合 | 支持復雜的流組合(合并、連接、嵌套等) |
完善的錯誤處理 | 提供豐富的錯誤捕獲、恢復和傳遞機制 |
與 Java 生態融合 | 兼容 Java 8 + 的 Stream API,支持 CompletableFuture 轉換 |
輕量級 | 核心庫體積小,無強依賴 |
1.5 Reactor與其它響應式框架比較
flowchart LRA[響應式框架] --> B[Reactor]A --> C[RxJava]A --> D[Akka Streams]B --> B1[與Spring生態深度集成]B --> B2[嚴格遵循Reactive Streams]B --> B3[專為Java 8+優化]B --> B4[更簡潔的API設計]C --> C1[更早出現,生態成熟]C --> C2[支持多語言]C --> C3[操作符更豐富但復雜]D --> D1[基于Actor模型]D --> D2[分布式場景優勢]D --> D3[學習曲線陡峭]
Reactor 的獨特優勢在于:
- 與 Spring WebFlux、Spring Cloud Gateway 等現代 Spring 組件無縫集成
- 對 Java 新特性(如虛擬線程、密封類)的原生支持
- 更簡潔的 API 設計,降低響應式編程的學習門檻
二、Reactor核心類型
2.1 Reactor 核心概念
Reactor執行流程
2.2 核心類型
Reactor 提供了兩個核心發布者類型:
類型 | 特點 | 適用場景 |
---|---|---|
Mono<T> | 0 或 1 個元素的異步序列 | 單個結果(如 HTTP 請求、數據庫查詢) |
Flux<T> | 0 到 N 個元素的異步序列 | 多個結果(如列表、事件流) |
2.3 Mono【0個或者1個元素的流】
Mono
用于表示包含 0 或 1 個元素的異步結果,適合處理單次操作(如數據庫查詢、HTTP 請求)的結果。
// 創建Mono【相當于事件的發布者】
Mono<String> mono = Mono.just("Hello Reactor"); // 直接值
Mono<String> emptyMono = Mono.empty(); // 空流
Mono<String> fromCallable = Mono.fromCallable(() -> "動態計算值"); // 延遲計算// 訂閱Mono(觸發執行)
mono.subscribe(value -> System.out.println("接收值:" + value), // 成功回調error -> System.err.println("錯誤:" + error), // 錯誤回調() -> System.out.println("完成") // 完成回調
);
2.4 Flux【0到N個元素的流】
Flux
用于表示包含 0 到多個元素的異步數據流,支持完整的生命周期(正常結束、錯誤終止)。常見場景:集合數據處理、事件流、批量操作等。
// 創建Flux
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5); // 固定元素
Flux<Integer> rangeFlux = Flux.range(1, 5); // 范圍1-5
Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1)); // 每秒生成遞增數(需手動取消訂閱)// 訂閱Flux
flux.map(x -> x * 2) // 轉換操作符.filter(x -> x % 3 != 0) // 過濾操作符.subscribe(System.out::println, // 簡化寫法:僅處理成功事件Throwable::printStackTrace,() -> System.out.println("Flux完成"));
2.5 數據流生命周期
無論是Flux
還是Mono
,都遵循相同的生命周期:
- 正常事件:通過
onNext()
發送元素(Flux
可多次調用,Mono
最多調用一次) - 終止事件:
- 成功終止:
onComplete()
(無元素發送) - 錯誤終止:
onError(Throwable)
(攜帶異常信息)
- 成功終止:
2.6 Reactor數據流模型
2.7 操作符鏈式調用
2.8 線程切換時序圖
三、基礎應用
引入Maven依賴:
<dependencyManagement><dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-bom</artifactId><version>2024.0.6</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement><dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId></dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId><scope>test</scope></dependency>
</dependencies>
3.1 基礎Mono使用
@Test
public void monoBasicTest() {// 1. 創建一個Mono對象(發射一個字符串)Mono<String> mono = Mono.just("Hello, Reactor!");// 2. 訂閱并消費mono.subscribe(value -> System.out.println("? 接收到: " + value),error -> System.err.println("? 錯誤: " + error),() -> System.out.println("🎉 完成"),subscription -> {System.out.println("🔗 訂閱建立");subscription.request(1); // 背壓:請求 1 個});
}
3.2 基礎Flux使用
@Test
public void fluxBasicTest() {// 創建一個Flux對象(發射多個字符串)Flux<String> flux = Flux.just("Hello", "Reactor", "Face", "Smail").map(String::toUpperCase).filter(s -> s.length() > 5).log();flux.subscribe(System.out::println,System.err::println,() -> System.out.println("流結束"));
}
🔍 log()
是調試利器,可查看所有信號(onNext, onError, onComplete)。
3.3 異步與線程切換
@Test
public void asyncTest(){Flux.just("張小三", "A", "B", "C").map(data -> {System.out.println("🔄 處理線程: " + Thread.currentThread().getName());return data + "-processed";}).subscribeOn(Schedulers.boundedElastic()) // 訂閱在彈性線程池.publishOn(Schedulers.parallel()) // 發布在并行線程池.subscribe(result -> {System.out.println("📩 接收線程: " + Thread.currentThread().getName() + ", 數據: " + result);});System.out.println("MAIN THREAD: " + Thread.currentThread().getName());try {TimeUnit.MILLISECONDS.sleep(3000);}catch (InterruptedException e){e.printStackTrace();}
}
?? subscribeOn()
影響上游執行線程,publishOn()
影響下游執行線程。
3.4 背壓(Backpressure)演示
/*** 背壓演示*/@Testpublic void backPressureTest() {Flux.range(1, 1000).onBackpressureDrop(item -> System.out.println("🗑? 丟棄: " + item)) // 緩沖區滿時丟棄.subscribe(new CoreSubscriber<Integer>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription s) {this.subscription = s;subscription.request(10); // 初始請求 10 個}@Overridepublic void onNext(Integer item) {System.out.println("? 接收: " + item);try {Thread.sleep(100);} catch (InterruptedException e) {}subscription.request(1); // 每處理一個再要一個}@Overridepublic void onError(Throwable t) {t.printStackTrace();}@Overridepublic void onComplete() {System.out.println("? 完成");}});try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}}
3.5 錯誤處理
/*** 錯誤處理*/
@Test
public void errorHandlerTest() {Flux.range(1, 5).map(i -> {if (i == 3) throw new RuntimeException("模擬錯誤");return "Item " + i;}).onErrorResume(e -> {System.err.println("?? 捕獲錯誤: " + e.getMessage());return Flux.just("Fallback 1", "Fallback 2"); // 錯誤后返回備用數據}).retryWhen(Retry.fixedDelay(2, Duration.ofSeconds(1))) // 重試 2 次.subscribe(System.out::println);
}