Saturday, January 30, 2016

Cloud Atlas

Note on setting up Spark on AWS

YMMV, but these are some things that were useful to me.

Set up your Amazon account per the instructions page. I put my key in:


I downloaded the latest Spark and ran this from the top level directory:

./spark-ec2 -k MyTestAwsIreland -i ~/.ssh/MyTestAwsIreland.id_rsa -s 4 --instance-type=c4.8xlarge --region=eu-west-1 --vpc-id=MY_VPC_ID --subnet-id=MY_SUBNET_ID --zone=eu-west-1a launch MY_CLUSTER_NAME

(your region and zone may be different. If you see errors, this page may help).

This starts a fairly beefy cluster with one master and 4 slaves using the same version of Spark as you downloaded. Unfortunately, there wasn't much disk space (I was getting "no space left on device" from Spark jobs) so follow the instructions here and if there are problems, this page will almost certainly solve them.

Also, I was having trouble connecting. You must set up a public DNS if you want your local Spark driver to talk to the cluster (see here for more information).

Running Hadoop was/is proving more complicated. I could not use port 9000 as it was being blocked for some reason (my local client socket was left in SYNC_SENT state indicating a firewall issue) so I changed it to 8020. This remains a puzzle.

Also, Spark clients initially talk to the NameNode but then start talking direct to the DataNodes. AWS instances have a public domain name/IP combo and a private domain name/IP combo. Unfortunately, the NameNode was sending the private IP address. This link forces it to send the domain name but at the time of writing, it's only the private domain name. Using the hostname command on the boxes has not solved the problem.

Anyway, it will also help to get rsync set up so you can develop on your laptop and then synch your code with something as simple as

rsync -avz -e "ssh  -i ~/.ssh/MyTestAwsIreland.id_rsa -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null"  --exclude '.git' --exclude '.idea'  --progress LOCAL_PROJECT_DIRECTOR  root@YOUR_MACHINE_NAME:/REMOTE_PROJECT_DIRECTORY

Finally, a simple script to run on all your slave boxes during a job is:

while ( true ) ; do { jstat -gc `jps | grep CoarseGrainedExecutorBackend | awk '{print $1}'` | awk  '{print $8 "/" $7 " " $14}' 2>/dev/null ; uptime ; vmstat 1 2 ; echo ; sleep 10 ; } done

which monitors the JVM giving its old generation heap usage over its capacity, the time taken for full GCs plus the load average and various system stats (from vmstat).

Addendum: this seems to install a Spark instance with a really old Hadoop library (see SPARK_HOME/lib/spark-assembly-1.6.0-hadoop1.2.1.jar). I re-installed a better version and had it talking to Hadoop nicely but not before I wasted time trying to solve this issue.

Sunday, January 17, 2016

The Maths of LSA

At the heart of Latent Semantic Analysis is a mathematical pattern called Singular Value Decomposition (SVM). This is an appropriate technique when you have sparse data in many dimensions. It reduces it to smaller dimensions with (hopefully) little loss of accuracy.

First, a little mathematical refresher.

Eigenvalues and Eigenvectors

An eigenvector is a vector that when multiplied by a matrix points in the same direction. It's length may have changed by (a non-zero) factor called the eigenvalue.

Only square matrices can have eigenvalues and eigenvectors. Anything non-square will change the dimension of the vector so the result would never be a multiple of itself. SVM is a hack to work with non-square matrices.

What's more, not all matrices will have real valued eigen-vectors/values. Let's say the matrix is a rotation. No vector (apart from the 0-vector) will point in the same direction after all rotations. In Python, this is a 45-degree rotation.

import numpy as np

def no_complex_elements(vec):
    np.all(map(lambda arr: not np.iscomplex(arr.flatten), vec))

def print_eigenvectors_and_eigenvalues(matrix):
    eigenvalues, eigenvectors = np.linalg.eig(rotation)
    print "real eigenvalues \n", filter(lambda scalar: not isinstance(scalar, complex), eigenvalues)
    print "real eigenvectors\n", filter(lambda vector: no_complex_elements(vector), eigenvectors)

if __name__ == "__main__":
    rotation = np.matrix('0.707106781  -0.707106781  0;'
                         '0.707106781  0.707106781   0;'
                         '0            0             1')

and it output's an empty list.

The trolley-line-location problem

In "Coding the Matrix", Philip Klein compares SVD to finding the optimal path for a straight tram line to run through a city. I'll skip most the proofs (they're in the book) but the gist goes like this:

Imagine the tram line must go through the city centre (the origin) while getting as close as possible to fixed land marks located with vectors ai.

Make each ai a row in a vector A.

Now, each point ai can be expressed in terms of two vectors, one parallel to the tram line, ai||v, and one perpendicular to it, ai¬v (notice that we're choosing a new co-ordinate system) and that since
ai = ai¬v + ai||v
then, squaring:
||ai||2 = ||ai¬v||2 + ||ai||v ||2
and re-arranging:
||ai¬v||= ||ai||2 - ||ai||v ||2

This, of course, is our distance from the tram line squared. If we want to minimize this value for all landmarks, we can express the problem as finding the minimum of:


(||ai||2 - ||ai||v ||2)

which, when you again re-arrange, will give you:
||A||F2  - ||Av||2
where the F indicates the Frobenius Norm (nothing too scary, just a generalization of a vector multiplied by itself) and where the second term is just us cleaning up all the other terms:



where <aiv>is just the dot-product of ai and the unit vector, v which is of course ai||v.

Seems a lot of work. Don't worry, it's all been high-school maths so far.

Anyway, it's this second term that we want to maximize. As the first term is fixed, maximizing the second will find the overall minimum for the whole equation. We won't worry too much how we do this - there are numerical techniques for doing it - this post is just to give the flavour.

Having chosen a suitable unit vector to maximize the second term, we'll call it v1 to remind us that this optimizing of distances between landmarks and the tram line can be viewed as a 1-dimensional approximation to covering all the landmarks as best we can using just a straight line. Perhaps we want to place a tram-stop at the point closest to each landmark. In which case, we could build a set of vectors similar to A only this time each row is a vector to a tram stop. Let's call this matrix Ã. Furthermore, we call ||Av1|| the first singular value (or σ1) and v1 the first right singular vector.

Then, the error in our approximation is A - Ã and we also know that this represents the distance from the tram line to landmark in each row in resultant matrix. Well, we've already calculated this, so:
||A - Ã||F2  =  ||A||F2  - ||Av||2  = ||A||F2  σ12
Note that the positions of these tram stops can also be calculated as the projection of ai onto v1. So, each row of Ã can be expressed as:
<ai, v1v1
So, taking this equation and re-expressing
à = A vv1T  σuv1T 
where we define σu1 = A v1

This is starting to look like our familiar equation in SVD. So, it only remains to note that this "trolley-line" example is a specific case. We could expand it to more than a 2-D map.

We do this by finding the unit vector v2 that is orthogonal to v1 and maximises ||Av|| and again for v3v4, etc.

It doesn't take a great deal of effort to now show that σ1σ2σ3 etc can be turned into a diagonal matrix and that would give us the SVD equation.

It's also not too hard to see that the values of this diagonal matrix are in descending order since at each stage we found the maximum before moving on to the next step.

Thursday, December 31, 2015

Latent Semantic Analysis with Spark

I'm reading the excellent Advanced Analytics with Spark as I need to learn more about natural language processing (NLP).

The method in the book is called Latent Semantic Analysis. Basically, this is building a vector for each document that captures the relevant words and their significance. More specifically, all vectors have the same size and the values are roughly speaking the fraction of a given term in the document multiplied by the number of occurrences of this term across all documents (with some logs thrown in to dampen outliers). A given index in each vector represents the same term over all documents.

The code for the book can be retrieved with:

git clone

The first step is a process of lemmatization which removes stop words and aggregates related words. For this, an NLP library from Stanford University is used. The code looks like this:

    val lemmatized = plainText.mapPartitions(iter => {
      val pipeline = createNLPPipeline()

Interestingly, we "use mapPartitions so that we only initialize the NLP pipeline once per partition instead of once per document". [1]

(The variable lemmatized is an RDD of documents where the documents are just a Seq of Strings that have had the stop words removed and related words aggregated.)

By mapping over lemmatized, we make an RDD of Map[String, Int] that represent the term count per document and we call this docTermFreqs. We combine all these for each partition of documents and merge them all together at the end. "When the records being aggregated and the result object have the same type (eg, in sum), reduce is useful, but when the types differ, as they do here, aggregate  is a more powerful alternative" [1]

Unfortunately, this approach can lead to OutOfMemoryErrors. So, an alternative is to find the distribution of terms over the documents with this:

    val docFreqs = docTermFreqs.flatMap(_.keySet).map((_, 1)).reduceByKey(_ + _, 15)

(where 15 is our number of partitions). We also limit the number of terms:

where numTerms is arbitrary but defaults to 50 000. From this, we have enough information to calculate the Inverse Document Frequencies:{ case (term, count) => (term, math.log(numDocs.toDouble / count))}.toMap

The use of log dampens the effect of outliers.

In turn, with this we can map over all docTermFreqs creating a sparse vector of all the terms and their scores as we go.

The Maths

So much for the Scala, now for the mathematics. LSA depends on a trick called Singular Value Decomposition. This is a very general technique that is used in many diverse maths problems. A good demonstration of it (using Python) is available here. A simple(ish), instuitive explanation of it is here.

One use of SVD is to reduce the dimensions of the problem to something more manageable. One consequence of such a reduction is a loss of accuracy but if this loss is small, the approximation might be acceptable.

A consequence of SVD is that our matrix is broken down into three matrices, each with its own properties:

X = U D VT 

U is an N × p orthogonal matrix (UT U = Ip ) whose columns uj are called the left singular vectors;
V is a p × p orthogonal matrix (VT V = Ip ) with columns vj called the right singular vectors
D is a p × p diagonal matrix, with diagonal elements d1 ≥ d2 ≥ · · · ≥ dp ≥ 0 known as the singular values 
(taken from [2]). In English,

X is an (N x p) matrix that holds the observations. In general, N is the number of samples and p the number of features. For our purposes, N is the number of documents and p the number of terms. That is, the rows correspond to documents and the columns to terms. The values are:

(the calculation for a term from docFreqs seen earlier) x (frequency of this term in this document) / (total number of terms in this document).

V is a matrix "where each row corresponds to a term and each column corresponds to a concept. It defines a mapping between term space (the space where each point is an n-dimensional vector holding a weight for each term) and concept space (the space where each point is an n-dimensional vector holding a weight for each concept)."

U is a "matrix where each row corresponds to a document and each column corresponds to a concept. It defines a mapping between document space and concept space." [1]

There's a lot more to the mathematics and I'll post more soon but this is a good starting place to get more familiar with it.


Now, this is just a refresher on Spark's handling of matrices. Data handled as rows in Spark is easy. But if you want to do anything interesting with a matrix (multiply, take a transpose etc) this works less well. You need to deal with columns and the data in a column might be striped across many machines unlike rows.

At the moment (it's an experimental API), Spark has 4 implementations of DistributedMatrix each with slightly different abilities. The BlockMatrix implementation is currently the only one that allows one distributed matrix to be multiplied by another. As its name suggests, it uses a mathematical trick to better handle operations by breaking it into a block matrix.

Other implementations can be multiplied by a Matrix object (dense or sparse) but this appears to be local to the node on which the code runs.

[1] Advanced Analytics with Spark
[2] Elements of Statistical Learning (free download)

Thursday, December 10, 2015

The Magnet Pattern a variant on the Type Class Pattern where Scala copies a Haskell idiom. A good article is here. The idea is that there is just one method that takes a variety of objects with the same super-trait depending on what parameters were used in calling that method. This replaces having many overloaded methods.

[Just to refresh one's memory: the Type Class pattern is a method of ad hoc polymorphism but it differs from the Pimp My Library pattern. A good comparison of the two can be found here. Essentially, the Pimp My Library pattern appears to add methods to classes that were not there when they were first written. The key thing about the Type Class pattern is that it asks for a witness to evidence that something is possible for a certain type. Think about Scala's TraversableOnce.sum method that asks for evidence that the type it contains has an associated Numeric. In Pimp My Library, you're creating the functionality per type. In the Type Class pattern your providing the functionality for a more general function to do its work.]

What is apparent immediately is that you really need to know about implicit rules. Here are some notes but a really good reference lies here.

Let's take a fairly pointless class:

class AContainer

Using the Type Class Pattern, we can add a method to it so:

  class PimpMe {
    def pimpedMethodOnClass: String = "pimpedMethodOnClass"
  implicit def pimpedWithADefCreatingClass(aContainter: AContainer): PimpMe = new PimpMe

Now, we can call this method as if it were on the class itself:

    println(aContainer.pimpedMethodOnClass) // "pimpedWithADef"

More efficiently (see below) we could write:

  implicit class PimpMeEfficiently(val aContainer: AContainer) extends AnyVal {
    def pimpedMethodOnEfficientClass: String = "pimpedMethodOnEfficientClass"
    println(aContainer.pimpedMethodOnEfficientClass) // "pimpedMethodOnEfficientClass"

The reason for AnyVal is "properly-defined user value classes provide a way to improve performance on user-defined types by avoiding object allocation at runtime, and by replacing virtual method invocations with static method invocations" (from the ScalaDocs).

The Magnet Pattern Simplified
A simplified magnet pattern implementation is a single method. Let's use a slightly more interesting magnet class whose toString method yields a different value. In real life, each subclass would have a more interesting implementation:

  class Magnet(string: String) {
    override def toString() = string

And let's have a very trivial implementation of the method taking the magnet:

  def magnetMethod(magnet: Magnet): Unit = println(magnet)

Now, we can call this method with what look like different arguments, for example:

  implicit def fromStringAndInt(tuple: (String, Int)) = new Magnet(s"Implementation X: (String, Int) = (${tuple._1}, ${tuple._2})")
    magnetMethod("aString", 1)   // "Implementation X: (String, Int) = (aString, 1)" (thanks to fromStringAndInt)
    magnetMethod(("aString", 1)) // ditto

Note calling the magnet method with a tuple has the same effect as calling it with individual arguments.

Now, we could have a different implementation but let's be simple and just pretend there is different functionality. But we really do have a different call site:

  implicit def fromDouble(double: Double) = new Magnet(s"Implementation Y: double = $double")
    magnetMethod(1.1d) // "Implementation Y: double = 1.1" (thanks to fromDouble)

We're calling the same method with different arguments (cardinality as well as types)!

Monday, November 30, 2015

Machine Learning with Spark

Playing on my current pet project, I implemented a Support Vector Machine using Spark's Machine Learning library without fully understanding what it was doing and just following the documentation.

But the documentation is somewhat sparse and I got more understanding by going elsewhere, mainly this excellent book. Here I learned that SVMs are a classification algorithm, that is "a supervised learning method that predicts data into buckets". This contrasts with a regression algorithm which is "the supervised method that predicts numerical target values." Furthermore, it can be a non-parametric algorithm allowing it to handle non-linear data. "Parametric models makes assumptions about the structure of the data. Non-parametric models don’t." [1]

OK, that's some terminology defining the algorithms characteristics out of the way. But then I hit the Spark documentations that talks about the AUC (Area Under Curve) of the ROC (Receiver Operating Characteristics). Hmm.

The concepts are quite easy but I found a dearth of material on the web. So, here is what I gleaned.

First, we start with something called the "Confusion Matrix". This is fairly simple 2x2 matrix of true/false positive/negative rates.

For example, imagine a test for cancer. Some people will have it, some won't. Some people will be told they have it, some won't. Sometimes we tell people they have it when they don't. Sometimes we don't tell people they have it when they do. Oops.

The confusion matrix would look like this:

TruePatient is told he has cancer and indeed he does. Patient does not have cancer and is told so
False Patient is told he has cancer when he does not. Patient is not told he has cancer when he does

If the total number of patients is N, the number told they have cancer is Y and the number who actually have cancer is X, the rates look like this:

TrueX / Y (N - X) / (N - Y)
False (N - X) / Y X / (N - Y)

We can plug the real numbers in and get a matrix with 4 cells each between 0 and 1. Depending how high our threshold is for the cancer test, these numbers will jump around. So, if we varied our threshold between a minimum 0 and a maximum 1, we could plot the true positive against the false positive. This graphics should illustrate:

where the threshold is on the z-axis and the ROC is on the x-y plane.

Incidentally, the graph was plotted using everybody's favourite maths tool, R. The code was:


step <- 0.01
f <- function(v) v ^ 3
x <- f(seq(0,1,step))
y <- seq(0,1,step)
h <- function(x,y) y
z <- c(1:100) 

c = z
c = cut(c, breaks=64)
cols = rainbow(64)[as.numeric(c)]

pairwiseFlatten <- function(binded) {
  bound <- c()
  for (i in 1:(length(binded)/2)) bound = c(bound, binded[i,1], binded[i,2])
  return (bound)

plot3d(x, y, h(x,y), add=TRUE, col=cols)
plot3d(x, y, 0, add=TRUE)
segments3d(pairwiseFlatten(cbind(x,x)), y = pairwiseFlatten(cbind(y,y)), z = pairwiseFlatten(cbind(h(x,y),0)), add=TRUE)

decorate3d(xlab="false positive", ylab="true positive", zlab="threshold", main="ROC curve")

Anyway, we ideally want to maximize the number of true positives no matter what threshold value we have. That is, the curve should hug the top left part of the x-y plane. A curve that represented this maxima would have the greatest area in the x-y plane over all the curves. And this is where our AUC for the ROC comes in. The higher, the better our model.

[1] Real World Machine Learning - Manning.

Sunday, November 29, 2015

Algorithm Recipes

These are some miscellaneous heuristics for writing and choosing algorithms that I have found useful.

Writing an Algorithm

1. If you want to make an algorithm tail recursive, add an accumulator to the method signature.

2. When you're thinking about the signature of a recursive algorithm, start with an argument that represents the "works still to be done" and an accumulator. You may need more, but that's a good start.

When the motivation for an algorithm is to find the optimal answer, one approach might be to recurse through the solution space finding the maximum (or minimum) of any two recursive sub-solutions. A good example is here. This is a C implementation of the Longest Common Subsequence where all sequences are recursively explored but only the maximum of the two sub-problems is chosen at each recursion.

Choosing an Algorithm

If you're stuck, consider sorting the data first. This may (or may not) help you move forward. For example, in the Closest Pair algorithm (where you want to find the closest pair of points in 2-D without incurring O(n * m) cost) we first sort all the points in the X-axis to help us find the closest pair. They're unlikely to be the closest pair in the whole plane but it's a start.

If the performance is O(n2), consider expressing the problem in terms of divide-and-conquer.

You cannot do better than O(n log(n)) in a comparison based sort. If you need to meet a requirement that at first blush appears to rely on a sorted array of elements but needs to be better than O(n log(n)) then consider a Bucket Sort.

[The example Bucket Sort question tries to find the largest gap between a sequence of integers while avoiding an O(n log(n)) sort. Cunningly, it creates a series of "buckets" that cover the whole range but with each bucket holding a range slightly less than the mean difference. Thus, no 2 integers that differ by the mean or more can fall into the same bucket. You need only then to see what is the max and min in all the buckets, which can be done in  O(n)]

You may be able to do better than an n log(n) sort if you know something about the data and don't need to compare the elements. For example, if you have X elements that you know are in the range of 1 to Y and you know there are no duplicates (so necessarily X < Y), you can instantiate an array of length Y and scan through the X elements putting them in their natural position in the Y-length array. At the end of this scan, scan through the Y-length array discarding the nulls. The result is the ordered elements. This was all done in time O(X + Y).

Tuesday, November 24, 2015

Spark's sortByKey doesn't

... or at least not when you map. Sure, if you collect you get the elements in a sorted order. But let's say you want to process the elements in a given order, say, finding the difference from one to the next. You might naively write:

    val pairRdd   = sparkContext.parallelize((1 to 10), 3).map(x => (x, x))
    val sortedRdd = pairRdd.sortByKey()

  def bogusMap(sortedRdd: RDD[(Int, Int)]): Unit = {
    var last = 0
    def checkMonotonicKeys(kv: (Int, Int)): Int = {
      val key = kv._1
      if (key != last + 1) throw new IllegalStateException(s"key = $key, last = $last")
      last = key
    val mappedAndSorted =
    mappedAndSorted.collect().foreach { kv =>

But you'll see an exception thrown something like:

java.lang.IllegalStateException: key = 8, last = 0

The reason is that the keys are sorted within each partition not across all partitions.

One "solution" is to ensure that all the elements are within one partition such as:

    val sortedInto1Partition = pairRdd.sortByKey(numPartitions = 1)

This works but there is little point to using Spark for it since there is no parallelization. The best solution is to generate the differences when the data was incoming.

Incidentally, this article has a good description of what is happening during a sortByKey operation. Basically, each shuffle has two sides. The first "writes out data to local disk" and the second makes "remote requests to fetch that data... The job of the [first] side of the shuffle is to write out records in such a way that all records headed for the same [second] task are grouped next to each other for easy fetching." Note that the second task that groups data is not obligated to also order it within a group.

As another aside, note the importance of persisting an RDD in this use case.

"Failure to persist an RDD after it has been transformed with partitionBy() will cause subsequent uses of the RDD to repeat the partitioning of the data. Without persistence, use of the partitioned RDD will cause reevaluation of the RDDs complete lineage. That would negate the advantage of partitionBy(), resulting in repeated partitioning and shuffling of data across the network, similar to what occurs without any specified partitioner.

"In fact, many other Spark operations automatically result in an RDD with known partitioning information, and many operations other than join() will take advantage of this information. For example, sortByKey() and groupByKey() will result in range-partitioned and hash-partitioned RDDs, respectively. On the other hand, operations like map() cause the new RDD to forget the parent’s partitioning information, because such operations could theoretically modify the key of each record." [1]

The code above that forces all the data into one partition (using numPartitions = 1) seems immune to map forgetting the the parent RDD's partitioning information. Since there is only one partition, there is no information to forget.

[1] Learning Spark - Karau and Konwinski