Friday, March 27, 2015

Not so clever Spark


More playing with new shiny toys...

Apache Spark promises to make handling big data faster. It also interfaces with (among other things) Cassandra. This is very useful as Cassandra has limited commands for processing data.

I tried reading my data from Cassandra in the Spark shell, so:

val rdd = sc.cassandraTable("keyspace", "table")

Now, I want to sum up all the values for a particular subset of my rows in table. Since Cassandra is little more than a key/value store, I can't do this in CQL, so let's try it in Spark:

rdd.filter(_.getString("my_where_field") == "X").map(_.getDouble("my_value")).fold(0)((x, y) => x + y)

Not so fast. Really. It takes minutes. The solution is to be more precise earlier:

val rdd = sc.cassandraTable("keyspace", "table").where("my_where_field = 'X'")
rdd.aggregate(0.0D)((acc, row) => acc + row.getDouble("my_value"), (d1, d2) => d1 + d2)

(The second function, (d1, d2) => d1 + d2, defines how the partial results from the different partitions are combined.)

This is much faster! About 3s over 10 000 rows out of 100 million.
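The real win here is that the where clause is pushed down into the CQL the connector issues, so Cassandra only ships back the matching 10 000 rows instead of all 100 million (provided my_where_field is something Cassandra can filter on, such as a clustering or indexed column). As a rough sketch, an equivalent way of writing the same sum over the pre-filtered RDD (assuming the spark-cassandra-connector is on the shell classpath):

val filtered = sc.cassandraTable("keyspace", "table").where("my_where_field = 'X'")
val total    = filtered.map(_.getDouble("my_value")).fold(0.0)(_ + _)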

Spark is very nice but don't expect queries to run within a time the average web user is familiar with. It spins up JVMs on all the boxes in the cluster and, by default, executes jobs in a serial manner. However, if you want to reduce number crunching from, say, an hour down to minutes, it might be the tool for you.

But do be aware that it is still somewhat immature. I downloaded the latest DataStax bundle, which ships Spark 1.1.0 (Spark itself is on 1.3 at the moment), and got a ClassCastException deep down in Scala code when using Spark SQL (which, to be fair, is not considered production ready).

val results = csc.cassandraSql("select sum(my_value) from table where my_where_field = 'X'")
.
.
results.collect().foreach(println)
15/03/27 19:09:27 WARN TaskSetManager: Lost task 3.0 in stage 4.0 (TID 18, 10.20.166.198): java.lang.ClassCastException: java.math.BigDecimal cannot be cast to scala.math.BigDecimal
    scala.math.Numeric$BigDecimalIsFractional$.plus(Numeric.scala:182)
    org.apache.spark.sql.catalyst.expressions.Add$$anonfun$eval$2.apply(arithmetic.scala:58)
    org.apache.spark.sql.catalyst.expressions.Add$$anonfun$eval$2.apply(arithmetic.scala:58)
    org.apache.spark.sql.catalyst.expressions.Expression.n2(Expression.scala:114)
    org.apache.spark.sql.catalyst.expressions.Add.eval(arithmetic.scala:58)



Sunday, March 22, 2015

Interesting syntax but does it make a bad API?


There's yet another interesting syntax in Scala for methods that look like this:

  def intOrErr(x: String): Int Or ErrorMessage = { ...

where it appears that the return type can really be one of two types. There is no new language feature here, though; it is just Scala's infix notation for a two-parameter type. The signature is equivalent to:

  def intOrErr(x: String): Or[Int, ErrorMessage] = { ...

but reads more nicely.
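The same sugar applies to any type constructor that takes exactly two type parameters, not just Scalactic's Or. A throwaway sketch (the Pair class here is made up purely for illustration):

  class Pair[A, B](val first: A, val second: B)

  // these two signatures mean exactly the same thing to the compiler
  def classic(x: String): Pair[Int, String] = new Pair(x.length, x)
  def infix(x: String): Int Pair String = new Pair(x.length, x)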

I came across this syntax while looking at a colleague's use of the Scalactic library. Production code returned an Or and, in his test, he mapped over it and put his assertions inside the map. The mistake: if the code had taken the unhappy path, the map function (and the assertions in it) would never have executed, the test would still have given a green bar, and the coder would be none the wiser that his code was running unhappily.

He should have used a fold, but that's not immediately obvious (unlike Java code that throws a checked Exception, where you must either catch it or explicitly declare that callers of your method have to deal with it).
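To make the difference concrete, here is a sketch; the body of intOrErr and the expected values are made up for illustration, and I'm assuming Scalactic's Or with its map and fold (good case first, bad case second):

  import org.scalactic._

  def intOrErr(x: String): Int Or ErrorMessage =
    try Good(x.toInt)
    catch { case _: NumberFormatException => Bad(s"'$x' is not a number") }

  // dangerous in a test: if the result is Bad, the map body (and its assertion) never runs
  intOrErr("oops").map(i => assert(i == 42))

  // better: a fold forces you to say what the unhappy path means for the test
  intOrErr("oops").fold(
    i   => assert(i == 42),
    err => throw new AssertionError(s"unexpected error: $err")
  )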

I posted on this forum to get opinions on how worrying this is. There are some interesting responses, ranging from "it's less than ideal" to "assertions in test code suck and there are libraries to help do it better".

Thursday, February 26, 2015

Cassandra for the Busy - Part 2


Interesting problem I had this week that's obvious when you think about it.

The Config

I have a cluster of 5 Cassandra (version 2.0.11.83) nodes. I execute:

DROP KEYSPACE IF EXISTS my_keyspace;
CREATE KEYSPACE my_keyspace WITH REPLICATION = { 
    'class' :              'SimpleStrategy',
    'replication_factor' : 2 
};
CREATE TABLE my_keyspace.my_table (
    my_int_id    varint,
    my_string_id varchar,
    my_value     decimal,
    PRIMARY KEY (my_int_id, my_string_id)
);

I then start hammering my cluster with some CQL like:

INSERT INTO my_keyspace.my_table 
    (my_int_id, my_string_id, my_value)
    VALUES (1, '2', 42.0);

or such like. The code I use to write to the DB looks something like:

RegularStatement  toPrepare         = (RegularStatement) (new SimpleStatement(cql).setConsistencyLevel(ConsistencyLevel.TWO));
PreparedStatement preparedStatement = cassandraSession.prepare(toPrepare);
.
.
BoundStatement    bounded           = preparedStatement.bind(new BigInteger(myIntId), myStringId, new BigDecimal("42.0"));
cassandraSession.execute(bounded);


The Error

I then kill a single node and my client starts barfing with:

Not enough replica available for query at consistency TWO (2 required but only 1 alive)

which baffled me initially because there were still 4 nodes in the cluster:

dse$ bin/nodetool -h 192.168.1.15 status my_keyspace
Datacenter: Analytics
=====================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address       Load       Tokens  Owns   Host ID                               Rack
UN  192.168.1.15  83.98 KB   256     40.0%  86df3d65-0e19-49b2-a49e-bc89d6347fa4  rack1
UN  192.168.1.16  82.78 KB   256     42.0%  8fghe485-0e19-49b2-a49e-bc89d6347fa5  rack1
UN  192.168.1.17  84.91 KB   256     38.0%  9834edf1-0e19-49b2-a49e-bc89d6347fa6  rack1
UN  192.168.1.18  88.22 KB   256     41.0%  873fdeab-0e19-49b2-a49e-bc89d6347fa7  rack1
DN  192.168.1.19  86.36 KB   256     39.0%  2bdf9211-0e19-49b2-a49e-bc89d6347fa8  rack1

Of course, the problem is that your tolerance to a node fault is:

    Tolerance = Replication Factor - Consistency Level

For me, that was 2 - 2 = 0 tolerance. Oops.

This is not a problem on all inserts. Running

dse$ bin/cqlsh -uUSER -pPASSWORD
Connected to PhillsCassandraCluster at 192.168.1.15:9160.
[cqlsh 4.1.1 | Cassandra 2.0.11.83 | DSE 4.6.0 | CQL spec 3.1.1 | Thrift protocol 19.39.0]
Use HELP for help.

and inserting worked fine. But this is because cqlsh uses a different consistency level.

cqlsh> consistency
Current consistency level is ONE.

so, let's change that:

cqlsh> consistency TWO

Consistency level set to TWO.

Now do the inserts fail? Well, sometimes. It depends on whether the dead node happens to be one of the replicas for the row being written, something the user never sees.


The Problem

Cassandra will try to put a piece of data on REPLICATION_FACTOR number of nodes. These nodes are determined by hashing the row's partition key. If a node is down, Cassandra will not choose another in its place; that replica simply cannot be written to at this time (of course, it may receive the data if/when the node rejoins the cluster).

Upon inserting, the client decides how many nodes must be successfully written to before the INSERT is declared a success. This number is given by your CONSISTENCY_LEVEL. If the contract cannot be fulfilled then Cassandra will barf.
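As a rough sketch of the fix for my case (replication factor 2 with one node down), dropping the consistency level to ONE restores a tolerance of 2 - 1 = 1. This uses the DataStax Java driver from Scala; the contact point, keyspace and table names are just the placeholders from above:

import com.datastax.driver.core.{Cluster, ConsistencyLevel, SimpleStatement}

object ConsistencyDemo extends App {
  val cluster = Cluster.builder().addContactPoint("192.168.1.15").build()
  val session = cluster.connect("my_keyspace")

  // with ConsistencyLevel.ONE only one replica needs to acknowledge the write,
  // so losing one of the two replicas no longer fails the insert
  val insert = new SimpleStatement(
    "INSERT INTO my_table (my_int_id, my_string_id, my_value) VALUES (1, '2', 42.0)")
  insert.setConsistencyLevel(ConsistencyLevel.ONE)

  session.execute(insert)
  cluster.close()
}

The trade-off, of course, is that a write acknowledged at ONE has only been confirmed by a single replica when the call returns.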


Further Reading

Application Failure Scenarios with Cassandra

Lightweight Transactions in Cassandra 2.0


Monday, February 23, 2015

Cassandra For the Busy - Part 1


A quick guide to Cassandra admin

From the Cassandra installation directory, running:

bin/nodetool -uUSER -pPASSWORD -hNODE ring KEYSPACE

(where the username, password, node and keyspace correspond to whatever you set up) produces one line per token in the ring. The most interesting columns are Token, Load and Owns.

Token is a number between Long.MIN_VALUE and Long.MAX_VALUE that is used in hashing the key. (See org.apache.cassandra.dht.Murmur3Partitioner.describeOwnership for how ownership is calculated over a cluster using a MurmurHash-based algorithm; this partitioner is the default for Cassandra.)

Load refers to how much has been flushed to disk. Therefore, this may take some time to register a change after you have truncated a table containing 10 million rows. This appears to correspond to org.apache.cassandra.service.StorageService.getLoad().

Owns represents how much of the ring range a token covers. Note: this column might not add up to 100%. "For DC [data centre] unaware replication strategies, ownership without replication will be 100%" (see the code comments for org.apache.cassandra.service.StorageService.effectiveOwnership). With replication, it appears to add up to 100% times the replication factor.


Saturday, February 21, 2015

Type Constructors


Type Constructors in Java and Scala

In "Functional Programming in Scala", I came across this code:

trait Foldable[F[_]] { ...

This is an example of something common in Java and Scala: type constructors.

"We write it as F[_], where the underscore indicates that F is not a type but a type constructor that takes one type argument. Just like functions that take other functions as arguments are called higher-order functions, something like Foldable is a higher-order type constructor or a higher-kinded type.

"Just like values and functions have types, types and type constructors have kinds. Scala uses kinds to track how many type arguments a type constructor takes, whether it’s co- or contravariant in those arguments, and what the kinds of those arguments are." [1]

However, each language differs in their use of type constructors.

"[I]n Scala it is not valid to say something is of type List, unlike in Java where javac would put the List<Object> for you. Scala is more strict here, and won’t allow us to use just a List in the place of a type, as it’s expecting a real type - not a type constructor." [2]

Java doesn't allow you to write something so abstract as the Foldable trait above. For instance, this won't compile:

public class Foldable<U<T>> { ...
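In Scala, by contrast, the abstraction is expressible directly. A minimal sketch (my own toy Foldable with an instance for List, not the book's full version):

trait Foldable[F[_]] {
  def foldRight[A, B](as: F[A])(z: B)(f: (A, B) => B): B
}

// an instance for the List type constructor
object ListFoldable extends Foldable[List] {
  def foldRight[A, B](as: List[A])(z: B)(f: (A, B) => B): B =
    as.foldRight(z)(f)
}

// usage: sums a List[Int] via the abstraction
val sum = ListFoldable.foldRight(List(1, 2, 3))(0)(_ + _)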

Which brings us on to Types of a Higher Kind...

[1] Functional Programming in Scala

Sunday, February 15, 2015

More on those Monoids


In a previous post I mentioned Scalaz' implementation of Monoids. Having run through the code with somebody better at Scala than I am, here are my notes.

Our code imports the implicit means of creating a monoid. The signature of the method is this:

  implicit def optionMonoid[A: Semigroup]: Monoid[Option[A]] = new Monoid[Option[A]] {

The [A: Semigroup] looks like an ordinary type parameter, but the ": Semigroup" part makes it a Context Bound. Context Bounds "are typically used with the so-called type class pattern, a pattern of code that emulates the functionality provided by Haskell type classes, though in a more verbose manner" (from this excellent explanation).

Context Bounds of the form [X: Y] say: I am expecting an implicit value of type Y[X]. In fact, the context-bound line above is equivalent to this longhand version of Scalaz' implementation:

    implicit def optionMonoid[A](implicit x: Semigroup[A]): Monoid[Option[A]] = new Monoid[Option[A]] {

Now, recall that my code using the Scalaz library looked like this:

    implicit val stringSemigroup: Semigroup[String] = Semigroup.instance[String]((x, y) => x + y)

    val mySemigroup: Semigroup[Option[String]] = Semigroup[Option[String]]

The first line is me just defining my Semigroup - nothing interesting there.

The second is much more magic. It invokes this Scalaz code:

object Semigroup {
  @inline def apply[F](implicit F: Semigroup[F]): Semigroup[F] = F

which is somewhat confusing because the symbol F does double duty: it is both the type parameter fixed at the call site and the name of the implicit value that is returned. Basically, this line says: there should implicitly be a value of type Semigroup[F], where F is the type given at the call site, and I will return that value.

Well, the only implicit value that we defined is our own stringSemigroup and that's of type Semigroup[String], not Semigroup[Option[String]]. But then we also imported Scalaz' optionMonoid, which says: if there is a Semigroup[String] hanging around in the implicit ether, I can deliver a Monoid[Option[String]]. Since Monoid is a type of Semigroup, mySemigroup is legitimately given this Monoid[Option[String]].

Et voila. Not immediately obvious to a Java programmer but that's how Scala does it.
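Putting it together, a minimal, self-contained sketch (assuming Scalaz 7 on the classpath; I import only the Option instances so that my stringSemigroup doesn't clash with Scalaz' own Semigroup[String]):

import scalaz.Semigroup
import scalaz.std.option._ // brings optionMonoid into implicit scope

object OptionMonoidDemo extends App {

  // our own Semigroup[String]: concatenation
  implicit val stringSemigroup: Semigroup[String] = Semigroup.instance[String]((x, y) => x + y)

  // summons Scalaz' optionMonoid, built from the stringSemigroup above
  val mySemigroup: Semigroup[Option[String]] = Semigroup[Option[String]]

  println(mySemigroup.append(Some("foo"), Some("bar"))) // Some(foobar)
  println(mySemigroup.append(Some("foo"), None))        // Some(foo)
}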

Further reading

1. Stackoverflow: Scalaz learning resources.



Sunday, February 8, 2015

Things they don't tell you about Akka clustering


... or rather they do but it's hard to find. Akka promises to make multi-threaded, multi-JVM code easy. It's very nice but it's not necessarily easy.

Getting your actor system up and running is not too hard and there are good books on it. Derek Wyatt's book just missed the inclusion of clustering, which "would probably add a couple of hundred pages" anyway. Here I've tried to give the minimum to get a cluster up and running.

The thing to note is that your Akka actors don't need to change at all. It's all down to the configuration.

Ok, so let's start with a simple Actor that prints out everything he hears:

import java.util.concurrent.atomic.AtomicInteger

import akka.actor.Actor

class PhillTheActor extends Actor {

  val myIndex = PhillTheActor.counter.incrementAndGet()

  override def receive: Receive = {
    case _ => {
      println("message received by actor #" + myIndex)
      Thread.sleep(1000) // very naughty
    }
  }
}

object PhillTheActor {
  final val counter = new AtomicInteger(0)
}

Very simple.

First, let's take the config. We'll create an object we can run:

object ClusterApp extends App {

  println("started")
  
  val systemName = "testSystem"
  val myPort     = args(0)
  val seedPort   = "9119"
  
  val actorSystem = ActorSystem(systemName, ConfigFactory.parseString(
      s"""
        akka {
          actor.provider = "akka.cluster.ClusterActorRefProvider"
      
          remote.log-remote-lifecycle-events = on
          remote.netty.tcp.port = $myPort
          remote.netty.tcp.hostname = 127.0.0.1
          
          cluster {
              seed-nodes = [
                  "akka.tcp://$systemName@127.0.0.1:$seedPort",
              ]
          }
        }
      """))
.
.

We've (arbitrarily) chosen port 9119 to be the port of the seed node. We could have chosen any reasonable port number, but the node whose myPort equals seedPort will be the one calling the shots, as we'll see later.

"The seed nodes are configured contact points for initial, automatic, join of the cluster... [Y]ou can only join to an existing cluster member, which means that for bootstrapping some node must join itself." [1]

Now, we need a local router:
 
.
.
  val localSamplerActorName = "localSamplerActorName"
    
  val localRouter =   
    actorSystem.actorOf(Props(classOf[PhillTheActor]).withRouter(RoundRobinPool(nrOfInstances = 2)), localSamplerActorName)
.
.
     
We'll be using localSamplerActorName later so keep an eye on that.

Then we need the cluster router. From the Scaladoc:

"akka.routing.RouterConfig implementation for deployment on cluster nodes. Delegates other duties to the local akka.routing.RouterConfig, which makes it possible to mix this with the built-in routers such as akka.routing.RoundRobinRouter or custom routers."

We only need this on one node so:

.
.
  if (myPort == seedPort) {
        
      val loadBalancingGroup = AdaptiveLoadBalancingGroup(HeapMetricsSelector)
      val clusterRouterGroupSettings = ClusterRouterGroupSettings(
          totalInstances = 3,
          routeesPaths = List(s"/user/$localSamplerActorName"),
          allowLocalRoutees = true,
          useRole = None
          )
      
      val clusterRouterGroup = ClusterRouterGroup(loadBalancingGroup, clusterRouterGroupSettings)
      val clusterActor       = actorSystem.actorOf(clusterRouterGroup.props, "clusterWideRouter")
.
.

There's a lot going on here. The argument totalInstances refers to how many instances in the cluster will be part of this load balancing.

The routeesPaths gives the path to the routee actor; here it points at the local router created above, whose name is held in localSamplerActorName.

The allowLocalRoutees flag is straightforward: does this node also process work, or is it just a coordinator?

As for roles: "Not all nodes of a cluster need to perform the same function: there might be one sub-set which runs the web front-end, one which runs the data access layer and one for the number-crunching. Deployment of actors—for example by cluster-aware routers—can take node roles into account to achieve this distribution of responsibilities." [1]

Notice that the cluster wide balancer uses a Group and the local balancer uses a Pool. "The Akka team has introduced two different types of routers in Akka 2.3: Pool and Group. Pools are routers that manage their routees by themselves (creation and termination as child actors) whereas Groups are routers that get pre-configured routees from outside." (from this blog). So, the local router using a round-robin pool will spin up actors for us while the cluster-wide router is using this pre-configured routing actor.

Finally, we send messages to this cluster just as we would any actor:

      while (true) {
        clusterActor ! "hello!"
        Thread.sleep(10)
      }

whereupon PhillTheActor starts printing out messages. If I start the app again with other ports as arguments, the PhillTheActor instances on those nodes print messages too.

Obviously, there is much more to Akka clustering but this got my simple app up and running.

[1] Akka Cluster Usage docs

Further Reading

Adam Warski's blog.