在響應式編程中,retryWhen
操作符通過 RetrySignal
接口提供了對重試行為的精細控制,特別是在處理 瞬態錯誤(transient errors) 時。瞬態錯誤是指那些在一段時間內發生,但隨后會自行恢復的錯誤,例如網絡請求失敗后服務器短暫不可用,但隨后恢復正常。在這種情況下,我們希望每個錯誤“爆發”(burst)都能獨立處理,而不是將前一次的重試狀態帶入下一次。
1. 瞬態錯誤的定義與場景
瞬態錯誤通常表現為短暫的失敗,隨后系統會恢復。例如,一個 HTTP 請求源可能會在某些條件下連續失敗兩次,然后恢復正常。這種模式在長期運行的流(如 Kafka 消費者、HTTP 請求等)中非常常見。
2. RetrySignal
的作用
RetrySignal
是 retryWhen
操作符中用于表示重試狀態的接口。它提供了兩個關鍵方法:
totalRetries()
:返回到目前為止的總重試次數(單調遞增)。totalRetriesInARow()
:返回當前連續失敗的次數。如果在重試中成功恢復(即接收到onNext
而不是onError
),這個值會被重置為 0。
這個 totalRetriesInARow()
的值是處理瞬態錯誤的關鍵。它允許我們區分“連續失敗”和“獨立失敗”,從而實現更合理的重試策略。
3. transientErrors(boolean)
配置的作用
當在 RetrySpec
或 RetryBackoffSpec
中設置 transientErrors(true)
時,重試策略將使用 totalRetriesInARow()
來計算重試次數。這意味著:
- 每次重試失敗后,如果成功恢復(即接收到
onNext
),則totalRetriesInARow()
會被重置為 0。 - 每次“爆發”(即連續失敗)都會被獨立處理,重試次數不會累積。
這種配置特別適用于處理瞬態錯誤,例如網絡請求失敗后服務器短暫不可用的情況。
4. 示例代碼解析
// 用于生成數據和控制流的輔助變量
final AtomicInteger transientHelper = new AtomicInteger();
// 模擬HTTP請求的Flux數據流
Supplier<Flux<Integer>> httpRequest = () ->Flux.generate(sink -> {int i = transientHelper.getAndIncrement();if (i == 10) {sink.next(i);sink.complete();}else if (i % 3 == 0) {sink.next(i);}else {sink.error(new IllegalStateException("Transient error at " + i));}});
// 用于統計錯誤次數的變量
AtomicInteger errorCount = new AtomicInteger();
// 添加錯誤處理邏輯的Flux
Flux<Integer> transientFlux = httpRequest.get().doOnError(e -> errorCount.incrementAndGet());// 使用retryWhen進行重試,最多重試2次,并且認為所有錯誤都是暫時性的
transientFlux.retryWhen(Retry.max(2).transientErrors(true)).blockLast();
assertThat(errorCount).hasValue(6);
doOnError
:用于統計錯誤次數。retryWhen(Retry.max(2).transientErrors(true))
:Retry.max(2)
表示最多重試 2 次。transientErrors(true)
表示啟用瞬態錯誤處理模式。
blockLast()
:等待整個流完成。assertThat(errorCount).hasValue(6)
:驗證總共發生了 6 次錯誤,說明重試機制成功處理了 6 次錯誤。
5. 關鍵區別:啟用 transientErrors(true)
與不啟用
-
啟用
transientErrors(true)
:- 每次重試失敗后,如果成功恢復,
totalRetriesInARow()
會被重置為 0。 - 每次“爆發”(連續失敗)都會被獨立處理,重試次數不會累積。
- 最終成功完成,錯誤次數為 6 次。
- 每次重試失敗后,如果成功恢復,
-
不啟用
transientErrors(true)
:- 重試次數是單調遞增的,不會重置。
- 如果第二次“爆發”導致重試次數超過 2 次,整個序列將失敗。
6. 總結
retryWhen
通過RetrySignal
提供了對重試行為的精細控制。totalRetriesInARow()
是處理瞬態錯誤的關鍵,它允許我們區分“連續失敗”和“獨立失敗”。transientErrors(true)
配置使得每次“爆發”都能獨立處理,重試次數不會累積,從而避免了因前一次失敗而放棄后續重試的問題。- 這種機制特別適用于處理網絡請求、數據庫連接等可能遇到瞬態錯誤的場景。