Saturday, April 23, 2016

Graphs as Matrices - part 2


As we saw in the last post, treating graphs as matrices gives us a lot of tools to analyse them without traversing them.

One handy tool is being able to detect cycles. If we take the adjacency matrix from the last post, we can calculate the number of two-step paths from i out to some j and back to i by taking all the outgoing connections from i and ANDing them with all the incoming connections to i. Since an element of the adjacency matrix is 0 if and only if there is no connection, multiplication acts like an AND operator.

All the outgoing connections from node i are represented by the i-th row of the matrix (that is, A_{i,j}) and all the incoming connections to node i are represented by the i-th column (that is, A_{j,i}). Multiplying each outgoing value by the corresponding incoming one and adding them all up (to give us the total number of round trips) is:

number of 2-step cycles through i = Σ_{j=1}^{n} a_{i,j} a_{j,i}

which, if you look at the Wikipedia page for matrix multiplication, is just the i-th diagonal element of the matrix A multiplied by itself, that is, of A².

If we wanted to find the number of cycles taking 3 steps rather than 2, the equation becomes:

number of 3-step cycles through i = Σ_{j=1}^{n} Σ_{k=1}^{n} a_{i,j} a_{j,k} a_{k,i}

which is just the i-th diagonal element of A³.

More generally, the off-diagonal values ((A^x)_{i,j} where i ≠ j) are also of interest. They give the number of paths of length x from i to j.
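As a quick sanity check, here is a tiny, self-contained example (a hypothetical three-node graph, not the one from the last post) showing both facts: the off-diagonal entries of A^x count the x-step paths and the diagonal entries count the x-step cycles.

import numpy as np

# A hypothetical three-node graph: 0 -> 1, 1 -> 2, 2 -> 0 (a single directed 3-cycle).
A = np.array([[0, 1, 0],
              [0, 0, 1],
              [1, 0, 0]])

A2 = np.dot(A, A)   # entry (i, j) counts the 2-step paths from i to j
A3 = np.dot(A2, A)  # entry (i, i) counts the 3-step cycles through i

print(A2[0, 2])     # 1: the single 2-step path 0 -> 1 -> 2
print(np.diag(A3))  # [1 1 1]: every node sits on the one 3-cycle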

Now, taking our matrix representing an acyclic graph, let's keep multiplying it by itself (in Python):

>>> B = re_arranged_rows_and_cols
>>> np.dot(B, B)
matrix([[0, 0, 1, 0, 2, 0, 0, 1, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 2, 1, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]])
>>> np.dot(B, np.dot(B, B))
matrix([[0, 0, 0, 0, 0, 0, 0, 0, 3, 1, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]])
>>> np.dot(B, np.dot(B, np.dot(B, B)))
matrix([[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]])
>>> np.dot(B, np.dot(B, np.dot(B, np.dot(B, B))))
matrix([[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]])
>>> np.dot(B, np.dot(B, np.dot(B, np.dot(B, np.dot(B, B)))))
matrix([[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]])
>>> 

Ah. Because it's acyclic, there is a limit to how many steps we can take (5, apparently: B⁵ still has a couple of non-zero entries but B⁶ is all zeros). It's not too surprising if you look at the graph and try to find the longest path possible by eye:

import matplotlib.pyplot as plt

G = nx.from_numpy_matrix(A, create_using=nx.MultiDiGraph())
nx.draw(G, with_labels=True)
plt.show()



(The thick end of the connection indicates its direction like an arrow).
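As an aside, this gives a brute-force way of reading the length of the longest path in a DAG straight off its adjacency matrix: keep multiplying until you reach the zero matrix. A rough sketch (my own helper; it would loop forever on a cyclic graph):

import numpy as np

def longest_path_length(adj):
    # For an acyclic graph, A^k is zero exactly when k exceeds the longest path,
    # so count the multiplications needed to reach the zero matrix.
    adj = np.array(adj)
    power = adj
    steps = 0
    while power.any():
        power = np.dot(power, adj)
        steps += 1
    return steps

# longest_path_length(B) should give 5 for the acyclic matrix above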

Now, let's introduce an edge from 2 back to 0, creating a cycle:

>>> B[2,0] = 1
>>> B
matrix([[0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 1, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0],
        [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]])

so the graph now looks like:
We also note that the eigenvalues are no longer all 0 (as predicted by my previous post).

>>> eig(B)[0]
array([-0.5+0.8660254j, -0.5-0.8660254j,  1.0+0.j       ,  0.0+0.j       ,
        0.0+0.j       ,  0.0+0.j       ,  0.0+0.j       ,  0.0+0.j       ,
        0.0+0.j       ,  0.0+0.j       ,  0.0+0.j       ,  0.0+0.j       ,
        0.0+0.j       ])

The total number of cycles of a given length can be calculated from the trace of the matrix resulting from repeated multiplication:

>>> B.trace()
matrix([[0]])
>>> np.dot(B, B).trace()
matrix([[0]])
>>> np.dot(B, np.dot(B, B)).trace()
matrix([[3]])

There is one caveat. "Note that this expression counts separately loops consisting of the same vertices in the same order but with different starting points. Thus the loop 1 -> 2 -> 3 -> 1 is considered different from the loop 2 -> 3 -> 1 -> 2." (Networks - An Introduction, Newman).
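So if you want the number of distinct triangles rather than the number of closed walks, you can divide the trace of the cube by three, since each 3-cycle is counted once per starting vertex. A small sketch (my own helper, for illustration):

import numpy as np

def count_directed_triangles(adj):
    # With no self-loops, every closed walk of length 3 is a genuine 3-cycle,
    # and trace(A^3) counts each one three times (once per starting vertex).
    A = np.array(adj)
    return int(np.trace(np.dot(A, np.dot(A, A)))) // 3

# count_directed_triangles(B) should give 1: the single cycle 0 -> 1 -> 2 -> 0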

Looking at the matrix also tells you who is in a cycle (see the diagonal). For instance, this is what the matrix after 3 hops looks like:

>>> np.dot(B, np.dot(B, B))
matrix([[1, 0, 0, 0, 0, 0, 0, 0, 3, 1, 0, 0, 0],
        [0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 1],
        [0, 0, 1, 0, 2, 0, 0, 1, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]])

Note that (B³)₀,₈ has the value 3 and, if you look at the picture above, you can see that there are indeed 3 different ways to get from 0 to 8 in 3 hops.

Friday, April 22, 2016

Graphs as Matrices - part 1


Graphs are typically represented as adjacency lists. But if they're represented as matrices, you get all the juicy goodness of linear algebra.

What does this mean? Well, for example, let's represent this directed graph as a matrix.

Digraph from Sedgewick's "Algorithms".
Note that it has no cycles, so we can represent it like this:

The same digraph.
We can represent this as an n x n matrix, A, where A_{i,j} is 1 when i is linked to j and 0 otherwise. Since there are no self-loops, A_{i,i} = 0.

In Python, it would look like this:

import numpy as np
from numpy.linalg import eig 
from scipy.linalg import lu  
import networkx as nx  
import sympy

A = np.matrix(  # 0  1  2  3  4  5  6  7  8  9 10 11 12
              '0  1  0  0  0  1  1  0  0  0  0  0  0;'  # 0
              '0  0  0  0  0  0  0  0  0  0  0  0  0;'  # 1
              '1  0  0  1  0  0  0  0  0  0  0  0  0;'  # 2
              '0  0  0  0  0  1  0  0  0  0  0  0  0;'  # 3
              '0  0  0  0  0  0  0  0  0  0  0  0  0;'  # 4
              '0  0  0  0  1  0  0  0  0  0  0  0  0;'  # 5
              '0  0  0  0  1  0  0  0  0  1  0  0  0;'  # 6
              '0  0  0  0  0  0  1  0  0  0  0  0  0;'  # 7
              '0  0  0  0  0  0  0  1  0  0  0  0  0;'  # 8
              '0  0  0  0  0  0  0  0  0  0  1  1  1;'  # 9
              '0  0  0  0  0  0  0  0  0  0  0  0  0;'  # 10
              '0  0  0  0  0  0  0  0  0  0  0  0  1;'  # 11
              '0  0  0  0  0  0  0  0  0  0  0  0  0'   # 12
)

It's worth noting that the eigenvalues of this matrix are all 0:

print "eigenvalues = ", eig(A)[0] #  [ 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.]

This is always true of adjacency matrices of acyclic graphs. I'll explain why.

Firstly, if we re-label the nodes, we can change the order of the rows and columns so that the matrix becomes upper triangular. Given an order that I calculated elsewhere (it's a topological ordering of the graph):

newOrder = [2, 0, 1, 3, 5, 8, 7, 6, 4, 9, 10, 11, 12]
re_arranged_rows = A[newOrder, :]                          # re-order the rows...
re_arranged_rows_and_cols = re_arranged_rows[:, newOrder]  # ...then the columns

we get a matrix that looks like this:

[[0 1 0 1 0 0 0 0 0 0 0 0 0]
 [0 0 1 0 1 0 0 1 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 1 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 1 0 0 0 0]
 [0 0 0 0 0 0 1 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 1 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 1 1 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 1 1 1]
 [0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 1]
 [0 0 0 0 0 0 0 0 0 0 0 0 0]]

which also has all zero eigenvalues:

print "eigenvalues for this new matrix: ", eig(re_arranged_rows_and_cols)[0]<-- all="" as="" expected="" font="" zeroes="">

Notice that with this new choice of basis, the matrix has all the connections (that is, the 1 values) above the diagonal and the diagonal values themselves are all zeros (since even with this re-ordering there are still no self-loops).
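Since the order above is a topological ordering, you don't actually have to hard-code it; networkx can compute one for you. A sketch, reusing the imports and the matrix A from above:

# a topological ordering puts every edge's source before its destination,
# which is exactly what makes the re-ordered adjacency matrix upper triangular
G = nx.from_numpy_matrix(A, create_using=nx.DiGraph())
newOrder = list(nx.topological_sort(G))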

Triangular Matrices

Now, an interesting thing about upper triangular matrices is that their diagonal entries are their eigenvalues. The reasoning follows this recipe:

1. The elements of an upper triangular matrix, a_{i,j}, are necessarily 0 if i > j (where i and j are the row and column indexes), that is, everything below the diagonal is zero:

a_{i,j} = 0 ∀ {i, j | i > j}

2. A formula for calculating the determinants is this one from Leibniz:

det(A) = Σ_{σ ∈ S_n} sgn(σ) Π_{i=1}^{n} a_{σ(i),i}


where


  • n is the order of the square matrix (2 if it's 2x2; 3 if it's 3x3 ...)
  • σ is a permutation of n integers in the permutation group Sn.
  • sgn(σ) is 1 or -1 depending on the parity of σ. Ignore this. It will disappear.
  • and σ(i) is the i-th integer in σ

Take a look at the rightmost part of (2). It multiplies n elements of the matrix, each given by a_{σ(i),i}. But remember from (1) that the product is 0 if any σ(i) is greater than i. If you think about it, there is only one permutation for which this never happens, namely the identity permutation [1, 2, 3, ..., n]. So, for a triangular matrix,
det(A) = Π_{i=1}^{n} a_{i,i}

that is, the determinant is the product of the diagonals.

Given that you calculate the eigenvalues from the characteristic equation:

det(A - λ I) = 0

(where I is the identity matrix) then the eigenvalues are given by:

(λ - a_{1,1}) (λ - a_{2,2}) (λ - a_{3,3}) ... (λ - a_{n,n}) = 0

The roots of this equation, that is the eigenvalues, are therefore exactly the diagonal elements. Since the diagonal of a triangular matrix representing an acyclic graph is all zeros, all its eigenvalues are 0.
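You can convince yourself of this numerically: take an arbitrary upper triangular matrix and its eigenvalues come out as its diagonal entries. A quick sketch:

import numpy as np
from numpy.linalg import eig

T = np.array([[2.0, 5.0, 1.0],
              [0.0, 3.0, 4.0],
              [0.0, 0.0, 7.0]])
# the eigenvalues of an upper triangular matrix are just its diagonal entries
print(np.allclose(sorted(eig(T)[0]), sorted(np.diag(T))))  # True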

Conclusion

We've shown that just by looking at the adjacency matrix (and not tediously exploring the graph) we can tell whether the graph is cyclic or acyclic. This is done by calculating the eigenvalues which is simple given any decent maths library.
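If you wanted to wrap the whole idea up, a minimal sketch of such a check (my own helper, not from any library) might be:

import numpy as np

def is_acyclic(adj):
    # A directed graph is acyclic iff every eigenvalue of its adjacency matrix
    # is zero (i.e. the matrix is nilpotent). Eigenvalues of nilpotent matrices
    # are numerically touchy, so use a fairly loose tolerance.
    eigenvalues = np.linalg.eigvals(np.array(adj, dtype=float))
    return bool(np.allclose(eigenvalues, 0, atol=1e-6))

# is_acyclic(A) should be True for the DAG above, and False once an edge
# from 2 back to 0 is added.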

Friday, April 1, 2016

The Big Data Landscape


Having praised the speed of GraphX, it now seems I may have to eat my words. When I tried to use its method for finding strongly connected components (SCCs), it was incredibly slow. The method takes a parameter for how many iterations to run. An increasing number found more SCCs but with ever-increasing times (22s, 62s, 194s, 554s, 1529s, ... on my admittedly naff laptop). An off-the-shelf library performed the complete calculation in sub-second time.

It appears I'm not the only one to suffer:
"We conducted some tests for using Graphx to solve the connected-components problem and were disappointed.  On 8 nodes of 16GB each, we could not get above 100M edges.  On 8 nodes of 60GB each, we could not process 1bn edges.  RDD serialization would take excessive time and then we would get failures.  By contrast, we have a C++ algorithm that solves 1bn edges using memory+disk on a single 16GB node in about an hour.  I think that a very large cluster will do better, but we did not explore that."

Alternatives

So, over the Easter holidays, I had a quick look for what else is out there.

Giraph: "Apache Giraph is an iterative graph processing system built for high scalability. For example, it is currently used at Facebook to analyze the social graph formed by users and their connections."

It has an SCC calculator here.

Hama: "It provides not only pure BSP programming model but also vertex and neuron centric programming models, inspired by Google's Pregel and DistBelief."

Pregelix: "Pregelix is an open-source implementation of the bulk-synchronous vertex-oriented programming model (Pregel API) for large-scale graph analytics."

A comparison between their performance (and GraphX's) can be found here.

Incidentally, there is code to test GraphX's ability to find SCCs here and libraries for various graph analysis algorithms here (but sadly nothing that gives a fast SCC algorithm).


In-memory graph analysis libraries

My graph should (just about) fit into memory on a big, big box (more than 100gb of memory), so I looked at what was out there.

Jung: "is a software library that provides a common and extendible language for the modeling, analysis, and visualization of data that can be represented as a graph or network."

Signal/Collect: "The intuition behind the Signal/Collect programming model is that computations are executed on a graph, where the vertices are the computational units that interact by the means of signals that flow along the edges. This is similar to the actor programming model. All computations in the vertices are accomplished by collecting the incoming signals and then signaling the neighbors in the graph."

JGraphT: "JGraphT is a free Java graph library that provides mathematical graph-theory objects and algorithms."

Boost: This library is written in C++ but is open source and well established.


Parallelism?

The problem is that although my graph can just about fit into memory, processing it will be slow if the libraries are single-threaded. Only Boost offered a parallel algorithm for calculating SCCs.

My implementation using JGraphT was great for relatively small graphs but when the number of vertices was roughly one million, I was looking at about 10 minutes to calculate the SCCs. When the number of vertices was 4 million, it took an hour.

"[T]here is currently no library for parallel graph algorithms in Java available" as of 2011 (see here). They are well established (see here) but it appears that nobody has written one for the JVM. If they did, they'd have to take into account the most memory-efficient Scala and Java data structures to represent the graph if there was a hope of fitting large ones into memory.

Monday, March 21, 2016

Little differences and Big Data


Typically, a small amount of code is executed against a large amount of data in Spark and the like. Because of this, your code needs to be efficient. I've just come across code that was taking more time to read and parse the data than to actually do any computations on it!

Don't be lazy

The problem showed up when I looked at the threads of the cluster's executors (you don't need jstack to do this; you can do it from the Spark GUI). Most executor threads were BLOCKED on accessing a lazy val while only one thread at a time was RUNNABLE inside it. Oops - lazy vals use synchronization under the covers (see here for why lazy can be expensive).

Not always obvious

Another innocuous-looking piece of code might be this:

    private static final Random random = new java.util.Random();

    // Generate 'num' distinct pseudo-random values in [0, maxExclusive) by XORing
    // a counter with a random bit mask (XOR with a fixed mask is a bijection,
    // so the values never repeat).
    static long[] generateRandom(int num, long maxExclusive) {
        long    bitMask = randomBitMask(maxExclusive);
        int     index   = 0;
        long    count   = 0;
        long[]  results = new long[num];

        while (index < num) {
            long nextVal = count ^ bitMask;
            if (nextVal < maxExclusive) {   // reject values that fall outside the range
                results[index] = nextVal;
                index++;
            }
            count++;
        }
        return results;
    }

    private static long randomBitMask(long maxExclusive) {
        long seed = random.nextLong();
        return seed & createMask(maxExclusive);
    }

    // A mask of 1s covering every bit up to and including the highest bit set in 'highest'.
    private static long createMask(long highest) {
        long leftMost = Long.highestOneBit(highest);
        return (leftMost << 1) - 1;
    }


This is an attempt at generating a non-repeating list of random numbers, loosely inspired by a Linear Feedback Shift Register.

Basically, I wanted a list of (semi-)random numbers that don't repeat themselves. If you want only a few from a large range, you might just pick a random one and add it to the list, retrying if you've seen it before. This is fine, but if you want lots of numbers out of a range only slightly larger (that is, when num is comparable to maxExclusive), you'll be doing a lot of retrying. This is where the code above comes in. Unfortunately, it is considerably slower than the "try again" approach when num is a good deal less than maxExclusive.
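For reference, the "try again" approach is only a few lines. Here it is sketched in Python rather than the Java above, purely to illustrate the idea:

import random

def unique_random_numbers(num, max_exclusive):
    # Keep drawing until we have enough distinct values. Fast when num is
    # much smaller than max_exclusive, painful as num approaches it.
    seen = set()
    while len(seen) < num:
        seen.add(random.randrange(max_exclusive))
    return list(seen)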

So, benchmarking with this in the pom.xml:

        <dependency>
            <groupId>org.openjdk.jmh</groupId>
            <artifactId>jmh-core</artifactId>
            <version>${jmh.version}</version>
        </dependency>
        <dependency>
            <groupId>org.openjdk.jmh</groupId>
            <artifactId>jmh-generator-annprocess</artifactId>
            <version>${jmh.version}</version>
        </dependency>


(where jmh.version is 1.11.3) and something in a src/test/java that looks something like this:

    public static final int LARGE = 100000000;

    /**
     Result "javaRandomNumbersBenchmark_LARGE":
     198.036 ±(99.9%) 37.176 ops/s [Average]
     (min, avg, max) = (123.962, 198.036, 290.256), stdev = 42.812
     CI (99.9%): [160.861, 235.212] (assumes normal distribution)

     Benchmark                                               Mode  Cnt    Score    Error  Units
     RandomStatsBenchmark.javaRandomNumbersBenchmark_LARGE  thrpt   20  197.703 ± 36.994  ops/s

     */
    @Benchmark
    @Fork(1)
    public void javaRandomNumbersBenchmark_LARGE() {
        JavaRandomStats.generateRandom(10, LARGE);
    }

    /**
     Result "simpleUniqueRandomNumbersBenchmark_LARGE":
     3103855.467 ±(99.9%) 24691.158 ops/s [Average]
     (min, avg, max) = (2845502.900, 3103855.467, 3277692.295), stdev = 104543.910
     CI (99.9%): [3079164.308, 3128546.625] (assumes normal distribution)
     */
    @Benchmark
    @Fork(1)
    public void simpleUniqueRandomNumbersBenchmark_LARGE() {
        uniqueRandomNumbersSmallNumLargeMax(10, LARGE);
    }

I got the results you see in the comments. I'm not currently sure why my attempt at optimisation is such a performance killer.

More contention

Spark's Broadcast variables are an efficient way to access read-only data without making network calls. However, in the documentation examples, you'll see code like this:

records mapPartitions { rowIter =>
  rowIter map { case (record) =>
    val x = broadcastX.value
    // ... do something with x
  }
}

But I noticed a lot of contention when reading the value deep down in TorrentBroadcast.readBroadcastBlock (version 1.3.1). Re-writing the code slightly to:

records mapPartitions { rowIter =>
  val x = broadcastX.value
  rowIter map { case (record) =>
    // ... do something with x
  }
}

meant the executor threads suffered far less contention, as the broadcast variable is only read once per partition rather than once per record. Doing this, another 10% or so of time was saved.


Sunday, March 13, 2016

GraphX and the Pregel algorithm


GraphX is a Spark module that allows me to find (typically) small sub-graphs in a huge collection of vertices. I've only got a prototype running but I can attest that it is indeed very fast.

GraphX does this by using the Pregel Algorithm. This is named after the river Pregel on which the Seven Bridges of Konigsberg problem was based. The problem asks: can you cross all seven bridges in the city of Konigsberg once and only once? You can't and the proof was the start of Graph Theory.

GraphX

GraphX's representation of graphs has both an RDD of edges as well as vertices.

The VertexRDD is just an RDD of tuples of a vertex ID and an attribute.

An EdgeRDD is just an RDD of Edges where an edge is just a triple of source, destination and attribute.

What these vertex and edge attributes are is totally arbitrary.

Pregel

The Pregel algorithm was inspired by a model that looks a lot like how a multi-core processor works, with each core doing its computations and then communicating with the others over the bus.

(This particular abstract model of a computer is called Bulk Synchronous Parallel which in turn is like Parallel Random Access Machine, a shared-memory abstract machine.)

In our analogy, each processor core is represented by a Spark partition and each message on the bus is delivered by a Spark join between RDDs.

I thought the simplest explanation was the second one at this link, by Shagun Sodhani. Basically, we repeatedly perform iterations over all vertices (each iteration is called a superstep), executing a user-defined function on each vertex. This function can send messages to and receive messages from other vertices and/or mutate its own vertex.

Spark's Implementation

Spark modifies this slightly. "Unlike the original Pregel API, the GraphX Pregel API factors the sendMessage computation over edges, enables the message sending computation to read both vertex attributes, and constrains messages to the graph structure." (from the docs)

You invoke all this magic with a simple call to connectedComponents() on the graph. As its name suggests, this will return all the connected components.

(This is not to be confused with strongly connected components, where all members of a component can reach each other but not necessarily the other vertices of the same directed graph. For that problem you might use the linear-time Kosaraju's algorithm.)

Pre-computation

Before Spark calls its Pregel implementation it must do four things.

First, it points all vertices to themselves. That is, each vertex's attribute is set to its own ID, a Long.

    val ccGraph = graph.mapVertices { case (vid, _) => vid }

Secondly, it defines the function to run on all vertices upon receiving a message. In our case, that's simply picking the minimum value of the vertex's attribute and the incoming message. This is the user-defined vertex program that the Pregel definition mentions.

Then it defines the function to be executed at each edge. The output depends on which vertex has the lower attribute: if it's the source vertex, the output is a mapping from the destination vertex ID to the source vertex's attribute; if it's the destination vertex, the output is a mapping from the source vertex ID to the destination's attribute. These are the Pregel messages that will be sent to the different nodes. The key is the receiving node's ID and the value is the message payload. We call this function the map function.

Finally, it defines a function that given two vertex attributes will pick the minimum. We call this the reduce function.

Computing

The Pregel implementation repeatedly runs over the graph, running the given map function on each edge to generate messages that are then combined using the given reduce function. For our use case, this boils down to mapping each edge to a [node ID, attribute] tuple and then reducing all the messages for each recipient node to the one with the minimum attribute.

We join these messages with the vertices for which they're destined (as we would join any RDDs), running our user-defined vertex program on the result. Now all our ducks are lined up for another iteration, generating yet more messages.

Only when we have no more messages do we halt the iterations.
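To make the map/reduce/vertex-program trio concrete, here is a toy, single-machine sketch of the same minimum-label propagation in Python. It has nothing to do with Spark's actual implementation; it just shows the shape of the iteration:

def connected_components(vertices, edges):
    label = {v: v for v in vertices}   # each vertex initially points to itself
    while True:
        # the 'map' function: each edge offers its label to the other end, and
        # the 'reduce' function keeps only the minimum message per recipient
        messages = {}
        for src, dst in edges:
            for sender, receiver in ((src, dst), (dst, src)):
                msg = label[sender]
                if receiver not in messages or msg < messages[receiver]:
                    messages[receiver] = msg
        # the vertex program: take the minimum of the current label and the message
        changed = False
        for v, msg in messages.items():
            if msg < label[v]:
                label[v] = msg
                changed = True
        if not changed:   # no message changed anything, so we halt
            return label

# connected_components([0, 1, 2, 3], [(0, 1), (2, 3)]) gives {0: 0, 1: 0, 2: 2, 3: 2}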


Saturday, February 27, 2016

Locality Sensitive Hashing in Tweets


Spark's DIMSUM algorithm might have a limit to how much data it can process, but the code for it came from Twitter, who process gazillions of tweets every day. How do you apply it to huge amounts of data?

Well, one way is to break the data down into smaller chunks and one way to do that without losing its meaningfulness is to use locality sensitive hashing. The best (but rather verbose) description I've found of LSH is here, a chapter from a free book on data mining.

(Note: Ullman uses the term "characteristic matrix" in the document differently from how perhaps a mathematician would use it. A characteristic matrix in maths is λI - A where A is an n x n matrix, I is the n x n identity matrix and λ is an eigenvalue. Solving the characteristic equation, |λI - A| = 0, gives you at most n real roots and allows you to calculate the eigenvectors, v, by substituting the discovered eigenvalues into the homogeneous system of equations, (λI - A)v = 0.)

To summarise, the key points follow.

Jaccard Similarity

First, we define something called the Jaccard similarity between two sets. This is simply:

s = (number of shared elements between the sets) / (total number of distinct elements in the sets)

Next, we convert the two sets into a matrix where the columns represent the set and the rows represent the elements in the set. The cell value where they meet simply represents whether that term is in that set (a value of 1) or not (a value of 0).
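In Python, for example (a hypothetical helper, just for illustration):

def jaccard(a, b):
    # |intersection| / |union| of two sets
    return len(a & b) / float(len(a | b))

print(jaccard({1, 2, 3, 4}, {3, 4, 5}))  # 2 shared out of 5 distinct elements = 0.4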

Minhashing

Next, we define a function that, for a given column, gives us the row of the first element with value 1 (going from top to bottom). We call this minhashing. Typically, we will have reordered the rows of the original matrix randomly first.

The Minhash and Jaccard relationship

The chance that two columns have the same minhash result is actually the Jaccard similarity. The proof of this follows.

Given two columns, say they both have a 1 in x rows; only one of them has a 1 in y rows; and we ignore the rest (the rows where both are 0). If we pick any non-zero row at random, then the probability that both sets have a 1 there is unsurprisingly:

x / (x + y)

which is actually exactly the same as the Jaccard similarity, s.

Minhash Signatures

Now, let's make another matrix with a column per set (as before) but this time each row represents one of n random functions. Each function simply takes a row index of the original matrix and returns a deterministic value. The cell values are all initialised to positive infinity, but that will change.

For each function, we use the index of each row in the original matrix to generate a value. For each set that has a non-zero entry in that row of the original matrix, we place the function's value in the corresponding cell of our new matrix if it is less than what is already there.

If there are m sets, this results in an n x m matrix that is our (much reduced) signature.
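A short sketch of building such a signature matrix, using random hash functions of the form (a*x + b) mod p as a stand-in for random row orderings (a common trick; the variable names and the choice of prime are mine):

import random

def minhash_signatures(characteristic, num_hashes, prime=4294967311):
    # characteristic: list of rows, each row a list of 0/1 flags, one per set.
    # Returns a num_hashes x num_sets signature matrix.
    num_sets = len(characteristic[0])
    # one hash function h(r) = (a*r + b) % prime per row of the signature
    hashes = [(random.randrange(1, prime), random.randrange(0, prime))
              for _ in range(num_hashes)]
    signature = [[float('inf')] * num_sets for _ in range(num_hashes)]
    for r, row in enumerate(characteristic):
        for h, (a, b) in enumerate(hashes):
            value = (a * r + b) % prime
            for s, present in enumerate(row):
                if present and value < signature[h][s]:   # keep the minimum seen per set
                    signature[h][s] = value
    return signature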

Banding

Now let's break our signature matrix into b bands of r rows each.

The probability that all r rows within a band have the same value is sr.

The probability that at least one row is not the same is (1 - sr).

The probability that all b bands have at least one row that is not the same is (1 - sr)b.

Therefore, the probability that at least one of the b bands has all r rows the same is 1 - (1 - sr)b.

Which is the probability that we have a pair of sets that may be similar.
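In code, the banding step amounts to hashing each band of each column to a bucket and flagging any two columns that land in the same bucket in any band. A rough sketch (assuming the signature has exactly b*r rows; names are mine):

from collections import defaultdict

def candidate_pairs(signature, b, r):
    # signature: rows are minhash values, columns are sets; split into b bands of r rows
    num_sets = len(signature[0])
    candidates = set()
    for band in range(b):
        buckets = defaultdict(list)
        for s in range(num_sets):
            key = tuple(signature[band * r + row][s] for row in range(r))
            buckets[key].append(s)
        for together in buckets.values():   # columns identical within this band
            for i in range(len(together)):
                for j in range(i + 1, len(together)):
                    candidates.add((together[i], together[j]))
    return candidates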

The nice thing about the formula 1 - (1 - s^r)^b is that it describes an S-shaped curve no matter what the values of s, r and b. This is good, as it means any given point on the curve tends to be either clearly probable or clearly improbable.

For b = 10, the probabilities for r = [2, 4, 6, 8] look like this:



Distance functions

There are many types of distance measurements. Euclidean distance is the one we're taught most in school but there are others. The only rule is that for any distance function, d, that gives the distance between x and y, the following must hold:
d(x, y) >= 0 
d(x, y) = 0 iff x == y 
d(x, y) = d(y, x) [they're symmetric] 
d(x, y) <= d(x, z) + d(z, y)  [the triangle inequality]
Nothing too clever there. They're just common sense if you were navigating around town. The thing to bear in mind is that they apply to more than just the 3D space in which we live.

Family of functions

We define a family of functions as a collection of functions that say "yes, make x and y a candidate pair", or "no, do not make x and y a candidate pair unless some other function concludes we should do so". Minhashing is one such family.

Furthermore, we define the term (d1, d2, p1, p2)-sensitive for such families to describe the curve they generate. We require
d1 < d2
p1 > p2
where d1 and d2 are values of a given distance function d(x, y); p1 is a lower bound on the probability that a function in the family makes x and y a candidate pair when d(x, y) ≤ d1, and p2 is an upper bound on that probability when d(x, y) ≥ d2.

This gives us a nice short-hand way to describe a complicated graph.

Amplifying the effect

It is desirable to maximize the steepness of the S-curve to reduce the number of false negatives and false positives. We can do this by applying a second family of functions to the first. We could demand all the bands agree for 2 points (x, y), that is an AND operator. Or we could demand at least one of the bands agree, that is an OR operator. Each will change the curve in a different way depending on what we want.

This is analogous to the banding (see above) and the maths for this is the same only this time instead of s, we're dealing with p, the probability two functions give the same answer for a row.

The downside to this is we increase the demand on our CPUs. In large datasets, a small increase in CPU is magnified proportional to the amount of data we have to run the algorithm over so it's worth bearing this in mind.

Code

I stole the Python code from here to generate the graphs and slightly modified it to the version below:

from mpl_toolkits.mplot3d import Axes3D
from matplotlib.collections import PolyCollection
from matplotlib.colors import colorConverter
import matplotlib.pyplot as plt
import numpy as np

# from http://matplotlib.org/mpl_toolkits/mplot3d/tutorial.html#line-plots

fig = plt.figure()
ax = fig.gca(projection='3d')


def cc(arg):
    return colorConverter.to_rgba(arg, alpha=0.6)


def p(s, r):
    b = 10
    return 1 - ((1 - s**r) ** b)


xs = np.arange(0, 1, 0.04)
verts = []
zs = [2.0, 4.0, 6.0, 8.0]
for z in zs:
    ys = map(lambda x: p(x, z), xs)
    ys[0], ys[-1] = 0, 0
    verts.append(list(zip(xs, ys)))

poly = PolyCollection(verts, facecolors=[cc('r'), cc('g'), cc('b'),
                                         cc('y')])
poly.set_alpha(0.7)
ax.add_collection3d(poly, zs=zs, zdir='y')

ax.set_xlabel('s')
ax.set_xlim3d(0, 1)
ax.set_ylabel('r')
ax.set_ylim3d(min(zs), max(zs))
ax.set_zlabel('p')
ax.set_zlim3d(0, 1)

plt.show()


Monday, February 15, 2016

A Case for Scala


Here are three interesting things I came across this week with Scala's case idiom.

Backticks

The first is this snippet:

    val b = "b"
    "a" match {
      case b => println("'a' matches b. WTF?")
      case _ => println("no match")
    }

which prints the first line ("... WTF?"). It's a simple gotcha. The b in the case shadows the outside b that equals the string "b". You need to use backticks to make Scala use the outer b (see here) as in:

    "a" match {
      case `b` => println("'a' matches b. WTF?")
      case _   => println("no match") // <-- printed
    }

Regex

The case keyword can also be used in conjunction with regular expressions, for example:

    val hello = "hello.*".r
    val goodbye = "goodbye.*".r

    "goodbye, cruel world" match {
      case hello()   => println("g'day!")
      case goodbye() => println("thanks for all the fish") // this is printed
      case _         => println("no match")
    }

(see here for further information)

Implicits

The third case for the case keyword is no case at all, like this code from Elastic4S where the Type Class Pattern is employed. In a nutshell, there is an implementation in the implicit ether that can fulfil our request. In the Elastic4S (1.5.1) code, it looks like this:

  def execute[T, R](t: T)(implicit executable: Executable[T, R]): Future[R] = executable(client, t)

where the executable in question is any Executable that matches the type of our argument of type T. It's a little harder to read (where in the ether is this Executable?) but is much more extensible.