Reactor 事件流 vs. Spring 事件 ApplicationEvent
- Reactor 事件流 vs. Spring 事件 (`ApplicationEvent`)
- 1?? 核心區別
- 2?? Spring 事件 (`ApplicationEvent`)
- ? 示例:Spring 事件發布 & 監聽
- 1?? 定義事件
- 2?? 發布事件
- 3?? 監聽事件
- 🔹 進階:異步監聽
- 3?? Reactor 事件流(`Flux` / `Mono` / `Sinks.Many`)
- ? 示例:事件流處理
- 1?? 冷流(每個訂閱者獨立獲取數據)
- ? 方法 1:使用 `Flux.create()`,手動推送數據
- **? 方法 2:使用 `Flux.generate()`,按需推送數據**
- **? 方法 3:使用 `Sinks.many().multicast()`,支持多個訂閱者**
- **? 結論**
- 2?? 熱流(共享事件流)
- **📌 ReplayProcessor:可重放歷史事件的 Reactor 處理器**
- **📌 1?? 關鍵特點**
- **📌 2?? 基本使用**
- **📌 運行結果**
- **📌 3?? `ReplayProcessor` vs `Sinks.Many`**
- **📌 4?? 適用場景**
- 4?? 什么時候用哪個?
- 5?? 總結
Reactor 事件流 vs. Spring 事件 (ApplicationEvent
)
1?? 核心區別
特性 | Spring ApplicationEvent | Reactor Flux /Sinks.Many |
---|---|---|
數據處理方式 | 一次性事件(同步或異步) | 流式處理(持續事件流) |
是否支持多個訂閱者 | ? 支持(多個 @EventListener ) | ? 支持(Sinks.Many 廣播) |
是否支持流式操作 | ? 不支持 | ? 支持(map() , filter() , zip() ) |
是否支持回放歷史事件 | ? 不支持 | ?(默認不支持,但可用 ReplayProcessor ) |
適用場景 | 業務事件通知(用戶注冊、訂單支付) | 高吞吐數據流處理(日志、消息隊列、WebFlux) |
2?? Spring 事件 (ApplicationEvent
)
🔹 特點
- 適用于應用內部組件通信,解耦業務邏輯。
- 默認同步,可使用
@Async
進行異步處理。 - 一次性事件,無法流式處理。
? 示例:Spring 事件發布 & 監聽
1?? 定義事件
public class UserRegisteredEvent extends ApplicationEvent {private final String username;public UserRegisteredEvent(Object source, String username) {super(source);this.username = username;}public String getUsername() { return username; }
}
2?? 發布事件
@Component
public class UserService {@Autowiredprivate ApplicationEventPublisher eventPublisher;public void registerUser(String username) {eventPublisher.publishEvent(new UserRegisteredEvent(this, username));}
}
3?? 監聽事件
@Component
public class WelcomeEmailListener {@EventListenerpublic void handleUserRegisteredEvent(UserRegisteredEvent event) {System.out.println("📧 發送歡迎郵件給: " + event.getUsername());}
}
可多個 @EventListener
監聽同一個事件,同時觸發
@Component
public class LoggingListener {@EventListenerpublic void logUserRegisteredEvent(UserRegisteredEvent event) {System.out.println("📜 記錄日志: 用戶 " + event.getUsername() + " 已注冊");}
}
@Component
public class PointsRewardListener {@EventListenerpublic void giveWelcomePoints(UserRegisteredEvent event) {System.out.println("🎁 贈送 100 積分給: " + event.getUsername());}
}
🔹 進階:異步監聽
🔹 1?? 監聽器可以指定異步 需要啟用 Spring 異步,@EnableAsync
@Async
@EventListener
public void sendWelcomeEmail(UserRegisteredEvent event) {System.out.println("📧 發送歡迎郵件給: " + event.getUsername() + " [異步]");
}
🔹 2?? 監聽多個事件 如果多個事件需要相同的處理邏輯,你可以用 classes
監聽多個事件:
@EventListener(classes = {UserRegisteredEvent.class, OrderPlacedEvent.class})
public void handleMultipleEvents(Object event) {System.out.println("📢 事件觸發: " + event.getClass().getSimpleName());
}
🔹 3?? 條件監聽 可以使用 condition
屬性,讓監聽器只處理 符合條件 的事件:
@EventListener(condition = "#event.username.startsWith('A')")
public void handleUserStartingWithA(UserRegisteredEvent event) {System.out.println("🎯 處理用戶名以 A 開頭的用戶: " + event.getUsername());
}
🔹 適用場景 ? 適用于業務事件通知(如用戶注冊、訂單支付)。 ? 不適用于流式數據處理。
3?? Reactor 事件流(Flux
/ Mono
/ Sinks.Many
)
🔹 特點
- 異步 & 流式 處理,可以并行、合并、過濾、轉換數據。
- 冷流(Flux、Mono) 每個訂閱者獨立處理數據。
- 熱流(Sinks.Many) 可用于事件廣播。
? 示例:事件流處理
1?? 冷流(每個訂閱者獨立獲取數據)
Flux<String> flux = Flux.just("事件1", "事件2", "事件3");
flux.subscribe(event -> System.out.println("訂閱者 1 收到: " + event));
flux.subscribe(event -> System.out.println("訂閱者 2 收到: " + event));
? 方法 1:使用 Flux.create()
,手動推送數據
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;public class CustomFluxExample {public static void main(String[] args) throws InterruptedException {Flux<String> customFlux = Flux.create(emitter -> {Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {String event = "自定義事件:" + System.currentTimeMillis();System.out.println("發布:" + event);emitter.next(event);}, 0, 1, TimeUnit.SECONDS);}, FluxSink.OverflowStrategy.BUFFER);// 訂閱者 1customFlux.subscribe(event -> System.out.println("訂閱者 1 收到:" + event));Thread.sleep(5000); // 5 秒后再添加訂閱者// 訂閱者 2customFlux.subscribe(event -> System.out.println("訂閱者 2 收到:" + event));Thread.sleep(10000); // 讓主線程等待一會兒,看效果}
}
🔹 運行結果
python-repl復制編輯發布:自定義事件:1712101234567
訂閱者 1 收到:自定義事件:1712101234567
發布:自定義事件:1712101235567
訂閱者 1 收到:自定義事件:1712101235567
發布:自定義事件:1712101236567
訂閱者 1 收到:自定義事件:1712101236567
發布:自定義事件:1712101237567
訂閱者 1 收到:自定義事件:1712101237567
發布:自定義事件:1712101238567
訂閱者 1 收到:自定義事件:1712101238567
(5 秒后,訂閱者 2 加入)
訂閱者 2 收到:自定義事件:1712101239567
訂閱者 1 收到:自定義事件:1712101239567
發布:自定義事件:1712101240567
訂閱者 2 收到:自定義事件:1712101240567
訂閱者 1 收到:自定義事件:1712101240567
...
📌 特點
-
你可以隨時手動推送數據(每 1 秒發布一次)。
-
新訂閱者不會收到歷史數據,只會接收到之后的事件(如果你想讓新訂閱者也能收到歷史數據,可以用
.replay()
)。示例:
Flux<String> flux = Flux.just("事件A", "事件B", "事件C").replay(2) // 緩存最后 2 個事件.autoConnect(); // 至少一個訂閱者連接后開始發布flux.subscribe(event -> System.out.println("訂閱者 1 收到: " + event));// 新的訂閱者會從緩存中接收事件 flux.subscribe(event -> System.out.println("訂閱者 2 收到: " + event));
輸出:
訂閱者 1 收到: 事件A 訂閱者 1 收到: 事件B 訂閱者 1 收到: 事件C 訂閱者 2 收到: 事件B 訂閱者 2 收到: 事件C
-
不會自動結束,Flux 會一直運行。
? 方法 2:使用 Flux.generate()
,按需推送數據
如果你的數據是基于前一個數據計算出來的,可以使用 Flux.generate()
:
import reactor.core.publisher.Flux;import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;public class GenerateFluxExample {public static void main(String[] args) throws InterruptedException {Flux<String> generatedFlux = Flux.generate(() -> new AtomicInteger(1), // 初始狀態(state, sink) -> {String event = "事件 " + state.getAndIncrement();System.out.println("發布:" + event);sink.next(event);try { Thread.sleep(1000); } catch (InterruptedException e) {}return state;});// 訂閱者 1generatedFlux.subscribe(event -> System.out.println("訂閱者 1 收到:" + event));Thread.sleep(5000); // 5 秒后再添加訂閱者// 訂閱者 2generatedFlux.subscribe(event -> System.out.println("訂閱者 2 收到:" + event));Thread.sleep(10000);}
}
🔹 運行結果
python-repl復制編輯發布:事件 1
訂閱者 1 收到:事件 1
發布:事件 2
訂閱者 1 收到:事件 2
發布:事件 3
訂閱者 1 收到:事件 3
發布:事件 4
訂閱者 1 收到:事件 4
發布:事件 5
訂閱者 1 收到:事件 5
(5 秒后,訂閱者 2 加入)
發布:事件 6
訂閱者 1 收到:事件 6
訂閱者 2 收到:事件 6
發布:事件 7
訂閱者 1 收到:事件 7
訂閱者 2 收到:事件 7
...
📌 區別
Flux.generate()
一次只能推送一個數據,適合基于狀態逐步生成數據。Flux.create()
可以異步推送多個數據,適合事件流、網絡請求等異步數據。
? 方法 3:使用 Sinks.many().multicast()
,支持多個訂閱者
如果你希望多個訂閱者可以同時消費,并且可以隨時加入:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;import java.time.Duration;public class SinkExample {public static void main(String[] args) throws InterruptedException {Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();Flux<String> flux = sink.asFlux();// 訂閱者 1flux.subscribe(event -> System.out.println("訂閱者 1 收到:" + event));// 模擬定時推送數據new Thread(() -> {int i = 1;while (true) {String event = "事件 " + i++;System.out.println("發布:" + event);sink.tryEmitNext(event);try { Thread.sleep(1000); } catch (InterruptedException e) { }}}).start();Thread.sleep(5000); // 5 秒后再添加訂閱者// 訂閱者 2flux.subscribe(event -> System.out.println("訂閱者 2 收到:" + event));Thread.sleep(10000);}
}
🔹 運行結果
python-repl復制編輯發布:事件 1
訂閱者 1 收到:事件 1
發布:事件 2
訂閱者 1 收到:事件 2
發布:事件 3
訂閱者 1 收到:事件 3
發布:事件 4
訂閱者 1 收到:事件 4
發布:事件 5
訂閱者 1 收到:事件 5
(5 秒后,訂閱者 2 加入)
發布:事件 6
訂閱者 1 收到:事件 6
訂閱者 2 收到:事件 6
發布:事件 7
訂閱者 1 收到:事件 7
訂閱者 2 收到:事件 7
...
📌 特點
Sinks.many().multicast()
允許多個訂閱者同時消費。- 適用于 WebSocket、事件總線、消息隊列等場景。
? 結論
方式 | 特點 | 適用場景 |
---|---|---|
Flux.create() | 手動推送數據,支持異步 | 適合事件流、消息隊列、WebSocket |
Flux.generate() | 按需推送,每次一個 | 適合基于前一個狀態生成新數據 |
Sinks.many().multicast() | 支持多個訂閱者,實時推送 | 適合多訂閱者共享數據 |
2?? 熱流(共享事件流)
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
Flux<String> hotFlux = sink.asFlux();hotFlux.subscribe(event -> System.out.println("訂閱者 1 收到: " + event));
hotFlux.subscribe(event -> System.out.println("訂閱者 2 收到: " + event));sink.tryEmitNext("全局事件 1");
sink.tryEmitNext("全局事件 2");
🔹 適用場景 ? 適用于高吞吐、異步、多訂閱者的事件流。 ? 適用于數據流式處理(如 WebFlux、消息隊列、日志流)。 ? 不適用于簡單的業務事件通知。
📌 ReplayProcessor:可重放歷史事件的 Reactor 處理器
ReplayProcessor<T>
是 Reactor 提供的一種 熱流(Hot Publisher),它允許新的訂閱者 回放之前發送的事件。適用于 日志系統、消息隊列、數據緩存 等場景。
📌 1?? 關鍵特點
? 存儲歷史事件,新的訂閱者可以看到之前的事件。
? 類似 RxJava 的 ReplaySubject
,可以 指定緩存大小。
? 適用于需要回放數據的場景,如 日志系統、WebSocket 消息隊列。
📌 2?? 基本使用
import reactor.core.publisher.ReplayProcessor;public class ReplayProcessorExample {public static void main(String[] args) {// 創建 ReplayProcessor,緩存 2 條數據ReplayProcessor<String> processor = ReplayProcessor.create(2);// 訂閱者 1 訂閱processor.subscribe(data -> System.out.println("訂閱者 1 收到:" + data));// 發送數據processor.onNext("事件 A");processor.onNext("事件 B");processor.onNext("事件 C");// 訂閱者 2 訂閱processor.subscribe(data -> System.out.println("訂閱者 2 收到:" + data));// 發送更多數據processor.onNext("事件 D");}
}
📌 運行結果
mathematica復制編輯訂閱者 1 收到:事件 A
訂閱者 1 收到:事件 B
訂閱者 1 收到:事件 C
訂閱者 2 收到:事件 B // 只回放最近 2 條(事件 B 和 C)
訂閱者 2 收到:事件 C
訂閱者 1 收到:事件 D
訂閱者 2 收到:事件 D
📌 說明:
ReplayProcessor.create(2)
最多緩存 2 條數據,新訂閱者只能收到最近的 2 條事件。- 訂閱者 1 先訂閱,它會收到所有事件。
- 訂閱者 2 后訂閱,但它可以收到最近的 2 條緩存(事件 B 和 C)。
📌 3?? ReplayProcessor
vs Sinks.Many
特性 | ReplayProcessor | Sinks.Many (Multicast) |
---|---|---|
是否緩存歷史數據 | ? 是,可指定緩存大小 | ? 否,不會緩存歷史數據 |
新訂閱者是否能收到舊數據 | ? 可以 | ? 不能 |
適用場景 | 回放數據,如日志、歷史消息 | 實時消息推送,不存儲歷史 |
📌 4?? 適用場景
? 日志回放(新訂閱者也能看到之前的日志)。
? 聊天系統(新加入的用戶可以看到最近的聊天記錄)。
? 數據緩存(保存最近 N 條數據,避免重復請求)。
🚀 如果你需要 緩存歷史數據,并讓新的訂閱者能收到過去的事件,ReplayProcessor
是一個很好的選擇!
4?? 什么時候用哪個?
? 使用 ApplicationEvent
(Spring 事件)
- 簡單的應用事件通知(如用戶注冊、郵件通知、訂單完成)。
- 解耦業務邏輯,但不需要流式處理。
? 使用 Reactor 事件流
- 高吞吐、并發的數據流(日志流、消息流、WebFlux)。
- 多個訂閱者同時消費事件,如廣播消息、實時數據推送。
? 結合使用(最佳實踐)
@EventListener
public void handleUserRegisteredEvent(UserRegisteredEvent event) {Flux.just(event).map(e -> "處理事件: " + e.getUsername()).doOnNext(System.out::println).subscribe();
}
5?? 總結
方案 | 訂閱者是否獨立 | 事件是否廣播 | 適用場景 |
---|---|---|---|
Spring ApplicationEvent | ? 每個訂閱者獨立 | ? 不是廣播 | 業務事件通知 |
Reactor Flux (冷流) | ? 每個訂閱者獨立 | ? 不是廣播 | 流式數據處理 |
Reactor Sinks.Many (熱流) | ? 共享數據流 | ? 所有訂閱者都收到 | 事件驅動架構 |
🚀 如果你是做 WebFlux、日志流、消息隊列,選 Reactor!如果是應用內部事件解耦,選 Spring ApplicationEvent
! 🎯