Tuesday, July 19, 2016

Independence Day


In Daphne Koller's excellent "Probabilistic Graphical Models", she talks about independence between variables and leaves it as an exercise for the reader (viewer) to prove to themselves that, of the following two Conditional Probability Distributions (CPDs), one represents independent variables and the other dependent variables.



Being a bit slow, it took me some time to work it out. One must remember that independence is defined as:

For events α, β we say P ⊨ (α ⊥ β) (read: "P satisfies that α and β are independent") if:

P(α, β)  = P(α) P(β)
P(α | β) = P(α) 
P(β | α) = P(β)

These 3 equations are equivalent.

Now, back to our two CPDs. In the one on the left-hand side, we can see that the ratio of d0 to d1 is the same whether i is i0 or i1 (in SQL-land, imagine a "group by"; in probability distributions this is called marginalization). That is, having been told the value of i, the distribution over d is unchanged. Therefore, the equations above are satisfied and i and d are independent.

The same is not true of the CPD on the right hand side, therefore in this CPD, i and d are not independent.
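To make the ratio argument concrete, here is a small numeric check in Scala (the numbers are invented for illustration, not those from the course): a joint distribution is independent exactly when every cell equals the product of its marginals.

  val joint = Map(
    ("i0", "d0") -> 0.42, ("i0", "d1") -> 0.18,
    ("i1", "d0") -> 0.28, ("i1", "d1") -> 0.12)

  // P(i0) = 0.6, P(i1) = 0.4, P(d0) = 0.7, P(d1) = 0.3
  def marginal(value: String): Double =
    joint.collect { case ((i, d), p) if i == value || d == value => p }.sum

  // true here: e.g. P(i0, d0) = 0.42 = 0.6 * 0.7, and P(d0 | i) = 0.7 whatever i is
  val independent = joint.forall { case ((i, d), p) =>
    math.abs(p - marginal(i) * marginal(d)) < 1e-9
  }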

In other words, if we call the left-hand-side graph of dependencies G1 and the right-hand-side graph G2, then I(G1) = { D ⊥ I } and I(G2) = ∅. This introduces the notion of I-maps, defined below.

I-Maps

"Let P be a distribution over X. We define I(P) to be the set of independence assertions of the form (X ⊥ Y | Z) that hold in P . We can now rewrite the statement that “P satisfies the local independencies associated with G” simply as I(G) ⊆ I(P). In this case, we say that G is an I-map (independency map) for P."

We also say that "a distribution P satisfies the local independencies associated with a graph G if and only if P is representable as a set of CPDs associated with the graph G" [1]

[1] Probabilistic Graphical Models: Principles and Techniques, Koller.

Monday, July 18, 2016

The theory that dare not speak its name

From XKCD (https://xkcd.com/1132/)
The author of the excellent XKCD cartoons misrepresents the arguments of Frequentists and Bayesians for the purpose of jocularity but found he'd "kicked a hornet's nest" by doing so.

Bayes Theorem has been controversial for hundreds of years. In fact, books have been written on it (see here for a recent and popular one). One reason is that it depends on subjectivity. But Allen B Downey, author of Think Bayes (freely downloadable from here), thinks this is a good thing. "Science is always based on modeling decisions, and modeling decisions are always subjective.  Bayesian methods make these decisions explicit, and that’s a feature, not a bug. But all hope for objectivity is not lost.  Even if you and I start with different priors, if we see enough data (and agree on how to interpret it) our beliefs will converge.  And if we don’t have enough data to converge, we’ll be able to quantify how much uncertainty remains." [1]

Bayesian Networks

Taking the examples from Probabilistic Graphical Models available at Coursera, imagine this scenario: a student wants a letter of recommendation which depends on their grades, intelligence, SAT scores and course difficulty like so:


The course uses SamIam but I'm going to use Figaro as it's written in Scala. Representing these probability dependencies then looks like this:

class ProbabilisticGraphicalModels {

  implicit val universe = Universe.createNew()

  val Easy          = "Easy"
  val Hard          = "Hard"
  val Dumb          = "Dumb"
  val Smart         = "Smart"
  val A             = "A"
  val B             = "B"
  val C             = "C"
  val GoodSat       = "GoodSat"
  val BadSat        = "BadSat"
  val Letter        = "Letter"
  val NoLetter      = "NoLetter"

  def chancesOfDifficultIs(d: Double): Chain[Boolean, String]
    = Chain(Flip(d), (b: Boolean) => if (b) Constant(Hard) else Constant(Easy))

  def chancesOfSmartIs(d: Double): Chain[Boolean, String]
    = Chain(Flip(d), (b: Boolean) => if (b) Constant(Smart) else Constant(Dumb))

  def gradeDistributionWhen(intelligence: Chain[Boolean, String] = defaultIntelligence,
                            difficulty:   Chain[Boolean, String] = defaultDifficulty): CPD2[String, String, String]
    = CPD(intelligence, difficulty,
    (Dumb, Easy)   -> Select(0.3   -> A,   0.4   -> B,   0.3   -> C),
    (Dumb, Hard)   -> Select(0.05  -> A,   0.25  -> B,   0.7   -> C),
    (Smart, Easy)  -> Select(0.9   -> A,   0.08  -> B,   0.02  -> C),
    (Smart, Hard)  -> Select(0.5   -> A,   0.3   -> B,   0.2   -> C)
  )

  def satDist(intelligence: Chain[Boolean, String] = defaultIntelligence): CPD1[String, String]
    = CPD(intelligence,
    Dumb  -> Select(0.95  -> BadSat,  0.05  -> GoodSat),
    Smart -> Select(0.2   -> BadSat,  0.8   -> GoodSat)
  )

  def letterDist(gradeDist: CPD2[String, String, String] = defaultGradeDist): CPD1[String, String]
    = CPD(gradeDist,
    A -> Select(0.1   -> NoLetter,  0.9   -> Letter),
    B -> Select(0.4   -> NoLetter,  0.6   -> Letter),
    C -> Select(0.99  -> NoLetter,  0.01  -> Letter)
  )
.
.

And we can query it like so:

.
.
  def probabilityOf[T](target: Element[T], fn: (T) => Boolean): Double = {
    val ve = VariableElimination(target)
    ve.start()
    ve.probability(target, fn)
  }
.
.

Finally, let's add some sugar so I can use it in ScalaTests:

.
.
  val defaultDifficulty   = chancesOfDifficultIs(0.6)
  val easier              = chancesOfDifficultIs(0.5)
  val defaultIntelligence = chancesOfSmartIs(0.7)
  val defaultGradeDist    = gradeDistributionWhen(intelligence = defaultIntelligence, difficulty = defaultDifficulty)
  val defaultLetterDist   = letterDist(defaultGradeDist)
  val defaultSatDist      = satDist(defaultIntelligence)

  def being(x: String): (String) => Boolean = _ == x

  def whenGettingAnABecomesHarder(): Unit = defaultGradeDist.addConstraint(x => if (x == A) 0.1 else 0.9)

  def whenTheCourseIsMoreLikelyToBeHard(): Unit = defaultDifficulty.addConstraint(x => if (x == Hard) 0.99 else 0.01)

  def whenLetterBecomesLessLikely(): Unit = defaultLetterDist.addConstraint(x => if (x == Letter) 0.1 else 0.9)

  def whenTheSatIsKnownToBeGood(): Unit = defaultSatDist.observe(GoodSat)

}

Flow of Probabilistic Influence

If we wanted to know the chances of receiving a letter of recommendation, we'd just have to run probabilityOf(defaultLetterDist, being(Letter)) (which equals about 0.603656). The interesting thing is what happens if we observe some facts - does this change the outcome?

The answer is: it depends.

If we observe the SAT score for an individual, then their probability of receiving the letter changes. For example, executing whenTheSatIsKnownToBeGood() (that is, defaultSatDist.observe(GoodSat)) means that probabilityOf(defaultLetterDist, being(Letter)) now returns about 0.712 (and similarly, observing BadSat reduces the probability to about 0.457).
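Expressed as a ScalaTest sketch in the same style as the tests further down (this particular test is mine, not from the course), observing a good SAT score pulls the chance of a letter up:

  "observing the SAT" should {
    "change the chances of a letter" in new ProbabilisticGraphicalModels {
      val letterBefore = probabilityOf(defaultLetterDist, being(Letter)) // about 0.604
      whenTheSatIsKnownToBeGood()                                        // defaultSatDist.observe(GoodSat)
      val letterAfter  = probabilityOf(defaultLetterDist, being(Letter)) // about 0.712
      letterAfter should be > letterBefore
    }
  }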

The general idea is given in this screen grab:

From Daphne Koller's Probabilistic Graphical Models


The X -> Y and X <- Y cases are easy. "In general, probabilistic influence is symmetrical. That is, if X can influence Y, Y can influence X." [2]

Probabilistic influence can also flow through nodes in this Bayesian network, so X -> W -> Y and X <- W <- Y are not terribly surprising when we know nothing about the intermediate step, W (the column on the left-hand side of the grab). Conversely, if we do know something about W, that's all we need and it no longer matters what the first step does.
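For example (a sketch of mine in the same ScalaTest style, not from the original post), making the course harder lowers the chance of a letter through the unobserved grade:

    "flow X -> W -> Y when W is not observed" in new ProbabilisticGraphicalModels {
      // difficulty -> grade -> letter: with the grade unobserved, difficulty influences the letter
      val letterBefore = probabilityOf(defaultLetterDist, being(Letter))
      whenTheCourseIsMoreLikelyToBeHard()
      val letterAfter  = probabilityOf(defaultLetterDist, being(Letter))
      letterBefore should be > letterAfter
    }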

The V-structure (X -> W <- Y) is more interesting. Here, we can use ScalaTest to demonstrate what's going on. The first case is when we have no observed evidence. Koller tells us:

"If I tell you that a student took a class and the class is difficult, does that tell you anything about the student's intelligence? And the answer is 'no'."
And sure enough, Figaro agrees:

  "probabilities" should {
    "flow X -> W <- Y ('V-structure')" in new ProbabilisticGraphicalModels {
      val smartBefore = probabilityOf(defaultIntelligence, being(Smart))
      whenTheCourseIsMoreLikelyToBeHard()
      val smartAfter  = probabilityOf(defaultIntelligence, being(Smart))
      smartBefore shouldEqual smartAfter
    }

with the caveat that this is true if and only if
"... W and all of its descendants are not observed."
If W or one of its descendants is given evidence (here, a soft constraint on the letter), then the following is true:

    "flow X -> W <- Y ('V-structure')" in new ProbabilisticGraphicalModels {
      // Can difficulty influence intelligence via letter?
      val smartBefore = probabilityOf(defaultIntelligence, being(Smart))
      whenLetterBecomesLessLikely() // "this too activates the V-structure"
      val smartAfter  = probabilityOf(defaultIntelligence, being(Smart))
      smartBefore should be > smartAfter
    }

As mentioned, if we do know something about the intermediate step, the first step can't influence the outcome. Take these examples:
"I know the student got an A in the class, now I'm telling you that the class is really hard.does that change the probability of the distribution of the letter? No because ... the letter only depends on the grade."
  "Given evidence about Z" should {
    
    "make no difference in (X -> W -> Y)" in new ProbabilisticGraphicalModels {
      defaultGradeDist.observe(A)
      val letterChanceBefore  = probabilityOf(defaultLetterDist, being(Letter))
      whenTheCourseIsMoreLikelyToBeHard()
      val letterChanceAfter  = probabilityOf(defaultLetterDist, being(Letter))
      letterChanceBefore shouldEqual letterChanceAfter
    }
"If I tell you that the student is intelligent, then there is no way the SAT can influence the probability influence in grade."
    "make no difference in X <- W -> Y" in new ProbabilisticGraphicalModels {
      defaultIntelligence.observe(Smart)
      val chancesOfABefore = probabilityOf(defaultGradeDist, being(A))
      whenTheSatIsKnownToBeGood()
      val chancesOfAAfter = probabilityOf(defaultGradeDist, being(A))
      chancesOfAAfter shouldEqual chancesOfABefore
    }

More notes to come.

[1] Prof Allen B Downey's blog
[2] Daphne Koller, Coursera.

Friday, July 1, 2016

Cosine Similarities on an AWS cluster


De-duping data is a perennial problem that must be solved before you can do anything interesting with big data. In our case, we have about a terabyte of data that represents all the companies in the world from various sources. There is obviously duplication between sources but also within sources as their data is dirty (aside: I know one startup that's buying data for their domain, de-duping it and selling it back to the vendor, thus cutting costs. There's money in them there dupes).

The algorithms used here are TF-IDF and cosine similarity. TF-IDF weights a term by how often it appears in a given document, scaled down by how common that term is across the whole corpus, so ubiquitous words count for little. Cosine similarity is a little harder but not beyond anybody with a secondary education in maths; other people have explained it better than I could (see here) so I won't dwell on it.
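For the curious, here's a minimal sketch of the two steps using Spark MLlib (names such as docs are assumptions of mine, and this is not the production code):

  import org.apache.spark.mllib.feature.{HashingTF, IDF}
  import org.apache.spark.mllib.linalg.Vector
  import org.apache.spark.mllib.linalg.distributed.RowMatrix
  import org.apache.spark.rdd.RDD

  // each document is a bag of terms; TF hashes terms into a sparse vector,
  // IDF then downweights terms that appear in many documents
  def tfIdf(docs: RDD[Seq[String]]): RDD[Vector] = {
    val tf = new HashingTF().transform(docs)
    tf.cache()
    new IDF().fit(tf).transform(tf)
  }

  // columnSimilarities computes cosine similarities between *columns*, so the
  // documents must be laid out as columns (rows are terms). The threshold trades
  // accuracy for speed (DIMSUM sampling).
  def cosineSimilarities(termsAsRows: RDD[Vector], threshold: Double) =
    new RowMatrix(termsAsRows).columnSimilarities(threshold)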

First, we did a Proof of Concept on an AWS cluster. This was just randomly generated data on a Spark cluster on five c4.8xlarge boxes. The random data was a 100 million x 100 million sparse matrix (by sparse, I mean one data point every hundredth element). There were a few hiccups (see here and here for performance tuning hints) but we eventually got it to work. Processing the Cosine similarity took a mere 28 minutes. Cool.

But like any PoC, it had the wind knocked out of its sails when it met the real world. The first problem was the dictionary for the TF-IDF was too large to be broadcast to all the nodes since there is a built in memory limit to any single object that is sent across the wire between nodes (see here and here). So, we needed somewhere we could store the dictionary and chose HBase.

The next performance problem that my PoC did not prepare me for was how long it took to load (and parse) a terabyte of data that happened to be in XML format on HDFS. I hit two problems here, and both were discovered by judicious use of jstack running on the executor nodes. Something like:

while true ; do { jstack $(jps | grep CoarseGrainedExecutorBackend | awk '{print $1}') | grep -A35 '^"Execut' ; uptime ; sleep 20 ; } ; done

 (I recommend this approach no matter which Big Data framework you are using).

The first problem was lock contention coming from Xerces. In Scala, parse your XML with something like:

scala.xml.XML.withSAXParser(saxParser).loadString(xmlString)

where saxParser is cached rather than created anew for every document. This will save a huge amount of contention.
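One way to do the caching (a sketch with assumed names, not the code we ran): keep one parser per executor thread, since a SAXParser is cheap to reuse but not thread-safe.

  import javax.xml.parsers.{SAXParser, SAXParserFactory}
  import scala.xml.{Elem, XML}

  object CachedXmlParser {
    // one parser per thread: creating a parser per record hammers Xerces' factory lookup
    private val localParser = new ThreadLocal[SAXParser] {
      override def initialValue(): SAXParser = SAXParserFactory.newInstance().newSAXParser()
    }

    def parse(xmlString: String): Elem =
      XML.withSAXParser(localParser.get()).loadString(xmlString)
  }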

The second problem was thread contention in Spark's use of Kryo. It appeared to be instantiating a serializer on each deserialization (see here), resulting in shuffles taking about 1.2 hours on average (max. 5.1 hours). When I used plain old Java serialization, the average was about 3.5 minutes (max. 8 minutes). That made Java serialization faster than Kryo! Upgrading from Spark 1.3.1 to 1.6.1 is expected to fix this.
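For reference, the serializer is a one-line Spark configuration switch (this snippet is illustrative, not taken from the job's actual setup):

  val conf = new org.apache.spark.SparkConf()
    // Java serialization is the default; Kryo is opted into explicitly
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")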

In both cases, the load average went up from about 1.0 to 20.0 after the fix was implemented. This is a good thing (these are 32-core machines). If the beefy boxes in your cluster have a load average of about 1.0 then you're clearly not using all their processing ability.

The cosine similarities result in pairs of IDs. Running Spark's GraphX over these pairs finds groups of entities that are similar (sketched below). However, there was a problem here that manifested itself as one partition taking what felt like forever to finish when all the other partitions had finished (resulting in the job timing out). Here, jstack wasn't so useful: I could see all the time was being taken in Scala's Seq.toSet function, which led me on a wild goose chase wondering why Scala's collection library was so damned slow. In fact, it was the data that was causing the problem.
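That GraphX step looks roughly like this (a sketch with assumed names): each similar pair becomes an edge and each connected component a candidate group of duplicates.

  import org.apache.spark.graphx.{Graph, VertexId}
  import org.apache.spark.rdd.RDD

  def groupSimilar(similarPairs: RDD[(VertexId, VertexId)]): RDD[(VertexId, VertexId)] = {
    val graph = Graph.fromEdgeTuples(similarPairs, defaultValue = 1)
    // (entityId, componentId): entities sharing a componentId are (transitively) similar
    graph.connectedComponents().vertices
  }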

All my tests looked pretty good, but when faced with huge amounts of realistic data, a strange phenomenon (typical of the single-linkage clustering we're effectively doing here) appeared. Imagine we are making chains of similar words and putting matching clusters together. Imagine this chain:

cat -> mat -> mate -> make -> Mike

(where -> means "is similar to") then they're all grouped together and cat is similar to Mike - clearly nonsense. My code was then doing a 2nd round check (nothing to do with Cosine similarities this time) and comparing each element with every other. With X elements this lead to X(X-1) comparisons. In my tests, this was fine but in the real world, X was about 10 000 and so there were millions of comparisons.

For the time being, I've increased the threshold of similarity to 0.9 (out of a maximum of 1.0) and I don't see this phenomenon any more. However, I have to think of a more robust solution for the long term, such as Louvain Modularity.

[Aside - final note on Cosine similarities: Cosine Similarity is not a distance metric (as pointed out on this StackOverflow answer)]

Friday, June 17, 2016

Spark, HBase and security


Getting Spark and HBase to talk to each other is clearly non-trivial if Jiras are anything to go by (see here and here).

My situation is this: I'm building TF-IDF statistics from a corpus of text (almost a terabyte of data). The dictionary is far too large to pass around as a broadcast variable, as is typically done in most examples (eg, chapter 6 of Advanced Analytics with Spark). So, an external data store is needed. Simply because of various constraints, it has to be HBase version 0.98.18 and Spark 1.3.1.

The data is highly confidential and must be protected by Kerberos. The system is set up by simply using su to start a new shell, thus giving a fresh ticket. The application runs under YARN and the Driver connects to HBase and initialises the tables before outsourcing the work to the Executors. And it's there that we see "Failed to find any Kerberos tgt" [TGT: Ticket Granting Ticket] when the Executors try to access these tables.

What is happening looks like this:

From "Hadoop: The Definitive Guide" by Tom White
The exact stack trace I was seeing is described at StackOverflow here but unfortunately their solution didn't work for me.

After a lot of head-scratching, I looked at the code in the new(ish) hbase-spark codebase. After some judicious thievery from the HBaseContext code, I wrote something like:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
import org.apache.spark.{SparkContext, SerializableWritable}
import org.apache.spark.broadcast.Broadcast

object HBaseKerberosSecurity {

  def callThisFromTheDriverCode(config: Configuration, sc: SparkContext): Broadcast[SerializableWritable[Credentials]] = {
    val job = Job.getInstance(config)
    TableMapReduceUtil.initCredentials(job)
    sc.broadcast(new SerializableWritable(job.getCredentials))
  }

  def callThisFromYourExecutorCode(credentialConf: Broadcast[SerializableWritable[Credentials]]): Unit = {
    val ugi = UserGroupInformation.getCurrentUser
    val credentials = ugi.getCredentials // see YarnSparkHadoopUtils
    if (credentials != null) {
      ugi.addCredentials(credentials)
      ugi.setAuthenticationMethod(AuthenticationMethod.PROXY)
      ugi.addCredentials(credentialConf.value.value)
    }
  }

}

and success! The executors too were allowed to read from and write to HBase.
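Wiring it up looks roughly like this (a sketch with assumed names such as config, sc and rdd; not the original code): the Driver obtains and broadcasts the HBase tokens, and each Executor adds them to its own UserGroupInformation before touching HBase.

  val credentials = HBaseKerberosSecurity.callThisFromTheDriverCode(config, sc)

  rdd.foreachPartition { rows =>
    HBaseKerberosSecurity.callThisFromYourExecutorCode(credentials)
    // ... now this executor can open its HBase connection and read or write the tables ...
  }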


Thursday, May 26, 2016

From the Gradle to the Grave


After an annoying week in Classpath Hell, here are some things I've learned.

It all started with an upgrade to Elastic Search that was forced upon us (1.6 to 2.3.1). This caused all sorts of issues, a good deal of them to do with Guava. The Guava team have a very aggressive policy on deprecation: "Deprecated non-beta APIs will be removed two years after the release in which they are first deprecated. You must fix your references before this time. If you don't, any manner of breakage could result (you are not guaranteed a compilation error)." Compare that to the JDK, which has never deleted any deprecated code.

Well, this would be fine; I don't use Guava directly. Unfortunately, both ES and HBase do. The trouble is that my HBase (0.98.18-hadoop2) pulls in Guava version 12.0.1 as a transitive dependency while my Elastic Search (2.3.1) pulls in Guava 18.0. This leads to:

java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor
 at org.elasticsearch.threadpool.ThreadPool.<clinit>(ThreadPool.java:190)
.
.

when clearly the method existed in my IDE.

The solution was to use shading. Shading not only relocates the contentious classes but updates all the call sites in the artefact. In Maven you use the Shade plugin. Unfortunately, I'm using Gradle (which annoys me because I can't control the order of dependencies as I can in Maven). In Gradle, you use Gradle Shadow, where your build.gradle looks something like:

  apply plugin: 'com.github.johnrengelman.shadow'
.
.
  shadowJar {
    // this moves the classes and the reference to the classes
    relocate 'com.google.common', 'com.example.com.google.common'

    // where this is to address another shadowing problem (see here)
    zip64 true
    transform(ServiceFileTransformer) {
        path = 'META-INF/spring*'
    }
  }
.
.

NOTE! You must delete the uber jar before running this. I was confused for an hour or more wondering why it appeared not to be working. When I cleaned the project, the problem went away.

Checking that the classes had been renamed in the artefact was simple; checking the call sites was a little harder. To make sure the Elastic Search call site had been changed, I copied the offending class out of the JAR and ran:

$ javap -p org/elasticsearch/threadpool/ThreadPool.class | grep google
private volatile com.example.com.google.common.collect.ImmutableMap<java.lang.String, org.elasticsearch.threadpool.ThreadPool$ExecutorHolder> executors;
private final com.example.com.google.common.collect.ImmutableMap<java.lang.String, org.elasticsearch.common.settings.Settings> defaultExecutorTypeSettings;

Yup, looks good.

All of this wasted a few days that could have been better spent analysing Big Data problems. Which made me think that the Paul Phillips talk ("We're doing it all wrong") that I watched at the weekend is only a small part of the story we face as developers. It's lovely to have a language that allows code to be "obviously without defects rather than without obvious defects", but most of my unproductive time seems to be spent wrangling builds and classpaths.


Saturday, May 21, 2016

Lazy Spark


You may want Spark to write to some data store (after all, there are some things that other technologies do better). Given an RDD distributed over many partitions on many nodes, how do you write the data efficiently?

You may choose to do something like this (where I've used println statements rather than explicit calls to the technologies of your choice):

    rdd foreachPartition { iterator =>
      println("create connection")
      iterator foreach { element =>
        println("insert")
      }
      println("close connection")
    }

Naturally, you need to get a connection inside the function (as the function will be serialized and run on other partitions that might be on other machines). 

Note that if the connection is referenced with a lazy val then each function will have its own connection (even on the same machine) and it will only be instantiated when it's run on its partition.

So, how do we know when to close the connection? A common answer is to just use connection pools.

Also note that foreachPartition is an action whose return type is Unit, so it's not too surprising that it executes immediately; Unit hints at side effects. A quick look at the code of RDD confirms this.

Great. But what if you want to read data from some store and enhance the RDD? Now we're using mapPartitions that may very well be lazy. So, with similar code, the function might look like this:

    val mapped = rdd mapPartitions { iterator =>
      lazy val connection = getConnection()

      val mapped = iterator map { element =>
        if (connection.isOpen) select(element)
      }

      connection.close()
      mapped
    }

where I'm pretending to open and close some sort of pseudo-connection thus:

  val nSelects        = new AtomicInteger(0)
  val nConnections    = new AtomicInteger(0)

  class Connection {
    @volatile private var open = true
    def close(): Unit = { open = false }
    def isOpen(): Boolean = open
  }

  def getConnection(): Connection = {
    nConnections.incrementAndGet()
    new Connection
  }

  def select(element: Int): Int = nSelects.incrementAndGet()
  
Now, let's run RDD.count(), which should force even the lazy map to do something:

    println("Finished. Count  = " + mapped.count())
    println("nSelects         = " + nSelects.get() + ", nConnections = " + nConnections.get())

giving:

Finished. Count  = 10000
nSelects         = 0, nConnections = 0

What's this? We get the correct number of elements but no selects? What gives?

The issue is that the iterator given to the function passed to mapPartitions is lazy. This is nothing to do with Spark but is due to Scala's lazy collections (this iterator is actually an instance of scala.collection.Iterator$$anon$11). If we ran this code with a real connection, we'd see that it had already been closed by the time the inner function wanted to do something with it. Odd, we might say, since the call to close it comes later in the code.
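Here's a tiny Spark-free illustration of the same laziness (my example, not from the Spark code): nothing is mapped until the iterator is actually consumed.

  val iterator = Iterator(1, 2, 3).map { x => println(s"mapping $x"); x * 2 }
  println("nothing mapped yet")  // prints first: the map above has not run
  println(iterator.toList)       // only now do the "mapping ..." lines appear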

We could just force the mapping to run by calling size() on the resulting collection (which may or may not be efficient) but one solution suggested to me was:

    val batchSize = 100
    rdd mapPartitions { iterator =>
      val batched = iterator.grouped(batchSize)
      batched.flatMap { grouped =>
        val connection = getConnection()
        val seqMapped  = grouped.map(select(_))
        connection.close()
        seqMapped
      }
    }

This creates (and closes) a connection for every batchSize elements, but it can still be more efficient than the usual contention on a connection pool. Remember, a little contention can make a big difference when you're dealing with hundreds of millions of elements.


Scala Curry


"All functions are curried in Haskell by default" that is, all functions in Haskell take only one argument. How does it do this? By currying.

Scala has currying built-in. To demonstrate, take this method that takes two Ints and returns an Int:

  def aMethodWith2Args(x: Int, y: Int): Int = x + y

Let's lift it:

  val lifted: (Int, Int) => Int = aMethodWith2Args(_, _)

and curry it:

  val curried: Int => Int => Int = lifted.curried
   
So, we too now have a function that only takes one argument. It just returns another function that also takes only one argument. That's currying.

Now we can partially apply it

  val partial: Int => Int = curried(1)
  println(partial(2)) // prints out '3'

Just like Haskell.
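For completeness (this bit is mine, not in the original post), Scala also lets you write the curried form directly with multiple parameter lists:

  def aCurriedMethod(x: Int)(y: Int): Int = x + y

  val partiallyApplied: Int => Int = aCurriedMethod(1) _  // eta-expand, fixing x = 1
  println(partiallyApplied(2))                            // prints out '3'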