Sunday, May 21, 2017

Lazy Scala


This might be elementary, but when a simple error trips me up more than once, I blog it.

You probably know that an Iterator can only be accessed once as it's stateful. "An Iterator can only be used once because it is a traversal pointer into a collection, and not a collection in itself" from here. (Iterable just means the implementing class can generate an Iterator).

The simple mistake I made was to check the size of an iterator in a temporary log statement before mapping over it: calling size consumes the iterator, leaving nothing for the map to traverse. What was a little more interesting was that other collections behave the same way if you call them via their TraversableOnce interface.

To demonstrate, say we have a Set

    val aSet = Set(1, 2, 3)

and two functions that are identical other than the type of their argument:

  def mapSet[T](xs: Set[T]): TraversableOnce[String] = {
    val mapped = xs.map(_.toString)
    println(mapped.mkString("\n"))
    mapped
  }

  def mapTraversableOnce[T](xs: TraversableOnce[T]): TraversableOnce[String] = {
    val mapped = xs.map(_.toString)
    println(mapped.mkString("\n"))
    mapped
  }

then mapTraversableOnce will return an exhausted iterator (the mkString inside the println consumed it) while mapSet will return a Set of Strings. This will come as a surprise to anybody expecting object-oriented behaviour.
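To see it in action, a quick sketch using the definitions above:

  mapSet(aSet).mkString(",")              // "1,2,3" (mapped is a real Set)
  mapTraversableOnce(aSet).mkString(",")  // ""      (the println already exhausted the iterator)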

Iterator also violates the functor laws. Take two otherwise identical methods:

  def isTraversableFunctor[T, U, V](xs: Traversable[T], f: T => U, g: U => V): Boolean = {
    val lhs = xs.map(f).map(g)
    val rhs = xs.map(g compose f)
    (lhs == rhs)
  }

  def isTraversableOnceFunctor[T, U, V](xs: TraversableOnce[T], f: T => U, g: U => V): Boolean = {
    val lhs = xs.map(f).map(g)
    val rhs = xs.map(g compose f)
    (lhs == rhs)
  }

and pass aSet to each. The first will say it's a functor; the second will say it's not.
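For example (any two functions will do):

  isTraversableFunctor(aSet, (x: Int) => x + 1, (y: Int) => y * 2)     // true
  isTraversableOnceFunctor(aSet, (x: Int) => x + 1, (y: Int) => y * 2) // false: two distinct iterators compared by reference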

This is somewhat trivial: the reason it fails is that the iterator has not been materialized. "TraversableOnce's map is not a functor map, but, then again, it never pretended to be one. Its design goals specifically precluded it from being a functor map" from Daniel C. Sobral.

Laziness

Iterator comes into its own when we want access to the underlying elements lazily. But there are other ways to do this, like Stream, "which implements all its transformer methods lazily."

Almost all collections are eager by default. Use the view method to make them lazy. Use force on these to make them eager again.
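For example (a sketch; the println shows how much work is actually done):

  val lazily = (1 to 1000000).view.map { x => println(s"mapping $x"); x * 2 }
  lazily.take(3).force   // prints "mapping 1" to "mapping 3" only; force makes it eager again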

Finally, some methods are naturally lazy. For instance, exists short-circuits, returning as soon as it finds what it's looking for.


Tuesday, May 16, 2017

Playing with Pregel


Implementing your own distributed Pregel algorithm in GraphX is surprisingly simple but there are a few things to know that will help you.

First, what is GraphX's Pregel implementation? Well, it takes three functions:

1. one that merges messages of type T, that is (T, T) => T.

2. one that runs on each vertex with an attribute type of VD and that takes that message and creates a new state. Its type is (VertexId, VD, T) => VD.

3. one that runs on each edge/vertex/vertex combination and produces messages to be sent to the vertices. Its type is (EdgeTriplet[VD, ED]) => Iterator[(VertexId, T)], where ED is the edge's attribute type and T is again the message type.
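As a minimal sketch of how the three plug together (assuming GraphX is on the classpath; the max-propagation example is illustrative, not my actual algorithm):

  import org.apache.spark.graphx._

  // Propagate the largest vertex attribute to every vertex in the graph.
  def propagateMax(graph: Graph[Long, Int]): Graph[Long, Int] =
    graph.pregel(Long.MinValue)(
      (id: VertexId, attr: Long, msg: Long) => math.max(attr, msg),   // 2. vertex program: merge message into state
      triplet =>                                                      // 3. send messages along edges
        if (triplet.srcAttr > triplet.dstAttr) Iterator((triplet.dstId, triplet.srcAttr))
        else Iterator.empty,
      (a: Long, b: Long) => math.max(a, b)                            // 1. merge concurrent messages
    )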

TL;DR: the vertex holds its state and the edges send it messages. If this starts sounding like the Actor Model to you, you're not alone:

"It's a bit similar to the actor mode if you think of each vertex as an actor, except that vertex state and messages between vertices are fault tolerant and durable, and communication proceeds in fixed rounds: at every iteration the framework delivers all messages sent in the previous iteration. Actors normally have no such timing guarantee." - Martin Kleppmann.

So, off I went and wrote my code where all the paths through a vertex are stored as state at that vertex.

And it ran like a dog. After five hours of processing, it died with out of memory exceptions.

Judicious jstack-ing of the JVMs in the cluster showed that threads were spending their time in Seq.containsSlice (we're using Scala 2.10.6). This Scala method was being used to find sub-sequences of VertexIds (which are just aliases for Longs) in the paths that had already been seen.

Desperate, I converted each Seq of Longs to a String and used String.contains instead, and the whole thing ran in less than 25 minutes.
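The hack looks roughly like this (illustrative; the delimiters stop, say, 12 matching the pair 1,2):

  val path     = Seq(1L, 2L, 3L, 4L)
  val asString = path.mkString(",", ",", ",")       // ",1,2,3,4,"
  val subSeq   = Seq(2L, 3L)
  asString.contains(subSeq.mkString(",", ",", ","))  // instead of path.containsSlice(subSeq)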

This is not the first time I've seen innocent looking code bring a cluster to its knees. Curious, I wrote some micro-benchmarking code comparing these two methods using JMH and got this:


     Benchmark                                   Mode  Cnt          Score          Error  Units
     ContainsSliceBenchmark.scalaContainsSlice  thrpt    5     594519.717 ±    33463.038  ops/s
     ContainsSliceBenchmark.stringContains      thrpt    5  307051356.098 ± 44642542.440  ops/s

That's roughly 500 times (nearly three orders of magnitude) slower.

Although it's a hack, this approach gives great performance. It also shows that taking jstack snapshots across your cluster is one of the best ways to understand why your big data application is running slowly.

Friday, May 5, 2017

Spark OutOfMemoryErrors


Spark can deal with data sets that are much larger than the available physical memory. So, you never have problems with OutOfMemoryErrors, right? Well, no. It depends what you're doing.

When asked how to avoid OOMEs, the first thing people generally say is to increase the number of partitions so that there is never too much data in memory at one time. However, this doesn't always work, and you're better off having a look at the root cause.

This very week was one of those times. I was getting FetchFailedException. "This error is almost guaranteed to be caused by memory issues on your executors" (from here). Increasing the number of partitions didn't help.

The problem was that I had to find similar entities and declare there was a relationship between them. Foolishly, I tried to take a Cartesian product of all entities that were similar. This is fine 99.99% of the time but scales very badly, O(n²), for outlying groups where n is large.

The symptoms were that everything was running tickety-boo until the very end where the last few partitions were taking tens of minutes and then the fatal FetchFailedException was thrown.

These pairs were going to be passed to GraphX's Connected Components algorithm, so instead of creating a complete graph (which is what the Cartesian product of the entities would give you), you need a different graph topology. This is not as simple as it sounds, as you have to be a little clever in choosing an alternative. For instance, a graph that is just a line will take GraphX ages to process if it is sufficiently long; my driver ran out of memory after tens of thousands of Pregel supersteps.

A more efficient approach is a star topology with branches of length 1. Pregel/ConnectedComponents explores this graph very quickly, and it has the same effect as a complete graph or a line since all vertices are part of the same connected component and we don't care at this point how they got there.
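As a sketch (the names are mine, not the production code): instead of emitting all n(n-1)/2 edges of a complete graph for a group of n similar entities, pick one entity as the hub and emit one edge per remaining entity:

  // O(n^2) edges: fine for small groups, lethal for outliers.
  def completeGraphEdges(ids: Seq[Long]): Seq[(Long, Long)] =
    for (a <- ids; b <- ids if a < b) yield (a, b)

  // O(n) edges: same single connected component, explored in few supersteps.
  // Assumes ids is non-empty.
  def starGraphEdges(ids: Seq[Long]): Seq[(Long, Long)] =
    ids.tail.map(id => (ids.head, id))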

The result was the whole graph was processed in about 20 minutes.


Monday, April 24, 2017

Crash Course in Conditional Random Fields


In an attempt to quickly grok the Conditional Random Fields used in the Mallet library, I've made some miscellaneous notes.

The basics


You will have been taught at school that the joint probability of A and B is:

P(A ∩ B) = P(A) P(B)

iff A and B are independent (see here).

Depending on your interpretation of probability theory, it is axiomatic that the relationship between the joint probability and the conditional probability is:

P(A ∩ B) = P(A|B) P(B)

These will come in useful.

CRFs

"A conditional random field is simply a conditional distribution p(y|x) with an associated graphical structure." [1]

We consider the distribution over

V = X ∪ Y

where:

V are the random variables
X are the observed inputs
Y are the outputs we'd like to predict

"The main idea is to represent a distribution over a large number of random variables by a product of local functions [ΨA] that each depend on only a small number of variables." [1]

The Local Function

The local function has the form:

Ψ_A(x_A, y_A) = exp { Σ_k θ_{A,k} f_{A,k}(x_A, y_A) }

where k indexes the features of a feature vector and θ_{A,k} is the weight of feature f_{A,k}.

Graphical Models

"Traditionally, graphical models have been used to represent the joint probability distribution p(y, x) ... But modeling the joint distribution can lead to difficulties ... because it requires modeling the distribution p(x), which can include complex dependencies [that] can lead to intractable models. A solution to this problem is to directly model the conditional distribution p(y|x). " [1]

Undirected Graphical Model

If these variables are A ⊂ V, we define an undirected graphical model as the set of all distributions that can be written as:

p(x, y) = (1/Z) ∏_A Ψ_A(x_A, y_A)

for any choice of factors, F = {ΨA}, where:

Ψ_A is a function 𝒱ⁿ → ℝ⁺, where 𝒱 is the set of values v can take
Z is the partition function that normalizes the values, such that

Z = Σ_{x,y} ∏_A Ψ_A(x_A, y_A)

Factor Graph

This undirected graphical model can be represented as a factor graph. "A factor graph is a bipartite graph G = (V, F, E)", that is, a graph with two disjoint sets of vertices. One set is the variables, v_s ∈ V; the other is the factors, Ψ_A ∈ F. All edges are between these two sets, and an edge only exists if the variable v_s is an argument of the factor Ψ_A.

Directed Model (aka Bayesian Network)

This is based on a directed graph G = (V, E) and is a family of distributions (aka "model") that factorize as:

p(x, y) = ∏_{v∈V} p(v | π(v))

where π(v) are the parents of v in the graph G.

Naive Bayesian Classifier

As a concrete example, let's look at Mallet's implementation of Naive Bayes.

Firstly, what makes Naive Bayes naive? "Naive Bayes is a simple multiclass classification algorithm with the assumption of independence between every pair of features." (from here).

We ask ourselves: given the data, what is the probability that the class is C? Or, in math-speak, what is p(C|D)?

Now, the data, D, is made up of lots of little data points, { d₁, d₂, d₃, ... }. And given the little equation at the top of this post, if all these data points are independent then:

p(D|C) = p({ d₁, d₂, d₃, ... } | C) = p(d₁|C) p(d₂|C) p(d₃|C) ...

Mallet's Java code for this is surprisingly easy and there is a JUnit test that demonstrates it. In the test, there is a dictionary of all the words in a corpus. It's a small dictionary of the words { win, puck, team, speech, vote }.

We create two vectors that represent the weightings for these words if a document relates to politics or sport. Not surprisingly, {speech, vote, win} have a higher weighting for politics and {team, puck, win} have a higher weighting for sports but all words have a nominal value of 1 added (before normalization) in each vector. This is Laplace Smoothing and it ensures the maths doesn't blow up when we try to take the log of zero.

Note that these weightings for each category are by definition p(D|C), that is, the probability of the data given a classification.

This being Bayes, we must have a prior. We simply assume the probability of an incoming document to be about sports or politics as equally likely since there is no evidence to the contrary.

Now, a feature vector comes in with {speech, win} equally weighted. To which class should it go?

The code effectively
  1. Calculates an inner product between the feature vector and (the logarithm of) each class's vector, then adds (the logarithm of) the bias. This is the multiplication in the Local Function section above.
  2. Subtracts the maximum result from all results. This appears to be for numerical stability and drops out of the equation when we normalize (see below).
  3. Takes the exponential of each result.
  4. Normalizes by dividing by the sum of these exponentials. This is the partition function we saw above, Z.
The most probable is the class we're looking for.
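Those four steps, as a bare-bones Scala sketch (my own illustration with made-up names, not Mallet's actual code):

  // scores(c) = log prior(c) + Σ_j feature(j) * log p(word_j | c)
  def classify(logPriors: Array[Double],
               logWeights: Array[Array[Double]],  // per class, per dictionary word
               features: Array[Double]): Array[Double] = {
    val scores = logPriors.indices.map { c =>     // 1. inner product plus (log of) bias
      logPriors(c) + logWeights(c).zip(features).map { case (w, f) => w * f }.sum
    }
    val maxScore = scores.max                     // 2. subtract the max for numerical stability
    val exps     = scores.map(s => math.exp(s - maxScore)) // 3. exponentiate
    val z        = exps.sum                       // 4. normalize by the partition function Z
    exps.map(_ / z).toArray
  }

The index of the largest entry in the returned array is the winning class.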

But why does the code do this? Bayes' theorem gives us:

p(C|D) = p(D|C) p(C) / p(D)
       = prior × p(D|C) / Z
       = prior × p(d₁|C) p(d₂|C) p(d₃|C) ... / Z

now, if we let:

λ_y     = ln(prior)
λ_{y,j} = ln(p(d_j|C_y))

then

p(C|D) = (e^{λ_y} ∏_{j=1}^{K} e^{λ_{y,j} d_j}) / Z = e^{λ_y + Σ_{j=1}^{K} λ_{y,j} d_j} / Z

which is what the Java code does and is the same as equation 1.6 in [1]. Note that the Java effectively has an e^{-maxScore} in both the numerator and the denominator, which of course cancels out.

Finally, that exponent, λ_y + Σ_{j=1}^{K} λ_{y,j} d_j, can be more elegantly expressed as a matrix multiplication between our dictionary weights for the classifier, θ, and the vector to classify, f (where the bias has been subsumed into the 0-th entry of the vector). That then gives us the Local Function above, and so we have come full circle.

Aside: one last note on Naive Bayesian Classifiers. Like the TF-IDF statistic, it works far better than it has any right to. "Statisticians are somewhat disturbed by use of the NBC (which they dub Idiot's Bayes) because the naive assumption of independence is almost always invalid in the real world. However, the method has been shown to perform surprisingly well in a wide variety of contexts. Research continues on why this is." (from here)

[1] Sutton, C. and McCallum, A., An Introduction to Conditional Random Fields for Relational Learning.

Sunday, April 9, 2017

Generating a Gaussian distribution


Firstly, let's consider what a Gaussian is. It can be derived here. The definition Dr Wilson gives is: "Data are said to be normally distributed if the rate at which the frequencies fall off is proportional to the distance of the score from the mean, and to the frequencies themselves". Or, in maths speak:

df/dx = k (x - μ) f(x)

where k is a constant and μ is the mean. Integrating this formula and setting the integral to 1 (since it's a probability distribution), via integration by parts and integration by substitution, you'll eventually get:

f(x) = e^{-(x - μ)²/(2σ²)} / (σ √(2π))

which is the Gaussian, where σ is the standard deviation.

Now, squaring the integral (it still equals 1, since obviously 1² = 1) gives a double integral that looks like this:

(1/2π) ∫_{-∞}^{∞} ∫_{-∞}^{∞} e^{-x²/2} e^{-y²/2} dx dy

where I've absorbed the constants into the variables and used x and y as the two integration variables. The trick is now to use polar co-ordinates, so:

x     = r cos θ
y     = r sin θ
dx dy = r dr dθ

and that integral becomes:

(1/2π) ∫_0^{2π} ∫_0^{∞} r e^{-r²/2} dr dθ

which, with another integration by substitution, ψ = r²/2, becomes:

(1/2π) ∫_0^{2π} ∫_0^{∞} e^{-ψ} dψ dθ

and again with Υ = e^{-ψ}:

(1/2π) ∫_0^{2π} ∫_0^{1} dΥ dθ = (1/2π) × 2π × 1 = 1

Where Transformation Sampling comes in

"We don't care about this integral. What we do is we use our relationship between integration and sampling" [Werner Krauth at Coursera] where θ is a random number between 0 and  and Υ is a random number between 0 and 1.

So, now we work backwards. If ψ = r²/2 and Υ = e^{-ψ}, then r = √(-2 ln Υ) where Υ is sampled over the interval [0, 1]. The easy bit is θ, which is simply sampled in the interval [0, 2π].

Plugging these numbers back into the equations for polar co-ordinates, we can now give samples for x and y. Notice that you actually get two values (x and y) for the price of one. So, implementations (like java.util.Random.nextGaussian) maintain state (the next Gaussian value, if it's available) which is calculated for "free" - except that the method is synchronized...
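As a sketch in Scala (standard library only; the 1.0 - nextDouble() keeps Υ away from zero so the log doesn't blow up):

  import scala.util.Random

  // Box-Muller: two independent Gaussians from two uniform samples.
  def boxMuller(rng: Random): (Double, Double) = {
    val upsilon = 1.0 - rng.nextDouble()           // Υ in (0, 1]
    val theta   = 2.0 * math.Pi * rng.nextDouble() // θ in [0, 2π)
    val r       = math.sqrt(-2.0 * math.log(upsilon))
    (r * math.cos(theta), r * math.sin(theta))     // x and y: two samples for the price of one
  }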

Implementation

What we have above is a decent implementation for generating Gaussian values but it's inefficient. There is a nice math-hack to make it better.

Java Number Cruncher (p367) says:
We begin this algorithm by generating two random values v₁ and v₂ that are uniformly distributed between -1 and +1. We then compute the value s = v₁² + v₂². If s > 1, we reject the values of v₁ and v₂ and generate new ones... With two good values of v₁ and v₂, we then compute:
x₁ = v₁ √((-2 ln s) / s)
x₂ = v₂ √((-2 ln s) / s)
Where this comes from had me scuttling off to here, which describes the Box-Muller method. Basically, we want a number between 0 and 1 for Υ and any value in [0, 2π] for θ. Well, this is just finding uniformly distributed values within the unit circle. Since the process of scattering points uniformly over a box in no way favours nor discriminates between points within a circle inside that box, we can be sure that taking just those inner points also gives a uniform distribution.

What we want to do is get rid of those expensive sin and cos functions for x and y. We note that

cos θ = adjacent / hypotenuse = v₁ / √s
sin θ = opposite / hypotenuse = v₂ / √s

since √s = r is the length of our vector (as defined in the above algorithm from Java Number Cruncher). We then plug these into our polar co-ordinate equations above and get the equations for x₁ and x₂ we see above. QED.
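Putting that together, a minimal sketch of the polar method (again, illustrative rather than Java Number Cruncher's actual code):

  import scala.util.Random

  // Polar (Marsaglia) method: two Gaussians with no sin or cos needed.
  def polarGaussianPair(rng: Random): (Double, Double) = {
    var v1, v2, s = 0.0
    do {
      v1 = 2.0 * rng.nextDouble() - 1.0  // uniform in [-1, 1]
      v2 = 2.0 * rng.nextDouble() - 1.0
      s  = v1 * v1 + v2 * v2
    } while (s >= 1.0 || s == 0.0)       // keep only points strictly inside the unit circle
    val scale = math.sqrt(-2.0 * math.log(s) / s)
    (v1 * scale, v2 * scale)
  }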

Saturday, April 8, 2017

Video games to statistical mechanics


A friend was writing a computer game in his spare time and wanted to speckle a sphere with texture uniformly over its surface. Idly, he asked our team how to do this. We came up with some naive ideas, like taking the x, y and z co-ordinates from a uniform sample and normalizing them to make a vector of length r, the radius of the sphere. In Python, this would be:

import random, math, pylab, mpl_toolkits.mplot3d

x_list, y_list, z_list = [],[],[]
nsamples = 10000
for sample in xrange(nsamples):
    x, y, z = random.uniform(-1.0, 1.0), random.uniform(-1.0, 1.0), random.uniform(-1.0, 1.0)
    radius = math.sqrt(x ** 2 + y ** 2 + z ** 2)
    x_list.append(x / radius)
    y_list.append(y / radius)
    z_list.append(z / radius)
fig = pylab.figure()
ax = fig.gca(projection='3d')
ax.set_aspect('equal')
pylab.plot(x_list, y_list, z_list, '+')
pylab.show()

but this is not quite uniformly distributed over the sphere. It produces a sphere like this:
Sphere with Cartesian co-ordinates taken from a uniform sample and then normalized

Using polar co-ordinates and uniformly sampling over 2π doesn't make things better either. This:

for sample in xrange(nsamples):
    phi, theta = random.uniform(0, 2.0) * math.pi, random.uniform(0, 2.0) * math.pi
    x_list.append(math.cos(phi) * math.cos(theta))
    y_list.append(math.cos(phi) * math.sin(theta))
    z_list.append(math.sin(phi))

gives:
Sphere with polar co-ordinates taken from a uniform sample
which is also clearly not evenly spread.

The solution involves Gaussian (a.k.a 'Normal') distributions, thus:

for sample in xrange(nsamples):
    x, y, z = random.gauss(0.0, 1.0), random.gauss(0.0, 1.0), random.gauss(0.0, 1.0)
    radius = math.sqrt(x ** 2 + y ** 2 + z ** 2)
    x_list.append(x / radius)
    y_list.append(y / radius)
    z_list.append(z / radius)

Why this works I'll leave to another post. Suffice to say, this even distribution is used not just in games but also in thermodynamics, where you might want to model the velocities of the molecules of a gas. In this scenario, (x ** 2 + y ** 2 + z ** 2) would model the squared speed of a molecule, useful in calculating its energy, ½mv².


Saturday, April 1, 2017

Feature Hashing


Here's a cool little trick called feature hashing. The idea is that you want to turn some corpus of text into feature vectors without resorting to an external repository for the indices in those vectors. Looking things up in an external repository can be slow and complicates the code base.

Instead, you avoid this through hashing the words to an integer. Spark uses the HashingTF class for this.

However, the Spark code is very dependent on all your words fitting into positive Ints. This is probably OK for most corpora but when you start using n-grams ("Groups of words in a split are known as n-grams." [1]), the numbers start to approach what an Int can hold.

But what's more worrying is that hashing leads to the possibility of hash collisions. How many to expect can be calculated from the formula here. Basically, given M possible slots and N words that need fitting into them, the probability of a word going into a particular slot, z, is 1/M. Therefore, the probability of a word missing that slot is:

p(miss z) = 1 - 1/M = (M - 1)/M = Y

The probability that all N words miss z as we add them to the slots is Y^N. So, obviously, the probability that the slot receives at least one word is 1 - Y^N.

This is a formula for the probability of a slot being filled but what is the expected number of collisions? Here, we note a nice trick outlined here.
The expectation of a sum is the sum of the expectations. This theorem holds "always." The random variables you are summing need not be independent.
Now, we introduce a function that is 1 if slot z contains one or more words and 0 otherwise. Note: we're not interested in the number of words in the slot, only in whether the slot contains something.

We call this function for slot z, f_z.

So, the expected total number of slots that hold at least one word is:

E(X) = E(f_1) + E(f_2) + E(f_3) + ... + E(f_M)

The expected value of anything is the sum of all possible values multiplied by their probability. So, the expected value of an individual f for any of the M slots is:

E(f_z) = 0 × p(f_z = 0) + 1 × p(f_z = 1) = p(f_z = 1)

(Note that the expected value need not be an integer, nor indeed sensible: the expected value when throwing a 6-sided die is 3.5.)

Given M slots, the expected number of slots that contain something is:

E(X) = M (1 - Y^N)

So the expected total number of collisions is the difference between this and N, as the Quora link says.
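Plugging the numbers in (a quick Scala sanity check of the formula, nothing more):

  // Expected collisions for N items hashed into M slots.
  def expectedCollisions(m: Double, n: Double): Double = {
    val y = 1.0 - 1.0 / m           // probability one item misses slot z
    n - m * (1.0 - math.pow(y, n))  // N minus the expected number of occupied slots
  }

  expectedCollisions(math.pow(2, 32), 820e6)  // ≈ 7.4e7, the figure quoted below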

For a spike, we used the MurmurHash3 hashing algorithm over 820 million n-grams, and the number of slots that held more than one word was 70 million (close to the 74 million this equation predicts when M = 2³² and N = 820 million).

So, although it's a nice idea, Spark's current implementation is not for us. We'll look at rolling our own using Longs, for which we could reasonably expect no collisions (using the above equation with M = 2⁶⁴).

[1] Real World Machine Learning.

Sunday, March 26, 2017

You have nothing to lose but your (Markov) chains!


I've been playing around with Markov Chain Monte Carlo, so I decided to take a Coursera course that deals with it (Statistical Mechanics lets you plot configurations of objects that are as probable as if you took a snapshot of a classical Newtonian system).

"Starting from a given initialdistribution, a Markov chain defines a sequence of probability distributions over states of the system. For any Markov chain satisfying certain mathematical conditions, the distribution at successive states converges to a single distribution. This single distribution is called the stationary distribution... Much of the art of designing an MH algorithm is in choosing the proposal distribution." PPP (p343).

"The Metropolis algorithm takes a random walk in a probability space."

"The only difference between Metropolis and Metropolis-Hastings is that the first ones always sample from a symmetric distribution and thus they don't have the Hastings correction ratio." - see here

What is a Markov chain?

"If we think of Xt as a state X at time t and invoke the following dependency condition on each state:

Pr(X_{t+1} = x_{t+1} | X_t = x_t, X_{t-1} = x_{t-1}, ..., X_0 = x_0) = Pr(X_{t+1} = x_{t+1} | X_t = x_t)

then the stochastic process is known as a Markov chain" (from here)

What is a Monte Carlo simulation?

"The Monte Carlo method is a statistical - almost experimental - approach to computing integrals using random positions, called samples, whose distribution is carefully chosen." [Krauth]

In the two Coursera examples, pi is calculated by:

  1. throwing stones into a square and observing the ratio that are inside and outside a circle that just fits in the square (direct sampling).
  2. wandering around the space throwing stones and observing how high the pile is for each grid reference (Markov chain sampling). Note, there are interesting restrictions when a stone is tossed over the boundary of the space.

Note that both approaches approximate an integral in multiple dimensions, namely:

∫_{-1}^{1} ∫_{-1}^{1} p(x, y) O(x, y) dx dy
___________________________________________

    ∫_{-1}^{1} ∫_{-1}^{1} p(x, y) dx dy

where p is the probability of being at point (x, y) and O is the observed size of the pile of stones at (x, y).

Detailed balance condition

This is a sufficient condition for a Markov chain Monte Carlo algorithm. The argument goes like this:

Imagine moving around a 3x3 grid exploring all the squares. If we are on the square in the top right corner (call it 'a') and we can only move horizontally and vertically then the probabilities for the next move are obviously:

p(a → a) + p(a → b) + p(a → c) = 1                (1)

(where 'b' and 'c' are the squares immediately to the left of and immediately below 'a')

Similarly, I can only get to 'a' via 'a', 'b' and 'c' so:

π_a × 1 = π_b p(b → a) + π_c p(c → a) + π_a p(a → a)   (2)

substitute equation 1 into 2:

π_a p(a → b) + π_a p(a → c) = π_b p(b → a) + π_c p(c → a)

There are many ways to satisfy this global balance condition, but one way is to say that the first term on the left-hand side equals the first on the right-hand side, and likewise the second terms.

This is the detailed balance condition:

π_a p(a → b) = π_b p(b → a)
π_a p(a → c) = π_c p(c → a)

where π_x is the probability of being on square x.

If we want to sweep out all the configurations equally, we want:

π_a = π_b = π_c

(This is not true in the Metropolis algorithm; Metropolis et al proposed another function for π_x.)

This implies p(a → b) =  p(b → a).

What's interesting is that "the Monte Carlo algorithm is nothing but a matrix, the transfer matrix of transition probabilities p(a→b) for moving from the site a to the site b." In this 3x3 game, the matrix is 9x9 (the 9 squares from which we move to 9 squares to which we can move).

The transfer matrix

In Python, it looks like this:

from numpy import dot, matrix, set_printoptions
from numpy.linalg import eig

m = matrix('2  1  0  1  0  0  0  0  0;'
           '1  1  1  0  1  0  0  0  0;'
           '0  1  2  0  0  1  0  0  0;'
           '1  0  0  1  1  0  1  0  0;'
           '0  1  0  1  0  1  0  1  0;'
           '0  0  1  0  1  1  0  0  1;'
           '0  0  0  1  0  0  2  1  0;'
           '0  0  0  0  1  0  1  1  1;'
           '0  0  0  0  0  1  0  1  2') / 4.0

This conforms to a mathematics 'design pattern'. Chase the problem onto known territory and deal with it there with the tools designed by other people.

Where it gets cool is when we take the eigen-decomposition (a.k.a spectral decomposition) of this matrix. This says that for a square matrix, A, we can express it as:

A = Q Λ Q⁻¹

where the columns of Q are the eigenvectors and Λ is a diagonal matrix whose values are the eigenvalues (a.k.a the spectrum of A).

In Python, we can find Q and Λ by doing this:

eigenvalues, eigenvectors = eig(m)

And why is this interesting? Because if we keep multiplying the transfer matrix with the vector representing the probabilities of which square we're currently in, it converges to the stationary distribution.

How do we prove this? Well, let's say we multiply A by itself x times:

A^x = (Q Λ Q⁻¹)^x = Q Λ Q⁻¹ Q Λ Q⁻¹ Q Λ Q⁻¹ ... Q Λ Q⁻¹ = Q Λ^x Q⁻¹

since the Q⁻¹Q pairs in the middle cancel, we only have to multiply the eigenvalues by themselves x times. That makes the calculations much more efficient.

What are the eigenvalues?

set_printoptions(precision=2)
set_printoptions(suppress=True)
print eigenvalues

[-0.5   0.    1.    0.75  0.5   0.25 -0.    0.25  0.75]

Note that all but one value has magnitude less than 1.0, so as we raise them to higher and higher powers, all the values other than the third will disappear.

Now, let's look at the eigenvectors (Q):

print eigenvectors

[[ 0.17 -0.41 -0.33  0.58  0.5   0.33  0.22  0.04  0.11]
 [-0.33  0.41 -0.33  0.29  0.   -0.17  0.12 -0.52 -0.23]
 [ 0.17  0.   -0.33 -0.   -0.5   0.33 -0.34  0.04 -0.57]
 [-0.33  0.41 -0.33  0.29 -0.   -0.17 -0.57  0.48  0.34]
 [ 0.67 -0.   -0.33  0.   -0.   -0.67  0.   -0.08 -0.  ]
 [-0.33 -0.41 -0.33 -0.29  0.   -0.17  0.57  0.48 -0.34]
 [ 0.17  0.   -0.33 -0.   -0.5   0.33  0.34  0.04  0.57]
 [-0.33 -0.41 -0.33 -0.29  0.   -0.17 -0.12 -0.52  0.23]
 [ 0.17  0.41 -0.33 -0.58  0.5   0.33 -0.22  0.04 -0.11]]

and note that the third column (corresponding to the third eigenvalue) is uniform. All other columns will disappear as their corresponding eigenvalues, when multiplied by themselves a sufficient number of times, tend to 0. And that's why, after a sufficient number of matrix multiplications, we'll reach equilibrium.

Let's see it in action by repeatedly squaring the matrix (doing this ten times raises it to the 1024th power):

for i in range(10):
    m = dot(m, m)

print m

[[ 0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11]
 [ 0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11]
 [ 0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11]
 [ 0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11]
 [ 0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11]
 [ 0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11]
 [ 0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11]
 [ 0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11]
 [ 0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11  0.11]]

(Ir)Reducible matrices

We could easily extend our 3x3 box into a 6x6 box and make the transfer matrix such that only the top-left 3x3 quadrant and the bottom-right 3x3 quadrant were possible territories. If there were no means to transition from one such territory to the other, this would violate the Markov chain principle outlined above: if we ever found ourselves in one quadrant, we'd know with 100% certainty that we'd never visited, nor ever will visit, the other.

"One of the two mathematically rigorous conditions, for a Monte Carlo Markov chain algorithm is that this pulling apart is not possible, that would be irreducible."

One way of looking at this is to picture the matrix as a graph. Usually we picture graphs as adjacency matrices, but going the other way around, this matrix describes a graph with two disjoint sets of vertices. In linear-algebra jargon, the matrix is reducible into these two sets; in graph-theory jargon, we'd say the graph is not strongly connected. They are equivalent concepts.


Friday, March 10, 2017

Hadoop and HBase in an integration test


It's always very nice to have your integration tests run out-of-the-box as soon as you check the code out of Git. No fiddling with your environment, just click and go.

Unfortunately, Hadoop uses some native Windows libraries and for those unfortunate enough to develop on Windows, this is a pain - particularly if you have a large, distributed team. So, this is a nice hack I've used a couple of times to get the integration tests to run on developers' laptops without any manual configuration.

First, check the binaries hadoop.dll and winutils.exe in. They're not very big (about 40kb). Then you need to set the System properties:

System.setProperty("java.library.path", BINARY_DIRECTORY)
System.setProperty("hadoop.home.dir",   BINARY_DIRECTORY)

where BINARY_DIRECTORY is the absolute path name of the directory the binaries live in.

Then, before running Hadoop code, you need this dirty hack that introspectively assures Hadoop that everything is OK:

val classLoader = this.getClass.getClassLoader
val field       = classOf[ClassLoader].getDeclaredField("usr_paths")
field.setAccessible(true)
val usrPath     = field.get(classLoader).asInstanceOf[Array[String]]
val newUsrPath  = new Array[String](usrPath.length + 1)
System.arraycopy(usrPath, 0, newUsrPath, 0, usrPath.length)
newUsrPath(usrPath.length) = BINARY_DIRECTORY
field.set(classLoader, newUsrPath)

val field = classOf[org.apache.hadoop.fs.FileSystem].getDeclaredField("FILE_SYSTEMS_LOADED")
field.setAccessible(true)
field.setBoolean(null, true)

val nativeCodeLoadedField = classOf[org.apache.hadoop.util.NativeCodeLoader].getDeclaredField("nativeCodeLoaded")
nativeCodeLoadedField.setAccessible(true)
nativeCodeLoadedField.set(null, false)


(This is Scala but it translates to Java easily enough)

It's ugly and it's dirty but it does the job. It basically tricks Hadoop into thinking everything is OK. It's good enough for tests but don't do it in production. Also, it's not at all necessary for *Nix systems.

Having done this, you can start running the integration tests against HDFS which is backed by your own hard disk. Again, you'd never do this in production (what's the point?) but it really helps with tests.

You do this with the following code (assuming you now want HBase to live on top of Hadoop):

val configuration = org.apache.hadoop.hbase.HBaseConfiguration.create()
configuration.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)
configuration.set("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
val utility = new org.apache.hadoop.hbase.HBaseTestingUtility(configuration)
utility.startMiniCluster()

And you're good to go. HBase and HDFS both working on a single machine.


Saturday, February 18, 2017

Knuth and Poissons


Trying to get my head around how Donald Knuth generates samples from a Poisson distribution. The algorithm is simple:

Let L = e^{-λ} and p = 1. Count the number of times you multiply p by a uniform random sample in the range [0, 1] until p < L; that count, minus one, is your Poisson sample.

Simple. But why does it work?

Poisson

The Poisson distribution relies on the Poisson process. That is, the probability that something is about to occur has nothing to do with what has gone on in the past. This is seen throughout nature and makes it very useful in the sciences. For example, the probability of an atom radioactively decaying has nothing to do with how long it has been sitting around for before the decay.

In other words, if X is the sequence of inter-arrival times of a memory-less event, X = (X_1, X_2, X_3, ...):
"At each arrival time and at each fixed time, the process must probabilistically restart, independent of the past. The first part of that assumption implies that X is a sequence of independent, identically distributed variables [IIDs]. The second part of the assumption implies that if the first arrival has not occurred by time s, then the time remaining until the arrival occurs must have the same distribution as the first arrival time itself" (from here).
This memory-less property can be expressed mathematically like this:

P(X > t + s | X > s) = P(X > t)


Let F(t) = P(X > t) for t > 0. Then:

F(t + s) = F(t) F(s)

since the probabilities of independent events can be multiplied to give the probability of both happening. (The mathematically inclined may look at this equation and suspect that logarithms are about to make an appearance...)

Now, this is the clever bit. Let a = F(1). Then:

F(n) = F(Σ_{i=1}^{n} 1) = Π_{i=1}^{n} F(1) = a^n

The proof that this is true for all rational n is here but, briefly, if we let λ = -ln(a), then:

F(t) = e^{-λt}

which, if you recall, is the probability that an event will happen at t or later. That is, it's a value between 0 and 1.

Now, we use a method called inversion sampling. The basic idea is that, given a plot y = P(x), if you take uniformly random samples on the y-axis, the corresponding values on the x-axis will be distributed according to P(x).

Note that y represents a probability so necessarily 0 ≤ y ≤ 1. Since a uniform random number generator comes with pretty much any programming language, the problem is getting simpler.

The last piece of the jigsaw is documented here and explains why Knuth's algorithm is so simple. Basically, we rearrange the equation for F(t) by taking natural logs:

t = - ln F(t) / λ

and since we're using inversion sampling, we know that the probability F(t) is going to be simulated by a uniformly distributed random variable between 0 and 1. We call it U.

Now, given any random sample U_i, there will be a value t_i. The question Knuth asks is: how many random samples U_i do I need before the t_i add up to one unit of time (ie, 1)?

More mathematically, what is k for:

Σ_{i=1}^{k} t_i ≤ 1 ≤ Σ_{i=1}^{k+1} t_i

-Σ_{i=1}^{k} ln U_i / λ ≤ 1 ≤ -Σ_{i=1}^{k+1} ln U_i / λ

Σ_{i=1}^{k} ln U_i ≥ -λ ≥ Σ_{i=1}^{k+1} ln U_i     (the inequalities flip when we multiply by -λ)

ln(Π_{i=1}^{k} U_i) ≥ -λ ≥ ln(Π_{i=1}^{k+1} U_i)

Π_{i=1}^{k} U_i ≥ e^{-λ} ≥ Π_{i=1}^{k+1} U_i

which is Knuth's algorithm. Notice that the beauty of this is that it's computationally cheap for relatively small k (and the expected k is just λ). This should be fine for most applications.
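In Scala, the whole thing is a few lines (a sketch of the algorithm as derived above):

  import scala.util.Random

  // Multiply uniform samples together until the product drops below e^-λ.
  def poisson(lambda: Double, rng: Random = new Random): Int = {
    val L = math.exp(-lambda)
    var k = 0
    var p = 1.0
    do {
      k += 1
      p *= rng.nextDouble()
    } while (p > L)
    k - 1
  }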

Tuesday, February 14, 2017

Tweaking TF-IDF


TF-IDF is a simple statistic (with a few variants) used in information retrieval within a corpus of text. The funny thing is that although it works well, nobody seems to be sure why. Robertson says in On theoretical arguments for IDF:

"The number of papers that start from the premise that IDF is a purely heuristic device and ends with a claim to have provided a theoretical basis for it is quite startling."

The simplest expression for TF-IDF is:

(TF) · (IDF) = f_{t,d} · log(N / |{d ∈ D : t ∈ d}|)

where t is a "term" (word, n-gram etc); d is a document in corpus D; N is the total number of documents in corpus D; and f_{t,d} is the frequency of term t in document d. (Often the term frequency component is actually a function of the raw f_{t,d}.) "The base of the logarithm is not in general important" (Robertson).

It has been mooted that the IDF component looks like the log of a probability. That is, if we define the number of documents containing term t_i to be n_i, then we could expect the probability of a given document containing t_i to be:

P(t_i) = P(t_i occurs in d) = n_i/N

and the IDF term looks like:

idf(t_i) = -log P(t_i)

The nice thing about this is that "if we assume the occurrences of different terms in documents are statistically independent, then addition is the correct thing to do with the logs":

idf(t_1 ∧ t_2) = -log P(t_1 ∧ t_2)
               = -log(P(t_1) P(t_2))
               = -(log P(t_1) + log P(t_2))
               = idf(t_1) + idf(t_2)

Thus we can add IDF terms to give us a total score.

Implementations

Lucene used to use this implementation but appears to be moving to something called BM25, with which they're getting good results. I tried BM25 but unfortunately it was prone to over-linking my documents compared to the classic formula.

So, I returned to the classic implementation but thought again about an approximation I had made. I had capped the maximum document frequency of a given term at 1000, simply for practical reasons of storage; all terms above this are discarded, as something that occurs too often is not likely to be useful in this particular domain. Note that I could not filter out a list of stopwords, as much of my corpus was not English. The hope was that stopwords would naturally drop out as part of this restriction. Imposing this limit still gave me over 90 million terms.

The result was that the difference between a rare term and the most common term we record was not that great. Since I am comparing documents, the minimum document frequency for a term to be useful is 2 (a frequency of 1 obviously can't link two documents). If the maximum frequency is 1000, then the ratio of the IDFs would be:

log (N / 2) / log (N / 1000)

which for 90 million documents is only about 1.5. No wonder the results of my document comparisons were not great. Now, if I set N = 1000 + 1 (the +1 to avoid a nasty divide-by-zero), the ratio between the weighting for the rarest term and the most common is about 6218, which seems more reasonable. And sure enough, more documents seemed to match. (Caveat: I say "seemed to match" as I only examined a sample then extrapolated. There are far too many documents to say definitively that the matching is better.)
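The arithmetic, as a quick Scala check of the ratio above:

  def idfRatio(n: Double): Double = math.log(n / 2) / math.log(n / 1000)

  idfRatio(90e6)  // ≈ 1.5: rare and common terms score almost alike
  idfRatio(1001)  // ≈ 6218: a much wider dynamic range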


Wednesday, January 25, 2017

Poisoned Poisson


I needed to code up a simple implementation of the probability mass function (PMF) for the boring old Poisson distribution. Five minutes' work, or so I thought.

My first cut of the code looked like this:

  def poissonWithSmallMean(x: Int, mu: Double): Double = Math.pow(mu, x) * Math.exp(-mu) / factorial(x)

where factorial is a very simple implementation of factorial using tail recursion. I could have implemented it more efficiently using caching etc, but I wasn't expecting too much trouble.

Going to the excellently named StatTrek to get some test values, my test looked like:

    poissonWithSmallMean(10, 12) shouldEqual 0.104837255883659 +- tolerance

and it passed. Great. 

Things started getting strange with slightly larger numbers. StatTrek was telling me that with x=998 and mu=999, we should get the modestly-sized answer of 0.0126209223360251. But my code was blowing up.

It's not hard to see why: that factorial goes crazy pretty quickly. Given that the expected value is neither particularly large nor terribly small, how do we compute it simply?

The answer is here. It depends on logs. If:

n! = n * (n-1) * (n-2) * ... * 1

then

log n! = log n + log(n-1) + log(n-2) + ... + log(1)

Now, our equation to calculate the PMF looks like this:

  def poissonWithLargeMean(x: Integer, mu: Double): Double = {
    val logged = (x * Math.log(mu)) - mu - logFactorial(x)
    Math.exp(logged)
  }

where logFactorial is just a simple implementation of the equation for log n! above. This handles x=998, mu=999 beautifully.
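That simple implementation might look something like this (a sketch):

  def logFactorial(x: Int): Double =
    (1 to x).map(i => math.log(i.toDouble)).sum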

We could start implementing logFactorial in a much more efficient manner by using the Stirling approximation (log n! ≈ n log n). This is a handy equation which incidentally also gives us the lower bound on comparison sorts. Take a look at John D. Cook's blog in the link above to see how to do this. But this simple solution is fine for me for now.


Friday, January 20, 2017

HBase File Structure


Some notes from a colleague who knows HBase better than I do.

Quick overview of how HBase stores data

When HBase batch writes in-memory data to disk, it may do so in separate files. Each file is sorted by the key.

So, say HBase persists data with keys, A, F, B. It sorts them so the file looks like:

[A, B, F]

Some time later, HBase persists G, P and M. Now there are two files that look like:

[A, B, F],  [G, M, P]

At some later time still, data with keys C, R and D arrive. Key R clearly comes after the last letter already written to the files (P), but C and D fall within the key range of the first file. So, what does HBase do? It creates a new file with [C, D, R] in it. To summarize, the files now look like this:

[A, B, F], [G, M, P], [C, D, R]

Even later, a client wants to find the value for key D. Assuming it's not cached in memory, where does HBase look for it? Well, although all data within a file is ordered (so we can call off the search early if the key is not where we would expect it in the file), we don't know which file D is in. HBase must check all the files, even if it doesn't search through the whole of each one.
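A toy illustration of that cost (my sketch, not HBase's code): each file can be binary-searched cheaply, but every file may need to be consulted:

  // Files are individually sorted but their key ranges overlap.
  val files = Seq(Vector('A', 'B', 'F'), Vector('G', 'M', 'P'), Vector('C', 'D', 'R'))

  def lookup(key: Char): Boolean = files.exists { file =>
    java.util.Arrays.binarySearch(file.toArray, key) >= 0  // cheap within a file...
  }

  lookup('D')  // ...but we touch all three files before finding D in the last one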

Compaction

To help matters, HBase can compact these three files into one that looks like:

[A, B, C], [D, F, G], [M, P, R]

Well, that makes searching much easier.

Read-Write Patterns

How can we leverage this? Well, with certain read-write patterns, we can force a compaction at an appropriate point. For instance, I'm using HBase to store a dictionary that I build when processing a corpus of text. Later, I refer to that dictionary but never update it or add any more data. So, after I have finished my last write, I can call:

hbaseConnection.getAdmin.majorCompact(TABLE_NAME)

After compacting the table, my read throughput went up by a factor of 4.

For more information on HBase compaction see here.