Monday, 12 September 2016

Visualizing time series data from Couchbase with Grafana

Grafana is a quite popular tool for querying and visualizing time series data and metrics. If you follow my blog then you might have seen my earlier post about how to use Couchbase Server for managing time series data:

This blog post is now about extending this idea for visualization purposes by connecting Grafana to Couchbase.

After you have installed Grafana (I installed it on Ubuntu, but there are installation guides available here for several platforms), you are asked to configure a data source. Before we use Grafana's 'SimpleJson' data source, it's worth understanding what the backend of such a data source looks like.

  • '/': Returns any successful response in order to test if the data source is available
  • '/search': Returns the available metrics. We will just return 'dax' in our example.
  • '/annotations': Returns an array of annotations. Such an annotation has a title, a time at which it occurs, a text and a tag. We just return an empty array in our example, but you can easily see that it would be possible to create an annotation if a specific value is exceeded or a specific time is reached.
  • '/query': The request contains a time range and a target metric. The result is an array which has an entry for every target metric, and each of these entries has an array of data points. Each data point is a tuple of the metric value and the time stamp.

We will just extend our example from before with a Grafana endpoint and then point Grafana's generic JSON data source plug-in to it, but I can already see a project on the horizon which standardizes time series management in Couchbase via a standard REST service which can then be used by a dedicated Grafana Couchbase plug-in.

First let's look at our backend implementation:

As usual, the full code can be found here:

Here how we implemented the backend:

  • '/': As you can see we just return a 'success:true' if the backend is accessible.
  • '/search': The only metric which our backend provides is the 'dax' one. 
  • '/annotations':  Only an example annotation is returned in this case. 
  • '/query': We just check if the requested metric is the 'dax' one. In this first example, we don't take the aggregation documents into account. Instead we just request the relevant data points by using a multi-get based on the time range. Because Grafana expects the data points in time order, we finally have to sort them by time. Again, this code will be extended to take the several aggregation levels (Year->Month->Day->Hour) into account.
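The '/query' logic just described could be sketched like this. It is a hedged reconstruction: the minute key layout ('dax::year::month::day::hour::minute') is an assumption, and the multi-get is stubbed with a plain Map instead of a real bucket connection.

```javascript
// Derive one key per minute in the requested range.
function minuteKeys(startMs, endMs) {
  const keys = [];
  for (let t = startMs - (startMs % 60000); t <= endMs; t += 60000) {
    const d = new Date(t);
    keys.push(['dax', d.getUTCFullYear(), d.getUTCMonth() + 1,
               d.getUTCDate(), d.getUTCHours(), d.getUTCMinutes()].join('::'));
  }
  return keys;
}

// Multi-get the minute documents and sort the data points by time,
// because Grafana expects them in ascending time order. 'store' stands
// in for the bucket; with the SDK this would be a getMulti call.
function query(store, startMs, endMs) {
  const datapoints = minuteKeys(startMs, endMs)
    .filter(k => store.has(k))
    .map(k => { const doc = store.get(k); return [doc.value, doc.time]; })
    .sort((a, b) => a[1] - b[1]);
  return [{ target: 'dax', datapoints }];
}
```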

 Now back to Grafana! Let's assume that you successfully installed the 'SimpleJson' data source:

Then the only thing you need to do is to add a new data source to Grafana that points to our backend service. (To run the backend service, just execute 'node app.js' after checking out the full repository and installing all necessary dependencies.)

In this example I actually just loaded a bit of random data for testing purposes by using the demo_data.js script.

Then all you have to do is to create a dashboard and place a panel on it:

The rest should work more or less the same as with any other Grafana data source. :-)

Friday, 26 August 2016

Time series data management with Couchbase Server

Couchbase Server is a Key-Value store and document database. The combination of being able to store time series entries as KV pairs, the possibility to aggregate data automatically in the background via Map-Reduce, and the possibility to dynamically query the data via the query language N1QL makes Couchbase Server a perfect fit for time series management use cases.

The high transaction volume seen in time series use cases means that relational database systems are often not a good fit. A single Couchbase cluster, on the other hand, might support hundreds of thousands (up to millions) of operations per second (depending, of course, on node and cluster size).

Time series use cases seen with Couchbase are for instance:
  • Activity tracking: Track the activity of a user, whereby each data point is a vector of activity measurement values (e.g. location, ...)
  • Internet of things: Frequently gathering data points from internet-connected devices (such as cars, alarm systems, home automation devices, ...), storing them as a time series and aggregating them in order to monitor and analyse the device behavior
  • Financial: Store currency or stock prices as time series in order to run analyses (e.g. predictive analysis) based on this data. A price chart typically shows a time series.
  • Industrial manufacturing: Getting measurement values from machine sensors in order to analyse the quality of parts.

But before we start digging deeper into an example, let's talk a bit about the background of time series data management:

A time series is a series of data points in time order. So, mathematically speaking, a time series is expressed as a discrete function with (simplified) two dimensions. The first dimension (x-axis) is the time. The second dimension (y-axis) is the data point value, whereby a data point value can again be a vector (which actually makes it 1+n dimensional, whereby n is the vector size). Most commonly the values on the time axis are on an equidistant grid, which means that the distance between any two adjacent x values is equal.

So what to do with such a time series?
  • Analyse the past: Statistics, reporting, ...
  • Real-time analysis: Monitor current activities, find anomalies, ...
  • Predictive analysis: Forecast, estimate, extrapolate, classify, ...

Good, time to look at an example. First we need a data source which frequently provides changing data. Such data could be financial quotes, a sensor measurement, a human heart beat and so on.

Let's take a financial quote. Google provides such information via 'Google Finance'. So in order to get the current quote of the DAX (this might tell you where I am living ;-) ), you just have to open the corresponding page in your browser; the same information is also available as JSON via the API variant.

What we get by accessing this API is:

So far so good. Now let's write a little Node.js application which polls for the current quote every minute and then writes it into Couchbase. To be more accurate: we actually fetch every 30 seconds in order to reach the granularity of a minute. In this example we decided on minute granularity, but it would work in a similar way for e.g. seconds granularity. We also just take the last fetched value of a minute as that minute's value. An even more sophisticated approach would be to store the (at most) 2 gathered values in an array in our minute document and already aggregate over those two (the avg as the minute value instead of the last one); it's a question of accuracy. The key of such a data point is indeed dependent on the time stamp. We are just interested in the quote value 'l', the difference 'c' and the time stamp 'lt_dts'. The job logic then looks like the following:
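A sketch of that job logic, under stated assumptions: the minute key layout and the helper names are our own, while 'l', 'c' and 'lt_dts' are the fields of the Google Finance response.

```javascript
// Build the minute-granularity document key from the quote's time stamp,
// e.g. 'dax::2016::8::25::13::15' (assumed key layout).
function minuteDocKey(ltDts) {
  const d = new Date(ltDts);
  return ['dax', d.getUTCFullYear(), d.getUTCMonth() + 1,
          d.getUTCDate(), d.getUTCHours(), d.getUTCMinutes()].join('::');
}

// Map the raw quote to the minute document we store in Couchbase.
function toMinuteDoc(quote) {
  return {
    value: parseFloat(quote.l.replace(/,/g, '')), // last price, e.g. "10,696.01"
    change: parseFloat(quote.c),                  // difference to previous close
    time: Date.parse(quote.lt_dts)                // quote time stamp in ms
  };
}

// With the Couchbase Node.js SDK the job body would then roughly be:
//   setInterval(() => fetchQuote(q =>
//     bucket.upsert(minuteDocKey(q.lt_dts), toMinuteDoc(q))), 30000);
// so the last write per minute wins, as described above.
```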

BTW: The full source code can be found here:

In Couchbase this then looks as follows.

Fine, so what's next? Let's start with direct access to time series values. In order to fetch all values for a given range, you don't need any index structure because:

  • The discrete time value is part of the key, so our time axis is directly expressed via the key space.
  • It's also easy to see that the JSON document value is more or less a vector (as defined above).

So let's write a little service which takes a start time stamp and an end time stamp as a parameter in order to provide you all the requested values.

The service code could look like this:

The full code can be found here:

It just takes the start and end time in the following format:

  • http://localhost:9000/service/by_time?start=2016-08-25T13:15&end=2016-08-25T13:20 

The output looks like this:

Let's next calculate some statistics based on these values. For this purpose we will create some aggregate documents. As you might already imagine, we will aggregate based on the time. The resulting time dimension for these aggregates will be 'Year -> Month -> Day -> Hour'. So there will be:

  • An hour aggregate: It aggregates based on the minute time series. There are 60 minutes per hour to aggregate.
  • A day aggregate: It aggregates based on the hour aggregates. There are 24 hours per day.
  • A month aggregate: It aggregates based on the day aggregates. There are between 28 and 31 days per month.
  • A year aggregate: It aggregates based on the month aggregates. There are 12 months per year.
I guess you got it :-) ...

So how to build these aggregates? There are multiple ways to do it. Here just some of them:
  • Use the built-in views and write the view results for a specific time range back to Couchbase
  • Execute a N1QL query by using aggregate functions
  • Do the calculations on the client side by fetching the data and write the results back
  • Load or stream the data into Spark in order to do the necessary calculations there and write the results back to Couchbase

Let's have a look at Views first. Views provide built-in map-reduce. We want to calculate the following statistic values:

  • The average price
  • The maximum price
  • The minimum price
We will just create one View for this. The  following map and reduce functions are created on the Couchbase Server side:
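A possible map function for such a View (a sketch; the document fields and the key check are assumptions). 'dateToArray' is a helper provided by Couchbase's view engine which turns a date into [year, month, day, hour, minute, second], so 'group_level=4' groups per hour. As reduce function the built-in '_stats' can be used; it yields sum, count, min and max, and the average is then simply sum / count.

```javascript
// Map function as it would be defined in the design document. It emits
// the date array as the key and the minute price as the value, so the
// reduce can aggregate on any prefix of the date (year, month, day, hour).
function map(doc, meta) {
  if (meta.id.indexOf('dax::') === 0 && doc.value !== undefined) {
    emit(dateToArray(doc.time), doc.value);
  }
}
```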

The request parameters for aggregating directly over one hour look like this:

It's easy to see that this also allows us to directly access the time function which has the hour (and no longer the minute) as the distance on the time axis. The data points are then the aggregation values. The same View can be used to get the daily, monthly and yearly aggregation values. The trick is to set the range parameters and the group level in the right way. In the example above 'group_level=4' was used because the hour information is at the fourth position of the date array which was emitted as the search key. In order to get the daily aggregation, just use a query like this:
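For illustration, such a View query could look like the following (the bucket, design document and view names are assumptions); with 'group_level=3' the grouping stops at the day position of the emitted date array:

```shell
# 8092 is the Couchbase views port; names are hypothetical
curl 'http://localhost:8092/timeseries/_design/ts/_view/stats?group_level=3&startkey=\[2016,8,25\]&endkey=\[2016,8,25,24\]'
```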

Now let's create an aggregation service which uses this View result in order to return the aggregation for a specific hour. It queries the aggregate for a given hour and stores the aggregation result as an aggregate document if the hour is already a full hour (so if it has 60 data points). In reality you could also run a job in order to make sure that the aggregates are built up front. In this demo application we just build them at access time. The next time they will then not be fetched via the View, but directly from the KV store.

Here is the code of the service:
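The core of that service can be sketched as follows. This is a hedged reconstruction: the aggregate key layout and the '_stats' row shape are assumptions, and the actual View query and KV write are left to the SDK.

```javascript
// Turn one grouped View row (group_level=4 => key is [year, month, day,
// hour], value is the '_stats' reduce output) into an aggregate document.
// Returns null if the hour is not complete yet, so nothing is persisted.
function hourAggregate(statsRow) {
  const s = statsRow.value; // { sum, count, min, max } from '_stats'
  if (s.count < 60) return null; // not yet a full hour of minute docs
  return {
    key: ['dax', 'agg'].concat(statsRow.key).join('::'), // e.g. dax::agg::2016::8::25::13
    doc: { avg: s.sum / s.count, min: s.min, max: s.max, count: s.count }
  };
}
```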

The full code can be found here:

The result in Couchbase would then be:

There might be a question in your head: 'What if I want to aggregate by taking a specific aggregation level into account, but also need to take the last minutes (the highest granularity in our example) into account?'. The answer is to combine the approaches of accessing the minute data points directly and the lower-granularity aggregates. Here is an example: if you want to access everything from 14:00 until 15:02, whereby 15:00 is not yet a full hour, then you can do this by using the following formula.

  •  Agg(14:00) + Agg(t_15:00, t_15:01, t_15:02)

It's easy to see that you can derive additional formulas for other scenarios.
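The combination formula above can be sketched as a small helper which merges full-hour aggregates with the open hour's raw minute values (the aggregate shape matches the assumed '{ avg, min, max, count }' documents from before):

```javascript
// Merge complete-hour aggregates with the trailing raw minute values.
// The sums are recovered from avg * count, so the combined average is exact.
function combine(fullHourAggs, minuteValues) {
  const count = fullHourAggs.reduce((n, a) => n + a.count, 0) + minuteValues.length;
  const sum = fullHourAggs.reduce((n, a) => n + a.avg * a.count, 0)
            + minuteValues.reduce((n, v) => n + v, 0);
  return {
    avg: sum / count,
    min: Math.min(...fullHourAggs.map(a => a.min), ...minuteValues),
    max: Math.max(...fullHourAggs.map(a => a.max), ...minuteValues),
    count
  };
}
```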

A related question is how long you should keep the highest-granularity values. One year has 525600 minutes, and so we would get 525600 minute documents every year. For this use case we could decide to remove the minute documents after a while (Couchbase even comes with a TTL feature in order to let them expire automatically) because it's unlikely that someone is interested in more than the daily values after one year. How long you keep the finest-granularity data points indeed depends on your requirements and on how fine your finest granularity actually is.
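One subtlety when using the TTL feature for this: Couchbase interprets expiry values above 30 days as an absolute Unix timestamp rather than a duration, so a one-year TTL has to be passed as 'now + duration'. A small helper (the SDK call in the comment is a sketch):

```javascript
const THIRTY_DAYS = 30 * 24 * 60 * 60;

// Expiry values up to 30 days are durations in seconds; anything longer
// must be given as an absolute Unix timestamp.
function expirySeconds(ttlSeconds, nowMs) {
  return ttlSeconds <= THIRTY_DAYS
    ? ttlSeconds
    : Math.floor(nowMs / 1000) + ttlSeconds;
}

// Usage with the Node.js SDK would then roughly be:
//   bucket.upsert(key, doc, { expiry: expirySeconds(365 * 24 * 3600, Date.now()) }, cb);
```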

OK, so this blog article is already getting quite long. Another one will follow which then will cover the following topics:

  • Visualizing time series data
  • How to query time series data with N1QL
  • Predictive analysis of time series data with Couchbase and Apache Spark

Friday, 1 July 2016

Caching in JavaEE with Couchbase

One of Couchbase Server's typical use cases is caching. As you might know, it is a KV store. The value of a KV pair can be a JSON document. It is not only the fact that Couchbase Server can store JSON documents that makes it a document database; rather, the fact that you can index and query JSON data defines its characteristic as a JSON document database. Back to the KV store: if you configure the built-in managed cache in a way that all your data fits into memory, then Couchbase Server can be used as a highly available distributed cache.

If you are a Java developer, then one of your questions might be if it makes sense to use Couchbase as a cache for your applications. I had several projects where EhCache was replaced by Couchbase because of the garbage collection implications. The performance was often considerably better with a centralized, low-latency (sub-millisecond) cache than with one which was colocated with the application instances. This indeed depends on several factors (size of the cache entries, number of cache entries, access throughput). The next question might be how to best integrate such a cache into your application. A typical pattern is:
  • Try to read the data from the cache
  • If it is there, then use it
  • If it is not there, then get the data from the source system (e.g. a relational DBMS)
  • Put it into the cache
  • The next time you try to access the same data, it will most probably be in the cache
Couchbase's Java SDK is quite simple for CRUD operations:
  • C: Insert
  • R: Get
  • U: Update, Replace
  • D: Remove
So as soon as you have established a Bucket (a data container) connection, you can use it as a cache. However, this involves implementation work on your side. 

I just looked at the Java standard JCache and also used the chance to play around a bit with CDI (Contexts and Dependency Injection). JCache is implemented by several providers and, look at that, there is already a Developer Preview of a Couchbase implementation available.

Side note: The Couchbase JCache implementation is not yet officially released. Couchbase also has a good Spring Data integration which comes with caching support as well.

So let's get started. First we need to have a cache instance which we can use for caching purposes.

As you can see, we create a CachingProvider, then a Config and finally a CacheManager in order to access the Cache. Our cache is an object cache, whereby objects are stored by a string key. The factory ensures that we have only a single instance of our ObjectCache. It's not using CDI and so can also be used with Java SE. In the real world you would probably not use constants for the factory configuration, but it seemed sufficient for this example.

Now let's use the factory. Actually we misuse it a bit here, because we use it in a Producer. In a pure CDI world, you would just put the cache initialization code into the producer method. So the producer is actually your factory, whereby the producer method acts as a source of objects to be injected. The annotation 'CBObjectCache' is bound to the producer.

Now that we have a producer, we can just inject CBObjectCache somewhere else. Let's do this in an Interceptor. We will use this interceptor later in order to cache objects automatically when a method is called. The annotation 'Cached' is bound to our interceptor.

Now, in order to use our interceptor, we just have to annotate a method which should cache the passed data. The example below shows that 'createHelloMessage' is annotated with 'Cached'. So before the actual method code is executed, the value of the variable 'name' will be cached in Couchbase. In order to prove this, the value is fetched again in the method body to be printed out by the 'HelloWorldServlet'.

Before I forget it, here is how it looks in Couchbase:

Hope this small introduction to JCache and CDI was interesting for you. :-)

The full source code can be found here: .

Wednesday, 1 June 2016

How to build Couchbase Server

Couchbase Server is open source under the Apache 2 license, and even if a user would normally not build it from the source code (in fact, custom-built versions are not officially supported by Couchbase), you might want to participate in the Couchbase community by contributing some lines of code. The first thing you need then is to be able to build Couchbase Server from the source code.

The Couchbase Server source code is not just in one repository. Instead it is spread over multiple Git repositories. A tool which can be used to abstract the access to these multiple Git repositories is 'repo'. So 'repo' is a repository management tool on top of Git. It's also used by Google for Android, and so a short documentation can be found here: . The installation instructions are available at .

Here some 'repo' commands:
  • repo init: Installs the repository to the current directory
  • repo sync: Downloads the new changes and updates the working files in the local directory
  • repo start: Begins a new branch for development, starting from the revision specified in the manifest
Repo uses manifest files. The Couchbase manifest files can be found here: . Let's take a look into one of these files (e.g. /released/4.5.0-beta.xml):

<remote name="couchbase" fetch="git://" review="" />
<default remote="couchbase" revision="master" />
<project name="bleve" remote="blevesearch" revision="760057afb67ba9d8d7ad52f49a87f2bf9d31a945" path="godeps/src/"/>

As you can see, the manifest includes the Git repos that contain Couchbase's dependencies. By default the master branch is referenced here. Each dependency can be pinned to a specific Git hash or branch name in order to make sure that you build against the right version of the dependent library.

Before we build, it's required to have at least make and cmake installed on your build box. If not, the build will fail and tell you what's missing. I already had a C development environment, Python and Go installed on my computer. The build of Couchbase is actually quite simple:

cd --
mkdir -p src/couchbase
cd src/couchbase
repo init -u git:// -m 
repo sync
make

The built version of Couchbase is then available in the sub-folder 'install'.

Friday, 13 May 2016

Couchbase Server 4.5's new Sub-Document API


The Beta version of Couchbase Server 4.5 has just been released, so let's try it out! A complete overview of all the great new features can be found here. This article will highlight the new Sub-Document API feature.

What's a sub-document? The following document contains a sub-document which is accessible via the field 'tags':

So far

With earlier Couchbase versions (<4.5), the update of a document had to follow this pattern:

  • Get the whole document which needs to be updated
  • Update the document on the client side (e.g. by only updating a few properties)
  • Write the whole document back

A simple Java code example would be:

Now with 4.5

The new sub-document API is a server side feature which allows you to (surprise, surprise ...) only get or modify a sub-document of an existing document in Couchbase. The advantages are:

  •  Better usability on the client side
    • CRUD operations can be performed based on paths
    • In cases where the modification doesn't rely on the previous value, you can update a document without the need to fetch it upfront
    • You can more easily maintain key references between documents
  •  Improved performance
    • It saves network bandwidth and has improved latency because you don't need to transfer the whole document over the wire

The sub-document API also allows you to get or modify inner values or arrays of a (sub-)document.
  • Lookup operations: Queries the document for a specific path, e.g. GET, EXISTS
  • Mutation operations: Modify one or multiple paths in a document, e.g. UPSERT, ARRAY_APPEND, COUNTER
A more detailed description of the API can be found in the Couchbase documentation: .
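To illustrate what a path-based mutation does, here is a local sketch of the server-side effect of a sub-document upsert: only the addressed path changes, the rest of the document stays untouched. The helper name and the dotted-path handling are our own illustration; the post's actual examples use the Java SDK.

```javascript
// Apply an UPSERT mutation at 'path' (dot-separated) to a JSON document.
// The server mutates in place; we work on a copy here for clarity.
function subdocUpsert(doc, path, value) {
  const parts = path.split('.');
  const copy = JSON.parse(JSON.stringify(doc));
  let node = copy;
  for (const p of parts.slice(0, -1)) {
    // create intermediate sub-documents if the path does not exist yet
    if (typeof node[p] !== 'object' || node[p] === null) node[p] = {};
    node = node[p];
  }
  node[parts[parts.length - 1]] = value;
  return copy;
}
```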

The update of a document can now follow this pattern:

  • Directly update a property or sub-document by specifying the path under which it can be found

Our Java example would now be simplified to:

Optimistic "locking"

Couchbase Server does not have a built-in transaction manager, but when you talk about transactional behavior, the requirements are quite often less than what an ACID transaction manager would provide (e.g. handling just concurrent access instead of being fully ACID compliant). In Couchbase a document has a so-called C(ompare) A(nd) S(wap) value. This value changes as soon as a document is modified on the server side.
  • Get a document with a specific CAS value
  • Change the properties on the client side
  • Try to replace the document by passing the old CAS value. If the CAS value changed in between on the server side, then you know that someone else modified the document, and you can retry applying your changes.
So CAS is used for an optimistic locking approach. It's optimistic because you expect that you can apply your changes, and you handle the case where this wasn't possible because someone else changed the document first. A pessimistic approach would be to lock the document up front, so that no one else can write to it until the lock is released again.
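The retry loop that this optimistic approach implies can be sketched as follows. This is a minimal stand-in, assuming a tiny in-memory bucket; with the real SDK, get() returns the document together with its CAS value and replace() fails when that CAS no longer matches.

```javascript
// In-memory stand-in for a bucket with CAS semantics.
function makeBucket(initial) {
  let value = initial, cas = 1;
  return {
    get: () => ({ value: JSON.parse(JSON.stringify(value)), cas }),
    replace: (newValue, expectedCas) => {
      if (expectedCas !== cas) return false; // someone else wrote in between
      value = newValue;
      cas++;
      return true;
    }
  };
}

// Optimistic update: read, mutate, try to write back with the old CAS,
// and retry from scratch if another writer won the race.
function updateWithRetry(bucket, mutate, maxTries = 10) {
  for (let i = 0; i < maxTries; i++) {
    const { value, cas } = bucket.get();
    if (bucket.replace(mutate(value), cas)) return true;
  }
  return false;
}
```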

You could now ask the following question:
  • What happens if I modify a sub-document and someone else updates the same or another sub-document of the same document?
Sub-document operations are atomic. Atomicity means all or nothing. So if you update a sub-document without receiving an error message, you can be sure that the update was performed on the server side. This means if 5 clients append an element to an embedded array, then you can be sure that all 5 values were appended. However, atomicity doesn't imply consistency regarding the state, so it doesn't tell you about conflicts. If 2 clients update the same sub-document, then both updates will be performed, but in order to find out whether there was a conflict regarding these updates you would still need the CAS value (or use pessimistic locking instead). However, if you are sure that the clients act on different sub-documents, then you know that there will be no conflict, and then the CAS value is not required.


The new Sub-Document API is one of the great new features of Couchbase 4.5 (Beta). It allows you to avoid fetching the whole document in order to read or modify only a part of it. This means better usability from a client-side point of view. One of the main advantages is that it improves performance, especially when working with bigger documents.

Friday, 6 May 2016

Microservices and Polyglot Persistence


  • Introduction
  • Why Microservices?
  • Polyglot character
  • What's happening with my Database?
  • Summary


The idea behind Microservices is already described by the name. In summary it means using multiple smaller self-contained services to build up a system, instead of one monolithic one. This explanation sounds simple, doesn't it? We will see that it is not, because breaking up one single big system into several services has quite a lot of implications.

Why Microservices?

A monolithic system is a system which has only one main component. One of the disadvantages is usually that changes have to be deployed in a way that affects the deployment of the whole system. Today's systems are actually rarely completely monolithic, because they normally already consist of several sub-components, and often other decomposition mechanisms are already in use. One way is to build your system modularly. Such a module might actually be a good candidate for a microservice, whereby it should optimally provide business-domain-specific functionality and not purely technical functionality.

Another aspect, which you should already be familiar with as an object-oriented developer, is de-coupling (loose coupling). Ideally one component should live on its own; sure, there are well-defined dependencies to other components. De-coupling ensures that you can replace one component of your system without the need to rewrite the majority of the system.

If you split a monolithic application up into several parts, whereby specific functionalities are provided as services, you end up with a distributed system, because each service is deployed on its own. The idea is exactly to be able to scale these services out independently.

So Microservices are in a way not a completely new invention. Microservices are often just a consequence of what we already know or target regarding software architectures. Service-oriented designs are also not completely new to us.

Polyglot character

One system made of multiple smaller services:

  • Can have a variety of communication protocols: About 10 years ago, I remember having discussions about SOAP vs. REST. I actually liked SOAP because it was well defined, and so your service client could be created just from the service definition. It has message-based communication and there was a kind of standard message format (dependent on the binding). REST on the other hand had the charm of being less chatty and resource based. The protocol of how 2 parties communicate (which resources are accessed in exactly which way) was not predefined out of the box. Indeed, you also define what a REST service exposes, but it seemed to happen more often that the service no longer talked exactly the same language as the client; it was more like it spoke partially a weird dialect which could no longer be understood by the client, and so the client had to learn it as well. There are libs and frameworks that help you here (e.g. annotations in JAX-RS). However, I'm pretty sure that today's green-field solutions would rely on RESTful services. Sometimes you don't come from a green field, and so you might still need to integrate a variety of different kinds of services.
  • Can be implemented using several programming languages and frameworks: For the other components of your system, only how to communicate with a specific service is relevant. The actual implementation is completely hidden from the other components of your system. So one service might be implemented in Node.js while another one might be implemented in Python. There are sometimes good reasons to develop one part of a system in e.g. C, but others with maybe less effort in e.g. Node.js. Not every component might have the same resource and efficiency requirements (e.g. garbage collection vs. manual disposal).
  • Can be developed by different kinds of developers: This is indeed related to the previous point about different programming languages and frameworks. From my personal experience I would say that a C developer and a Java developer really speak different languages: not just regarding the programming language, but also regarding how a specific problem would be addressed or what the tool chains look like. There is no good or bad; it's just different. So given that different functionalities might be developed by multiple different and independent teams, this point especially makes sense if these teams already have skill sets around specific programming languages and frameworks.
  • Polyglot persistence: A modern application usually consists of 3 tiers: interface, service and persistence. Given that we split the service tier up into multiple smaller services, there is the fair question of what happens to your database/storage tier. We will discuss this in depth a bit later. What's important is that the several services can use different database/storage back-ends. One service might need to write content items, storing the content itself in a blob store and the metadata in a document database. Another service might need to store the information of who knows whom, and so uses a graph database. A third service might handle user sessions, and so uses a KV store. This is what polyglot persistence means.

What's happening with my Database?

This is actually quite interesting. Even if your system consisted of several modules before, you will quite often see that the modules integrate with each other on the database level. The reason is that the rules for your schema consolidation (regarding good database schema design) might conflict with the de-coupling requirements. In the end, each independent service should use its own database. Instead of integrating service functionality on the database tier, the services should talk to each other. Let's use a very simple example: orders and customers. Let's assume that your shopping service is independent from the user profile service. Sure, shopping needs to know who the customer is, but not at the same level of detail as the user profile service knows. In a monolithic application you would have a 1-many relationship between customers and orders, so getting all the orders of a customer would be a JOIN query. In a more service-oriented world, you would ask the directory service for the customer (e.g. by his email address) and then you would ask the shopping service for all the orders of this customer. If e.g. a new order should be processed, then the shopping service would also need to talk to the payment service in order to fulfill the payment. The payment service also only knows the relevant information about the customer and not the complete user profile. Again, a very simple example, but the point is clear: a Microservices approach leads to a distributed system made of several services, which leads to split-up databases as well.

Now, the relational database has so far glued your data-focused operations together by means of transactions. Doesn't the service-based approach mean that I lose these transactions on the database tier? Exactly! Given that you decoupled your services by no longer integrating on the database tier, you now also have to take care of transactions on the service level. Relational database system vendors have been talking about ACID for decades, and you got the impression that you absolutely need it? From my experience it's quite often the case that you give up on ACID anyway for performance reasons (weaker isolation levels, e.g. read uncommitted), and we tend to rely so much on the DB's transaction management (by accidentally tolerating its overhead) that we forget that we often don't need full ACID, but only a way to handle concurrent access to specific data items. The NoSQL system Couchbase Server, for instance, doesn't come with a built-in transaction manager, but it comes with features which help you on the client side to handle transactional behavior. You can e.g. lock specific documents (JSON documents or KV pairs), so that somebody else has to wait until the lock is released again. Or you can be more optimistic and use C(ompare)A(nd)S(wap): a write operation is then successful if the CAS value for your document is still the same, meaning that nobody else changed the document since you fetched it; otherwise you can just try again with the updated document until you are the winner. Sure, there are also strictly transactional cases out there. They can be addressed by using a service-side transaction manager (e.g. implementing 2-phase commit).

Not using one single big database is also a chance. We already talked about wanting to scale your services out independently (adding new service instances behind a load balancer, so web scale). Scaling out the service tier is only half of the story: more and more service instances might also raise the need to scale out on the storage/database tier. So instead of doing everything with your non-scalable relational DBMS, you can now follow the polyglot persistence idea and use the right database for the job, which means that you might introduce a highly scalable NoSQL database system for some of your services.


As explained, Microservices are self-contained services that provide business-domain-specific functionality. A system which uses Microservices is by definition a distributed system, with all its advantages and disadvantages. Making your system more scalable becomes easier, whereas distributed transactions become harder. Polyglot persistence is one benefit: you can now use the right storage or database system for the job, depending on the requirements of the specific service.

Saturday, 26 March 2016

CBGraph now supports edge list compression

About CBGraph

CBGraph is a Graph API for the NoSQL database system Couchbase Server.

Adjacency list compression

The latest version of CBGraph (v0.9.1) now supports adjacency list compression. An adjacency list is the list of neighbors of a vertex in a graph.

So far the adjacency lists were stored directly at the vertices, but vertices can become quite big if they have a huge number of incoming or outgoing edges (such a vertex is called a supernode). One of the limitations which such a supernode introduces is that it simply takes longer to transfer e.g. a 10 MB vertex over the wire than e.g. a 1 KB one. In order to support such supernodes better by reducing the network latency, two optimization steps were introduced for CBGraph.
  1. Compress the adjacency list while still storing it at the vertex (as a base64 string). The base64 encoding means that the lists take a bit more space for small vertices, but you save a lot (up to 50% was observed with UUID-s as vertex id-s) for supernodes.
  2. Externalize and compress the adjacency list as a separate binary.
The used compression algorithm is GZIP.
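The embedded variant (step 1) can be sketched as follows: GZIP-compress a serialized adjacency list and base64-encode the result so it can be stored inside a JSON vertex document. This is a simplified illustration, not CBGraph's actual implementation; the serialization format (a comma-separated string) and the class name are assumptions.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class AdjacencyListCodec {

    // Compress a comma-separated adjacency list with GZIP and encode it
    // as base64 so it can be embedded in a JSON vertex document.
    static String compress(List<String> neighbors) throws IOException {
        byte[] raw = String.join(",", neighbors).getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
            gzip.write(raw);
        }
        return Base64.getEncoder().encodeToString(bos.toByteArray());
    }

    // Reverse the steps: decode base64, decompress GZIP, split the list again.
    static List<String> decompress(String encoded) throws IOException {
        byte[] compressed = Base64.getDecoder().decode(encoded);
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            String joined = new String(gzip.readAllBytes(), StandardCharsets.UTF_8);
            return Arrays.asList(joined.split(","));
        }
    }

    public static void main(String[] args) throws IOException {
        List<String> edges = Arrays.asList("v::1", "v::2", "v::3");
        String packed = compress(edges);
        System.out.println(decompress(packed).equals(edges)); // prints true
    }
}
```

For the binary variant (step 2), the GZIP bytes would be stored directly as a separate KV pair instead of being base64-encoded, which avoids the base64 size overhead entirely.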

The following switches in the configuration file control the compression mode:
  • graph.compression.enabled
  • graph.compression.binary
The first property controls whether compression is used at all. The second one controls whether the compressed adjacency list is stored as a separate binary (as Couchbase Server is a KV store and a document database, you can directly store binaries as KV pairs).

Compression disabled

The following setting is used in order to disable compression:
  • graph.compression.enabled=false
The document model used in Couchbase then looks like the following:

Compression enabled by embedding the adjacency list

The following settings are used in order to enable compression with the adjacency list embedded in the vertex:
  • graph.compression.enabled=true
  • graph.compression.binary=false
The document model used in Couchbase then looks like the following:

Compression enabled by storing the adjacency list as a separate binary

The following settings are used in order to enable the storage of the adjacency list as a compressed binary:
  • graph.compression.enabled=true
  • graph.compression.binary=true
The document model used then looks like the following:


Edge list compression helps to handle supernodes. The advantages are obviously that you can store more edges at a vertex, but also that the average size of a vertex is lower, which improves latency because the network latency for getting a vertex from Couchbase to CBGraph is a function of the vertex's size. This feature improved the overall performance for bigger graphs. As you can see, the underlying model looks different depending on the compression mode, so the compression mode is a lifetime decision for a graph: you can't access an uncompressed graph via a CBGraph instance which is configured to use compression, and vice versa.

Tuesday, 9 February 2016

Large-scale data processing with Couchbase Server and Apache Spark

I just had the chance to work a bit with Apache Spark. Apache Spark is a distributed computation framework, so the idea is to spread computation tasks across many machines in a computation cluster. The idea here is to load data from Couchbase, process it in Spark and store the results back to Couchbase. Couchbase is a perfect companion for Spark because it is capable of handling huge amounts of data, provides high performance (hundreds of thousands of ops per second, sub-millisecond latency) and is horizontally scalable while also being fault tolerant (replica copies, failover, ...).

You might already know Hadoop for this purpose. Spark's approach is similar but different ;-). In Hadoop you typically load everything into the Hadoop distributed file system and then have it processed 'co-located' in parallel. In Spark each worker node processes the data by default in memory. Your data is described by an R(esilient) D(istributed) D(ataset). Such an RDD is in the first step not the data itself; it rather describes where the data comes from and which kind of data is expected to be processed. The RDD API also describes how data can be processed (actions, transformations). In the next step the data is retrieved based on this description, whereby it is distributed across multiple workers (executors). RDD-s are not just sharded across the cluster; it is also possible to replicate them for fault tolerance.
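The laziness of RDD transformations can be illustrated with plain Java streams, which behave analogously: intermediate operations only describe the computation, and nothing runs until a terminal operation (Spark's 'action') pulls the data through. This is only an analogy, not Spark code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class LazyPipelineDemo {

    public static void main(String[] args) {
        List<String> touched = new ArrayList<>();

        // Building the pipeline only *describes* the computation,
        // just like declaring transformations on an RDD...
        Stream<String> pipeline = Arrays.asList("a", "bb", "ccc").stream()
                .peek(touched::add)            // records when an element is actually processed
                .filter(s -> s.length() > 1)   // transformation: lazy
                .map(String::toUpperCase);     // transformation: lazy

        System.out.println(touched.isEmpty()); // prints true: nothing has run yet

        // ...only a terminal operation (in Spark: an "action") executes it.
        long count = pipeline.count();
        System.out.println(count);             // prints 2
    }
}
```

In Spark the same deferral is what allows the scheduler to plan the whole transformation chain across the cluster before any data is moved.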

Just in case that you don't know Spark, here the components of the Spark stack:

  • Spark Core: Handle RDD-s from several sources (and store them to several targets); So you could easily combine the data from several data sources with the data in Couchbase in order to derive new data which can then be stored back to your Couchbase bucket.
  • Spark SQL: Handle data frames (RDD-s with a schema) whereby retrieving them (e.g. from a database system) by using a SQL-like query syntax; Couchbase has its own SQL-like query language (N1QL) and so the integration works like a charm.
  • Spark Streaming: Handle D(iscrete)Streams (RDD-s as micro batches); Couchbase allows you to consume the D(atabase) C(hange) P(rotocol) stream in order to react on changes in a bucket.
This blog post is focusing on these 3 main components, but there are also the following ones:

  • MLlib: An algorithm package for machine learning (to solve e.g. classification, clustering and regression problems)
  • GraphX: Graph processing and graph-parallel computations

The following diagram shows the main architecture of a Spark cluster:

  • Driver Program: Creates the Spark context, declares the transformation and actions on RDD-s of data and submits them to the Master
  • Cluster Manager: Acquires executors on worker nodes in the cluster.
  • Workers and their Executors: Execute tasks and return results to the driver

So what can we do with Couchbase? The following simple code example (Scala) shows how to read some keys from the database, filter based on the country, log the matches to the console and count them. Finally the driver stores the result back to Couchbase.
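The listing itself is in the linked source; as an illustration, the same filter, log and count chain can be sketched with plain Java streams. The sample data and names below are made up for the example, and the real listing uses the Couchbase Spark connector to read from and write back to the bucket.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class FilterCountDemo {

    // Filter on the country attribute, log the matches, then count --
    // the same transformation/action chain the Spark example runs on an RDD.
    static long countByCountry(List<Map<String, String>> users, String country) {
        return users.stream()
                .filter(u -> country.equals(u.get("country")))
                .peek(u -> System.out.println(u.get("name"))) // "log to the console"
                .count();
    }

    public static void main(String[] args) {
        // Stand-in for documents fetched from Couchbase by key;
        // each map mimics a small JSON user profile.
        List<Map<String, String>> users = Arrays.asList(
                Map.of("name", "Alice", "country", "DE"),
                Map.of("name", "Bob",   "country", "US"),
                Map.of("name", "Carol", "country", "DE"));

        System.out.println(countByCountry(users, "DE")); // prints Alice, Carol, then 2
    }
}
```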

You can also store the RDD-s directly to Couchbase by using 'rdd.saveToCouchbase()'. Here an example from Couchbase's documentation:

The following very simple example shows how to retrieve data via SparkSQL:

And finally, here a streaming example which retrieves a micro-batch every second:
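Conceptually, a DStream chops a continuous stream of changes into one micro-batch per interval. The following sketch illustrates that idea with a plain queue that is drained once per second; it is an analogy only, not Spark Streaming's or the DCP client's actual API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class MicroBatchDemo {

    // Drain everything that arrived since the last call into one batch --
    // conceptually what a DStream does with the change events per interval.
    static List<String> drainBatch(Queue<String> incoming) {
        List<String> batch = new ArrayList<>();
        String event;
        while ((event = incoming.poll()) != null) {
            batch.add(event);
        }
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        Queue<String> incoming = new ConcurrentLinkedQueue<>();
        incoming.add("mutation::doc1");
        incoming.add("mutation::doc2");

        // In Spark Streaming the batch interval is configured on the
        // streaming context; here we simply poll once per second.
        for (int i = 0; i < 2; i++) {
            List<String> batch = drainBatch(incoming);
            System.out.println("batch of " + batch.size());
            Thread.sleep(1000);
        }
    }
}
```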

The example source can be found here: . It also provides the helper 'Contexts' which I used in order to initialize and access the Spark context.

It's clear that this blog post was just a very brief introduction to Spark. Further, more use case focused, articles will follow. :-)

Further examples can be found in Couchbase's documentation: .