文章目錄
- 一、創建操作符
- 1. `just` —— 創建包含指定元素的流
- 2. `fromIterable` —— 從集合創建 Flux
- 3. `empty` —— 創建空的 Flux 或 Mono
- 4. `fromArray` —— 從數組創建 Flux
- 5. `fromStream` —— 從 Java 8 Stream 創建 Flux
- 6. `create` —— 使用 FluxSink 手動發射元素
- 7. `generate` —— 使用狀態生成元素,適用于同步場景
- 8. `fromFuture` —— 從 CompletableFuture 創建 Mono
- 9. `interval` —— 創建周期性發射元素的 Flux
- 10. `timer` —— 創建延遲發射的 Mono
- 二、轉換操作符
- 1. `map` —— 映射每個元素為新值
- 2. `flatMap` —— 扁平化異步流,將每個元素映射為異步 Publisher
- 3. `concatMap` —— 順序執行映射為 Publisher 的異步流
- 三、過濾操作符
- 1. `filter` —— 按條件過濾元素
- 2. `take` —— 獲取前 N 個元素
- 3. `skip` —— 跳過前 N 個元素
- 四、組合操作符
- 1. `concat` —— 按順序合并多個 Flux
- 2. `merge` —— 并發合并多個 Flux(無序)
- 3. `zip` —— 按索引組合多個 Flux 的元素
- 五、錯誤處理操作符
- 1. `onErrorReturn` —— 出錯時返回默認值
- 2. `onErrorResume` —— 出錯時切換備用流
- 3. `retry` —— 出錯時重試指定次數
- 六、延遲執行與懶加載:`Mono.defer` 和 `Flux.defer`:被訂閱時才執行
- `Mono.defer` —— 懶加載 Mono,直到subscribe時才創建執行
- `Flux.defer` —— 懶加載 Flux,每次訂閱時重新執行邏輯
Reactor 是一個用于構建反應式應用程序的 Java 庫,提供了豐富的操作符(算子)來處理反應式流(Flux
和 Mono
)。本文詳細介紹了 Reactor 中常用的創建、轉換、過濾、組合和錯誤處理操作符,以及一些高級用法示例。
一、創建操作符
1. just
—— 創建包含指定元素的流
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
Mono<String> mono = Mono.just("Hello");
2. fromIterable
—— 從集合創建 Flux
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
Flux<Integer> flux = Flux.fromIterable(list);
3. empty
—— 創建空的 Flux 或 Mono
Flux<Integer> emptyFlux = Flux.empty();
Mono<String> emptyMono = Mono.empty();
4. fromArray
—— 從數組創建 Flux
Integer[] numbers = {1, 2, 3, 4, 5};
Flux<Integer> flux = Flux.fromArray(numbers);
5. fromStream
—— 從 Java 8 Stream 創建 Flux
Stream<Integer> stream = Arrays.asList(1, 2, 3, 4, 5).stream();
Flux<Integer> flux = Flux.fromStream(stream);
6. create
—— 使用 FluxSink 手動發射元素
Flux<Integer> flux = Flux.create(sink -> {for (int i = 0; i < 5; i++) {sink.next(i);}sink.complete();
});
7. generate
—— 使用狀態生成元素,適用于同步場景
Flux<Integer> flux = Flux.generate(() -> 0, (state, sink) -> {sink.next(state);if (state == 4) sink.complete();return state + 1;
});
8. fromFuture
—— 從 CompletableFuture 創建 Mono
CompletableFuture<String> future = CompletableFuture.completedFuture("Hello");
Mono<String> mono = Mono.fromFuture(future);
9. interval
—— 創建周期性發射元素的 Flux
Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1));
10. timer
—— 創建延遲發射的 Mono
Mono<Long> timerMono = Mono.timer(Duration.ofSeconds(2));
?
二、轉換操作符
1. map
—— 映射每個元素為新值
Flux<Integer> squared = Flux.just(1, 2, 3).map(n -> n * n);
2. flatMap
—— 扁平化異步流,將每個元素映射為異步 Publisher
Flux<Integer> result = Flux.just(1, 2, 3).flatMap(n -> Mono.just(n * 2));
3. concatMap
—— 順序執行映射為 Publisher 的異步流
Flux<Integer> result = Flux.just(1, 2, 3).concatMap(n -> Mono.just(n * 2));
?
三、過濾操作符
1. filter
—— 按條件過濾元素
Flux<Integer> evens = Flux.just(1, 2, 3, 4).filter(n -> n % 2 == 0);
2. take
—— 獲取前 N 個元素
Flux<Integer> firstThree = Flux.just(1, 2, 3, 4, 5).take(3);
3. skip
—— 跳過前 N 個元素
Flux<Integer> skipped = Flux.just(1, 2, 3, 4, 5).skip(2);
?
四、組合操作符
1. concat
—— 按順序合并多個 Flux
Flux<Integer> combined = Flux.concat(Flux.just(1, 2), Flux.just(3, 4));
2. merge
—— 并發合并多個 Flux(無序)
Flux<Integer> merged = Flux.merge(Flux.just(1, 2), Flux.just(3, 4));
3. zip
—— 按索引組合多個 Flux 的元素
Flux<String> zipped = Flux.zip(Flux.just(1, 2), Flux.just(3, 4), (a, b) -> a + ":" + b);
?
五、錯誤處理操作符
1. onErrorReturn
—— 出錯時返回默認值
Flux<Integer> result = Flux.just(1, 2, 3).map(n -> {if (n == 2) throw new RuntimeException("error");return n;}).onErrorReturn(-1);
2. onErrorResume
—— 出錯時切換備用流
Flux<Integer> result = Flux.just(1, 2, 3).map(n -> {if (n == 2) throw new RuntimeException("error");return n;}).onErrorResume(e -> Mono.just(-1));
3. retry
—— 出錯時重試指定次數
Flux<Integer> result = Flux.just(1, 2, 3).map(n -> {if (n == 2) throw new RuntimeException("error");return n;}).retry(2);
?
六、延遲執行與懶加載:Mono.defer
和 Flux.defer
:被訂閱時才執行
Mono.defer
—— 懶加載 Mono,直到subscribe時才創建執行
Mono<String> deferredMono = Mono.defer(() -> {System.out.println("Generating value...");return Mono.just("Deferred Result");
});
只有當 subscribe()
被調用時,Mono.defer
中的邏輯才會真正執行。這對于需要確保執行時機晚于前一步完成場景特別重要,比如:
Mono.defer(() -> readQaResultType()).subscribe(result -> System.out.println("QA Result: " + result));
在這段代碼中,讀取 qaResultType
的操作只會在前面的步驟(例如數據預處理)完全完成后才被觸發。
Flux.defer
—— 懶加載 Flux,每次訂閱時重新執行邏輯
Flux<Integer> deferredFlux = Flux.defer(() -> {System.out.println("Evaluating source...");return Flux.just(1, 2, 3);
});
每次訂閱都會重新生成數據,適用于帶有狀態的源或依賴最新上下文的處理邏輯。