Friday, 16 October 2015

Understanding Couchbase's Document Expiration

I just had to investigate Couchbase's T(ime)T(o)L(ive) feature a bit because I was asked how to react externally to document expirations via DCP.

The TTL feature allows you to specify, when creating a document, when it should be automatically deleted. Here are two example use cases:

  • Caching: The items in a cache should only be valid for a specific period of time, and the TTL feature allows you to invalidate them automatically. Let's assume that you are a retailer caching some product details. The prices of some products might change daily (or even more often), so it makes sense to ensure that a cached product price is refreshed once a day. If an item has expired in the cache, it is fetched again from the original source system, and the newly fetched item then carries the updated price information.
  • Session management: Couchbase is often used as a session cache/store. A session is simply a key-value pair whereby the key depends on the id of the logged-in user and the session value could be just a random hash value (e.g. a UUID). Let's assume that a session times out after 20 minutes if the user is inactive. This means that you can simply create such a session document with a TTL value of 20 minutes. If the user shows some activity, you can touch the session document in order to extend its TTL.
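
The sliding session timeout can be illustrated without a cluster. The following is a minimal in-memory sketch, not Couchbase code: the class SessionStore, its methods and the "session::" key prefix are hypothetical, and the current time is passed in explicitly (as Unix seconds) so the behavior is easy to follow. With the Java SDK you would instead create the document with an expiry, as in JsonDocument.create(id, expiry, content), and extend it with the SDK's touch operation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

/**
 * Hypothetical in-memory model of a session store with a sliding TTL.
 * It mimics the behavior described in the text; it is not Couchbase code.
 * Times are Unix seconds and are passed in explicitly to keep it deterministic.
 */
public class SessionStore {

    /** Session timeout: 20 minutes of inactivity, in seconds. */
    public static final long SESSION_TTL = 20 * 60;

    /** A session value together with the absolute time at which it expires. */
    private static final class Entry {
        final String token;
        long expiresAt;

        Entry(String token, long expiresAt) {
            this.token = token;
            this.expiresAt = expiresAt;
        }
    }

    private final Map<String, Entry> sessions = new HashMap<>();

    /** Hypothetical key scheme: the key is derived from the user id. */
    private static String key(String userId) {
        return "session::" + userId;
    }

    /** Creates a session that expires SESSION_TTL seconds after 'now'. */
    public void create(String userId, String token, long now) {
        sessions.put(key(userId), new Entry(token, now + SESSION_TTL));
    }

    /** Returns the token while the session is alive; an expired entry is dropped. */
    public Optional<String> get(String userId, long now) {
        Entry e = sessions.get(key(userId));
        if (e == null) {
            return Optional.empty();
        }
        if (now >= e.expiresAt) {
            sessions.remove(key(userId));
            return Optional.empty();
        }
        return Optional.of(e.token);
    }

    /** "Touch" on user activity: move the expiry SESSION_TTL seconds past 'now'. */
    public boolean touch(String userId, long now) {
        if (!get(userId, now).isPresent()) {
            return false; // the session is gone, there is nothing to extend
        }
        sessions.get(key(userId)).expiresAt = now + SESSION_TTL;
        return true;
    }

    public static void main(String[] args) {
        SessionStore store = new SessionStore();
        store.create("42", UUID.randomUUID().toString(), 0); // login at t = 0
        store.touch("42", 15 * 60);                           // activity at t = 15 min

        // Without the touch the session would have expired at t = 20 min
        System.out.println(store.get("42", 30 * 60).isPresent()); // prints true
        System.out.println(store.get("42", 36 * 60).isPresent()); // prints false
    }
}
```

Each touch resets the full 20-minute window, so only a period of 20 minutes without activity ends the session.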

So far we have seen two examples of what the feature can be used for. Let's now talk a bit about how it can be used. Let's create 1000 documents with an expiration value of 60 seconds:

        List<JsonDocument> docs = new ArrayList<>();

        for (int i = 0; i < 1000; i++) {
            // Arguments: document id, expiry (60 seconds from now), JSON content
            docs.add(JsonDocument.create("key::" + i, 60,
                    JsonObject.fromJson("{ \"content\" : \"This is the document #" + i + "\"}")));
        }

        // 'bucket' is an AsyncBucket, so upsert returns an Observable; log each created document
        Observable.from(docs).flatMap(d -> bucket.upsert(d)).subscribe(
                res -> LOG.log(Level.INFO, "Created document with id {0}", res.id())
        );

Instead of using a number of seconds relative to the creation time, you could also use a Unix time stamp. For example, the value 1421454149 represents Saturday, 17 January 2015 at 00:22:29 UTC. A relative value may be at most 30 days (2,592,000 seconds); any larger value is interpreted as an absolute Unix time stamp. Internally, the expiration is always stored as a Unix time stamp.
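
The interpretation rule can be made concrete with a small helper. Couchbase, like memcached, treats an expiry of up to 30 days (2,592,000 seconds) as relative to the current time, anything larger as an absolute Unix time stamp, and 0 as "never expires". The class ExpiryTime and its method are hypothetical and not part of the Couchbase SDK; this is only a sketch of the rule.

```java
import java.time.Instant;

/**
 * Hypothetical helper illustrating how an expiry value is interpreted:
 * up to 30 days it is relative to "now", above that it is an absolute
 * Unix time stamp. Not part of the Couchbase SDK.
 */
public class ExpiryTime {

    /** 30 days in seconds: the largest value still treated as relative. */
    public static final int RELATIVE_LIMIT = 30 * 24 * 60 * 60; // 2,592,000

    /**
     * Converts an expiry value to the absolute Unix time stamp at which the
     * document expires. 0 means "never expires" and is returned unchanged.
     */
    public static long toAbsolute(long expiry, long nowSeconds) {
        if (expiry == 0) {
            return 0;
        }
        return expiry <= RELATIVE_LIMIT ? nowSeconds + expiry : expiry;
    }

    public static void main(String[] args) {
        long now = Instant.now().getEpochSecond();

        // 60 is relative: the document expires one minute from now
        System.out.println(toAbsolute(60, now) - now); // prints 60

        // 1421454149 is larger than 30 days, so it is taken as an absolute time stamp
        System.out.println(Instant.ofEpochSecond(toAbsolute(1421454149L, now))); // prints 2015-01-17T00:22:29Z
    }
}
```

A consequence of this rule is that an expiry of, say, 31 days cannot be given in seconds: it would be read as a time stamp in January 1970, i.e. already in the past. For longer lifetimes, compute the absolute Unix time stamp yourself.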

So far so good. Let's now understand how expiration works internally. There are actually several ways in which an expired document ends up being removed:
  • Lazily: If you try to access a document which has expired but has not yet been deleted, the document is not returned to you; instead, the deletion is performed at that moment. It's important to understand that this only applies to accessing the document itself. Querying a Couchbase View does not cause a deletion, but retrieving the actual documents based on the View result (e.g. by performing an additional multi-get or by using the include_docs option) does.
  • The expiry pager: This is a job which runs by default on every node in the cluster. You can change its interval with the following command, which sets it to 60 seconds (for testing only) for a bucket named 'dcp'. Please be aware that this setting is not persisted, so it will not survive a restart of the Couchbase node:
    ./cbepctl localhost:11210 set flush_param exp_pager_stime 60 -b dcp -p test
  • Compaction: Couchbase uses an append-only storage engine (Couchstore). Instead of performing in-place updates, it appends the updated data to the end of the database files. The storage structure is a so-called 'append-only B-tree'. It's easy to see that this leads to fragmentation over time. Compaction therefore rewrites the database files (vBucket files) and skips entries that are no longer needed (e.g. old revisions, deleted documents or expired documents). Compaction is configured to run automatically by default, but you can also trigger it manually (for testing purposes):
    ./couchbase-cli bucket-compact -c localhost:8091 --bucket=dcp -u couchbase -p couchbase
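
To make the difference between the lazy path and the background job tangible, here is a deliberately simplified in-memory model. It is hypothetical and not how Couchbase is implemented: a read removes an expired item lazily, while a periodic sweep stands in for the expiry pager (compaction, which works on the database files, is not modeled). Each removal is reported to a listener, comparable to the removal event that each path produces on the DCP stream.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

/**
 * Hypothetical, simplified model of how expired items get removed:
 * lazily on access, or by a periodic sweep (like the expiry pager).
 * Every removal is passed to a listener, comparable to a DCP RemoveMessage.
 */
public class ExpiryPathsModel {

    private static final class Item {
        final String value;
        final long expiresAt; // absolute Unix seconds

        Item(String value, long expiresAt) {
            this.value = value;
            this.expiresAt = expiresAt;
        }

        boolean isExpired(long now) {
            return now >= expiresAt;
        }
    }

    private final Map<String, Item> items = new HashMap<>();
    private final Consumer<String> onRemove;

    public ExpiryPathsModel(Consumer<String> onRemove) {
        this.onRemove = onRemove;
    }

    public void put(String key, String value, long expiresAt) {
        items.put(key, new Item(value, expiresAt));
    }

    /** Lazy path: an expired item is never returned; the read deletes it instead. */
    public Optional<String> get(String key, long now) {
        Item item = items.get(key);
        if (item == null) {
            return Optional.empty();
        }
        if (item.isExpired(now)) {
            remove(key);
            return Optional.empty();
        }
        return Optional.of(item.value);
    }

    /** Sweep path: remove every expired item in one pass; returns how many were removed. */
    public int sweep(long now) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Item> e : items.entrySet()) {
            if (e.getValue().isExpired(now)) {
                expired.add(e.getKey());
            }
        }
        // Collect first, then remove, so the map is not modified while iterating
        expired.forEach(this::remove);
        return expired.size();
    }

    /** Number of items still stored, whether expired or not. */
    public int size() {
        return items.size();
    }

    private void remove(String key) {
        items.remove(key);
        onRemove.accept(key); // both paths end in the same kind of removal event
    }

    public static void main(String[] args) {
        List<String> removed = new ArrayList<>();
        ExpiryPathsModel store = new ExpiryPathsModel(removed::add);

        for (int i = 0; i < 1000; i++) {
            store.put("key::" + i, "This is the document #" + i, 60); // expires at t = 60
        }

        store.get("key::0", 61); // lazy removal of one expired document
        store.sweep(120);        // the "pager" removes the remaining 999
        System.out.println(removed.size() + " removals, " + store.size() + " items left");
        // prints "1000 removals, 0 items left"
    }
}
```

As in the real system, the listener cannot tell which path triggered a removal; it only sees the key.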

I wrote a little test program (for testing purposes only) which allows you to see which documents are expiring by consuming the D(atabase)C(hange)P(rotocol) stream of a Couchbase bucket. It also shows that each of the expiration mechanisms mentioned above causes DCP RemoveMessages. Regarding DCP, there is (as far as I can see) no way to distinguish an expiration event from an ordinary deletion: both cause a RemoveMessage.

    • ExpirationHandler:
    
    package com.couchbase.example.dcp.handler;
    
    import com.couchbase.client.core.message.dcp.DCPRequest;
    import com.couchbase.client.core.message.dcp.RemoveMessage;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    /**
     * A logging DCP handler for remove messages
     * 
     * @author David Maier <david.maier at couchbase.com>
     */
    public class ExpirationHandler implements IHandler {
    
        private static final Logger LOG = Logger.getLogger(ExpirationHandler.class.getName());
        private int count = 0;
    
        @Override
        public void handle(DCPRequest dcp) {
    
            if (dcp instanceof RemoveMessage) {
    
                this.count++;
    
                RemoveMessage msg = (RemoveMessage) dcp;
    
                LOG.log(Level.INFO, "Removed document with bucket/vBucket/key: {0}/{1}/{2}", new Object[]{msg.bucket(), msg.partition(), msg.key()});
                LOG.log(Level.INFO, "So far {0} documents were deleted.", this.count);
            }
        } 
    
    }

    • ExpirationTest:
    
    package com.couchbase.example.dcp;
    
    import com.couchbase.client.java.AsyncBucket;
    import com.couchbase.client.java.CouchbaseCluster;
    import com.couchbase.client.java.document.JsonDocument;
    import com.couchbase.client.java.document.json.JsonObject;
    import org.junit.BeforeClass;
    import org.junit.Test;
    import static com.couchbase.example.dcp.TestConstants.*;
    import com.couchbase.example.dcp.handler.ExpirationHandler;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    import rx.Observable;
    
    /**
     *
     * @author David Maier <david.maier at couchbase.com>
     */
    public class ExpirationTest {
    
        private static final Logger LOG = Logger.getLogger(ExpirationTest.class.getName());
        private static Receiver r;
        private static AsyncBucket bucket;
    
    
    /**
     * Opens the bucket (used for writing) and the DCP receiver once
     * for the whole test class.
     */
    @BeforeClass
    public static void setUpClass() {

        //Make sure that DCP is usable
        System.setProperty("com.couchbase.dcpEnabled", "true");

        bucket = CouchbaseCluster.create(new String[]{HOST}).openBucket(BUCKET, PWD).async();
        r = new Receiver(new String[]{HOST}, BUCKET, PWD, new ExpirationHandler());
    }
    
    /**
     * Create 1000 docs with an expiration time and then
     * consume the DCP stream by handling remove messages.
     *
     * (1) Create an empty bucket
     * (2) Run this test
     * (2a) Wait until the expiry pager has run
     * (2b) After the expiration period, access all keys
     * (2c) Run compaction
     * (3) You should see the line 'INFO: So far 1,000 documents were deleted.'
     * in the output
     */
        @Test
        public void testReceiveDeletionStream() throws Exception {
    
        LOG.info("-- testReceiveDeletionStream");
    
    
            LOG.info("Creating some documents with expiration times ...");
    
            List<JsonDocument> docs = new ArrayList<>();
    
            for (int i = 0; i < 1000; i++) {
    
            docs.add(JsonDocument.create("key::" + i, 60, JsonObject.fromJson("{ \"content\" : \"This is the document #" + i + "\"}")));
            }
    
            Observable.from(docs).flatMap(d -> bucket.upsert(d)).subscribe(
    
                    res -> LOG.log(Level.INFO, "Created document with id {0}", res.id())
            );
    
            //Wait a moment before streaming
            Thread.sleep(5000);
    
    
            LOG.info("Connecting ...");
            r.connect();
            LOG.info("Streaming ...");
            r.stream();
        }
    }

The full source code can be found here: https://github.com/dmaier-couchbase/cb-dcp-receiver

It's important to mention that my test program uses Couchbase's Java core library, whose direct usage is not officially supported by Couchbase. If you want to consume DCP messages externally, you should instead use one of the Couchbase Connectors (Spark, Kafka, ...), which use the core library behind the scenes.