Monday, November 30, 2015

Machine Learning with Spark


Playing on my current pet project, I implemented a Support Vector Machine using Spark's Machine Learning library without fully understanding what it was doing and just following the documentation.

But the documentation is somewhat sparse and I got more understanding by going elsewhere, mainly this excellent book. Here I learned that SVMs are a classification algorithm, that is "a supervised learning method that predicts data into buckets". This contrasts with a regression algorithm which is "the supervised method that predicts numerical target values." Furthermore, it can be a non-parametric algorithm allowing it to handle non-linear data. "Parametric models makes assumptions about the structure of the data. Non-parametric models don’t." [1]

OK, that's some terminology defining the algorithms characteristics out of the way. But then I hit the Spark documentations that talks about the AUC (Area Under Curve) of the ROC (Receiver Operating Characteristics). Hmm.

The concepts are quite easy but I found a dearth of material on the web [addendum: since writing this, I found this excellent post]. So, here is what I gleaned.

First, we start with something called the "Confusion Matrix". This is fairly simple 2x2 matrix of true/false positive/negative rates.

For example, imagine a test for cancer. Some people will have it, some won't. Some people will be told they have it, some won't. Sometimes we tell people they have it when they don't. Sometimes we don't tell people they have it when they do. Oops.

The confusion matrix would look like this:

PositiveNegative
TruePatient is told he has cancer and indeed he does. Patient does not have cancer and is told so
False Patient is told he has cancer when he does not. Patient is not told he has cancer when he does


If the total number of patients is N, the number told they have cancer is Y and the number who actually have cancer is X, the rates look like this:

PositiveNegative
TrueX / Y (N - X) / (N - Y)
False (N - X) / Y X / (N - Y)

We can plug the real numbers in and get a matrix with 4 cells each between 0 and 1. Depending how high our threshold is for the cancer test, these numbers will jump around. So, if we varied our threshold between a minimum 0 and a maximum 1, we could plot the true positive against the false positive. This graphics should illustrate:


where the threshold is on the z-axis and the ROC is on the x-y plane.

Incidentally, the graph was plotted using everybody's favourite maths tool, R. The code was:

require("rgl")

dev.off()

step <- 0.01
f <- function(v) v ^ 3
x <- f(seq(0,1,step))
y <- seq(0,1,step)
h <- function(x,y) y
z <- c(1:100) 

c = z
c = cut(c, breaks=64)
cols = rainbow(64)[as.numeric(c)]

pairwiseFlatten <- function(binded) {
  bound <- c()
  for (i in 1:(length(binded)/2)) bound = c(bound, binded[i,1], binded[i,2])
  return (bound)
}

plot3d(x, y, h(x,y), add=TRUE, col=cols)
plot3d(x, y, 0, add=TRUE)
segments3d(pairwiseFlatten(cbind(x,x)), y = pairwiseFlatten(cbind(y,y)), z = pairwiseFlatten(cbind(h(x,y),0)), add=TRUE)

decorate3d(xlab="false positive", ylab="true positive", zlab="threshold", main="ROC curve")

Anyway, we ideally want to maximize the number of true positives no matter what threshold value we have. That is, the curve should hug the top left part of the x-y plane. A curve that represented this maxima would have the greatest area in the x-y plane over all the curves. And this is where our AUC for the ROC comes in. The higher, the better our model.

[1] Real World Machine Learning - Manning.

Sunday, November 29, 2015

Algorithm Recipes


These are some miscellaneous heuristics for writing and choosing algorithms that I have found useful.

Writing an Algorithm

1. If you want to make an algorithm tail recursive, add an accumulator to the method signature.

2. When you're thinking about the signature of a recursive algorithm, start with an argument that represents the "works still to be done" and an accumulator. You may need more, but that's a good start.

When the motivation for an algorithm is to find the optimal answer, one approach might be to recurse through the solution space finding the maximum (or minimum) of any two recursive sub-solutions. A good example is here (with a good Wikipedia explanation here). This is a C implementation of the Longest Common Subsequence where all sequences are recursively explored but only the maximum of the two sub-problems is chosen at each recursion.

Choosing an Algorithm

If you're stuck, consider sorting the data first. This may (or may not) help you move forward. For example, in the Closest Pair algorithm (where you want to find the closest pair of points in 2-D without incurring O(n * m) cost) we first sort all the points in the X-axis to help us find the closest pair. They're unlikely to be the closest pair in the whole plane but it's a start.

If the performance is O(n2), consider expressing the problem in terms of divide-and-conquer.

You cannot do better than O(n log(n)) in a comparison based sort. If you need to meet a requirement that at first blush appears to rely on a sorted array of elements but needs to be better than O(n log(n)) then consider a Bucket Sort.

[The example Bucket Sort question tries to find the largest gap between a sequence of integers while avoiding an O(n log(n)) sort. Cunningly, it creates a series of "buckets" that cover the whole range but with each bucket holding a range slightly less than the mean difference. Thus, no 2 integers that differ by the mean or more can fall into the same bucket. You need only then to see what is the max and min in all the buckets, which can be done in  O(n)]

You may be able to do better than an n log(n) sort if you know something about the data and don't need to compare the elements. For example, if you have X elements that you know are in the range of 1 to Y and you know there are no duplicates (so necessarily X < Y), you can instantiate an array of length Y and scan through the X elements putting them in their natural position in the Y-length array. At the end of this scan, scan through the Y-length array discarding the nulls. The result is the ordered elements. This was all done in time O(X + Y).

Tuesday, November 24, 2015

Spark's sortByKey doesn't


... or at least not when you map. Sure, if you collect you get the elements in a sorted order. But let's say you want to process the elements in a given order, say, finding the difference from one to the next. You might naively write:

    val pairRdd   = sparkContext.parallelize((1 to 10), 3).map(x => (x, x))
    val sortedRdd = pairRdd.sortByKey()

    bogusMap(sortedRdd)
.
.
.
  def bogusMap(sortedRdd: RDD[(Int, Int)]): Unit = {
    var last = 0
    def checkMonotonicKeys(kv: (Int, Int)): Int = {
      val key = kv._1
      if (key != last + 1) throw new IllegalStateException(s"key = $key, last = $last")
      last = key
      key
    }
    val mappedAndSorted = sortedRdd.map(checkMonotonicKeys(_))
    mappedAndSorted.collect().foreach { kv =>
      println(kv)
    }
  }

But you'll see an exception thrown something like:

java.lang.IllegalStateException: key = 8, last = 0

The reason is that the keys are sorted within each partition not across all partitions.

One "solution" is to ensure that all the elements are within one partition such as:

    val sortedInto1Partition = pairRdd.sortByKey(numPartitions = 1)
    bogusMap(sortedInto1Partition)

This works but there is little point to using Spark for it since there is no parallelization. The best solution is to generate the differences when the data was incoming.

Incidentally, this article has a good description of what is happening during a sortByKey operation. Basically, each shuffle has two sides. The first "writes out data to local disk" and the second makes "remote requests to fetch that data... The job of the [first] side of the shuffle is to write out records in such a way that all records headed for the same [second] task are grouped next to each other for easy fetching." Note that the second task that groups data is not obligated to also order it within a group.

As another aside, note the importance of persisting an RDD in this use case.

"Failure to persist an RDD after it has been transformed with partitionBy() will cause subsequent uses of the RDD to repeat the partitioning of the data. Without persistence, use of the partitioned RDD will cause reevaluation of the RDDs complete lineage. That would negate the advantage of partitionBy(), resulting in repeated partitioning and shuffling of data across the network, similar to what occurs without any specified partitioner.

"In fact, many other Spark operations automatically result in an RDD with known partitioning information, and many operations other than join() will take advantage of this information. For example, sortByKey() and groupByKey() will result in range-partitioned and hash-partitioned RDDs, respectively. On the other hand, operations like map() cause the new RDD to forget the parent’s partitioning information, because such operations could theoretically modify the key of each record." [1]

The code above that forces all the data into one partition (using numPartitions = 1) seems immune to map forgetting the the parent RDD's partitioning information. Since there is only one partition, there is no information to forget.

[1] Learning Spark - Karau and Konwinski


Wednesday, November 18, 2015

Interpreting Spark


Spark has a nice web interface that allows you to find problems in the jobs you submit to it. Here are some notes I made on using it. First, let's clarify some terminology:

Task, Stage, Job

"Invoking an action inside a Spark application triggers the launch of a Spark job to fulfill it...  The execution plan consists of assembling the job’s transformations into stages. A stage corresponds to a collection of tasks [map, flatMap etc] that all execute the same code, each on a different subset of the data. Each stage contains a sequence of transformations that can be completed without shuffling the full data.

"At each stage boundary, data is written to disk by tasks in the parent stages and then fetched over the network by tasks in the child stage. Because they incur heavy disk and network I/O, stage boundaries can be expensive and should be avoided when possible." [1]

Narrow and Wide Transformation

"For the RDDs returned by so-called narrow transformations like map and filter, the records required to compute the records in a single partition reside in a single partition in the parent RDD...  Spark also supports transformations with wide dependencies such as groupByKey and reduceByKey. In these dependencies, the data required to compute the records in a single partition may reside in many partitions of the parent RDD" [1]

And this is what can induce a shuffle. Imagine a diagram with time running down the Y-axis. Now imagine this diagram having vertical columns representing a partition at various points in time with horizontal slices representing RDDs in which these partitions live, stacked such that parents are (naturally) above children. Now, if we draw arrows from the data points in one RDD/Partition combo to the data on which it relies, we'd hope those arrows remain in the same vertical partition stream. If they cross streams, a shuffle ensues.

The Web GUI

You may see Jobs that have Skipped Stages. This is nothing to worry about. From the code:

    This may be an underestimate because the job start event references all of the result
    stages' transitive stage dependencies, but some of these stages might be skipped if their
    output is available from earlier runs.
    See https://github.com/apache/spark/pull/3009 for a more extensive discussion.

When looking at my own application's performance (open source finance toy & tool found here on GitHub), a DAG (directed, acyclic graphs)  may look like this:




My application takes end-of-day stock prices of two different companies and runs a Pearson correlation on them.

In the picture above we see the details of Job 1. Stages 4,5 and 6 are skipped as they were already loaded by Job 0 (which is not shown here).

Stages 7 and 8 are more interesting. I join the two data sets (keyed on date) and map over both of them to get just the stock price (discarding the date). Note that this is done on both stock data series in parallel.

From then on, all work is done in Spark's MLLib library. First, the zip and map is done in Correlation.computeCorrelationWithMatrixImpl. Then, in calculating the Pearson Correlation from the data, it calls RDD.treeAggregate twice.

This method takes an identity and two functions. The first function handles how the data is aggregated within a partition. The second then handles the totals of this function over all partitions. Since this latter function requires "crossing the streams", a stage finishes and a new one begins (Stage 8 which happens to be another call to treeAggregate).

This visualization will also tell you when one of the RDDs is cached - it will be denoted by a green highlighted dot [2] although we do not see this on the picture above.

[1] Cloudera blog.

[2] Databricks blog.