WebApplicationType.REACTIVE 的webSocket

通用請求體類

@Data
@ApiModel("websocket請求消息")
public class WebSocketRequest<T> implements Serializable {private static final long serialVersionUID = 1L;/*** 參考:com.mcmcnet.gacne.basic.service.common.pojo.enumeration.screen.AiBroadcastEventEnum;*/private @NotNull(message = "業務操作類型不能為空") Integer aiBroadcastEventEnum;private T data;public T getRealData(Class<T> clazz) {if (this.data == null) {return null;} else {String jsonStr = JsonUtil.toJsonStr(this.data);return (T) JsonUtil.parseObject(jsonStr, clazz);}}}

通用響應類

@ApiModel("websocket響應消息")
@Data
public class WebSocketResponse<T> implements Serializable {private static final long serialVersionUID = 1L;/*** 參考枚舉:com.mcmcnet.gacne.basic.service.common.pojo.enumeration.screen.AiBroadcastEventEnum*/private Integer aiBroadcastEventEnum;private String code;private String msg;private T data;public static <T> Mono<WebSocketResponse<T>> ok(Integer eventEnum, T data) {WebSocketResponse<T> response = new WebSocketResponse<T>();response.setAiBroadcastEventEnum(eventEnum);response.setCode(RespStatusEnum.OK.getCode());response.setMsg(RespStatusEnum.OK.getMsg());response.setData(data);return Mono.just(response);}public static Mono<WebSocketResponse<Void>> ok(Integer eventEnum) {WebSocketResponse<Void> response = new WebSocketResponse<Void>();response.setAiBroadcastEventEnum(eventEnum);response.setCode(RespStatusEnum.OK.getCode());response.setMsg(RespStatusEnum.OK.getMsg());return Mono.just(response);}public static <T> Mono<WebSocketResponse<T>> fail(Integer eventEnum, RespStatusEnum status, String err) {WebSocketResponse<T> response = new WebSocketResponse<T>();response.setAiBroadcastEventEnum(eventEnum);response.setCode(status.getCode());response.setMsg(err);return Mono.just(response);}}

連接類型類

@Data
@Accessors(chain = true)
public class ConnectDTO {/*** 連接類型 參考枚舉:com.mcmcnet.gacne.basic.service.common.pojo.enumeration.screen.AiBroadcastEventEnum*/private Integer type;}
  1. 配置類
@Configuration
public class WebFluxWebSocketConfig {/** 讓 Spring 注入已經帶依賴的 Handler */@Beanpublic HandlerMapping webSocketMapping(WebSocketReceivedHandler handler) {return new SimpleUrlHandlerMapping(Map.of("/api/xxx/ws", handler),   // 用注入的 handler-1);}@Beanpublic WebSocketHandlerAdapter handlerAdapter() {return new WebSocketHandlerAdapter();}
}
  1. 實現類
@Component
@RequiredArgsConstructor
@Slf4j
public class WebSocketReceivedHandler implements WebSocketHandler {@Autowiredprivate AiBroadcastEventHandlerDispatcher<?, ?> dispatcher;@Autowiredprivate WsSessionPool wsSessionPool;@Overridepublic @NotNull Mono<Void> handle(@NotNull WebSocketSession session) {wsSessionPool.add(session);String sid = session.getId();// 處理客戶端請求消息,生成響應消息流Flux<WebSocketMessage> inputFlux = session.receive().map(WebSocketMessage::getPayloadAsText).flatMap(payload -> dispatcher.doDispatch(session, payload).map(session::textMessage));// 服務端廣播消息流Flux<WebSocketMessage> broadcastFlux = wsSessionPool.getPersonalFlux(sid).map(session::textMessage);// 合并兩個流,確保 session.send 只調用一次Flux<WebSocketMessage> mergedFlux = Flux.merge(inputFlux, broadcastFlux);return session.send(mergedFlux).doFinally(sig -> {wsSessionPool.remove(session);log.info("websocket 關閉,sessionId:{},signal:{}", session.getId(), sig);});}
}

3.處理類 aiBroadcastEventEnum 是枚舉類型,根據不同的枚舉類型進入不同的處理類進行處理不同的消息返回


@Component
@Slf4j
public class AiBroadcastEventHandlerDispatcher<T, R> {private final Map<Integer, AiBroadcastEventHandler<T, R>> eventMap = new HashMap<>();/** 由 Spring 注入所有事件處理器 */public AiBroadcastEventHandlerDispatcher(List<AiBroadcastEventHandler<T, R>> handlers) {handlers.forEach(h -> eventMap.put(h.aiBroadcastEvent(), h));}/*** 處理前端發來的 Payload 并把響應寫回當前 session** @param session 當前 WebFlux Session* @param payload 文本 JSON* @return Mono<Void> 供調用方鏈式訂閱*/public Mono<String> doDispatch(WebSocketSession session, String payload) {WebSocketRequest<R> webSocketRequest = JsonUtil.parseObject(payload, new TypeReference<WebSocketRequest<R>>() {});log.info("webSocketRequest:{}, sessionID:{}", webSocketRequest, session.getId());// 發送響應并記錄日志return handlerRequest(webSocketRequest, session).map(JsonUtil::toJson).doOnNext(json -> log.info("準備發送: {}", json)).onErrorResume(e -> {log.error("發送異常", e);return Mono.just(JsonUtil.toJson(WebSocketResponse.fail(webSocketRequest != null ? webSocketRequest.getAiBroadcastEventEnum() : null,RespStatusEnum.INTERNAL_SERVICE_ERROR,"系統異常:" + e.getMessage())));});}/** 實際路由到具體 Handler */private Mono<WebSocketResponse<T>> handlerRequest(WebSocketRequest<R> req, WebSocketSession session) {if (ObjectUtil.isNull(req) || !eventMap.containsKey(req.getAiBroadcastEventEnum())) {return WebSocketResponse.fail(req.getAiBroadcastEventEnum(),RespStatusEnum.INTERNAL_SERVICE_ERROR,"aiBroadcastEventEnum not find");}try {return eventMap.get(req.getAiBroadcastEventEnum()).handler(req, session);} catch (Exception e) {log.error("websocket 處理消息異常,webSocketRequest:{}, sessionID:{}",req, session.getId(), e);return WebSocketResponse.fail(req.getAiBroadcastEventEnum(),RespStatusEnum.INTERNAL_SERVICE_ERROR, e.getMessage());}}}
  1. 接口
public interface AiBroadcastEventHandler<T, R> {/*** 對應事件枚舉值(例如 MAP_ALARM_CHARGING 的 code)*/Integer aiBroadcastEvent();/*** 執行處理邏輯,并返回響應,最終由 dispatcher 推送給前端** @param webSocketRequest 請求體* @param session          當前連接* @return Mono<WebSocketResponse<T>> 最終會轉成 JSON 發給前端*/Mono<WebSocketResponse<T>> handler(WebSocketRequest<R> webSocketRequest, WebSocketSession session);/*** 校驗參數*/void validateParam(WebSocketRequest<R> webSocketRequest) throws ParameterException;
  1. 通用處理邏輯
public abstract class AbstractAiBroadcastEventHandler<T, R>implements AiBroadcastEventHandler<T, R> {/*** websocket 請求事件處理統一流程:參數校驗 → 業務處理*/@Overridepublic Mono<WebSocketResponse<T>> handler(WebSocketRequest<R> webSocketRequest,WebSocketSession session) {try {validateParam(webSocketRequest);return doHandler(webSocketRequest, session);} catch (ParameterException e) {return WebSocketResponse.fail(webSocketRequest.getAiBroadcastEventEnum(),RespStatusEnum.INTERNAL_SERVICE_ERROR,e.getMessage());}}/*** 子類實現真正的業務邏輯:*   1. 更新本地 sessionId / Redis 映射*   2. 查詢并返回最新業務數據*/public abstract Mono<WebSocketResponse<T>> doHandler(WebSocketRequest<R> webSocketRequest, WebSocketSession session);
  1. 實現類
@Component
@Slf4j
public class SubscribeFireContentHandler extends AbstractAiBroadcastEventHandler<VO, ConnectDTO> implements AiBroadcastEventHandler<VO, ConnectDTO>{@Autowiredprivate BizServiceSafeScreenClient bizServiceSafeScreenClient;@Overridepublic Integer aiBroadcastEvent() {return AiBroadcastEventEnum.AI_FIRE_ALARM_CONTENT.getCode();}@Overridepublic Mono<WebSocketResponse<VO>> doHandler(WebSocketRequest<ConnectDTO> webSocketRequest, WebSocketSession session) {log.info("SubscribeFireContentHandler doHandler start");//session訂閱該訂單數據ConnectDTO dto = webSocketRequest.getRealData(ConnectDTO.class);if (!AiBroadcastEventEnum.AI_FIRE_ALARM_CONTENT.getCode().equals(dto.getType())) {return Mono.error(new RespException("參數異常", RespStatusEnum.CLIENT_ERROR));}// 查詢火災站 前端拼接內容FinalResultVO<VO> xxx = 調用接口獲取數據;if (xxx  != null) {Mono<WebSocketResponse<FireStationVO>> ok = WebSocketResponse.ok(AiBroadcastEventEnum.AI_FIRE_ALARM_CONTENT.getCode(), xxx);return ok;}return Mono.empty();}@Overridepublic void validateParam(WebSocketRequest<ConnectDTO> webSocketRequest) {ConnectDTO dto = webSocketRequest.getRealData(ConnectDTO.class);if (ObjUtil.isNull(dto.getType())) {throw new RespException("參數異常", RespStatusEnum.CLIENT_ERROR);}}

7.心跳

@Component
@Log4j2
public class WsHeartbeatTask {private final WsSessionPool wsSessionPool;public WsHeartbeatTask(WsSessionPool wsSessionPool) {this.wsSessionPool = wsSessionPool;}@PostConstructpublic void init() {log.info("WebSocket心跳任務已啟動");}// 每30秒廣播一個心跳消息@Scheduled(fixedRate = 30_000)public void sendHeartbeat() {String timeStr = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));String json = String.format("{\"type\":\"ping\",\"timestamp\":\"%s\"}", timeStr);wsSessionPool.broadcast(json);}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/89125.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/89125.shtml
英文地址,請注明出處:http://en.pswp.cn/web/89125.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

降本增效!自動化UI測試平臺TestComplete并行測試亮點

在跨平臺自動化測試中&#xff0c;企業常面臨設備投入高、串行測試耗時長、測試覆蓋率難以兼顧的困境。自動化UI測試平臺TestComplete的并行測試引擎提供了有效的解決方案&#xff1a;通過云端海量設備池與CI/CD深度集成&#xff0c;實現多平臺、多瀏覽器并行測試&#xff0c;顯…

云、實時、時序數據庫混合應用:醫療數據管理的革新與展望(上)

云、實時、時序數據庫混合應用:醫療數據管理的革新與展望 1、引言 1.1 研究背景與意義 在信息技術飛速發展的當下,醫療行業正經歷著深刻的數字化轉型。這一轉型不僅是技術層面的革新,更是關乎醫療體系未來發展方向的深刻變革。從醫療服務的提供方式,到醫療管理的模式,再…

代碼隨想錄算法訓練營十六天|二叉樹part06

LeetCode 530 二叉搜索樹的最小絕對差 題目鏈接&#xff1a;530. 二叉搜索樹的最小絕對差 - 力扣&#xff08;LeetCode&#xff09; 給你一個二叉搜索樹的根節點 root &#xff0c;返回 樹中任意兩不同節點值之間的最小差值 。 差值是一個正數&#xff0c;其數值等于兩值之差…

自增主鍵為什么不是連續的?

前言 如果一個線程回滾&#xff0c;例如唯一鍵沖突的情況回滾時&#xff0c;回滾了sql語句&#xff0c;但是并沒有把自增的值也-1。那么就會導致下一條插入的數據自增id出現了跳躍。 自增主鍵為什么不是連續的&#xff1f;前言執行時機為什么自增主鍵不是連續的為什么不回滾自…

OpenCV圖像基本操作:讀取、顯示與保存

在圖像處理項目中&#xff0c;圖像的 讀取&#xff08;imread&#xff09;、顯示&#xff08;imshow&#xff09; 和 保存&#xff08;imwrite&#xff09; 是最基礎也是最常用的三個操作。本文將詳細介紹這三個函數的功能、用法和注意事項&#xff0c;并提供一個完整示例供讀者…

.NET控制臺應用程序中防止程序立即退出

在VB.NET控制臺應用程序中防止程序立即退出&#xff0c;主要有以下幾種常用方法&#xff0c;根據需求選擇適合的方案&#xff1a; 方法1&#xff1a;等待用戶輸入&#xff08;推薦&#xff09; Module Module1Sub Main()Console.WriteLine("程序開始運行...") 這里是…

Vue3 + Three.js 極速入門:打造你的第一個3D可視化項目

文章目錄前言一、環境準備1.1 創建Vue3項目1.2 安裝Three.js二、Three.js核心概念速覽三、實戰&#xff1a;創建旋轉立方體3.1 組件化初始化四、核心代碼解析4.1 Vue3響應式整合技巧4.2 性能優化要點五、進階功能擴展5.1 數據驅動控制5.2 加載3D模型六、常見問題解決七、資源推…

【設計模式】享元模式(輕量級模式) 單純享元模式和復合享元模式

享元模式&#xff08;Flyweight Pattern&#xff09;詳解一、享元模式簡介 享元模式&#xff08;Flyweight Pattern&#xff09; 是一種 結構型設計模式&#xff08;對象結構型模式&#xff09;&#xff0c;它通過共享技術實現相同或相似對象的重用&#xff0c;以減少內存占用和…

驅動開發_2.字符設備驅動

目錄1. 什么是字符設備2. 設備號2.1 設備號概念2.2 通過設備號dev分別獲取主、次設備號的宏函數2.3 主設備號的申請靜態申請動態分配2.4 注銷設備號3. 字符設備3.1 注冊字符設備3.2 注銷字符設備3.3 應用程序和驅動程序的關系3.4 file_opertaions結構體3.5 class_create3.6 創建…

直播推流技術底層邏輯詳解與私有化實現方案-以rmtp rtc hls為例-優雅草卓伊凡

直播推流技術底層邏輯詳解與私有化實現方案-以rmtp rtc hls為例-優雅草卓伊凡由于我們的甲方客戶要開始為我們項目產品上加入私有化的直播&#xff0c;這塊不得不又撿起來曾經我們做直播推流的事情了&#xff0c;其實私有化直播一直并不是一件容易的事情&#xff0c;現在大部分…

一文讀懂現代卷積神經網絡—深度卷積神經網絡(AlexNet)

目錄 深度卷積神經網絡&#xff08;AlexNet&#xff09;是什么&#xff1f; 一、AlexNet 的核心創新 1. 深度架構 2. ReLU 激活函數 3. 數據增強 4. Dropout 正則化 5. GPU 并行計算 6. 局部響應歸一化&#xff08;LRN&#xff09; 二、AlexNet 的網絡結構 三、AlexN…

JVM 垃圾收集算法全面解析

1. 引言1.1 為什么需要垃圾收集&#xff1f;在Java應用中&#xff0c;垃圾收集&#xff08;Garbage Collection&#xff0c;GC&#xff09;是一個至關重要的機制&#xff0c;它使得開發者不需要手動管理內存。與傳統的語言&#xff08;如C或C&#xff09;不同&#xff0c;Java通…

Vmware中安裝的CentOS7如何擴展硬盤大小

起初創建虛擬機時&#xff0c;大小設置不合理&#xff0c;導致我在嘗試開源項目時空間不足重新擴展硬盤&#xff0c;不僅需要在虛擬機設置中配置&#xff0c;還需要在系統內重新進行分區一、虛擬機設置打開虛擬機設置→硬盤→擴展&#xff0c;將大小設置為自己期望的大小&#…

Python+MongoDB高效開發組合

如大家所知&#xff0c;Python與MongoDB的結合是一種高效的開發組合&#xff0c;主要用于通過Python進行數據存儲、查詢及管理&#xff0c;利用MongoDB的文檔型數據庫特性實現靈活的數據處理。下面讓 Python 連接上 MongoDB&#xff1a;安裝 PyMongo&#xff1a;pip3 install p…

【論文閱讀】Masked Autoencoders Are Effective Tokenizers for Diffusion Models

introduce什么樣的 latent 空間更適合用于擴散模型&#xff1f;作者發現&#xff1a;相比傳統的 VAE&#xff0c;結構良好、判別性強的 latent 空間才是 diffusion 成功的關鍵。研究動機&#xff1a;什么才是“好的 latent 表征”&#xff1f;背景&#xff1a;Diffusion Models…

每日一SQL 【游戲玩法分析 IV】

文章目錄問題案例執行順序使用分組解決問題 案例 執行順序 SQL 語句的執行順序&#xff08;核心步驟&#xff09; 同一層級的select查詢內部, 別名在整個 SELECT 計算完成前不生效 使用分組解決 select distinct s.product_id, Product.product_name from Sales sleft join …

內部文件審計:企業文件服務器審計對網絡安全提升有哪些幫助?

企業文件服務器審計工作不僅對提升企業網絡信息安全起到重要作用&#xff0c;還能對企業內部網絡文件信息是否合規進行判斷。因此企業文件服務器審計一直被高度重視。 一、文件服務器為何成為攻擊焦點&#xff1f; 企業文件服務器通常集中存儲財務報表、人事檔案、研發資料、客…

FusionOne HCI 23 超融合實施手冊(超聚變超融合)

產品介紹 FusionOne HCI作為實現企業信息一體化的IT基礎設施平臺&#xff0c;以“軟硬件垂直深度集成和調優”、“快速部署”、“統一管理”的理念&#xff0c;提供應用融合部署&#xff0c;提升核心業務運作效率&#xff0c;降低整體采購成本。 FusionOne HCI代表了IT產品的…

AI算姻緣測算小工具流量主微信小程序開源

功能特點 響應式設計&#xff1a;完美適配各種移動設備屏幕尺寸 精美UI界面&#xff1a; 柔和的粉紅色漸變背景 圓角卡片設計 精心設計的字體和間距 愛心圖標點綴 動態效果&#xff1a; 點擊按鈕時的動畫反饋 測算結果的平滑過渡動畫 愛心漂浮動畫 進度條動態填充 AI測算功能&a…

Vue獲取上傳Excel文件內容并展示在表格中

一、安裝依賴 npm install xlsx 二、引用依賴 import XLSX from xlsx 三、代碼實現 1、注意&#xff1a;函數 analysis 中reader.readAsBinaryString(file)&#xff0c;file的數據格式如圖所示 2、示例代碼 <!-- 項目使用的前端框架為非流行框架&#xff0c;主要關注…