目錄
示例
大致流程
create
new MonoCreate
subscribe
new LambdaMonoSubscriber
monoCreate.subscribe
accept
success
onNext
時序圖
類圖
數據發布者
MonoCreate
數據訂閱者
LambdaMonoSubscriber
訂閱的消息體
DefaultMonoSink
? ? ? ? 本篇文章我們來研究如何將現有異步 API(如回調式接口)適配到 Reactor 的響應式流中。
? ? ? ? 默認情況下,Mono.create的代碼塊執行在訂閱時的線程上,但如果在該代碼塊中啟動其他線程或使用異步API,那么數據生產就會變成異步的。示例如下所示:
示例
Mono<String> mono = Mono.create(sink -> { ????// 模擬一個異步API操作 ????new Thread(() -> { ????????try { ????????????Thread.sleep(1000); // 模擬耗時操作 ????????????log.info("success"); ????????????sink.success("Hello, World!"); // 成功時發射數據 ????????} catch (InterruptedException e) { ????????????sink.error(e); // 發生錯誤時發射錯誤信號 ????????} ????}).start(); }); log.info("main start"); mono.subscribe(x -> log.info("main finish")); Thread.sleep(5000); |
? ? ? ? 在這里,通過Mono.create模擬一個異步API操作,API操作成功后,調用sink.success("Hello, World!")進行數據發布者發送數據,從而觸發數據的訂閱。
? ? ? ? 接下來,讓我們一起看看程序的流程是怎么處理的。
? ? ? ? 點擊create()方法,如下所示:
大致流程
create
public static <T> Mono<T> create(Consumer<MonoSink<T>> callback) { ????return onAssembly(new MonoCreate<>(callback)); } |
? ? ? ? 在這里,new一個MonoCreate對象并返回。
? ? ? ? 點擊MonoCreate,如下所示:
new MonoCreate
final class MonoCreate<T> extends Mono<T> implements SourceProducer<T> { ???static final Disposable TERMINATED = OperatorDisposables.DISPOSED; ???static final Disposable CANCELLED = Disposables.disposed(); ???final Consumer<MonoSink<T>> callback; ???MonoCreate(Consumer<MonoSink<T>> callback) { ??????this.callback = callback; ???} |
? ? ? ? 在這里,將create()方法的回調接口參數賦值給callback屬性。因此,Mono.create的參數就作為數據發布者的一個屬性信息了。
? ? ? ? 點擊示例里的mono.subscribe(),如下所示:
subscribe
public final Disposable subscribe( ??????@Nullable Consumer<? super T> consumer, ??????@Nullable Consumer<? super Throwable> errorConsumer, ??????@Nullable Runnable completeConsumer, ??????@Nullable Context initialContext) { ???return subscribeWith(new LambdaMonoSubscriber<>(consumer, errorConsumer, ?????????completeConsumer, null, initialContext)); } |
? ? ? ? 在這里,new一個LambdaMonoSubscriber對象,如下所示:
new LambdaMonoSubscriber
LambdaMonoSubscriber(@Nullable Consumer<? super T> consumer, ??????@Nullable Consumer<? super Throwable> errorConsumer, ??????@Nullable Runnable completeConsumer, ??????@Nullable Consumer<? super Subscription> subscriptionConsumer, ??????@Nullable Context initialContext) { ???this.consumer = consumer; ???this.errorConsumer = errorConsumer; ???this.completeConsumer = completeConsumer; ???this.subscriptionConsumer = subscriptionConsumer; ???this.initialContext = initialContext == null ? Context.empty() : initialContext; } |
? ? ? ? 在這里,將subscribe的回調接口參數賦值給consumer 屬性,因此,mono.subscribe的參數就作為數據消費者的屬性了。
? ? ? ? 點擊上一步的subscribeWith()方法,如下所示:
monoCreate.subscribe
@Override public void subscribe(CoreSubscriber<? super T> actual) { ???DefaultMonoSink<T> emitter = new DefaultMonoSink<>(actual); ???actual.onSubscribe(emitter); ???try { ??????callback.accept(emitter); ???} ???catch (Throwable ex) { ??????emitter.error(Operators.onOperatorError(ex, actual.currentContext())); ???} } |
? ? ? ? 在這里,首先調用了數據消費者的onSubscribe()方法,這個與《spring響應式編程系列:總體流程》一樣。
? ? ? ? 另外,調用了callback.accept()方法,也就是Mono.create()的回調接口參數。
accept
Mono<String> mono = Mono.create(sink -> { ????// 模擬一個異步操作 ????new Thread(() -> { ????????try { ????????????Thread.sleep(1000); // 模擬耗時操作 ????????????log.info("success"); ????????????sink.success("Hello, World!"); // 成功時發射數據 ????????} catch (InterruptedException e) { ????????????sink.error(e); // 發生錯誤時發射錯誤信號 ????????} ????}).start(); }); |
? ? ? ? 在這里,模擬了耗時操作,然后調用sink.success()方法。
? ? ? 通常,可以將sink對象保存在線程共享環境里,等其它的業務操作執行完成后,再調用sink.success()方法,即可發射數據發布者數據,從而觸發消費者訂閱。
? ? ? ? 點擊sink.success(),如下所示:
???????success
public void success(@Nullable T value) { ... ... ?????for (; ; ) { ??????int s = state; ??????if (s == HAS_REQUEST_HAS_VALUE || s == NO_REQUEST_HAS_VALUE) { ?????????Operators.onNextDropped(value, actual.currentContext()); ?????????return; ??????} ??????if (s == HAS_REQUEST_NO_VALUE) { ?????????if (STATE.compareAndSet(this, s, HAS_REQUEST_HAS_VALUE)) { ????????????try { ???????????????actual.onNext(value); ???????????????actual.onComplete(); ????????????} ????????????catch (Throwable t) { ???????????????actual.onError(t); ????????????} ????????????finally { ???????????????disposeResource(false); ????????????} ?????????} else { ????????????Operators.onNextDropped(value, actual.currentContext()); ?????????} ?????????return; ??????} ??????... ... ???} } |
? ? ? ? 在這里,調用了數據訂閱者的onNext()方法,如下所示:
???????onNext
public final void onNext(T x) { ???Subscription s = S.getAndSet(this, Operators.cancelledSubscription()); ???if (s == Operators.cancelledSubscription()) { ??????Operators.onNextDropped(x, this.initialContext); ??????return; ???} ???if (consumer != null) { ??????try { ?????????consumer.accept(x); ??????} ??????catch (Throwable t) { ?????????Exceptions.throwIfFatal(t); ?????????s.cancel(); ?????????doError(t); ??????} ???} ???if (completeConsumer != null) { ??????try { ?????????completeConsumer.run(); ??????} ??????catch (Throwable t) { ?????????Operators.onErrorDropped(t, this.initialContext); ??????} ???} } |
時序圖

- 類關系的設計,與《spring響應式編程系列:總體流程》類似,主要包括數據發布者對象、數據訂閱者對象及訂閱的消息體對象;
- Mono和MonoCreate是數據發布者,LambdaMonoSubscriber是數據訂閱者,DefaultMonoSink是訂閱的消息體;
- 不同點在于,DefaultMonoSink可以通過示例里的Mono.create暴露給業務側,業務側的相關業務執行完成之后,可以通過調用該對象success方法,來觸發訂閱者的回調函數。
???????類圖
數據發布者
MonoCreate

? ? ? ? MonoCreate與《spring響應式編程系列:總體流程》介紹的類似,都是繼承于Mono類,并且實現了CorePublisher和Publisher接口。
? ? ? ? 不同點在于,該數據發布者多了一個屬性,如下所示:
? ? ? ? final Consumer<MonoSink<T>> callback;
? ? ? ? 該屬性是一個可以接收所訂閱消息體(類型為MonoSink<T>)參數的回調函數,在這里可以將該消息體與對應的業務建立綁定關系,為后續業務執行結束后的回調做準備。
數據訂閱者
LambdaMonoSubscriber
? ? ? ? LambdaMonoSubscriber與《spring響應式編程系列:總體流程》介紹的一樣。
訂閱的消息體
DefaultMonoSink

? ? ? ? DefaultMonoSink與《spring響應式編程系列:總體流程???????》介紹的類似,都實現了Subscription接口。
? ? ? ? 不同點在于,DefaultMonoSink實現了MonoSink接口,該接口提供了供業務側調用 的接口方法,如下所示:
void success(@Nullable T value);
? ? ? ? 業務側的相關業務執行完成之后,可以通過調用該接口方法,來觸發訂閱者的回調函數。