響應式編程框架Reactor【1】

文章目錄

  • 一、Reactor 框架概述與理論基礎
    • 1.1 響應式編程(Reactive Programming)是什么?
    • 1.2 Reactive Streams 規范
    • 1.3 響應式編程與 Reactor 的誕生
    • 1.4 Reactor核心特性
    • 1.5 Reactor與其它響應式框架比較
  • 二、Reactor核心類型
    • 2.1 Reactor 核心概念
    • 2.2 核心類型
    • 2.3 Mono【0個或者1個元素的流】
    • 2.4 Flux【0到N個元素的流】
    • 2.5 數據流生命周期
    • 2.6 Reactor數據流模型
    • 2.7 操作符鏈式調用
    • 2.8 線程切換時序圖
  • 三、基礎應用
    • 3.1 基礎Mono使用
    • 3.2 基礎Flux使用
    • 3.3 異步與線程切換
    • 3.4 背壓(Backpressure)演示
    • 3.5 錯誤處理

一、Reactor 框架概述與理論基礎

官方文檔:

Project Reactor官網

Getting Started :: Reactor Core Reference Guide

https://www.reactive-streams.org/

https://www.reactive-streams.org/
https://projectreactor.io/
https://projectreactor.io/docs/core/release/reference/gettingStarted.html

1.1 響應式編程(Reactive Programming)是什么?

響應式編程是一種面向數據流和變化傳播的編程范式。它允許你聲明式地定義數據流的轉換、組合和處理邏輯,系統自動處理異步、背壓、錯誤傳播等復雜問題。

[!tip]

? 核心思想:數據流是第一公民,一切皆流(Everything is a Stream)。

1.2 Reactive Streams 規范

Reactor 實現了 Reactive Streams 規范,該規范定義了四個核心接口:

  • Publisher<T>:發布者
  • Subscriber<T>:訂閱者
  • Subscription:訂閱關系(支持背壓)
  • Processor<T,R>:處理器

有興趣參照網址查看: reactive-streams.org

[!note]

🔗 Reactor 是 Project Reactor 的簡稱,由 Pivotal(現 VMware)開發,是 Spring WebFlux 的底層引擎。

1.3 響應式編程與 Reactor 的誕生

響應式編程(Reactive Programming) 是一種面向數據流和變化傳播的編程范式,其核心思想是:將程序視為數據流的處理管道,通過異步非阻塞的方式傳遞和處理數據,并通過背壓(Backpressure) 機制平衡生產者和消費者的速度差異。

在 Java 生態中,Reactor 框架是 Reactive Streams 規范的優秀實現,由 Pivotal 公司開發(與 Spring 同屬一個團隊),于 2013 年首次發布。它的誕生解決了以下核心問題:

  • 傳統同步阻塞 IO 在高并發場景下的性能瓶頸
  • 異步編程中的 “回調地獄” 問題
  • 缺乏標準化的背壓機制導致的資源失控
  • 與 Spring 生態(如 Spring WebFlux、Spring Cloud)的深度集成需求

Reactor 的核心理念是:“以聲明式的方式處理異步數據流,同時保持代碼的可讀性和可維護性”

1.4 Reactor核心特性

特性說明
異步非阻塞基于事件驅動模型,避免線程阻塞,提高系統吞吐量
背壓支持消費者可主動告知生產者自己的處理能力,防止數據積壓
聲明式編程通過操作符組合描述 “做什么”,而非 “怎么做”
數據流組合支持復雜的流組合(合并、連接、嵌套等)
完善的錯誤處理提供豐富的錯誤捕獲、恢復和傳遞機制
與 Java 生態融合兼容 Java 8 + 的 Stream API,支持 CompletableFuture 轉換
輕量級核心庫體積小,無強依賴

1.5 Reactor與其它響應式框架比較

flowchart LRA[響應式框架] --> B[Reactor]A --> C[RxJava]A --> D[Akka Streams]B --> B1[與Spring生態深度集成]B --> B2[嚴格遵循Reactive Streams]B --> B3[專為Java 8+優化]B --> B4[更簡潔的API設計]C --> C1[更早出現,生態成熟]C --> C2[支持多語言]C --> C3[操作符更豐富但復雜]D --> D1[基于Actor模型]D --> D2[分布式場景優勢]D --> D3[學習曲線陡峭]

Reactor 的獨特優勢在于:

  • 與 Spring WebFlux、Spring Cloud Gateway 等現代 Spring 組件無縫集成
  • 對 Java 新特性(如虛擬線程、密封類)的原生支持
  • 更簡潔的 API 設計,降低響應式編程的學習門檻

二、Reactor核心類型

2.1 Reactor 核心概念

Reactive Streams
Reactor
Publisher
Flux: 0..N elements
Mono: 0..1 elements
Operators
Transformation
Filtering
Combination
Error Handling

Reactor執行流程

SubscriberPublisher (Flux/Mono)OperatorsSchedulersubscribe()創建操作鏈安排執行(如果需要)在指定線程執行onSubscribe(Subscription)request(n)請求數據onNext(data)應用轉換/過濾onNext(processedData)request(m) (更多數據)onComplete() (數據完成)onComplete()錯誤處理路徑onError(throwable)onError(throwable)SubscriberPublisher (Flux/Mono)OperatorsScheduler

2.2 核心類型

Reactor 提供了兩個核心發布者類型:

類型特點適用場景
Mono<T>0 或 1 個元素的異步序列單個結果(如 HTTP 請求、數據庫查詢)
Flux<T>0 到 N 個元素的異步序列多個結果(如列表、事件流)

2.3 Mono【0個或者1個元素的流】

Mono用于表示包含 0 或 1 個元素的異步結果,適合處理單次操作(如數據庫查詢、HTTP 請求)的結果。

// 創建Mono【相當于事件的發布者】
Mono<String> mono = Mono.just("Hello Reactor"); // 直接值
Mono<String> emptyMono = Mono.empty(); // 空流
Mono<String> fromCallable = Mono.fromCallable(() -> "動態計算值"); // 延遲計算// 訂閱Mono(觸發執行)
mono.subscribe(value -> System.out.println("接收值:" + value), // 成功回調error -> System.err.println("錯誤:" + error), // 錯誤回調() -> System.out.println("完成") // 完成回調
);

2.4 Flux【0到N個元素的流】

Flux用于表示包含 0 到多個元素的異步數據流,支持完整的生命周期(正常結束、錯誤終止)。常見場景:集合數據處理、事件流、批量操作等。

// 創建Flux
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5); // 固定元素
Flux<Integer> rangeFlux = Flux.range(1, 5); // 范圍1-5
Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1)); // 每秒生成遞增數(需手動取消訂閱)// 訂閱Flux
flux.map(x -> x * 2) // 轉換操作符.filter(x -> x % 3 != 0) // 過濾操作符.subscribe(System.out::println, // 簡化寫法:僅處理成功事件Throwable::printStackTrace,() -> System.out.println("Flux完成"));

2.5 數據流生命周期

無論是Flux還是Mono,都遵循相同的生命周期:

  • 正常事件:通過onNext()發送元素(Flux可多次調用,Mono最多調用一次)
  • 終止事件:
    • 成功終止:onComplete()(無元素發送)
    • 錯誤終止:onError(Throwable)(攜帶異常信息)
訂閱(subscribe)
onNext(元素)
onComplete()
onError(異常)
初始化
運行中
完成
錯誤

2.6 Reactor數據流模型

subscribe
request(n)
onNext
onError
onComplete
Publisher
Subscriber

2.7 操作符鏈式調用

Flux.just(1,2,3)
.map(x*2)
.filter(>5)
.log()
.subscribe()

2.8 線程切換時序圖

MainboundedElasticparallelsubscribeOn()map() 執行publishOn()subscribe() 回調MainboundedElasticparallel

三、基礎應用

引入Maven依賴:

<dependencyManagement><dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-bom</artifactId><version>2024.0.6</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement><dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId></dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId><scope>test</scope></dependency>
</dependencies>

3.1 基礎Mono使用

@Test
public void monoBasicTest() {// 1. 創建一個Mono對象(發射一個字符串)Mono<String> mono = Mono.just("Hello, Reactor!");// 2. 訂閱并消費mono.subscribe(value -> System.out.println("? 接收到: " + value),error -> System.err.println("? 錯誤: " + error),() -> System.out.println("🎉 完成"),subscription -> {System.out.println("🔗 訂閱建立");subscription.request(1); // 背壓:請求 1 個});
}

在這里插入圖片描述

3.2 基礎Flux使用

@Test
public void fluxBasicTest() {// 創建一個Flux對象(發射多個字符串)Flux<String> flux = Flux.just("Hello", "Reactor", "Face", "Smail").map(String::toUpperCase).filter(s -> s.length() > 5).log();flux.subscribe(System.out::println,System.err::println,() -> System.out.println("流結束"));
}

在這里插入圖片描述

🔍 log() 是調試利器,可查看所有信號(onNext, onError, onComplete)。

3.3 異步與線程切換

@Test
public void asyncTest(){Flux.just("張小三", "A", "B", "C").map(data -> {System.out.println("🔄 處理線程: " + Thread.currentThread().getName());return data + "-processed";}).subscribeOn(Schedulers.boundedElastic()) // 訂閱在彈性線程池.publishOn(Schedulers.parallel()) // 發布在并行線程池.subscribe(result -> {System.out.println("📩 接收線程: " + Thread.currentThread().getName() + ", 數據: " + result);});System.out.println("MAIN THREAD: " + Thread.currentThread().getName());try {TimeUnit.MILLISECONDS.sleep(3000);}catch (InterruptedException e){e.printStackTrace();}
}

在這里插入圖片描述

?? subscribeOn() 影響上游執行線程,publishOn() 影響下游執行線程。

3.4 背壓(Backpressure)演示

/*** 背壓演示*/@Testpublic void backPressureTest() {Flux.range(1, 1000).onBackpressureDrop(item -> System.out.println("🗑? 丟棄: " + item)) // 緩沖區滿時丟棄.subscribe(new CoreSubscriber<Integer>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription s) {this.subscription = s;subscription.request(10);  // 初始請求 10 個}@Overridepublic void onNext(Integer item) {System.out.println("? 接收: " + item);try {Thread.sleep(100);} catch (InterruptedException e) {}subscription.request(1); // 每處理一個再要一個}@Overridepublic void onError(Throwable t) {t.printStackTrace();}@Overridepublic void onComplete() {System.out.println("? 完成");}});try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}}

在這里插入圖片描述

3.5 錯誤處理

/*** 錯誤處理*/
@Test
public void errorHandlerTest() {Flux.range(1, 5).map(i -> {if (i == 3) throw new RuntimeException("模擬錯誤");return "Item " + i;}).onErrorResume(e -> {System.err.println("?? 捕獲錯誤: " + e.getMessage());return Flux.just("Fallback 1", "Fallback 2"); // 錯誤后返回備用數據}).retryWhen(Retry.fixedDelay(2, Duration.ofSeconds(1))) // 重試 2 次.subscribe(System.out::println);
}

在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/920421.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/920421.shtml
英文地址,請注明出處:http://en.pswp.cn/news/920421.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【LeetCode】29. 兩數相除(Divide Two Integers)

文章目錄29. 兩數相除&#xff08;Divide Two Integers&#xff09;1. 題目重述與約束解析2. 算法選擇與總體設計3. 核心難點與關鍵技巧4. 解法一&#xff1a;快倍增&#xff08;重復加倍減法&#xff09;4.1 思路4.2 流程圖4.3 正確性要點5. 解法二&#xff1a;位移長除法&…

智能物聯網(AIoT)核心技術落地路徑與企業數字化轉型適配方案

一、行業現狀&#xff1a;AIoT 落地潛力與企業轉型痛點并存根據中國信通院《2023 年中國物聯網發展白皮書》數據&#xff0c;截至 2023 年&#xff0c;我國物聯網設備連接數已突破 300 億&#xff0c;龐大的設備基數為企業數字化轉型奠定了技術基礎。但與之形成鮮明對比的是&am…

前端文件下載的三種方式:URL、二進制與 Base64 的深度解析

前言在 Web 應用開發中&#xff0c;文件下載是一個常見的功能需求。從簡單的圖片保存到復雜的報表導出&#xff0c;前端開發者需要根據后端返回的數據格式選擇合適的處理方式。本文探討三種主流的文件下載方式 —— 基于 URL、二進制數據和 Base64 編碼的實現原理、區別對比及通…

B站 XMCVE Pwn入門課程學習筆記(8)

這個視頻講的比較難&#xff0c;我花了比較長時間來分析&#xff0c;甚至一個點反復很多次&#xff0c;這也是在學PWN的過程中不可避免的&#xff0c;需要堅持和毅力pwn3:沒有system&#xff0c;通過ROP調用write的plt入口&#xff0c;執行write函數&#xff0c;并且將gots里的…

AMGCL介紹和使用

文章目錄一、AMGCL 簡介1.1 什么是 AMG&#xff1f;1.2 AMGCL 特點二、安裝與配置2.1 獲取源碼2.2 編譯依賴&#xff08;可選&#xff09;三、基本使用示例3.1 構造稀疏矩陣&#xff08;以 1D Poisson 為例&#xff09;四、核心組件介紹4.1 后端&#xff08;Backend&#xff09…

AI解決生活小事系列——用AI給我的電腦做一次“深度體檢”

哈嘍&#xff0c;大家好&#xff0c;這里是Ai極客團長&#xff0c;我打算做一個用AI解決生活實際問題的系列專欄。 決定做這個系列的初衷很簡單&#xff1a;現在打開手機、電腦&#xff0c;到處都是 "AI 改變世界" 的宏大敘事&#xff0c;但對普通人來說&#xff0c…

JavaWeb 30 天入門:第二十一天 ——AJAX 異步交互技術

在前二十天的學習中&#xff0c;我們掌握了 JavaWeb 開發的核心技術&#xff0c;包括 Servlet、JSP、會話管理、過濾器、監聽器、文件操作、數據庫交互、連接池、分頁與排序等。今天我們將學習一項徹底改變 Web 應用交互方式的技術 ——AJAX&#xff08;Asynchronous JavaScrip…

從枯燥C++到趣味音樂:我的Windows系統底層探索之旅

一段穿越計算機抽象層次的旅程&#xff0c;從高級語言到底層硬件&#xff0c;探索代碼如何創造美妙旋律第一章&#xff1a;初學C的枯燥與靈感閃現 當我第一次打開《C Primer Plus》這本厚重的教程時&#xff0c;面對那些晦澀的語法規則和抽象概念&#xff0c;確實感到有些枯燥乏…

taro+vue3+vite項目 tailwind 踩坑記,附修復后的模板源碼地址

tailwind 踩坑記 這&#xff0c;是taro官網地址&#xff1a;taro引入tailwind的教程 我完全按照上面的步驟來&#xff0c;結果根本無效&#xff08;文檔太過時了&#xff09; 我后來又按照 weapp-tailwindcss 的官方文檔做了一番修正&#xff1a; weapp-tailwindcss Taro (所…

LCEDA電氣規則

MARK點普通問題 鋪銅太靠近MARK點放置一個禁止區域&#xff0c;圓形編輯封裝

無人機Remote ID:天空中的數字車牌與未來空域管理

一架沒有牌照的汽車上路會被交管部門處罰,那么一架沒有“數字車牌”的無人機升空呢?隨著無人機Remote ID技術的推廣,未來天空中的每架無人機都將擁有自己的身份標識。 近年來,無人機呈爆炸式增長,從航拍攝影到物流配送,從農業植保到應急救援,應用場景不斷拓展。但隨著無…

自下而上的樹形dp

最大獨立集 1.藍橋舞會 link:1.藍橋舞會 - 藍橋云課 分析&#xff1a; code #include <bits/stdc.h> using namespace std; using ll long long; const ll MAXN 1e5 7; ll hpy[MAXN], fa[MAXN], dp[MAXN][2]; vector<ll> sons[MAXN];void dfs(ll u, ll fa) {…

Docker 詳解+示例

介 紹Docker 是一個開源的容器化平臺&#xff0c;它的核心目標是解決 “軟件在不同環境下運行不一致” 的問題&#xff0c;實現 “一次構建&#xff0c;到處運行” 。它基于 Linux 內核的底層技術&#xff0c;將應用程序及其依賴&#xff08;如庫文件、配置、運行環境等&#x…

洛谷 P2568 GCD-提高+/省選?

題目描述 給定正整數 nnn&#xff0c;求 1≤x,y≤n1\le x,y\le n1≤x,y≤n 且 gcd?(x,y)\gcd(x,y)gcd(x,y) 為素數的數對 (x,y)(x,y)(x,y) 有多少對。 輸入格式 只有一行一個整數&#xff0c;代表 nnn。 輸出格式 一行一個整數表示答案。 輸入輸出樣例 #1 輸入 #1 4輸…

軟件測試覆蓋率與質量保障專業經驗分享報告

測試覆蓋率的核心維度與評估標準 多維度定義與核心內涵 測試覆蓋率是衡量軟件測試完整性的關鍵指標體系,分為測試覆蓋率(黑盒視角:需求驗證程度)和代碼覆蓋率(白盒視角:代碼執行占比)兩大基礎類型。現代測試覆蓋體系已擴展至產品覆蓋、風險覆蓋、平臺/設備覆蓋、數據覆…

使用CCProxy搭建http/https代理服務器

下載 https://user.youngzsoft.com/ccproxy/update/ccproxysetup.exe 我們使用免費的即可&#xff0c;3個人。 啟動軟件 設置 更改局域網IP 我的電腦有多個IP&#xff0c;所以要手工指定。

ICCV 2025|TRACE:無需標注,用3D高斯直接學習物理參數,從視頻“預知”未來!

論文鏈接&#xff1a;https://arxiv.org/pdf/2507.01484導讀 準確預測道路智能體的運動對于自動駕駛的安全性至關重要。當前&#xff0c;現有的數據驅動方法直接預測未來軌跡&#xff0c;缺乏對駕駛行為的充分考慮&#xff0c;限制了可解釋性和可靠性。為此&#xff0c;本文引入…

TypeScript:symbol類型

symbol是TypeScript和JavaScript中的一種基本數據類型&#xff0c;表示唯一的、不可變的標識符。作為專業的前端工程師&#xff0c;理解symbol的特性對于構建安全可靠的代碼至關重要。1. symbol的核心特性唯一性&#xff1a;每個symbol值都是唯一的&#xff0c;即使創建時使用相…

【深度學習新浪潮】顯著性檢測最新研究進展(2022-2025)

1. 弱監督與主動學習 ASTE-AL框架(TPAMI 2024):提出對抗性時空集成主動學習方法,通過點標記數據集(每張圖像僅需10個標注點)達到全監督模型98%-99%的性能。其核心模塊包括: FPGD-PA對抗攻擊:通過無額外計算成本的自由梯度下降攻擊定位不確定像素。 時空集成策略:減少模…

Intern-S1-mini模型結構

模型介紹 Intern-S1-mini基于一個8B密集語言模型&#xff08;Qwen3&#xff09;和一個0.3B視覺編碼器&#xff08;InternViT&#xff09;&#xff0c;Intern-S1-mini 在5萬億個標記的多模態數據上進行了進一步預訓練&#xff0c;其中包括超過2.5萬億個科學領域的標記。這使得該…