Thursday, 27 July 2017

Indexing with Redis

If you follow my news on Twitter then you might have realized that I just started to work more with Redis. Redis (= Remote Dictionary Server) is known as a Data Structure Store. This means that we don't just deal with Key-Value pairs (called Strings in Redis) but in addition with data structures such as Hashes (hash maps), Lists, Sets or Sorted Sets. Further details about the data structures can be found here:


Indexing in Key-Value Stores

With a pure Key-Value Store, you would typically maintain your index structures manually by applying some KV-Store patterns. Here are some examples:

  • Direct access via the primary key: The key itself is semantically meaningful, so you can access a value directly by knowing how the key is structured (by using key patterns). An example would be to access a user profile by knowing the user's id. The key looks like 'user::<uid>'.
  • Exact match by a secondary key: The KV-Store itself can be seen as a huge hash map, which means that you can use lookup items in order to reference other ones. This gives you a kind of hash index. An example would be to find a user by his email address. The lookup item has the key 'email::<email_addr>', whereby the value is the key of the user. In order to fetch the user with a specific email address you just need to do a Get operation on the key with the email prefix and then another one on the key with the user prefix (see the sketch after this list).
  • Range by a secondary key: This is where it gets a bit more complicated with pure KV-Stores. Most of them allow you to retrieve a list of all keys, but doing a full 'key space scan' is not efficient (complexity of O(n), n = number of keys). You can indeed build your own tree structure by storing lists as values and by referencing between them, but maintaining these search trees on the application side is really not what you usually want to do.
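To illustrate the lookup-item pattern with plain Set/Get operations, here is a minimal sketch (the keys and values are just examples):

SET user::1000 '{"name":"Bob","email":"bob@example.org"}'
SET email::bob@example.org user::1000
GET email::bob@example.org
"user::1000"
GET user::1000
"{\"name\":\"Bob\",\"email\":\"bob@example.org\"}"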


The Redis Way

So how does Redis address these examples? We leverage the power of data structures such as Hashes and Sorted Sets.

Direct Access via the Primary Key

A Get operation on a key-value store already has a complexity of O(1). The same is true for Redis' GET command.

Exact Match by a Secondary Key

Hashes (as the name already indicates) can be directly used to build a hash index in order to support exact match 'queries'. The complexity of accessing an entry in a Redis Hash is indeed O(1). Here is an example:
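A minimal sketch (the hash and user keys are just examples): the hash 'users:by_email' maps email addresses to user keys, and HGET performs the exact-match lookup in O(1).

HSET users:by_email bob@example.org user::1000
HSET users:by_email alice@example.org user::1001
HGET users:by_email bob@example.org
"user::1000"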

In addition, Redis Hashes support operations such as HSCAN. This gives you a cursor-based approach to scan hashes. Further information can be found here:


Here is an example:
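A minimal HSCAN sketch against the hash from above (cursor 0 starts a new scan; the reply contains the next cursor and the matching field-value pairs):

HSCAN users:by_email 0 MATCH *@example.org COUNT 100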


Range By a Secondary Key

Sorted Sets can be used to support range 'queries'. The way this works is that we use the value for which we are searching as the score (order number). Scanning such a Sorted Set then has a complexity of O(log(n)+m), whereby n is the number of elements in the set and m is the result set size.

Here is an example:
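A sketch of a range 'query' by age (keys and values are again just examples): the age is used as the score and the user key as the member.

ZADD users:by_age 25 user::1000
ZADD users:by_age 32 user::1001
ZADD users:by_age 41 user::1002
ZRANGEBYSCORE users:by_age 30 45
1) "user::1001"
2) "user::1002"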

If you add two elements with the same score then they are sorted lexicographically. This is interesting for non-numeric values. The command ZRANGEBYLEX allows you to perform range 'queries' by taking the lexicographic order into account.
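For example, with all members added at the same score, a lexicographic range 'query' could look like this (the '[' prefix makes a boundary inclusive):

ZADD users:by_name 0 "adam" 0 "bob" 0 "claire" 0 "david"
ZRANGEBYLEX users:by_name [b [d
1) "bob"
2) "claire"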


Modules

Redis now supports Modules (since v4.0). Modules allow you to extend Redis' functionality. One module which perfectly matches the topic of this blog post is RediSearch. RediSearch basically provides Full Text Indexing and Searching capabilities to Redis. It uses an Inverted Index behind the scenes. Further details about RediSearch can be found here:

Here is a very basic example along the lines of the one in the RediSearch documentation:
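A minimal sketch (the index, field and document names are made up): create an index, add a document and search it.

FT.CREATE myIdx SCHEMA title TEXT WEIGHT 5.0 body TEXT
FT.ADD myIdx doc1 1.0 FIELDS title "hello world" body "lorem ipsum"
FT.SEARCH myIdx "hello world" LIMIT 0 10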


As usual, I hope that you found this article useful and informative. Feedback is very welcome!


Thursday, 6 April 2017

Kafka Connect with Couchbase

About Kafka

Apache Kafka is a distributed, persistent message queuing system. It is used to realize publish-subscribe use cases, to process streams of data in real time and to store a stream of data safely in a distributed, replicated cluster. That said, Apache Kafka is not a database system, but it can stream data from a database system in near real time. With Kafka, the data is represented as a message stream. Producers put messages into a so-called topic and Consumers take messages out of it for further processing. There is a variety of connectors available. A short introduction to Kafka can be found here: https://www.youtube.com/watch?v=fFPVwYKUTHs . This video explains the basic concepts and what Producers and Consumers look like.

Couchbase supports 'Kafka Connect' since version 3.1 of its connector. The Kafka documentation says: "Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka." Kafka Connect provides a common framework for connectors. It can run in a distributed or standalone mode and is distributed and scalable by default.

Setup

Kafka uses Apache Zookeeper. Zookeeper is a cluster management service. The documentation states that "ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications ... ZooKeeper aims at distilling the essence of these different services into a very simple interface to a centralized coordination service."

After downloading and extracting the standard distribution of Apache Kafka, you can start a local Zookeeper instance by using the default configuration the following way:
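Assuming you are in the root folder of the extracted Kafka distribution, the command looks like this:

bin/zookeeper-server-start.sh config/zookeeper.properties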


The next step is to configure 3 Kafka message broker nodes. We will run these services on the same host for demoing purposes, but it's obvious that they could also run in a more distributed fashion. In order to do so, we need to create configurations for the broker servers. So copy the config/server.properties file to server-1.properties and server-2.properties and then edit them. The file 'server.properties' has the following settings:
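The relevant settings per broker are the broker id, the listener port and the log directory. As a sketch, 'server-1.properties' (the second broker) could look like this; the log path is just an example:

broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1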


Let's assume that $i is the id of the broker. So the first broker has id '0', listens on port 9092 and logs to 'kafka-logs-0'. The second broker has the id '1', listens on port 9093 and logs to 'kafka-logs-1'. The third broker configuration is self-explanatory.
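You can then start the three brokers (e.g. in the background) like this:

bin/kafka-server-start.sh config/server.properties &
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &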


The next step is to download and install the Couchbase plug-in. Just copy the related libraries to the libs sub-folder and the configuration files to the config sub-folder of your Kafka installation.


Streaming data from Couchbase

Before we can stream data from Couchbase we need to create a topic to which we want to stream. So let's create a topic which is named 'test-cb'.
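With the Zookeeper instance from above, the topic can be created with 3 partitions, replicated to all 3 brokers:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test-cb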


You can then describe this topic by using the following command:
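Using the same Zookeeper address as above:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test-cb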


The topic which we created has 3 partitions. Each node is the leader for 1 partition. The leader is the node responsible for all reads and writes for the given partition. 'Replicas' is the list of nodes that replicate the log for this partition.

Now let's create a configuration file for distributed workers under 'config/couchbase-distributed.properties':
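A sketch of what such a worker configuration could contain (the values are essentially the Connect defaults, with the broker list adjusted to our three local brokers):

bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status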


The Connect settings are more or less the default ones. Now we also have to provide the connector settings. When using the distributed mode, the settings have to be provided by registering the connector via the Connect REST service:
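Once the workers (started further below) are up, the registration could look like this; the Connect REST service listens on port 8083 by default:

curl -X POST -H "Content-Type: application/json" -d @config/couchbase-distributed.json http://localhost:8083/connectors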


The configuration file 'couchbase-distributed.json' has a name attribute and an embedded object with the configuration settings:
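A sketch of what it could look like. The property names of the Couchbase source connector shown here (connector class, cluster address, bucket, topic) are assumptions based on its quickstart, so please double-check them against the connector documentation; the bucket name is just an example:

{
  "name": "test-couchbase",
  "config": {
    "connector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector",
    "tasks.max": "2",
    "connection.cluster_address": "127.0.0.1",
    "connection.bucket": "travel-sample",
    "topic.name": "test-cb"
  }
}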


The Couchbase settings refer to a Couchbase bucket and the topic name to which we want to stream DCP messages out of Couchbase. In order to run the Connect workers in distributed mode, we can now execute:
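From the Kafka installation folder:

bin/connect-distributed.sh config/couchbase-distributed.properties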


The log file contains information about the tasks. We configured 2 tasks to run. The output shows which task is responsible for which Couchbase shards (vBuckets):


For now let's just consume the 'test-cb' messages by using a console logging consumer:
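For instance, reading the topic from the beginning:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-cb --from-beginning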


One entry looks like this:


We just used the standard value converter. The value is actually a JSON document, but it is represented as a Base64-encoded string in this case.

Another article will explain how to use Couchbase via Kafka Connect as the sink for messages.

Monday, 12 September 2016

Visualizing time series data from Couchbase with Grafana

Grafana is a quite popular tool for querying and visualizing time series data and metrics. If you follow my blog then you might have seen my earlier post about how to use Couchbase Server for managing time series data:




This blog post is now about extending this idea by providing a Grafana Couchbase plug-in for visualization purposes.

After you have installed Grafana (I installed it on Ubuntu, but there are installation guides available for several platforms), you are asked to configure a data source. Before we use Grafana's 'SimpleJson' data source, it's worth looking at what the backend of such a data source looks like. It has to provide the following endpoints:


  • '/': Returns any successful response in order to test if the data source is available
  • '/search': Returns the available metrics. We will just return 'dax' in our example.
  • '/annotations': Returns an array of annotations. Such an annotation has a title, a time where it would occur, a text and a tag. We just return an empty array in our example. But you can easily see that it would be possible to create an annotation if a specific value is exceeded or a specific time is reached.
  • '/query': The request contains a time range and a target metric. The result is an array which has an entry for every target metric, and each of these entries has an array of data points. Each data point is a tuple of the metric value and the time stamp.

We will just extend our example from before with a Grafana endpoint and then point Grafana's generic JSON data source plug-in to it, but I can already see a project on the horizon which standardizes the time series management in Couchbase via a standard REST service which can then be used by a dedicated Grafana Couchbase plug-in.

First let's look at our backend implementation:




As usual, the full code can be found here: https://github.com/dmaier-couchbase/cb-ts-demo/blob/master/routes/grafana.js

Here is how we implemented the backend (a condensed sketch follows the list):

  • '/': As you can see we just return a 'success:true' if the backend is accessible.
  • '/search': The only metric which our backend provides is the 'dax' one. 
  • '/annotations':  Only an example annotation is returned in this case. 
  • '/query': We just check if the requested metric is the 'dax' one. In this first example, we don't take the aggregation documents into account. Instead we just request the relevant data points by using a multi-get based on the time range. Because Grafana expects the data points in time order, we finally have to sort them by time. Again, this code will be extended in order to take the several aggregation levels into account (Year->Month->Day->Hour).
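Here is a condensed sketch of these four endpoints, assuming an Express router and a hypothetical helper 'fetchDataPoints' that performs the multi-get against Couchbase (the authoritative implementation is in the repository linked above; a JSON body parser is assumed to be registered in app.js):

var express = require('express');
var router = express.Router();

// '/': used by Grafana to test whether the data source is reachable
router.get('/', function (req, res) {
    res.json({success: true});
});

// '/search': returns the available metrics, in our case only 'dax'
router.post('/search', function (req, res) {
    res.json(['dax']);
});

// '/annotations': returns the annotations (an empty array or an example one)
router.post('/annotations', function (req, res) {
    res.json([]);
});

// '/query': returns the data points for the requested time range
router.post('/query', function (req, res) {
    var range = req.body.range;
    var target = req.body.targets[0].target;
    if (target !== 'dax') return res.json([]);
    fetchDataPoints(range.from, range.to, function (err, datapoints) {
        // Grafana expects [value, timestamp] tuples in time order
        datapoints.sort(function (a, b) { return a[1] - b[1]; });
        res.json([{target: 'dax', datapoints: datapoints}]);
    });
});

module.exports = router;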


 Now back to Grafana! Let's assume that you successfully installed the 'SimpleJson' data source:


Then the only thing you need to do is to add a new data source in Grafana which points to our backend service. (To run the backend service, just execute 'node app.js' after you have checked out the full repository and installed all necessary dependencies.)


In this example, I actually just loaded a bit of random data for testing purposes by using the demo_data.js script.

Then all you have to do is to create a Dashboard and place a panel on it:



The rest should work more or less the same as with any other Grafana data source. :-)

Friday, 26 August 2016

Time series data management with Couchbase Server

Couchbase Server is a Key-Value store and Document database. The combination of being able to store time series entries as KV pairs, the possibility to aggregate data automatically in the background via Map-Reduce and the possibility to dynamically query the data via the query language N1QL makes Couchbase Server a perfect fit for time series management use cases.

The high transaction volume seen in time series use cases means that relational database systems are often not a good fit. A single Couchbase cluster, on the other hand, might support hundreds of thousands (up to millions) of operations per second (depending, of course, on the node and cluster size).

Time series use cases seen with Couchbase are for instance:
  • Activity tracking: Track the activity of a user, whereby each data point is a vector of activity measurement values (e.g. location, ...)
  • Internet of things: Frequently gather data points of internet-connected devices (such as cars, alarm systems, home automation devices, ...), store them as a time series and aggregate them in order to monitor and analyse the device behavior
  • Financial: Store currency rates or stock quotes as time series in order to run analyses (e.g. predictive analysis) based on this data. A price chart is typically showing a time series.
  • Industrial Manufacturing: Getting measurement values from machine sensors in order to analyse the quality of parts.

But before we start digging deeper into an example, let's talk a bit about the background of time series data management:

A time series is a series of data points in time order. So, mathematically speaking, a time series can be expressed as a discrete function with (simplified) two dimensions. The first dimension (x-axis) is the time. The second dimension (y-axis) is the data point value, whereby a data point value can again be a vector (which actually makes it 1+n dimensional, whereby n is the vector size). Most commonly the values on the time axis are on an equidistant grid, which means that the distance between any two consecutive values x_i and x_(i+1) is constant.

So what to do with such a time series?
  • Analyse the past: Statistics, reporting, ...
  • Real-time analysis: Monitor current activities, find anomalies, ...
  • Predictive analysis: Forecast, estimate, extrapolate, classify, ...

Good, time to look at an example. First we need a data source which frequently provides changing data. Such data could be financial quotes, sensor measurements, a human heart beat and so on.

Let's take a stock index. Google provides such information via 'Google Finance'. So in order to get the current value of the DAX (this might tell you where I am living ;-) ), you just have to open up https://www.google.com/finance?q=INDEXDB%3A.DAX. In order to get the same information as JSON you can just use https://www.google.com/finance/info?q=INDEXDB%3ADAX .

What we get by accessing this API is:



So far so good. Now let's write a little Node.js application (by using http://www.ceanjs.org) which polls for the current value every minute and then writes it into Couchbase. To be more accurate: we actually fetch every 30 seconds in order to reach the granularity of a minute. In this example we decided on minute granularity, but it would work in a similar way for e.g. second granularity. We also just assume that the last fetched value for a minute is the minute value. An even more sophisticated approach would be to store the max. 2 gathered values in an array in our minute document and already aggregate over those two (the average as the minute value instead of the last one). It's a question of accuracy. The key of such a data point is indeed dependent on the time stamp. We are just interested in the value 'l', the difference 'c' and the time stamp 'lt_dts'. The job logic then looks like this:
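A condensed sketch of such a polling job; the document key format, the bucket name and the use of the 'request' HTTP client are illustrative assumptions (the authoritative version is linked below):

var request = require('request');
var couchbase = require('couchbase');

var cluster = new couchbase.Cluster('couchbase://localhost');
var bucket = cluster.openBucket('ts');
var URL = 'https://www.google.com/finance/info?q=INDEXDB%3ADAX';

setInterval(function () {
    request(URL, function (err, resp, body) {
        if (err) return console.error(err);
        // The response is prefixed with '//', so parse from the first '['
        var data = JSON.parse(body.substring(body.indexOf('[')))[0];
        var ts = new Date(data.lt_dts);
        // One document per minute, e.g. 'dax::2016-08-25T13:15'
        var key = 'dax::' + ts.toISOString().substring(0, 16);
        var doc = {
            // 'l' comes in as a string like '10,742.84', so store it as a number
            l: parseFloat(data.l.replace(/,/g, '')),
            c: data.c,
            lt_dts: data.lt_dts
        };
        bucket.upsert(key, doc, function (err) {
            if (err) console.error(err);
        });
    });
}, 30000);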



BTW: The full source code can be found here: https://github.com/dmaier-couchbase/cb-ts-demo/blob/master/course_retrieval_job.js

This then looks as follows in Couchbase.



Fine, so what's next? Let's start with direct access to time series values. In order to fetch all values for a given range, you don't need any index structure because:

  • The discrete time value is part of the key. So our time axis is directly expressed via the key space.
  • It's also easy to see that the JSON document value is more or less a vector (as defined above).

So let's write a little service which takes a start time stamp and an end time stamp as parameters in order to return all the requested values.

The service code could look like this:
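Here is a condensed sketch, assuming an Express router and the illustrative key format from the job sketch above (the authoritative version is linked below):

var express = require('express');
var couchbase = require('couchbase');
var router = express.Router();

var cluster = new couchbase.Cluster('couchbase://localhost');
var bucket = cluster.openBucket('ts');

router.get('/by_time', function (req, res) {
    var start = new Date(req.query.start);
    var end = new Date(req.query.end);
    // Derive one key per minute of the requested range
    var keys = [];
    for (var t = start.getTime(); t <= end.getTime(); t += 60000) {
        keys.push('dax::' + new Date(t).toISOString().substring(0, 16));
    }
    // Multi-get: fetch all minute documents in one batch
    bucket.getMulti(keys, function (err, results) {
        var values = [];
        for (var k in results) {
            if (!results[k].error) values.push(results[k].value);
        }
        res.json(values);
    });
});

module.exports = router;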





The full code can be found here: https://github.com/dmaier-couchbase/cb-ts-demo/blob/master/routes/by_time.js

It just takes the start and end time in the following format:


  • http://localhost:9000/service/by_time?start=2016-08-25T13:15&end=2016-08-25T13:20 

The output looks like this:




Let's next calculate some statistics based on these values. Therefore we will create some aggregate documents. As you might already imagine, we will aggregate based on the time. The resulting time dimension for these aggregates will be 'Year -> Month -> Day -> Hour'. So there will be:

  • An hour aggregate: It aggregates based on the minute time series. There are 60 minutes per hour to aggregate.
  • A day aggregate: It aggregates based on the hour aggregates. There are 24 hours per day.
  • A month aggregate: It aggregates based on the day aggregates. There are between 28 and 31 days per month.
  • A year aggregate: It aggregates based on the month aggregates. There are 12 months per year.
I guess you got it :-) ...

So how to build these aggregates? There are multiple ways to do it. Here are just some of them:
  • Use the built-in views and write the view results for a specific time range back to Couchbase
  • Execute a N1QL query by using aggregate functions
  • Do the calculations on the client side by fetching the data and write the results back
  • Load or stream the data into Spark in order to do the necessary calculations there and write the results back to Couchbase

Let's have a look at Views first. Views provide built-in map-reduce. We want to calculate the following statistical values:

  • The average of the index value
  • The maximum of the index value
  • The minimum of the index value
We will just create one View for this. The  following map and reduce functions are created on the Couchbase Server side:
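A sketch of what these functions could look like, assuming the minute documents from the job sketch above (with 'l' stored as a number) in a design document 'ts' with a view named 'stats'. The map function emits the date array [year, month, day, hour] as the key, and the built-in '_stats' reduce function provides sum, count, min and max (the average is sum/count):

// Map function
function (doc, meta) {
    if (meta.id.indexOf('dax::') === 0) {
        var d = new Date(doc.lt_dts);
        emit([d.getFullYear(), d.getMonth() + 1, d.getDate(), d.getHours()], doc.l);
    }
}

// Reduce function
_stats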



The request parameters for aggregating directly over one hour look like this:
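For example, for the hour 2016-08-25 13:00 (using the hedged 'ts'/'stats' view from above):

?startkey=[2016,8,25,13]&endkey=[2016,8,25,13]&inclusive_end=true&group_level=4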


It's easy to see that this also allows us to directly access the time function which has the hour (and no longer the minute) as the distance on the time axis. The data points are then the aggregation values. The same View can be used to get the monthly and the yearly aggregation values. The trick is to set the range parameters and the group level in the right way. In the example above, 'group_level=4' was used because the hour information is at the fourth position of the date array which was emitted as the key. In order to get the daily aggregation, just use a query like this:
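For instance, grouping on [year, month, day] (the third position of the key) aggregates over all hours of that day:

?startkey=[2016,8,25]&endkey=[2016,8,25,23]&group_level=3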



Now let's create an aggregation service which uses this View result in order to return the aggregation for a specific hour. It queries the aggregate for a given hour and stores the aggregation result as an aggregate document if the hour is already a full hour (so if it has 60 data points). In reality you could also run a job in order to make sure that the aggregates are built upfront. In this demo application we just build them at access time. The next time, they will not be accessed via the View, but directly from the KV store.

Here is the code of the service:
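A condensed sketch, reusing the Express router, the bucket connection and the hedged 'ts'/'stats' view from the sketches above (the aggregate key format is again just an example; the authoritative version is linked below):

var ViewQuery = couchbase.ViewQuery;

router.get('/agg_by_hour', function (req, res) {
    // e.g. ?hour=2016-08-25T13
    var d = new Date(req.query.hour + ':00');
    var key = [d.getFullYear(), d.getMonth() + 1, d.getDate(), d.getHours()];
    var aggKey = 'dax::agg::' + req.query.hour;

    // Serve the aggregate directly from the KV store if it was already built
    bucket.get(aggKey, function (err, doc) {
        if (!err) return res.json(doc.value);
        var q = ViewQuery.from('ts', 'stats').range(key, key, true).group_level(4);
        bucket.query(q, function (err, rows) {
            if (err || !rows || rows.length === 0) return res.status(404).end();
            var s = rows[0].value; // _stats result: sum, count, min, max
            var agg = {avg: s.sum / s.count, min: s.min, max: s.max, count: s.count};
            // Only persist the aggregate once the hour is complete (60 data points)
            if (s.count === 60) bucket.upsert(aggKey, agg, function () {});
            res.json(agg);
        });
    });
});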



The full code can be found here: https://github.com/dmaier-couchbase/cb-ts-demo/blob/master/routes/agg_by_hour.js

The result in Couchbase would be then:



There might be a question in your head: 'What if I want to aggregate by taking a specific aggregation level into account, but also need to take the last minutes (the highest granularity in our example) into account?'. The answer is to combine the approaches of accessing the minute data points directly and the lower-granularity aggregates. Here is an example: If you want to access everything from 14:00 until 15:02, whereby 15:00 is not yet a full hour, then you can do this by using the following formula.

  •  Agg(14:00) + Agg(t_15:00, t_15:01, t_15:02)

It's easy to see that you can derive additional formulas for other scenarios.

A related question is how long you should keep the highest-granularity values. One year has 525,600 minutes, and so we would get 525,600 minute documents every year. So for this use case we could decide to remove the minute documents (Couchbase even comes with a TTL feature in order to let them expire automatically) because it's unlikely that someone is interested in more than the daily values after one year. How long you keep the finest-granularity data points indeed depends on your requirements and on how fine your finest granularity actually is.

OK, this blog article is already getting quite long. Another one will follow which will then cover the following topics:


  • Visualizing time series data
  • How to query time series data with N1QL
  • Predictive analysis of time series data with Couchbase and Apache Spark

Friday, 1 July 2016

Caching in JavaEE with Couchbase

One of Couchbase Server's typical use cases is caching. As you might know, it is a KV store. The value of a KV pair can be a JSON document. It's not just the fact that Couchbase Server can store JSON documents that makes it a document database; rather, the fact that you can index and query JSON data defines its characteristic as a JSON document database. Back to the KV store: if you configure the built-in managed cache in a way that all your data fits into memory, then Couchbase Server can be used as a highly available distributed cache.

If you are a Java developer, then one of your questions might be whether it makes sense to use Couchbase as a cache for your applications. I had several projects where EhCache was replaced by Couchbase because of the Garbage Collection implications. The performance was often considerably better with a centralized, low-latency (sub-millisecond) cache than with one which was colocated with the application instances. This indeed depends on several factors (size of the cache entries, number of cache entries, access throughput). The next question might be how to best integrate such a cache into your application. A typical pattern is:
  • Try to read the data from the cache
  • If it is there, then use it
  • If is not there then get the data from the source system (e.g. relational DBMS)
  • Put it into the cache
  • The next time you try to access the same data, it will most probably be in the cache
Couchbase's Java SDK is quite simple for CRUD operations:
  • C: Insert
  • R: Get
  • U: Update, Replace
  • D: Remove
So as soon as you have established a Bucket (a data container) connection, you can use it as a cache. However, this involves implementation work on your side.
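A minimal sketch of the cache-aside pattern from above with the plain Couchbase Java SDK (the bucket name and the loadUserFromDatabase helper are made up):

import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonObject;

public class UserCache {

    private final Bucket bucket;

    public UserCache() {
        Cluster cluster = CouchbaseCluster.create("127.0.0.1");
        this.bucket = cluster.openBucket("cache");
    }

    public JsonObject getUser(String id) {
        String key = "user::" + id;
        // 1. Try to read the data from the cache
        JsonDocument cached = bucket.get(key);
        if (cached != null) {
            return cached.content();
        }
        // 2. Not there: get the data from the source system ...
        JsonObject user = loadUserFromDatabase(id);
        // 3. ... and put it into the cache (here with a TTL of one hour)
        bucket.upsert(JsonDocument.create(key, 3600, user));
        return user;
    }

    private JsonObject loadUserFromDatabase(String id) {
        // Placeholder for the actual access to the source system (e.g. a relational DBMS)
        return JsonObject.create().put("id", id);
    }
}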

I just looked at the Java standard JCache and also used the chance to play around a bit with CDI (Contexts and Dependency Injection). JCache is implemented by several providers and, look at that, there is already a Developer Preview of a Couchbase implementation available (http://blog.couchbase.com/jcache-dp2).

Side note: The Couchbase JCache implementation is not yet officially released. Couchbase also has a good Spring Data integration which comes with caching support as well.

So let's get started. First we need to have a cache instance which we can use for caching purposes.
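Here is a sketch of such a factory using the standard javax.cache API; the Couchbase developer preview would plug in its own CachingProvider and configuration class, and the names CacheFactory and 'ObjectCache' are just illustrative:

import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.spi.CachingProvider;

public class CacheFactory {

    private static final String CACHE_NAME = "ObjectCache";
    private static Cache<String, Object> objectCache;

    // Single cache instance, also usable from JavaSE (no CDI required)
    public static synchronized Cache<String, Object> getObjectCache() {
        if (objectCache == null) {
            CachingProvider provider = Caching.getCachingProvider();
            MutableConfiguration<String, Object> config =
                new MutableConfiguration<String, Object>().setTypes(String.class, Object.class);
            CacheManager manager = provider.getCacheManager();
            objectCache = manager.createCache(CACHE_NAME, config);
        }
        return objectCache;
    }
}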


As you can see, we are creating a CachingProvider, then a Config and finally a CacheManager in order to access the Cache. Our cache is an object cache, whereby objects are stored by a string key. The factory ensures that we have only a single instance of our ObjectCache. It's not using CDI and so can also be used with JavaSE. In the real world you would probably not use constants for the factory configuration, but it seemed to be sufficient for this example.

Now let's use the factory. Actually, we misuse it a bit here because we use it in a Producer. In a pure CDI world, you would just put the code for initializing the cache into the producer method. So the producer is actually your factory, whereby the producer method acts as a source of objects to be injected. The annotation 'CBObjectCache' is bound to the producer.
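A sketch of the qualifier annotation and the producer (class names are again only illustrative; each type would live in its own file):

import javax.cache.Cache;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.inject.Qualifier;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Qualifier
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER, ElementType.TYPE})
public @interface CBObjectCache {
}

@ApplicationScoped
public class CacheProducer {

    @Produces
    @CBObjectCache
    public Cache<String, Object> produceObjectCache() {
        // Delegates to the factory from above
        return CacheFactory.getObjectCache();
    }
}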



Now that we have a producer, we can just inject CBObjectCache somewhere else. Let's do this in an Interceptor. We will use this interceptor later in order to cache objects automatically when a method is called. The annotation 'Cached' is bound to our interceptor.
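A sketch of the interceptor binding and the interceptor itself; that it caches the first String parameter of the intercepted method under the key 'name' is an assumption which matches the servlet example below:

import javax.cache.Cache;
import javax.inject.Inject;
import javax.interceptor.AroundInvoke;
import javax.interceptor.Interceptor;
import javax.interceptor.InterceptorBinding;
import javax.interceptor.InvocationContext;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@InterceptorBinding
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface Cached {
}

@Cached
@Interceptor
public class CachedInterceptor {

    @Inject
    @CBObjectCache
    private Cache<String, Object> cache;

    @AroundInvoke
    public Object cacheParameter(InvocationContext ctx) throws Exception {
        // Cache the first parameter of the intercepted method before it runs
        Object[] params = ctx.getParameters();
        if (params.length > 0 && params[0] instanceof String) {
            cache.put("name", params[0]);
        }
        return ctx.proceed();
    }
}

Remember that the interceptor also has to be enabled, e.g. in beans.xml.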



Now in order to use our interceptor, we just have to annotate a method which should cache the passed data. The example below shows that 'createHelloMessage' is annotated with 'Cached'. So before the actual method code is executed, the value of the variable 'name' will be cached in Couchbase. In order to prove this, the value is fetched again in the method body to be printed out by the 'HelloWorldServlet'.
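A sketch of such an annotated bean (the cache key 'name' and the message format are assumptions):

import javax.cache.Cache;
import javax.enterprise.context.RequestScoped;
import javax.inject.Inject;

@RequestScoped
public class HelloService {

    @Inject
    @CBObjectCache
    private Cache<String, Object> cache;

    @Cached
    public String createHelloMessage(String name) {
        // The interceptor has cached the parameter at this point,
        // so we fetch it again from Couchbase to prove that it is there
        String cachedName = (String) cache.get("name");
        return "Hello " + cachedName + "!";
    }
}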



Before I forget it, here is how it looks in Couchbase:


Hope this small introduction to JCache and CDI was interesting for you. :-)

The full source code can be found here: https://github.com/dmaier-couchbase/cb-jboss/tree/master/hello-jcache-cdi .


Wednesday, 1 June 2016

How to build Couchbase Server

Couchbase Server is Open Source under the Apache 2 license, and even if a user would normally not build it from the source code (in fact, custom-built versions are not officially supported by Couchbase), you might want to participate in the Couchbase Community by contributing some lines of code. The first thing you need then is to be able to build Couchbase Server from the source code.

The Couchbase Server source code is not just in one repository. Instead it is spread over multiple Git repositories. A tool which can be used in order to abstract the access to these multiple Git repositories is 'repo'. So 'repo' is a repository management tool on top of Git. It's also used by Google for Android, and so a short documentation can be found here: https://source.android.com/source/using-repo.html . The installation instructions are available at http://source.android.com/source/downloading.html#installing-repo .

Here some 'repo' commands:
  • repo init: Installs the repository to the current directory
  • repo sync: Downloads the new changes and updates the working files in the local directory
  • repo start: Begins a new branch for development, starting from the revision specified in the manifest
Repo is using manifest files. The Couchbase manifest files can be found here: https://github.com/couchbase/manifest . Let's take a look into one of these files (e.g. /released/4.5.0-beta.xml):

<remote name="couchbase" fetch="git://github.com/couchbase/" review="review.couchbase.org" />
...
<default remote="couchbase" revision="master" />
<project name="bleve" remote="blevesearch" revision="760057afb67ba9d8d7ad52f49a87f2bf9d31a945" path="godeps/src/github.com/blevesearch/bleve"/>
...

As you can see, the manifest includes the Git repos that contain the Couchbase dependencies. By default the master branch is referenced here. Each dependency can be pinned to a specific Git hash or branch name in order to make sure that you build against the right version of the dependent library.

Before we build, it's required to have at least make and cmake installed on your build box. If not, the build will fail and tell you what's missing. I already had a C development environment, Python and Go installed on my computer. The build of Couchbase is actually quite simple:

cd --
mkdir -p src/couchbase
cd src/couchbase
repo init -u git://github.com/couchbase/manifest.git -m 
repo sync
make

The built version of Couchbase is then available in the sub-folder 'install'.

Friday, 13 May 2016

Couchbase Server 4.5's new Sub-Document API

Introduction

The Beta version of Couchbase Server 4.5 has just been released, so let's try it out! A complete overview of all the great new features can be found here: http://developer.couchbase.com/documentation/server/4.5/introduction/intro.html. This article will highlight the new Sub-Document API feature.

What's a sub-document? The following document contains a sub-document which is accessible via the field 'tags':
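For instance (an illustrative user profile document with the key 'user::123'):

{
  "name": "Bob",
  "email": "bob@example.org",
  "tags": ["couchbase", "nosql", "kafka"]
}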


So far

With earlier Couchbase versions (< 4.5) the update of a document had to follow this pattern:

  • Get the whole document which needs to be updated
  • Update the document on the client side (e.g. by only updating a few properties)
  • Write the whole document back



A simple Java code example would be:
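A sketch of this whole-document round trip with the Java SDK, based on the illustrative document from above ('bucket' is an opened com.couchbase.client.java.Bucket):

JsonDocument doc = bucket.get("user::123");      // 1. fetch the whole document
doc.content().put("email", "bob@example.com");   // 2. modify it on the client side
doc.content().getArray("tags").add("java");
bucket.replace(doc);                             // 3. write the whole document back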


Now with 4.5

The new sub-document API is a server side feature which allows you to (surprise, surprise ...) only get or modify a sub-document of an existing document in Couchbase. The advantages are:

  •  Better usability on the client side
    • CRUD operations can be performed based on paths
    • In cases where the modification doesn't rely on the previous value, you can update a document without the need to fetch it upfront
    • You can more easily maintain key references between documents
  •  Improved performance
    • It saves network bandwidth and has improved latency because you don't need to transfer the whole document over the wire


The sub-document API also allows you to get or modify inner values or arrays of a (sub-)document.
  • Lookup operations: Queries the document for a specific path, e.g. GET, EXISTS
  • Mutation operations: Modify one or multiple paths in a document, e.g. UPSERT, ARRAY_APPEND, COUNTER
A more detailed description of the API can be found in the Couchbase documentation: http://developer.couchbase.com/documentation/server/4.5-dp/sub-doc-api.html .

The update of a document can now follow this pattern:

  • Directly update a property or sub-document by specifying the path under which it can be found


Our Java example would now be simplified to:
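A sketch with the sub-document mutation builder, again based on the illustrative document from above; only the affected paths travel over the wire (the boolean flag controls whether missing parent elements are created):

// 'bucket' is an opened com.couchbase.client.java.Bucket
bucket.mutateIn("user::123")
      .upsert("email", "bob@example.com", false)  // set a single property by path
      .arrayAppend("tags", "java", false)         // append to the embedded array
      .execute();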


Optimistic "locking"

Couchbase Server does not have a built-in transaction manager, but when you talk about transactional behavior, the requirements are quite often less than what an ACID transaction manager would provide (e.g. handling just concurrent access instead of being fully ACID compliant). In Couchbase a document has a so-called C(ompare) A(nd) S(wap) value. This value changes as soon as the document is modified on the server side.
  • Get a document with a specific CAS value
  • Change the properties on the client side
  • Try to replace the document by passing the old CAS value. If the CAS value changed in between on the server side then you know that someone else modified the document in between and so you can retry to apply your changes.
So CAS is used for an optimistic locking approach. It's optimistic because you expect that you can apply your changes, and you handle the case that this wasn't possible because someone else changed the document before. A pessimistic approach would be to lock the document upfront so that no one else can write it until this lock is released again.
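A sketch of such an optimistic update with the Java SDK, retrying on a CAS mismatch (the document key and field are illustrative):

import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.error.CASMismatchException;

public class CasExample {

    public static void updateEmail(Bucket bucket, String id, String email) {
        while (true) {
            JsonDocument doc = bucket.get(id);   // 1. get the document incl. its CAS value
            doc.content().put("email", email);   // 2. change it on the client side
            try {
                bucket.replace(doc);             // 3. replace it by passing the old CAS value
                return;
            } catch (CASMismatchException e) {
                // Someone else modified the document in between, so retry
            }
        }
    }
}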

You could now ask the following question:
  • What happens if I modify a sub-document and someone else updates the same or another sub-document of the same document?
Sub-document operations are atomic. Atomicity means all or nothing. So if you update a sub-document and don't receive an error message, you can be sure that the update was performed on the server side. This means that if 5 clients append an element to an embedded array, you can be sure that all 5 values were appended. However, atomicity doesn't mean consistency regarding the state. So it isn't telling you about conflicts. If 2 clients update the same sub-document then both updates will be performed, but in order to find out if there was a conflict regarding these updates you would still need the CAS value (or use pessimistic locking instead). However, if you are sure that the clients act on different sub-documents then you know that there will be no conflict and then the CAS value is not required.

Summary

The new Sub-Document API is one of the great new features of Couchbase 4.5 (Beta). It allows you to avoid fetching the whole document in order to read or modify only a part of it. This means better usability from a client-side point of view. One of the main advantages is that it improves performance, especially when working with bigger documents.