2015-11-30

Cassandra Time Series Bucketing

Intro

Bucketing is one of the most important techniques when working with time series data in Cassandra. This post has its roots in two very popular blog entries:

The posts are very well written and they pretty much describe all of the standard techniques for working with time series data in Cassandra. But to be honest there isn't all that much code in them. This is partly due to the fact that almost every project has its own specifics, and from my experience it often happens that even within a relatively small team there will be multiple implementations of how to bucket and access the time series data.

The Case for Bucketing

For some time now I've been in the world of IoT, and I find that explaining everything with the help of a simple temperature sensor is the best way to discuss the subject. The previously mentioned articles are also a good read. This section is sort of a warm up. Theoretically, in most use cases we'll want to access temperature readings by some sensor id, and we know where this sensor is located. In the simplest case the sensor id becomes the wide row key in Cassandra, the readings are stored within it and kept sorted by time, and so on. However, in some cases the temperature may be read very often, and this could cause the wide row to grow to a size that is not manageable by Cassandra, so the data has to be split among multiple wide rows. The easiest way to make this split is to create multiple wide rows based on the measurement timestamp.

How big should my buckets be?

It may vary from project to project, but it mostly depends on two important factors: how many values you are storing per single measurement and how often the measurement happens. For instance, if you are recording a reading once per day you probably don't even need bucketing. If you are recording it once per hour, the project you are working on probably won't last long enough for you to run into problems. The same applies to seconds, but only for the most trivial case where you are making a single reading. If you go into frequencies where something is happening at the millisecond level, you will most definitely need bucketing. The most complex project I have worked on up until now had time bucketing at the level of a single minute, meaning a new bucket every minute. But that project was not in the IoT world; in that world I'm using partitions on a per-month basis.
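
As a quick back-of-the-envelope check, the bucket size is roughly the sampling rate multiplied by the number of values per reading and the bucket length. The numbers below are purely illustrative assumptions, not taken from any of the projects mentioned above:

    // illustrative sizing sketch - plug in your own sampling rate and bucket length
    long readingsPerSecond = 10;            // e.g. one reading every 100 ms
    long valuesPerReading = 3;              // e.g. temperature, humidity, battery
    long secondsPerDay = 24L * 60 * 60;     // day-sized bucket

    long cellsPerDayBucket = readingsPerSecond * valuesPerReading * secondsPerDay;
    System.out.println(cellsPerDayBucket);  // 2 592 000 cells in a single partition

At that rate a day-sized bucket is already getting quite wide; Cassandra's hard limit is two billion cells per partition, but in practice partitions should stay orders of magnitude smaller, which is why the sampling rate should drive the bucket size.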

10,000 Feet Bucketing View

The main problem is how to calculate the bucket from the measurement timestamp. Also keep in mind that there might be differences between timezones; in a distributed system a very advisable practice is to store everything in UTC. If we decide that we need bucketing per day, it could be something as simple as the following:

    // FastDateFormat comes from Apache Commons Lang (org.apache.commons.lang3.time)
    // and, unlike SimpleDateFormat, is safe to share between threads
    FastDateFormat dateFormat = FastDateFormat.getInstance(
        "yyyy-MM-dd", TimeZone.getTimeZone("UTC"));

    public String dateBucket(Date date) {
        return dateFormat.format(date);
    }
    
That's it; combine this with your sensor id and you get buckets on a day level basis. Now the problem is how to retrieve the measurements from the buckets, especially if you have to fetch measurements across multiple buckets. We'll go over this in the next section.
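
As a minimal sketch of the write path, assuming a hypothetical readings table whose partition key combines the sensor id with the day bucket (the table and column names here are illustrative, not taken from the posts mentioned above):

    // Hypothetical table, for illustration only:
    // CREATE TABLE readings (
    //     sensor_id text,
    //     bucket text,
    //     measurement_timestamp timeuuid,
    //     value double,
    //     PRIMARY KEY ((sensor_id, bucket), measurement_timestamp)
    // );
    public void storeReading(Session session, String sensorId, Date when, double value) {
        // the bucket is just the formatted day, e.g. "2015-11-30"
        String bucket = dateBucket(when);
        session.execute(new SimpleStatement(
                "INSERT INTO readings (sensor_id, bucket, measurement_timestamp, value)"
                        + " VALUES (?, ?, now(), ?)",
                sensorId, bucket, value));
    }

With a compound partition key like this, every (sensor, day) pair gets its own partition, so no single row can grow without bound.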

Anything goes

Bear in mind that you should keep the bucketing of time series data easy to maintain. Also try to avoid having multiple implementations of the same thing in your code base. This section will not provide fully implemented examples; it will stay more on the level of pseudo code.

When you are fetching data from the buckets, you will have two types of query. One fetches data out of a bucket without any restriction on the measurement timestamp. The other starts from a certain position within the bucket. Then there is the question of ordering and sorting the retrieved data. I have worked in systems with all sorts of practices there; most of the time reversing was done with the help of a specific boolean flag, but in my opinion this should be avoided. It's best to stick to the from and to parameters and order the data according to them, i.e.

        from:   01/01/2016
        to:     02/02/2016
        returns: ascending

        from:   02/02/2016
        to:     01/01/2016
        returns: descending
    
That way you don't have to rack your brain over various flags being passed through the layers of your code.

Here is a bit of pseudo code:

        // constructor of your iterator object

        startPartition = dateBucket(from);
        endPartition = dateBucket(to);

        currentBucket = startPartition;
        lastFetchedToken = null;

        bucketMoveCount = 0;

        // "bucket" stands in for the full partition key (e.g. sensor id + day bucket)
        // and is always bound as the first parameter of the query
        String statement = "SELECT * FROM readings WHERE bucket = ?";

        // from past experience, somehow the driver takes out data the fastest
        // if it fetches 3000 items at once; it would be interesting to drill down
        // into why this is so :)

        int fetchSize = 3000;

        if (from.isBefore(to)) {
            select = statement + " ORDER BY measurement_timestamp ASC LIMIT " + fetchSize;
            selectFromBoundary = statement + " AND measurement_timestamp > ? ORDER BY measurement_timestamp ASC LIMIT " + fetchSize;

            partitionDiff = -1f;
        } else {
            select = statement + " LIMIT " + fetchSize;
            selectFromBoundary = statement + " AND measurement_timestamp < ? LIMIT " + fetchSize;

            partitionDiff = 1f;
        }
    
The partition could move by hour, day, or minute; it all depends on how you decide to implement it. You will have to do some time based calculations there, and I recommend using Joda-Time for that. Now that the init of the iterator is defined, it's time to do some iterations over it:

    public List<Row> getNextPage() {

        List<Row> resultOut = new ArrayList<>();

        boolean continueFromPreviousBucket = false;

        do {
            ResultSet resultSet =
                    lastFetchedToken == null ?
                            session.execute(new SimpleStatement(select, currentBucket)) :
                            session.execute(new SimpleStatement(selectFromBoundary, currentBucket, lastFetchedToken));

            List<Row> result = resultSet.all();

            if (result.size() == fetchSize) {
                if (continueFromPreviousBucket) {
                    resultOut.addAll(result.subList(0, fetchSize - resultOut.size()));
                } else {
                    resultOut = result;
                }

                lastFetchedToken = resultOut.get(resultOut.size() - 1).getUUID("measurement_timestamp");

            } else if (result.size() == 0) {
                currentBucket = calculateNextBucket();
                bucketMoveCount++;

            } else if (result.size() < fetchSize) {
                currentBucket = calculateNextBucket();
                bucketMoveCount++;

                lastFetchedToken = null;

                if (continueFromPreviousBucket) {
                    resultOut.addAll(result.subList(0, Math.min(result.size(), fetchSize - resultOut.size())));
                } else {
                    resultOut = result;
                }

                continueFromPreviousBucket = true;
            }

            if (resultOut.size() == fetchSize
                    || bucketMoveCount >= MAX_MOVE_COUNT
                    || Math.signum(currentBucket.compareTo(endPartition)) != partitionDiff) {
                break;
            }

        } while (true);

        return resultOut;
    }
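
For completeness, here is one possible sketch of the calculateNextBucket() helper used above, assuming day-sized buckets and Joda-Time on the classpath; the field names mirror the iterator init and are otherwise illustrative:

    // a minimal sketch, assuming org.joda.time.DateTime and
    // org.joda.time.format.DateTimeFormat are available
    private final DateTimeFormatter bucketFormat =
            DateTimeFormat.forPattern("yyyy-MM-dd").withZoneUTC();

    private String calculateNextBucket() {
        DateTime current = bucketFormat.parseDateTime(currentBucket);
        // partitionDiff is -1 when iterating ascending, 1 when descending
        DateTime next = partitionDiff < 0 ? current.plusDays(1) : current.minusDays(1);
        return bucketFormat.print(next);
    }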
    

This is just a high level overview of how to move among the buckets. The actual implementation will be significantly different from project to project. My hope for this post is that you give the problems I faced some thought before you run into them.

2015-11-07

Spring Data Cassandra vs. Native Driver

Intro

For some time now Spring Data Cassandra has been getting more and more popular. My main concern with the framework is its performance characteristics when compared to the native CQL driver. After all, with the driver everything is under your control and one can probably squeeze much more juice out of the cluster. O.k. I admit it's not always about performance. If that were the case we would all be writing software in C or assembler. But still, I think it's a good practice to be aware of the drawbacks.

To be honest, Spring Data Cassandra is relatively new to me. I did the performance comparison on the lowest level, without using repositories and other high level concepts that come with Spring Data Cassandra. My focus in this post is more on the generic mapping code that decodes the data coming out of the driver. To make the comparison I'm going to use a simple Cassandra table (skinny row), run query after query against Cassandra (5000 and 10000 of them) and then decode the results. Once again, the focus here is not on the performance characteristics of higher level functionality like paged queries etc. I just wanted a rule-of-thumb feel for what I can expect from Spring Data Cassandra.

Setup

    -- simple skinny row
    CREATE TABLE activities (
        activity_id uuid,
        activity_model_id bigint,
        activity_state text,
        asset_id text,
        attrs map<text, text>,
        creation_time timestamp,
        customer_id text,
        end_time timestamp,
        last_modified_time timestamp,
        person_id text,
        poi_id text,
        start_time timestamp,
        PRIMARY KEY (activity_id)
    );

    
To eliminate all possible side effects, I used just a single skinny row:

    activity_id 72b493f0-e59d-11e3-9bd6-0050568317c1
    activity_model_id 66
    activity_state DONE
    asset_id 8400848739855200000
    attrs {
        'businessDrive': '1:1',
        'customer': '4:test_test_test',
        'distance': '3:180', 
        'endLocation': '6:15.7437466839,15.9846853333,0.0000000000',
        'fromAddress': '4:XX1', 
        'locked': '1:0', 
        'reason': '4:Some reason 2', 
        'startLocation': 
        '6:15.7364385831,15.0071729736,0.0000000000', 
        'toAddress': '4:YY2'
        }
    creation_time 2014-05-27 14:50:14+0200
    customer_id 8400768435301400000
    end_time 2014-05-27 12:15:40+0200
    last_modified_time 2014-05-29 21:30:44+0200
    person_id 8401111750365200000
    poi_id null
    start_time 2014-05-27 12:13:05+0200
    
This row is fetched every time; to detect differences we'll see how long the iterations last. Network and cluster effects are also out of scope, so everything was tested against a locally running DataStax Cassandra Community (2.0.16) instance.

The code

To keep out all possible interfering effects I used two separate projects. I once had a situation where an old Thrift API was used together with the CQL driver, and it significantly affected performance and required additional configuration parameters. The main code snippets are located in the gists linked below, with a rough reconstruction of the native-driver loop sketched after them. This is not the focus here, but if somebody is interested:

spring-data
native-drivers
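
Since the gists are external, here is a rough sketch of what the native-driver side of the loop looks like; it is a reconstruction of the approach described above (repeated single-partition queries plus manual decoding of three fields), not the exact code from the gist:

    // a minimal sketch against the activities table defined in the Setup section
    UUID activityId = UUID.fromString("72b493f0-e59d-11e3-9bd6-0050568317c1");

    PreparedStatement prepared = session.prepare(
            "SELECT activity_id, activity_model_id, activity_state"
                    + " FROM activities WHERE activity_id = ?");

    long start = System.currentTimeMillis();

    for (int i = 0; i < 5000; i++) {
        Row row = session.execute(prepared.bind(activityId)).one();

        // manual decoding - this is the part that spring data / object mapping does for you
        UUID id = row.getUUID("activity_id");
        long modelId = row.getLong("activity_model_id");
        String state = row.getString("activity_state");
        // the 12-field variant additionally decodes the timestamps, the map and the text columns
    }

    System.out.println("took " + (System.currentTimeMillis() - start) + " ms");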

Results in milliseconds

    3 fields - 5000 items
        spring-data
        5381
        5282
        5385
        avg: 5339

        driver
        4426
        4280
        4469
        avg: 4390

        result: driver faster 21.6%

    3 fields - 10000 items
        spring-data
        8560
        8133
        8144
        avg: 8279

        driver
        6822
        6770
        6875
        avg: 6822
        
        result: driver faster 21.3%

    12 fields - 5000 items
        spring-data
        5911
        5920
        5928
        avg: 5920 - 10.88 % slower than with 3 fields!

        driver
        4687
        4669
        4606
        avg: 4654 - 6 % slower than with 3 fields

        result: driver faster 27%

Conclusions

Spring Data Cassandra may be very interesting if you want to learn something new. It might also have very positive effects on development speed when prototyping or doing something similar. I didn't test the higher level functionality like pagination etc.; this was just a rule-of-thumb test to see what to expect. Basically, the bigger the class that you have to decode, the bigger the deserialization cost. At least this is the effect I'm noticing in my basic tests.

Follow-up with Object Mapping available in Cassandra driver 2.1

There was an interesting follow-up discussion on reddit. Following a proposal from reddit user v_krishna, another candidate was added to the comparison: the Object-mapping API.
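
For context, the object mapping that shipped with driver 2.1 lives in the com.datastax.driver.mapping module and is used roughly like this; a minimal sketch against the activities table, with only three fields mapped and an assumed keyspace name:

    // the keyspace name is an assumption for the sketch; getters, setters and the
    // no-arg constructor required by the mapper are omitted for brevity
    @Table(keyspace = "test", name = "activities")
    public class Activity {

        @PartitionKey
        @Column(name = "activity_id")
        private UUID activityId;

        @Column(name = "activity_model_id")
        private Long activityModelId;

        @Column(name = "activity_state")
        private String activityState;
    }

    MappingManager manager = new MappingManager(session);
    Mapper<Activity> mapper = manager.mapper(Activity.class);
    Activity activity = mapper.get(activityId);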

Let's see the results:

    3 fields - 5000 items
        spring-data
        5438
        5453
        5576
        avg: 5489

        object-map
        5390
        5299
        5476
        avg: 5388

        driver
        4382
        4410
        4249
        avg: 4347

    conclusion
        - driver 26% faster than spring data
        - object map just under 2% faster than spring data

    3 fields - 10000 items
        spring-data
        8792
        8507
        8473
        avg: 8591

        object-map
        8435
        8494
        8365
        avg: 8431

        driver
        6632
        6760
        6646
        avg: 6679

    conclusion
        - driver faster 28.6% than spring data
        - object mapping just under 2% faster than spring data

    12 fields 5000 items
        spring-data
        6193
        5999
        5938
        avg: 6043

        object-map
        6062
        5936
        5911
        avg: 5970

        driver
        4910
        4955
        4596
        avg: 4820

    conclusion
        - driver 25% faster than spring data
        - object mapping 1.2% faster than spring data

To keep everything fair: there was some deviation in these test runs when compared to the previous test. Here are the deviations:

comparison with first run:
    3 fields - 5000 items
        spring-data
        avg1: 5339
        avg2: 5489
        2.7% deviation

        driver
        avg1: 4390
        avg2: 4347
        1% deviation

    3 fields - 10000 items
        spring-data
        avg1: 8279
        avg2: 8591
        3.6% deviation

        driver
        avg1: 6822
        avg2: 6679
        2.1% deviation

    12 fields 5000 items
        spring-data
        avg1: 5920
        avg2: 6043
        2% deviation

        driver
        avg1: 4654
        avg2: 4820
        3.4% deviation

Object mapping from Spring Data seems to be just a bit slower than the object mapping available in the new driver. I can't wait to see a comparison of the two in future versions. Initially I was expecting the mapping layers to be around 5-10% slower than the plain driver; it surprised me a bit that the difference was more on the level of 25%. So if you are planning on using object mapping capabilities, be aware that there is a performance penalty.