Thursday, August 20, 2015

Spark Shuffle


... is something you should avoid. A shuffle is where data is moved over the network between nodes, which can be very expensive on a large data set. If the data is evenly distributed but a query asks for it to be grouped by some arbitrary criterion, much movement will ensue. Therefore, groupByKey should be avoided where possible; as the ScalaDocs indicate, it can be an expensive operation.

However, reduceByKey is much more efficient as it will "perform the merging locally on each mapper before sending results to a reducer" resulting in a much smaller transfer of data between nodes.

So, when we were writing a risk engine in Spark that held all our user-defined Record objects, our code would look something like:

def totalByKey(data: RDD[Record]): RDD[(String, Double)] = {
  data.map(record => record.key -> record.value).reduceByKey(_ + _)
}

which first turns our Records into key/value pairs and then adds up all the values for each key without transferring large amounts of data over the network. It avoids this by summing the values of the Records on each node first and only then combining the per-node subtotals. Only the combination of subtotals requires a network call.
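For contrast, here is a sketch of the same aggregation written with groupByKey (not code we shipped, just an illustration): every key/value pair is shuffled to its reducer before any summing happens, which is exactly the data movement reduceByKey avoids.

import org.apache.spark.rdd.RDD

// Sketch only: groups first, sums afterwards, so the full set of values crosses the network.
def totalByKeyWithGroupBy(data: RDD[Record]): RDD[(String, Double)] = {
  data.map(record => record.key -> record.value)
      .groupByKey()
      .mapValues(_.sum)
}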

Interestingly, Spark can then do an outer join like this:

val subtotalForDate1 = totalByKey(dataForDate1)
val subtotalForDate2 = totalByKey(dataForDate2)
val outerJoin        = subtotalForDate1.fullOuterJoin(subtotalForDate2)

which returns an RDD of tuples containing the key plus an Option for the left value and an Option for the right value (Options because a key may be present on one side but not the other). Using this join, we can compare our risk profile from one date to the next.
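Since a key may exist on only one side, a little Option handling is needed when computing the day-on-day difference. Something like this sketch (treating a missing key as zero, which is an assumption about the business logic):

// Sketch: the change in risk per key between the two dates.
val riskChanges: RDD[(String, Double)] = outerJoin.map { case (key, (oldValue, newValue)) =>
  key -> (newValue.getOrElse(0.0) - oldValue.getOrElse(0.0))
}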

Note that both reduceByKey and fullOuterJoin don't actually live in the RDD class but in the PairRDDFunctions class. They appear to be part of the RDD class by the magic of implicits whenever the RDD in question contains key/value pairs.

Further Reading

Nice article on memory usage.

Tuesday, August 18, 2015

Try Monads


In the Reactive Programming course, Martin Odersky mentions that Try is not a true monad as it violates the left unit law. But if you try it (no pun intended), Try does indeed appear to obey the rules for monads.

Just to recap, here are the monad rules expressed in Scala:

  import scala.util.{Try, Success, Failure}

  def testMonadicPropertiesOfTry[T, U](f: T => Try[U], g: U => Try[U], m: Try[T], x: T, unit: T => Try[T]): Boolean = {

    // Associativity: (m flatMap f) flatMap g == m flatMap (x => f(x) flatMap g)
    def associativity: Boolean = {
      val associativityLhs = (m flatMap f) flatMap g
      val associativityRhs = m flatMap (x => f(x) flatMap g)
      assertEqual(associativityLhs, associativityRhs)
    }
    val associativityResult = Try(associativity)

    // Left unit: unit(x) flatMap f == f(x)
    def leftUnit: Boolean = {
      val leftUnitLhs = unit(x) flatMap f
      val leftUnitRhs = f(x) 
      assertEqual(leftUnitLhs, leftUnitRhs)
    }
    val leftUnitResult = Try(leftUnit)

    // Right unit: m flatMap unit == m
    def rightUnit: Boolean = {
      val rightUnitLhs = m flatMap unit
      assertEqual(rightUnitLhs, m)
    }
    val rightUnitResult = Try(rightUnit)

    (associativityResult, leftUnitResult, rightUnitResult) match {
      case (Success(_), Success(_), Success(_)) => true
      case _ => false
    }
  }

Where my assertEqual method looks like:

  def assertEqual[T](try1: Try[T], try2: Try[T]): Boolean = {
    try1 match {
      case Success(v1) => try2 match {
        case Success(v2) => v1 == v2
        case _ => false
      }
      case Failure(x1) => try2 match {
        case Failure(x2) => x1.toString == x2.toString
        case _ => false
      }
    }
  }

That is, it compares Failures by looking at the text of their exceptions. This is because Java exceptions don't override equals(), so two otherwise identical exceptions are only equal if they are the same instance.

Now, if we run the code below (borrowed liberally from here) where we're deliberately trying to cause a java.lang.NumberFormatException as the code attempts to convert our 'a' into a numeric:

    def factory[T](x: => T): Try[T] = Try(x)

    def unit[T](x: T): Try[T]   = factory(x)
    def f(x: String): Try[Int]  = factory(x.toInt)
    def g(x: Int): Try[Int]     = factory(x + 1)
    val x                       = "a"
    val m                       = factory(x)

    val isMonadic = testMonadicPropertiesOfTry(f, g, m, x, unit[String])
    println("is monadic? " + isMonadic)

the output says it's true, Try[T] obeys the rules for a monad. What gives?

Mauricio Linhares says: "there is some debate as to if Try[U] is a full monad or not. The problem is that if you think unit(x) is Success(x), then exceptions would be raised when you try to execute the left unit law since flatMap will correctly wrap an exception but the f(x) might not be able to do it. Still, if you assume that the correct unit is Try.apply then this would not be an issue."

So, let's take the first line of the last code snippet and make it thus:

    def factory[T](x: => T): Try[T] = Success(x)

whereupon we are told that Success is not a monad at all. Further investigation reveals that in the leftUnit method:

      val leftUnitLhs = unit(x) flatMap f

works fine but:

      val leftUnitRhs = f(x) 

blows up. The left hand side does not equal the right.

The reason for this is that Success.flatMap catches any NonFatal exception, just as Try's apply method (the Try(...) factory) does. But Success's apply does not, so the exception thrown while evaluating its argument escapes before a Failure can be constructed. It's this asymmetry that means Try acts like a monad and Success does not.
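A two-line way to see this asymmetry in the REPL:

import scala.util.{Success, Try}

Try("a".toInt)      // Failure(java.lang.NumberFormatException: For input string: "a")
Success("a".toInt)  // throws the NumberFormatException before Success.apply is even called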

Further reading

An interesting debate about monads and exceptions here.

Monday, August 10, 2015

The Mathematics of Linear Regression


There's an excellent and free book online called The Elements of Statistical Learning that can be found here. I was browsing through it but hit one of those points where a derivation misses some steps and confuses the reader. It took me a few hours to work it out and it's not too hard, but just in case somebody hits the same problem, here is a step-by-step derivation.

The section in question is Linear Models and Least Squares (2.3.1), which says:
Ŷ = β̂0 + Σ (j = 1 to p) Xj β̂j

Where β̂0 is a constant called the bias, β̂ is a vector of coefficients, X is a vector of inputs and Ŷ is the prediction.

To make things more mathematically convenient, let's subsume β̂0 and the β̂ vector of coefficients into one big vector (which means also including a constant 1 in the X vector of inputs), such that the above equation becomes:

Ŷ = XTβ̂

This is exactly the same equation in matrix notation rather than summation notation (XT is simply the transpose of X).

So, the question becomes what's the optimal value of β̂ ?

Let's re-express this thus: say that we have N trials, each with a result yi (this y has nothing to do with Ŷ). All these results can be put in a vector, y. Let's say the matrix X has N rows, each holding the p inputs for that trial (that is, it's an N x p matrix).

Then the error can be expressed as:

y - X ß

(that is, the output values minus all of the input values that are multiplied by our coefficients). This is what we want to minimize.

How do we do this? Well, an obvious way is to minimise the sum of the squares of the differences. So, let's form that sum of squares:

(y - X ß)T (y - X ß)

then expand it noting that matrices are distributive, associative but not commutative and:

(A + B)T = AT + BT              (1)

and

(AB)T = BT AT                   (2)

so:

= (yT - ßT XT) (y - X ß)
= yTy - yT X ß - ßT XT y + ßT XT X ß

and differentiate it with respect to ß. The value of ß at which this derivative equals 0 is our optimal value.

The first term is easy to differentiate: yTy has no dependence on ß so it disappears, and our equation becomes:

δ/δß (- yT X ß - ßT XT y + ßT XT X ß) = 0     (3)

The next bit is tricky. To help, let's take a look at this useful summary on matrix calculus. The interesting sections for us are Proposition 7 (equations 36 and 37) and Proposition 9 (equation 49). They say:

If:

α = yT A x

then:

δα/δx = yT A              (4)

and:

δα/δy = xT AT             (5)

where A is independent of x and y.

And if:

α = xT A x

then

δα/δx = 2 xT A            (6)

where A is symmetric.

Now, take the first term of (3). That's basically equation (4) if we replace A with X and x with ß.

Take the second term of (3). That's basically equation (5) if we replace A with XT, y with ß and x with y.

Finally, take the third term of (3) and that looks like (6) if we replace A with XT X and x with ß. How do we know that XT X is symmetric? Well, any matrix multiplied by its own transpose is symmetric, since (XT X)T = XT (XT)T = XT X (see here for the simple proof).

So, taking these three observations, equation (3) becomes:

0 = - yT X - yT X + 2 ßT XT X
  = 2 ßT XT X - 2 yT X
  = 2 (ßT XT - yT) X

Now drop the factor of 2, take the transpose of both sides (the transpose of 0 is 0) and use (1) and (2):

0 = XT (ßT XT - yT)T
  = XT ((ßT XT)T - (yT)T)
  = XT (X ß - y)
  = XT X ß - XT y

Rearranging:

XT X ß = XT y

and multiplying both sides with (XT X)-1 gives:

ß = (XT X)-1 XT y

which is equation 2.6 in The Elements of Statistical Learning.
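As a sanity check, the closed-form solution is easy to verify numerically. Here is a minimal sketch using the Breeze linear algebra library (assuming it is on the classpath); the numbers are made up purely for illustration:

import breeze.linalg.{inv, DenseMatrix, DenseVector}

// Each row of X is one observation; the column of 1s absorbs the bias term ß0.
val X = DenseMatrix(
  (1.0, 1.0),
  (1.0, 2.0),
  (1.0, 3.0),
  (1.0, 4.0))
val y = DenseVector(2.1, 3.9, 6.2, 7.8)

// ß = (XT X)-1 XT y, i.e. equation 2.6
val beta = inv(X.t * X) * (X.t * y)

println(beta) // roughly DenseVector(0.15, 1.94): the intercept and the slope

In practice a proper least-squares solver is preferable to forming the inverse explicitly, but for a handful of points this is fine.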

Wednesday, August 5, 2015

K-Means Clustering


K-Means is a clustering algorithm that tells you where the centres are in a set of data points. It "has the major disadvantage that you must pick k [the number of centres] in advance. On the plus side, it’s easy to implement". A simple implementation is just a few lines of Scala although it can be used on a much larger scale in MLLib/Spark.

First, let's create some data - just rows of x,y coordinates - which we can plot in R with:

data <- read.csv("/tmp/point.csv", header=F)
matplot(data[,1],data[,2] ,type="p", pch=3, xlab="x", ylab="y")

It's clear to the human eye that there are 2 clusters here and where their centres are, although it would be much harder to tell given just the raw data.

The snippet of Scala code to calculate K-Means looks something like this:

.
.
case class Distance(point: Point2D, distance: Double)
case class Point2D(x: Double, y: Double)
.
.
  implicit val pointOrdering = new Ordering[Distance] {
    // Compare on the distance itself. Subtracting and calling .toInt would round any
    // difference of less than 1.0 down to 0, wrongly treating the two distances as equal.
    override def compare(x: Distance, y: Distance): Int = x.distance.compare(y.distance)
  }

  def iterate(centres: Seq[Point2D], others: Seq[Point2D]): Seq[Point2D] = {
    val clusterPairs = centres.map(centre => (centre, Seq[Point2D]()))
    val clusters     = Map(clusterPairs: _*) 
    val assigned     = assign(others, centres, clusters)
    assigned.map(centrePoint => update(centrePoint._2 :+ centrePoint._1)).toSeq.to[collection.immutable.Seq]
  }
  
  def update(points: Seq[Point2D]): Point2D = {
    val distances = points.map(point => sumOfDistances(point, points))
    distances.min.point
  }
  
  def sumOfDistances(point: Point2D, others: Seq[Point2D]): Distance = {
    val distances = others.map(other => distanceFn(other, point))
    Distance(point, distances.sum)
  }
  
  def distancesFrom(centre: Point2D, others: Seq[Point2D]): Seq[Distance] = 
    others.map(point => Distance(point, distanceFn(centre, point)))

Where we call iterate an arbitrary number of times until we're happy we have the centres. The iterate method is initially called with k randomly chosen points that act as our initial guesses at where the centres are. With each iteration, our guess is refined.

For each iteration, we do two things. First we assign each point to one of our k estimated centres. Then, within each grouping, we find the point that is the most central. These points will be the basis for our next iteration.

We define the central point within a grouping as the one with the minimum sum of distances to all the other points in that grouping.
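The assign and distanceFn functions are elided in the snippet above; here is a minimal sketch of what they might look like (my guess at the elided code, using Euclidean distance, not necessarily the original):

// Euclidean distance between two points.
def distanceFn(a: Point2D, b: Point2D): Double =
  math.sqrt(math.pow(a.x - b.x, 2) + math.pow(a.y - b.y, 2))

// Put each point into the cluster of its nearest centre.
def assign(others:   Seq[Point2D],
           centres:  Seq[Point2D],
           clusters: Map[Point2D, Seq[Point2D]]): Map[Point2D, Seq[Point2D]] =
  others.foldLeft(clusters) { (acc, point) =>
    val nearest = centres.minBy(centre => distanceFn(centre, point))
    acc.updated(nearest, acc(nearest) :+ point)
  }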

The interesting thing is that the resulting cluster points may change somewhat between runs, as the initial choice of points can make a difference (that is, the algorithm is unstable). Therefore, it might be necessary to run it a few times and take the most common outcome.

We can verify the results in R with:

> kmeans(data, 2) # 2 being the number of clusters we're expecting
K-means clustering with 2 clusters of sizes 33, 33

Cluster means:
  V1 V2
1 30 30
2 10 10

Which is the same result as my code when run with 10 iterations and taking the most popular results from 10 runs.
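For a much larger data set, the same job can be handed to Spark's MLlib. A sketch (assuming a SparkContext called sc and MLlib on the classpath):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Parse the same x,y CSV into MLlib vectors and cluster with k = 2.
val points = sc.textFile("/tmp/point.csv")
               .map(_.split(","))
               .map(cols => Vectors.dense(cols(0).toDouble, cols(1).toDouble))

val model = KMeans.train(points, 2, 10) // k = 2 clusters, 10 iterations
model.clusterCenters foreach println    // should land near (10,10) and (30,30)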


Monday, July 20, 2015

Shapeless


There's a very interesting Scala library called Shapeless that lets you do some weird things with types. One such "what magic is this?!" moment is asserting that a checksum derived from a list of numbers has a certain value at compile time rather than run time.

This impressed me no end, even if I can't think of a practical use for it. I'm still trying to get my head around Travis Brown's excellent tutorial since he is so much more clever than me. But here are some notes I made along the way.

First, we borrow a bit from the Peano axioms, where every natural number is the successor of another natural number, except 0, which is the successor of nothing. We model this much like Shapeless does with something like:

  trait NaturalNumber

  trait NaturalNumber0 extends NaturalNumber

  trait NaturalNumberSucceeds[T <: NaturalNumber] extends NaturalNumber

Next we'll have a type representing each and every number. I'll only bother with the natural numbers 0 to 4 to make things simple:

    type Type0 = NaturalNumber0
    type Type1 = NaturalNumberSucceeds[Type0]
    type Type2 = NaturalNumberSucceeds[Type1]
    type Type3 = NaturalNumberSucceeds[Type2]
    type Type4 = NaturalNumberSucceeds[Type3]

This is just shorthand. Really, Type4 is of type:

NaturalNumberSucceeds[NaturalNumberSucceeds[NaturalNumberSucceeds[NaturalNumberSucceeds[NaturalNumber0]]]]

But that's far too verbose.

So much for the types, let's have some instances:

    implicit val _0 = new Type0 {}
    implicit val _1 = new Type1 {}
    implicit val _2 = new Type2 {}
    implicit val _3 = new Type3 {}
    implicit val _4 = new Type4 {}

Now comes our first function that makes assertions at compile time. It asserts that T is the immediate successor to U and looks like this:

  def immediateSuccessor[T <: NaturalNumberSucceeds[U], U <: NaturalNumber]: Unit = {}

And we can see it immediately working:

    immediateSuccessor[Type3, Type2] // compiles
//  immediateSuccessor[Type3, Type1] // doesn't compile because Type3 is not the immediate successor to Type1

A more useful example tells us what the successor is without us having to assert it:

    implicit def add1[A <: NaturalNumberSucceeds[_]](implicit b: NaturalNumberSucceeds[A]): NaturalNumberSucceeds[A] = b
    val result3add1: Type4 = add1[Type3]

Well, we assert insofar as we define the type of result3add1 but we didn't have to.

Subtraction proved a little more difficult for me to understand. Playing around with some Shapeless code I got this:

    trait NaturalPair[A <: NaturalNumber, B <: NaturalNumber]

    implicit def sub[A <: NaturalNumber, B <: NaturalNumber](implicit e: NaturalPair[NaturalNumberSucceeds[A], NaturalNumberSucceeds[B]])
      = new NaturalPair[A, B] {}

So, A is the number we want to start with and then we reduce it by B. This implicit function will then recurse until one or the other reaches 0.

    implicit val analyseThis = new NaturalPair[Type3, Type1] {}
    val result3sub1          = implicitly[NaturalPair[Type2, Type0]]
//  val result3sub1Wrong     = implicitly[NaturalPair[Type1, Type0]] // doesn't compile because 3 - 1 != 1

Here we send into the implicit ether a request to take a 3 and subtract a 1. This is picked up by our implicit sub function that decrements both 3 and 1 until one or both reach 0. We then assert that a (2,0) pair is out there in the ether and indeed it is. Our code compiles.

The example Brown uses is much more complicated than this but I thought this would be a more accessible introduction.

Thursday, July 9, 2015

Setting up a little Spark/HDFS cluster


Setting up Spark is not too hard but there are a few environment issues you might need to know about. For this, I'm assuming you're using some Linux distro.

The environment

First, set up SSH keys on your boxes so you don't have to type your password all the time. This is a good read. I also use rdist to keep everything in synch. Just be careful about copying the data directory over...

A simple Spark cluster

Having installed the various binaries, the documentation covers most of what you need to know. I have two computers, nuc and phillsdell. Somewhat arbitrarily, on nuc, I run the master with:

export SPARK_MASTER_IP=[nuc's IP address]
export SPARK_LOCAL_IP=[nuc's IP address]
$SPARK_INSTALLATION/sbin/start-master.sh

and a slave with

$SPARK_INSTALLATION/sbin/start-slave.sh spark://[nuc's IP address]:7077

On phillsdell, I run:

$SPARK_INSTALLATION/sbin/start-slave.sh spark://[nuc's IP address]:7077

for a slave and:

$SPARK_INSTALLATION/bin/spark-shell --master spark://[nuc's IP address]:7077

for the Spark shell.

If you see lots of errors like this:

15/07/06 08:40:42 INFO Worker: Connecting to master akka.tcp://sparkMaster@nuc:7077/user/Master...
15/07/06 08:40:53 INFO Worker: Retrying connection to master (attempt # 1)

you'll need to configure your firewall to accept all the node chatter. If you're using Ubuntu, this is a good resource.

Anyway, to check everything is OK, let's run:

scala> val myRdd = sc.parallelize(1 to 1000000)
myRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:21

scala> val myRddWKeys = myRdd.map(x => (x % 1000, x))
myRddWKeys: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[5] at map at <console>:23

scala> val myRddReduced = myRddWKeys reduceByKey (_ + _)
myRddReduced: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[6] at reduceByKey at <console>:25

scala> myRddReduced foreach (key => println(s"key = ${key._1} value = ${key._2}") )

15/07/06 10:19:43 INFO SparkContext: Starting job: foreach at <console>:28
.
.

This creates an RDD with a million numbers, divides them into one thousand groups and reduces the numbers in each group by adding them all up. Finally, for each group, we print out the reduced number. The results can be seen at http://phillsdell:8081 and http://nuc:8081 where there are 500 rows printed to stdout on each slave.

HDFS

Now, let's add Hadoop to the mix by starting it on our boxes.

If you see something in the namenode log like:

2015-07-03 12:17:31,300 ERROR org.apache.hadoop.hdfs.server.namenode.NameNode: Failed to start namenode.
org.apache.hadoop.hdfs.server.common.InconsistentFSStateException: Directory /tmp/hadoop-henryp/dfs/name is in an inconsistent state: storage directory does not exist or is not accessible.

then don't forget to run:

hadoop namenode -format

(more information here). I also needed to add this:

  <property>
    <name>fs.default.name</name>
    <value>hdfs://[phillsdell's IP address]</value>
  </property>
  <property>
    <name>dfs.data.dir</name>
    <value>file:///home/henryp/Tools/Cluster/Hadoop/tmp/dfs/name/data</value>
    <final>true</final>
  </property>
  <property>
    <name>dfs.name.dir</name>
    <value>file:///home/henryp/Tools/Cluster/Hadoop/tmp/dfs/name</value>
    <final>true</final>
  </property>

to my ./etc/hadoop/core-site.xml file.

Now, I can go to:

http://phillsdell:50070/dfshealth.html#tab-datanode

and see both data nodes in my cluster :)

Pulling it all together

I downloaded historical data for the FTSE 100 from here. Using this helpful list of commands, I executed:

hadoop-2.7.0/sbin$ ../bin/hadoop fs -mkdir /ftse100
hadoop-2.7.0/sbin$ ../bin/hadoop fs -put ~/Downloads/table.csv /ftse100

and noticed in the web admin page that the file has been copied onto HDFS. Now, in the Spark shell:

scala> val csvRDD = sc.textFile("hdfs://192.168.1.15:8020/ftse100/table.csv")
.
.

scala> csvRDD.count
.
.
res3: Long = 8211

Which is not coincidentally the line count of the original file:

$ wc -l ~/Downloads/table.csv 
8211 /home/henryp/Downloads/table.csv

So, it appears that my standalone spark cluster is quite happily talking to my Hadoop HDFS.


Tuesday, June 30, 2015

Scala's conform idiom


This is an odd little fellow in akka.testkit.TestFSMRef:

class TestFSMRef[S, D, T <: Actor](
  system: ActorSystem,
  props: Props,
  supervisor: ActorRef,
  name: String)(implicit ev: T <:< FSM[S, D])

What is that <:< symbol? It turns out that it comes from Predef, thus:

  sealed abstract class <:<[-From, +To] extends (From => To) with Serializable
  private[this] final val singleton_<:< = new <:<[Any,Any] { def apply(x: Any): Any = x }
  implicit def $conforms[A]: A <:< A = singleton_<:<.asInstanceOf[A <:< A]

The documentation says:

Requiring an implicit argument of the type `A <:< B` encodes the generalized constraint `A <: B`.

So, A is a subclass of B. Got it. But how is that enforced for each and every child/parent relationship?

Well, there is a single implicit, $conforms[A], lingering in the ether that provides evidence that any type A is a subtype of (or the same as) A. That's what A <:< A is saying. This uses a type as an infix operator for syntactic sugar and is equivalent to <:<[A, A].

"A is a superclass or same as A"? This in itself is not terribly useful. However,  <:<[B, A] also must be possible if and only if B is a superclass of A because  <:< is contravariant in its first type (-From) and covariant it its second (+To). That is, <:<[B, A] is a subtype of <:<[A, A] so everything compiles.

So, going back to our original Akka code, requiring an implicit of type T <:< FSM[S, D] enforces T being a subclass of FSM[S, D]: the only evidence the compiler can find is the implicit T <:< T, and that satisfies T <:< FSM[S, D] without compilation errors only when T <: FSM[S, D].
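The same idiom is handy in your own code whenever a method should only be callable for certain type parameters. A small, hypothetical example (Box and sumAll are made up for illustration):

// sumAll can only be called when T is Int; the evidence ev also acts as a T => Int function.
class Box[T](values: List[T]) {
  def sumAll(implicit ev: T <:< Int): Int = values.map(ev).sum
}

val ints    = new Box(List(1, 2, 3))
val strings = new Box(List("a", "b", "c"))

ints.sumAll       // compiles: the implicit $conforms[Int] provides Int <:< Int
// strings.sumAll // doesn't compile: no implicit evidence that String <:< Int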