Saturday, February 6, 2016

Dimsum fills you up


DIMSUM is an algorithm (described here) that efficiently finds similar columns in a matrix. You get it for free in Spark via the RowMatrix.columnSimilarities method.

The problem is basically taking each row and multiplying all combinations of its values together. Given a matrix with m rows and n columns, this is an O(mn²) problem. But if the matrix is sparse, with a maximum of L non-zero values per row, the complexity drops to O(mL²). DIMSUM promises to improve this efficiency even more (at the cost of a small loss in accuracy).
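For context, this is roughly how we were calling it (a minimal sketch; the data and the SparkContext sc are illustrative, not our production code):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Each row is a sparse vector with at most L non-zero entries.
// The sizes, indices and values here are purely illustrative.
val rows = sc.parallelize(Seq(
  Vectors.sparse(5, Seq((0, 1.0), (3, 2.0))),
  Vectors.sparse(5, Seq((1, 4.0), (3, 1.0))),
  Vectors.sparse(5, Seq((0, 3.0), (4, 5.0)))
))

val matrix = new RowMatrix(rows)

// Exact, brute-force all-pairs column similarities.
val exact = matrix.columnSimilarities()

// DIMSUM: pass a threshold to trade a little accuracy for less shuffle.
val approx = matrix.columnSimilarities(threshold = 0.1)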

The headache for us was that our matrix was huge: 100 million by 100 million. This was pushing the envelope, as DIMSUM does not appear to be well suited to matrices this wide [see here for the comment: "Column based similarities work well if the columns are mild (10K, 100K, we actually scaled it to 1.5M columns but it really stress tests the shuffle and it needs to tune the shuffle parameters)"].

What we were seeing in the driver logs looked like this:

16/01/28 11:05:56 ERROR TaskSchedulerImpl: Lost executor 3 on ip-172-30-0-65.eu-west-1.compute.internal: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
16/01/28 11:05:56 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 44), so marking it as still running
16/01/28 11:05:56 WARN TaskSetManager: Lost task 72.0 in stage 7.0 (TID 6100, ip-172-30-0-65.eu-west-1.compute.internal): ExecutorLostFailure (executor 3 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
.
.

It seemed like the workers were dying with OutOfMemoryErrors. This led to a scramble to tune the cluster, and the usual suspects were called in.

The Spark documentation on memory tuning was also a good read but didn't help. I was using Kryo for serializing objects; I reduced spark.memory.fraction from the 75% default to 25% to maximize the space "reserved for user data structures, internal metadata in Spark, and safeguarding against OOM errors in the case of sparse and unusually large records." All to no avail.
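For reference, the configuration looked something like this (a sketch; the values are just the ones I tried, not recommendations):

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("column-similarities")
  // Kryo serialization for shuffled objects.
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Shrink the unified memory region from its 0.75 default to leave more
  // room for user data structures and Spark's internal metadata.
  .set("spark.memory.fraction", "0.25")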

This StackOverflow post was informative. I was using Spark 1.6 so didn't need to worry about spark.storage.memoryFraction; I wasn't using broadcast variables; I was using thousands of partitions; and the "task serialized as XXX bytes" mentioned in the logs indicated that XXX was small.

So, I was already using about as much memory as the boxes could give me (55GB out of 60GB; I was also running Hadoop, and the kernel needs some memory too). Running jmap on the worker boxes gave output like this:

 num     #instances         #bytes  class name
----------------------------------------------
   1:            72    42400003008  [D
   2:         20222      174418080  [B
   3:       2070414       99379872  scala.Tuple2$mcID$sp
   4:        403387       69434872  [C
   5:        382592       36728832  io.netty.channel.ChannelOutboundBuffer$Entry
   6:        769266       30770640  io.netty.util.Recycler$DefaultHandle
   7:        245337       29440440  io.netty.buffer.PooledHeapByteBuf

That's 39.5GB of arrays of doubles! Huh?

Looking more closely at the code in Spark for calculating the column similarities, we see:

  def computeColumnSummaryStatistics(): MultivariateStatisticalSummary = {
    val summary = rows.treeAggregate(new MultivariateOnlineSummarizer)(
      (aggregator, data) => aggregator.add(data),
      (aggregator1, aggregator2) => aggregator1.merge(aggregator2))
    updateNumRows(summary.count)
    summary
  }

which basically says call

      (aggregator, data) => aggregator.add(data),

over each partition and then aggregate the results together with:

      (aggregator1, aggregator2) => aggregator1.merge(aggregator2))
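If treeAggregate is unfamiliar, a toy example (nothing to do with DIMSUM) shows the same seqOp/combOp shape:

// Sum of squares with treeAggregate: the first function folds each element
// into a per-partition accumulator, the second merges the per-partition
// accumulators (in a tree pattern rather than all at once on the driver).
val rdd = sc.parallelize(1 to 100, numSlices = 8)
val sumOfSquares = rdd.treeAggregate(0L)(
  (acc, x) => acc + x.toLong * x,  // seqOp: applied within each partition
  (acc1, acc2) => acc1 + acc2      // combOp: merges partition results
)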

But what is this MultivariateOnlineSummarizer class? Well, it contains seven arrays of doubles. Hmm. And what's this in the add method (which is called on each row in a partition)?

  private[spark] def add(instance: Vector, weight: Double): this.type = {
.
.
      n = instance.size

      currMean = Array.ofDim[Double](n)
      currM2n = Array.ofDim[Double](n)
      currM2 = Array.ofDim[Double](n)
      currL1 = Array.ofDim[Double](n)
      nnz = Array.ofDim[Double](n)
      currMax = Array.fill[Double](n)(Double.MinValue)
      currMin = Array.fill[Double](n)(Double.MaxValue)
.
.

Yikes, Scooby, this is 10⁸ doubles per array per partition as DIMSUM calculates summaries used in the next part of the algorithm. Well, that's going to cause problems (note: it doesn't matter how many rows there are, this is a columns-only limitation).
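Back-of-the-envelope (assuming 8-byte doubles and ignoring array headers):

val n      = 100000000L  // number of columns
val arrays = 7           // currMean, currM2n, currM2, currL1, nnz, currMax, currMin
val bytes  = n * arrays * 8L
// 5,600,000,000 bytes, i.e. roughly 5.2GB per MultivariateOnlineSummarizer,
// and there is one summarizer per partition being aggregated.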

My first attempt at a solution was to reduce the number of partitions with RDD.coalesce, as sketched below. This is inexpensive as it just logically groups the 2000 partitions into 16 (I chose 16 as I have 4 boxes and each box could just about hold 4 MultivariateOnlineSummarizers in memory at once).
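Something along these lines (a sketch; rowVectors stands in for my real RDD and the DIMSUM threshold is illustrative):

import org.apache.spark.mllib.linalg.distributed.RowMatrix

// coalesce(16, shuffle = false) just regroups the ~2000 existing partitions
// into 16 without moving data, so at most 16 MultivariateOnlineSummarizers
// are alive during the first phase of treeAggregate (4 per box, 4 boxes).
val coalesced = rowVectors.coalesce(16, shuffle = false)
val matrix = new RowMatrix(coalesced)
val similarities = matrix.columnSimilarities(threshold = 0.1)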

This helped the job to progress further but then Kryo was complaining that it didn't have enough buffer space.

16/02/02 13:52:28 WARN TaskSetManager: Lost task 3.3 in stage 4.0 (TID 4050, ip-172-30-0-64.eu-west-1.compute.internal): org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 6, required: 8
Serialization trace:
currL1 (org.apache.spark.mllib.stat.MultivariateOnlineSummarizer). To avoid this, increase spark.kryoserializer.buffer.max value.

Following its advice, this was quickly fixed with:

sparkConf.setExecutorEnv("spark.kryoserializer.buffer.max", "20G")

only for me to run into:

16/02/02 14:05:23 WARN TaskSetManager: Lost task 1.0 in stage 4.1 (TID 6149, ip-172-30-0-65.eu-west-1.compute.internal): FetchFailed(BlockManagerId(0, ip-172-30-0-63.eu-west-1.compute.internal, 44256), shuffleId=2, mapId=7, reduceId=1, message=
org.apache.spark.shuffle.FetchFailedException: Too large frame: 3251949143
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
.
.
Caused by: java.lang.IllegalArgumentException: Too large frame: 3251949143
        at org.spark-project.guava.base.Preconditions.checkArgument(Preconditions.java:119)

        at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:134)
.
.

Well now, this seems to be the end of the road, as the code in TransportFrameDecoder is:

Preconditions.checkArgument(frameSize < MAX_FRAME_SIZE, "Too large frame: %s", frameSize);

(see here for more information on Spark limits); MAX_FRAME_SIZE there is Integer.MAX_VALUE, capping a single frame at about 2GB, rather less than our 3.25GB frame. Indeed, the app in the Spark GUI looks like this:

[Spark UI screenshot: stage 3 with 16/16 tasks succeeded; stage 4 failing and the job being retried]

We can see a job that has already failed and is in the process of retrying. The interesting things to note are that 16 out of 16 tasks of stage 3 succeeded - that is the first part of treeAggregate. Our hack using coalesce worked! However, the second part of treeAggregate fails: none of the 4 tasks in stage 4 (corresponding to my 4 boxes trying to aggregate their individual results) passed.

Workaround

Well, there isn't one that I am aware of, at least as far as DIMSUM is concerned. However, since my matrix is very sparse, a brute-force routine I coded up actually ran in about 45 minutes, so maybe I don't need the efficiency DIMSUM promises.
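For what it's worth, the brute-force approach was along these lines (a sketch, not the exact code; the input layout and names are assumptions): for each row, emit the product of every pair of non-zero entries, sum those products per column pair to get the dot products, then divide by the column norms to get cosine similarities.

import org.apache.spark.mllib.linalg.SparseVector
import org.apache.spark.rdd.RDD

// rows: one SparseVector per matrix row (assumed input format).
def bruteForceCosine(rows: RDD[SparseVector]): RDD[((Int, Int), Double)] = {
  // Dot products of all column pairs: for each row, multiply every pair of
  // its non-zero entries. With at most L non-zeros per row this generates
  // O(m * L^2) pairs in total, which is fine when the matrix is very sparse.
  val dotProducts = rows.flatMap { v =>
    val idx  = v.indices
    val vals = v.values
    for {
      i <- idx.indices
      j <- (i + 1) until idx.length
    } yield ((idx(i), idx(j)), vals(i) * vals(j))
  }.reduceByKey(_ + _)

  // L2 norm of each column, for the cosine denominator. (Collecting the
  // norms to the driver is itself chunky with 10⁸ columns; a join would
  // avoid that at the cost of another shuffle.)
  val norms = rows
    .flatMap(v => v.indices.zip(v.values).map { case (col, x) => (col, x * x) })
    .reduceByKey(_ + _)
    .mapValues(math.sqrt)
    .collectAsMap()
  val normsBc = rows.sparkContext.broadcast(norms)

  dotProducts.map { case ((c1, c2), dot) =>
    ((c1, c2), dot / (normsBc.value(c1) * normsBc.value(c2)))
  }
}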

