Spring Boot WebFlux 中 WebSocket 生命周期解析

Spring Boot WebFlux 中的 WebSocket 提供了一種高效、異步的方式來處理客戶端與服務器之間的雙向通信。WebSocket 連接的生命周期包括連接建立、消息傳輸、連接關閉以及資源清理等過程。此外,為了確保 WebSocket 連接的穩定性和可靠性,我們可以加入重試機制,以處理斷開或網絡問題時自動重新連接。

1. WebSocket 連接建立

WebSocket 的連接是通過 HTTP 的 Upgrade 機制從普通的 HTTP/HTTPS 請求升級而來的。具體流程如下:

1.1 客戶端請求 WebSocket 連接

客戶端通過 ws://wss:// 協議來訪問 WebSocket 服務器,并發送 HTTP Upgrade 請求頭,要求服務器將連接升級為 WebSocket 協議:

GET /ws HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: random-generated-key
Sec-WebSocket-Version: 13

1.2 服務器端處理 WebSocket 連接

Spring WebFlux 通過 WebSocketHandler 來處理 WebSocket 請求。以下是一個簡單的 WebSocketHandler 實現:

@Component
public class MyWebSocketHandler implements WebSocketHandler {@Overridepublic Mono<Void> handle(WebSocketSession session) {return session.receive().doOnNext(message -> System.out.println("Received: " + message.getPayloadAsText())).then();}
}

當服務器收到 HTTP Upgrade 請求后,它會檢查 Sec-WebSocket-Key 并返回 Sec-WebSocket-Accept 進行握手,建立連接。

1.3 握手成功,連接建立

如果握手成功,服務器會返回 101 Switching Protocols 響應,表示 WebSocket 連接已建立:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: (calculated key)

2. WebSocket 消息處理

連接建立后,WebSocket 進入消息傳輸階段,包括消息的接收和發送。

2.1 消息接收

服務器端可以通過 WebSocketSession.receive() 方法來接收客戶端發送的消息:

session.receive().map(WebSocketMessage::getPayloadAsText).doOnNext(msg -> System.out.println("Received: " + msg)).then();

session.receive() 返回一個 Flux<WebSocketMessage>,可以處理流式消息,每次接收到新消息時執行 doOnNext() 中的處理邏輯。

2.2 消息發送

服務器端可以通過 WebSocketSession.send() 方法發送消息給客戶端:

Flux<String> messages = Flux.interval(Duration.ofSeconds(1)).map(i -> "Message " + i);
return session.send(messages.map(session::textMessage));

send() 方法接收一個 Publisher<WebSocketMessage>,可以使用 Flux 來生成消息流。textMessage() 方法用于創建文本消息。

3. WebSocket 連接關閉

WebSocket 連接可以由客戶端、服務器或網絡異常等原因主動關閉。連接關閉的主要方式如下:

3.1 正常關閉

  • 客戶端主動關閉:客戶端可以通過調用 WebSocket.close() 發送 Close Frame,服務器接收到后會關閉連接。
  • 服務器主動關閉:服務器通過 WebSocketSession.close() 關閉連接:
    session.close(CloseStatus.NORMAL);
    

3.2 異常關閉

  • 網絡異常:如網絡斷開或客戶端崩潰等,連接會被強制關閉。
  • 心跳超時:如果使用 ping/pong 機制檢測 WebSocket 是否存活,超時未收到 pong 響應時,連接會關閉。
    session.send(Flux.just(session.pingMessage(ByteBuffer.wrap(new byte[0]))));
    

3.3 連接關閉后的處理

服務器可以使用 session.receive().doOnTerminate() 監聽連接關閉事件,執行清理操作:

session.receive().doOnTerminate(() -> System.out.println("WebSocket connection closed")).then();

4. WebSocket 生命周期總結

階段說明
連接建立客戶端發起 WebSocket 連接請求,服務器接受并返回 101 Switching Protocols 響應,連接建立。
消息傳輸服務器和客戶端可以雙向傳輸文本或二進制消息。
連接關閉連接可由客戶端、服務器、網絡異常等原因關閉。
資源清理連接關閉后需要進行資源清理操作,如取消訂閱、清理狀態等。

5. 完整示例:WebFlux WebSocket 服務器

以下是一個完整的 WebSocket 服務器配置示例,展示了如何在 Spring Boot WebFlux 中配置 WebSocket:

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerMapping;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.Map;@Configuration
public class WebSocketConfig {@Beanpublic WebSocketHandler webSocketHandler() {return session -> {Flux<String> output = Flux.interval(Duration.ofSeconds(1)).map(time -> "Server time: " + time);return session.send(output.map(session::textMessage));};}@Beanpublic WebSocketHandlerMapping handlerMapping(WebSocketHandler handler) {return new WebSocketHandlerMapping(Map.of("/ws", handler));}@Beanpublic WebSocketHandlerAdapter handlerAdapter() {return new WebSocketHandlerAdapter();}
}

說明:

  • WebSocketHandler 處理 WebSocket 連接,發送定時消息。
  • WebSocketHandlerMapping/ws 端點映射到 WebSocketHandler。
  • WebSocketHandlerAdapter 用于適配 WebSocket 處理器。

6. 服務器端發起 WebSocket 連接

如果你希望服務器主動連接到其他 WebSocket 服務器,可以使用 WebSocketClient。Spring WebFlux 提供了 ReactorNettyWebSocketClient 來發起 WebSocket 連接。

6.1 示例:服務器端發起 WebSocket 連接

import org.springframework.stereotype.Service;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import reactor.core.publisher.Mono;
import java.net.URI;@Service
public class WebSocketClientService {private final ReactorNettyWebSocketClient client = new ReactorNettyWebSocketClient();public Mono<Void> connectToWebSocketServer() {return client.execute(URI.create("ws://example.com/socket"), session -> {Mono<Void> sendMessage = session.send(Mono.just(session.textMessage("Hello Server!")));session.receive().map(WebSocketMessage::getPayloadAsText).doOnNext(System.out::println).subscribe();return sendMessage;});}
}

6.2 在 Spring Boot 啟動時自動連接

通過在 @PostConstruct 中調用連接方法,可以確保 WebSocket 客戶端在 Spring Boot 啟動時自動連接:

import jakarta.annotation.PostConstruct;
import org.springframework.stereotype.Component;@Component
public class WebSocketClientInitializer {private final WebSocketClientService webSocketClientService;public WebSocketClientInitializer(WebSocketClientService webSocketClientService) {this.webSocketClientService = webSocketClientService;}@PostConstructpublic void init() {webSocketClientService.connectToWebSocketServer().subscribe();}
}

7. WebSocket 連接重試機制

在 WebSocket 的生命周期中,由于網絡問題或服務器錯誤,WebSocket 連接可能會中斷。為了提高 WebSocket 連接的可靠性,我們可以為 WebSocket 客戶端添加重試機制,以確保斷開后能夠重新連接。

7.1 使用 retry() 方法重試連接

WebFlux 提供了 retry() 方法來自動重試操作。以下是一個簡單的重試機制示例:

import reactor.core.publisher.Mono;public class WebSocketClientService {private final ReactorNettyWebSocketClient client = new ReactorNettyWebSocketClient();public Mono<Void> connectToWebSocketServer() {return client.execute(URI.create("ws://example.com/socket"), session -> {Mono<Void> sendMessage = session.send(Mono.just(session.textMessage("Hello Server!")));session.receive().map(WebSocketMessage::getPayloadAsText).doOnNext(System.out::println).subscribe();return sendMessage;}).retry(5);  // 最大重試5次}
}

在這個例子中,retry(5) 表示如果 WebSocket 連接失敗,最多會重試 5 次。

7.2 使用 retryWhen() 實現自定義重試邏輯

我們還可以通過 retryWhen() 來實現更復雜的重試策略,例如設置重試間隔時間或根據錯誤類型決定是否重試:

import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
import java.time.Duration;public class WebSocketClientService {private final ReactorNettyWebSocketClient client = new ReactorNettyWebSocketClient();public Mono<Void> connectToWebSocketServer() {return client.execute(URI.create("ws://example.com/socket"), session -> {Mono<Void> sendMessage = session.send(Mono.just(session.textMessage("Hello Server!")));session.receive().map(WebSocketMessage::getPayloadAsText).doOnNext(System.out::println).subscribe();return sendMessage;}).retryWhen(errors ->errors.zipWith(Flux.range(1, 5), (error, count) -> count)  // 重試次數.flatMap(retryCount -> Mono.delay(Duration.ofSeconds(retryCount)))  // 增加重試間隔);}
}

在這個例子中,retryWhen() 會根據錯誤進行自定義重試邏輯,設置每次重試間隔遞增。

8. 連接關閉后的重試機制

為了確保連接在關閉后重新建立,我們可以監聽連接關閉事件并嘗試重試:

session.receive().doOnTerminate(() -> {System.out.println("WebSocket connection closed");reconnect();  // 重新連接}).then();private void reconnect() {connectToWebSocketServer().retry(3)  // 重試3次.subscribe();
}

8.1 完整的客戶端重試代碼

public Mono<Void> connectWithRetry() {return client.execute(URI.create("ws://example.com/socket"), session -> {Mono<Void> sendMessage = session.send(Mono.just(session.textMessage("Hello Server!")));session.receive().map(WebSocketMessage::getPayloadAsText).doOnNext(System.out::println).doOnTerminate(() -> reconnect())  // 連接關閉后重試.subscribe();return sendMessage;}).retryWhen(errors ->errors.zipWith(Flux.range(1, 5), (error, count) -> count).flatMap(retryCount -> Mono.delay(Duration.ofSeconds(retryCount))));
}

9. 結論

Spring Boot WebFlux 中 WebSocket 的生命周期包括:

  1. 連接建立:通過 HTTP Upgrade 握手建立 WebSocket 連接。
  2. 消息收發:服務器和客戶端之間通過 receive()send() 方法進行消息交換。
  3. 連接關閉:連接可以通過正常關閉、異常關閉或主動關閉的方式結束。
  4. 資源清理:連接關閉后需要進行資源清理操作,確保系統穩定。
  5. 重試機制:通過 retry()retryWhen() 方法為 WebSocket 連接添加自動重試機制,提高連接的可靠性。

通過 WebSocket,Spring Boot WebFlux 提供了高效的異步通信方式,特別適合用于實時數據流應用。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/72632.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/72632.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/72632.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【數據挖掘】異構圖與同構圖

在圖論&#xff08;Graph Theory&#xff09;中&#xff0c;異構圖&#xff08;Heterogeneous Graph&#xff09;和同構圖&#xff08;Homogeneous Graph&#xff09;是兩種不同的圖結構概念&#xff0c;它們的主要區別在于節點和邊的類型是否單一。 1. 異構圖&#xff08;Hete…

Golang實踐錄:go發布版本信息收集

go發布版本信息收集。 背景 本文從官方、網絡資料收羅有關go的發布歷史概況。主要目的是能快速了解golang不同版本的變更。鑒于官方資料為英文&#xff0c;為方便閱讀&#xff0c;使用工具翻譯成中文&#xff0c;重要特性參考其它資料補充/修改。由于發布版本內容較多&#xf…

【C++】: STL詳解 —— set和map類

目錄 關聯式容器 鍵值對 set set的概念 set的構造函數 set的使用 map map的概念 map的構造函數 map的使用 multiset multimap 關聯式容器 C標準庫提供了多種容器&#xff0c;用于高效管理和操作數據集合。這些容器可分為以下幾類&#xff1a; 順序容器&#xff08;…

DeepSeek:構筑大數據平臺底座的最優解

一、大數據平臺底座的重要性 在數字化浪潮席卷全球的當下,數據已成為企業乃至整個社會最具價值的資產之一 。大數據平臺底座作為數據處理和業務支撐的核心樞紐,其重要性不言而喻,猶如大廈的基石,關乎整個數據生態系統的穩定與發展。 從數據處理角度來看,隨著互聯網、物聯…

Minix OS的配置 SSH C程序編譯

Minix3的下載 官網&#xff1a;https://www.minix3.org/ 安裝 平臺&#xff1a;VMware 開機后進入系統使用setup命令來配置和安裝盡量配置一個DNS服務器&#xff0c;比如8.8.8.8 SSH 安裝&#xff1a;pkgin install openssh 修改配置文件&#xff0c;需要&#xff1a; 修…

ubuntu20 安裝python2

1. 確保啟用了 Universe 倉庫 在某些情況下&#xff0c;python2-minimal 包可能位于 Universe 倉庫中。你可以通過以下命令啟用 Universe 倉庫并更新軟件包列表&#xff1a; bash復制 sudo add-apt-repository universe sudo apt update 然后嘗試安裝&#xff1a; bash復制…

STM32---FreeRTOS中斷管理試驗

一、實驗 實驗目的&#xff1a;學會使用FreeRTOS的中斷管理 創建兩個定時器&#xff0c;一個優先級為4&#xff0c;另一個優先級為6&#xff1b;注意&#xff1a;系統所管理的優先級范圍 &#xff1a;5~15 現象&#xff1a;兩個定時器每1s&#xff0c;打印一段字符串&#x…

docker利用docker-compose-gpu.yml啟動RAGFLOW,文檔解析出錯【親測已解決】

0.問題說明 想要讓RAGFLOW利用GPU資源跑起來&#xff0c;可以選擇docker-compose-gpu.yml啟動。&#xff08;但是官網啟動案例是86平臺的不是NVIDIA GPU的&#xff0c;docker-compose-gpu.yml又是第三方維護&#xff0c;所以稍有問題&#xff09; 1.問題 docker利用docker-c…

【AI深度學習網絡】卷積神經網絡(CNN)入門指南:從生物啟發的原理到現代架構演進

深度神經網絡系列文章 【AI深度學習網絡】卷積神經網絡&#xff08;CNN&#xff09;入門指南&#xff1a;從生物啟發的原理到現代架構演進【AI實踐】基于TensorFlow/Keras的CNN&#xff08;卷積神經網絡&#xff09;簡單實現&#xff1a;手寫數字識別的工程實踐 引言 在當今…

【ThreeJS Basics 06】Camera

文章目錄 Camera 相機PerspectiveCamera 透視相機正交相機用鼠標控制相機大幅度轉動&#xff08;可以看到后面&#xff09; 控制組件FlyControls 飛行組件控制FirstPersonControls 第一人稱控制PointerLockControls 指針鎖定控制OrbitControls 軌道控制TrackballControls 軌跡球…

Linux | Ubuntu 與 Windows 雙系統安裝 / 高頻故障 / UEFI 安全引導禁用

注&#xff1a;本文為 “buntu 與 Windows 雙系統及高頻故障解決” 相關文章合輯。 英文引文&#xff0c;機翻未校。 How to install Ubuntu 20.04 and dual boot alongside Windows 10 如何將 Ubuntu 20.04 和雙啟動與 Windows 10 一起安裝 Dave’s RoboShack Published in…

在 C++ 中,通常會使用 `#define` 來定義宏,并通過這種方式發出警告或提示。

在 C++ 中,通常會使用 #define 來定義宏,并通過這種方式發出警告或提示。 如何實現 GBB_DEPRECATED_MSG 宏: 你可以通過以下方式定義一個宏,顯示棄用警告: #include <iostream>// 定義一個宏,用來打印棄用警告 #define GBB_DEPRECATED_MSG(msg

el-tree右鍵節點動態位置展示菜單;el-tree的節點圖片動態根據節點屬性color改變背景色;加遮罩層(opacity)

一、el-tree右鍵節點動態位置展示菜單 關鍵:@node-contextmenu="handleRightClick"與@node-click=“handleNodeClick” <div class="content"><el-tabs class="tabs" @tab-click="handleClick" v-model="Modal"…

Leetcode 378-有序矩陣中第 K 小的元素

給你一個 n x n 矩陣 matrix &#xff0c;其中每行和每列元素均按升序排序&#xff0c;找到矩陣中第 k 小的元素。 請注意&#xff0c;它是 排序后 的第 k 小元素&#xff0c;而不是第 k 個 不同 的元素。 你必須找到一個內存復雜度優于 O(n2) 的解決方案。 示例 1&#xff1…

【二.提示詞工程與實戰應用篇】【3.Prompt調優:讓AI更懂你的需求】

最近老張在朋友圈秀出用AI生成的國風水墨畫,隔壁王姐用AI寫了份驚艷全場的年終總結,就連樓下小賣部老板都在用AI生成營銷文案。你看著自己跟AI對話時滿屏的"我不太明白您的意思",是不是懷疑自己買了臺假電腦?別慌,這可能是你的打開方式不對。今天咱們就聊聊這個…

UNIAPP前端配合thinkphp5后端通過高德API獲取當前城市天氣預報

如何通過 UniApp 前端項目與 ThinkPHP5 后端結合高德天氣 API 獲取天氣預報信息。我們將分為前端和后端兩部分進行實現。以下是一個完整的代碼. 一、項目結構 project/ ├── frontend/ (UniApp 項目) │ ├── pages/ │ │ └── weather/ │ │ ├── in…

藍橋杯C組真題——巧克力

題目如下 思路 代碼及解析如下 謝謝觀看

CSDN博客寫作教學(五):從寫作到個人IP的體系化構建(完結篇)

導語 (第一篇)Markdown編輯器基礎 (第二篇)Markdown核心語法 (第三篇)文章結構化思維 (第四篇)標題優化與SEO實戰 通過前四篇教程,你已掌握技術寫作的“術”——排版、標題、流量與數據。但真正的價值在于將技能升維為“道”:用技術博客為支點,撬動個人品牌與職業發…

Elasticsearch簡單學習

1、依賴的導入 <!--ES依賴--> <dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId> </dependency>2、客戶端鏈接 RestHighLevelClient client new RestHigh…

macOS Sequoia 15.3 M3 Pro芯片 iOS 開發環境配置記錄(最新)

進行如下工作之前首先確保終端已翻墻&#xff0c;在ClashX選擇“復制終端代理命令”&#xff0c;在終端進行粘附并執行。 安裝 homebrew Homebrew 是 Mac 平臺的一個包管理工具&#xff0c;提供了許多Mac下沒有的Linux工具等。 /bin/bash -c "$(curl -fsSL https://raw…