
Biju Kunjummen

RabbitMQ retries using Spring Integration

I recently read about an approach to retrying with RabbitMQ (see the reference at the end of this post) and wanted to try a similar approach with
Spring Integration, which provides an awesome set of integration abstractions.

TL;DR: the problem being solved is to retry a message (in case of failures in processing) a few times, with a large delay between retries (say 10 minutes or more). The approach makes use of the RabbitMQ support for
Dead Letter Exchanges and looks something like this:

[Figure: diagram of the retry flow (image not available)]

The gist of the flow is :

1. A Work dispatcher creates “Work Unit”(s) and sends them to a RabbitMQ queue via an exchange.

2. The Work queue is configured with a
Dead Letter exchange. If the message processing fails for any reason, the “Work Unit” ends up in the Work Unit Dead Letter queue.

3. The Work Unit Dead Letter queue is in turn configured with the Work Unit exchange as its Dead Letter exchange, thereby creating a cycle. Further, the expiration of messages in the dead letter queue is set to, say, 10 mins, so that once a message expires it is routed back to the Work Unit queue.

4. To break the cycle, the processing code has to stop processing a message once a certain retry count threshold is exceeded.
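
To make the cycle in the four steps above concrete, here is a minimal in-memory sketch of it in plain Java. It does not talk to RabbitMQ at all: the two deques stand in for the Work Unit queue and its dead letter queue, the TTL wait is skipped, and the names `RetryCycleSketch`, `Envelope` and `run` are made up for this illustration only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

/** In-memory model of the work queue / dead letter queue cycle (no broker involved). */
final class RetryCycleSketch {

    /** A payload plus the number of times it has been dead-lettered so far. */
    static final class Envelope {
        final String payload;
        final int deathCount;

        Envelope(String payload, int deathCount) {
            this.payload = payload;
            this.deathCount = deathCount;
        }
    }

    /**
     * Runs the cycle until every message is either processed or discarded.
     * The handler returns true on success; maxRetries is the threshold from step 4.
     */
    static List<String> run(List<String> payloads, Predicate<String> handler, int maxRetries) {
        Deque<Envelope> workQueue = new ArrayDeque<>();
        Deque<Envelope> deadLetterQueue = new ArrayDeque<>();
        List<String> log = new ArrayList<>();

        // Step 1: the dispatcher publishes the work units.
        for (String payload : payloads) {
            workQueue.add(new Envelope(payload, 0));
        }

        while (!workQueue.isEmpty()) {
            Envelope current = workQueue.poll();
            if (current.deathCount > maxRetries) {
                // Step 4: break the cycle once the threshold is exceeded.
                log.add("discarded " + current.payload + " after " + maxRetries + " retries");
            } else if (handler.test(current.payload)) {
                log.add("processed " + current.payload + " on attempt " + (current.deathCount + 1));
            } else {
                // Step 2: a failed work unit goes to the dead letter queue.
                deadLetterQueue.add(new Envelope(current.payload, current.deathCount + 1));
            }
            // Step 3: "expired" messages flow back to the work queue (the delay is not modelled).
            if (workQueue.isEmpty()) {
                while (!deadLetterQueue.isEmpty()) {
                    workQueue.add(deadLetterQueue.poll());
                }
            }
        }
        return log;
    }
}
```

With a handler that always fails and `maxRetries` set to 3, the handler is invoked four times (the first delivery plus three retries) before the message is discarded.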

Implementation using Spring Integration

I have covered a straight happy path flow using Spring Integration and RabbitMQ
before; here I will mostly be building on top of that code.

A good part of the set-up is configuring the appropriate dead letter exchanges and queues, which looks like this when expressed using Spring’s Java Configuration:

@Configuration
public class RabbitConfig {

    @Autowired
    private ConnectionFactory rabbitConnectionFactory;

    @Bean
    Exchange worksExchange() {
        return ExchangeBuilder.topicExchange("work.exchange")
                .durable()
                .build();
    }


    @Bean
    public Queue worksQueue() {
        return QueueBuilder.durable("work.queue")
                .withArgument("x-dead-letter-exchange", worksDlExchange().getName())
                .build();
    }

    @Bean
    Binding worksBinding() {
        return BindingBuilder
                .bind(worksQueue())
                .to(worksExchange()).with("#").noargs();
    }
    
    // Dead letter exchange that receives rejected work units
    @Bean
    Exchange worksDlExchange() {
        return ExchangeBuilder
                .topicExchange("work.exchange.dl")
                .durable()
                .build();
    }

    // Queue that holds dead-lettered messages from worksQueue until their TTL expires
    @Bean
    public Queue worksDLQueue() {
        return QueueBuilder
                .durable("works.queue.dl")
                .withArgument("x-message-ttl", 20000)
                .withArgument("x-dead-letter-exchange", worksExchange().getName())
                .build();
    }

    @Bean
    Binding worksDlBinding() {
        return BindingBuilder
                .bind(worksDLQueue())
                .to(worksDlExchange()).with("#")
                .noargs();
    }
    ...
}
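
The `x-message-ttl` argument in the configuration above is given in milliseconds. For the longer delays this approach is aimed at (10 minutes or more), it may be easier to read if the value is derived from a `java.time.Duration`. The helper below is only a sketch; `DeadLetterQueueArguments` and `forRetryDelay` are hypothetical names that are not part of the sample project.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

/** Builds the arguments for a "parking" dead letter queue from a readable delay. */
final class DeadLetterQueueArguments {

    private DeadLetterQueueArguments() {
    }

    /**
     * @param delay          how long a failed message waits before it is retried
     * @param targetExchange exchange that expired messages are routed back to
     */
    static Map<String, Object> forRetryDelay(Duration delay, String targetExchange) {
        Map<String, Object> arguments = new LinkedHashMap<>();
        // RabbitMQ expects the TTL in milliseconds.
        arguments.put("x-message-ttl", delay.toMillis());
        arguments.put("x-dead-letter-exchange", targetExchange);
        return arguments;
    }
}
```

A map like this could presumably be handed to the queue builder through its `withArguments` method in place of the individual `withArgument` calls, for example with `Duration.ofMinutes(10)` and `"work.exchange"`.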

Note that in the configuration above the TTL of the Dead Letter queue is set to 20 seconds, which means that a failed message will be back in the processing queue after 20 seconds. Once this set-up is in place and the appropriate structures are created in RabbitMQ, the consuming part of the code looks like this, expressed using the
Spring Integration Java DSL:

@Configuration
public class WorkInbound {

    @Autowired
    private RabbitConfig rabbitConfig;

    @Bean
    public IntegrationFlow inboundFlow() {
        return IntegrationFlows.from(
                Amqp.inboundAdapter(rabbitConfig.workListenerContainer()))
                .transform(Transformers.fromJson(WorkUnit.class))
                .log()
                .filter("(headers['x-death'] != null) ? headers['x-death'][0].count <= 3: true", f -> f.discardChannel("nullChannel"))
                .handle("workHandler", "process")
                .get();
    }

}

Most of the retry logic is handled by the RabbitMQ infrastructure; the only change here is to break the cycle by explicitly discarding the message once the retry limit is exceeded. This break is expressed as the filter above, which looks at the “x-death” header that RabbitMQ adds to a message when it is sent to a Dead Letter exchange. The expression lets a message through as long as the count in the first “x-death” entry is at most 3, so a message is discarded once it has been dead-lettered more than 3 times. The filter is admittedly a little ugly; it can likely be expressed a little better in Java code.
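
As a rough idea of what the Java version of that filter might look like, here is a sketch of the same check as a plain static method. It assumes the header layout that the SpEL expression relies on: “x-death” is a list whose first entry is a map with a numeric `count`. The class and method names (`XDeathFilter`, `withinRetryLimit`) are invented for this example and do not come from the sample project.

```java
import java.util.List;
import java.util.Map;

/** Java counterpart of the SpEL filter expression, written against a plain header map. */
final class XDeathFilter {

    private XDeathFilter() {
    }

    /**
     * Returns true when the message should still be processed, that is when it has
     * never been dead-lettered or the count of its first x-death entry is at most maxDeaths.
     */
    static boolean withinRetryLimit(Map<String, ?> headers, long maxDeaths) {
        Object xDeath = headers.get("x-death");
        if (!(xDeath instanceof List) || ((List<?>) xDeath).isEmpty()) {
            // No x-death header: this is the first delivery.
            return true;
        }
        Object latest = ((List<?>) xDeath).get(0);
        if (!(latest instanceof Map)) {
            // Unexpected shape: this sketch chooses to let the message through.
            return true;
        }
        Object count = ((Map<?, ?>) latest).get("count");
        if (!(count instanceof Number)) {
            return true;
        }
        return ((Number) count).longValue() <= maxDeaths;
    }
}
```

In the flow, a method like this could presumably replace the expression by using the lambda-based `filter` variant of the DSL and passing it the message headers, keeping the same `discardChannel("nullChannel")` configuration. I have not verified that wiring against the sample, so treat it as an assumption.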

One more thing to note is that the retry logic could have been expressed in-process using Spring Integration. However, I wanted to investigate a flow where the delay between retries can be high (say 15 to 20 mins). Delays of that length do not work well in-process, and an in-process retry is also not cluster safe, since I want any instance of the application to be able to handle the retry of a message.

If you want to explore further, do try the sample in
my GitHub repo: https://github.com/bijukunjummen/si-dsl-rabbit-sample

Reference:

Retry With RabbitMQ: http://dev.venntro.com/2014/07/back-off-and-retry-with-rabbitmq

Reference: RabbitMQ retries using Spring Integration from our JCG partner Biju Kunjummen at the all and sundry blog.