一、這個管理系統是基于若依框架,配置webSocKet的maven依賴
<!--websocket--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
二、配置類配置webSocket的端點和相關的參數
1、WebSocketConfig - webSocket配置類
注意:ws://yourdomain:port/ws/order?token=yourTokenValue。
可以使用cpolar 工具把IP地址解析成可訪問的域名。
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {@Autowiredprivate WebSocketHandler webSocketHandler;/*** 注冊websocket的端點* 客戶端連接格式: ws://yourdomain:port/ws/order?token=yourTokenValue* token參數必須提供,系統會通過token從Redis獲取對應的openId用于用戶識別* @param registry WebSocketHandlerRegistry*/@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry.addHandler(webSocketHandler, "/ws/order").setAllowedOrigins("*"); // 允許跨域訪問}/*** 配置WebSocket服務器的參數* 包括:連接超時時間、心跳超時時間、最大消息大小等* @return ServletServerContainerFactoryBean*/@Beanpublic ServletServerContainerFactoryBean createWebSocketContainer() {ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();// 設置異步發送超時時間為25秒container.setAsyncSendTimeout(25000L);// 設置最大會話空閑時間為60秒container.setMaxSessionIdleTimeout(60000L);// 設置最大文本消息緩沖區大小為8KBcontainer.setMaxTextMessageBufferSize(8192);// 設置最大二進制消息緩沖區大小為8KBcontainer.setMaxBinaryMessageBufferSize(8192);return container;}
}
2、WebSocketHandler - webSocket處理器
?
@Component
@Slf4j
public class WebSocketHandler extends TextWebSocketHandler {@Autowiredprivate StringRedisTemplate stringRedisTemplate;// 用線程安全的集合來管理所有連接的 WebSocket 會話private static final Set<WebSocketSession> sessions = new CopyOnWriteArraySet<>();// 使用ConcurrentHashMap來存儲openId到session的映射關系private static final Map<String, WebSocketSession> userSessions = new ConcurrentHashMap<>();// 使用ConcurrentHashMap來存儲session到openId的映射關系(反向映射)private static final Map<WebSocketSession, String> sessionUsers = new ConcurrentHashMap<>();// 記錄每個session最后一次活躍時間private static final Map<String, Long> sessionLastActiveTime = new ConcurrentHashMap<>();// 心跳檢查的定時任務執行器private final ScheduledExecutorService heartbeatScheduler = Executors.newSingleThreadScheduledExecutor();// 心跳超時時間,單位毫秒private static final long HEARTBEAT_TIMEOUT = 50000L; // 50秒// 用于解析JSON的對象映射器private static final ObjectMapper objectMapper = new ObjectMapper();/*** 構造方法,啟動心跳檢測任務*/public WebSocketHandler() {// 每15秒檢查一次心跳heartbeatScheduler.scheduleAtFixedRate(this::checkHeartbeats, 15, 15, TimeUnit.SECONDS);}/*** 心跳檢查方法,清理那些超時的連接*/private void checkHeartbeats() {long currentTime = System.currentTimeMillis();for (Map.Entry<String, Long> entry : sessionLastActiveTime.entrySet()) {String openId = entry.getKey();long lastActive = entry.getValue();// 如果超過超時時間沒有活動,則關閉會話if (currentTime - lastActive > HEARTBEAT_TIMEOUT) {WebSocketSession session = userSessions.get(openId);if (session != null && session.isOpen()) {try {log.warn("會話心跳超時,主動斷開連接 - openId: {}, 上次活躍: {}ms前", openId, currentTime - lastActive);session.close(CloseStatus.NORMAL);} catch (IOException e) {log.error("關閉超時WebSocket會話異常 - openId: {}, 錯誤: {}", openId, e.getMessage());} finally {// 確保從會話映射中移除sessions.remove(session);sessionUsers.remove(session);userSessions.remove(openId);sessionLastActiveTime.remove(openId);}} else {// 會話已關閉或不存在,直接清理userSessions.remove(openId);sessionLastActiveTime.remove(openId);}}}}/*** 新客戶端連接時,加入到 sessions 集合中* @param session 會話*/@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {sessions.add(session);// 從URL中獲取token參數,格式應為 /ws/order?token=xxxString token = extractToken(session);if (token != null) {// 從Redis中獲取對應的openIdString openId = getOpenIdFromToken(token);if (openId != null) {userSessions.put(openId, session);sessionUsers.put(session, openId);sessionLastActiveTime.put(openId, System.currentTimeMillis()); // 記錄初始活躍時間log.info("WebSocket連接已建立 - token: {}, openId: {}, 當前連接數: {}", token, openId, sessions.size());} else {log.warn("找不到token對應的openId,token可能已過期 - token: {}", token);// 可以選擇關閉這個無效的連接session.close(CloseStatus.NOT_ACCEPTABLE);}} else {log.warn("WebSocket連接未提供token參數,無法識別用戶");// 可以選擇關閉這個無效的連接session.close(CloseStatus.NOT_ACCEPTABLE);}}/*** 客戶端斷開連接時,從 sessions 集合中移除* @param session 會話* @param status*/@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {sessions.remove(session);// 從用戶會話映射中也移除String openId = sessionUsers.remove(session);if (openId != null) {userSessions.remove(openId);sessionLastActiveTime.remove(openId);log.info("WebSocket連接已關閉 - openId: {}, 狀態: {}", openId, status);}}/*** 處理收到的文本消息* 對于心跳消息進行特殊處理*/@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {String openId = sessionUsers.get(session);String payload = message.getPayload();try {// 嘗試解析為JSONJsonNode jsonNode = objectMapper.readTree(payload);// 檢查是否是心跳消息if (jsonNode.has("type") && "ping".equals(jsonNode.get("type").asText())) {// 更新最后活躍時間if (openId != null) {sessionLastActiveTime.put(openId, System.currentTimeMillis());}// 發送pong響應session.sendMessage(new TextMessage("{\"type\":\"pong\",\"time\":" + System.currentTimeMillis() + "}"));return;}} catch (Exception e) {// 不是JSON格式的消息,忽略錯誤繼續處理}// 更新最后活躍時間if (openId != null) {sessionLastActiveTime.put(openId, System.currentTimeMillis());}log.debug("收到消息 - openId: {}, 內容: {}", openId, payload);// 在這里可以添加其他消息處理邏輯}/*** 從WebSocketSession中提取token* @param session WebSocket會話* @return token,如果不存在則返回null*/private String extractToken(WebSocketSession session) {String query = session.getUri().getQuery();if (query != null && query.contains("token=")) {String[] params = query.split("&");for (String param : params) {String[] keyValue = param.split("=");if (keyValue.length == 2 && "token".equals(keyValue[0])) {log.info("WebSocket連接已獲取token - token: {}", keyValue[1]);return keyValue[1];}}}return null;}/*** 從token獲取對應的openId* @param token 用戶token* @return openId,如果token無效則返回null*/private String getOpenIdFromToken(String token) {if (token == null || token.isEmpty()) {return null;}try {// 從Redis中獲取token對應的openIdreturn stringRedisTemplate.opsForValue().get(WECHAT_KEY + token);} catch (Exception e) {log.error("從Redis獲取token信息異常 - token: {}, 錯誤: {}", token, e.getMessage());return null;}}/*** 發送支付成功的通知給所有連接的客戶端* @param message 消息體*/public void sendPaymentSuccessNotification(String message) {for (WebSocketSession session : sessions) {try {// 通過 WebSocket 向每個客戶端發送消息session.sendMessage(new TextMessage(message));} catch (IOException e) {log.error("發送支付成功通知失敗", e);}}}/*** 向指定用戶發送消息* @param openId 用戶的openId* @param message 消息內容* @return 是否發送成功*/public boolean sendMessageToUser(String openId, String message) {WebSocketSession session = userSessions.get(openId);if (session != null && session.isOpen()) {try {session.sendMessage(new TextMessage(message));log.info("消息已發送給用戶 - openId: {}", openId);return true;} catch (IOException e) {log.error("發送消息給用戶失敗 - openId: {}", openId, e);return false;}} else {log.info("用戶未通過WebSocket連接 - openId: {}", openId);return false;}}/*** 向所有用戶發送心跳檢測消息*/public void sendHeartbeat() {String heartbeatMsg = "{\"type\":\"heartbeat\",\"time\":" + System.currentTimeMillis() + "}";for (WebSocketSession session : sessions) {if (session.isOpen()) {try {session.sendMessage(new TextMessage(heartbeatMsg));} catch (IOException e) {log.error("發送心跳消息失敗", e);}}}}
}
注意:這里發送消息給指定用戶需要前端傳遞token,獲取存儲在redis中的openId(微信小程序用戶標識)
3、發送消息我定義了一個定時器發送消息和心跳測試
3.1、根據自己業務封裝的消息體
?
@ApiModel(value = "MessageVo",discriminator = "websocket的消息體")
public class MessageVo {@ApiModelProperty(value = "消息標題",dataType = "string")private String title;@ApiModelProperty(value = "消息內容",dataType = "string")private String content;@ApiModelProperty(value = "車牌號碼",dataType = "string")private String plateNumber;@ApiModelProperty(value = "訂單編號",dataType = "string")private String orderNumber;@ApiModelProperty(value = "創建時間",dataType = "date")private Date createTime;}
/*** 定時發送提醒消息給待過磅狀態的用戶* 每1分鐘執行一次,提醒用戶進行過磅操作*/public void sendWeighingReminder() {log.info("開始執行待過磅用戶提醒任務");try {// 查詢所有待過磅的訂單WeighingRecords pendingQuery = new WeighingRecords();pendingQuery.setStatus(0L); // 待過磅List<WeighingRecords> pendingWeighingOrders = weighingRecordsMapper.selectWeighingRecordsList(pendingQuery);// 如果沒有待過磅訂單,直接返回if (pendingWeighingOrders == null || pendingWeighingOrders.isEmpty()) {log.info("沒有查詢到待過磅訂單,跳過發送提醒");return;}log.info("查詢到 {} 條待過磅訂單,開始發送提醒", pendingWeighingOrders.size());int successCount = 0;// 遍歷所有待過磅訂單,發送提醒消息for (WeighingRecords order : pendingWeighingOrders) {// 檢查是否有有效的openIdString openId = order.getOpenId();if (openId == null || openId.trim().isEmpty()) {log.warn("訂單 {} 缺少有效的openId,無法發送提醒", order.getOrderNumber());continue;}// 創建消息體MessageVo messageVo = new MessageVo();messageVo.setTitle("過磅提醒");messageVo.setContent("您有一條待過磅的訂單,請及時前往過磅點進行過磅操作。");messageVo.setOrderNumber(order.getOrderNumber());messageVo.setPlateNumber(order.getPlateNumber()); // 設置車牌號messageVo.setCreateTime(DateUtils.getNowDate());try {// 轉換為JSON字符串String messageJson = objectMapper.writeValueAsString(messageVo);// 直接使用openId發送消息(WebSocketHandler內部會通過openId查找對應的會話)boolean sent = webSocketHandler.sendMessageToUser(openId, messageJson);if (sent) {successCount++;log.info("成功向用戶 {} 發送過磅提醒消息,訂單號: {}", openId, order.getOrderNumber());} else {log.info("用戶 {} 未連接WebSocket,無法發送過磅提醒消息,訂單號: {}", openId, order.getOrderNumber());}} catch (JsonProcessingException e) {log.error("消息序列化異常,訂單號: {}, 錯誤: {}", order.getOrderNumber(), e.getMessage());} catch (Exception e) {log.error("發送消息異常,訂單號: {}, 錯誤: {}", order.getOrderNumber(), e.getMessage());}}log.info("過磅提醒任務完成,共嘗試: {} 條,成功: {} 條", pendingWeighingOrders.size(), successCount);} catch (Exception e) {log.error("過磅提醒任務異常: {}", e.getMessage(), e);}}/*** 定期發送心跳消息,保持WebSocket連接活躍* 每25秒執行一次,低于WebSocketConfig中設置的60秒超時時間*/public void sendHeartbeat() {log.debug("開始執行WebSocket心跳任務");try {webSocketHandler.sendHeartbeat();log.debug("WebSocket心跳消息發送完成");} catch (Exception e) {log.error("WebSocket心跳任務異常: {}", e.getMessage(), e);}}
4、由于這個管理系統是基于若依所以需要配置鑒權,否則會被攔截
這個是部分配置代碼
@Beanprotected SecurityFilterChain filterChain(HttpSecurity httpSecurity) throws Exception{return httpSecurity// CSRF禁用,因為不使用session.csrf(csrf -> csrf.disable())// 禁用HTTP響應標頭.headers((headersCustomizer) -> {headersCustomizer.cacheControl(cache -> cache.disable()).frameOptions(options -> options.sameOrigin());})// 認證失敗處理類.exceptionHandling(exception -> exception.authenticationEntryPoint(unauthorizedHandler))// 基于token,所以不需要session.sessionManagement(session -> session.sessionCreationPolicy(SessionCreationPolicy.STATELESS))// 注解標記允許匿名訪問的url.authorizeHttpRequests((requests) -> {permitAllUrl.getUrls().forEach(url -> requests.antMatchers(url).permitAll());// 對于登錄login 注冊register 驗證碼captchaImage 允許匿名訪問requests.antMatchers("/login", "/register", "/captchaImage","/weiXin/login","/weiXin/returnNotify","/ws/**").permitAll()
..........}
注意:端點配置的是“/ws/order",所以在這了配置為”/ws/**“
三、小程序端的部分代碼配置
注意:需要在路徑上面傳遞token,為了后端獲取openId向指定用戶發送消息
這個是小程序的webSocket的地址示例:“wss://5aa7e45c.r11.cpolar.top/ws/order?token=${this.token}”