Tuesday, October 6, 2015

Arrays in Java and Scala

Despite the fact that they both run on the JVM, Java and Scala have very different type systems. Say you want to create an array of type T[] in Java where T is any type. It is possible but messy. You might call java.lang.reflect.Array.newInstance but its signature is:

    public static Object newInstance(Class<?> componentType, int length)

that is, it returns an Object so you'd have to cast it.

If you want to get the component type of a particular array, you'd call Class.getComponentType() but that can be called on any class, not necessarily an array. If it's called on a type that isn't an array, it returns null. What's more, it returns Class<?> not Class<T>.
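To make the cast and the null concrete, here is a minimal sketch that calls the same Java reflection API from Scala. The names ReflectiveArrayDemo, newStringArray and componentTypeOf are made up for illustration.

```scala
import java.lang.reflect.{Array => JArray}

// Hypothetical helper object, only for illustration
object ReflectiveArrayDemo {
  // newInstance returns Object (AnyRef in Scala), so a cast is unavoidable
  def newStringArray(length: Int): Array[String] =
    JArray.newInstance(classOf[String], length).asInstanceOf[Array[String]]

  // getComponentType returns null for non-array classes;
  // Option(...) turns that null into None
  def componentTypeOf(clazz: Class[_]): Option[Class[_]] =
    Option(clazz.getComponentType)
}
```

Calling componentTypeOf(classOf[String]) gives None, whereas componentTypeOf(classOf[Array[Int]]) gives the primitive int class wrapped in a Some.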

Finally, Java's arrays are covariant (unlike its generics) so you run the risk of getting an ArrayStoreException at runtime if you're not careful. Put another way, Java's arrays don't adhere to the Get and Put Principle that its collections do ("The Get and Put Principle: use an extends wildcard when you only get values out of a structure, use a super wildcard when you only put values into a structure, and don't use a wildcard when both get and put." [1]).
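The ArrayStoreException can be provoked from Scala too, although only by cheating with a cast, since Scala will not treat an Array[String] as an Array[AnyRef] on its own. This is a small sketch; CovarianceDemo and storeFails are hypothetical names.

```scala
// Hypothetical demo object, only for illustration
object CovarianceDemo {
  // Returns true if storing an Integer into a String[] viewed as Object[] fails at runtime
  def storeFails(): Boolean = {
    val strings: Array[String] = Array("a", "b")
    // Scala's Array is invariant, so an explicit cast is needed to mimic Java's implicit upcast
    val objects: Array[AnyRef] = strings.asInstanceOf[Array[AnyRef]]
    try {
      objects(0) = Integer.valueOf(1) // the JVM checks the real element type on the store
      false
    } catch {
      case _: ArrayStoreException => true
    }
  }
}
```

The cast itself succeeds because the JVM really does regard String[] as a subtype of Object[]; it is the store that blows up.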

Yuck. Scala cleans this up using an invariant Array[T] type constructor. If you JAD the code, you can see how Scala does it. Say we want a multi-dimensional array of Ints. We might call:

    val matrix = Array.ofDim[Int](7, 9)

If we then decompile the .class file of scala.Array into Java, we can see what Scala is doing. It looks like this:

    public Object[] ofDim(int n1, int n2, ClassTag evidence$4) { ...

where our type (Int) has cheekily been added as an extra parameter. Note that the method returns an array of Object. As it happens, each of these Objects is an array of the type we want, in this case Int, ultimately created using the same Java API method, newInstance, that we saw above.
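You can get the same extra parameter in your own code with a ClassTag context bound. The sketch below is my own illustration (OfDimDemo, matrixOf and rowClassName are hypothetical names) and simply delegates to Array.ofDim.

```scala
import scala.reflect.ClassTag

// Hypothetical demo object, only for illustration
object OfDimDemo {
  // The context bound [T: ClassTag] is sugar for an extra implicit parameter,
  // the same kind of parameter that shows up as evidence$4 in the decompiled signature
  def matrixOf[T: ClassTag](rows: Int, cols: Int): Array[Array[T]] =
    Array.ofDim[T](rows, cols)

  // The runtime class of each row, e.g. "int[]" for a matrix of Ints
  def rowClassName(matrix: Array[Array[Int]]): String =
    matrix.getClass.getComponentType.getSimpleName
}
```

For a matrix built with matrixOf[Int](7, 9), rowClassName reports int[], so each row really is a primitive int array rather than an array of boxed Integers.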

[1] Java Generics and Collections - Naftalin and Wadler

Wednesday, September 30, 2015

Future notes (part II)

There are some really nice aspects to Scala's Futures that make them very different to Java's. Because they're monadic, you can do all sorts of interesting things.

Differences between Java and Scala Futures

In Java, you merely block while waiting for the result by calling get(). That's it.

However, you can cancel Java futures. Scala "Futures come without a built-in support for cancellation" [1] although there are ways to roll your own.

So what can you do that's so special?

Because both Lists and Futures are monadic, you can "blend" them. For instance, say you have a List of Futures, you can convert them into a Future of Lists. This allows you to react to just a single Future with all the data rather than worrying about many Futures each with some data.

This is built in to the Scala API via Future.sequence. It basically folds over the monadic structure (eg a List) that's passed to it.

A somewhat simplified version of this code (I use the more specific List rather than sequence's more general TraversableOnce and avoid the magic of CanBuildFrom) looks like this:

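  def mySequence[T](futures: List[Future[T]])(implicit ec: ExecutionContext): Future[List[T]] = {
    // Appends the result of the next future to the list accumulated so far
    def foldFn(agg: Future[List[T]], ft: Future[T]): Future[List[T]] = {
      agg flatMap { list =>
        ft map {
          element => list :+ element
        }
      }
    }

    // The fold starts from a future holding an empty list
    val seed: Future[List[T]] = Future {
      List.empty[T]
    }

    futures.foldLeft (seed) (foldFn)
  }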

Or, we could use a for-comprehension to give it some syntactic sugar:

    def foldFn(agg: Future[List[T]], ft: Future[T]): Future[List[T]] = for {
      list <- agg
      t <- ft
    } yield (list :+ t)

They're equivalent. Whether it's this simplified method or the more complicated one in the Scala API, you'd call it with something like:

    val listOfFutures = List(createSimpleFuture("this"), createSimpleFuture("is"), createSimpleFuture("a"), createSimpleFuture("test"))
    val futureOfLists = mySequence(listOfFutures)
    val lists         = Await.result(futureOfLists, Duration(5, TimeUnit.SECONDS))
    println("lists from future of lists from list of futures: " + lists)

  def createSimpleFuture(eventualResult: String)(implicit ec: ExecutionContext): Future[String] = {
    Future {
      eventualResult
    }
  }

which prints out:

lists from future of lists from list of futures: List(this, is, a, test)
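For comparison, this is roughly what the same thing looks like with the library's own Future.sequence. It is a minimal sketch: SequenceDemo and combined are names I've made up, and the upper-casing is just there to give each Future something to do.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical demo object, only for illustration
object SequenceDemo {
  def combined(words: List[String]): List[String] = {
    // One Future per word
    val listOfFutures: List[Future[String]] = words.map(w => Future(w.toUpperCase))
    // Turn the List of Futures into a single Future of a List
    val futureOfList: Future[List[String]] = Future.sequence(listOfFutures)
    // Blocking here only so the result can be inspected
    Await.result(futureOfList, 5.seconds)
  }
}
```

The order of the results follows the order of the input list, not the order in which the futures happen to complete.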

More funky stuff in part III.

[1] Learning Concurrent Programming in Scala - Aleksandar Prokopec

Friday, September 25, 2015

Scala's Return

I was reading this StackOverflow answer about the distinction between methods and functions. It had the usual explanations, for example a method "can be easily converted into [a function]" but "one can't convert the other way around"; methods "can receive type parameters"; "a function is an object" etc.

But as pointed out here, return behaves differently in a function compared to a method and can be used to short-circuit a fold etc.

For example:

  def caller(): Any = {
    println("caller started")
    val g = () => { return true } // note: return type must be compatible with this method
    called(g)
    println("caller finished")    // this is never executed!
  }

  def called(innocuousFn: () => Boolean): Unit = {
    println("called: about to call innocuousFn...")
    innocuousFn()                 // the return inside g unwinds all the way out of caller
    println("called: finished")   // this is never executed!
  }

prints out:

caller started
called: about to call innocuousFn...

That is, when the flow of control hits the function's return statement, the execution stack is popped as if it were in the enclosing method (it's illegal for a function outside a method to have a return statement).
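Here is a small sketch of the fold short-circuit mentioned earlier, written in the Scala 2 style used in this post. ReturnInFoldDemo and sumUntilNegative are hypothetical names of my own.

```scala
// Hypothetical demo object, only for illustration
object ReturnInFoldDemo {
  // Sums the list but stops at the first negative element.
  // The explicit result type is required because the method contains a return.
  def sumUntilNegative(xs: List[Int]): Int =
    xs.foldLeft(0) { (acc, x) =>
      if (x < 0) return acc // returns from sumUntilNegative, abandoning the rest of the fold
      acc + x
    }
}
```

So sumUntilNegative(List(1, 2, -1, 10)) gives 3: the 10 is never visited because the return pops out of the enclosing method, not just out of the function passed to foldLeft.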

Worse, if one method passes a returning function back to the method that calls it, you get a Throwable. After all, where does that function return from?

  def executeReturningFnFromOtherMethod(): Unit = {
    println("about to call calledReturnsFnThatReturns")
    val fn: () => Boolean = returnsFnThatReturns()
    println("about to call fn")
    fn()                // Exception in thread "main" scala.runtime.NonLocalReturnControl
    println("finished") // never executes
  }

  def returnsFnThatReturns(): () => Boolean = {
    val fn: () => Boolean = () => {
      return makeTrue   // tries to return from returnsFnThatReturns, which has already finished
    }
    fn
  }

  def makeTrue(): Boolean = {
    return true
  }

Prints out:

about to call calledReturnsFnThatReturns
about to call fn
Exception in thread "main" scala.runtime.NonLocalReturnControl

Methods that may or may not compile in Scala

If you want to sum some integers in a List, you just call the sum method. So far, so obvious. But if the list does not hold elements that are summable, how does Scala give you a method that you may or may not call?

The sum method lives in TraversableOnce and the signature looks like this:

  def sum[B >: A](implicit num: Numeric[B]): B = ...

Here [B >: A] says B must be a supertype of A, the type of elements in the list. The implicit says there must exist in the ether something that provides the functionality in trait Numeric for type B (the functionality for plus, minus etc).

Now, for Int, Double etc you get these for free in the Numeric object that's standard to Scala where you see, for example:

  trait IntIsIntegral extends Integral[Int] { ...

which is pulled into the implicit ether with:

  implicit object IntIsIntegral extends IntIsIntegral with Ordering.IntOrdering

in the same file. Note that, unlike Java's, Scala's numerics do not extend a common superclass like java.lang.Number.

So, we could happily create our own arithmetic with something like:

  class MyInteger

  trait MyIntegerOrdering extends Ordering[MyInteger] {
    def compare(x: MyInteger, y: MyInteger): Int = ???
  }

  trait MyIntegral extends Numeric[MyInteger] { // or Integral that extends Numeric[T]
    def plus(x: MyInteger, y: MyInteger): MyInteger = new MyInteger
    // ... the other abstract methods of Numeric (minus, times, negate, fromInt, toInt etc) go here
  }

  implicit object MyIntegerIsIntegral extends MyIntegral with MyIntegerOrdering

It's not a terribly useful arithmetic but it would allow us to have a list of MyIntegers and call sum on it.
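For something that actually compiles and runs, here is a fuller sketch with every abstract method of Numeric filled in. The Money class and all the names here are my own invention, and parseString is only required from Scala 2.13 onwards (it is harmless on earlier versions).

```scala
// Hypothetical demo object, only for illustration
object CustomNumericDemo {
  final case class Money(pence: Long)

  // Provides the "in the ether" evidence that Money can be added, compared etc
  implicit object MoneyIsNumeric extends Numeric[Money] {
    def plus(x: Money, y: Money): Money   = Money(x.pence + y.pence)
    def minus(x: Money, y: Money): Money  = Money(x.pence - y.pence)
    def times(x: Money, y: Money): Money  = Money(x.pence * y.pence)
    def negate(x: Money): Money           = Money(-x.pence)
    def fromInt(x: Int): Money            = Money(x.toLong)
    def parseString(str: String): Option[Money] =
      scala.util.Try(Money(str.toLong)).toOption
    def toInt(x: Money): Int              = x.pence.toInt
    def toLong(x: Money): Long            = x.pence
    def toFloat(x: Money): Float          = x.pence.toFloat
    def toDouble(x: Money): Double        = x.pence.toDouble
    def compare(x: Money, y: Money): Int  = java.lang.Long.compare(x.pence, y.pence)
  }

  // sum compiles here only because MoneyIsNumeric is in implicit scope;
  // List("a", "b").sum would be rejected by the compiler
  def total(xs: List[Money]): Money = xs.sum
}
```

sum starts from the Numeric's zero (which defaults to fromInt(0)) and folds with plus, so an empty list gives Money(0).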

Thursday, September 10, 2015

Notes on the Future (part 1)

Scala's Futures are very different to Java's. Where Java's Futures encourage blocking by their API, Scala's encourage asynchronous coding... apart from one gotcha that follows shortly.

Futures are Monads so they have all that Monad goodness and we can use them in for comprehensions. Unfortunately, this very idiom can introduce a bug. Take this code:

for {
  i <- Future(5)
  j <- Future(11)
} yield j

It creates two futures that return the values 5 and 11, possibly on different threads, and returns a Future that contains 11 just like a good Monad should.

"Here, the beauty of the for comprehension sugar can work against us. The for comprehension hides all of that ugly nesting from us, making it look like these are on the same level, but nesting is indeed happening here. What might look concurrent isn't." [1]


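What happens here is that the first Future is executed and only then is the second one even created. The solution is simple: create both Futures before the for comprehension so they are already running. The code should look like this: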

val iFuture = Future(5)
val jFuture = Future(11)
for {
  i <- iFuture
  j <- jFuture
} yield j

For just returning a number, you probably won't notice anything amiss. But for something more ambitious, you may very well notice that the first cut of this code is sub-optimal.
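To see where the nesting comes from, it helps to write out by hand roughly what the compiler turns the first for comprehension into. This is a sketch (ForDesugarDemo and its members are hypothetical names, and I yield i + j rather than just j so both values are used).

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical demo object, only for illustration
object ForDesugarDemo {
  def sugared: Future[Int] =
    for {
      i <- Future(5)
      j <- Future(11)
    } yield i + j

  // Roughly what the compiler generates: Future(11) sits inside the function
  // passed to flatMap, so it is not created until Future(5) has completed
  def desugared: Future[Int] =
    Future(5).flatMap { i =>
      Future(11).map { j => i + j }
    }

  // Blocking helper so the results can be inspected
  def await(f: Future[Int]): Int = Await.result(f, 5.seconds)
}
```

Both versions give the same answer; the point is only that the second Future's constructor call lives inside the callback of the first.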

"Only men's minds could have unmapped into abstraction..."

In Scala, we can get our dirty little hands on the contents of a Future by mapping like:

    import scala.concurrent.ExecutionContext

    implicit val xc: ExecutionContext = ExecutionContext.global // needed for next map:

    jFuture map { value =>
      println("j = " + value)
    }

but there are other ways, too. Being monads, we can flatMap and so if we slightly modify the code in [1]:

  import java.util.concurrent.TimeUnit
  import scala.concurrent.{Await, ExecutionContext, Future}
  import scala.concurrent.duration.Duration

  implicit val xc: ExecutionContext = ExecutionContext.global

  val timeToWait = Duration(5, TimeUnit.SECONDS)

  // Both futures start running as soon as they are created
  val future1 = Future[Int] {
    (1 to 3).foldLeft(0) { (a, i) =>
      log("future1 "  + i)
      a + i
    }
  }

  val future2 = Future[String] {
    ('A' to 'C').foldLeft("") { (a, c) =>
      log("future2 " + c)
      a + c
    }
  }

  def spliceFutures(): Unit = {
    val awaitable = future1 flatMap { (numsum: Int) =>
      log("in future1.flatMap") // ForkJoinPool-1-worker-1: in future1.flatMap

      future2 map { (str: String) =>
        log("in future2.map") // ForkJoinPool-1-worker-5: in future2.map
        (numsum, str)
      }
    }

    val awaited = Await.result(awaitable, timeToWait)
    println(awaited) // (6,ABC)
  }

  def log(msg: String): Unit = {
    println(Thread.currentThread().getName() + ": " + msg)
  }

"The most powerful way to use these combinators is in combining the results from futures that are already executing. This means that we can still have a lot of independent futures running, and have another piece of code combine the eventual results from those futures into another "final" future that operates on those results."[1]

You'll notice from the log statements that the code in the map and the flatMap runs in different threads.

But wait, there's more. You can add callbacks with onSuccess, onFailure and onComplete, all of which have a return type of Unit (so you know that the functions passed to them must have side effects), and there is no guarantee that they'll run on the same thread that ran the body of the Future.

  val onCompleteFn: Try[Boolean] => Unit = {
    case Success(x)   => println(" success: " + x)
    case Failure(err) => println(" failure: " + err + ", " + err.getCause)
  }

  aFuture onComplete onCompleteFn

In fact, Future.onComplete is the daddy. The other combinator methods (map, flatMap, recover, transform etc) all call it. We'll come back to these other combinators in another post.

Problems in your Future

If an Exception is thrown in your Future, then it will be captured in the failure case in onCompleteFn above. But if an Error is thrown, you'll get a Boxed Error.

Another surprise comes from filter. For Lists, Trys and Options, things are quite straightforward:

    val myList = List(1,2,3)
    val filteredList = myList.filter(_ > 10)
    println(filteredList)   // List()

    val myOption = Option(1)
    val filteredOption = myOption.filter(_ > 10)
    println(filteredOption) // None

    val myTry = Try(1)
    val filteredTry = myTry.filter(_ > 10)
    println(filteredTry)    // Failure(java.util.NoSuchElementException: Predicate does not hold for 1)

That is, a filter whose predicate leads to nothing is represented by the "negative" subclass of the monads. However, there is no such class for an "empty" Future.

    import scala.concurrent.ExecutionContext.Implicits._
    import scala.concurrent.duration._

    val myFuture       = Future { 1 }
    val filteredFuture = myFuture filter {_ > 10}
    filteredFuture onComplete {
      case Success(x) => println(s"success: $x")
      case Failure(x) => println(s"failure: $x")
    } // failure: java.util.NoSuchElementException: Future.filter predicate is not satisfied

    Await.result(filteredFuture, 1.second) // Exception in thread "main" java.util.NoSuchElementException: Future.filter predicate is not satisfied

That is, the Failure case is the path taken by onComplete. If we were so foolish as to block using Await, then an Exception is thrown.
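One way to live with this is to turn the failed filter back into a value with recover. The sketch below is my own (FilterDemo and filteredOrDefault are hypothetical names) and only blocks so that the result can be inspected.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical demo object, only for illustration
object FilterDemo {
  def filteredOrDefault(n: Int, default: Int): Int = {
    val filtered: Future[Int] =
      Future(n)
        .filter(_ > 10) // fails with NoSuchElementException if the predicate does not hold
        .recover { case _: NoSuchElementException => default }
    Await.result(filtered, 5.seconds)
  }
}
```

With this, filteredOrDefault(1, -1) yields the default rather than throwing, while filteredOrDefault(11, -1) passes the filter untouched.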

[1] Akka Concurrency, Derek Wyatt.

Thursday, August 20, 2015

Spark Shuffle

... is something you should avoid. This is where data is moved over the network between nodes, which can be very expensive on a large data set. If the data is evenly distributed but a query asks for it to be grouped according to an arbitrary criterion, much movement will ensue. Therefore, groupByKey should be avoided if possible as the ScalaDocs indicate this can be an expensive operation.

However, reduceByKey is much more efficient as it will "perform the merging locally on each mapper before sending results to a reducer" resulting in a much smaller transfer of data between nodes.

So, when we were writing a risk engine with Spark holding all our user-defined Record objects, our code would look something like:

def totalByKey(data: RDD[Record]): RDD[(String, Double)] = {
  data.map(record => record.key -> record.value).reduceByKey(_ + _)
}

which would first turn our Records into key/value pairs then add up all the values for each key without transferring large amounts of data over the network. It avoids this by adding all the values for Records on each node before adding all these subtotals. Only the addition of subtotals requires a network call.
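The idea of adding up locally before merging can be sketched with plain Scala collections. To be clear, this is not Spark code: each inner List merely stands in for the records held on one node, and MapSideCombineDemo and its methods are hypothetical names.

```scala
// Hypothetical simulation using plain collections, not Spark
object MapSideCombineDemo {
  type Partition = List[(String, Double)]

  // Step 1: each "node" adds up its own values per key (no network needed)
  def combineLocally(partition: Partition): Map[String, Double] =
    partition.foldLeft(Map.empty[String, Double]) { case (acc, (k, v)) =>
      acc.updated(k, acc.getOrElse(k, 0.0) + v)
    }

  // Step 2: only the per-node subtotals are brought together and added up
  def mergeSubtotals(subtotals: List[Map[String, Double]]): Map[String, Double] =
    combineLocally(subtotals.flatten)

  def totalByKey(partitions: List[Partition]): Map[String, Double] =
    mergeSubtotals(partitions.map(combineLocally))
}
```

However many records a node holds for a key, it contributes at most one subtotal per key to the second step, which is why so much less data has to move.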

Interestingly, Spark can then do an outer join like this:

val subtotalForDate1 = totalByKey(dataForDate1)
val subtotalForDate2 = totalByKey(dataForDate2)
val outerJoin        = subtotalForDate1.fullOuterJoin(subtotalForDate2)

which returns an RDD of tuples, each pairing the key with the left and the right value (both wrapped in an Option, since a key may be missing from either side). Using this join, we can compare our risk profile from one date to the next.
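The shape of a full outer join's result is easier to see on ordinary Maps. Again, this is a plain-collections sketch of the semantics rather than Spark itself, and OuterJoinDemo is a hypothetical name.

```scala
// Hypothetical simulation using plain collections, not Spark
object OuterJoinDemo {
  // Every key from either side appears once; a side that lacks the key contributes None
  def fullOuterJoin[K, V, W](left: Map[K, V], right: Map[K, W]): Map[K, (Option[V], Option[W])] =
    (left.keySet ++ right.keySet).map { k =>
      (k, (left.get(k), right.get(k)))
    }.toMap
}
```

A key that only appears on the first date comes out as (Some(value), None), which is exactly what you want when looking for positions that have appeared or disappeared between two dates.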

Note that neither reduceByKey nor fullOuterJoin actually lives in the RDD class; both are in the PairRDDFunctions class. They appear to be part of the RDD class by the magic of implicits when the RDD in question contains pairs (that is, two-element tuples).
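The same trick can be played on an ordinary List. This is only a sketch of the enrichment pattern, not Spark's actual code; PairOpsDemo, PairListOps and totals are names I've made up.

```scala
// Hypothetical demo of the enrichment pattern, not Spark's implementation
object PairOpsDemo {
  // Adds reduceByKey to any List whose elements happen to be pairs
  implicit class PairListOps[K, V](private val pairs: List[(K, V)]) {
    def reduceByKey(f: (V, V) => V): Map[K, V] =
      pairs.foldLeft(Map.empty[K, V]) { case (acc, (k, v)) =>
        // Combine with the running value for this key if there is one
        acc.updated(k, acc.get(k).map(f(_, v)).getOrElse(v))
      }
  }

  // List has no reduceByKey of its own; the compiler wraps it in PairListOps for us
  def totals(records: List[(String, Double)]): Map[String, Double] =
    records.reduceByKey(_ + _)
}
```

A List[Int] gets no such method because the implicit class only applies when the element type is a pair, just as an RDD only gains reduceByKey when it contains pairs.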

Further Reading

Nice article on memory usage.

Tuesday, August 18, 2015

Try Monads

Martin Odersky in the Reactive Programming course mentions that Try types are not true monads as they violate the Left Unit rule. But if you try (no pun intended) it, Try does indeed appear to obey the rules for monads.

Just to recap, here are the monad rules expressed in Scala:

  def testMonadicPropertiesOfTry[T, U](f: T => Try[U], g: U => Try[U], m: Try[T], x: T, unit: T => Try[T]): Boolean = {

    // Associativity: (m flatMap f) flatMap g == m flatMap (x => f(x) flatMap g)
    def associativity: Boolean = {
      val associativityLhs = (m flatMap f) flatMap g
      val associativityRhs = m flatMap (x => f(x) flatMap g)
      assertEqual(associativityLhs, associativityRhs)
    }
    val associativityResult = Try(associativity)

    // Left unit: unit(x) flatMap f == f(x)
    def leftUnit: Boolean = {
      val leftUnitLhs = unit(x) flatMap f
      val leftUnitRhs = f(x)
      assertEqual(leftUnitLhs, leftUnitRhs)
    }
    val leftUnitResult = Try(leftUnit)

    // Right unit: m flatMap unit == m
    def rightUnit: Boolean = {
      val rightUnitLhs = m flatMap unit
      assertEqual(rightUnitLhs, m)
    }
    val rightUnitResult = Try(rightUnit)

    // Success(true) means the law held; Success(false) or a Failure means it did not
    (associativityResult, leftUnitResult, rightUnitResult) match {
      case (Success(true), Success(true), Success(true)) => true
      case _ => false
    }
  }

Where my assertEqual method looks like:

  def assertEqual[T](try1: Try[T], try2: Try[T]): Boolean = {
    try1 match {
      case Success(v1) => try2 match {
        case Success(v2) => v1 == v2
        case _ => false
      }
      case Failure(x1) => try2 match {
        case Failure(x2) => x1.toString == x2.toString
        case _ => false
      }
    }
  }

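That is, it will compare Failures by looking at the output of their toString methods (the exception's class name plus its message). This is because Java exceptions don't override equals(), so two exceptions are only equal if they are the very same instance.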

Now, if we run the code below (borrowed liberally from here) where we're deliberately trying to cause a java.lang.NumberFormatException as the code attempts to convert our 'a' into a numeric:

    def factory[T](x: => T): Try[T] = Try(x)

    def unit[T](x: T): Try[T]   = factory(x)
    def f(x: String): Try[Int]  = factory(x.toInt)
    def g(x: Int): Try[Int]     = factory(x + 1)
    val x                       = "a"
    val m                       = factory(x)

    val isMonadic = testMonadicPropertiesOfTry(f, g, m, x, unit[String])
    println("is monadic? " + isMonadic)

the output says it's true, Try[T] obeys the rules for a monad. What gives?

Mauricio Linhares says: "there is some debate as to if Try[U] is a full monad or not. The problem is that if you think unit(x) is Success(x), then exceptions would be raised when you try to execute the left unit law since flatMap will correctly wrap an exception but the f(x) might not be able to do it. Still, if you assume that the correct unit is Try.apply then this would not be an issue."

So, let's take the first line of the last code snippet and make it thus:

    def factory[T](x: => T): Try[T] = Success(x)

whereupon we are told that Success is not a monad at all. Further investigation reveals that in the leftUnit method:

      val leftUnitLhs = unit(x) flatMap f

works fine but:

      val leftUnitRhs = f(x) 

blows up. The left hand side does not equal the right.

The reason for this is that Success.flatMap catches any NonFatal exceptions just like the apply method of the Try companion object. But the constructor of Success does not. And it's this asymmetry that means Try acts like a monad and Success does not.
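Stripped of the test harness, the asymmetry boils down to the few lines below. This is a minimal sketch in which Success is used as unit; LeftUnitDemo, lhs and rhsThrows are hypothetical names.

```scala
import scala.util.{Success, Try}

// Hypothetical demo object, only for illustration
object LeftUnitDemo {
  // Success's constructor does not catch anything, so toInt's exception escapes
  def f(s: String): Try[Int] = Success(s.toInt)

  // unit(x) flatMap f: Success.flatMap catches the NumberFormatException for us
  def lhs(x: String): Try[Int] = Success(x).flatMap(f)

  // f(x) on its own: the exception escapes, so we catch it just to observe it
  def rhsThrows(x: String): Boolean =
    try { f(x); false } catch { case _: NumberFormatException => true }
}
```

For the input "a", lhs gives back a Failure whereas f("a") never returns a Try at all, so the two sides of the left unit law cannot be equal.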

Further reading

An interesting debate about monads and exceptions here.