Sunday, March 26, 2017

You have nothing to lose but your (Markov) chains!

I've been playing around with Markov Chain Monte Carlo so decided to take a Coursera course that deals with them (Statistical Mechanics allows you plot configurations of objects that are as probable as if you took a snapshot of a classical Newtonian system).

"Starting from a given initialdistribution, a Markov chain defines a sequence of probability distributions over states of the system. For any Markov chain satisfying certain mathematical conditions, the distribution at successive states converges to a single distribution. This single distribution is called the stationary distribution... Much of the art of designing an MH algorithm is in choosing the proposal distribution." PPP (p343).

"The Metropolis algorithm takes a random walk in a probability space."

"The only difference between Metropolis and Metropolis-Hastings is that the first ones always sample from a symmetric distribution and thus they don't have the Hastings correction ratio." - see here

What is a Markov chain?

"If we think of Xt as a state X at time t and invoke the following dependency condition on each state:

Pr(Xt+1 = xt+1 | Xt = xt, Xt-1 = xt-1, ..., X0 = x0) = Pr(Xt+1 = xt+1 | Xt = xt)

then the stochastic process is known as a Markov chain" (from here)

What is a Monte Carlo simulation?

"The Monte Carlo method is a statistical - almost experimental - approach to computing integrals using random positions, called samples, whose distribution is carefully chosen." [Krauth]

In the two Coursera examples, pi is calculated by:

  1. throwing stones into a square and observing the ratio that are inside and outside a circle that just fits in the square (direct sampling).
  2. wandering around the space throwing stones and observing how high the pile is for each grid reference (Markov chain sampling). Note, there are interesting restrictions when a stone is tossed over the boundary of the space.

Note that both approaches approximate an integral in multiple dimensions, namely:

∫ 1-11-1 p(x, y) O(x, y) dx dy

    ∫ 1-1∫ 1-1 p(x, y) dx dy

where p is the probability of being at point (x, y) and O is the observed size of the pile of stones at (x, y).

Detailed balance condition

This is a sufficient condition for Monte Carlo. The argument goes like this:

Imagine moving around a 3x3 grid exploring all the squares. If we are on the square in the top right corner (call it 'a') and we can only move horizontally and vertically then the probabilities for the next move are obviously:

p(a → a) + p(a → b) + p(a → c) = 1                (1)

(where 'b' and 'c' are the square immediately to the left and immediately below 'a')

Similarly, I can only get to 'a' via 'a', 'b' and 'c' so:

πa . 1 = πb p(b → a) + πc p(c → a) + πa p(a → a)   (2)

substitute equation 1 into 2:

πa p(a → b) + πa p(a → c) = πb p(b → a) + πc p(c → a)

There are many ways to satisfy the global balance condition but one way is to say the first term on the left hand side equals the first in the right hand side and the second term on the left hand side equals the second term on the right hand side.

This is the detailed balance condition:

πa p(a → b) = πb p(b → a)
πa p(a → c) = πc p(c → a)

where πx is the probability of being on square x.

If we want to sweep out all the configurations equally, we want:

πa = πb = πc

(This is not true in the Metropolis algorithm. Metropolis et al proposed another function for πx)

This implies p(a → b) =  p(b → a).

What's interesting is that "the Monte Carlo algorithm is nothing but a matrix, the transfer matrix of transition probabilities p(a→b) for moving from the site a to the site b." In this 3x3 game, the matrix is 9x9 (the 9 squares from which we move to 9 squares to which we can move).

The transfer matrix

In Python, it looks like this:

from numpy import dot, matrix, set_printoptions
from numpy.linalg import eig

m = matrix('2  1  0  1  0  0  0  0  0;'
           '1  1  1  0  1  0  0  0  0;'
           '0  1  2  0  0  1  0  0  0;'
           '1  0  0  1  1  0  1  0  0;'
           '0  1  0  1  0  1  0  1  0;'
           '0  0  1  0  1  1  0  0  1;'
           '0  0  0  1  0  0  2  1  0;'
           '0  0  0  0  1  0  1  1  1;'
           '0  0  0  0  0  1  0  1  2') / 4.0

This conforms to a mathematics 'design pattern'. Chase the problem onto known territory and deal with it there with the tools designed by other people.

Where it gets cool is when we take the eigen-decomposition (a.k.a spectral decomposition) of this matrix. This says that for a square matrix, A, we can express it as:

A = Q  Λ Q-1

where the columns of Q are the eigenvectors and Λ is a diagonal matrix whose values are the eigenvalues (a.k.a the spectrum of A).

In Python, we can find Q and Λ by doing this:

eigenvalues, eigenvectors = eig(m)

And why is this interesting? Because if we keep multiplying the transfer matrix with the vector representing the probabilities of which square we're currently in, it converges to the stationary distribution.

How do we prove this? Well, let's say we multiply A by itself x times:

Ax = (Q  Λ Q-1)x =  Q  Λ Q-1 Q  Λ Q-1 Q  Λ Q-1...  Q  Λ Q-1 Q  Λx Q-1

so, we only have to times the eigenvalues by themselves x times. That makes the calculations more efficient.

What are the eigenvalues?

print eigenvalues

[-0.5   0.    1.    0.75  0.5   0.25 -0.    0.25  0.75]

Note that all but one value is less than 1.0 so if we keep multiplying a vector of these values by itself, all the values other than the third will disappear. 

Now, let's look at the eigenvectors (Q):

print eigenvectors

[[ 0.17 -0.41 -0.33  0.58  0.5   0.33  0.22  0.04  0.11]
 [-0.33  0.41 -0.33  0.29  0.   -0.17  0.12 -0.52 -0.23]
 [ 0.17  0.   -0.33 -0.   -0.5   0.33 -0.34  0.04 -0.57]
 [-0.33  0.41 -0.33  0.29 -0.   -0.17 -0.57  0.48  0.34]
 [ 0.67 -0.   -0.33  0.   -0.   -0.67  0.   -0.08 -0.  ]
 [-0.33 -0.41 -0.33 -0.29  0.   -0.17  0.57  0.48 -0.34]
 [ 0.17  0.   -0.33 -0.   -0.5   0.33  0.34  0.04  0.57]
 [-0.33 -0.41 -0.33 -0.29  0.   -0.17 -0.12 -0.52  0.23]
 [ 0.17  0.41 -0.33 -0.58  0.5   0.33 -0.22  0.04 -0.11]]
[[ 0.17  0.   -0.33 -0.   -0.5   0.33 -0.34  0.04 -0.57]]

and note that the third column (corresponding to the third eigenvalue) is uniform. All other columns will disappear as their corresponding eigvalues, when multiplied by themselves a sufficient number of times, tend to 0. And that's why, after a sufficient number of matrix multiplications, we'll reach equilibrium.

Let's see it in action by multiplying the matrix by itself 10 times:

for i in range(10):
    m = dot(m, m)

print m

[[ 0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11]
 [ 0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11]
 [ 0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11]
 [ 0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11]
 [ 0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11]
 [ 0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11]
 [ 0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11]
 [ 0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11]
 [ 0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11]]

(Ir)Reducible matrices

We could easily extend our 3x3 box into a 6x6 box and make the transfer matrix such that only the top left hand 3x3 square and bottom right hand 3x3 square were possibly territories. If there were no means to transition from one such territory to the other, this would violate the Markov Chain principle outlined above. If we ever found ourselves in one quadrant, we'd know with 100% certainty that we'd never visited nor ever will visit the other.

"One of the two mathematically rigorous conditions, for a Monte Carlo Markov chain algorithm is that this pulling apart is not possible, that would be irreducible."

One way of looking at this is picturing the matrix as a graph. Usually, we picture graphs as adjacency matrices but if we do it the other way around we'd see this matrix as two disjoint sets of vertices. In linear algebra jargon, the matrix is reducible into these two sets. In graph theory jargon, we'd say the graph was not strongly connected. They are equivalent concepts.

Further reading

Friday, March 10, 2017

Hadoop and HBase in an integration test

It's always very nice to have your integration tests run out-of-the-box as soon as you check the code out of Git. No fiddling with your environment, just click and go.

Unfortunately, Hadoop uses some native Windows libraries and for those unfortunate enough to develop on Windows, this is a pain - particularly if you have a large, distributed team. So, this is a nice hack I've used a couple of times to get the integration tests to run on developers' laptops without any manual configuration.

First, check the binaries hadoop.dll and winutils.exe in. They're not very big (about 40kb). Then you need to set the System properties:

System.setProperty("java.library.path", BINARY_DIRECTORY)
System.setProperty("hadoop.home.dir",   BINARY_DIRECTORY)

where BINARY_DIRECTORY is the absolute path name of the directory the binaries live in.

Then, before running Hadoop code, you need this dirty hack that introspectively assures Hadoop that everything is OK:

val classLoader = this.getClass.getClassLoader
val field       = classOf[ClassLoader].getDeclaredField("usr_paths")
val usrPath     = field.get(classLoader).asInstanceOf[Array[String]]
val newUsrPath  = new Array[String](usrPath.length + 1)
System.arraycopy(usrPath, 0, newUsrPath, 0, usrPath.length)
newUsrPath(usrPath.length) = BINARY_DIRECTORY
field.set(classLoader, newUsrPath)

val field = classOf[org.apache.hadoop.fs.FileSystem].getDeclaredField("FILE_SYSTEMS_LOADED")
field.setBoolean(null, true)

val nativeCodeLoadedField = classOf[org.apache.hadoop.util.NativeCodeLoader].getDeclaredField("nativeCodeLoaded")
nativeCodeLoadedField.set(null, false)

(This is Scala but it translates to Java easily enough)

It's ugly and it's dirty but it does the job. It basically tricks Hadoop into thinking everything is OK. It's good enough for tests but don't do it in production. Also, it's not at all necessary for *Nix systems.

Having done this, you can start running the integration tests against HDFS which is backed by your own hard disk. Again, you'd never do this in production (what's the point?) but it really helps with tests.

You do this with the following code (assuming you now want HBase to live on top of Hadoop):

val configuration = org.apache.hadoop.hbase.HBaseConfiguration.create()
configuration.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)
configuration.set("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
val utility = new org.apache.hadoop.hbase.HBaseTestingUtility(configuration)

And you're good to go. HBase and HDFS both working on a single machine.

Saturday, February 18, 2017

Knuth and Poissons

Trying to get my head around how Donald Knuth generates samples from a Poisson distribution. The formula is simple:

Let L=eλ, p=1. Count the number of times you multiply p by a uniform distribution in the range [0,1] until p

Simple. But why does it work?


The Poisson distribution relies on the Poisson process. That is, the probability that something is about to occur has nothing to do with what has gone on in the past. This is seen throughout nature and makes it very useful in the sciences. For example, the probability of an atom radioactively decaying has nothing to do with how long it has been sitting around for before the decay.

In other words, if X is the inter-arrival times of a memory-less event, X = (X1X2, X3, ... ):
"At each arrival time and at each fixed time, the process must probabilistically restart, independent of the past. The first part of that assumption implies that X is a sequence of independent, identically distributed variables [IIDs] . The second part of the assumption implies that if the first arrival has not occurred by time s, then the time remaining until the arrival occurs must have the same distribution as the first arrival time itself" (from here).
This memory-less property can be expressed mathematically like this:

P(X > t + s | X > s) = P(X > t)

Let F(t) = P(X > t) for t > 0. Then:

F(t + s) = F(t) F(s)

since the probabilities of independent events can be multiplied to give the probability of both happening. (The mathematically inclined may look at this equation and suspect that logarithms are about to make an appearance...)

Now, this is the clever bit. Let a = F(1). Then:

F(n) = F(Σn1) = ΠnF(1) = an

The proof for this being true for all rational n is here but, briefly, if we let λ = - ln (a) then:

F(t) = e-λt

which, if you recall, is the probability that an event will happen at t or later. That is, it's a value between 0 and 1.

Now, we use a method called inversion sampling. The basic idea is that given a plot, y = P(x) then taking uniformly random samples on the y-axis then the corresponding values on the x-axis will give you a distribution that conforms to P(x).

Note that y represents a probability so necessarily 0 ≤ y ≤ 1. Since a uniform random number generator comes with pretty much any programming language, the problem is getting simpler.

The last piece of the jigsaw is documented here and explains why Knuth's algorithm is so simple. Basically, we rearrange the equation for F(t) by taking natural logs:

t = - ln F(t) / λ

and since we're using inversion sampling, we know that the probability F(t) is going to be simulated by a uniformly distributed random variable between 0 and 1. We call it U.

Now, given any random sample Uthere will be a value ti. The question is Knuth asks is: how many random samples Ui do I need before t is a unit of time (ie, 1)?

More mathematically, what is k for:

Σki=1 ti              ≤   1  ≤ Σk+1i=1 ti

- Σki=1  ln U / λ ≤   1  ≤ - Σk+1i=1 ln U / λ

 Σki=1  ln U      ≤  -λ  ≤ Σk+1i=1 ln U

ln (Πki=1 Ui)       ≤  -λ  ≤ ln (Πk+1i=1 Ui)

Πki=1 Ui              ≤  e ≤ Πk+1i=1 Ui

which is Knuth's algorithm. Notice that the beauty of this is that it's computationally cheap for relatively small k. This should be fine for most applications.

Tuesday, February 14, 2017

Tweaking TF-IDF

TF-IDF is a simple statistic (with a few variants) used in information retrieval within a corpus of text. The funny thing is that although it works well, nobody seems to be sure why. Robertson says in On theoretical arguments for IDF:

"The number of papers that start from the premise that IDF is a purely heuristic device and ends with a claim to have provided a theoretical basis for it is quite startling."

The simplest expression for TF-IDF is:

(TF) . (IDF) = (ft,d) . (log (N / |{d∈D : t∈d}|))

where t is a "term" (word, ngram etc); d is a document in corpus D; N is the total number of documents in corpus D; and ft,d is the frequency of term t in document d. (Often the term frequency component is actually a function of the raw ft,d) . "The base of the logarithm is not in general important" (Robertson).

It has been mooted that the IDF component looks like the log of a probability. That is, if we define the number of documents with term ti to be ni then we could expect the probability of a given document containing ti to be:

P(ti) = P(ti occurs in d) = ni/N

and the IDF term looks like:

idf(ti) = - log P(ti)

The nice thing about this is that "if we assume the occurrences of different terms in documents are statistically independent, then addition is the correct thing to do with the logs":

idf(t1∧t2) = -log P(t1∧t2
           = -log(P(t1)P(t2)) 
           = -(log P(t1) + log P(t2)) 
           = idf(t1) + idf(t2)

Thus we can add IDF terms to give us a total score.


Lucene used to use this implementation but appear to moving to something called BM25 with which they're getting good results. I tried BM25 but it was unfortunately prone to over-linking my documents compared to the classic formula.

So, I returned to the classic implementation but thought again about an approximation I had made. I had capped the maximum frequency of a given term to be 1000 simply for practical reasons of storage. All other terms are discarded as something that occurs too often is not likely to be useful in this particular domain. Note that I could not filter out a list of stopwords as lots of my corpus was not English. The hope was that stopwords would naturally drop out as part of this restriction. Imposing this limit still gave me over 90 million terms.

The result was the difference between the a rare term and the most common term that we record was not that great. Since I am comparing documents, the minimum document frequency for a term to be useful is 2 (a frequency of 1 obviously doesn't compare documents). If the maximum frequency is 1000 then the ratio of the IDFs would be:

log (N / 2) / log (N / 1000)

which for 90 million documents is only about 1.5. No wonder the results for my document comparisons were not great. Now, if I set N = 1000 + 1 (the +1 to avoid a nasty divide by 0), the ratio between the weighting for the rarest term and the most common is about 6218 which seems more reasonable. And sure enough, more documents seemed to match. (Caveat: I say "seemed to match" as I only examined a sample then extrapolated. There are far too many documents to say definitely that the matching is better).

Wednesday, January 25, 2017

Poisoned Poisson

I needed to code up a simple implementation of the probability mass function (PMF) for the boring old Poisson distribution. Five minutes work, or so I thought.

My first cut of the code looked like this:

  def poissonWithSmallMean(x: Int, mu: Double): Double = Math.pow(mu, x) * Math.exp(-mu) / factorial(x)

where factorial is a very simple implementation of factorial using tail recursion. I could have implemented in a more efficient manner using caching etc but I wasn't expecting too much trouble.

Going to the excellently named StatTrek to get some test values, my test looked like:

    poissonWithSmallMean(10, 12) shouldEqual 0.104837255883659 +- tolerance

and it passed. Great. 

Things started getting strange with slightly larger numbers. StatTrek was telling me that with x=998 and mu=999, we should get the modestly-sized answer of 0.0126209223360251. But my code was blowing up.

It's not hard to see why. That factorial goes crazy pretty quickly. Given that the expected value is not particularly large and not terribly small, how do we compute it simply? 

The answer is here. It depends on logs. If:

n! = n * (n-1) * (n-2) * ... * 1


log n! = log n + log(n-1) + log(n-2) + ... + log(1)

Now, our equation to calculate the PMF looks like this:

  def poissonWithLargeMean(x: Integer, mu: Double): Double = {
    val logged = (x * Math.log(mu)) - mu - logFactorial(x)

where logFactorial is just a simple implementation of the equation for log n! above. This handles x=998, mu=999 beautifully.

We could start implementing logFactorial in a much more efficient manner by using the Stirling approximation (log n!  n log n). This is a handy equation which incidentally also gives us the limit on comparison sorts. Take a look at John D Cooke's blog in the link above to see how to do this. But this simple solution is fine for me for now.

Friday, January 20, 2017

HBase File Structure

Some notes from a colleague who knows HBase better than I do.

Quick overview of how HBase stores data

When HBase batch writes in-memory data to disk, it may do so in separate files. Each file is sorted by the key.

So, say HBase persists data with keys, A, F, B. It sorts them so the file looks like:

[A, B, F]

Some time later, HBase persists G, P and M. Now there are two files that look like:

[A, B, F],  [G, M, P]

At some later time still, data with keys C, R and D arrive. Key R clearly comes after the last letter already written to the files (P) but D and C comes in the middle of the first file. So, what does it do? It creates a new file with [C, D, R] in it. To summarize, the files look like this:

[A, B, F], [G, M, P], [C, D, R]

Even later, a client wants to find the value for key D. Assuming it's not cached in memory, where does HBase look for it? Well, although all data within a file is ordered (so we can call off the search early if it's not where we would expect it in the file), we don't know which file D is in. HBase must search all files even if it doesn't search through all of a file.


To help matters, HBase can compact these three files into one that looks like:

[A, B, C], [D, F, G], [M, P, R]

Well, that makes searching much easier.

Read-Write Patterns

How can we leverage this? Well, in certain read-write patterns, we could force a compaction at an appropriate point. For instance, I'm using HBase to store a dictionary I build when processing a corpus of text. Later, I refer to that dictionary but I never update or add any more data. So, after I have finished my last write, I can call:


After compacting the table, my read throughput went up by a factor of 4.

For more information on HBase compaction see here.

Thursday, December 22, 2016

Spark and the GC

Spark is memory hungry and that means you need to consider the garbage collector when tuning.

There are a number of GCs to chose from that may or may not be appropriate for your job. Some say that G1 may give you (soft) guarantees for maximum latency time but are not good for batch jobs. Others point out that CMS may give lower pauses than ParallelGC (the default on 64-bit machines) but that ParallelGC has better overall throughput and since we're running a batch job not a web server, ParallelGC may suit Spark.

More logs than a lumberjack

Whichever GC you use, you'll need to turn logging on and analyse the logs.

You can see objects being promoted from young to old generation (see here and here) but it's inferred rather than stated explicitly. At a young GC, you'll see something like:


Which describes the memory usage in the young generation before GC (BEFORE_YOUNG), after (AFTER_YOUNG) and the capacity of the young generation.

It also shows the usage of the heap as a whole before (BEFORE_HEAP), after (AFTER_HEAP) and the heap's total capacity.

Note that it is not necessarily the case that:


This is because not all of the young generation was GCed. Some was promoted to the old generation.

In an attempt to avoid premature promotion, and with JVMs that had 8gb of memory, I set -XX:MaxNewSize=6G -XX:SurvivorRatio=6 but instead of taking 5 hours, it took 6. I tried this because there was a lot of new generation GC and not a lot of old.

Trying G1GC took just over 6 hours. And setting -XX:MaxNewSize=1G -XX:SurvivorRatio=3 just had to be killed as it looked like it was going to take days. So, I had to look elsewhere.


You can also check your disk access times by using a script taken from here, namely:

dd if=/dev/zero of=/tmp/output.img bs=8k count=256k
rm /tmp/output.img

A decent HDD will typically give you about 300MB/s, a bog-standard SSD (my NUC at home) about 500MB/s.

The best way to avoid Garbage Collection...

... is not to create any garbage. This might not always be possible but you can minimize GC. Kryo will compress objects amazingly well by replacing the verbose String that represents their FQNs with just a byte or two. However, it will only compress components of the class you register with it, not a deep tree of components. For instance, if you register org.apache.spark.mllib.linalg.SparseVector, you'd do well to also register Array[Int] and Array[Double] as these are what hog memory.

It is not sufficient to just register Array[_].

You can put something like sparConf.set("spark.kryo.registrationRequired", "true")in your code to see what needs to be explicitly added. This will cause runtime exceptions to be thrown until everything to be (un)compressed has been explicitly registered.

After these changes, the shuffle went from some 2.3TB to 300GB and consequently the time for the job dropped from about 5 hours to less than 45 minutes. With less memory to churn there was less GC and this makes a huge difference to how long your Spark job takes.