Friday, December 29, 2017

Linear algebra crib sheet #1


Here are some notes I've made about linear algebra, but you may also find this link to some good undergraduate lecture notes useful.

Positive definite

A square matrix M is positive definite if z^T M z is positive for every non-zero vector z.

"A matrix is positive definite if it's symmetric and all its eigenvalues are positive" from here. It has applications in finding minima and maxima.


Hermitian Matrices

Where Ai,j = Āj,i

where Ā is the complex conjugate of A.

This means it is necessarily a square matrix.

For real matrices, this means it's just a symmetric matrix.


Gramian/Gram Matrix

The Hermitian matrix, G, where:

Gi,j = <vi, vj>

That is, it "is the matrix of all possible inner products" (from MathWorld).


Degrees of Freedom of a matrix

The number of degrees of freedom of an n×n rotation matrix is n(n-1)/2. The degrees of freedom equate to the number of independent planes of rotation.

The best explanation I found of how we calculate this number was here on StackExchange. Basically, we are rotating the system around planes, not axes (although in ℝ3 you'll find you get the same number of degrees of freedom if you erroneously treat n as the number of axes). To define a plane you need two axes, and there are n(n-1)/2 such pairings.


Cholesky decomposition

Cholesky decomposition says a Hermitian positive definite (see above) matrix, A, can be decomposed such that A = LL* where L is a lower triangular matrix.

The beauty of L being triangular is that you can solve the linear system Ax = b by substituting in LL*: first solve Ly = b for y by forward substitution, then solve L*x = y for x by back substitution.
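A minimal numpy/scipy sketch of that two-step solve (the matrix A and right-hand side b are arbitrary examples):

import numpy as np
from scipy.linalg import cholesky, solve_triangular

# an arbitrary symmetric positive definite matrix and right-hand side
A = np.array([[4.0, 2.0],
              [2.0, 3.0]])
b = np.array([1.0, 2.0])

L = cholesky(A, lower=True)                  # A = L L^T (L^* = L^T for real matrices)

y = solve_triangular(L, b, lower=True)       # forward substitution: L y = b
x = solve_triangular(L.T, y, lower=False)    # back substitution:    L^T x = y

print(np.allclose(A @ x, b))                 # True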


Diagonalization

The recipe for diagonalizing a matrix, M follows:
  1. Find the eigenvalues.
  2. For each eigenvalue, find the eigenvectors and normalize them.
  3. Re-express the original equation as M C = C D, where C is the matrix with the normalized eigenvectors as columns and D is a diagonal matrix whose diagonal elements are the eigenvalues. (Note: this is just a representation of the original problem of finding the eigenvalues and eigenvectors.)
  4. Re-arrange for M, noting that det(C) cannot be 0 so C is invertible.
Which leads us to what we want, that is the decomposition:

M = C D C^-1
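A quick numpy check of the recipe (M here is just an example matrix with distinct eigenvalues):

import numpy as np

M = np.array([[4.0, 1.0],
              [2.0, 3.0]])

eigenvalues, C = np.linalg.eig(M)   # the columns of C are the normalized eigenvectors
D = np.diag(eigenvalues)            # the eigenvalues on the diagonal

# M = C D C^-1
print(np.allclose(M, C @ D @ np.linalg.inv(C)))   # True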

“A matrix has real eigenvalues and can be diagonalized by a unitary similarity transformation if and only if it is Hermitian.” Boas, p154


Similar Matrices

Matrices A and B are similar if A = P^-1 B P.

“Similar matrices represent the same linear operator under two (possibly) different bases, with P being the change of basis matrix” (Wikipedia)

See "Diagonalization" above.


Symmetric matrices

A notable property of a symmetric matrix is that its eigenvalues are all real. This is a good proof from my alma mater, Imperial. Briefly, if A ∈ ℝn×n and one of its eigenvectors is u ∈ ℂn, then Au = λu. Take the complex conjugate of both sides, pre-multiply by u* and re-arrange. You'll find that (because eigenvectors are not 0), λ - λ* = 0. The only way for this to be true is if λ is real.

The eigenvectors of a symmetric matrix can be chosen (that is, we choose to scale them to unit vectors) to be orthonormal. We can prove this by noting for a symmetric matrix A that A = A^T. Plugging this into the equation for diagonalization, there is a hint [Strang, p330] that C^T = C^-1, which is exactly the property of orthogonal matrices.

The proof: given Au1 = λ1u1 and Au2 = λ2u2, pre-multiply the first equation by u2^T, re-arrange and substitute in the second. You'll find that for λ1 ≠ λ2, u2^T u1 = 0.

They can always be diagonalized (StackExchange).
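A numpy sketch confirming both properties - real eigenvalues and orthonormal eigenvectors (so C^T = C^-1) - for an arbitrary symmetric matrix of my choosing:

import numpy as np

# an arbitrary real symmetric matrix
A = np.array([[2.0, 1.0, 0.0],
              [1.0, 3.0, 1.0],
              [0.0, 1.0, 2.0]])

eigenvalues, C = np.linalg.eigh(A)   # eigh assumes a symmetric/Hermitian matrix

print(np.isrealobj(eigenvalues))            # True: the eigenvalues are real
print(np.allclose(C.T @ C, np.eye(3)))      # True: the eigenvectors are orthonormal
print(np.allclose(C.T, np.linalg.inv(C)))   # True: C^T = C^-1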


Covariant and Contravariant matrices

This has nothing to do with type co- and contravariance that you find in programming languages (so I am told by a maths PhD).

Basically, “the differentials of the coordinates are the components of a contravariant vector. Similarly, … the partial derivatives of a function are the components of a covariant vector.” [Boas p530].

More concretely, “every quantity which under a transformation of coordinates, transforms like the coordinate differentials is called a contravariant tensor.… Every quantity which under a coordinate transformation, transforms like the derivatives of a scalar is called a covariant tensor.” (StackExchange).

I've seen various analogies to explain this on the Web but it's hard to do because “in three-dimensional Euclidean space,… contravariant and covariant tensors are equivalent.... The two types of tensors do differ in higher dimensions, however.” (MathWorld); specifically, they "become identical for Cartesian vectors" [Boas p529].

In terms of their notation, “covariant vectors are representable as row vectors. Contravariant vectors are representable as column vectors.” (StackExchange)

As a mnemonic, remember “co-low-row”. That is, covariant vectors have a lower (subscript) index in the notation and by convention are represented as row vectors.


QR factorization

I've discussed this before. It decomposes a matrix into two matrices, Q and R, where Q is orthogonal and R is upper triangular. It does not give you eigenvalues directly, but you can use the related QR Algorithm to find them. Note, though: "The QR algorithm only converges using Wilkinson-Shift strategie[sic]" (StackExchange).
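A quick numpy sketch on an arbitrary random matrix:

import numpy as np

A = np.random.randn(4, 4)
Q, R = np.linalg.qr(A)

print(np.allclose(Q.T @ Q, np.eye(4)))   # True: Q is orthogonal
print(np.allclose(R, np.triu(R)))        # True: R is upper triangular
print(np.allclose(Q @ R, A))             # True: A = QR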


Singular Value Decomposition

Note that despite similarities, singular values are not eigenvalues, although the SVD can be used to compute eigenvalues and eigenvectors. In fact, for a real, symmetric, positive semi-definite matrix the singular values and the eigenvalues are the same (for a general real symmetric matrix the singular values are the absolute values of the eigenvalues) - but then few matrices are real and symmetric.

Intuitively, the difference between SVD and eigendecomposition can be found here at StackExchange:

"SVD says for any linear map, there is an orthonormal frame in the domain such that it is first mapped to a different orthonormal frame in the image space, and then the values are scaled. Eigendecomposition says that there is a basis, it doesn't have to be orthonormal... Consider the difference for a rotation matrix in ℜ2... Here, there are no real eigenvalues and this corresponds to there being no choice of basis which under the transformation is simply a scaling. On the other hand, SVD makes a lot of sense here because it says we can take the standard basis in the domain, map it to the rotated version of this basis (thought of as in the image space), and scale everything by 1."

The relationship between eigenvalues and singular values can be found here on StackExchange. Basically, if we are looking at a matrix X, let C = X^T X. Diagonalize this (see above) so C = VDV^-1, but also note that X = USV^T from the SVD. Substitute this into the original equation for C and equate it to the diagonalized one (noting U^T U = I) and you can see that λi = si^2.
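A numpy sketch of that relationship, using an arbitrary random X:

import numpy as np

X = np.random.randn(5, 3)
C = X.T @ X

eigenvalues = np.linalg.eigvalsh(C)                     # eigenvalues of X^T X (ascending)
singular_values = np.linalg.svd(X, compute_uv=False)    # singular values of X (descending)

# the eigenvalues of X^T X are the squares of the singular values of X
print(np.allclose(np.sort(eigenvalues)[::-1], singular_values ** 2))   # True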

Interestingly, "the parallelization of dense eigenvalue problems is still an area of active research" (see here).


Thursday, December 28, 2017

Introduction to the maths of SVMs


The machine learning technique Support Vector Machines gets its name from the mathematical concept of supports - that is, the smallest closed set of a topological space outside of which a function maps to zero. We'll see this below.

They deal purely with numeric data, but nominal values can easily be mapped to numeric values by having a column for each category, with a value of 1 for the data that fall into that category and 0 otherwise.

They are known to have a "low generalization error, [be] computationally inexpensive, easy to interpret results" [1] but the maths behind them is pretty complicated.

What is the kernel trick?

I found this nice explanation here.
It provides a bridge from linearity to non-linearity to any algorithm that can [be] expressed solely [in] terms of dot products between two vectors. It comes from the fact that, if we first map our input data into a higher-dimensional space, a linear algorithm operating in this space will behave non-linearly in the original input space. Now, the Kernel trick is really interesting because that mapping does not need to be ever computed. If our algorithm can be expressed only in terms of a[n] inner product between two vectors, all we need [to do] is replace this inner product with the inner product from some other suitable space. That is where resides the “trick”... This is highly desirable, as sometimes our higher-dimensional feature space could even be infinite-dimensional and thus unfeasible to compute.

An example

A great example can be found here. Chitta Ranjan takes the classic example of points that are not linearly separable in 2-dimensions and decides that he will use 3-dimensions. The function he uses for this is:

Φ(x) = (x1^2, x2^2, √2 x1x2)

Leading to a similarity measure of:

<Φ(xi), Φ(xj) >

Although raising the number of dimensions in this way does indeed separate the data, his friend, Sam, notes that this can equivalently be written as:

<xi, xj>^2

All Sam had done "is realize there is some higher dimensional space which can separate the data. Choose a corresponding Kernel and voila! He is now working in the high-dimension while doing computation in the original low dimensional space."
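We can check Sam's observation numerically; here's a small numpy sketch with two made-up points:

import numpy as np

def phi(x):
    # the explicit map into 3 dimensions: (x1^2, x2^2, sqrt(2) * x1 * x2)
    return np.array([x[0] ** 2, x[1] ** 2, np.sqrt(2) * x[0] * x[1]])

xi = np.array([1.0, 2.0])
xj = np.array([3.0, 0.5])

explicit = np.dot(phi(xi), phi(xj))   # inner product in the higher-dimensional space
kernel = np.dot(xi, xj) ** 2          # the same thing computed in the original space

print(np.isclose(explicit, kernel))   # True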

Which Kernel?

Although lists of kernels exist, "people just go with the infinite dimension by using the Gaussian (RBF) Kernel. RBF is the most commonly used Kernel. It has a tuning hyperparameter σ that is tuned to choose between a smooth or curvy boundary. In short, as a rule of thumb, once you realize linear boundary is not going to work try a non-linear boundary with an RBF Kernel."

Prerequisites

In an attempt to understand things better, I turned to Learning with Kernels (Schölkopf). These are some notes I made:

ℌ, a feature space that has a dot product.
Χ, a set that contains the inputs.
Φ, a map from the inputs to the feature space. This may be trivial or a more exotic, non-linear mapping.
m is the number of inputs and n is the size of a feature vector.
k(x, x') is the kernel function that gives us a similarity measure.

A naive implementation

Given two clusters, we can calculate their centres:

c+ = (1/m+) Σ_{i : yi = +1} xi

c- = (1/m-) Σ_{i : yi = -1} xi

where we differentiate the two clusters with + and - symbols.

We define w as the vector between the two centres, that is, w = c+ - c-.

Half way between these two clusters is the point c = (c+ + c-)/2. Which cluster x is closest to can be given by the sign of <x - c, w>. Or, to put it another way, the sign of:

<x - c, w> = <x - (c+ + c-)/2, c+ - c-> = <x, c+> - <x, c-> - b

where b = (||c+||^2 - ||c-||^2) / 2

The sign of this expression essentially gives us our decision function, y. However, to use the kernel trick in this equation, we need to express everything in terms of the data points. Since w and b can be expressed in terms of c+ and c-, and these in turn in terms of the xi, this gives:

y = sgn((1/m+) Σ_{i : yi = +1} <x, xi> - (1/m-) Σ_{i : yi = -1} <x, xi> - b)
  = sgn((1/m+) Σ_{i : yi = +1} k(x, xi) - (1/m-) Σ_{i : yi = -1} k(x, xi) - ((1/m+^2) Σ_{i,j : yi = yj = +1} k(xi, xj) - (1/m-^2) Σ_{i,j : yi = yj = -1} k(xi, xj))/2)

Aside: if we then choose the origin to be equidistant from the two centres, then b becomes zero. And if the kernel function, k, is normalized such that we can view it as a probability density function in x for any given x', that is:

∫_Χ k(x, x') dx = 1 ∀ x' ∈ Χ

This leads to the equation for y taking the form:

y = sgn(Σ αi k(x, xi) + b)

which looks like the Bayes Classifier apparently.
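Below is a minimal numpy sketch of this naive centroid classifier; the toy data and the choice of a plain dot product as the kernel are mine, not Schölkopf's:

import numpy as np

def linear_kernel(a, b):
    return np.dot(a, b)

def naive_classifier(X, y, kernel=linear_kernel):
    pos, neg = X[y == +1], X[y == -1]
    m_pos, m_neg = len(pos), len(neg)

    # offset = -b where b = (||c+||^2 - ||c-||^2) / 2, written via the kernel
    offset = 0.5 * (sum(kernel(a, c) for a in neg for c in neg) / m_neg ** 2
                    - sum(kernel(a, c) for a in pos for c in pos) / m_pos ** 2)

    def f(x):
        return np.sign(sum(kernel(x, xi) for xi in pos) / m_pos
                       - sum(kernel(x, xi) for xi in neg) / m_neg
                       + offset)
    return f

# two made-up clusters around (0, 0) and (3, 3)
X = np.array([[0.1, 0.2], [-0.2, 0.1], [3.1, 2.9], [2.8, 3.2]])
y = np.array([-1, -1, +1, +1])

f = naive_classifier(X, y)
print(f(np.array([0.0, 0.0])))   # -1.0
print(f(np.array([3.0, 3.0])))   # 1.0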

A better implementation

This is just a toy implementation in which the hyperplane is simply placed halfway between the two centroids. What we really want is the hyperplane that maximizes the margin between the clusters. That is, given a decision function f,

f(x) = sgn(<x, w> + b)

we want to maximize the minimum distance between the boundary and the two sets of points on either side of it. Let's say the closest points on either side are x1 and x2. With the right scaling, we can say:

<x1, w> + b = 1
<x2, w> + b = -1

Taking these as simultaneous equations, we can eliminate b and get:

<x1 - x2, w> = 2

or equivalently:

<x1 - x2, w / ||w||> = 2 / ||w||

The left hand side of this equation is the distance between x1 and x2 projected onto the unit normal of the hyperplane. If we want to maximize this, we want to maximize 2/||w||, or to put it another way, minimize ||w||.

To this end, let's introduce an objective function:

τ(w) = ||w||^2 / 2

that we wish to minimize (why we square it and divide by 2 will become apparent later; suffice to say that minimizing ||w|| is the same as minimizing τ).

Now, one solution would be the trivial w = 0 so we need to add the inequality constraints:

yi (<xi, w> + b) ≥ 1

Now we use Lagrange multipliers. That is, we note that at the minimum of τ, dτ/dw is 0 (standard calculus). Let's propose the Lagrangian (which is minimized with respect to w and b, and maximized with respect to the multipliers αi ≥ 0):

L(w, b, α) = τ(w) - Σ_{i=1}^m αi (yi (<xi, w> + b) - 1)

Note that if the inequality constraint is not exact (that is, it holds strictly), the term inside the summation, αi (yi (<xi, w> + b) - 1), is non-negative and we are subtracting the summation. Since we want to maximize the Lagrangian with respect to the αi, the corresponding αi must be zero. So, αi will only be non-zero for points that make the inequality constraint exact. This is the Karush-Kuhn-Tucker condition. It agrees with our intuition that we only care about the nearest points; all other points are irrelevant.

Conversely, the points with non-zero αi have their corresponding terms equal to zero, because for them yi (<xi, w> + b) is exactly 1 and we subtract 1 from it. This is enough information to work with our Lagrangian.

Differentiating the Lagrangian by w yields:

w = Σ_{i=1}^m αi yi xi

and by b yields:

Σ_{i=1}^m αi yi = 0

This latter equation complements the former by acting as a constraint on the solution.

Substituting this expression for w into τ(w), the quantity we are minimizing becomes:

(1/2) Σ_{i,j=1}^m αi αj yi yj <xi, xj>

and the decision function we started with at the top of this section is now:

f(x) = sgn(Σ_{i=1}^m αi yi <x, xi> + b)

The hard part is calculating the αi's in that optimization. This is the dual optimization problem and it is beyond the scope of this post.
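Solving the dual is what SVM libraries do for us. As a hedged illustration (using scikit-learn's SVC on made-up, linearly separable data rather than solving the dual by hand), the points with non-zero αi come back as the support vectors:

import numpy as np
from sklearn.svm import SVC

# toy, linearly separable data
X = np.array([[0.0, 0.0], [0.0, 1.0], [1.0, 0.0],
              [3.0, 3.0], [3.0, 4.0], [4.0, 3.0]])
y = np.array([-1, -1, -1, +1, +1, +1])

clf = SVC(kernel="linear", C=1e6)   # a large C approximates the hard-margin case
clf.fit(X, y)

print(clf.support_vectors_)   # only the points on the margin appear here
print(clf.dual_coef_)         # the corresponding alpha_i * y_i values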

Shattering

One last point is how effective the boundary is. "Since the labels are in {±1} there are at most 2^m different labelings for m patterns. A very rich function class might be able to realize all 2^m separations, in which case it is said to shatter the m points."

[1] Machine Learning in Action published by Manning.

Wednesday, December 27, 2017

Spark dataframes notes


As I wean myself off inefficient (but easy), maintenance-mode RDDs, these are some notes I made on Datasets and DataFrames.

Datasets for all

In this example, I was looking at bank account transactions for which I wanted to build histograms of nominal values. I created my case class so:

case class Record(acc: String, histo: String, name: String, value: Double, number: Int)

and wanted to create histogram buckets of usage data. I won't bore you with the details but suffice to say I had a function that looked like:

  type Bucket[T, U] = (T, U, Int)
  type Buckets[K] = Seq[Bucket[K, Double]]
  def processAll(rows: Iterator[Row]): Map[String, Buckets[String]] = ...

Now I can run on pipe delimited raw data:

  val byAccountKey = rawDataDF.groupByKey(_.getString(IBAN_INDEX))
  val accTimeHistoCase = byAccountKey.flatMapGroups { case(acc, rows) => processAll(rows).flatMap(bs => bs._2.map(x => Record(acc, bs._1, x._1, x._2, x._3)) ) }
  val ds = accTimeHistoCase.as[Record]

only for it to give me runtime exceptions because Spark doesn't know how to serialize my Records. Oops. 

Not to worry. Let's just add a row encoder.

val schema = StructType(
  StructField("acc", StringType, nullable = false) ::
  StructField("histo", StringType, nullable = false) ::
  StructField("name", StringType, nullable = false) ::
  StructField("value", DoubleType, nullable = false) ::
  StructField("number", IntegerType, nullable = false) ::
  Nil)
implicit val recordEncoder = RowEncoder(schema)

and we're good to go.

User Defined Functions

... except my data is in a Continental format where commas are used as decimal points. This causes confusion when creating Doubles. So, we can add a UDF so:

def commasAsPeriods(x: String): Double = x.replace(",", ".").toDouble
val numFormatter = udf(commasAsPeriods _)
val df = rawDataDF.withColumn("CONTINENTAL_NUMBER", numFormatter('CONTINENTAL_NUMBER))

and now we can parse the whole file.

Built-in Functions

There are a plethora of functions for Spark Datasets. For instance, if you wanted to do a sort of flatMap from a Dataset[(K, Seq[V])] to a Dataset[(K, V)], you would use explode. (But be careful that this doesn't consume too much memory. I've seen OutOfMemoryErrors caused by this function.)

Similarly, collect_set is sort of the opposite. It aggregates a number of rows into one. For instance, if we wanted to know all the bucket names for a single histogram type, we could write:

scala> ds.groupBy($"histo").agg(collect_set($"name")).show()
+----------------+--------------------+
|           histo|   collect_set(name)|
+----------------+--------------------+
|receivingAccount|[ATXXXXXXXXXXXXXX...|
|     countryCode|[NA, XK, VN, BE, ...|
|       dayOfWeek|[FRIDAY, MONDAY, ...|
|         dayTime|[EVENING, FRIDAY,...|
+----------------+--------------------+

These are fairly simple functions but we can make them arbitrarily complex (see this SO answer for a recipe).

Aggregations

Now my Dataset, ds, is bucket data for a given histogram for a given account number.

We can see how much bucket data there is for each histogram.

scala> ds.groupByKey(_.histo).agg(count("name")).show()
+----------------+-----------+
|           value|count(name)|
+----------------+-----------+
|receivingAccount|    1530942|
|     countryCode|     465430|
|       dayOfWeek|     866661|
|         dayTime|     809537|
+----------------+-----------+

Since we're dealing with nominal data, a lot of the bucket names are going to be duplicated - there are many data points for the same histogram and bucket (but obviously with different account numbers). For example, the countryCode histogram will have names of CH, GB, FR etc. Let's see this:

scala> ds.groupBy('histo).agg(countDistinct("name")).show()

+----------------+--------------------+
|           histo|count(DISTINCT name)|
+----------------+--------------------+
|receivingAccount|              732020|
|     countryCode|                 152|
|       dayOfWeek|                   8|
|         dayTime|                  10|
+----------------+--------------------+

Evidently, there are 152 different countries to which our customers send money.

We can, of course, aggregate by multiple columns, thus:

scala> ds.groupBy('histo, 'name).agg(count("name")).show()
+----------------+--------------------+-----------+
|           histo|                name|count(name)|
+----------------+--------------------+-----------+
|     countryCode|                  CH|        956|
.
.

groupBy vs. groupByKey

The method groupBy produces a RelationalGroupedDataset and is "used for untyped aggregates using DataFrames. Grouping is described using column expressions or column names."

Whereas groupByKey produces a KeyValueGroupedDataset that is "used for typed aggregates using Datasets with records grouped by a key-defining discriminator function" (Jacek Laskowski).

There's a subtle difference in the API between using groupBy and groupByKey. The agg() method on the result of the former takes a Column but the agg() of the latter takes a TypedColumn.

"In Datasets, typed operations tend to act on TypedColumn instead." Fortunately, "to create a TypedColumn, all we have to do is call as[..]" (GitHub)

So, the following:

scala> ds.groupBy('histo).agg(countDistinct('name)).show()

and

scala> ds.groupByKey(_.histo).agg(countDistinct('name).as[Long]).show()

are equivalent.

Similarly, the aggregation functions used with groupByKey must be the typed variants. For instance:

scala> import org.apache.spark.sql.expressions.scalalang._
scala> ds.groupByKey(_.histo).agg(typed.avg(_.value)).show()

and

scala> ds.groupBy('histo).agg(avg('value)).show()

are also equivalent.

Differences between Datasets

I'd not heard of anti-joins before but they're a good way to find the elements in one Dataset that are not in another (see the Spark mailing list here). The different types (with examples) can be found here on SO where Spark's "left_anti" is the interesting one.


Thursday, December 21, 2017

Ridge regression


"Ridge regression adds an additional λI to the matrix XTX so that it's non-singular, and we can take the inverse of the whole thing" [1]

It adds bias. Take a matrix that has rank 1 (that is, each additional row adds no new information).

import numpy as np
from numpy.linalg import matrix_rank, det, inv, norm

mat_rank_1 = np.matrix('1    2    3;'
                       '10   20   30;'
                       '100, 200, 300')

We can demonstrate it does indeed have rank 1:

print mat_rank_1, "has rank ", matrix_rank(mat_rank_1)  # it is indeed 1

Now if were to use it, say, in linear regression, we'd have problems when we invert it:

mTm = np.dot(mat_rank_1.T, mat_rank_1)

print "\ndeterminant of rank 1 matrix multiplied by its transpose:", det(mTm)  # and not too surprisingly, the determinant is 0

The following blows up with "numpy.linalg.linalg.LinAlgError: Singular matrix":

inv(mTm)

So, we can add some bias:

l = np.eye(3, 3) * 0.1

wbTwb = np.dot(mat_rank_1.T, mat_rank_1) + l
print wbTwb, "has determinant", det(wbTwb), "and rank", matrix_rank(wbTwb)

print inv(wbTwb)

and this doesn't blow up.

"Ridge regression was originally developed to deal with the problem of having more features than data points. But it can also be used to add bias into our estimations. We can use the λ value to impose a maximum value on the sum of all our [weights]" [1]

John D Cook warns: "A little noise makes the system go from theoretically impossible to solve to theoretically possible to solve, but that may not be very useful in practice. It will let you compute a solution, but that solution may be meaningless... You use knowledge of your domain beyond the specific data at hand to guide how you change your matrix."

Ill Conditioned

He goes on: "The condition number of a matrix is the norm of the matrix times the norm of its inverse... A small change to a matrix might not change its norm much, but it might change the norm of its inverse a great deal. If that is the case, the matrix is called ill-conditioned because it has a large condition number.

"You can think of condition number as an error multiplier... Note that condition number isn’t limited to loss of precision due to floating point arithmetic. If you have some error in your input b, say due to measurement error, then you will have some corresponding error in your solution to Ax = b, even if you solve the system Ax = b exactly. If the matrix A is ill-conditioned, any error in b (rounding error, measurement error, statistical uncertainty, etc.) will result in a correspondingly much larger error in x."

In our case, we can calculate this condition number thus:

print "norm of original matrix", norm(wbTwb)
print "norm of inverse of original matrix", norm(inv(wbTwb))
print "Condition number", norm(wbTwb) * norm(inv(wbTwb))

Which results in:

norm of original matrix 141414.1
norm of inverse of original matrix 14.1421356238
Condition number 1999897.38132

Yoiks. Even with that modest λ=0.1, our matrix is still quite ill-conditioned.

Interestingly, making our original matrix rank 3 by avoiding linear dependencies between the rows:

mat_rank_1 = np.matrix('1    2    3;'
                       '20   10   30;'
                       '100, 300, 200')

reduces the condition number by an order of magnitude even for the same value of λ.

What's more, making λ much bigger (say 10) makes the original matrix less ill-conditioned by two orders of magnitude. But of course, now we're significantly distorting our data - the "meaningless" that Cook talks of.

An excellent article on how ill-conditioning can affect neural networks can be found here.

[1] Machine Learning in Action


Parquet vs. Avro vs. Orc


Different big data access patterns require different data formats. Here are some notes I made while playing with the common ones.

File formats

"Avro is a Row based format. If you want to retrieve the data as a whole you can use Avro. Parquet is a Column based format. If your data consists of lot of columns but you are interested in a subset of columns then you can use Parquet" (StackOverflow).

Parquet

Parquet is based on Dremel which "represents nesting using groups of fields and repetition using repeated fields. There is no need for any other complex types like Maps, List or Sets as they all can be mapped to a combination of repeated fields and groups" (Twitter blogs).

You can get the Parquet Tools from here. It allows you to examine the Parquet files with commands like this on some address data:

$ hadoop jar ~/Tools/parquet-tools-1.9.0.jar meta  /HDFS/FILE.parquet
17/12/14 09:02:59 INFO hadoop.ParquetFileReader: Initiating action with parallelism: 5
17/12/14 09:02:59 INFO hadoop.ParquetFileReader: reading another 1 footers
17/12/14 09:02:59 INFO hadoop.ParquetFileReader: Initiating action with parallelism: 5
file:        /HDFS/FILE.parquet
creator:     parquet-mr version 1.5.0-cdh5.12.0 (build ${buildNumber})
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"LON","type":"double","nullable":true,"metadata":{}},{"name":"LAT","type":"double","nullable":true,"metadata":{}},{"name":"NUMBER","type":"string","nullable":true,"metadata":{}},
.
.


file schema: spark_schema
--------------------------------------------------------------------------------
LON:         OPTIONAL DOUBLE R:0 D:1
LAT:         OPTIONAL DOUBLE R:0 D:1
NUMBER:      OPTIONAL BINARY O:UTF8 R:0 D:1
.
.

row group 1: RC:16 TS:1067 OFFSET:4
--------------------------------------------------------------------------------
LON:          DOUBLE SNAPPY DO:0 FPO:4 SZ:181/177/0.98 VC:16 ENC:BIT_PACKED,PLAIN,RLE
LAT:          DOUBLE SNAPPY DO:0 FPO:185 SZ:181/177/0.98 VC:16 ENC:BIT_PACKED,PLAIN,RLE
NUMBER:       BINARY SNAPPY DO:0 FPO:366 SZ:95/95/1.00 VC:16 ENC:PLAIN_DICTIONARY,BIT_PACKED,RLE
.
.

Looking at the stats demonstrate that your data really is partitioned as you expect and has sensible splits (eg, A-z is wrong but A-F, F-M etc is better). See Ryan Blue's lectures here and his slide deck here for more information.

This is the metadata of a Parquet file that was generated from a Spark DataFrame that looked like:

scala> df.printSchema
root
 |-- LON: double (nullable = true)
 |-- LAT: double (nullable = true)
 |-- NUMBER: string (nullable = true)

[Note that NUMBER is actually a string because some house numbers can look like "221b" - as for Sherlock Holmes].

Because Parquet only accesses the parts of files it needs, it's been reported to be even faster than having the data in memory.

The R and D stand for Repetition Level and Definition Level.

"Definition levels specify how many optional fields in the path for the column are defined" (Parquet documentation) Put another way "to support nested records we need to store the level for which the field is null... from 0 at the root of the schema up to the maximum level for this column". In our case, it's 1 as we expect one or zero (they're nullable) values for our columns.

Repetition is 0 since none of our fields live in a collection ("in a flat schema there is no repetition and the repetition level is always 0")

Of course, the downside of a columnar format is when you want to retrieve/use many columns or even whole rows. This is where Avro might be a better choice.

Avro

"The project was created to address the major downside of Hadoop Writables: lack of language portability. Avro assumes the schema is always present - at read and write time - which makes for very compact encoding" [1] and it "suports schema evolution" (SlideShare).

Orc

"The ORC format showed up in Hive 0.11." (MapR).  It "breaks rows into row groups and applies columnar compression and indexing within these row groups... If a column is sorted, relevant records will get confined to one area on disk and the other pieces will be skipped very quickly." (HortonWorks)

It "offers excellent compression" [ibid]. Indeed, when I was storing the same data structure (for open source address data for Austria) in Parquet and Orc files, Orc was roughly twice as efficient.

Embarrassingly good compression

Although Parquet and Orc produce roughly equivalent sized files, Orc has a neat trick up its sleeve that is fantastic under certain circumstances. If you have a common field, Orc appears to compress it very efficiently. This is very nice if you're using massively denormalized data that once lived in a highly normalized RDBMS.

To demonstrate, in Spark, let's do:

scala> val fname = "/user/Data/repeated_large_val"

scala> val _1000Chars = "012345678901234... " // a 1000-character-long String

scala> sc.parallelize(1 to 100000000, 100).cache().map(x => (x, _1000Chars)).toDF.write.mode(org.apache.spark.sql.SaveMode.Overwrite).parquet(fname)

scala> sc.parallelize(1 to 100000000, 100).cache().map(x => (x, _1000Chars)).toDF.write.mode(org.apache.spark.sql.SaveMode.Overwrite).orc(fname + "_orc")

Now, on the UNIX CLI:

$ hadoop fs -du -h /user/Data/
567.9 M  1.7 G   /user/Data/repeated_large_val
2.6 M    7.8 M   /user/Data/repeated_large_val_orc

That's a saving of 2 orders of magnitude!

And just to show there is nothing up my sleeve:

scala> val orcFromFile = spark.read.orc(fname + "_orc")
orcFromFile: org.apache.spark.sql.DataFrame = [_1: int, _2: string]

scala> orcFromFile.count()
res15: Long = 100000000

Partitioning

Note the effects of partitioning a Spark Dataset on the underlying file structure. It can break the file into sub-directories where a key becomes a directory name.

For example, take this code:

      spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv("IN_FILE").sort(partitionBy)
        .write.mode(SaveMode.Overwrite)
        .parquet("OUT_UNPARTITIONED")

Creates a file structure like this:

$ hadoop fs -ls OUT_UNPARTITIONED.parquet | head
Found 201 items
-rw-rw-r--   3 user user          0 2017-11-23 09:10  OUT_UNPARTITIONED.parquet/_SUCCESS
-rw-rw-r--   3 user user     489818 2017-11-23 09:09  OUT_UNPARTITIONED.parquet/part-00000-51284a5b-ebf6-47c4-8188-315e09f240e1-c000.snappy.parquet
-rw-rw-r--   3 user user     565884 2017-11-23 09:09  OUT_UNPARTITIONED.parquet/part-00001-51284a5b-ebf6-47c4-8188-315e09f240e1-c000.snappy.parquet
.
.

Whereas, partitioning so:

      spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv("IN_FILE").sort(partitionBy)
        .write.partitionBy("STREET").mode(SaveMode.Overwrite)
        .parquet("OUT_PARTITIONED")

produces a directory structure like:

$ hadoop fs -ls OUT_PARTITIONED.parquet | head
Found 69415 items
drwxrwxr-x   - user user          0 2017-11-23 09:16 OUT_PARTITIONED.parquet/STREET=%22%22%22Glück-auf%22%22-Straße%22
drwxrwxr-x   - user user          0 2017-11-23 09:16 OUT_PARTITIONED.parquet/STREET=%22Kellergasse %22%22Moorberg%22%22%22
drwxrwxr-x   - user user          0 2017-11-23 09:16 OUT_PARTITIONED.parquet/STREET=0Montangebiet
drwxrwxr-x   - user user          0 2017-11-23 09:16 OUT_PARTITIONED.parquet/STREET=0Tallage

where in this particular instance, I had street data. Note that since I partitioned on STREET, each street now has its own directory.

This can be handy when you want to join within a partition but be careful you don't create too many entries in the Hadoop NameNode.

You get a similar difference if you use Orc, that is:

.
.
   .write.partitionBy("STREET").mode(SaveMode.Overwrite).orc("OUT_PARTITIONED_ORC")


Data interchange

“Hive does not mandate any particular file format. Files are stored verbatim.” [1] Indeed, if we load a (Parquet) file with Spark and save it into the HDFS directory used by Hive:

scala> val df = spark.read.parquet(file)
scala> df.write.saveAsTable("HIVE_DB_NAME.austria_table")

then open Hive, we can read it:

$ hive
hive> use HIVE_DB_NAME;
OK
Time taken: 1.435 seconds
hive> show tables;
OK
austria_table
hive> select count(*) from austria_table;
.
.
Stage-Stage-1: Map: 2  Reduce: 1   Cumulative CPU: 23.32 sec   HDFS Read: 44969 HDFS Write: 8 SUCCESS
Total MapReduce CPU Time Spent: 23 seconds 320 msec
OK
2804994
Time taken: 44.574 seconds, Fetched: 1 row(s)

Now, if we use Spark to do a similar thing with Orc:

scala> val dfOrc = spark.read.orc("/Data/OpenAddress/austria_partionedByStreet_2_orc")
scala> val df = spark.read.option("header", "true").option("inferSchema", "true").csv("/Data/OpenAddress/austria.csv")
scala> df.write.mode(SaveMode.Overwrite).saveAsTable("HIVE_DB_NAME.austria_table_orc")

We can read it just as easily in Hive:

hive> select count(*) from austria_table_orc;
.
.
2804994
Time taken: 36.396 seconds, Fetched: 1 row(s)

[1] Hadoop, the Definitive Guide


Decision Trees and Spark


What they are

"We start at the root and split the data on the feature that results in the largest information gain... In an iterative process, we can then repeat this splitting procedure at each child node until the leaves are pure. This means that the samples at each node all belong to the same class. In practice, this can result in a very deep tree with many nodes, which can easily lead to overfitting. Thus, we typically prune the tree by setting a limit for the maximal depth of the tree." [1]

Why they're important

Typically when approaching a problem, a data scientist will first start with a decision tree.

"As for learning algorithms Logistic Regression and Decision Trees / Random Forests are standard first-round approaches that are fairly interpretable (less so for RF) and do well on many problems" (Quora). This can be especially important when legislation (like the forthcoming GDPR) gives consumers the right to know why they were or were not targeted for a product.

Why they work

While looking at Spark's decision tree algorithm, you see that each split is chosen to maximize the information gain - that is, the reduction in entropy. There are other impurity measures than entropy (Gini impurity, for example) but you get the idea.
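As a rough, plain-Python sketch (the class labels are made up, and Spark of course has its own internal implementation), entropy and the information gain of a candidate split look like this:

import numpy as np

def entropy(labels):
    # Shannon entropy of a list of class labels
    _, counts = np.unique(labels, return_counts=True)
    p = counts / float(counts.sum())
    return -(p * np.log2(p)).sum()

def information_gain(parent, left, right):
    # reduction in entropy achieved by splitting parent into left and right
    n = float(len(parent))
    weighted = (len(left) / n) * entropy(left) + (len(right) / n) * entropy(right)
    return entropy(parent) - weighted

parent = ['a', 'a', 'a', 'b', 'b', 'b']
print(information_gain(parent, ['a', 'a', 'a'], ['b', 'b', 'b']))   # 1.0: a perfect split
print(information_gain(parent, ['a', 'a', 'b'], ['a', 'b', 'b']))   # ~0.08: a poor split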

Spark's Implementation

Spark has a nice example here but it loads example data in LIBSVM format. What is this? Well, it's basically a label of type T paired with a feature Vector. Instead of using the example LIBSVM data, let's take my data set of bank accounts and write:

val data = ds.map(r =>(asDouble(r.histo), new DenseVector(Array(asDouble(r.acc), asDouble(r.name), asDouble(r.number), asDouble(r.value)))))

where ds is of type Dataset[Record] and Record is given by:

case class Record(acc: String, histo: String, name: String, value: Double, number: Int)

and asDouble is just a function that turns anything into a double. It is just test code after all.

Now it's a in a form I can feed into Spark using the code here that gives us a Model by fitting the Pipeline to the training data. (The pipeline is the label indexer, feature indexer, decision tree classifier and label converter in that order).

I extract my model from the pipeline so

scala> val treeModel = model.stages(2).asInstanceOf[DecisionTreeClassificationModel]

and I can actually see what the tree looks like in a very human-readable form:

scala> treeModel.toDebugString
res40: String =
"DecisionTreeClassificationModel (uid=dtc_f122057dc098) of depth 5 with 49 nodes
  If (feature 1 <= -1.940284966E9)
   If (feature 1 <= -2.016375591E9)
    Predict: 0.0
   Else (feature 1 > -2.016375591E9)
    If (feature 2 <= 50.0)
     If (feature 1 <= -2.004972479E9)
      If (feature 3 <= 5.8760448E7)
       Predict: 1.0
.
.

[Note that ml.classification.DecisionTreeClassifier is DataFrame oriented while the older mllib.tree.DecisionTree is RDD-based]

Now, the Model can transform the test data and we can see how effective our tree is:

scala> val transformed = model.transform(testData).cache()
scala> transformed.show()
+--------------+--------------------+------------+--------------------+--------------------+--------------------+----------+--------------+
|            _1|                  _2|indexedLabel|     indexedFeatures|       rawPrediction|         probability|prediction|predictedLabel|
+--------------+--------------------+------------+--------------------+--------------------+--------------------+----------+--------------+
|-1.477067101E9|[-2.146512264E9,2...|         3.0|[-2.146512264E9,2...|[326.0,0.0,0.0,27...|[0.00116486398605...|       3.0|-1.477067101E9|
|-1.477067101E9|[-2.14139878E9,20...|         3.0|[-2.14139878E9,20...|[326.0,0.0,0.0,27...|[0.00116486398605...|       3.0|-1.477067101E9|
|-1.477067101E9|[-2.13437752E9,20...|         3.0|[-2.13437752E9,20...|[326.0,0.0,0.0,27...|[0.00116486398605...|       3.0|-1.477067101E9|
.
.

where a typical row of this DataFrame looks like:

res46: org.apache.spark.sql.Row = [-1.477067101E9,[-2.146512264E9,2099.0,1569.0,48563.0],3.0,[-2.146512264E9,2099.0,1569.0,48563.0],[326.0,0.0,0.0,279535.0],[0.0011648639860502177,0.0,0.0,0.9988351360139498],3.0,-1.477067101E9]

The documentation is useful in interpreting this (see here) but suffice to say that rawPrediction and probability are, respectively, the counts of training instances per class and the probability of the data point falling into a given class (see StackExchange).

Note that:

scala> ds.count()
res1: Long = 3680675
scala> transformed.count()
res1: Long = 1194488

So, we have a tree with the number of leaves being about one third of the actual data. Looks like we're overfitting (see [1]) but, again, this is just me playing.

Finally, we can use a MulticlassClassificationEvaluator to find the overall accuracy of our model.

scala> val evaluator = new MulticlassClassificationEvaluator().setLabelCol("indexedLabel")
scala> val accuracy = evaluator.evaluate(transformed)
scala> println("Test Error = " + (1.0 - accuracy))
Test Error = 0.1842248695787883

Hmm, pretty bad but we were just playing.

[1] Python Machine Learning

Sunday, December 3, 2017

The bias/variance tradeoff


In putting models together, you'll typically have "overfitting (high variance) or underfitting (high bias)" [1] The bias/variance tradeoff is the process of balancing the two.

"Variance is how sensitive a prediction is to what training set was used. Ideally, how we choose the training set shouldn’t matter – meaning a lower variance is desired. Bias is the strength of assumptions made about the training dataset. Making too many assumptions might make it hard to generalize, so we prefer low bias as well." [2]

But what is variance and bias?

Random variables are not random variables

Firstly, we clean up some terminology. From the excellent Count Bayesie: "Random Variables, which are neither random nor variables. The Random Variable is the thing that translates 'H' or 'T' into 1 or 0. But when we hear "Random Variable" it's very tempting to think "Oh this must work like a 'Random Number Generator' where each time I look at this variable it has a random value."

Great Expectations

Now, "the Expectation of a Random Variable is the sum of its values weighted by their probability... Expectation is just the mean of a Random Variable! But words like mean and average carry a special intuition"

Variance

"If we took a random sample of our data, say, 100 points and generated a linear model, we' d have a set of weights. Say we took another set of random points and generated another linear model. Comparing the amount that the weights change will tell us the variance in our model." [3]

Variance is typically taught at high school to be:

σ^2 = Σ (x - μ)^2 / n

where μ is the mean and n is the number of samples. In a continuous probability distribution, this is:

σ^2 = ∫ (x - μ)^2 f(x) dx

which looks a lot like the definition of expectation.

[Aside: note that if (x - μ) were raised to the power 3, we'd be talking about the skew and if it were raised to the power 4, we'd be talking about the kurtosis ("how pointy a distribution is"). These are the moments of a random variable. But that is another story. See here for more.]

Anyway, the variance of a random variable X is:

Var(X) = E[(X - μ)^2] = E[(X - E[X])^2]

expanding and noting that an expectation of an expectation is the original expectation itself:

Var(X) = E[X^2] - E[X]^2

Thus when X is a random variable, Variance is the difference between the expectation of X squared and the squared expectation of X.
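A quick numpy sanity check of that identity, sampling from an arbitrary (exponential) distribution of my choosing:

import numpy as np

x = np.random.exponential(scale=2.0, size=1000000)   # samples from an arbitrary distribution

lhs = x.var()                            # Var(X)
rhs = (x ** 2).mean() - x.mean() ** 2    # E[X^2] - E[X]^2

print(lhs)
print(rhs)   # the two agree up to floating point error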

Note that the Akaike Information Criterion [Wikipedia] can be used in model selection to dismiss models that overfit too much, although it only applies to certain types of models (eg, GLMs).

Aside: PCA and Variance

In PCA, we're trying to minimize the squared distances of all points from a vector. But in doing so, we're increasing the spacing between the positions on this vector that are the shortest distance between the points and the line (see this StackExchange answer) - basically, the "errors".

That is, this is the vector along which the data points have the greatest variance. This fits our intuition that the principal components are the most distinctive elements of the data (in the example given, the qualities of wines - alcohol content, colour etc - or, more often, linear combinations of these qualities that distinguish wines).

Bias

"Bias measures how far off the predictions are from the correct values in general if we rebuild the model multiple times on different training datasets; bias is the measure of systematic error that is not due to randomness." [1] In other words, E[approximate f(x) - true f(x)].

Why would we introduce bias? Well, one reason would be that we have more features (columns) than data points (rows). For instance, in linear regression we need to invert a matrix made from the inputs (see the last equation in my post on linear regression). This is not possible if that matrix has rank less than the number of columns (viewed as a set of equations, there is no unique solution). So, we could reduce the number of columns, thus making the model less complex and adding bias.

Another reason is to introduce regularization. "The concept behind regularization is to introduce additional information (bias) to penalize extreme parameter weights." [1]

[1] Python Machine Learning
[2] Machine Learning with TensorFlow
[3] Machine Learning in Action