Sunday, May 21, 2017

Lazy Scala


This might be elementary but when a simple error trips me more than once, I blog it.

You probably know that an Iterator can only be accessed once as it's stateful. "An Iterator can only be used once because it is a traversal pointer into a collection, and not a collection in itself" from here. (Iterable just means the implementing class can generate an Iterator).

The simple mistake I made was to check the size of the iterator in a temporary log statement before mapping over it. What was a little more interesting was that other collections behave the same way if you call them via their TraversableOnce interface.
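
A minimal sketch of the mistake:

  val it = Iterator(1, 2, 3)
  println(s"about to map over ${it.size} elements")  // size consumes the iterator...
  val mapped = it.map(_.toString)                    // ...so there is nothing left to map over
  println(mapped.isEmpty)                            // true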

To demonstrate, say we have a Set

    val aSet = Set(1, 2, 3)

and two functions that are identical other than the type of their argument:

  def mapSet[T](xs: Set[T]): TraversableOnce[String] = {
    val mapped = xs.map(_.toString)
    println(mapped.mkString("\n"))
    mapped
  }

  def mapTraversableOnce[T](xs: TraversableOnce[T]): TraversableOnce[String] = {
    val mapped = xs.map(_.toString)
    println(mapped.mkString("\n"))
    mapped
  }

then mapTraversableOnce will return an empty iterator while mapSet will return a Set of Strings. This is because map called through the TraversableOnce interface goes via an Iterator under the covers, and the println (which calls mkString) has already exhausted it by the time it is returned. This will come as a surprise to anybody expecting Object Oriented behaviour.
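
Calling both with aSet shows the difference:

  mapSet(aSet).mkString(",")              // "1,2,3"
  mapTraversableOnce(aSet).mkString(",")  // "" - the iterator was already exhausted by the println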

Iterator also violates the functor laws. Take two otherwise identical methods:

  def isTraversableFunctor[T, U, V](xs: Traversable[T], f: T => U, g: U => V): Boolean = {
    val lhs = xs.map(f).map(g)
    val rhs = xs.map(g compose f)
    (lhs == rhs)
  }

  def isTraversableOnceFunctor[T, U, V](xs: TraversableOnce[T], f: T => U, g: U => V): Boolean = {
    val lhs = xs.map(f).map(g)
    val rhs = xs.map(g compose f)
    (lhs == rhs)
  }

and pass aSet to each. The first will say it's a functor, the second will say it's not.
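
For example, with two arbitrary functions:

  isTraversableFunctor(aSet, (_: Int) + 1, (_: Int) * 2)      // true
  isTraversableOnceFunctor(aSet, (_: Int) + 1, (_: Int) * 2)  // false - the two iterators are only compared by reference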

This is somewhat trivial as the reason it fails is that the iterator has not been materialized. "TraversableOnce's map is not a functor map, but, then again, it never pretended to be one. Its design goals specifically precluded it from being a functor map" from Daniel C. Sobral.

Laziness

Iterator comes into its own when we want to access the underlying elements lazily. But there are other ways to do this, like Stream, "which implements all its transformer methods lazily."
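
For example (a small sketch; in Scala 2.13, Stream is superseded by LazyList):

  val evens = Stream.from(1).map(_ * 2)  // an infinite sequence; only the head is evaluated eagerly
  println(evens.take(5).toList)          // List(2, 4, 6, 8, 10)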

Almost all collections are eager by default. Use the view method to make them lazy. Use force on these to make them eager again.
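
A small sketch:

  val lazyView = (1 to 5).view.map { i => println(s"mapping $i"); i * 2 }  // prints nothing yet
  val strict   = lazyView.force  // now "mapping 1" ... "mapping 5" are printed and a strict collection is built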

Finally, some methods are naturally lazy. For instance, exists short-circuits, stopping as soon as it finds what it's looking for.
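
For example:

  Iterator.from(1).exists(_ > 3)  // true - only the first four elements of this infinite Iterator are inspected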


Tuesday, May 16, 2017

Playing with Pregel


Implementing your own distributed Pregel algorithm in GraphX is surprisingly simple but there are a few things to know that will help you.

First, what is GraphX's Pregel implementation? Well, it takes three functions:

1. one that merges messages of type T, that is (T, T) => T.

2. one that runs on each vertex with an attribute type of VD and that takes that message and creates a new state. Its type is (VertexId, VD, T) => VD.

3. one that runs on each edge/vertex/vertex combination and produces messages to be sent to the vertices. Its type is (EdgeTriplet[VD, ED]) => Iterator[(VertexId, T)] where ED is the edge's attribute type (see the sketch below for how the three fit together).
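
To make the shapes concrete, here is a toy sketch of a Pregel call that propagates the largest VertexId through a tiny graph. It is illustrative only (not the algorithm described below) and assumes a spark-shell style session where sc is a SparkContext:

  import org.apache.spark.graphx._
  import org.apache.spark.rdd.RDD

  val vertices: RDD[(VertexId, Long)] = sc.parallelize(Seq((1L, 1L), (2L, 2L), (3L, 3L)))
  val edges: RDD[Edge[Int]] = sc.parallelize(Seq(Edge(1L, 2L, 0), Edge(2L, 3L, 0)))
  val graph: Graph[Long, Int] = Graph(vertices, edges)

  val result = graph.pregel(Long.MinValue)(
    // 2. vertex program: fold the incoming message into the vertex's state
    (id, state, msg) => math.max(state, msg),
    // 3. sendMsg: runs per edge triplet, emitting (vertexId, message) pairs
    triplet =>
      if (triplet.srcAttr > triplet.dstAttr) Iterator((triplet.dstId, triplet.srcAttr))
      else if (triplet.dstAttr > triplet.srcAttr) Iterator((triplet.srcId, triplet.dstAttr))
      else Iterator.empty,
    // 1. mergeMsg: combine messages destined for the same vertex
    (a, b) => math.max(a, b)
  )

  result.vertices.collect().foreach(println)  // every vertex ends up holding 3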

TL;DR: the vertex holds its state and the edges send it messages. If this starts sounding like the Actor Model to you, you're not alone:

"It's a bit similar to the actor mode if you think of each vertex as an actor, except that vertex state and messages between vertices are fault tolerant and durable, and communication proceeds in fixed rounds: at every iteration the framework delivers all messages sent in the previous iteration. Actors normally have no such timing guarantee." - Martin Kleppmann.

So, off I went and wrote my code where all the paths through a vertex are stored as state at that vertex.

And it ran like a dog. After five hours of processing, it died with out of memory exceptions.

Judicious jstack-ing of the JVMs in the cluster showed that threads were hanging around in Seq.containsSlice (we're using Scala 2.10.6). This Scala method was being used to find sub-sequences of VertexIds (VertexId is just an alias for Long) in the paths that had already been seen.

Desperate, I turned the Seqs of Longs into Strings and used String.contains instead, and the whole thing ran in less than 25 minutes.
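
A hypothetical reconstruction of the hack (the names and delimiter here are mine, not the original code). Wrapping each path in delimiters avoids false matches such as Seq(1L, 2L) appearing to match inside Seq(11L, 2L):

  import org.apache.spark.graphx.VertexId  // just an alias for Long

  def encode(path: Seq[VertexId]): String = path.mkString(",", ",", ",")

  def containsSubPath(path: Seq[VertexId], sub: Seq[VertexId]): Boolean =
    encode(path).contains(encode(sub))

  containsSubPath(Seq(1L, 2L, 3L), Seq(2L, 3L))   // true
  containsSubPath(Seq(11L, 2L, 3L), Seq(1L, 2L))  // false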

This is not the first time I've seen innocent-looking code bring a cluster to its knees. Curious, I wrote some micro-benchmarking code comparing these two methods using JMH and got this:


     Benchmark                                   Mode  Cnt          Score          Error  Units
     ContainsSliceBenchmark.scalaContainsSlice  thrpt    5     594519.717 ±    33463.038  ops/s
     ContainsSliceBenchmark.stringContains      thrpt    5  307051356.098 ± 44642542.440  ops/s

That's a difference of roughly 500x - nearly three orders of magnitude.
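
For reference, a sketch of the sort of benchmark this was (run via sbt-jmh; the data and sizes here are assumptions, not the original benchmark):

  import org.openjdk.jmh.annotations.{Benchmark, Scope, State}

  @State(Scope.Benchmark)
  class ContainsSliceBenchmark {
    val haystackSeq: Seq[Long] = 1L to 100L
    val needleSeq: Seq[Long] = 40L to 45L
    val haystackStr: String = haystackSeq.mkString(",", ",", ",")
    val needleStr: String = needleSeq.mkString(",", ",", ",")

    @Benchmark
    def scalaContainsSlice(): Boolean = haystackSeq.containsSlice(needleSeq)

    @Benchmark
    def stringContains(): Boolean = haystackStr.contains(needleStr)
  }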

Although it's a hack, this approach gives great performance. And it shows that taking jstack snapshots of your cluster is the best way to understand why your big data application is so slow.

Friday, May 5, 2017

Spark OutOfMemoryErrors.


Spark can deal with data sets that are much larger than the available physical memory. So, you never have problems with OutOfMemoryErrors, right? Well, no. It depends what you're doing.

When asked how to avoid OOMEs, the first thing people generally say is to increase the number of partitions so that there is never too much data in memory at one time. However, this doesn't always work and you're better off having a look at the root cause.

This very week was one of those times. I was getting FetchFailedException. "This error is almost guaranteed to be caused by memory issues on your executors" (from here). Increasing the number of partitions didn't help.

The problem was that I had to find similar entities and declare that there was a relationship between them. Foolishly, I tried to take the Cartesian product of all the entities that were similar. This is fine 99.99% of the time but scales very badly - O(n²) - for outlying data points where n is large.

The symptoms were that everything was running tickety-boo until the very end where the last few partitions were taking tens of minutes and then the fatal FetchFailedException was thrown.

These pairs were going to be passed to GraphX's Connected Components algorithm, so instead of creating a complete graph (which is what the Cartesian product of the entities would give you) you need a different graph topology. This is not as simple as it sounds, as you have to be a little clever in choosing an alternative. For instance, a graph that is just a line will take GraphX ages to process if it is sufficiently long. For me, the driver ran out of memory after tens of thousands of Pregel supersteps.

A more efficient approach is a star topology with branches of length 1. This graph is explored very quickly by Pregel/ConnectedComponents and has the same effect as a complete graph or a line, since all vertices are part of the same connected component and we don't care at this point how they got there.
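
A hypothetical sketch (the names are mine, not the original code) of building such a topology: pick one entity in each group of similar entities as the hub and attach every other member to it with a single edge. For a group of n entities this creates n-1 edges rather than the ~n² of a full Cartesian product, yet ConnectedComponents still puts every member in the same component:

  import org.apache.spark.graphx.{Edge, VertexId}

  def starEdges(similarEntities: Seq[VertexId]): Seq[Edge[Int]] =
    similarEntities match {
      case hub +: spokes => spokes.map(spoke => Edge(hub, spoke, 1))
      case _             => Seq.empty
    }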

The result was that the whole graph was processed in about 20 minutes.