集成RabbitMQ
本節我們介紹在 express.js 中集成 rabbitmq.
RabbitMQ 是一個消息隊列中間件,常用于請求削峰,事務的隊列處理,事件訂閱機制的實現等。
準備工作
- 創建一個 express.js 項目(本文基于evp-express-cli)
- 在開發環境下安裝rabbitmq
- 安裝amqplib.js:
npm i amqplib
創建代理
正常的項目都是分層的,為了避免循環依賴,本文采用代理類構造單例的方式來創建ampqlib連接。
redisProxy.js:
在構造器內創建redis連接,并監聽個別事件,最后把連接賦給client成員變量。再定義一個靜態的獲取實例方法,調用時實例若為空,就構建實例:
const amqplib = require('amqplib');
const logger = require('./logger');class RabbitmqProxy {/**@type {RabbitmqProxy}*/_instance = null;/**@type {amqplib.Connection}*/conn;/**@type {amqplib.Channel}*/channel;static async instance() {if (!this._instance) {let ins = new RabbitmqProxy();const conn = await amqplib.connect({username: `guest`,password: `guest`,hostname: `127.0.0.1`,port: `5672`,});logger.info("Connected to RabbitMQ!");ins.conn = conn;const channel = await ins.conn.createChannel();//確認隊列channel.assertQueue("hellos");//訂閱隊列channel.consume("hellos", async (message) => {console.log("hello, two!");channel.ack(message); //報告處理完畢});ins.channel = channel;this._instance = ins;}return this._instance;}
}
amqplib創建rabbitmq連接是異步的,所以獲取實例的靜態方法也是異步的,如果你想轉為同步函數,只能通過進程阻塞的方式實現。上面給我們的rabbitmq客戶端訂閱了一個hellos隊列。
然后把rabbitmq導出來:
async function init() {return RedisProxy.instance();
}module.exports = {init,instance: RedisProxy.instance(),
};
然后在任意其它地方調用 await rabbitmqProxy.instance 即可獲取單例,在從單例中獲取conn和channel即可操作rabbitmq.
const rabbitmqProxy = require('../utils/rabbitmqProxy');app.post('/', async(req,res,next)=>{const rbmqproxy = await rabbitmqProxy.instance;const channel = rbmqproxy.channel;//發送消息到"hellos"隊列channel.sendToQueue("hellos", "hello!");res.send();
})
你可以自己手動配置一遍,也可以使用evp-express-cli作為手腳架創建項目并選擇rabbitmq模板。
關于amqplib.js的詳細用法請見官方文檔: http://npmjs.com/package/amqplib