在 Reactor 中,ConnectableFlux
是一種用于處理響應式流的機制,它允許你控制何時開始訂閱和數據生成。通常情況下,訂閱者(subscriber)在訂閱時會立即開始接收數據,但有時你可能希望多個訂閱者“會面”(rendezvous)之后再觸發訂閱和數據生成。這就是 ConnectableFlux
的用途。
1. ConnectableFlux 的主要模式
Flux
API 提供了兩種主要的模式來返回 ConnectableFlux
:publish
和 replay
。
-
publish:
publish
會動態地嘗試滿足各個訂閱者的需求(即背壓),并通過將這些請求轉發到源來實現。如果任何訂閱者的掛起需求為 0,publish
會暫停對源的請求。
例如,你可以使用publish()
方法將一個冷發布者(cold publisher)轉換為熱發布者(hot publisher),從而允許多個訂閱者共享同一個數據源。 -
replay:
replay
會緩存第一次訂閱看到的數據,并在達到可配置的限制(如時間和緩沖區大小)后,將數據重放給后續的訂閱者。
例如,你可以使用replay(2)
來緩存最近的 2 個數據點,并在新訂閱者到來時重放這些數據。
2. ConnectableFlux 的管理方法
ConnectableFlux
提供了多種方法來管理訂閱和源的連接:
-
connect():
你可以手動調用connect()
方法,當達到足夠的訂閱數時,觸發對上游源的訂閱。例如:ConnectableFlux<String> connectableFlux = Flux.just("A", "B", "C").publish(); connectableFlux.connect();
在調用
connect()
之前,connectableFlux
不會開始發送數據。 -
autoConnect(n):
autoConnect(n)
可以自動執行與connect()
類似的操作,當有n
個訂閱者訂閱時,自動觸發對源的訂閱。例如:Flux<String> flux = Flux.just("A", "B", "C"); ConnectableFlux<String> autoConnectFlux = flux.publish().autoConnect(2);
這意味著當有 2 個或更多訂閱者訂閱時,
autoConnectFlux
會自動開始發送數據。 -
refCount(n):
refCount(n)
不僅可以自動跟蹤傳入的訂閱,還可以檢測訂閱是否被取消。如果訂閱者數量不足,refCount
會斷開與源的連接,直到有新的訂閱者出現。例如:Flux<String> flux = Flux.just("A", "B", "C"); ConnectableFlux<String> refCountFlux = flux.publish().refCount(2);
這意味著當有 2 個訂閱者訂閱時,
refCountFlux
會自動開始發送數據;當所有訂閱者取消訂閱后,refCountFlux
會斷開連接。 -
refCount(int, Duration):
refCount(int, Duration)
增加了一個“寬限期”(grace period),即在訂閱者數量低于閾值時,等待指定的時間后再斷開連接。例如:Flux<String> flux = Flux.just("A", "B", "C"); ConnectableFlux<String> refCountWithGrace = flux.publish().refCount(2, Duration.ofSeconds(10));
這意味著在訂閱者數量低于 2 時,
refCountWithGrace
會等待 10 秒,看看是否有新的訂閱者出現。
3. 應用場景
ConnectableFlux
適用于需要多個訂閱者“會面”后再觸發訂閱和數據生成的場景。例如:
- 實時數據推送:在實時數據推送中,你可能希望多個客戶端在連接到服務器后才開始接收數據。使用
ConnectableFlux
可以確保所有客戶端都準備好后再開始發送數據。 - 分布式系統:在分布式系統中,你可能希望多個節點在協調一致后再觸發數據生成。使用
ConnectableFlux
可以確保所有節點都準備好后再開始處理數據。 - IoT 數據可視化:在 IoT 數據可視化中,你可能希望多個設備在連接到服務器后才開始發送數據。使用
ConnectableFlux
可以確保所有設備都準備好后再開始處理數據。
4. 相關案例
展示如何使用 ConnectableFlux 實現流的多訂閱和延遲連接
package org.example;import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;/*** Main0011 類演示了如何使用 Reactor 創建和操作 Flux 流* 該類展示了如何使用 ConnectableFlux 實現流的多訂閱和延遲連接*/
public class Main0011 {/*** 主函數展示了如何創建一個 Flux 流并使用 ConnectableFlux 進行訂閱和連接操作** @param args 命令行參數* @throws InterruptedException 線程睡眠時可能拋出的異常*/public static void main(String[] args) throws InterruptedException {// 創建一個 Flux 源,產生 1 到 3 的整數序列,并在訂閱時打印消息Flux<Integer> source = Flux.range(1, 3).doOnSubscribe(s -> System.out.println("subscribed to source"));// 將 Flux 源轉換為 ConnectableFlux,以便進行延遲連接和多訂閱ConnectableFlux<Integer> co = source.publish();// 訂閱 ConnectableFlux,此時不會開始產生數據co.subscribe(System.out::println, e -> {}, () -> {});// 再次訂閱,演示多個訂閱者co.subscribe(System.out::println, e -> {}, () -> {});// 打印消息,表明訂閱已完成,但數據流尚未開始System.out.println("done subscribing");// 線程睡眠,模擬在連接前的準備或其他操作Thread.sleep(500);// 打印消息,表明即將連接數據流System.out.println("will now connect");// 連接數據流,使數據開始流動到所有已訂閱的消費者co.connect();}
}
演示Reactor庫中Flux的自動連接(autoConnect)功能
package org.example;import reactor.core.publisher.Flux;/*** 該類用于演示Reactor庫中Flux的自動連接(autoConnect)功能* 它展示了如何使用autoConnect方法在多個訂閱者之間共享一個數據流,* 并在達到指定的訂閱者數量后自動開始數據流的發布*/
public class Main0012 {/*** 主函數,用于演示autoConnect的使用** @param args 命令行參數* @throws InterruptedException 當線程因中斷策略被中斷時拋出此異常*/public static void main(String[] args) throws InterruptedException {// 創建一個Flux數據源,范圍從1到3,同時在訂閱時打印消息Flux<Integer> source = Flux.range(1, 3).doOnSubscribe(s -> System.out.println("subscribed to source"));// 使用autoConnect方法使數據源在有兩個訂閱者時自動連接Flux<Integer> autoCo = source.publish().autoConnect(2);// 第一個訂閱者訂閱數據流,并在接收到數據時打印出來autoCo.subscribe(System.out::println, e -> {}, () -> {});System.out.println("subscribed first");// 暫停500毫秒以模擬時間流逝Thread.sleep(500);System.out.println("subscribing second");// 第二個訂閱者訂閱數據流,此時達到autoConnect設定的條件,數據流開始發布autoCo.subscribe(System.out::println, e -> {}, () -> {});}
}
5. 總結
ConnectableFlux
是 Reactor 中用于處理響應式流的機制,它允許你控制何時開始訂閱和數據生成。通過 publish
和 replay
模式,你可以實現多個訂閱者“會面”后再觸發訂閱和數據生成。通過 connect()
、autoConnect(n)
、refCount(n)
和 refCount(int, Duration)
方法,你可以靈活地管理訂閱和源的連接。