1.純Websocket實現消息發送
1.1一對一發送
前端
-
用戶在輸入框輸入消息內容(
sendText
) -
選擇特定接收用戶(
sendUserId
) -
點擊發送按鈕觸發
handlerSend
方法 -
構造消息內容JSON:
{text: "Hello", // 消息內容toUserId: 123 // 目標用戶ID }
-
包裝為WebSocket標準格式:
{type: "demo-message-send", // 消息類型content: '{"text":"Hello","toUserId":123}' // 字符串化的內容 }
-
通過
send()
方法發送
- 前端在
setup
函數中,使用useWebSocket
方法,根據server
變量(WebSocket 服務地址)建立連接。server
地址由VITE_BASE_URL
(環境變量)、/infra/ws
路徑和token
(通過getRefreshToken
獲取)組成。 - 設置
autoReconnect
為true
,表示自動重連;heartbeat
為true
,表示開啟心跳機制。
- 當用戶在前端輸入消息并點擊發送按鈕時,
handlerSend
函數被調用。 - 首先將發送內容
sendText
和接收用戶sendUserId
進行 JSON 化處理,構建消息內容messageContent
。 - 然后將消息類型
type
(demo-message-send
)和消息內容messageContent
再次 JSON 化,形成最終的消息jsonMessage
。 - 最后使用
send
函數將jsonMessage
發送到后端。
const server = ref((import.meta.env.VITE_BASE_URL + '/infra/ws').replace('http', 'ws') +'?token=' +getRefreshToken() // 使用 getRefreshToken() 方法,而不使用 getAccessToken() 方法的原因:WebSocket 無法方便的刷新訪問令牌
) // WebSocket 服務地址
const getIsOpen = computed(() => status.value === 'OPEN') // WebSocket 連接是否打開
const getTagColor = computed(() => (getIsOpen.value ? 'success' : 'red')) // WebSocket 連接的展示顏色/** 發起 WebSocket 連接 */
const { status, data, send, close, open } = useWebSocket(server.value, {autoReconnect: true,heartbeat: true
})
/** 發送消息 */
const sendText = ref('') // 發送內容
const sendUserId = ref('') // 發送人
const handlerSend = () => {// 1.1 先 JSON 化 message 消息內容const messageContent = JSON.stringify({text: sendText.value,toUserId: sendUserId.value})// 1.2 再 JSON 化整個消息const jsonMessage = JSON.stringify({type: 'demo-message-send',content: messageContent})// 2. 最后發送消息send(jsonMessage)sendText.value = ''
}
后端
- 注冊監聽器:
DemoWebSocketMessageListener
?類通過實現?WebSocketMessageListener<DemoSendMessage>
?接口,并使用?@Component
?注解將自己注冊為 Spring Bean。框架啟動時會掃描所有實現了該接口的 Bean,并將它們注冊到消息處理器中。
/*** WebSocket 示例:單發消息*/
@Component
public class DemoWebSocketMessageListener implements WebSocketMessageListener<DemoSendMessage> {@Resourceprivate WebSocketMessageSender webSocketMessageSender;@Overridepublic void onMessage(WebSocketSession session, DemoSendMessage message) {Long fromUserId = WebSocketFrameworkUtils.getLoginUserId(session);// 情況一:單發if (message.getToUserId() != null) {DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId).setText(message.getText()).setSingle(true);webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), message.getToUserId(), // 給指定用戶"demo-message-receive", toMessage);return;}// 情況二:群發DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId).setText(message.getText()).setSingle(false);webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), // 給所有用戶"demo-message-receive", toMessage);}@Overridepublic String getType() {return "demo-message-send";}}
- 消息類型綁定:
getType()
?方法返回?"demo-message-send"
,這表明該監聽器專門處理類型為?"demo-message-send"
?的消息。當后端接收到消息時,會根據消息類型路由到對應的監聽器。
當 WebSocket 服務器接收到消息后:
- 消息解析:框架首先解析消息的 JSON 格式,提取?
type
?字段(如?"demo-message-send"
)。 - 類型匹配:后端框架會自動將?
type
?為?"demo-message-send"
?的消息路由到?DemoWebSocketMessageListener
?的?onMessage
?方法。 - 調用回調:將消息反序列化為?
DemoSendMessage
?對象,并調用監聽器的?onMessage
?方法。
/*** JSON 格式 {@link WebSocketHandler} 實現類* 基于 {@link JsonWebSocketMessage#getType()} 消息類型,調度到對應的 {@link WebSocketMessageListener} 監聽器。*/
@Slf4j
public class JsonWebSocketMessageHandler extends TextWebSocketHandler {/*** type 與 WebSocketMessageListener 的映射* 用于存儲不同消息類型對應的監聽器,鍵為消息類型,值為對應的監聽器實例*/private final Map<String, WebSocketMessageListener<Object>> listeners = new HashMap<>();@SuppressWarnings({"rawtypes", "unchecked"})public JsonWebSocketMessageHandler(List<? extends WebSocketMessageListener> listenersList) {// 遍歷傳入的監聽器列表listenersList.forEach((Consumer<WebSocketMessageListener>)listener -> {// 將監聽器的類型(通過 getType() 方法獲取)作為鍵,監聽器實例作為值,存入 listeners 映射中listeners.put(listener.getType(), listener);});}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {// 1.1 空消息,跳過// 如果消息的負載長度為 0,說明是一個空消息,直接返回,不進行后續處理if (message.getPayloadLength() == 0) {return;}// 1.2 ping 心跳消息,直接返回 pong 消息。// 如果消息的負載長度為 4 且負載內容為 "ping",則向客戶端發送 "pong" 消息,表示響應心跳if (message.getPayloadLength() == 4 && Objects.equals(message.getPayload(), "ping")) {session.sendMessage(new TextMessage("pong"));return;}// 2.1 解析消息try {// 將文本消息的負載解析為 JsonWebSocketMessage 對象JsonWebSocketMessage jsonMessage = JsonUtils.parseObject(message.getPayload(), JsonWebSocketMessage.class);// 如果解析后的消息為空,記錄錯誤日志并返回,不進行后續處理if (jsonMessage == null) {log.error("[handleTextMessage][session({}) message({}) 解析為空]", session.getId(), message.getPayload());return;}// 如果解析后的消息類型為空,記錄錯誤日志并返回,不進行后續處理if (StrUtil.isEmpty(jsonMessage.getType())) {log.error("[handleTextMessage][session({}) message({}) 類型為空]", session.getId(), message.getPayload());return;}// 2.2 獲得對應的 WebSocketMessageListener// 根據消息類型從 listeners 映射中獲取對應的監聽器WebSocketMessageListener<Object> messageListener = listeners.get(jsonMessage.getType());// 如果沒有找到對應的監聽器,記錄錯誤日志并返回,不進行后續處理if (messageListener == null) {log.error("[handleTextMessage][session({}) message({}) 監聽器為空]", session.getId(), message.getPayload());return;}// 2.3 處理消息// 獲取監聽器泛型參數類型Type type = TypeUtil.getTypeArgument(messageListener.getClass(), 0);// 將消息內容解析為對應類型的對象Object messageObj = JsonUtils.parseObject(jsonMessage.getContent(), type);// 獲取當前會話的租戶 IDLong tenantId = WebSocketFrameworkUtils.getTenantId(session);// 執行租戶相關的操作,調用監聽器的 onMessage 方法處理消息TenantUtils.execute(tenantId, () -> messageListener.onMessage(session, messageObj));} catch (Throwable ex) {// 如果在處理消息過程中發生異常,記錄錯誤日志log.error("[handleTextMessage][session({}) message({}) 處理異常]", session.getId(), message.getPayload());}}}
WebSocketMessageListener
?之所以能監聽消息,是因為:
- 接口契約:實現?
WebSocketMessageListener
?接口并指定消息類型(getType()
)。 - 框架支持:Spring 框架自動掃描并注冊監聽器,實現消息的解析和分發。
- 類型匹配:前端發送的消息?
type
?與后端監聽器的?getType()
?一致,觸發回調。
這個過程類似于 HTTP 請求的路由機制,只不過 WebSocket 是長連接,需要持續監聽消息。
通常,WebSocket 框架(如 Spring WebSocket)會提供以下核心組件:
- 消息解碼器:將二進制數據轉換為 Java 對象(如?
DemoSendMessage
)。 - 消息路由器:根據消息類型將消息路由到對應的監聽器。
- 會話管理器:維護所有 WebSocket 會話(
WebSocketSession
),并提供獲取用戶信息的工具(如?WebSocketFrameworkUtils.getLoginUserId
)。
- 后端的
DemoWebSocketMessageListener
類實現了WebSocketMessageListener
接口的onMessage
方法。 - 當有消息到達時,
onMessage
方法被調用,從WebSocketSession
中獲取登錄用戶 ID(fromUserId
)。 - 根據消息中的
toUserId
判斷是單發還是群發:- 如果
toUserId
不為空,則創建DemoReceiveMessage
對象,設置fromUserId
、text
和single
為true
,通過webSocketMessageSender
的sendObject
方法將消息發送給指定用戶。 - 如果
toUserId
為空,則創建DemoReceiveMessage
對象,設置fromUserId
、text
和single
為false
,通過webSocketMessageSender
的sendObject
方法將消息發送給所有用戶。
- 如果
-
JsonWebSocketMessageHandler
接收并解析消息 -
根據
type="demo-message-send"
找到DemoWebSocketMessageListener
-
調用
onMessage
方法:-
從Session中獲取發送者ID(
fromUserId
) -
檢查
message.getToUserId()
不為null,進入單發邏輯
-
-
構造響應消息:
new DemoReceiveMessage().setFromUserId(fromUserId).setText(message.getText()).setSingle(true)
-
通過
webSocketMessageSender
發送給指定用戶:webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), // 用戶類型message.getToUserId(), // 目標用戶ID"demo-message-receive", // 消息類型toMessage // 消息內容 )
實際示例:
-
用戶A(ID:100)發送"下午開會"給用戶B(ID:101)
-
前端發送:
{"type":"demo-message-send","content":"{\"text\":\"下午開會\",\"toUserId\":101}"}
-
后端處理后發送給用戶B:
{"type":"demo-message-receive","content":"{\"fromUserId\":100,\"text\":\"下午開會\",\"single\":true}"}
1.2一對多發送
前端
-
用戶在輸入框輸入消息內容(
sendText
) -
不選擇特定用戶(或選擇"所有人")
-
點擊發送按鈕觸發
handlerSend
方法 -
構造消息內容JSON:
{text: "系統維護通知", // 消息內容toUserId: "" // 空表示群發 }
-
包裝為WebSocket標準格式并發送
后端
-
同上接收解析流程
-
onMessage
方法中檢查message.getToUserId()
為null,進入群發邏輯 -
構造響應消息:
new DemoReceiveMessage().setFromUserId(fromUserId).setText(message.getText()).setSingle(false)
-
通過
webSocketMessageSender
發送給所有用戶:webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), // 用戶類型"demo-message-receive", // 消息類型toMessage // 消息內容 )
實際示例:
-
管理員發送"系統即將升級"給所有用戶
-
前端發送:
{"type":"demo-message-send","content":"{\"text\":\"系統即將升級\",\"toUserId\":\"\"}"}
2.總結及類補充
后端代碼
-
配置類:
- YudaoWebSocketAutoConfiguration: 配置 WebSocket 端點、攔截器、會話管理和消息發送器,支持多種發送類型(local, redis, rocketmq, rabbitmq, kafka)。
- 條件注解 @ConditionalOnProperty 允許通過配置啟用/禁用 WebSocket 或切換發送類型。
- 注冊 WebSocketConfigurer、HandshakeInterceptor、WebSocketHandler 和 WebSocketSessionManager。
-
/*** WebSocket 自動配置類* 負責 WebSocket 服務的初始化和相關組件的注冊*/ @AutoConfiguration(before = YudaoRedisMQConsumerAutoConfiguration.class) // 在 YudaoRedisMQConsumerAutoConfiguration 之前加載,確保 RedisWebSocketMessageConsumer 先創建 @EnableWebSocket // 啟用 Spring WebSocket 支持 @ConditionalOnProperty(prefix = "moyun.websocket", value = "enable", matchIfMissing = true) // 通過配置項 moyun.websocket.enable 控制是否啟用 WebSocket,默認啟用 @EnableConfigurationProperties(WebSocketProperties.class) // 啟用 WebSocket 配置屬性類 public class YudaoWebSocketAutoConfiguration {/*** 配置 WebSocket 處理器和握手攔截器*/@Beanpublic WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor[] handshakeInterceptors,WebSocketHandler webSocketHandler,WebSocketProperties webSocketProperties) {return registry -> registry// 注冊 WebSocket 處理器并指定連接路徑.addHandler(webSocketHandler, webSocketProperties.getPath())// 添加握手攔截器,用于驗證和預處理.addInterceptors(handshakeInterceptors)// 允許所有域名跨域訪問,否則前端連接會被阻止.setAllowedOriginPatterns("*");}/*** 創建握手攔截器,用于在 WebSocket 握手階段進行用戶認證和權限檢查*/@Beanpublic HandshakeInterceptor handshakeInterceptor() {return new LoginUserHandshakeInterceptor();}/*** 創建 WebSocket 消息處理器* 包裝 JsonWebSocketMessageHandler 并添加會話管理功能*/@Beanpublic WebSocketHandler webSocketHandler(WebSocketSessionManager sessionManager,List<? extends WebSocketMessageListener<?>> messageListeners) {// 1. 創建消息處理器,負責消息類型路由和分發JsonWebSocketMessageHandler messageHandler = new JsonWebSocketMessageHandler(messageListeners);// 2. 包裝消息處理器,添加會話管理功能(如連接建立和關閉時的回調)return new WebSocketSessionHandlerDecorator(messageHandler, sessionManager);}/*** 創建 WebSocket 會話管理器,用于管理所有活動的 WebSocket 會話*/@Beanpublic WebSocketSessionManager webSocketSessionManager() {return new WebSocketSessionManagerImpl();}/*** 創建 WebSocket 請求授權自定義器,用于配置安全規則*/@Beanpublic WebSocketAuthorizeRequestsCustomizer webSocketAuthorizeRequestsCustomizer(WebSocketProperties webSocketProperties) {return new WebSocketAuthorizeRequestsCustomizer(webSocketProperties);}// ==================== 消息發送器配置 ====================/*** 本地模式消息發送器配置(單節點部署)*/@Configuration@ConditionalOnProperty(prefix = "moyun.websocket", name = "sender-type", havingValue = "local")public class LocalWebSocketMessageSenderConfiguration {@Beanpublic LocalWebSocketMessageSender localWebSocketMessageSender(WebSocketSessionManager sessionManager) {return new LocalWebSocketMessageSender(sessionManager);}}/*** Redis 模式消息發送器配置(分布式部署)*/@Configuration@ConditionalOnProperty(prefix = "moyun.websocket", name = "sender-type", havingValue = "redis")public class RedisWebSocketMessageSenderConfiguration {@Beanpublic RedisWebSocketMessageSender redisWebSocketMessageSender(WebSocketSessionManager sessionManager,RedisMQTemplate redisMQTemplate) {return new RedisWebSocketMessageSender(sessionManager, redisMQTemplate);}@Beanpublic RedisWebSocketMessageConsumer redisWebSocketMessageConsumer(RedisWebSocketMessageSender redisWebSocketMessageSender) {return new RedisWebSocketMessageConsumer(redisWebSocketMessageSender);}}/*** RocketMQ 模式消息發送器配置*/@Configuration@ConditionalOnProperty(prefix = "moyun.websocket", name = "sender-type", havingValue = "rocketmq")public class RocketMQWebSocketMessageSenderConfiguration {@Beanpublic RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender(WebSocketSessionManager sessionManager, RocketMQTemplate rocketMQTemplate,@Value("${moyun.websocket.sender-rocketmq.topic}") String topic) {return new RocketMQWebSocketMessageSender(sessionManager, rocketMQTemplate, topic);}@Beanpublic RocketMQWebSocketMessageConsumer rocketMQWebSocketMessageConsumer(RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender) {return new RocketMQWebSocketMessageConsumer(rocketMQWebSocketMessageSender);}}/*** RabbitMQ 模式消息發送器配置*/@Configuration@ConditionalOnProperty(prefix = "moyun.websocket", name = "sender-type", havingValue = "rabbitmq")public class RabbitMQWebSocketMessageSenderConfiguration {@Beanpublic RabbitMQWebSocketMessageSender rabbitMQWebSocketMessageSender(WebSocketSessionManager sessionManager, RabbitTemplate rabbitTemplate,TopicExchange websocketTopicExchange) {return new RabbitMQWebSocketMessageSender(sessionManager, rabbitTemplate, websocketTopicExchange);}@Beanpublic RabbitMQWebSocketMessageConsumer rabbitMQWebSocketMessageConsumer(RabbitMQWebSocketMessageSender rabbitMQWebSocketMessageSender) {return new RabbitMQWebSocketMessageConsumer(rabbitMQWebSocketMessageSender);}/*** 創建 RabbitMQ 主題交換機,用于消息廣播*/@Beanpublic TopicExchange websocketTopicExchange(@Value("${moyun.websocket.sender-rabbitmq.exchange}") String exchange) {return new TopicExchange(exchange,true, // durable: 持久化交換機,重啟后不丟失false); // exclusive: 非排他性,允許多個連接使用}}/*** Kafka 模式消息發送器配置*/@Configuration@ConditionalOnProperty(prefix = "moyun.websocket", name = "sender-type", havingValue = "kafka")public class KafkaWebSocketMessageSenderConfiguration {@Beanpublic KafkaWebSocketMessageSender kafkaWebSocketMessageSender(WebSocketSessionManager sessionManager, KafkaTemplate<Object, Object> kafkaTemplate,@Value("${moyun.websocket.sender-kafka.topic}") String topic) {return new KafkaWebSocketMessageSender(sessionManager, kafkaTemplate, topic);}@Beanpublic KafkaWebSocketMessageConsumer kafkaWebSocketMessageConsumer(KafkaWebSocketMessageSender kafkaWebSocketMessageSender) {return new KafkaWebSocketMessageConsumer(kafkaWebSocketMessageSender);}}}
-
認證與攔截:
- TokenAuthenticationFilter: 解析 WebSocket URL 中的 token 參數,驗證用戶身份,構建 LoginUser 并存儲到 Spring Security 上下文中。
-
/*** Token 過濾器,驗證 token 的有效性* 驗證通過后,獲得 {@link LoginUser} 信息,并加入到 Spring Security 上下文*/ @RequiredArgsConstructor public class TokenAuthenticationFilter extends OncePerRequestFilter {private final SecurityProperties securityProperties;private final GlobalExceptionHandler globalExceptionHandler;private final OAuth2TokenApi oauth2TokenApi;@Override@SuppressWarnings("NullableProblems")protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain chain)throws ServletException, IOException {String token = SecurityFrameworkUtils.obtainAuthorization(request,securityProperties.getTokenHeader(), securityProperties.getTokenParameter());if (StrUtil.isNotEmpty(token)) {Integer userType = WebFrameworkUtils.getLoginUserType(request);try {// 1.1 基于 token 構建登錄用戶LoginUser loginUser = buildLoginUserByToken(token, userType);// 1.2 模擬 Login 功能,方便日常開發調試if (loginUser == null) {loginUser = mockLoginUser(request, token, userType);}// 2. 設置當前用戶if (loginUser != null) {SecurityFrameworkUtils.setLoginUser(loginUser, request);}} catch (Throwable ex) {CommonResult<?> result = globalExceptionHandler.allExceptionHandler(request, ex);ServletUtils.writeJSON(response, result);return;}}// 繼續過濾鏈chain.doFilter(request, response);}private LoginUser buildLoginUserByToken(String token, Integer userType) {try {OAuth2AccessTokenCheckRespDTO accessToken = oauth2TokenApi.checkAccessToken(token);if (accessToken == null) {return null;}// 用戶類型不匹配,無權限// 注意:只有 /admin-api/* 和 /app-api/* 有 userType,才需要比對用戶類型// 類似 WebSocket 的 /ws/* 連接地址,是不需要比對用戶類型的if (userType != null&& ObjectUtil.notEqual(accessToken.getUserType(), userType)) {throw new AccessDeniedException("錯誤的用戶類型");}// 構建登錄用戶return new LoginUser().setId(accessToken.getUserId()).setUserType(accessToken.getUserType()).setInfo(accessToken.getUserInfo()) // 額外的用戶信息.setTenantId(accessToken.getTenantId()).setScopes(accessToken.getScopes()).setExpiresTime(accessToken.getExpiresTime());} catch (ServiceException serviceException) {// 校驗 Token 不通過時,考慮到一些接口是無需登錄的,所以直接返回 null 即可return null;}}/*** 模擬登錄用戶,方便日常開發調試** 注意,在線上環境下,一定要關閉該功能!!!** @param request 請求* @param token 模擬的 token,格式為 {@link SecurityProperties#getMockSecret()} + 用戶編號* @param userType 用戶類型* @return 模擬的 LoginUser*/private LoginUser mockLoginUser(HttpServletRequest request, String token, Integer userType) {if (!securityProperties.getMockEnable()) {return null;}// 必須以 mockSecret 開頭if (!token.startsWith(securityProperties.getMockSecret())) {return null;}// 構建模擬用戶Long userId = Long.valueOf(token.substring(securityProperties.getMockSecret().length()));return new LoginUser().setId(userId).setUserType(userType).setTenantId(WebFrameworkUtils.getTenantId(request));}}
- LoginUserHandshakeInterceptor: 在 WebSocket 握手階段將 LoginUser 存入 WebSocketSession 的 attributes。
-
/*** 登錄用戶的 {@link HandshakeInterceptor} 實現類** 流程如下:* 1. 前端連接 websocket 時,會通過拼接 ?token={token} 到 ws:// 連接后,這樣它可以被 {@link TokenAuthenticationFilter} 所認證通過* 2. {@link LoginUserHandshakeInterceptor} 負責把 {@link LoginUser} 添加到 {@link WebSocketSession} 中*/ public class LoginUserHandshakeInterceptor implements HandshakeInterceptor {@Overridepublic boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,WebSocketHandler wsHandler, Map<String, Object> attributes) {LoginUser loginUser = SecurityFrameworkUtils.getLoginUser();if (loginUser != null) {WebSocketFrameworkUtils.setLoginUser(loginUser, attributes);}return true;}@Overridepublic void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,WebSocketHandler wsHandler, Exception exception) {// do nothing}}
-
會話管理:
- WebSocketSessionHandlerDecorator: 裝飾 WebSocketHandler,在連接建立/關閉時管理 WebSocketSession。
-
/*** {@link WebSocketHandler} 的裝飾類,實現了以下功能:** 1. {@link WebSocketSession} 連接或關閉時,使用 {@link #sessionManager} 進行管理* 2. 封裝 {@link WebSocketSession} 支持并發操作*/ public class WebSocketSessionHandlerDecorator extends WebSocketHandlerDecorator {/*** 發送時間的限制,單位:毫秒* 這里定義了發送消息的時間限制為 5 秒(1000 毫秒 * 5),用于控制消息發送的時間范圍,* 可能是為了防止長時間的消息發送操作,避免資源占用或其他潛在問題。*/private static final Integer SEND_TIME_LIMIT = 1000 * 5;/*** 發送消息緩沖上限,單位:bytes* 定義了發送消息的緩沖大小上限為 1024 * 100 字節,用于限制消息緩沖的大小,* 防止緩沖過大導致內存占用過多等問題。*/private static final Integer BUFFER_SIZE_LIMIT = 1024 * 100;// WebSocket 會話管理器,用于管理 WebSocket 會話private final WebSocketSessionManager sessionManager;/*** 構造函數,接收被裝飾的 WebSocketHandler 和 WebSocketSessionManager** @param delegate 被裝飾的 WebSocketHandler* @param sessionManager WebSocket 會話管理器*/public WebSocketSessionHandlerDecorator(WebSocketHandler delegate,WebSocketSessionManager sessionManager) {// 調用父類構造函數,傳入被裝飾的 WebSocketHandlersuper(delegate);// 初始化會話管理器this.sessionManager = sessionManager;}/*** 當 WebSocket 連接建立時調用此方法** @param session WebSocket 會話*/@Overridepublic void afterConnectionEstablished(WebSocketSession session) {// 實現 session 支持并發,可參考 https://blog.csdn.net/abu935009066/article/details/131218149// 使用定義的時間限制和緩沖大小限制,創建一個支持并發的 WebSocketSession 裝飾類實例session = new ConcurrentWebSocketSessionDecorator(session, SEND_TIME_LIMIT, BUFFER_SIZE_LIMIT);// 將新的會話添加到 WebSocketSessionManager 中進行管理sessionManager.addSession(session);}/*** 當 WebSocket 連接關閉時調用此方法** @param session WebSocket 會話* @param closeStatus 關閉狀態*/@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) {// 從 WebSocketSessionManager 中移除對應的會話,完成連接關閉時的會話管理操作sessionManager.removeSession(session);} }
- WebSocketSessionManager 和 WebSocketSessionManagerImpl:管理會話,支持按用戶類型和 ID 查詢。
-
/*** {@link WebSocketSession} 管理器的接口*/ public interface WebSocketSessionManager {/*** 添加 Session** @param session Session*/void addSession(WebSocketSession session);/*** 移除 Session** @param session Session*/void removeSession(WebSocketSession session);/*** 獲得指定編號的 Session** @param id Session 編號* @return Session*/WebSocketSession getSession(String id);/*** 獲得指定用戶類型的 Session 列表** @param userType 用戶類型* @return Session 列表*/Collection<WebSocketSession> getSessionList(Integer userType);/*** 獲得指定用戶編號的 Session 列表** @param userType 用戶類型* @param userId 用戶編號* @return Session 列表*/Collection<WebSocketSession> getSessionList(Integer userType, Long userId);}
/*** 默認的 {@link WebSocketSessionManager} 實現類*/ public class WebSocketSessionManagerImpl implements WebSocketSessionManager {/*** id 與 WebSocketSession 映射** key:Session 編號*/private final ConcurrentMap<String, WebSocketSession> idSessions = new ConcurrentHashMap<>();/*** user 與 WebSocketSession 映射** key1:用戶類型* key2:用戶編號*/private final ConcurrentMap<Integer, ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>>> userSessions= new ConcurrentHashMap<>();@Overridepublic void addSession(WebSocketSession session) {// 添加到 idSessions 中idSessions.put(session.getId(), session);// 添加到 userSessions 中LoginUser user = WebSocketFrameworkUtils.getLoginUser(session);if (user == null) {return;}ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(user.getUserType());if (userSessionsMap == null) {userSessionsMap = new ConcurrentHashMap<>();if (userSessions.putIfAbsent(user.getUserType(), userSessionsMap) != null) {userSessionsMap = userSessions.get(user.getUserType());}}CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(user.getId());if (sessions == null) {sessions = new CopyOnWriteArrayList<>();if (userSessionsMap.putIfAbsent(user.getId(), sessions) != null) {sessions = userSessionsMap.get(user.getId());}}sessions.add(session);}@Overridepublic void removeSession(WebSocketSession session) {// 移除從 idSessions 中idSessions.remove(session.getId());// 移除從 idSessions 中LoginUser user = WebSocketFrameworkUtils.getLoginUser(session);if (user == null) {return;}ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(user.getUserType());if (userSessionsMap == null) {return;}CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(user.getId());sessions.removeIf(session0 -> session0.getId().equals(session.getId()));if (CollUtil.isEmpty(sessions)) {userSessionsMap.remove(user.getId(), sessions);}}@Overridepublic WebSocketSession getSession(String id) {return idSessions.get(id);}@Overridepublic Collection<WebSocketSession> getSessionList(Integer userType) {ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(userType);if (CollUtil.isEmpty(userSessionsMap)) {return new ArrayList<>();}LinkedList<WebSocketSession> result = new LinkedList<>(); // 避免擴容Long contextTenantId = TenantContextHolder.getTenantId();for (List<WebSocketSession> sessions : userSessionsMap.values()) {if (CollUtil.isEmpty(sessions)) {continue;}// 特殊:如果租戶不匹配,則直接排除if (contextTenantId != null) {Long userTenantId = WebSocketFrameworkUtils.getTenantId(sessions.get(0));if (!contextTenantId.equals(userTenantId)) {continue;}}result.addAll(sessions);}return result;}@Overridepublic Collection<WebSocketSession> getSessionList(Integer userType, Long userId) {ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(userType);if (CollUtil.isEmpty(userSessionsMap)) {return new ArrayList<>();}CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(userId);return CollUtil.isNotEmpty(sessions) ? new ArrayList<>(sessions) : new ArrayList<>();}}
-
消息處理:
- JsonWebSocketMessageHandler:解析 JsonWebSocketMessage,根據 type 分發給 WebSocketMessageListener。
- DemoWebSocketMessageListener: 處理 demo-message-send 類型的消息,支持單發和群發。
- JsonWebSocketMessage:包含 type 和 content 字段。
-
消息發送:
- WebSocketMessageSender: 定義消息發送接口。
- AbstractWebSocketMessageSender: 實現消息發送邏輯,查詢會話并發送 JsonWebSocketMessage。
- LocalWebSocketMessageSender: 本地發送實現,適合單機場景。
- 配置websocket的Ss權限
/*** WebSocket 的權限自定義* 負責為 WebSocket 端點的 HTTP 握手請求配置 Spring Security 權限,通過 permitAll() 確保握手請求不被阻止* 同時保留自定義的 token 認證邏輯(由 TokenAuthenticationFilter 和 LoginUserHandshakeInterceptor 處理)* 它解決了 Spring Security 對 WebSocket 握手請求的限制問題,是集成 Spring Security 的 WebSocket 功能的關鍵組件*/
@RequiredArgsConstructor
public class WebSocketAuthorizeRequestsCustomizer extends AuthorizeRequestsCustomizer {private final WebSocketProperties webSocketProperties;@Overridepublic void customize(AuthorizeHttpRequestsConfigurer<HttpSecurity>.AuthorizationManagerRequestMatcherRegistry registry) {registry.requestMatchers(webSocketProperties.getPath()).permitAll();}}
前端代碼
- InfraWebSocket.vue: 使用 @vueuse/core 的 useWebSocket 建立連接,發送/接收 JsonWebSocketMessage,支持單發和群發消息。
- 依賴用戶列表 API(UserApi.getSimpleUserList)和 token 獲取邏輯(getRefreshToken)。