本文作者為360奇舞團前端工程師
前言
最近在做一個
AIGC
項目,后端基于Koa2
實現。其中有一個需求就是調用兄弟業務線服務端AIGC
能力生成圖片。但由于目前兄弟業務線的AIGC
項目也是處于測試階段,能夠提供的服務器資源有限,當并發請求資源無法滿足時,會響應【服務器繁忙】,這樣對于C
端展示的我們是非常不友好的。基于當前的困境,第一想到的解決方案就是Kafka
或RabbitMQ
,但實際上對于我們目前的用戶體量來說,簡直就是大材小用。于是轉換思路,是不是可以利用js模擬隊列的方式解決問題呢,答案是:可以,Promise
的Resolve
隊列!
分析
Resolve
的理解
Promise
的核心用法就是利用 Resolve
函數做鏈式傳遞。例如:
new?Promise(resolve?=>?{resolve('ok')
}).then(res?=>?{console.log(res)
})
//?輸出結果:ok
通過上邊的例子我們可以理解,Resolve
將 Promise
對象的狀態從 pending
變為 fullfilled
,在異步操作成功時調用,并將異步操作的結果,作為參數傳遞出去。
核心點:異步
此時拋出一個問題:假如我把 resolve
回調函數都放入一個隊列里,Promise
是不是一直處于pending
狀態?pending
狀態就意味著then函數一直處于 waitting
狀態,直到隊列中的 resolve
函數執行后,then
函數才能被執行?
制造阻塞的 Promise
函數
const?queue?=?[]
new?Promise(resolve?=>?{queue.push(resolve)
}).then(res?=>?{console.log(res)
})
//?輸出結果:Promise {<pending>}queue[0]('ok')
//?輸出結果:ok
為了佐證,直接貼圖:

異步轉同步
Koa2
屬于洋蔥模型,當請求過來以后需要調用 next
函數繼續穿透,而我們的需求是限流,這意味著我們要阻塞請求,此時此刻,await
舉起了雙手,阻塞這種不要臉的事我在行呀!
const?queue?=?[]
const?fn?=?async?=?()?=>?{await?new?Promise(resolve?=>?{queue.push(resolve)})//?...一大波操作
}
//?queue[0]()
如果 queue[0]
不執行,代碼就會一直處于阻塞狀態。那我們就可以利用await寫一個中間件實現阻塞某些 api
的需求了。
//?阻塞所有請求,知道queue中的resolve函數被執行才會執行next
const?queue?=?[]
module.exports?=?function?()?{return?async?function?(ctx,?next)?{await?new?Promise(resolve?=>?{queue.push(resolve)})await?next();};
};
實現中間件
原理和思路都捋直了,那就開搞吧。話不多說,貼代碼:
const?resolveMap?=?{};/***?請求隊列*?@param?{*}?ctx*?@param?{*}?ifer?是否是圖生圖*?@param?{*}?maxReqNumber?最大請求數量*?@returns*?@description*?使用promise解決請求隊列問題*?1.?用于限制aicg的并發請求*?2.?當文生圖是,根據風格分類存儲resolve,當前請求響應完成時,觸發消費隊列中下一個請求*?3.?當圖生圖是,直接存儲resolve到image風格,當前請求響應完成時,觸發消費隊列中下一個請求* 4. 同時處理的請求數量不超過maxReqNumber個,否則加入隊列等待。*/
function?requestQueue(ctx,?maxReqNumber)?{const?params?=?ctx.request.body????ctx.request.query????ctx.request.params????{};const?style?=?params.style????'pruned_cgfull';resolveMap[style]?=?resolveMap[style]?||?{?list:?[],?processNumber:?0?};const?currentResolve?=?resolveMap[style];((currentResolve)?=>?{ctx.res.on('close',?()?=>?{saveNumberMinus(currentResolve);//?當前請求響應完成時,觸發消費隊列中下一個請求if?(currentResolve.list.length?!==?0)?{const?node?=?currentResolve.list.shift();node.resolve();currentResolve.processNumber++;}currentResolve?=?null;});})(currentResolve);//?當前請求正在處理中,將resolve存儲到隊列中if?(currentResolve.processNumber?+?1?>?maxReqNumber)?{//?利用promise阻塞請求return?new?Promise((resolve,?reject)?=>?{//?當前請求正在處理中,將resolve存儲到隊列中currentResolve.list.push({?resolve,?reject,?timeStamp:?Date.now(),?params?});});}?else?{currentResolve.processNumber++;return?Promise.resolve();}
}module.exports?=?function?(options?=?{})?{const?{?maxReqNumber?=?2,?apis?=?[]?}?=?options;return?async?function?(ctx,?next)?{const?url?=?ctx.url;if?(apis.includes(url))?{try?{await?requestQueue(ctx,?maxReqNumber);}?catch?(error)?{console.log(error);ctx.body?=?{code:?0,msg:?error,};return;}}await?next();};
};const?fiveMinutes?=?5?*?60?*?1000;
setInterval(()?=>?{Object.values(resolveMap).forEach((item)?=>?{const?{?timeStamp,?resolve?}?=?item;if?(Date.now()?-?timeStamp?>?fiveMinutes)?{resolve();?//?執行并釋放請求,防止用戶請求因異常積壓導致一直掛起saveNumberMinus(item);}});
},?5?*?60?*?1000);
這里要著重提示一點,閉包的使用。之所以使用閉包是為了保證當前請求的
close
事件觸發時能夠使用currentResolve
對象。因為當前請求是放在自身對應風格的數組中,close
時要消費下一個等待的請求,同時也不要忘了手動釋放資源。
app.js
邏輯部分
const?requsetQueue?=?require('./app/middleware/request-queue');
const?app?=?new?Koa();
app.use(requsetQueue({maxReqNumber:?1,apis:?['/api/aigc/image',?'/api/aigc/textToImage',?'/api/aigc/img2img'],})
);
app.listen(process.env.NODE_ENV?===?'development'???'9527'?:?'3000');
總結
其實基于 Promise
的 Resolve
隊列,我們還可以實現一些其他的功能,比如:前端代碼中未登錄狀態下收集某些請求,等到登錄成功后發送請求。也希望大家一起探索和討論Promise的其他解決能力的實現方案。
-?END?-
關于奇舞團
奇舞團是 360 集團最大的大前端團隊,代表集團參與 W3C 和 ECMA 會員(TC39)工作。奇舞團非常重視人才培養,有工程師、講師、翻譯官、業務接口人、團隊 Leader 等多種發展方向供員工選擇,并輔以提供相應的技術力、專業力、通用力、領導力等培訓課程。奇舞團以開放和求賢的心態歡迎各種優秀人才關注和加入奇舞團。