在Node.js 中基于請求 ID 實現簡單隊列
下面示例演示兩種策略,以同一個請求 ID 為單位:
-
即時阻止策略:如果已有相同 ID 的請求在處理,直接報錯并返回。
-
排隊等待策略:后續相同 ID 的請求不報錯,而是掛起,直到首個請求完成后一起接收相同的處理結果。
Powered by Moshow@https://zhengkai.blog.csdn.net/
一、核心思路
-
用一個
Map
存儲「正在處理」的條目,key 為請求 ID,value 包含:-
當前正在執行的 Promise
-
一個用于掛起后續請求的隊列數組
-
-
新請求到來時,檢查 Map:
-
如果不存在對應條目:將當前請求注冊為「首個處理」,并執行異步業務邏輯
-
如果已存在對應條目:
-
即時阻止策略:直接拋錯
-
排隊等待策略:返回一個掛起的 Promise,入隊等待
-
-
-
首個請求完成(resolve 或 reject)后:
-
刪除 Map 中的條目
-
將結果(成功或失敗)廣播給隊列中的所有掛起請求
-
二、代碼示例
隊列管理類
// queue-manager.js
class QueueManager {constructor() {this.map = new Map(); // key: requestId, value: { promise, queue }}/*** 注冊請求* @param {string} id 請求 ID* @param {() => Promise<any>} handler 首次請求的異步邏輯* @param {Object} options* @param {boolean} options.rejectOnDuplicate 是否即時拒絕重復請求* @returns {Promise<any>}* @author Moshow@https://zhengkai.blog.csdn.net/*/enqueue(id, handler, options = { rejectOnDuplicate: true }) {// 未在處理隊列中,直接執行 handlerif (!this.map.has(id)) {const queue = [];const entry = { queue };this.map.set(id, entry);// 執行核心業務邏輯entry.promise = handler().then(result => {// 廣播成功結果給所有掛起請求queue.forEach(({ resolve }) => resolve(result));return result;}).catch(err => {// 廣播錯誤給所有掛起請求queue.forEach(({ reject }) => reject(err));throw err;}).finally(() => {// 清理this.map.delete(id);});return entry.promise;}// 已有正在處理的相同 IDconst entry = this.map.get(id);// ****** 即時阻止策略:直接拋錯if (options.rejectOnDuplicate) {return Promise.reject(new Error(`Request ${id} 正在處理中,請稍后再試`));}// ****** 排隊等待策略:返回掛起的 Promise,結果由首個請求結束時統一廣播return new Promise((resolve, reject) => {entry.queue.push({ resolve, reject });});}
}module.exports = new QueueManager();
排隊等待策略:返回一個掛起的 Promise,入隊等待
即時阻止策略:直接拋錯
// app.js
const express = require('express');
const queueManager = require('./queue-manager');
const app = express();app.get('/process', async (req, res) => {const requestId = req.query.id;if (!requestId) {return res.status(400).send('缺少 id');}try {// Powered by Moshow@https://zhengkai.blog.csdn.net/// strategy 1: 即時拒絕 rejectOnDuplicate: true// const result = await queueManager.enqueue(requestId, () => doWork(requestId), { rejectOnDuplicate: true });// strategy 2: 排隊等待并共享結果 rejectOnDuplicate: falseconst result = await queueManager.enqueue(requestId, () => doWork(requestId), { rejectOnDuplicate: false });res.json({ status: 'ok', data: result });} catch (err) {res.status(429).json({ status: 'error', message: err.message });}
});async function doWork(id) {// 模擬耗時操作await new Promise(r => setTimeout(r, 2000));return { id, timestamp: Date.now() };
}app.listen(3000, () => console.log('Server started on port 3000'));
三、使用說明與擴展
-
即時阻止
-
優點:后續請求瞬間反饋,不占用額外內存。
-
缺點:用戶需自行決定重試時機,可能客戶端不停打重試包。
-
-
排隊等待并共享結果
-
優點:后續請求無需重試,直接拿到同樣的處理結果。
-
缺點:高并發時可能積壓大量掛起請求,需監控內存和隊列長度。(或者配置超過30分鐘則清空隊列+終端業務重新來,根據業務調整)
-
-
異步錯誤處理
-
在實際業務中,記得對
handler
內部的異常或超時做額外保護,避免永久掛起。
-
-
更細粒度的隊列
-
如果想要限制同時處理的總量(不只是同 ID),可在
QueueManager
里再加一個全局計數器或池子。
-
-
分布式場景
-
若服務水平擴展到多臺機器,需要引入分布式鎖(如 Redis 的 Redlock)或共享隊列(如 Kafka、RabbitMQ)來保證同一 ID 只被一臺機器處理。
-
這樣我們就能在 Node.js/Express 中靈活地基于請求 ID 實現:要么即時拒絕重復請求,要么統一排隊等待并共用第一個請求的處理結果。根據業務場景,選擇最合適的策略即可。