這段文字詳細解釋了 Reactor 中 熱發布者(Hot Publisher) 和 冷發布者(Cold Publisher) 的區別,并通過示例展示了它們的行為差異。以下是對其含義的總結和解釋:
1. 冷發布者(Cold Publisher)
-
定義:冷發布者在訂閱時才開始生成數據。如果沒有訂閱者,數據不會被生成。
-
行為:每個訂閱者都會獨立地觸發數據的生成和處理流程。
-
類比:就像 HTTP 請求,每個訂閱者都會觸發一次新的請求,即使之前已經有人訂閱過。
-
示例:
Flux<String> source = Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple")).map(String::toUpperCase); source.subscribe(d -> System.out.println("Subscriber 1: " + d)); source.subscribe(d -> System.out.println("Subscriber 2: " + d));
輸出結果:
Subscriber 1: BLUE Subscriber 1: GREEN Subscriber 1: ORANGE Subscriber 1: PURPLE Subscriber 2: BLUE Subscriber 2: GREEN Subscriber 2: ORANGE Subscriber 2: PURPLE
每個訂閱者都會接收到所有數據,因為每個訂閱都會重新執行整個操作鏈 。
2. 熱發布者(Hot Publisher)
-
定義:熱發布者在創建時就開始發布數據,不依賴于訂閱者的數量。即使沒有訂閱者,數據也會持續發布。
-
行為:訂閱者只會看到從訂閱開始之后發布的數據。如果在訂閱之前已經發布了數據,新訂閱者不會看到這些數據。
-
類比:就像一個股票價格發布者,一旦價格發生變化,所有訂閱者都會收到更新,但新訂閱者只會看到之后的價格變化。
-
示例:
Sinks.Many<String> hotSource = Sinks.unsafe().many().multicast().directBestEffort(); Flux<String> hotFlux = hotSource.asFlux().map(String::toUpperCase); hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: " + d)); hotSource.emitNext("blue", FAIL_FAST); hotSource.tryEmitNext("green").orThrow(); hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: " + d)); hotSource.emitNext("orange", FAIL_FAST); hotSource.tryEmitNext("purple").orThrow();
輸出結果:
Subscriber 1 to Hot Source: BLUE Subscriber 1 to Hot Source: GREEN Subscriber 1 to Hot Source: ORANGE Subscriber 1 to Hot Source: PURPLE Subscriber 2 to Hot Source: ORANGE Subscriber 2 to Hot Source: PURPLE
第二個訂閱者只看到 “orange” 和 “purple”,因為它們是在第一個訂閱者之后發布的 。
3. 如何將冷發布者轉換為熱發布者
share()
:將冷發布者轉換為熱發布者,多個訂閱者可以共享同一個發布者。第一個訂閱者觸發發布,后續訂閱者共享數據。replay(n)
:將冷發布者轉換為熱發布者,并保留最近的n
個元素,新訂閱者可以接收到這些元素。Sinks.Many
:通過Sinks.Many
手動控制數據的發布,可以模擬熱發布者的行為 。
4. 如何將熱發布者轉換為冷發布者
defer()
:將熱發布者(如just
)轉換為冷發布者,延遲執行,直到有訂閱者訂閱時才生成數據 。
5. 總結
- 冷發布者:每個訂閱者都會獨立地觸發數據的生成和處理。
- 熱發布者:數據在創建時就開始發布,訂閱者只會看到從訂閱開始之后的數據。
- 轉換方法:
- 冷 → 熱:
share()
、replay()
、Sinks.Many
- 熱 → 冷:
defer()
- 冷 → 熱:
這種區分對于構建高效、可擴展的響應式系統非常重要,尤其是在處理大量并發請求或實時數據流時 。