JAVA實現websocket
- 背景
- 依賴
- 問題
- 代碼實現
- 測試
背景
近期項目中需要用到websocket,實現即時通信。
依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
問題
使用websocket網頁測試時,發現websocket連接不上,因為spring.security的問題,需要賬號密碼,可以在啟動類中移除SecurityAutoConfiguration.class。
代碼實現
onMessage方法用來接收消息,可以根據自己業務需要定義正常的心跳處理以及業務處理邏輯。
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.travelsky.config.WebSocketConfigurator;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Component;import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;@Slf4j
@Component
@ServerEndpoint(value = "/websocket", configurator = WebSocketConfigurator.class)
public class WebSocketServer {//與某個客戶端的連接會話,需要通過它來給客戶端發送數據private Session session;private static final AtomicInteger OnlineCount = new AtomicInteger(0);// concurrent包的線程安全Set,用來存放每個客戶端對應的Session對象。public static ConcurrentHashMap<String, WebSocketServer> serverMap = new ConcurrentHashMap<>();private static final String LOCALHOST = "127.0.0.1";private final String ipAddress = LOCALHOST;/*** 連接建立成功調用的方法*/@OnOpenpublic void onOpen(Session session) {this.session = session;log.info("開始建立websocket方法連接,獲取Session為:{}", session);log.info("連接建立 - Session ID: {}, Query: {}",session.getId(), session.getRequestURI().getQuery());// 如果已有連接,先關閉舊連接if (serverMap.containsKey(this.ipAddress)) {WebSocketServer oldServer = serverMap.get(this.ipAddress);try {oldServer.session.close();} catch (IOException e) {log.error("關閉舊連接失敗", e);}}serverMap.put(this.ipAddress, this);int cnt = OnlineCount.incrementAndGet();log.info("有連接加入,當前連接數為:{},當前連接id為:{}", cnt, this.ipAddress);sendMessage("連接成功");log.info("{},已上線!", this.ipAddress);}@OnMessagepublic void onMessage(String message, Session session) {try {log.info("服務器收到了用戶:{},發來的消息:{},當前ip集合為:{}", ipAddress, message, JSON.toJSONString(serverMap));//方便前端測試JSONObject jsonObject = JSON.parseObject(message);Object modeCode = jsonObject.get("ModeCode");log.info("獲取到前端發送消息為:{}", modeCode);String string = modeCode.toString();sendMessage("pong");if (!string.contains("ping")) {//處理其它業務邏輯log.info("ModeCode為空則表示正常數據");//ModeCode為空則表示正常數據 處理其他消息Map map = JSON.parseObject(message, Map.class);log.info("打印JSON字符串:{}", JSON.toJSONString(map));}} catch (Exception e) {log.error("處理數據異常", e);onError(session, e);}}/*** 給ip地址為ip的客戶端發送消息** @param ip ip地址* @param message 消息*/public static Boolean sendMessage(String ip, String message) {log.info("開始發送消息,serverMap數據為:{},ip為:{},消息為:{}", serverMap, ip, message);if (serverMap.containsKey(ip)) {WebSocketServer webSocketServer = serverMap.get(ip);webSocketServer.sendMessage(message);log.info("發送成功:{},當前連接數為:{}", ip, OnlineCount);return false;} else {log.error("發送失敗,客戶端未連接: {}", ip);return true;}}/*** 服務器主動發送消息** @param message 消息*/public void sendMessage(String message) {try {this.session.getBasicRemote().sendText(message);} catch (IOException e) {log.error("發送消息異常:{}", e.getMessage());}}@OnClosepublic void onClose(Session session, CloseReason closeReason) {log.info("連接關閉 - Session ID: {}, Reason: {} (Code: {})",session.getId(),closeReason.getReasonPhrase(),closeReason.getCloseCode());if (serverMap.containsKey(ipAddress)) {serverMap.remove(ipAddress);int cnt = OnlineCount.decrementAndGet();log.info("有連接退出,當前連接數為:{},當前;退出id為:{}", cnt, this.ipAddress);log.info("{},已下線!", ipAddress);}}@OnErrorpublic void onError(Session session, Throwable throwable) {log.info("連接異常 - Session ID: {}", session.getId());if (serverMap.containsKey(ipAddress)) {serverMap.remove(ipAddress);int cnt = OnlineCount.decrementAndGet();log.error("用戶{}發生了錯誤,具體如下:{},已下線!,當前連接數為:{}", ipAddress, throwable.getMessage(), cnt);}}private static synchronized void subOnlineCount() {OnlineCount.decrementAndGet();}public static synchronized void addOnlineCount() {OnlineCount.incrementAndGet();}public static WebSocketServer get(String ipAddress) {return serverMap.get(ipAddress);}public static ConcurrentHashMap<String, WebSocketServer> getMap() {return serverMap;}public static boolean isOnline(String ipAddress) {return serverMap.containsKey(ipAddress);}/*** 通用處理空格** @param s String*/private static String processDataSpace(String s, int length) {if (s == null || s.isEmpty()) {// 如果為空,直接返回空格return String.format("%-" + length + "s", s);}//如果長度超過則截取 不足則用空格補齊if (s.length() > length) {// 超過位則截斷return s.substring(0, length);} else {// 不足位右補空格return String.format("%-" + length + "s", s);}}
}
測試
websocket在線測試
該鏈接可以在線測試websocket是否正常,數據格式需要滿足自定義的要求。