問題引入
消息推送的方式
我們要實現,服務器把消息推送到客戶端,可以輪訓,長輪訓
還有sse
WebSocket理論
WebSocket 的由來與核心價值
誕生背景:解決 HTTP 協議在實時通信中的固有缺陷(單向請求-響應模式)
核心驅動力:
替代低效輪詢(Polling)和長輪詢(Comet)
滿足實時應用需求(聊天、金融行情、游戲等)
核心優勢:
全雙工通信:客戶端/服務器可同時發送數據
低延遲:從 HTTP 的數百 ms 降至 10-50ms
高效傳輸:頭部開銷僅 2-14 字節(vs HTTP 的數百字節)
標準化:2011 年 RFC 6455 成為正式標準
WebSocket 協議核心組成
組成部分 | 作用 | 必要性 |
---|---|---|
握手階段 | 通過 HTTP 協議升級協商(101 Switching Protocols )切換到 WebSocket 協議 | 兼容現有網絡基礎設施(代理、防火墻) |
數據幀 | 傳輸應用數據(文本/二進制) | 封裝數據,支持分片傳輸大文件 |
控制幀 | 管理連接狀態(Ping/Pong 保活、Close 關閉) | 維持連接健康,避免資源泄漏 |
掩碼機制 | 客戶端發送數據時進行 XOR 掩碼加密 | 防止惡意代理緩存污染(安全關鍵) |
Opcode | 標識幀類型(文本/二進制/控制幀) | 正確解析消息內容 |
Payload Length | 動態長度標識(7/16/64位) | 支持從短消息到 GB 級大文件傳輸 |
Spring Boot 深度集成方案
基礎架構
核心組件詳解
Client(客戶端)
作用:發起連接、訂閱頻道、收發消息
為什么需要:作為通信的終端用戶界面
解決問題:
提供用戶交互入口
實現跨平臺通信(Web/App/桌面)
技術實現:
const socket = new WebSocket("ws://yourdomain/ws-endpoint"); socket.onmessage = (event) => {console.log("收到消息:", event.data); };
Endpoint(連接端點)
作用:處理握手請求,建立持久連接
為什么需要:作為WebSocket連接的入口網關
解決問題:
協議升級(HTTP→WebSocket)
連接生命周期管理
跨域處理(CORS)
Spring Boot實現:
@Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {registry.addEndpoint("/ws-endpoint").setAllowedOrigins("*").withSockJS(); // 瀏覽器兼容方案} }
WebSocket Connection(連接管道)
作用:維護全雙工通信通道
為什么需要:突破HTTP的無狀態限制
解決問題:
避免頻繁握手(單次握手持久連接)
支持雙向實時通信
降低延遲(從HTTP的300ms+降至30ms內)
Message Broker(消息代理)
作用:消息路由、分發、存儲
為什么需要:解耦生產者和消費者
解決問題:
海量連接下的消息分發
分布式系統擴展
消息持久化與重試
配置示例:
@Override public void configureMessageBroker(MessageBrokerRegistry registry) {// 使用外部消息中間件registry.enableStompBrokerRelay("/topic", "/queue").setRelayHost("rabbitmq-host").setRelayPort(61613); }
頻道系統(路由核心)
頻道類型 前綴 作用 解決的問題 消息流向 廣播頻道 /topic
公共消息廣播 1:N 消息分發 (如聊天室公告) 發布者 → 所有訂閱者 私有隊列 /queue
點對點通信 1:1 精準投遞 (如訂單通知) 發布者 → 特定訂閱者 用戶頻道 /user
用戶級隔離 多設備同步 (如微信網頁/App同時在線) 發布者 → 用戶所有會話 @MessageMapping Controller(業務處理器)
作用:處理業務邏輯,生成響應
為什么需要:分離通信協議與業務邏輯
解決問題:
業務邏輯集中管理
消息驗證與轉換
數據庫/服務集成
示例:
@MessageMapping("/trade") @SendTo("/topic/stock-updates") public StockUpdate handleTrade(Order order) {// 1. 驗證訂單// 2. 執行交易// 3. 生成市場數據更新return tradingService.execute(order); }
架構演進價值
協議層優化
替代方案對比:
方案 延遲 開銷 雙向通信 頻道支持 HTTP輪詢 300ms+ 高 ? ? WebSocket基礎 50ms 低 ?? ? WS+STOMP 30ms 中 ?? ??
工程化價值
業務場景適配
廣播場景:
/topic/news
(新聞推送)私有場景:
/queue/user-123/notifications
(個人通知)混合場景:
/topic/room-{id}
+/user/queue/private
(在線教育平臺)
總結:為什么需要此架構
連接管理 通過
Endpoint
統一處理握手/斷開,解決連接生命周期管理混亂問題消息路由 頻道系統實現
發布-訂閱
模式,解決海量消息精準投遞問題業務解耦 控制器隔離業務邏輯與通信協議,解決代碼維護困難問題
水平擴展 消息代理支持集群部署,解決單點性能瓶頸問題
安全管控 頻道級權限控制,解決敏感數據泄露風險
終極價值:此架構在協議層實現高效實時通信,在架構層通過頻道機制解決復雜業務場景的消息路由問題,在工程層通過Spring Boot實現企業級標準化,是構建現代實時應用的基石。
原理流程
在我的E盤的WebSocket文件夾
消息執行流程(Flow)概覽
建立連接(connect,連接)
Client(客戶端)發起到 /ws-endpoint
的 WebSocket 握手(handshake,握手),Endpoint(端點)完成升級后建立 WebSocket Connection(WebSocket 連接)。
訂閱頻道(subscribe,訂閱)
Client 通過 STOMP 向 broker 發送 SUBSCRIBE Frame(訂閱幀),表示“我要訂閱 /topic/greetings
”。
發送消息到 Controller(SEND Frame)
Client 發送 SEND Frame(發送幀),destination(目的地)為 /app/hello
。
Broker 根據 setApplicationDestinationPrefixes("/app")
,將消息路由(route,路由)給匹配 @MessageMapping("/hello")
的方法
Controller(控制器)處理
GreetingController.handleHello(...)
被調用(invoke,調用),執行業務邏輯,返回 Greeting
對象。
Broker(代理)轉發
因為方法上有 @SendTo("/topic/greetings")
,返回值被封裝成 MESSAGE Frame(消息幀)發送給 Broker(消息代理)。
Broker 將該消息分發(dispatch,分發)給所有訂閱(subscription,訂閱)了 /topic/greetings
的客戶端 session。
Client(客戶端)接收(receive,接收)
Client 在訂閱回調(callback,回調)中拿到服務器推送(push,推送)的消息并渲染到頁面。
這就是完整的一次流程。
API
客戶端
websocket對象創建
let ws = new WebSocket(URL);
URL說明
格式:協議://ip地址:端口/訪問路徑 協議:協議名稱為 ws
websocket對象相關事件
事件 | 事件處理程序 | 描述 |
---|---|---|
open | ws.onopen | 連接建立時觸發 |
message | ws.onmessage | 客戶端接收到服務器發送的數據時觸發 |
close | ws.onclose | 連接關閉時觸發 |
websocket對象提供的方法
方法名稱 | 描述 |
---|---|
send() | 通過websocket對象調用該方法發送數據給服務端 |
簡單示例
<script>
let ws = new WebSocket("ws://localhost/chat");
ws.onopen = function() {
};ws.onmessage = function(evt) {// 通過 evt.data 可以獲取服務器發送的數據
};ws.onclose = function() {
};
</script>
服務端
Tomcat的7.0.5版本開始支持WebSocket,并且實現了Java WebSocket規范。
Java WebSocket應用由一系列的Endpoint組成。Endpoint是一個java對象,代表WebSocket鏈接的一端,對于服務端,我們可以視為處理具體WebSocket消息的接口。
我們可以通過兩種方式定義Endpoint:
第一種是編程式,即繼承類javax.websocket.Endpoint并實現其方法。
第二種是注解式,即定義一個POJO,并添加@ServerEndpoint相關注解。
Endpoint實例在WebSocket握手時創建,并在客戶端與服務端鏈接過程中有效,最后在鏈接關閉時結束。在Endpoint接口中明確了與其生命周期相關的方法,規范實現者確保生命周期的各個階段調用實例的相關方法。生命周期方法如下:
方法 | 描述 | 注解 |
---|---|---|
onOpen() | 當開啟一個新的會話時調用,該方法是客戶端與服務端握手成功后調用的方法 | @OnOpen |
onClose() | 當會話關閉時調用 | @OnClose |
onError() | 當連接過程異常時調用 | @OnError |
服務端如何接收客戶端發送的數據呢?
編程式 通過添加 MessageHandler 消息處理器來接收消息
注解式 在定義 Endpoint 時,通過 @OnMessage 注解指定接收消息的方法
服務端如何推送數據給客戶端呢?
發送消息則由 RemoteEndpoint 完成,其實例由 Session 維護。
發送消息有 2 種方式發送消息
通過 session.getBasicRemote 獲取同步消息發送的實例,然后調用其 sendXxx() 方法發送消息
通過 session.getAsyncRemote 獲取異步消息發送實例,然后調用其 sendXxx() 方法發送消息
@ServerEndpoint("/chat")
@Component
public class ChatEndpoint {@OnOpen// 連接建立時被調用public void onOpen(Session session, EndpointConfig config) {}@OnMessage// 接收到客戶端發送的數據時被調用public void onMessage(String message) {}@OnClose// 連接關閉時被調用public void onClose(Session session) {}
}
WebSocket 消息分發的三種常見模式
session.getAsyncRemote()(
getBasicRemote).sendXxx()
方法本身并不直接區分這些模式,而是通過 目標地址(如 Session、Broadcast) 和 應用層邏輯 來實現不同的消息分發方式。
WebSocket 消息分發的三種常見模式
1. 單播(Unicast)
點對點發送:消息直接發送給某個特定的客戶端(Session)。
實現方式:通過目標客戶端的
session.getAsyncRemote().sendText()
。示例:
// 向特定客戶端發送消息 targetSession.getAsyncRemote().sendText("Private message");
2. 廣播(Broadcast)
一對多發送:消息發送給所有連接的客戶端(或特定分組)。
實現方式:遍歷所有 Session 或使用
@ServerEndpoint
的全局集合。示例:
// 廣播給所有客戶端 for (Session session : allSessions) {session.getAsyncRemote().sendText("Broadcast message"); }
注意:Java WebSocket API 本身不提供原生廣播方法,需自行維護 Session 集合。
3. 組播(Multicast)
分組發送:消息發送給訂閱了特定主題(Topic)或頻道的客戶端。
實現方式:通過應用層維護分組映射(如
Map<String, Set<Session>>
)。示例:
// 向訂閱了 "news" 頻道的客戶端發送消息 for (Session session : channelSubscribers.get("news")) {session.getAsyncRemote().sendText("News update"); }
總結
模式 | 目標范圍 | 實現關鍵 | 適用場景 |
---|---|---|---|
單播 | 單個 Session | 直接調用目標 Session | 私聊、定向通知 |
廣播 | 所有 Session | 遍歷全局 Session 集合 | 公告、全局狀態更新 |
組播 | 分組 Session | 維護分組映射(Topic → Sessions) | 頻道訂閱、房間聊天 |
WebSocket 的靈活性在于:sendXxx()
是工具,分發模式由開發者通過 Session 代碼管理邏輯實現。
在線聊天室實現
具體代碼在learnWebSocket里面
流程分析
package com.learnwebsocket.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;/*** @version v1.0* @ClassName: WebsocketConfig*/
@Configuration
public class WebsocketConfig {/*** 創建一個ServerEndpointExporter對象,這個對象會自動注冊使用了@ServerEndpoint注解的類* @return*/@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}
后端
ServerEndpointExporter
首先,由于websocket不直接歸于spring管理,屬于spring的擴展模塊,所以為了把websocket的實例也注冊到spring里面,我們需要一個spring和websocket的連接橋梁。也就是ServerEndpointExporter。這個類負責加載websocket的端點。他同時可以被spring直接管理。
package com.learnwebsocket.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;/*** @version v1.0* @ClassName: WebsocketConfig*/
@Configuration
public class WebsocketConfig {/*** 創建一個ServerEndpointExporter對象,這個對象會自動注冊使用了@ServerEndpoint注解的類* @return*/@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}
端點Endpoint
然后。我們需要自己創建一個端點,供ServerEndpointExporter發現管理。
這里面我們需要實現三個方法,這個上面有講。
這里面還有廣播和單播的實現代碼,仔細看看。
還有的就是,由于Endpoint不直接屬于spring,若要給Endpoint去配置一些東西,我們需要手動創建一個類,實現java給我們的接口,來去配置之后給spring管理
package com.learnwebsocket.ws.pojo;import com.alibaba.fastjson.JSON;import com.learnwebsocket.config.GetHttpSessionConfig;
import com.learnwebsocket.utils.MessageUtils;
import org.springframework.stereotype.Component;import javax.servlet.http.HttpSession;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;/*** @version v1.0* @ClassName: ChatEndpoint* @Description: 端點* @Author: 黑馬程序員*/
@ServerEndpoint(value = "/chat",configurator = GetHttpSessionConfig.class)
@Component
public class ChatEndpoint {// 用來保存所有的用戶private static final Map<String,Session> onlineUsers = new ConcurrentHashMap<>();//當前用戶對應的session對象private HttpSession httpSession;/*** 建立websocket連接后,被調用* @param session*/@OnOpenpublic void onOpen(Session session, EndpointConfig config) {//1,將session進行保存this.httpSession = (HttpSession) config.getUserProperties().get(HttpSession.class.getName());String user = (String) this.httpSession.getAttribute("user");onlineUsers.put(user,session);//2,廣播消息。需要將登陸的所有的用戶推送給所有的用戶String message = MessageUtils.getMessage(true,null,getFriends());broadcastAllUsers(message);}public Set getFriends() {Set<String> set = onlineUsers.keySet();return set;}// 廣播所有用戶private void broadcastAllUsers(String message) {try {//遍歷map集合Set<Map.Entry<String, Session>> entries = onlineUsers.entrySet();for (Map.Entry<String, Session> entry : entries) {//獲取到所有用戶對應的session對象Session session = entry.getValue();//發送消息session.getBasicRemote().sendText(message);}} catch (Exception e) {//記錄日志}}/*** 瀏覽器發送消息到服務端,該方法被調用** 張三 --> 李四* @param message*/@OnMessagepublic void onMessage(String message) {try {//將消息推送給指定的用戶Message msg = JSON.parseObject(message, Message.class);//獲取 消息接收方的用戶名String toName = msg.getToName();String mess = msg.getMessage();//獲取消息接收方用戶對象的session對象Session session = onlineUsers.get(toName);String user = (String) this.httpSession.getAttribute("user");String msg1 = MessageUtils.getMessage(false, user, mess);session.getBasicRemote().sendText(msg1);} catch (Exception e) {//記錄日志}}/*** 斷開 websocket 連接時被調用* @param session*/@OnClosepublic void onClose(Session session) {//1,從onlineUsers中剔除當前用戶的session對象String user = (String) this.httpSession.getAttribute("user");onlineUsers.remove(user);//2,通知其他所有的用戶,當前用戶下線了String message = MessageUtils.getMessage(true,null,getFriends());broadcastAllUsers(message);}
}
配置類
上面的httpSession來自配置類的,因為登陸后我們把用戶的名字存到了httpSession。但是websocket無法直接獲取httpSession,所以要把它存到websocket配置文件里面。再獲取。
package com.learnwebsocket.config;import javax.servlet.http.HttpSession;
import javax.websocket.HandshakeResponse;
import javax.websocket.server.HandshakeRequest;
import javax.websocket.server.ServerEndpointConfig;/*** @version v1.0* @ClassName: GetHttpSessionConfig*/
public class GetHttpSessionConfig extends ServerEndpointConfig.Configurator {@Overridepublic void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request,HandshakeResponse response) {//獲取HttpSession對象HttpSession httpSession = (HttpSession) request.getHttpSession();//將httpSession對象保存起來sec.getUserProperties().put(HttpSession.class.getName(),httpSession);}
}
前端
先登陸之后,然后向后端的端點請求websocket的連接,之后綁定三個方法。
await axios.get("user/getUsername").then(res => {this.username = res.data;});//創建webSocket對象ws = new WebSocket("ws://localhost:8080/chat");//給ws綁定事件ws.onopen = this.onopen;//接收到服務端推送的消息后觸發ws.onmessage = this.onMessage;ws.onclose = this.onClose;