其背后的主要思想是利用2.8.0中可用的新功能, 即死信交換 。 此AMQP擴展允許您在隊列中指定一個交換,當消息過期或被拒絕且requeue設置為false時,應將該消息發布到該隊列上。
考慮到這一點,我們可以為要稍后發送的消息創建一個隊列,將x-message-ttl設置為我們希望在發送之前等待的持續時間。 為了確保將消息傳輸到另一個隊列,我們??只需將x-dead-letter-exchange定義為我們創建的交換(在這種情況下,我將其稱為即時交換),并將隊列綁定到該交換(“ right.now。隊列”)。
在帶有node-amqp的coffeescript中,這看起來像這樣:
amqp = require 'amqp'
conn = amqp.createConnection()key = "send.later.#{new Date().getTime()}"
conn.on 'ready', ->'conn.queue key, {arguments:{"x-dead-letter-exchange":"immediate", "x-message-ttl": 5000}}
接下來,我定義立即交換,將隊列綁定到它并訂閱。
conn.exchange 'immediate'conn.queue 'right.now.queue', {autoDelete: false, durable: true}, (q) ->q.bind('immediate', 'right.now.queue')q.subscribe (msg, headers, deliveryInfo) ->console.log msgconsole.log headers
最后,在定義了我先前創建的隊列之后,我們要在其上發布一條消息。 因此,為了重新訪問之前的隊列定義,我們添加了一個publish調用,以直接發布到隊列(使用默認交換)。
conn.on 'ready', ->conn.queue key, {arguments:{"x-dead-letter-exchange":"immediate", "x-message-ttl": 5000}}, ->conn.publish key, {v:1}, {contentType:'application/json'}
運行此命令的結果是我們將等待5秒鐘,然后將消息內容和標頭轉儲到控制臺。 由于該隊列僅在這種情況下臨時使用,因此我還將隊列的x-expires屬性設置為在消息過期后的合理時間內過期。 這樣可以確保我們不會在周圍坐滿大量未使用的隊列。
這是整個練習的結果。
amqp = require 'amqp'
events = require 'events'
em = new events.EventEmitter()
conn = amqp.createConnection()key = "send.later.#{new Date().getTime()}"
conn.on 'ready', ->conn.queue key, {arguments:{"x-dead-letter-exchange":"immediate", "x-message-ttl": 5000, "x-expires": 6000}}, ->conn.publish key, {v:1}, {contentType:'application/json'}conn.exchange 'immediate'conn.queue 'right.now.queue', {autoDelete: false, durable: true}, (q) ->q.bind('immediate', 'right.now.queue')q.subscribe (msg, headers, deliveryInfo) ->console.log msgconsole.log headers
您可以在github上完整地獲得此練習。
這非常有趣,我計劃在我的生產node.js應用程序之一中利用它進行進一步實驗,該應用程序使用基于間隔的輪詢來觸發計劃的事件。
參考: 敏捷開發人員博客的Rants and Musings中我們的JCG合作伙伴 James Carr 使用RabbitMQ進行的預定消息傳遞 。
翻譯自: https://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html