Tuesday, July 19, 2016

Independence Day


In Daphne Koller's excellent "Probabilistic Graphical Models", she talks of independence between variables and leaves it as an exercise for the reader (viewer) to prove to themselves that, of the following two Conditional Probability Distributions (CPDs), one represents independent variables and the other represents dependent variables.



Being a bit slow, it took me some time to work it out. One must remember that independence is defined as:

For events α and β, P ⊨ (α ⊥ β) (read: "P satisfies that α and β are independent") if:

P(α, β)  = P(α) P(β)
P(α | β) = P(α) 
P(β | α) = P(β)

These three equations are equivalent (provided the conditioning events have non-zero probability), since P(α | β) = P(α, β) / P(β).

Now, back to our two CPDs. We can see that the ratio of d0 to d1 is the same whether i is i0 or i1 (in SQL-land, imagine a "group by"; in probability distributions it's called marginalization). That is, having been told the value of i, the probability distribution of d is unchanged. Therefore, the equations above are satisfied and i and d are independent.
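For a concrete (made-up) example: suppose P(i0) = 0.4 and P(i1) = 0.6, and that within each column the ratio of d0 to d1 is 3:7. Then:

P(d0 | i0) = P(d0 | i1) = 0.3 = P(d0)

and every cell of the joint distribution factorizes, e.g. P(i0, d0) = 0.4 × 0.3 = 0.12 = P(i0) P(d0).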

The same is not true of the CPD on the right-hand side; therefore, in that CPD, i and d are not independent.

In other words, if we call the left-hand-side graph of dependencies G1 and the right-hand-side one G2, then I(G1) = { D ⊥ I } and I(G2) = ∅. This introduces the notion of I-maps, defined below.

I-Maps

"Let P be a distribution over X. We define I(P) to be the set of independence assertions of the form (X ⊥ Y | Z) that hold in P . We can now rewrite the statement that “P satisfies the local independencies associated with G” simply as I(G) ⊆ I(P). In this case, we say that G is an I-map (independency map) for P."

Here, "a distribution P satisfies the local independencies associated with a graph G if and only if P is representable as a set of CPDs associated with the graph G" [1].

[1] Koller, D. and Friedman, N., Probabilistic Graphical Models: Principles and Techniques, MIT Press.

Monday, July 18, 2016

The theory that dare not speak its name

From XKCD (https://xkcd.com/1132/)
The author of the excellent XKCD cartoons misrepresents the arguments of frequentists and Bayesians for the purposes of jocularity, but found he'd "kicked a hornet's nest" by doing so.

Bayes' theorem has been controversial for hundreds of years; indeed, books have been written about the controversy (see here for a recent and popular one). One reason is that it depends on subjectivity. But Allen B. Downey, author of Think Bayes (freely downloadable from here), thinks this is a good thing: "Science is always based on modeling decisions, and modeling decisions are always subjective. Bayesian methods make these decisions explicit, and that's a feature, not a bug. But all hope for objectivity is not lost. Even if you and I start with different priors, if we see enough data (and agree on how to interpret it) our beliefs will converge. And if we don't have enough data to converge, we'll be able to quantify how much uncertainty remains." [1]
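To make the convergence point concrete, here is a toy sketch (the priors and data are invented): two observers start with very different Beta priors over a coin's bias, see the same 1,000 flips, and end up with almost identical posterior means.

object Convergence extends App {
  // Posterior mean of a Beta(alpha, beta) prior after observing `heads`
  // successes in `n` Bernoulli trials (Beta is conjugate to the binomial,
  // so the update is just addition).
  def posteriorMean(alpha: Double, beta: Double, heads: Int, n: Int): Double =
    (alpha + heads) / (alpha + beta + n)

  val (heads, n) = (700, 1000)
  println(posteriorMean(8, 2, heads, n)) // prior mean 0.8 -> ~0.701
  println(posteriorMean(2, 8, heads, n)) // prior mean 0.2 -> ~0.695
}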

Bayesian Networks

Taking the examples from Probabilistic Graphical Models available at Coursera, imagine this scenario: a student wants a letter of recommendation. The letter depends on their grade; the grade depends on the student's intelligence and the course's difficulty; and the SAT score depends on intelligence alone, like so:


The course uses SamIam but I'm going to use Figaro as it's written in Scala. Representing these probability dependencies then looks like this:

import com.cra.figaro.algorithm.factored.VariableElimination
import com.cra.figaro.language._
import com.cra.figaro.library.compound.{CPD, CPD1, CPD2}

class ProbabilisticGraphicalModels {

  implicit val universe = Universe.createNew()

  val Easy          = "Easy"
  val Hard          = "Hard"
  val Dumb          = "Dumb"
  val Smart         = "Smart"
  val A             = "A"
  val B             = "B"
  val C             = "C"
  val GoodSat       = "GoodSat"
  val BadSat        = "BadSat"
  val Letter        = "Letter"
  val NoLetter      = "NoLetter"

  def chancesOfDifficultIs(d: Double): Chain[Boolean, String]
    = Chain(Flip(d), (b: Boolean) => if (b) Constant(Hard) else Constant(Easy))

  def chancesOfSmartIs(d: Double): Chain[Boolean, String]
    = Chain(Flip(d), (b: Boolean) => if (b) Constant(Smart) else Constant(Dumb))

  def gradeDistributionWhen(intelligence: Chain[Boolean, String] = defaultIntelligence,
                            difficulty:   Chain[Boolean, String] = defaultDifficulty): CPD2[String, String, String]
    = CPD(intelligence, difficulty,
    (Dumb, Easy)   -> Select(0.3   -> A,   0.4   -> B,   0.3   -> C),
    (Dumb, Hard)   -> Select(0.05  -> A,   0.25  -> B,   0.7   -> C),
    (Smart, Easy)  -> Select(0.9   -> A,   0.08  -> B,   0.02  -> C),
    (Smart, Hard)  -> Select(0.5   -> A,   0.3   -> B,   0.2   -> C)
  )

  def satDist(intelligence: Chain[Boolean, String] = defaultIntelligence): CPD1[String, String]
    = CPD(intelligence,
    Dumb  -> Select(0.95  -> BadSat,  0.05  -> GoodSat),
    Smart -> Select(0.2   -> BadSat,  0.8   -> GoodSat)
  )

  def letterDist(gradeDist: CPD2[String, String, String] = defaultGradeDist): CPD1[String, String]
    = CPD(gradeDist,
    A -> Select(0.1   -> NoLetter,  0.9   -> Letter),
    B -> Select(0.4   -> NoLetter,  0.6   -> Letter),
    C -> Select(0.99  -> NoLetter,  0.01  -> Letter)
  )
.
.

And we can query it like so:

.
.
  def probabilityOf[T](target: Element[T], fn: (T) => Boolean): Double = {
    // run exact inference (variable elimination) over the model, then query
    val ve = VariableElimination(target)
    ve.start()
    try ve.probability(target, fn) finally ve.kill()
  }
.
.

Finally, let's add some sugar so I can use it in ScalaTest:

.
.
  val defaultDifficulty   = chancesOfDifficultIs(0.6)
  val easier              = chancesOfDifficultIs(0.5)
  val defaultIntelligence = chancesOfSmartIs(0.7)
  val defaultGradeDist    = gradeDistributionWhen(intelligence = defaultIntelligence, difficulty = defaultDifficulty)
  val defaultLetterDist   = letterDist(defaultGradeDist)
  val defaultSatDist      = satDist(defaultIntelligence)

  def being(x: String): (String) => Boolean = _ == x

  def whenGettingAnABecomesHarder(): Unit = defaultGradeDist.addConstraint(x => if (x == A) 0.1 else 0.9)

  def whenTheCourseIsMoreLikelyToBeHard(): Unit = defaultDifficulty.addConstraint(x => if (x == Hard) 0.99 else 0.01)

  def whenLetterBecomesLessLikely(): Unit = defaultLetterDist.addConstraint(x => if (x == Letter) 0.1 else 0.9)

  def whenTheSatIsKnownToBeGood(): Unit = defaultSatDist.observe(GoodSat)

}

Flow of Probabilistic Influence

If we wanted to know the chances of receiving a letter of recommendation, we'd just have to run probabilityOf(defaultLetterDist, being(Letter)) (which equals about 0.603656). The interesting thing is what happens if we observe some facts: does this change the outcome?

The answer is: it depends.

If we observe the SAT score for an individual, then their probability of receiving the letter changes. For example, executing defaultSatDist.observe(GoodSat) means that probabilityOf(defaultLetterDist, being(Letter)) now returns about 0.712 (and similarly, observing BadSat reduces the probability to about 0.457).
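A minimal sketch of that check, written in the same ScalaTest style as the tests below (all the helpers are the ones defined above):

  "observing a good SAT" should {
    "increase the chances of a letter" in new ProbabilisticGraphicalModels {
      val letterBefore = probabilityOf(defaultLetterDist, being(Letter))
      whenTheSatIsKnownToBeGood()
      val letterAfter  = probabilityOf(defaultLetterDist, being(Letter))
      letterAfter should be > letterBefore // roughly 0.60 -> 0.71
    }
  }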

The general idea is given in this screen grab:

From Daphne Koller's Probabilistic Graphical Models


The X -> Y and X <- Y cases are easy. "In general, probabilistic influence is symmetrical. That is, if X can influence Y, Y can influence X." [2]

Probabilistic influence can flow through nodes in this Bayesian network, so X -> W -> Y and X <- W <- Y are not terribly surprising if we know nothing about the intermediate node (the left-hand column of the grab). Conversely, if we do know its value, that's all we need and it no longer matters what the first node does.
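A sketch of the flow-through case in the same ScalaTest style (helpers as defined above): a harder course means worse grades and, with the grade unobserved, that influence reaches the letter.

    "flow X -> W -> Y when W is not observed" in new ProbabilisticGraphicalModels {
      val letterBefore = probabilityOf(defaultLetterDist, being(Letter))
      whenTheCourseIsMoreLikelyToBeHard()
      val letterAfter  = probabilityOf(defaultLetterDist, being(Letter))
      letterBefore should be > letterAfter
    }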

The V-structure (X -> W <- Y) is more interesting. Here, we can use ScalaTest to demonstrate what's going on. The first case is when we have no observed evidence. Here, Koller tells us:

"If I tell you that a student took a class and the class is difficult, does that tell you anything about the student's intelligence? And the answer is 'no'."
And sure enough, Figaro agrees:

  "probabilities" should {
    "flow X -> W <- Y ('V-structure')" in new ProbabilisticGraphicalModels {
      val smartBefore = probabilityOf(defaultIntelligence, being(Smart))
      whenTheCourseIsMoreLikelyToBeHard()
      val smartAfter  = probabilityOf(defaultIntelligence, being(Smart))
      smartBefore shouldEqual smartAfter
    }

with the caveat that this is true if and only if
"... W and all of its descendants are not observed."
If W or one of its descendants (here, the letter) is observed, then the following is true:

    "flow X -> W <- Y ('V-structure')" in new ProbabilisticGraphicalModels {
      // Can difficulty influence intelligence via letter?
      val smartBefore = probabilityOf(defaultIntelligence, being(Smart))
      whenLetterBecomesLessLikely() // "this too activates the V-structure"
      val smartAfter  = probabilityOf(defaultIntelligence, being(Smart))
      smartBefore should be > smartAfter
    }

As mentioned, if we do know something about the intermediate step, the first step can't influence the outcome. Take these examples:
"I know the student got an A in the class, now I'm telling you that the class is really hard.does that change the probability of the distribution of the letter? No because ... the letter only depends on the grade."
  "Given evidence about Z" should {
    
    "make no difference in (X -> W -> Y)" in new ProbabilisticGraphicalModels {
      defaultGradeDist.observe(A)
      val letterChanceBefore  = probabilityOf(defaultLetterDist, being(Letter))
      whenTheCourseIsMoreLikelyToBeHard()
      val letterChanceAfter  = probabilityOf(defaultLetterDist, being(Letter))
      letterChanceBefore shouldEqual letterChanceAfter
    }
"If I tell you that the student is intelligent, then there is no way the SAT can influence the probability influence in grade."
    "make no difference in X <- W -> Y" in new ProbabilisticGraphicalModels {
      defaultIntelligence.observe(Smart)
      val chancesOfABefore = probabilityOf(defaultGradeDist, being(A))
      whenTheSatIsKnownToBeGood()
      val chancesOfAAfter = probabilityOf(defaultGradeDist, being(A))
      chancesOfAAfter shouldEqual chancesOfABefore
    }

More notes to come.

[1] Prof. Allen B. Downey's blog.
[2] Daphne Koller, Probabilistic Graphical Models, Coursera.

Friday, July 1, 2016

Cosine Similarities on an AWS cluster


De-duping data is a perennial problem that must be solved before you can do anything interesting with big data. In our case, we have about a terabyte of data that represents all the companies in the world, drawn from various sources. There is obviously duplication between sources, but also within sources, as their data is dirty. (Aside: I know one startup that's buying data for their domain, de-duping it and selling it back to the vendor, thus cutting costs. There's money in them there dupes.)

The algorithms used here are TF-IDF and cosine similarity. TF-IDF weighs how frequently a term appears in a single document against how common that term is across the whole corpus, so a term scores highly when it is frequent in the document but rare elsewhere. Cosine similarity is a little harder, but not beyond anybody with a secondary education in maths: it is the cosine of the angle between two document vectors, (A · B) / (|A| |B|). Other people have explained it better than I can (see here), so I won't dwell on it.
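For reference, here's a toy, single-machine sketch of the calculation, with sparse document vectors represented as plain Maps from term to TF-IDF weight (our real job does this on Spark, but the maths is the same):

object Cosine {
  // Sparse vectors: term -> weight (e.g. a TF-IDF score per term)
  type SparseVec = Map[String, Double]

  // The dot product only needs the terms the two documents share
  def dot(a: SparseVec, b: SparseVec): Double =
    a.collect { case (term, w) if b.contains(term) => w * b(term) }.sum

  def norm(a: SparseVec): Double = math.sqrt(a.values.map(w => w * w).sum)

  // cos(theta) = (a . b) / (|a||b|): 1.0 = same direction, 0.0 = orthogonal
  def similarity(a: SparseVec, b: SparseVec): Double =
    dot(a, b) / (norm(a) * norm(b))
}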

First, we did a proof of concept on an AWS cluster: just randomly generated data on a Spark cluster of five c4.8xlarge boxes. The random data was a 100 million x 100 million sparse matrix (by sparse, I mean roughly one non-zero data point in every hundred elements). There were a few hiccups (see here and here for performance tuning hints) but we eventually got it to work. Processing the cosine similarities took a mere 28 minutes. Cool.

But like any PoC, it had the wind knocked out of its sails when it met the real world. The first problem was that the dictionary for the TF-IDF was too large to be broadcast to all the nodes, since there is a built-in memory limit on any single object that is sent across the wire between nodes (see here and here). So we needed somewhere to store the dictionary, and chose HBase.

The next performance problem, which my PoC did not prepare me for, was how long it took to load (and parse) a terabyte of data that happened to be in XML format on HDFS. I hit two problems here, and both were discovered by judicious use of jstack running on the executor nodes. Something like:

while true ; do jstack $(jps | grep CoarseGrainedExecutorBackend | awk '{print $1}') | grep -A35 '^"Execut' ; uptime ; sleep 20 ; done

(I recommend this approach no matter which big data framework you are using.)

The first problem was lock contention coming from Xerces. In Scala, parse your XML with something like:

scala.xml.XML.loadXML(inputSource, saxParser)

where saxParser is cached (one per thread, since SAX parsers are not thread-safe). This will save a huge amount of contention.
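A minimal sketch of what that caching might look like (the object name is mine; the point is that each thread reuses its own parser rather than creating, or locking around, one per document):

import java.io.StringReader
import javax.xml.parsers.{SAXParser, SAXParserFactory}
import org.xml.sax.InputSource
import scala.xml.{Elem, XML}

object XmlParsing {
  // one parser per thread: SAXParser instances are not thread-safe
  private val saxParser = new ThreadLocal[SAXParser] {
    override def initialValue(): SAXParser =
      SAXParserFactory.newInstance().newSAXParser()
  }

  def parse(xmlString: String): Elem =
    XML.loadXML(new InputSource(new StringReader(xmlString)), saxParser.get())
}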

The second problem was thread contention in Spark's use of Kryo. It seemed to be instantiating a serializer on each deserialization (see here), resulting in shuffles taking about 1.2 hours on average (max. 5.1 hours). When I used plain old Java serialization, the average was about 3.5 minutes (max. 8 minutes). This made Java serialization faster than Kryo! Upgrading from Spark 1.3.1 to 1.6.1 is expected to fix this.
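Switching serializers is a one-line change to the SparkConf; a sketch (the app name is made up):

import org.apache.spark.SparkConf

// Kryo is normally the faster serializer, but given the contention above,
// plain Java serialization was the pragmatic choice here
val conf = new SparkConf()
  .setAppName("cosine-similarity")
  .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")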

In both cases, the load average went up from about 1.0 to 20.0 after the fix was implemented. This is a good thing (these are 32-core machines): if the beefy boxes in your cluster have a load average of about 1.0, you're clearly not using all their processing power.

The cosine similarities result in pairs of IDs. Running Spark's GraphX over these pairs finds the groups of entities that are similar. However, there was a problem here, which manifested itself as one partition taking what felt like forever to finish after all the other partitions had finished (resulting in the job timing out). Here, jstack wasn't so useful. I could see that all the time was being taken in Scala's Seq.toSet function, which led me on a wild goose chase wondering why Scala's collection library was so damned slow. In fact, it was the data that was causing the problem.
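For context, a sketch of the GraphX step (the method name and types are mine): treat each sufficiently similar pair as an edge and take connected components, so that every entity is labelled with a cluster ID.

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD

// pairs: (id1, id2) whose cosine similarity exceeded the threshold.
// Each entity ends up labelled with the smallest vertex ID in its
// connected component, which serves as a cluster ID.
def similarityClusters(pairs: RDD[(Long, Long)]): RDD[(Long, Long)] = {
  val edges = pairs.map { case (a, b) => Edge(a, b, ()) }
  Graph.fromEdges(edges, ()).connectedComponents().vertices
}

Note that this is single-linkage by construction, which is exactly what produces the chaining problem described next.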

All my tests looked pretty good, but when faced with huge amounts of realistic data, a strange phenomenon (typical of the single-linkage clustering we're effectively doing here) appeared. Imagine we are making chains of similar words and merging matching clusters together. Imagine this chain:

cat -> mat -> mate -> make -> Mike

(where -> means "is similar to"), then they're all grouped together and cat is deemed similar to Mike - clearly nonsense. My code was then doing a second-round check (nothing to do with cosine similarities this time), comparing each element in a cluster with every other. With X elements, this leads to X(X-1) comparisons. In my tests this was fine, but in the real world X was about 10,000, giving on the order of a hundred million comparisons.

For the time being, I've increased the similarity threshold to 0.9 (out of a maximum of 1.0) and I no longer see this phenomenon. However, I'll have to think of a more robust solution for the long term, such as Louvain Modularity.

[Aside: a final note on cosine similarity: it is not a distance metric (as pointed out in this StackOverflow answer).]