spring響應式編程系列:異步生產數據

目錄

示例

大致流程

create

new MonoCreate

subscribe

new LambdaMonoSubscriber

monoCreate.subscribe

accept

success

onNext

時序圖

類圖

數據發布者

MonoCreate

數據訂閱者

LambdaMonoSubscriber

訂閱的消息體

DefaultMonoSink

? ? ? ? 本篇文章我們來研究如何將現有異步 API(如回調式接口)適配到 Reactor 的響應式流中。

? ? ? ? 默認情況下,Mono.create的代碼塊執行在訂閱時的線程上,但如果在該代碼塊中啟動其他線程或使用異步API,那么數據生產就會變成異步的。示例如下所示:

示例

Mono<String> mono = Mono.create(sink -> {
????// 模擬一個異步API操作
????new Thread(() -> {
????????try {
????????????Thread.sleep(1000); // 模擬耗時操作
????????????log.info("success");
????????????sink.success("Hello, World!"); // 成功時發射數據
????????} catch (InterruptedException e) {
????????????sink.error(e); // 發生錯誤時發射錯誤信號
????????}
????}).start();
});
log.info("main start");
mono.subscribe(x -> log.info("main finish"));
Thread.sleep(5000);

? ? ? ? 在這里,通過Mono.create模擬一個異步API操作,API操作成功后,調用sink.success("Hello, World!")進行數據發布者發送數據,從而觸發數據的訂閱。

? ? ? ? 接下來,讓我們一起看看程序的流程是怎么處理的。

? ? ? ? 點擊create()方法,如下所示:

大致流程

create

public static <T> Mono<T> create(Consumer<MonoSink<T>> callback) {
????return onAssembly(new MonoCreate<>(callback));
}

? ? ? ? 在這里,new一個MonoCreate對象并返回。

? ? ? ? 點擊MonoCreate,如下所示:

new MonoCreate

final class MonoCreate<T> extends Mono<T> implements SourceProducer<T> {
???static final Disposable TERMINATED = OperatorDisposables.DISPOSED;
???static final Disposable CANCELLED = Disposables.disposed();
???final Consumer<MonoSink<T>> callback;
???MonoCreate(Consumer<MonoSink<T>> callback) {
??????this.callback = callback;
???}

? ? ? ? 在這里,將create()方法的回調接口參數賦值給callback屬性。因此,Mono.create的參數就作為數據發布者的一個屬性信息了。

? ? ? ? 點擊示例里的mono.subscribe(),如下所示:

subscribe

public final Disposable subscribe(
??????@Nullable Consumer<? super T> consumer,
??????@Nullable Consumer<? super Throwable> errorConsumer,
??????@Nullable Runnable completeConsumer,
??????@Nullable Context initialContext) {
???return subscribeWith(new LambdaMonoSubscriber<>(consumer, errorConsumer,
?????????completeConsumer, null, initialContext));
}

? ? ? ? 在這里,new一個LambdaMonoSubscriber對象,如下所示:

new LambdaMonoSubscriber

LambdaMonoSubscriber(@Nullable Consumer<? super T> consumer,
??????@Nullable Consumer<? super Throwable> errorConsumer,
??????@Nullable Runnable completeConsumer,
??????@Nullable Consumer<? super Subscription> subscriptionConsumer,
??????@Nullable Context initialContext) {
???this.consumer = consumer;
???this.errorConsumer = errorConsumer;
???this.completeConsumer = completeConsumer;
???this.subscriptionConsumer = subscriptionConsumer;
???this.initialContext = initialContext == null ? Context.empty() : initialContext;
}

? ? ? ? 在這里,將subscribe的回調接口參數賦值給consumer 屬性,因此,mono.subscribe的參數就作為數據消費者的屬性了。

? ? ? ? 點擊上一步的subscribeWith()方法,如下所示:

monoCreate.subscribe

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
???DefaultMonoSink<T> emitter = new DefaultMonoSink<>(actual);
???actual.onSubscribe(emitter);
???try {
??????callback.accept(emitter);
???}
???catch (Throwable ex) {
??????emitter.error(Operators.onOperatorError(ex, actual.currentContext()));
???}
}

? ? ? ? 在這里,首先調用了數據消費者的onSubscribe()方法,這個與《spring響應式編程系列:總體流程》一樣。

? ? ? ? 另外,調用了callback.accept()方法,也就是Mono.create()的回調接口參數。

accept

Mono<String> mono = Mono.create(sink -> {
????// 模擬一個異步操作
????new Thread(() -> {
????????try {
????????????Thread.sleep(1000); // 模擬耗時操作
????????????log.info("success");
????????????sink.success("Hello, World!"); // 成功時發射數據
????????} catch (InterruptedException e) {
????????????sink.error(e); // 發生錯誤時發射錯誤信號
????????}
????}).start();
});

? ? ? ? 在這里,模擬了耗時操作,然后調用sink.success()方法。

? ? ? 通常,可以將sink對象保存在線程共享環境里,等其它的業務操作執行完成后,再調用sink.success()方法,即可發射數據發布者數據,從而觸發消費者訂閱。

? ? ? ? 點擊sink.success(),如下所示:

???????success

public void success(@Nullable T value) {

... ...
?????for (; ; ) {
??????int s = state;
??????if (s == HAS_REQUEST_HAS_VALUE || s == NO_REQUEST_HAS_VALUE) {
?????????Operators.onNextDropped(value, actual.currentContext());
?????????return;
??????}
??????if (s == HAS_REQUEST_NO_VALUE) {
?????????if (STATE.compareAndSet(this, s, HAS_REQUEST_HAS_VALUE)) {
????????????try {
???????????????actual.onNext(value);
???????????????actual.onComplete();
????????????}
????????????catch (Throwable t) {
???????????????actual.onError(t);
????????????}
????????????finally {
???????????????disposeResource(false);
????????????}
?????????} else {
????????????Operators.onNextDropped(value, actual.currentContext());
?????????}
?????????return;
??????}
??????... ...
???}
}

? ? ? ? 在這里,調用了數據訂閱者的onNext()方法,如下所示:

???????onNext

public final void onNext(T x) {
???Subscription s = S.getAndSet(this, Operators.cancelledSubscription());
???if (s == Operators.cancelledSubscription()) {
??????Operators.onNextDropped(x, this.initialContext);
??????return;
???}
???if (consumer != null) {
??????try {
?????????consumer.accept(x);
??????}
??????catch (Throwable t) {
?????????Exceptions.throwIfFatal(t);
?????????s.cancel();
?????????doError(t);
??????}
???}
???if (completeConsumer != null) {
??????try {
?????????completeConsumer.run();
??????}
??????catch (Throwable t) {
?????????Operators.onErrorDropped(t, this.initialContext);
??????}
???}
}

時序圖

  1. 類關系的設計,與《spring響應式編程系列:總體流程》類似,主要包括數據發布者對象、數據訂閱者對象及訂閱的消息體對象;
  2. Mono和MonoCreate是數據發布者,LambdaMonoSubscriber是數據訂閱者,DefaultMonoSink是訂閱的消息體;
  3. 不同點在于,DefaultMonoSink可以通過示例里的Mono.create暴露給業務側,業務側的相關業務執行完成之后,可以通過調用該對象success方法,來觸發訂閱者的回調函數。

???????類圖

數據發布者

MonoCreate

? ? ? ? MonoCreate與《spring響應式編程系列:總體流程》介紹的類似,都是繼承于Mono類,并且實現了CorePublisher和Publisher接口。

? ? ? ? 不同點在于,該數據發布者多了一個屬性,如下所示:

? ? ? ? final Consumer<MonoSink<T>> callback;

? ? ? ? 該屬性是一個可以接收所訂閱消息體(類型為MonoSink<T>)參數的回調函數,在這里可以將該消息體與對應的業務建立綁定關系,為后續業務執行結束后的回調做準備。

數據訂閱者

LambdaMonoSubscriber

? ? ? ? LambdaMonoSubscriber與《spring響應式編程系列:總體流程》介紹的一樣。

訂閱的消息體

DefaultMonoSink

? ? ? ? DefaultMonoSink與《spring響應式編程系列:總體流程???????》介紹的類似,都實現了Subscription接口。

? ? ? ? 不同點在于,DefaultMonoSink實現了MonoSink接口,該接口提供了供業務側調用 的接口方法,如下所示:

void success(@Nullable T value);

? ? ? ? 業務側的相關業務執行完成之后,可以通過調用該接口方法,來觸發訂閱者的回調函數。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/80610.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/80610.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/80610.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

MCP Python SDK構建的**SQLite瀏覽器**的完整操作指南

以下是使用MCP Python SDK構建的SQLite瀏覽器的完整操作指南&#xff1a; 一、環境準備 安裝依賴 # 安裝MCP SDK及SQLite支持 pip install mcp sqlite3創建測試數據庫 sqlite3 test.db <<EOF CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT); IN…

【Python爬蟲基礎篇】--3.cookie和session

目錄 1.cookie 1.1.定義 1.2.參數 1.3.分類 2.session 3.使用cookie登錄微博 4.使用session登錄 1.cookie 由于http是一個無狀態的協議&#xff0c;請求與請求之間無法相互傳遞或者記錄一些信息&#xff0c;cookie和session正是為了解決這個問題而產生。 例子&#xff1…

風車郵箱系統詳細使用指南:Windows與Ubuntu雙平臺解析

風車郵箱系統V1.2使用手冊 風車郵箱系統詳細使用指南&#xff1a;Windows與Ubuntu雙平臺解析 前言 在日常網絡活動中&#xff0c;我們經常需要一個臨時郵箱來注冊各類網站或接收驗證碼&#xff0c;但不想使用自己的真實郵箱。「風車無線郵箱系統」作為一款優秀的臨時郵箱工具…

同樣的接口用postman/apifox能跑通,用jmeter跑就報錯500

之前沒用過jmeter,第一次用調試壓測腳本遇到了問題 一樣的接口用postman能跑通&#xff0c;用jmeter跑就報錯500&#xff0c;百度很多文章都說是該接口需要加一個‘內容編碼’改成utf-8,我加了還是不行 后來我就想到apifox好像有隱藏的header&#xff0c;然后開始比較apifox的…

1656打印路徑-Floyd回溯/圖論-鏈表/數據結構

藍橋賬戶中心 1.稅收&#xff1a; “城市的稅收”&#xff1a;所以是中介點的稅收&#xff0c;經過該點后加上 2.路徑&#xff1a; 用數組存儲前驅節點從而串成鏈表 pre[ i ][ j ]代表的是從 i 到 j 的最短路徑上 j 的前驅節點是什么 那么便可以pre[ i ][ j ]k 把k加入pa…

Eigen矩陣操作類 (Map, Block, 視圖類)

1. Map 類&#xff1a;內存映射&#xff08;零拷貝操作&#xff09; 核心功能 將現有的 C/C 數組或緩沖區映射為 Eigen 矩陣/向量&#xff0c;不復制數據&#xff0c;直接操作原內存。 模板參數 cpp Map<Matrix<Scalar, Rows, Cols, Options, MaxRows, MaxCols>&…

多系統安裝經驗,移動硬盤,ubuntu grub修改/etc/fstab 移動硬盤需要改成nfts格式才能放steam游戲

總結&#xff1a;我硬盤會自動掛載&#xff0c;直接格式化nfts&#xff0c;steam就能裝里面了 機械硬盤裝系統真的不行&#xff0c;超級慢游戲還跑不了 --------------------------------------------------------------------底下都不用看 筆記本一個系統&#xff0c;移動硬盤…

JFLAP SOFTWARE 編譯原理用(自動機繪圖)

csdn全是蛆蟲&#xff0c;2mb的軟件&#xff0c;都在那里搞收費&#xff0c;我就看不慣&#xff0c;我就放出來&#xff0c;那咋了&#xff01;&#xff01;&#xff01; https://pan.baidu.com/s/1IuEfHScynjCCUF5ScF26KA 通過網盤分享的文件&#xff1a;JFLAP7.1.jar 鏈接: h…

[Windows] Disk Sorter文件分類管理軟件 v16.7.18

[Windows] Disk Sorter文件分類管理 鏈接&#xff1a;https://pan.xunlei.com/s/VOOl0sDntAdHvlMkc7N0ZOD-A1?pwd966n# Disk Sorter是一個功能強大的文件分類管理軟件&#xff0c;允許對本地磁盤、網絡共享、NAS設備和企業存儲系統中的文件進行分類&#xff0c;并且支持生成…

STM32提高篇: 藍牙通訊

STM32提高篇: 藍牙通訊 一.藍牙通訊介紹1.藍牙技術類型 二.藍牙協議棧1.藍牙芯片架構2.BLE低功耗藍牙協議棧框架 三.ESP32-C3中的藍牙功能1.廣播2.掃描3.通訊 四.發送和接收 一.藍牙通訊介紹 藍牙&#xff0c;是一種利用低功率無線電&#xff0c;支持設備短距離通信的無線電技…

6.1.多級緩存架構

目錄 一、多級緩存基礎與核心概念 緩存的定義與價值 ? 緩存的應用場景&#xff08;高并發、低延遲、減輕數據庫壓力&#xff09; ? 多級緩存 vs 單級緩存的優劣對比 多級緩存核心組件 ? 本地緩存&#xff08;Caffeine、Guava Cache&#xff09; ? 分布式緩存&#xff08;…

MySQL的MVCC【學習筆記】

MVCC 事務的隔離級別分為四種&#xff0c;其中Read Committed和Repeatable Read隔離級別&#xff0c;部分實現就是通過MVCC&#xff08;Multi-Version Concurrency Control&#xff0c;多版本并發控制&#xff09; 版本鏈 版本鏈是通過undo日志實現的&#xff0c; 事務每次修改…

基于OpenMV+STM32+OLED與YOLOv11+PaddleOCR的嵌入式車牌識別系統開發筆記

基于OpenMV、STM32與OLED的嵌入式車牌識別系統開發筆記 基于OpenMV、STM32與OLED的嵌入式車牌識別系統開發筆記系統架構全景 一、實物演示二、OpenMV端設計要點1. 硬件配置優化2. 智能幀率控制算法3. 數據傳輸協議設計 三、PyTorch后端核心實現&#xff1a;YOLOv11與PaddleOCR的…

C#中常見的設計模式

文章目錄 引言設計模式的分類創建型模式 (Creational Patterns)1. 單例模式 (Singleton)2. 工廠方法模式 (Factory Method)3. 抽象工廠模式 (Abstract Factory)4. 建造者模式 (Builder) 結構型模式 (Structural Patterns)5. 適配器模式 (Adapter)6. 裝飾器模式 (Decorator)7. 外…

Nacos簡介—3.Nacos的配置簡介

大綱 1.Nacos生產集群Web端口與數據庫配置 2.Nacos生產集群的Distro協議核心參數 3.Nacos打通CMDB實現跨機房的就近訪問 4.Nacos基于SPI動態擴展機制來獲取CMDB的數據 5.基于Nacos SPI機制開發CMDB動態擴展 6.Nacos基于CMDB來實現多機房就近訪問 7.Nacos生產集群Prometh…

Jest 快照測試

以下是關于 Jest 快照測試的系統化知識總結,從基礎使用到底層原理全面覆蓋: 一、快照測試核心原理 1. 工作機制三階段 #mermaid-svg-GC46t2NBvGv7RF0M {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-GC46t2NBvGv…

第十六屆藍橋杯大賽軟件賽省賽 C/C++ 大學B組 [京津冀]

由于官方沒有公布題目的數據, 所以代碼僅供參考 1. 密密擺放 題目鏈接&#xff1a;P12337 [藍橋杯 2025 省 AB/Python B 第二場] 密密擺放 - 洛谷 題目描述 小藍有一個大箱子&#xff0c;內部的長寬高分別是 200、250、240&#xff08;單位&#xff1a;毫米&#xff09;&…

Spring 學習筆記之 @Transactional 異常不回滾匯總

使用springboot時&#xff0c;只要引入spring-jdbc/jpa相關的依賴后&#xff0c;在想要啟用事務的方法上加上Transactional注解就能開啟事務&#xff0c;碰到異常就能自動回滾。大大的提高了編碼的便捷性性&#xff0c;同時也不侵入代碼&#xff0c;保持了代碼的簡潔性。 默認情…

React 與 Vue 虛擬 DOM 實現原理深度對比:從理論到實踐

在現代前端開發中&#xff0c;React 和 Vue 作為最流行的兩大框架&#xff0c;都采用了虛擬 DOM&#xff08;Virtual DOM&#xff09; 技術來優化渲染性能。虛擬 DOM 的核心思想是通過 JavaScript 對象模擬真實 DOM&#xff0c;減少直接操作 DOM 的開銷&#xff0c;從而提高頁面…

WordPress AI 原創文章自動生成插件 24小時全自動生成SEO原創文章 | 多語言支持 | 智能配圖與排版

為什么選擇Linkreate AI內容生成插件&#xff1f; ? 全自動化工作流程 - 從關鍵詞挖掘到文章發布一站式完成 ? 多語言支持 - 輕松覆蓋全球市場&#xff08;中/英等多語種&#xff09; ? 智能SEO優化 - 自動生成搜索引擎友好的內容結構 ? AI智能配圖 - 每篇文章自動匹配高質…