Tuesday, 19 June 2018

Asynchronous Operation Execution with Netty on Redis

Netty got my attention a while back, and I wanted to play around with it a bit. Given that I have already fallen in love with Redis, what could be more fun than implementing a low-level client for Redis based on Netty?

Let's begin by answering the question "What the hell is Netty?". Netty is an asynchronous (Java-based) event-driven network application framework. It helps you develop high-performance protocol servers and clients.

We are obviously more interested in the client part here, meaning that this article focuses on how to interact with a Redis server.

Netty already comes with RESP support. The package 'io.netty.handler.codec.redis' contains several Redis message formats:

  • RedisMessage: A general Redis message
  • ArrayRedisMessage: An implementation of the RESP Array message
  • SimpleStringRedisMessage: An implementation of the RESP Simple String message
  • ...

So all we need to do is to:

  1. Bootstrap a channel: A channel is a nexus to a network socket or component which is capable of I/O operations. Bootstrapping means assigning the relevant components to the channel (event loop group, handlers, listeners, ...) and establishing the socket connection. An example class can be found here.
  2. Define a channel pipeline: We use an initialization handler in order to add several other handlers to the channel's pipeline. The pipeline is a list of channel handlers, whereby each handler handles or intercepts inbound events or outbound operations. Our channel pipeline has the following handlers:
     • RedisDecoder: Inbound handler that decodes bytes into a RedisMessage
     • RedisBulkStringAggregator: Inbound handler that aggregates a BulkStringHeaderRedisMessage and its following BulkStringRedisContents into a single FullBulkStringRedisMessage
     • RedisArrayAggregator: Inbound handler that aggregates RedisMessage parts into an ArrayRedisMessage
     • RedisEncoder: Outbound handler that encodes a RedisMessage into bytes by following RESP (the REdis Serialization Protocol)
     Netty first applies the outbound handlers to the passed-in value and then puts the encoded message on the socket. When the response is received, it applies the inbound handlers, so the last handler is able to work with the decoded (pre-handled) message. An example of such a pipeline definition can be found here.
  3. Add a custom handler: We also add a custom duplex handler to the pipeline. It is used to execute custom logic when a message is received (channelRead) or sent (write). We are not yet planning to execute business logic based on the RedisMessage; instead we just want to fetch it, which means that our handler only allows retrieving the result. My handler provides an asynchronous method for this purpose: 'sendAsyncMessage' returns a Future. It's then possible to check whether the Future is completed, and once it is, you can get the RedisMessage from it. The handler buffers the futures until they are completed. The source code of my example handler can be found here.
BTW: It's also possible to attach listeners to a channel. While I initially thought it would be a good idea to use listeners in order to react to new messages, I had to realize that channel listeners are invoked before the last handler (the last one is usually your custom one). This means that the received message has not yet gone through the whole channel pipeline when the listener is invoked. My conclusion is that channel listeners are better suited for side tasks (informing someone that something was received, logging a message, ...) than for the message processing itself, whereas handlers are designed to process the received messages. If you still want to use listeners, a better way is to let the handler work with promises and then attach the listener to the promise of a result.

In addition, the following classes were implemented for demo purposes:
  • GetMsg and SetMsg: Extend the class ArrayRedisMessage by defining what a GET and a SET message look like.
  • AsyncRedisMessageBuffer: A message buffer which uses a blocking queue in order to buffer outgoing and incoming messages. The Redis client handler (my custom handler) does the following: sending a message causes the Future to be put into the buffer. When the response arrives, the Future is completed and removed from the buffer. Whoever called the 'sendAsyncMessage' method hopefully still has a reference to the just dequeued Future. I used a 'LinkedBlockingDeque', which means that the implementation should be thread-safe.
Here is a code example of how to use the handler in order to execute an asynchronous GET operation:
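The original Java example is linked above. Since it is not inlined here, the core idea of the handler, buffering pending futures until the responses arrive in request order, can be sketched independently of Netty in a few lines of JavaScript (promises standing in for Java futures; all names are illustrative):

```javascript
// Minimal sketch of the pending-future buffering idea (illustrative names).
// Replies on a Redis connection arrive in the same order as the requests,
// so a FIFO queue of pending promise resolvers is enough to match them up.
class AsyncMessageBuffer {
  constructor(transport) {
    this.pending = [];          // resolvers of not-yet-answered requests
    this.transport = transport; // function that actually writes the message
  }
  sendAsyncMessage(message) {
    const future = new Promise((resolve) => this.pending.push(resolve));
    this.transport(message);    // outbound: encode and write
    return future;              // the caller keeps a reference to the future
  }
  onResponse(decodedMessage) {  // inbound: called by the last pipeline handler
    const resolve = this.pending.shift();
    resolve(decodedMessage);
  }
}

// Usage: an asynchronous GET against a fake transport that answers later.
const sent = [];
const client = new AsyncMessageBuffer((msg) => sent.push(msg));
const result = client.sendAsyncMessage(['GET', 'hello:world']);
client.onResponse('Hello world!'); // simulate the decoded RESP reply
result.then((value) => console.log(value)); // prints "Hello world!"
```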

Hope you enjoyed reading this blog post! Feedback is welcome.

Wednesday, 6 June 2018

Data Encryption at Rest

Data security and protection is currently a hot topic. It seems that we have reached the point where the pendulum is swinging back again. After years of voluntarily sharing personal information freely with social networks, people are getting more and more concerned about how their personal data is used to profile or influence them. Social network vendors are currently getting bad press, but maybe we should ask ourselves the fair question "Didn't we know all along that their services are not for free and that we are paying them with our data?". Maybe not strictly related to the prominent (so-called) 'data scandals', but at least following the movement of the pendulum, is the new European GDPR regulation around data protection. Even if I think that it tends to 'overshoot the mark' (as we would say in German) and sometimes leaves data controllers and processors in the dark (unexpected rhyme ...), it is a good reason for me to address some security topics again from a technical point of view. So this article has the subject of 'Data Encryption at Rest' on Linux servers.

To be more accurate, this article mainly focuses on how to ensure that folders are encrypted under Linux. Linux provides the following ways to encrypt your data:
  • Partition level: This allows you to define an encrypted partition on a hard drive. I think this is the most commonly seen way of encrypting data with Linux. Most Linux distributions already provide this option during the installation.
  • Folder level: Allows you to encrypt specific folders, e.g. by mounting them under a specific path. The 'ecryptfs' solution can be used for such a purpose.
  • File level: It is also possible to encrypt single files. The PGP (Pretty Good Privacy) tools can be used for this purpose.

This article focuses on 'Folder level' encryption. It has the advantage that you can define encrypted folders on demand without the need to repartition your drives. It also doesn't just work on single files but allows you to mount your folder directly. Each file which is stored in the encrypted folder is encrypted separately. This is especially useful if you want to encrypt only specific data by giving only specific users unencrypted access. One use case would be to only allow your CIFS service (file server service) unencrypted access to the folder. I can also easily see that database systems could leverage this feature, although I didn't test which performance implications might arise when using folder-level encryption with a DBMS.

  • Step 0 - Install 'ecryptfs'
apt-get -y install ecryptfs-utils
  • Step 1 - Create a hidden directory: Let's assume that we have a folder /mnt/data which is the mount point of your main data partition (in my case an EXT4 partition on a RAID1 of 2 spinning HDDs). We create a hidden folder named '.encrypted' there:
mkdir /mnt/data/.encrypted

  • Step 2 - Create a second folder: This folder is used as our mount point. All access needs to happen via this second folder.
mkdir /mnt/encrypted
  • Step 3 - Mount the hidden folder as an encrypted one: Let's assume we want to access our encrypted folder under /mnt/encrypted. This means that each write to the newly mounted folder involves encrypting the written data. Here is a small script which does the job:
mount -t ecryptfs \
  -o rw,relatime,ecryptfs_fnek_sig=82028e5be8a0a05b,\
ecryptfs_key_bytes=16,ecryptfs_unlink_sigs \
  /mnt/data/.encrypted /mnt/encrypted

The mount command will ask you for the passphrase. The same passphrase has to be provided for every remount.

WARNING: If you lose your passphrase, then you will no longer be able to read your previously encrypted data.

This is what's stored in your mounted folder:

root@ubuntu-server:/mnt/encrypted# ls
hello2.txt  hello.txt
root@ubuntu-server:/mnt/encrypted# cat hello.txt 
Hello world!

The original folder, by contrast, contains the encrypted data:

root@ubuntu-server:/mnt/data/.encrypted# ls
root@ubuntu-server:/mnt/data/.encrypted# cat ECRYPTFS_FNEK_ENCRYPTED.FWYW-ctPtO0USURgl98vtKSoykT9hmQROUa3cBMaMT0UyWKbxkF7KQOiU---

Access to the decrypted data is possible under the following circumstances:

  • The logged-in user has permission to read or write the folder /mnt/encrypted
  • The folder /mnt/data/.encrypted was mounted to /mnt/encrypted by providing the passphrase
In particular, it's no longer possible to read the unencrypted data after physically removing the hard disk from the machine. As said, it's necessary to know the passphrase in order to decrypt the data in this folder again.

Hope this article is helpful :-) .

Monday, 22 January 2018

To PubSub or not to PubSub, that is the question


 The PubSub pattern is quite simple:
  • Publishers can publish messages to channels
  • Subscribers of these channels are able to receive the messages from them
The publisher has no knowledge of the functionality of any of the subscribers, so they act independently. The only thing which glues them together is a message within a channel.
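Before looking at Redis, the pattern itself can be sketched in a few lines of JavaScript (an in-memory stand-in, not Redis; names are illustrative):

```javascript
// Minimal in-memory sketch of the PubSub pattern: publishers and
// subscribers only share a channel name, nothing else.
class PubSub {
  constructor() { this.channels = new Map(); } // channel -> subscriber callbacks
  subscribe(channel, callback) {
    if (!this.channels.has(channel)) this.channels.set(channel, []);
    this.channels.get(channel).push(callback);
  }
  publish(channel, message) {
    const subs = this.channels.get(channel) || [];
    subs.forEach((cb) => cb(message));   // fire and forget: nothing is stored
    return subs.length;                  // like Redis, report the receiver count
  }
}

const bus = new PubSub();
bus.subscribe('public', (msg) => console.log('got: ' + msg));
bus.publish('public', 'Hello world');   // received by the active subscriber
bus.publish('private', 'nobody there'); // no subscribers: the message is lost
```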

Here a very brief example with Redis:
  • Open a session via 'redis-cli' and enter the following command in order to subscribe to a channel with the name 'public':

SUBSCRIBE public

  • In another 'redis-cli' session enter the following command in order to publish the message 'Hello world' to the 'public' channel:

PUBLISH public "Hello world"

The result in the first session is:

1) "message"
2) "public"
3) "Hello world"

BTW: It's also possible to subscribe to a bunch of channels by using patterns, e.g. `PSUBSCRIBE pub*` 


Fire and Forget 

If we started additional subscribers after our experiment, they wouldn't receive the previous messages. So we can only receive messages while we are actively subscribed, meaning that we can't retrieve missed messages afterwards. In other words:
  • Only currently listening subscribers receive messages
  • A message is received by all active subscribers of a channel
  • If a subscriber dies and comes back later, it might have missed messages
PubSub is completely independent from the key space. So whatever is published to a channel will not directly affect the data in your database. Published messages are not persisted and there are no delivery guarantees. However, you can indeed use it in order to notify subscribers that something affected your key space (e.g. "The value of item 'hello:world' has changed, you might fetch the change!"). So what's the purpose of PubSub then? It's about message delivery and notifications. Each of the subscribers can decide by itself how to handle the received message. Because all subscribers of a channel receive the same message, it's obviously not about scaling the workload itself. This is an important difference in comparison to message queue use cases.


Message Queues

Message queues, on the other hand, are intended to scale the workload. A list of messages is processed by a pool of workers. As the pool of workers is usually limited in size, it's important that messages are buffered until a worker is free to process them. Redis (Enterprise) features like
  • Persistence
  • High availability
are quite important for such a queuing scenario: the queue should survive a node failure or a node restart. Redis fortunately comes with the LIST data structure for simple queues and the Sorted Set structure for priority queues.

It's important to state that there are already plenty of libraries and solutions out there for this purpose. A very simple queue implementation would use a list. Because entries of the list are strings, it would be good to encode messages into e.g. JSON if they have a more complex structure.
  • Create a queue and inform the scheduler that a new queue is alive:

  • Add 2 messages to the queue:

  • Schedule the workers: We could indeed use a more complex scheduling approach. However, the simplest and stupidest one is to just assign the next free worker of the pool to the next message. So in order to dequeue a message we can just use `LPOP`:

BTW: If our queue is initially empty, there is a way to wait for a while until something arrives by using the `BLPOP` command.

Using PubSub is actually optional for our message queue example. It's easy to see that the scheduler could also assign workers without being notified, because it can access the queues and messages at any time. However, I found it a bit more dynamic to combine our queue example with PubSub:
  • The scheduler gets notified when new work needs to be assigned to the workers
  • As these notifications are fire and forget, it would also be possible for the scheduler to check from time to time if there is something to do
  • If the scheduler dies, another instance can be started which can access the database in order to double-check which work was already done by the workers and which work still needs to be done. An interrupted job can be restarted based on such state information.
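The combination of a list-based queue with a fire-and-forget notification, as described above, can be sketched without Redis as follows (an in-memory stand-in for the list and a single scheduler; all names are invented for illustration):

```javascript
// In-memory stand-in for a Redis LIST used as a queue, plus a fire-and-forget
// notification to a single scheduler (all names are illustrative).
const queue = [];                    // stands in for a Redis list, e.g. 'queue:jobs'
const done = [];
let freeWorkers = 1;                 // a very small worker pool

function rpush(message) {            // enqueue at the tail (like RPUSH)
  queue.push(message);
  notifyScheduler();                 // PubSub-style notification (optional)
}
function notifyScheduler() {
  // Simplest possible scheduling: hand the head of the queue to a free worker.
  while (freeWorkers > 0 && queue.length > 0) {
    const msg = queue.shift();       // dequeue from the head (like LPOP)
    freeWorkers--;
    done.push('handled ' + JSON.stringify(msg));
    freeWorkers++;                   // the worker finishes immediately in this sketch
  }
}

rpush({ id: 1, task: 'resize-image' });
rpush({ id: 2, task: 'send-mail' });
console.log(done.length); // 2
```

Because the whole queue state lives in the list, a restarted scheduler could simply call notifyScheduler() again and drain whatever is left, which is exactly why the notification itself may be lossy.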



Redis' PubSub is 'Fire and forget'. It's intended to be used to deliver messages from many (publishers) to many (subscribers). It's indeed a useful feature for notification purposes. However, it's important to understand the differences between a messaging and a message processing use case.

The way we used it was to inform a single scheduler that some work needs to be done. The scheduler then hands over to a pool of worker threads in order to process the actual queue. The entire state of the queue was stored in our database as a list, because PubSub alone is not intended for message queuing use cases. In fact, the usage of PubSub in our queuing example was optional.

Thursday, 27 July 2017

Indexing with Redis

If you follow my news on Twitter then you might have realized that I just started to work more with Redis. Redis (= Remote Dictionary Server) is known as a Data Structure Store. This means that we can deal not just with Key-Value pairs (called Strings in Redis) but also with data structures such as Hashes (hash maps), Lists, Sets or Sorted Sets. Further details about the data structures can be found here:

Indexing in Key-Value Stores

With a pure Key-Value Store, you would typically maintain your index structures manually by applying some KV-Store patterns. Here are some examples:

  • Direct access via the primary key: The key itself is semantically meaningful, so you can access a value directly by knowing how the key is structured (by using key patterns). An example would be to access a user profile by knowing the user's id. The key looks like 'user::<uid>'.
  • Exact match by a secondary key: The KV-Store itself can be seen as a huge hash map, which means that you can use lookup items in order to reference other ones. This gives you a kind of hash index. An example would be to find a user by their email address. The lookup item has the key 'email::<email_addr>', whereby the value is the key of the user. In order to fetch the user with a specific email address, you just need to do a Get operation on the key with the email prefix and then another one on the key with the user prefix.
  • Range by a secondary key: This is where it gets a bit more complicated with pure KV-Stores. Most of them allow you to retrieve a list of all keys, but doing a full 'key space scan' is not efficient (complexity of O(n), where n is the number of keys). You can indeed build your own tree structure by storing lists as values and by referencing between them, but maintaining these search trees on the application side is really not what you usually want to do.
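The 'lookup item' pattern from the second bullet can be sketched with a plain map standing in for the KV store (key patterns from the text; the user data is illustrative):

```javascript
// Sketch of the 'lookup item' pattern on a plain KV store (a Map here):
// a secondary key maps to the primary key of the real item.
const kv = new Map();

// Primary item, directly accessible via a semantically meaningful key
kv.set('user::1001', { id: 1001, name: 'David', email: 'david@example.com' });
// Lookup item: secondary key (email) -> primary key
kv.set('email::david@example.com', 'user::1001');

// Exact-match 'query' by email: two O(1) Get operations
function findUserByEmail(email) {
  const primaryKey = kv.get('email::' + email);
  return primaryKey ? kv.get(primaryKey) : null;
}

console.log(findUserByEmail('david@example.com').name); // David
```

Note that the application has to keep the lookup item in sync with the primary item; that maintenance burden is exactly what Redis' richer data structures reduce.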

The Redis Way

So how does Redis address these examples? We leverage the power of data structures such as Hashes and Sorted Sets.

Direct Access via the Primary Key

A Get operation already has a complexity of O(1). This is the same for Redis.

Exact Match by a Secondary Key

Hashes (as the name already indicates) can be used directly to build a hash index in order to support exact-match 'queries'. The complexity of accessing an entry in a Redis Hash is indeed O(1). Here an example:

In addition, Redis Hashes support operations such as HSCAN. This provides a cursor-based approach to scanning hashes. Further information can be found here:

Here an example:

Range By a Secondary Key

Sorted Sets can be used to support range 'queries'. The way this works is that we use the value we are searching for as the score (order number). Scanning such a Sorted Set then has a complexity of O(log(n)+m), where n is the number of elements in the set and m is the result set size.

Here an example:

If you add 2 elements with the same score then they are sorted lexicographically. This is interesting for non-numeric values. The command ZRANGEBYLEX allows you to perform range 'queries' by taking the lexicographic order into account.
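The score-based range idea can be sketched as follows (a naive in-memory stand-in for a Sorted Set; a real Sorted Set uses a skip list and achieves O(log(n)) access, and all names here are illustrative):

```javascript
// Sketch of a range 'query' via a score-sorted index (a stand-in for a
// Redis Sorted Set; names invented for illustration).
const index = []; // entries of the form { member, score }, kept sorted

function zadd(member, score) {
  index.push({ member, score });
  // Same-score entries are ordered lexicographically, as in Redis.
  index.sort((a, b) => a.score - b.score || a.member.localeCompare(b.member));
}
function zrangebyscore(min, max) {
  return index.filter((e) => e.score >= min && e.score <= max)
              .map((e) => e.member);
}

// Index users by age, then range-query for ages 18 to 30
zadd('user::1001', 25);
zadd('user::1002', 17);
zadd('user::1003', 30);
console.log(zrangebyscore(18, 30)); // [ 'user::1001', 'user::1003' ]
```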


Redis now supports Modules (since v4.0). Modules allow you to extend Redis' functionality. One module which perfectly matches the topic of this blog post is RediSearch. RediSearch basically provides Full-Text Indexing and Searching capabilities to Redis. It uses an inverted index behind the scenes. Further details about RediSearch can be found here:

Here a very basic example from the RediSearch documentation:

As usual, I hope that you found this article useful and informative. Feedback is very welcome!

Thursday, 6 April 2017

Kafka Connect with Couchbase

About Kafka

Apache Kafka is a distributed persistent message queuing system. It is used to realize publish-subscribe use cases, process streams of data in real time and store a stream of data safely in a distributed, replicated cluster. That said, Apache Kafka is not a database system but can stream data from a database system in near real time. With Kafka, the data is represented as a message stream. Producers put messages into a so-called topic and Consumers take messages out of it for further processing. There is a variety of connectors available. A short introduction to Kafka can be found here: https://www.youtube.com/watch?v=fFPVwYKUTHs . This video explains the basic concepts and what Producers and Consumers look like. Couchbase supports 'Kafka Connect' since version 3.1 of its connector. The Kafka documentation says "Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka.". Kafka provides a common framework for Kafka connectors. It can run in a distributed or standalone mode and is distributed and scalable by default.


Kafka uses Apache Zookeeper. Zookeeper is a cluster management service. The documentation states that "ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications ... ZooKeeper aims at distilling the essence of these different services into a very simple interface to a centralized coordination service."

After downloading and extracting the standard distribution of Apache Kafka, you can start a local Zookeeper instance using the default configuration the following way:

bin/zookeeper-server-start.sh config/zookeeper.properties

The next step is to configure 3 Kafka message broker nodes. We will run these services on the same host for demoing purposes, but it's obvious that they could also run in a more distributed way. In order to do so we need to create configurations for the broker servers. So copy the file 'config/server.properties' to 'server-1.properties' and 'server-2.properties' and then edit them. The file 'server.properties' has the following settings:

Let's assume that $i is the id of the broker. So the first broker has id '0', listens on port 9092 and logs to 'kafka-logs-0'. The second broker has id '1', listens on port 9093 and logs to 'kafka-logs-1'. The third broker configuration is self-explanatory.

The next step is to download and install the Couchbase plug-in. Just copy the related libraries to the 'libs' sub-folder and the configuration files to the 'config' sub-folder of your Kafka installation.

Streaming data from Couchbase

Before we can stream data from Couchbase, we need to create the topic to which we want to stream. So let's create a topic named 'test-cb'.

You can then describe this topic by using the following command:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test-cb

The topic which we created has 3 partitions. Each node is the leader for 1 partition. The leader is the node responsible for all reads and writes for the given partition. 'Replicas' is the list of nodes that replicate the log for this partition.

Now let's create a configuration file for distributed workers under 'config/couchbase-distributed.properties':

The Connect settings are more or less the default ones. Now we also have to provide the connector settings. When using distributed mode, the settings have to be provided by registering the connector via the Connect REST service:

The configuration file 'couchbase-distributed.json' has a name attribute and an embedded object with the configuration settings:

The Couchbase settings refer to a Couchbase bucket and the topic to which we want to stream DCP messages out of Couchbase. In order to run the Connect workers in distributed mode, we can now execute:

bin/connect-distributed.sh config/couchbase-distributed.properties

The log file contains information about the tasks. We configured 2 tasks to run. The output contains the information about which task is responsible for which Couchbase shards (vBuckets):

For now let's just consume the 'test-cb' messages by using a console logging consumer:

One entry looks as the following one:

We just used the standard value converter. The value is actually a JSON document, but it is represented as a Base64-encoded string in this case.

Another article will explain how to use Couchbase via Kafka Connect as the sink for messages.

Monday, 12 September 2016

Visualizing time series data from Couchbase with Grafana

Grafana is a quite popular tool for querying and visualizing time series data and metrics. If you follow my blog then you might have seen my earlier post about how to use Couchbase Server for managing time series data:

This blog is now about extending this idea by providing a Grafana Couchbase plug-in for visualizing purposes.

After you have installed Grafana (I installed it on Ubuntu, but there are installation guides available here for several platforms), you are asked to configure a data source. Before we use Grafana's 'SimpleJson' data source, it's relevant to understand what the backend of such a data source looks like. It needs to provide the following endpoints:

  • '/': Returns any successful response in order to test if the data source is available
  • '/search': Returns the available metrics. We will just return 'dax' in our example.
  • '/annotations': Returns an array of annotations. Such an annotation has a title, a time where it would occur, a text and a tag. We just return an empty array in our example. But you can easily see that it would be possible to create an annotation if a specific value is exceeded or a specific time is reached.
  • '/query': The request is containing a time range and a target metric. The result is an array which has an entry for every target metric and each of these entries has an array of data points. Each data point is a tuple of the metric value and the time stamp.
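Sketched as plain functions (without the HTTP layer; the metric name and values are illustrative), the four endpoints return the following shapes:

```javascript
// Response shapes of a SimpleJson-style backend, sketched as plain functions.
function root() {               // '/': any successful response
  return { success: true };
}
function search() {             // '/search': the available metrics
  return ['dax'];
}
function annotations() {        // '/annotations': none in our example
  return [];
}
function query(range) {         // '/query': data points per target metric
  // Each data point is a [value, timestamp-in-milliseconds] tuple.
  const all = [
    [10545.3, Date.parse('2016-08-25T13:15:00Z')],
    [10548.1, Date.parse('2016-08-25T13:16:00Z')],
  ];
  const points = all.filter(([, t]) => t >= range.from && t <= range.to);
  return [{ target: 'dax', datapoints: points }];
}

const res = query({ from: Date.parse('2016-08-25T13:00:00Z'),
                    to:   Date.parse('2016-08-25T14:00:00Z') });
console.log(res[0].datapoints.length); // 2
```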

We will just extend our earlier example with a Grafana endpoint and then point Grafana's generic JSON data source plug-in to it. But I can already see a project on the horizon which standardizes time series management in Couchbase via a standard REST service, which could then be used by a dedicated Grafana Couchbase plug-in.

First let's look at our backend implementation:

As usual, the full code can be found here: https://github.com/dmaier-couchbase/cb-ts-demo/blob/master/routes/grafana.js

Here is how we implemented the backend:

  • '/': As you can see we just return a 'success:true' if the backend is accessible.
  • '/search': The only metric which our backend provides is the 'dax' one. 
  • '/annotations':  Only an example annotation is returned in this case. 
  • '/query': We just check if the requested metric is the 'dax' one. In this first example, we don't take the aggregation documents into account. Instead we just request the relevant data points by using a multi-get based on the time range. Because Grafana expects the datapoints in time order, we have to finally sort them by time. Again, this code will be extended in order to take the several aggregation levels into account (Year->Month->Day->Hour).

 Now back to Grafana! Let's assume that you successfully installed the 'SimpleJson' data source:

Then the only thing you need to do is add a new data source in Grafana which points to our backend service (to run the backend service, just execute 'node app.js' after you have checked out the full repository and installed all necessary dependencies):

In this example I actually just loaded a bit of random data for testing purposes by using the 'demo_data.js' script.

Then all you have to do is create a Dashboard and place a panel on it:

The rest should work more or less the same as with any other Grafana data source. :-)

Friday, 26 August 2016

Time series data management with Couchbase Server

Couchbase Server is a Key-Value store and Document database. The combination of being able to store time series entries as KV pairs, the possibility to aggregate data automatically in the background via Map-Reduce, and the possibility to dynamically query the data via the query language N1QL makes Couchbase Server a perfect fit for time series management use cases.

The high transaction volume seen in time series use cases means that relational database systems are often not a good fit. A single Couchbase cluster, on the other hand, might support hundreds of thousands (up to millions) of operations per second (depending on the node and cluster size).

Time series use cases seen with Couchbase are for instance:
  • Activity tracking: Track the activity of a user, whereby each data point is a vector of activity measurement values (e.g. location, ...)
  • Internet of things: Frequently gathering data points from internet-connected devices (such as cars, alarm systems, home automation devices, ...), storing them as a time series and aggregating them in order to monitor and analyse the device behavior
  • Financial: Store currency or stock courses as time series in order to analyse (e.g. predictive analysis) based on this data. A course diagram typically shows a time series.
  • Industrial manufacturing: Getting measurement values from machine sensors in order to analyse the quality of parts.

But before we start digging deeper into an example, let's talk a bit about the background of time series data management:

A time series is a series of data points in time order. So, mathematically speaking, a time series can be expressed as a discrete function with (simplified) two dimensions. The first dimension (x-axis) is the time. The second dimension (y-axis) is the data point value, whereby a data point value can again be a vector (which makes it actually 1+n dimensional, where n is the vector size). Most commonly the values on the time axis are on an equidistant grid, which means that the distance between any two neighboring x values is equal.

So what to do with such a time series?
  • Analyse the past: Statistics, reporting, ...
  • Real-time analysis: Monitor current activities, find anomalies, ...
  • Predictive analysis: Forecast, estimate, extrapolate, classify, ...

Good, time to look at an example. First we need a data source which frequently provides changing data. Such data could be financial courses, sensor measurements, a human heart beat and so on.

Let's take a financial course. Google is providing such information via 'Google Finance'. So in order to get the current course of the DAX (this might tell you where I am living ;-) ), you just have to open up https://www.google.com/finance?q=INDEXDB%3A.DAX. In order to get the same information as JSON you can just use https://www.google.com/finance/info?q=INDEXDB%3ADAX .

What we get by accessing this API is:

So far so good. Now let's write a little Node.js application (using http://www.ceanjs.org) which polls the current course every minute and then writes it into Couchbase. To be more accurate: we actually fetch every 30 seconds in order to reach the granularity of a minute. In this example we decided on minute granularity, but it would work in a similar way for e.g. second granularity. We also simply assume that the last fetched value for a minute is the minute value. An even more sophisticated approach would be to store the up to 2 gathered values in an array in our minute document and aggregate over those two (the average as the minute value instead of the last one). It's a question of accuracy. The key of such a data point is indeed dependent on the time stamp. We are just interested in the course value 'l', the difference 'c' and the time stamp 'lt_dts'. The job logic then looks like the following:

BTW: The full source code can be found here: https://github.com/dmaier-couchbase/cb-ts-demo/blob/master/course_retrieval_job.js

This then looks as follows in Couchbase:

Fine, so what's next? Let's start with direct access to time series values. In order to fetch all values for a given range, you don't need any index structure, because:

  • The discrete time value is part of the key, so our time axis is directly expressed via the key space.
  • It's also easy to see that the JSON document value is more or less a vector (as defined above).
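Because the timestamp is encoded in the key, fetching a range reduces to constructing the keys and multi-getting them. A sketch (the key pattern and the in-memory stand-in for the KV store are assumptions for illustration):

```javascript
// Sketch: derive the minute keys for a time range and multi-get them.
// The key pattern 'dax::YYYY-MM-DDTHH:mm' is an assumption for illustration.
function minuteKeys(start, end) {
  const keys = [];
  for (let t = Date.parse(start); t <= Date.parse(end); t += 60 * 1000) {
    keys.push('dax::' + new Date(t).toISOString().slice(0, 16));
  }
  return keys;
}

// In-memory stand-in for the KV store
const store = new Map([
  ['dax::2016-08-25T13:15', { l: 10545.3, c: -12.1 }],
  ['dax::2016-08-25T13:16', { l: 10548.1, c: 2.8 }],
]);

const result = minuteKeys('2016-08-25T13:15Z', '2016-08-25T13:16Z')
  .map((k) => store.get(k))
  .filter((v) => v !== undefined);
console.log(result.length); // 2
```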

So let's write a little service which takes a start time stamp and an end time stamp as a parameter in order to provide you all the requested values.

The service code could look like this:

The full code can be found here: https://github.com/dmaier-couchbase/cb-ts-demo/blob/master/routes/by_time.js

It just takes the start and end time in the following format:

  • http://localhost:9000/service/by_time?start=2016-08-25T13:15&end=2016-08-25T13:20 

The output looks like the following:

Let's next calculate some statistics based on these values. Therefore we will create some aggregate documents. As you might already imagine, we will aggregate based on time. The resulting time hierarchy for these aggregates will be 'Year -> Month -> Day -> Hour'. So there will be:

  • An hour aggregate: It aggregates based on the minute time series. There are 60 minutes per hour to aggregate.
  • A day aggregate: It aggregates based on the hour aggregates. There are 24 hours per day.
  • A month aggregate: It aggregates based on the day aggregates. There are between 28 and 31 days per month.
  • A year aggregate: It aggregates based on the month aggregates. There are 12 months per year.
I guess you got it :-) ...
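The hour aggregation step can be sketched as follows (the statistic values computed are the ones listed later: average, minimum and maximum; the input values are illustrative):

```javascript
// Aggregate an hour from its minute values: count, average, minimum, maximum.
function aggregateHour(minuteValues) {
  const sum = minuteValues.reduce((a, v) => a + v, 0);
  return {
    count: minuteValues.length,          // 60 for a full hour
    avg: sum / minuteValues.length,
    min: Math.min(...minuteValues),
    max: Math.max(...minuteValues),
  };
}

const agg = aggregateHour([10545.3, 10548.1, 10542.7]);
console.log(agg.min, agg.max); // 10542.7 10548.1
```

The day, month and year aggregates then apply the same idea one level up, aggregating over the stored aggregates instead of the raw minute values.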

So how do we build these aggregates? There are multiple ways to do it. Here are just some of them:
  • Use the built-in views and write the view results for a specific time range back to Couchbase
  • Execute a N1QL query by using aggregate functions
  • Do the calculations on the client side by fetching the data and write the results back
  • Load or stream the data into Spark in order to do the necessary calculations there and write the results back to Couchbase

Let's have a look at Views first. Views provide built-in map-reduce. We want to calculate the following statistic values:

  • The average value of the course
  • The maximum value of the course
  • The minimum value of the course
We will just create one View for this. The following map and reduce functions are created on the Couchbase Server side:
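The original functions are not reproduced here, but a Couchbase view computing these statistics typically looks like the following sketch (the document fields 'lt_dts' and 'l' are taken from the data points above; everything else is an assumption, and the reduce function has to handle Couchbase's rereduce case):

```javascript
// Map function (runs inside Couchbase): emit the date as an array so that
// 'group_level' can be used for hour/day/month/year grouping later.
// emit() and dateToArray() are provided by the Couchbase view engine.
function map(doc, meta) {
  if (doc.l !== undefined && doc.lt_dts !== undefined) {
    emit(dateToArray(doc.lt_dts), doc.l);
  }
}

// Custom reduce function: compute count/sum/min/max; the average is then
// sum / count on the client side.
function reduce(key, values, rereduce) {
  if (!rereduce) {
    return {
      count: values.length,
      sum: values.reduce(function (a, v) { return a + v; }, 0),
      min: Math.min.apply(null, values),
      max: Math.max.apply(null, values),
    };
  }
  // rereduce: the values are partial results of the structure above
  return values.reduce(function (a, v) {
    return { count: a.count + v.count, sum: a.sum + v.sum,
             min: Math.min(a.min, v.min), max: Math.max(a.max, v.max) };
  });
}
```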

The request parameters for aggregating directly for one hour look like:

It's easy to see that this also gives us direct access to the time function which has the hour (and no longer the minute) as the distance on the time axis. The data points are then the aggregation values. The same View can be used to get the daily, monthly and yearly aggregation values. The trick is to set the range parameters and the group level in the right way. In the example above, 'group_level=4' was used because the hour information is at the fourth position of the date array which was emitted as the key. In order to get the daily aggregation, just use a query like this:

Now let's create an aggregation service which uses this View result in order to return the aggregation for a specific hour. It queries the aggregate for a given hour and stores the aggregation result as an aggregate document if the hour is already a full hour (so if it has 60 data points). In reality you could also run a job in order to make sure that the aggregates are built upfront. In this demo application we just build them at access time. The next time they will not be accessed from the View, but directly from the KV store.

Here is the code of the service:

The full code can be found here: https://github.com/dmaier-couchbase/cb-ts-demo/blob/master/routes/agg_by_hour.js

The result in Couchbase would then be:

There might be a question in your head: "What if I want to aggregate by taking a specific aggregation level into account, but also need to take the last minutes (the highest granularity in our example) into account?" The answer is to combine the approaches of accessing the minute data points directly and the lower-granularity aggregates. Here an example: if you want to access everything from 14:00 until 15:02, whereby 15:00 is not yet a full hour, then you can do this by using the following formula:

  •  Agg(14:00) + Agg(t_15:00, t_15:01, t_15:02)

It's easy to see that you can derive additional formulas for other scenarios.

A related question is how long you should keep the highest-granularity values. One year has 525600 minutes, so we would get 525600 minute documents every year. For this use case we could decide to remove the minute documents (Couchbase even comes with a TTL feature to let them expire automatically), because it's unlikely that someone is interested in more than the daily course after one year. How long you keep the finest-granularity data points indeed depends on your requirements and how fine your finest granularity actually is.

OK, so this blog article is already getting quite long. Another one will follow which then will cover the following topics:

  • Visualizing time series data
  • How to query time series data with N1QL
  • Predictive analysis of time series data with Couchbase and Apache Spark