Tuesday, November 24, 2015

Spark's sortByKey doesn't

... or at least not when you map. Sure, if you collect you get the elements in a sorted order. But let's say you want to process the elements in a given order, say, finding the difference from one to the next. You might naively write:

    val pairRdd   = sparkContext.parallelize((1 to 10), 3).map(x => (x, x))
    val sortedRdd = pairRdd.sortByKey()

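  import org.apache.spark.rdd.RDD

  def bogusMap(sortedRdd: RDD[(Int, Int)]): Unit = {
    var last = 0 // captured by the closure, so every task works on its own copy
    def checkMonotonicKeys(kv: (Int, Int)): Int = {
      val key = kv._1
      if (key != last + 1) throw new IllegalStateException(s"key = $key, last = $last")
      last = key
      key
    }
    val mappedAndSorted = sortedRdd.map(checkMonotonicKeys(_))
    mappedAndSorted.collect().foreach { kv =>
      println(kv) // assumed body; the original is truncated here
    }
  }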

But you'll see an exception thrown something like:

java.lang.IllegalStateException: key = 8, last = 0

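The reason is that each partition is processed by its own task, with its own copy of last. The keys are sorted within each partition, but the map never sees one sequence running across all partitions.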
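The effect can be reproduced without a cluster. What follows is a minimal plain-Scala model, not Spark code: runTask and the hand-written partitions are hypothetical stand-ins for a task and for one possible split of the keys 1 to 10 over three partitions. Each "task" starts with a fresh last, which is what happens when the closure is shipped to each task separately.

```scala
object PartitionLocalState {
  // Assumed split of the sorted keys 1..10 into three partitions.
  val partitions: Seq[Seq[Int]] = Seq(Seq(1, 2, 3, 4), Seq(5, 6, 7), Seq(8, 9, 10))

  // Hypothetical stand-in for one task: it gets its own `last`, starting at 0,
  // and returns the first key that fails the "previous + 1" check, if any.
  def runTask(partition: Seq[Int]): Option[Int] = {
    var last = 0
    partition.find { key =>
      val broken = key != last + 1
      last = key
      broken
    }
  }

  val firstFailures: Seq[Option[Int]] = partitions.map(runTask)
}
```

Only the first partition passes the check. The other two fail on their very first key, because last is still 0 when they start.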

One "solution" is to ensure that all the elements are within one partition such as:

    val sortedInto1Partition = pairRdd.sortByKey(numPartitions = 1)

This works but there is little point to using Spark for it since there is no parallelization. The best solution is to generate the differences as the data comes in.

Incidentally, this article has a good description of what is happening during a sortByKey operation. Basically, each shuffle has two sides. The first "writes out data to local disk" and the second makes "remote requests to fetch that data... The job of the [first] side of the shuffle is to write out records in such a way that all records headed for the same [second] task are grouped next to each other for easy fetching." Note that the second task that groups data is not obligated to also order it within a group.

As another aside, note the importance of persisting an RDD in this use case.

"Failure to persist an RDD after it has been transformed with partitionBy() will cause subsequent uses of the RDD to repeat the partitioning of the data. Without persistence, use of the partitioned RDD will cause reevaluation of the RDDs complete lineage. That would negate the advantage of partitionBy(), resulting in repeated partitioning and shuffling of data across the network, similar to what occurs without any specified partitioner.

"In fact, many other Spark operations automatically result in an RDD with known partitioning information, and many operations other than join() will take advantage of this information. For example, sortByKey() and groupByKey() will result in range-partitioned and hash-partitioned RDDs, respectively. On the other hand, operations like map() cause the new RDD to forget the parent’s partitioning information, because such operations could theoretically modify the key of each record." [1]

The code above that forces all the data into one partition (using numPartitions = 1) seems immune to map forgetting the parent RDD's partitioning information. Since there is only one partition, there is no information to forget.

[1] Learning Spark - Karau and Konwinski

Wednesday, November 18, 2015

Interpreting Spark

Spark has a nice web interface that allows you to find problems in the jobs you submit to it. Here are some notes I made on using it. First, let's clarify some terminology:

Task, Stage, Job

"Invoking an action inside a Spark application triggers the launch of a Spark job to fulfill it...  The execution plan consists of assembling the job’s transformations into stages. A stage corresponds to a collection of tasks [map, flatMap etc] that all execute the same code, each on a different subset of the data. Each stage contains a sequence of transformations that can be completed without shuffling the full data.

"At each stage boundary, data is written to disk by tasks in the parent stages and then fetched over the network by tasks in the child stage. Because they incur heavy disk and network I/O, stage boundaries can be expensive and should be avoided when possible." [1]

Narrow and Wide Transformation

"For the RDDs returned by so-called narrow transformations like map and filter, the records required to compute the records in a single partition reside in a single partition in the parent RDD...  Spark also supports transformations with wide dependencies such as groupByKey and reduceByKey. In these dependencies, the data required to compute the records in a single partition may reside in many partitions of the parent RDD" [1]

And this is what can induce a shuffle. Imagine a diagram with time running down the Y-axis. Now imagine this diagram having vertical columns representing a partition at various points in time with horizontal slices representing RDDs in which these partitions live, stacked such that parents are (naturally) above children. Now, if we draw arrows from the data points in one RDD/Partition combo to the data on which it relies, we'd hope those arrows remain in the same vertical partition stream. If they cross streams, a shuffle ensues.
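To make the picture concrete, here is a small plain-Scala model (again, not Spark itself). The nested Vectors are a hypothetical stand-in for an RDD's partitions. The narrow step touches one partition at a time, while the wide step has to pull records out of every partition before it can group them.

```scala
object Dependencies {
  // Two parent partitions of (key, value) records.
  val parent: Vector[Vector[(String, Int)]] =
    Vector(Vector("a" -> 1, "b" -> 2), Vector("a" -> 3, "b" -> 4))

  // Narrow: child partition i is computed from parent partition i only.
  val mapped: Vector[Vector[(String, Int)]] =
    parent.map(_.map { case (k, v) => (k, v * 10) })

  // Wide: a key's values may live in any parent partition, so the records
  // are regrouped by key across partitions first (the "shuffle").
  val grouped: Map[String, Vector[Int]] =
    parent.flatten.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
}
```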

The Web GUI

You may see Jobs that have Skipped Stages. This is nothing to worry about. From the code:

    This may be an underestimate because the job start event references all of the result
    stages' transitive stage dependencies, but some of these stages might be skipped if their
    output is available from earlier runs.
    See https://github.com/apache/spark/pull/3009 for a more extensive discussion.

When looking at my own application's performance (open source finance toy & tool found here on GitHub), a DAG (directed acyclic graph) may look like this:

My application takes end-of-day stock prices of two different companies and runs a Pearson correlation on them.

In the picture above we see the details of Job 1. Stages 4, 5 and 6 are skipped as they were already loaded by Job 0 (which is not shown here).

Stages 7 and 8 are more interesting. I join the two data sets (keyed on date) and map over both of them to get just the stock price (discarding the date). Note that this is done on both stock data series in parallel.

From then on, all work is done in Spark's MLlib library. First, the zip and map is done in Correlation.computeCorrelationWithMatrixImpl. Then, in calculating the Pearson Correlation from the data, it calls RDD.treeAggregate twice.

This method takes an identity and two functions. The first function handles how the data is aggregated within a partition. The second then handles the totals of this function over all partitions. Since this latter function requires "crossing the streams", a stage finishes and a new one begins (Stage 8 which happens to be another call to treeAggregate).
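The shape of that call can be sketched in plain Scala. This is a model under my own assumptions rather than Spark's implementation: aggregate is a hypothetical helper that mirrors the identity-plus-two-functions signature, and the partitions are written out by hand.

```scala
object AggregateModel {
  // Hypothetical helper: seqOp folds each partition starting from the zero value,
  // then combOp merges the per-partition results.
  def aggregate[A, B](partitions: Seq[Seq[A]], zero: B)(seqOp: (B, A) => B, combOp: (B, B) => B): B =
    partitions.map(_.foldLeft(zero)(seqOp)).reduceOption(combOp).getOrElse(zero)

  val prices: Seq[Seq[Double]] = Seq(Seq(1.0, 2.0), Seq(3.0), Seq(4.0, 5.0))

  // Accumulate (sum, count) so that a mean can be derived at the end.
  val (sum, count) = aggregate(prices, (0.0, 0))(
    (acc, x) => (acc._1 + x, acc._2 + 1),
    (l, r) => (l._1 + r._1, l._2 + r._2)
  )
  val mean: Double = sum / count
}
```

The first function never needs data from another partition. Only the second one does, which is why it marks the stage boundary.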

This visualization will also tell you when one of the RDDs is cached - it will be denoted by a green highlighted dot [2] although we do not see this on the picture above.

[1] Cloudera blog.

[2] Databricks blog.

Wednesday, October 21, 2015


This is a very interesting comparison between how the architects of Java and Scala each implemented optional values.

Functor Laws

First, what is a functor? From Categories for the Working Mathematician (Mac Lane):

A functor is a morphism of categories. In detail, for categories C and B a functor T: C -> B with domain C and codomain B consists of two suitably related functions: the object function T, which assigns to each object c of C an object Tc of B, and the arrow function (also written T), which assigns to each arrow f: c -> c' of C an arrow Tf: Tc -> Tc' of B, in such a way that
    T(1_c) = 1_{Tc},    T(g ∘ f) = Tg ∘ Tf
This takes several readings to sink in. So don't bother. Let's look at some code.

Actually, the first rule is the identity law that says if you map over something with the output being the same as the input you get what you started with. That's too trivial to spend any more time on.

The second can be written in Scala as:

val x: Option[A] = ...
x.map(f compose g) == x.map(g).map(f) // true

or in Java 8 as:

Optional<A> x = ...;
x.map(f).map(g).equals(x.map(g.compose(f))); // true... Sometimes. See below.
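Here is a small check of the composition law that can be run from Scala against both types. It is a sketch under my own assumptions: f and g are hypothetical functions chosen so that f returns null, and the Java side relies on the documented behaviour of Optional.map, which returns an empty Optional when the mapping function returns null. It needs Scala 2.12 or later so that lambdas convert to java.util.function.Function.

```scala
import java.util.Optional
import java.util.function.{Function => JFunction}

object CompositionLaw {
  // f returns null for every input; g copes with a null argument.
  val f: String => String = _ => null
  val g: String => String = s => if (s == null) "was null" else s

  // Scala: Some(null) is a legal value, so both sides agree.
  val scalaLawHolds: Boolean =
    Some("a").map(f).map(g) == Some("a").map(g compose f)

  // Java: Optional.map turns a null result into an empty Optional,
  // so g is never applied on the left-hand side.
  val jf: JFunction[String, String] = s => f(s)
  val jg: JFunction[String, String] = s => g(s)
  val x: Optional[String] = Optional.of("a")
  val javaLawHolds: Boolean =
    x.map[String](jf).map[String](jg).equals(x.map[String](jg.compose[String](jf)))
}
```

With these two functions the Scala comparison is true and the Java one is false, which is the "sometimes" in the comment above.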

Why is it useful?

"If I map a function across a List, I should get back a List of the same size as I started with, and with all the elements in the corresponding order. This means I can guarantee some things, like for instance if I reverse the order of the list and then map the function, I will get the same result as mapping the function and then reversing the list. I can refactor without fear." (from Wesley-Smith's post)

How Scala and Java differ

The two radically diverge on their treatment of nulls. Firstly, in Java, if you did this:

Optional ofNull = Optional.of(null);

you'd have a NullPointerException thrown!

Whereas, in Scala you could do this:

val optionWithNull = Option(null)

which would actually be a None. Or, you could do this:

val someWithNull = Some(null)

which is very different. It really is Something that contains a reference to a null.

In this way, Java is less expressive. If, say we wanted to get a value (mapped in an Option) from a Map, Scala could distinguish between the value not being there (None) and it being there but it being null (Some(null)). An equivalent in Java:

        Map<String, String> map = new HashMap<>();
        map.put("A", null);
        map.put("B", null);
        map.put("C", null);
        System.out.println(map.size()); // 3

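gives us no way to express this. A method that wrapped the lookup in Optional.of would blow up for keys "A", "B" and "C", while one that used Optional.ofNullable would return an empty Optional, which is indistinguishable from the key not being there at all.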
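For comparison, the Scala side of this can be sketched as follows. The map contents and the key "Z" are just illustrative choices of mine.

```scala
object NullVersusAbsent {
  val m: Map[String, String] = Map("A" -> null)

  val wrappedNull: Option[String] = Option(null) // None
  val present: Option[String]     = m.get("A")   // Some(null): the key exists, the value is null
  val absent: Option[String]      = m.get("Z")   // None: no such key
}
```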

Higher Kinded Types in Scala and Java

What are they?

"For lack of a fuzzier name, types that abstract over types that abstract over types are called higher-kinded types" (from here). This StackOverflow answer has some more details. Note the answer uses Type Lambdas, an unusual syntax for partial application of types.

What do they look like?

Well, in Java you won't see them (not directly anyway) as that's one of the limitations of the language. You can't write, for instance:

class HigherKinded<T<U>> { } // doesn't compile

But in Scala, they might look like this:

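  trait Functor[F[_]] {
    def map[A, B](fa: F[A])(f: A => B): F[B]
  }

The Functor trait is a higher-kinded type as it abstracts over F, which in turn abstracts over something else. By the way, F[_] here declares F as a type constructor taking one type argument; the same underscore syntax in other positions denotes an existential type.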

Why is this useful?

By parametrizing over the type constructor rather than a particular type, eg F[String], one can use the parameter in method definitions with different types. In particular, in the definition of map, the return type is F[B], where B is a type parameter of the method map; with parametrization by types only, map would have to be homogeneous. (This is a slightly modified quote taken from here).

That is to say, something that's not higher-kinded might look like this:

  trait NotAHigherKindedType {
    def map[A, B](fa: List[A])(f: A => B): List[B]
  }

is unnecessarily concrete. It only works for Lists. Something more abstract would look like:

  trait FunctorHKT[F[_]] {
    def map[A, B](fa: F[A])(f: A => B): F[B]
  }

and could be implemented with:

  class MyFunctorForGenTraversable extends FunctorHKT[GenTraversable] {
    override def map[A, B](fa: GenTraversable[A])(f: (A) => B): GenTraversable[B] = fa map f
  }

and would work equally well for Lists as Sets (although not, say, Futures but that's just because they don't implement GenTraversable).
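One way around that restriction is to give each type constructor its own instance instead of relying on a shared supertype. The sketch below is my own illustration: listFunctor, optionFunctor and lengths are hypothetical names, and the instances are passed explicitly rather than via implicits to keep it short.

```scala
object FunctorInstances {
  trait Functor[F[_]] {
    def map[A, B](fa: F[A])(f: A => B): F[B]
  }

  val listFunctor: Functor[List] = new Functor[List] {
    def map[A, B](fa: List[A])(f: A => B): List[B] = fa.map(f)
  }

  val optionFunctor: Functor[Option] = new Functor[Option] {
    def map[A, B](fa: Option[A])(f: A => B): Option[B] = fa.map(f)
  }

  // Written once against the abstraction; works for any F that has a Functor.
  def lengths[F[_]](names: F[String])(functor: Functor[F]): F[Int] =
    functor.map(names)(_.length)
}
```

For example, lengths(List("ab", "c"))(listFunctor) and lengths(Option("abc"))(optionFunctor) both go through the same method, even though List and Option share no collection supertype.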

A more sophisticated example that uses implicits can be found here. Because of the limitation we mention above about our solution working for Sets and Lists but not Futures, Wesley-Smith notes that this is why it's "less powerful and also more complicated (both syntactically and semantically) than ad-hoc polymorphism via type-classes."

Sunday, October 18, 2015

In- Co- Contravariants and Mutability

Take a Covariant type constructor:

  class ContainerCovariant[+A](element: A) { }

Let's say we're writing it to contain an A, whatever that is. We'll leave it generic. OK, here's the set method:

  class ContainerCovariant[+A](element: A) {
     def set(what: A): Unit = ??? // covariant type A occurs in contravariant position in type A of value what
  }

Eek, it doesn't compile. (Note: all argument types are covariant on a JVM. Haskell doesn't have sub- and super-types so you encounter Co- and Contra-variance less often.)

Why it doesn't compile can be understood by Proof by Contradiction. Say it did compile. Then we could write:

    val covariantWithChild:  ContainerCovariant[Child]  = new ContainerCovariant[Child](aChild)
    val covariantWithParent: ContainerCovariant[Parent] = covariantWithChild
    covariantWithParent.set(aParent)                    // legal if set(what: A) had compiled
    val aChildRight: Child = covariantWithChild.get     // what the hey? this is a parent not a child!

So, one way the compiler makes it hard for us to do this is the above error. There are more.

[Note that when a type constructor is co- or contra-variant, it is relative to the reference pointing at it. For example, in the above code, the reference on the right hand side (a ContainerCovariant[Child]) is covariant to the reference on the left (a ContainerCovariant[Parent])

This leads to an important mnemonic, the five Cs:

"Co- and Contra-variance of Classes are Compared to the Call Site".]

Anyway, we can get around this compilation error (see here and the other question linked off it) by making the type of the parameter contra-variant in A as the compiler is asking us to do:

    def set[B >: A](what: B): Unit = ???

and the above block of code compiles. So, we've beaten the system, right? Not quite. If we're mutating the state of our covariantWithChild, then we have to have a var (or similar strategy). So, let's add it:

  class ContainerCovariant[+A](element: A) {

    var varA: A = null.asInstanceOf[A] // <-- "covariant type A occurs in contravariant position in type A of value varA_="
    def set[B >: A](what: B): Unit = { varA = what }
  }

Only this time, the compiler complains at the var declaration. Making it a val helps:

    val valA: A = null.asInstanceOf[A] // this compiles

But now our class isn't mutable so we're foiled again.

Remembering the Get/Set Principle, a mutable class should have an invariant type, so let's add:

  class ContainerCovariant[+A](element: A) {
    class InvariantMutableRef[B](var b: B)
    val invariantMutableRef = new InvariantMutableRef(valA) // "covariant type A occurs in invariant position in type => ..."
  }

If A is covariant, all references that use it must be too. The same problem exists if we try to introduce a contra-variant holder:

  class ContainerCovariant[+A](element: A) {
    class ContravariantRef[-B] { ... }
    val contravariantRef = new ContravariantRef[A] // "covariant type A occurs in contravariant position in type => ContainerCovariant.this.ContravariantRef[A] of value contravariantRef..."
  }

What's more, our contra-variant container couldn't have a member either, mutable or immutable:

    class ContravariantRef[-B] {
      val b: B = null.asInstanceOf[B] // "contravariant type B occurs in covariant position in type => B of value b"
      var b: B = null.asInstanceOf[B] // "contravariant type B occurs in covariant position in type => B of method b"
    }

because a reference could then view ContravariantRef with a more specific type of B than the one it was created with, and read the member as that more specific type when it wasn't more specific at all. Double eek.

So, mutability is related to contra-, co- and invariance down at the compiler level which tries to stop us from doing something evil.
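The two designs the compiler does accept can be sketched like this. ImmutableBox and MutableBox are hypothetical names of mine, and Parent and Child are the obvious two-class hierarchy assumed in the snippets above.

```scala
object Variance {
  class Parent
  class Child extends Parent

  // Immutable and covariant: "set" returns a new container, widening the type if needed.
  class ImmutableBox[+A](val get: A) {
    def set[B >: A](what: B): ImmutableBox[B] = new ImmutableBox[B](what)
  }

  // Mutable, so invariant: A appears in both the get and the set position.
  class MutableBox[A](private var element: A) {
    def get: A = element
    def set(what: A): Unit = { element = what }
  }

  val childBox: ImmutableBox[Child]   = new ImmutableBox(new Child)
  val parentBox: ImmutableBox[Parent] = childBox // allowed by covariance
  val widened: ImmutableBox[Parent]   = parentBox.set(new Parent)

  val mutableChild = new MutableBox[Child](new Child)
  // val bad: MutableBox[Parent] = mutableChild  // does not compile: MutableBox is invariant
}
```

Setting a Parent through parentBox leaves childBox untouched, so reading a Child back out of childBox stays safe.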

Thursday, October 15, 2015

Cheeky and efficient maths algorithm

There is a very efficient algorithm in Breeze for calculating polynomials. That is, say you have an equation like:

10x^3 + 5x^2 + 3x + 1

and you want to find the answer for a given x. You could of course calculate x to the power of 3, times it by 10 add this to 5 times x to the power of 2 etc etc...

But a much more efficient way is to observe that the very same equation can be written:

((10 * x + 5) * x + 3) * x + 1

and run this code:

    var i = coefs.length-1
    var p = coefs(i)
    while (i>0) {
      i -= 1
      p = p*x + coefs(i)
    }

where coefs is the array of numbers that come before the xs (10, 5, 3 and 1 in our case), stored lowest power first so that coefs(0) is the constant 1 (note that the 1 is actually 1 * x^0 = 1). The answer at the end is in the var p.

The code uses vars which are generally considered bad practice but there is an argument for being pragmatic too. We could have been more functional and written:

    coefs.tail.foldRight(0.0)(
      (coeff, agg) => (agg + coeff) * x
    ) + coefs(0)

But it is debatable if it's any nicer.
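For anyone who wants to try it, here is a self-contained sketch of both styles next to a naive evaluation. The method names are my own, the fold is a slight variant of the one above, and evalLoop assumes a non-empty array.

```scala
object Horner {
  // coefs(i) is the coefficient of x^i, so 10x^3 + 5x^2 + 3x + 1 is Array(1, 3, 5, 10).
  def evalLoop(coefs: Array[Double], x: Double): Double = {
    var i = coefs.length - 1
    var p = coefs(i)
    while (i > 0) {
      i -= 1
      p = p * x + coefs(i)
    }
    p
  }

  // The same recurrence as a fold, working from the highest power down.
  def evalFold(coefs: Array[Double], x: Double): Double =
    coefs.foldRight(0.0)((coeff, agg) => agg * x + coeff)

  // Naive evaluation, for comparison only.
  def evalNaive(coefs: Array[Double], x: Double): Double =
    coefs.zipWithIndex.map { case (c, i) => c * math.pow(x, i) }.sum
}
```

With x = 2 all three give 107, that is 80 + 20 + 6 + 1.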

Architectural and Development Patterns

Event Sourcing
"[E]very operational command executed on any given Aggregate instance in the domain model will publish at least one Domain Event that describes the execution outcome. Each of the events is saved to an Event Store in the order in which it occurred. When each Aggregate is retrieved from its Repository, the instance is reconstituted by playing back the Events in the order in which they previously occurred... To avoid this bottleneck we can apply an optimization that uses Aggregate state snapshots." [1]

CQRS
Instead of having one data store for everything, "the change that CQRS introduces is to split that conceptual model into separate models for update and display, which it refers to as Command and Query respectively". [2]

I've seen this work well in a project that used a Lambda Architecture. Here, risk data was being written slowly but surely to a Hadoop cluster. When a day's data was complete, the data could be converted into a format that was digestible by the Risk Managers.

"Having separate models raises questions about how hard to keep those models consistent, which raises the likelihood of using eventual consistency." [2] For us this was fine as the Risk Managers were in bed while the munging was taking place.

"Interacting with the command-model naturally falls into commands or events, which meshes well with Event Sourcing." [2] And this is where Kafka came in.

Martin Fowler is cautious about using CQRS [2] but we had a good experience. In our domain, the Risk Managers were not interested in the Query model after two weeks and so it could be thrown away. Had it been needed at any time in the future, it could always have been generated again from the Command model.
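The "generate it again" idea is the same replay that Event Sourcing relies on. A minimal sketch follows, using a made-up account domain rather than our risk data: the Event types, Account and replay are all hypothetical names.

```scala
object EventSourcing {
  sealed trait Event
  final case class Deposited(amount: BigDecimal) extends Event
  final case class Withdrawn(amount: BigDecimal) extends Event

  final case class Account(balance: BigDecimal)

  // Applying one event to the current state yields the next state.
  def applyEvent(state: Account, event: Event): Account = event match {
    case Deposited(amount) => Account(state.balance + amount)
    case Withdrawn(amount) => Account(state.balance - amount)
  }

  // Reconstitute the aggregate by replaying events in order, starting
  // either from the empty state or from a saved snapshot.
  def replay(events: Seq[Event], from: Account = Account(BigDecimal(0))): Account =
    events.foldLeft(from)(applyEvent)

  val store: Vector[Event] =
    Vector(Deposited(BigDecimal(100)), Withdrawn(BigDecimal(30)), Deposited(BigDecimal(5)))

  val full: Account = replay(store)
  val snapshot: Account = replay(store.take(2)) // state after the first two events
  val fromSnapshot: Account = replay(store.drop(2), snapshot)
}
```

Replaying from the snapshot gives the same state as replaying everything, while reading fewer events.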

Lambda Architecture
Lambda Architectures consist of three main pieces:
  1. The batch layer, managing the master dataset (an immutable, append-only set of raw data) and pre-computing batch views.
  2. The serving layer, indexing batch views so that they can be queried in a low-latency, ad-hoc way.
  3. The speed layer, dealing with recent data only, and compensating for the high latency of the batch layer.
(taken from here).

As mentioned in the CQRS subsection above, "even if you were to lose all your serving layer datasets and speed layer datasets, you could reconstruct your application from the master dataset. This is because the batch views served by the serving layer are produced via functions on the master dataset, and since the speed layer is based only on recent data, it can construct itself within a few hours." [3]

Since Hadoop has an append-only, highly available file system (HDFS), it is an obvious choice for the batch layer.

"The serving layer is a specialized distributed database that loads in a batch view and makes it possible to do random reads on it (see figure 1.9). When new batch views are available, the serving layer automatically swaps those in so that more up-to-date results are available. A serving layer database supports batch updates and random reads. Most notably, it doesn’t need to support random writes." [3] Cassandra was our choice for this layer.

Finally, "the speed layer only looks at recent data, whereas the batch layer looks at all the data at once. Another big difference is that in order to achieve the smallest latencies possible, the speed layer doesn’t look at all the new data at once. Instead, it updates the realtime views as it receives new data instead of recomputing the views from scratch like the batch layer does. The speed layer does incremental computation instead of the recomputation done in the batch layer." [3]
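The division of labour between the layers can be sketched in a few lines. This is a toy model under my own assumptions, not our production code: Fact is a hypothetical record type, and a query is answered by merging the batch view with the realtime view, which is the usual formulation of the architecture.

```scala
object LambdaQuery {
  // Hypothetical master dataset: append-only (day, symbol, count) facts.
  final case class Fact(day: Int, symbol: String, count: Long)

  // Batch layer: recompute the whole view from scratch.
  def batchView(master: Seq[Fact]): Map[String, Long] =
    master.groupBy(_.symbol).map { case (s, fs) => s -> fs.map(_.count).sum }

  // Speed layer: update the realtime view incrementally, one fact at a time.
  def updateRealtime(view: Map[String, Long], fact: Fact): Map[String, Long] =
    view.updated(fact.symbol, view.getOrElse(fact.symbol, 0L) + fact.count)

  // A query merges the two views.
  def query(batch: Map[String, Long], realtime: Map[String, Long], symbol: String): Long =
    batch.getOrElse(symbol, 0L) + realtime.getOrElse(symbol, 0L)
}
```

Once the next batch run has absorbed the recent facts, the realtime view for that period can simply be dropped.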

Lambda architecture shares some similarities with CQRS but is very different in that it never updates data (although it can add data that renders old data obsolete).

Feature Branching
The idea here is that each developer works on his own branch. He regularly syncs with the main branch to lessen the pain of later merging his work into it. With Git, he'd regularly run:

> git checkout master
> git pull
> git checkout BRANCH_NAME
> git merge master
> git push

There is some debate on whether Feature Branching is an anti-pattern. Ideally, the system should be architected such that a feature is isolated to a particular silo of the code and does not spill out to other components. Thus, a developer can merrily work on that particular silo and check into the main branch all the time. Mainly for political reasons, though, this is not always possible. Software like Stash that integrates with Jira can make code reviews even within a distributed team quite pleasant.

[1] Implementing Domain Driven Design, Vaughn Vernon
[2] Martin Fowler's blog.
[3] Big Data, Nathan Marz