About Piotr Nowicki

Piotr has been a Java enthusiast since his computer science studies (2007). He currently works as a senior Java EE developer in the utilities and industry sector. He is mostly interested in designing and developing web applications using the Java EE technology stack.

Asynchronous CDI Events

A few days ago, during our regular code review, one of my colleagues asked what would happen (and whether it is even possible) if a CDI observer (a method with a parameter annotated @Observes) were invoked multiple times at the same time for different event instances. In other words, after producing a few events, is it possible that the following method will be executed by more than one thread at the same time:

public void observe(@Observes MyEvent myEvent) { ... }

After thinking about it I decided to run a few tests and describe the results in this post.

First result: it turned out that CDI events are fired synchronously, which was a bit of a surprise for me. Why?

Up to this time, I had seen it this way: CDI observers allow me to cleanly separate the event producer from the event consumer, so I don’t have to hard-code listener registration, maintain a list of listeners or notify them manually. The CDI container does everything for me.

Therefore, with producers cleanly separated from consumers, I assumed there was some kind of event bus running on a dedicated executor thread pool that mediates between fired events and the observer methods to invoke. I guess I based this assumption on other event/listener solutions such as Google Guava EventBus, which lets you choose between a synchronous dispatcher (the default, EventBus) and an asynchronous one (AsyncEventBus).

Moreover, if both the producer and the consumer are EJBs, I assumed the observer would have the same features as an asynchronous EJB call when it comes to transactions: the only possible JTA transaction attributes for an asynchronous event observer would be REQUIRED, REQUIRES_NEW or NOT_SUPPORTED.

That is how I expected it to work, which turns out to be quite different from reality: CDI events are synchronous.

There is an issue for making asynchronous events available in CDI 1.1, but I’m not sure what the current status of this feature is, and I didn’t find a word about it in CDI 1.1 (part of Java EE 7).

Let’s see how we can deal with it on our own.

Table of contents

  1. Default Synchronous Events
  2. Solution 1 – CDI Producer and Singleton EJB as Receiver
  3. Solution 2 – Use Singleton EJB as Receiver With Read Lock
  4. Solution 3 – EJB Producer and CDI Consumer
  5. Solution 4 – EJB Producer and EJB Consumer
  6. Solution 4 vs Solution 2
  7. Solution 5 – EJB Producer and CDI Consumer II
  8. Solution 6 – CDI With JMS
  9. Conclusion

Default Synchronous Events

Let’s start with a basic example showing the problem. Take a look at the code. First, the CDI bean producer:

@Path("/produce")
public class EventGenerator {

    @Inject
    private Logger logger;

    @Inject
    private Event<MyEvent> events;

    @Path("/cdiBean/{eventsNum}")
    @GET
    public String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) {
        for (int i = 0; i < numberOfEventsToGenerate; i++) {
            MyEvent event = new MyEvent(i);

            logger.info("Generating Event: " + event);
            events.fire(event);
        }
        return "Finished. Generated " + numberOfEventsToGenerate + " events.";
    }
}

MyEvent is just an event object whose details are not really important here. It stores the event sequence number that we pass at instantiation.
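
The post does not show MyEvent itself, so the following is only a minimal sketch of what such a class might look like. The field name seqNo and the toString() format are assumptions, chosen to match the log output shown later (MyEvent[seqNo=0]).

```java
/** Hypothetical sketch of the event payload; the original class is not shown in this post. */
final class MyEvent {

    // Sequence number passed in by the producer at instantiation.
    private final int seqNo;

    MyEvent(int seqNo) {
        this.seqNo = seqNo;
    }

    int getSeqNo() {
        return seqNo;
    }

    @Override
    public String toString() {
        // Assumed format, picked to match the log lines in this post.
        return "MyEvent[seqNo=" + seqNo + "]";
    }
}
```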

The consumer is a pretty simple CDI bean:

public class EventConsumer {

    @Inject
    private Logger logger;

    public void consumeEvent(@Observes MyEvent myEvent) throws InterruptedException {
        logger.info("Receiving event: " + myEvent);

        TimeUnit.MILLISECONDS.sleep(500);
    }
}

Note that I’ve inserted a thread sleep to simulate a long-running event receiver.

Now, let’s run this example by invoking the REST resource that EventGenerator exposes. The result (running on JBoss EAP 6.1 Alpha) will be similar to this:

14:15:59,196 [com.piotrnowicki.EventGenerator] (http-/127.0.0.1:8080-1) Generating Event: MyEvent[seqNo=0]
14:15:59,197 [com.piotrnowicki.EventConsumer] (http-/127.0.0.1:8080-1) Receiving event: MyEvent[seqNo=0]
14:15:59,697 [com.piotrnowicki.EventGenerator] (http-/127.0.0.1:8080-1) Generating Event: MyEvent[seqNo=1]
14:15:59,698 [com.piotrnowicki.EventConsumer] (http-/127.0.0.1:8080-1) Receiving event: MyEvent[seqNo=1]
14:16:00,199 [com.piotrnowicki.EventGenerator] (http-/127.0.0.1:8080-1) Generating Event: MyEvent[seqNo=2]
14:16:00,200 [com.piotrnowicki.EventConsumer] (http-/127.0.0.1:8080-1) Receiving event: MyEvent[seqNo=2]

It shows the synchronous nature of CDI events: event producing and consuming take place in the same thread, one after another.
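
To make the behaviour concrete outside of a container, here is a small plain-Java analogy. It is not the CDI API: SyncDispatchSketch and its run method are made-up names, and the "observer" is just a Consumer called directly, which is roughly what a synchronous events.fire(event) amounts to from the caller’s point of view.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java stand-in for synchronous event delivery; this is not the CDI API. */
final class SyncDispatchSketch {

    /** Fires each event and records which thread fired it and which thread observed it. */
    static List<String> run(int numberOfEvents) {
        List<String> log = new ArrayList<>();

        // The "observer": it runs on whatever thread invokes it.
        Consumer<Integer> observer = seqNo ->
                log.add("receive " + seqNo + " on " + Thread.currentThread().getName());

        for (int i = 0; i < numberOfEvents; i++) {
            log.add("fire " + i + " on " + Thread.currentThread().getName());
            // The call blocks until the observer returns, so the next event
            // cannot be fired before this one has been fully processed.
            observer.accept(i);
        }
        return log;
    }
}
```

Every "fire" entry is immediately followed by its "receive" entry, and both carry the same thread name, which mirrors the pattern in the log above.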

So, how to achieve asynchronous events with CDI?

Solution 1 – CDI Producer and Singleton EJB as Receiver

The producer stays as it was, a pure CDI bean:

@Path("/produce")
public class EventGenerator {

    @Path("/cdiBean/{eventsNum}")
    @GET
    public String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) { ... }
}

Now, if you turn your receiver into a @Singleton EJB and mark the observer method as @Asynchronous like this:

@Singleton
public class EventConsumer {

    @Asynchronous
    public void consumeEvent(@Observes MyEvent myEvent) throws InterruptedException { ...  }
}

You’ll get the following results:

14:21:19,341 [com.piotrnowicki.EventGenerator] (http-/127.0.0.1:8080-1) Generating Event: MyEvent[seqNo=0]
14:21:19,343 [com.piotrnowicki.EventGenerator] (http-/127.0.0.1:8080-1) Generating Event: MyEvent[seqNo=1]
14:21:19,343 [com.piotrnowicki.EventGenerator] (http-/127.0.0.1:8080-1) Generating Event: MyEvent[seqNo=2]
14:21:19,347 [com.piotrnowicki.EventConsumer] (EJB default – 2) Receiving event: MyEvent[seqNo=1]
14:21:19,848 [com.piotrnowicki.EventConsumer] (EJB default – 1) Receiving event: MyEvent[seqNo=0]
14:21:20,350 [com.piotrnowicki.EventConsumer] (EJB default – 3) Receiving event: MyEvent[seqNo=2]

Events are produced one after another and handed to separate threads, but the Singleton EJB serves them one at a time (take a look at the event processing timestamps). That’s because of the implicit write lock on every business method of a Singleton EJB. So this is:

Asynchronous: yes
Thread-safe observer method: yes
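
The effect of the write lock can be imitated in plain Java. The sketch below is only an analogy under stated assumptions: it uses a ReentrantReadWriteLock and a thread pool instead of the EJB container, and WriteLockSketch and maxConcurrentObservers are hypothetical names. It measures how many "observer" calls are ever in progress at the same moment.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Plain-Java analogy for a Singleton observer guarded by a write lock. */
final class WriteLockSketch {

    /** Returns the highest number of observer calls that were in progress at once. */
    static int maxConcurrentObservers(int numberOfEvents) {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        AtomicInteger active = new AtomicInteger();
        AtomicInteger maxActive = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(numberOfEvents);

        for (int i = 0; i < numberOfEvents; i++) {
            // submit() returns immediately, like a call to an asynchronous method.
            pool.submit(() -> {
                lock.writeLock().lock(); // exclusive: one holder at a time
                try {
                    int now = active.incrementAndGet();
                    maxActive.accumulateAndGet(now, Math::max);
                    TimeUnit.MILLISECONDS.sleep(50); // simulated work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    active.decrementAndGet();
                    lock.writeLock().unlock();
                }
            });
        }

        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("observers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return maxActive.get();
    }
}
```

Even with one pool thread per event, the exclusive lock keeps the maximum at one call in progress, which is why the receiving timestamps above are about 500 ms apart.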

Solution 2 – Use Singleton EJB as Receiver With Read Lock

This approach is very similar to Solution 1; however, it gives you much higher throughput because all event processing takes place in parallel.

Our producer stays the same – it’s a CDI bean:

@Path("/produce")
public class EventGenerator {

    @Path("/cdiBean/{eventsNum}")
    @GET
    public String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) { ... }
}

Our consumer has @Lock(LockType.READ) added to its observer method; this is what allows it to serve multiple events at the same time:

@Singleton
public class EventConsumer {

    @Asynchronous
    @Lock(LockType.READ)
    public void consumeEvent(@Observes MyEvent myEvent) throws InterruptedException { ... }
}

This is what you can get as a result:

14:24:44,202 [com.piotrnowicki.EventGenerator] (http-/127.0.0.1:8080-1) Generating Event: MyEvent[seqNo=0]
14:24:44,204 [com.piotrnowicki.EventGenerator] (http-/127.0.0.1:8080-1) Generating Event: MyEvent[seqNo=1]
14:24:44,205 [com.piotrnowicki.EventGenerator] (http-/127.0.0.1:8080-1) Generating Event: MyEvent[seqNo=2]
14:24:44,207 [com.piotrnowicki.EventConsumer] (EJB default – 4) Receiving event: MyEvent[seqNo=0]
14:24:44,207 [com.piotrnowicki.EventConsumer] (EJB default – 6) Receiving event: MyEvent[seqNo=2]
14:24:44,207 [com.piotrnowicki.EventConsumer] (EJB default – 5) Receiving event: MyEvent[seqNo=1]

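Different threads serving events at the same time give you higher throughput. The price is that the container no longer serializes access to the observer method, so any shared state it touches must be protected by your own code. So this is: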

Asynchronous: yes
Thread-safe observer method: no
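
The same plain-Java analogy works for the read lock. Again, this is a sketch under assumptions rather than container code: ReadLockSketch and observersOverlap are made-up names, and a CountDownLatch is used only to prove that all "observers" are inside the locked section at the same time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Plain-Java analogy for a Singleton observer guarded by a read lock. */
final class ReadLockSketch {

    /** Returns true if all observer calls were inside the locked section together. */
    static boolean observersOverlap(int numberOfEvents) {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        CountDownLatch allInside = new CountDownLatch(numberOfEvents);
        AtomicBoolean overlapped = new AtomicBoolean(true);
        ExecutorService pool = Executors.newFixedThreadPool(numberOfEvents);

        for (int i = 0; i < numberOfEvents; i++) {
            pool.submit(() -> {
                lock.readLock().lock(); // shared: many holders at once
                try {
                    allInside.countDown();
                    // This wait can only succeed if every other observer has
                    // also entered the locked section while we still hold it.
                    if (!allInside.await(5, TimeUnit.SECONDS)) {
                        overlapped.set(false);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    overlapped.set(false);
                } finally {
                    lock.readLock().unlock();
                }
            });
        }

        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("observers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return overlapped.get();
    }
}
```

With the write lock from the previous solution the latch would never reach zero while a single holder waits, which is exactly the difference between the two lock types.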

Solution 3 – EJB Producer and CDI Consumer

CDI allows you to observe events during specific phases of a transaction. You specify it using @Observes(during=TransactionPhase...). In our case, we would like CDI to queue all those events and invoke our observer only after the transaction ends. To do so, we only need to add the above attribute to our CDI bean observer:

public class EventConsumer { 
   public void consumeEvent(@Observes(during = TransactionPhase.AFTER_COMPLETION) MyEvent myEvent) { ... } 
}

Now we just need to make sure we have a running transaction in the EventGenerator method. We can do it quickly by turning our CDI bean into a @Stateless EJB and relying on its implicit REQUIRED transaction attribute, like this:

@Stateless
@Path("/produce")
public class EventGenerator {

    @Path("/cdiBean/{eventsNum}")
    @GET
    public String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) { ... }
}

This is the result we might end up with:

14:39:06,776 [com.piotrnowicki.EventGenerator] (http-/127.0.0.1:8080-1) Generating Event: MyEvent[seqNo=0]
14:39:06,776 [com.piotrnowicki.EventGenerator] (http-/127.0.0.1:8080-1) Generating Event: MyEvent[seqNo=1]
14:39:06,776 [com.piotrnowicki.EventGenerator] (http-/127.0.0.1:8080-1) Generating Event: MyEvent[seqNo=2]
14:39:06,778 [com.piotrnowicki.EventConsumer] (http-/127.0.0.1:8080-1) Receiving event: MyEvent[seqNo=2]
14:39:07,279 [com.piotrnowicki.EventConsumer] (http-/127.0.0.1:8080-1) Receiving event: MyEvent[seqNo=1]
14:39:07,780 [com.piotrnowicki.EventConsumer] (http-/127.0.0.1:8080-1) Receiving event: MyEvent[seqNo=0]

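The EventGenerator EJB starts a transaction, and the CDI bean observer is invoked in a serialized way only after the transaction completes. Note that, as the log shows, the observer still runs on the request thread (http-/127.0.0.1:8080-1); delivery is deferred rather than moved to another thread.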

Asynchronous: yes
Thread-safe observer method: yes
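
The deferral can be pictured as a simple queue that is flushed when the transaction ends. The sketch below is a plain-Java illustration only: DeferredDeliverySketch is a hypothetical name and there is no real transaction involved. It delivers in firing order for simplicity; the log above shows a different order, so the order should not be relied upon.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java picture of observers that are notified only after the transaction ends. */
final class DeferredDeliverySketch {

    static List<String> run(int numberOfEvents) {
        List<String> log = new ArrayList<>();
        List<Integer> pending = new ArrayList<>();

        // While the "transaction" is in progress, events are only queued.
        for (int i = 0; i < numberOfEvents; i++) {
            log.add("fire " + i);
            pending.add(i);
        }

        log.add("transaction completed");

        // After completion, the queued events are delivered one at a time
        // on the same thread that fired them.
        for (int seqNo : pending) {
            log.add("receive " + seqNo);
        }
        return log;
    }
}
```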

Solution 4 – EJB Producer and EJB Consumer

This is very similar to Solution 3. Our generator stays the same (a Stateless EJB):

@Stateless
@Path("/produce")
public class EventGenerator {

    @Path("/cdiBean/{eventsNum}")
    @GET
    public String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) { ... }
}

The changes are made to EventConsumer, which now looks like this:

@Singleton
public class EventConsumer {

    @Asynchronous
    @Lock(LockType.READ)
    public void consumeEvent(@Observes(during = TransactionPhase.AFTER_COMPLETION) MyEvent myEvent) throws InterruptedException { ...  }
}

The result might be as follows:

14:44:09,363 [com.piotrnowicki.EventGenerator] (http-/127.0.0.1:8080-1) Generating Event: MyEvent[seqNo=0]
14:44:09,464 [com.piotrnowicki.EventGenerator] (http-/127.0.0.1:8080-1) Generating Event: MyEvent[seqNo=1]
14:44:09,564 [com.piotrnowicki.EventGenerator] (http-/127.0.0.1:8080-1) Generating Event: MyEvent[seqNo=2]
14:44:09,670 [com.piotrnowicki.EventConsumer] (EJB default – 8) Receiving event: MyEvent[seqNo=2]
14:44:09,670 [com.piotrnowicki.EventConsumer] (EJB default – 2) Receiving event: MyEvent[seqNo=1]
14:44:09,670 [com.piotrnowicki.EventConsumer] (EJB default – 1) Receiving event: MyEvent[seqNo=0]

We’ve used two features here: the event consumer method is asynchronous, and the consumer is not notified before the producer’s transaction completes. This gives us:

Asynchronous: yes
Thread-safe observer method: no

Solution 4 vs Solution 2

These two solutions seem to be the same. They differ only in the consumer’s annotation: @Observes vs @Observes(during = TransactionPhase.AFTER_COMPLETION). Moreover, they behave the same for our test case: they are asynchronous and multiple threads can process events at the same time. However, there is one big difference between them.

In our test case we fired events one after another. Imagine that there are some other operations between the event firings. In such a case:

  • Solution 2 (@Observes) will start processing events right after the first one is fired,
  • Solution 4 (@Observes(during = TransactionPhase.AFTER_COMPLETION)) will start processing only after the transaction completes, that is, after all events have been fired.

This shows a possible result of such a situation:

Solution 2 (@Observes)

15:01:34,318 [com.piotrnowicki.EventGenerator] (http-/127.0.0.1:8080-1) Generating Event: MyEvent[seqNo=0]
15:01:34,320 [com.piotrnowicki.EventConsumer] (EJB default – 3) Receiving event: MyEvent[seqNo=0]
15:01:34,419 [com.piotrnowicki.EventGenerator] (http-/127.0.0.1:8080-1) Generating Event: MyEvent[seqNo=1]
15:01:34,420 [com.piotrnowicki.EventConsumer] (EJB default – 6) Receiving event: MyEvent[seqNo=1]
15:01:34,520 [com.piotrnowicki.EventGenerator] (http-/127.0.0.1:8080-1) Generating Event: MyEvent[seqNo=2]
15:01:34,521 [com.piotrnowicki.EventConsumer] (EJB default – 9) Receiving event: MyEvent[seqNo=2]

Solution 4 (@Observes(during = TransactionPhase.AFTER_COMPLETION))

15:00:41,126 [com.piotrnowicki.EventGenerator] (http-/127.0.0.1:8080-1) Generating Event: MyEvent[seqNo=0]
15:00:41,226 [com.piotrnowicki.EventGenerator] (http-/127.0.0.1:8080-1) Generating Event: MyEvent[seqNo=1]
15:00:41,326 [com.piotrnowicki.EventGenerator] (http-/127.0.0.1:8080-1) Generating Event: MyEvent[seqNo=2]
15:00:41,432 [com.piotrnowicki.EventConsumer] (EJB default – 10) Receiving event: MyEvent[seqNo=2]
15:00:41,432 [com.piotrnowicki.EventConsumer] (EJB default – 4) Receiving event: MyEvent[seqNo=1]
15:00:41,432 [com.piotrnowicki.EventConsumer] (EJB default – 5) Receiving event: MyEvent[seqNo=0]

Solution 5 – EJB Producer and CDI Consumer II

Up to this point we’ve tried to make our receiver asynchronous. There is also the opposite way: we can make the event producer asynchronous. We can achieve this by marking our producer as @Stateless and invoking its own asynchronous method that just fires an event:

@Stateless
@Path("/produce")
public class EventGenerator {
    // ...

    @Resource
    private SessionContext sctx;

    @Path("/cdiBean/{eventsNum}")
    @GET
    public String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) {
        for (int i = 0; i < numberOfEventsToGenerate; i++) {
            sctx.getBusinessObject(EventGenerator.class).fireEvent(new MyEvent(i));
        }

        return "Finished. Generated " + numberOfEventsToGenerate + " events.";
    }

    @Asynchronous
    public void fireEvent(final MyEvent event) {
        events.fire(event);
    }
}

Take a closer look at the EJB self-reference obtained through SessionContext. It is required in this case because we want the container to dispatch our method call and make it asynchronous. A plain local call through the implicit this reference would bypass the container, so we don’t use it.

The event consumer, on the other hand, is a plain CDI bean:

public class EventConsumer {
    public void consumeEvent(@Observes MyEvent myEvent) throws InterruptedException { ... }
}

The result might be as follows:

00:40:32,820 [com.piotrnowicki.EventGenerator] (EJB default – 2) Generating Event: MyEvent[seqNo=1]
00:40:32,820 [com.piotrnowicki.EventGenerator] (EJB default – 3) Generating Event: MyEvent[seqNo=2]
00:40:32,820 [com.piotrnowicki.EventGenerator] (EJB default – 1) Generating Event: MyEvent[seqNo=0]
00:40:32,821 [com.piotrnowicki.EventConsumer] (EJB default – 1) Receiving event: MyEvent[seqNo=0]
00:40:32,821 [com.piotrnowicki.EventConsumer] (EJB default – 2) Receiving event: MyEvent[seqNo=1]
00:40:32,821 [com.piotrnowicki.EventConsumer] (EJB default – 3) Receiving event: MyEvent[seqNo=2]

Asynchronous: yes
Thread-safe observer method: no
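
Why the self-reference through SessionContext matters can be shown with a JDK dynamic proxy. This is only an analogy: the Generator interface, the "async-worker" thread name and SelfInvocationSketch are assumptions made for the example, and a real container does considerably more than this proxy. The point is that only calls going through the proxy are handed to another thread.

```java
import java.lang.reflect.Proxy;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Plain-Java analogy: a direct call versus a call through a container-like proxy. */
final class SelfInvocationSketch {

    /** Hypothetical business interface, needed here only because JDK proxies require one. */
    interface Generator {
        void fireEvent(int seqNo);
    }

    /** Returns {thread used by the direct call, thread used by the proxied call}. */
    static String[] run() {
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "async-worker"));
        BlockingQueue<String> seen = new LinkedBlockingQueue<>();

        // The "bean": records the name of the thread that executes fireEvent.
        Generator bean = seqNo -> seen.add(Thread.currentThread().getName());

        // The "container proxy": every call is handed to the pool and returns at once.
        Generator proxy = (Generator) Proxy.newProxyInstance(
                Generator.class.getClassLoader(),
                new Class<?>[] {Generator.class},
                (instance, method, args) -> {
                    Callable<Object> call = () -> method.invoke(bean, args);
                    pool.submit(call);
                    return null; // fire-and-forget, as for a void asynchronous method
                });

        try {
            bean.fireEvent(0);  // like this.fireEvent(...): stays on the caller's thread
            String direct = seen.take();

            proxy.fireEvent(1); // like getBusinessObject(...).fireEvent(...)
            String viaProxy = seen.poll(5, TimeUnit.SECONDS);

            return new String[] {direct, viaProxy};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

The direct call reports the caller’s own thread, while the proxied call reports the worker thread, which corresponds to the "EJB default" threads in the log above.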

Solution 6 – CDI With JMS

This is a solution presented by Juliano Viana on his blog. It uses JMS as the event bus. A CDI event is produced and then picked up by a class responsible for putting it onto a JMS topic or queue. An MDB that receives messages from that topic or queue fires an event that invokes the real receiver. This not only gives you asynchronous delivery of events but also makes it transactional. For example, if the event receiver is not able to process the message, it can roll back the transaction, and the queue will make sure the message is redelivered (perhaps next time your event processor will be able to serve this event).
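
The redelivery idea can be sketched without any JMS API. The code below is an in-memory illustration only, not Juliano Viana’s implementation: RedeliverySketch, deliverAll and the maxAttempts parameter are hypothetical, and a receiver returning false plays the role of a rolled-back transaction. It runs on a single thread, so it shows the redelivery behaviour but not the asynchronous part.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.IntPredicate;

/** In-memory stand-in for a queue with redelivery; no JMS API is used here. */
final class RedeliverySketch {

    /**
     * Delivers every event to the receiver. When the receiver returns false
     * (a "rollback"), the event is put back on the queue until maxAttempts is reached.
     */
    static List<String> deliverAll(List<Integer> events, IntPredicate receiver, int maxAttempts) {
        Deque<int[]> queue = new ArrayDeque<>(); // each entry: {seqNo, attemptsSoFar}
        for (int seqNo : events) {
            queue.add(new int[] {seqNo, 0});
        }

        List<String> log = new ArrayList<>();
        while (!queue.isEmpty()) {
            int[] message = queue.poll();
            int seqNo = message[0];
            int attempt = message[1] + 1;

            if (receiver.test(seqNo)) {
                log.add("processed " + seqNo + " on attempt " + attempt);
            } else if (attempt < maxAttempts) {
                log.add("rolled back " + seqNo + ", redelivering");
                queue.add(new int[] {seqNo, attempt});
            } else {
                log.add("gave up on " + seqNo);
            }
        }
        return log;
    }
}
```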

Conclusion

CDI 1.0 doesn’t support asynchronous event delivery. It also doesn’t seem that CDI 1.1 will have such support.

This, however, doesn’t mean you cannot achieve asynchronous processing. There are existing solutions based either on EJB 3.1 or on existing CDI observer attributes. You should also be able to write a portable CDI extension that adds this functionality to your code.
 

Reference: Asynchronous CDI Events from our JCG partner Piotr Nowicki at the Piotr Nowicki’s Blog blog.

One Response to "Asynchronous CDI Events"

  1. gabber says:

    really interesting article!

Java Code Geeks and all content copyright © 2010-2014, Exelixis Media Ltd | Terms of Use | Privacy Policy | Contact
All trademarks and registered trademarks appearing on Java Code Geeks are the property of their respective owners.
Java is a trademark or registered trademark of Oracle Corporation in the United States and other countries.
Java Code Geeks is not connected to Oracle Corporation and is not sponsored by Oracle Corporation.