Asynchronous communication made by OpenHub framework

We introduced OpenHub framework in the previous part of this series. This part shows one of the most powerful feature of the framework – asynchronous messaging model.

Asynchronous communication between systems is used when source system can’t wait for the response of the target system. There are several reasons:

Asynchronous scenarios

When you decide to communicate asynchronously then you have to think over possible scenarios:

When you start to think about all these scenarios then you find out it’s not simple to implement it from the scratch. There is OpenHub framework with build-in support for processing of asynchronous messages. It’s easy to use but robust and flexible at the same time. And also configurable, e.g. how many times should process be run again? In which time intervals?

Asynchronous route implementation

Route implementation with OpenHub framework has two sub-routes:

/**
 * Route definition for asynchronous operation "translate" via web services.
 */
@CamelConfiguration(value = AsyncTranslateWsRoute.ROUTE_BEAN)
public class AsyncTranslateWsRoute extends AbstractBasicRoute {
 
    static final String ROUTE_BEAN = "asyncTranslateWsRouteBean";
 
    private static final String OPERATION_NAME = "asyncTranslateWs";
 
    static final String ROUTE_ID_ASYNC_IN = getInRouteId(ServiceEnum.TRANSLATE, OPERATION_NAME);
 
    static final String ROUTE_ID_ASYNC_OUT = getOutRouteId(ServiceEnum.TRANSLATE, OPERATION_NAME);
 
    static final String URI_ASYNC_OUT = "direct:" + ROUTE_ID_ASYNC_OUT;
 
    @Override
    protected void doConfigure() throws Exception {
        // asyncTranslate - input asynch message
        createAsyncRouteIn();
 
        // asyncTranslate - process delivery (=asynchronous execution)
        createAsyncRouteOut();
    }
 
    /**
     * Route for asynchronous <strong>asyncTranslate</strong> input operation.
     * <p/>
     * Prerequisite: none
     * <p/>
     * Output: {@link AsyncTranslateResponse}
     */
    private void createAsyncRouteIn() {
        Namespaces ns = new Namespaces("h", TranslateWebServiceConfig.TRANSLATE_SERVICE_NS);
 
        // note: mandatory parameters are set already in XSD, this validation is extra
        XPathValidator validator = new XPathValidator("/h:asyncTranslateRequest", ns, "h:inputText");
 
        AsynchRouteBuilder.newInstance(ServiceEnum.TRANSLATE, OPERATION_NAME,
                getInWsUri(new QName(TranslateWebServiceConfig.TRANSLATE_SERVICE_NS, "asyncTranslateRequest")),
                new AsynchResponseProcessor() {
                    @Override
                    protected Object setCallbackResponse(CallbackResponse callbackResponse) {
                        AsyncTranslateResponse res = new AsyncTranslateResponse();
                        res.setConfirmAsyncTranslate(callbackResponse);
                        return res;
                    }
                }, jaxb(AsyncTranslateResponse.class))
 
                .withValidator(validator)
                .build(this);
    }
 
    /**
     * Route for <strong>asyncTranslate</strong> operation - process delivery (=asynchronous execution).
     * Only input text is logged in this case.
     * <p/>
     * Prerequisite: none
     */
    private void createAsyncRouteOut() {
        final String URI_LOG_INPUT_PARAMS = "direct:logInputParams";
 
        from(URI_ASYNC_OUT)
                .routeId(ROUTE_ID_ASYNC_OUT)
 
                // xml -> AsyncTranslateRequest
                .unmarshal(jaxb(AsyncTranslateRequest.class))
 
                .to("extcall:message:" + URI_LOG_INPUT_PARAMS);
 
 
        from(URI_LOG_INPUT_PARAMS)
                .validate(body().isInstanceOf(AsyncTranslateRequest.class))
                .log(LoggingLevel.DEBUG, "Asynchronous execution - input text '${body.inputText}' (lang: ${body.inputLang})");
    }
}

RouteIn uses AsynchRouteBuilder for easy configuration with the following features:

RouteOut defines asynchronous process itself. Input request (AsyncTranslateRequest) is only logged in this case.

And that’s all. Everything around is implemented by OpenHub framework.

External calls

Your route implementations will often call external systems or another routes. If you implement asynchronous process then you have to adhere to the rules of idempotency – every part of your process can be called more than once and you have to ensure identical behaviour in all calls. Sometimes external system/route is idempotent by itself and then you can call it as many times as you want. If not then you have to control it in your implementation. Therefore we have made the Camel component extcall.

Component excallin the above example ensures that route with URI_LOG_INPUT_PARAMS will be called exactly once, even if the whole asynchrounous process runs more times.

External call explaination

Description:

Funnel and throttling components

Other powerful components are funnel and throttling.

Funnel component is for filtering concurrent messages at specific integration point. This filtering ensures that only one message of specific type or with specific information at one moment will be processed in that place, even in guaranteed order (optional choice). It can be useful for communication with external systems which can accept only one input message for specific entity (e.g. one specific customer in ordering system) at one time.

Second component throttling allows you to ensure that a specific endpoint does not get overloaded, or that we don’t exceed an agreed SLA with any external service. Throttling component can also be used for synchronous messages.

All components support cluster.

Implementation details

Everything that OpenHub needs to save is saved in a database – there are no limitations on the type. There is no need to adapt a JMS/MQ system to support asynchronous messaging. Then you can use any tools you like for your daily work – data model is simple, clear and well documented. There are many more database tools than for JMS/MQ systems.

Sometimes we hear that it’s anti-pattern to use database for this case, it can be a bottleneck in some cases from a performance point of view. It depends on integration use cases from real projects but we haven’t still tought performance limits in our real projects where hundreds of thousands concurrent requests are processed. We are prepared to add JMS/MQ implementation but it hasn’t been needed so far.

It’s not necessary to start asynchronous process only by incoming request – you can also use scheduling jobs to start route at any time you need and then leave it to OpenHub framework.

All examples can be found in reference implementation at GitHub – see https://github.com/OpenWiseSolutions/openhub-ri

Exit mobile version