java每日精進 5.11【WebSocket】

1.純Websocket實現消息發送

1.1一對一發送

前端

  1. 用戶在輸入框輸入消息內容(sendText)

  2. 選擇特定接收用戶(sendUserId)

  3. 點擊發送按鈕觸發handlerSend方法

  4. 構造消息內容JSON:

    {text: "Hello", // 消息內容toUserId: 123   // 目標用戶ID
    }
  5. 包裝為WebSocket標準格式:

    {type: "demo-message-send", // 消息類型content: '{"text":"Hello","toUserId":123}' // 字符串化的內容
    }
  6. 通過send()方法發送

  • 前端在setup函數中,使用useWebSocket方法,根據server變量(WebSocket 服務地址)建立連接。server地址由VITE_BASE_URL(環境變量)、/infra/ws路徑和token(通過getRefreshToken獲取)組成。
  • 設置autoReconnecttrue,表示自動重連;heartbeattrue,表示開啟心跳機制。
  • 當用戶在前端輸入消息并點擊發送按鈕時,handlerSend函數被調用。
  • 首先將發送內容sendText和接收用戶sendUserId進行 JSON 化處理,構建消息內容messageContent
  • 然后將消息類型typedemo-message-send)和消息內容messageContent再次 JSON 化,形成最終的消息jsonMessage
  • 最后使用send函數將jsonMessage發送到后端。
const server = ref((import.meta.env.VITE_BASE_URL + '/infra/ws').replace('http', 'ws') +'?token=' +getRefreshToken() // 使用 getRefreshToken() 方法,而不使用 getAccessToken() 方法的原因:WebSocket 無法方便的刷新訪問令牌
) // WebSocket 服務地址
const getIsOpen = computed(() => status.value === 'OPEN') // WebSocket 連接是否打開
const getTagColor = computed(() => (getIsOpen.value ? 'success' : 'red')) // WebSocket 連接的展示顏色/** 發起 WebSocket 連接 */
const { status, data, send, close, open } = useWebSocket(server.value, {autoReconnect: true,heartbeat: true
})
/** 發送消息 */
const sendText = ref('') // 發送內容
const sendUserId = ref('') // 發送人
const handlerSend = () => {// 1.1 先 JSON 化 message 消息內容const messageContent = JSON.stringify({text: sendText.value,toUserId: sendUserId.value})// 1.2 再 JSON 化整個消息const jsonMessage = JSON.stringify({type: 'demo-message-send',content: messageContent})// 2. 最后發送消息send(jsonMessage)sendText.value = ''
}

后端

  • 注冊監聽器DemoWebSocketMessageListener?類通過實現?WebSocketMessageListener<DemoSendMessage>?接口,并使用?@Component?注解將自己注冊為 Spring Bean。框架啟動時會掃描所有實現了該接口的 Bean,并將它們注冊到消息處理器中。
/*** WebSocket 示例:單發消息*/
@Component
public class DemoWebSocketMessageListener implements WebSocketMessageListener<DemoSendMessage> {@Resourceprivate WebSocketMessageSender webSocketMessageSender;@Overridepublic void onMessage(WebSocketSession session, DemoSendMessage message) {Long fromUserId = WebSocketFrameworkUtils.getLoginUserId(session);// 情況一:單發if (message.getToUserId() != null) {DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId).setText(message.getText()).setSingle(true);webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), message.getToUserId(), // 給指定用戶"demo-message-receive", toMessage);return;}// 情況二:群發DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId).setText(message.getText()).setSingle(false);webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), // 給所有用戶"demo-message-receive", toMessage);}@Overridepublic String getType() {return "demo-message-send";}}
  • 消息類型綁定getType()?方法返回?"demo-message-send",這表明該監聽器專門處理類型為?"demo-message-send"?的消息。當后端接收到消息時,會根據消息類型路由到對應的監聽器。

當 WebSocket 服務器接收到消息后:

  1. 消息解析:框架首先解析消息的 JSON 格式,提取?type?字段(如?"demo-message-send")。
  2. 類型匹配:后端框架會自動將?type?為?"demo-message-send"?的消息路由到?DemoWebSocketMessageListener?的?onMessage?方法。
  3. 調用回調:將消息反序列化為?DemoSendMessage?對象,并調用監聽器的?onMessage?方法。
/*** JSON 格式 {@link WebSocketHandler} 實現類* 基于 {@link JsonWebSocketMessage#getType()} 消息類型,調度到對應的 {@link WebSocketMessageListener} 監聽器。*/
@Slf4j
public class JsonWebSocketMessageHandler extends TextWebSocketHandler {/*** type 與 WebSocketMessageListener 的映射* 用于存儲不同消息類型對應的監聽器,鍵為消息類型,值為對應的監聽器實例*/private final Map<String, WebSocketMessageListener<Object>> listeners = new HashMap<>();@SuppressWarnings({"rawtypes", "unchecked"})public JsonWebSocketMessageHandler(List<? extends WebSocketMessageListener> listenersList) {// 遍歷傳入的監聽器列表listenersList.forEach((Consumer<WebSocketMessageListener>)listener -> {// 將監聽器的類型(通過 getType() 方法獲取)作為鍵,監聽器實例作為值,存入 listeners 映射中listeners.put(listener.getType(), listener);});}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {// 1.1 空消息,跳過// 如果消息的負載長度為 0,說明是一個空消息,直接返回,不進行后續處理if (message.getPayloadLength() == 0) {return;}// 1.2 ping 心跳消息,直接返回 pong 消息。// 如果消息的負載長度為 4 且負載內容為 "ping",則向客戶端發送 "pong" 消息,表示響應心跳if (message.getPayloadLength() == 4 && Objects.equals(message.getPayload(), "ping")) {session.sendMessage(new TextMessage("pong"));return;}// 2.1 解析消息try {// 將文本消息的負載解析為 JsonWebSocketMessage 對象JsonWebSocketMessage jsonMessage = JsonUtils.parseObject(message.getPayload(), JsonWebSocketMessage.class);// 如果解析后的消息為空,記錄錯誤日志并返回,不進行后續處理if (jsonMessage == null) {log.error("[handleTextMessage][session({}) message({}) 解析為空]", session.getId(), message.getPayload());return;}// 如果解析后的消息類型為空,記錄錯誤日志并返回,不進行后續處理if (StrUtil.isEmpty(jsonMessage.getType())) {log.error("[handleTextMessage][session({}) message({}) 類型為空]", session.getId(), message.getPayload());return;}// 2.2 獲得對應的 WebSocketMessageListener// 根據消息類型從 listeners 映射中獲取對應的監聽器WebSocketMessageListener<Object> messageListener = listeners.get(jsonMessage.getType());// 如果沒有找到對應的監聽器,記錄錯誤日志并返回,不進行后續處理if (messageListener == null) {log.error("[handleTextMessage][session({}) message({}) 監聽器為空]", session.getId(), message.getPayload());return;}// 2.3 處理消息// 獲取監聽器泛型參數類型Type type = TypeUtil.getTypeArgument(messageListener.getClass(), 0);// 將消息內容解析為對應類型的對象Object messageObj = JsonUtils.parseObject(jsonMessage.getContent(), type);// 獲取當前會話的租戶 IDLong tenantId = WebSocketFrameworkUtils.getTenantId(session);// 執行租戶相關的操作,調用監聽器的 onMessage 方法處理消息TenantUtils.execute(tenantId, () -> messageListener.onMessage(session, messageObj));} catch (Throwable ex) {// 如果在處理消息過程中發生異常,記錄錯誤日志log.error("[handleTextMessage][session({}) message({}) 處理異常]", session.getId(), message.getPayload());}}}

WebSocketMessageListener?之所以能監聽消息,是因為:

  1. 接口契約:實現?WebSocketMessageListener?接口并指定消息類型(getType())。
  2. 框架支持:Spring 框架自動掃描并注冊監聽器,實現消息的解析和分發。
  3. 類型匹配:前端發送的消息?type?與后端監聽器的?getType()?一致,觸發回調。

這個過程類似于 HTTP 請求的路由機制,只不過 WebSocket 是長連接,需要持續監聽消息。

通常,WebSocket 框架(如 Spring WebSocket)會提供以下核心組件:

  • 消息解碼器:將二進制數據轉換為 Java 對象(如?DemoSendMessage)。
  • 消息路由器:根據消息類型將消息路由到對應的監聽器。
  • 會話管理器:維護所有 WebSocket 會話(WebSocketSession),并提供獲取用戶信息的工具(如?WebSocketFrameworkUtils.getLoginUserId)。
  • 后端的DemoWebSocketMessageListener類實現了WebSocketMessageListener接口的onMessage方法。
  • 當有消息到達時,onMessage方法被調用,從WebSocketSession中獲取登錄用戶 ID(fromUserId)。
  • 根據消息中的toUserId判斷是單發還是群發:
    • 如果toUserId不為空,則創建DemoReceiveMessage對象,設置fromUserIdtextsingletrue,通過webSocketMessageSendersendObject方法將消息發送給指定用戶。
    • 如果toUserId為空,則創建DemoReceiveMessage對象,設置fromUserIdtextsinglefalse,通過webSocketMessageSendersendObject方法將消息發送給所有用戶。
  1. JsonWebSocketMessageHandler接收并解析消息

  2. 根據type="demo-message-send"找到DemoWebSocketMessageListener

  3. 調用onMessage方法:

    • 從Session中獲取發送者ID(fromUserId)

    • 檢查message.getToUserId()不為null,進入單發邏輯

  4. 構造響應消息:

    new DemoReceiveMessage().setFromUserId(fromUserId).setText(message.getText()).setSingle(true)
  5. 通過webSocketMessageSender發送給指定用戶:

    webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), // 用戶類型message.getToUserId(),         // 目標用戶ID"demo-message-receive",       // 消息類型toMessage                    // 消息內容
    )

實際示例:

  • 用戶A(ID:100)發送"下午開會"給用戶B(ID:101)

  • 前端發送:

    {"type":"demo-message-send","content":"{\"text\":\"下午開會\",\"toUserId\":101}"}
  • 后端處理后發送給用戶B:

    {"type":"demo-message-receive","content":"{\"fromUserId\":100,\"text\":\"下午開會\",\"single\":true}"}

1.2一對多發送

前端

  1. 用戶在輸入框輸入消息內容(sendText)

  2. 不選擇特定用戶(或選擇"所有人")

  3. 點擊發送按鈕觸發handlerSend方法

  4. 構造消息內容JSON:

    {text: "系統維護通知", // 消息內容toUserId: ""      // 空表示群發
    }
  5. 包裝為WebSocket標準格式并發送

后端

  1. 同上接收解析流程

  2. onMessage方法中檢查message.getToUserId()為null,進入群發邏輯

  3. 構造響應消息:

    new DemoReceiveMessage().setFromUserId(fromUserId).setText(message.getText()).setSingle(false)
  4. 通過webSocketMessageSender發送給所有用戶:

    webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), // 用戶類型"demo-message-receive",       // 消息類型toMessage                    // 消息內容
    )

實際示例:

  • 管理員發送"系統即將升級"給所有用戶

  • 前端發送:

    {"type":"demo-message-send","content":"{\"text\":\"系統即將升級\",\"toUserId\":\"\"}"}

2.總結及類補充

后端代碼

  1. 配置類:

    • YudaoWebSocketAutoConfiguration: 配置 WebSocket 端點、攔截器、會話管理和消息發送器,支持多種發送類型(local, redis, rocketmq, rabbitmq, kafka)。
    • 條件注解 @ConditionalOnProperty 允許通過配置啟用/禁用 WebSocket 或切換發送類型。
    • 注冊 WebSocketConfigurer、HandshakeInterceptor、WebSocketHandler 和 WebSocketSessionManager。
    • /*** WebSocket 自動配置類* 負責 WebSocket 服務的初始化和相關組件的注冊*/
      @AutoConfiguration(before = YudaoRedisMQConsumerAutoConfiguration.class) 
      // 在 YudaoRedisMQConsumerAutoConfiguration 之前加載,確保 RedisWebSocketMessageConsumer 先創建
      @EnableWebSocket // 啟用 Spring WebSocket 支持
      @ConditionalOnProperty(prefix = "moyun.websocket", value = "enable", matchIfMissing = true) 
      // 通過配置項 moyun.websocket.enable 控制是否啟用 WebSocket,默認啟用
      @EnableConfigurationProperties(WebSocketProperties.class) // 啟用 WebSocket 配置屬性類
      public class YudaoWebSocketAutoConfiguration {/*** 配置 WebSocket 處理器和握手攔截器*/@Beanpublic WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor[] handshakeInterceptors,WebSocketHandler webSocketHandler,WebSocketProperties webSocketProperties) {return registry -> registry// 注冊 WebSocket 處理器并指定連接路徑.addHandler(webSocketHandler, webSocketProperties.getPath())// 添加握手攔截器,用于驗證和預處理.addInterceptors(handshakeInterceptors)// 允許所有域名跨域訪問,否則前端連接會被阻止.setAllowedOriginPatterns("*");}/*** 創建握手攔截器,用于在 WebSocket 握手階段進行用戶認證和權限檢查*/@Beanpublic HandshakeInterceptor handshakeInterceptor() {return new LoginUserHandshakeInterceptor();}/*** 創建 WebSocket 消息處理器* 包裝 JsonWebSocketMessageHandler 并添加會話管理功能*/@Beanpublic WebSocketHandler webSocketHandler(WebSocketSessionManager sessionManager,List<? extends WebSocketMessageListener<?>> messageListeners) {// 1. 創建消息處理器,負責消息類型路由和分發JsonWebSocketMessageHandler messageHandler = new JsonWebSocketMessageHandler(messageListeners);// 2. 包裝消息處理器,添加會話管理功能(如連接建立和關閉時的回調)return new WebSocketSessionHandlerDecorator(messageHandler, sessionManager);}/*** 創建 WebSocket 會話管理器,用于管理所有活動的 WebSocket 會話*/@Beanpublic WebSocketSessionManager webSocketSessionManager() {return new WebSocketSessionManagerImpl();}/*** 創建 WebSocket 請求授權自定義器,用于配置安全規則*/@Beanpublic WebSocketAuthorizeRequestsCustomizer webSocketAuthorizeRequestsCustomizer(WebSocketProperties webSocketProperties) {return new WebSocketAuthorizeRequestsCustomizer(webSocketProperties);}// ==================== 消息發送器配置 ====================/*** 本地模式消息發送器配置(單節點部署)*/@Configuration@ConditionalOnProperty(prefix = "moyun.websocket", name = "sender-type", havingValue = "local")public class LocalWebSocketMessageSenderConfiguration {@Beanpublic LocalWebSocketMessageSender localWebSocketMessageSender(WebSocketSessionManager sessionManager) {return new LocalWebSocketMessageSender(sessionManager);}}/*** Redis 模式消息發送器配置(分布式部署)*/@Configuration@ConditionalOnProperty(prefix = "moyun.websocket", name = "sender-type", havingValue = "redis")public class RedisWebSocketMessageSenderConfiguration {@Beanpublic RedisWebSocketMessageSender redisWebSocketMessageSender(WebSocketSessionManager sessionManager,RedisMQTemplate redisMQTemplate) {return new RedisWebSocketMessageSender(sessionManager, redisMQTemplate);}@Beanpublic RedisWebSocketMessageConsumer redisWebSocketMessageConsumer(RedisWebSocketMessageSender redisWebSocketMessageSender) {return new RedisWebSocketMessageConsumer(redisWebSocketMessageSender);}}/*** RocketMQ 模式消息發送器配置*/@Configuration@ConditionalOnProperty(prefix = "moyun.websocket", name = "sender-type", havingValue = "rocketmq")public class RocketMQWebSocketMessageSenderConfiguration {@Beanpublic RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender(WebSocketSessionManager sessionManager, RocketMQTemplate rocketMQTemplate,@Value("${moyun.websocket.sender-rocketmq.topic}") String topic) {return new RocketMQWebSocketMessageSender(sessionManager, rocketMQTemplate, topic);}@Beanpublic RocketMQWebSocketMessageConsumer rocketMQWebSocketMessageConsumer(RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender) {return new RocketMQWebSocketMessageConsumer(rocketMQWebSocketMessageSender);}}/*** RabbitMQ 模式消息發送器配置*/@Configuration@ConditionalOnProperty(prefix = "moyun.websocket", name = "sender-type", havingValue = "rabbitmq")public class RabbitMQWebSocketMessageSenderConfiguration {@Beanpublic RabbitMQWebSocketMessageSender rabbitMQWebSocketMessageSender(WebSocketSessionManager sessionManager, RabbitTemplate rabbitTemplate,TopicExchange websocketTopicExchange) {return new RabbitMQWebSocketMessageSender(sessionManager, rabbitTemplate, websocketTopicExchange);}@Beanpublic RabbitMQWebSocketMessageConsumer rabbitMQWebSocketMessageConsumer(RabbitMQWebSocketMessageSender rabbitMQWebSocketMessageSender) {return new RabbitMQWebSocketMessageConsumer(rabbitMQWebSocketMessageSender);}/*** 創建 RabbitMQ 主題交換機,用于消息廣播*/@Beanpublic TopicExchange websocketTopicExchange(@Value("${moyun.websocket.sender-rabbitmq.exchange}") String exchange) {return new TopicExchange(exchange,true,  // durable: 持久化交換機,重啟后不丟失false); // exclusive: 非排他性,允許多個連接使用}}/*** Kafka 模式消息發送器配置*/@Configuration@ConditionalOnProperty(prefix = "moyun.websocket", name = "sender-type", havingValue = "kafka")public class KafkaWebSocketMessageSenderConfiguration {@Beanpublic KafkaWebSocketMessageSender kafkaWebSocketMessageSender(WebSocketSessionManager sessionManager, KafkaTemplate<Object, Object> kafkaTemplate,@Value("${moyun.websocket.sender-kafka.topic}") String topic) {return new KafkaWebSocketMessageSender(sessionManager, kafkaTemplate, topic);}@Beanpublic KafkaWebSocketMessageConsumer kafkaWebSocketMessageConsumer(KafkaWebSocketMessageSender kafkaWebSocketMessageSender) {return new KafkaWebSocketMessageConsumer(kafkaWebSocketMessageSender);}}}

  2. 認證與攔截:

    • TokenAuthenticationFilter: 解析 WebSocket URL 中的 token 參數,驗證用戶身份,構建 LoginUser 并存儲到 Spring Security 上下文中。
    • /*** Token 過濾器,驗證 token 的有效性* 驗證通過后,獲得 {@link LoginUser} 信息,并加入到 Spring Security 上下文*/
      @RequiredArgsConstructor
      public class TokenAuthenticationFilter extends OncePerRequestFilter {private final SecurityProperties securityProperties;private final GlobalExceptionHandler globalExceptionHandler;private final OAuth2TokenApi oauth2TokenApi;@Override@SuppressWarnings("NullableProblems")protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain chain)throws ServletException, IOException {String token = SecurityFrameworkUtils.obtainAuthorization(request,securityProperties.getTokenHeader(), securityProperties.getTokenParameter());if (StrUtil.isNotEmpty(token)) {Integer userType = WebFrameworkUtils.getLoginUserType(request);try {// 1.1 基于 token 構建登錄用戶LoginUser loginUser = buildLoginUserByToken(token, userType);// 1.2 模擬 Login 功能,方便日常開發調試if (loginUser == null) {loginUser = mockLoginUser(request, token, userType);}// 2. 設置當前用戶if (loginUser != null) {SecurityFrameworkUtils.setLoginUser(loginUser, request);}} catch (Throwable ex) {CommonResult<?> result = globalExceptionHandler.allExceptionHandler(request, ex);ServletUtils.writeJSON(response, result);return;}}// 繼續過濾鏈chain.doFilter(request, response);}private LoginUser buildLoginUserByToken(String token, Integer userType) {try {OAuth2AccessTokenCheckRespDTO accessToken = oauth2TokenApi.checkAccessToken(token);if (accessToken == null) {return null;}// 用戶類型不匹配,無權限// 注意:只有 /admin-api/* 和 /app-api/* 有 userType,才需要比對用戶類型// 類似 WebSocket 的 /ws/* 連接地址,是不需要比對用戶類型的if (userType != null&& ObjectUtil.notEqual(accessToken.getUserType(), userType)) {throw new AccessDeniedException("錯誤的用戶類型");}// 構建登錄用戶return new LoginUser().setId(accessToken.getUserId()).setUserType(accessToken.getUserType()).setInfo(accessToken.getUserInfo()) // 額外的用戶信息.setTenantId(accessToken.getTenantId()).setScopes(accessToken.getScopes()).setExpiresTime(accessToken.getExpiresTime());} catch (ServiceException serviceException) {// 校驗 Token 不通過時,考慮到一些接口是無需登錄的,所以直接返回 null 即可return null;}}/*** 模擬登錄用戶,方便日常開發調試** 注意,在線上環境下,一定要關閉該功能!!!** @param request 請求* @param token 模擬的 token,格式為 {@link SecurityProperties#getMockSecret()} + 用戶編號* @param userType 用戶類型* @return 模擬的 LoginUser*/private LoginUser mockLoginUser(HttpServletRequest request, String token, Integer userType) {if (!securityProperties.getMockEnable()) {return null;}// 必須以 mockSecret 開頭if (!token.startsWith(securityProperties.getMockSecret())) {return null;}// 構建模擬用戶Long userId = Long.valueOf(token.substring(securityProperties.getMockSecret().length()));return new LoginUser().setId(userId).setUserType(userType).setTenantId(WebFrameworkUtils.getTenantId(request));}}
    • LoginUserHandshakeInterceptor: 在 WebSocket 握手階段將 LoginUser 存入 WebSocketSession 的 attributes。
    • /*** 登錄用戶的 {@link HandshakeInterceptor} 實現類** 流程如下:* 1. 前端連接 websocket 時,會通過拼接 ?token={token} 到 ws:// 連接后,這樣它可以被 {@link TokenAuthenticationFilter} 所認證通過* 2. {@link LoginUserHandshakeInterceptor} 負責把 {@link LoginUser} 添加到 {@link WebSocketSession} 中*/
      public class LoginUserHandshakeInterceptor implements HandshakeInterceptor {@Overridepublic boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,WebSocketHandler wsHandler, Map<String, Object> attributes) {LoginUser loginUser = SecurityFrameworkUtils.getLoginUser();if (loginUser != null) {WebSocketFrameworkUtils.setLoginUser(loginUser, attributes);}return true;}@Overridepublic void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,WebSocketHandler wsHandler, Exception exception) {// do nothing}}
  3. 會話管理:

    • WebSocketSessionHandlerDecorator: 裝飾 WebSocketHandler,在連接建立/關閉時管理 WebSocketSession。
    • /*** {@link WebSocketHandler} 的裝飾類,實現了以下功能:** 1. {@link WebSocketSession} 連接或關閉時,使用 {@link #sessionManager} 進行管理* 2. 封裝 {@link WebSocketSession} 支持并發操作*/
      public class WebSocketSessionHandlerDecorator extends WebSocketHandlerDecorator {/*** 發送時間的限制,單位:毫秒* 這里定義了發送消息的時間限制為 5 秒(1000 毫秒 * 5),用于控制消息發送的時間范圍,* 可能是為了防止長時間的消息發送操作,避免資源占用或其他潛在問題。*/private static final Integer SEND_TIME_LIMIT = 1000 * 5;/*** 發送消息緩沖上限,單位:bytes* 定義了發送消息的緩沖大小上限為 1024 * 100 字節,用于限制消息緩沖的大小,* 防止緩沖過大導致內存占用過多等問題。*/private static final Integer BUFFER_SIZE_LIMIT = 1024 * 100;// WebSocket 會話管理器,用于管理 WebSocket 會話private final WebSocketSessionManager sessionManager;/*** 構造函數,接收被裝飾的 WebSocketHandler 和 WebSocketSessionManager** @param delegate      被裝飾的 WebSocketHandler* @param sessionManager WebSocket 會話管理器*/public WebSocketSessionHandlerDecorator(WebSocketHandler delegate,WebSocketSessionManager sessionManager) {// 調用父類構造函數,傳入被裝飾的 WebSocketHandlersuper(delegate);// 初始化會話管理器this.sessionManager = sessionManager;}/*** 當 WebSocket 連接建立時調用此方法** @param session WebSocket 會話*/@Overridepublic void afterConnectionEstablished(WebSocketSession session) {// 實現 session 支持并發,可參考 https://blog.csdn.net/abu935009066/article/details/131218149// 使用定義的時間限制和緩沖大小限制,創建一個支持并發的 WebSocketSession 裝飾類實例session = new ConcurrentWebSocketSessionDecorator(session, SEND_TIME_LIMIT, BUFFER_SIZE_LIMIT);// 將新的會話添加到 WebSocketSessionManager 中進行管理sessionManager.addSession(session);}/*** 當 WebSocket 連接關閉時調用此方法** @param session    WebSocket 會話* @param closeStatus 關閉狀態*/@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) {// 從 WebSocketSessionManager 中移除對應的會話,完成連接關閉時的會話管理操作sessionManager.removeSession(session);}
      }
    • WebSocketSessionManager 和 WebSocketSessionManagerImpl:管理會話,支持按用戶類型和 ID 查詢。
    • /*** {@link WebSocketSession} 管理器的接口*/
      public interface WebSocketSessionManager {/*** 添加 Session** @param session Session*/void addSession(WebSocketSession session);/*** 移除 Session** @param session Session*/void removeSession(WebSocketSession session);/*** 獲得指定編號的 Session** @param id Session 編號* @return Session*/WebSocketSession getSession(String id);/*** 獲得指定用戶類型的 Session 列表** @param userType 用戶類型* @return Session 列表*/Collection<WebSocketSession> getSessionList(Integer userType);/*** 獲得指定用戶編號的 Session 列表** @param userType 用戶類型* @param userId 用戶編號* @return Session 列表*/Collection<WebSocketSession> getSessionList(Integer userType, Long userId);}
      /*** 默認的 {@link WebSocketSessionManager} 實現類*/
      public class WebSocketSessionManagerImpl implements WebSocketSessionManager {/*** id 與 WebSocketSession 映射** key:Session 編號*/private final ConcurrentMap<String, WebSocketSession> idSessions = new ConcurrentHashMap<>();/*** user 與 WebSocketSession 映射** key1:用戶類型* key2:用戶編號*/private final ConcurrentMap<Integer, ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>>> userSessions= new ConcurrentHashMap<>();@Overridepublic void addSession(WebSocketSession session) {// 添加到 idSessions 中idSessions.put(session.getId(), session);// 添加到 userSessions 中LoginUser user = WebSocketFrameworkUtils.getLoginUser(session);if (user == null) {return;}ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(user.getUserType());if (userSessionsMap == null) {userSessionsMap = new ConcurrentHashMap<>();if (userSessions.putIfAbsent(user.getUserType(), userSessionsMap) != null) {userSessionsMap = userSessions.get(user.getUserType());}}CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(user.getId());if (sessions == null) {sessions = new CopyOnWriteArrayList<>();if (userSessionsMap.putIfAbsent(user.getId(), sessions) != null) {sessions = userSessionsMap.get(user.getId());}}sessions.add(session);}@Overridepublic void removeSession(WebSocketSession session) {// 移除從 idSessions 中idSessions.remove(session.getId());// 移除從 idSessions 中LoginUser user = WebSocketFrameworkUtils.getLoginUser(session);if (user == null) {return;}ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(user.getUserType());if (userSessionsMap == null) {return;}CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(user.getId());sessions.removeIf(session0 -> session0.getId().equals(session.getId()));if (CollUtil.isEmpty(sessions)) {userSessionsMap.remove(user.getId(), sessions);}}@Overridepublic WebSocketSession getSession(String id) {return idSessions.get(id);}@Overridepublic Collection<WebSocketSession> getSessionList(Integer userType) {ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(userType);if (CollUtil.isEmpty(userSessionsMap)) {return new ArrayList<>();}LinkedList<WebSocketSession> result = new LinkedList<>(); // 避免擴容Long contextTenantId = TenantContextHolder.getTenantId();for (List<WebSocketSession> sessions : userSessionsMap.values()) {if (CollUtil.isEmpty(sessions)) {continue;}// 特殊:如果租戶不匹配,則直接排除if (contextTenantId != null) {Long userTenantId = WebSocketFrameworkUtils.getTenantId(sessions.get(0));if (!contextTenantId.equals(userTenantId)) {continue;}}result.addAll(sessions);}return result;}@Overridepublic Collection<WebSocketSession> getSessionList(Integer userType, Long userId) {ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(userType);if (CollUtil.isEmpty(userSessionsMap)) {return new ArrayList<>();}CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(userId);return CollUtil.isNotEmpty(sessions) ? new ArrayList<>(sessions) : new ArrayList<>();}}
  4. 消息處理:

    • JsonWebSocketMessageHandler:解析 JsonWebSocketMessage,根據 type 分發給 WebSocketMessageListener。
    • DemoWebSocketMessageListener: 處理 demo-message-send 類型的消息,支持單發和群發。
    • JsonWebSocketMessage:包含 type 和 content 字段。
  5. 消息發送:

    • WebSocketMessageSender: 定義消息發送接口。
    • AbstractWebSocketMessageSender: 實現消息發送邏輯,查詢會話并發送 JsonWebSocketMessage。
    • LocalWebSocketMessageSender: 本地發送實現,適合單機場景。
  6. 配置websocket的Ss權限
/*** WebSocket 的權限自定義* 負責為 WebSocket 端點的 HTTP 握手請求配置 Spring Security 權限,通過 permitAll() 確保握手請求不被阻止* 同時保留自定義的 token 認證邏輯(由 TokenAuthenticationFilter 和 LoginUserHandshakeInterceptor 處理)* 它解決了 Spring Security 對 WebSocket 握手請求的限制問題,是集成 Spring Security 的 WebSocket 功能的關鍵組件*/
@RequiredArgsConstructor
public class WebSocketAuthorizeRequestsCustomizer extends AuthorizeRequestsCustomizer {private final WebSocketProperties webSocketProperties;@Overridepublic void customize(AuthorizeHttpRequestsConfigurer<HttpSecurity>.AuthorizationManagerRequestMatcherRegistry registry) {registry.requestMatchers(webSocketProperties.getPath()).permitAll();}}

前端代碼

  • InfraWebSocket.vue: 使用 @vueuse/core 的 useWebSocket 建立連接,發送/接收 JsonWebSocketMessage,支持單發和群發消息。
  • 依賴用戶列表 API(UserApi.getSimpleUserList)和 token 獲取邏輯(getRefreshToken)。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/82887.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/82887.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/82887.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【NextPilot日志移植】params.c解析

params.c 參數說明 params.c 文件的主要作用是定義與 SD卡日志記錄 相關的參數。這些參數用于配置日志記錄的行為&#xff0c;包括日志記錄的時間、內容、存儲管理以及加密設置等。 1. UTC 偏移量 (SDLOG_UTC_OFFSET) PARAM_DEFINE_INT32(SDLOG_UTC_OFFSET, 0);用途&#xf…

jFinal 使用 SolonMCP 開發 MCP(擁抱新潮流)

MCP 官方的 java-sdk 目前只支持 java17。直接基于 mcp-java-sdk 也比較復雜。使用 SolonMCP&#xff0c;可以基于 java8 開發&#xff08;像 MVC 的開發風格&#xff09;&#xff0c;且比較簡單。 1、SolonMCP 簡介 SolonMCP&#xff08;全稱&#xff1a;solon-ai-mcp&#…

“端 - 邊 - 云”三級智能協同平臺的理論建構與技術實現

摘要 隨著低空經濟與智能制造的深度融合&#xff0c;傳統集中式云計算架構在實時性、隱私保護和資源效率上的瓶頸日益凸顯。本文提出“端 - 邊 - 云”三級智能協同平臺架構&#xff0c;以“時空 - 資源 - 服務”三維協同理論為核心&#xff0c;構建覆蓋終端感知、邊緣計算、云端…

【如何搭建開發環境】

了解java程序 JAVA體系結構 跨平臺原理與編譯和反編譯 如何學習java語言&#xff0c;如何搭建環境 設置JAVA_HOME&#xff0c;指向jdk的安裝目錄這一級即可。比如我的JDK安裝在C:\java\jdk1.8.0_25&#xff0c;那JAVA_HOME的值就是C:\java\jdk1.8.0_25設置Path變量 在Path值后…

LegoGPT,卡內基梅隆大學推出的樂高積木設計模型

LegoGPT 是由卡內基梅隆大學開發的一款創新性樂高積木設計模型&#xff0c;能夠根據用戶的文本提示生成結構穩固、可組裝的樂高模型。該模型基于自回歸語言模型和大規模樂高設計數據集進行訓練&#xff0c;用戶只需輸入簡單的文字描述&#xff0c;LegoGPT 就能逐步構建出物理穩…

深入理解 NumPy:Python 科學計算的基石

在數據科學、人工智能和科學計算的世界里&#xff0c;NumPy 是一塊繞不過去的基石。它是 Python 語言中用于高性能科學計算的基礎包&#xff0c;幾乎所有的數據分析與機器學習框架&#xff08;如 Pandas、TensorFlow、Scikit-learn&#xff09;都離不開它的支持。 一、什么是 …

Java基礎(IO)

所有操作都在內存&#xff0c;不能長時間保存&#xff0c;IO主要在硬盤&#xff0c;可以長時間保存。 一、File類 File類被定義為文件和目錄路徑名的抽象表示形式&#xff0c;這是因為 File 類既可以表示文件也可以表示目錄&#xff0c;他們都通過對應的路徑來描述。 提供構…

仿正點原子驅動BMP280氣壓傳感器實例

文章目錄 前言 一、寄存器頭文件定義 二、設備樹文件中添加節點 三、驅動文件編寫 四、編寫驅動測試文件并編譯測試 總結 前言 本文驅動開發仿照正點原子的iic驅動實現&#xff0c;同時附上bmp280的數據手冊&#xff0c;可訪問下面的鏈接&#xff1a; BMP280_Bosch(博世…

論壇系統(中-1)

軟件開發 編寫公共代碼 定義狀態碼 對執?業務處理邏輯過程中可能出現的成功與失敗狀態做針對性描述(根據需求分析階段可以遇見的問題提前做出定義)&#xff0c;?枚舉定義狀態碼&#xff0c;先定義?部分&#xff0c;業務中遇到新的問題再添加 定義狀態碼如下 狀態碼類型描…

E+H流量計通過Profibus DP主站轉Modbus TCP網關與上位機輕松通訊

EH流量計通過Profibus DP主站轉Modbus TCP網關與上位機輕松通訊 在現代工業自動化的廣闊舞臺上&#xff0c;Profibus DP與Modbus TCP這兩種通信協議各領風騷&#xff0c;它們在不同的應用場景中發揮著舉足輕重的作用。但工業生產的復雜性往往要求不同設備、系統之間能夠順暢溝…

服務器中存儲空間不足該怎么辦?

服務器作為存儲數據信息的重要網絡設備&#xff0c;隨著企業業務的不斷拓展&#xff0c;所需要存儲的數據信息也在不斷增加&#xff0c;最終會導致服務器中存儲空間不足&#xff0c;這不僅會影響到服務器系統性能&#xff0c;還會造成業務無法正常執行&#xff0c;那么&#xf…

C++23 views::chunk_by (P2443R1) 詳解

文章目錄 引言C23 范圍庫概述范圍視圖&#xff08;Range Views&#xff09;范圍算法&#xff08;Range Algorithms&#xff09;范圍適配器&#xff08;Range Adapters&#xff09; std::views::chunk_by 介紹基本概念特性使用場景 示例代碼簡單示例自定義謂詞示例 總結 引言 在…

零碳園區能源系統-多能互補體系

構建以可再生能源為核心的零碳園區能源系統&#xff0c;需整合光儲直柔、光伏發電、微電網、氫能與儲能技術&#xff0c;通過多能協同與智能調控實現能源生產、存儲、消費全鏈條優化。以下是系統性實施方案&#xff1a; 一、系統架構設計 1. 多能互補體系 &#xff08;圖示&a…

elastic search學習

首先在自己電腦上安裝elastic search。安裝成功后&#xff0c;查看ES是否啟動成功。 安裝過程參考&#xff1a;ElasticSearch入門1: mac 安裝 - 霜井 - 博客園 安裝完成后&#xff0c;直接執行bin目錄中的elastic search命令后&#xff0c;就可以啟動成功&#xff01; 在網頁…

mysql8常用sql語句

查詢結果帶行號 -- 表名為 mi_user&#xff0c; 假設包含列 id &#xff0c;address SELECT ROW_NUMBER() OVER (ORDER BY id) AS row_num, t.id, t.address FROM mi_user t ; SELECT ROW_NUMBER() OVER ( ) AS row_num, t.id, t.address FROM mi_user t ; 更新某列數…

Memcached 服務搭建和集成使用的詳細步驟示例

以下是 Memcached 服務搭建和集成使用的詳細步驟示例&#xff1a; 一、搭建 Memcached 服務 安裝 Memcached Linux 系統 yum 安裝&#xff1a;執行命令 yum install -y memcached memcached-devel。源碼安裝 下載源碼&#xff1a;wget http://www.memcached.org/files/memcach…

2. 盒模型/布局模塊 - 響應式產品展示頁_案例:電商產品網格布局

2. 盒模型/布局模塊 - 響應式產品展示頁 案例&#xff1a;電商產品網格布局 <!DOCTYPE html> <html><head><meta charset"utf-8"><title></title></head><style type"text/css">:root {--primary-color…

Go基于plugin的熱更新初體驗

背景 對于一個部署在生產環境的項目來說&#xff0c;我們希望當代碼出現bug的時候&#xff0c;可以不用重啟進程而達到動態修改代碼的目的—— 這就是代碼熱部署&#xff01; 使用java做游戲服務器&#xff0c;最大的好處是&#xff0c;當代碼出現bug&#xff0c;可以直接熱…

【RabbitMQ】工作隊列和發布/訂閱模式的具體實現

文章目錄 建立連接工作隊列模式實現創建隊列和交換機生產者代碼消費者代碼運行程序啟動消費者啟動生產者 發布/訂閱模式實現創建隊列和交換機生產者代碼創建交換機聲明兩個隊列綁定隊列和交換機發送消息完整代碼 消費者代碼完整代碼 運行程序啟動生產者啟動消費者 建立連接 我…

Codeforces Round 998 (Div. 3)

A. Fibonacciness 題目大意 給你四個數字abde&#xff0c;讓你找到一個中間值c&#xff0c;問 a b c a b c abc &#xff0c; b c d b c d bcd &#xff0c; c d e c d e cde 最多能有幾個式子成立 解題思路 顯然最多就六種情況&#xff0c;暴力枚舉即可 代…