The main idea behind this is to use a new feature available in RabbitMQ 2.8.0: dead-letter exchanges. This AMQP extension allows you to specify, on a queue, an exchange to which messages are republished when they either expire or are rejected with requeue set to false.
With this in mind, we can simply create a queue for messages we want delivered later, with x-message-ttl set to the duration we want to wait before delivery. To ensure the message is then transferred to another queue, we set the queue’s x-dead-letter-exchange argument to an exchange we created (in this case I’ll call it immediate) and bind a queue to that exchange (the “right.now.queue”).
In CoffeeScript with node-amqp, it looks like this:
# Declare a uniquely named "delay" queue whose messages are dead-lettered
# to the `immediate` exchange once their TTL runs out.
amqp = require 'amqp'
conn = amqp.createConnection()
# Timestamp suffix gives each run its own queue name.
key = "send.later.#{new Date().getTime()}"
conn.on 'ready', ->
  conn.queue key, {
    arguments: {
      # Exchange that expired messages are republished to.
      "x-dead-letter-exchange": "immediate"
      # Per-queue message TTL in milliseconds (the delivery delay).
      "x-message-ttl": 5000
    }
  }
Next I define the immediate exchange, bind a queue to it and subscribe.
# Declare the `immediate` exchange, bind a durable queue to it and log
# every message (and its headers) that arrives there.
conn.exchange 'immediate'
conn.queue 'right.now.queue', {autoDelete: false, durable: true}, (q) ->
  # Bind and subscribe only once the queue has been declared.
  q.bind('immediate', 'right.now.queue')
  q.subscribe (msg, headers, deliveryInfo) ->
    console.log msg
    console.log headers
Finally, after defining the queue I created earlier, we want to publish a message on it. So, to revisit the earlier queue definition, we add a publish call that publishes directly to the queue (using the default exchange).
# Once connected, declare the delay queue and, from its declaration
# callback, publish a message straight to it via the default exchange.
conn.on 'ready', ->
  conn.queue key, {
    arguments: {
      # Exchange that expired messages are republished to.
      "x-dead-letter-exchange": "immediate"
      # Per-queue message TTL in milliseconds (the delivery delay).
      "x-message-ttl": 5000
    }
  }, ->
    # Routing key equals the queue name, so the message lands in that queue.
    conn.publish key, {v: 1}, {contentType: 'application/json'}
The result of running this is that we’ll see a 5-second wait, and then the message content and headers get dumped to the console. Since the queue is only used temporarily in this scenario, I also set the x-expires attribute of the queue so that it expires a reasonable amount of time after the message expires. This makes sure we don’t wind up with a ton of unused queues just sitting around.
Here’s the result of this exercise in its entirety.
# Scheduled message delivery with RabbitMQ dead-letter exchanges:
# a message is published to a short-lived queue with a TTL; when it expires
# it is dead-lettered to the `immediate` exchange and consumed from
# `right.now.queue`.
amqp = require 'amqp'
# NOTE(review): `events` / `em` are not referenced anywhere in this listing.
events = require 'events'
em = new events.EventEmitter()
conn = amqp.createConnection()
# Timestamp suffix gives each run its own delay-queue name.
key = "send.later.#{new Date().getTime()}"
conn.on 'ready', ->
  conn.queue key, {
    arguments: {
      # Exchange that expired messages are republished to.
      "x-dead-letter-exchange": "immediate"
      # Per-queue message TTL in milliseconds (the delivery delay).
      "x-message-ttl": 5000
      # Let the broker drop the temporary queue shortly after the TTL.
      "x-expires": 6000
    }
  }, ->
    # Routing key equals the queue name, so the message lands in that queue.
    conn.publish key, {v: 1}, {contentType: 'application/json'}

  conn.exchange 'immediate'

  conn.queue 'right.now.queue', {
    autoDelete: false
    durable: true
  }, (q) ->
    # Bind and subscribe only once the queue has been declared.
    q.bind('immediate', 'right.now.queue')
    q.subscribe (msg, headers, deliveryInfo) ->
      console.log msg
      console.log headers
You can get this exercise in full on github.
This is pretty interesting and I plan to experiment further with utilizing this in one of my production node.js applications that use interval based polling to trigger scheduled events.
Reference: Scheduled Message Delivery with RabbitMQ from our JCG partner James Carr at the Rants and Musings of an Agile Developer blog.



