spring響應式編程系列:異步消費數據

目錄

示例

大致流程

parallel

cache

PARALLEL_SUPPLIER

newParallel

init

publishOn

new MonoSubscribeOnValue

???????subscribe

???????new LambdaMonoSubscriber

???????MonoSubscribeOnValue.subscribe

???????onSubscribe

???????request

???????schedule

???????directSchedule

???????run

???????onNext

時序圖

類圖

數據發布者

MonoSubscribeOnValue

調度器

ParallelScheduler

數據訂閱者

LambdaMonoSubscriber

訂閱的消息體

ScheduledScalar


? ? ? ?本篇文章我們來研究如何控制數據流在特定線程池上的執行。即將操作符的執行切換到指定調度器(Scheduler)的線程。在Project Reactor框架中,Mono.publishOn()就可以實現該功能,示例如下所示:

示例

CountDownLatch countDownLatch = new CountDownLatch(1);

// 創建一個包含數據的 Mono

Mono<String> mono = Mono.just("Hello, Reactive World!");

mono.publishOn(Schedulers.parallel())

????.subscribe(x -> {

????????log.info("Sub thread, subscribe: {}", x);

????????countDownLatch.countDown();

????});

log.info("Main thread, blocking");

countDownLatch.await();

? ? ? ?首先,通過mono.publishOn方法指定線程池;

? ? ? ?其次,通過subscribe指定的消費者處理邏輯會在前一步指定的線程池里執行。

大致流程

? ? ? ?點擊Schedulers.parallel(),如下所示:

parallel

public static Scheduler parallel() {

return cache(CACHED_PARALLEL, PARALLEL, PARALLEL_SUPPLIER);

}

? ? ? ?在這里,調用cache方法并且PARALLEL_SUPPLIER參數,返回Scheduler調度器。cache方法如下所示:

cache

static CachedScheduler cache(AtomicReference<CachedScheduler> reference, String key, Supplier<Scheduler> supplier) {

CachedScheduler s = reference.get();

if (s != null) {

return s;

}

s = new CachedScheduler(key, supplier.get());

if (reference.compareAndSet(null, s)) {

return s;

}

... ...

}

? ? ? ?在這里,調用supplier.get()方法來生成CachedScheduler調度器。supplier.get()方法的定義如下所示:

PARALLEL_SUPPLIER

static final Supplier<Scheduler> PARALLEL_SUPPLIER =
??????() -> newParallel(PARALLEL, DEFAULT_POOL_SIZE, true);

newParallel

public static Scheduler newParallel(int parallelism, ThreadFactory threadFactory) {

final Scheduler fromFactory = factory.newParallel(parallelism, threadFactory);

fromFactory.init();

return fromFactory;

}

? ? ? ?在這里,通過工廠方法模式創建Scheduler對象,并調用init()初始化方法,如下所示:

init

@Override

public void init() {

... ...

SchedulerState<ScheduledExecutorService[]> b =

SchedulerState.init(new ScheduledExecutorService[n]);

for (int i = 0; i < n; i++) {

b.currentResource[i] = Schedulers.decorateExecutorService(this, this.get());

}

if (!STATE.compareAndSet(this, null, b)) {

for (ScheduledExecutorService exec : b.currentResource) {

exec.shutdownNow();

}

if (isDisposed()) {

throw new IllegalStateException(

"Initializing a disposed scheduler is not permitted"

);

}

}}

? ? ? ?在這里,new了一個ScheduledExecutorService線程池對象作為調度器的底導線程池實現。

? ? ? ?點擊示例里的mono.publishOn()方法,如下所示:

???????publishOn

public final Mono<T> publishOn(Scheduler scheduler) {
???if(this instanceof Callable) {
??????if (this instanceof Fuseable.ScalarCallable) {
?????????try {
????????????T value = block();
????????????return onAssembly(new MonoSubscribeOnValue<>(value, scheduler));
?????????}
?????????catch (Throwable t) {
????????????//leave MonoSubscribeOnCallable defer error
?????????}
??????}
??????@SuppressWarnings("unchecked")
??????Callable<T> c = (Callable<T>)this;
??????return onAssembly(new MonoSubscribeOnCallable<>(c, scheduler));
???}
???return onAssembly(new MonoPublishOn<>(this, scheduler));
}

? ? ? ?在這里,new 了一個MonoSubscribeOnValue對象,并且傳遞了scheduler對象作為構造參數。

? ? ? ?如下所示:

???????new MonoSubscribeOnValue

final class MonoSubscribeOnValue<T> extends Mono<T> implements Scannable {
???final T value;
???final Scheduler scheduler;
???MonoSubscribeOnValue(@Nullable T value, Scheduler scheduler) {
??????this.value = value;
??????this.scheduler = Objects.requireNonNull(scheduler, "scheduler");
???}

? ? ? ?在這里,將入參scheduler作為MonoSubscribeOnValue對象的屬性scheduler保留下來。

? ? ? ?點擊示例里的mono.subscribe()方法,如下所示:

???????subscribe

public final Disposable subscribe(
??????@Nullable Consumer<? super T> consumer,
??????@Nullable Consumer<? super Throwable> errorConsumer,
??????@Nullable Runnable completeConsumer,
??????@Nullable Context initialContext) {
???return subscribeWith(new LambdaMonoSubscriber<>(consumer, errorConsumer,
?????????completeConsumer, null, initialContext));
}

? ? ? 在這里,new了一個LambdaMonoSubscriber對象,這里與《spring響應式編程系列:總體流程》類似。LambdaMonoSubscriber對象的構造函數如下所示:

???????new LambdaMonoSubscriber

LambdaMonoSubscriber(@Nullable Consumer<? super T> consumer,
??????@Nullable Consumer<? super Throwable> errorConsumer,
??????@Nullable Runnable completeConsumer,
??????@Nullable Consumer<? super Subscription> subscriptionConsumer,
??????@Nullable Context initialContext) {
???this.consumer = consumer;
???this.errorConsumer = errorConsumer;
???this.completeConsumer = completeConsumer;
???this.subscriptionConsumer = subscriptionConsumer;
???this.initialContext = initialContext == null ? Context.empty() : initialContext;
}

???????MonoSubscribeOnValue.subscribe

@Override

public void subscribe(CoreSubscriber<? super T> actual) {

T v = value;

if (v == null) {

ScheduledEmpty parent = new ScheduledEmpty(actual);

actual.onSubscribe(parent);

try {

parent.setFuture(scheduler.schedule(parent));

}

catch (RejectedExecutionException ree) {

if (parent.future != OperatorDisposables.DISPOSED) {

actual.onError(Operators.onRejectedExecution(ree,

actual.currentContext()));

}

}

}

else {

actual.onSubscribe(new ScheduledScalar<>(actual, v, scheduler));

}

}

? ? ? ?在這里,new一個ScheduledScalar對象,傳入消費者和scheduler對象,然后同樣是調用消費者的onSubscribe()方法。

? ? ? ?如下所示:

???????onSubscribe

public final void onSubscribe(Subscription s) {
???if (Operators.validate(subscription, s)) {
??????this.subscription = s;
??????if (subscriptionConsumer != null) {
?????????try {
????????????subscriptionConsumer.accept(s);
?????????}
?????????catch (Throwable t) {
????????????Exceptions.throwIfFatal(t);
????????????s.cancel();
????????????onError(t);
?????????}
??????}
??????else {
?????????s.request(Long.MAX_VALUE);
??????}
???}
}

? ? ? ?在這里,與《spring響應式編程系列:總體流程》類似,調用訂閱消費體的request()方法,如下所示:

???????request

@Override
public void request(long n) {
???if (Operators.validate(n)) {
??????if (ONCE.compareAndSet(this, 0, 1)) {
?????????try {
????????????Disposable f = scheduler.schedule(this);
????????????if (!FUTURE.compareAndSet(this,
??????????????????null,
??????????????????f) && future != FINISHED && future != OperatorDisposables.DISPOSED) {
???????????????f.dispose();
????????????}
?????????}
?????????catch (RejectedExecutionException ree) {
????????????if (future != FINISHED && future != OperatorDisposables.DISPOSED) {
???????????????actual.onError(Operators.onRejectedExecution(ree,
?????????????????????this,
?????????????????????null,
?????????????????????value, actual.currentContext()));
????????????}
?????????}
??????}
???}
}

? ? ? ?在這里,調用scheduler.schedule(this)方法,并將訂閱的消息體作為參數傳入。如下所示:

???????schedule

public Disposable schedule(Runnable task) {
?return Schedulers.directSchedule(pick(), task, null, 0L, TimeUnit.MILLISECONDS);
}

? ? ? ?在這里,首先,調用pick()方法獲取當前調度器的線程池對象,與訂閱的消息體一起作為參數傳入directSchedule()方法,如下所示:

???????directSchedule

static Disposable directSchedule(ScheduledExecutorService exec,
??????Runnable task,
??????@Nullable Disposable parent,
??????long delay,
??????TimeUnit unit) {
???task = onSchedule(task);
???SchedulerTask sr = new SchedulerTask(task, parent);
???Future<?> f;
???if (delay <= 0L) {
??????f = exec.submit((Callable<?>) sr);
???}
???else {
??????f = exec.schedule((Callable<?>) sr, delay, unit);
???}
???sr.setFuture(f);
???return sr;
}

? ? ? ?在這里,將訂閱的消息體任務提交到線程池去執行。接下來,我們看看為什么訂閱的消息體是一個線程池可以執行的任務呢?如下所示:

???????run

static final class ScheduledScalar<T>

implements QueueSubscription<T>, InnerProducer<T>, Runnable {

????????... ...

@Override

public void run() {

try {

if (fusionState == NO_VALUE) {

fusionState = HAS_VALUE;

}

actual.onNext(value);

actual.onComplete();

}

finally {

FUTURE.lazySet(this, FINISHED);

}

}

? ? ? ?在這里,ScheduledScalar實現了Runnable 接口,并且實現了run()方法,所以,訂閱的消息體就是一個線程池可以執行的任務了。該線程池任務的執行邏輯如下所示:

???????onNext

public final void onNext(T x) {
???Subscription s = S.getAndSet(this, Operators.cancelledSubscription());
???if (s == Operators.cancelledSubscription()) {
??????Operators.onNextDropped(x, this.initialContext);
??????return;
???}
???if (consumer != null) {
??????try {
?????????consumer.accept(x);
??????}
??????catch (Throwable t) {
?????????Exceptions.throwIfFatal(t);
?????????s.cancel();
?????????doError(t);
??????}
???}
???if (completeConsumer != null) {
??????try {
?????????completeConsumer.run();
??????}
??????catch (Throwable t) {
?????????Operators.onErrorDropped(t, this.initialContext);
??????}
???}
}

? ? ? ?在這里,調用數據消費者的onNext()方法執行相關的消費邏輯。

? ? ? ?至此,大致流程就結束了。

時序圖

  1. 類關系的設計,與《spring響應式編程系列:總體流程》類似,主要包括數據發布者對象、數據訂閱者對象及訂閱的消息體對象;
  2. Mono和MonoSubscribeOnValue是數據發布者,LambdaMonoSubscriber是數據訂閱者,ScheduledScalar是訂閱的消息體;
  3. 不同點在于,多了Schedulers(暫且叫著調度器工廠)和ParallelScheduler調度器;以及ScheduledScalar在執行request方法時,需要將任務交由調度器來處理。
  4. Schedulers類里有一個Factory接口,該接口可以默認創建各種Scheduler調度器對象,如(ElasticScheduler、BoundedElasticScheduler、ParallelScheduler、SingleScheduler),這就是典型的工廠方法設計模式。

類圖

數據發布者

MonoSubscribeOnValue

MonoSubscribeOnValue與《spring響應式編程系列:總體流程》介紹的類似,都是繼承于Mono類,并且實現了CorePublisher和Publisher接口。

不同點在于,該數據發布者多了一個屬性,如下所示:

final Scheduler scheduler;

該屬性帶有線程池ScheduledExecutorService信息,可以為數據訂閱者提供異步執行的功能。

調度器

ParallelScheduler

  1. Scheduler

????提供了接口:Disposable schedule(Runnable task);

  1. ParallelScheduler

????該類封裝了線程池信息;實現了接口schedule(Runnable task),用于提供對所封裝的線程池的調度。

數據訂閱者

LambdaMonoSubscriber

LambdaMonoSubscriber與《spring響應式編程系列:總體流程》介紹的一樣。

訂閱的消息體

ScheduledScalar

? ? ? ?ScheduledScalar與《spring響應式編程系列:總體流程》介紹的類似,都實現了Subscription接口。

? ? ? ?不同點在于,ScheduledScalar實現了Runnable接口,從而可以提供給線程池執行。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/79857.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/79857.shtml
英文地址,請注明出處:http://en.pswp.cn/web/79857.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

視頻編解碼學習十二之Android疑點

一、android.view.SurfaceControl.setDisplaySurface的作用 android.view.SurfaceControl.setDisplaySurface 是 Android 系統中一個 native 層級別的 API&#xff0c;主要用于 設置某個物理顯示屏&#xff08;Display&#xff09;的輸出 Surface&#xff0c;屬于 SurfaceFlin…

家用或辦公 Windows 電腦玩人工智能開源項目配備核顯的必要性(含 NPU 及顯卡類型補充)

一、GPU 與顯卡的概念澄清 首先需要明確一個容易誤解的概念&#xff1a;GPU 不等同于顯卡。 顯卡和GPU是兩個不同的概念。 【概念區分】 在討論圖形計算領域時&#xff0c;需首先澄清一個常見誤區&#xff1a;GPU&#xff08;圖形處理單元&#xff09;與顯卡&#xff08;視…

Python----神經網絡(《Deep Residual Learning for Image Recognition》論文和ResNet網絡結構)

一、論文 1.1、論文基本信息 標題&#xff1a;Deep Residual Learning for Image Recognition 作者&#xff1a;Kaiming He, Xiangyu Zhang, Shaoqing Ren, Jian Sun 單位&#xff1a;Microsoft Research 會議&#xff1a;CVPR 2016 主要貢獻&#xff1a;提出了一種深度殘…

Qt/C++開發監控GB28181系統/錄像文件查詢/錄像回放/倍速播放/錄像文件下載

一、前言 搞定了實時預覽后&#xff0c;另一個功能就是錄像回放&#xff0c;錄像回放和視頻點播功能完全一致&#xff0c;唯一的區別就是發送點播的sdp信息中攜帶了開始時間和結束時間&#xff0c;因為是錄像文件&#xff0c;所以有這個時間&#xff0c;而實時視頻預覽這個對應…

在Spark搭建YARN

&#xff08;一&#xff09;什么是SparkONYarn模式 Spark on YARN&#xff08;Yet Another Resource Negotiator&#xff09;是 Spark 框架在 Hadoop 集群中運行的一種部署模式&#xff0c;它借助 Hadoop YARN 來管理資源和調度任務。 架構組成 ResourceManager&#xff1a;作…

SpringAI

機器學習&#xff1a; 定義&#xff1a;人工智能的子領域&#xff0c;通過數據驅動的方法讓計算機學習規律&#xff0c;進行預測或決策。 核心方法&#xff1a; 監督學習&#xff08;如線性回歸、SVM&#xff09;。 無監督學習&#xff08;如聚類、降維&#xff09;。 強化學…

如何用Redis實現分布式鎖?RedLock算法的核心思想?Redisson的看門狗機制原理?

一、Redis分布式鎖基礎實現 public class RedisDistributedLock {private JedisPool jedisPool;private String lockKey;private String clientId;private int expireTime 30; // 默認30秒public boolean tryLock() {try (Jedis jedis jedisPool.getResource()) {// NX表示不…

前端面試寶典---js垃圾回收機制

什么是垃圾回收 垃圾回收是指一種自動內存管理機制&#xff0c;當聲明一個變量時&#xff0c;會在內存中開辟一塊內存空間用于存放這個變量。當這個變量被使用過后&#xff0c;可能再也不需要它了&#xff0c;此時垃圾回收器會自動檢測并回收這些不再使用的內存空間。垃圾回收…

阿里媽媽LMA2新進展:集成大語言模型與電商知識的通用召回大模型URM

近日&#xff0c;阿里媽媽在國際頂級學術會議 —— 國際萬維網大會&#xff08;International World Wide Web Conference, 簡稱WWW&#xff09;上共同主持了計算廣告算法技術相關的Tutorial&#xff08;講座&#xff09;&#xff0c;介紹了計算廣告領域的技術發展脈絡&#xf…

數字孿生實時監控汽車零部件工廠智能化巡檢新范式

在汽車制造業面臨數字化轉型時&#xff0c;汽車零部件工廠也面臨著提升生產效率、降低運營成本和增強市場競爭力的多重挑戰。傳統的巡檢方式已經難以滿足現代工廠對高效、精準管理和實時決策的需求。數字孿生系統的出現&#xff0c;為汽車零部件工廠提供了一種創新的智能化巡檢…

【計算機網絡】3數據鏈路層②

1. 數據鏈路層所處的地位 數據鏈路層使用的信道主要有兩種: ①點對點信道:PPP協議 ②廣播信道:有線局域網,CSMA/CD協議;無線局域網,CSMA/CA協議 對比項點對點信道 vs 單播廣播信道 vs 廣播核心是否一致? 一致(一對一傳輸)? 一致(一對所有傳輸)差異點前者是物理層…

c++中的函數(默認參數,占位參數,重載)

1&#xff0c;函數默認參數 在c中&#xff0c;函數的形參列表中的形參是可以有默認值得 語法&#xff1a;返回值類型 函數名 &#xff08;參數 默認值&#xff09;{} 示例&#xff1a; #include<iostream> using namespace std;//函數默認參數 // 就是如果傳了就…

【原創】使用阿里云存放一個臨時共享的文件

在某些場合&#xff0c;需要臨時將一個文件存儲到一個可被公網訪問的地方&#xff0c;某個服務需要訪問一下這個文件。這個文件基本上就是一次壽命&#xff0c;也就是你上傳一下&#xff0c;然后被訪問一下&#xff0c;這個文件的壽命就結束了。 對于這種需求&#xff0c;自建…

Python中列表(list)知識詳解(2)和注意事項以及應用示例

在 Python 中列表&#xff08;list&#xff09; 的包括其結構、常見操作&#xff08;更新、添加、刪除、查找、隊列棧行為等&#xff09;&#xff0c;下面將逐一的進行講解并附相關的示例。 一、列表的基礎知識 1. 定義與特點 定義方式&#xff1a;用 [] 包裹的有序可變集合 …

vscode extention踩坑記

# npx vsce package --allow-missing-repository --no-dependencies #耗時且不穩定 npx vsce package --allow-missing-repository #用這行 code --install-extension $vsixFileName --force我問ai&#xff1a;為什么我的.vsix文件大了那么多 ai答&#xff1a;因為你沒有用 --n…

移動端巡檢點檢,讓設備管理更便捷高效

在企業設備管理的日常工作中&#xff0c;巡檢點檢是保障設備正常運行的重要環節。傳統的巡檢方式依賴紙質記錄、人工操作&#xff0c;效率低、易出錯&#xff0c;已難以滿足現代企業的管理需求。隨著技術發展&#xff0c;越來越多設備管理系統引入移動端功能&#xff0c;為設備…

laravel 中使用的pdf 擴展包 laravel-snappy(已解決中文亂碼)

Centos7 安裝 wkhtmltopdf 1、先查看系統是 32 位的還是 64 位的 uname -a2、通過 composer 安裝 wkhtmltopdf 32位: $ composer require h4cc / wkhtmltopdf-i386 0.12.x $ composer require h4cc / wkhtmltoimage-i386 0.12.x 64位: $ composer require h4cc/wkhtmltopdf-…

Rust:重新定義系統編程的安全與效率邊界

在軟件工程領域&#xff0c;內存安全漏洞每年造成數千億美元損失&#xff0c;而C/C生態中60%的漏洞源于指針誤用。正是在這樣的背景下&#xff0c;Rust憑借其革命性的內存安全機制異軍突起。作為一門現代系統級編程語言&#xff0c;Rust不僅解決了困擾開發者數十年的內存管理難…

C++學習細節回顧(匯總二)

一.初始化列表相關 1.初始化順序受申明順序影響 2.在必要時可以部分不采用初始化列表&#xff0c;避免受特性1影響 二.非類型模板參數 template< class T , size_t N 10 > 三.特化–特殊化處理 template< class T > bool less(T left , T right) { return left&…

勾選某一行的勾選框,更改當前行的顏色,ALV數據發生變化的事件

文章目錄 屏幕ALV的創建定義變量注冊事件方法定義方法實現frm_data_change 效果 屏幕 ALV的創建 DATA: g_gui_custom_container TYPE REF TO cl_gui_custom_container. DATA: g_gui_alv_grid TYPE REF TO cl_gui_alv_grid.DATA: gt_listheader TYPE slis_t_listheader, &quo…