Tuesday, 19 June 2018

Asynchronous Operation Execution with Netty on Redis

Netty got my attention a while back and I wanted to play around with it a bit. Given that I have already fallen in love with Redis, what would be more fun than implementing a low-level client for Redis based on Netty?

Let's begin by answering the question "What the hell is Netty?". Netty is an asynchronous, event-driven network application framework for Java. It helps you to develop high-performance protocol servers and clients.

We are obviously more interested in the client part here, so this article focuses on how to interact with a Redis server.

Netty already comes with RESP support. The package 'io.netty.handler.codec.redis' contains several Redis message formats:

  • RedisMessage: A general Redis message
  • ArrayRedisMessage: An implementation of the RESP Array message
  • SimpleStringRedisMessage: An implementation of a RESP Simple String message
  • ...
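
To make the wire format behind these message classes more tangible, here is a minimal sketch of RESP itself. It is written in plain Python rather than with Netty, and the function names encode_command and decode_simple_string are made up for this illustration. It only relies on two RESP rules: a command is sent as an Array of Bulk Strings, and a Simple String reply starts with '+' and ends with CRLF.

```python
def encode_command(*args: str) -> bytes:
    """Encode a Redis command as a RESP Array of Bulk Strings."""
    # Array header: '*' followed by the number of elements
    parts = [b"*%d\r\n" % len(args)]
    for arg in args:
        data = arg.encode("utf-8")
        # Bulk String: '$' followed by the byte length, then the payload
        parts.append(b"$%d\r\n" % len(data))
        parts.append(data + b"\r\n")
    return b"".join(parts)


def decode_simple_string(raw: bytes) -> str:
    """Decode a RESP Simple String reply, e.g. the reply to a successful SET."""
    if not raw.startswith(b"+") or not raw.endswith(b"\r\n"):
        raise ValueError("not a RESP Simple String")
    return raw[1:-2].decode("utf-8")


get_bytes = encode_command("GET", "hello")
ok_reply = decode_simple_string(b"+OK\r\n")
```

In the Netty based client this work is done by RedisEncoder and RedisDecoder, so the sketch is only meant to show what ends up on the socket.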

So all we need to do is to:

  1. Bootstrap a channel: A channel is a nexus to a network socket or component which is capable of I/O operations. Bootstrapping means assigning the relevant components to the channel (event loop group, handlers, listeners, ...) and establishing the socket connection. An example class can be found here.
  2. Define a channel pipeline: We use an initialization handler to add several other handlers to the channel's pipeline. The pipeline is a list of channel handlers, where each handler handles or intercepts inbound events or outbound operations. Our channel pipeline has the following handlers: RedisDecoder (inbound handler that decodes bytes into a RedisMessage), RedisBulkStringAggregator (inbound handler that aggregates a BulkStringHeaderRedisMessage and its following BulkStringRedisContents into a single FullBulkStringRedisMessage), RedisArrayAggregator (aggregates RedisMessage parts into an ArrayRedisMessage) and RedisEncoder (outbound handler that encodes a RedisMessage into bytes by following RESP, the REdis Serialization Protocol). Netty first applies the outbound handlers to the value that is passed in and then puts the encoded message on the socket. When the response is received, it applies the inbound handlers. The last handler is then able to work with the decoded (pre-handled) message. An example for such a pipeline definition can be found here.
  3. Add a custom handler: We also add a custom duplex handler to the pipeline. It is used to execute custom logic when a message is received (channelRead) or sent (write). We are not yet planning to execute business logic based on the RedisMessage but just want to fetch it, so our handler only allows the caller to retrieve the result. My handler provides an asynchronous method to do so. The method 'sendAsyncMessage' returns a Future. It's then possible to check if the Future is completed. Once it is completed, you can get the RedisMessage from it. The handler buffers the futures until they are completed. The source code of my example handler can be found here.
BTW: It's also possible to attach listeners to a channel. While I initially found it a good idea to use listeners in order to react to new messages, I had to realize that channel listeners are invoked before the last handler (the last one is usually your custom one). This means that the received message has not yet gone through the whole channel pipeline when the listener is invoked. So my conclusion is that channel listeners are better suited for side tasks (informing someone that something was received, logging a message, ...) than for the message processing itself, whereas handlers are designed to process the received messages. So if you want to use listeners, a better way is to let the handler work with promises and then attach the listener to the promise of a result.
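
The 'listener on a promise' idea can be sketched independently of Netty. The following lines use Python's concurrent.futures.Future as a stand-in for a Netty promise; the callback name on_reply and the reply value are made up for this illustration. The point is only that the listener fires when the handler completes the promise, which is after the message went through the pipeline.

```python
from concurrent.futures import Future

received = []


def on_reply(done: Future) -> None:
    # Invoked once the promise is completed, i.e. with the already decoded reply
    received.append(done.result())


# The (hypothetical) handler would hand out this promise when a message is sent
promise = Future()
promise.add_done_callback(on_reply)

# Later the last handler of the pipeline completes the promise with the decoded reply
promise.set_result("decoded reply")
```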

In addition, the following classes were implemented for demo purposes:
  • GetMsg and SetMsg: Extend the class ArrayRedisMessage and define what a GET and a SET message look like.
  • AsyncRedisMessageBuffer: A message buffer which uses a blocking queue in order to buffer outgoing and incoming messages. The Redis client handler (my custom handler) does the following: Sending a message puts the Future into the buffer. When the response arrives, the Future is updated and removed from the buffer. Whoever called the 'sendAsyncMessage' method hopefully still has a reference to the just dequeued Future. I used 'LinkedBlockingDeque', which means that the implementation should be thread-safe.
Here is a code example that shows how to use the handler in order to execute an asynchronous GET operation:
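
As a stand-in, here is a small sketch of the buffering idea in plain Python. It is not the Netty handler itself: the class AsyncReplyBuffer, its methods and the 'transport' callable are assumptions made for this illustration, and a deque guarded by a lock replaces the LinkedBlockingDeque. It relies on Redis answering the commands of one connection in the order in which they were sent, so the oldest pending Future always belongs to the next reply.

```python
import threading
from collections import deque
from concurrent.futures import Future


class AsyncReplyBuffer:
    """FIFO buffer of pending futures for one connection."""

    def __init__(self):
        self._pending = deque()
        self._lock = threading.Lock()

    def send_async(self, command, transport):
        future = Future()
        with self._lock:
            # Enqueue and write under the same lock so both orders match
            self._pending.append(future)
            transport(command)
        return future

    def on_reply(self, reply):
        with self._lock:
            future = self._pending.popleft()
        # Complete the oldest pending future with the decoded reply
        future.set_result(reply)


sent = []                      # pretends to be the socket
buffer = AsyncReplyBuffer()
first = buffer.send_async(("GET", "hello"), sent.append)
second = buffer.send_async(("GET", "other"), sent.append)

buffer.on_reply("world")       # the reply to the first GET arrives
print(first.done(), first.result(), second.done())
```

The caller keeps the returned Future and can poll it with done() or block on result(), which mirrors how 'sendAsyncMessage' is described above.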

Hope you enjoyed reading this blog post! Feedback is welcome.

Wednesday, 6 June 2018

Data Encryption at Rest

Data security and protection is currently a hot topic. It seems that we have reached the point where the pendulum is swinging back again. After years of voluntary openness by sharing personal information freely with social networks, people are getting more and more concerned about how their personal data is used in order to profile or influence them. Social network vendors are currently getting bad press, but maybe we should ask ourselves the fair question "Didn't we know all the time that their services are not free and that we are paying for them with our data?". Maybe not strictly related to prominent (so-called) 'data scandals', but at least following the movement of the pendulum, is the new European GDPR regulation around data protection. Even if I think that it tends to 'overshoot the mark' (as we would say in German) and sometimes leaves data controllers and processors in the dark (unexpected rhyme ...), it is a good reason for me to address some security topics again from a technical point of view. So the subject of this article is 'Data Encryption at Rest' on Linux servers.

To be more accurate, this article mainly focuses on how to ensure that folders are encrypted under Linux. Linux provides the following ways to encrypt your data:
  • Partition level: This allows you to define an encrypted partition on a hard drive. I think that this is the most commonly seen way of encrypting data with Linux. Most Linux distributions already provide this option during the installation.
  • Folder level: Allows you to encrypt specific folders, e.g. by mounting them under a specific path. The 'ecryptfs' solution can be used for such a purpose.
  • File level: It is also possible to encrypt single files. The PGP (Pretty Good Privacy) tools can be used for this purpose.

This article focuses on the 'Folder level' encryption. It has the advantage that you can define encrypted folders on demand without the need to repartition your drives. It also doesn't just work on single files but allows you to mount your folder directly. Each file which is stored in the encrypted folder is encrypted separately. This is especially useful if you want to encrypt only specific data and give only specific users unencrypted access. One use case would be to allow only your CIFS service (file server service) unencrypted access to the folder. I can also easily see that database systems could leverage this feature, although I didn't test what the performance implications of folder level encryption are for a DBMS.

  • Step 0 - Install 'ecryptfs'
apt-get -y install ecryptfs-utils
  • Step 1 - Create a hidden directory: Let's assume that we have a folder /mnt/data which is the mount point of your main data partition (in my case an EXT4 partition on a RAID1 of 2 spinning HDDs). We create a hidden folder named .encrypted there:
mkdir /mnt/data/.encrypted

  • Step 2 - Create a second folder: This folder is used as our mount point. All access needs to happen via this second folder.
mkdir /mnt/encrypted
  • Step 3 - Mount the hidden folder as an encrypted one: We want to access our encrypted folder under /mnt/encrypted. Each write to the newly mounted folder then involves the encryption of the written data. Here is a small script which does the job:
mount -t ecryptfs \
-o rw,relatime,ecryptfs_fnek_sig=82028e5be8a0a05b,\
ecryptfs_key_bytes=16,ecryptfs_unlink_sigs \
/mnt/data/.encrypted /mnt/encrypted

The mount command will ask you for the passphrase. The same passphrase has to be provided for every remount.

WARNING: If you lose your passphrase, then you will no longer be able to read your previously encrypted data.

This is what's stored in your mounted folder:

root@ubuntu-server:/mnt/encrypted# ls
hello2.txt  hello.txt
root@ubuntu-server:/mnt/encrypted# cat hello.txt 
Hello world!

The original folder, in contrast, contains the encrypted data:

root@ubuntu-server:/mnt/data/.encrypted# ls
root@ubuntu-server:/mnt/data/.encrypted# cat ECRYPTFS_FNEK_ENCRYPTED.FWYW-ctPtO0USURgl98vtKSoykT9hmQROUa3cBMaMT0UyWKbxkF7KQOiU---

Access to the decrypted data is possible under the following circumstances:

  • The logged-in user has permission to read or write the folder /mnt/encrypted
  • The folder /mnt/data/.encrypted was mounted to /mnt/encrypted by providing the passphrase
In particular, it's no longer possible to read the unencrypted data after physically removing the hard disk from the machine. As said, it's necessary to know the passphrase in order to decrypt the data of this folder again.
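
One practical consequence: as long as nothing is mounted on /mnt/encrypted, it is just a normal folder, and files written there are stored unencrypted. A service could therefore check the mount before it writes. The following Python sketch does this by parsing text in the format of /proc/mounts. The function name is_ecryptfs_mounted and the SAMPLE text are made up for this illustration, and the sample line is shortened compared to what a real system would print.

```python
def is_ecryptfs_mounted(mount_point: str, mounts_text: str) -> bool:
    """Return True if mount_point is listed as an ecryptfs mount."""
    for line in mounts_text.splitlines():
        fields = line.split()
        # Fields: device, mount point, file system type, options, dump, pass
        if len(fields) >= 3 and fields[1] == mount_point and fields[2] == "ecryptfs":
            return True
    return False


# Illustrative sample only; on a real system read the text from /proc/mounts
SAMPLE = (
    "/dev/md0 /mnt/data ext4 rw,relatime 0 0\n"
    "/mnt/data/.encrypted /mnt/encrypted ecryptfs rw,relatime,ecryptfs_key_bytes=16 0 0\n"
)

mounted = is_ecryptfs_mounted("/mnt/encrypted", SAMPLE)
```

On a live machine the second argument would be the content of /proc/mounts, e.g. open("/proc/mounts").read().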

Hope this article is helpful :-) .

Monday, 22 January 2018

To PubSub or not to PubSub, that is the question


The PubSub pattern is quite simple:
  • Publishers can publish messages to channels
  • Subscribers of these channels are able to receive the messages from them
The publisher knows nothing about what any of the subscribers do, so they act independently. The only thing which glues them together is a message within a channel.

Here is a very brief example with Redis:
  • Open a session via 'redis-cli' and enter the command `SUBSCRIBE public` in order to subscribe to a channel with the name 'public'

  • In another 'redis-cli' session enter the command `PUBLISH public "Hello world"` in order to publish the message 'Hello world' to the 'public' channel

The result in the first session is:

BTW: It's also possible to subscribe to a bunch of channels by using patterns, e.g. `PSUBSCRIBE pub*` 


Fire and Forget 

If we started additional subscribers after our experiment, they would not receive the previous messages. So we can only receive messages while we are actively subscribed, and we can't retrieve missed messages afterwards. In other words:
  • Only currently listening subscribers receive messages
  • A message is received by all active subscribers of a channel
  • If a subscriber dies and comes back later then it might have missed messages
PubSub is completely independent of the key space. So whatever is published to a channel will not directly affect the data in your database. Published messages are not persisted and there are no delivery guarantees. However, you can indeed use it in order to notify subscribers that something affected your key space (e.g. "The value of item 'hello:world' has changed, you might want to fetch the change!"). So what's the purpose of PubSub then? It's about message delivery and notifications. Each of the subscribers can decide for itself how to handle the received message. Because all subscribers of a channel receive the same message, it's obviously not about scaling the workload itself. This is an important difference in comparison to message queue use cases.
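
The fire and forget behaviour can be illustrated with a tiny in-memory model. This is plain Python and not Redis: the Broker class and its methods are made up for this sketch. Like the PUBLISH command, publish returns the number of subscribers that received the message.

```python
from collections import defaultdict


class Broker:
    """In-memory model of PubSub channels; nothing is ever stored."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, channel, inbox):
        self._subscribers[channel].append(inbox)

    def publish(self, channel, message):
        # Deliver to whoever is subscribed right now, then forget the message
        for inbox in self._subscribers[channel]:
            inbox.append(message)
        return len(self._subscribers[channel])


broker = Broker()
early_a, early_b, late = [], [], []
broker.subscribe("public", early_a)
broker.subscribe("public", early_b)
receivers = broker.publish("public", "Hello world")

# This subscriber arrives after the message was published and never sees it
broker.subscribe("public", late)
```

Both early subscribers end up with the same message, which is why PubSub alone does not distribute work across them.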


Message Queues

Message queues, on the other hand, are intended to scale the workload. A list of messages is processed by a pool of workers. As the pool of workers is usually limited in size, it's important that messages are buffered until a worker is free to process them. Redis (Enterprise) features like
  • Persistence
  • High Availability
are much more important for such a queuing scenario, because such a queue should survive a node failure or a node restart. Redis fortunately comes with the LIST data structure for simple queues and the Sorted Set structure for priority queues.

It's important to state that there are already plenty of libraries and solutions out there for this purpose. Here are two examples:

A very simple queue implementation would use a list. Because entries of the list are strings, it would be good to encode messages into e.g. JSON if they have a more complex structure.
  • Create a queue and inform the scheduler that a new queue is alive:

  • Add 2 messages to the queue:

  • Schedule the workers: We could indeed use a more complex scheduling approach. However, the simplest one would be to just assign the next worker of the pool to the next message. So in order to dequeue a message we can just use `LPOP`:

BTW: If our queue is initially empty, there is a way to wait for a while until something arrives by using the `BLPOP` command.
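
The queue semantics can be sketched without a Redis server. In the following Python lines a deque stands in for the Redis list, and the helper names rpush and lpop only mimic the commands of the same name. Pushing to the tail with RPUSH is an assumption of this sketch; together with LPOP it gives first-in, first-out order. The message fields are made up.

```python
import json
from collections import deque

queue = deque()  # stands in for a Redis list


def rpush(message: dict) -> None:
    # List entries are strings, so structured messages are encoded as JSON
    queue.append(json.dumps(message))


def lpop():
    # Like LPOP, return nothing when the list is empty
    return json.loads(queue.popleft()) if queue else None


rpush({"job": 1, "task": "resize"})
rpush({"job": 2, "task": "upload"})

worker_1 = lpop()
worker_2 = lpop()
worker_3 = lpop()
```

Each message is handed to exactly one worker, which is the difference to PubSub, where every subscriber gets every message.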

Using PubSub is actually optional for our message queue example. It's easy to see that the scheduler could also assign workers without getting notified because it can at any time access the queues and messages. However, I found it a bit more dynamic to combine our queue example with PubSub:
  • The scheduler gets notified when new work needs to be assigned to the workers
  • As these notifications are fire and forget, it would also be possible for the scheduler to check from time to time if there is something to do
  • If the scheduler dies then another instance can be started which can access the database in order to double check which work was already done by the workers and which work still needs to be done. An interrupted job can be restarted based on such state information.



Redis' PubSub is 'Fire and forget'. It's intended to be used to deliver messages from many (publishers) to many (subscribers). It's indeed a useful feature for notification purposes. However, it's important to understand the differences between a messaging and a message processing use case.

We used it to inform a single scheduler that some work needs to be done. The scheduler would then hand over to a pool of worker threads in order to process the actual queue. The entire state of the queue was stored in our database as a list because PubSub alone is not intended to be used for message queuing use cases. In fact, the usage of PubSub for our queuing example was optional.