Thursday, March 19, 2015

Write Batch Size Error - spark-cassandra-connector



The Use Case:
The data is currently on Amazon’s S3 storage system, and most of it is time series data.
We compute and analyze our data using an “Apache Spark” cluster.
While trying to migrate the data to a storage model that is more efficient for both reads and writes, we tried out Apache Cassandra. While running the first benchmark, we hit a lot of failures in the “write” part of the job on the Spark nodes.
I'm using the DataStax spark-cassandra-connector through its Java API.
(You can find some old but useful examples in the DataStax Blog and in the DataStax Java API documentation.)
I started with a 2-node Cassandra cluster running on 2 c3.2xlarge machines on AWS EC2. I used the DataStax Community AMI to create the nodes and connected the two of them into a cluster.

I’ve written a Spark Java application that reads the data from S3 and writes out the RDDs to the Cassandra cluster.


During the “runJob at RDDFunctions.scala:24” stage there were a lot of failures like:


“java.io.IOException: Failed to write 273 batches to test.some_cassandra_table.
……”


Eventually the Spark application would fail because of these failures, so I tried to find the cause of the problem. I went to one of the Cassandra nodes and checked the node's log, located at
“/var/log/cassandra/system.log”, and saw many warning messages of the following type coming in all the time:
WARN  [SharedPool-Worker-132] 2015-03-19 07:44:43,229 BatchStatement.java:243 - Batch of prepared statements for [test.some_cassandra_table] is of size 5264, exceeding specified threshold of 5120 by 144.


I searched Google for a solution and came up with nothing except variations of one Stack Overflow answer repeated on many sites, which did not help much, so I started looking into the meaning of the log messages.
I found the definition of the “batch size threshold” on the Cassandra nodes in “/etc/cassandra/cassandra.yaml”:


batch_size_warn_threshold_in_kb: 5


That works out to 5 x 1024 = 5120 bytes, which is exactly the threshold value shown in the log.
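To double check how the yaml setting relates to the warning, here is a small sketch in plain Java. The class and method names are my own and hypothetical, not part of Cassandra or the connector; it only parses the tail of the log line quoted above and compares the numbers.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of Cassandra or the connector.
class BatchWarningCheck {
    // Matches the numeric tail of the WARN line from system.log.
    private static final Pattern WARNING = Pattern.compile(
            "is of size (\\d+), exceeding specified threshold of (\\d+) by (\\d+)");

    // batch_size_warn_threshold_in_kb is given in KB, while the log reports bytes.
    static long thresholdBytes(long thresholdKb) {
        return thresholdKb * 1024;
    }

    // Returns {batchSize, threshold, excess} in bytes, or null if the line does not match.
    static long[] parse(String logLine) {
        Matcher m = WARNING.matcher(logLine);
        if (!m.find()) {
            return null;
        }
        return new long[] {
            Long.parseLong(m.group(1)),
            Long.parseLong(m.group(2)),
            Long.parseLong(m.group(3))
        };
    }

    public static void main(String[] args) {
        String line = "Batch of prepared statements for [test.some_cassandra_table] "
                + "is of size 5264, exceeding specified threshold of 5120 by 144.";
        long[] parsed = parse(line);
        // The yaml value of 5 KB should equal the threshold reported in the log.
        System.out.println("threshold matches yaml: " + (thresholdBytes(5) == parsed[1]));
        // The reported excess should be batch size minus threshold.
        System.out.println("excess is consistent: " + (parsed[0] - parsed[1] == parsed[2]));
    }
}
```

So the batches the connector was sending were only slightly over the warning threshold, which hints that the setting to look for is a batch size in bytes.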


The next step was figuring out how to change the write batch size of the DataStax spark-cassandra-connector, and I found the following documentation reference: Link


The “Tuning” section mentions the “spark.cassandra.output.batch.size.rows” parameter, which you can set on the SparkConf while creating the JavaSparkContext. I changed it to 5120, but that did not have the wanted effect: the value is a number of rows per batch, not bytes, so the batches became much bigger.
Its default is “auto”, and the outcome with “auto” was much better.


So, frustrated with the outcome, I went on to read the source code of the connector. In the Scala class “WriteConf.scala” I found another parameter that made a real difference: when the rows setting is left at its “auto” default, WriteConf goes straight to the value of the second parameter, “spark.cassandra.output.batch.size.bytes”.

I changed the value to 8192 (instead of the default: 1024 * 16 = 16384), making the batches smaller, and things started working fine.
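Here is a minimal sketch of that change in plain Java. To keep it runnable without Spark on the classpath, it only builds the key/value pair; the class and method names are hypothetical. In the real application each entry would be passed to SparkConf.set(key, value) before creating the JavaSparkContext. The default of 16384 is the value I read in WriteConf.scala and may differ in other connector versions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical holder for the connector write settings; illustration only.
class CassandraWriteSettings {
    // Default batch size in bytes as read from WriteConf.scala (assumption: may vary by version).
    static final int DEFAULT_BATCH_SIZE_BYTES = 16 * 1024;

    // Builds the settings to apply to the SparkConf.
    // "spark.cassandra.output.batch.size.rows" is deliberately left unset so that
    // it stays at its "auto" default and the bytes setting is the one that is used.
    static Map<String, String> settings(int batchSizeBytes) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("spark.cassandra.output.batch.size.bytes", Integer.toString(batchSizeBytes));
        return conf;
    }

    public static void main(String[] args) {
        // In the Spark application: for each entry, call sparkConf.set(key, value).
        for (Map.Entry<String, String> e : settings(8192).entrySet()) {
            System.out.println(e.getKey() + "=" + e.getValue());
        }
    }
}
```

Note that 8192 is still above the 5120 byte warning threshold in cassandra.yaml, so it is a value that worked in my benchmark rather than one derived from the threshold.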


Note that the log message is only a warning: before the change, writes worked some of the time and failed at other times, but after changing the parameter they did not fail anymore.


And if you are asking why the threshold is 5K, I found the following explanation: Link.
Quote: Key reasoning for the desire comes from Patrick McFadin


"Yes that was in bytes. Just in my own experience, I don't recommend more
than ~100 mutations per batch. Doing some quick math I came up with 5k as
100 x 50 byte mutations.


Totally up for debate."


So, as we can see, these things can be tuned a bit more, depending on the queries we want to run and on the actual environment.
In the future I’ll play a bit more with these configurations, but for now it’s enough for me to continue the benchmark with no errors :)


I felt that someone else must have had the same problem, but I didn’t find anything written about it on the internet.
If anyone has a better solution or some other insights, I would love to hear them. Please comment on this post.

I hope this might help someone else.