Spring Boot WebFlux 中的 WebSocket 提供了一種高效、異步的方式來處理客戶端與服務器之間的雙向通信。WebSocket 連接的生命周期包括連接建立、消息傳輸、連接關閉以及資源清理等過程。此外,為了確保 WebSocket 連接的穩定性和可靠性,我們可以加入重試機制,以處理斷開或網絡問題時自動重新連接。
1. WebSocket 連接建立
WebSocket 的連接是通過 HTTP 的 Upgrade 機制從普通的 HTTP/HTTPS 請求升級而來的。具體流程如下:
1.1 客戶端請求 WebSocket 連接
客戶端通過 ws://
或 wss://
協議來訪問 WebSocket 服務器,并發送 HTTP Upgrade 請求頭,要求服務器將連接升級為 WebSocket 協議:
GET /ws HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: random-generated-key
Sec-WebSocket-Version: 13
1.2 服務器端處理 WebSocket 連接
Spring WebFlux 通過 WebSocketHandler
來處理 WebSocket 請求。以下是一個簡單的 WebSocketHandler 實現:
@Component
public class MyWebSocketHandler implements WebSocketHandler {@Overridepublic Mono<Void> handle(WebSocketSession session) {return session.receive().doOnNext(message -> System.out.println("Received: " + message.getPayloadAsText())).then();}
}
當服務器收到 HTTP Upgrade 請求后,它會檢查 Sec-WebSocket-Key
并返回 Sec-WebSocket-Accept
進行握手,建立連接。
1.3 握手成功,連接建立
如果握手成功,服務器會返回 101 Switching Protocols
響應,表示 WebSocket 連接已建立:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: (calculated key)
2. WebSocket 消息處理
連接建立后,WebSocket 進入消息傳輸階段,包括消息的接收和發送。
2.1 消息接收
服務器端可以通過 WebSocketSession.receive()
方法來接收客戶端發送的消息:
session.receive().map(WebSocketMessage::getPayloadAsText).doOnNext(msg -> System.out.println("Received: " + msg)).then();
session.receive()
返回一個 Flux<WebSocketMessage>
,可以處理流式消息,每次接收到新消息時執行 doOnNext()
中的處理邏輯。
2.2 消息發送
服務器端可以通過 WebSocketSession.send()
方法發送消息給客戶端:
Flux<String> messages = Flux.interval(Duration.ofSeconds(1)).map(i -> "Message " + i);
return session.send(messages.map(session::textMessage));
send()
方法接收一個 Publisher<WebSocketMessage>
,可以使用 Flux
來生成消息流。textMessage()
方法用于創建文本消息。
3. WebSocket 連接關閉
WebSocket 連接可以由客戶端、服務器或網絡異常等原因主動關閉。連接關閉的主要方式如下:
3.1 正常關閉
- 客戶端主動關閉:客戶端可以通過調用
WebSocket.close()
發送 Close Frame,服務器接收到后會關閉連接。 - 服務器主動關閉:服務器通過
WebSocketSession.close()
關閉連接:session.close(CloseStatus.NORMAL);
3.2 異常關閉
- 網絡異常:如網絡斷開或客戶端崩潰等,連接會被強制關閉。
- 心跳超時:如果使用 ping/pong 機制檢測 WebSocket 是否存活,超時未收到 pong 響應時,連接會關閉。
session.send(Flux.just(session.pingMessage(ByteBuffer.wrap(new byte[0]))));
3.3 連接關閉后的處理
服務器可以使用 session.receive().doOnTerminate()
監聽連接關閉事件,執行清理操作:
session.receive().doOnTerminate(() -> System.out.println("WebSocket connection closed")).then();
4. WebSocket 生命周期總結
階段 | 說明 |
---|---|
連接建立 | 客戶端發起 WebSocket 連接請求,服務器接受并返回 101 Switching Protocols 響應,連接建立。 |
消息傳輸 | 服務器和客戶端可以雙向傳輸文本或二進制消息。 |
連接關閉 | 連接可由客戶端、服務器、網絡異常等原因關閉。 |
資源清理 | 連接關閉后需要進行資源清理操作,如取消訂閱、清理狀態等。 |
5. 完整示例:WebFlux WebSocket 服務器
以下是一個完整的 WebSocket 服務器配置示例,展示了如何在 Spring Boot WebFlux 中配置 WebSocket:
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerMapping;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.Map;@Configuration
public class WebSocketConfig {@Beanpublic WebSocketHandler webSocketHandler() {return session -> {Flux<String> output = Flux.interval(Duration.ofSeconds(1)).map(time -> "Server time: " + time);return session.send(output.map(session::textMessage));};}@Beanpublic WebSocketHandlerMapping handlerMapping(WebSocketHandler handler) {return new WebSocketHandlerMapping(Map.of("/ws", handler));}@Beanpublic WebSocketHandlerAdapter handlerAdapter() {return new WebSocketHandlerAdapter();}
}
說明:
WebSocketHandler
處理 WebSocket 連接,發送定時消息。WebSocketHandlerMapping
將/ws
端點映射到 WebSocketHandler。WebSocketHandlerAdapter
用于適配 WebSocket 處理器。
6. 服務器端發起 WebSocket 連接
如果你希望服務器主動連接到其他 WebSocket 服務器,可以使用 WebSocketClient
。Spring WebFlux 提供了 ReactorNettyWebSocketClient
來發起 WebSocket 連接。
6.1 示例:服務器端發起 WebSocket 連接
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import reactor.core.publisher.Mono;
import java.net.URI;@Service
public class WebSocketClientService {private final ReactorNettyWebSocketClient client = new ReactorNettyWebSocketClient();public Mono<Void> connectToWebSocketServer() {return client.execute(URI.create("ws://example.com/socket"), session -> {Mono<Void> sendMessage = session.send(Mono.just(session.textMessage("Hello Server!")));session.receive().map(WebSocketMessage::getPayloadAsText).doOnNext(System.out::println).subscribe();return sendMessage;});}
}
6.2 在 Spring Boot 啟動時自動連接
通過在 @PostConstruct
中調用連接方法,可以確保 WebSocket 客戶端在 Spring Boot 啟動時自動連接:
import jakarta.annotation.PostConstruct;
import org.springframework.stereotype.Component;@Component
public class WebSocketClientInitializer {private final WebSocketClientService webSocketClientService;public WebSocketClientInitializer(WebSocketClientService webSocketClientService) {this.webSocketClientService = webSocketClientService;}@PostConstructpublic void init() {webSocketClientService.connectToWebSocketServer().subscribe();}
}
7. WebSocket 連接重試機制
在 WebSocket 的生命周期中,由于網絡問題或服務器錯誤,WebSocket 連接可能會中斷。為了提高 WebSocket 連接的可靠性,我們可以為 WebSocket 客戶端添加重試機制,以確保斷開后能夠重新連接。
7.1 使用 retry()
方法重試連接
WebFlux 提供了 retry()
方法來自動重試操作。以下是一個簡單的重試機制示例:
import reactor.core.publisher.Mono;public class WebSocketClientService {private final ReactorNettyWebSocketClient client = new ReactorNettyWebSocketClient();public Mono<Void> connectToWebSocketServer() {return client.execute(URI.create("ws://example.com/socket"), session -> {Mono<Void> sendMessage = session.send(Mono.just(session.textMessage("Hello Server!")));session.receive().map(WebSocketMessage::getPayloadAsText).doOnNext(System.out::println).subscribe();return sendMessage;}).retry(5); // 最大重試5次}
}
在這個例子中,retry(5)
表示如果 WebSocket 連接失敗,最多會重試 5 次。
7.2 使用 retryWhen()
實現自定義重試邏輯
我們還可以通過 retryWhen()
來實現更復雜的重試策略,例如設置重試間隔時間或根據錯誤類型決定是否重試:
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
import java.time.Duration;public class WebSocketClientService {private final ReactorNettyWebSocketClient client = new ReactorNettyWebSocketClient();public Mono<Void> connectToWebSocketServer() {return client.execute(URI.create("ws://example.com/socket"), session -> {Mono<Void> sendMessage = session.send(Mono.just(session.textMessage("Hello Server!")));session.receive().map(WebSocketMessage::getPayloadAsText).doOnNext(System.out::println).subscribe();return sendMessage;}).retryWhen(errors ->errors.zipWith(Flux.range(1, 5), (error, count) -> count) // 重試次數.flatMap(retryCount -> Mono.delay(Duration.ofSeconds(retryCount))) // 增加重試間隔);}
}
在這個例子中,retryWhen()
會根據錯誤進行自定義重試邏輯,設置每次重試間隔遞增。
8. 連接關閉后的重試機制
為了確保連接在關閉后重新建立,我們可以監聽連接關閉事件并嘗試重試:
session.receive().doOnTerminate(() -> {System.out.println("WebSocket connection closed");reconnect(); // 重新連接}).then();private void reconnect() {connectToWebSocketServer().retry(3) // 重試3次.subscribe();
}
8.1 完整的客戶端重試代碼
public Mono<Void> connectWithRetry() {return client.execute(URI.create("ws://example.com/socket"), session -> {Mono<Void> sendMessage = session.send(Mono.just(session.textMessage("Hello Server!")));session.receive().map(WebSocketMessage::getPayloadAsText).doOnNext(System.out::println).doOnTerminate(() -> reconnect()) // 連接關閉后重試.subscribe();return sendMessage;}).retryWhen(errors ->errors.zipWith(Flux.range(1, 5), (error, count) -> count).flatMap(retryCount -> Mono.delay(Duration.ofSeconds(retryCount))));
}
9. 結論
Spring Boot WebFlux 中 WebSocket 的生命周期包括:
- 連接建立:通過 HTTP Upgrade 握手建立 WebSocket 連接。
- 消息收發:服務器和客戶端之間通過
receive()
和send()
方法進行消息交換。 - 連接關閉:連接可以通過正常關閉、異常關閉或主動關閉的方式結束。
- 資源清理:連接關閉后需要進行資源清理操作,確保系統穩定。
- 重試機制:通過
retry()
和retryWhen()
方法為 WebSocket 連接添加自動重試機制,提高連接的可靠性。
通過 WebSocket,Spring Boot WebFlux 提供了高效的異步通信方式,特別適合用于實時數據流應用。