2015-01-24

Cassandra Community Handling 100 000 req per second

Intro

Recently I got an assignment to prove that Cassandra cluster can hold up to 100 000 requests per second. Also all this had to be done on the budget and with not so much time spent on development of the whole application. This setup had to be as close to the real thing as possible. We will go trough the details soon. Here is just the basic overview of the experiment:

Amazon

Generating and handling the load on this scale requires the infrastructure that is usually not available within a personal budget so I turned to Amazon EC2. I listened about the EC2 for quite some time now and It turned out really easy to use. Basically All you have to do is to setup a security group and store the "pem" file for that security group. Really easy and if anybody didn't try it yet there is a free micro instance available for a whole year after registering. I won't go into details of how to setup the security group. It's all described in the DataStax documentation. Note that the security definition is a bit extensive and that defining the port range from 1024-65535 is sufficient for an inter group communication and I didn't expose any ports to the public as described in the documentation. The second part is generating the key pair. In the rest of the document I'll reference this file as "cassandra.pem".

Load

Generating the load on that scale is not as easy as it might seem. After some searching I've stumbled upon the following. So I came to a conclusion that the best solution is to use Tsung. I've setup the load generating machines with the following snippet. Note that I've placed the "cassandra.pem" file on the node from which I'll start running tsung. Read the node addresses from the aws console. The rest is pretty much here:

        # do this only for the machine from which you'll initiate tsung
        scp -i cassandra.pem cassandra.pem ec2-user@tsung_machine:~

        # connect to every load machine and install erlang and tsung
        ssh -i cassandra.pem ec2-user@every_load_machine

        # repeat this on every node
        sudo yum install erlang

        wget http://tsung.erlang-projects.org/dist/tsung-1.5.1.tar.gz
        tar -xvzf tsung-1.5.1.tar.gz
        cd tsung-1.5.1
        ./configure
        make
        sudo make install

        # you can close other load nodes now
        # go back to the first node. and move cassandra.pem to id_rsa
        mv cassandra.pem .ssh/id_rsa

        # now make an ssh connection from first tsung node to every
        # load generating machine (to add the host key) so that
        # the first tsung node won't have any problem connecting to
        # other nodes and issuing erlang commands to them
        ssh ip-a-b-c-d
        exit

        # create the basic.xml file on the first tsung node
        vi basic.xml
    

The second part with the load generating machines is to edit the basic.xml file. To make it more interesting we are going to send various kinds of messages with a timestamp. The users list will be predefined in a file userlist.csv. Note that the password is the same for all the users, you can adapt this to your own needs or completely remove the password:

        0000000001;pass
        0000000002;pass
        0000000003;pass
        ...
        ...
        ...
    

The tsung tool is well documented, the configuration I used is similar to this:

        <?xml version="1.0" encoding="utf-8"?>
        <!DOCTYPE tsung SYSTEM "/usr/share/tsung/tsung-1.0.dtd" []>
        <tsung loglevel="warning">

        <clients>
            <client host="ip-a-b-c-d0" cpu="8" maxusers="25"/>
            <client host="ip-a-b-c-d1" cpu="8" maxusers="25"/>
            <client host="ip-a-b-c-d2" cpu="8" maxusers="25"/>
            <client host="ip-a-b-c-d3" cpu="8" maxusers="25"/>
        </clients>

        <servers>
            <server host="app-servers-ip-addresses-internal" port="8080" type="tcp"/>
            <!-- enter the rest of the app servers here-->
        </servers>

        <load>
            <arrivalphase phase="1" duration="11" unit="minute">
                <users maxnumber="100" arrivalrate="100" unit="second"/>
            </arrivalphase>
        </load>

        <options>
            <option name="file_server" id='id' value="userlist.csv"/>
        </options>

        <sessions>
            <session probability="100" name="load_session" type="ts_http">
                <setdynvars sourcetype="file" fileid="id" delimiter=";" order="iter">
                    <var name="username" />
                    <var name="pass" />
                </setdynvars>
                <setdynvars sourcetype="eval"
                            code="fun({Pid,DynVars}) -&gt;
                            {Mega, Sec, Micro} = os:timestamp(),
                            (Mega*1000000 + Sec)*1000 + round(Micro/1000)
                            end.
                            ">
                    <var name="millis" />
                </setdynvars>
                <for from="1" to="10000000" var="i">
                    <request subst="true">
                        <http url="/m?c=%%_username%%%%_millis%%ABC41.7127837,42.71278370000.0"  method="GET"/>
                    </request>
                    <request subst="true">
                        <http url="/m?c=%%_username%%%%_millis%%DEF43.7127837,44.71278370000.0"  method="GET"/>
                    </request>
                    <request subst="true">
                        <http url="/m?c=%%_username%%%%_millis%%GHI45.7127837,46.71278370000.0"  method="GET"/>
                    </request>
                    <request subst="true">
                        <http url="/m?c=%%_username%%%%_millis%%JKL47.7127837,48.71278370000.0"  method="GET"/>
                    </request>
                    <request subst="true">
                        <http url="/m?c=%%_username%%%%_millis%%MNO49.7127837,50.71278370000.0"  method="GET"/>
                    </request>
                </for>
            </session>
        </sessions>
        </tsung>
    

Resources

  • 3x c3.xlarge
  • 1x c4.xlarge
Note I've added c4 node because I was limited on the amazon with the number of instances I could boot.

App

I've spent most of the time on the app part when developing. The basics for the component handling the requests was netty listener. In one of my previous posts I described how to use netty to handle http requests and acknowledge them with HELLO message. Here I acknowledged them with OK.

The most complicated part with the messages was sending them to cassandra as fast as possible. The fastest way to send them is to use executeAsync. Initially I had trouble with it where I was loosing messages. Some of the issues were due to concurrency. Some were due to poor understanding of the DataStax driver.

Concurrency - Basically what I was doing was that I tried to save on instantiating the BoundStatement instances because of the overal speed. The BoundStatement is not thread safe and after calling the bind method it returns "this". It took me some time to figure this out because when used in loops this behavior is not dangerous. Anyway, thanks to colleague I figured it out.

        // always instantiate new in concurrent code
        // don't reuse and make multiple calls with .bind()!

        BoundStatement bs = new BoundStatement(insertStatement);
    

Asynchronous execution - also a bit tricky. The executeAsync returns a future. Initially I was just adding it to Futures.

        // don't do this under heavy load with the result of executeAsync
        // in Cassandra you will start to loose data

        Futures.addCallback(future, ...
    

After some trial and error I found a pattern where I didn't loose any data:

        // here we are going to keep the futures
        private ArrayBlockingQueue<ResultSetFuture> queue = 
            new ArrayBlockingQueue<>(10000);

        // in the handling code
        queue.add(session.executeAsync(bs));

        // when reaching 1000th element in the queue
        // start emptying it
        if (queue.size() % 1000 == 0) {
            ResultSetFuture elem;
            do {
                elem = queue.poll();
                if (elem != null) {
                    elem.getUninterruptibly();
                }
            } while (elem != null);
        }

        // this will make your insertions around
        // 4x faster when compared to normal execute
    

App setup

The instances come with Open JDK installed. This doesn't guarantee the best performance so I installed the Oracle java. In order not to loose the time on firewall setup I simply copied the "cassandra.pem" file to every node.

        # copy ".jar" and "cassandra.pem" file to a single app node
        # copy the two files from single node to other nodes
        # it's a lot faster then uploading to every node (at least on my connection)

        # setup the machine
        wget --no-check-certificate --no-cookies - --header "Cookie: oraclelicense=accept-securebackup-cookie" "http://download.oracle.com/otn-pub/java/jdk/7u71-b14/jdk-7u71-linux-x64.tar.gz"
        
        tar -xvzf jdk-7u71-linux-x64.tar.gz

        sudo update-alternatives --install "/usr/bin/java" "java" "/home/ec2-user/jdk1.7.0_71/jre/bin/java" 1
        
        # pick the new java number in this step
        sudo update-alternatives --config java

        # check with this
        java -version
    

Resources

  • 2x c4.xlarge
  • 2x c4.2xlarge
  • 4x c3.xlarge
Note I've added c4 nodes because I was limited on the amazon with the number of instances I could boot. Also I had to request it with the customer service but I couldn't assume how many instances of every type I'll use so the instances are not of the same type for load and app servers.

Cassandra

Setting up the Cassandra is the easiest part of the whole undertaking. All I did was following this guide by DataStax.

Resources

  • 7x c3.2xlarge
After hanging on the 90 000 req/s for a while I came to conclusion that perhaps the replication factor of two might be too much for the resources I had available. I would probably need to further increase the number of Cassandra nodes but since I couldn't get any more instance up I've set the replication to 1. Notice that this replication factor does not allow loosing nodes in the cluster without loosing the data. But the goal here is 100 000 req/s on a budget :)

Results

In the end it took me around 30$ to reach the 100k limit. I'm afraid to calculate how much this setup would cost on a monthly or yearly basis.

The successful run looked like this:

Total messages: 31 145 914 messages
Checked number: 31 145 914 messages
Average: 103 809 req/s

Don't be afraid to send me an email if you have any questions what so ever ;)

5 comments:

afiskon said...

Could you publish a full version of source code used in this article please?

Marko Švaljek said...

Send me your e-mail to msvaljek@gmail.com.

Christopher Batey said...

Nice write up.

I wouldn't do too many benchmarks with a RF 1 as it isn't what you'd use for a real setup.

DId you look at cassandra-stress and try benchmarking the cluster directly?

Marko Švaljek said...

Thanks ;)

You are totally right. Replication factor 1 isn't the best for a real setup. With the replication factor 2 I got up to around 90k. That's a fair result too If we take into account that the amount of data in Cassandra is essentially doubled. I think that with the stress tool the performance results for the nodes would be significantly better. The goal was to generate real http requests, process them and store them. To be honest I ran out of resources on amazon and couldn't bring additional nodes up and had to cope with what I had.

By the way I attended your talk at Cassandra Summit EU 2014 about Testing Cassandra Applications it feel surreal to me that you ended up reading my blog post :D

Christopher Batey said...

Wow, small world!