
Asynchronous communication made by OpenHub framework

We introduced the OpenHub framework in the previous part of this series. This part shows one of the most powerful features of the framework: its asynchronous messaging model.

Asynchronous communication between systems is used when the source system can’t wait for the response of the target system. There are several reasons:

  • the source system must be as responsive as possible and must not be affected by external influences (slow communication, an unstable target system, etc.)
  • processing in the target system takes a long time
  • asynchronous communication has a positive impact on performance and traffic

Asynchronous scenarios

When you decide to communicate asynchronously, you have to think through the possible scenarios:

  • The target system has to confirm that the incoming message was successfully saved and is ready for further processing. Should the source system also be notified about the final result of the asynchronous process?
  • What should happen if asynchronous processing fails? Retry several times if there is a temporary technical error (e.g. communication with another system fails), or stop further processing if there is a business error (e.g. the input data is not valid).
  • Other systems are called during asynchronous processing. What should happen if the call to the first system succeeds but the call to the second system fails? Asynchronous processing has to be idempotent: it must skip the first, successful call and retry only the second one.
  • An asynchronous process can be complex, and then it is a good idea to divide one big (parent) process into smaller (child) processes. When all child processes have finished, the parent process is finished as well.
  • Sometimes you have to guarantee the order of incoming requests (requests need not arrive in the same order as they were sent) and process them in the correct order.
  • Processing is asynchronous, so you need to monitor it or be notified automatically when something unexpected happens, e.g. an asynchronous process fails.
  • Sometimes you need to save data or the current state of the asynchronous process between attempts to finish it successfully, e.g. when the result of the first call to an external system is the input to the second call.

Once you start to think about all these scenarios, you find out that it’s not simple to implement them from scratch. The OpenHub framework has built-in support for processing asynchronous messages. It’s easy to use, yet robust and flexible. It is also configurable, e.g. how many times should a process be retried, and at what intervals?
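To make the retry questions concrete (how many attempts, at what intervals, and which errors are worth retrying), here is a minimal plain-Java sketch. It is an illustration only, not OpenHub's implementation: `RetryPolicy`, `BusinessException` and the parameter names are hypothetical, and OpenHub provides the equivalent behaviour through its own configuration.

```java
import java.util.concurrent.Callable;

/** Hypothetical marker for errors that retrying cannot fix (e.g. invalid input data). */
class BusinessException extends RuntimeException {
    BusinessException(String message) {
        super(message);
    }
}

/** Hypothetical retry policy: a fixed number of attempts with a fixed pause between them. */
class RetryPolicy {
    private final int maxAttempts;
    private final long intervalMillis;

    RetryPolicy(int maxAttempts, long intervalMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.maxAttempts = maxAttempts;
        this.intervalMillis = intervalMillis;
    }

    /** Runs the task until it succeeds, a business error occurs, or attempts run out. */
    <T> T execute(Callable<T> task) throws Exception {
        Exception lastError = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (BusinessException e) {
                throw e; // business error: stop immediately, retrying will not help
            } catch (Exception e) {
                lastError = e; // treated as a temporary technical error
                if (attempt < maxAttempts) {
                    Thread.sleep(intervalMillis); // wait before the next attempt
                }
            }
        }
        throw lastError; // all attempts failed with technical errors
    }
}
```

With `new RetryPolicy(3, 60_000)`, a task that keeps failing with a technical error is attempted three times with a one-minute pause between attempts, while a `BusinessException` ends processing after the first attempt.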

Asynchronous route implementation

A route implementation with the OpenHub framework has two sub-routes:

  • one for processing the incoming message (RouteIn)
  • one for the implementation of the asynchronous process (RouteOut)
/**
 * Route definition for asynchronous operation "translate" via web services.
 */
@CamelConfiguration(value = AsyncTranslateWsRoute.ROUTE_BEAN)
public class AsyncTranslateWsRoute extends AbstractBasicRoute {
 
    static final String ROUTE_BEAN = "asyncTranslateWsRouteBean";
 
    private static final String OPERATION_NAME = "asyncTranslateWs";
 
    static final String ROUTE_ID_ASYNC_IN = getInRouteId(ServiceEnum.TRANSLATE, OPERATION_NAME);
 
    static final String ROUTE_ID_ASYNC_OUT = getOutRouteId(ServiceEnum.TRANSLATE, OPERATION_NAME);
 
    static final String URI_ASYNC_OUT = "direct:" + ROUTE_ID_ASYNC_OUT;
 
    @Override
    protected void doConfigure() throws Exception {
        // asyncTranslate - input asynch message
        createAsyncRouteIn();
 
        // asyncTranslate - process delivery (=asynchronous execution)
        createAsyncRouteOut();
    }
 
    /**
     * Route for asynchronous <strong>asyncTranslate</strong> input operation.
     * <p/>
     * Prerequisite: none
     * <p/>
     * Output: {@link AsyncTranslateResponse}
     */
    private void createAsyncRouteIn() {
        Namespaces ns = new Namespaces("h", TranslateWebServiceConfig.TRANSLATE_SERVICE_NS);
 
        // note: mandatory parameters are set already in XSD, this validation is extra
        XPathValidator validator = new XPathValidator("/h:asyncTranslateRequest", ns, "h:inputText");
 
        AsynchRouteBuilder.newInstance(ServiceEnum.TRANSLATE, OPERATION_NAME,
                getInWsUri(new QName(TranslateWebServiceConfig.TRANSLATE_SERVICE_NS, "asyncTranslateRequest")),
                new AsynchResponseProcessor() {
                    @Override
                    protected Object setCallbackResponse(CallbackResponse callbackResponse) {
                        AsyncTranslateResponse res = new AsyncTranslateResponse();
                        res.setConfirmAsyncTranslate(callbackResponse);
                        return res;
                    }
                }, jaxb(AsyncTranslateResponse.class))
 
                .withValidator(validator)
                .build(this);
    }
 
    /**
     * Route for <strong>asyncTranslate</strong> operation - process delivery (=asynchronous execution).
     * Only input text is logged in this case.
     * <p/>
     * Prerequisite: none
     */
    private void createAsyncRouteOut() {
        final String URI_LOG_INPUT_PARAMS = "direct:logInputParams";
 
        from(URI_ASYNC_OUT)
                .routeId(ROUTE_ID_ASYNC_OUT)
 
                // xml -> AsyncTranslateRequest
                .unmarshal(jaxb(AsyncTranslateRequest.class))
 
                .to("extcall:message:" + URI_LOG_INPUT_PARAMS);
 
 
        from(URI_LOG_INPUT_PARAMS)
                .validate(body().isInstanceOf(AsyncTranslateRequest.class))
                .log(LoggingLevel.DEBUG, "Asynchronous execution - input text '${body.inputText}' (lang: ${body.inputLang})");
    }
}

RouteIn uses AsynchRouteBuilder for easy configuration with the following features:

  • defines which incoming web service request starts this route
  • defines the confirmation response for the source system; when the input route executes successfully, a synchronous response is returned to the source system
  • defines a validator that checks whether the incoming request contains the inputText element

RouteOut defines the asynchronous process itself. In this case, the input request (AsyncTranslateRequest) is only logged.

And that’s all. Everything else is handled by the OpenHub framework.

External calls

Your route implementations will often call external systems or other routes. If you implement an asynchronous process, you have to follow the rules of idempotency: every part of your process can be called more than once, and you have to ensure identical behaviour in all calls. Sometimes the external system or route is idempotent by itself, and then you can call it as many times as you want. If not, you have to handle it in your implementation. That is why we created the Camel component extcall.

The extcall component in the example above ensures that the route with URI_LOG_INPUT_PARAMS is called successfully only once, even if the whole asynchronous process runs several times.

External call explanation

Description:

  • two external systems are called during asynchronous message processing
  • there are two extcall stops to which processing can return:
    • if an error occurs before the first request to external system 1, the next processing attempt starts from the beginning, as if a new message had arrived
    • if an error occurs during communication with external system 2, the next processing attempt starts from extcall1
    • if an error occurs after a successful response from external system 2, the next processing attempt starts from extcall2
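
The behaviour described above can be imitated with a small plain-Java sketch. It is not the extcall component itself: `ExtCallGuard`, `TwoSystemProcess`, the key format and the in-memory set are assumptions made for illustration only, and a real implementation has to persist this state between attempts.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for extcall: remembers which calls already succeeded for a message. */
class ExtCallGuard {
    // In-memory for the sketch only; a real implementation must persist this state.
    private final Set<String> succeeded = new HashSet<>();

    /** Runs the call unless it already succeeded for this message and operation. */
    void callOnce(String messageId, String operation, Runnable call) {
        String key = messageId + ":" + operation;
        if (succeeded.contains(key)) {
            return; // already done in an earlier attempt, skip it
        }
        call.run();         // an exception here leaves the key unrecorded
        succeeded.add(key); // recorded only after a successful call
    }
}

/** Asynchronous process that calls two external systems, as in the description above. */
class TwoSystemProcess {
    /** Calls that really reached an external system, in order. */
    final List<String> realCalls = new ArrayList<>();
    private final ExtCallGuard guard = new ExtCallGuard();
    private boolean system2Available = false;

    void setSystem2Available(boolean available) {
        this.system2Available = available;
    }

    /** One processing attempt; safe to repeat because each call is guarded. */
    void process(String messageId) {
        guard.callOnce(messageId, "system1", () -> realCalls.add("system1"));
        guard.callOnce(messageId, "system2", () -> {
            if (!system2Available) {
                throw new IllegalStateException("external system 2 is unavailable");
            }
            realCalls.add("system2");
        });
    }
}
```

If the first attempt fails while calling system 2, a second call to `process` with the same message ID skips system 1 and retries only system 2, which corresponds to resuming from the first stop.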

Funnel and throttling components

Other powerful components are funnel and throttling.

The funnel component filters concurrent messages at a specific integration point. This filtering ensures that only one message of a specific type, or with specific information, is processed at that point at any one moment, optionally in guaranteed order. It is useful for communication with external systems that can accept only one input message for a specific entity (e.g. one specific customer in an ordering system) at a time.

The second component, throttling, lets you ensure that a specific endpoint does not get overloaded, or that you don’t exceed an agreed SLA with an external service. The throttling component can also be used for synchronous messages.

All components work in a cluster.
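
The core idea of a funnel can be sketched in a few lines of plain Java. This is a simplified illustration, not the OpenHub component: the `Funnel` class and its method names are hypothetical, it works only inside a single JVM, and it does not cover guaranteed ordering or clustering.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical funnel: admits at most one in-flight message per funnel value. */
class Funnel {
    // Funnel values (e.g. a customer ID) that are currently being processed.
    private final Set<String> inFlight = ConcurrentHashMap.newKeySet();

    /** Returns true if the message may proceed; false means it should be postponed. */
    boolean tryEnter(String funnelValue) {
        return inFlight.add(funnelValue); // add() is atomic and returns false if already present
    }

    /** Must be called when processing of the message finishes, successfully or not. */
    void leave(String funnelValue) {
        inFlight.remove(funnelValue);
    }
}
```

A caller would check `tryEnter("customer-42")` before the integration point, postpone the message when it returns false, and call `leave` in a `finally` block so that a failed message does not block the funnel forever.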
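
As a rough illustration of throttling, the following plain-Java sketch counts requests in a fixed time window. It is an assumption-laden example, not the OpenHub component: the `Throttle` class, its fixed-window strategy and the injected clock are all hypothetical choices made to keep the sketch short and testable.

```java
import java.util.function.LongSupplier;

/** Hypothetical fixed-window throttle: at most `limit` requests per interval. */
class Throttle {
    private final int limit;
    private final long intervalMillis;
    private final LongSupplier clock; // injected so the sketch can be tested without sleeping
    private long windowStart;
    private int count;

    Throttle(int limit, long intervalMillis, LongSupplier clock) {
        this.limit = limit;
        this.intervalMillis = intervalMillis;
        this.clock = clock;
        this.windowStart = clock.getAsLong();
    }

    /** Returns true if the request fits into the current window, false if it exceeds the limit. */
    synchronized boolean tryAcquire() {
        long now = clock.getAsLong();
        if (now - windowStart >= intervalMillis) {
            windowStart = now; // the previous window has expired, start a new one
            count = 0;
        }
        if (count < limit) {
            count++;
            return true;
        }
        return false;
    }
}
```

For example, `new Throttle(60, 60_000, System::currentTimeMillis)` would allow at most 60 requests per minute; a rejected synchronous request could be refused immediately, while an asynchronous message could simply be retried later.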

Implementation details

Everything that OpenHub needs to save is saved in a database; there are no limitations on the database type. There is no need to bring in a JMS/MQ system to support asynchronous messaging. You can therefore use any tools you like for your daily work: the data model is simple, clear and well documented, and there are many more tools for databases than for JMS/MQ systems.

Sometimes we hear that using a database for this purpose is an anti-pattern and that it can become a performance bottleneck. It depends on the integration use cases, but we have not yet hit performance limits in our real projects, where hundreds of thousands of concurrent requests are processed. We are prepared to add a JMS/MQ implementation, but it hasn’t been needed so far.

An asynchronous process does not have to be started by an incoming request: you can also use scheduled jobs to start a route at any time you need and then leave the rest to the OpenHub framework.

All examples can be found in the reference implementation on GitHub, see https://github.com/OpenWiseSolutions/openhub-ri

Petr Juza

Over 12 years of proven experience in Java and integration development. He is part of a group of experienced professionals at OpenWise Solutions, a company focused on building products and software for customers. He is a Java and integration enthusiast, a co-organizer of many developer meetups in the Czech Republic, and has been writing a popular Java blog for several years.