Monday, March 21, 2016

Little differences and Big Data

Typically, in Spark and the like, a small amount of code is executed against a large amount of data. Because of this, your code needs to be efficient. I've just come across some code that was taking more time to read and parse the data than to actually do any computation on it!

Don't be lazy

The problem showed up when I looked at the threads of the cluster's executors (you don't need jstack for this; you can do it from the Spark GUI). Most executor threads were BLOCKED trying to access a lazy val while only one thread at a time was RUNNABLE inside it. Oops - lazy vals use synchronization under the covers (see here for why lazy can be expensive).
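Scala 2 compiles a lazy val to roughly the double-checked-locking pattern below. This is a Java paraphrase of the idea, not the actual generated code, but it shows why contended first reads queue up on a monitor:

```java
public class LazyHolder {
    private volatile boolean initialized = false;
    private Object value;

    // Roughly what a Scala lazy val accessor does: until the flag is set,
    // every reader funnels through this synchronized block, so many threads
    // hitting an uninitialized (or slow-to-initialize) lazy val will queue
    // on the monitor and show up as BLOCKED in a thread dump.
    public Object get() {
        if (!initialized) {
            synchronized (this) {
                if (!initialized) {
                    value = expensiveComputation();
                    initialized = true;
                }
            }
        }
        return value;
    }

    private Object expensiveComputation() {
        return "computed";
    }
}
```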

Not always obvious

Another innocuous-looking piece of code might be this:

    private static final Random random = new java.util.Random();

    static long[] generateRandom(int num, long maxExclusive) {
        long    bitMask = randomBitMask(maxExclusive);
        int     index   = 0;
        long    count   = 0;
        long[]  results = new long[num];

        while (index < num) {
            long nextVal = count ^ bitMask; // XORing an incrementing counter with a fixed mask never repeats a value
            if (nextVal < maxExclusive) {   // discard candidates outside the requested range
                results[index] = nextVal;
                index++;
            }
            count++;
        }
        return results;
    }

    private static long randomBitMask(long maxExclusive) {
        long seed = random.nextLong();
        return seed & createMask(maxExclusive);
    }

    private static long createMask(long highest) {
        // a run of 1s wide enough to cover the highest set bit of 'highest'
        long leftMost = Long.highestOneBit(highest);
        return (leftMost << 1) - 1;
    }

This is an attempt at implementing a non-repeating list of random numbers using a Linear Feedback Shift Register.

Basically, I wanted a list of (semi-)random numbers that don't repeat themselves. If you want only a few from a large set, you might just pick a random one and add it to the list, retrying whenever you've seen it before. This is fine, but if you want lots of numbers out of a set only slightly larger (that is, when num is comparable to maxExclusive), you'll be doing a lot of retrying. This is where the code above comes in. Unfortunately, it is considerably slower than the "try again" approach when num is a good deal less than maxExclusive.
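The "try again" approach isn't shown in the post. A minimal sketch might look like this (the method name comes from the benchmark below; the body is my guess at an implementation, not the original):

```java
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

public class SimpleUnique {
    private static final Random random = new Random();

    // Pick 'num' distinct values in [0, maxExclusive) by drawing and retrying
    // on duplicates. Fast when num << maxExclusive because collisions are rare.
    static long[] uniqueRandomNumbersSmallNumLargeMax(int num, long maxExclusive) {
        Set<Long> seen = new HashSet<>();
        long[] results = new long[num];
        int index = 0;
        while (index < num) {
            long candidate = (long) (random.nextDouble() * maxExclusive);
            if (seen.add(candidate)) {   // add() returns false for a duplicate
                results[index++] = candidate;
            }
        }
        return results;
    }
}
```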

So, benchmarking with this in the pom.xml:
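The dependency snippet didn't survive the formatting. A typical JMH setup uses the standard org.openjdk.jmh coordinates; this is my reconstruction, not necessarily the original:

```xml
<dependency>
    <groupId>org.openjdk.jmh</groupId>
    <artifactId>jmh-core</artifactId>
    <version>${jmh.version}</version>
</dependency>
<dependency>
    <groupId>org.openjdk.jmh</groupId>
    <artifactId>jmh-generator-annprocess</artifactId>
    <version>${jmh.version}</version>
    <scope>test</scope>
</dependency>
```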


(where jmh.version is 1.11.3) and something in src/test/java that looks like this:

    public static final int LARGE = 100000000;

    /*
     Result "javaRandomNumbersBenchmark_LARGE":
       198.036 ±(99.9%) 37.176 ops/s [Average]
       (min, avg, max) = (123.962, 198.036, 290.256), stdev = 42.812
       CI (99.9%): [160.861, 235.212] (assumes normal distribution)

     Benchmark                                               Mode  Cnt    Score    Error  Units
     RandomStatsBenchmark.javaRandomNumbersBenchmark_LARGE  thrpt   20  197.703 ± 36.994  ops/s
     */
    @Benchmark
    public void javaRandomNumbersBenchmark_LARGE() {
        JavaRandomStats.generateRandom(10, LARGE);
    }

    /*
     Result "simpleUniqueRandomNumbersBenchmark_LARGE":
       3103855.467 ±(99.9%) 24691.158 ops/s [Average]
       (min, avg, max) = (2845502.900, 3103855.467, 3277692.295), stdev = 104543.910
       CI (99.9%): [3079164.308, 3128546.625] (assumes normal distribution)
     */
    @Benchmark
    public void simpleUniqueRandomNumbersBenchmark_LARGE() {
        uniqueRandomNumbersSmallNumLargeMax(10, LARGE);
    }

I got the results you see in the comments. I'm not currently sure why my attempted optimisation is such a performance killer.

More contention

Spark's Broadcast variables are an efficient way to access read-only data without repeatedly shipping it over the network. However, in the documentation examples, you'll see code like this:

records mapPartitions { rowIter =>
  rowIter map { case (record) =>
    val x = broadcastX.value
    // ... do something with x
  }
}

But I noticed a lot of contention when reading the value deep down in TorrentBroadcast.readBroadcastBlock (version 1.3.1). Re-writing the code slightly to:

records mapPartitions { rowIter =>
  val x = broadcastX.value
  rowIter map { case (record) =>
    // ... do something with x
  }
}

meant the executor threads suffered far less contention, since the broadcast value is now fetched once per partition rather than once per record. Doing this saved another 10% or so of the time.

Sunday, March 13, 2016

GraphX and the Pregel algorithm

GraphX is a Spark module that lets me find (typically small) sub-graphs in a huge collection of vertices. I've only got a prototype running, but I can attest that it is indeed very fast.

GraphX does this by using the Pregel algorithm. This is named after the river Pregel, on which the Seven Bridges of Konigsberg problem was based. The problem asks: can you walk through the city of Konigsberg crossing each of its seven bridges once and only once? You can't, and Euler's proof of this was the start of graph theory.


GraphX represents a graph as two RDDs: one of vertices and one of edges.

The VertexRDD is just an RDD of tuples of a vertex ID and an attribute.

An EdgeRDD is just an RDD of Edges where an edge is just a triple of source, destination and attribute.

What these vertex and edge attributes are is totally arbitrary.
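Stripped of Spark, the shape of the data is just this. A plain-Java sketch of the same idea (the names are mine, not GraphX's):

```java
import java.util.List;

public class TinyGraph {
    // A vertex is an ID plus an arbitrary attribute, like GraphX's (VertexId, VD) tuples.
    record Vertex<V>(long id, V attr) {}

    // An edge is a (source, destination, attribute) triple, like GraphX's Edge[ED].
    record Edge<E>(long srcId, long dstId, E attr) {}

    // The graph itself is nothing more than the two collections.
    record Graph<V, E>(List<Vertex<V>> vertices, List<Edge<E>> edges) {}
}
```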


The Pregel algorithm was inspired by a model that looks a lot like how a multi-core processor works, with each core doing its computations then communicating with the others by the bus.

(This particular abstract model of a computer is called Bulk Synchronous Parallel which in turn is like Parallel Random Access Machine, a shared-memory abstract machine.)

In our analogy, each processor core is represented by a Spark partition and each message on the bus is delivered by a Spark join between RDDs.

I thought the simplest explanation was the second one at this link, by Shagun Sodhani. Basically, we repeatedly iterate over all vertices (each iteration is called a superstep), executing a user-defined function on each vertex. This function can send messages to other vertices, receive messages, and/or mutate its own vertex.

Spark's Implementation

Spark modifies this slightly. "Unlike the original Pregel API, the GraphX Pregel API factors the sendMessage computation over edges, enables the message sending computation to read both vertex attributes, and constrains messages to the graph structure." (from the docs)

You invoke all this magic with a simple call to connectedComponents() on the graph. As its name suggests, this will return all the connected components.

(This is not to be confused with strongly connected components, where all members of a component can reach each other but not necessarily the rest of the directed graph. For that problem you might use the linear-time Kosaraju's algorithm.)

The best description I found of how Spark implements Pregel is here:

"It's fairly simple - each node just broadcasts its VertexId to its neighbors, and its neighbors do the same. Any node that receives a VertexId lower than its own broadcasts that the next round; if not the Vertex goes silent" and does not participate in the algorithm any more.


Before Spark calls its Pregel implementation it must do four things.

First, it points all vertices at themselves. That is, each vertex's attribute becomes its own ID - a Long.

    val ccGraph = graph.mapVertices { case (vid, _) => vid }

Secondly, it defines the function to run on all vertices upon receiving a message. In our case, that's simply picking the minimum value of the vertex's attribute and the incoming message. This is the user-defined vertex program that the Pregel definition mentions.

Then it defines the function to be executed at each edge. The output depends on which vertex has the lower attribute. If it's the source vertex's attribute, the output is a mapping from the destination vertex ID to the source vertex's attribute. If it's the destination vertex's attribute, the output is a mapping from the source vertex ID to the destination's attribute. These are the Pregel messages that will be sent to the different nodes: the key is the receiving node's ID and the value is the message payload. We call this function the map function.

Finally, it defines a function that given two vertex attributes will pick the minimum. We call this the reduce function.


The Pregel implementation repeatedly passes over the graph, running the given map function on each edge to generate messages that are then combined using the given reduce function. For our use case, this boils down to mapping each edge to a [node ID, attribute] tuple and then reducing all the messages for each recipient node down to the single message with the minimum attribute.

We join these messages with the vertices they're destined for (just as we would join any RDDs), running our user-defined vertex program on the result. Now all our ducks are in a row for another iteration, generating yet more messages.

Only when we have no more messages do we halt the iterations.
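Stripped of RDDs and joins, the whole loop above can be sketched in a few lines of plain Java. This is a toy, single-threaded version of the idea, not Spark's implementation:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ToyPregelCC {
    // Edges as (src, dst) pairs; the graph is treated as undirected.
    record Edge(long src, long dst) {}

    static Map<Long, Long> connectedComponents(List<Long> vertexIds, List<Edge> edges) {
        // Initialisation: every vertex's attribute starts as its own ID.
        Map<Long, Long> attr = new HashMap<>();
        for (long v : vertexIds) attr.put(v, v);

        boolean messagesSent = true;
        while (messagesSent) {              // each pass is one superstep
            // Map: each edge sends the lower attribute to the end holding the higher one.
            Map<Long, Long> inbox = new HashMap<>();
            for (Edge e : edges) {
                long srcAttr = attr.get(e.src()), dstAttr = attr.get(e.dst());
                if (srcAttr < dstAttr) inbox.merge(e.dst(), srcAttr, Math::min);
                else if (dstAttr < srcAttr) inbox.merge(e.src(), dstAttr, Math::min);
                // Reduce: merge() keeps only the minimum message per recipient.
            }
            // Vertex program: take the min of the current attribute and the message.
            messagesSent = !inbox.isEmpty();  // halt when no messages were generated
            inbox.forEach((v, msg) -> attr.merge(v, msg, Math::min));
        }
        return attr;   // every vertex now holds the smallest ID in its component
    }
}
```

On the graph 1-2, 2-3, 4-5 this labels {1, 2, 3} with 1 and {4, 5} with 4, just as connectedComponents() labels each component with its minimum vertex ID.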