Thursday, 27 July 2017

Indexing with Redis

If you follow my news on Twitter then you might have realized that I just started to work more with Redis. Redis (= Remote Dictionary Server) is known as a Data Structure Store. This means that we can not only deal with Key-Value Pairs (called Strings in Redis) but also with data structures such as Hashes (Hash-Maps), Lists, Sets or Sorted Sets. Further details about the available data structures can be found in the official Redis documentation.


Indexing in Key-Value Stores

With a pure Key-Value Store, you would typically maintain your index structures manually by applying some KV-Store patterns. Here are some examples:

  • Direct access via the primary key: The key itself is semantically meaningful, so you can access a value directly by knowing how the key is structured (by using key patterns). An example would be to access a user profile by knowing the user's id. The key looks like 'user::<uid>'.
  • Exact match by a secondary key: The KV-Store itself can be seen as a huge Hash-Map, which means that you can use lookup items in order to reference other ones. This gives you a kind of Hash Index. An example would be to find a user by email address. The lookup item has the key 'email::<email_addr>', whereby the value is the key of the user. In order to fetch the user with a specific email address, you just need to do a Get operation on the key with the email prefix and then another one on the key with the user prefix.
  • Range by a secondary key: This is where it gets a bit more complicated with pure KV-Stores. Most of them allow you to retrieve a list of all keys, but doing a full 'key space scan' is not efficient (complexity of O(n), n = number of keys). You can indeed build your own tree structure by storing lists as values and by referencing between them, but maintaining these search trees on the application side is really not what you usually want to do.
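To make the first two patterns a bit more tangible, here is a minimal sketch as a sequence of redis-cli commands (the user id '123', the email address and the sample profile are just made up for illustration):

    # Store the user profile under a semantically meaningful key
    redis-cli SET user::123 '{"id": 123, "name": "David", "email": "david@example.com"}'

    # Lookup item: the value of the email key is the key of the user
    redis-cli SET email::david@example.com user::123

    # Exact match by a secondary key: two Get operations
    redis-cli GET email::david@example.com    # returns "user::123"
    redis-cli GET user::123                   # returns the profile document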


The Redis Way

So how is Redis addressing these examples? By leveraging the power of data structures such as Hashes and Sorted Sets.

Direct Access via the Primary Key

A Get operation already has a complexity of O(1). This is the same for Redis.

Exact Match by a Secondary Key

Hashes (as the name already indicates) can be directly used to build a hash index in order to support exact match 'queries'. The complexity of accessing an entry in a Redis Hash is indeed O(1). Here is an example (a minimal sketch that uses the hypothetical index key 'users:by_email'):
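    # One Hash acts as the index: the field is the email address,
    # the value is the primary key of the user
    redis-cli HSET users:by_email david@example.com user::123
    redis-cli HSET users:by_email jane@example.com user::456

    # Exact match lookup with O(1) complexity
    redis-cli HGET users:by_email david@example.com    # returns "user::123"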

In addition, Redis Hashes support operations such as HSCAN. This gives you a cursor-based approach to scan hashes incrementally. Further information can be found in the Redis command documentation.


Here is an example (again against the hypothetical 'users:by_email' index):
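    # Start a new scan by passing the cursor 0; MATCH and COUNT are optional
    redis-cli HSCAN users:by_email 0 MATCH '*@example.com' COUNT 100

    # The reply contains the next cursor and a batch of field/value pairs;
    # repeat the call with the returned cursor until it is 0 again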


Range By a Secondary Key

Sorted Sets can be used to support range 'queries'. The way this works is that we use the value for which we are searching as the score (order number). Scanning such a Sorted Set then has a complexity of O(log(n)+m), whereby n is the number of elements in the set and m is the size of the result set.

Here is an example (a sketch that indexes users by age; the ids and ages are made up):
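    # The age is the score, the member is the key of the user
    redis-cli ZADD users:by_age 23 user::123
    redis-cli ZADD users:by_age 31 user::456
    redis-cli ZADD users:by_age 42 user::789

    # Range 'query': all users between 25 and 45 years of age
    redis-cli ZRANGEBYSCORE users:by_age 25 45    # returns user::456, user::789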

If you add 2 elements with the same score then they are sorted lexicographically. This is interesting for non-numeric values. The command ZRANGEBYLEX allows you to perform range 'queries' by taking the lexicographic order into account. Here is an example (again just a sketch, this time indexing users by name):
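    # All members get the same score 0, so the lexicographic order applies
    redis-cli ZADD users:by_name 0 adams 0 baker 0 miller 0 smith

    # All names between 'b' (inclusive, '[') and 'n' (exclusive, '(')
    redis-cli ZRANGEBYLEX users:by_name '[b' '(n'    # returns "baker", "miller"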


Modules

Since v4.0, Redis supports Modules. Modules allow you to extend Redis' functionality. One module which perfectly matches the topic of this blog post is RediSearch. RediSearch basically provides Full Text Indexing and Searching capabilities to Redis. It uses an Inverted Index behind the scenes. Further details can be found in the RediSearch documentation.

Here is a very basic example along the lines of the one in the RediSearch documentation (the index name 'myIdx' and the sample document are the documentation's placeholders):
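    # Create a full text index with a weighted 'title' field
    redis-cli FT.CREATE myIdx SCHEMA title TEXT WEIGHT 5.0 body TEXT url TEXT

    # Index a document with the id 'doc1' and the score 1.0
    redis-cli FT.ADD myIdx doc1 1.0 FIELDS title "hello world" body "lorem ipsum" url "http://redis.io"

    # Search the index
    redis-cli FT.SEARCH myIdx "hello world" LIMIT 0 10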


As usual, I hope that you found this article useful and informative. Feedback is very welcome!


Thursday, 6 April 2017

Kafka Connect with Couchbase

About Kafka

Apache Kafka is a distributed persistent message queuing system. It is used in order to realize publish-subscribe use cases, to process streams of data in real-time and to store a stream of data safely in a distributed, replicated cluster. That said, Apache Kafka is not a database system but can stream data from a database system in near-real-time. With Kafka, the data is represented as a message stream. Producers put messages into a so-called message topic and Consumers take messages out of it for further processing. In addition, there is a variety of connectors available. A short introduction to Kafka can be found here: https://www.youtube.com/watch?v=fFPVwYKUTHs . This video explains the basic concepts and what Producers and Consumers look like.

Couchbase supports 'Kafka Connect' since version 3.1 of its connector. The Kafka documentation says: "Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka." Kafka Connect provides a common framework for Kafka connectors. It can run in a distributed or standalone mode and is distributed and scalable by default.

Setup

Kafka uses Apache Zookeeper. Zookeeper is a cluster management service. The documentation states that "ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications ... ZooKeeper aims at distilling the essence of these different services into a very simple interface to a centralized coordination service."

After downloading and extracting the standard distribution of Apache Kafka, you can start a local Zookeeper instance with the default configuration the following way (from the root folder of the Kafka installation):
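    # Start a local Zookeeper instance (listens on port 2181 by default)
    bin/zookeeper-server-start.sh config/zookeeper.properties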


The next step is to configure 3 Kafka message broker nodes. We will run these services on the same host for demoing purposes, but it's obvious that they could also run on separate machines. In order to do so, we need to create configurations for the broker servers. So copy the config/server.properties file to server-1.properties and server-2.properties and then edit them. The file 'server.properties' has the following relevant settings:
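    # config/server.properties (only the settings relevant here;
    # the /tmp prefix of the log directory is an assumption)
    broker.id=0
    listeners=PLAINTEXT://:9092
    log.dirs=/tmp/kafka-logs-0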


Let's assume that $i is the id of the broker. So the first broker has the id '0', listens on port 9092 and logs to 'kafka-logs-0'. The second broker has the id '1', listens on port 9093 and logs to 'kafka-logs-1'. The third broker configuration is self-explanatory.
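Each broker can then be started with its own configuration file via the standard Kafka start script (the trailing '&' just sends the processes to the background):

    bin/kafka-server-start.sh config/server.properties &
    bin/kafka-server-start.sh config/server-1.properties &
    bin/kafka-server-start.sh config/server-2.properties &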


The next step is to download and install the Couchbase Plug-in. Just copy the related libraries to the libs sub-folder and the configuration files to the config sub-folder of your Kafka installation.


Streaming data from Couchbase

Before we can stream data from Couchbase, we need to create the topic to which we want to stream. So let's create a topic which is named 'test-cb'.
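Kafka's standard topic tool can be used for this. The topic has 3 partitions, as described in the following paragraph; the replication factor of 3 is an assumption:

    bin/kafka-topics.sh --create --zookeeper localhost:2181 \
      --replication-factor 3 --partitions 3 --topic test-cb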


You can then describe this topic by using the following command:
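    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test-cb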


The topic which we created has 3 partitions. Each node is the leader for 1 partition. The leader is the node responsible for all reads and writes for the given partition. 'Replicas' is the list of nodes that replicate the log of this partition.

Now let's create a configuration file for distributed workers under 'config/couchbase-distributed.properties':
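    # A sketch along the lines of Kafka's sample connect-distributed.properties;
    # the host list, the group id and the internal topic names are assumptions
    bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
    group.id=couchbase-connect-cluster
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    offset.storage.topic=connect-offsets
    config.storage.topic=connect-configs
    status.storage.topic=connect-status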


The Connect settings are more or less the default ones. Now we also have to provide the connector settings. When using the distributed mode, these settings have to be provided by registering the connector via the Connect REST service:
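    # Register the connector via the Connect REST service (default port 8083)
    curl -X POST -H "Content-Type: application/json" \
      -d @couchbase-distributed.json \
      http://localhost:8083/connectors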


The configuration file 'couchbase-distributed.json' has a name attribute and an embedded object with the configuration settings. Here is a sketch based on the connector's quick-start sample ('tasks.max' and 'topic.name' match the values used in this post; the cluster address and the bucket name are assumptions):
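    {
      "name": "couchbase-source",
      "config": {
        "connector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector",
        "tasks.max": "2",
        "connection.cluster_address": "127.0.0.1",
        "connection.bucket": "default",
        "topic.name": "test-cb"
      }
    }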


The Couchbase settings refer to a Couchbase bucket and the topic name to which we want to stream DCP messages out of Couchbase. In order to run the Connect workers in distributed mode, we can now execute:
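    bin/connect-distributed.sh config/couchbase-distributed.properties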


The log file contains information about the tasks. We configured 2 tasks to run, and the output shows which task is responsible for which Couchbase shards (vBuckets).


For now let's just consume the 'test-cb' messages by using a console logging consumer:
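    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
      --topic test-cb --from-beginning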


If you look at one of the consumed entries, you will notice that we just used the standard value converter: the value is in reality a JSON document, but it is represented as a Base64-encoded string in this case.

Another article will explain how to use Couchbase via Kafka Connect as the sink for messages.