Friday, April 17, 2015

Cassandra Table Design


Our customer wants a table that has hundreds of thousands of rows written to it per day. No problem, Cassandra handles that well. Our performance tests show that with a Replication Factor of 3 on a 5-node cluster, writes take a touch less than 1ms and reads about 2ms per row. Pulling out more rows at a time is even better per row - 1000 rows take about 400ms.

However, our customer wants to slice and dice the data, pivoting on an arbitrary set of columns and aggregating values as they go. Now, Cassandra is great at storing data but does not offer the richness you'd get from a SQL database for retrieving subsets of it. You really want to query on primary keys. You can create secondary indexes, but under the hood they just build hidden tables for you. And even when you query on primary keys, you need the right combination of keys to avoid ALLOW FILTERING.

So, our first stab had us creating way too many tables because of our customer's requirements. The number of columns we can pivot on makes the number of tables grow combinatorially. If our customer requires us to pivot on, say, 5 columns, the number of tables for any one choice of r of them is the binomial coefficient

n! / (r! (n - r)!)

where n in our case is the 5 potential choices on offer and r is the number the user chooses.

Of course, our user can choose any number of columns, from 1 to 5, so the total number of tables now looks like:

Σ (r = 1 to 5) n! / (r! (n - r)!)


Which is starting to cause the number of tables to balloon since

up to 3 choices means 7 tables
up to 4 choices means 15 tables
up to 5 choices means 31 tables
.
.
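
To spell out the arithmetic for the 5-column case: C(5,1) + C(5,2) + C(5,3) + C(5,4) + C(5,5) = 5 + 10 + 10 + 5 + 1 = 31, which is 2^5 - 1.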

The number of rows also starts rising like:


 
Σ (r = 1 to 5) Π (p = 1 to r) d_p


where r and p are just indices and d_p is the number of distinct elements in a given dimension p.
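
To make that concrete with some made-up numbers: three dimensions with 10, 20 and 5 distinct values respectively give a worst case of 10 + (10 × 20) + (10 × 20 × 5) = 1,210 rows.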

Note: this is the worst-case scenario; how many rows you actually get depends on your data. As it happens, since our data "clumps" around certain regions, our numbers were much smaller than this.

But this article has a very interesting way of querying the data for just what you want, using a single table to do it. To leverage it, we need to pre-compute the aggregations. As it happens, we're using Apache Spark for that and then dumping the results into Cassandra.
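
For what it's worth, a rough sketch of that pre-computation step (the keyspace, table and column names here are made up, and it assumes the spark-cassandra-connector is on the classpath):

import com.datastax.spark.connector._
import org.apache.spark.SparkContext._
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical names throughout: roll the raw rows up by the dimensions we pivot on
// and write the totals to a summary table that serves the queries.
val conf = new SparkConf()
  .setAppName("precompute-aggregates")
  .set("spark.cassandra.connection.host", "192.168.1.15")
val sc = new SparkContext(conf)

sc.cassandraTable("keyspace", "raw_table")
  .map(row => ((row.getString("dim_a"), row.getString("dim_b")), row.getDouble("my_value")))
  .reduceByKey(_ + _)                                        // sum my_value per (dim_a, dim_b)
  .map { case ((dimA, dimB), total) => (dimA, dimB, total) }
  .saveToCassandra("keyspace", "aggregates", SomeColumns("dim_a", "dim_b", "total"))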

Friday, March 27, 2015

Not so clever Spark


More playing with new shiny toys...

Apache Spark promises to make handling big data faster. It also interfaces with (among other things) Cassandra. This is very useful as Cassandra has limited commands for processing data.

I tried reading my data from Cassandra in the Spark shell, like so:

val rdd = sc.cassandraTable("keyspace", "table")

Now, I want to sum up the values for a particular subset of the rows in my table. Since Cassandra is little more than a key/value store, I can't do this in Cassandra itself, so let's try it in Spark:

rdd.filter(_.getString("my_where_field") == "X").map(_.getDouble("my_value")).fold(0)((x, y) => x + y)

Not so fast. Really. It takes minutes. The solution is to be more precise earlier:

val rdd = sc.cassandraTable("keyspace", "table").where("my_where_field = ?", "X")
rdd.aggregate(0.0D)((acc, row) => acc + row.getDouble("my_value"), (d1, d2) => d1 + d2)

(The second function, (d1, d2) => d1 + d2, defines how the partial results from the different partitions are combined.)

This is much faster! About 3s over 10 000 rows out of 100 million.

Spark is very nice, but don't expect queries to run in the sort of time the average web user is used to. It spins up JVMs on all the boxes in the cluster and, by default, executes jobs serially. However, if you want to reduce number crunching from, say, an hour down to minutes, it might be the tool for you.

But do be aware that it is still somewhat immature. I downloaded the latest DataStax bundle, which ships Spark 1.1.0 (Spark itself is on 1.3 at the moment), and got a ClassCastException deep down in Scala code when using Spark SQL (which, to be fair, is not considered production ready).

val results = csc.cassandraSql("select sum(my_value) from table where my_where_field = 'X'")
.
.
results.collect().foreach(println)
15/03/27 19:09:27 WARN TaskSetManager: Lost task 3.0 in stage 4.0 (TID 18, 10.20.166.198): java.lang.ClassCastException: java.math.BigDecimal cannot be cast to scala.math.BigDecimal
    scala.math.Numeric$BigDecimalIsFractional$.plus(Numeric.scala:182)
    org.apache.spark.sql.catalyst.expressions.Add$$anonfun$eval$2.apply(arithmetic.scala:58)
    org.apache.spark.sql.catalyst.expressions.Add$$anonfun$eval$2.apply(arithmetic.scala:58)
    org.apache.spark.sql.catalyst.expressions.Expression.n2(Expression.scala:114)
    org.apache.spark.sql.catalyst.expressions.Add.eval(arithmetic.scala:58)



Sunday, March 22, 2015

Interesting syntax but does it make a bad API?


There's yet another interesting syntax in Scala for methods that look like this:

  def intOrErr(x: String): Int Or ErrorMessage = { ...

where it appears that the return type can really be one of two types. That actually isn't the case; it's Scala's syntactic sugar for an infix type. It is equivalent to:

  def intOrErr(x: String): Or[Int, ErrorMessage] = { ...

but reads more nicely.
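
For what it's worth, a minimal sketch of such a method written with Scalactic (whose Or and ErrorMessage these appear to be); the body of intOrErr is made up purely for illustration:

import org.scalactic._

// Or[G, B] written infix as `G Or B`: Good wraps the happy value, Bad the error.
def intOrErr(x: String): Int Or ErrorMessage =
  try Good(x.toInt)
  catch { case _: NumberFormatException => Bad(s"'$x' is not a number") }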

I came across this syntax while looking at a colleague's use of the Scalactic library. Production code returned an Or and, in his test, he mapped over it. The mistake he made: what if the code had taken the unhappy path? The assertions in his map function would never have executed and the test would have given a green bar, the coder none the wiser that his code was running unhappily.

He should have used a fold, but that's not immediately obvious (unlike Java code that throws a checked exception, where you must either catch it or explicitly declare that callers of your method must deal with it).
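
A sketch of the pitfall in hypothetical test code (the names are made up; Or's fold, which takes one function per side, reads much like the pattern match shown here):

import org.scalactic._

// Pretend production code took the unhappy path:
val result: Int Or ErrorMessage = Bad("boom")

// The function passed to map only runs for a Good, so this assertion never
// executes and the test stays green:
result.map(i => assert(i == 42))

// Handling both sides makes the unhappy path fail loudly:
result match {
  case Good(i)  => assert(i == 42)
  case Bad(err) => throw new AssertionError(s"unhappy path: $err")
}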

I posted on this forum to get opinions on how worrying this is. There are some interesting responses, ranging from "it's less than ideal" to "assertions in test code suck and there are libraries to help you do it better".

Thursday, February 26, 2015

Cassandra for the Busy - Part 2


Interesting problem I had this week that's obvious when you think about it.

The Config

I have a cluster of 5 Cassandra (version 2.0.11.83) nodes. I execute:

DROP KEYSPACE IF EXISTS my_keyspace;
CREATE KEYSPACE my_keyspace WITH REPLICATION = { 
    'class' :              'SimpleStrategy',
    'replication_factor' : 2 
};
CREATE TABLE my_table (
    my_int_id    varint,
    my_string_id varchar,
    my_value     decimal,
    PRIMARY KEY (my_int_id, my_string_id)
);

I then start hammering my cluster with some CQL like:

INSERT INTO my_table 
    (my_int_id, my_string_id, my_value)
    VALUES (1, '2', 42.0);

or such like. The code I use to write to the DB looks something like:

RegularStatement  toPrepare         = (RegularStatement) (new SimpleStatement(cql).setConsistencyLevel(ConsistencyLevel.TWO));
PreparedStatement preparedStatement = cassandraSession.prepare(toPrepare);
.
.
BoundStatement    bounded           = preparedStatement.bind(new BigInteger(myIntId), myStringId, new BigDecimal("42.0"));
cassandraSession.execute(bounded);


The Error

I then kill a single node and my client starts barfing with:

Not enough replica available for query at consistency TWO (2 required but only 1 alive)

which baffled me initially because there were still 4 nodes in the cluster:

dse$ bin/nodetool -h 192.168.1.15 status my_keyspace
Datacenter: Analytics
=====================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address       Load       Tokens  Owns   Host ID                               Rack
UN  192.168.1.15  83.98 KB   256     40.0%  86df3d65-0e19-49b2-a49e-bc89d6347fa4  rack1
UN  192.168.1.16  82.78 KB   256     42.0%  8fghe485-0e19-49b2-a49e-bc89d6347fa5  rack1
UN  192.168.1.17  84.91 KB   256     38.0%  9834edf1-0e19-49b2-a49e-bc89d6347fa6  rack1
UN  192.168.1.18  88.22 KB   256     41.0%  873fdeab-0e19-49b2-a49e-bc89d6347fa7  rack1
DN  192.168.1.19  86.36 KB   256     39.0%  2bdf9211-0e19-49b2-a49e-bc89d6347fa8  rack1

Of course, the problem is that your tolerance to a node fault is:

    Tolerance = Replication Factor - Consistency Level

For me, that was Tolerance = 2 - 2 = 0. Oops.
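
By contrast, a Replication Factor of 3 with a Consistency Level of QUORUM (2) gives Tolerance = 3 - 2 = 1, so a single dead node is survivable.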

This is not a problem on all inserts. Running

dse$ bin/cqlsh -uUSER -pPASSWORD
Connected to PhillsCassandraCluster at 192.168.1.15:9160.
[cqlsh 4.1.1 | Cassandra 2.0.11.83 | DSE 4.6.0 | CQL spec 3.1.1 | Thrift protocol 19.39.0]
Use HELP for help.

and inserting worked fine. But this is because cqlsh uses a different consistency level.

cqlsh> consistency
Current consistency level is ONE.

so, let's change that:

cqlsh> consistency TWO

Consistency level set to TWO.

Now, do the inserts fail? Well, sometimes. It depends on which nodes the data should be written to, something that is transparent to the user.


The Problem

Cassandra will try to put a piece of data on REPLICATION_FACTOR number of nodes. Those nodes are determined by hashing the data's primary key. If one of them is down, Cassandra will not choose another; that node simply cannot be written to at this time (of course, it might get the data if/when it rejoins the cluster).

Upon inserting, the client decides how many nodes must be successfully written to before the INSERT is declared a success. This number is given by your CONSISTENCY_LEVEL. If the contract cannot be fulfilled then Cassandra will barf.
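
For example, with the same driver objects as above (this sketch reuses the cql and cassandraSession from the earlier snippet, just written in Scala), dropping the statement's consistency level means a single live replica is enough:

import com.datastax.driver.core.{ConsistencyLevel, SimpleStatement}

// With RF = 2, a Consistency Level of ONE gives a tolerance of 2 - 1 = 1,
// so this insert still succeeds with one replica down.
val statement = new SimpleStatement(cql).setConsistencyLevel(ConsistencyLevel.ONE)
cassandraSession.execute(statement)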


Further Reading

Application Failure Scenarios with Cassandra

Lightweight Transactions in Cassandra 2.0


Monday, February 23, 2015

Cassandra For the Busy - Part 1


A quick guide to Cassandra admin

From the Cassandra installation directory, running:

bin/nodetool -uUSER -pPASSWORD -hNODE ring KEYSPACE

(where the username, password, node and keyspace correspond to whatever you set up) produces a table of output; the most interesting columns are described below.

Token is a number between Long.MIN_VALUE and Long.MAX_VALUE that is produced by hashing the key. (See org.apache.cassandra.dht.Murmur3Partitioner.describeOwnership for how ownership is calculated over a cluster using a Murmur hash algorithm, which is the default partitioner for Cassandra.)

Load refers to how much has been flushed to disk. Therefore, this may take some time to register a change after you have truncated a table containing 10 million rows. This appears to correspond to org.apache.cassandra.service.StorageService.getLoad().

Owns represents how much of the ring's range a node's tokens cover. Note: the column might not add up to 100%. "For DC [data centre] unaware replication strategies, ownership without replication will be 100%" (see the code comments here, for org.apache.cassandra.service.StorageService.effectiveOwnership). But with replication, it appears to add up to 100% times the replication factor.


Saturday, February 21, 2015

Type Constructors


Type Constructors in Java and Scala

In "Functional Programming in Scala", I came across this code:

trait Foldable[F[_]] { ...

This is an example of something common in Java and Scala: type constructors.

"We write it as F[_], where the underscore indicates that F is not a type but a type constructor that takes one type argument. Just like functions that take other functions as arguments are called higher-order functions, something like Foldable is a higher-order type constructor or a higher-kinded type.

"Just like values and functions have types, types and type constructors have kinds. Scala uses kinds to track how many type arguments a type constructor takes, whether it’s co- or contravariant in those arguments, and what the kinds of those arguments are." [1]

However, the two languages differ in their use of type constructors.

"[I]n Scala it is not valid to say something is of type List, unlike in Java where javac would put the List<Object> for you. Scala is more strict here, and won’t allow us to use just a List in the place of a type, as it’s expecting a real type - not a type constructor." [2]

Java doesn't allow you to write something so abstract as the Foldable trait above. For instance, this won't compile:

public class Foldable<U<T>> { ...
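
Scala, on the other hand, is perfectly happy with the higher-kinded version. A cut-down sketch (my own simplification, not the book's exact trait):

// F is a type constructor, so we can abstract over List, Option and friends.
trait Foldable[F[_]] {
  def foldRight[A, B](fa: F[A])(z: B)(f: (A, B) => B): B
}

// An instance for List: F[_] is filled in with List, a one-argument type constructor.
object ListFoldable extends Foldable[List] {
  def foldRight[A, B](fa: List[A])(z: B)(f: (A, B) => B): B = fa.foldRight(z)(f)
}

// ListFoldable.foldRight(List(1, 2, 3))(0)(_ + _) == 6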

Which brings us on to Types of a Higher Kind...

[1] Functional Programming in Scala

Sunday, February 15, 2015

More on those Monoids


I mentioned Scalaz's implementation of Monoids in a previous post. Having run through the code with somebody better at Scala than I am, here are my notes.

Our code imports the implicit means of creating a monoid. The signature of the method is this:

  implicit def optionMonoid[A: Semigroup]: Monoid[Option[A]] = new Monoid[Option[A]] {

The [A: Semigroup] looks a little like a type parameter but it is totally different. It's a Context Bound. Context bounds "are typically used with the so-called type class pattern, a pattern of code that emulates the functionality provided by Haskell type classes, though in a more verbose manner" (from this excellent explanation).

Context Bounds of the form [X: Y] say: I am expecting an implicit value of type Y[X] to be in scope. In fact, Scalaz's line above is equivalent to:

    implicit def optionMonoid[A](implicit x: Semigroup[A]): Monoid[Option[A]] = new Monoid[Option[A]] {

Now, recall that my code using the Scalaz library looked like this:

    implicit val stringSemigroup: Semigroup[String] = Semigroup.instance[String]((x, y) => x + y)

    val mySemigroup: Semigroup[Option[String]] = Semigroup[Option[String]]

The first line is me just defining my Semigroup - nothing interesting there.

The second is much more magic. It invokes this Scalaz code:

object Semigroup {
  @inline def apply[F](implicit F: Semigroup[F]): Semigroup[F] = F

which is somewhat confusing, as the symbol F is used for more than one purpose: it is both the type parameter and the name of the implicit value that is returned. Basically, this line says: there should implicitly be a value of type Semigroup[F], where F is determined at the call site, and I will return that value.

Well, the only implicit value that we defined is our own stringSemigroup, and that's of type Semigroup[String], not Semigroup[Option[String]]. But we also imported Scalaz's optionMonoid, which says: if there is a Semigroup[String] hanging around in the implicit ether, I can deliver a Monoid[Option[String]]. Since Monoid is a subtype of Semigroup, mySemigroup is legitimately given this Monoid[Option[String]].
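
Putting it all together, a small sketch (Scalaz 7-style imports; the exact packages may differ between versions):

import scalaz._
import scalaz.std.option._   // brings optionMonoid into implicit scope

// Our one and only Semigroup[String]:
implicit val stringSemigroup: Semigroup[String] = Semigroup.instance[String]((x, y) => x + y)

// Resolved via optionMonoid, which lifts our Semigroup[String] to a Monoid[Option[String]]:
val mySemigroup: Semigroup[Option[String]] = Semigroup[Option[String]]

// None acts as the identity that the Monoid adds on top of the Semigroup behaviour:
println(mySemigroup.append(Some("foo"), Some("bar")))   // Some(foobar)
println(mySemigroup.append(Some("foo"), None))          // Some(foo)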

Et voila. Not immediately obvious to a Java programmer but that's how Scala does it.

Further reading

1. Stackoverflow: Scalaz learning resources.