A first look at Redis Streams and how to use them with Java

Redis Streams have been in Redis’ unstable branch since the beginning of this year, and the first clients are starting to adopt the Redis Streams API. That makes it an excellent time to take a look at what Redis Streams provide and how you can use them from a client perspective.

Disclaimer: Redis Streams are available as a first draft and are not part of a stable release yet. APIs are subject to change.

What is a Redis Stream?

A Redis Stream is a log/journal-like data structure that represents a log of events in sequential order. Messages (events) can be appended to a Stream. These messages can then be consumed either standalone or by reading within a consumer group. Consumer groups are a concept where multiple consumers (such as application instances) are grouped together and the group's stream offset (the reading progress) is kept on the server side. This concept simplifies building clients, as the stream offset does not have to be kept on the consumer side.

A stream message consists of a message id that is generated by Redis on submission and a body represented as a hash (map), basically a set of keys and values.
The stream itself is identified by a key and holds zero to many stream messages along with some metadata such as consumer groups.

Redis Stream API

As of now, all stream commands are prefixed with X. Streams support commands for adding, reading, introspection, and maintenance. The most common commands you will see in the next sections are:

  • XADD key * field1 value1 [field2 value2] [fieldN valueN]: Append (submit) a message to a Redis Stream.
  • XREAD [BLOCK timeout] [COUNT n] STREAMS key1 [keyN] offset1 [offsetN]: Read a message from a Redis Stream.
  • XRANGE key from to [COUNT n]: Scan (introspect) a Redis Stream for its messages

When using consumer groups, additional commands come into play:

  • XREADGROUP GROUP name consumer [BLOCK timeout] [COUNT n] [NOACK] STREAMS key1 [keyN] offset1 [offsetN]: Read a message from a Redis Stream in the context of a consumer and its group.
  • XACK key group messageId1 [messageId2] [messageIdN]: Acknowledge a message after reading in the context of a consumer.
  • XPENDING key group [from to COUNT n]: Enumerate pending (not yet acknowledged) messages.
  • XGROUP and subcommands: API to create and delete consumer groups.

Note: Commands above are truncated regarding options for brevity. See Redis Streams documentation for an explanation of all possible options and combinations.

Using a Redis Stream

Let’s take a look at how we can use a Redis Stream through redis-cli, applying the commands we’ve seen before. Let’s add a message to a new stream, which also creates the stream.

127.0.0.1:6379> XADD my-stream * key value
1527062149743-0

We’re using XADD to add a new message with a key-value tuple to the stream my-stream. Note the * (asterisk): that argument controls id generation. If you want the server to generate the message id (which is the case in 99.5% of use cases unless you’re a Redis server that wants to replicate), always put * there. Redis replies with the message id 1527062149743-0.
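
Redis documents generated ids as having the form <millisecondsTime>-<sequenceNumber>, so 1527062149743-0 is the first message created in that millisecond. The following is a minimal sketch of parsing and ordering such ids in plain Java. The class StreamId is hypothetical and not part of Redis or any client library.

```java
/** Hypothetical helper: parses ids of the form "<millis>-<sequence>". */
final class StreamId implements Comparable<StreamId> {
    final long millis;   // unsigned 64-bit value stored in a long
    final long sequence; // unsigned 64-bit value stored in a long

    private StreamId(long millis, long sequence) {
        this.millis = millis;
        this.sequence = sequence;
    }

    static StreamId parse(String id) {
        int dash = id.indexOf('-');
        if (dash < 0) {
            // A bare number such as "0" is treated as "<millis>-0".
            return new StreamId(Long.parseUnsignedLong(id), 0L);
        }
        return new StreamId(Long.parseUnsignedLong(id.substring(0, dash)),
                Long.parseUnsignedLong(id.substring(dash + 1)));
    }

    /** Orders by time first, then by sequence number within the same millisecond. */
    @Override
    public int compareTo(StreamId other) {
        int byTime = Long.compareUnsigned(millis, other.millis);
        return byTime != 0 ? byTime : Long.compareUnsigned(sequence, other.sequence);
    }

    @Override
    public String toString() {
        return Long.toUnsignedString(millis) + "-" + Long.toUnsignedString(sequence);
    }
}
```

Comparing ids this way is what makes "give me everything after id X" a well-defined request, which the following sections rely on.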

Our stream now contains a message. Let’s read it with XREAD.

127.0.0.1:6379> XREAD COUNT 1 STREAMS my-stream 0
1) 1) "my-stream"
   2) 1) 1) 1527062149743-0
         2) 1) "key"
            2) "value"

We have now read the message and retrieved its body along with the read. Reading a message leaves the message in the stream. We can verify this with XRANGE:

127.0.0.1:6379> XRANGE my-stream - +
1) 1) 1527068644230-0
   2) 1) "key"
      2) "value"

Issuing subsequent reads with the same stream offset returns the same message again. You have different options to avoid this behavior:

  1. Tracking the message Id on the client side
  2. Blocking reads
  3. Removing messages from the stream
  4. Limiting the stream size
  5. Using consumer groups

Let’s take a closer look at these options.

MessageId Tracking

Each read operation returns a message id along with the stream message. If you have a single client (no concurrent reads), then you can keep a reference to the latest message id within your application and reuse it on subsequent read calls. Let’s do this for the message id we’ve seen earlier, 1527068644230-0:

127.0.0.1:6379> XADD my-stream * key value
1527069672240-0
127.0.0.1:6379> XREAD COUNT 1 STREAMS my-stream 1527068644230-0
1) 1) "my-stream"
   2) 1) 1) 1527069672240-0
         2) 1) "key"
            2) "value"

We used 1527068644230-0 as the stream offset and received the next added message. This approach allows you to resume reading older (probably already consumed) messages but requires some coordination on the client side to avoid reading duplicate messages.
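
The bookkeeping a single client has to do can be sketched in plain Java. This is an in-memory illustration under stated assumptions, not a Redis client: InMemoryStream and TrackingReader are made-up names, and ids are generated from a simple counter rather than from a timestamp.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for a stream; entries are kept in append order. Not a Redis client. */
final class InMemoryStream {
    private final List<String[]> entries = new ArrayList<>(); // each entry is {id, value}
    private long nextCounter = 1;

    /** Appends a value and returns a generated id (counter-based here, unlike Redis). */
    String add(String value) {
        String id = (nextCounter++) + "-0";
        entries.add(new String[] {id, value});
        return id;
    }

    /** Returns up to count entries appended after the entry with the given id. */
    List<String[]> readAfter(String lastId, int count) {
        int start = 0; // an unknown id such as "0" means: read from the beginning
        for (int i = 0; i < entries.size(); i++) {
            if (entries.get(i)[0].equals(lastId)) {
                start = i + 1;
                break;
            }
        }
        int end = Math.min(entries.size(), start + count);
        return new ArrayList<>(entries.subList(start, end));
    }
}

/** Remembers the id of the last message it handed out, as a single client would. */
final class TrackingReader {
    private final InMemoryStream stream;
    private String lastId = "0";

    TrackingReader(InMemoryStream stream) {
        this.stream = stream;
    }

    List<String[]> poll(int count) {
        List<String[]> batch = stream.readAfter(lastId, count);
        if (!batch.isEmpty()) {
            lastId = batch.get(batch.size() - 1)[0]; // advance the offset
        }
        return batch;
    }

    String lastId() {
        return lastId;
    }
}
```

Each call to poll(...) returns only entries the reader has not seen yet. Two readers with separate lastId values would still both see every entry, which is the coordination problem mentioned above.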

If you don’t want to keep track of the message id and you’re interested only in the most recent messages, then you can use blocking reads.

Blocking Reads

XREAD can also read from streams in a blocking manner. It then behaves similarly to the BLPOP and BRPOP operations: you specify a timeout, and the call returns either when a message is available or when the read times out. However, the Stream API allows for more options. For this example, we need two separate parties: a producer and a consumer. The examples so far were executed using a single client. We start with the consumer; otherwise, the produced message arrives in the stream before a consumer is waiting to be notified.

Consumer

We’re using XREAD with BLOCK 10000 to wait 10000 milliseconds (10 seconds). Note that we’re using the symbolic stream offset $, which points to the head of the stream, so only messages added after the read was issued are returned.

127.0.0.1:6379> XREAD COUNT 1 BLOCK 10000 STREAMS my-stream $

The consumer is now blocked and awaits message arrival.

Producer

127.0.0.1:6379> XADD my-stream * key value
1527070630698-0

Redis writes the message to our stream. Now let’s switch back to the consumer.

Consumer

After the message is written to our stream, the consumer receives a message and is unblocked again. You can start processing the message and potentially issue another read.

1) 1) "my-stream"
   2) 1) 1) 1527070630698-0
         2) 1) "key"
            2) "value"
(1.88s)

Issuing another read using the stream offset $ would again await the next message that arrives in the stream. Using $, however, leaves a period between reads in which other messages can arrive without being consumed. To avoid these holes, you should keep track of the last message id you read and reuse it for the next XREAD call.
Another caveat to watch out for is concurrency. We’ve seen an example with a single consumer. What if you increase the number of consumers?

In this case, if you have for example two consumers that issue a blocking read, both consumers receive the same message which leaves us again with the task to coordinate reads, so a stream message isn’t processed multiple times.
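
The gap that opens up when every read is issued with $ can be reproduced with a small single-threaded simulation. This is a sketch in plain Java, not a Redis client: GapDemo is a made-up class, positions are list indexes instead of message ids, and the producer and consumer steps are interleaved by hand.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy log used to contrast "$" reads with reads from a remembered position. Not Redis. */
final class GapDemo {
    private final List<String> log = new ArrayList<>();

    void append(String message) {
        log.add(message);
    }

    /** Position a "$" read would start from: everything already in the log is skipped. */
    int tail() {
        return log.size();
    }

    /** Messages appended at or after the given position. */
    List<String> readFrom(int position) {
        return new ArrayList<>(log.subList(position, log.size()));
    }

    /** Runs the scenario; useLastSeen=false mimics re-issuing "$" for the second read. */
    static List<String> consume(boolean useLastSeen) {
        GapDemo stream = new GapDemo();
        List<String> seen = new ArrayList<>();

        int position = stream.tail();          // first read starts at "$"
        stream.append("m1");                   // producer writes while the consumer waits
        List<String> first = stream.readFrom(position);
        seen.addAll(first);
        int lastSeen = position + first.size();

        stream.append("m2");                   // arrives while the consumer processes m1
        stream.append("m3");

        position = useLastSeen ? lastSeen : stream.tail();
        stream.append("m4");                   // arrives during the second read
        seen.addAll(stream.readFrom(position));
        return seen;
    }
}
```

In this model, consume(false) ends up with m1 and m4 only, while consume(true) sees all four messages, because the second read continues from the remembered position instead of from the current end of the log.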

Removing Messages from the Stream

It’s possible to remove messages from a stream; however, it’s not recommended. We haven’t seen XDEL yet, but the name makes it obvious that it removes messages from the stream:

127.0.0.1:6379> XDEL my-stream 1527070789716-0
(integer) 1

The message is now gone. Deleting isn’t recommended as the operations are costly: Streams use radix trees with macro nodes. Deleting is a safe operation in itself, but when consuming a message with multiple consumers you need to synchronize access, as removing does not prevent reading a message multiple times.

Limiting Stream Size

You can specify a maximum stream size when appending messages to the stream. This is done with the MAXLEN option while issuing an XADD command.

127.0.0.1:6379> XADD my-stream MAXLEN 4 * key value
1527071269045-0

The message is added to the stream, and the stream is trimmed to the size limit. Written as MAXLEN 4, the limit is applied exactly; MAXLEN ~ 4 lets Redis trim approximately, which is cheaper. Either way, older messages get pruned and are no longer readable.
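
The effect of a size limit can be pictured with a small capped log. This is only an in-memory illustration in plain Java; the class CappedLog is made up for this example and trims on every append, which is simpler than what a server has to do.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy capped log: appending beyond maxLen evicts the oldest entries. Not Redis. */
final class CappedLog {
    private final Deque<String> entries = new ArrayDeque<>();
    private final int maxLen;

    CappedLog(int maxLen) {
        if (maxLen < 0) {
            throw new IllegalArgumentException("maxLen must not be negative");
        }
        this.maxLen = maxLen;
    }

    void append(String message) {
        entries.addLast(message);
        while (entries.size() > maxLen) {
            entries.removeFirst(); // the oldest message is pruned and no longer readable
        }
    }

    /** Copy of the current contents, oldest first. */
    List<String> snapshot() {
        return new ArrayList<>(entries);
    }
}
```

With a limit of 4, appending six messages leaves only the last four in the log. A consumer that has not yet read the first two never gets to see them, so a size limit trades completeness for bounded memory.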

Consumer Groups

The last approach to address duplicate message processing utilizes consumer groups. The idea of consumer groups is to track acknowledgment. Acknowledgment allows you to flag a message as acknowledged by a consumer. The XACK command returns whether the message was acknowledged or whether a previous consumer already acknowledged the message.

To use consumer groups, we need to create a consumer group first. Please note that as of the time I wrote this post a stream must already exist before a consumer group can be created. This issue is probably going to be addressed with https://github.com/antirez/redis/issues/4824.

As of now, we can reuse our stream my-stream if you followed the previous samples.

We’re creating the consumer group named my-group which is only valid for the stream my-stream. Note the last parameter is the stream offset that is used to track the reading progress. We’re using $ to point to the stream head.

127.0.0.1:6379> XGROUP CREATE my-stream my-group $
OK

Let’s now add a message to the stream:

127.0.0.1:6379> XADD my-stream * key value
1527072009813-0

And issue a non-blocking read through XREADGROUP:

127.0.0.1:6379> XREADGROUP GROUP my-group c1 COUNT 1 STREAMS my-stream >
1) 1) "my-stream"
   2) 1) 1) 1527072009813-0
         2) 1) "key"
            2) "value"

XREADGROUP accepts the name of the group and the name of a consumer to track reading progress. Note also the stream offset >. This symbolic stream offset points to the latest message id that was read by the consumer group my-group, so the read returns only messages that have not been delivered to the group yet.
You might have noticed there is a consumer name alongside the group name. Consumer groups are designed to track message delivery and distinguish between consumers. If you remember the blocking read example from above, you’ve seen that two consumers received a message at the same time. To change (or retain) this behavior, you can specify a consumer name:

  1. Reads with same consumer names can receive the same message multiple times.
  2. Reads with different consumer names prevent receiving the same message multiple times.

Depending on how you consume messages, you might want to restart processing or consume messages with multiple clients without building your own synchronization mechanism. Redis Streams allow you to do so by acknowledging messages. By default, XREADGROUP does not acknowledge messages: each delivered message is recorded as pending for the consumer until it is acknowledged. Once you have processed the message, acknowledge it by issuing an XACK. Depending on the command's return value, you can see whether you are the one who acknowledged the message or whether it had already been acknowledged. If you do not need this delivery tracking, you can specify NOACK, in which case the message is treated as acknowledged as soon as it is read.
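
The delivery tracking behind consumer groups can be sketched as a toy model in plain Java. It assumes that a message handed out by a group read stays pending until it is acknowledged, which is how the Redis documentation describes the pending entries list. ToyConsumerGroup and its methods are made-up names; nothing here talks to Redis.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of one consumer group on one stream. Not Redis, only the bookkeeping idea. */
final class ToyConsumerGroup {
    private final List<String> messageIds = new ArrayList<>();         // the stream, in order
    private final Map<String, String> pending = new LinkedHashMap<>(); // id -> consumer name
    private int nextUndelivered = 0;                                   // what ">" refers to

    void add(String id) {
        messageIds.add(id);
    }

    /** Delivers up to count never-delivered messages and records them as pending. */
    List<String> readNew(String consumer, int count) {
        List<String> delivered = new ArrayList<>();
        while (nextUndelivered < messageIds.size() && delivered.size() < count) {
            String id = messageIds.get(nextUndelivered++);
            pending.put(id, consumer);
            delivered.add(id);
        }
        return delivered;
    }

    /** Returns 1 if the id was pending and is now acknowledged, 0 otherwise. */
    int ack(String id) {
        return pending.remove(id) != null ? 1 : 0;
    }

    /** Ids delivered to the given consumer that have not been acknowledged yet. */
    List<String> pendingFor(String consumer) {
        List<String> ids = new ArrayList<>();
        for (Map.Entry<String, String> entry : pending.entrySet()) {
            if (entry.getValue().equals(consumer)) {
                ids.add(entry.getKey());
            }
        }
        return ids;
    }
}
```

Because the group keeps a single delivery pointer, two consumers reading new messages never receive the same one. The pending map is what a recovery procedure would inspect to find messages that were delivered but never acknowledged.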

Let’s pause here rather than dive into recovery and more advanced topics. The Redis website provides full documentation about Redis Streams at https://redis.io/topics/streams-intro.

Consuming Redis Streams with Java

Note: As of writing, the only Java client supporting Redis Streams is a Lettuce preview version 5.1.0.M1.

Redis Streams come with a new server-side API that requires adoption on the client side as well. Let’s replay the examples above using a Java client.

First of all, we need a client instance to prepare a connection. We will be using the synchronous API. However, Redis Stream APIs are supported by the asynchronous and reactive APIs as well.

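import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisStreamCommands;

RedisClient client = RedisClient.create("redis://localhost");
StatefulRedisConnection<String, String> connection = client.connect();
RedisStreamCommands<String, String> streamCommands = connection.sync(); // synchronous API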

Lettuce introduces a new command interface RedisStreamCommands that declares Redis Stream API methods along with its various types (such as StreamOffset, Consumer, and command argument objects).

The first thing we want to do is add a new message to a stream:

Map<String, String> body =  Collections.singletonMap("key", "value");
String messageId = streamCommands.xadd("my-stream", body);

This example uses UTF-8-encoded strings to represent keys and values. The body is passed as a Map, and the call issues the command XADD my-stream * key value.
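
How a Map body lines up with the command arguments can be made explicit with a small helper. This is an illustration only: XaddArguments is a hypothetical class, it is not how Lettuce builds commands internally, and it always uses * for the id.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper, not part of Lettuce: flattens a body into XADD arguments. */
final class XaddArguments {
    static List<String> of(String streamKey, Map<String, String> body) {
        if (body.isEmpty()) {
            throw new IllegalArgumentException("XADD needs at least one field/value pair");
        }
        List<String> args = new ArrayList<>();
        args.add("XADD");
        args.add(streamKey);
        args.add("*"); // let the server generate the message id
        for (Map.Entry<String, String> field : body.entrySet()) {
            args.add(field.getKey());   // field name
            args.add(field.getValue()); // field value
        }
        return args;
    }
}
```

For a body with several fields, the order of the arguments follows the iteration order of the Map, so a LinkedHashMap is the safer choice when field order matters to the reader.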

Now let’s read a single message from our stream with a command that corresponds with XREAD COUNT 1 STREAMS my-stream 0:

List<StreamMessage<String, String>> messages = streamCommands
        .xread(XReadArgs.Builder.count(1),
               StreamOffset.from("my-stream", "0"));

if (messages.size() == 1) {
    // a message was read
} else {
    // no message was read
}

The xread(…) method accepts XReadArgs and StreamOffset and returns a list of StreamMessage<K, V> objects that contain the message Id along with the body. The messages can be processed now, and a subsequent read would include the last messageId to read new messages:

StreamMessage<String, String> message = …;
List<StreamMessage<String, String>> messages = streamCommands
        .xread(XReadArgs.Builder.count(1),
               StreamOffset.from("my-stream", message.getId()));

if (messages.size() == 1) {
    // a message was read
} else {
    // no message was read
}

Blocking reads require an additional duration to be passed into the argument object. Adding the BLOCK option turns a non-blocking call (from a Redis perspective) into a blocking one:

List<StreamMessage<String, String>> messages = streamCommands
        .xread(XReadArgs.Builder.count(1)
                                .block(Duration.ofSeconds(10)),
               StreamOffset.from("my-stream", "0"));

In the last example, let’s take a look at consumer groups. RedisStreamCommands provides methods to create consumer groups. As of writing, the methods to remove consumers and consumer groups are not yet implemented in Redis.

streamCommands.xadd("my-stream", Collections.singletonMap("key", "value")); // add a message to create the stream data structure

streamCommands.xgroupCreate("my-stream", "my-group", "$"); // add a group pointing to the stream head

List<StreamMessage<String, String>> messages = streamCommands.xreadgroup(Consumer.from("my-group", "c1"),
        StreamOffset.lastConsumed("my-stream"));

Messages are read from my-stream using the consumer group my-group and the consumer c1. Consumer groups and consumer names are byte-safe encoded and therefore case-sensitive when using ASCII or UTF-8 strings.

Conclusion

This blog post gave a first look at Redis Streams, which will be available with Redis 5, and at how to use the Stream API with the Lettuce Redis client. The API is not fully implemented, so we should expect changes.

Published on Java Code Geeks with permission by Mark Paluch, partner at our JCG program. See the original article here: A first look at Redis Streams and how to use them with Java

Opinions expressed by Java Code Geeks contributors are their own.

Mark Paluch

Mark is a software craftsman, did just about every job in IT, open source committer, and is passionate about dev culture. Particularly interested in hardware hacking and internet of things. Mark helps development teams to improve continuously during their software endeavor to achieve outstanding performance.