需求:
最近在對接一個物聯網里設備,他的通信方式是 websocket 。所以我需要在 springboot框架中集成websocket 依賴,從而實現與設備實時通信!
框架:springboot2.7
java版本:java8
好了,還是直接上代碼
第一步:引入依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
第二步寫配置:
package com.agentai.base.config;import com.agentai.base.yumou.webSocket.YuMouDeviceWebSocketHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean;/*** WebSocket配置類* 負責配置WebSocket服務器和注冊WebSocket處理器*/
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {// 注冊WebSocket處理器,// 允許所有來源的跨域請求registry.addHandler(deviceWebSocketHandler(), "/linker-dev").setAllowedOrigins("*");}@Beanpublic YuMouDeviceWebSocketHandler deviceWebSocketHandler() {return new YuMouDeviceWebSocketHandler();}@Beanpublic ServletServerContainerFactoryBean createWebSocketContainer() {ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();// 設置消息緩沖區大小container.setMaxTextMessageBufferSize(8192);container.setMaxBinaryMessageBufferSize(8192);// 設置會話超時時間(毫秒)container.setMaxSessionIdleTimeout(60000L);return container;}
}
第三方:WebSocket會話管理器
package com.agentai.base.yumou.webSocket;import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;import java.io.IOException;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;/*** WebSocket會話管理器* 負責管理所有WebSocket會話,包括會話狀態跟蹤、心跳檢測和清理過期會話*/
@Slf4j
public class WebSocketSessionManager {// 心跳超時限制(毫秒)private static final long HEARTBEAT_TIMEOUT = 30000;// 心跳檢查間隔(毫秒)private static final long HEARTBEAT_CHECK_INTERVAL = 10000;// 心跳消息內容private static final String HEARTBEAT_MESSAGE = "{\"type\":\"ping\"}";// 會話信息,包含WebSocket會話和最后活動時間private static class SessionInfo {WebSocketSession session;long lastActiveTime;SessionInfo(WebSocketSession session) {this.session = session;this.lastActiveTime = Instant.now().toEpochMilli();}void updateLastActiveTime() {this.lastActiveTime = Instant.now().toEpochMilli();}}// 保存所有會話信息private final Map<String, SessionInfo> sessions = new ConcurrentHashMap<>();private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();public WebSocketSessionManager() {// 啟動心跳檢查任務scheduler.scheduleAtFixedRate(this::checkHeartbeats,HEARTBEAT_CHECK_INTERVAL, HEARTBEAT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);}/*** 添加新的會話* @param session 新的WebSocket會話*/public void addSession(WebSocketSession session) {sessions.put(session.getId(), new SessionInfo(session));log.info("新會話已添加: {}", session.getId());}/*** 移除會話* @param sessionId 會話ID*/public void removeSession(String sessionId) {sessions.remove(sessionId);log.info("會話已移除: {}", sessionId);}/*** 更新會話最后活動時間* @param sessionId 會話ID*/public void updateSessionActivity(String sessionId) {SessionInfo info = sessions.get(sessionId);if (info != null) {info.updateLastActiveTime();}}/*** 發送消息到指定會話* @param sessionId 會話ID* @param message 消息內容* @return 是否發送成功*/public boolean sendMessage(String sessionId, String message) {SessionInfo info = sessions.get(sessionId);if (info != null && info.session.isOpen()) {try {info.session.sendMessage(new TextMessage(message));return true;} catch (IOException e) {log.error("發送消息到會話[{}]失敗: {}", sessionId, e.getMessage());}}return false;}/*** 廣播消息到所有會話* @param message 消息內容*/public void broadcastMessage(String message) {sessions.forEach((sessionId, info) -> {if (info.session.isOpen()) {try {info.session.sendMessage(new TextMessage(message));} catch (IOException e) {log.error("廣播消息到會話[{}]失敗: {}", sessionId, e.getMessage());}}});}/*** 檢查心跳并清理過期會話*/private void checkHeartbeats() {long now = Instant.now().toEpochMilli();sessions.forEach((sessionId, info) -> {if (now - info.lastActiveTime > HEARTBEAT_TIMEOUT) {try {// 發送心跳消息info.session.sendMessage(new TextMessage(HEARTBEAT_MESSAGE));log.debug("發送心跳到會話: {}", sessionId);} catch (IOException e) {// 如果發送失敗,關閉并移除會話log.warn("會話[{}]心跳檢測失敗,關閉會話: {}", sessionId, e.getMessage());try {info.session.close();} catch (IOException ex) {log.error("關閉會話[{}]失敗: {}", sessionId, ex.getMessage());}removeSession(sessionId);}}});}/*** 關閉會話管理器*/public void shutdown() {scheduler.shutdown();sessions.forEach((sessionId, info) -> {try {info.session.close();} catch (IOException e) {log.error("關閉會話[{}]失敗: {}", sessionId, e.getMessage());}});sessions.clear();}
}
第四步:設備WebSocket處理器
package com.agentai.base.yumou.webSocket;import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
/*** 設備WebSocket處理器* 負責處理設備的WebSocket連接、消息接收和斷開連接*/
@Slf4j
public class YuMouDeviceWebSocketHandler extends TextWebSocketHandler {private final WebSocketSessionManager sessionManager;// 構造函數,初始化會話管理器public YuMouDeviceWebSocketHandler() {this.sessionManager = new WebSocketSessionManager();}/*** WebSocket連接建立后的處理* @param session WebSocket會話*/@Overridepublic void afterConnectionEstablished(WebSocketSession session) {// 將新會話添加到會話管理器String sessionId = session.getId();sessionManager.addSession(session);log.info("WebSocket連接已建立: {}", sessionId);}@AutowiredYuMouService yuMouService;/*** 處理接收到的文本消息* @param session 當前會話* @param message 接收到的文本消息*/@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) {String payload = message.getPayload();String sessionId = session.getId();try {// 更新會話的活動時間sessionManager.updateSessionActivity(sessionId);log.info("接收到設備[{}]的文本消息: {}", sessionId, payload);JSONObject jsonObject = JSONObject.parseObject(payload);log.info("數據:", jsonObject );// 處理其他業務消息// TODO: 添加具體的業務消息處理邏輯} catch (Exception e) {log.error("處理設備[{}]消息時發生錯誤: {}", sessionId, e.getMessage());}}/*** 處理接收到的二進制消息* @param session 當前會話* @param message 接收到的二進制消息*/@Overrideprotected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) {byte[] payload = message.getPayload().array();String sessionId = session.getId();log.info("接收到設備[{}]的二進制消息,長度: {} 字節", sessionId, payload.length);// 目前只打印消息長度,可以根據需求處理二進制數據// TODO: 添加二進制消息處理邏輯}/*** 處理傳輸錯誤* @param session 當前會話* @param exception 錯誤異常*/@Overridepublic void handleTransportError(WebSocketSession session, Throwable exception) {String sessionId = session.getId();log.error("設備[{}]連接傳輸錯誤: {}", sessionId, exception.getMessage());}/*** WebSocket連接關閉后的處理* @param session 當前會話* @param status 關閉狀態*/@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) {String sessionId = session.getId();sessionManager.removeSession(sessionId);log.info("設備[{}]WebSocket連接已關閉,狀態碼: {}", sessionId, status.getCode());}/*** 發送消息到指定會話* @param sessionId 會話ID* @param message 消息內容* @return 是否發送成功*/public boolean sendMessage(String sessionId, String message) {return sessionManager.sendMessage(sessionId, message);}/*** 廣播消息到所有連接的會話* @param message 消息內容*/public void broadcastMessage(String message) {sessionManager.broadcastMessage(message);}/*** 關閉WebSocket處理器,清理資源*/public void shutdown() {sessionManager.shutdown();}
}