【響應式編程】Reactor 常用操作符與使用指南

文章目錄

    • 一、創建操作符
      • 1. `just` —— 創建包含指定元素的流
      • 2. `fromIterable` —— 從集合創建 Flux
      • 3. `empty` —— 創建空的 Flux 或 Mono
      • 4. `fromArray` —— 從數組創建 Flux
      • 5. `fromStream` —— 從 Java 8 Stream 創建 Flux
      • 6. `create` —— 使用 FluxSink 手動發射元素
      • 7. `generate` —— 使用狀態生成元素,適用于同步場景
      • 8. `fromFuture` —— 從 CompletableFuture 創建 Mono
      • 9. `interval` —— 創建周期性發射元素的 Flux
      • 10. `timer` —— 創建延遲發射的 Mono
    • 二、轉換操作符
      • 1. `map` —— 映射每個元素為新值
      • 2. `flatMap` —— 扁平化異步流,將每個元素映射為異步 Publisher
      • 3. `concatMap` —— 順序執行映射為 Publisher 的異步流
    • 三、過濾操作符
      • 1. `filter` —— 按條件過濾元素
      • 2. `take` —— 獲取前 N 個元素
      • 3. `skip` —— 跳過前 N 個元素
    • 四、組合操作符
      • 1. `concat` —— 按順序合并多個 Flux
      • 2. `merge` —— 并發合并多個 Flux(無序)
      • 3. `zip` —— 按索引組合多個 Flux 的元素
    • 五、錯誤處理操作符
      • 1. `onErrorReturn` —— 出錯時返回默認值
      • 2. `onErrorResume` —— 出錯時切換備用流
      • 3. `retry` —— 出錯時重試指定次數
    • 六、延遲執行與懶加載:`Mono.defer` 和 `Flux.defer`:被訂閱時才執行
      • `Mono.defer` —— 懶加載 Mono,直到subscribe時才創建執行
      • `Flux.defer` —— 懶加載 Flux,每次訂閱時重新執行邏輯

Reactor 是一個用于構建反應式應用程序的 Java 庫,提供了豐富的操作符(算子)來處理反應式流(FluxMono)。本文詳細介紹了 Reactor 中常用的創建、轉換、過濾、組合和錯誤處理操作符,以及一些高級用法示例。


一、創建操作符

1. just —— 創建包含指定元素的流

Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
Mono<String> mono = Mono.just("Hello");

2. fromIterable —— 從集合創建 Flux

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
Flux<Integer> flux = Flux.fromIterable(list);

3. empty —— 創建空的 Flux 或 Mono

Flux<Integer> emptyFlux = Flux.empty();
Mono<String> emptyMono = Mono.empty();

4. fromArray —— 從數組創建 Flux

Integer[] numbers = {1, 2, 3, 4, 5};
Flux<Integer> flux = Flux.fromArray(numbers);

5. fromStream —— 從 Java 8 Stream 創建 Flux

Stream<Integer> stream = Arrays.asList(1, 2, 3, 4, 5).stream();
Flux<Integer> flux = Flux.fromStream(stream);

6. create —— 使用 FluxSink 手動發射元素

Flux<Integer> flux = Flux.create(sink -> {for (int i = 0; i < 5; i++) {sink.next(i);}sink.complete();
});

7. generate —— 使用狀態生成元素,適用于同步場景

Flux<Integer> flux = Flux.generate(() -> 0, (state, sink) -> {sink.next(state);if (state == 4) sink.complete();return state + 1;
});

8. fromFuture —— 從 CompletableFuture 創建 Mono

CompletableFuture<String> future = CompletableFuture.completedFuture("Hello");
Mono<String> mono = Mono.fromFuture(future);

9. interval —— 創建周期性發射元素的 Flux

Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1));

10. timer —— 創建延遲發射的 Mono

Mono<Long> timerMono = Mono.timer(Duration.ofSeconds(2));

?

二、轉換操作符

1. map —— 映射每個元素為新值

Flux<Integer> squared = Flux.just(1, 2, 3).map(n -> n * n);

2. flatMap —— 扁平化異步流,將每個元素映射為異步 Publisher

Flux<Integer> result = Flux.just(1, 2, 3).flatMap(n -> Mono.just(n * 2));

3. concatMap —— 順序執行映射為 Publisher 的異步流

Flux<Integer> result = Flux.just(1, 2, 3).concatMap(n -> Mono.just(n * 2));

?

三、過濾操作符

1. filter —— 按條件過濾元素

Flux<Integer> evens = Flux.just(1, 2, 3, 4).filter(n -> n % 2 == 0);

2. take —— 獲取前 N 個元素

Flux<Integer> firstThree = Flux.just(1, 2, 3, 4, 5).take(3);

3. skip —— 跳過前 N 個元素

Flux<Integer> skipped = Flux.just(1, 2, 3, 4, 5).skip(2);

?

四、組合操作符

1. concat —— 按順序合并多個 Flux

Flux<Integer> combined = Flux.concat(Flux.just(1, 2), Flux.just(3, 4));

2. merge —— 并發合并多個 Flux(無序)

Flux<Integer> merged = Flux.merge(Flux.just(1, 2), Flux.just(3, 4));

3. zip —— 按索引組合多個 Flux 的元素

Flux<String> zipped = Flux.zip(Flux.just(1, 2), Flux.just(3, 4), (a, b) -> a + ":" + b);

?

五、錯誤處理操作符

1. onErrorReturn —— 出錯時返回默認值

Flux<Integer> result = Flux.just(1, 2, 3).map(n -> {if (n == 2) throw new RuntimeException("error");return n;}).onErrorReturn(-1);

2. onErrorResume —— 出錯時切換備用流

Flux<Integer> result = Flux.just(1, 2, 3).map(n -> {if (n == 2) throw new RuntimeException("error");return n;}).onErrorResume(e -> Mono.just(-1));

3. retry —— 出錯時重試指定次數

Flux<Integer> result = Flux.just(1, 2, 3).map(n -> {if (n == 2) throw new RuntimeException("error");return n;}).retry(2);

?

六、延遲執行與懶加載:Mono.deferFlux.defer:被訂閱時才執行

Mono.defer —— 懶加載 Mono,直到subscribe時才創建執行

Mono<String> deferredMono = Mono.defer(() -> {System.out.println("Generating value...");return Mono.just("Deferred Result");
});

只有當 subscribe() 被調用時,Mono.defer 中的邏輯才會真正執行。這對于需要確保執行時機晚于前一步完成場景特別重要,比如:

Mono.defer(() -> readQaResultType()).subscribe(result -> System.out.println("QA Result: " + result));

在這段代碼中,讀取 qaResultType 的操作只會在前面的步驟(例如數據預處理)完全完成后才被觸發

Flux.defer —— 懶加載 Flux,每次訂閱時重新執行邏輯

Flux<Integer> deferredFlux = Flux.defer(() -> {System.out.println("Evaluating source...");return Flux.just(1, 2, 3);
});

每次訂閱都會重新生成數據,適用于帶有狀態的源或依賴最新上下文的處理邏輯。


本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/76612.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/76612.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/76612.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

從靜態綁定驅動模型到現代設備模型 —— 一次驅動架構的進化之旅

&#x1f50d; B站相應的視屏教程&#xff1a; &#x1f4cc; 內核&#xff1a;博文視頻 - 從靜態綁定驅動模型到現代設備模型 在 Linux 內核的發展歷程中&#xff0c;設備驅動結構經歷了從"硬編碼 手動注冊"的早期實現方式&#xff0c;到"設備模型統一管理&qu…

Embedding質量評估、空間塌縮、 Alignment Uniformity

Embedding質量的評估和空間塌縮的解決是自然語言處理&#xff08;NLP&#xff09;和推薦系統領域的關鍵問題。以下是綜合多篇研究的總結&#xff1a; 一、Embedding質量評估方法 基準測試與任務指標 MTEB/C-MTEB&#xff1a;使用多語言或中文的基準測試集&#xff08;如58個數據…

批量給dwg顯示略縮圖_c#插件實現(com)

如果&#xff0c;cad文件無略縮圖&#xff1a; AutoCAD2021版本以上&#xff0c;命令行輸入"netload "加載此dll插件&#xff0c;然后輸入 “lst”&#xff0c;選擇文件夾&#xff0c;即可一鍵實現給dwg增加略縮圖。 效果如下&#xff1a; 附部分代碼&#xff1a; …

嬰幼兒托育服務與管理實訓室:托育未來的基石

在社會對嬰幼兒托育服務的重視程度不斷加深的當下&#xff0c;專業托育人才的需求急劇增長。嬰幼兒托育服務與管理專業作為培育這類人才的關鍵途徑&#xff0c;要求學生熟練掌握嬰幼兒身心發展、飲食營養以及衛生保健等基礎知識&#xff0c;同時具備全面的照護與管理能力。要實…

(自用)若依生成左樹右表

第一步&#xff1a; 在數據庫創建樹表和單表&#xff1a; SQL命令&#xff1a; 商品表 CREATE TABLE products (product_id INT AUTO_INCREMENT PRIMARY KEY,product_name VARCHAR(255) , price DECIMAL(10, 2) , stock INT NOT NULL, category_id INT NOT NULL); 商品分類…

Linux:DNS服務配置(課堂實驗總結)

遇到的問題&#xff0c;都有解決方案&#xff0c;希望我的博客能為你提供一點幫助。 操作系統&#xff1a;rocky Linux 9.5 ??一、配置DNS服務器的核心步驟?? 步驟 1&#xff1a;安裝 BIND 軟件?? ??檢查是否安裝??&#xff1a; rpm -qa | grep "^bind"…

搭建一個Spring Boot聚合項目

1. 創建父項目 打開IntelliJ IDEA&#xff0c;選擇 New Project。 在創建向導中選擇 Maven&#xff0c;確保選中 Create from archetype&#xff0c;選擇 org.apache.maven.archetypes:maven-archetype-quickstart。 填寫項目信息&#xff1a; GroupId&#xff1a;com.exampl…

若依前后端分離版運行教程、打包教程、部署教程

后端打包教程 注意&#xff1a;需要先運行redis 2、前端運行教程 2.1安裝依賴 2.2運行 打開瀏覽器查看,地址&#xff1a;http://localhost:80 3、前端打包教程 3.1打包 3.2運行打包好的文件&#xff0c;先找到打包好的文件 這是nginx的文件結構 將打包好的文件放到html目錄下…

SpringAi 會話記憶功能

在使用chatGPT&#xff0c;豆包等產品后&#xff0c;就會發現他們的會話有“記憶”功能。 那么我們用API接口的話&#xff0c;這個是怎么實現的呢&#xff1f; 屬于比較粗暴的方式&#xff0c;把之前的內容與新的提示詞一起再次發給大模型。讓我們看到他們有記憶功能。 下面介紹…

基于Python的經濟循環模型構建與可視化案例

一、代碼結構概覽 該代碼構建了一個包含經濟數據生成、可視化分析和政策模擬的交互式經濟系統仿真平臺&#xff0c;主要包括三大模塊&#xff1a; 多部門經濟數據生成&#xff1a;模擬包含產業關聯的復雜經濟數據 增強型可視化&#xff1a;提供多維度的經濟數據分析視圖 Das…

第十六屆藍橋杯大賽軟件賽省賽 Python 大學 B 組 部分題解

題面鏈接Htlang/2025lqb_python_b 個人覺得今年這套題整體比往年要簡單許多&#xff0c;但是G題想簡單了出大問題&#xff0c;預估50101015120860&#xff0c;道阻且長&#xff0c;再接再厲 A: 攻擊次數 答案&#xff1a;103&#xff1f;181&#xff1f;題目沒說明白每回合是…

C++基礎精講-05

文章目錄 1.構造函數初始化列表1.1 初始化列表的使用1.2 有參構造函數的默認值 2.對象所占空間大小2.1 大小的計算2.2 內存對齊機制 3. 析構函數3.1 基本概念3.2 總結 4.valgrind工具集4.1 介紹4.2 memcheck的使用 5. 拷貝構造函數5.1 拷貝構造函數定義5.2 淺拷貝/深拷貝5.3 拷…

文章記單詞 | 第28篇(六級)

一&#xff0c;單詞釋義 shirt /???t/ n. 襯衫&#xff1b;襯衣commonly /?k?m?nli/ adv. 通常地&#xff1b;一般地&#xff1b;普遍地pick /p?k/ v. 挑選&#xff1b;采摘&#xff1b;撿起&#xff1b;選擇&#xff1b;n. 選擇&#xff1b;鶴嘴鋤&#xff1b;精華com…

安裝低版本Pytorch GPU

網上很多教程都是自動安裝&#xff0c;不指定版本&#xff0c;其實有大問題。而且torch、torchvision、torchaudio的版本必須是對應&#xff0c;所以一旦版本不對&#xff0c;就可能會出現各種問題。 其實Pytorch官網就已經給出了安裝低版本的教程 登入Pytorch官網 點擊previo…

2025認證杯挑戰賽B題【 謠言在社交網絡上的傳播 】原創論文講解(含完整python代碼)

大家好呀&#xff0c;從發布賽題一直到現在&#xff0c;總算完成了認證杯數學中國數學建模網絡挑戰賽第一階段B題目謠言在社交網絡上的傳播完整的成品論文。 本論文可以保證原創&#xff0c;保證高質量。絕不是隨便引用一大堆模型和代碼復制粘貼進來完全沒有應用糊弄人的垃圾半…

并發編程--互斥鎖與讀寫鎖

并發編程–互斥鎖與讀寫鎖 文章目錄 并發編程--互斥鎖與讀寫鎖1. 基本概念2. 互斥鎖2.1 基本邏輯2.2 函數接口2.3示例代碼12.4示例代碼2 3. 讀寫鎖3.1 基本邏輯3.2示例代碼 1. 基本概念 互斥與同步是最基本的邏輯概念&#xff1a; 互斥指的是控制兩個進度使之互相排斥&#x…

親手打造可視化故事線管理工具:開發全流程、難點突破與開發過程經驗總結

親手打造可視化故事線管理工具&#xff1a;開發全流程、難點突破與開發過程經驗總結 作為還沒入門的業余編程愛好者&#xff0c;奮戰了2天&#xff0c;借助AI開發一款FLASK小工具&#xff0c;功能還在完善中&#xff08;時間軸可以跟隨關聯圖縮放&#xff0c;加了一個用C鍵控制…

網絡攻防技術-虛擬機安裝和nmap端口掃描

文章是博主上實驗課做的實驗和心得體會&#xff0c;有些高深的地方我可能也比較一知半解&#xff0c;歡迎來交流。全文參考課程所習得&#xff0c;純粹梳理知識點和分享&#xff0c;如有不妥請聯系修改。 文章側重實驗部分&#xff0c;也會講述實驗相關的理論知識。理論后期如果…

中斷的硬件框架

今天呢&#xff0c;我們來講講中斷的硬件框架&#xff0c;這里會去舉3個開發板&#xff0c;去了解中斷的硬件框架&#xff1a; 中斷路徑上的3個部件&#xff1a; 中斷源 中斷源多種多樣&#xff0c;比如GPIO、定時器、UART、DMA等等。 它們都有自己的寄存器&#xff0c;可以…

動手學深度學習:手語視頻在VGG模型中的測試

前言 其他所有部分同上一篇AlexNet一樣&#xff0c;所以就不再贅訴&#xff0c;直接看VGG搭建部分。 模型 VGG是第一個采取塊進行模塊化搭建的模型。 def vgg_block(num_convs,in_channels,out_channels):layers[]for _ in range(num_convs):layers.append(nn.Conv2d(in_ch…