目錄
示例
大致流程
parallel
cache
PARALLEL_SUPPLIER
newParallel
init
publishOn
new MonoSubscribeOnValue
???????subscribe
???????new LambdaMonoSubscriber
???????MonoSubscribeOnValue.subscribe
???????onSubscribe
???????request
???????schedule
???????directSchedule
???????run
???????onNext
時序圖
類圖
數據發布者
MonoSubscribeOnValue
調度器
ParallelScheduler
數據訂閱者
LambdaMonoSubscriber
訂閱的消息體
ScheduledScalar
? ? ? ?本篇文章我們來研究如何控制數據流在特定線程池上的執行。即將操作符的執行切換到指定調度器(Scheduler)的線程。在Project Reactor框架中,Mono.publishOn()就可以實現該功能,示例如下所示:
示例
CountDownLatch countDownLatch = new CountDownLatch(1); // 創建一個包含數據的 Mono Mono<String> mono = Mono.just("Hello, Reactive World!"); mono.publishOn(Schedulers.parallel()) ????.subscribe(x -> { ????????log.info("Sub thread, subscribe: {}", x); ????????countDownLatch.countDown(); ????}); log.info("Main thread, blocking"); countDownLatch.await(); |
? ? ? ?首先,通過mono.publishOn方法指定線程池;
? ? ? ?其次,通過subscribe指定的消費者處理邏輯會在前一步指定的線程池里執行。
大致流程
? ? ? ?點擊Schedulers.parallel(),如下所示:
parallel
public static Scheduler parallel() { return cache(CACHED_PARALLEL, PARALLEL, PARALLEL_SUPPLIER); } |
? ? ? ?在這里,調用cache方法并且PARALLEL_SUPPLIER參數,返回Scheduler調度器。cache方法如下所示:
cache
static CachedScheduler cache(AtomicReference<CachedScheduler> reference, String key, Supplier<Scheduler> supplier) { CachedScheduler s = reference.get(); if (s != null) { return s; } s = new CachedScheduler(key, supplier.get()); if (reference.compareAndSet(null, s)) { return s; } ... ... } |
? ? ? ?在這里,調用supplier.get()方法來生成CachedScheduler調度器。supplier.get()方法的定義如下所示:
PARALLEL_SUPPLIER
static final Supplier<Scheduler> PARALLEL_SUPPLIER = ??????() -> newParallel(PARALLEL, DEFAULT_POOL_SIZE, true); |
newParallel
public static Scheduler newParallel(int parallelism, ThreadFactory threadFactory) { final Scheduler fromFactory = factory.newParallel(parallelism, threadFactory); fromFactory.init(); return fromFactory; } |
? ? ? ?在這里,通過工廠方法模式創建Scheduler對象,并調用init()初始化方法,如下所示:
init
@Override public void init() { ... ... SchedulerState<ScheduledExecutorService[]> b = SchedulerState.init(new ScheduledExecutorService[n]); for (int i = 0; i < n; i++) { b.currentResource[i] = Schedulers.decorateExecutorService(this, this.get()); } if (!STATE.compareAndSet(this, null, b)) { for (ScheduledExecutorService exec : b.currentResource) { exec.shutdownNow(); } if (isDisposed()) { throw new IllegalStateException( "Initializing a disposed scheduler is not permitted" ); } }} |
? ? ? ?在這里,new了一個ScheduledExecutorService線程池對象作為調度器的底導線程池實現。
? ? ? ?點擊示例里的mono.publishOn()方法,如下所示:
???????publishOn
public final Mono<T> publishOn(Scheduler scheduler) { ???if(this instanceof Callable) { ??????if (this instanceof Fuseable.ScalarCallable) { ?????????try { ????????????T value = block(); ????????????return onAssembly(new MonoSubscribeOnValue<>(value, scheduler)); ?????????} ?????????catch (Throwable t) { ????????????//leave MonoSubscribeOnCallable defer error ?????????} ??????} ??????@SuppressWarnings("unchecked") ??????Callable<T> c = (Callable<T>)this; ??????return onAssembly(new MonoSubscribeOnCallable<>(c, scheduler)); ???} ???return onAssembly(new MonoPublishOn<>(this, scheduler)); } |
? ? ? ?在這里,new 了一個MonoSubscribeOnValue對象,并且傳遞了scheduler對象作為構造參數。
? ? ? ?如下所示:
???????new MonoSubscribeOnValue
final class MonoSubscribeOnValue<T> extends Mono<T> implements Scannable { ???final T value; ???final Scheduler scheduler; ???MonoSubscribeOnValue(@Nullable T value, Scheduler scheduler) { ??????this.value = value; ??????this.scheduler = Objects.requireNonNull(scheduler, "scheduler"); ???} |
? ? ? ?在這里,將入參scheduler作為MonoSubscribeOnValue對象的屬性scheduler保留下來。
? ? ? ?點擊示例里的mono.subscribe()方法,如下所示:
???????subscribe
public final Disposable subscribe( ??????@Nullable Consumer<? super T> consumer, ??????@Nullable Consumer<? super Throwable> errorConsumer, ??????@Nullable Runnable completeConsumer, ??????@Nullable Context initialContext) { ???return subscribeWith(new LambdaMonoSubscriber<>(consumer, errorConsumer, ?????????completeConsumer, null, initialContext)); } |
? ? ? 在這里,new了一個LambdaMonoSubscriber對象,這里與《spring響應式編程系列:總體流程》類似。LambdaMonoSubscriber對象的構造函數如下所示:
???????new LambdaMonoSubscriber
LambdaMonoSubscriber(@Nullable Consumer<? super T> consumer, ??????@Nullable Consumer<? super Throwable> errorConsumer, ??????@Nullable Runnable completeConsumer, ??????@Nullable Consumer<? super Subscription> subscriptionConsumer, ??????@Nullable Context initialContext) { ???this.consumer = consumer; ???this.errorConsumer = errorConsumer; ???this.completeConsumer = completeConsumer; ???this.subscriptionConsumer = subscriptionConsumer; ???this.initialContext = initialContext == null ? Context.empty() : initialContext; } |
???????MonoSubscribeOnValue.subscribe
@Override public void subscribe(CoreSubscriber<? super T> actual) { T v = value; if (v == null) { ScheduledEmpty parent = new ScheduledEmpty(actual); actual.onSubscribe(parent); try { parent.setFuture(scheduler.schedule(parent)); } catch (RejectedExecutionException ree) { if (parent.future != OperatorDisposables.DISPOSED) { actual.onError(Operators.onRejectedExecution(ree, actual.currentContext())); } } } else { actual.onSubscribe(new ScheduledScalar<>(actual, v, scheduler)); } } |
? ? ? ?在這里,new一個ScheduledScalar對象,傳入消費者和scheduler對象,然后同樣是調用消費者的onSubscribe()方法。
? ? ? ?如下所示:
???????onSubscribe
public final void onSubscribe(Subscription s) { ???if (Operators.validate(subscription, s)) { ??????this.subscription = s; ??????if (subscriptionConsumer != null) { ?????????try { ????????????subscriptionConsumer.accept(s); ?????????} ?????????catch (Throwable t) { ????????????Exceptions.throwIfFatal(t); ????????????s.cancel(); ????????????onError(t); ?????????} ??????} ??????else { ?????????s.request(Long.MAX_VALUE); ??????} ???} } |
? ? ? ?在這里,與《spring響應式編程系列:總體流程》類似,調用訂閱消費體的request()方法,如下所示:
???????request
@Override public void request(long n) { ???if (Operators.validate(n)) { ??????if (ONCE.compareAndSet(this, 0, 1)) { ?????????try { ????????????Disposable f = scheduler.schedule(this); ????????????if (!FUTURE.compareAndSet(this, ??????????????????null, ??????????????????f) && future != FINISHED && future != OperatorDisposables.DISPOSED) { ???????????????f.dispose(); ????????????} ?????????} ?????????catch (RejectedExecutionException ree) { ????????????if (future != FINISHED && future != OperatorDisposables.DISPOSED) { ???????????????actual.onError(Operators.onRejectedExecution(ree, ?????????????????????this, ?????????????????????null, ?????????????????????value, actual.currentContext())); ????????????} ?????????} ??????} ???} } |
? ? ? ?在這里,調用scheduler.schedule(this)方法,并將訂閱的消息體作為參數傳入。如下所示:
???????schedule
public Disposable schedule(Runnable task) { ?return Schedulers.directSchedule(pick(), task, null, 0L, TimeUnit.MILLISECONDS); } |
? ? ? ?在這里,首先,調用pick()方法獲取當前調度器的線程池對象,與訂閱的消息體一起作為參數傳入directSchedule()方法,如下所示:
???????directSchedule
static Disposable directSchedule(ScheduledExecutorService exec, ??????Runnable task, ??????@Nullable Disposable parent, ??????long delay, ??????TimeUnit unit) { ???task = onSchedule(task); ???SchedulerTask sr = new SchedulerTask(task, parent); ???Future<?> f; ???if (delay <= 0L) { ??????f = exec.submit((Callable<?>) sr); ???} ???else { ??????f = exec.schedule((Callable<?>) sr, delay, unit); ???} ???sr.setFuture(f); ???return sr; } |
? ? ? ?在這里,將訂閱的消息體任務提交到線程池去執行。接下來,我們看看為什么訂閱的消息體是一個線程池可以執行的任務呢?如下所示:
???????run
static final class ScheduledScalar<T> implements QueueSubscription<T>, InnerProducer<T>, Runnable { ????????... ... @Override public void run() { try { if (fusionState == NO_VALUE) { fusionState = HAS_VALUE; } actual.onNext(value); actual.onComplete(); } finally { FUTURE.lazySet(this, FINISHED); } } |
? ? ? ?在這里,ScheduledScalar實現了Runnable 接口,并且實現了run()方法,所以,訂閱的消息體就是一個線程池可以執行的任務了。該線程池任務的執行邏輯如下所示:
???????onNext
public final void onNext(T x) { ???Subscription s = S.getAndSet(this, Operators.cancelledSubscription()); ???if (s == Operators.cancelledSubscription()) { ??????Operators.onNextDropped(x, this.initialContext); ??????return; ???} ???if (consumer != null) { ??????try { ?????????consumer.accept(x); ??????} ??????catch (Throwable t) { ?????????Exceptions.throwIfFatal(t); ?????????s.cancel(); ?????????doError(t); ??????} ???} ???if (completeConsumer != null) { ??????try { ?????????completeConsumer.run(); ??????} ??????catch (Throwable t) { ?????????Operators.onErrorDropped(t, this.initialContext); ??????} ???} } |
? ? ? ?在這里,調用數據消費者的onNext()方法執行相關的消費邏輯。
? ? ? ?至此,大致流程就結束了。
時序圖

- 類關系的設計,與《spring響應式編程系列:總體流程》類似,主要包括數據發布者對象、數據訂閱者對象及訂閱的消息體對象;
- Mono和MonoSubscribeOnValue是數據發布者,LambdaMonoSubscriber是數據訂閱者,ScheduledScalar是訂閱的消息體;
- 不同點在于,多了Schedulers(暫且叫著調度器工廠)和ParallelScheduler調度器;以及ScheduledScalar在執行request方法時,需要將任務交由調度器來處理。
- Schedulers類里有一個Factory接口,該接口可以默認創建各種Scheduler調度器對象,如(ElasticScheduler、BoundedElasticScheduler、ParallelScheduler、SingleScheduler),這就是典型的工廠方法設計模式。
類圖
數據發布者
MonoSubscribeOnValue

MonoSubscribeOnValue與《spring響應式編程系列:總體流程》介紹的類似,都是繼承于Mono類,并且實現了CorePublisher和Publisher接口。
不同點在于,該數據發布者多了一個屬性,如下所示:
final Scheduler scheduler;
該屬性帶有線程池ScheduledExecutorService信息,可以為數據訂閱者提供異步執行的功能。
調度器
ParallelScheduler

- Scheduler
????提供了接口:Disposable schedule(Runnable task);
- ParallelScheduler
????該類封裝了線程池信息;實現了接口schedule(Runnable task),用于提供對所封裝的線程池的調度。
數據訂閱者
LambdaMonoSubscriber
LambdaMonoSubscriber與《spring響應式編程系列:總體流程》介紹的一樣。
訂閱的消息體
ScheduledScalar

? ? ? ?ScheduledScalar與《spring響應式編程系列:總體流程》介紹的類似,都實現了Subscription接口。
? ? ? ?不同點在于,ScheduledScalar實現了Runnable接口,從而可以提供給線程池執行。