js使用webscoket時使用自定義二進制包協議時并發問題處理

this.server = new WebSocket.Server({ port: this.port });this.server.on('connection', (ws, req) => {const uniqueId = dataUtil.uuid();ws.id = uniqueId;global.serverSession.set(uniqueId, ws);logger.debug({ message: '客戶端已連接', traceId: ws.id, address: req.socket.remoteAddress });let buffer = Buffer.alloc(0);ws.on('message', async (data) => {// WebSocket 的消息可能是 Buffer 或字符串buffer = Buffer.concat([buffer, Buffer.from(data)]);await this.processData(ws, buffer );});ws.on('close', () => {logger.debug({ message: '客戶端已斷開連接', traceId: ws.id });socketCloseEvent.emit(ws.id, ws.destroyType);global.serverSession.delete(ws.id);if(ws._idleTimer){clearInterval(ws._idleTimer);}});ws.on('error', (err) => {logger.error({ message: `WebSocket 連接發生錯誤 ${err}`, traceId: ws.id });});// 心跳檢測(可選,客戶端配合)ws.isAlive = true;ws.on('pong', () => { ws.isAlive = true; });// 設置連接超時(模擬 TCP_IDLE_THRESHOLD)const timeout = parseInt(process.env.TCP_IDLE_THRESHOLD);ws._idleTimer = setInterval(() => {if (!ws.isAlive) {logger.debug({ message: 'WebSocket 心跳超時斷開連接', traceId: ws.id });ws.terminate();} else {ws.isAlive = false;ws.ping();}}, timeout);});

在上述代碼

let buffer = Buffer.alloc(0);ws.on('message', async (data) => {// WebSocket 的消息可能是 Buffer 或字符串buffer = Buffer.concat([buffer, Buffer.from(data)]);await this.processData(ws, Buffer.from(data));});

通過以上這種方法解析時,會存在buffer 拼接半包、粘包等問題。

問題根因逐步解析

第一步:WebSocket 的 on(‘message’) 是異步且無排隊機制

每次前端發送消息,Node.js 的 ws 庫會調用:

ws.on('message', callback);
  • 這個回調不會等待上一個 message 處理完成;
  • 如果前端并發發送多個消息(如調用多次 ws.send()),后端會并發觸發多個 on(‘message’) 回調;
  • 回調函數內部的 await 并不會阻塞下一個消息的到來。

第二步:共享狀態 buffer 被多個異步回調同時修改

你在每個 on(‘message’) 中都這么寫:

buffer = Buffer.concat([buffer, Buffer.from(data)]);
buffer = await this.processData(ws, buffer);
  • 多個 on(‘message’) 回調是并發執行的;
  • 但它們共用同一個 buffer;
  • 多個異步回調可能同時拼接數據,造成粘包錯亂、重復消費、消息丟失。

第三步:粘包/解包邏輯對 buffer 有修改,也導致狀態錯亂

比如你在 processData 里這么做:

while (buffer.length >= HEADER_LENGTH) {const packetLength = buffer.readUInt32LE(0);if (buffer.length < packetLength) break;// 處理一包buffer = buffer.slice(packetLength);
}

你修改了 buffer 的引用,但如果多個 on(‘message’) 同時運行:

  • A 在 slice 的時候,B 正在 concat;
  • buffer 的狀態被錯改/錯切;
  • 最終造成:
    • 包處理多次;
    • 某些包永遠解不開;
    • 無法回收剩余數據(死循環或內存增長)

解決辦法

在 WebSocket 中實現自動串行、自動排隊的消息處理機制

目標:確保每個 WebSocket 連接收到的消息,在服務端串行處理,按順序執行,且不丟、不亂、不重。

封裝成“連接級消息調度器類”——自動串行消息消費器

// MessageProcessor.jsclass MessageProcessor {/*** @param {WebSocket} ws - 當前 WebSocket 連接實例* @param {Function} handler - 數據處理函數 (ws, buffer) => Promise<newBuffer>*/constructor(ws, handler) {this.ws = ws;this.handler = handler;this.queue = [];this.buffer = Buffer.alloc(0);this.processing = false;}/*** 將收到的數據放入處理隊列* @param {Buffer} data*/enqueue(data) {this.queue.push(data);this._processQueue();}/*** 串行處理隊列中的數據* @private*/async _processQueue() {if (this.processing) return;this.processing = true;try {while (this.queue.length > 0) {const chunk = this.queue.shift();this.buffer = Buffer.concat([this.buffer, chunk]);this.buffer = await this.handler(this.ws, this.buffer);}} catch (err) {console.error(`[MessageProcessor] Error while processing message: ${err.message}`);} finally {this.processing = false;}}
}module.exports = MessageProcessor;

使用方式:

const processor = new MessageProcessor(ws, this.processData.bind(this));ws.on('message', (data) => {processor.enqueue(Buffer.from(data));
});

優點:

  • 封裝性好,每個連接一個實例;

  • 可測試、可復用;

  • 自動排隊串行,無需使用鎖;

  • 非阻塞,可多連接并發使用;

  • 和 through2 在語義上非常接近。

解析MessageProcessor 是如何一步步解決這些問題的?

Step 1:用內部隊列排隊所有消息 queue.push(data)

enqueue(data) {this.queue.push(data);this._processQueue();
}
  • 所有收到的消息都放進內部隊列;
  • 消息不會被立即處理,而是等待處理器“空閑”后再處理;
  • 避免了并發執行邏輯

Step 2:使用 processing 標志,嚴格串行執行

async _processQueue() {if (this.processing) return;this.processing = true;while (this.queue.length > 0) {const chunk = this.queue.shift();this.buffer = Buffer.concat([this.buffer, chunk]);this.buffer = await this.handler(this.ws, this.buffer); // handler = processData}this.processing = false;
}

關鍵點:

  • 第一個消息進來時,開始處理;
  • 未處理完之前,不會進入下一輪處理;
  • 后續數據只能排隊,不會同時觸發多次 processData;
  • buffer 的狀態始終由一個處理器獨占。

Step 3:粘包 / 解包邏輯只在一個線程上下文中修改 buffer

由于 processData 只在 _processQueue 內調用,它拿到的是唯一的 buffer 實例,無并發修改。
你的典型 processData 中邏輯是這樣:

while (buffer.length >= HEADER_LENGTH) {const len = buffer.readUInt32LE(0);if (buffer.length < len) break;const packet = buffer.slice(0, len);buffer = buffer.slice(len);// do something with packet
}

現在沒有并發修改了,buffer.slice、concat 都是線程安全的。

四、所以總結:問題為什么發生 + MessageProcessor 如何解決

階段原問題MessageProcessor 如何修復
并發觸發 on('message')多次同時執行邏輯使用隊列排隊每條消息
buffer 是共享變量被多個 async 改寫僅在一個邏輯處理器中使用
processData 修改 buffer 狀態狀態競爭 / 臟讀串行執行保證唯一狀態

五、你現在處于穩定狀態的根本原因

  • 你用 MessageProcessor 隔離了每個連接的處理隊列;
  • 每個連接獨立維護 buffer 狀態;
  • 消息進來后,不會打斷正在處理的內容,形成了消息處理管線(stream-like);
    這本質上就是在 net.pipe(through2(…)) 中所依賴的 backpressure、數據流順序處理機制。

注意: 只有當你用 WebSocket 進行“低層級的二進制通信 + 自定義協議格式”時,才需要 buffer 拼接 + 串行處理。

場景說明是否會出現“粘包/拆包/并發 buffer 錯亂”問題?
? WebSocket + 發送 JSON 字符串瀏覽器默認場景,如:ws.send(JSON.stringify(obj))? 不會,每條消息獨立、完整
? WebSocket + 發送 Blob / Uint8Array發送結構化數據、圖片、ArrayBuffer? 不會,每個 message 是一個完整幀
? WebSocket + 發送自定義二進制協議(Buffer)比如你自定義包頭(長度 + 指令 + 數據)? 會,需要你手動 buffer 拼接、解析邊界
? TCP + 業務協議 + stream 處理典型 TCP 應用,粘包問題嚴重? 會,這就是你原來通過 through2 解決的方式
? HTTP 請求(REST)每個請求獨立處理? 不會,天然不會粘包

為什么自定義二進制協議需要 buffer 拼接?

WebSocket 的“消息”單位 vs “物理幀”單位:

  • WebSocket 協議本身會保留幀邊界,也就是說:

    瀏覽器/Node.js 的 ws 庫可以保障你在 on(‘message’) 時拿到的是完整的一幀;

但是!!!

一幀不代表你的一條“業務邏輯消息”!

舉個例子:你的自定義協議如下:

[4字節 length][2字節版本][4字節 cmdid][...data]

但你發送多個這樣的包,比如:

// 你可能將 3 個包合成一個 buffer 發出:
ws.send(Buffer.concat([packet1, packet2, packet3]));

結果后端拿到的 data 是:

  • 不是一條消息;
  • 而是多個業務包合并的;
  • 所以你必須手動解析 buffer,按長度解開每個邏輯包;
  • 而這個 buffer 本身是共享狀態,所以你才需要串行處理。

正常 JSON 傳輸為什么不會出問題?

假設你前端這樣做:

ws.send(JSON.stringify({ cmd: 'ping', ts: Date.now() }));

后端直接:

ws.on('message', data => {const obj = JSON.parse(data.toString());handle(obj);
});

每一條 message 是瀏覽器封裝好的獨立幀,不存在“粘包/拆包”的問題;也沒有共享 buffer、也就沒有并發讀寫的風險。

什么時候需要處理拼包 + 并發?

條件是否需要處理拼包/并發
自己定義包頭 + 發送多個合并 Buffer? 是,必須處理 buffer 解析、并發狀態
每次發送只發一個完整包如果后端一定能拿到完整幀,一般不需要
你用了 ws.send(packet1); ws.send(packet2) 連續發仍然有可能在某些環境下合并為一幀
用 TCP socket + stream? 必須做拼包處理,TCP 是流,不保邊界

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/92857.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/92857.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/92857.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

元數據管理與數據治理平臺:Apache Atlas 分類傳播 Classification Propagation

文中內容僅限技術學習與代碼實踐參考&#xff0c;市場存在不確定性&#xff0c;技術分析需謹慎驗證&#xff0c;不構成任何投資建議。Apache Atlas 框架是一套可擴展的核心基礎治理服務&#xff0c;使企業能夠有效、高效地滿足 Hadoop 中的合規性要求&#xff0c;并支持與整個企…

TSF應用開發與運維部署

架構演進歷程&#xff1a;單體架構-->SOA架構-->微服務架構-->Service Mesh騰訊微服務平臺TSF (Tencent Service Framework) 是一個圍繞應用和微服務的 PaaS 平臺。提供服務全生命周期管理能力和數據化運營支持。提供多維度應用、服務、機器的監控數據&#xff0c;助力…

linux開發之mmap內存映射

mmap概念 mmp是 將文件或設備直接映射到進程的虛擬內存空間 的一種機制&#xff0c;可實現程序像訪問內存一樣訪問文件&#xff0c;而不需要傳統的 read()/write()系統調用 文件內容被映射到進程的地址空間&#xff0c;讀寫文件就像操作內存一樣&#xff0c;操作系統負責自動同…

CPP繼承

繼承 一、繼承概述 1、為什么需要繼承 如下示例&#xff0c;Person 類、Student 類、Teacher 類有大量重復的代碼&#xff0c;造成代碼冗余&#xff0c;降低開發效率。我們可以通過繼承來解決這一問題。在面向對象的編程語言中&#xff0c;繼承是一個核心概念。主要作用將重復的…

模塊 PCB 技術在未來通信領域的創新突破方向

未來通信領域對數據傳輸速率、信號穩定性及設備集成度的要求持續攀升&#xff0c;模塊 PCB 作為通信設備的關鍵組件&#xff0c;其技術創新成為推動行業發展的核心動力。獵板 PCB 憑借深厚的技術積累與持續的研發投入&#xff0c;在模塊 PCB 技術創新方面取得諸多突破&#xff…

mysql的InnoDB索引總結

MySQL InnoDB索引知識點總結 1. 索引類型 1.1 聚簇索引&#xff08;Clustered Index&#xff09; 定義與特性 定義&#xff1a;聚簇索引是InnoDB的默認存儲方式&#xff0c;數據行按照主鍵的順序物理存儲在磁盤上特性&#xff1a; 每個InnoDB表只能有一個聚簇索引數據頁中的記錄…

C++模板的補充

類模板(上一篇沒講到類模板C/C內存管理&函數模板-CSDN博客&#xff09; 類模板的定義&#xff1a; template<class T1, class T2, ..., class Tn> class 類模板名 {// 類內成員定義 }; 用一個簡單的棧例子講類模板 #define _CRT_SECURE_NO_WARNINGS #include &l…

用JOIN替代子查詢的查詢性能優化

一、子查詢的性能瓶頸分析?重復執行成本?關聯子查詢會導致外層每行數據觸發一次子查詢&#xff0c;時間復雜度為O(M*N)sql-- 典型低效案例 SELECT e.employee_id, (SELECT d.department_name FROM departments d WHERE d.department_id e.department_id) FROM employees e; …

【設計模式】訪問者模式模式

訪問者模式&#xff08;Visitor Pattern&#xff09;詳解一、訪問者模式簡介 訪問者模式&#xff08;Visitor Pattern&#xff09; 是一種 行為型設計模式&#xff08;對象行為型模式&#xff09;&#xff0c;它允許你在不修改對象結構的前提下&#xff0c;為對象結構中的元素添…

比特幣現貨和比特幣合約的區別與聯系

一、基本定義項目現貨&#xff08;Spot&#xff09;合約&#xff08;Futures / Perpetual&#xff09;本質直接買賣比特幣本身買賣比特幣價格的衍生品合約所得資產真實的 BTC合約頭寸&#xff08;沒有直接持有 BTC&#xff09;結算方式交割比特幣現金結算&#xff08;多數平臺&…

Qt/C++開發監控GB28181系統/實時監測設備在線離線/視頻預覽自動重連/重新點播取流/低延遲

一、前言說明 一個好的視頻監控系統&#xff0c;設備掉線后能夠自動重連&#xff0c;也是一個重要的功能指標&#xff0c;如果監控系統只是個rtsp流地址&#xff0c;那非常好辦&#xff0c;只需要重新打開流地址即可&#xff0c;而gb28181中就變得復雜了很多&#xff0c;需要多…

此芯p1開發板使用OpenHarmony時llama.cpp不同優化速度對比(GPU vs CPU)

硬件環境 Cix P1 SoC 瑞莎星睿 O6 開發板 rx580顯卡 產品介紹&#xff1a; https://docs.radxa.com/orion/o6/getting-started/introduction OpenHarmony 5.0.0 使用vulkan后端的llama.cpp &#xff08;GPU&#xff09; # ./llama-bench -m /data/qwen1_5-0_5b-chat-q2_k.…

Android 四大布局:使用方式與性能優化原理

一、四大布局基本用法與特點1. LinearLayout&#xff08;線性布局&#xff09;使用方式&#xff1a;<LinearLayoutandroid:orientation"vertical" <!-- 排列方向&#xff1a;vertical/horizontal -->android:layout_width"match_parent"android:…

Redis的BigKey問題

Redis的BigKey問題 什么是大Key問題&#xff1f; 大key問題其實可以說是大value問題&#xff0c;就是某個key對應的value所占據的存儲空間太大了&#xff0c;所以導致我們在操作這個key的時候花費的時間過長&#xff08;序列化\反序列化&#xff09;&#xff0c;從而降低了redi…

TDengine IDMP 產品基本概念

基本概念 元素 (Element) IDMP 通過樹狀層次結構來組織數據&#xff0c;樹狀結構里的每個節點被稱之為元素 (Element)。元素是一個物理的或邏輯的實體。它可以是具體的物理設備&#xff08;比如一臺汽車&#xff09;&#xff0c;物理設備的一個子系統&#xff08;比如一臺汽車的…

專題二_滑動窗口_將x減到0的最小操作數

一&#xff1a;題目解釋&#xff1a;每次只能移除數組的邊界&#xff0c;移除的邊界的總和為x&#xff0c;要求返回你移除邊界的最小操作數&#xff01;也就是說你最少花幾次移除邊界&#xff0c;就能夠讓這些移除的邊界的和為x&#xff0c;則返回這個次數&#xff01;所以這個…

CentOS 7 下通過 Anaconda3 運行llm大模型、deepseek大模型的完整指南

CentOS 7 下通過 Anaconda3 運行llm大模型、deepseek大模型的完整指南A1 CentOS 7 下通過 Anaconda3 運行大模型的完整指南一、環境準備二、創建專用環境三、模型部署與運行四、優化配置常見問題解決B1 CentOS 7 下通過 Anaconda3 使用 CPU 運行 DeepSeek 大模型的完整方案一、…

Flutter應用在Windows 8上正常運行

要讓Flutter應用在Windows 8上正常運行,需滿足以下前提條件,涵蓋系統環境、依賴配置、編譯設置等關鍵環節: 一、系統環境基礎要求 Windows 8版本 必須是 Windows 8.1(核心支持),不支持早期Windows 8(需升級到8.1,微軟已停止對原版Windows 8的支持)。 確認系統版本:右…

Redis實現消息隊列三種方式

參考 Redis隊列詳解&#xff08;springboot實戰&#xff09;_redis 隊列-CSDN博客 前言 MQ消息隊列有很多種&#xff0c;比如RabbitMQ,RocketMQ,Kafka等&#xff0c;但是也可以基于redis來實現&#xff0c;可以降低系統的維護成本和實現復雜度&#xff0c;本篇介紹redis中實現…

【C++動態版本號生成方案:實現類似C# 1.0.* 的自動構建號】

C動態版本號生成方案&#xff1a;實現類似C# 1.0.* 的自動構建號 在C#中&#xff0c;1.0.*版本號格式會在編譯時自動生成構建號和修訂號。本文將介紹如何在C項目中實現類似功能&#xff0c;通過MSBuild自動化生成基于編譯時間的版本號。 實現原理 版本號構成&#xff1a;主版本…