WebSocket 是什麼
WebSocket 是一種用於在客戶端和服務器之間建立雙向通信的協議,它能實現實時、持久的連接。與傳統的 HTTP 請求響應模式不同,WebSocket 在建立連接後允許客戶端和服務器之間相互發送消息,直到連接關閉。由於 WebSocket 具有低延遲、雙向通信和高效的特點,因此適用於多種實時應用場景。
源碼在下面
相關依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
WebSocket 必須配置
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;/*** WebSocket 配置*/
/**
 * Spring configuration for annotation-declared WebSocket endpoints.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Exposes a {@link ServerEndpointExporter} bean.
     *
     * <p>According to the original note on this method, this bean automatically registers the
     * WebSocket endpoints that are declared with {@code @ServerEndpoint}.
     *
     * @return a newly created {@code ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
一、入門例子
代碼demo
import jakarta.websocket.*;
import jakarta.websocket.server.ServerEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @Description:
 * @Author: sh
 * @Date: 2024/12/15 00:06
 */
/**
 * Introductory WebSocket endpoint published at {@code /websocket/WEBSOCKET_MSG_TOPIC}.
 *
 * <p>It logs connection lifecycle events and answers every text message with the same text
 * prefixed by {@code "消息已收到: "}.
 */
@Component
@ServerEndpoint("/websocket/WEBSOCKET_MSG_TOPIC")
public class WebSocketServer {

    private static final Logger logger = LoggerFactory.getLogger(WebSocketServer.class);

    /**
     * Logs that a client has connected.
     *
     * @param session the session that was just opened
     */
    @OnOpen
    public void onOpen(Session session) {
        logger.info("客戶端已連接: {}", session.getId());
    }

    /**
     * Logs the incoming text and sends an acknowledgement back on the same session.
     *
     * @param session the session the message arrived on
     * @param message the text sent by the client
     */
    @OnMessage
    public void onMessage(Session session, String message) {
        logger.info("收到消息: {}", message);
        try {
            session.getBasicRemote().sendText("消息已收到: " + message);
        } catch (IOException e) {
            // Keep the endpoint alive, but record the full failure (including the stack trace)
            // through the logger instead of dumping it with printStackTrace().
            logger.error("發送消息失敗, session: {}", session.getId(), e);
        }
    }

    /**
     * Logs that a client has disconnected.
     *
     * @param session the session that was closed
     */
    @OnClose
    public void onClose(Session session) {
        logger.info("客戶端已關閉: {}", session.getId());
    }

    /**
     * Logs an error reported for a session.
     *
     * @param session   the session the error belongs to
     * @param throwable the reported error
     */
    @OnError
    public void onError(Session session, Throwable throwable) {
        // Passing the throwable itself keeps its type and stack trace; getMessage() alone
        // can be null and hides where the failure came from.
        logger.error("發生錯誤, session: {}", session.getId(), throwable);
    }
}
測試demo
import com.alipay.api.java_websocket.client.WebSocketClient;
import com.alipay.api.java_websocket.handshake.ServerHandshake;import java.net.URI;
import java.net.URISyntaxException;/*** @Description:* @Author: sh* @Date: 2024/12/15 00:04*/
public class WebSocketClientExample {public static void main(String[] args) throws URISyntaxException {WebSocketClient client = new WebSocketClient(new URI("ws://localhost:8080/websocket/WEBSOCKET_MSG_TOPIC")) {@Overridepublic void onOpen(ServerHandshake handshakedata) {System.out.println("連接已打開");send("Hello, Server!"); // 發送消息到 WebSocket 服務器}@Overridepublic void onMessage(String message) {System.out.println("收到消息: " + message);}@Overridepublic void onClose(int code, String reason, boolean remote) {System.out.println("連接關閉: " + reason);}@Overridepublic void onError(Exception ex) {System.out.println("發生錯誤: " + ex.getMessage());}};client.connect();}
}
二、進階與redis結合
進階demo
兩種方式:直接從 redis 中獲取數據;通過訂閱從 redis 中獲取數據
import com.macro.mall.websocket.listener.WebSocketSubscribeListener;
import jakarta.annotation.Resource;
import jakarta.websocket.*;
import jakarta.websocket.server.ServerEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;/*** @Description:* @Author: sh* @Date: 2024/12/13 15:38*/
@Component
//@ServerEndpoint(value="/websocket/WEBSOCKET_MSG_TOPIC", decoders = MyMessageDecoder.class)
@ServerEndpoint(value="/websocket/WEBSOCKET_MSG_TOPIC")
public class JavaDemo {/*** 日志對象*/private static Logger logger = LoggerFactory.getLogger(JavaDemo.class);/*** redis消息監聽者容器,此處不好直接注入*/private static RedisMessageListenerContainer redisMessageListenerContainer;private static RedisTemplate redisTemplate;@Resourcepublic void setRedisMessageListenerContainer(RedisMessageListenerContainer redisMessageListenerContainer) {JavaDemo.redisMessageListenerContainer = redisMessageListenerContainer;}@Resourcepublic void setRedisTemplate(RedisTemplate redisTemplate) {JavaDemo.redisTemplate = redisTemplate;}/*** concurrent包的線程安全Set,用來存放每個客戶端對應的webSocket對象。若要實現服務端與單一客戶端通信的話,可以使用Map來存放,其中Key可以為用戶標識*/private static CopyOnWriteArraySet<JavaDemo> webSocketSet = new CopyOnWriteArraySet<>();/*** websocket訂閱監聽器*/private WebSocketSubscribeListener subscribeListener;@OnOpenpublic void onOpen(Session session, EndpointConfig config) {webSocketSet.add(this);subscribeListener = new WebSocketSubscribeListener();subscribeListener.setSession(session);// 設置訂閱topicredisMessageListenerContainer.addMessageListener(subscribeListener, new ChannelTopic("WEBSOCKET_MSG_TOPIC"));}@OnMessagepublic void onMessage(String message, Session session) {logger.debug("get msg from websocket client: {}", message);//1獲取redis數據String result = (String) redisTemplate.opsForValue().get(message);//2.訂閱獲取redis數據
// Object result = null;
// // 處理消息并準備發送給前端
// if ("WEBSOCKET_MSG_TOPIC".equals(new String(message.getChannel()))) {
// String responseMessage = "服務器收到的消息: " + new String(message.getBody());
//
// result = redisTemplate.opsForValue().get(new String(message.getBody()));
// }// 使用 Session 發送消息回客戶端try {session.getBasicRemote().sendText(result.toString());} catch (IOException e) {logger.error("發送消息失敗: {}", e.getMessage());}}@OnClosepublic void onClose(Session session) {// 移除session對象webSocketSet.remove(this);// 移除訂閱對象redisMessageListenerContainer.removeMessageListener(subscribeListener);}@OnErrorpublic void onError(Session session, Throwable error) {}
}
redis配置
/**
 * Redis beans used by the WebSocket demo: connection factory, channel topic, message listener
 * and the listener container that ties them together.
 */
@Configuration
public class RedisConfig extends BaseRedisConfig {

    /** Channel name shared by the topic bean. Change it to the channel you actually need. */
    private static final String TOPIC_NAME = "WEBSOCKET_MSG_TOPIC";

    /**
     * @return a {@link LettuceConnectionFactory} created with its no-argument constructor
     */
    @Bean
    public RedisConnectionFactory redisConnectionFactory() {
        RedisConnectionFactory factory = new LettuceConnectionFactory();
        return factory;
    }

    /**
     * Builds the Redis message listener container and subscribes the injected listener to the
     * injected topic.
     *
     * <p>NOTE(review): the listener bean produced by {@link #subscribeListener()} never has a
     * session set in this class - confirm that its {@code onMessage} tolerates that.
     *
     * @param redisConnectionFactory the connection factory the container should use
     * @param subscribeListener      the message listener to register
     * @param channelTopic           the topic the listener is subscribed to
     * @return the configured container
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(
            RedisConnectionFactory redisConnectionFactory,
            MessageListener subscribeListener,
            ChannelTopic channelTopic) {
        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(redisConnectionFactory);
        // Subscribe the listener to the topic.
        listenerContainer.addMessageListener(subscribeListener, channelTopic);
        return listenerContainer;
    }

    /**
     * @return the topic for the {@code WEBSOCKET_MSG_TOPIC} channel
     */
    @Bean
    public ChannelTopic channelTopic() {
        return new ChannelTopic(TOPIC_NAME);
    }

    /**
     * @return a new {@link WebSocketSubscribeListener}, exposed as a {@link MessageListener}
     */
    @Bean
    public MessageListener subscribeListener() {
        MessageListener listener = new WebSocketSubscribeListener();
        return listener;
    }
}
redis的sub監聽器,監聽redis頻道上發布的消息并轉發給websocket客戶端
/*** @Description:subscribe監聽器* @Author: sh* @Date: 2024/12/13 16:00*/
public class WebSocketSubscribeListener implements MessageListener {/*** 日志對象*/private Logger logger = LoggerFactory.getLogger(WebSocketSubscribeListener.class);/*** websocket連接對象* -- GETTER --* 獲取websocket連接對象** @return websocket連接對象*/@Getterprivate Session session;/*** 設置websocket連接對象** @param session websocket連接對象*/public void setSession(Session session) {this.session = session;}@Overridepublic void onMessage(Message message, byte[] bytes) {// 獲取消息String msg = new String(message.getBody());try {session.getBasicRemote().sendText(msg);} catch (IOException e) {throw new RuntimeException(e);}}
}
Html頁面測試demo
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>WebSocket Test</title>
</head>
<body>
<h1>WebSocket 測試</h1>
<div id="status">未連接</div>
<textarea id="messages" rows="10" cols="30" readonly></textarea><br>
<input type="text" id="messageInput" placeholder="輸入消息" />
<button onclick="sendMessage()">發送</button>
<button onclick="closeConnection()">關閉連接</button>

<script>
    // Manual test page: connects to the backend WebSocket endpoint on load, shows the
    // connection status, appends every received message to the textarea and lets the
    // user send text or close the connection.

    let websocket;

    // Append one line of text to the message area.
    function appendLine(text) {
        document.getElementById("messages").value += text + '\n';
    }

    // Update the connection status label.
    function setStatus(text) {
        document.getElementById("status").textContent = text;
    }

    // Open the WebSocket connection and register its event handlers.
    function connect() {
        // Connect to the backend WebSocket service.
        websocket = new WebSocket('ws://127.0.0.1:8080/websocket/WEBSOCKET_MSG_TOPIC');

        // Connection opened.
        websocket.onopen = () => {
            setStatus("連接已建立");
        };

        // Message received.
        websocket.onmessage = (event) => {
            const message = event.data;

            // The server may send JSON shaped like { "user": "username", "content": "message text" }.
            let parsedMessage = null;
            try {
                parsedMessage = JSON.parse(message);
            } catch (e) {
                parsedMessage = null;
            }

            // JSON.parse also succeeds for plain values such as "123" or "true"; only format
            // the message as a chat line when it really is an object carrying both fields,
            // otherwise fall back to showing the raw text.
            const isChatPayload = parsedMessage !== null
                && typeof parsedMessage === "object"
                && "user" in parsedMessage
                && "content" in parsedMessage;

            if (isChatPayload) {
                appendLine(`來自 ${parsedMessage.user}: ${parsedMessage.content}`);
            } else {
                appendLine('收到: ' + message);
            }
        };

        // Connection closed.
        websocket.onclose = () => {
            setStatus("連接已關閉");
        };

        // Connection error.
        websocket.onerror = (error) => {
            console.error("WebSocket 錯誤:", error);
            setStatus("連接錯誤");
        };
    }

    // Send the content of the input box to the server.
    function sendMessage() {
        const input = document.getElementById('messageInput');
        const message = input.value;
        if (websocket && websocket.readyState === WebSocket.OPEN) {
            websocket.send(message);
            input.value = ''; // clear the input box
            input.focus();    // move focus back to the input box
        } else {
            alert("WebSocket 連接未打開");
        }
    }

    // Close the WebSocket connection, if one was created.
    function closeConnection() {
        if (websocket) {
            websocket.close();
        }
    }

    // Connect automatically once the page has loaded.
    window.onload = connect;
</script>
</body>
</html>