Most of the time that is not very useful. It’s more likely that the need is for each client to receive only the data intended for them, with broadcasts reserved for special circumstances like “The server is going down in 15 minutes!” The other thing about that particular server example was that everything was self-contained. Monolithic applications are fine for examples, but in today’s environments, distributed microservices are much better. Scalability and reliability are paramount.
Netty and Kafka are fantastic together. Netty is great at handling a boatload of clients, and Kafka is great at enabling a boatload of services to work together. Combined, they are a sweet spot in development. However, there are some “gotchas” that can make it cumbersome. This blog, along with the example microservice/Netty architecture and fully working code, will hopefully help to alleviate the irritations and enable the sweetness.
First Things First
The code for the example is located here.
There is a detailed README that describes what is needed to set up the environment. I tried to keep the requirements at a minimum, just Java 8 and Maven. SLF4J and Logback are used for logging. I set up scripts for Mac OSX and Ubuntu (14.04 running in a Parallels container is what I tested with), so apologies if you develop on Windows. The code is all Java, and I’ve seen Kafka tutorials out there for Windows, so everything should run there. The Maven build should also produce targets that can be started, so with a little elbow grease installing Zookeeper/Kafka (you can follow the script to see what settings are needed), it shouldn’t be a huge deal to get it running manually on Windows.
NOTE: As explained in the
README.md, the script will remove any existing Zookeeper/Kafka installation and data. If you have an existing setup, don’t use the script!
After installing and configuring the pre-requisites, either run
mvn package if you aren’t using the script, or
linuxlocal_run.sh) if you are. The script downloads (if it hasn’t yet) Zk/Kafka, installs them, configures them, starts them, runs
mvn package, starts the services, and finally starts the server. Once it starts, resist the urge to navigate away from the shell, because it automatically pops new tabs for each part of the architecture. After the Whirlpool server starts, then you are ready to go.
I highly recommend creating a script that installs, configures, builds, and starts up your microservice environment locally. Creating each individual service is a big pain. Docker could also be used if necessary, but I find it requires far less downloading to just run everything natively.
As a teaser, here is the UI (you can also see this from the README.md on GitHub).
- To add a stock symbol, type it in (i.e. “GOOG”) and click the A button under “Stock”. To remove it, click the X.
- To add a website to test whether it is up or down, type in the fully-qualified URL (i.e. http://facebook.com) and click the A button under “UpDown”. To remove it, click the X.
- To add a weather check, type the city,state in (i.e. “chicago,il”) and click the A button under “City,State”. To remove it, click the X.
- Subscriptions survive page refresh and even login/logout (with the same userid) because they are stored with each service in memory. A “real” system would of course use a database.
- Subscriptions are updated every 10 seconds so I don’t overwhelm the Yahoo APIs, so be patient after adding data.
With this example I was trying to think of good generic services that might be useful. I ended up choosing a stock quote service, an “is this website up or down” service, and a weather service. Each of these runs independently of the others with their own Kafka topics.
The way I chose to configure Kafka was with a command topic per service and a data topic per service. Everything could also just use a single global topic with the readers deciding what to process, but separating things out makes it more clear and clean.
Here’s a diagram of how data flows through Kafka. It was done with a free Keyhole web-based utility called Mockola. Notice that the server knows about all topics, but the services only know about their own topics. The
cmd topics are used to send commands to the services, whereas the data topics (those without
-cmd on them) are used to send data from the services. Again, all of this could be handled on a single
bus topic, but it is much easier to see what is going on by separating them out.
Now let’s talk about the services. All three are very similar, so there is a base service that does most of the work. Each service has three threads, handled by Java ExecutorService. One nice thing about the Executor service is that it automatically restarts the thread if something goes wrong. This helps resiliency.
Each service starts itself by telling the base class what topic and command topic to use. The base class then starts the three threads: one for reading commands off of the cmd topic, one for periodically collecting data for clients, and one for sending data on the data topic. These threads communicate using the non-blocking Java concurrency classes
ConcurrentHashMap. The hash map stores per-user sets of subscriptions, and the queue stores responses ready to be sent to the data topic.
The flow for each service is the three threads working concurrently. The Reader uses a Kafka Consumer to read commands from its command topic. Based on the command, the subscription is added or removed. This thread is fairly dumb in that it doesn’t ask the service to do any validation on the request, it just blindly adds whatever is sent to the subscription. Production code would obviously add a call to ask the service to validate the command before allowing the subscription to succeed. A response is created to put on the topic and then it waits for the next command.
NOTE: A few words about data being placed on topics. I’m using JSON as a transport format, but XML or anything else you want will work too. The important thing is that everyone agrees on the data format and sticks to it. The common module has POJO classes that define the contracts that the data will conform to. Things that are generally useful for all messages are a timestamp, the message type, and the id of the client.
Another useful thing would be an expiration timestamp. These example messages just live forever. The
Message class only looks at the type and the id of a message. This is used by the server to determine what kind of message needs to be processed, and who is interested in the message. Without these it is very difficult if not impossible to process data. Now, message formats can get quite involved, with some using headers and sections to describe complex data. This example attempts to keep everything as simple as possible.
Let’s go through the server one class at a time.
This class is mostly unchanged from the prior blog. The reusable pieces have been moved to the
WebSocketHelper class. The main use of this file is to serve up files that are asked for by the browser.
The first item that might be confusing is the class variable
clientAttr. Storing data in a Netty Channel requires it to be attached to an
AttributeKey. This is similar to an Atomic instance from the Java concurrent classes – it provides a container for data. We will store the client id (in our case the username, but it could just as easily be a session id) so we can figure out which Channel needs to receive messages.
realWriteAndFlush() method sets the appropriate headers, the content length, and the cookie. It then writes and flushes the HTTP response. the line
tells Netty that this is the end of the data that needs to be written to the client, so Netty will send it out.
SPECIAL NOTE: Regarding cookie creation, make sure the
The other thing about cookies is to use the STRICT version of the Netty cookie encoder so it will not allow multiple cookies with the same name. I’m not sure when it would be useful to allow this situation to occur.
This class just defines an interface that
WhirlpoolServerHandler uses to talk to the
This is where the connection exists between Netty and Kafka. Two Executors handle a reader thread and a writer thread.
The writer thread looks for messages in the request queue (more on where those messages come from in a minute), and places the messages on the appropriate Kafka command topic.
The reader thread looks for incoming messages on the Kafka data topics, looks up the correct Channel for each topic, and writes the messages to those topics.
When the client sends a message over WebSockets,
WhirlpoolServerHandler will make sure a complete message has arrived, and then call
handleMessage(). This method figures out if it is a valid message, then adds the request to the request queue so the reader thread can pick it up and give it to Kafka.
There are several interesting things in this class. First, it can tell the difference between a HTTP, REST, and WebSocket message. The Netty overridden method that does this is
channelRead0. This is the method Netty uses to tell us when a message arrives, and what type of message it is. For HTTP and REST calls,
handleHttpRequest is called, and for websockets,
handleWebSocketFrame is called.
handleHttpRequest reads the cookie if one is present. On POSTs it looks for login and logout. For login, it figures out the username/password, creates the cookie, and prevents multiple logins with the same name. All that code would be split out with additional security added in a production version of the application. For logout, it looks up the Channel, cleans it up, closes it, and expires the cookie.
WebSocketUpgrade, it asks Netty to handle the complex handshake required to get a websocket going. When that is complete, it adds the user to the Channel that was created during the handshake. This is where the user is connected to the Channel, and wouldn’t be very easy if the cookie did not come across in the request.
The only other thing to note here is that this class is set up to handle clients coded for SPA (single-page application) as it will redirect any unrecognized call to
The other methods in the class are more for informational purposes and would be used in advanced situations.
This class starts up the Netty server and creates the channel pipeline. It is a standard class for Netty that follows the Netty examples.
Obviously a whole lot more could go into this code. Multiple instances of each service and the server could be running at the same time, and Zk/Kafka could be clustered to help with resiliency. A great utility that tests the resiliency of microservice applications is another free open source Keyhole utility called TroubleMaker. I haven’t had the chance to test this example yet, but I am looking forward to the opportunity.
We didn’t touch on security, and although I previously hoped to show integration of Netty with Shiro, that is a very complex topic. All I can say about it is that it is possible, but I haven’t wrapped my head around all the parts enough to formulate a coherent blog yet.
I hope you have enjoyed the blog and find the code useful. Contact me through the blog or Twitter (@johnwboardman where I always appreciate new follows).