Monday, May 26, 2014

Queues and Statistical Quirks


We use Oracle's Coherence Incubator in our project, in particular its Push Replication pattern, to move documents from the primary site to a warm disaster-recovery site 50 miles away. The Push Replication pattern in turn depends on the Messaging Pattern, which keeps in-memory queues of the objects waiting to be published.

The trouble is that the documents are not evenly distributed across these queues. As a result, some queues can be full (close to causing an OutOfMemoryError and thus forcing us to reject client requests) while others sit empty. We looked for a fault in the distribution function, but the imbalance seems to be more a quirk of probability.

The Scala code below demonstrates what I am talking about. Take a given number of buckets and, on each iteration, distribute X documents randomly over them. On the same iteration, consume a fixed number of documents (the batch size) from each bucket; if a bucket holds fewer than the batch size, we just take them all and that's that. We then let this run for an arbitrary number of iterations.
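
Why would we not expect an even spread? A back-of-the-envelope argument (my own reasoning, not anything from the Incubator documentation): each bucket receives a Binomial(X, 1/numBuckets) number of documents per iteration, so if the batch size roughly matches the average arrivals per bucket, each queue's length changes by a roughly zero-mean random amount every iteration. That is a random walk reflected at zero, and such walks wander arbitrarily far over time. Using illustrative figures (not our production numbers):

// Illustrative figures only: 1 000 documents spread over 20 buckets each iteration
val docs    = 1000.0
val buckets = 20.0
val meanPerBucket   = docs / buckets // 50 documents per bucket on average
// standard deviation of the arrivals into one bucket: about 6.9 documents
val stdDevPerBucket = math.sqrt(docs * (1 / buckets) * (1 - 1 / buckets))
// drain exactly 50 per bucket per iteration and the leftovers never settle down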

The simulation code looks like this:

package com.phenry.stats

import scala.util.Random

object PublisherDistributionMain {
    
    def main(args : Array[String]) = {
        // Arguments: number of buckets, iterations to run, new documents added
        // per iteration, and the batch size consumed from each bucket per iteration.
        val numBuckets    = args(0).toInt
        val numIterations = args(1).toInt
        val numNewDocs    = args(2).toInt
        val batchSize     = args(3).toInt
        var buckets       = new Array[Int](numBuckets)
        
        for (i <- 0 until numIterations) { // 'until', not 'to', or we run one iteration too many
          populate(buckets, numNewDocs)
          buckets = depopulate(buckets, batchSize)
        }
        print(buckets)
    }
    
    // Render each bucket as a bar scaled so that the fullest bucket is 50
    // characters wide, then report the mean and standard deviation of the depths.
    def print(buckets : Array[Int]) : Unit = {
      val largest = math.max(max(buckets), 1) // guard against division by zero when all buckets are empty

      buckets.zipWithIndex.foreach { case (count, index) =>
        println("%2d : ".format(index) + "#" * (count * 50 / largest))
      }
      
      val stats = new MeanAndStdDeviation
      println("\nmean = %.3f, std dev = %.3f".format(stats.mean(buckets), stats.stdDev(buckets)))
    }

    def max(buckets : Array[Int]) : Int = {
      buckets.reduce((x, y) => if (x > y) x else y)
    }
    
    // Consume up to batchSize documents from every bucket; a bucket holding
    // fewer than batchSize documents is simply emptied.
    def depopulate(buckets : Array[Int], batchSize : Int) : Array[Int] = {
      buckets.map(x => if (x < batchSize) 0 else (x - batchSize))
    }
    
    // Scatter numNewDocs documents over the buckets, choosing a bucket uniformly
    // at random for each document.
    def populate(buckets : Array[Int], numNewDocs : Int) : Unit = {
      val random = new Random
      for (i <- 0 until numNewDocs) { // 'until', not 'to', or we add one document too many
        val bucket = random.nextInt(buckets.length)
        buckets(bucket) = buckets(bucket) + 1
      }
    }
}

With a simple stats class (MeanAndStdDeviation) looking like this:

package com.phenry.stats

class MeanAndStdDeviation {
  
  // Implicit conversions so that the helpers below accept Int collections
  // as readily as Double ones.
  implicit def integer2Double(x : Integer)          = x.doubleValue()
  implicit def integer2DoubleList(x : List[Int])    = x.map(i => i.doubleValue())
  implicit def integer2DoubleList(x : Array[Int])   = x.map(i => i.doubleValue())
  implicit def double2Integer(x : Double)           = x.intValue
  implicit def double2IntegerList(x : List[Double]) = x.map(d => d.intValue)

  // Mean of the bucket depths (sum(x) resolves via the Array[Int] conversion above)
  val mean = (x : Array[Int]) => sum(x) / x.length
  
  def sum(results : List[Double]) : Double = {
    results.foldRight(0d)((x, y) => x + y)
  }
    
  // Population standard deviation: the square root of the mean squared deviation.
  def stdDev(results : Array[Int]) : Double = {
    val theMean = mean(results)
    var sumOfSquaredDiffs = 0d
    for (aResult <- results) sumOfSquaredDiffs += math.pow(aResult - theMean, 2)
    math.sqrt(sumOfSquaredDiffs / results.length)
  }
  
  def sum(results : Array[Double]) : Double = {
    results.reduce((x, y) => x + y)
  }
  
  // ... (the rest of the class is elided)
}
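
A quick sanity check of the class (note that stdDev computes the population standard deviation, dividing by n rather than n - 1):

val stats = new MeanAndStdDeviation
println(stats.mean(Array(1, 2, 3, 4)))   // prints 2.5
println(stats.stdDev(Array(1, 2, 3, 4))) // prints ~1.118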

(Scala really is nice for writing succinct functions...)
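
For reference, the program takes the bucket count, iteration count, documents per iteration and batch size as its arguments. The run below used 20 buckets and 100 000 iterations; the document and batch figures in this example invocation are illustrative rather than the actual values:

scala com.phenry.stats.PublisherDistributionMain 20 100000 1000 50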

The results after 100 000 iterations look like this:

 0 : ################
 1 : ###############################
 2 : ###############
 3 : ##############
 4 : #################
 5 : ####
 6 : ##########
 7 : ######################
 8 : ##############################
 9 : ##################
10 : ##############
11 : ############
12 : ################
13 : ##############
14 : ##############################
15 : ####
16 : ##################################################
17 : ################
18 : ###################
19 : #############

That is, very lumpy: bucket #15 holds a small fraction of what bucket #16 holds, for instance.

As a result, our system will reject users' requests, complaining that it has run out of space, even though many nodes in the cluster have plenty of memory to spare.

Time to write a new consumer algorithm...
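
One idea (a sketch of the policy only; this is not the Incubator's API, and the names are mine): rather than taking a fixed batch from every queue, size each queue's batch by its share of the total backlog, so that full queues drain faster and the random walk is pulled back towards the mean.

// Sketch: drain each bucket in proportion to its share of the total backlog.
// With totalBatch = numBuckets * batchSize the overall throughput matches the
// fixed-batch scheme above, but a queue's drain rate now tracks its depth.
def depopulateProportionally(buckets : Array[Int], totalBatch : Int) : Array[Int] = {
  val backlog = buckets.sum
  if (backlog == 0) buckets
  else buckets.map { size =>
    val share = math.ceil(totalBatch.toDouble * size / backlog).toInt // this bucket's slice of the batch
    math.max(size - share, 0)
  }
}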
