Saturday, May 21, 2016

Lazy Spark


You may want Spark to write to some data store (after all, there are some things that other technologies do better). Given an RDD distributed over many partitions on many nodes, how do you write the data efficiently?

You may choose to do something like this (where I've used println statements rather than explicit calls to the technologies of your choice):

    rdd foreachPartition { iterator =>
      println("create connection")
      iterator foreach { element =>
        println("insert")
      }
      println("close connection")
    }

Naturally, you need to get a connection inside the function (as the function will be serialized and run on other partitions that might be on other machines). 

Note that if the connection is referenced with a lazy val then each function will have its own connection (even on the same machine) and it will only be instantiated when it's run on its partition.

So, how do we know when to close the connection? A common answer is to just use connection pools.
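For example, the shape might be something like this (a sketch only - PooledConnection, borrow and give stand in for whatever pooling library you actually use, e.g. HikariCP or commons-pool):

    trait PooledConnection { def insert(element: Int): Unit }
    trait Pool {
      def borrow(): PooledConnection
      def give(connection: PooledConnection): Unit
    }

    // One pool per executor JVM: the lazy val means it's created on first use on each machine.
    object ConnectionPool {
      private lazy val pool: Pool = createPool()

      def withConnection[T](f: PooledConnection => T): T = {
        val connection = pool.borrow()
        try f(connection) finally pool.give(connection)   // always hand it back to the pool
      }

      private def createPool(): Pool = ???   // wire in your real pooling library here
    }

    rdd foreachPartition { iterator =>
      ConnectionPool.withConnection { connection =>
        iterator foreach (element => connection.insert(element))
      }
    }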

Also note that the return type of foreachPartition is Unit so it's not too surprising that this is executed immediately since Unit hints at side effects. A quick look at the code of RDD confirms this.

Great. But what if you want to read data from some store and use it to enhance the RDD? Now we're using mapPartitions which, being a transformation, is lazy. So, with similar code, the function might look like this:

    val mapped = rdd mapPartitions { iterator =>
      lazy val connection = getConnection()

      val mapped = iterator map { element =>
        if (connection.isOpen) select(element)
      }

      connection.close()
      mapped
    }

where I'm pretending to open and close some sort of pseudo-connection thus:

  val nSelects        = new AtomicInteger(0)
  val nConnections    = new AtomicInteger(0)

  class Connection {
    @volatile private var open = true
    def close(): Unit = { open = false }
    def isOpen(): Boolean = open
  }

  def getConnection(): Connection = {
    nConnections.incrementAndGet()
    new Connection
  }

  def select(element: Int): Int = nSelects.incrementAndGet()
  
Now, let's run RDD.count(), which should force even the lazy map to do something:

    println("Finished. Count  = " + mapped.count())
    println("nSelects         = " + nSelects.get() + ", nConnections = " + nConnections.get())

giving:

Finished. Count  = 10000
nSelects         = 0, nConnections = 0

What's this? We get the correct number of elements but no selects? What gives?

The issue is that the iterator given to the function passed to mapPartitions is lazy. This has nothing to do with Spark but is due to Scala's lazy collections (the iterator is actually an instance of scala.collection.Iterator$$anon$11). If we ran this code with a real connection, we'd see that it had already been closed by the time the inner function wanted to use it. Odd, we might say, since the call to close it comes later in the code.
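You can see the same behaviour in plain Scala with no Spark involved at all (a little sketch):

    // Plain Scala, no Spark: Iterator.map is lazy, so the body only runs
    // when the mapped iterator is finally consumed.
    val iterator = Iterator(1, 2, 3)
    var open     = true

    val mapped = iterator map { i =>
      println(s"mapping $i, connection open? $open")
      i * 2
    }

    open = false             // "close the connection" before consuming anything

    println(mapped.toList)   // only now does the map body run - and open is already false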

We could just force the mapping to run by calling size() on the resulting collection (which may or may not be efficient) but one solution suggested to me was:

    val batchSize = 100
    rdd mapPartitions { iterator =>
      val batched = iterator.grouped(batchSize)
      batched.flatMap { grouped =>
        val connection = getConnection()
        val seqMapped  = grouped.map(select(_))
        connection.close()
        seqMapped
      }
    }

This creates (and closes) a connection for each batch of batchSize elements, but that can be more efficient than the usual contention on a connection pool. (It works because grouped hands the function strict collections, so the inner map runs before the connection is closed.) Remember, a little contention can make a big difference when dealing with hundreds of millions of elements.


Scala Curry


"All functions are curried in Haskell by default" that is, all functions in Haskell take only one argument. How does it do this? By currying.

Scala has currying built-in. To demonstrate, take this method that takes tow Ints and returns an Int:

  def aMethodWith2Args(x: Int, y: Int): Int = x + y

Let's lift it:

  val lifted: (Int, Int) => Int = aMethodWith2Args(_, _)

and curry it:

  val curried: Int => Int => Int = lifted.curried
   
So, we too now have a function that only takes one argument. It just returns another function that also takes only one argument. That's currying.

Now we can partially apply it:

  val partial: Int => Int = curried(1)
  println(partial(2)) // prints out '3'

Just like Haskell.
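Incidentally, Scala methods can also be written with multiple parameter lists, which gives you the curried form directly (a quick sketch):

    // A method with multiple parameter lists is already in curried form:
    def add(x: Int)(y: Int): Int = x + y

    val addOne: Int => Int = add(1) _   // partially apply the first parameter list
    println(addOne(2))                  // prints '3'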

Sunday, May 15, 2016

Linear Regression in Practice


Here's a sweet little challenge. Given a linear function, write some code that estimates what it is given only the data it produces.

Let's make the function a simple one, say f(x): Double -> Double, such that we can plot a straight line. A first draft might look like this in Scala:

First, generate a random function of the form α x + β

  def main(args: Array[String]): Unit = {
    val intercept               = Random.nextDouble()
    val rate                    = Random.nextDouble()
    val f                       = generateLinearFn(intercept, rate)
    val (estRate, estIntercept) = guessRateAndIntercept(f)
    println(s"real:     intercept = $intercept, rate = $rate")
    println(s"guessed:  intercept = $estIntercept, rate = $estRate")
  }

  def generateLinearFn(intercept: Double, rate: Double): (Double => Double) = { x =>
    intercept + (rate * x)
  }


Now, generate some values for x and run them through a similarly shaped function where we have guessed the values for α and β. Then, we wiggle α and β and compare the results of this ersatz function with the results from the real function. If a wiggle makes the results better, keep the adjusted values of α and β.

  def guessRateAndIntercept(f: Double => Double): (Double, Double) = {
    val nIterations = 100
    val stepSize    = 1d / nIterations

    var rate        = Random.nextDouble()
    var intercept   = Random.nextDouble()

    val xs          = (1 to 100).map(x => x.toDouble / 100)

    for (i <- 0 to nIterations) {
      intercept = intercept + bestStep(stepSize, delta => generateLinearFn(intercept + delta, rate), f, xs)
      rate      = rate      + bestStep(stepSize, delta => generateLinearFn(intercept, rate  + delta), f, xs)

      println(s"intercept = $intercept, rate = $rate")
    }

    (rate, intercept)
  }

  def bestStep(stepSize: Double, df: Double => Double => Double, actualFn: Double => Double, xs: Seq[Double]): Double = {
    val errorToStep = Seq(stepSize, 0d, -stepSize).map(step => df(step) -> step).map { case (guessFn, step) =>
      (errorOver(actualFn, guessFn, xs), step)
    }
    errorToStep.minBy(_._1)._2
  }

Where the error is measured simply as the sum of the squared differences between the results from our approximate function and the real thing.

  def errorOver(f: Double => Double, guessFn: Double => Double, data: Seq[Double]): Double =
    sumOfSquaredErrors(data.map(guessFn), data.map(f))

  def sumOfSquaredErrors(y1s: Seq[Double], y2s: Seq[Double]): Double =
    y1s.zip(y2s).map{ case(x, y) =>
      val diff = x - y
      pow(diff, 2)
    }.sum


OK, nothing clever here. It's just a simple approximation technique that gives reasonable answers:

real:     intercept = 0.9830623503713442, rate = 0.8187143023851281
guessed:  intercept = 0.9598594405202521, rate = 0.8497631295034207

But it has an obvious flaw: it only handles functions with one independent variable (i.e. it only deals with (Double) => Double - what about (Double, Double) => Double or (Double, Double, Double) => Double, etc.?)

So, how do the big boys do it in maths libraries? One cunning way can be found here in the Apache commons math library. It takes a bit of head-scratching to understand why it works but this is what I found.
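Before digging into the why, calling it looks roughly like this (a rough sketch of mine rather than anything definitive, assuming commons-math3 is on the classpath):

    import org.apache.commons.math3.stat.regression.OLSMultipleLinearRegression

    // Fit y = b0 + b1 x1 + b2 x2 to a handful of made-up samples.
    // By default an intercept term is included, so the first estimated
    // parameter is b0.
    val regression = new OLSMultipleLinearRegression()
    val ys         = Array(6.0, 8.0, 9.0, 11.0)
    val xs         = Array(
      Array(1.0, 2.0),
      Array(2.0, 2.0),
      Array(2.0, 3.0),
      Array(3.0, 3.0)
    )
    regression.newSampleData(ys, xs)
    val coefficients = regression.estimateRegressionParameters()  // Array(b0, b1, b2)
    println(coefficients.mkString(", "))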

First, let's expand on the comments in the JavaDoc. Say we have a set of real data and a proposed linear function that can approximate that data. Let's say we have n samples of real data and our function has k independent variables.

Aside: a note on terms used here: 
"A simple way to describe variables is as either dependent, if they represent some outcome of a study, or independent, if they are presumed to influence the value of the dependent variable(s)... The dependent variable is also referred to as the 'Y variable' and the independent variables as the 'X variables'. Other names for the dependent variable include the outcome variable and the explained variable. Other names for the independent variables include regressors, predictor variables and explanatory variables." [1]
Right, so we have n equations that look something like this:

y = Σ xi bi + ε      (summing over i = 1, ..., k)

where y is the actual output for a given sample, the xi are its inputs, the bi are the coefficients we're trying to find and ε is the error between our prediction and the actual value.

This can be expressed in matrix form as:

Y = (X' B') + E

where Y is a vector of all the n outputs, X' is the matrix of all the n inputs, B' is all k coefficients and E is a vector of errors.

By adding an extra row to B' and an extra column to X', we can subsume the E vector into these terms to make the equation look neater and easier to handle, thus:

Y = X B

Now comes the clever bit. A matrix can be factored into the product of two matrices, Q and R, in a process called QR decomposition. The interesting thing about this is that R is an upper triangular matrix. Why this is important will become clear later.

QR Factorization

This trick simply states that "a matrix whose columns are v1, v2, ... vn can  be expressed as the product of two matrices: a matrix [Q] whose columns v*1, v*2, ... v*n  are mutually orthogonal and a triangular matrix [R]." [2]

The algorithm goes like this. Take the vectors v1, v2, ... vn one at a time. For each vector, find the component that is orthogonal to all the vectors that have come before. This is done by subtracting from it its projection onto the space spanned by the vectors already processed, and adding the result to the set. Full proofs can be found in [2] but the gist is: imagine shaving off from each vector its projection onto the space defined by the vectors before it.

This is a recursive algorithm which, unwound, expresses each original vector as a combination of the orthogonal vectors found before it:

v1 = σ11 v*1
v2 = σ12 v*1 + σ22 v*2
v3 = σ13 v*1 + σ23 v*2 + σ33 v*3
...
vn = σ1n v*1 + σ2n v*2 + ... + σnn v*n

or, to express it more neatly in matrix form:

| v*11  v*12  v*13  ... | | σ11  σ12  σ13  ... |
| v*21  v*22  v*23  ... | | σ21  σ22  σ23  ... |
| v*31  v*32  v*33  ... | | σ31  σ32  σ33  ... |
...

where v*11, v*21, v*31, ... are the components of v*1 and σ11, σ21, σ31, ... are the components of σ1.

Now, since σ1 is only ever multiplied by v*1 (see the expansion of the recursive algorithm above) then all the elements σi1 must be 0 for i>1.

Similarly, since σ2 is only ever multiplied by v*1 and v*2 (again, see the above expansion) then σi2 must be 0 for i>2 and so on.

So the matrix for σ must be an upper triangular matrix and the matrix product above can better be written as:

| v*11  v*12  v*13  ... | | σ11  σ12  σ13  ... |
| v*21  v*22  v*23  ... | | 0    σ22  σ23  ... |
| v*31  v*32  v*33  ... | | 0    0    σ33  ... |
...

Note that the v* vectors can be chosen to have unit length. This will come in handy later.
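To make that concrete, here's a rough Scala sketch of classical Gram-Schmidt (my own illustration, with vectors as Seq[Double] and no attention paid to numerical stability):

    type Vec = Seq[Double]

    def dot(a: Vec, b: Vec): Double   = (a zip b).map { case (x, y) => x * y }.sum
    def scale(a: Vec, k: Double): Vec = a.map(_ * k)
    def minus(a: Vec, b: Vec): Vec    = (a zip b).map { case (x, y) => x - y }
    def norm(a: Vec): Double          = math.sqrt(dot(a, a))

    /** The orthonormal v* vectors: shave off each vector's projection onto those before it. */
    def gramSchmidt(vs: Seq[Vec]): Seq[Vec] =
      vs.foldLeft(Seq.empty[Vec]) { (done, v) =>
        val residual = done.foldLeft(v) { (acc, q) => minus(acc, scale(q, dot(v, q))) }
        done :+ scale(residual, 1.0 / norm(residual))   // normalise to unit length
      }

    // example: orthonormalise the vectors (1, 1, 0) and (1, 0, 1)
    println(gramSchmidt(Seq(Seq(1.0, 1.0, 0.0), Seq(1.0, 0.0, 1.0))))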

Back to linear regression

Knowing this, we can now take our equation

Y = X B

and note that X can be expressed as Q R

Y = Q R B

pre-multiply both sides by Q⁻¹ (putting it at the beginning of each side, as matrix multiplication is not commutative):

Q⁻¹ Y = Q⁻¹ Q R B

but note that Q⁻¹ Q multiplies every orthogonal column vector of Q by every column vector. Each entry will be 1 when a column is multiplied by itself (they're unit vectors) and 0 otherwise (from the definition of orthogonal vectors). So, the equation becomes:

Q⁻¹ Y = R B = A

because Q⁻¹ Q is the identity matrix (A is just short-hand for Q⁻¹ Y).

Now, we can solve through back-substitution. Since R is an upper triangular matrix, this equation expands (taking the rows from the bottom up) to:

rn,n bn = an

rn-1,n-1 bn-1 + rn-1,n bn = an-1

...

Since we know all the values of A and R (they are derived from the outputs and the v* vectors), we can solve the first equation for bn. We can then substitute this into the second equation and solve it for bn-1, and so on (this is back-substitution).
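As a rough Scala sketch of that back-substitution (my illustration, with R as a Vector of rows and A already computed - not the library's code):

    // Solve R b = a for b, where r is upper triangular.
    // Work from the bottom row up, substituting the b values already found.
    def backSubstitute(r: Vector[Vector[Double]], a: Vector[Double]): Vector[Double] = {
      val n = a.length
      val b = Array.ofDim[Double](n)
      for (i <- (n - 1) to 0 by -1) {
        val alreadyKnown = ((i + 1) until n).map(j => r(i)(j) * b(j)).sum
        b(i) = (a(i) - alreadyKnown) / r(i)(i)
      }
      b.toVector
    }

    // example: R = [[2, 1], [0, 4]], a = [5, 8]  =>  b = [1.5, 2.0]
    println(backSubstitute(Vector(Vector(2.0, 1.0), Vector(0.0, 4.0)), Vector(5.0, 8.0)))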

[1] Statistics in a Nutshell (O'Reilly)

[2] Coding the Matrix - Philip Klein






Saturday, April 23, 2016

Graphs as Matrices - part 2


As we saw in the last post, treating graphs as matrices gives us a lot of tools to analyse them without traversing them.

One handy tool is being able to detect cycles. If we take the adjacency matrix in the last post, we can calculate the number of paths from i to some node j and back to i by taking all the outgoing connections from i and ANDing them with all the incoming connections to i. Since an element of the adjacency matrix is 0 if and only if there is no connection, multiplication acts like an AND operator.

All the outgoing connections for node i are represented by the i-th row of the matrix (that is, Ai,j for each j) and all the incoming connections to node i are represented by the i-th column (that is, Aj,i). Multiplying each outgoing value by the corresponding incoming value and adding them all up (to give us the total number of such round trips) gives:

number of 2-step cycles through i = Σ Ai,j Aj,i      (summing over j = 1, ..., n)

which, if you look at the Wikipedia page for matrix multiplication, is just the i-th diagonal element of A², that is, of the matrix A multiplied by itself.

If we wanted to find the number of cycles taking 3 steps rather than 2, the equation becomes:

number of 3-step cycles through i = Σ Σ Ai,j Aj,k Ak,i      (summing over j = 1, ..., n and k = 1, ..., n)

which is just the i-th diagonal element of A³.

More generally, the off-diagonal elements of the x-th power of A (those at row i, column j with i ≠ j) are also of interest: they give the number of paths from i to j that take exactly x steps.
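If you want to play with this idea away from numpy, the same bookkeeping only takes a few lines of plain Scala (a sketch of mine, with adjacency matrices as Array[Array[Int]]):

    type Matrix = Array[Array[Int]]

    // (m1 m2)(i)(j) = Σk m1(i)(k) * m2(k)(j) - the number of i -> k -> j routes.
    def multiply(m1: Matrix, m2: Matrix): Matrix =
      m1.map { row =>
        m2.head.indices.map { j =>
          row.indices.map(k => row(k) * m2(k)(j)).sum
        }.toArray
      }

    // m raised to the x-th power (x >= 1)
    def power(m: Matrix, x: Int): Matrix = Iterator.iterate(m)(multiply(_, m)).drop(x - 1).next()

    def trace(m: Matrix): Int = m.indices.map(i => m(i)(i)).sum

    // power(a, x)(i)(j) is the number of paths of length x from i to j,
    // and trace(power(a, x)) counts the closed walks (cycles) of length x.
    val cycle3: Matrix = Array(Array(0, 1, 0), Array(0, 0, 1), Array(1, 0, 0))
    println(trace(power(cycle3, 3)))   // 3 - the same cycle counted once from each starting node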

Now, taking our matrix representing an acyclic graph, let's keep multiplying it by itself (in Python):

>>> B = re_arranged_rows_and_cols
>>> np.dot(B, B)
matrix([[0, 0, 1, 0, 2, 0, 0, 1, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 2, 1, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]])
>>> np.dot(B, np.dot(B, B))
matrix([[0, 0, 0, 0, 0, 0, 0, 0, 3, 1, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]])
>>> np.dot(B, np.dot(B, np.dot(B, B)))
matrix([[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]])
>>> np.dot(B, np.dot(B, np.dot(B, np.dot(B, B))))
matrix([[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]])
>>> np.dot(B, np.dot(B, np.dot(B, np.dot(B, np.dot(B, B)))))
matrix([[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]])
>>> 

Ah. Because it's acyclic, there is a limit to how many steps we can take (five, it seems: the sixth power is the first that is all zeros). It's not too surprising if you look at the graph and try to find the longest path possible by eye:

G = nx.from_numpy_matrix(A, create_using=nx.MultiDiGraph())
nx.draw(G, with_labels=True)
plt.show()



(The thick end of the connection indicates its direction like an arrow).

Now, let's introduce an edge from 2 back to 0, creating a cycle:

>>> B[2,0] = 1
>>> B
matrix([[0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 1, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0],
        [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]])

so the graph now looks like:
We also note that the eigenvalues are no longer all 0 (as predicted by my previous post).

>>> eig(B)[0]
array([-0.5+0.8660254j, -0.5-0.8660254j,  1.0+0.j       ,  0.0+0.j       ,
        0.0+0.j       ,  0.0+0.j       ,  0.0+0.j       ,  0.0+0.j       ,
        0.0+0.j       ,  0.0+0.j       ,  0.0+0.j       ,  0.0+0.j       ,
        0.0+0.j       ])

The number of cycles of a given length can be calculated from the trace of the matrix resulting from repeated multiplication:

>>> B.trace()
matrix([[0]])
>>> np.dot(B, B).trace()
matrix([[0]])
>>> np.dot(B, np.dot(B, B)).trace()
matrix([[3]])

There is one caveat. "Note that this expression counts separately loops consisting of the same vertices in the same order but with different starting points. Thus the loop 1 -> 2 -> 3 -> 1 is considered different from the loop 2 -> 3 -> 1 -> 2." (Networks - An Introduction, Newman).

Looking at the matrix tells you who is in a cycle (see the diagonals). For instance, this is what the matrix looks like after 3 hops:

>>> np.dot(B, np.dot(B, B))
matrix([[1, 0, 0, 0, 0, 0, 0, 0, 3, 1, 0, 0, 0],
        [0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 1],
        [0, 0, 1, 0, 2, 0, 0, 1, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]])

Note that the element at row 0, column 8 has the value 3 and, if you look at the picture above, you can see that there are indeed 3 different ways to get from 0 to 8 in 3 hops.

Friday, April 22, 2016

Graphs as Matrices - part 1


Graphs are typically represented as adjacency lists. But if they're represented as matrices, you get all the juicy goodness of linear algebra.

What does this mean? Well, for example, let's represent this directed graph as a matrix.

Digraph from Sedgewick's "Algorithms".
Note that it has no cycles, so we can represent it like this:

The same digraph.
We can represent this as an n x n matrix, A, where Ai,j is 1 when i is linked to j and 0 otherwise. Since there are no self-loops, Ai,i = 0.

In Python, it would look like this:

import numpy as np
from numpy.linalg import eig 
from scipy.linalg import lu  
import networkx as nx  
import sympy

A = np.matrix(  # 0  1  2  3  4  5  6  7  8  9 10 11 12
              '0  1  0  0  0  1  1  0  0  0  0  0  0;'  # 0
              '0  0  0  0  0  0  0  0  0  0  0  0  0;'  # 1
              '1  0  0  1  0  0  0  0  0  0  0  0  0;'  # 2
              '0  0  0  0  0  1  0  0  0  0  0  0  0;'  # 3
              '0  0  0  0  0  0  0  0  0  0  0  0  0;'  # 4
              '0  0  0  0  1  0  0  0  0  0  0  0  0;'  # 5
              '0  0  0  0  1  0  0  0  0  1  0  0  0;'  # 6
              '0  0  0  0  0  0  1  0  0  0  0  0  0;'  # 7
              '0  0  0  0  0  0  0  1  0  0  0  0  0;'  # 8
              '0  0  0  0  0  0  0  0  0  0  1  1  1;'  # 9
              '0  0  0  0  0  0  0  0  0  0  0  0  0;'  # 10
              '0  0  0  0  0  0  0  0  0  0  0  0  1;'  # 11
              '0  0  0  0  0  0  0  0  0  0  0  0  0'   # 12
)

It's worth noting that the eigenvalues of this matrix are all 0:

print "eigenvalues = ", eig(A)[0] #  [ 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.]

This is always true of adjacency matrices of acyclic graphs. I'll explain why.

Firstly, if we re-label the nodes, we can change the order of the rows and columns such that it becomes an upper triangular matrix. Given an order that I calculated elsewhere:

newOrder = [2, 0, 1, 3, 5, 8, 7, 6, 4, 9, 10, 11, 12] 
re_arranged_cols = A[newOrder, :]
re_arranged_rows_and_cols = re_arranged_cols[:, newOrder]

we get a matrix that looks like this:

[[0 1 0 1 0 0 0 0 0 0 0 0 0]
 [0 0 1 0 1 0 0 1 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 1 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 1 0 0 0 0]
 [0 0 0 0 0 0 1 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 1 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 1 1 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 1 1 1]
 [0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 1]
 [0 0 0 0 0 0 0 0 0 0 0 0 0]]

which also has all zero eigenvalues:

print "eigenvalues for this new matrix: ", eig(re_arranged_rows_and_cols)[0]<-- all="" as="" expected="" font="" zeroes="">

Notice that with this new choice of basis, the matrix has all the connections (that is, the 1 values) above the diagonal and the diagonal values themselves are all zeros (since even with this re-ordering there are still no self-loops).

Triangular Matrices

Now, an interesting thing about upper triangular matrices is that their diagonal elements are their eigenvalues. The reasoning goes like this:

1. The elements of an upper triangular matrix, aij, are necessarily 0 if i > j (where i and j are the row and column indexes). That is:

aij = 0 ∀ { i, j | i > j }

2. A formula for calculating the determinant is this one from Leibniz:

det(A) = Σσ∈Sn sgn(σ) Πi=1..n aσ(i),i


where


  • n is the order of the square matrix (2 if it's 2x2; 3 if it's 3x3 ...)
  • σ is a permutation of n integers in the permutation group Sn.
  • sgn(σ) is 1 or -1 depending on the parity of σ (whether it is an even or an odd permutation). Ignore this. It will disappear.
  • and σ(i) is the i-th integer in σ

Take a look at the rightmost part of (2): the product multiplies n elements of the matrix, each of the form aσ(i),i. But remember from (1) that such an element is 0 whenever σ(i) > i, so the whole product is 0 if any σ(i) is greater than i. If you think about it, there is only one permutation for which this never happens, namely the identity [1, 2, 3, ... n]. So, for a triangular matrix,
det(A) = Πi=1..n ai,i = a1,1 a2,2 ... an,n

that is, the determinant is the product of the diagonals.
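As a quick sanity check (a sketch of mine in Scala), a naive Laplace-expansion determinant agrees with the product of the diagonal for a small upper triangular matrix:

    // Naive determinant by Laplace expansion along the first row (fine for tiny matrices).
    def det(m: Vector[Vector[Double]]): Double =
      if (m.length == 1) m(0)(0)
      else m(0).indices.map { j =>
        val minor = m.tail.map(row => row.patch(j, Nil, 1))   // drop row 0 and column j
        math.pow(-1, j) * m(0)(j) * det(minor)
      }.sum

    val upperTriangular = Vector(
      Vector(2.0, 7.0, 1.0),
      Vector(0.0, 3.0, 5.0),
      Vector(0.0, 0.0, 4.0)
    )

    println(det(upperTriangular))                                            // 24.0
    println(upperTriangular.indices.map(i => upperTriangular(i)(i)).product) // 24.0 - the diagonal product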

Given that you calculate the eigenvalues from the characteristic equation:

det(A - λ I) = 0

(where I is the identity matrix) then the eigenvalues are given by:

(λ - a1,1) . (λ - a2,2) . (λ - a3,3) ... (λ - an,n)  = 0

The solutions of this equation - the eigenvalues λ - are therefore exactly the diagonal elements. Since the diagonals of a triangular matrix representing an acyclic graph are all 0, all its eigenvalues are 0.

Conclusion

We've shown that just by looking at the adjacency matrix (and not tediously exploring the graph) we can tell whether the graph is cyclic or acyclic. This is done by calculating the eigenvalues which is simple given any decent maths library.

Friday, April 1, 2016

The Big Data Landscape


Having praised the speed of GraphX, it now seems I may have to eat my words. When I tried to use its method for finding strongly connected components (SCCs), it was incredibly slow. The method takes a parameter for how many iterations it is to run for. An increasing number found more SCCs but with ever-increasing times (22s, 62s, 194s, 554s, 1529s, ... on my admittedly naff laptop). An off-the-shelf library performed a perfect calculation in sub-second time.

It appears I'm not the only one to suffer:
"We conducted some tests for using Graphx to solve the connected-components problem and were disappointed.  On 8 nodes of 16GB each, we could not get above 100M edges.  On 8 nodes of 60GB each, we could not process 1bn edges.  RDD serialization would take excessive time and then we would get failures.  By contrast, we have a C++ algorithm that solves 1bn edges using memory+disk on a single 16GB node in about an hour.  I think that a very large cluster will do better, but we did not explore that."

Alternatives

So, over the Easter holidays, I had a quick look for what else is out there.

Giraph: "Apache Giraph is an iterative graph processing system built for high scalability. For example, it is currently used at Facebook to analyze the social graph formed by users and their connections."

It has an SCC calculator here.

Hama: "It provides not only pure BSP programming model but also vertex and neuron centric programming models, inspired by Google's Pregel and DistBelief."

Pregelix: "Pregelix is an open-source implementation of the bulk-synchronous vertex-oriented programming model (Pregel API) for large-scale graph analytics."

A comparison between their performance (and GraphX's) can be found here.

Incidentally, there is code to test GraphX's ability to find SCCs here and libraries for various graph analysis algorithms here (but sadly nothing that gives a fast SCC algorithm).


In-memory graph analysis libraries

My graph should (just about) fit into memory on a big, big box (more than 100GB of memory), so I looked at what was out there.

Jung: "is a software library that provides a common and extendible language for the modeling, analysis, and visualization of data that can be represented as a graph or network."

Signal/Collect: "The intuition behind the Signal/Collect programming model is that computations are executed on a graph, where the vertices are the computational units that interact by the means of signals that flow along the edges. This is similar to the actor programming model. All computations in the vertices are accomplished by collecting the incoming signals and then signaling the neighbors in the graph."

JGraphT: "JGraphT is a free Java graph library that provides mathematical graph-theory objects and algorithms."

Boost: This library is written in C++ but is open source and well established.


Parallelism?

The problem is that although my graph can just about fit into memory, processing it will be slow if the libraries are single-threaded. Only Boost offered a parallel algorithm for calculating SCCs.

My implementation using JGraphT was great for relatively small graphs but when the number of vertices was roughly one million, I was looking at about 10 minutes to calculate the SCCs. When the number of vertices was 4 million, it took an hour.

"[T]here is currently no library for parallel graph algorithms in Java available" as of 2011 (see here). They are well established (see here) but it appears that nobody has written one for the JVM. If they did, they'd have to take into account the most memory-efficient Scala and Java data structures to represent the graph if there was a hope of fitting large ones into memory.

Monday, March 21, 2016

Little differences and Big Data


Typically, a small amount of code is executed against a large amount of data in Spark and friends. Because of this, your code needs to be efficient. I've just come across code that was taking more time to read and parse the data than to actually do any computation on it!

Don't be lazy

The problem showed up when I looked at the threads of the cluster's executors (you don't need jstack to do this; you can do it from the Spark GUI). Most executor threads were BLOCKED trying to access a lazy val while only one thread was RUNNABLE inside it. Oops - lazy vals use synchronization under the covers (see here for why lazy can be expensive).
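Roughly speaking, every access to a lazy val goes through something like this hand-written equivalent (a sketch only - the code scalac actually generates uses a volatile bitmap field, but the locking is the same idea):

    class Parser {
      // A lazy val compiles to roughly this double-checked locking pattern.
      @volatile private var initialised    = false
      private var cached: Map[String, Int] = _

      private def loadHugeLookupTable(): Map[String, Int] = Map("the" -> 1)  // stand-in for the expensive work

      def expensive: Map[String, Int] = {
        if (!initialised) {
          this.synchronized {            // every thread's first access contends on this lock
            if (!initialised) {
              cached      = loadHugeLookupTable()
              initialised = true
            }
          }
        }
        cached
      }
    }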

Not always obvious

Another innocuous looking piece of code might be this:

    private static final Random random = new java.util.Random();

    static long[] generateRandom(int num, long maxExclusive) {
        long    bitMask = randomBitMask(maxExclusive);
        int     index   = 0;
        long    count   = 0;
        long[]  results = new long[num];

        // XORing an incrementing counter with a fixed mask never produces the same
        // value twice, so the candidates are unique; we just skip any >= maxExclusive.
        while (index < num) {
            long nextVal = count ^ bitMask;
            if (nextVal < maxExclusive) {
                results[index] = nextVal;
                index++;
            }
            count++;
        }
        return results;
    }

    private static long randomBitMask(long maxExclusive) {
        // a random mask restricted to the bits needed to cover maxExclusive
        long seed = random.nextLong();
        return seed & createMask(maxExclusive);
    }

    private static long createMask(long highest) {
        // ones in every bit position up to and including the highest set bit of 'highest'
        long leftMost = Long.highestOneBit(highest);
        return (leftMost << 1) - 1;
    }


This is an attempt at implementing a non-repeating list of random numbers using a Linear Feedback Shift Register.

Basically, I wanted a list of (semi-)random numbers that don't repeat themselves. If you want only a few from a large set, you might just pick a random one and add it to the list, re-trying if you've seen it before. This is fine, but if you want lots of numbers out of a set only slightly larger (that is, when num is comparable to maxExclusive), you'll be doing a lot of retrying. This is where the code above comes in. Unfortunately, it is considerably slower than the "try again" approach when num is a good deal less than maxExclusive.
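For reference, the "try again" approach used as the baseline in the benchmark below (uniqueRandomNumbersSmallNumLargeMax) is along these lines - a sketch in Scala rather than the Java I actually benchmarked:

    import scala.collection.mutable
    import scala.util.Random

    // Draw, and re-draw on a collision. Fast when num << maxExclusive because
    // collisions are rare; it degenerates as num approaches maxExclusive.
    def uniqueRandoms(num: Int, maxExclusive: Long): Seq[Long] = {
      val seen = mutable.LinkedHashSet.empty[Long]
      while (seen.size < num) {
        val candidate = (Random.nextDouble() * maxExclusive).toLong
        seen += candidate            // a duplicate is simply ignored and we try again
      }
      seen.toSeq
    }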

So, benchmarking with this in the pom.xml:

        <dependency>
            <groupId>org.openjdk.jmh</groupId>
            <artifactId>jmh-core</artifactId>
            <version>${jmh.version}</version>
        </dependency>
        <dependency>
            <groupId>org.openjdk.jmh</groupId>
            <artifactId>jmh-generator-annprocess</artifactId>
            <version>${jmh.version}</version>
        </dependency>


(where jmh.version is 1.11.3) and something in a src/test/java that looks something like this:

    public static final int LARGE = 100000000;

    /**
     Result "javaRandomNumbersBenchmark_LARGE":
     198.036 ±(99.9%) 37.176 ops/s [Average]
     (min, avg, max) = (123.962, 198.036, 290.256), stdev = 42.812
     CI (99.9%): [160.861, 235.212] (assumes normal distribution)

     Benchmark                                               Mode  Cnt    Score    Error  Units
     RandomStatsBenchmark.javaRandomNumbersBenchmark_LARGE  thrpt   20  197.703 ± 36.994  ops/s

     */
    @Benchmark
    @Fork(1)
    public void javaRandomNumbersBenchmark_LARGE() {
        JavaRandomStats.generateRandom(10, LARGE);
    }

    /**
     Result "simpleUniqueRandomNumbersBenchmark_LARGE":
     3103855.467 ±(99.9%) 24691.158 ops/s [Average]
     (min, avg, max) = (2845502.900, 3103855.467, 3277692.295), stdev = 104543.910
     CI (99.9%): [3079164.308, 3128546.625] (assumes normal distribution)
     */
    @Benchmark
    @Fork(1)
    public void simpleUniqueRandomNumbersBenchmark_LARGE() {
        uniqueRandomNumbersSmallNumLargeMax(10, LARGE);
    }

I got the results you see in the comments. I'm not currently sure why my attempt at optimisation is such a performance killer.

More contention

Spark's broadcast variables are an efficient way for executors to access read-only data without repeatedly pulling it over the network. However, in the documentation examples, you'll see code like this:

records mapPartitions { rowIter =>
  rowIter map { case (record) =>
    val x = broadcastX.value
    // ... do something with x
  }
}

But I noticed a lot of contention when reading the value deep down in TorrentBroadcast.readBroadcastBlock (version 1.3.1). Re-writing the code slightly to:

records mapPartitions { rowIter =>
  val x = broadcastX.value
  rowIter map { case (record) =>
    // ... do something with x
  }
}

meant the executor threads suffered far less contention as the broadcast variable is only read once per partition rather than once per record. Doing this saved another 10% or so of the time.