Saturday, October 22, 2016

State monads


State is generally anathema to functional programmers but necessary to the rest of us. So, how are these forces reconciled? Perhaps unsurprisingly, the answer involves monads.

"We'll say that a stateful computation is a function that takes some state and returns a value along with some new state." (from LYAHFGG). So, if you had a random number generator (RNG), for each call you'd receive not just a random number but the generator with a new state for the next call. Calling the same generator repeatedly will give you the same value which makes testing easier.

Scalaz gives you a State monad out of the box. As an example, let's say we're modelling a stack data structure. The pop would look like this:

import scalaz._
import Scalaz._

    type Stack = List[Int]

    val pop = State[Stack, Int] { // (f: S => (S, A)) where S is Stack and A is Int
      case x::xs =>
        println("Popping...")
        (xs, x)
    }

Note that pop is a val not a def. Like the RNG mentioned earlier, it returns the popped value and the new state. We'll ignore error conditions like popping an empty stack for now.

Also note "the important thing to note is that unlike the general monads we've seen, State specifically wraps functions." (Learning Scalaz)

Let's now define what a push is:

    def push(a: Int) = State[Stack, Unit] {
      case xs =>
        println(s"Pushing $a")
        (a :: xs, ())
    }

Once more, the function returns the new state and a value - only this time the value is just Unit. What value could a push to a stack return that you didn't have in the first place?

Because State is a monad, "the powerful part is the fact that we can monadically chain each operations using for syntax without manually passing around the Stack values". That is, we access the contents of State via map and flatMap.

So, let's say we want to pop the value at the top of the stack then push two values onto it, 3 and 4.

    val popx1Push2: State[Stack, Int] = for {
      a <- pop
      _ <- push(3)
      _ <- push(4)
    } yield a 

which does absolutely nothing. What is to be popped? Onto what do we push 3 and 4? We need to run it and then map (or foreach or whatever) over the result to access the values. "Essentially, the reader monad lets us pretend the value is already there." (from Learning Scalaz)
   
    popx1Push2.run(List(0, 1, 2)).foreach { case (state, popped) =>
      println(s"New state = $state, popped value = $popped") // New state = List(4, 3, 1, 2), popped value = 0
    }

But we don't need Scalaz. We can roll our own state (with code liberally stolen from here).

case class State[S, +A](runS: S => (A, S)) {

  def map[B](f: A => B) =
    State[S, B](s => {
      val (a, s1) = runS(s)
      (f(a), s1)
    })

  def flatMap[B](f: A => State[S, B]) =
    State[S, B](s => {
      val (a, s1) = runS(s)
      f(a).runS(s1)
    })
}

Now it becomes clearer what State actually represents. It is a monad (see map and flatMap) that contains a function that given one state, S, can give you the next plus an accompanying value, A.
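
To check that this hand-rolled State behaves like the Scalaz one, here's the earlier stack example redone against it - just a sketch, and note that runS returns (value, state), the reverse of Scalaz's (state, value):

    type Stack = List[Int]

    val pop: State[Stack, Int] = State { case x :: xs => (x, xs) } // still ignoring the empty-stack case

    def push(a: Int): State[Stack, Unit] = State(xs => ((), a :: xs))

    val popx1Push2: State[Stack, Int] = for {
      a <- pop
      _ <- push(3)
      _ <- push(4)
    } yield a

    val (popped, newState) = popx1Push2.runS(List(0, 1, 2))
    println(s"New state = $newState, popped value = $popped") // New state = List(4, 3, 1, 2), popped value = 0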

So, the basic combinators for manipulating the state look like this:

  def getState[S]: State[S, S] =
    State(s => (s, s))
     
where the function says: given a state, return that state.

  def setState[S](s: S): State[S, Unit] =
    State(_ => ((), s))

where the function says: I don't care which state you give me, the next one is s. Oh, and I don't care about the accompanying value.

  def pureState[S, A](a: A): State[S, A] =
    State(s => (a, s))

where the function says the next state is the last state and the accompanying value is what I was given. Pure in this sense refers to the construction of a state, sometimes called unit, and called point in Scalaz.
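
A quick sanity check of those three combinators (just printlns of runS, nothing more):

    println(getState[Int].runS(7))                // (7,7)   - the state is also the value
    println(setState(42).runS(7))                 // ((),42) - the old state is ignored
    println(pureState[Int, String]("hi").runS(7)) // (hi,7)  - value passed through, state untouched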

Now we can do a stateful computation. It's very simple: it just maps over the state monads to build a unit of work that adds 1 to whatever state it is given.

    val add1: State[Int, Unit] = for {
      n   <- getState
      b   <- setState(n + 1)
    } yield (b)

    println(add1.runS(7)) // ((),8) - the accompanying value is Unit, the new state is 8

This is trivial but you can do more complicated operations like:

  def zipWithIndex[A](as: List[A]): List[(Int, A)] =
    as.foldLeft(
      pureState[Int, List[(Int, A)]](List())
    )((acc, a) => for {
      xs <- acc
      n  <- getState
      _  <- setState(n + 1)
    } yield (n, a) :: xs).runS(0)._1.reverse

Yoiks! What's this doing? Well, reading it de-sugared and pretending we've given it a list of Chars may make it easier:

    type IndexedChars = List[(Int, Char)]

    val accumulated: State[Int, IndexedChars] = as.foldLeft(
      pureState(List[(Int, Char)]())      // "the next state is the last state and the accompanying value is an empty list"
    )((acc: State[Int, IndexedChars], a: Char) => {
      acc.flatMap { xs: IndexedChars =>   // xs is the accompanying value that the accumulator wraps
        getState.flatMap { n: Int =>      // "Given a state, return that state." (n is fed in via runS)
          setState(n + 1).map(ignored =>  // "I don't care which state you give me, the next one is n+1"
            ((n, a) +: xs)                // the accompanying value is now the old plus the (n,a) tuple.
          )
        }
      }
    })

Which is a bit more understandable.
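
And a quick, illustrative run of zipWithIndex to confirm it does what the name suggests:

    println(zipWithIndex(List('a', 'b', 'c'))) // List((0,a), (1,b), (2,c))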

First steps into Scalaz


Scalaz adds some nice features to Scala.

In the Haskell world, "functions are applicative functors. They allow us to operate on the eventual results of functions as if we already had their results" (from LYAHFGG).

(Where the "Functor typeclass, ... is basically for things that can be mapped over", "all applicatives are functors" [1] and applicatives have a function that has a type of f (a -> b) -> f a -> f b.)

So, why can't we map over functions in Scala? Well, now with Scalaz, we can!

import scalaz._
import Scalaz._

    val f: Int => Int = _ * 2
    val g: Int => Int = _ + 10

    val mapped = f.map(g)
    println(mapped(7))        // "24", ie (7 * 2) + 10

which is the same as saying f.andThen(g) and so the opposite of saying f.compose(g).
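
A quick check of that claim:

    println(f.andThen(g)(7))  // "24" - same as mapped(7)
    println(f.compose(g)(7))  // "34", ie (7 + 10) * 2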

We can also flatMap over functions now and as a result can use for comprehensions:

    val addStuff: Int => Int = for {
      a <- f
      b <- g
    } yield (a + b)

    println("addStuff = " + addStuff(7)) // "31"

Why this monadic behaviour is so nice is that "the syntactic sugar of the for-comprehensions abstracts the details nicely enough for the user, who is completely oblivious of the underlying machinery of binding monads" (from here).
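
For the curious, the for-comprehension above de-sugars to roughly this (still relying on Scalaz's map and flatMap over functions):

    val addStuffDesugared: Int => Int = f.flatMap(a => g.map(b => a + b))
    println(addStuffDesugared(7)) // "31"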

We can even raise a plain old Int to the level of monad with:

println(3.point[Id].map(_ + 1)) // prints out "4"

By using point, the "constructor is abstracted out... Scalaz likes the name point instead of pure, and it seems like it’s basically a constructor that takes value A and returns F[A]." (from Learning Scalaz).
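
The same point syntax works for other type constructors too, for example:

    println(3.point[Option]) // Some(3)
    println(3.point[List])   // List(3)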

[1] Functional Programming in Scala (Chiusano and Bjarnason)

Friday, October 21, 2016

A Spark Quick Win


If you're having trouble making Spark actually work, you're not alone. I can sympathize with the author of these gotchas. Something that had been killing me for weeks was how slow a Spark job was - tens of hours. Further investigation showed a lot of GC - something like a third of the run time. Isn't Spark supposed to handle huge volumes of data?

This one tip proved to be a quick-win:

rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

This meant that the data held in memory was serialized. As a result, the memory footprint was 80% less (we use Kryo). In itself, that's cool. But the great thing was that all our data now fitted into the memory of the cluster.
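
For context, here's a minimal sketch of that setup - the app name and input path are purely illustrative:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("QuickWin")                                                 // hypothetical app name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // we use Kryo

val sc  = new SparkContext(conf)
val rdd = sc.textFile("hdfs:///some/big/dataset")                        // hypothetical path

rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) // serialized in memory, spilling to disk only if needed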

Sure, Spark can process more data than physically fits in memory, but if it's constantly spilling to disk and reading back, that's not only slow but it also causes a lot of garbage collection.

With this setting, full garbage collections took only 30s for hours of processing.

The downside is that obviously this requires more CPU. But not much more and a lot less than a JVM that's constantly garbage collecting.

Saturday, October 8, 2016

Maths and the Principle of Least Surprise


Well, it surprised me.

While Java's Integer.MAX_VALUE and Integer.MIN_VALUE really do represent the highest and lowest int values, the same is not true of Doubles. There, MAX_VALUE and MIN_VALUE represent the largest and smallest magnitudes a double can have. That is, both values are positive even though doubles are signed: Double.MIN_VALUE is the smallest positive value, not the most negative one. The nomenclature is confusing but it makes sense (see here). Unlike Integers, Doubles are symmetric, so the smallest number you can represent is -Double.MAX_VALUE.

Just when you get used to that, you find that Scala does it differently: Double.MinValue really is the smallest value a double can hold. That is,

Double.MinValue == -java.lang.Double.MAX_VALUE
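
A quick check in a Scala REPL shows the difference:

println(java.lang.Double.MIN_VALUE)                     // 4.9E-324 - smallest positive magnitude, not the lowest value
println(Double.MinValue)                                // -1.7976931348623157E308 - genuinely the lowest double
println(Double.MinValue == -java.lang.Double.MAX_VALUE) // true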

It's debatable which is the most surprising.

Friday, October 7, 2016

HBase Troubleshooting


Nasty little problem. Connecting to HBase kept timing out after lots of errors that looked like:

client.RpcRetryingCaller: Call exception, tries=17, retries=35, retryTime=208879ms, msg row '' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=HOST,60020,1475135661112, seqNum=0

while logging in via the HBase shell seemed to be OK.

HBase uses Zookeeper to keep a record of its regions. So, first I tried checking that:

> echo ruok | nc ZOOKEEPER_HOST 2181
imok

Running a repair didn't seem to help either.

Time to look at the HBase transaction logs. They kept repeating every second or so something like:

master.splitLogManager: total tasks =3 unassigned = 0 tasks={ /hbase-secure/splitWAL/WALs/HOST,600201474532814834-splitting ... status = in_progess ...

WALs are write ahead logs. "HBase data updates are stored in a place in memory called memstore for fast write. In the event of a region server failure, the contents of the memstore are lost because they have not been saved to disk yet. To prevent data loss in such a scenario, the updates are persisted in a WAL file before they are stored in the memstore".

These WALs appeared to be constantly splitting. So, a quick look at the WALs directory was in order. It looked something like this:

drwr-x-r-x - hbase app-hdfs     0 2016-07-29 /apps/hbase/xxx/WALs/MACHINE_NAME,60020,1474532814834-splitting
drwr-x-r-x - hbase app-hdfs     0 2016-09-29 /apps/hbase/xxx/WALs/MACHINE_NAME,60020,1474532435621
drwr-x-r-x - hbase app-hdfs     0 2016-09-30 /apps/hbase/xxx/WALs/MACHINE_NAME,60020,1474532365837
drwr-x-r-x - hbase app-hdfs     0 2016-07-28 /apps/hbase/xxx/WALs/MACHINE_NAME,60020,1474532463823-splitting
.
.

Splitting is natural but shouldn't take too long (a split file from months ago is definitely a bad sign).

We were lucky that this was a test environment, so we could happily delete these -splitting directories and restart HBase (YMMV - think long and hard before doing that in a production environment...). But the problem went away for us.