About Tomasz Nurkiewicz

Java EE developer, Scala enthusiast. Enjoying data analysis and visualization. Strongly believes in the power of testing and automation.

Synchronizing transactions with asynchronous events in Spring

Today as an example we will take a very simple scenario: placing an order stores it and sends an e-mail about that order:

@Service
class OrderService @Autowired() (orderDao: OrderDao, mailNotifier: OrderMailNotifier) {
 
    @Transactional
    def placeOrder(order: Order) {
        orderDao save order
        mailNotifier sendMail order
    }
}

So far so good, but e-mail functionality has nothing to do with placing an order. It’s just a side effect that distracts from the business logic rather than being part of it. Moreover, sending an e-mail inside the transaction unnecessarily prolongs it and adds latency. So we decided to decouple these two actions by using events. For simplicity I will take advantage of Spring’s built-in custom events, but our discussion is equally relevant for JMS or any other producer-consumer library or queue.

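// ApplicationEvent has no no-arg constructor; here the order itself serves as the event source
case class OrderPlacedEvent(order: Order) extends ApplicationEvent(order)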
 
@Service
class OrderService @Autowired() (orderDao: OrderDao, eventPublisher: ApplicationEventPublisher) {
 
    @Transactional
    def placeOrder(order: Order) {
        orderDao save order
        eventPublisher publishEvent OrderPlacedEvent(order)
    }
 
}

As you can see, instead of accessing the OrderMailNotifier bean directly we publish an OrderPlacedEvent wrapping the newly created order. An ApplicationEventPublisher is needed to send an event. Of course we also have to implement the receiving side:

@Service
class OrderMailNotifier extends ApplicationListener[OrderPlacedEvent] {
 
    def onApplicationEvent(event: OrderPlacedEvent) {
        //sending e-mail...
    }
 
}

ApplicationListener[OrderPlacedEvent] indicates what type of events we are interested in. This works, however by default Spring ApplicationEvents are synchronous, which means publishEvent() blocks until every listener has finished. Knowing Spring, it shouldn’t be hard to switch event broadcasting into asynchronous mode. Indeed there are two ways: one suggested in the JavaDoc and the other I discovered because I failed to read the JavaDoc first… According to the documentation, if you want your events to be delivered asynchronously, you should define a bean named applicationEventMulticaster of type SimpleApplicationEventMulticaster and give it a taskExecutor:

@Bean
def applicationEventMulticaster() = {
    val multicaster = new SimpleApplicationEventMulticaster()
    multicaster.setTaskExecutor(taskExecutor())
    multicaster
}
 
@Bean
def taskExecutor() = {
    val pool = new ThreadPoolTaskExecutor()
    pool.setMaxPoolSize(10)
    pool.setCorePoolSize(10)
    pool.setThreadNamePrefix("Spring-Async-")
    pool
}
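
To make the effect of the executor concrete, here is a minimal sketch in plain Scala. It is a toy model, not Spring's SimpleApplicationEventMulticaster: ToyMulticaster and MulticasterDemo are hypothetical names invented for this illustration. Without an executor the listener runs on the publishing thread (so publishing blocks); with one, the listener is handed off to a pool thread.

```scala
import java.util.concurrent.{ExecutorService, Executors, LinkedBlockingQueue}

// Toy model (NOT Spring's class): with no executor the listener runs on the
// publishing thread; with an executor the call is handed off to the pool.
class ToyMulticaster[E](listener: E => Unit, executor: Option[ExecutorService]) {
  def multicast(event: E): Unit = executor match {
    case Some(pool) => pool.execute(new Runnable { def run(): Unit = listener(event) })
    case None       => listener(event)
  }
}

object MulticasterDemo {
  /** Returns the name of the thread on which the listener was executed. */
  def listenerThread(executor: Option[ExecutorService]): String = {
    val seen = new LinkedBlockingQueue[String]()
    val multicaster =
      new ToyMulticaster[String](_ => seen.put(Thread.currentThread().getName), executor)
    multicaster.multicast("order placed")
    seen.take() // blocks until the listener has run
  }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(2)
    try {
      println("sync listener thread:  " + listenerThread(None))
      println("async listener thread: " + listenerThread(Some(pool)))
    } finally pool.shutdown()
  }
}
```

In the synchronous case the reported thread is the caller's own; in the asynchronous case it is one of the pool's threads.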

So Spring already supports broadcasting events through a custom TaskExecutor. I didn’t know about it, so at first I simply annotated onApplicationEvent() with @Async:

@Async
def onApplicationEvent(event: OrderPlacedEvent) {  //...

No further modifications are needed: once Spring discovers an @Async method, it runs it asynchronously in a different thread. Period. Well, you still have to enable @Async support if you don’t use it already:

@Configuration
@EnableAsync
class ThreadingConfig extends AsyncConfigurer {
    def getAsyncExecutor = taskExecutor()
 
    @Bean
    def taskExecutor() = {
        val pool = new ThreadPoolTaskExecutor()
        pool.setMaxPoolSize(10)
        pool.setCorePoolSize(10)
        pool.setThreadNamePrefix("Spring-Async-")
        pool
    }
 
}

Technically @EnableAsync alone is enough. However, by default Spring uses SimpleAsyncTaskExecutor, which creates a new thread on every @Async invocation. A bit unfortunate default for an enterprise framework, but luckily easy to change. Undoubtedly @Async seems cleaner than defining some magic beans.

All of the above was just a setup to expose the real problem. We now send an asynchronous message that is processed in another thread. Unfortunately we introduced a race condition that manifests itself under heavy load, or maybe only on some particular operating system. Can you spot it? To give you a hint, here is what happens:

  1. Starting transaction
  2. Storing order in database
  3. Sending a message wrapping order
  4. Commit

In the meantime some asynchronous thread picks up the OrderPlacedEvent and starts processing it. The question is, does it happen right after point (3) but before point (4), or maybe after (4)? That makes a big difference! In the former case the transaction has not yet committed, thus the Order is not yet visible in the database to other transactions. On the other hand, lazy loading might work on that object as it’s still bound to a PersistenceContext (in case we are using JPA). However, if the original transaction has already committed, the order will behave quite differently. If you rely on one behaviour or the other, then due to the race condition your event listener might fail spuriously under hard-to-predict circumstances.
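
The two interleavings can be reproduced deterministically with a small plain-Scala sketch. Everything here is an assumption made for illustration: ToyDatabase is a hypothetical stand-in in which rows become visible to other threads only on commit, and RaceDemo forces each ordering instead of leaving it to the scheduler.

```scala
import java.util.concurrent.{Callable, ConcurrentHashMap, Executors, TimeUnit}

// Toy stand-in for a database: rows become visible to other threads only on commit.
class ToyDatabase {
  private val committed = new ConcurrentHashMap[Long, String]()
  def find(id: Long): Option[String] = Option(committed.get(id))
  def commit(pending: Map[Long, String]): Unit =
    pending.foreach { case (id, row) => committed.put(id, row) }
}

object RaceDemo {
  /** Returns what the listener thread found when the event was published
    * before the commit (publishBeforeCommit = true) or after it. */
  def listenerSees(publishBeforeCommit: Boolean): Option[String] = {
    val db = new ToyDatabase
    val pool = Executors.newSingleThreadExecutor()
    try {
      val pending = Map(1L -> "order #1") // steps 1-2: a write that is not yet committed
      // step 3: the "listener" runs on another thread and looks the order up
      def publish() = pool.submit(new Callable[Option[String]] {
        def call(): Option[String] = db.find(1L)
      })
      if (publishBeforeCommit) {
        val seen = publish().get(5, TimeUnit.SECONDS) // listener wins the race
        db.commit(pending)                            // step 4 happens too late for it
        seen
      } else {
        db.commit(pending)                            // step 4 happens first
        publish().get(5, TimeUnit.SECONDS)
      }
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    println("listener before commit: " + listenerSees(publishBeforeCommit = true))
    println("listener after commit:  " + listenerSees(publishBeforeCommit = false))
  }
}
```

In the first ordering the listener finds nothing; in the second it finds the order. In a real application the scheduler picks between the two, which is exactly why the failure appears only sporadically.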

Of course there is a solution (see footnote 1): using the not commonly known TransactionSynchronizationManager. Basically it allows us to register an arbitrary number of TransactionSynchronization listeners. Each such listener will then be notified about various events like transaction commit and rollback. Here is a basic usage:

@Transactional
def placeOrder(order: Order) {
    orderDao save order
    afterCommit {
        eventPublisher publishEvent OrderPlacedEvent(order)
    }
}
 
private def afterCommit[T](fun: => T) {
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter {
        override def afterCommit() {
            fun
        }
    })
}

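afterCommit() takes a function and calls it after the current transaction commits. We use it to hide the complexity of the Spring API. One can safely call registerSynchronization() multiple times: listeners are stored in a Set and are local to the current transaction, disappearing after commit. Note that registerSynchronization() throws IllegalStateException when transaction synchronization is not active, so this helper only works inside a transaction.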

So, the publishEvent() method will be called after the enclosing transaction commits, which makes our code predictable and free of the race condition. However, even with the higher-order function afterCommit() it still feels a bit unwieldy and unnecessarily complex. Moreover, it’s easy to forget to wrap every publishEvent() call, thus maintainability suffers. Can we do better? One solution is to write a custom utility class wrapping publishEvent() or to employ AOP. But there is a much simpler, proven solution that works great with Spring: the Decorator pattern. We shall wrap the original implementation of ApplicationEventPublisher provided by Spring and decorate its publishEvent():

class TransactionAwareApplicationEventPublisher(delegate: ApplicationEventPublisher)
    extends ApplicationEventPublisher {
 
    override def publishEvent(event: ApplicationEvent) {
        if (TransactionSynchronizationManager.isActualTransactionActive) {
            TransactionSynchronizationManager.registerSynchronization(
                new TransactionSynchronizationAdapter {
                    override def afterCommit() {
                        delegate publishEvent event
                    }
                })
        }
        else
            delegate publishEvent event
    }
 
}

As you can see, if a transaction is active we register a commit listener and postpone sending the message until the transaction commits; if the transaction rolls back, afterCommit() is never called and the event is not sent at all. Otherwise we simply forward the event to the original ApplicationEventPublisher, which delivers it immediately. Of course we somehow have to plug this new implementation in instead of the original one. @Primary does the trick:
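
The three paths through the decorator (deferred until commit, dropped on rollback, forwarded immediately without a transaction) can be traced in a plain-Scala sketch. This is a toy model under stated assumptions, not Spring's API: ToyTransaction and ToyTransactionAwarePublisher are hypothetical names, and the current transaction is passed explicitly instead of being looked up from a thread-bound context.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy transaction that remembers after-commit callbacks (NOT Spring's API).
class ToyTransaction {
  private val afterCommitCallbacks = ArrayBuffer.empty[() => Unit]
  def registerAfterCommit(callback: () => Unit): Unit = afterCommitCallbacks += callback
  def commit(): Unit = {
    val callbacks = afterCommitCallbacks.toList
    afterCommitCallbacks.clear()
    callbacks.foreach(cb => cb()) // run only once the "commit" has happened
  }
  def rollback(): Unit = afterCommitCallbacks.clear() // callbacks are simply dropped
}

// Decorator: defers delivery while a transaction is given, forwards immediately otherwise.
class ToyTransactionAwarePublisher(delegate: String => Unit) {
  def publish(event: String, current: Option[ToyTransaction]): Unit = current match {
    case Some(tx) => tx.registerAfterCommit(() => delegate(event))
    case None     => delegate(event)
  }
}

object DecoratorDemo {
  /** Returns the events that actually reached the delegate, in delivery order. */
  def run(): List[String] = {
    val delivered = ArrayBuffer.empty[String]
    val publisher = new ToyTransactionAwarePublisher(event => { delivered += event; () })

    val committedTx = new ToyTransaction
    publisher.publish("order 1 placed", Some(committedTx))
    require(delivered.isEmpty)                 // nothing is delivered before the commit
    committedTx.commit()                       // "order 1 placed" is delivered here

    val rolledBackTx = new ToyTransaction
    publisher.publish("order 2 placed", Some(rolledBackTx))
    rolledBackTx.rollback()                    // "order 2 placed" is never delivered

    publisher.publish("no transaction", None)  // delivered immediately
    delivered.toList
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Only the event from the committed transaction and the one published outside any transaction reach the delegate, which is the behaviour the real decorator relies on.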

@Resource
val applicationContext: ApplicationContext = null
 
@Bean
@Primary
def transactionAwareApplicationEventPublisher() =
    new TransactionAwareApplicationEventPublisher(applicationContext)

Notice that the original implementation of ApplicationEventPublisher is provided by the core ApplicationContext itself. After all these changes our code looks… exactly the same as in the beginning:

@Service
class OrderService @Autowired() (orderDao: OrderDao, eventPublisher: ApplicationEventPublisher) {
 
    @Transactional
    def placeOrder(order: Order) {
        orderDao save order
        eventPublisher publishEvent OrderPlacedEvent(order)
    }
 
}

However, this time the auto-injected eventPublisher is our custom decorator. In the end we managed to fix the race condition without touching the business code. Our solution is safe, predictable and robust. Notice that the exact same approach can be taken for any other queuing technology, including JMS (unless the sending already takes part in the transaction through a more complex transaction manager) or custom queues. We also discovered an interesting low-level API for listening to the transaction lifecycle. Might be useful one day.

1 – One might argue that a much simpler solution would be to call publishEvent() outside of the transaction:

def placeOrder(order: Order) {
    storeOrder(order)
    eventPublisher publishEvent OrderPlacedEvent(order)
}
 
@Transactional
def storeOrder(order: Order) = orderDao save order

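That’s true, but this solution doesn’t “scale” well (what if placeOrder() has to be part of a larger transaction?) and is most likely incorrect due to proxying peculiarities: with Spring’s default proxy-based AOP, placeOrder() calls storeOrder() on this, bypassing the proxy, so the @Transactional annotation on storeOrder() is not applied.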
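
The proxying peculiarity can be modelled with a JDK dynamic proxy in plain Scala. This is only a sketch of the call routing, not of how Spring builds its proxies; OrderOps, PlainOrderService and SelfInvocationDemo are hypothetical names, and "begin tx"/"commit tx" log entries stand in for real transaction handling.

```scala
import java.lang.reflect.{InvocationHandler, Method, Proxy}
import scala.collection.mutable.ArrayBuffer

trait OrderOps {
  def placeOrder(order: String): Unit
  def storeOrder(order: String): Unit
}

class PlainOrderService(log: ArrayBuffer[String]) extends OrderOps {
  def placeOrder(order: String): Unit = {
    storeOrder(order) // a call on `this`: it never passes through the proxy
    log += "publish " + order
  }
  def storeOrder(order: String): Unit = log += "store " + order
}

object SelfInvocationDemo {
  // Wraps `target` so that only storeOrder() calls made THROUGH the proxy are "transactional".
  def transactionalProxy(target: OrderOps, log: ArrayBuffer[String]): OrderOps =
    Proxy.newProxyInstance(
      classOf[OrderOps].getClassLoader,
      Array[Class[_]](classOf[OrderOps]),
      new InvocationHandler {
        def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = {
          val transactional = method.getName == "storeOrder"
          if (transactional) log += "begin tx"
          try method.invoke(target, args: _*)
          finally if (transactional) log += "commit tx"
        }
      }
    ).asInstanceOf[OrderOps]

  /** Log produced when placeOrder() is called through the proxy. */
  def viaPlaceOrder(): List[String] = {
    val log = ArrayBuffer.empty[String]
    transactionalProxy(new PlainOrderService(log), log).placeOrder("order #1")
    log.toList
  }

  /** Log produced when storeOrder() is called through the proxy directly. */
  def viaStoreOrder(): List[String] = {
    val log = ArrayBuffer.empty[String]
    transactionalProxy(new PlainOrderService(log), log).storeOrder("order #1")
    log.toList
  }

  def main(args: Array[String]): Unit = {
    println(viaPlaceOrder())
    println(viaStoreOrder())
  }
}
```

When storeOrder() is reached through placeOrder(), no "begin tx" entry appears in the log, because the inner call goes straight to the target object. Only the direct call through the proxy is wrapped.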

2 Responses to "Synchronizing transactions with asynchronous events in Spring"

  1. gopi says:

    Its good .Thank you for this scenario.Can you please give me an example application of asynchronous execution based on system generated events and could you please provide example for how to destroy spring prototype scoped beans

  2. Xavier Dury says:

    Nice! Once, we had the same problem in an EJB project where some directive was given to an Oracle BPEL Process Manager to post-process some data. The problem was that the directive was sent during a transaction in which the targeted data was persisted. Sometimes, it occurred that BPEL was unable to find that data if it started before the transaction could be committed.

    The three possible solutions we came with were:

    1) registering a javax.transaction.Synchronization on the current transaction to send the directive after commit

    2) send the directive first to an XA JMS queue and let a message-driven bean be in charge of sending that directive to the BPEL instance (the MDB could only consume the message once the transaction was committed)

    3) get rid of BPEL

    Solution #3 was chosen ;-)


Java Code Geeks and all content copyright © 2010-2014, Exelixis Media Ltd | Terms of Use | Privacy Policy | Contact
All trademarks and registered trademarks appearing on Java Code Geeks are the property of their respective owners.
Java is a trademark or registered trademark of Oracle Corporation in the United States and other countries.
Java Code Geeks is not connected to Oracle Corporation and is not sponsored by Oracle Corporation.