通用請求體類
/**
 * Generic envelope for a message received from a WebSocket client.
 *
 * @param <T> declared type of the business payload carried in {@code data}
 */
@Data
@ApiModel("websocket請求消息")
public class WebSocketRequest<T> implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Business operation type.
     * See: com.mcmcnet.gacne.basic.service.common.pojo.enumeration.screen.AiBroadcastEventEnum
     */
    @NotNull(message = "業務操作類型不能為空")
    private Integer aiBroadcastEventEnum;

    /** Business payload; may be {@code null}. */
    private T data;

    /**
     * Converts {@code data} into an instance of {@code clazz} by serializing it to a JSON
     * string and parsing that string back.
     *
     * @param clazz target class to parse the payload into
     * @return the parsed payload, or {@code null} when {@code data} is {@code null}
     */
    public T getRealData(Class<T> clazz) {
        if (this.data == null) {
            return null;
        }
        String serialized = JsonUtil.toJsonStr(this.data);
        return (T) JsonUtil.parseObject(serialized, clazz);
    }
}
通用響應類
@ApiModel("websocket響應消息")
@Data
public class WebSocketResponse<T> implements Serializable {private static final long serialVersionUID = 1L;/*** 參考枚舉:com.mcmcnet.gacne.basic.service.common.pojo.enumeration.screen.AiBroadcastEventEnum*/private Integer aiBroadcastEventEnum;private String code;private String msg;private T data;public static <T> Mono<WebSocketResponse<T>> ok(Integer eventEnum, T data) {WebSocketResponse<T> response = new WebSocketResponse<T>();response.setAiBroadcastEventEnum(eventEnum);response.setCode(RespStatusEnum.OK.getCode());response.setMsg(RespStatusEnum.OK.getMsg());response.setData(data);return Mono.just(response);}public static Mono<WebSocketResponse<Void>> ok(Integer eventEnum) {WebSocketResponse<Void> response = new WebSocketResponse<Void>();response.setAiBroadcastEventEnum(eventEnum);response.setCode(RespStatusEnum.OK.getCode());response.setMsg(RespStatusEnum.OK.getMsg());return Mono.just(response);}public static <T> Mono<WebSocketResponse<T>> fail(Integer eventEnum, RespStatusEnum status, String err) {WebSocketResponse<T> response = new WebSocketResponse<T>();response.setAiBroadcastEventEnum(eventEnum);response.setCode(status.getCode());response.setMsg(err);return Mono.just(response);}}
連接類型類
/**
 * Payload describing the connection type requested by a client.
 * Setters are chainable ({@code @Accessors(chain = true)}).
 */
@Data
@Accessors(chain = true)
public class ConnectDTO {

    /**
     * Connection type.
     * See enum: com.mcmcnet.gacne.basic.service.common.pojo.enumeration.screen.AiBroadcastEventEnum
     */
    private Integer type;
}
- 配置類
@Configuration
public class WebFluxWebSocketConfig {/** 讓 Spring 注入已經帶依賴的 Handler */@Beanpublic HandlerMapping webSocketMapping(WebSocketReceivedHandler handler) {return new SimpleUrlHandlerMapping(Map.of("/api/xxx/ws", handler), // 用注入的 handler-1);}@Beanpublic WebSocketHandlerAdapter handlerAdapter() {return new WebSocketHandlerAdapter();}
}
- 實現類
@Component
@RequiredArgsConstructor
@Slf4j
public class WebSocketReceivedHandler implements WebSocketHandler {@Autowiredprivate AiBroadcastEventHandlerDispatcher<?, ?> dispatcher;@Autowiredprivate WsSessionPool wsSessionPool;@Overridepublic @NotNull Mono<Void> handle(@NotNull WebSocketSession session) {wsSessionPool.add(session);String sid = session.getId();// 處理客戶端請求消息,生成響應消息流Flux<WebSocketMessage> inputFlux = session.receive().map(WebSocketMessage::getPayloadAsText).flatMap(payload -> dispatcher.doDispatch(session, payload).map(session::textMessage));// 服務端廣播消息流Flux<WebSocketMessage> broadcastFlux = wsSessionPool.getPersonalFlux(sid).map(session::textMessage);// 合并兩個流,確保 session.send 只調用一次Flux<WebSocketMessage> mergedFlux = Flux.merge(inputFlux, broadcastFlux);return session.send(mergedFlux).doFinally(sig -> {wsSessionPool.remove(session);log.info("websocket 關閉,sessionId:{},signal:{}", session.getId(), sig);});}
}
3. 處理類:aiBroadcastEventEnum 為枚舉類型,分發器會根據請求中的枚舉值路由到對應的處理類,由該處理類處理消息並返回相應的響應。
@Component
@Slf4j
public class AiBroadcastEventHandlerDispatcher<T, R> {private final Map<Integer, AiBroadcastEventHandler<T, R>> eventMap = new HashMap<>();/** 由 Spring 注入所有事件處理器 */public AiBroadcastEventHandlerDispatcher(List<AiBroadcastEventHandler<T, R>> handlers) {handlers.forEach(h -> eventMap.put(h.aiBroadcastEvent(), h));}/*** 處理前端發來的 Payload 并把響應寫回當前 session** @param session 當前 WebFlux Session* @param payload 文本 JSON* @return Mono<Void> 供調用方鏈式訂閱*/public Mono<String> doDispatch(WebSocketSession session, String payload) {WebSocketRequest<R> webSocketRequest = JsonUtil.parseObject(payload, new TypeReference<WebSocketRequest<R>>() {});log.info("webSocketRequest:{}, sessionID:{}", webSocketRequest, session.getId());// 發送響應并記錄日志return handlerRequest(webSocketRequest, session).map(JsonUtil::toJson).doOnNext(json -> log.info("準備發送: {}", json)).onErrorResume(e -> {log.error("發送異常", e);return Mono.just(JsonUtil.toJson(WebSocketResponse.fail(webSocketRequest != null ? webSocketRequest.getAiBroadcastEventEnum() : null,RespStatusEnum.INTERNAL_SERVICE_ERROR,"系統異常:" + e.getMessage())));});}/** 實際路由到具體 Handler */private Mono<WebSocketResponse<T>> handlerRequest(WebSocketRequest<R> req, WebSocketSession session) {if (ObjectUtil.isNull(req) || !eventMap.containsKey(req.getAiBroadcastEventEnum())) {return WebSocketResponse.fail(req.getAiBroadcastEventEnum(),RespStatusEnum.INTERNAL_SERVICE_ERROR,"aiBroadcastEventEnum not find");}try {return eventMap.get(req.getAiBroadcastEventEnum()).handler(req, session);} catch (Exception e) {log.error("websocket 處理消息異常,webSocketRequest:{}, sessionID:{}",req, session.getId(), e);return WebSocketResponse.fail(req.getAiBroadcastEventEnum(),RespStatusEnum.INTERNAL_SERVICE_ERROR, e.getMessage());}}}
- 接口
public interface AiBroadcastEventHandler<T, R> {/*** 對應事件枚舉值(例如 MAP_ALARM_CHARGING 的 code)*/Integer aiBroadcastEvent();/*** 執行處理邏輯,并返回響應,最終由 dispatcher 推送給前端** @param webSocketRequest 請求體* @param session 當前連接* @return Mono<WebSocketResponse<T>> 最終會轉成 JSON 發給前端*/Mono<WebSocketResponse<T>> handler(WebSocketRequest<R> webSocketRequest, WebSocketSession session);/*** 校驗參數*/void validateParam(WebSocketRequest<R> webSocketRequest) throws ParameterException;
- 通用處理邏輯
public abstract class AbstractAiBroadcastEventHandler<T, R>implements AiBroadcastEventHandler<T, R> {/*** websocket 請求事件處理統一流程:參數校驗 → 業務處理*/@Overridepublic Mono<WebSocketResponse<T>> handler(WebSocketRequest<R> webSocketRequest,WebSocketSession session) {try {validateParam(webSocketRequest);return doHandler(webSocketRequest, session);} catch (ParameterException e) {return WebSocketResponse.fail(webSocketRequest.getAiBroadcastEventEnum(),RespStatusEnum.INTERNAL_SERVICE_ERROR,e.getMessage());}}/*** 子類實現真正的業務邏輯:* 1. 更新本地 sessionId / Redis 映射* 2. 查詢并返回最新業務數據*/public abstract Mono<WebSocketResponse<T>> doHandler(WebSocketRequest<R> webSocketRequest, WebSocketSession session);
- 實現類
@Component
@Slf4j
public class SubscribeFireContentHandler extends AbstractAiBroadcastEventHandler<VO, ConnectDTO> implements AiBroadcastEventHandler<VO, ConnectDTO>{@Autowiredprivate BizServiceSafeScreenClient bizServiceSafeScreenClient;@Overridepublic Integer aiBroadcastEvent() {return AiBroadcastEventEnum.AI_FIRE_ALARM_CONTENT.getCode();}@Overridepublic Mono<WebSocketResponse<VO>> doHandler(WebSocketRequest<ConnectDTO> webSocketRequest, WebSocketSession session) {log.info("SubscribeFireContentHandler doHandler start");//session訂閱該訂單數據ConnectDTO dto = webSocketRequest.getRealData(ConnectDTO.class);if (!AiBroadcastEventEnum.AI_FIRE_ALARM_CONTENT.getCode().equals(dto.getType())) {return Mono.error(new RespException("參數異常", RespStatusEnum.CLIENT_ERROR));}// 查詢火災站 前端拼接內容FinalResultVO<VO> xxx = 調用接口獲取數據;if (xxx != null) {Mono<WebSocketResponse<FireStationVO>> ok = WebSocketResponse.ok(AiBroadcastEventEnum.AI_FIRE_ALARM_CONTENT.getCode(), xxx);return ok;}return Mono.empty();}@Overridepublic void validateParam(WebSocketRequest<ConnectDTO> webSocketRequest) {ConnectDTO dto = webSocketRequest.getRealData(ConnectDTO.class);if (ObjUtil.isNull(dto.getType())) {throw new RespException("參數異常", RespStatusEnum.CLIENT_ERROR);}}
7.心跳
/**
 * Scheduled task that broadcasts a heartbeat message through the session pool.
 */
@Component
@Log4j2
public class WsHeartbeatTask {

    /** Timestamp format of the heartbeat; built once because the formatter is immutable and thread-safe. */
    private static final DateTimeFormatter TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private final WsSessionPool wsSessionPool;

    /**
     * @param wsSessionPool pool used to broadcast the heartbeat
     */
    public WsHeartbeatTask(WsSessionPool wsSessionPool) {
        this.wsSessionPool = wsSessionPool;
    }

    /** Logs that the heartbeat task has been initialised. */
    @PostConstruct
    public void init() {
        log.info("WebSocket心跳任務已啟動");
    }

    /**
     * Broadcasts a {@code ping} JSON message carrying the current local time.
     * Scheduled at a fixed rate of 30 seconds.
     */
    @Scheduled(fixedRate = 30_000)
    public void sendHeartbeat() {
        String timeStr = LocalDateTime.now().format(TIMESTAMP_FORMAT);
        String json = String.format("{\"type\":\"ping\",\"timestamp\":\"%s\"}", timeStr);
        wsSessionPool.broadcast(json);
    }
}