Sunday, December 3, 2017

The bias/variance tradeoff


In putting models together, you'll typically face either "overfitting (high variance) or underfitting (high bias)" [1]. The bias/variance tradeoff is the process of balancing the two.

"Variance is how sensitive a prediction is to what training set was used. Ideally, how we choose the training set shouldn’t matter – meaning a lower variance is desired. Bias is the strength of assumptions made about the training dataset. Making too many assumptions might make it hard to generalize, so we prefer low bias as well." [2]

But what are variance and bias?

Random variables are not random variables

Firstly, we clean up some terminology. From the excellent Count Bayesie: "Random Variables, which are neither random nor variables. The Random Variable is the thing that translates 'H' or 'T' into 1 or 0. But when we hear 'Random Variable' it's very tempting to think 'Oh this must work like a Random Number Generator where each time I look at this variable it has a random value.'"

Great Expectations

Now, "the Expectation of a Random Variable is the sum of its values weighted by their probability... Expectation is just the mean of a Random Variable! But words like mean and average carry a special intuition"

Variance

"If we took a random sample of our data, say, 100 points and generated a linear model, we' d have a set of weights. Say we took another set of random points and generated another linear model. Comparing the amount that the weights change will tell us the variance in our model." [3]

Variance is typically taught at high school to be:

σ² = Σ (x - μ)² / n

where μ is the mean and n is the number of samples. In a continuous probability distribution, this is:

σ² = ∫ (x - μ)² f(x) dx

which looks a lot like the definition of expectation.

[Aside: note that if (x - μ) were raised to the power 3, we'd be talking about the skew and if it were raised to the power 4, we'd be talking about the kurtosis ("how pointy a distribution is"). These are the moments of a random variable. But that is another story. See here for more.]

Anyway, the variance of a random variable X is:

Var(X) = E[(X - μ)²] = E[(X - E[X])²]

expanding and noting that an expectation of an expectation is the original expectation itself:

Var(X) = E[X²] - E[X]²

Thus, when X is a random variable, the variance is the difference between the expectation of X² and the square of the expectation of X.
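
As a quick numerical sanity check (my own sketch, not from the referenced books), we can confirm the identity with NumPy:

import numpy as np

# check that Var(X) = E[X²] - E[X]² on a large sample
np.random.seed(42)
x = np.random.normal(loc=3.0, scale=2.0, size=1000000)  # true variance is 4.0

lhs = np.mean((x - x.mean()) ** 2)       # E[(X - mu)²]
rhs = np.mean(x ** 2) - np.mean(x) ** 2  # E[X²] - E[X]²

print(lhs)
print(rhs)   # both are approximately 4.0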

Aside: PCA and Variance

In PCA, we're trying to minimize the squared distances of all points from a vector (a line through the data) - basically, the "errors". But in doing so, we're increasing the spread of the points' projections onto that vector, that is, the positions on the line closest to each point (see this StackExchange answer).

That is, this is the vector along which the data points have the greatest variance. This fits our intuition that the principal components are the most distinctive elements of the data (in the example given, it is the qualities of wines - alcohol content, colour etc - or more often linear combinations of these qualities that distinguish wines).
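
A tiny illustration of that idea (my own sketch, with made-up data): the first principal component is the direction along which the projected data has the greatest variance.

import numpy as np

# made-up, correlated 2-D data
np.random.seed(0)
X = np.random.multivariate_normal(mean=[0, 0], cov=[[3.0, 1.5], [1.5, 1.0]], size=5000)
X = X - X.mean(axis=0)

# the eigenvectors of the covariance matrix are the principal components
eigenvalues, eigenvectors = np.linalg.eigh(np.cov(X.T))
first_pc = eigenvectors[:, np.argmax(eigenvalues)]

print(np.var(X @ first_pc))               # the largest variance any projection can have
print(np.var(X @ np.array([1.0, 0.0])))   # projections onto an arbitrary direction: smaller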

Bias

"Bias measures how far off the predictions are from the correct values in general if we rebuild the model multiple times on different training datasets; bias is the measure of systematic error that is not due to randomness." [1] In other words, E[approximate f(x) - true f(x)].

Why would we introduce bias? Well, one reason would be that we have more columns (features) than rows (samples). For instance, in linear regression we need to invert a matrix made of the inputs (see the last equation in my post on linear regression). This is not possible if the matrix has a rank less than the number of its rows (viewed as a set of equations, it has no unique solution). So, we could reduce the number of columns, thus making the model less complex and adding bias.

Another reason is to introduce regularization. "The concept behind regularization is to introduce additional information (bias) to penalize extreme parameter weights." [1]
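
As a tiny illustration of both points (my own sketch, with made-up numbers): with more features than samples the normal-equations matrix XᵀX is singular, but adding a small penalty λI - regularization, ie deliberately introduced bias - makes the system solvable.

import numpy as np

np.random.seed(1)
X = np.random.rand(5, 8)   # more columns (features) than rows (samples)
y = np.random.rand(5)

XtX = X.T @ X
print(np.linalg.matrix_rank(XtX))   # 5, so this 8x8 matrix is singular and cannot be inverted

lam = 0.1                           # the regularization strength: the bias we introduce
w = np.linalg.solve(XtX + lam * np.eye(8), X.T @ y)   # the ridge solution now exists
print(w)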

[1] Python Machine Learning
[2] Machine Learning with TensorFlow
[3] Machine Learning in Action

Sunday, November 19, 2017

The what and why of ALS


ALS is another matrix decomposition technique. It has been used by people like Netflix in recommender systems. "The idea of characterizing users as collections of items, and items as collections of users, is the basis of collaborative filtering, in which users are clustered based on common item preferences, and items are clustered based on the affinities of common users."[1]

In ALS, "you decompose your large user/item matrix into lower dimensional user factors and item factors" (Quora).

The algorithm distributes easily and Spark implements it out-of-the-box.

What is it?

The maths behind it says: imagine a matrix of ratings, R, that approximates to XTY where X represents the users and Y the items.

Thus, the problem is reduced to:

minX,Y Σrui (rui - xuTyi)² + λ (Σu ||xu||² + Σi ||yi||²)

So, basically, we want to minimize a standard least-squares error plus a penalty term. ALS does this by fixing X and solving for Y, then fixing Y and solving for X. Rinse and repeat until it converges to an answer.
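
To make the alternating scheme concrete, here's a toy sketch (my own, not Spark's implementation; for brevity it treats R as fully observed rather than summing only over the known ratings):

import numpy as np

# factorize a dense ratings matrix R ≈ XᵀY by alternating regularized least squares
np.random.seed(0)
n_users, n_items, k, lam = 6, 5, 2, 0.1
R = np.random.rand(n_users, n_items)

X = np.random.rand(k, n_users)   # user factors (one column per user)
Y = np.random.rand(k, n_items)   # item factors (one column per item)

for _ in range(20):
    # fix Y, solve for the user factors
    X = np.linalg.solve(Y @ Y.T + lam * np.eye(k), Y @ R.T)
    # fix X, solve for the item factors
    Y = np.linalg.solve(X @ X.T + lam * np.eye(k), X @ R)

print(np.abs(R - X.T @ Y).mean())   # the reconstruction error shrinks as we iterate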

But why?

But that's not the interesting thing (to me, at least). The question is why it works.

The second term in the above equation is the regularization term. "Regularization is a technique to structure the parameters in a form we prefer, often to solve the problem of overfitting." [2] This regularization penalizes extreme weights. OK, but that's unsatisfactory. Why did we choose that particular equation?

A little digging found this on StackExchange. Basically, if we say:

R = XTY + ε

where ε is the error with a Gaussian distribution, our minimization equation drops out naturally.

If the problem is ill-posed (ie, there is not a unique solution) then we introduce the assumption that the distribution is a multivariate Gaussian. The conjugate prior of a Gaussian is another Gaussian. Taking the log of the product of the distribution and its conjugate prior, we see something very like the minX,Y equation above. (See the section on the Bayesian interpretation of Tikhonov regularization.)

This comes straight from Bayes theorem where we ignore the denominator as it does not depend on our parameters (see maximum a posteriori or MAP). We then maximize the parameters (by differentiating) and find the most likely parameters.

MAP deserves  a blog post all to itself and I will do that when I have time.

Convexity

"The term 'convex' can be applied to both sets and to functions. A set S ∈ ℝis a convex set if the straight line segment connecting any two points in S lies entirely in S... The function f is a convex function if its domain S is a convex set and if for any two points x and y in S, the following property is satisfied:

f(α x + (1-α) y) ≤ α f(x) + (1-α) f(y), for all α ∈ [0, 1]

Interesting properties derive from this.

"For convex programming problems, and more particularly for linear programs, local solutions are alos global solutions. General nonlinear problems, both constrained and unconstrained, may posses local solutions that are not global solutions" [3] (Convex programming describes a constrained optimization problem where the objective function is convex, the equality constraint functions are linear and the inequality functions are concave [see 3, p8]

"If the objective function in the optimization problem and the feasible region are both convex, then any local solution is in fact a global solution." [3]

An interesting explanation for why this is the case is here on StackExchange. If x* is a local minimum of f, then substituting any other x into the convexity inequality always reduces to f(x*) ≤ f(x). Try it!

For equality to hold in that inequality, f must be linear. Since the minimization equation above contains a quadratic, it is not linear, so approximating the solution iteratively (as ALS does) is the only option.

[1] Real World Machine Learning, Harrington
[2] Machine Learning with TensorFlow, Shukla
[3] Numerical Optimization, Nocedal & Wright

Friday, November 10, 2017

BFGS - part 2


Last time we looked at finding the minimum over a field. We used Python to describe a field as:

f = z ** 2 * cos(x) ** 2 +  cos(y) ** 2

(note: I just made this equation up for fun and it has no other significance).

The direction towards the minimum for every point is a vector, thus we have a vector field. (See this description of a vector field at Quora where the direction somebody is looking while riding a rollercoaster is analogous to a vector field. "It's OK to gradually turn your head, which will result in a vector field --- at every point of the rollercoaster ride, you'd be looking somewhere.")

[Aside: this is a great resource for plotting vector fields in Python]

What we didn't address is: where on that vector would we find the minimum? For example, let's take one vector for a point (source code for the plots can be found here).
Direction to the minimum at a given (arbitrary) point.
I've shown the surface for which f=0 on which this point lies just for giggles.

Anyway, let's take this point and plot f over the length of its vector. Let's introduce:

φ(α) = f(xk + αpk)

where α ≥ 0

and we get:

So, our vector points towards a minimum but where exactly is it?
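
Here's a minimal numerical sketch (my own, with a made-up point and direction) of evaluating φ(α) along a search direction:

import numpy as np

# evaluate phi(alpha) = f(x_k + alpha * p_k) along the direction p_k
def f(v):
    x, y, z = v
    return z ** 2 * np.cos(x) ** 2 + np.cos(y) ** 2

x_k = np.array([0.0, 0.5, 1.0])    # an arbitrary starting point
p_k = np.array([0.0, 0.3, -0.5])   # an arbitrary descent direction

for alpha in np.linspace(0.0, 4.0, 9):
    print(alpha, f(x_k + alpha * p_k))   # phi falls to a minimum and then rises again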

The Wolfe Conditions


The principal reason for imposing the Wolfe conditions in an optimization algorithm is to ensure convergence of the gradient to zero (see this wiki).

"However, it is usually expensive to accurately compute a minimizer of φ (and, in fact, usually impossible to find the global minimizer of φ given the available information)" [1]

Armijo Condition

This stops us from overshooting the minimum with a step that's too large.

"In this case, the iterations repeatedly go from one side of a valley to the other, always reducing f but never by much. The problem is that the reduction in each step is very little compared to the length of the steps--the steps are too long." [1]

f(xk + αpk) ≤ f(xk) + αc1pkT ∇ f(xk)

Curvature Condition

This stops us from choosing a step too small.

"The second way that an algorithm can reduce f without reducing it sufficiently is to take steps that are too short." [1] For example, we may asymptotically approach a value but that value is above the minimum.

pkT ∇f(xk + αpk) ≥ c2 pkT ∇f(xk)

where α is the step length and 0 < c1 < c2 < 1.

Combined

Combining these two conditions on the same graph gives us:


where the green dots indicate a valid region for the Armijo condition (we don't overshoot) and the red dots indicate valid regions for the curvature condition (we don't undershoot). Where the two overlap is a potential region for our next step.
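
Here's a minimal sketch (my own toy example on f(x) = ||x||², not from [1]) of testing both conditions over a range of step lengths:

import numpy as np

def f(x):
    return x @ x                   # f(x) = ||x||², minimum at the origin

def grad(x):
    return 2 * x

x_k = np.array([3.0, 4.0])
p_k = -grad(x_k)                   # the steepest-descent direction
c1, c2 = 1e-4, 0.9

for alpha in [0.01, 0.1, 0.5, 0.9, 1.0]:
    x_next = x_k + alpha * p_k
    armijo = f(x_next) <= f(x_k) + c1 * alpha * (p_k @ grad(x_k))   # step not too long
    curvature = (p_k @ grad(x_next)) >= c2 * (p_k @ grad(x_k))      # step not too short
    print(alpha, armijo, curvature)   # 0.01 fails the curvature condition, 1.0 fails Armijo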

How the Wolfe conditions apply

Let's introduce:

sk = xk+1  - xk
yk = ∇f(xk+1) - ∇f(xk)

then

Bk+1 sk = yk

This is the secant equation.

Since the curvature condition above can now be expressed in terms of s and y as:

skT yk > 0

then we are already fulfilling this condition. Why? Multiply the secant equation by skT. Note that since Bk+1 is positive definite (as we proved in the first blog post), skT Bk+1 sk = skT yk > 0. QED.

Another post should hopefully draw all this together and explain why BFGS works.

[1] Globalizing Newton's Method: Line Searches (Prof Gockenbach)

Saturday, November 4, 2017

BFGS - part 1


Nothing to do with Roald Dahl but a trick used to optimize machine learning algorithms (see Spark's mllib library). We're iteratively trying to find the lowest point in some space, modelling the function around the current point with mk, where k is the iteration step number.

We start with the quadratic model:

mk = f(xk) + ∇ f(xk) T pk + (pkT Bk pk) / 2

taken from a Taylor expansion where pk is the step and Bk is a Hessian matrix that is positive definite (for reasons concerning convergence - see below).

At the minimum of this model, ∇mk (the gradient with respect to the step, pk) will equal 0, that is, ∇f(xk) + Bkpk = 0. Re-arranging gives:

pk = - Bk-1 ∇ f(xk)

So, the next step can be given as:

xk+1 = xk + αkpk = xk - αkBk-1 ∇ f(xk)

where αk is the step size at the k-th step.
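
A small sketch (my own toy example, using the exact Hessian for Bk) of this update rule:

import numpy as np

# minimize f(x, y) = (x - 1)² + 10(y + 2)², whose minimum is at (1, -2)
def grad(v):
    x, y = v
    return np.array([2 * (x - 1), 20 * (y + 2)])

B = np.array([[2.0, 0.0],
              [0.0, 20.0]])   # the (constant) Hessian of this quadratic f

x_k = np.array([5.0, 5.0])
alpha = 1.0                   # a full step

for _ in range(3):
    p_k = -np.linalg.solve(B, grad(x_k))   # p_k = -B⁻¹ ∇f(x_k)
    x_k = x_k + alpha * p_k
    print(x_k)                # converges to (1, -2) - in one step, since f is quadratic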

Why positive definite?

A positive definite matrix, M, satisfies xTM x > 0 for all non-zero x. Note that if we do a Taylor expansion around the minimum point x*, we get

f(x* + p) = f(x*) + pT∇ f(x*) + pT ∇2 f(x*) p / 2 + ...

where obviously ∇ f(x*) is zero and f(x*) is less than f(x* + p) by the definition of x*. Therefore, the final term, pT ∇2 f(x*) p / 2, is greater than 0 for any p, and hence the Hessian ∇2 f(x*) is positive definite. QED.

See p16 of Numerical Optimization (Nocedal & Wright) for a more convincing argument.

Surfaces

We can treat the space just like an electromagnetic field that has an electric potential at every point in that space. So, although there is a scalar associated with every point in that space, how that scalar changes can lead to a vector associated with every point in that space.

Let's take an example (full source code available here):

import mayavi.mlab as plt
import numpy as np
import math as m
from sympy import *

x, y, z = symbols('x y z')


f = z ** 2 * cos(x) ** 2 +  cos(y) ** 2

What does this look like? Well, since it's a scalar value at every point in 3-D space, it's hard to visualize. We could view it as a heatmap at various values of z (that is, take a slice on the x-y plane for a given z value).

Slices of the space of f at given values of z
Or we could fix f for an arbitrary value and look at the surface that is defined by all points that map to that value (an "equipotential" in physics terms). For instance:
Surfaces at f=0 (bottom), f=2 (middle) and f=4 (top)
For simplicity, let's look at the arbitrarily chosen surface formed by f=0 and look at the gradient - that is ∇f(xk) - along the equally arbitrary line x=0.

Because I'm a careless mathematician, it's a pleasure to find that Sympy does the work for me. I write:

div = [f.diff(v) for v in [x, y, z]]  # the gradient of f: its partial derivatives (not the divergence, despite the name)

then I can calculate the gradient vector at all points on this surface with:

def gradient(xpt, ypt, zpt):
    gradx = div[0].subs(x, xpt).subs(y, ypt).subs(z, zpt).evalf()
    grady = div[1].subs(x, xpt).subs(y, ypt).subs(z, zpt).evalf()
    gradz = div[2].subs(x, xpt).subs(y, ypt).subs(z, zpt).evalf()
    return float(gradx), float(grady), float(gradz)

and plot it so:
gradient where f=0 and x=0
Unsurprisingly, the gradient vectors are perpendicular to the surface.

Now we add the vectors corresponding to Bk-1 ∇ f(xk) along this line of x=0. Again, Sympy does all the heavy lifting:

def eval_p_at(xpt, ypt, zpt):
    co_ords = [xpt, ypt, zpt]
    hessian_fn = [[f.diff(a).diff(b) for a in [x, y, z]] for b in [x, y, z]]
    values = [sub_vals(item, co_ords) for item in hessian_fn]
    hess_mat_eval = Matrix(values)
    div_vals = sub_vals(div, co_ords)
    if hess_mat_eval.det() == 0:
        print "uninvertible hessian at ", xpt, ypt, zpt
        return 0, 0, 0
    else:
        p = - (hess_mat_eval.inv() * Matrix(div_vals))
        return float(p[0]), float(p[1]), float(p[2])


def sub_vals(items, vals):

    return [i.subs(x, vals[0]).subs(y, vals[1]).subs(z, vals[2]).evalf() for i in items]


where this is the equation for p we see at the top of this post. Plotting it gives:
Gradient and p along f=0, x=0
Note how the p vectors are all pointing downwards, trying to minimize f.

Now, if we look at these vectors of p at various points on this arbitrarily chosen surface, f=0, it looks like this:

p vectors on f=0
which is starting to look like our heatmap if we looked directly down on it. The p vectors are trying to find the "coldest" spot, the minimum of the function f.

I'll flesh out the algorithm more in the next post.

Wednesday, October 11, 2017

HBase, Cassandra and CAP


Which is best - Cassandra or HBase? Well, it depends what you're trying to do.

The oft-quoted comparisons generally involve Eric Brewer's CAP-theorem (the formal proof by Gilbert and Lynch can be found here and an updated review here) and you can see pictures like this dotted around the web:

From "Cassandra: the Definitive Guide" quoted here
(where the RDBMS offerings imply two-phase commit).

Most developers have some awareness of CAP but there are a lot of misconceptions - typically that you can have at most two elements of Consistency, Availability or Partition tolerance - but not all three. "The 2 of 3 formulation was always misleading because it tended to oversimplify the tensions among properties" (from an article by Brewer himself found here).
Allowing at least one node to update state will cause the nodes to become inconsistent, thus forfeiting C. Likewise, if the choice is to preserve consistency, one side of the partition must act as if it is unavailable, thus forfeiting A. Only when nodes communicate is it possible to preserve both consistency and availability, thereby forfeiting P. The general belief is that for wide-area systems, designers cannot forfeit P and therefore have a difficult choice between C and A.
Here are some common misconceptions corrected.

  • The C in CAP "has got nothing to do with the C in ACID, even though that C also stands for 'consistency'" [1]. "Gilbert and Lynch use the word “atomic” instead of consistent in their proof, which makes more sense" [2]. The C in ACID has several similar but not mutually exclusive meanings which include not violating any database constraints, linearizability (see below), ensuring the data is consistent (eg, domain-specific rules such as taking X out of one bank account and into another means the total money overall does not change).
  • "The CAP model says nothing about transactions that touch multiple objects. They are simply out of scope for the theorem" [2].

In "Please stop calling databases CP or AP", Martin Kleppmann says:

"But if you’re using some other notion of consistency or availability, you can’t expect the CAP theorem to still apply. Of course, that doesn’t mean you can suddenly do impossible things, just by redefining some words! It just means that you can’t turn to the CAP theorem for guidance, and you cannot use the CAP theorem to justify your point of view.

"If the CAP theorem doesn’t apply, that means you have to think through the trade-offs yourself. You can reason about consistency and availability using your own definitions of those words, and you’re welcome to prove your own theorem. But please don’t call it CAP theorem, because that name is already taken."

In the CAP theorem:
  • Consistent means linearizability; think of visibility in terms of variables in the JVM. "If operation B started after operation A successfully completed, then operation B must see the system in the same state as it was on completion of operation A, or a newer state." [1] A master that asynchronously replicates to slaves is an example of a non-linearizable system. Note: some systems (eg a DB that uses MVCC) are intentionally non-linearizable.
  • Availability means “every request received by a non-failing [database] node in the system must result in a [non-error] response” [from the proof]. Note, the response can take an arbitrary amount of time which might violate some people's notion of availability...
  • "Partition Tolerance (terribly mis-named) basically means that you’re communicating over an asynchronous network that may delay or drop messages." Which is basically the internet.
Using these definitions, Cassandra is not AP for quorum read/writes. If there is a partition split, the nodes on the wrong side of the split cannot reach a consensus and therefore are not available. (Interestingly, Brewer notes "as some researchers correctly point out, exactly what it means to forfeit P is unclear.")

So, although Cassandra and HBase both have their pros and cons, the CAP theorem is largely orthogonal to choosing between them.

(Aside: although Elastic Search is a search engine not a database, there is an interesting chat about how CAP applies to it here.)

A solution?

There's a cheekily entitled blog post (it's titled "How to beat the CAP theorem" but in it he says "You can't avoid the CAP theorem, but you can isolate its complexity and prevent it from sabotaging your ability to reason about your systems"). This isolation is via immutability.

In this post, author Nathan Marz talks of DBs "choosing availability over consistency":
The best consistency guarantee these systems can provide is eventual consistency. If you use an eventually consistent database, then sometimes you'll read a different result than you just wrote. Sometimes multiple readers reading the same key at the same time will get different results... It is up to you to repair the value once you detect that the values have diverged. This requires tracing back the history using vector clocks and merging the updates together (called read repair)
Marz' proposal to use immutable data leads us to this conclusion:
If you choose consistency over availability, then not much changes from before. Sometimes you won't be able to read or write data because you traded off availability... Things get much more interesting when you choose availability over consistency. In this case, the system is eventually consistent without any of the complexities of eventual consistency. Since the system is highly available, you can always write new data and compute queries. In failure scenarios, queries will return results that don't incorporate previously written data. Eventually that data will be consistent and queries will incorporate that data into their computations.
This sidesteps Brewer's proposal, which revolves around reconciliation during a partition recovery phase: Marz proposes no reconciliation, just a deferred sharing of the data.

[1] Martin Kleppmann's blog.
[2] Julian Browne's blog.

Sunday, October 8, 2017

Gradients Refresher


There is a lot of talk of gradients in Spark's machine learning library. Here are some notes to remind you what it's all about.

Approximation techniques

Newton's method uses derivatives. Secant methods use finite differences.

"For problems with more than a few variables, it also quickly became clear that the cost of evaluation Jacobians or Hessians [see below] at every step could be exorbitant. Faster methods were needed that might make use of inexact Jacobians or Hessians and/or inexact solutions of the associated linear equations, while still achieving superlinear convergence. An early breakthrough of this kind was the discovery of  Quasi-Newtonian methods in the 1960s by Broyden, Davidon, Fletcher and Powell, in which partial information is used to generate steadily improving estimates of the true Jacobian or Hessian or its matrix factors." [1]

Under certain circumstances "the convergence is quadratic, meaning that the error at each stage is roughly the square of the error at the previous stage" [1]

Taylor Expansion

A polynomial can be expressed as:

f(x) = f(a) + (x - a) f'(a) + (x - a)² f''(a) / 2! + ... + (x - a)ⁿ fn(a) / n! + ...

where f' is the first differential, f'' the second and fn the n-th. (see Boas [2] p25 for a proof).

The multivariate version (that is, when x and a are vectors) looks like this:

f(x) = f(a) + [(x1 - a1) ∂f(a)/∂x1 + (x2 - a2) ∂f(a)/∂x2] + [(x1 - a1)² ∂²f(a)/∂x1² + 2 (x1 - a1)(x2 - a2) ∂²f(a)/∂x1∂x2 + (x2 - a2)² ∂²f(a)/∂x2²] / 2! + ...

in the case of 2 dimensions (that is, when x = (x1, x2)) although it could be any number of dimensions.

For a polynomial of degree 2, the first three terms are all you need as any further differentiation will always yield zero. Alternatively, we might choose that the first three terms are "good enough". Either way, let's now drop all the other terms. We also note that the above equation, when we drop the extraneous terms, can be more succinctly expressed as matrices, thus:

f(x) = f(a) + pT ∇ f(a) + pT B p / 2

where p = x - a and B is the Hessian (see below). By putting some simple values into a square matrix, it's not hard to see that this last term in the matrix equation when expanded is the same as the more verbose equation above it.

Gradients

∇φ is a vector if φ is a field ("a collection of values with a plus operator and a times operator"[3]). In Cartesian 3 dimensions, it's:

∇φ = grad φ = i ∂φ/∂x + j ∂φ/∂y + k ∂φ/∂z

where φ is a scalar and i, j and k are unit vectors although there is no reason to restrict ourselves to 3 dimensions. It's just commonly used in the mathematical sciences.

"The vector ∇ φ is perpendicular to the surface φ = const" (see Boas [2] p292, 293, 296)

Divergence

If ∇ acts on a vector, V:

V = i Vx(x, y, z) + j Vy(x, y, z) + k Vz(x, y, z)

then

∇ . V = div V = ∂Vx/∂x + ∂Vy/∂y + ∂Vz/∂z

Again, this is the special case in 3-D but we need not be restricted to just 3 dimensions.

Laplacian

The Laplacian occurs in a variety of problems in physics (wave equations, diffusion [2] etc). Again, this is the 3-D version but it can be generalized to any number of dimensions:

∇² φ = div grad φ = ∂²φ/∂x² + ∂²φ/∂y² + ∂²φ/∂z²

"Now, that is just the mathematical definition, depending on the field where the Laplacian appears it may have different interpretations. The one I like is the relation with the mean curvature of a surface (how much bent the surface is). The mean curvature is the divergence of the unit normal of a surface, if the normal is given by the gradient of a function (that means that your surface is a level curve, an equipotential, an isopotential, etc...) then the curvature is proportional to the Laplacian." (from Quora)

Jacobians

"The Jacobian of x, y with respect to s, t is the determinant" [2]

J =
| ∂x/∂s  ∂x/∂t |
| ∂y/∂s  ∂y/∂t |

The Jacobian matrix is the matrix for which this is the determinant, namely:

Ji,j = ∂fi / ∂xj

From what I can tell, 'Jacobian' can refer to either the matrix or its determinant depending on the context.

Hessians

A Hessian is a matrix that looks like:

Hi,j = ∂2f / ∂xi ∂xj

Confusingly, the ∇² operator is used for both the Laplacian and the Hessian. However, if ∇ is treated as a column matrix of partial-derivative operators, the Hessian is ∇∇T and the Laplacian is ∇T∇.

Note that the "trace of the Hessian (the sum of the diagonal elements) is the Laplace operator." (from Quora.com)

Note that the "Hessian is always symmetric" [1].

[1] The Princeton Companion to Mathematics
[2] Mathematical Methods in the Physical Sciences, Boas
[3] Coding the Matrix, Klein

Monday, September 18, 2017

Spark Partitions


Some miscellaneous Spark/GraphX partitioning notes that I have made.

Co-partitioned and co-located are not the same thing

"Even if the same partitioner is used for an RDD that needs to be partitioned within the current job as was used to partition another RDD that is needed in the current job but whose partitioning was persisted in a prior job, then the RDDs will still be co-partitioned but won't necessarily be co-located."

"There are two things here. One is data co-partition, and then data co-location.

"As long as you specify the same hash partitioner, a1 and b1 will be co-partitioned (i.e. partitioned the same way). The partitions might not be co-located, so your job will still incur network traffic, but you do reduce a whole round of shuffle.

"If a1 and b1 are put into memory by different jobs, there is no guarantee their partitions will be co-located, even though they are co-partitioned.

"What if you add a node to the cluster, or a node dies? Or what if a node is busy and can never be run anything - are you going to block the entire program forever so you could put a partition on that node?

"You can still write your program in a way that does mostly local joins. And even if the joins need to fetch blocks from remote nodes, the joins still avoid an extra round of shuffle, which is much more expensive than just fetching data."

From a Google Groups discussion (here).

Re/partitioning

Partitioning graphs presents another interesting challenge. Edge cuts (ie, partitioning the vertices) are a viable but generally inefficient way to partition a graph.

Most graphs obey a power law, for example something like Zipf's law if you're processing natural language. ("Zipf's law states that given some corpus of natural language utterances, the frequency of any word is inversely proportional to its rank in the frequency table." from Wikipedia). Consequently, the partitioning of vertices will be 'lumpy'.

GraphX allows you to choose your partitioning strategy. It offers these:

From "Spark GraphX in Action", p214.
You can find my experience of EdgePartition2D here.

"Introducing a .repartition will increase the amount of work that the Spark engine has to do, but the benefit can significantly outweigh that cost." (from here). In this example, the data was "lumpy" causing a single executor to do most of the work.

I've also seen repartitioning on a given field used to optimize joins. See my previous post, where repartitioning a Dataframe by field X and writing it to a file was used to make joining on X more efficient.
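
For example, a minimal PySpark sketch (my own illustration, with made-up column names) of co-partitioning two DataFrames on the join key before joining:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("co-partitioned-join").getOrCreate()

a = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["X", "payload_a"])
b = spark.createDataFrame([(1, "x"), (2, "y"), (4, "z")], ["X", "payload_b"])

# co-partition (though not necessarily co-locate) both DataFrames on the join key X
a_by_x = a.repartition(8, "X")
b_by_x = b.repartition(8, "X")

a_by_x.join(b_by_x, on="X").show()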

Joins

"join has the same semantics as in sql join, e.g. every row after the join is (k, v1, v2), where v1 is from rdd1, and v2 is from rdd2.

cogroup just groups values of the same key together, and every row looks like (k, seq1, seq2), where seq1 contains all the values having the same key from rdd1."

(From another Google Groups discussion).

Note that Datasets have slightly different semantics. The outer-joins of RDDs can return RDDs that contain Options. Dataset's join returns another Dataset whose rows can be accessed as usual while its @experimental joinWith returns a Dataset[(T, U)]. Note that T and U can be null. No nice Options for us here.

Sorting

"In Apache Hadoop, the grouping is done in two places - the partitioner, which routes map outputs to reduce tasks, and the grouping comparator, which groups data within a reduce task.  Both of these are pluggable per-job.  The sorting is pluggable by setting the output key comparator...

"The order of the values within a reduce function call, however, is typically unspecified and can vary between runs. Secondary sort is a technique that allows the MapReduce programmer to control the order that the values show up within a reduce function call." (from here). This can then help the next stage.

Sorting also helped us when writing Parquet files to HDFS.

An interesting trick to improve sort performance can be found here: "Sorting in general has good cache hit rate due to the sequential scan access pattern. Sorting a list of pointers, however, has a poor cache hit rate because each comparison operation requires dereferencing two pointers that point to randomly located records in memory. So how do we improve the cache locality of sorting? A very simple approach is to store the sort key of each record side by side with the pointer. For example, if the sort key is a 64-bit integer, then we use 128-bit (64-bit pointer and 64-bit key) to store each record in the pointers array."

DAGs

You may see the DAG in the GUI not being as linear as you'd expect. It is Spark's prerogative to do this.  "Stages that are not interdependent may be submitted to the cluster for execution in parallel: this maximizes the parallelization capability on the cluster. So if operations in our dataflow can happen simultaneously we will expect to see multiple stages launched" (from here).


Tuesday, September 12, 2017

Neural Networks are just Matrices


DeepLearning4J brings neural nets to Java programmers. They suggest in the getting started section to run the XorExample. This is a neural net that, given XOR inputs and outputs, learns its logic. This is non-trivial for a simple neural net (see here) as the true and false outputs of XOR are not linearly separable. DL4J provides a way of making more complicated neural nets but hides a lot of detail.

Matrices

The network in XorExample "consists in 2 input-neurons, 1 hidden-layer with 4 hidden-neurons, and 2 output-neurons... the first fires for false, the second fires for true".

But instead of talking about neurons, it's much easier to think of this neural net as matrices (at least if you're familiar with simple linear algebra).

So, imagine this neural net of just:

  • A 4 x 2 matrix of the inputs ("features").
  • A hidden layer that is a 2 x 4 matrix 
  • A layer that is a 4 x 2 matrix that yields the output.

We need to apply functions to these matrices before we multiply, but that's essentially it.

Multi-layer Neural Nets in a few lines of Python

To do this, I'm not going to use TensorFlow or other frameworks dedicated to neural nets. I'll just use Numpy as basically syntactic sugar around manipulating arrays of arrays.

Also, I'll present a more intuitive approach to the maths. A more thorough analysis can be found on Brian Dohansky's blog here.

Finally, I tried to do this in Scala using the Breeze linear algebra library but it was just so much easier to do it in Python and Numpy as it ran in a fraction of the time it took Scala to even compile.

The code

As mentioned, we need Numpy.

import numpy as np

Now, let's take the input to a XOR operation (our first matrix):

features = np.matrix('0.0, 0.0;'
                     '1.0, 0.0;'
                     '0.0, 1.0;'
                     '1.0, 1.0')

and the expected output:

labels = np.matrix('1.0, 0.0;'
                   '0.0, 1.0;'
                   '0.0, 1.0;'
                   '1.0, 0.0')

(Remember that this is not trying to be a truth table. Instead, the first column fires (is 1.0) when the XOR output is false and the second column fires when it is true, matching the description of the output neurons above.)

We need those other 2 matrices. It doesn't matter what their values are initially as we'll correct them whatever they are. So, let's choose some random values but in a well-defined shape:

weightsLayer1 = np.random.rand(2, 4)
weightsLayer2 = np.random.rand(4, 2)

We also need the gradient of the weights and biases. We could have subsumed the biases into our matrices - that's mathematically equivalent - but the DeepLearning4J example doesn't do this so we won't either.  The weights and biases for the first and second layers respectively are:

_0_W = np.zeros([2, 4])
_0_b = np.zeros([1, 4])
_1_W = np.zeros([4, 2])
_1_b = np.zeros([1, 2])

Finally, we need a step value and a batch size:

learning_rate = 0.1
mini_batch_size = np.shape(features)[0]

Now we can do our first multiplication (or forward propagation):

    s0 = features * weightsLayer1
    s0 += _0_b   

But we need to apply a function to this (an activation function). In this example, we'll use a sigmoid function. It has a simple derivative that we'll need later.

def f_sigmoid(X):
        return 1 / (1 + np.exp(-X))

So, now we can apply the sigmoid activation function to the first layer:

sigmoided = f_sigmoid(s0)

This we'll feed into the next layer with another matrix multiplication:

    s1 = sigmoided * weightsLayer2
    s1 += _1_b

Now we need our second activation function to apply to this matrix. It's the softmax function, which normalizes the values in each row so that they sum to 1, allowing us to treat each row as a probability distribution. It looks like this (as stolen from Brian):

def f_softmax(X):
    Z = np.sum(np.exp(X), axis=1)
    Z = Z.reshape(Z.shape[0], 1)
    return np.exp(X) / Z

We apply this to the output from the previous layer and find the delta with what the output should be:

softmaxed = f_softmax(s1)
delta = softmaxed - labels

We calculate the delta weighted according to the weights of this layer, transposing appropriately:

epsilonNext = (weightsLayer2 * delta.T).T

Now, it's a good job that sigmoid function has an easy derivative. It looks like this:

dLdz = np.multiply(sigmoided, (1-sigmoided)) 

where, note, this is an element-wise multiplication, not a matrix multiplication. Similarly, we calculate the back propagation (which "allows the information from the cost to then flow backward through the network in order to compute the gradient" [1]) using the same Numpy operation:

backprop = np.multiply(dLdz, epsilonNext)

Intuitively, you can think of this as each element of the weighted delta only being multiplied by the gradient of the function at that element.

Note:
"The term back-propagation is often misunderstood as meaning the whole learning algorithm for multi layer neural networks. Actually, back-propagation refers only to the method for computing the gradient, while another algorithm such as stochastic gradient descent, is used to perform learning using the gradient." [1]
Anyway, we can now update the gradients and the weights:

    _0_W = (features.T * backprop) + _0_W
    _1_W = (sigmoided.T * delta) + _1_W
    _0_b = _0_b - (learning_rate * np.sum(backprop, 0))
    _1_b = _1_b - (learning_rate * np.sum(delta, 0))
    weightsLayer1 = weightsLayer1 - (learning_rate * _0_W)
    weightsLayer2 = weightsLayer2 - (learning_rate * _1_W)

Note that the biases are derived from the columnar sums of the backprop and delta matrices.

Now, repeat this some 500 times and the output looks like:

print softmaxed

[[  1.00000000e+00   3.16080274e-36]
 [  1.50696539e-52   1.00000000e+00]
 [  1.64329041e-30   1.00000000e+00]
 [  1.00000000e+00   8.51240114e-40]]

which for all intents and purposes is the same as labels. QED.

Some things we ignored

For brevity, I didn't address the score (that tells us how close we are to our desired values).

Also, any attempt at regularization (which tries to avoid over-fitting) was ignored. The DL4J XorExample sets the L1 (Manhattan, or taxicab, distance) and L2 (Euclidean distance) penalties to zero so we're true to the original there.

[1] Deep Learning (Goodfellow et al)

Sunday, September 10, 2017

Givens Rotations



The problem

If A is a matrix whose columns are {v1, v2, v3, ... vn}, then any point in its span can be described with an appropriate choice of x thus:
A x = [ v1 | v2 | v3 | ... | vn ] (x1, x2, x3, ..., xn)T
where x1, x2 ... etc are the multipliers we want of a given dimension summed over all the vectors, vi.

What this means is that any vector, b, can be expressed as A x. Equivalently, if we solve |b - A x| = 0 we're solving the least squares problem, beloved of loss functions in machine learning. "The advantage of an algorithm for least squares is that it can be used even when the matrix-vector equation has no solution" [1].

One algorithm: Gram Schmidt

The approach used in "Coding the Matrix" is the Gram Schmidt method. The circuitous route goes like this:
  1. We want to find the closest point between a vector that we'll call b and a space defined by the span of vectors {v1, v2, v3, ... vn}. We will assume that vi is orthogonal to vj for all i ≠ j. If they're not orthogonal, it's not hard to turn them into vectors that are but that's another problem.
  2. Given a k-dimensional vector space, V, the vector b can be expressed as the sum of two vectors - one that is in V and one that is orthogonal to V. In Klein's notation, this says b = b||V + b⊥V. "The point in V closest to b is b||V and the distance is ||b⊥V|| ... it suffices to find b⊥V, for we can then obtain b||V." [1].
  3. Given the orthogonal set of vectors {v1, v2, v3, ... vn}, b⊥V equals b with all the projections onto the vi shaved off. This creates a recursive algorithm that looks like:

    bi = bi-1 - (<bi-1, vi> / <vi, vi>) vi

    where b0 = b. For convenience, we say σi = <bi-1, vi> / <vi, vi>
  4. Now b can be expressed as

    b = σ0v0 + ... + σk-1vk-1 + b⊥V

    which just like the matrix at the top of this post can be expressed like:
    b = [ v0 | ... | vk-1 | b⊥V ] (σ0, ..., σk-1, 1)T
Similarly, if {v1, v2, v3, ... vn} are not orthogonal, we can invoke the same recursive algorithm to generate the original vectors from a linear combination of mutually orthogonal vectors, {v1*, v2*, v3*, ... vn*}. It would look like this:
A = [ v*1 | v*2 | ... | v*n ] ×

| 1  σ12  σ13  ...  σ1n    |
| 0   1   σ23  ...  σ2n    |
| 0   0    1   ...  σ3n    |
| ...               ...    |
| 0   0   ...   1  σ(n-1)n |
| 0   0   ...   0    1     |





QR-factorization

QR-factorization is reducing an m x n matrix A into two matrices, Q and R, where Q is an m x n column-orthogonal matrix (ie, the columns are orthogonal and have a unit norm) and R is a triangular matrix [1]. In other words, it looks like the equation above, where R is the upper triangular matrix.

Another algorithm: Givens Rotation

Now, there is another way of achieving the same end. We can rotate the matrix such that a particular cell becomes zero. Keep doing this and we get the upper triangular matrix as above. This is called a Givens rotation. Let's call each rotation Gi then:

R = Gk Gk-1... G1  A

But note that each rotation is of the form:

| 1  0  ...         ...  0 |
| 0  1  ...         ...  0 |
|       ...                |
| ...   c   ...  -s   ...  |
|       ...                |
| ...   s   ...   c   ...  |
|       ...                |
| 0  0  ...         ...  1 |

where c² + s² = 1, the c values sit on the diagonal and all the other diagonal entries are 1. Or, to put it more succinctly in Python:

import numpy as np

def givensRotation(A, i, j):
    # build the rotation that zeroes element A[i, j] by combining rows i and j
    G = np.eye(len(A))
    aij = A[i, j]
    ajj = A[j, j]
    r = ((aij ** 2) + (ajj ** 2)) ** 0.5
    c = ajj / r
    s = - aij / r
    G[j, i] = -s
    G[j, j] = c
    G[i, i] = c
    G[i, j] = s
    return G

Rearranging that equation for R, we get:

A = G1⁻¹ G2⁻¹ ... Gk-1⁻¹ Gk⁻¹ R

but as luck would have it, each G matrix is orthogonal (its rows are orthonormal, since c² + s² = 1), so its inverse is simply its transpose: Gi⁻¹ = GiT. So:

A = G1T G2T ... Gk-1T GkT R

and all we need to do is keep the original G matrices in the right order. Setting

Q = G1T G2T ... Gk-1T GkT

we get:

A = Q R

which is our QR decomposition. Booyah.
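
As a quick sanity check (my own sketch, reusing the givensRotation function above), we can build R and Q for a small matrix and confirm that A = QR:

import numpy as np

A = np.array([[ 1.0, 2.0,  3.0],
              [-6.0, 5.0, -4.0],
              [-7.0, 1.0,  9.0]])

R = A.copy()
rotations = []
for j in range(R.shape[1]):                 # column by column...
    for i in range(j + 1, R.shape[0]):      # ...zero everything below the diagonal
        G = givensRotation(R, i, j)
        R = G @ R
        rotations.append(G)

Q = np.eye(len(A))
for G in rotations:
    Q = Q @ G.T                             # Q is the product of the transposes, in order

print(np.round(R, 10))                      # upper triangular
print(np.round(Q @ R - A, 10))              # the zero matrix, ie A = QR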

Given a triangle represented in Breeze as:

    val m = DenseMatrix.zeros[Double](rows = 3, cols = 3)
    m(0, ::) := DenseVector(1.0, 2.0, 3.0).t
    m(1, ::) := DenseVector(-6.0, 5.0, -4.0).t
    m(2, ::) := DenseVector(-7.0, 1.0, 9.0).t

we can transform it with Givens rotations to give us:
Red is original, blue is reflection

Distributed Givens Rotation using Spark

I tried this in my own toy linear algebra library for Spark, Algernon. Performance currently sucks but as a proof-of-concept it shows it can be done. I avoid Spark's matrix multiplication code for efficiency. Since the G matrix is mostly 1s in the diagonal and this is effectively a NoOp in matrix multiplication for that particular row, we can write:

    val mathOps = implicitly[Numeric[T]]
    import session.sqlContext.implicits._
    import mathOps.mkNumericOps
    ithRow.joinWith(jthRow, '_1 ("j") === '_2 ("j"), "outer").flatMap { case (x, y) =>

      def calculate(x: MatrixCell[T], y: MatrixCell[T], _s: T) =
        {if (x == null) mathOps.zero else (x.x * c)} + {if (y == null) mathOps.zero else (y.x * _s)}

      val newIth = if (x == null) Seq() else Seq(MatrixCell(x.i, x.j, calculate(x, y, s)))
      val newJth = if (y == null) Seq() else Seq(MatrixCell(y.i, y.j, calculate(x, y, -s)))

      newIth ++ newJth
    }

Here, we're only interested in the i-th and j-th rows, multiplying them with the c and s values we have already calculated.

[1] Coding the Matrix, Klein

Friday, August 25, 2017

More HBase tuning


I've come across two HBase clusters where writes have been extremely fast and reads extremely slow. Note: these reads were not full scans (which would unsurprisingly be slow) but batched get() calls.

This post is about making HBase read things faster. "The rule of thumb is to have your hot data set in RAM. Does not fit? Increase RAM, increase # of servers." (from here).

How much data?

Check how big the HBase table is

hadoop fs -du -h  /hbase/data/default/ 

Compression

The more compressed your data, the more likely it is that it will all fit into RAM. Compressing data in the cache may increase CPU usage but reduces IO. You can check which native libraries Hadoop knows about with:

hadoop/bin/hadoop checknative -a

but note: "If the native libs are NOT present, you will see lots of Got brand-new compressor reports in your logs" (from here).

Wait for the cache to fill

It takes time for the cache to be populated. Run vmstat (on Unix-like OSs) and watch the number of blocks read (bi). It will be large to begin with (thousands or tens of thousands) then shrinks after the app has been running for a while, down to basically zero.

You can watch the progress in the region server's logs:

$ grep "BucketCacheStatsExecutor" hbase/logs/hbase-ubuntu-regionserver-ip-172-30-0-139.log
2017-08-23 09:15:28,192 INFO  [BucketCacheStatsExecutor] bucket.BucketCache: failedBlockAdditions=0, totalSize=5.86 GB, freeSize=4.28 GB, usedSize=1.58 GB, cacheSize=1.56 GB, accesses=49844, hits=2231, IOhitsPerSecond=7, IOTimePerHit=0.03, hitRatio=4.48%, cachingAccesses=49844, cachingHits=2231, cachingHitsRatio=4.48%, evictions=0, evicted=0, evictedPerRun=NaN
2017-08-23 09:20:28,192 INFO  [BucketCacheStatsExecutor] bucket.BucketCache: failedBlockAdditions=0, totalSize=5.86 GB, freeSize=837.15 MB, usedSize=5.04 GB, cacheSize=4.97 GB, accesses=150478, hits=33665, IOhitsPerSecond=104, IOTimePerHit=0.03, hitRatio=22.37%, cachingAccesses=150478, cachingHits=33665, cachingHitsRatio=22.37%, evictions=1, evicted=13644, evictedPerRun=13644.0
2017-08-23 09:25:28,192 INFO  [BucketCacheStatsExecutor] bucket.BucketCache: failedBlockAdditions=5, totalSize=5.86 GB, freeSize=552.66 MB, usedSize=5.32 GB, cacheSize=5.25 GB, accesses=299660, hits=95870, IOhitsPerSecond=207, IOTimePerHit=0.03, hitRatio=31.99%, cachingAccesses=299660, cachingHits=95870, cachingHitsRatio=31.99%, evictions=7, evicted=95977, evictedPerRun=13711.0

Types of caches

There are two types of caches: the MemStore and the BlockCache. The MemStore is the write-side cache, buffering writes before they are flushed to disk; the BlockCache is for read-only access.

You may want "to reduce the block size of the data stored in disk. Why? When a row is requested by client, the block corresponding to where the row is stored on disk (store file) is read into memory (cache) before sending it back the requested data to the client. So by decreasing the block size more relevant data can be stored in cache which can improve read performance." (from here).

Types of Blocks

From here: "HBase blocks and HDFS blocks are different things:

  • HBase blocks are the unit of indexing (as well as caching and compression) in HBase and allow for fast random access. 
  • HDFS blocks are the unit of the filesystem distribution and data locality"
Types of sizes

From here:

"Generally there are 2 sizes involved:

1. HBase Filesize
2. HBase Blocksize

#1 sets the maximum size of a region before it is split. Default used to be 512mb, it's now 1g (but usually it should be even larger)

#2 is the size of the blocks inside the HFiles. Smaller blocks mean better random access, but larger block indexes. I would only increase that if you have large cells."

Heap or off-Heap?

Making the cache off-heap really improved matters for me:

"When bucket cache is enabled in HBase, it will act as L2 (level 2 similar to processor cache hierarchy) cache and the default block cache will act as L1 cache. What that means is data need to be copied from L2 cache into L1 cache for HBase to service the query request. So on a read request when bucket cache is enabled, data block will be copied from disk onto bucket cache and then onto the block cache before served to the client making the request. On further reads of the same data, it will either be served directly from block cache or copied from bucket cache into block cache before serving the client request avoiding disk reads. So if there is a 10 node cluster with 128 GB of RAM each, it is possible to have almost 1 TB of data stored in HBase cache with bucket cache enabled and not get impacted by JVM GC which is not the case using just the default block cache." (from here).

So, trying to assign more heap memory seems like a fool's errand. That will just cause a lot of GC. Instead, use off-heap memory.

Convolution


I was wondering if convolutional neural networks had anything to do with convolution in mathematics. They do. But just as an introduction, here's what convolution in maths is all about.

Laplace transforms

These are useful in solving differential equations. Boas defines them as:

L(f) = ∫0∞ f(t) e-pt dt = F(p)

If you use the key f(t), you can lookup the solution. For instance, if

f(t) =  e-at

then

F(p) = 1/(a + p)

You can work this out by simple integration, look it up in a table of standard Laplace transforms, or use software like Sympy:

from sympy.abc import t, s
from sympy import *
import sympy

a = symbols('a', positive=True)
p = symbols('p')
print laplace_transform(sympy.exp(-a * t), t, p) 

which prints out:

(1/(a + p), 0, True)

Where this gets interesting is when we get recursive. Let f(t) = y and for brevity let y' = dy/dt etc. Let's plug this into the definition of a Laplace transformation at the top of the page:

L(y') = ∫0∞ y' e-pt dt

by a standard integration by parts (that says ∫u dv = uv - ∫v du) then with u=e-pt and dv=dy:

L(y') = ∫0∞ y' e-pt dt = ∫ e-pt dy = [e-pt y]0∞ - ∫0∞ (-p) y e-pt dt
      = -y(0) + pL(y)

There was no particular reason to chose f(t) = y. In fact, let's chose f(t) = y' and do the same all over again. We get:

L(y'')  = -y'(0) + pL(y')

Taking these two equations, we can cancel out L(y') giving us:

L(y'')  = p²L(y) - py(0) - y'(0)

and so on for any number of derivatives of y. We have our table of equations for L(y) so we only need plug that in.

Convolution

Now, as an example, take the equation:

A y''(t) + B y'(t) + C y(t) = f(t) where y(0) = y'(0) = 0

which is a typical physics equation (where a particle is at rest and the force applied to it starts at t=0). YMMV.

then applying the Laplace transform to all terms:

A p² L(y) + B p L(y) + C L(y) = L(f)

Re-arranging:

L(y) = L(f) / (A p² + B p + C) = L(f) / (A (p + a)(p + b))

where a and b are chosen to make the constants B and C disappear.

But, as it happens, this factor is also a Laplace transform. From Boas' table, 1/((p + a)(p + b)) is the transform of (e-at - e-bt) / (b - a) (the extra 1/A is just a constant).

Let's call it T(p).

So, now we have:

L(y) = T(p) L(f)

and we can conclude that y "is the inverse transform of two functions whose inverse we know".

Ok, that's an example. Let's see the general case. Let G(p) and H(p) be transforms of g(t) and h(t) respectively. Then:

G(p) H(p) = ∫0∞ g(σ) e-pσ dσ ∫0∞ h(τ) e-pτ dτ

from the definition of a Laplace transformation (where we are using σ and τ to avoid confusion with a duplicated t).

Let's do another change of variables and have σ=t-τ for fixed τ which means that dσ=dt but the limits slightly change. So:

G(p) H(p) = ∫0∞ ∫τ∞ g(t-τ) e-p(t-τ) h(τ) e-pτ dt dτ
          = ∫0∞ ∫τ∞ e-pt g(t-τ) h(τ) dt dτ

Swapping the order of integration (for a given t, τ ranges from 0 to t), this equation becomes:

G(p) H(p) = L( ∫0t g(t-τ) h(τ) dτ ) = L( g * h )

Note that τ introduces a sliding window that will be used in convolutional neural nets (part 2).
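
As a quick Sympy check (my own example) that L(g * h) = G(p) H(p), take g(t) = e-t and h(t) = e-2t:

from sympy import symbols, exp, integrate, laplace_transform, simplify

t, tau = symbols('t tau', positive=True)
p = symbols('p', positive=True)

g = exp(-t)
h = exp(-2 * t)

# the convolution (g * h)(t) = integral from 0 to t of g(t - tau) h(tau) dtau
conv = integrate(g.subs(t, t - tau) * h.subs(t, tau), (tau, 0, t))

lhs = laplace_transform(conv, t, p, noconds=True)
rhs = laplace_transform(g, t, p, noconds=True) * laplace_transform(h, t, p, noconds=True)

print(simplify(lhs - rhs))   # 0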