Reactor ConnectableFlux支持多訂閱者

在 Reactor 中,ConnectableFlux 是一種用于處理響應式流的機制,它允許你控制何時開始訂閱和數據生成。通常情況下,訂閱者(subscriber)在訂閱時會立即開始接收數據,但有時你可能希望多個訂閱者“會面”(rendezvous)之后再觸發訂閱和數據生成。這就是 ConnectableFlux 的用途。

1. ConnectableFlux 的主要模式

Flux API 提供了兩種主要的模式來返回 ConnectableFluxpublishreplay

  • publish
    publish 會動態地嘗試滿足各個訂閱者的需求(即背壓),并通過將這些請求轉發到源來實現。如果任何訂閱者的掛起需求為 0,publish 會暫停對源的請求。
    例如,你可以使用 publish() 方法將一個冷發布者(cold publisher)轉換為熱發布者(hot publisher),從而允許多個訂閱者共享同一個數據源。

  • replay
    replay 會緩存第一次訂閱看到的數據,并在達到可配置的限制(如時間和緩沖區大小)后,將數據重放給后續的訂閱者。
    例如,你可以使用 replay(2) 來緩存最近的 2 個數據點,并在新訂閱者到來時重放這些數據。

2. ConnectableFlux 的管理方法

ConnectableFlux 提供了多種方法來管理訂閱和源的連接:

  • connect()
    你可以手動調用 connect() 方法,當達到足夠的訂閱數時,觸發對上游源的訂閱。例如:

    ConnectableFlux<String> connectableFlux = Flux.just("A", "B", "C").publish();
    connectableFlux.connect();
    

    在調用 connect() 之前,connectableFlux 不會開始發送數據。

  • autoConnect(n)
    autoConnect(n) 可以自動執行與 connect() 類似的操作,當有 n 個訂閱者訂閱時,自動觸發對源的訂閱。例如:

    Flux<String> flux = Flux.just("A", "B", "C");
    ConnectableFlux<String> autoConnectFlux = flux.publish().autoConnect(2);
    

    這意味著當有 2 個或更多訂閱者訂閱時,autoConnectFlux 會自動開始發送數據。

  • refCount(n)
    refCount(n) 不僅可以自動跟蹤傳入的訂閱,還可以檢測訂閱是否被取消。如果訂閱者數量不足,refCount 會斷開與源的連接,直到有新的訂閱者出現。例如:

    Flux<String> flux = Flux.just("A", "B", "C");
    ConnectableFlux<String> refCountFlux = flux.publish().refCount(2);
    

    這意味著當有 2 個訂閱者訂閱時,refCountFlux 會自動開始發送數據;當所有訂閱者取消訂閱后,refCountFlux 會斷開連接。

  • refCount(int, Duration)
    refCount(int, Duration) 增加了一個“寬限期”(grace period),即在訂閱者數量低于閾值時,等待指定的時間后再斷開連接。例如:

    Flux<String> flux = Flux.just("A", "B", "C");
    ConnectableFlux<String> refCountWithGrace = flux.publish().refCount(2, Duration.ofSeconds(10));
    

    這意味著在訂閱者數量低于 2 時,refCountWithGrace 會等待 10 秒,看看是否有新的訂閱者出現。

3. 應用場景

ConnectableFlux 適用于需要多個訂閱者“會面”后再觸發訂閱和數據生成的場景。例如:

  • 實時數據推送:在實時數據推送中,你可能希望多個客戶端在連接到服務器后才開始接收數據。使用 ConnectableFlux 可以確保所有客戶端都準備好后再開始發送數據。
  • 分布式系統:在分布式系統中,你可能希望多個節點在協調一致后再觸發數據生成。使用 ConnectableFlux 可以確保所有節點都準備好后再開始處理數據。
  • IoT 數據可視化:在 IoT 數據可視化中,你可能希望多個設備在連接到服務器后才開始發送數據。使用 ConnectableFlux 可以確保所有設備都準備好后再開始處理數據。

4. 相關案例

展示如何使用 ConnectableFlux 實現流的多訂閱和延遲連接

package org.example;import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;/*** Main0011 類演示了如何使用 Reactor 創建和操作 Flux 流* 該類展示了如何使用 ConnectableFlux 實現流的多訂閱和延遲連接*/
public class Main0011 {/*** 主函數展示了如何創建一個 Flux 流并使用 ConnectableFlux 進行訂閱和連接操作** @param args 命令行參數* @throws InterruptedException 線程睡眠時可能拋出的異常*/public static void main(String[] args) throws InterruptedException {// 創建一個 Flux 源,產生 1 到 3 的整數序列,并在訂閱時打印消息Flux<Integer> source = Flux.range(1, 3).doOnSubscribe(s -> System.out.println("subscribed to source"));// 將 Flux 源轉換為 ConnectableFlux,以便進行延遲連接和多訂閱ConnectableFlux<Integer> co = source.publish();// 訂閱 ConnectableFlux,此時不會開始產生數據co.subscribe(System.out::println, e -> {}, () -> {});// 再次訂閱,演示多個訂閱者co.subscribe(System.out::println, e -> {}, () -> {});// 打印消息,表明訂閱已完成,但數據流尚未開始System.out.println("done subscribing");// 線程睡眠,模擬在連接前的準備或其他操作Thread.sleep(500);// 打印消息,表明即將連接數據流System.out.println("will now connect");// 連接數據流,使數據開始流動到所有已訂閱的消費者co.connect();}
}

演示Reactor庫中Flux的自動連接(autoConnect)功能

package org.example;import reactor.core.publisher.Flux;/*** 該類用于演示Reactor庫中Flux的自動連接(autoConnect)功能* 它展示了如何使用autoConnect方法在多個訂閱者之間共享一個數據流,* 并在達到指定的訂閱者數量后自動開始數據流的發布*/
public class Main0012 {/*** 主函數,用于演示autoConnect的使用** @param args 命令行參數* @throws InterruptedException 當線程因中斷策略被中斷時拋出此異常*/public static void main(String[] args) throws InterruptedException {// 創建一個Flux數據源,范圍從1到3,同時在訂閱時打印消息Flux<Integer> source = Flux.range(1, 3).doOnSubscribe(s -> System.out.println("subscribed to source"));// 使用autoConnect方法使數據源在有兩個訂閱者時自動連接Flux<Integer> autoCo = source.publish().autoConnect(2);// 第一個訂閱者訂閱數據流,并在接收到數據時打印出來autoCo.subscribe(System.out::println, e -> {}, () -> {});System.out.println("subscribed first");// 暫停500毫秒以模擬時間流逝Thread.sleep(500);System.out.println("subscribing second");// 第二個訂閱者訂閱數據流,此時達到autoConnect設定的條件,數據流開始發布autoCo.subscribe(System.out::println, e -> {}, () -> {});}
}

5. 總結

ConnectableFlux 是 Reactor 中用于處理響應式流的機制,它允許你控制何時開始訂閱和數據生成。通過 publishreplay 模式,你可以實現多個訂閱者“會面”后再觸發訂閱和數據生成。通過 connect()autoConnect(n)refCount(n)refCount(int, Duration) 方法,你可以靈活地管理訂閱和源的連接。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/87108.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/87108.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/87108.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

vite + vue 項目下使用 tailwindcss

版本 node: > 18.0.0 vue: 3.5.13 vite: 6.3.1 tailwindcss: 4.1.6 tailwindcss/vite: 4.1.6 tailwindcss ? 細粒度類庫 提供數千個原子級CSS類&#xff08;如 text-center、bg-blue-500、p-4&#xff09;&#x1f9e9; 組合式開發 通過類名組合構建完全自定義的UI&#x…

Hibernate中save與saveOrUpdate的差異解析

在Hibernate中&#xff0c;save()和saveOrUpdate()都是用于持久化對象的方法&#xff0c;但它們的適用場景和行為有顯著差異&#xff1a; 1. save()方法 核心行為&#xff1a; 僅適用于瞬時態&#xff08;Transient&#xff09;對象&#xff08;即新創建、未與Session關聯的對象…

香橙派3B學習筆記14:deb 打包程序_解包前后腳本運行

本文學習如何用deb打包的方式打包自己需要調用系統庫的程序。 然后實現deb解包前后的腳本運行。 目錄 承接上文&#xff1a; 刪除上文遺留的.so文件&#xff1a; 終止ledlight進程&#xff1a; 目標解釋&#xff1a; 創建項目結構&#xff1a; 創建control文件&#xff1a; 創…

nanoGPT復現——prepare拆解(自己構建詞表 VS tiktoken)

在nanoGPT的data文件夾有兩個很相似的文件夾結構&#xff1a;shakespeare和shakespeare-char&#xff0c;這兩種都是對shakespeare數據集的處理&#xff0c;但是shakespeare使用的是tiktoken對文字進行編碼&#xff0c;另一個則是使用自己構建的詞表 一、shakespeare-char&…

macos 安裝 xcode

在 macOS 上安裝 Xcode&#xff08;或者 Xcode Command Line Tools&#xff09;的方法如下&#xff1a; 1. 安裝 Xcode Command Line Tools&#xff08;輕量級&#xff0c;滿足大部分編譯需求&#xff09; 終端命令&#xff1a; xcode-select --install會彈出安裝提示&#x…

大學專業科普 | 云計算、大數據

大數據專業是近年來隨著信息技術發展而興起的熱門學科&#xff0c;專注于從海量、多樣化的數據中提取有價值信息&#xff0c;為各行業提供數據驅動的決策支持。 專業定義 大數據專業旨在培養掌握大數據采集、存儲、管理、分析和應用等核心技術的人才。該專業融合了計算機科學…

本地文件自動提交到倉庫

背景 將本地目錄做一個存儲倉庫&#xff0c;將歸檔的文件放入其中。自動同步到遠程倉庫。 倉庫配置 省略 配置密鑰 用戶可以 git pull \ git push \ git commit 自動 拉取、更新 腳本 文件名&#xff1a;autosave.sh #!/bin/zsh# 設置變量 LOCAL_DIR$1# 進入工作目錄 cd "…

Ubuntu中控制用戶存儲空間配置步驟

目的&#xff0c;限制用戶磁盤空間占用&#xff0c;例如給用戶限制100-150G容量 1.安裝磁盤配額工具 sudo apt-get install -y quota 2.備份并修改/etc/fstab文件&#xff0c;使能支持quota sudo cp /etc/fstab /etc/fstab.bak vim /etc/fstab #寫入如下,usrjquotaaquota.u…

【網絡】Linux 內核優化實戰 - net.ipv4.tcp_rmem 和 net.core.rmem_default 關系

net.ipv4.tcp_rmem 和 net.core.rmem_default 都是 Linux 內核中控制網絡接收緩沖區的參數,但它們的作用范圍、優先級和使用場景存在明顯區別。以下是詳細對比: 核心區別 參數net.ipv4.tcp_rmemnet.core.rmem_default作用協議僅針對 TCP 協議針對 所有網絡協議(TCP、UDP 等…

設計模式精講 Day 14:命令模式(Command Pattern)

【設計模式精講 Day 14】命令模式&#xff08;Command Pattern&#xff09; 文章內容 在“設計模式精講”系列的第14天&#xff0c;我們來學習命令模式&#xff08;Command Pattern&#xff09;。命令模式是一種行為型設計模式&#xff0c;它將請求封裝為對象&#xff0c;從而…

手機射頻功放測試學習(二)——手機線性功放的靜態電流和小信號(S-Parameter)測試

目錄 一、概要 二、LPA的電流測試 1、LPA的泄漏電流測試 手動測試步驟如下: 自動化測試: 2、LPA的靜態電流測試 手動測試步驟如下: 自動化測試: 三、LPA的S-Parameter測試 1、矢量網絡分析儀校準 2、LPA的S參數手動測試步驟: 3、LPA的S參數自動測試步驟: 四…

基礎算法合集-圖論

本文將介紹數據結構圖論部分中常見的算法 單源最短路徑問題(用來計算一個點到其他所有頂點的最短路徑) Dijkstra(n*n) 1. 初始化: 先找出從源點V0到各終點Vk的直達路徑(V0,Vk), 即通過一條弧到達的路徑 2. 選擇: 從這些路徑中找出一條長度最短的路徑(V0,u) 3. 更新: 然后對其余…

vue-i18n 插件打包解析失效問題記錄

vue-i18n 插件打包解析失效問題記錄 開發環境中沒有問題的&#xff0c;但打包發布之后就不行了&#xff0c;顯示的就是模板字符串 // An highlighted block const messages {en: {step: {stepDesc1: Scan,stepDesc2: Analyze,stepDesc3: Result}},zh: {step: {stepDesc1: 掃描…

數據可視化 - 單子圖

一、認識單子圖 import matplotlib.pyplot as plt import numpy as np import pandas as pdplt.figure(num單子圖, figsize(12, 8), facecolorw) # 中文字體 plt.rcParams[font.sans-serif] KaiTi # 負號顯示 plt.rcParams[axes.unicode_minus] False# 2行&#xff0c;1列&a…

服務器上設置了代理之后,服務器可以訪問外網,但是不能訪問服務器本地。如何解決

你在服務器上設置了代理后&#xff0c;發現&#xff1a; 可以訪問外網不能訪問服務器本地地址&#xff08;如 localhost、127.0.0.1、內網IP&#xff09; 這是代理設置中常見的問題&#xff0c;尤其是當你設置了全局 HTTP/HTTPS 代理時。本地訪問也會被強制走代理&#xff0c…

mysql啟動報錯:Can‘t connect to local MySQL server through socket

文章目錄 一、報錯內容二、解決方法 一、報錯內容 在linux上啟動mysql時報錯 [rootlocalhost bin]# ./mysql -u root -p Enter password: ERROR 2002 (HY000): Cant connect to local MySQL server through socket /tmp/mysql.sock (2)執行以上命令后報錯&#xff0c;并且也…

C# Avalonia 綁定模式 Mode 的區別,它們的應用場景

C# Avalonia 綁定模式 Mode 的區別&#xff0c;它們的應用場景 文章目錄 1. **Default&#xff08;默認模式&#xff09;**2. **OneTime&#xff08;一次性綁定&#xff09;**3. **OneWay&#xff08;單向綁定&#xff09;**4. **TwoWay&#xff08;雙向綁定&#xff09;**5. *…

【OpenGL學習】(七)紋理單元

【OpenGL學習】&#xff08;七&#xff09;紋理單元 OpenGL的紋理單元&#xff08;Texture Unit&#xff09;是GPU中用于管理和組織紋理資源的邏輯單元&#xff0c;它允許開發者在渲染過程中同時使用多個紋理&#xff0c;并通過采樣器&#xff08;Sampler&#xff09;在著色器…

Ubuntu 下降 Linux Kernel 的版本備忘

此處以 ubuntu 22.04 為示例系統&#xff0c;來降低其 Linux kernel 的版本。 1. 降低 Linux kernel 版本 在 Ubuntu 22.04 上降低 Linux 內核版本的步驟如下所示。 步驟 1&#xff1a;檢查當前內核版本 uname -r 確認當前運行的內核版本。 步驟 2&#xff1a;查看已安裝的…

Python 數據分析與機器學習入門 (八):用 Scikit-Learn 跑通第一個機器學習模型

引言&#xff1a;初識 Scikit-Learn Scikit-learn 是 Python 機器學習領域的黃金標準庫。它構建在 NumPy, SciPy 和 Matplotlib 之上&#xff0c;提供了大量用于分類、回歸、聚類和降維等任務的算法。Scikit-learn 廣受歡迎的原因在于其三大核心優勢&#xff1a; 一致的 API 設…