Biju Kunjummen

Rx-netty and Karyon2 based cloud ready microservice

Netflix Karyon provides a clean framework for creating cloud-ready microservices. If your organization uses the Netflix OSS stack, with Eureka for service registration and discovery and Archaius for property management, then you very likely use Karyon to create your microservices.

Karyon has been undergoing quite a lot of changes recently, and my objective here is to document a good sample using the newer version. The old Karyon (call it Karyon1) was based on the JAX-RS 1.0 specs with Jersey as the implementation. The newer version (Karyon2) still supports Jersey but also encourages the use of RxNetty, a layer over Netty that exposes it through RxJava Observables.

With that said, let me jump into a sample. My objective with this sample is to create a “pong” microservice that takes a POSTed “message” and returns an “acknowledgement”.

The following is a sample request:

{
"id": "id",
"payload":"Ping"
}

And an expected response:

{"id":"id","received":"Ping","payload":"Pong"}
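The handler in the next step maps these payloads onto two domain classes, Message and MessageAcknowledgement, which are not listed in this post. Here is a minimal sketch of what they might look like, assuming plain JavaBean-style classes that a JSON mapper can bind through getters and setters. The field names are taken from the JSON above; everything else is an assumption, and the real classes in the repository may differ.

```java
// Hypothetical sketch of the request type: {"id": "...", "payload": "..."}
final class Message {
    private String id;
    private String payload;

    // No-arg constructor so a JSON mapper can instantiate the class reflectively.
    Message() {
    }

    Message(String id, String payload) {
        this.id = id;
        this.payload = payload;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public String getPayload() { return payload; }
    public void setPayload(String payload) { this.payload = payload; }
}

// Hypothetical sketch of the response type:
// {"id": "...", "received": "...", "payload": "..."}
final class MessageAcknowledgement {
    private final String id;
    private final String received; // payload of the incoming message
    private final String payload;  // payload of the reply, "Pong" in this sample

    MessageAcknowledgement(String id, String received, String payload) {
        this.id = id;
        this.received = received;
        this.payload = payload;
    }

    public String getId() { return id; }
    public String getReceived() { return received; }
    public String getPayload() { return payload; }
}
```

The three-argument constructor matches the way the handler builds the acknowledgement: the incoming id, the incoming payload, and the literal "Pong".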

The first step is to create a RequestHandler which, as the name suggests, is the RxNetty component that deals with routing and handling the incoming request:

package org.bk.samplepong.app;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import netflix.karyon.transport.http.health.HealthCheckEndpoint;
import org.bk.samplepong.domain.Message;
import org.bk.samplepong.domain.MessageAcknowledgement;
import rx.Observable;

import java.io.IOException;
import java.nio.charset.Charset;


public class RxNettyHandler implements RequestHandler<ByteBuf, ByteBuf> {

    private final String healthCheckUri;
    private final HealthCheckEndpoint healthCheckEndpoint;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public RxNettyHandler(String healthCheckUri, HealthCheckEndpoint healthCheckEndpoint) {
        this.healthCheckUri = healthCheckUri;
        this.healthCheckEndpoint = healthCheckEndpoint;
    }

    @Override
    public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
        if (request.getUri().startsWith(healthCheckUri)) {
            // Delegate health check requests to the Karyon endpoint.
            return healthCheckEndpoint.handle(request, response);
        } else if (request.getUri().startsWith("/message") && request.getHttpMethod().equals(HttpMethod.POST)) {
            // Decode the request content as UTF-8 text, parse it into a Message,
            // build the acknowledgement and write it back as JSON.
            return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8")))
                    .map(s -> {
                        try {
                            return objectMapper.readValue(s, Message.class);
                        } catch (IOException e) {
                            // A lambda passed to map cannot throw a checked exception.
                            throw new RuntimeException(e);
                        }
                    })
                    .map(m -> new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong"))
                    .flatMap(ack -> {
                        try {
                            return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
                        } catch (Exception e) {
                            // The acknowledgement could not be serialized.
                            response.setStatus(HttpResponseStatus.BAD_REQUEST);
                            return response.close();
                        }
                    });
        } else {
            // Anything else is not handled by this service.
            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return response.close();
        }
    }
}

This flow is completely asynchronous and managed internally by the RxJava libraries; Java 8 lambda expressions also help keep the code concise. The one issue you will notice here is that the routing logic (which URI goes to which controller) is mixed in with the actual controller logic. I believe this is being addressed.
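To illustrate what separating the two could look like, here is a minimal plain-Java sketch. PrefixRouter and its methods are hypothetical names made up for this illustration; they are not part of RxNetty or Karyon. Handlers are reduced to functions from a request body to a response body, so the sketch only shows the shape of the idea, not the reactive plumbing.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch, not an RxNetty or Karyon API: routes are registered
// in one place and the handlers themselves know nothing about URIs.
final class PrefixRouter {
    // Key is "<METHOD> <uriPrefix>"; insertion order decides which route wins.
    private final Map<String, Function<String, String>> routes = new LinkedHashMap<>();

    PrefixRouter route(String method, String uriPrefix, Function<String, String> handler) {
        routes.put(method + " " + uriPrefix, handler);
        return this;
    }

    // Returns the response body of the first matching route, or "404 Not Found".
    String dispatch(String method, String uri, String body) {
        for (Map.Entry<String, Function<String, String>> route : routes.entrySet()) {
            String[] key = route.getKey().split(" ", 2);
            if (key[0].equals(method) && uri.startsWith(key[1])) {
                return route.getValue().apply(body);
            }
        }
        return "404 Not Found";
    }
}
```

With something along these lines, the "/message" logic would be registered as new PrefixRouter().route("POST", "/message", handler), and the handler would no longer need to inspect the URI or the HTTP method itself.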

Given this RequestHandler, a server can be started from a standalone Java program using raw RxNetty as shown here. That is essentially all it takes: an endpoint is brought up at port 8080 to handle the requests:

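import io.netty.buffer.ByteBuf;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.protocol.http.server.HttpServer;
import netflix.karyon.transport.http.health.HealthCheckEndpoint;
import org.bk.samplepong.resource.HealthCheck;

public final class RxNettyExample {

    public static void main(String... args) throws Exception {
        // Assumes the same package as RxNettyHandler, which needs a health check URI and endpoint.
        RxNettyHandler handler = new RxNettyHandler("/healthcheck",
                new HealthCheckEndpoint(new HealthCheck()));
        HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(8080, handler);
        // Start the server and block the main thread until it is shut down.
        server.startAndWait();
    }
}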

This, however, is the native RxNetty way. For a cloud-ready microservice a few more things have to happen: the service should register with Eureka, respond to the health checks coming back from Eureka, and be able to load properties using Archaius.

So with Karyon2, the startup in a main program looks a little different:

package org.bk.samplepong.app;

import netflix.adminresources.resources.KaryonWebAdminModule;
import netflix.karyon.Karyon;
import netflix.karyon.KaryonBootstrapModule;
import netflix.karyon.ShutdownModule;
import netflix.karyon.archaius.ArchaiusBootstrapModule;
import netflix.karyon.eureka.KaryonEurekaModule;
import netflix.karyon.servo.KaryonServoModule;
import netflix.karyon.transport.http.health.HealthCheckEndpoint;
import org.bk.samplepong.resource.HealthCheck;

public class SamplePongApp {

    public static void main(String[] args) {
        HealthCheck healthCheckHandler = new HealthCheck();
        Karyon.forRequestHandler(8888,
                new RxNettyHandler("/healthcheck",
                        new HealthCheckEndpoint(healthCheckHandler)),
                new KaryonBootstrapModule(healthCheckHandler),
                new ArchaiusBootstrapModule("sample-pong"),
                KaryonEurekaModule.asBootstrapModule(),
                Karyon.toBootstrapModule(KaryonWebAdminModule.class),
                ShutdownModule.asBootstrapModule(),
                KaryonServoModule.asBootstrapModule()
        ).startAndWaitTillShutdown();
    }
}

Now it is essentially cloud ready: on startup this version of the program registers cleanly with Eureka and exposes a health check endpoint. It additionally exposes a neat set of admin endpoints at port 8077.
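The HealthCheck class handed to both HealthCheckEndpoint and KaryonBootstrapModule is not listed in this post. A minimal sketch of the idea follows. To keep it self-contained it declares a local stand-in interface; I am assuming that Karyon's own health check contract is a single method returning an HTTP status code, so check the Karyon sources before relying on the exact shape.

```java
// Local stand-in so the sketch compiles without Karyon on the classpath.
// Assumption: Karyon's real handler interface has this single-method shape.
interface HealthCheckHandler {
    int getStatus();
}

// Hypothetical always-healthy check; a real one would probe its dependencies.
final class HealthCheck implements HealthCheckHandler {
    @Override
    public int getStatus() {
        return 200; // HTTP 200 signals that the service is healthy
    }
}
```

A real implementation would typically return a non-200 status while the service is still warming up or when a dependency it needs is unavailable.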

Conclusion

I hope this provides a good intro to using Karyon2 to develop Netflix OSS based microservices. The entire sample is available at my GitHub repo here: https://github.com/bijukunjummen/sample-ping-pong-netflixoss/tree/master/sample-pong. As a follow-up, I will show how the same service can be developed using Spring Cloud, which is the Spring way to create microservices.
