Friday, 26 August 2016

Time series data management with Couchbase Server

Couchbase Server is a Key Value store and Document database. The combination of being able to store time series entries as KV pairs with the possibilities to aggregate data automatically in the background via Map-Reduce and the possibility to dynamically query the data via the query language N1QL makes Couchbase Server a perfect fit for time series management use cases.

The high transaction volume seen in time series use cases is meaning that relational database systems are often not a good fit. A single Couchbase Cluster on the other hand side might support hundreds of thousands (up to millions) of operations per second (indeed dependent on the node and cluster size).

Time series use cases seen with Couchbase are for instance:
  • Activity tracking: Track the activity of a user whereby each data point is a vector of activity measurement values (e.g location, ...)
  • Internet of things: Frequently gathering data points of internet connected devices (as cars, alarm systems, home automation devices, ...), storing them as a time series and aggregate them in order monitor and analyse the device behavior
  • Financial: Store currency or stock courses as time series in order to analyse (e.g. predictive analysis) based on this data. A course diagram is typically showing a time series.
  • Industrial Manufacturing: Getting measurement values from machine sensors in order to analyse the quality of parts.

But before we start digging deeper into an example, let's talk a bit about the background of time series data management:

A time series is a series of data points in time order. So mathematically spoken a time series is expressed as a diskrete function with (simplified) two dimensions. The first dimension (x-axis) is the time. The second dimension (y-axis) is the data point value, whereby a data point value can be again a vector (which makes it actually 1+n dimensional, whereby n is the vector size). Most commonly the values on the time-axis are on an equidistant grid, which means that the distance between any x values x_1 and x_2 is equal.

So what to do with such a time series?
  • Analyse the past: Statistics, reporting, ...
  • Real-time analysis: Monitor current activities, find anomalies, ...
  • Predictive analysis: Forecast, estimate, extrapolate, classify, ...

Good, time to look at an example. First we need a data source which is frequently providing changing data. Such data could be financial courses, a sensor measurement, a human heart beat and so on.

Let's take a financial course. Google is providing such information via 'Google Finance'. So in order to get the current course of the DAX (this might tell you where I am living ;-) ), you just have to open up https://www.google.com/finance?q=INDEXDB%3A.DAX. In order to get the same information as JSON you can just use https://www.google.com/finance/info?q=INDEXDB%3ADAX .

What we get by accessing this API is:



So far so good. Now let's write a litte Node.js application (by using http://www.ceanjs.org) which is polling every minute for the current course and then writes it into Couchbase. To be more accurate: we actually fetch every 30 seconds in order to reach the granularity of a minute. In this example we decided for the minute granularity but it would work in a similar way for an e.g. seconds granularity. We also just expect that the last fetched value for a minute is the minute value. An even more sophisticated approach would be to store the max. 2 gathered values in an array in our minutes document and already aggregate on those two (avg as the minute value instead the last one). It's a question of accuracy. The key of such a data point is indeed dependent on the time stamp. We are just interested in the course value 'l', the difference 'c' and the time stamp 'lt_dts'. The job logic then looks as the following one:



BTW: The full source code can be found here: https://github.com/dmaier-couchbase/cb-ts-demo/blob/master/course_retrieval_job.js

This looks then as the following in Couchbase.



Fine, so what's next? Let's start with direct access to time series values. In order to fetch all values for a given range, you don't need any index structure because:

  • The discrete time value is part of the key. So our time-axis is directly expressed via the key space.
It's also easy to see that JSON document value is more or less a vector (as defined above)

So let's write a little service which takes a start time stamp and an end time stamp as a parameter in order to provide you all the requested values.

The service code could look like this:





The full code can be found here: https://github.com/dmaier-couchbase/cb-ts-demo/blob/master/routes/by_time.js

It just takes the start and end time in the format following format:


  • http://localhost:9000/service/by_time?start=2016-08-25T13:15&end=2016-08-25T13:20 

The output looks as the following one:




Let's next calculate some statistics based on these values. Therefore we will create some aggregate documents. As you might already imagine, we will aggregate based on the time. The resulting time dimension for these aggregates will be 'Year -> Month -> Day -> Hour'. So their will be:

  • An hour aggregate: It aggregates based on the minutes time series. There are 60 minutes per hour to aggregate. 
  • An day aggregate: It aggregates based on the hour aggregates. There are 24 hours per day.
  • A month aggregate: It aggregates based on the day aggregates. There are between 28 and 31 days per month.
  • A year aggregate: It aggregates based on the month aggregates. There 12 months per year.
I guess you got it :-) ...

So how to build these aggregates? There are multiple ways to do it. Here just some of them:
  • Use the built-in views and write the view results for a specific time range back to Couchbase
  • Execute a N1QL query by using aggregate functions
  • Do the calculations on the client side by fetching the data and write the results back
  • Load or stream the data into Spark in order to do the necessary calculations there and write the results back to Couchbase

Let's have a look at Views first. Views provide built-in map-reduce. We want to calculate the following statistic values:

  • The average value of the course
  • The maximum value of the course
  • The minimum value of the course
We will just create one View for this. The  following map and reduce functions are created on the Couchbase Server side:



The request parameters for aggregating directly for one hour are looking like:


It's easy to see that it also allows us to directly access the time function which has the hour (and no longer the minute) as the distance on time axis. The data points are then the aggregation values. The same View can be used to get the monthly and the yearly aggregation values. The trick is to set the range parameters and the group level in the right way. In the example above 'group_level=4' was used because the hour information is at the fourth position of the date array which was emitted as the search key. In order to get the daily aggregation, just use a query like this:



Now let's create an aggregation service which is using this View result in order to return the aggregation for a specific hour. It queries the aggregate for a given hour and stores the aggregation result as an aggregate document if the hour is already a full hour (so if it has 60 data points). In reality you could also run a job in order to make sure that the aggregates are built upfront. In this demo application we just build them at access time. The next time they will be not accessed from the View, but directly from the KV store.

Following the code of the service:



The full code can be found here: https://github.com/dmaier-couchbase/cb-ts-demo/blob/master/routes/agg_by_hour.js

The result in Couchbase would be then:



Their might be the question in your head 'What's if I want to aggregate by taking a specific aggregation level into account, but also need to have the last minutes (highest granularity in our example) into account?'. The answer is to combine the approaches of accessing the minute data points directly and the lower granularity aggregates. Here an example: If you want to access everything since 14:00 until 15:02, whereby 15:00 is not yet a full hour, then you can do this by using the following formula.

  •  Agg(14:00) + Agg(t_15:00, t_15:01, t_15:02)

It's easy to see that you can derive additional formulas for other scenarios.

A related question is how long you should keep the highest granularity values. One year has 525600 minutes. And so we would get every year 525600 minute documents. So for this use case we could decide to remove the minute documents (Couchbase even comes with a TTL feature in order to let them expire automatically) because it's unlikely the case that someone is interested in more than the daily course after one year. How long you keep the finest granularity data points indeed depends on your requirement and how fine your finest granularity actually is.

OK, so this blog article is already getting quite long. Another one will follow which then will cover the following topics:


  • Visualizing time series data
  • How query time series data with N1QL
  • Predictive analysis of time series data with Couchbase and Apache Spark

1 comment: