this.server = new WebSocket.Server({ port: this.port });this.server.on('connection', (ws, req) => {const uniqueId = dataUtil.uuid();ws.id = uniqueId;global.serverSession.set(uniqueId, ws);logger.debug({ message: '客戶端已連接', traceId: ws.id, address: req.socket.remoteAddress });let buffer = Buffer.alloc(0);ws.on('message', async (data) => {// WebSocket 的消息可能是 Buffer 或字符串buffer = Buffer.concat([buffer, Buffer.from(data)]);await this.processData(ws, buffer );});ws.on('close', () => {logger.debug({ message: '客戶端已斷開連接', traceId: ws.id });socketCloseEvent.emit(ws.id, ws.destroyType);global.serverSession.delete(ws.id);if(ws._idleTimer){clearInterval(ws._idleTimer);}});ws.on('error', (err) => {logger.error({ message: `WebSocket 連接發生錯誤 ${err}`, traceId: ws.id });});// 心跳檢測(可選,客戶端配合)ws.isAlive = true;ws.on('pong', () => { ws.isAlive = true; });// 設置連接超時(模擬 TCP_IDLE_THRESHOLD)const timeout = parseInt(process.env.TCP_IDLE_THRESHOLD);ws._idleTimer = setInterval(() => {if (!ws.isAlive) {logger.debug({ message: 'WebSocket 心跳超時斷開連接', traceId: ws.id });ws.terminate();} else {ws.isAlive = false;ws.ping();}}, timeout);});
在上述代碼
let buffer = Buffer.alloc(0);ws.on('message', async (data) => {// WebSocket 的消息可能是 Buffer 或字符串buffer = Buffer.concat([buffer, Buffer.from(data)]);await this.processData(ws, Buffer.from(data));});
通過以上這種方法解析時,會存在buffer 拼接半包、粘包等問題。
問題根因逐步解析
第一步:WebSocket 的 on(‘message’) 是異步且無排隊機制
每次前端發送消息,Node.js 的 ws 庫會調用:
ws.on('message', callback);
- 這個回調不會等待上一個 message 處理完成;
- 如果前端并發發送多個消息(如調用多次 ws.send()),后端會并發觸發多個 on(‘message’) 回調;
- 回調函數內部的 await 并不會阻塞下一個消息的到來。
第二步:共享狀態 buffer 被多個異步回調同時修改
你在每個 on(‘message’) 中都這么寫:
buffer = Buffer.concat([buffer, Buffer.from(data)]);
buffer = await this.processData(ws, buffer);
- 多個 on(‘message’) 回調是并發執行的;
- 但它們共用同一個 buffer;
- 多個異步回調可能同時拼接數據,造成粘包錯亂、重復消費、消息丟失。
第三步:粘包/解包邏輯對 buffer 有修改,也導致狀態錯亂
比如你在 processData 里這么做:
while (buffer.length >= HEADER_LENGTH) {const packetLength = buffer.readUInt32LE(0);if (buffer.length < packetLength) break;// 處理一包buffer = buffer.slice(packetLength);
}
你修改了 buffer 的引用,但如果多個 on(‘message’) 同時運行:
- A 在 slice 的時候,B 正在 concat;
- buffer 的狀態被錯改/錯切;
- 最終造成:
- 包處理多次;
- 某些包永遠解不開;
- 無法回收剩余數據(死循環或內存增長)
解決辦法
在 WebSocket 中實現自動串行、自動排隊的消息處理機制
目標:確保每個 WebSocket 連接收到的消息,在服務端串行處理,按順序執行,且不丟、不亂、不重。
封裝成“連接級消息調度器類”——自動串行消息消費器
// MessageProcessor.jsclass MessageProcessor {/*** @param {WebSocket} ws - 當前 WebSocket 連接實例* @param {Function} handler - 數據處理函數 (ws, buffer) => Promise<newBuffer>*/constructor(ws, handler) {this.ws = ws;this.handler = handler;this.queue = [];this.buffer = Buffer.alloc(0);this.processing = false;}/*** 將收到的數據放入處理隊列* @param {Buffer} data*/enqueue(data) {this.queue.push(data);this._processQueue();}/*** 串行處理隊列中的數據* @private*/async _processQueue() {if (this.processing) return;this.processing = true;try {while (this.queue.length > 0) {const chunk = this.queue.shift();this.buffer = Buffer.concat([this.buffer, chunk]);this.buffer = await this.handler(this.ws, this.buffer);}} catch (err) {console.error(`[MessageProcessor] Error while processing message: ${err.message}`);} finally {this.processing = false;}}
}module.exports = MessageProcessor;
使用方式:
const processor = new MessageProcessor(ws, this.processData.bind(this));ws.on('message', (data) => {processor.enqueue(Buffer.from(data));
});
優點:
-
封裝性好,每個連接一個實例;
-
可測試、可復用;
-
自動排隊串行,無需使用鎖;
-
非阻塞,可多連接并發使用;
-
和 through2 在語義上非常接近。
解析MessageProcessor 是如何一步步解決這些問題的?
Step 1:用內部隊列排隊所有消息 queue.push(data)
enqueue(data) {this.queue.push(data);this._processQueue();
}
- 所有收到的消息都放進內部隊列;
- 消息不會被立即處理,而是等待處理器“空閑”后再處理;
- 避免了并發執行邏輯
Step 2:使用 processing 標志,嚴格串行執行
async _processQueue() {if (this.processing) return;this.processing = true;while (this.queue.length > 0) {const chunk = this.queue.shift();this.buffer = Buffer.concat([this.buffer, chunk]);this.buffer = await this.handler(this.ws, this.buffer); // handler = processData}this.processing = false;
}
關鍵點:
- 第一個消息進來時,開始處理;
- 未處理完之前,不會進入下一輪處理;
- 后續數據只能排隊,不會同時觸發多次 processData;
- buffer 的狀態始終由一個處理器獨占。
Step 3:粘包 / 解包邏輯只在一個線程上下文中修改 buffer
由于 processData 只在 _processQueue 內調用,它拿到的是唯一的 buffer 實例,無并發修改。
你的典型 processData 中邏輯是這樣:
while (buffer.length >= HEADER_LENGTH) {const len = buffer.readUInt32LE(0);if (buffer.length < len) break;const packet = buffer.slice(0, len);buffer = buffer.slice(len);// do something with packet
}
現在沒有并發修改了,buffer.slice、concat 都是線程安全的。
四、所以總結:問題為什么發生 + MessageProcessor 如何解決
階段 | 原問題 | MessageProcessor 如何修復 |
---|---|---|
并發觸發 on('message') | 多次同時執行邏輯 | 使用隊列排隊每條消息 |
buffer 是共享變量 | 被多個 async 改寫 | 僅在一個邏輯處理器中使用 |
processData 修改 buffer 狀態 | 狀態競爭 / 臟讀 | 串行執行保證唯一狀態 |
五、你現在處于穩定狀態的根本原因
- 你用 MessageProcessor 隔離了每個連接的處理隊列;
- 每個連接獨立維護 buffer 狀態;
- 消息進來后,不會打斷正在處理的內容,形成了消息處理管線(stream-like);
這本質上就是在 net.pipe(through2(…)) 中所依賴的 backpressure、數據流順序處理機制。
注意: 只有當你用 WebSocket 進行“低層級的二進制通信 + 自定義協議格式”時,才需要 buffer 拼接 + 串行處理。
場景 | 說明 | 是否會出現“粘包/拆包/并發 buffer 錯亂”問題? |
---|---|---|
? WebSocket + 發送 JSON 字符串 | 瀏覽器默認場景,如:ws.send(JSON.stringify(obj)) | ? 不會,每條消息獨立、完整 |
? WebSocket + 發送 Blob / Uint8Array | 發送結構化數據、圖片、ArrayBuffer | ? 不會,每個 message 是一個完整幀 |
? WebSocket + 發送自定義二進制協議(Buffer) | 比如你自定義包頭(長度 + 指令 + 數據) | ? 會,需要你手動 buffer 拼接、解析邊界 |
? TCP + 業務協議 + stream 處理 | 典型 TCP 應用,粘包問題嚴重 | ? 會,這就是你原來通過 through2 解決的方式 |
? HTTP 請求(REST) | 每個請求獨立處理 | ? 不會,天然不會粘包 |
為什么自定義二進制協議需要 buffer 拼接?
WebSocket 的“消息”單位 vs “物理幀”單位:
-
WebSocket 協議本身會保留幀邊界,也就是說:
瀏覽器/Node.js 的 ws 庫可以保障你在 on(‘message’) 時拿到的是完整的一幀;
但是!!!
一幀不代表你的一條“業務邏輯消息”!
舉個例子:你的自定義協議如下:
[4字節 length][2字節版本][4字節 cmdid][...data]
但你發送多個這樣的包,比如:
// 你可能將 3 個包合成一個 buffer 發出:
ws.send(Buffer.concat([packet1, packet2, packet3]));
結果后端拿到的 data 是:
- 不是一條消息;
- 而是多個業務包合并的;
- 所以你必須手動解析 buffer,按長度解開每個邏輯包;
- 而這個 buffer 本身是共享狀態,所以你才需要串行處理。
正常 JSON 傳輸為什么不會出問題?
假設你前端這樣做:
ws.send(JSON.stringify({ cmd: 'ping', ts: Date.now() }));
后端直接:
ws.on('message', data => {const obj = JSON.parse(data.toString());handle(obj);
});
每一條 message 是瀏覽器封裝好的獨立幀,不存在“粘包/拆包”的問題;也沒有共享 buffer、也就沒有并發讀寫的風險。
什么時候需要處理拼包 + 并發?
條件 | 是否需要處理拼包/并發 |
---|---|
自己定義包頭 + 發送多個合并 Buffer | ? 是,必須處理 buffer 解析、并發狀態 |
每次發送只發一個完整包 | 如果后端一定能拿到完整幀,一般不需要 |
你用了 ws.send(packet1); ws.send(packet2) 連續發 | 仍然有可能在某些環境下合并為一幀 |
用 TCP socket + stream | ? 必須做拼包處理,TCP 是流,不保邊界 |