spring響應式編程系列:總體流程

目錄

示例

程序流程

just

subscribe

new LambdaMonoSubscriber

???????MonoJust.subscribe

???????new Operators.ScalarSubscription ?

???????onSubscribe

???????request

???????onNext

時序圖

類圖

數據發布者

MonoJust

數據訂閱者

LambdaSubscriber

訂閱的消息體

ScalarSubscription


? ? ? ?

? ? ? ? 想要了解響應式編程的總體流程,只要做到真正吃透一個簡單的示例即可。

? ? ? ? 如下所示:

示例

? ? ? ? 首先,通過調用Mono.just創建一個單元素的數據發布者(Publisher);

? ? ? ? 然后,通過調用mono.subscribe訂閱數據發布者(Publisher)發布的數據。

? ? ? ? 如下所示:

// 創建一個包含數據的?Mono
Mono<String> mono = Mono.just("Hello, Reactive World!");
// 訂閱并消費?Mono
mono.subscribe(System.out::println);

程序流程

? ? ? ? 點擊Mono.just,如下所示:

???????just

public static <T> Mono<T> just(T data) {

????????return onAssembly(new MonoJust(data));

????}

? ? ? ? 在這里,直接new一個MonoJust對象并返回。

? ? ? ? 點擊示例里的mono.subscribe,如下所示:

subscribe

public abstract class Mono<T> implements CorePublisher<T> {

????... ...

????public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Context initialContext) {

????????return (Disposable)this.subscribeWith(new LambdaMonoSubscriber(consumer, errorConsumer, completeConsumer, (Consumer)null, initialContext));

????}

? ? ? 在這里,將示例里subscribe的參數作為LambdaMonoSubscriber的構造參數,然后new一個LambdaMonoSubscriber對象。

? ? ? ? LambdaMonoSubscriber對象的初始化參數,如下所示:

???????new LambdaMonoSubscriber

final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable {

????final Consumer<? super T> consumer;

????final Consumer<? super Throwable> errorConsumer;

????final Runnable completeConsumer;

????final Consumer<? super Subscription> subscriptionConsumer;

????final Context initialContext;

????volatile Subscription subscription;

????... ...

????LambdaMonoSubscriber(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer, @Nullable Context initialContext) {

????????this.consumer = consumer;

????????this.errorConsumer = errorConsumer;

????????this.completeConsumer = completeConsumer;

????????this.subscriptionConsumer = subscriptionConsumer;

????????this.initialContext = initialContext == null ? Context.empty() : initialContext;

????}

???????MonoJust.subscribe

final class MonoJust<T> extends Mono<T> implements ScalarCallable<T>, Fuseable, SourceProducer<T> {

????... ...

public void subscribe(CoreSubscriber<? super T> actual) {

????????actual.onSubscribe(Operators.scalarSubscription(actual, this.value));

????}

? ? ? ?在這里,來到了MonoJust對象的subscribe方法,該方法調用了LambdaMonoSubscriber對象的onSubscribe方法;

? ? ? ? 同時,new一個Operators.ScalarSubscription對象,該對象封裝了LambdaMonoSubscriber對象和數據發布者MonoJust發布的數據。

? ? ? ? 如下所示:

???????new Operators.ScalarSubscription ?

public static <T> Subscription scalarSubscription(CoreSubscriber<? super T> subscriber, T value, String stepName) {

????????return new Operators.ScalarSubscription(subscriber, value, stepName);

????}

? ? ? ? 點擊actual.onSubscribe,如下所示:

???????onSubscribe

final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable {

????... ...

????public final void onSubscribe(Subscription s) {

????????if (Operators.validate(this.subscription, s)) {

????????????this.subscription = s;

????????????if (this.subscriptionConsumer != null) {

????????????????try {

????????????????????this.subscriptionConsumer.accept(s);

????????????????} catch (Throwable var3) {

????????????????????Exceptions.throwIfFatal(var3);

????????????????????s.cancel();

????????????????????this.onError(var3);

????????????????}

????????????} else {

????????????????s.request(9223372036854775807L);

????????????}

????????}

????}

? ? ? 在這里,LambdaMonoSubscriber對象調用了Operators.ScalarSubscription對象的request方法。

? ? ? ? 如下所示:

???????request

static final class ScalarSubscription<T> implements SynchronousSubscription<T>, InnerProducer<T> {

public void request(long n) {

????????????if (Operators.validate(n) && ONCE.compareAndSet(this, 0, 1)) {

????????????????Subscriber<? super T> a = this.actual;

????????????????a.onNext(this.value);

????????????????if (this.once != 2) {

????????????????????a.onComplete();

????????????????}

????????????}

????????}

? ? ? ? 在這里,Operators.ScalarSubscription對象又調用了LambdaMonoSubscriber對象的onNext方法。

? ? ? ? LambdaMonoSubscriber對象的onNext方法如下所示:

???????onNext

final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable {

public final void onNext(T x) {

????????Subscription s = (Subscription)S.getAndSet(this, Operators.cancelledSubscription());

????????if (s == Operators.cancelledSubscription()) {

????????????Operators.onNextDropped(x, this.initialContext);

????????} else {

????????????if (this.consumer != null) {

????????????????try {

????????????????????this.consumer.accept(x);

????????????????} catch (Throwable var5) {

????????????????????Exceptions.throwIfFatal(var5);

????????????????????s.cancel();

????????????????????this.doError(var5);

????????????????}

????????????}

????????????if (this.completeConsumer != null) {

????????????????try {

????????????????????this.completeConsumer.run();

????????????????} catch (Throwable var4) {

????????????????????Operators.onErrorDropped(var4, this.initialContext);

????????????????}

????????????}

????????}

}

? ? ? ? 終于,在這里,調用了示例里subscribe()方法的回調函數了。

時序圖

【說明】

  1. Mono和MonoJust是數據發布者,LambdaMonoSubscriber是數據消費者,ScalarSubscription是訂閱的消息;
  2. 類的設計還是比較清晰的,就是方法的調用顯示有點繞。
  3. 數據發布者,提供了just方法來生成數據發布者(Publisher);
  4. 數據訂閱者,提供了onSubscribe和onNext方法來響應訂閱事件和讀取數據;
  5. 訂閱的消息體,封裝了數據訂閱者和數據發布發布的數據,并且提供了request方法用來處理數據。
  6. 使用了觀察者設計模式:LambdaMonoSubscriber是觀察者模式中的觀察者(Observer),它訂閱(subscribe)一個發布者(MonoJust),MonoJust是觀察者模式中的主題(Subject),它負責通知所有的 Subscriber。

類圖

數據發布者

MonoJust

【說明】

  • Publisher

????定義了接口:void subscribe(Subscriber<? super T> var1)。

  • CorePublisher

????定義了接口:void subscribe(CoreSubscriber<? super T> subscriber)。

  • Mono

? ? 是一個抽象類,實現了數據發布者通用的各種功能。

比如:使用了工廠方法設計模式來創建諸如MonoJust、MonoCreate、MonoDefer、MonoError等各種具體的數據發布者。

  • MonoJust

????是一個特定的數據發布者(Publisher),實現了接口void subscribe(CoreSubscriber<? super T> actual)。

數據訂閱者

LambdaSubscriber

【說明】

  • Subscriber

????定義了如下接口:onSubscribe、onNext、onError、onComplete。

  • CoreSubscriber

????定義了如下接口:onSubscribe

  • LambdaMonoSubscriber

? ? 關聯了consumer、errorConsumer、completeConsumer、subscriptionConsumer這些對象,以完成訂閱相關的各種操作。

訂閱的消息體

ScalarSubscription

【說明】

  • Subscription

????提供了如下接口:void request(long var1)、void cancel();

  • ScalarSubscription

????封裝了數據訂閱者和數據發布者發布的數據。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/79447.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/79447.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/79447.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

基于slimBOXtv 9.16 V2-晶晨S905L3A/ S905L3AB-Mod ATV-Android9.0-線刷通刷固件包

基于slimBOXtv 9.16 V2-晶晨S905L3A&#xff0f; S905L3AB-Mod ATV-Android9.0-線刷通刷固件包&#xff0c;基于SlimBOXtv 9 修改而來&#xff0c;貼近于原生ATV&#xff0c;僅支持晶晨S905L3A&#xff0f; S905L3AB芯片刷機。 適用型號&#xff1a;M401A、CM311-1a、CM311-1s…

使用droidrun庫實現AI控制安卓手機

使用droidrun庫實現AI控制安卓手機 介紹 DroidRun 是一個框架&#xff0c;通過LLM代理控制 Android 設備。它允許您使用自然語言命令自動化 Android 設備交互。 安裝環境 安裝源碼依賴 git clone https://github.com/droidrun/droidrun.git cd droidrun conda create --nam…

知識庫建設全流程指南(AI時代優化版)

知識庫建設全流程指南&#xff08;AI時代優化版&#xff09; ??一、知識庫建設的戰略定位?? ??核心價值錨點?? ??AI時代基建??&#xff1a;知識庫是GEO優化的核心載體&#xff0c;決定內容被AI引用的概率權重??動態護城河??&#xff1a;結構化知識體系可抵御算…

2025年03月中國電子學會青少年軟件編程(Python)等級考試試卷(五級)真題

青少年軟件編程&#xff08;Python&#xff09;等級考試試卷&#xff08;五級&#xff09; 分數&#xff1a;100 題數&#xff1a;38 答案解析&#xff1a;https://blog.csdn.net/qq_33897084/article/details/147341437 一、單選題(共25題&#xff0c;共50分) 1. 以下哪個選…

基于RRT的優化器:一種基于快速探索隨機樹算法的新型元啟發式算法

受機器人路徑規劃中常用的快速探索隨機樹&#xff08;RRT&#xff09;算法的搜索機制的啟發&#xff0c;我們提出了一種新穎的元啟發式算法&#xff0c;稱為基于RRT的優化器&#xff08;RRTO&#xff09;。這是首次將RRT算法的概念與元啟發式算法相結合。RRTO的關鍵創新是其三種…

進階篇|CAN FD 與性能優化

引言 1. CAN vs. CAN FD 對比 2. CAN FD 幀結構詳解

【隨身WiFi】隨身WiFi Debian系統優化教程

0.操作前必看 本教程基于Debian系統進行優化&#xff0c;有些操作對隨身WiFi來說可能會帶來負優化&#xff0c;根據需要選擇。 所有操作需要在root用戶環境下運行&#xff0c;否則都要加sudo 隨身wifi Debian系統&#xff0c;可以去某安的隨聲WiFi模塊自行搜索刷機 點贊&am…

【Pandas】pandas DataFrame where

Pandas2.2 DataFrame Indexing, iteration 方法描述DataFrame.head([n])用于返回 DataFrame 的前幾行DataFrame.at快速訪問和修改 DataFrame 中單個值的方法DataFrame.iat快速訪問和修改 DataFrame 中單個值的方法DataFrame.loc用于基于標簽&#xff08;行標簽和列標簽&#…

C++代碼優化

前段時間寫了一些代碼&#xff0c;但是在運算過程中發現有些代碼可以進行改進以提高運行效率&#xff0c;尤其是與PCL相關的部分&#xff0c;可以進行大幅度提高&#xff0e;特意在此進行記錄&#xff0c;分享給大家&#xff0c;也供自己查看&#xff0e; pcl::PointCloud< …

RAG-分塊策略

分塊策略在檢索增強生成&#xff08;RAG&#xff09;方法中起著至關重要的作用&#xff0c;它使文檔能夠被劃分為可管理的部分&#xff0c;同時保持上下文。每種方法都有其特定的優勢&#xff0c;適用于特定的用例。將大型數據文件拆分為更易于管理的段是提高LLM應用效率的最關…

Linux網絡編程 深入解析TFTP協議:基于UDP的文件傳輸實戰

知識點1【TFTP的概述】 學習通信的基本&#xff1a;通信協議&#xff08;具體發送上面樣的報文&#xff09;、通信流程&#xff08;按照什么步驟發送&#xff09; 1、TFTP的概述 tftp&#xff1a;簡單文件傳輸協議&#xff0c;**基于UDP&#xff0c;**不進行用戶有效性驗證 …

「數據可視化 D3系列」入門第十一章:力導向圖深度解析與實現

D3.js 力導向圖深度解析與實現 力導向圖核心概念 力導向圖是一種通過物理模擬來展示復雜關系網絡的圖表類型&#xff0c;特別適合表現社交網絡、知識圖譜、系統拓撲等關系型數據。其核心原理是通過模擬粒子間的物理作用力&#xff08;電荷斥力、彈簧引力等&#xff09;自動計…

音頻格式轉換

1. 下載ffmpeg https://www.gyan.dev/ffmpeg/builds/packages/ffmpeg-7.1.1-full_build.7z 2. 配置ffmpeg環境變量 3.安裝pydub pip install pydub 4.編寫轉化工具代碼 from pydub import AudioSegment def convertM4aToWav(m4a,wav):sound AudioSegment.from_file(m4a, f…

基于spring boot 集成 deepseek 流式輸出 的vue3使用指南

本文使用deepseek API接口流式輸出的文章。 環境要求 jdk17 spring boot 3.4 代碼如下: package com.example.controller;import jakarta.annotation.PostConstruct; import org.springframework.ai.chat.messages.AssistantMessage; import org.springframework.ai.chat.mes…

微博輻射源和干擾機

微波輻射源和干擾機是電子戰和通信領域中的兩個重要概念&#xff0c;它們在軍事、民用及科研中具有廣泛應用。以下是兩者的詳細解析及其相互關系&#xff1a; ?1. 微波輻射源? ?定義?&#xff1a; 微波輻射源是指能夠主動發射微波&#xff08;頻率范圍通常為 ?300 MHz&…

2025年4月16日華為留學生筆試第三題300分

?? 點擊直達筆試專欄 ??《大廠筆試突圍》 ?? 春秋招筆試突圍在線OJ ?? 筆試突圍OJ 03. 智慧城市網絡優化 問題描述 K小姐是一家智慧城市服務提供商的網絡架構師。她負責規劃城市邊緣計算節點的布局,以提供更快速、穩定的網絡服務。 城市內有 n n

多線程編程的簡單案例——單例模式[多線程編程篇(3)]

目錄 前言 1.wati() 和 notify() wait() 和 notify() 的產生原因 如何使用wait()和notify()? 案例一:單例模式 餓漢式寫法: 懶漢式寫法 對于它的優化 再次優化 結尾 前言 如何簡單的去使用jconsloe 查看線程 (多線程編程篇1)_eclipse查看線程-CSDN博客 淺談Thread類…

pytorch基本操作2

torch.clamp 主要用于對張量中的元素進行截斷&#xff08;clamping&#xff09;&#xff0c;將其限制在一個指定的區間范圍內。 函數定義 torch.clamp(input, minNone, maxNone) → Tensor 參數說明 input 類型&#xff1a;Tensor 需要進行截斷操作的輸入張…

一次制作參考網雜志的閱讀書源的實操經驗總結(附書源)

文章目錄 一、背景介紹二、書源文件三、詳解制作書源&#xff08;一&#xff09;打開Web服務&#xff08;二&#xff09;參考網結構解釋&#xff08;三&#xff09;閱讀書源 基礎&#xff08;四&#xff09;閱讀書源 發現&#xff08;五&#xff09;閱讀書源 詳細&#xff08;六…

并發設計模式實戰系列(2):領導者/追隨者模式

&#x1f31f; ?大家好&#xff0c;我是摘星&#xff01;? &#x1f31f; 今天為大家帶來的是并發設計模式實戰系列&#xff0c;第二章領導者/追隨者&#xff08;Leader/Followers&#xff09;模式&#xff0c;廢話不多說直接開始~ 目錄 領導者/追隨者&#xff08;Leader/…