Asynchronous Operation Execution with Netty on Redis

Netty got my attention a while back and I just wanted to play a bit around with it. Given the fact that I am already fallen in love with Redis, what would be more fun than implementing a low level client for Redis based on Netty?

Let's begin to answer the question "What the hell is Netty?". Netty is an asynchronous (Java based) event-driven network application framework. It is helping you to develop high performance protocol servers and clients.

We are obviously more interested in the client part here, meaning that this article is focusing on how to interact with a Redis Server.

Netty is already coming with RESP support. The package 'io.netty.handler.codec.redis' contains several Redis message formats:

  • RedisMessage: A general Redis message
  • ArrayRedisMessages: An implementation of the RESP Array message
  • SimpleRedisStringMessage: An implementation of a RESP Simple String message
  • ...

So all we need to do is to:

  1. Boostrap a channel: A channel is a nexus to a network socket or component which is capable of I/O operations. Bootstrapping means to assign the relevant components to the channel (Event loop group, handlers, listeners, ...) and to establish the socket connection. An example class can be found here.
  2. Define a channel pipeline: We are using an initialization handler in order to add several other handlers to the channel's pipeline. The pipeline is a list of channel handlers, whereby each handler handles or intercepts inbound events or outbound operations. Our channel pipeline is having the following handlers: RedisDecoder (Inbound handler that decodes into a RedisMessage), RedisBulkStringAggregator (Inbound handler that aggregates an BulkStringHeaderRedisMessage and its following BulkStringRedisContents into a single FullBulkStringRedisMessage), RedisArrayAggregator (Aggregates RedisMessage parts into an ArrayRedisMessage) and RedisEncoder (This outbound handler encodes RedisMessage into
    bytes by following the RESP (REdis Serialization Protocol). Netty will first apply the outbound handlers to the passed in value. Then it will put the encoded message on the socket. When the response will be received then it will apply the inbound handlers. The last handler is then able to work with the decoded (pre-handled) message. An example for such a pipeline definition can be found here.
  3. Add a custom handler: We are also adding a custom duplex handler to the pipeline. It is used in order to execute custom logic when a message is received (channelRead) or sent (write). We are not yet planning to execute business logic based on the RedisMessage but instead want to just fetch it, which means that our handler just allows to retrieve the result. My handler is providing an asynchronous method to do so. The method 'sendAsyncMessage' returns a Future. It's then possible to check if the Future is completed. When it is completed then you can get the RedisMessage from it. This handler is buffering the futures until they are completed. The source code of my example handler can be found here
BTW: It's also possible to attach listeners to a channel. Whereby I found it initially to be a good idea to use listeners in order to react on new messages, I had to realize that channel listeners are invoked before the last handler (the last one is usually your custom one), which means that you face the issue that your received message did not go through the channel pipeline when the listener is invoked. So my conclusion is that channel listeners are more used for side tasks (inform someone that something was received, log a message out, ...) instead of the message processing itself, whereby handlers are designed in order to be used to process the received messages. So if you want to use listeners then a better way is to let the handler work with promises and then attach the listener to the promise of a result.

In addition the following classes were implemented for demoing purposes:
  • GetMsg and SetMsg: Are extending the class ArrayRedisMessage by defining how a GET and a SET message are looking like.
  • AsyncRedisMessageBuffer: A message buffer which uses a blocking queue in order to buffer outgoing and incoming messages. The Redis Client Handler (my custom handler) is doing the following: Sending a message causes that the Future is put into the buffer. When the response arrives then the Future is updated and removed from the buffer. Whoever called the 'sendAsyncMessage' method has hopefully still a reference to the just dequeued Future. I used 'LinkedBlockingDeque' which means that the implementation should be thread safe.
Here a code example how to use the handler in order to execute an asynchronous GET operation:

Hope you enjoyed reading this blog post! Feedback is welcome.