Thursday, December 31, 2015

Latent Semantic Analysis with Spark


I'm reading the excellent Advanced Analytics with Spark as I need to learn more about natural language processing (NLP).

The method in the book is called Latent Semantic Analysis. Basically, this builds a vector for each document that captures the relevant words and their significance. More specifically, all vectors have the same size and each value is, roughly speaking, the frequency of a given term in the document multiplied by a measure of how rare that term is across all documents (with a log thrown in to dampen outliers) - the classic TF-IDF score. A given index in each vector represents the same term across all documents.

The code for the book can be retrieved with:

git clone https://github.com/sryza/aas.git

The first step is a process of lemmatization which removes stop words and aggregates related words. For this, an NLP library from Stanford University is used. The code looks like this:

    val lemmatized = plainText.mapPartitions(iter => {
      val pipeline = createNLPPipeline()
.
.

Interestingly, we "use mapPartitions so that we only initialize the NLP pipeline once per partition instead of once per document". [1]

(The variable lemmatized is an RDD of documents where the documents are just a Seq of Strings that have had the stop words removed and related words aggregated.)

By mapping over lemmatized, we make an RDD of Map[String, Int] that represents the term counts per document, and we call this docTermFreqs. We combine these within each partition of documents and merge them all together at the end. "When the records being aggregated and the result object have the same type (e.g., in sum), reduce is useful, but when the types differ, as they do here, aggregate is a more powerful alternative." [1]

Unfortunately, this approach can lead to OutOfMemoryErrors. So, an alternative is to find the distribution of terms over the documents with this:

    val docFreqs = docTermFreqs.flatMap(_.keySet).map((_, 1)).reduceByKey(_ + _, 15)

(where 15 is our number of partitions). We also limit the number of terms:

    docFreqs.top(numTerms)(ordering)

where numTerms is arbitrary but defaults to 50 000. From this, we have enough information to calculate the Inverse Document Frequencies:

    docFreqs.map{ case (term, count) => (term, math.log(numDocs.toDouble / count))}.toMap

The use of log dampens the effect of outliers.

In turn, with this we can map over all docTermFreqs creating a sparse vector of all the terms and their scores as we go.
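To make that concrete, here is a rough sketch of the mapping (the names idfs, termIndex and sc are my own assumptions about the surrounding code, not necessarily the book's):

    import org.apache.spark.mllib.linalg.Vectors

    // Assumption: idfs is the Map[String, Double] of IDF scores built above.
    // Give each surviving term a fixed column index and broadcast the lot.
    val termIndex  = idfs.keys.zipWithIndex.toMap
    val bIdfs      = sc.broadcast(idfs)
    val bTermIndex = sc.broadcast(termIndex)

    val vectors = docTermFreqs.map { termFreqs =>
      val docTotalTerms = termFreqs.values.sum
      val scores = termFreqs.toSeq.collect {
        case (term, freq) if bTermIndex.value.contains(term) =>
          (bTermIndex.value(term), bIdfs.value(term) * freq / docTotalTerms) // TF x IDF
      }
      Vectors.sparse(bTermIndex.value.size, scores)
    }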

The Maths

So much for the Scala, now for the mathematics. LSA depends on a trick called Singular Value Decomposition (SVD). This is a very general technique that is used in many diverse maths problems. A good demonstration of it (using Python) is available here. A simple(ish), intuitive explanation of it is here.

One use of SVD is to reduce the dimensions of the problem to something more manageable. One consequence of such a reduction is a loss of accuracy but if this loss is small, the approximation might be acceptable.

A consequence of SVD is that our matrix is broken down into three matrices, each with its own properties:

X = U D V^T

U is an N × p orthogonal matrix (U^T U = I_p) whose columns u_j are called the left singular vectors;
V is a p × p orthogonal matrix (V^T V = I_p) with columns v_j called the right singular vectors;
D is a p × p diagonal matrix, with diagonal elements d_1 ≥ d_2 ≥ ... ≥ d_p ≥ 0 known as the singular values
(taken from [2]). In English,

X is an (N x p) matrix that holds the observations. In general, N is the number of samples and p the number of features. For our purposes, N is the number of documents and p the number of terms. That is, the rows correspond to documents and the columns to terms. The values are:

(the calculation for a term from docFreqs seen earlier) x (frequency of this term in this document) / (total number of terms in this document).

V is a matrix "where each row corresponds to a term and each column corresponds to a concept. It defines a mapping between term space (the space where each point is an n-dimensional vector holding a weight for each term) and concept space (the space where each point is an n-dimensional vector holding a weight for each concept)."

U is a "matrix where each row corresponds to a document and each column corresponds to a concept. It defines a mapping between document space and concept space." [1]
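In Spark, the decomposition itself can be computed with MLlib's RowMatrix. A minimal sketch (vectors being the document vectors we built earlier; k, the number of concepts to keep, is arbitrary):

    import org.apache.spark.mllib.linalg.distributed.RowMatrix

    val mat = new RowMatrix(vectors)             // rows are documents, columns are terms
    val k   = 100                                // how many concepts to retain
    val svd = mat.computeSVD(k, computeU = true)

    val U = svd.U // distributed: documents x concepts
    val s = svd.s // the top k singular values, descending
    val V = svd.V // local: terms x concepts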

There's a lot more to the mathematics and I'll post more soon but this is a good starting place to get more familiar with it.

Spark

Now, this is just a refresher on Spark's handling of matrices. Handling data as rows is easy in Spark. But if you want to do anything interesting with a matrix (multiply, take a transpose etc) rows alone work less well: you also need to deal with columns, and the data in a column might be striped across many machines, unlike rows.

At the moment (it's an experimental API), Spark has 4 implementations of DistributedMatrix, each with slightly different abilities. The BlockMatrix implementation is currently the only one that allows one distributed matrix to be multiplied by another. As its name suggests, it handles such operations by breaking the matrix into blocks that can be worked on independently.

Other implementations can be multiplied by a Matrix object (dense or sparse) but this appears to be local to the node on which the code runs.
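For example, a sketch along these lines (where indexedRowsA and indexedRowsB are assumed to be RDDs of IndexedRow):

    import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix

    // Convert two distributed matrices to blocks so they can be multiplied together
    val blockA  = new IndexedRowMatrix(indexedRowsA).toBlockMatrix()
    val blockB  = new IndexedRowMatrix(indexedRowsB).toBlockMatrix()
    val product = blockA.multiply(blockB) // a distributed matrix-by-matrix multiply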


[1] Advanced Analytics with Spark
[2] Elements of Statistical Learning (free download)

Thursday, December 10, 2015

The Magnet Pattern


...is a variant on the Type Class Pattern where Scala copies a Haskell idiom. A good article is here. The idea is that there is just one method that takes a variety of objects with the same super-trait depending on what parameters were used in calling that method. This replaces having many overloaded methods.

[Just to refresh one's memory: the Type Class pattern is a method of ad hoc polymorphism but it differs from the Pimp My Library pattern. A good comparison of the two can be found here. Essentially, the Pimp My Library pattern appears to add methods to classes that were not there when they were first written. The key thing about the Type Class pattern is that it asks for a witness to evidence that something is possible for a certain type. Think of Scala's TraversableOnce.sum method that asks for evidence that the type it contains has an associated Numeric. In Pimp My Library, you're creating the functionality per type. In the Type Class pattern, you're providing the functionality for a more general function to do its work.]
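To make that concrete, here is a minimal, made-up type class. The implicit parameter is the witness; sumAll compiles only for types that have one:

  trait Addable[T] {
    def add(x: T, y: T): T
    def zero: T
  }

  implicit object IntIsAddable extends Addable[Int] {
    def add(x: Int, y: Int) = x + y
    def zero                = 0
  }

  // The witness proves that summing a T is possible
  def sumAll[T](ts: Seq[T])(implicit addable: Addable[T]): T =
    ts.foldLeft(addable.zero)(addable.add)

  sumAll(Seq(1, 2, 3)) // 6; sumAll(Seq('a')) would not compile without an Addable[Char]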

Implicits
What is apparent immediately is that you really need to know about implicit rules. Here are some notes but a really good reference lies here.

Let's take a fairly pointless class:

class AContainer

Using the Pimp My Library pattern, we can add a method to it like so:

  class PimpMe {
    def pimpedMethodOnClass: String = "pimpedMethodOnClass"
  }
  implicit def pimpedWithADefCreatingClass(aContainer: AContainer): PimpMe = new PimpMe

Now, we can call this method as if it were on the class itself:

    println(aContainer.pimpedMethodOnClass) // "pimpedMethodOnClass"

More efficiently (see below) we could write:

  implicit class PimpMeEfficiently(val aContainer: AContainer) extends AnyVal {
    def pimpedMethodOnEfficientClass: String = "pimpedMethodOnEfficientClass"
  }
.
.
    println(aContainer.pimpedMethodOnEfficientClass) // "pimpedMethodOnEfficientClass"

The reason for AnyVal is "properly-defined user value classes provide a way to improve performance on user-defined types by avoiding object allocation at runtime, and by replacing virtual method invocations with static method invocations" (from the ScalaDocs).

The Magnet Pattern Simplified
A simplified magnet pattern implementation is a single method. Let's use a slightly more interesting magnet class whose toString method yields a different value. In real life, each subclass would have a more interesting implementation:

  class Magnet(string: String) {
    override def toString() = string
  }

And let's have a very trivial implementation of the method taking the magnet:

  def magnetMethod(magnet: Magnet): Unit = println(magnet)

Now, we can call this method with what look like different arguments, for example:

  implicit def fromStringAndInt(tuple: (String, Int)) = new Magnet(s"Implementation X: (String, Int) = (${tuple._1}, ${tuple._2})")
.
.
    magnetMethod("aString", 1)   // "Implementation X: (String, Int) = (aString, 1)" (thanks to fromStringAndInt)
    magnetMethod(("aString", 1)) // ditto

Note calling the magnet method with a tuple has the same effect as calling it with individual arguments.

Now, we could have a different implementation but let's be simple and just pretend there is different functionality. But we really do have a different call site:

  implicit def fromDouble(double: Double) = new Magnet(s"Implementation Y: double = $double")
.
.
    magnetMethod(1.1d) // "Implementation Y: double = 1.1" (thanks to fromDouble)

We're calling the same method with different arguments (cardinality as well as types)!


Monday, November 30, 2015

Machine Learning with Spark


Playing on my current pet project, I implemented a Support Vector Machine using Spark's Machine Learning library without fully understanding what it was doing and just following the documentation.

But the documentation is somewhat sparse and I got more understanding by going elsewhere, mainly this excellent book. Here I learned that SVMs are a classification algorithm, that is "a supervised learning method that predicts data into buckets". This contrasts with a regression algorithm which is "the supervised method that predicts numerical target values." Furthermore, it can be a non-parametric algorithm allowing it to handle non-linear data. "Parametric models makes assumptions about the structure of the data. Non-parametric models don’t." [1]

OK, that's some terminology defining the algorithm's characteristics out of the way. But then I hit the Spark documentation that talks about the AUC (Area Under Curve) of the ROC (Receiver Operating Characteristics). Hmm.

The concepts are quite easy but I found a dearth of material on the web [addendum: since writing this, I found this excellent post]. So, here is what I gleaned.

First, we start with something called the "Confusion Matrix". This is a fairly simple 2x2 matrix of true/false positive/negative rates.

For example, imagine a test for cancer. Some people will have it, some won't. Some people will be told they have it, some won't. Sometimes we tell people they have it when they don't. Sometimes we don't tell people they have it when they do. Oops.

The confusion matrix would look like this:

        Positive                                            Negative
True    Patient is told he has cancer and indeed he does    Patient does not have cancer and is told so
False   Patient is told he has cancer when he does not      Patient is not told he has cancer when he does


If the total number of patients is N, the number told they have cancer is Y and the number who actually have cancer is X, the rates look like this:

        Positive        Negative
True    X / Y           (N - X) / (N - Y)
False   (N - X) / Y     X / (N - Y)

We can plug the real numbers in and get a matrix with 4 cells, each between 0 and 1. Depending on how high our threshold is for the cancer test, these numbers will jump around. So, if we varied our threshold between a minimum 0 and a maximum 1, we could plot the true positive rate against the false positive rate. This graphic should illustrate:


where the threshold is on the z-axis and the ROC is on the x-y plane.

The axis labelled 'true positive' indicates the sensitivity (TP / (TP + FN)) and the axis labelled 'false positive' is 1 - specificity, where specificity is TN / (TN + FP) - see here for further details.

Incidentally, the graph was plotted using everybody's favourite maths tool, R. The code was:

require("rgl")

dev.off()

step <- 0.01
f <- function(v) v ^ 3
x <- f(seq(0,1,step))
y <- seq(0,1,step)
h <- function(x,y) y
z <- c(1:100) 

c = z
c = cut(c, breaks=64)
cols = rainbow(64)[as.numeric(c)]

pairwiseFlatten <- function(binded) {
  bound <- c()
  for (i in 1:(length(binded)/2)) bound = c(bound, binded[i,1], binded[i,2])
  return (bound)
}

plot3d(x, y, h(x,y), add=TRUE, col=cols)
plot3d(x, y, 0, add=TRUE)
segments3d(pairwiseFlatten(cbind(x,x)), y = pairwiseFlatten(cbind(y,y)), z = pairwiseFlatten(cbind(h(x,y),0)), add=TRUE)

decorate3d(xlab="false positive", ylab="true positive", zlab="threshold", main="ROC curve")

Anyway, we ideally want to maximize the true positive rate no matter what threshold value we have. That is, the curve should hug the top left part of the x-y plane. A curve that represented this maximum would have the greatest area in the x-y plane over all the curves. And this is where our AUC for the ROC comes in: the higher, the better our model.
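As it happens, Spark will calculate this for you. A minimal sketch, assuming scoreAndLabels is an RDD of (score, label) pairs produced by running the trained model over a test set:

    import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

    val metrics = new BinaryClassificationMetrics(scoreAndLabels)
    println("Area under ROC = " + metrics.areaUnderROC()) // 1.0 is perfect, 0.5 is no better than guessing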

[1] Real World Machine Learning - Manning.

Sunday, November 29, 2015

Algorithm Recipes


These are some miscellaneous heuristics for writing and choosing algorithms that I have found useful.

Writing an Algorithm

1. If you want to make an algorithm tail recursive, add an accumulator to the method signature (see the sketch after this list).

2. When you're thinking about the signature of a recursive algorithm, start with an argument that represents the "works still to be done" and an accumulator. You may need more, but that's a good start.

3. When the motivation for an algorithm is to find the optimal answer, one approach might be to recurse through the solution space finding the maximum (or minimum) of any two recursive sub-solutions. A good example is here (with a good Wikipedia explanation here). This is a C implementation of the Longest Common Subsequence where all sequences are recursively explored but only the maximum of the two sub-problems is chosen at each recursion.
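As a sketch of the first two points, here is a sum over a list given an accumulator so that Scala can make it tail recursive:

  import scala.annotation.tailrec

  // "Work still to be done" (the remaining list) plus an accumulator
  @tailrec
  def sum(remaining: List[Int], acc: Int = 0): Int = remaining match {
    case Nil     => acc
    case x :: xs => sum(xs, acc + x)
  }

  sum(List(1, 2, 3)) // 6, in constant stack space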

Choosing an Algorithm

If you're stuck, consider sorting the data first. This may (or may not) help you move forward. For example, in the Closest Pair algorithm (where you want to find the closest pair of points in 2-D without incurring O(n²) cost) we first sort all the points by their x-coordinate to help us find the closest pair. Two points adjacent in this ordering are unlikely to be the closest pair in the whole plane but it's a start.

If the performance is O(n²), consider expressing the problem in terms of divide-and-conquer.

You cannot do better than O(n log(n)) in a comparison based sort. If you need to meet a requirement that at first blush appears to rely on a sorted array of elements but needs to be better than O(n log(n)) then consider a Bucket Sort.

[The example Bucket Sort question tries to find the largest gap between a sequence of integers while avoiding an O(n log(n)) sort. Cunningly, it creates a series of "buckets" that cover the whole range, with each bucket holding a range slightly less than the mean difference. Thus, no 2 integers that differ by the mean or more can fall into the same bucket. You then need only compare each bucket's minimum with the previous non-empty bucket's maximum, which can be done in O(n).]

You may be able to do better than an n log(n) sort if you know something about the data and don't need to compare the elements. For example, if you have X elements that you know are in the range of 1 to Y and you know there are no duplicates (so necessarily X ≤ Y), you can instantiate an array of length Y and scan through the X elements putting them in their natural position in the Y-length array. At the end of this scan, scan through the Y-length array discarding the nulls. The result is the ordered elements. This was all done in time O(X + Y).
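A sketch of that last technique (essentially a degenerate counting sort):

  // xs contains X distinct elements, all in the range 1 to Y
  def sortKnownRange(xs: Seq[Int], y: Int): Seq[Int] = {
    val slots = Array.fill[Option[Int]](y)(None)
    for (x <- xs) slots(x - 1) = Some(x) // each element lands in its natural position
    slots.flatten.toSeq                  // discard the empty slots; O(X + Y) overall
  }

  sortKnownRange(Seq(5, 1, 3), 5) // 1, 3, 5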

Tuesday, November 24, 2015

Spark's sortByKey doesn't


... or at least not when you map. Sure, if you collect you get the elements in a sorted order. But let's say you want to process the elements in a given order, say, finding the difference from one to the next. You might naively write:

    val pairRdd   = sparkContext.parallelize((1 to 10), 3).map(x => (x, x))
    val sortedRdd = pairRdd.sortByKey()

    bogusMap(sortedRdd)
.
.
.
  def bogusMap(sortedRdd: RDD[(Int, Int)]): Unit = {
    var last = 0
    def checkMonotonicKeys(kv: (Int, Int)): Int = {
      val key = kv._1
      if (key != last + 1) throw new IllegalStateException(s"key = $key, last = $last")
      last = key
      key
    }
    val mappedAndSorted = sortedRdd.map(checkMonotonicKeys(_))
    mappedAndSorted.collect().foreach { kv =>
      println(kv)
    }
  }

But you'll see an exception thrown something like:

java.lang.IllegalStateException: key = 8, last = 0

The reason is that the keys are sorted within each partition not across all partitions.

One "solution" is to ensure that all the elements are within one partition such as:

    val sortedInto1Partition = pairRdd.sortByKey(numPartitions = 1)
    bogusMap(sortedInto1Partition)

This works but there is little point in using Spark for it since there is no parallelization. The best solution is to generate the differences as the data comes in.
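That said, if you do want consecutive differences from a sorted RDD without collapsing it to one partition, MLlib ships a sliding-window helper that copes with partition boundaries. A sketch:

    import org.apache.spark.mllib.rdd.RDDFunctions._

    // sliding(2) yields every consecutive pair, even where a pair straddles two partitions
    val differences = sortedRdd.sliding(2).map { case Array((_, v1), (_, v2)) => v2 - v1 }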

Incidentally, this article has a good description of what is happening during a sortByKey operation. Basically, each shuffle has two sides. The first "writes out data to local disk" and the second makes "remote requests to fetch that data... The job of the [first] side of the shuffle is to write out records in such a way that all records headed for the same [second] task are grouped next to each other for easy fetching." Note that the second task that groups data is not obligated to also order it within a group.

As another aside, note the importance of persisting an RDD in this use case.

"Failure to persist an RDD after it has been transformed with partitionBy() will cause subsequent uses of the RDD to repeat the partitioning of the data. Without persistence, use of the partitioned RDD will cause reevaluation of the RDDs complete lineage. That would negate the advantage of partitionBy(), resulting in repeated partitioning and shuffling of data across the network, similar to what occurs without any specified partitioner.

"In fact, many other Spark operations automatically result in an RDD with known partitioning information, and many operations other than join() will take advantage of this information. For example, sortByKey() and groupByKey() will result in range-partitioned and hash-partitioned RDDs, respectively. On the other hand, operations like map() cause the new RDD to forget the parent’s partitioning information, because such operations could theoretically modify the key of each record." [1]

The code above that forces all the data into one partition (using numPartitions = 1) seems immune to map forgetting the parent RDD's partitioning information. Since there is only one partition, there is no information to forget.

[1] Learning Spark - Karau and Konwinski


Wednesday, November 18, 2015

Interpreting Spark


Spark has a nice web interface that allows you to find problems in the jobs you submit to it. Here are some notes I made on using it. First, let's clarify some terminology:

Task, Stage, Job

"Invoking an action inside a Spark application triggers the launch of a Spark job to fulfill it...  The execution plan consists of assembling the job’s transformations into stages. A stage corresponds to a collection of tasks [map, flatMap etc] that all execute the same code, each on a different subset of the data. Each stage contains a sequence of transformations that can be completed without shuffling the full data.

"At each stage boundary, data is written to disk by tasks in the parent stages and then fetched over the network by tasks in the child stage. Because they incur heavy disk and network I/O, stage boundaries can be expensive and should be avoided when possible." [1]

Narrow and Wide Transformation

"For the RDDs returned by so-called narrow transformations like map and filter, the records required to compute the records in a single partition reside in a single partition in the parent RDD...  Spark also supports transformations with wide dependencies such as groupByKey and reduceByKey. In these dependencies, the data required to compute the records in a single partition may reside in many partitions of the parent RDD" [1]

And this is what can induce a shuffle. Imagine a diagram with time running down the Y-axis. Now imagine this diagram having vertical columns representing a partition at various points in time with horizontal slices representing RDDs in which these partitions live, stacked such that parents are (naturally) above children. Now, if we draw arrows from the data points in one RDD/Partition combo to the data on which it relies, we'd hope those arrows remain in the same vertical partition stream. If they cross streams, a shuffle ensues.

The Web GUI

You may see Jobs that have Skipped Stages. This is nothing to worry about. From the code:

    This may be an underestimate because the job start event references all of the result
    stages' transitive stage dependencies, but some of these stages might be skipped if their
    output is available from earlier runs.
    See https://github.com/apache/spark/pull/3009 for a more extensive discussion.

When looking at my own application's performance (an open source finance toy & tool found here on GitHub), a DAG (directed, acyclic graph) may look like this:




My application takes end-of-day stock prices of two different companies and runs a Pearson correlation on them.

In the picture above we see the details of Job 1. Stages 4, 5 and 6 are skipped as they were already loaded by Job 0 (which is not shown here).

Stages 7 and 8 are more interesting. I join the two data sets (keyed on date) and map over both of them to get just the stock price (discarding the date). Note that this is done on both stock data series in parallel.

From then on, all work is done in Spark's MLLib library. First, the zip and map is done in Correlation.computeCorrelationWithMatrixImpl. Then, in calculating the Pearson Correlation from the data, it calls RDD.treeAggregate twice.

This method takes an identity and two functions. The first function handles how the data is aggregated within a partition. The second then handles the totals of this function over all partitions. Since this latter function requires "crossing the streams", a stage finishes and a new one begins (Stage 8 which happens to be another call to treeAggregate).
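To illustrate the shape of treeAggregate (this is not MLlib's actual code), summing squares might look like:

    val rdd = sparkContext.parallelize(1 to 100, 4)

    val sumOfSquares = rdd.treeAggregate(0L)(
      seqOp  = (acc, x) => acc + x.toLong * x, // aggregation within a partition
      combOp = (a, b)   => a + b               // combining partitions: this is the stage boundary
    )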

This visualization will also tell you when one of the RDDs is cached - it will be denoted by a green highlighted dot [2] although we do not see this on the picture above.

[1] Cloudera blog.

[2] Databricks blog.

Wednesday, October 21, 2015

Option-null


This is a very interesting comparison between how the architects of Java and Scala each implemented optional values.

Functor Laws

First, what is a functor? From Categories for the Working Mathematician (Mac Lane):

A functor is a morphism of categories. In detail, for categories C and B a functor T: C -> B with domain C and codomain B consists of two suitably related functions: the object function T, which assigns to each object c of C an object Tc of B, and the arrow function (also written T), which assigns to each arrow f: c -> c' of C an arrow Tf: Tc -> Tc' of B, in such a way that

T(1_c) = 1_Tc,    T(g ∘ f) = Tg ∘ Tf
This takes several readings to sink in. So don't bother. Let's look at some code.

Actually, the first rule is the identity law: if you map over something with the identity function (the output is the same as the input) you get what you started with. That's too trivial to spend any more time on.

The second can be written in Scala as:

val x: Option[A] = ...
.
.
x.map(f compose g) == x.map(g).map(f) // true

or in Java 8 as:

Optional x = ...
.
.
x.map(f).map(g).equals( x.map(g.compose(f)) ) // true... Sometimes. See below.

Why is it useful?

"If I map a function across a List, I should get back a List of the same size as I started with, and with all the elements in the corresponding order. This means I can guarantee some things, like for instance if I reverse the order of the list and then map the function, I will get the same result as mapping the function and then reversing the list. I can refactor without fear." (from Wesely-Smith's post)

How Scala and Java differ

The two radically diverge on their treatment of nulls. Firstly, in Java, if you did this:

Optional ofNull = Optional.of(null);

I'd have a NullPointerException thrown!

Whereas, in Scala you could do this:

val optionWithNull = Option(null)

which would actually be a None. Or, you could do this:

val someWithNull = Some(null)

which is very different. It really is Something that contains a reference to a null.

In this way, Java is less expressive. If, say, we wanted to get a value (wrapped in an Option) from a Map, Scala could distinguish between the value not being there (None) and it being there but null (Some(null)). An equivalent in Java:

        Map<String, String> map = new HashMap<>();
        map.put("A", null);
        map.put("B", null);
        map.put("C", null);
        System.out.println(map.size()); // 3

gives us no way to express this: a method that wrapped the value in an Optional using Optional.of would blow up for keys "A", "B" and "C" (and Optional.ofNullable would make a present-but-null value indistinguishable from an absent key).
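In Scala, by contrast, the distinction falls out naturally:

    val map = Map("A" -> null, "B" -> "b")
    map.get("A") // Some(null): the key is present, its value is null
    map.get("Z") // None: the key is absent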

Higher Kinded Types in Scala and Java


What are they?

"For lack of a fuzzier name, types that abstract over types that abstract over types are called higher-kinded types” (from here). This StackOverflow answer has some more details. Note the answer uses Type Lambdas, an unusual syntax for partial application of types.

What do they look like?

Well, in Java you won't see them (not directly anyway) as that's one of the limitations of the language. You can't write, for instance:

class HigherKinded<T<U>> { } // doesn't compile

But in Scala, they might look like this:

  trait Functor[F[_]] {
    def map[A, B](fa: F[A])(f: A => B): F[B]
  }

the Functor trait is a higher-kinded type as it abstracts over F which in turn abstracts over something else. By the way, the underscore in F[_] here declares F as a type constructor taking one type argument (the syntax resembles an existential type, but in this position it is a higher-kinded type parameter).

Why is this useful?

By parametrizing over the type constructor rather than a particular type, eg F[String], one can use the parameter in method definitions with different types. In particular, in the definition of map, the return type is F[B], where B is a type parameter of the method map; with parametrization by types only, map would have to be homogeneous. (This is a slightly modified quote taken from here).

This is to say something that's not higher-kinded that might look like this:

  trait NotAHigherKindedType {
    def map[A, B](fa: List[A])(f: A => B): List[B]
  }

is unnecessarily concrete. It only works for Lists. Something more abstract would look like:

  trait FunctorHKT[F[_]] {
    def map[A, B](fa: F[A])(f: A => B): F[B]
  }

and could be implemented with:

  class MyFunctorForGenTraversable extends FunctorHKT[GenTraversable] {
    override def map[A, B](fa: GenTraversable[A])(f: (A) => B): GenTraversable[B] = fa map f
  }

and would work equally well for Lists as for Sets (although not, say, Futures, but that's just because they don't implement GenTraversable).
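Usage might look like this:

  val functor = new MyFunctorForGenTraversable

  functor.map(List(1, 2, 3))(_ * 2)     // List(2, 4, 6)
  functor.map(Set("a", "bb"))(_.length) // Set(1, 2)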

A more sophisticated example that uses implicits can be found here. Given the limitation we mentioned above (our solution works for Sets and Lists but not Futures), Wesley-Smith notes that this is why it's "less powerful and also more complicated (both syntactically and semantically) than ad-hoc polymorphism via type-classes."

Sunday, October 18, 2015

In- Co- Contravariants and Mutability


Take a Covariant type constructor:

  class ContainerCovariant[+A](element: A) {

Let's say we're writing it to contain an A, whatever that is. We'll leave it generic. OK, here's the set method:

  class ContainerCovariant[+A](element: A) {
     def set(what: A): Unit = ??? // covariant type A occurs in contravariant position in type A of value what

Eek, it doesn't compile. (Note: all argument types are covariant on a JVM. Haskell doesn't have sub- and super-types so you encounter Co- and Contra-variance less often.)

Why it doesn't compile can be understood by Proof by Contradiction. Say it did compile. Then we could write:

    val covariantWithChild:  ContainerCovariant[Child]  = new ContainerCovariant[Child](aChild)
    val covariantWithParent: ContainerCovariant[Parent] = covariantWithChild
    covariantWithParent.set(aParent)
    val aChildRight: Child = covariantWithChild.get     // what the hey? this is a parent not a child!

So, one way the compiler makes it hard for us to do this is the above error. There are more.

[Note that when a type constructor is co- or contra-variant, it is relative to the reference pointing at it. For example, in the above code, the reference on the right hand side (a ContainerCovariant[Child]) is covariant to the reference on the left (a ContainerCovariant[Parent]).

This leads to an important mnemonic, the five Cs:

"Co- and Contra-variance of Classes are Compared to the Call Site".]

Anyway, we can get around this compilation error (see here and the other question linked off it) by making the type of the parameter contra-variant in A as the compiler is asking us to do:

    def set[B >: A](what: B): Unit = ???

and the above block of code compiles. So, we've beaten the system, right? Not quite. If we're mutating the state of our covariantWithChild, then we have to have a var (or similar strategy). So, let's add it:

  class ContainerCovariant[+A](element: A) {

    var varA: A = null.asInstanceOf[A] // <-- "covariant type A occurs in contravariant position in type A of value varA"
    def set[B >: A](what: B): Unit = { varA = what }

Only this time, the compiler complains at the var declaration. Making it a val helps:

    val valA: A = null.asInstanceOf[A] // this compiles

But now our class isn't mutable so we're foiled again.

Remembering the Get/Set Principle, a mutable class should have an invariant type, so let's add:

  class ContainerCovariant[+A](element: A) {
    class InvariantMutableRef[B](var b: B)
    val invariantMutableRef = new InvariantMutableRef(valA) // "covariant type A occurs in invariant position in type => ..."

If A is covariant, all references that use it must be too. The same problem exists if we try to introduce a contra-variant holder:

  class ContainerCovariant[+A](element: A) {
    class ContravariantRef[-B] { ... }
    val contravariantRef = new ContravariantRef[A] // "covariant type A occurs in contravariant position in type => ContainerCovariant.this.ContravariantRef[A] of value contravariantRef..."

What's more, our contra-variant container couldn't have a member either, mutable or immutable:

    class ContravariantRef[-B] {
      val b: B = null.asInstanceOf[B] // "contravariant type B occurs in covariant position in type => B of value b"
      var b: B = null.asInstanceOf[B] // "contravariant type B occurs in covariant position in type => B of method b"
   
because we could refer to ContravariantRef with a more specific type of B when B wasn't more specific at all. Double eek.

So, mutability is related to contra-, co- and invariance down at the compiler level, which tries to stop us from doing something evil.

Thursday, October 15, 2015

Cheeky and efficient maths algorithm


There is a very efficient algorithm in Breeze for calculating polynomials. That is, say you have an equation like:

10x³ + 5x² + 3x + 1

and you want to find the answer for a given x. You could of course calculate x to the power of 3 and multiply it by 10, add this to 5 times x to the power of 2, etc, etc...

But a much more efficient way is to observe that the very same equation can be written:

( ( (10 * x) + 5 ) * x + 3 ) * x + 1

and run this code:

    var i = coefs.length-1
    var p = coefs(i)
    while (i>0) {
      i -= 1
      p = p*x + coefs(i)
    }

where coefs is the array of coefficients ordered from the constant term upwards - for our polynomial, 1, 3, 5 and 10 (note that the 1 is really 1x⁰). The answer at the end is in the var p.

The code uses vars, which are generally considered bad practice, but there is an argument for being pragmatic too. We could have been more functional and written:

    coefs.drop(1).foldRight(0d)(
      (coeff, agg) => (agg + coeff) * x
    ) + coefs(0)

But it is debatable if it's any nicer.
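Wrapped up as a self-contained function (remember the coefficients run from the constant term upwards) and checked at x = 2:

  def horner(coefs: Array[Double], x: Double): Double = {
    var i = coefs.length - 1
    var p = coefs(i)
    while (i > 0) {
      i -= 1
      p = p * x + coefs(i) // peel off one multiplication by x per coefficient
    }
    p
  }

  horner(Array[Double](1, 3, 5, 10), 2) // 10*8 + 5*4 + 3*2 + 1 = 107.0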

Architectural and Development Patterns


Event Sourcing
"[E]very operational command executed on any given Aggregate instance in the domain model will publish at least one Domain Event that describes the execution outcome. Each of the events is saved to an Event Store in the order in which it occurred. When each Aggregate is retrieved from its Repository, the instance is reconstituted by playing back the Events in the order in which they previously occurred... To avoid this bottleneck we can apply an optimization the uses Aggregate state snapshots." [1]

CQRS
Instead of having one data store for everything, "the change that CQRS introduces is to split that conceptual model into separate models for update and display, which it refers to as Command and Query respectively". [2]

I've seen this work well in a project that used a Lambda Architecture. Here, risk data was being written slowly but surely to a Hadoop cluster. When a day's data was complete, the data could be converted into a format that was digestible by the Risk Managers.

"Having separate models raises questions about how hard to keep those models consistent, which raises the likelihood of using eventual consistency." [2] For us this was fine as the Risk Managers were in bed while the munging was taking place.

"Interacting with the command-model naturally falls into commands or events, which meshes well with Event Sourcing." [2] And this is where Kafka came in.

Martin Fowler is cautious about using CQRS [2] but we had a good experience. In our domain, the Risk Managers were not interested in the Query model after two weeks and so it could be thrown away. Had it been needed at any time in the future, it could always have been generated again from the Command model.

Lambda Architecture
Lambda Architectures consist of three main pieces:
  1. The batch layer, managing the master dataset (an immutable, append-only set of raw data) and pre-computing batch views.
  2. The serving layer, indexing batch views so that they can be queried in a low-latency, ad-hoc way.
  3. The speed layer, dealing with recent data only, and compensating for the high latency of the batch layer.
(taken from here).

As mentioned in the CQRS subsection above, "even if you were to lose all your serving layer datasets and speed layer datasets, you could reconstruct your application from the master dataset. This is because the batch views served by the serving layer are produced via functions on the master dataset, and since the speed layer is based only on recent data, it can construct itself within a few hours." [3]

Since Hadoop has an append-only, highly available file system (HDFS), it makes it an obvious choice for the batch layer.

"The serving layer is a specialized distributed database that loads in a batch Batch layer view and makes it possible to do random reads on it (see figure 1.9). When new batch views are available, the serving layer automatically swaps those in so that more up-to-date results are available. A serving layer database supports batch updates and random reads. Most notably, it doesn’t need to support random writes." [3] Cassandra was our choice for this layer.

Finally, "the speed layer only looks at recent data, whereas the batch layer looks at all the data at once. Another big difference is that in order to achieve the smallest latencies possible, the speed layer doesn’t look at all the new data at once. Instead, it updates the realtime views as it receives new data instead of recomput­ing the views from scratch like the batch layer does. The speed layer does incre­mental computation instead of the recomputation done in the batch layer." [3]

Lambda architecture shares some similarities with CQRS but is very different in that it never updates data (although it can add data that renders old data obsolete).

Feature Branching
The idea here is that each developer works on his own branch. He regularly syncs with the main branch to lessen the pain of later merging his work into it. With Git, he'd regularly run:

> git checkout master
> git pull
> git checkout BRANCH_NAME
> git merge master
> git push

There is some debate on whether Feature Branching is an anti-pattern. Ideally, the system should be architected such that a feature is isolated to a particular silo of the code and not spill out to other components. Thus, a developer can merrily work on that particular silo and check into the main branch all the time. Mainly for political reasons, though, this is not always possible. Software like Stash that integrates with Jira can make code reviews even within a distributed team quite pleasant.

[1] Implementing Domain Driven Design, Vaughn Vernon
[2] Martin Fowler's blog.
[3] Big Data, Nathan Marz

Tuesday, October 6, 2015

Arrays in Java and Scala


Despite the fact they both run on the JVM, Java and Scala have very different type systems. Say you want to create an array of type T[] in Java where T is any type. It is possible but messy. You might call Array.newInstance but its signature is:

    public static Object newInstance(Class<?> componentType, int length)

that is, it returns an Object so you'd have to cast it.

If you want to get the component type of a particular array, you'd call Class.getComponentType(), but that can be called on any class, not necessarily an array. If it's not called on an array type, it returns null. What's more, it returns Class, not Class<T>.

Finally, Java's arrays are covariant (unlike its generics) so you run the danger of getting an ArrayStoreException at runtime if you're not careful. Put another way, Java's arrays don't adhere to the Get and Put Principle that its collections do ("The Get and Put Principle: use an extends wildcard when you only get values out of a structure, use a super wildcard when you only put values into a structure, and don't use a wildcard when both get and put." [1])

Yuck. Scala cleans this up using an invariant Array[T] class constructor. If you JAD the code, you can see how Scala does it. Say we want a multi-dimensional array of Ints, we might call:

    val matrix            = Array.ofDim[Int](7,9)

if we then decompiled the .class file of scala.Array into Java, we can see what Scala is doing. It looks like this:

    public Object[] ofDim(int n1, int n2, ClassTag evidence$4) { ...

where our type (Int) has cheekily been added as an extra parameter. Note that the method returns an array of Object. As it happens, each of these Objects is an array of the type we want, in this case Int, ultimately created using the same Java API method, newInstance, that we saw above.

This is another nice feature of Scala in that we can dynamically create arrays of a given type. The ofDim method looks like:

  def ofDim[T: ClassTag](n1: Int): Array[T] = ...

or, equivalently:

  def ofDim[T](n1: Int)(implicit evidence: ClassTag[T]): Array[T] = ...

allowing us to do something like:

      val anArray = new Array[T](length) 

Without this ClassTag, you'd get "cannot find class tag for element type T".
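A minimal example of dynamically creating an array of a given type:

  import scala.reflect.ClassTag

  def fillArray[T: ClassTag](length: Int)(value: T): Array[T] = {
    val anArray = new Array[T](length) // the ClassTag lets Scala call newInstance under the covers
    for (i <- anArray.indices) anArray(i) = value
    anArray
  }

  fillArray(3)("x") // a genuine Array[String] at runtime: Array(x, x, x)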

[1] Java Generics and Collections - Naftalin and Wadler


Wednesday, September 30, 2015

Future notes (part II)


There are some really nice aspects to Scala's Futures that make them very different from Java's. Because they're monadic, you can do all sorts of interesting things.

Differences between Java and Scala Futures

In Java, you merely block while waiting for the result by calling get(). That's it.

However, you can cancel Java futures. Scala "Futures come without a built-in support for cancellation" [1] although there are ways to roll-your-own.

So what can you do that's so special?

Because both Lists and Futures are monadic, you can "blend" them. For instance, say you have a List of Futures, you can convert them into a Future of Lists. This allows you to react to just a single Future with all the data rather than worrying about many Futures each with some data.

This is built in to the Scala API via Future.sequence. It basically folds over the monadic structure (eg a List) that's passed to it.

A somewhat simplified version of this code (I use the more specific List rather than sequence's more general TraversableOnce and avoid the magic of CanBuildFrom) looks like this:

  def mySequence[T](futures: List[Future[T]])(implicit ec: ExecutionContext): Future[List[T]] = {
    def foldFn(agg: Future[List[T]], ft: Future[T]): Future[List[T]] = {
      agg flatMap { list =>
        ft map {
          element => list :+ element
        }
      }
    }

    val seed: Future[List[T]] = Future {
      List[T]()
    }

    futures.foldLeft (seed) (foldFn)
  }

Or, we could use a for-comprehension to give it some syntactic sugar:

    def foldFn(agg: Future[List[T]], ft: Future[T]): Future[List[T]] = for {
      list <- agg
      t    <- ft
    } yield (list :+ t)

They're equivalent. Whether it's this simplified method or the more complicated one in the Scala API, you'd call it with something like:

    val listOfFutures = List(createSimpleFuture("this"), createSimpleFuture("is"), createSimpleFuture("a"), createSimpleFuture("test"))
    val futureOfLists = mySequence(listOfFutures)
    val lists         = Await.result(futureOfLists, Duration(5, TimeUnit.SECONDS))
    println("lists from future of lists from list of futures: " + lists) 


  def createSimpleFuture(eventualResult: String)(implicit ec: ExecutionContext): Future[String] = {
    Future {
      Thread.sleep(100)
      eventualResult
    }
  }

which prints out:

List(this, is, a, test)

More funky stuff in part III.

[1] Learning Concurrent Programming in Scala - Aleksandar Prokopec

Friday, September 25, 2015

Scala's Return


I was reading this StackOverflow answer about the distinction between methods and functions. It had the usual explanations, for example a method "can be easily converted into [a function]" but "one can't convert the other way around"; methods "can receive type parameters"; "a function is an object" etc.

But as pointed out here, return behaves differently in a function compared to a method and can be used to short-circuit a fold etc.

For example:

  def caller(): Any = {
    println("caller started")
    val g = () => { return true } // note: return type must be compatible with this method
    called(g)
    println("caller finished")    // this is never executed!
  }

  def called(innocuousFn: () => Boolean): Unit = {
    println("called: about to call innocuousFn...")
    innocuousFn()
    println("called: finished")   // this is never executed!
  }

prints out:

caller started
called: about to call innocuousFn...

That is, when the flow of control hits the function's return statement, the execution stack is popped as if it were in the enclosing method (it's illegal for a function outside a method to have a return statement).

Worse, if one method passes a returning function back to the method that calls it, you get a Throwable. After all, where does that function return from?

  def executeReturningFnFromOtherMethod(): Unit = {
    println("about to call calledReturnsFnThatReturns")
    val fn: () => Boolean = returnsFnThatReturns()
    println("about to call fn")
    fn()                // Exception in thread "main" scala.runtime.NonLocalReturnControl
    println("finished") // never executes
  }

  def returnsFnThatReturns(): () => Boolean = {
    val fn: () => Boolean = () => {
      return makeTrue
    }
    fn
  }

  def makeTrue(): Boolean = {
    return true
  }

Prints out:

about to call calledReturnsFnThatReturns
about to call fn
Exception in thread "main" scala.runtime.NonLocalReturnControl


Methods that may or may not compile in Scala


If you want to sum some integers in a List, you just call the sum method. So far, so obvious. But if the list does not hold elements that are summable, how does Scala give you a method that you may or may not be able to call?

The sum method lives in TraversableOnce and the signature looks like this:

  def sum[B >: A](implicit num: Numeric[B]): B = ...

Here, [B >: A] says B must be a superclass of A, the type of the elements in the list. The implicit says there must exist in the ether something that provides the functionality in trait Numeric for type B (the functionality for plus, minus etc).

Now, for Int, Double etc you get these for free in the Numeric object that's standard to Scala where you see, for example:

  trait IntIsIntegral extends Integral[Int] { ...

which is pulled into the implicit ether with:

  implicit object IntIsIntegral extends IntIsIntegral with Ordering.IntOrdering

in the same file. Note that, unlike Java, Scala's numeric types do not extend a common superclass like java.lang.Number.

So, we could happily create our own arithmetic with something like:

  class MyInteger

  trait MyIntegerOrdering extends Ordering[MyInteger] {
    def compare(x: MyInteger, y: MyInteger) = ???
  }
  trait MyIntegral extends Numeric[MyInteger] { // or Integral that extends Numeric[T]
    def plus(x: MyInteger, y: MyInteger): MyInteger = new MyInteger
.
.
.
  }
  implicit object MyIntegerIsIntegral extends MyIntegral with MyIntegerOrdering

It's not a terribly useful arithmetic but it would allow us to have a list of MyIntegers and call sum on it.
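With the witness in scope (and the elided methods filled in), sum now compiles:

    val myIntegers: List[MyInteger] = List(new MyInteger, new MyInteger)
    val total: MyInteger = myIntegers.sum // the compiler finds MyIntegerIsIntegral as the witness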

Thursday, September 10, 2015

Notes on the Future (part 1)


Scala's Futures are very different from Java's. Where Java's Futures encourage blocking by their API, Scala's encourage asynchronous coding... apart from one gotcha that follows shortly.

Futures are Monads so they have all that Monad goodness and we can use them in for comprehensions. Unfortunately, this very idiom can introduce a bug. Take this code:

for {
  i <- Future(5)
  j <- Future(11)
} yield j

It creates two futures returning the values 5 and 11, possibly on different threads, and returns a Future that contains 11, just like a good Monad should.

"Here, the beauty of the for comprehension sugar can work against us. The for comprehension hides all of that ugly nesting from us, making it look like these are on the same level, but nesting is indeed happening here. What might look concurrent isn't." [1]

Gotcha

What happens here is the first Future is executed and only then is the second. The solution is simple. The code should look like this:

val iFuture = Future(5)
val jFuture = Future(11)
for {
  i <- iFuture
  j <- jFuture
} yield j

For just returning a number, you probably won't notice anything amiss. But for something more ambitious, you may very well notice that the first cut of this code is sub-optimal.

"Only men's minds could have unmapped into abstraction..."

In Scala, we can get our dirty little hands on the contents of a Future by mapping like:

import scala.concurrent.ExecutionContext

    implicit val xc = ExecutionContext.global // needed for next map:

    j map { value =>
      println("j = " + value)
    }

but there are other ways, too. Being monads, we can flatMap and so if we slightly modify the code in [1]:

  implicit val xc = ExecutionContext.global

  val timeToWait = Duration(5, TimeUnit.SECONDS)

  val future1 = Future[Int] {
    (1 to 3).foldLeft(0) { (a, i) =>
      log("future1 "  + i)
      pause()
      a + i
    }
  }

  val future2 = Future[String] {
    ('A' to 'C').foldLeft("") { (a, c) =>
      log("future2 " + c)
      pause()
      a + c
    }
  }

  def spliceFutures(): Unit = {
    val awaitable = future1 flatMap { numsum: Int =>
      log("in future1.flatMap") // ForkJoinPool-1-worker-1: in future1.flatMap

      future2 map { str: String =>
        log("in future2.map") // ForkJoinPool-1-worker-5: in future2.map
        (numsum, str)
      }
    }

    val awaited = Await.result(awaitable, timeToWait)
    println(awaited) // (6,ABC)
  }

  def log(msg: String): Unit = {
    println(Thread.currentThread().getName() + ": " + msg)
  }

"The most powerful way to use these combinators is in combining the results from futures that are already executing. This means that we can still have a lot of independent futures running, and have another piece of code combine the eventual results from those futures into another "final" future that operates on those results."[1]

You'll notice from the log statements that the code in the map and flatMap run in different threads.

But wait, there's more. You can add callbacks with onSuccess, onFailure and onComplete, all of which have a return type of Unit (so you know that the functions passed to them must have side effects) and there is no guarantee that they'll run on the same thread that ran the body of the Future.

  val onCompleteFn: Try[Boolean] => Unit = {
    case Success(x)   => println(" success: " + x)
    case Failure(err) => println(" failure: " + err + ", " + err.getCause)
  }
.
.
    aFuture onComplete onCompleteFn 

In fact, Future.onComplete is the daddy: all the other combinator methods (map, flatMap, recover, transform etc) call it. We'll come back to these other combinators in another post.

Problems in your Future

If an Exception is thrown in your Future, then it will be captured in the failure case in onCompleteFn above. But if an Error is thrown, you'll get a Boxed Error.

Another surprise comes from filter. For Lists, Trys and Options, things are quite straightforward:

    val myList = List(1,2,3)
    val filteredList = myList.filter(_ > 10)
    println(filteredList)   // List()

    val myOption = Option(1)
    val filteredOption = myOption.filter(_ > 10)
    println(filteredOption) // None

    val myTry = Try(1)
    val filteredTry = myTry.filter(_ > 10)
    println(filteredTry)    // Failure(java.util.NoSuchElementException: Predicate does not hold for 1)

That is, a filter whose predicate leads to nothing is represented by the "negative" subclass of the monads. However, there is no such class for an "empty" Future.

    import scala.concurrent.ExecutionContext.Implicits._
    import scala.concurrent.duration._

    val myFuture       = Future { 1 }
    val filteredFuture = myFuture filter {_ > 10}
    filteredFuture onComplete {
      case Success(x) => println(s"success: $x")
      case Failure(x) => println(s"failure: $x")
    } // failure: java.util.NoSuchElementException: Future.filter predicate is not satisfied

    Await.result(filteredFuture, 1.second) // Exception in thread "main" java.util.NoSuchElementException: Future.filter predicate is not satisfied

That is, the Failure case is the path taken by onComplete. If we are so foolish as to block using Await, then an Exception is thrown.

[1] Akka Concurrency, Derek Wyatt.