目錄
示例
程序流程
just
subscribe
new LambdaMonoSubscriber
???????MonoJust.subscribe
???????new Operators.ScalarSubscription ?
???????onSubscribe
???????request
???????onNext
時序圖
類圖
數據發布者
MonoJust
數據訂閱者
LambdaSubscriber
訂閱的消息體
ScalarSubscription
? ? ? ?
? ? ? ? 想要了解響應式編程的總體流程,只要做到真正吃透一個簡單的示例即可。
? ? ? ? 如下所示:
示例
? ? ? ? 首先,通過調用Mono.just創建一個單元素的數據發布者(Publisher);
? ? ? ? 然后,通過調用mono.subscribe訂閱數據發布者(Publisher)發布的數據。
? ? ? ? 如下所示:
// 創建一個包含數據的?Mono |
程序流程
? ? ? ? 點擊Mono.just,如下所示:
???????just
public static <T> Mono<T> just(T data) { ????????return onAssembly(new MonoJust(data)); ????} |
? ? ? ? 在這里,直接new一個MonoJust對象并返回。
? ? ? ? 點擊示例里的mono.subscribe,如下所示:
subscribe
public abstract class Mono<T> implements CorePublisher<T> { ????... ... ????public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Context initialContext) { ????????return (Disposable)this.subscribeWith(new LambdaMonoSubscriber(consumer, errorConsumer, completeConsumer, (Consumer)null, initialContext)); ????} |
? ? ? 在這里,將示例里subscribe的參數作為LambdaMonoSubscriber的構造參數,然后new一個LambdaMonoSubscriber對象。
? ? ? ? LambdaMonoSubscriber對象的初始化參數,如下所示:
???????new LambdaMonoSubscriber
final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable { ????final Consumer<? super T> consumer; ????final Consumer<? super Throwable> errorConsumer; ????final Runnable completeConsumer; ????final Consumer<? super Subscription> subscriptionConsumer; ????final Context initialContext; ????volatile Subscription subscription; ????... ... ????LambdaMonoSubscriber(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer, @Nullable Context initialContext) { ????????this.consumer = consumer; ????????this.errorConsumer = errorConsumer; ????????this.completeConsumer = completeConsumer; ????????this.subscriptionConsumer = subscriptionConsumer; ????????this.initialContext = initialContext == null ? Context.empty() : initialContext; ????} |
???????MonoJust.subscribe
final class MonoJust<T> extends Mono<T> implements ScalarCallable<T>, Fuseable, SourceProducer<T> { ????... ... public void subscribe(CoreSubscriber<? super T> actual) { ????????actual.onSubscribe(Operators.scalarSubscription(actual, this.value)); ????} |
? ? ? ?在這里,來到了MonoJust對象的subscribe方法,該方法調用了LambdaMonoSubscriber對象的onSubscribe方法;
? ? ? ? 同時,new一個Operators.ScalarSubscription對象,該對象封裝了LambdaMonoSubscriber對象和數據發布者MonoJust發布的數據。
? ? ? ? 如下所示:
???????new Operators.ScalarSubscription ?
public static <T> Subscription scalarSubscription(CoreSubscriber<? super T> subscriber, T value, String stepName) { ????????return new Operators.ScalarSubscription(subscriber, value, stepName); ????} |
? ? ? ? 點擊actual.onSubscribe,如下所示:
???????onSubscribe
final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable { ????... ... ????public final void onSubscribe(Subscription s) { ????????if (Operators.validate(this.subscription, s)) { ????????????this.subscription = s; ????????????if (this.subscriptionConsumer != null) { ????????????????try { ????????????????????this.subscriptionConsumer.accept(s); ????????????????} catch (Throwable var3) { ????????????????????Exceptions.throwIfFatal(var3); ????????????????????s.cancel(); ????????????????????this.onError(var3); ????????????????} ????????????} else { ????????????????s.request(9223372036854775807L); ????????????} ????????} ????} |
? ? ? 在這里,LambdaMonoSubscriber對象調用了Operators.ScalarSubscription對象的request方法。
? ? ? ? 如下所示:
???????request
static final class ScalarSubscription<T> implements SynchronousSubscription<T>, InnerProducer<T> { public void request(long n) { ????????????if (Operators.validate(n) && ONCE.compareAndSet(this, 0, 1)) { ????????????????Subscriber<? super T> a = this.actual; ????????????????a.onNext(this.value); ????????????????if (this.once != 2) { ????????????????????a.onComplete(); ????????????????} ????????????} ????????} |
? ? ? ? 在這里,Operators.ScalarSubscription對象又調用了LambdaMonoSubscriber對象的onNext方法。
? ? ? ? LambdaMonoSubscriber對象的onNext方法如下所示:
???????onNext
final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable { public final void onNext(T x) { ????????Subscription s = (Subscription)S.getAndSet(this, Operators.cancelledSubscription()); ????????if (s == Operators.cancelledSubscription()) { ????????????Operators.onNextDropped(x, this.initialContext); ????????} else { ????????????if (this.consumer != null) { ????????????????try { ????????????????????this.consumer.accept(x); ????????????????} catch (Throwable var5) { ????????????????????Exceptions.throwIfFatal(var5); ????????????????????s.cancel(); ????????????????????this.doError(var5); ????????????????} ????????????} ????????????if (this.completeConsumer != null) { ????????????????try { ????????????????????this.completeConsumer.run(); ????????????????} catch (Throwable var4) { ????????????????????Operators.onErrorDropped(var4, this.initialContext); ????????????????} ????????????} ????????} } |
? ? ? ? 終于,在這里,調用了示例里subscribe()方法的回調函數了。
時序圖
【說明】
- Mono和MonoJust是數據發布者,LambdaMonoSubscriber是數據消費者,ScalarSubscription是訂閱的消息;
- 類的設計還是比較清晰的,就是方法的調用顯示有點繞。
- 數據發布者,提供了just方法來生成數據發布者(Publisher);
- 數據訂閱者,提供了onSubscribe和onNext方法來響應訂閱事件和讀取數據;
- 訂閱的消息體,封裝了數據訂閱者和數據發布發布的數據,并且提供了request方法用來處理數據。
- 使用了觀察者設計模式:LambdaMonoSubscriber是觀察者模式中的觀察者(Observer),它訂閱(subscribe)一個發布者(MonoJust),MonoJust是觀察者模式中的主題(Subject),它負責通知所有的 Subscriber。
類圖
數據發布者
MonoJust
【說明】
- Publisher
????定義了接口:void subscribe(Subscriber<? super T> var1)。
- CorePublisher
????定義了接口:void subscribe(CoreSubscriber<? super T> subscriber)。
- Mono
? ? 是一個抽象類,實現了數據發布者通用的各種功能。
比如:使用了工廠方法設計模式來創建諸如MonoJust、MonoCreate、MonoDefer、MonoError等各種具體的數據發布者。
- MonoJust
????是一個特定的數據發布者(Publisher),實現了接口void subscribe(CoreSubscriber<? super T> actual)。
數據訂閱者
LambdaSubscriber
【說明】
- Subscriber
????定義了如下接口:onSubscribe、onNext、onError、onComplete。
- CoreSubscriber
????定義了如下接口:onSubscribe
- LambdaMonoSubscriber
? ? 關聯了consumer、errorConsumer、completeConsumer、subscriptionConsumer這些對象,以完成訂閱相關的各種操作。
訂閱的消息體
ScalarSubscription
【說明】
- Subscription
????提供了如下接口:void request(long var1)、void cancel();
- ScalarSubscription
????封裝了數據訂閱者和數據發布者發布的數據。