WebSocket是一種在單個TCP連接上進行全雙工通信的協議。WebSocket通信協議于2011年被IETF定為標準RFC 6455,并由RFC7936補充規范。WebSocketAPI也被W3C定為標準。
WebSocket使得客戶端和服務器之間的數據交換變得更加簡單,允許服務端主動向客戶端推送數據。在WebSocket API中,瀏覽器和服務器只需要完成一次握手,兩者之間就直接可以創建持久性的連接,并進行雙向數據傳輸。
創建定時任務,實現定時向前端推送相關消息。
創建存放ws推送的參數緩存Map,定時任務獲取參數,獲取數據后推送。
引入依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
開啟WebSocket支持的配置類
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;/*** 功能描述:* 開啟websocket支持*/
@Configuration
public class WebSocketConfig {// 使用boot內置tomcat時需要注入此bean@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}
WebSocketServer服務端
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;/*** 功能描述:* WebSocketServer服務端*/
// @ServerEndpoint 注解是一個類層次的注解,它的功能主要是將目前的類定義成一個websocket服務器端。注解的值將被用于監聽用戶連接的終端訪問URL地址
// encoders = WebSocketCustomEncoding.class 是為了使用ws自己的推送Object消息對象(sendObject())時進行解碼,通過Encoder 自定義規則(轉換為JSON字符串)
@ServerEndpoint(value = "/websocket/{userId}",encoders = WebSocketCustomEncoding.class)
@Component
public class WebSocket {private final static Logger logger = LogManager.getLogger(WebSocket.class);/*** 靜態變量,用來記錄當前在線連接數。應該把它設計成線程安全的*/private static int onlineCount = 0;/*** concurrent包的線程安全Map,用來存放每個客戶端對應的MyWebSocket對象*/public static ConcurrentHashMap<String, WebSocket> webSocketMap = new ConcurrentHashMap<>();/**** 功能描述:* concurrent包的線程安全Map,用來存放每個客戶端對應的MyWebSocket對象的參數體*/public static ConcurrentHashMap<String, PushParams> webSocketParamsMap = new ConcurrentHashMap<>();/*** 與某個客戶端的連接會話,需要通過它來給客戶端發送數據*/private Session session;private String userId;/*** 連接建立成功調用的方法* onOpen 和 onClose 方法分別被@OnOpen和@OnClose 所注解。他們定義了當一個新用戶連接和斷開的時候所調用的方法。*/@OnOpenpublic void onOpen(Session session, @PathParam("userId") String userId) {this.session = session;this.userId = userId;//加入mapwebSocketMap.put(userId, this);addOnlineCount(); //在線數加1logger.info("用戶{}連接成功,當前在線人數為{}", userId, getOnlineCount());try {sendMessage(String.valueOf(this.session.getQueryString()));} catch (IOException e) {logger.error("IO異常");}}/*** 連接關閉調用的方法*/@OnClosepublic void onClose() {//從map中刪除webSocketMap.remove(userId);subOnlineCount(); //在線數減1logger.info("用戶{}關閉連接!當前在線人數為{}", userId, getOnlineCount());}/*** 收到客戶端消息后調用的方法* onMessage 方法被@OnMessage所注解。這個注解定義了當服務器接收到客戶端發送的消息時所調用的方法。* @param message 客戶端發送過來的消息*/@OnMessagepublic void onMessage(String message, Session session) {logger.info("來自客戶端用戶:{} 消息:{}",userId, message);//群發消息/*for (String item : webSocketMap.keySet()) {try {webSocketMap.get(item).sendMessage(message);} catch (IOException e) {e.printStackTrace();}}*/}/*** 發生錯誤時調用** @OnError*/@OnErrorpublic void onError(Session session, Throwable error) {logger.error("用戶錯誤:" + this.userId + ",原因:" + error.getMessage());error.printStackTrace();}/*** 向客戶端發送消息*/public void sendMessage(String message) throws IOException {this.session.getBasicRemote().sendText(message);//this.session.getAsyncRemote().sendText(message);}/*** 向客戶端發送消息*/public void sendMessage(Object message) throws IOException, EncodeException {this.session.getBasicRemote().sendObject(message);//this.session.getAsyncRemote().sendText(message);}/*** 通過userId向客戶端發送消息*/public void sendMessageByUserId(String userId, String message) throws IOException {logger.info("服務端發送消息到{},消息:{}",userId,message);if(StringUtils.isNotBlank(userId)&&webSocketMap.containsKey(userId)){webSocketMap.get(userId).sendMessage(message);}else{logger.error("用戶{}不在線",userId);}}/*** 通過userId向客戶端發送消息*/public void sendMessageByUserId(String userId, Object message) throws IOException, EncodeException {logger.info("服務端發送消息到{},消息:{}",userId,message);if(StringUtils.isNotBlank(userId)&&webSocketMap.containsKey(userId)){webSocketMap.get(userId).sendMessage(message);}else{logger.error("用戶{}不在線",userId);}}/*** 通過userId更新緩存的參數*/public void changeParamsByUserId(String userId, PushParams pushParams) throws IOException, EncodeException {logger.info("ws用戶{}請求參數更新,參數:{}",userId,pushParams.toString());webSocketParamsMap.put(userId,pushParams);}/*** 群發自定義消息*/public static void sendInfo(String message) throws IOException {for (String item : webSocketMap.keySet()) {try {webSocketMap.get(item).sendMessage(message);} catch (IOException e) {continue;}}}public static synchronized int getOnlineCount() {return onlineCount;}public static synchronized void addOnlineCount() {WebSocket.onlineCount++;}public static synchronized void subOnlineCount() {WebSocket.onlineCount--;}}
Encoder 自定義規則(轉換為JSON字符串)
import com.alibaba.fastjson.JSON;
import javax.websocket.EncodeException;
import javax.websocket.Encoder;
import javax.websocket.EndpointConfig;/*** 在 websocket 中直接發送 obj 會有問題 - No encoder specified for object of class* 需要對 obj 創建解碼類,實現 websocket 中的 Encoder.Text<>* */
public class WebSocketCustomEncoding implements Encoder.Text<Object> {/*** The Encoder interface defines how developers can provide a way to convert their* custom objects into web socket messages. The Encoder interface contains* subinterfaces that allow encoding algorithms to encode custom objects to:* text, binary data, character stream and write to an output stream.** Encoder 接口定義了如何提供一種方法將定制對象轉換為 websocket 消息* 可自定義對象編碼為文本、二進制數據、字符流、寫入輸出流* Text、TextStream、Binary、BinaryStream* */@Overridepublic void init(EndpointConfig endpointConfig) {}@Overridepublic void destroy() {}@Overridepublic String encode(Object o) throws EncodeException {return JSON.toJSONString(o);}
}
自定義消息推送的參數體
/*** 功能描述:** @description: ws推送的參數結構*/
@Data
public class PushParams {/*** 功能描述:* 類型*/private String type;/*** 功能描述:* 開始時間*/private String startTime;/*** 功能描述:* 結束時間*/private String stopTime;
}
根據用戶ID更新ws推送的參數,或者使用onMessage修改緩存的結構體
import com.company.project.common.websocket.PushParams;
import com.company.project.common.websocket.WebSocket;
import com.company.project.service.TestMongodbService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.websocket.EncodeException;
import java.io.IOException;/*** 功能描述:* 建立WebSocket連接* @Author: LXD* @Date: 2022-12-01 09:55:00* @since: 1.0.0*/
@RestController
@RequestMapping("/webSocketPush")
public class WebSocketController {@Autowiredprivate WebSocket webSocket;@Autowiredprivate TestMongodbService testMongodbService;@RequestMapping("/sentMessage")public void sentMessage(String userId,String message){try {webSocket.sendMessageByUserId(userId,message);} catch (IOException e) {e.printStackTrace();}}@RequestMapping("/sentObjectMessage")public void sentObjectMessage(String userId){try {webSocket.sendMessageByUserId(userId,testMongodbService.query());} catch (IOException e) {e.printStackTrace();} catch (EncodeException e) {e.printStackTrace();}}/**** 功能描述:* 根據用戶ID更新ws推送的參數* @Param userId: WS中的用戶ID* @Param pushParams: 推送參數* @return: void* @since: 1.0.0*/@RequestMapping("/changeWsParams")public void changeWsParams(String userId, PushParams pushParams){try {webSocket.changeParamsByUserId(userId,pushParams);} catch (IOException e) {e.printStackTrace();} catch (EncodeException e) {e.printStackTrace();}}}
創建定時推送的任務
import com.company.project.service.TestMongodbService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import javax.websocket.EncodeException;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import static com.company.project.common.websocket.WebSocket.webSocketMap;
import static com.company.project.common.websocket.WebSocket.webSocketParamsMap;/*** 功能描述:** @description: ws定時推送*/
@Configuration
@EnableScheduling
public class WebsocketSchedule {@Autowiredprivate WebSocket webSocket;@Autowiredprivate TestMongodbService testMongodbService;// 第一次延遲1秒后執行,之后按fixedRate的規則每5秒執行一次 fixedRateString 與 fixedRate 意思相同,只是使用字符串的形式。唯一不同的是支持占位符@Scheduled(initialDelay=1000, fixedRateString = "${ws.pushInterval}")public void pushData() throws EncodeException, IOException {ConcurrentHashMap<String, WebSocket> webSocketPushMap = webSocketMap;ConcurrentHashMap<String, PushParams> webSocketPushParamsMap = webSocketParamsMap;if(!webSocketPushMap.isEmpty()){for(String key : webSocketPushMap.keySet()){// 根據ws連接用戶ID獲取推送參數PushParams pushParams = webSocketPushParamsMap.get(key);webSocket.sendMessageByUserId(key,testMongodbService.query());}}}
}