實現消息推送功能
來了,來了,大家做系統應該是最關心這個功能。
【思路】
需求:對全系統【所有的業務操作】進行消息推送,有【群發】、【私發】功能、處理【消息狀態(未讀/已讀)】,websocket持續鏈接防止因其他故障中斷【心跳機制】
【后端篇】
1、確定自己系統的需求,先做數據表
通過代碼生成,對后續推送的信息進行保存,通過is_read字段來對消息進行已讀未讀操作
添加mapper
/*** 設為已讀* @param id 消息的id* @return 結果* */public int updateWbNoticeMessageReadStatus(Long id);
添加service
/*** 設為已讀* @param id 消息的id* @return 結果* */public int updateWbNoticeMessageReadStatus(Long id);
添加serviceImpl
/*** 更新消息的閱讀狀態* @param id 消息的id* @return*/@Overridepublic int updateWbNoticeMessageReadStatus(Long id) {return wbNoticeMessageMapper.updateWbNoticeMessageReadStatus(id);}
添加mapper.xml下的方法
<update id="updateWbNoticeMessageReadStatus" parameterType="Long">update wb_notice_messageset is_read = '1'where id = #{id}
</update>
2、明確websocket鏈接
消息的推送,肯定是有推送人和被推送人,根據如何獲取這些數據來確定你的websocket鏈接
// const token // 需要鑒權
const currentUserId = this.$store.state.user.id;
const currentUserNickName = this.$store.state.user.nickName;
const wsUrl = `ws://localhost:8080/websocket/pushMessage?userId=${currentUserId}&nickName=${currentUserNickName}`; // 替換為你的 WebSocket 地址
this.socket = new WebSocket(wsUrl);
這是我的websocket鏈接,可以看出我是通過前端拼接的userId和userName來獲取到推送人信息的。
ps:實際開發過程中最好是通過token來獲取,并解析出用戶,進行后續的操作,此處是為了方便理解和通用
3、配置WebSocketConfig
package com.ruoyi.websocket.config;import com.ruoyi.websocket.handler.ChatRoomSessionIdWebsocketHandler;
import com.ruoyi.websocket.handler.ChatRoomUserIdWebsocketHandler;
import com.ruoyi.websocket.handler.ConnectWebsocketHandler;
import com.ruoyi.websocket.handler.PushMessageWebsocketHandler;
import com.ruoyi.websocket.interceptor.WebSocketInterceptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.*;@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {@Autowiredprivate PushMessageWebsocketHandler pushMessageWebsocketHandler;@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {//連接websocket測試registry.addHandler(new ConnectWebsocketHandler(), "/websocket").setAllowedOrigins("*"); // 允許跨域//聊天室 -- sessionId版registry.addHandler(new ChatRoomSessionIdWebsocketHandler(), "/websocket/chatRoomSessionId").setAllowedOrigins("*"); // 允許跨域//聊天室 -- UserId版registry.addHandler(new ChatRoomUserIdWebsocketHandler(), "/websocket/chatRoomUserId").addInterceptors(new WebSocketInterceptor())//攔截器用來獲取前端傳遞過來的userid.setAllowedOrigins("*"); // 允許跨域//消息推送registry.addHandler(pushMessageWebsocketHandler, "/websocket/pushMessage").addInterceptors(new WebSocketInterceptor())//攔截器用來獲取前端傳遞過來的userid.setAllowedOrigins("*"); // 允許跨域}}
4、添加攔截器?WebSocketInterceptor 來獲取到webocket鏈接攜帶的userId和nickName
package com.ruoyi.websocket.interceptor;import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;import java.net.URI;
import java.util.Map;@Component
public class WebSocketInterceptor implements HandshakeInterceptor {@Overridepublic boolean beforeHandshake(ServerHttpRequest request,ServerHttpResponse response,WebSocketHandler wsHandler,Map<String, Object> attributes) throws Exception {URI uri = request.getURI();String query = uri.getQuery(); // userId=xxx&nickName=yyyif (query == null) return false;Map<String, String> paramMap = parseQuery(query);String userId = paramMap.get("userId");String nickName = paramMap.get("nickName");if (userId == null || nickName == null) {return false; // 拒絕握手}// 放入 WebSocketSession attributes,后面 WebSocketHandler 可取attributes.put("userId", userId);attributes.put("nickName", nickName);return true; // 允許握手}@Overridepublic void afterHandshake(ServerHttpRequest request,ServerHttpResponse response,WebSocketHandler wsHandler,Exception exception) {// 握手完成后進行的操作}//拆分傳遞的參數private Map<String, String> parseQuery(String query) {Map<String, String> map = new java.util.HashMap<>();if (query == null || query.isEmpty()) return map;String[] pairs = query.split("&");for (String pair : pairs) {int idx = pair.indexOf('=');if (idx > 0) {String key = pair.substring(0, idx);String value = pair.substring(idx + 1);map.put(key, value);}}return map;}}
5、添加 PushMessageWebsocketHandler 來處理推送信息
package com.ruoyi.websocket.handler;import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.common.core.domain.entity.SysUser;
import com.ruoyi.system.mapper.SysUserMapper;
import com.ruoyi.websocket.domain.WbNoticeMessage;
import com.ruoyi.websocket.service.IWbNoticeMessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;
import org.springframework.web.socket.handler.TextWebSocketHandler;import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;/*** 消息推送 WebSocket Handler*/
@Component
public class PushMessageWebsocketHandler extends TextWebSocketHandler {@Autowiredprivate IWbNoticeMessageService wbNoticeMessageService;@Autowiredprivate SysUserMapper userMapper;// 存儲所有連接的會話private final Set<WebSocketSession> sessions = Collections.synchronizedSet(new HashSet<>());@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {sessions.add(session);}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {//獲取前端發送的messageString payload = message.getPayload();// 解析整個 JSON 對象JSONObject jsonObject = JSONObject.parseObject(payload);// 心跳檢測String type = jsonObject.getString("type");if ("ping".equalsIgnoreCase(type)) {session.sendMessage(new TextMessage("{\"type\":\"pong\"}"));return;}//獲取websocket攜帶的參數的userId和nickName// todo 前端可以通過token攜帶參數,然后使用ruoyi封裝的token方法獲取到當前用戶,這里方便演示和通用性直接使用前端傳遞的UserId和nickNameString userId = (String) session.getAttributes().get("userId");String nickName = (String) session.getAttributes().get("nickName");// 提取 data 對象--從這里添加前端所需要推送的字段JSONObject data = jsonObject.getJSONObject("data");String title = data.getString("title");String content = data.getString("content");Long receiverId = data.getLong("receiverId");String receiverName = data.getString("receiverName");// 1. 如果receiverId為空則是群發,否則是單發,保存消息到數據庫// todo 可以自行根據前端傳遞的type來判斷是群發還是單發,這里為了方便演示直接通過receiverId是否為空來判斷if (receiverId != null) {WbNoticeMessage wbNoticeMessage = new WbNoticeMessage();wbNoticeMessage.setTitle(title);wbNoticeMessage.setContent(content);wbNoticeMessage.setSenderId(Long.parseLong(userId));wbNoticeMessage.setSenderName(nickName);wbNoticeMessage.setReceiverId(receiverId);wbNoticeMessage.setReceiverName(receiverName);wbNoticeMessageService.insertWbNoticeMessage(wbNoticeMessage);} else {SysUser user = new SysUser();List<SysUser> userList = userMapper.selectUserList(user);for (SysUser sysUser : userList) {WbNoticeMessage wbNoticeMessage = new WbNoticeMessage();wbNoticeMessage.setTitle(title);wbNoticeMessage.setContent(content);wbNoticeMessage.setSenderId(Long.parseLong(userId));wbNoticeMessage.setSenderName(nickName);wbNoticeMessage.setReceiverId(sysUser.getUserId());wbNoticeMessage.setReceiverName(receiverName);wbNoticeMessageService.insertWbNoticeMessage(wbNoticeMessage);}}// 2. 給所有在線客戶端廣播消息for (WebSocketSession s : sessions) {if (s.isOpen()) {s.sendMessage(new TextMessage(payload));}}// todo 3.重要的信息還可以通過郵件等其他方式通知用戶}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {sessions.remove(session);}@Overridepublic void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {exception.printStackTrace();sessions.remove(session);if (session.isOpen()) {session.close();}}
}
【前端篇】
1、創建消息鈴鐺樣式,封裝成組件
InfoBell.vue代碼
<template><div><el-tooltip :content="noticeContent" effect="dark" placement="bottom"><el-badge :value="noticeCount" class="right-menu-item hover-effect" :class="{ 'badge-custom': noticeCount > 0 }"><i class="el-icon-message-solid" @click="toNoticePage"></i></el-badge></el-tooltip></div></template><script>import { listWbNoticeMessage } from "@/api/websocket/WbNoticeMessage";export default {name: "InfoBell",props: {refreshNoticeCount: {type: Boolean,default: false}},data() {return {noticeContent: "", // 通知內容noticeCount: 0, // 通知數量socket: null, // WebSocket 實例// 查詢參數queryParams: {pageNum: 1,pageSize: 10,title: null,content: null,type: null,senderId: null,senderName: null,receiverId: this.$store.state.user.id,receiverName: null,isRead: null,readTime: null,priority: null,targetUrl: null,bizType: null,bizId: null},};},created() {this.getList();},mounted() {this.initWebSocket();},beforeDestroy() {this.closeWebSocket();},watch: {refreshNoticeCount(val) {if (val) {this.getList();}}},methods: {/**---------------------websocket專欄-------------------- *//** 初始化/連接 WebSocket */initWebSocket() {// const token // 需要鑒權const currentUserId = this.$store.state.user.id;const currentUserNickName = this.$store.state.user.nickName;const wsUrl = `ws://localhost:8080/websocket/pushMessage?userId=${currentUserId}&nickName=${currentUserNickName}`; // 替換為你的 WebSocket 地址this.socket = new WebSocket(wsUrl);this.socket.onopen = () => {console.log("頭部導航消息鈴鐺-WebSocket 連接已建立");this.startHeartbeat();//啟用心跳機制};this.socket.onmessage = (event) => {try {const msg = JSON.parse(event.data);if (msg.type === "pong") {console.log("收到心跳 pong");return;}} catch (e) {// 非 JSON 消息,繼續執行}this.getList();};this.socket.onerror = (error) => {console.error("頭部導航消息鈴鐺-WebSocket 發生錯誤:", error);};this.socket.onclose = () => {console.log("頭部導航消息鈴鐺-WebSocket 已關閉");this.stopHeartbeat();this.tryReconnect();};},/** 關閉 WebSocket */closeWebSocket() {if (this.socket) {this.socket.close();this.socket = null;}this.stopHeartbeat();if (this.reconnectTimer) {clearInterval(this.reconnectTimer);this.reconnectTimer = null;}},/** 啟動心跳 */startHeartbeat() {this.heartbeatTimer = setInterval(() => {if (this.socket && this.socket.readyState === WebSocket.OPEN) {this.socket.send(JSON.stringify({ type: "ping" }));console.log("發送心跳 ping");}}, 30000); // 每 30 秒},/** 停止心跳 */stopHeartbeat() {if (this.heartbeatTimer) {clearInterval(this.heartbeatTimer);this.heartbeatTimer = null;}},/** 嘗試重連 */tryReconnect() {if (this.reconnectTimer) return;this.reconnectTimer = setInterval(() => {console.log("嘗試重連 InfoBell-WebSocket...");this.initWebSocket();if (this.socket && this.socket.readyState === WebSocket.OPEN) {clearInterval(this.reconnectTimer);this.reconnectTimer = null;}}, 5000); // 每 5 秒重連一次},/** -------------------------- 業務處理專欄---------------------- *//** 查詢通知信息框列表 */getList() {this.queryParams.isRead = 0;listWbNoticeMessage(this.queryParams).then(response => {this.noticeCount = response.total;this.noticeContent = `您有${this.noticeCount}條未讀的信息`;})},/** 跳轉到通知頁面 */toNoticePage() {this.$router.push("/websocket/pushMessage");},},};</script><style lang="scss" scoped>::v-deep .el-badge__content {margin-top: 9px;margin-right: 1px;}.badge-custom {animation: blink-animation 0.5s infinite alternate;}@keyframes blink-animation {0% {opacity: 1;}100% {opacity: 0.1;}}</style>
2、在頂部導航引用消息鈴鐺組件(InfoBell)
引入組件后,頁面就完成了
3、創建推送信息查看頁面
pushMessage.vue代碼
<template><div style="padding: 50px;"><el-row :gutter="20"><el-col :span="5" ><el-card><h3>消息推送(快捷創建)</h3><el-form ref="form" :model="form" label-width="90px"><el-form-item label="通知標題" prop="title"><el-input v-model="form.title" placeholder="請輸入通知標題" /></el-form-item><el-form-item label="通知內容"><el-input v-model="form.content" placeholder="請輸入通知標題" type="textarea" /></el-form-item><el-form-item label="接收人ID" prop="receiverId"><el-input v-model="form.receiverId" placeholder="請輸入接收人ID" /></el-form-item><el-form-item label="接收人昵稱" prop="receiverName"><el-input v-model="form.receiverName" placeholder="請輸入接收人昵稱" /></el-form-item></el-form><div style="color: red;font-weight: 600;font-size: 14px;">PS:不填接受人id則視為群發</div><el-button type="primary" @click="sendMessage" style="margin-top: 10px;">推送消息</el-button><el-divider></el-divider><div style="height: 300px; overflow-y: auto; border: 1px solid #ebeef5; padding: 10px;"><div v-for="(msg, index) in messages" :key="index" style="margin-bottom: 8px;"><el-tag type="info" size="small">消息 {{ index + 1 }}</el-tag><span style="margin-left: 8px;">{{ msg }}</span></div></div></el-card></el-col><el-col :span="19"><el-card><el-tabs v-model="activeName" @tab-click="handleClick"><el-tab-pane label="未讀" name="unread"><el-table v-loading="loading" :data="WbNoticeMessageList"><el-table-column label="id" align="center" prop="id" /><el-table-column label="通知標題" align="center" prop="title" /><el-table-column label="通知內容" align="center" prop="content" /><el-table-column label="消息類型" align="center" prop="type" /><el-table-column label="發送人ID" align="center" prop="senderId" /><el-table-column label="發送人名稱" align="center" prop="senderName" /><el-table-column label="接受者ID" align="center" prop="receiverId" /><el-table-column label="接受者名稱" align="center" prop="receiverName" /><el-table-column label="是否已讀" align="center" prop="isRead" /><el-table-column label="閱讀時間" align="center" prop="readTime" width="100"><template slot-scope="scope"><span>{{ parseTime(scope.row.readTime, '{y}-{m}-{d}') }}</span></template></el-table-column><el-table-column label="優先級" align="center" prop="priority" /><el-table-column label="業務類型" align="center" prop="bizType" /><el-table-column label="業務ID" align="center" prop="bizId" /><el-table-column label="操作" align="center" class-name="small-padding fixed-width"><template slot-scope="scope"><el-buttonsize="mini"type="text"icon="el-icon-edit"@click="handleUpdateReadStatus(scope.row)">設為已讀</el-button></template></el-table-column></el-table><pagination v-show="total > 0" :total="total" :page.sync="queryParams.pageNum" :limit.sync="queryParams.pageSize" @pagination="getList" /></el-tab-pane><el-tab-pane label="已讀" name="read"><el-table v-loading="loading" :data="WbNoticeMessageList" ><el-table-column label="id" align="center" prop="id" /><el-table-column label="通知標題" align="center" prop="title" /><el-table-column label="通知內容" align="center" prop="content" /><el-table-column label="消息類型" align="center" prop="type" /><el-table-column label="發送人ID" align="center" prop="senderId" /><el-table-column label="發送人名稱" align="center" prop="senderName" /><el-table-column label="接受者ID" align="center" prop="receiverId" /><el-table-column label="接受者名稱" align="center" prop="receiverName" /><el-table-column label="是否已讀" align="center" prop="isRead" /><el-table-column label="閱讀時間" align="center" prop="readTime" width="100"><template slot-scope="scope"><span>{{ parseTime(scope.row.readTime, '{y}-{m}-{d}') }}</span></template></el-table-column><el-table-column label="優先級" align="center" prop="priority" /><el-table-column label="業務類型" align="center" prop="bizType" /><el-table-column label="業務ID" align="center" prop="bizId" /></el-table><pagination v-show="total > 0" :total="total" :page.sync="queryParams.pageNum" :limit.sync="queryParams.pageSize" @pagination="getList" /></el-tab-pane></el-tabs></el-card></el-col></el-row><div v-show="false"><info-bell :refreshNoticeCount="isRefreshNoticeCount" /></div></div>
</template><script>
import { listWbNoticeMessage,updateReadStatus} from "@/api/websocket/WbNoticeMessage"
import InfoBell from "@/components/InfoBell";export default {name:"pushMesage",components: { InfoBell },data() {return {ws: null,message: '',messages: [],loading: true,total: 0,WbNoticeMessageList: [],form:{},// 查詢參數queryParams: {pageNum: 1,pageSize: 10,title: null,content: null,type: null,senderId: null,senderName: null,receiverId: this.$store.state.user.id,receiverName: null,isRead: null,readTime: null,priority: null,targetUrl: null,bizType: null,bizId: null},activeName: 'unread',isRefreshNoticeCount:false,//是否刷新通知數量};},methods: {connectWebSocket() {// 連接 WebSocket,地址根據后端實際情況調整const currentUserId = this.$store.state.user.id;const currentUserNickName = this.$store.state.user.nickName;this.ws = new WebSocket(`ws://localhost:8080/websocket/pushMessage?userId=${currentUserId}&nickName=${currentUserNickName}`);this.ws.onopen = () => {console.log("推送信息-WebSocket 已連接");this.addMessage("推送信息-WebSocket 已連接");};this.ws.onmessage = event => {console.log("收到消息:", event.data);this.addMessage(event.data);};this.ws.onclose = () => {this.addMessage("推送信息-WebSocket 已關閉");};this.ws.onerror = error => {this.addMessage("推送信息-WebSocket 發生錯誤");};},sendMessage() {if (!this.form.content.trim()) {this.$message.warning("請輸入消息內容");return;}if (this.ws && this.ws.readyState === WebSocket.OPEN) {// 發送整個表單內容this.ws.send(JSON.stringify({data: this.form}));this.$message.success("消息發送成功");// 因為websocket發送請求是異步的,為了方便顯示這里使用了延時,實際情況還是要在后端通過返回值來顯示getListsetTimeout(() => {this.getList();}, 500);} else {this.$message.error("WebSocket 未連接");}},addMessage(msg) {this.messages.push(msg);this.$nextTick(() => {// 自動滾動到底部const container = this.$el.querySelector("div[style*='overflow-y']");if (container) container.scrollTop = container.scrollHeight;});},/** --------------------------------- 信息模塊 --------------------- */handleClick(){this.getList();},/** 查詢通知信息框列表 */getList() {this.loading = truethis.queryParams.isRead = this.activeName === 'unread' ? 0 : 1;console.log(this.queryParams);listWbNoticeMessage(this.queryParams).then(response => {this.WbNoticeMessageList = response.rowsthis.total = response.totalthis.loading = false})},handleUpdateReadStatus(row){if (row.id != null) {updateReadStatus(row.id).then(response => {this.isRefreshNoticeCount = true;console.log(this.$store);this.$modal.msgSuccess("該信息已標記為已讀~")this.getList();})}}},created() {this.getList();},mounted() {this.connectWebSocket();},beforeDestroy() {if (this.ws) {this.ws.close();}}
};
</script><style scoped>
</style>
以下是快捷創建推送信息的頁面
4、詳解【心跳機制】
一、詳解
WebSocket 的心跳機制,是一種保持連接活躍、防止斷線、檢測對方是否存活的機制。特別是在使用 WebSocket 建立了長連接之后,如果網絡設備(如代理、網關、防火墻)或者服務端/客戶端本身在長時間沒有數據傳輸時自動斷開連接,就會導致推送失敗、消息丟失的問題。
二、為什么要使用心跳機制?
1、防止連接被中間設備斷開
很多中間設備(比如 Nginx、CDN、防火墻)會在一段時間內沒有數據傳輸時,主動斷開“看起來閑置”的連接。
2、檢測對方是否在線
如果客戶端意外斷線(如:網絡斷了、電腦睡眠、瀏覽器崩潰),服務器端并不知道,繼續保留 WebSocket 會話資源,浪費內存。
3、實現自動重連
通過心跳,可以判斷連接是否斷開,如果斷了,客戶端就能自動發起重連。
三、心跳機制怎么工作?
通常的設計方式如下:
角色 | 行為說明 |
---|---|
客戶端 | 每隔一段時間(如 30 秒)發送一個特定的“心跳包”消息,如 { "type": "ping" } |
服務端 | 收到 ping 后立即回復 { "type": "pong" } ,表示“我還活著” |
客戶端 | 若在預期時間內未收到 pong ,說明可能斷線,可以發起重連 |
四、代碼實操
【瀏覽器】,每隔30秒向【后端】發送ping信號,后端接收到了返回pong信號表示通信正常,不做任何業務處理。
可以理解成這是一個地震的救援過程:
遇難者被埋在了地底下,救援人員在進行挖地救援,遇難者每隔30秒向救援人員叫喊一聲:ping!,救援人員聽到了遇難者的聲音得知遇難者還活著,隨之回復一聲:pong!。表示別怕,我正在救援。表示通信正常。
【前端發起心跳】
/** 啟動心跳 */startHeartbeat() {this.heartbeatTimer = setInterval(() => {if (this.socket && this.socket.readyState === WebSocket.OPEN) {this.socket.send(JSON.stringify({ type: "ping" }));console.log("發送心跳 ping");}}, 30000); // 每 30 秒},/** 停止心跳 */stopHeartbeat() {if (this.heartbeatTimer) {clearInterval(this.heartbeatTimer);this.heartbeatTimer = null;}},/** 嘗試重連 */tryReconnect() {if (this.reconnectTimer) return;this.reconnectTimer = setInterval(() => {console.log("正在嘗試重連 InfoBell-WebSocket...");this.initWebSocket();if (this.socket && this.socket.readyState === WebSocket.OPEN) {clearInterval(this.reconnectTimer);this.reconnectTimer = null;}}, 5000); // 每 5 秒重連一次},
【后端接收心跳】
// 心跳檢測String type = jsonObject.getString("type");if ("ping".equalsIgnoreCase(type)) {session.sendMessage(new TextMessage("{\"type\":\"pong\"}"));return;}
代碼將整理成 ruoyi-vue-websocket上傳到git~