Monday, July 10, 2017

Building Big Data Apps


After spending the last 18 months using Spark to write entity resolution software for over a terabyte of data, here are some miscellaneous notes on what I wish I'd known from the start. In no particular order:

1. Make sure your app can recover from a failure easily. Write to HDFS after each major stage (see the sketch after this list). This will also help with debugging when the answer that comes out of the sausage machine isn't quite what you were expecting. ("Simply split long-running jobs into batches and write intermediate results to disk. This way, you have a fresh environment for every batch and don’t have to worry about metadata build-up" from here). Also, the topology for one stage may not be appropriate for another (see here for an example where a smaller number of executors with more resources - contrary to the general Spark advice - gives better performance).

2. Small inefficiencies can cause big problems. Use jstack liberally.

3. Don't use Spark as a key/value lookup. That's not what it's built for. Use another system. Don't try to hack it by using a broadcast variable as that simply doesn't scale.

4. Use realistic data, both in size and quality. Making fake data is surprisingly hard if you want the output to remotely correspond to the real world.

5. Have an automated test in a realistic environment (you don't want authentication problems to show up late, for example). Run the app daily in this environment to surface any performance changes.

6. "Don't start modeling before designing some measurable goals" [1]. Define acceptance criteria early on as to what you'd expect to come out and what you wouldn't. Estimate the false positive/negative rate. For example, at one point we expected 220k of company entities to resolve with Orbis data. Using a very simple query, we were seeing about 130k of our businesses resolve to something. Therefore, the true positive rate could not be higher than about 60% (and may have been less) therefore there was work to do here.

7. Pass small objects around, preferably just IDs. This is what the built-in Connected Component algorithm does. It will improve performance.
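A minimal sketch of the checkpointing idea from point 1 (the paths and stage functions are invented for illustration): write each stage's output to HDFS, read it back, and let the next stage work from the persisted copy.

import org.apache.spark.sql.{DataFrame, SparkSession}

// Persist a stage's output and carry on from the persisted copy. A re-run can
// then skip any stage whose output already exists on HDFS.
def checkpoint(df: DataFrame, path: String)(implicit spark: SparkSession): DataFrame = {
  df.write.mode("overwrite").parquet(path)
  spark.read.parquet(path)
}

// Hypothetical usage:
// val weighted = checkpoint(tfIdfStage(raw),           "/pipeline/tfidf")
// val similar  = checkpoint(similarityStage(weighted), "/pipeline/similarity")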

The stages of my app

There were 6 stages to my application:

1. Build a matrix using TF-IDF to assign weights to words (see the sketch after this list).
2. Calculate the cosine similarities.
3. Execute bespoke business rules.
4. Find connected components.
5. Turn those IDs back into entities.
6. Consolidate the relationships between these resolved entities in a process called Edge Contraction.
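As a rough illustration of stage 1 (this is not the production code; the toy data and column names are made up), Spark ML's Tokenizer, HashingTF and IDF will produce TF-IDF weighted vectors:

import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("tfidf-sketch").getOrCreate()
import spark.implicits._

// Toy documents standing in for the real entity names.
val docs = Seq((0L, "acme widgets ltd"), (1L, "acme widget limited")).toDF("id", "name")

val tokens = new Tokenizer().setInputCol("name").setOutputCol("words").transform(docs)
val tf     = new HashingTF().setInputCol("words").setOutputCol("tf").transform(tokens)
val tfidf  = new IDF().setInputCol("tf").setOutputCol("tfidf").fit(tf).transform(tf)

tfidf.select("id", "tfidf").show(false)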

There were some interesting modifications to this basic flow.

1. Feature hashing improved runtime performance, but the largest connected component went from 600 to 26,000 nodes (BlackRock/Merrill Lynch, which appear to have created a lot of shell companies with similar names).

2. By ignoring all words that appear in more than 1000 documents, we removed the need for a stop-word list, which was useful since the corpus was multilingual (see the sketch below).
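A sketch of that document-frequency cut-off (the docs Dataset of (id, tokens) pairs and the spark session are assumed to exist, with spark.implicits._ imported):

val maxDocFreq = 1000

// Count how many documents each token appears in.
val docFrequency = docs
  .flatMap { case (_, tokens) => tokens.distinct }
  .rdd
  .map(token => token -> 1L)
  .reduceByKey(_ + _)

// Broadcast the set of over-common tokens and strip them from every document.
val tooCommon = spark.sparkContext.broadcast(
  docFrequency.filter(_._2 > maxDocFreq).keys.collect().toSet
)

val filtered = docs.map { case (id, tokens) =>
  (id, tokens.filterNot(tooCommon.value.contains))
}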

A note on requirements gathering

One tip for finding which database suits you comes from the late Dr Jim Gray. "Gray's recipe for designing a database for a given discipline is that it must be able to answer the key 20 questions that the scientist wants to ask of it." [2] A real-world example can be found here. The idea is that 20 is roughly the minimum number of questions you need before a pattern emerges.

This teased out of the business the fact that we needed more than just a graph database (like Neo4J), which they had for some reason fixated on at one point; we also needed the batch processing that GraphX gave us.

[1] Practical Data Science with R.
[2] The Fourth Paradigm: Data-Intensive Scientific Discovery, Tony Hey.

Parquet Flaw


This Spark/Parquet abuse bit us hard this week. "In a partitioned table, data are usually stored in different directories, with partitioning column values encoded in the path of each partition directory" (from here). This is great for compression, as the values do not even appear in the data files; they are encoded in the directory structure instead.

Unfortunately, if you save the Dataset to HDFS, it appears that a new file is created for each contiguous run of rows that belongs to the same partition directory. For example, if there were only two values for a key, 1 and 2, and the Dataset mapped elements to partitions like this:

1,1,2,1,2,2,2,1,1,1...

then this piece of the sequence would result in 5 different files having keys [1,1], [2], [1], [2,2,2] and [1,1,1].

So, instead of fewer but larger files, we had many smaller files. This is a scenario that HDFS does not handle very well, and it manifested its displeasure as, in effect, a DDoS on the Name Node.

What's more, resolving the problem by deleting them also caused the Name Node pain. Using the -skipTrash flag made things a little better.

The solution was to sort the Dataset before saving.
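A sketch of the fix (the column and path names are invented): sort by the partitioning key before writing, so that each key's rows form one contiguous run and each partition directory receives a few large files rather than many small ones.

ds.sort("key")
  .write
  .mode("overwrite")
  .partitionBy("key")
  .parquet("/data/out")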

Tuesday, July 4, 2017

Functional Foibles


Functional languages are supposed to protect you from nasties like NullPointerException, but there are some gotchas.

      a[NoSuchElementException] should be thrownBy {
        List.empty[Int].head
      }

Curiously, Haskell does the same.

$ ghci
GHCi, version 7.10.3: http://www.haskell.org/ghc/  :? for help
Prelude> head [] :: [Int]
*** Exception: Prelude.head: empty list

But this little annoyance occurred in a Spark job (here illustrated with ScalaTest):

      an[UnsupportedOperationException] should be thrownBy {
        List.empty[Int].max
      }

A neat mitigation can be found here and looks like this:

      List.empty[Int].reduceOption(Math.max) shouldBe None

An interesting point was made here on how things should be:
I would suggest that max is not an option. If you have a type (empty[T]), then this fold (I mean max) should return the neutral element regarding max(x,y), that is, the minimal element of the (partial) order of T, if one exists; if it does not, then it makes sense to throw an exception. 
Same as with other monoidal operations, e.g. empty.sum is a zero, not an exception. 
This would be the right mathematical answer.
As a reminder, partial ordering exhibits anti-symmetry, transitivity and reflexivity, that is x ≤ x (contrast this with total ordering which exhibits anti-symmetry, transitivity and totality, that is x ≤ y or y ≤ x).

So, what's being said here is that the minimal element for, say, a Double is Double.MinValue. This is indeed transitive (obviously), reflexive (clearly MinValue ≤ MinValue) and anti-symmetric (if MinValue ≤ x and x ≤ MinValue then the only conclusion is x = MinValue).

Compare this to Double.NaN which is not reflexive and therefore a partial ordering does not exist.
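A two-line illustration of that last point:

assert(!(Double.NaN <= Double.NaN))         // NaN is not even reflexive
assert(Double.MinValue <= Double.MinValue)  // MinValue, by contrast, behaves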

So, to be mathematically correct, List.empty[Int].max should return Int.MinValue.
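A sketch of what that "monoidal" max looks like: fold with the order's minimal element as the neutral value.

def monoidMax(xs: List[Int]): Int = xs.foldLeft(Int.MinValue)(math.max)

monoidMax(List(3, 7))  // 7
monoidMax(Nil)         // Int.MinValue rather than an UnsupportedOperationException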

"Head is a different story; here we have a semigroup, not a monoid. There's no neutral element."

And, indeed, the monoidal sum already behaves this way:

      List.empty[Int].sum shouldBe 0




Spark Dataframes and Datasets


Although RDDs are conceptually simple, all new optimizations are coming from DataFrames and Datasets. As I upgrade my software from RDDs, these are the notes I've made.

Parquet

Apache Parquet is a columnar storage format. It can sometimes lead to greatly improved performance, particularly with large data sets. By using a schema and laying each column out contiguously on disk, queries over 400MB of data can take just one second.

You can convert CSV to Parquet with something like:

spark
  .read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(csvInPath)
  .write.mode(saveMode)
  .parquet(parquetPath)

DataFrames

We can now read this in with something like:

val df = spark.read.parquet(parquetPath)
val cleaned = df
  .withColumn("cus_id_no", $"cus_id_no".cast("bigint"))
  .filter($"prim_acc_no".isNotNull)
  .na.fill(0)  // fill null numerics
  .na.fill("") // fill null strings

Here, we're also filtering and providing defaults.

Datasets

Datasets are type-safe. They can be derived from DataFrames with something like this:

val headerDS = headerDF.withColumn("COLUMN", UDF).withColumn( .... ).as[MyCaseClass]

and manipulated with something like this:

val ds      = df.as[CaseClass]
val grouped = ds.groupByKey(_.cus_id_no)
                .mapGroups { case (id, rows) => (id, rows.size) } // a Dataset of tuples, columns _1 and _2
val joined  = ds.joinWith(grouped, ds("cus_id_no") === grouped("_1"), "left_outer")

DataFrames are just of type Dataset[Row] - see the type alias in the package object of org.apache.spark.sql:

type DataFrame = Dataset[Row]

Tungsten

"By avoiding the memory and GC overhead of regular Java objects, Tungsten is able to process larger data sets than the same hand-written aggregations" (from here).

Among Tungsten's clever features, it:

1. does not deserialize the whole object (very useful in joins, groupBys, etc.)

2. manages objects off-heap (see here and the configuration sketch below).
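The off-heap mode is switched on through configuration rather than code; a sketch (the sizes are illustrative only):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("off-heap-sketch")
  .config("spark.memory.offHeap.enabled", "true") // let Tungsten allocate off the JVM heap
  .config("spark.memory.offHeap.size", "2g")      // explicit off-heap budget
  .getOrCreate()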

Catalyst

"At the core of Spark SQL is the Catalyst optimizer, which leverages advanced programming language features (e.g. Scala’s pattern matching and quasiquotes) in a novel way to build an extensible query optimizer." from here.

The bad news

With these APIs, you have to accept a much less rich interface. For instance, groupByKey returns a KeyValueGroupedDataset, which has a limited set of functions: there is no filter, and mapping to any type that doesn't have an org.apache.spark.sql.Encoder associated with it leads to a compile-time error of "... Support for serializing other types will be added in future releases" (see the sketch below).

You can always convert them to RDDs but then you lose all optimization benefits.
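A sketch of the Encoder restriction (the class and toy data are made up): a plain, non-case class has no implicit Encoder, so mapping grouped values into it only compiles once you supply one, for example via Kryo.

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

object EncoderSketch {
  // Not a case class, hence no automatically derived Encoder.
  class Group(val key: String, val values: Seq[String])

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("encoder-sketch").getOrCreate()
    import spark.implicits._

    // Without this implicit, mapGroups below fails to compile with the
    // "Support for serializing other types will be added in future releases" error.
    implicit val groupEncoder: Encoder[Group] = Encoders.kryo[Group]

    val ds = Seq(("a", "1"), ("a", "2"), ("b", "3")).toDS()

    val groups = ds
      .groupByKey(_._1)
      .mapGroups { case (k, rows) => new Group(k, rows.map(_._2).toSeq) }

    println(groups.count())  // 2
    spark.stop()
  }
}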

The difference

This takes 1.2 hours on Orbis data:

headerDS.rdd
  .map(x => x.BVDIDnumber -> Seq(x))
  .reduceByKey(_ ++ _)
  .filter(_._2.size > 1)
  .take(10)

whereas this takes 5 mins:

headerDF
  .groupByKey(_.getString(0))
  .flatMapGroups { case (key, iter) =>
    val ys = iter.toSeq
    if (ys.size > 1) Seq(ys.map(_.mkString(", "))) else Seq.empty
  }
  .take(10)

Aside: this idiom (flatMapping to either a singleton or an empty collection) reflects a monadic principle.

The filter method is completely described in one simple law:
FIL1. m filter p ≡ m flatMap {x => if(p(x)) unit(x) else mzero}

(from here).
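The law is easy to check on List, where unit is List(_) and mzero is Nil:

val xs = List(1, 2, 3, 4)
val p: Int => Boolean = _ % 2 == 0

// FIL1: filtering is flatMapping to either a singleton or the empty list.
assert(xs.filter(p) == xs.flatMap(x => if (p(x)) List(x) else Nil))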

Joins

If we want to optimize a join, we might re-partition the DataFrame by the join key with something like this, so that rows sharing a key end up in the same partition and the join can be done without shuffling them again.

aDataFrame.repartition(FIELD_NAME).write.format("parquet").mode("overwrite").save(FILE_NAME)

Now, if we want to do a join, it looks a little like this:

aDataFrame.join(other, other("record_id") <=> $"record".getItem("record_id"))

where

other.printSchema
root
 |-- document_type: string (nullable = true)
 |-- record_id: string (nullable = true)
 |-- entity_id: string (nullable = true)
 |-- entitySize: integer (nullable = true)

and

aDataFrame.printSchema
root
 |-- record: struct (nullable = true)
 |    |-- document_type: string (nullable = true)
 |    |-- record_id: string (nullable = true)
.
.


Sunday, June 18, 2017

Entropy


In my physics degree, we were asked to calculate the entropy of a chess board. Students smarter than me snorted at this silly exercise. Yet entropy is just a statistical measure. You cannot measure it directly (there are no entropy-ometers) but it exists everywhere and can be used in odd places like machine learning (e.g. maximum entropy classifiers, which "from all the models that fit our training data, selects the one which has the largest entropy").

Alternatively, you might want to find the configuration with the smallest entropy. An example is here where the quality of a clustering algorithm (k-means in this case) is assessed by looking at the entropy of the detected clusters. "As an external criteria, entropy uses external information — class labels in this case. Indeed, entropy measures the purity of the clusters with respect to the given class labels. Thus, if every cluster consists of objects with only a single class label, the entropy is 0. However, as the class labels of objects in a cluster become more varied, the entropy value increases."

For instance, say you are trying to find the parameters for an equation such that it best fits the data. "At the very least, you need to provide ... a score for each candidate parameter it tries. This score assignment is commonly called a cost function. The higher the cost, the worse the model parameter will be... The cost function is derived from the principle of maximum entropy." [1]

What is Entropy

I found this description of heads (H) and tails (T) from tossing a coin enlightening:

"If the frequency [f] of H is 0.5 and f(T) is 0.5, the entropy E, in bits per toss, is

-0.5 log2 0.5

for heads, and a similar value for tails. The values add up (in this case) to 1.0. The intuitive meaning of 1.0 (the Shannon entropy) is that a single coin toss conveys 1.0 bit of information.

Contrast this with the situation that prevails when using a "weighted" or unfair penny that lands heads-up 70% of the time. We know intuitively that tossing such a coin will produce less information because we can predict the outcome (heads), to a degree. Something that's predictable is uninformative. Shannon's equation gives

-0.7 log2 (0.7) = 0.3602

for heads and

-0.3 log2  (0.3) = 0.5211

for tails, for an entropy of 0.8813 bits per toss. In this case we can say that a toss is 11.87% [1.0 - 0.8813] redundant."

Here's another example:

X = 'a' with probability 0.5
    'b' with probability 0.25
    'c' with probability 0.125
    'd' with probability 0.125

The entropy of this configuration is:

H(X) = -0.5 log2(0.5) - 0.25 log2(0.25) - 0.125 log2(0.125) - 0.125 log2(0.125) = 1.75 bits

What does this actually mean? Well, if you ask binary questions in the obvious order ("Is X 'a'? If not, is it 'b'? ..."), then "the resulting expected number of binary questions required is 1.75" [2].
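The arithmetic is easy to check:

// Shannon entropy of the distribution above, in bits.
val probabilities = Seq(0.5, 0.25, 0.125, 0.125)
def log2(x: Double): Double = math.log(x) / math.log(2)
val entropy = -probabilities.map(p => p * log2(p)).sum  // 1.75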

Derivation of entropy

Basically, we want entropy to be extensive, that is, one of those "parameters that scale with the system. In other words U(aS,aV,aN) = aU(S,V,N)".

So, if S_X is the entropy of system X, then the combined entropy of two systems, A and B, would be:

S_C = S_A + S_B

Second, we want it to be largest when all the states are equally probable. If we call the per-state function f, then the average value is:

S = ⟨f⟩ = Σ_i p_i f(p_i)               Equation 1

Now, given two sub-systems, A and B, the combined system C will have entropy:

S_C = Σ_i Σ_j p_i p_j f(p_i p_j)            Equation 2

that is, we are averaging f over the joint states in which A is in state i and B is in state j, each occurring with probability p_i p_j.

For Equation 2 to conform to the form of Equation 1, let's introduce the variable p_ij = p_i p_j. Then:

S_C = Σ_i Σ_j p_ij f(p_ij)

For S_C = S_A + S_B to hold, we need f(p_i p_j) = f(p_i) + f(p_j), which is satisfied by f = C ln p since ln(ab) = ln(a) + ln(b).
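As a quick check that this choice really is additive:

S_C = Σ_i Σ_j p_i p_j C ln(p_i p_j)
    = Σ_i Σ_j p_i p_j C (ln p_i + ln p_j)
    = Σ_i p_i C ln p_i + Σ_j p_j C ln p_j     (using Σ_i p_i = Σ_j p_j = 1)
    = S_A + S_B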

This is the argument found here.

[1] Machine Learning with Tensor Flow.
[2] Elements of Information Theory.

Sunday, June 11, 2017

Scala Equality


The code:

Set(1) == List(1)

will always return false but will also always compile. This is pathological.

"Equality in Scala is a mess ... because the language designers decided Java interoperability trumped doing the reasonable thing in this case." (from here).

There are ways of solving this problem, the simplest being to use the === operator that a number of libraries offer. Here are three different ways of doing it:

  def scalazTripleEquals(): Unit = {
    import scalaz._
    import Scalaz._
//    println(List(1) === List("1")) // doesn't compile :)
  }

  def scalaUtilsTripleEquals(): Unit = {
    import org.scalautils.TypeCheckedTripleEquals._
//    println(List(1) === List("1")) // doesn't compile :)
  }

  def scalacticTripleEquals(): Unit = {
    import org.scalactic._
    import TypeCheckedTripleEquals._
//    println(List(1) === (List("1"))) // doesn't compile :)
  }

But what about incorporating it into the Scala language itself?

Scala's creator, Martin Odersky, gives his views here. He proposes that "it is opt-in. To get safe checking, developers have to annotate with @equalityClass ... So this means we still keep universal equality as it is in Scala now - we don’t have a choice here anyway, because of backwards compatibility."

Warning: ScalaTest

There is a horrible gotcha when using Scalactic and ScalaTest (which is odd since they are stablemates). The problem is that you want compilation to fail for something like this:

import org.scalatest.{FlatSpec, Matchers}

class MyTripleEqualsFlatSpec extends FlatSpec with Matchers {

  "triple equals" should "not compile" in {
    List(1)  should === (List("1"))
  }

}

Only it doesn't. It happily compiles! This is not what was expected at all given the code in scalacticTripleEquals() above. The solution can be found here. You must change the class signature to:

class MyTripleEqualsFlatSpec extends FlatSpec with Matchers with TypeCheckedTripleEquals {

for the compiler to detect this error.
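A sketch of a regression test for this, assuming ScalaTest's typeCheck matcher is available in your version:

import org.scalactic.TypeCheckedTripleEquals
import org.scalatest.{FlatSpec, Matchers}

class TypeSafeEqualitySpec extends FlatSpec with Matchers with TypeCheckedTripleEquals {

  "comparing a List[Int] with a List[String]" should "not compile" in {
    // The offending snippet is quoted so that this test class itself still compiles.
    """List(1) should === (List("1"))""" shouldNot typeCheck
  }
}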

Saturday, June 10, 2017

Either Mither


Either has changed. A superficial Google might suggest that Either represents just that - either A or B.

"You cannot, at least not directly, use an Either instance like a collection, the way you are familiar with from Option and Try. This is because Either is designed to be unbiased.

"Try is success-biased: it offers you map, flatMap and other methods that all work under the assumption that the Try is a Success, and if that’s not the case, they effectively don’t do anything, returning the Failure as-is." (from here)

And: "if you use Either for error reporting, then it is true that you want it to be biased to one side, but that is only one usecase of many, and hardcoding a single special usecase into a general interface smells of bad design" from here.

But then you see this in the Scala documentation: "Either is right-biased, which means that Right is assumed to be the default case to operate on. If it is Left, operations like map, flatMap, ... return the Left value unchanged".

This apparent contradiction arises because Scala 2.12 changed Either: it has become right-biased.

Let's demonstrate using ScalaTest. First, we define an Either and some functions to act on it:

    type LeftType   = List[Int]
    type RightType  = Int
    type EitherType = Either[LeftType, RightType]
    val left        = Left(List[Int]())
    val right       = Right(1)

    val rightFn: (RightType) => RightType = _ + 1
    val leftFn:  (LeftType)  => LeftType  = _ :+ 1

Then, the test looks like:

    "Either" should {
      "be agnostic if left or right has been explicitly stated on an Either that's a Left" in {
        val either: EitherType = left
        either.left.map(leftFn) shouldEqual Left(List(1))
        either.right.map(rightFn) shouldEqual left // unchanged
      }
      "be agnostic if left or right has been explicitly stated on an Either that's a Right" in {
        val either: EitherType = right
        either.right.map(rightFn) shouldEqual Right(2)
        either.left.map(leftFn) shouldEqual right // unchanged
      }
.
.

So far, so good. But the following code is new in 2.12 (it won't compile in earlier versions):

      "ignore the unbiased side" in {
        val either: EitherType = left
        either.map(rightFn) shouldEqual left
      }
      "map the biased side" in {
        val either: EitherType = right
        either.map(rightFn) shouldEqual Right(2)
      }

Either's new monadic functions cannot take anything other than a function of type (RightType) => ...

The mnemonic is that if you're using this for error reporting, then Right is the right answer.
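A small, hypothetical example of why that bias is handy for error reporting: with Scala 2.12's right-biased Either, a for-comprehension short-circuits on the first Left.

def parse(s: String): Either[String, Int] =
  try Right(s.toInt)
  catch { case _: NumberFormatException => Left(s"'$s' is not a number") }

val good = for { a <- parse("1"); b <- parse("2") } yield a + b    // Right(3)
val bad  = for { a <- parse("1"); b <- parse("two") } yield a + b  // Left("'two' is not a number")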