Tuesday, March 20, 2018

Fighting Orcs

Orc has an amazing ability to compress data. A Parquet file of 1.1tb shrank to 15.7gb when saved as Orc, although note that the Orc file is compressed. "The default compression format for ORC is set to snappy" (StackOverflow). This indeed seems to be the case, as you can see snappy in the names of the files in the HDFS directory.

But how fast is querying it?

Well, there was an initial pause when first reading the file with:

val orcFile = spark.read.orc(HDFS_DIR_NAME)

It seemed that the driver was doing all the work and all 60 executors I had asked my shell to employ were doing nothing. Using jstack showed that the driver was trying to infer the schema (see o.a.s.s.e.d.DataSource.getOrInferFileFormatSchema). Adding the schema would mitigate this.
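
Specifying the schema up front avoids that inference step. A minimal sketch (the schema here is hypothetical and only lists a couple of the 292 columns):

import org.apache.spark.sql.types._

// A hand-written schema for (a few of) the columns; the rest would be listed in the same way.
val schema = StructType(Seq(
  StructField("dax", IntegerType),
  StructField("mandant", IntegerType)
))

// With an explicit schema, the driver no longer has to scan the files to work one out.
val orcFile = spark.read.schema(schema).orc(HDFS_DIR_NAME)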

Now, let's group by a field called dax.


(There are 36 different values for dax over nearly 890 million rows with 292 columns).
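
The query was along these lines (a sketch; I have guessed the exact aggregation, following the same style as the queries later in this post):

import org.apache.spark.sql.functions.count

orcFile.groupBy('dax).agg(count('dax)).show()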

This groupBy took about 50s when the underlying format was Parquet and 37s when it was Orc (not including the perfectly avoidable large read time) so not bad. How does Orc do it? It has 'indexes'. Sort of. "The term 'index' is rather inappropriate. Basically it's just min/max information persisted in the stripe footer at write time, then used at read time for skipping all stripes that are clearly not meeting the WHERE requirements, drastically reducing I/O in some cases" (StackOverflow).

In fact, you can see this metadata using the Hive command line:

hive --orcfiledump HDFS_DIR_NAME

Can we make it even faster? I thought sorting and repartitioning might make a difference so I executed this:
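
What I ran was roughly of this shape (a sketch: PARTITIONED_DIR_NAME and the exact combination of calls are my reconstruction, not the original job):

// Shuffle the data by dax, sort it within each partition and write it out partitioned by dax.
orcFile
  .repartition('dax)
  .sortWithinPartitions('dax)
  .write
  .partitionBy("dax")
  .orc(PARTITIONED_DIR_NAME)

// Read the result back for the queries below.
val partitionedOrc = spark.read.orc(PARTITIONED_DIR_NAME)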


Note that caching the original DataFrame would cause OOMEs for reasons unknown to me at the moment. And without repartitioning, the third (of four) stages took so long I had to kill the job. Repartitioning helped but I did notice a huge amount of shuffle (hundreds of gigabytes when the original file itself was only 16gb).

Also, I had to keep increasing the driver's memory until I stopped getting OOMEs in the last stage. For reasons that I don't currently understand, the memory had to increase to 20gb before the job finished. Even then, it took 2.5 hours on 30 executors with 5 cores each, and the resulting directory took 146gb of disk space.

However, the speed of a query was staggeringly fast. This:

partitionedOrc.where('dax === 201501).groupBy('dax).agg(count('dax)).show(10)

took a mere 2s. By comparison, the same query on the unpartitioned and unsorted Orc file took about 32s.

This speed is comparable to Apache Impala which "can be awesome for small ad-hoc queries" (StackOverflow).

Note, however, that because Impala is "highly memory intensive (MPP), it is not a good fit for tasks that require heavy data operations like joins etc., as you just can't fit everything into the memory. This is where Hive is a better fit" (ibid).

Note that this speed increase appears to be available only when the query filters on the field that was partitioned. For instance, let's take another field, mandant, and run a slightly modified version of the query above:

partitionedOrc.where('mandant === 201501).groupBy('dax).agg(count('dax)).show(10)

This takes 32s as well.

Monday, March 5, 2018

DeepLearning4J in a Spark cluster

Running DL4J on Spark was not too hard but there were some non-obvious gotchas.

Firstly, my tests ran on a Windows machine but I needed to add dependencies to get it to run on our Linux cluster where I was getting "no openblas in java.library.path" errors. This link helped me and now my dependencies looked like this:

    <dependency> <!-- remember to run spark with conf "spark.kryo.registrator=org.nd4j.Nd4jRegistrator" -->



Secondly, you need to add --conf "spark.kryo.registrator=org.nd4j.Nd4jRegistrator" to the CLI when you start a Spark shell.

Thirdly, I randomly grabbed some Recurrent Neural Network from here just to test my code. I found that it was immensely memory-hungry. I needed to give my driver 20gb, my executors 30gb and only 1 core per executor to avoid occasional errors in the Spark stages.

Even then, I couldn't measure the accuracy of my neural net because of this issue. Apparently, it's fixed in the SNAPSHOT but then there are issues with the platform JARs not being up to date. I asked about this on the DeepLearning4J gitter channel archived here. (The team also helpfully told me to use org.deeplearning4j.eval.Evaluation with an argument to avoid the bug).

Finally, I started getting results but not before one last gotcha, and this time in Spark: use RDD.sample rather than take to get your hands on data with which to test, as you want a nice distribution over all categories. With this, I started getting more sensible answers when evaluating my results.
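
For example (a sketch; allData and the fraction are placeholders):

// take() just grabs rows from the first partition(s), which can be heavily skewed towards a few categories.
val skewed = allData.take(1000)

// sample() draws from across the whole RDD, giving a much better spread over the categories.
val testData = allData.sample(withReplacement = false, fraction = 0.01, seed = 42L)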

Friday, February 16, 2018

Spark and Neural Nets

Spark supports neural nets but the variety is quite limited. For instance, somebody asked on the Spark mailing list whether Recurrent Neural Networks would soon be implemented, to which the answer was: "The short answer is there is none and highly unlikely to be inside of Spark MLlib any time in the near future."

Deep learning in YARN

Since we're looking at getting serious with neural nets in Spark, we were forced to look at alternatives. A Yahoo! offering is the one we will soon try. It promises to run TensorFlow within Yarn. This also suits our hardware as it looks as if we may be able to exploit Remote Direct Memory Access - sharing memory between machines without it going via the (slow) OS. "Infiniband provides faster connectivity and supports direct access to other servers’ memories over RDMA. Current TensorFlow releases, however, only support distributed learning using gRPC over Ethernet. To speed up distributed learning, we have enhanced the TensorFlow C++ layer to enable RDMA over Infiniband" the Yahoo! team claim.

Another alternative is just to deploy a library that is dedicated to exotic types of neural nets and DeepLearning4J looks ideal for that. "Data parallelism shards large datasets and hands those pieces to separate neural networks, say, each on its own core. Deeplearning4j relies on Spark for this, training models in parallel and iteratively averages the parameters they produce in a central model. (Model parallelism ... allows models to specialize on separate patches of a large dataset without averaging.)"

Now, regarding GPUs note what the author of netlib-java, the binding that exploits BLAS, says: "Be aware that GPU implementations have severe performance degradation for small arrays."

Pure Spark implementation

So, while we're preparing to install all these libraries, I had time to play with Spark's implementation of neural networks on the public domain newsgroups data I spoke about before. There were some interesting findings.

The first was that a lot of the work is done on the driver. This seems to happen a lot in some of the older Spark code (something I've complained about before). Although there is parallelism in o.a.s.m.o.CostFun, where the gradients of the partitioned features in an RDD are calculated and added together, other calculations appear to be done locally in the driver. For instance, the updating of this potentially very large result from the executors is done in o.a.s.m.ann.ANNUpdater, and the BFGS calculation appears to take place entirely in Breeze (see breeze.optimize.FirstOrderMinimizer.infiniteIterations).

Consequently, I was forced to give the driver 30gb of memory and set spark.driver.maxResultSize to 8192. Only then could I run against a MultilayerPerceptronClassifier with layer sizes [262144, 100, 80, 20]. Note, these sizes come from the size of my word vector calculated by TF-IDF being 262144 and there being only 20 different newsgroups. The 100 and 80 were guessed by slowly increasing them until I hit memory problems.
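
A minimal sketch of that setup (the DataFrame and column names are placeholders; the featuresCol and labelCol are shown explicitly just for clarity):

import org.apache.spark.ml.classification.MultilayerPerceptronClassifier

// 262144 TF-IDF features in, 20 newsgroups out; the hidden sizes (100, 80) were found by trial and error.
val mlp = new MultilayerPerceptronClassifier()
  .setLayers(Array(262144, 100, 80, 20))
  .setFeaturesCol("features")
  .setLabelCol("label")

val model = mlp.fit(trainingData)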


Now, with this neural net architecture I was able to achieve about 81% accuracy in about 30 minutes (an architecture with [262144, 50, 40, 20] was much faster and didn't require giving the driver more memory but had an accuracy of about 77%).

But this does not compare favourably with avoiding neural nets altogether. A naive Bayes classifier took no time at all, didn't require much memory and gave me about 84% accuracy. Simplest thing that works, eh?

Spark OOMEs during Matrix Multiplication

It seems that I am not the only person to suffer from the problem of OutOfMemoryErrors while performing matrix multiplication (and other operations) in Spark. This is because "Spark keeps the partition that it is working on in memory (and does not spill to disk even if it is running OOM)" (from the mailing lists).

For instance, computePrincipalComponentsAndExplainedVariance can't handle more than 65535 columns and does all the work on the driver ("local matrix" in the docs) by delegating to Breeze which in turn appears to use a native library (lapack).

There are various proposed solutions, such as playing with spark.sql.shuffle.partitions, which "configures the number of partitions to use when shuffling data for joins or aggregations" (docs). Tuning this appears interesting: "Spark uses a different data structure for shuffle book-keeping when the number of partitions is greater than 2000" (from StackOverflow).
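
For example, to push the partition count past that threshold (the number here is purely illustrative):

// Affects subsequent DataFrame shuffles in this session.
spark.conf.set("spark.sql.shuffle.partitions", "2001")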

But this seems far too fiddly. It would be better if I never had to worry about such things. To this end, I've created my own simple library to perform "truly scalable matrix multiplication on Spark".

Note that this is the general case of multiplying two different matrices. In the special case that you're multiplying a matrix by its own transpose, you might prefer the approach taken in a previous post on this blog.

Tuesday, February 13, 2018

Neural Network Notes

Some miscellaneous neural network terms. I hope to expand on all of them later.

Deep Learning

"Taking complex raw data and creating higher-order features automatically in order to make a simpler classification (or regression) output is a hallmark of deep learning... The best way to take advantage of this power is to match the input data to the appropriate deep network architecture." [1]

Architecture of ANNs

"We can use an arbitrary number of neurons to define a layer and there are no rules about how big or small this number can be. However, how complex of a problem we can model is directly correlated to how many neurons are in the hidden layers of our networks. This might push you to begin with a large number of neurons from the start but these neurons come with a cost... There are also cases in which a larger model will sometimes converge easier because it will simply 'memorize' the training data." [1]

"Hidden layers are concerned with extracting progressively higher-order features from the raw data."

"A more continuous distribution of input data is generally best modelled with a ReLU activation function. Optionally, we'd suggest using the tanh activation function (if the network isn't very deep) in the event that the ReLU did not achieve good results (with the caveat that there could be other hyperparameter-related issues with the network." [1]

"Well if your data is linearly separable (which you often know by the time you begin coding a NN) then you don't need any hidden layers at all…One hidden layer is sufficient for the large majority of problems." (StackOverflow)

In the output layer for binary classification "we'd use a sigmoid output layer with a single neuron to give us a real value in the range of 0.0 to 1.0". [1]

"the best way to build a neural network model: Cause it to overfit and then regularize it to death... Regularization works by adding an extra term to the normal gradient computed."

The right model

"If we have a mutliclass modeling problem yet we only care about the best score across these classes, we'd use a softmax output layer with an arg-max() function to get the highest score of all the classes. The softmax output layer gives us a probability distribution over all the classes" [1]

"If we want to get multiple classifications per output (eg person + car), we do not want softmax as an output layer. Instead, we'd use the sigmoid output layer with n number of neurons, giving us a probability distribution (0.0 to 1.0) for every class independently". [1]

"In certain architectures of deep networks, reconstruction loss functions help the network extract features more effectively when paired with the appropriate activation function. An example of this would be using the multiclass cross-entropy as a loss function in a layer with a softmax activation function for classification output." [1]


"We define a hyperparameter as any configuration setting that is free to be chosen by the user that might affect [sic] performance." [1] For example layer size, activation functions, loss functions, epochs etc.

"And so on, until we've exhausted the training inputs, which is said to complete an epoch of training."

"First-order optimization algorithms calculate the Jacobian matrix... Second-order algorithms calculate the derivative of the Jacobian (ie, the derivative of a matrix of derivatives) by approximating the Hessian." [1]

"A major difference in first- and second-order methods is that second-order methods converge in fewer steps yet take more computation per step". [1]

"The 'vanilla' version of SGD uses gradient directly, and this can be problematic because gradient can be nearly zero fir any parameter. This causes SGD to take tiny steps in some cases, and steps that are too big for situations in which the gradient is too large. To alleviate these issues, we can use the technique such as:

"AdaGrad is monotonically decreasing and never increases the learning rate" [1]


"An autoencoder is trained to reproduce its own input data."[1]

"The key difference to note between a multilayer perceptron network diagram (from earlier chapters) and an autoencoder diagram is the output layer in an autoencoder has the same number of units as the input layer does." [1]

"Autoencoders are good at powering anomaly detection systems."


If the cost function is C, then "stochastic gradient descent can be used to speed up learning. The idea is to estimate the gradient ∇C by computing ∇Cx for a small sample of randomly chosen training inputs. By averaging over this small sample it turns out that we can quickly get a good estimate of the true gradient ∇C, and this helps speed up gradient descent, and thus learning."

This "works by randomly picking out a small number mm of randomly chosen training inputs. We'll label those random training inputs X1,X2,…,Xm and refer to them as a mini-batch. Provided the sample size m is large enough we expect that the average value of the ∇CXj will be roughly equal to the average over all  ∇Cx" (from neuralnetworksanddeeplearning.com).

Boltzmann machines

Defined as "a network of symmetrically connected, neuron-like units that make stochastic decisions about whether to be on or off". [1]

"The main difference between RBMs and the more general class of autoencoders is in how they calculate the gradients." [1]


"Dropout and DropConnect mute parts of the input to each layer such that the neural network learns other portions".


ReLU (rectified linear units) "are the current state of the art because they have proven to work in many situations. Because the gradient of a ReLU are either zero or constant, it is possible to reign in the vanishing exploding gradient issue. ReLU activation functions have shown [sic] to train better in practice than sigmoid functions". [1]

Leaky ReLUs

"Unfortunately, ReLU units can be fragile during training and can “die”. For example, a large gradient flowing through a ReLU neuron could cause the weights to update in such a way that the neuron will never activate on any datapoint again. If this happens, then the gradient flowing through the unit will forever be zero from that point on. That is, the ReLU units can irreversibly die during training since they can get knocked off the data manifold. For example, you may find that as much as 40% of your network can be “dead” (i.e. neurons that never activate across the entire training dataset) if the learning rate is set too high. With a proper setting of the learning rate this is less frequently an issue.

Leaky ReLUs are one attempt to fix the “dying ReLU” problem. Instead of the function being zero when x < 0, a leaky ReLU will instead have a small negative slope (of 0.01, or so)" (from here).
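
As a sketch in code, the difference is just the behaviour for negative inputs (0.01 is the conventional small slope):

// Standard ReLU: zero output and zero gradient for x < 0, which is what lets units "die".
def relu(x: Double): Double = math.max(0.0, x)

// Leaky ReLU: a small slope for x < 0 keeps some gradient flowing.
def leakyRelu(x: Double, alpha: Double = 0.01): Double = if (x >= 0) x else alpha * x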

[1] Deep Learning - a practitioner's guide.

Monday, February 12, 2018

Great expectations

Since this took me a few hours to sort out in my head, I thought I would blog it.

The Bessel Correction is used when taking a sample, with mean x̄ and variance s², from a population whose true mean is μ and true variance is σ².

The naive sample variance, s², is:

s² = Σᵢⁿ(xᵢ - x̄)²/n

but we'd be wrong if we thought we could use this as an estimate for σ². (We actually divide by n-1.)

Note that sometimes we don't need to estimate the population. In the case of a country conducting a census, we have all the information. However, for most use cases we need to sample the country's population.

[Aside: Interestingly, “such a basic concept as standard deviation, with an apparently impeccable mathematical pedigree, is socially constructed and a product of history” Leeds University].

The reasoning goes like this.

The expected value for x̄ is E[x̄] = μ because the expectation is the mean, by definition.

The variance of x̄ is:

E[(x̄ - μ)²] - (E[x̄ - μ])²

also by definition.

The second term is clearly zero when we expand it and substitute in the expected value for x̄ we've just stated above.

We then expand the first term and take the expected value of all the resulting terms. However, we have to be careful here as E[A·B] = E[A]·E[B] only when A and B are independent.

To illustrate, the expected value of the product of a pair of fair 6-sided dice is (3.5)², which is 12.25. But the expected value of rolling just one die and squaring the result is (1² + 2² + ... + 6²)/6, which is about 15. Clearly they are not the same probability distribution as, for example, p(12) is non-zero in the first case but zero in the second (as you can't get 12 by squaring an integer).

So, let's represent the variance of x̄ as:

E[ (Σᵢⁿ(xᵢ - μ)/n) · (Σⱼⁿ(xⱼ - μ)/n) ]

For all the terms where i ≠ j, xᵢ and xⱼ are independent, so each cross term becomes:

E[(xᵢ - μ)(xⱼ - μ)] = E[xᵢxⱼ] - 2μE[x] + μ² = E[xᵢ]E[xⱼ] - 2μ² + μ² = μ² - 2μ² + μ² = 0

but when i = j the two factors are not independent, and we have:

E[ Σᵢⁿ(xᵢ - μ)²/n² ] = Σᵢⁿ E[(xᵢ - μ)²]/n² = nσ²/n² = σ²/n

Now, we can re-express our definition of

s² = Σᵢⁿ(xᵢ - x̄)²/n

by noting that:

(xᵢ - x̄)² = ((xᵢ - μ) - (x̄ - μ))²
          = (xᵢ - μ)² - 2(xᵢ - μ)(x̄ - μ) + (x̄ - μ)²

The expectation of the first term is E[(xᵢ - μ)²] = σ², from our definition of the variance of the whole population; the last term is σ²/n, as we've just shown; and for the middle term, we do the same trick as before. Where i ≠ j the cross terms are zero, but for the 1/n of cases where i = j, each is σ², so this middle term must equal 2σ²/n.



Putting the three expectations together, E[s²] = σ² - 2σ²/n + σ²/n = ((n - 1)/n)σ², so we can estimate σ² as:

σ² ≅ (n/(n - 1))s²
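
A quick sanity check of that factor (a sketch; the sample size and number of trials are arbitrary, and the population is standard normal so σ² = 1):

import scala.util.Random

val rng    = new Random(42)
val n      = 5        // sample size
val trials = 100000

// Average the naive (divide-by-n) variance estimate over many samples.
val meanNaiveVariance = (1 to trials).map { _ =>
  val xs   = Seq.fill(n)(rng.nextGaussian())
  val mean = xs.sum / n
  xs.map(x => (x - mean) * (x - mean)).sum / n
}.sum / trials

// Comes out near (n - 1)/n = 0.8 rather than the true variance of 1.0.
println(s"E[s^2] is roughly $meanNaiveVariance")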

So, don't forget your n-1 if you're taking a sample and not calculating values for the whole population.

Saturday, February 10, 2018

Textual Clustering

When it comes to classifying documents, there are many ways to skin a cat. The use case at hand is to classify small amounts of text into 21 pre-defined categories. Since this data is private, for this post I'll use the 20 Newsgroups data. This contains some 18k documents that have already been categorised and each document contains a subject and a body.

First we need to process the data. Spark has some great out-of-the-box transformers that can tokenize the words (Tokenizer), remove stop words (StopWordsRemover), create n-grams (NGram), generate the TF-IDF vectors (HashingTF and IDF) and finally normalize them (Normalizer).
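
A sketch of such a pipeline (the DataFrame and column names are my own; the n-gram step is omitted and HashingTF is left at its default of 262144 features):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{Tokenizer, StopWordsRemover, HashingTF, IDF, Normalizer}

val tokenizer  = new Tokenizer().setInputCol("text").setOutputCol("words")
val remover    = new StopWordsRemover().setInputCol("words").setOutputCol("filtered")
val hashingTF  = new HashingTF().setInputCol("filtered").setOutputCol("rawFeatures")
val idf        = new IDF().setInputCol("rawFeatures").setOutputCol("tfidf")
val normalizer = new Normalizer().setInputCol("tfidf").setOutputCol("features")

val pipeline = new Pipeline().setStages(Array(tokenizer, remover, hashingTF, idf, normalizer))
val features = pipeline.fit(newsgroups).transform(newsgroups)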

Now, a natural choice for then categorising these vectors might be KMeans since we know exactly how many clusters we are looking for. However, I had appalling results - over 99% of the vectors all congregated in one cluster while the other 1% was spread over the others.

This had me scuttling back to the code that created my features and checking everything was OK (punctuation removed, all lower case, etc). This Stackoverflow answer had a nice list of what to check. Unfortunately, it didn't help.

Thinking about it, this was not too surprising. KMeans measures the distance between points. That is calculated as:

Σₖ(vᵢₖ - vⱼₖ)ᵖ

where vᵢₖ is the k-th element of the vector that represents document i. The value p is set to 2 if you want (squared) Euclidean distances but that might not always be desirable.

Consequently, “k-means, however, may be a bad fit. Because the means computed will not have a realistic sparsity, but will be much more dense” (from Stackoverflow).

Anyway, for most documents (certainly when we're dealing with just the subject lines), it is highly likely that one document will not contain any of the words in another, even when they belong to the same category. As a result, the distance between two documents that belong to the same category but share no common words will likely be much the same as the distance between two documents belonging to different categories. In short, KMeans is confused by sparse vectors.

So, an alternative was to use Spark's NaiveBayes implementation. The results here were immediately decent - about 85% success in classifying whole documents and 71% success for just classifying them based on their subject text.
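
A minimal sketch of that step (the column names are again my own, the features are the non-negative TF-IDF vectors built above, and a numeric label column is assumed to have been added):

import org.apache.spark.ml.classification.NaiveBayes

val Array(train, test) = features.randomSplit(Array(0.8, 0.2), seed = 1L)

val nbModel     = new NaiveBayes().setFeaturesCol("features").setLabelCol("label").fit(train)
val predictions = nbModel.transform(test)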

The best explanation of this algorithm I found was here on Stanford University's website. The probability of document d being in category c is:

p(c|d) ∝ p(c) ∏ₖ p(tₖ|c)

where p(tₖ|c) is the probability of the k-th term of the bag of words of document d being in category c. This can be calculated from a histogram of words over categories.

The term p(c) is our Bayesian prior and is easily calculated from the distribution of documents over the categories.

We then choose the most likely category - that is, the one with the highest value of p(c|d).

A non-sparse solution might be to use Spark's Word2Vec implementation. However, since the vector elements can be negative, you can't use NaiveBayes.