Saturday, September 24, 2016

Probabilistic Programming - Part 1


Probabilistic programming is an exciting, new-ish area that is eating a lot of my spare time.

"Probabilistic Programming and Bayesian Methods for Hackers" is an excellent book I am playing with. Not only is it free (see here), you can download it and read it as an IPython book (with the command ipython notebook) and tinker with the equations (use shift-enter to execute them).

As you might have gathered, it's Python-based but there are moves to integrate the library it uses extensively (PyMC) with Spark.

In the Java/Scala world, Figaro is making in-roads. In this first part post, I'm making notes on the Python implementation. In part two, I'm hoping to implement a Scala/Figaro version.

Bayes again

Bayesian probability requires a different way of thinking.

If frequentist and Bayesian inference were programming functions, with inputs being statistical problems, then the two would be different in what they return to the user. The frequentist inference function would return a number, representing an estimate (typically a summary statistic like the sample average etc.), whereas the Bayesian function would return probabilities
For example, in our debugging problem above, calling the frequentist function with the argument "My code passed all tests; is my code bug-free?" would return a YES. On the other hand, asking our Bayesian function "Often my code has bugs. My code passed all tests; is my code bug-free?" would return something very different: probabilities of YES and NO. The function might return: 
YES, with probability 0.8; NO, with probability 0.2 
This is very different from the answer the frequentist function returned. Notice that the Bayesian function accepted an additional argument:  "Often my code has bugs". This parameter is the prior. By including the prior parameter, we are telling the Bayesian function to include our belief about the situation.
The formula for Bayesian probability goes like this:

P(A|X) = P(X|A) P(A)
                            P(X)

Which reads in English as: the probability of A given X is equal to the probability of X give A multiplied by the probability of A by itself and divided by the probability of X by itself.

P(A|X)is called our posterior and P(A) is called our prior.

Probability distributions refresher

Probability distributions can be discrete (integers) or continuous (real numbers). The distribution of a discrete variable is called a probability mass function, a continuous variable has a probability density function.

One (of many) discrete functions is the Poisson distribution that dates back centuries and was derived to estimate the probabilities of cavalry horses losing a shoe in (or some other independent event). It looks like:

P(Z = k) = λk e
              k!

and this has the convenient property that its expected value is λ (or, in maths speak, E[Z|λ]λ).

So, this looks as good a distribution as any for our prior. The trouble is, what is λ? Aye, there's the rub. λ can be anything (an integer or any real number), so to model it, we can choose an exponential distribution. It is continuous and it has some convenience. The probability density function looks like:

f(z|λ) = λ e-λz ,  z >= 0

and the convenient fact is that E[Z|λ] = λ-1.

PyMC

The task is to deduce when (if at all) somebody's behavior changed given a list of the number of text messages they sent over a certain time period. To do this, the book uses PyMC, a Python library.

The workhorse of the code is the generation of simulated data:

@pm.deterministic
def lambda_(tau=tau, lambda_1=lambda_1, lambda_2=lambda_2):
    out = np.zeros(n_count_data)
    out[:tau] = lambda_1  # lambda before tau is lambda1
    out[tau:] = lambda_2  # lambda after (and including) tau is lambda2
    return out

which simply creates an array where the elements are one of two values. All the values before tau are lambda_1 and the rest are lambda_2. Simples.

"Inside the deterministic decorator [@pm.deterministic], the Stochastic variables passed in behave like scalars or Numpy arrays (if multivariable), and not like Stochastic variables."

By putting some print statements in, you can see that this function gets hit thousands of times when we feed it into the Poisson function that is given to a Model function that is passed to a MCMC function (Markov Chain Monte Carlo). Phew! The details can be found in the freely downloadable book and I'll explain more when I show the Figaro implementation.

"MCMC returns samples from the posterior distribution, not the distribution itself... MCMC performs a task similar to repeatedly asking "How likely is this pebble I found to be from the mountain I am searching for?", and completes its task by returning thousands of accepted pebbles in hopes of reconstructing the original mountain. In MCMC and PyMC lingo, the returned sequence of "pebbles" are the samples, cumulatively called the traces."

And it's these traces that (when graphed) can help us to see when it is most likely that behavior changed. I'll go into more detail with the Scala implementation in part 2.


Saturday, September 3, 2016

Yet more maths and graphs - Lagrange Multiplier


Here's a little maths trick called the Lagrange Multiplier that helps in finding the maximum/minimum of a function f(x, y) given a constant that also uses x and y. We "form the function

F(x, y) = f(x, y) + λ Φ(x, y) 

and set the two partial derivatives of F equal to zero. Then solve these two equations and the equation Φ(x, y) = constant for the three unknowns x, y and λ".

A nice proof can be found in the essential undergraduate text, Mathematical Method in the Physical Sciences. Let's say we're trying to find the max/min of the function f(x, y). Obviously, at this point.

df = δf dx + δf dy = 0                               (1)
     δx      δy

and say we've been given a function that is a constant Φ(x, y). Once again, it should be obvious that:

dΦ = δΦ dx + δΦ dy = 0                               (2)
     δx      δy

since Φ(x, y) is a constant.

Now, multiply the second equation by λ and add it to the first.

(δf + λ δΦ )dx + (δf + λ δΦ)dy = 0                   (3)
 δx     δx        δy     δy

Let's take that second term. Since λ is arbitrary, let's choose it to be such that:

δf + λ δΦ = 0                                        (4)
δy     δy

(That is, we've decided

λ = - δf  / δΦ                                       (5)
              δy /  δy

But it's not necessary to worry about that as we're not particularly interested in the value of λ. We just use it to solve the equations).

Similarly, we do the same for the first term of (3):

δf + λ δΦ = 0                                        (6)
δx     δx

Now, with equations (4) and (5) and (6) we have enough information to find the values of x and y at the position of the min/max point.

Example

The best way to understand this, I found, was an example from Boaz. Say, we're given:

f(x) = x2 + y2

and the constraint:

ϕ(x,y) = x2 + y = 1

Then, the equation to minimize is:

F(x,y) = f(x,y) + λ ϕ(x,y) = x2 + y2 + λ(x2 + y)

Because ϕ is a constant and will disappear when we differentiate, this point of minimization is exactly that of f(x,y) which is what we really want to find. So:

δF = 2y + λ = 0
δy     

and

δF = 2x + 2λx = 0
δx     

that is, either x=0 or λ=-1.

If, x=0, y=1 (from the equation for ϕ) and λ=-2 (from the first differential equation).

If λ=-1, then y=½ (from the first differential equation) and x2=½ (from the equation for ϕ).

QED.

Conclusion

Why am I writing this post? Because we're going to need the Lagrange Multiplier shortly in partitioning a graph.

Friday, September 2, 2016

Miscellaneous HBase tips


I'm new(ish) to HBase so here are a few notes I've jotted down over the last few months. Basically, Spark is great at what it does, but if you need to look something up while processing an element, you're best relying on another tool. HBase has been our choice.

HBase is not like a normal database

You can't do a simple select count(*)... You need to do something like:

$ hbase org.apache.hadoop.hbase.mapreduce.RowCounter

See here for more information.

The HBase shell is not even SQL like. If you want to limit the number of rows returned, the syntax looks like:

hbase> scan 'test-table', {'LIMIT' => 5}

In "Hadoop: the Definitive Guide" there is a great "Whirlwind Tour of the Data Model". What follows is an even more condensed precis.
"Table cells - the intersection of rows and column coordinates - are versioned. By default, their version is a timestamp... A cell's contents is an uninterpreted array of bytes.
"Table rows are sorted by ... the table's primary key... All table accesses are via the primary key
"Row columns are grouped into column families. All column family members have a common prefix.
"Physically, all column family members are stored together on the filesystem [therefore] it is advised that all column family members have the same general access patterns and size characteristics.
"Tables are automatically partitioned horizontally by HBase into regions. Initially, a table comprises of a single region but as the size of the region grows, it splits...Regions are the units that get distributed over an HBase cluster.
"Row updates are atomic no matter how many row columns constitute the row-level transaction." 
Unsurprisingly for a solution based on Hadoop, HBase shares a similar architecture. The "HBase master node orchestrat[es] a cluster of one or more regionserver slaves." Unlike Hadoop, "HBase depends on ZooKeeper ... as the authority on cluster state".

"At a minimum, when bootstrapping a client connection to an HBase cluster, the client must be passed the location of the ZooKeeper ensemble. Thereafter, the client navigates the ZooKeeper hierarchy to learn cluster attributes such as server locations." For the purposes of creating a Spark artifact, this is simply a matter of adding the host machine's hbase-site.xml at the top level.

Tuning

Find the load the cluster is under with:

echo "status 'detailed'" | hbase shell 2>/dev/null | grep requestsPerSecond | perl -pe s/,.*//g

With this, I've seen nodes in my cluster happily reach 60 000 requests per second each, which is most pleasing.

However, the load over the nodes is not terribly evenly distributed. One way to tackle this problem is salting. I did actually preempt this problem by reversing the Strings that were my keys. Since each key has a prefix taken from a fairly small set, I was expecting them to form "lumps" of data. However, I then create the HBase table with something like:

hbaseAdmin.createTable(hTableDescriptor, toBytes('0'), toBytes('z'), 20)

(where 20 is my number of regions).

However, this assumes that the text (even when reversed) that I am using as keys is evenly distributed over the alphanumerics (it's not as it's English words rather than random text). So, I still have some lumpiness.

Another optimization is to define the regions at table creation time (see the HBaseAdmin.createTable method that takes a startKey and endKey). This is to mitigate the problem that when a table is created, there is only one region. "The reason HBase creates only one region for the table is that it cannot possibly know how to create the split points within the row key space. Making such decisions is based highly on the distribution of the keys in your data." (from here). "Since pre-splitting will ensure that the initial load is more evenly distributed throughout the cluster, you should always consider using it if you know your key distribution beforehand."

Note that for an even distribution over Bytes, you'll want the the start and end keys to be java.xml.bind.DatatypeConverter.parseHexBinary("00") and java.xml.bind.DatatypeConverter.parseHexBinary("FF").

Also, storing data in the L1 cache can speed things up hugely. This can be set on using this method. There are caveats (L1 is generally used for metadata like Bloom Filters where "false positive matches are possible, but false negatives are not" etc). But when I implemented this my tables, throughput using a get call went up by an order of magnitude. Before implementing it, the symptoms were that the CPU usage was very low (a load average of about 1 to 2 on a 24-core machine), the client threads waiting on the get method and not much activity in HBase itself.

Finally, if you're reading, consider how you're batching. The get method is overloaded to take a number of keys. But batch size matters. By increasing the batch size from 10s to 100s, I achieved a 5 times increase in performance (to about 130 000 gets per second per node). Too high a batch size, however, and it blows up. Quite why is something I am investigating.