1、實時通信有哪些實現方式?
特性 | 輪詢(Polling) | WebSocket | SSE (Server-Sent Events) |
---|---|---|---|
通信方向 | 單向(客戶端 → 服務端) | 雙向(客戶端 ? 服務端) | 單向(服務端 → 客戶端) |
連接方式 | 客戶端定時發起HTTP請求 | 持久連接(基于TCP協議) | 持久連接(基于HTTP協議) |
實現復雜度 | 簡單 | 較復雜(需處理握手、協議轉換等) | 簡單 |
實時性 | 低(依賴輪詢間隔) | 高(支持實時雙向通信) | 高(服務端實時推送) |
性能開銷 | 高(頻繁建立/關閉連接) | 低(單個持久連接) | 低(單個持久連接) |
示例場景 | 定時獲取天氣、股票數據 | 在線聊天、多人游戲、實時協作 | 新聞推送、通知、實時股票行情 |
- 輪詢:開發簡單,但是效率太低了,適合對實時性要求不高的場景。在項目中,基本是不推薦使用,這種做法比較 low。
- WebSocket:開發起來較為復雜。性能好,可以雙向實時通信,適合需要交互的場景。推薦使用 Socket.IO 包來開發。
- SSE:開發簡單。性能好,但只能單向實時通信,適合服務端向客戶端主動推送數據的場景。
另外值得一提的是,現在各個 AI 平臺的消息推送
,也都是使用SSE
來實現的。
2、SSE 的基礎實現
// 設置正確的響應頭
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');// 定時發送數據const intervalId = setInterval(() => {const data = {message: `當前時間是 ${new Date().toLocaleTimeString()}`,};res.write(`data: ${JSON.stringify(data)}\n\n`);
}, 2000);// 處理客戶端斷開連接
req.on('close', () => {clearInterval(intervalId); // 清除定時器res.end(); // 結束響應
});
- 想要用
SSE來推送數據
,頂部
要按照這個格式來設置響應頭
,明確指明為event-stream
。這樣才能被識別成SSE。 中間
部分,簡單的寫了一個定時執行
。每隔兩秒鐘,計算一下當前的時間。- 然后注意了,這里用的是
res.write
,這是實現SSE的關鍵代碼
。使用res.write可以不斷的,分多次發送數據,而不需要一次性發送完整的響應
。 - 還有要注意的是,我們這里
發送數據的格式
,按照SSE的規則
,必須是以data:開頭,以兩個\n結束
。\n是換行的意思。 - 最下面,如果
連接斷開了
,就停止定時器
,并結束響應
。
3、前端部分實現
SSE默認不支持在header中傳遞數據
,那就直接在URL里傳token好了
// 定義管理員令牌,用于身份驗證,在實際使用時需將 'xxxx' 替換為真實有效的令牌
const token = 'xxxx';// 創建一個 EventSource 對象,用于建立與服務器的 SSE 連接
// 這里使用模板字符串將 token 拼接到請求的 URL 中,該 URL 指向服務器上處理訂單流數據的接口
const eventSource = new EventSource(`http://localhost:3000/admin/charts/stream_order?token=${ token }`);// 當服務器向客戶端發送消息時,會觸發 onmessage 事件
eventSource.onmessage = function (event) {// 使用 try...catch 塊來捕獲可能出現的異常,例如數據解析失敗的情況try {// 服務器發送的數據存儲在 event.data 中,通常是 JSON 格式的字符串// 使用 JSON.parse 方法將其解析為 JavaScript 對象const data = JSON.parse(event.data);// 將解析后的數據打印到控制臺,方便調試和查看console.log(data);} catch (error) {// 如果解析數據過程中出現錯誤,將錯誤信息打印到控制臺console.error('解析數據出錯:', error);}
};// 當 SSE 連接出現錯誤時,會觸發 onerror 事件
eventSource.onerror = function (error) {// 將連接錯誤信息打印到控制臺,方便調試和排查問題console.error('SSE 連接出錯:', error);
};
4、測試
檢查
,選擇網絡
。然后刷新一下
,就能看到發出的請求
了
注意
:請求的狀態是pending
,等待 2 秒鐘
后,狀態變為200
的時候我們再點進去。這是因為如果點早了
,瀏覽器還沒有將它識別成SSE
。
5、 SSE 小結
- 在
Node
部分,設置好響應頭
,并用res.write
不斷的發數據出去就好。 前端
部分,用new EventSource
建立連接。并通過onmessage
事件,來接收數據。
6、實踐
redis 封裝
6.1、后端代碼實現
6.1.1、新建 SSE 處理工具類
在根目錄
新建streams
文件夾,里面再新建一個sse-handler.js
,用于處理服務器發送事件(Server-Sent Events, SSE)的工具類,代碼如下:
const {setKey, getKey} = require('../utils/redis');// 定義一個名為 SSEHandler 的類,用于處理服務器發送事件(SSE)的連接和數據廣播
class SSEHandler {// 構造函數,在創建 SSEHandler 實例時會自動調用constructor() {// 使用 ES6 的 Set 數據結構來存儲與瀏覽器建立 SSE 連接的響應對象(res)// Set 可以確保存儲的元素唯一,避免重復this.clients = new Set();}/*** 初始化 SSE 數據流,處理新的客戶端連接* @param {Object} res - Express 響應對象,用于向客戶端發送數據* @param {Object} req - Express 請求對象,包含客戶端的請求信息*/initStream(res, req) {// 設置響應頭,指定響應內容類型為 text/event-stream,這是 SSE 的標準內容類型res.setHeader('Content-Type', 'text/event-stream');// 設置緩存控制,禁止瀏覽器緩存響應內容,確保每次請求都能獲取最新數據res.setHeader('Cache-Control', 'no-cache');// 設置連接類型為 keep-alive,保持與客戶端的長連接res.setHeader('Connection', 'keep-alive');// 刷新響應頭,將設置的響應頭信息發送給客戶端res.flushHeaders();// 將當前客戶端的響應對象添加到 clients 集合中,以便后續廣播數據時使用this.clients.add(res);// 監聽客戶端連接關閉事件,當客戶端斷開連接時觸發回調函數req.on('close', () => {// 從 clients 集合中刪除當前客戶端的響應對象this.clients.delete(res);// 打印日志,提示客戶端已斷開連接console.log('Client disconnected');});}/*** 向所有連接的客戶端廣播數據* @param {Object} data - 要廣播的數據對象,會被序列化為 JSON 字符串發送給客戶端*/async broadcastData(data) {await setKey('sse_broadcast_data', data);// 遍歷 clients 集合中的每個客戶端響應對象this.clients.forEach((client) => {// 檢查客戶端響應是否已經結束,如果未結束則繼續發送數據if (!client.finished) {// 按照 SSE 的格式,以 "data: " 開頭,后面跟上 JSON 序列化后的數據,以兩個換行符 "\n\n" 結尾// 并將其寫入客戶端響應流,發送給客戶端client.write(`data: ${JSON.stringify(data)}\n\n`);}});}
}// 將 SSEHandler 類導出,以便其他模塊可以引入和使用
module.exports = SSEHandler;
6.1.2、定義和管理統計查詢的 SQL 語句·
創建utils/stats-query.js
文件,用于定義和管理統計查詢的 SQL 語句:
const statsQueries = {order: "SELECT DATE_FORMAT(`createdAt`, '%Y-%m') AS `month`, COUNT(*) AS `value` FROM `Orders` GROUP BY `month` ORDER BY `month` ASC",user: "SELECT DATE_FORMAT(`createdAt`, '%Y-%m') AS `month`, COUNT(*) AS `value` FROM `Users` GROUP BY `month` ORDER BY `month` ASC",// 可以添加更多類型的統計查詢
};function getStatsQuery(type) {return statsQueries[type];
}module.exports = {getStatsQuery
};
6.1.3、 廣播服務的實現
創建utils/broadcast-service.js
文件,實現廣播服務:
const {sequelize} = require('../models');
const SSEHandler = require('../streams/sse-handler');// 直接在 broadcast-service.js 中定義統計查詢配置
const {getStatsQuery} = require('./stats-query');
// 存儲不同類型的 SSE 處理程序
const sseHandlers = {};async function broadcastStats(type) {try {if (!getStatsQuery(type)) {console.error(`Invalid stats type: ${type}`);return;}if (!sseHandlers[type]) {sseHandlers[type] = new SSEHandler();}const [results] = await sequelize.query(getStatsQuery(type));const data = {months: results.map(item => item.month),values: results.map(item => item.value)};sseHandlers[type].broadcastData(data);console.log(`${type} stats broadcasted successfully`);} catch (error) {console.error(`Error broadcasting ${type} stats:`, error);}
}function initSSEStream(type, res, req) {if (!getStatsQuery(type)) {console.error(`Invalid stats type: ${type}`);return;}if (!sseHandlers[type]) {sseHandlers[type] = new SSEHandler();}sseHandlers[type].initStream(res, req);
}module.exports = {broadcastStats,initSSEStream
};
6.1.4、新建 charts.js
文件實現 ECharts
路由(僅展示關鍵代碼)
在這一部分,我們將創建一個 charts.js 文件,用于實現與 ECharts 相關的路由功能。該文件主要負責處理不同類型的統計數據請求,并利用 Redis 進行數據緩存,同時支持通過 SSE(Server-Sent Events)進行實時數據推送。
const {getStatsQuery} = require('../../utils/stats-query');
const {initSSEStream} = require('../../utils/broadcast-service');
const {getKey, setKey} = require('../../utils/redis');// 獲取統計數據的通用路由
// 此路由根據請求參數中的 type 來獲取相應的統計數據
router.get('/:type', async (req, res) => {// 從請求參數中獲取統計數據的類型const type = req.params.type;// 根據類型獲取對應的統計查詢語句const query = getStatsQuery(type);// 如果未找到對應的查詢語句,返回錯誤信息if (!query) {return failure(res, new Error('Invalid stats type'));}try {// 首先嘗試從 Redis 中獲取緩存的數據let cachedData = await getKey(`stats_data_${type}`);// 如果 Redis 中存在緩存數據,直接返回該數據if (cachedData) {return success(res, 'Stats data fetched successfully', {data: cachedData});}// 若 Redis 中沒有緩存數據,執行數據庫查詢const [results] = await sequelize.query(query);// 對查詢結果進行處理,將月份和對應的值分別提取出來const data = {months: results.map(item => item.month),values: results.map(item => item.value)};// 將處理后的數據存儲到 Redis 中,以便后續請求可以直接使用緩存數據await setKey(`stats_data_${type}`, data);// 返回查詢成功的響應,并將數據發送給客戶端success(res, 'Stats data fetched successfully', {data});} catch (error) {// 若出現錯誤,返回錯誤響應failure(res, error);}
});/*** SSE 統計不同類型數據* GET /admin/charts/stream/:type* 此路由用于處理 SSE 連接,實時推送不同類型的統計數據*/
router.get('/stream/:type', async (req, res) => {// 從請求參數中獲取統計數據的類型const type = req.params.type;// 初始化 SSE 數據流,開始向客戶端推送數據initSSEStream(type, res, req);
});
6.1.5、表單新增數據
時 刪除redis緩存
以order
為例
// 定義一個處理 POST 請求的路由,路徑為根路徑('/')
// 當客戶端向該路徑發送 POST 請求時,此中間件函數會被調用
// req 表示請求對象,包含客戶端發送的請求信息
// res 表示響應對象,用于向客戶端發送響應
// next 是 Express 中的中間件函數,用于將控制權傳遞給下一個中間件
router.post('/', async function (req, res, next) {try {// 生成一個唯一的訂單號// 使用 uuidv4 函數生成一個通用唯一識別碼(UUID)// 然后使用 replace 方法將 UUID 中的連字符(-)替換為空字符串,得到一個無連字符的訂單號const outTradeNo = uuidv4().replace(/-/g, '');// 調用 getMembership 函數,根據請求對象 req 獲取會員信息// 這個函數可能會從數據庫、緩存或其他數據源中獲取與當前請求相關的會員信息const membership = await getMembership(req);// 使用 Sequelize 的 create 方法創建一個新的訂單記錄// 傳入一個包含訂單信息的對象,這些信息將被插入到數據庫的 Order 表中const order = await Order.create({// 訂單號,使用前面生成的唯一訂單號outTradeNo: outTradeNo,// 用戶 ID,從請求對象中獲取當前用戶的 IDuserId: req.userId,// 訂單主題,使用會員的名稱subject: membership.name,// 會員時長(月),從會員信息中獲取會員的持續月數membershipMonths: membership.durationMonths,// 訂單總金額,使用會員的價格totalAmount: membership.price,// 訂單狀態,初始狀態設為 0,通常表示待支付status: 0,});// 刪除 Redis 中存儲的訂單統計數據緩存// 當有新訂單創建時,之前的統計數據可能不再準確,所以需要刪除緩存// 后續請求統計數據時會重新從數據庫獲取最新數據await delKey('stats_data_order');// 調用廣播服務,將訂單統計數據的更新廣播出去// 這可能會觸發前端頁面或其他相關服務更新訂單統計信息await broadcastStats('order');// 調用 success 函數,向客戶端發送成功響應// 第一個參數是響應對象 res,用于發送響應// 第二個參數是成功消息,告知客戶端訂單創建成功// 第三個參數是包含訂單信息的對象,客戶端可以使用這些信息進行后續處理success(res, '訂單創建成功。', { order });} catch (error) {// 如果在訂單創建過程中出現錯誤,調用 failure 函數向客戶端發送錯誤響應// 第一個參數是響應對象 res,用于發送響應// 第二個參數是捕獲到的錯誤對象,客戶端可以根據錯誤信息進行相應處理failure(res, error);}
});
6.2、前端代碼實現
6.2.1、前端charts
封裝
在前端開發中,為了高效管理 ECharts 圖表,我們封裝了 ChartManager 類,它能處理圖表的初始化、數據獲取、SSE 連接以及錯誤處理等操作。以下是詳細的代碼及說明:
// 配置對象,可根據實際情況靈活修改,用于存儲與圖表管理相關的配置信息
const config = {// API 請求的基礎 URL,用于獲取圖表數據和建立 SSE 連接API_BASE_URL: 'http://localhost:3000',// 最大重試次數,當 SSE 連接失敗時會進行重試,達到該次數后停止重試MAX_RETRIES: 5,// 重試延遲時間(毫秒),每次重試之間的間隔時長RETRY_DELAY: 3000
};/*** ChartManager 類,負責管理 ECharts 圖表的初始化、數據獲取、SSE 連接以及錯誤處理等功能*/
class ChartManager {/*** 構造函數,用于初始化圖表管理器的基本屬性* @param {string} chartId - 圖表容器的 ID,用于查找對應的 DOM 元素* @param {string} type - 圖表類型,例如 'order' 或 'user',決定獲取數據的接口路徑* @param {string} token - 用于身份驗證的令牌,在請求數據時使用* @param {Object} [option={}] - 可選的 ECharts 配置選項,用于自定義圖表樣式*/constructor(chartId, type, token, option = {}) {this.chartId = chartId;this.type = type;this.token = token;this.chart = null;this.initialDataFetched = false;this.sseSource = null;this.retryCount = 0;this.maxRetries = config.MAX_RETRIES;this.retryDelay = config.RETRY_DELAY;this.option = {title: {text: `月度${type === 'order' ? '訂單' : '用戶'}統計`,textStyle: { color: '#333' }},tooltip: {trigger: 'axis',axisPointer: { type: 'shadow' }},grid: {left: '3%',right: '4%',bottom: '3%',containLabel: true},xAxis: {type: 'category',data: [],axisTick: { alignWithLabel: true }},yAxis: {type: 'value'},series: [{name: '數量',type: 'bar',barWidth: '60%',data: []}],...option};}/*** 初始化圖表,包含創建 ECharts 實例、獲取初始數據以及建立 SSE 連接等操作*/async init() {try {const chartDom = document.getElementById(this.chartId);if (!chartDom) {throw new Error(`未找到指定 ID 的圖表容器:${this.chartId}`);}this.chart = echarts.init(chartDom);this.chart.setOption(this.option);await this.fetchInitialData();this.connectSSE();} catch (error) {console.error(`圖表 ${this.chartId} 初始化失敗:`, error);this.showError('圖表初始化失敗,請嘗試刷新頁面重試', error.message);}}/*** 異步獲取初始數據*/async fetchInitialData() {try {const url = `${config.API_BASE_URL}/admin/charts/${this.type}?token=${this.token}`;const response = await this.fetchData(url);const { data } = response;if (data.data.months.length === 0) {this.showError('當前暫無數據,10 秒后將嘗試重新獲取');setTimeout(() => this.fetchInitialData(), 10000);return;}if (data.data.months.length!== data.data.values.length) {this.showError('數據格式存在錯誤:月份和數量數組長度不匹配');return;}this.option.xAxis.data = data.data.months;this.option.series[0].data = data.data.values;this.initialDataFetched = true;this.chart.setOption(this.option);} catch (error) {this.showError('數據加載失敗,5 秒后將嘗試重新加載', error.message);setTimeout(() => this.fetchInitialData(), 5000);}}/*** 建立 SSE 連接,實時監聽服務器發送的數據*/connectSSE() {if (this.sseSource &&!this.sseSource.closed) {this.sseSource.close();}const url = `${config.API_BASE_URL}/admin/charts/stream/${this.type}?token=${this.token}`;this.sseSource = new EventSource(url);this.sseSource.onmessage = (event) => {try {const responseData = JSON.parse(event.data);const data = responseData.data || responseData;if (!data ||!Array.isArray(data.months) ||!Array.isArray(data.values)) {this.showError('SSE 返回的數據格式有誤,請檢查后端接口');return;}if (data.months.length === 0) {this.showError('暫無數據,請創建訂單后再進行查看');return;}if (data.months.length!== data.values.length) {this.showError('SSE 數據格式錯誤:月份和數量數組長度不匹配');return;}this.option.xAxis.data = data.months;this.option.series[0].data = data.values;this.chart.setOption(this.option);} catch (parseError) {console.error('解析 SSE 數據時出現錯誤:', parseError);this.showError('實時數據解析失敗,請檢查網絡連接或嘗試刷新頁面', parseError.message);}};this.sseSource.onerror = (error) => {console.error('SSE 連接出現錯誤:', error);this.showError('實時數據連接中斷,正在嘗試重新連接...', error.message);this.handleSseError();};}/*** 處理 SSE 連接錯誤,執行重試操作*/handleSseError() {this.retryCount++;if (this.retryCount <= this.maxRetries) {setTimeout(() => this.connectSSE(), this.retryDelay);} else {this.showError('重連失敗,請手動刷新頁面');this.retryCount = 0;}}/*** 發起網絡請求獲取數據* @param {string} url - 請求的 URL* @returns {Promise<Object>} - 返回解析后的 JSON 數據*/async fetchData(url) {try {const response = await fetch(url);if (!response.ok) {throw new Error(`HTTP 請求失敗,狀態碼:${response.status}`);}const contentType = response.headers.get('content-type');if (!contentType ||!contentType.includes('application/json')) {throw new Error('響應數據并非 JSON 格式');}return await response.json();} catch (error) {this.showError('網絡請求出現問題', error.message);throw error;}}/*** 顯示錯誤信息并更新圖表標題* @param {string} message - 錯誤消息內容* @param {string} [errorCode=null] - 可選的錯誤代碼*/showError(message, errorCode = null) {const errorDiv = document.createElement('div');errorDiv.className = 'error-message';errorDiv.innerHTML = `${message}${errorCode? `<small>錯誤碼: ${errorCode}</small>` : ''}<button onclick="this.parentElement.remove()">關閉</button>`;document.body.prepend(errorDiv);this.option.series[0].data = [];this.chart.setOption(this.option);}/*** 銷毀圖表和 SSE 連接,釋放相關資源*/destroy() {if (this.chart) {this.chart.dispose();this.chart = null;}if (this.sseSource &&!this.sseSource.closed) {this.sseSource.close();}}
}// 將 ChartManager 類暴露到全局作用域,方便在其他地方使用
window.ChartManager = ChartManager;
6.2.1、前端demo
實現
面是一個前端示例,展示了如何使用 ChartManager 類創建實時統計圖表:
<!DOCTYPE html>
<html lang="zh-CN"><head><meta charset="UTF-8"><!-- 讓頁面在移動設備上能正確顯示 --><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>實時統計圖表</title><!-- 引入 ECharts 庫,用于創建各種類型的圖表 --><script src="https://cdn.jsdelivr.net/npm/echarts@5.4.3/dist/echarts.min.js"></script><!-- 引入自定義的圖表管理腳本,包含 ChartManager 類等邏輯 --><script src="charts.js"></script><style>body {/* 設置頁面整體字體為 Arial 無襯線字體 */font-family: Arial, sans-serif;/* 使用 Flexbox 布局,讓內容水平和垂直居中 */display: flex;justify-content: center;align-items: center;/* 讓內容垂直排列 */flex-direction: column;/* 頁面四周添加 20px 的外邊距 */margin: 20px;}.chart-container {/* 設置圖表容器的寬度為 800px */width: 800px;/* 設置圖表容器的高度為 500px */height: 500px;/* 圖表容器四周添加 20px 的外邊距 */margin: 20px;/* 圖表容器添加 1px 寬的淺灰色邊框 */border: 1px solid #eee;/* 圖表容器四個角設置 4px 的圓角 */border-radius: 4px;}.error-message {/* 錯誤消息文本顏色設置為紅色 */color: red;/* 錯誤消息四周添加 20px 的外邊距 */margin: 20px;/* 錯誤消息內容四周添加 10px 的內邊距 */padding: 10px;/* 錯誤消息背景顏色設置為淺紅色 */background-color: #ffebee;/* 錯誤消息框四個角設置 4px 的圓角 */border-radius: 4px;}</style>
</head><body>
<!-- 訂單統計圖表的容器,后續 ECharts 圖表會渲染到這個容器中 -->
<div class="chart-container" id="orderChart"></div>
<!-- 用戶統計圖表的容器,后續 ECharts 圖表會渲染到這個容器中 -->
<div class="chart-container" id="userChart"></div>
<script>// 當文檔的 DOM 內容加載完成后執行以下邏輯document.addEventListener('DOMContentLoaded', async () => {try {// 這里的 token 用于身份驗證,實際使用時需要替換為從后端獲取的真實 tokenconst token = '';// 自定義的 ECharts 配置選項,可用于覆蓋默認配置const customOption = {// 修改圖表標題的文本顏色為藍色title: {textStyle: { color: 'blue' }}};// 創建訂單統計圖表的管理實例,傳入容器 ID、圖表類型、token 和自定義配置const orderChart = new ChartManager('orderChart', 'order', token, customOption);// 創建用戶統計圖表的管理實例,傳入容器 ID、圖表類型、token 和自定義配置const userChart = new ChartManager('userChart', 'user', token, customOption);// 使用 Promise.all 并行初始化訂單圖表和用戶圖表await Promise.all([orderChart.init(), userChart.init()]);// 監聽窗口的 beforeunload 事件,當用戶關閉或刷新頁面時執行以下邏輯window.addEventListener('beforeunload', () => {// 銷毀訂單圖表實例,釋放相關資源orderChart.destroy();// 銷毀用戶圖表實例,釋放相關資源userChart.destroy();});} catch (error) {// 如果在初始化過程中出現錯誤,在控制臺輸出錯誤信息console.error('圖表初始化失敗:', error);// 調用 showGlobalError 函數顯示全局錯誤消息showGlobalError('圖表初始化失敗,請檢查瀏覽器控制臺');}});/*** 顯示全局錯誤消息的函數* @param {string} message - 要顯示的錯誤消息內容*/function showGlobalError(message) {// 創建一個新的 div 元素用于顯示錯誤消息const errorDiv = document.createElement('div');// 為錯誤消息 div 元素添加 error-message 類名,以便應用相應的樣式errorDiv.className = 'error-message';// 設置錯誤消息 div 元素的文本內容為傳入的消息errorDiv.textContent = message;// 將錯誤消息 div 元素添加到頁面 body 元素的最前面document.body.prepend(errorDiv);}
</script>
</body></html>