Monday, July 20, 2015

Shapeless


There's a very interesting Scala library called Shapeless that lets you do some weird things with types. One such "what magic is this?!" moment is asserting that a checksum derived from a list of numbers has a certain value at compile time rather than at run time.

This impressed me no end, even if I can't think of a practical use for it. I'm still trying to get my head around Travis Brown's excellent tutorial since he is so much more clever than me. But here are some notes I made along the way.

First, we borrow a bit from the Peano axioms, where every natural number other than 0 is the successor of another natural number (0 itself is not the successor of anything). We model this much like Shapeless does, with something like:

  trait NaturalNumber

  trait NaturalNumber0 extends NaturalNumber

  trait NaturalNumberSucceeds[T <: NaturalNumber] extends NaturalNumber

Next we'll have a type representing each and every number. I'll only bother with the natural numbers 0 to 4 to make things simple:

    type Type0 = NaturalNumber0
    type Type1 = NaturalNumberSucceeds[Type0]
    type Type2 = NaturalNumberSucceeds[Type1]
    type Type3 = NaturalNumberSucceeds[Type2]
    type Type4 = NaturalNumberSucceeds[Type3]

These are just shorthands. Written out in full, Type4 is:

NaturalNumberSucceeds[NaturalNumberSucceeds[NaturalNumberSucceeds[NaturalNumberSucceeds[NaturalNumber0]]]]

But that's far too verbose.
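
If you want to convince yourself that the alias really is just shorthand, you can ask the compiler directly (a one-liner of my own using Predef's =:= evidence, which only compiles when the two types are identical):

    implicitly[Type4 =:= NaturalNumberSucceeds[NaturalNumberSucceeds[NaturalNumberSucceeds[NaturalNumberSucceeds[NaturalNumber0]]]]]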

So much for the types, let's have some instances:

    implicit val _0 = new Type0 {}
    implicit val _1 = new Type1 {}
    implicit val _2 = new Type2 {}
    implicit val _3 = new Type3 {}
    implicit val _4 = new Type4 {}

Now comes our first function that makes assertions at compile time. It asserts that T is the immediate successor to U and looks like this:

  def immediateSuccessor[T <: NaturalNumberSucceeds[U], U <: NaturalNumber]: Unit = {}

And we can see it immediately working:

    immediateSuccessor[Type3, Type2] // compiles
//  immediateSuccessor[Type3, Type1] // doesn't compile because Type3 is not the immediate successor to Type1

A more useful example tells us what the successor is without us having to assert it:

    implicit def add1[A <: NaturalNumberSucceeds[_]](implicit b: NaturalNumberSucceeds[A]): NaturalNumberSucceeds[A] = b
    val result3add1: Type4 = add1[Type3]

Well, we assert insofar as we declare the type of result3add1, but we didn't have to.

Subtraction proved a little more difficult for me to understand. Playing around with some Shapeless code I got this:

    trait NaturalPair[A <: NaturalNumber, B <: NaturalNumber]

    implicit def sub[A <: NaturalNumber, B <: NaturalNumber](implicit e: NaturalPair[NaturalNumberSucceeds[A], NaturalNumberSucceeds[B]])
      = new NaturalPair[A, B] {}

So, A is the number we want to start with and then we reduce it by B. This implicit function will then recurse until one or the other reaches 0.

    implicit val analyseThis = new NaturalPair[Type3, Type1] {}
    val result3sub1          = implicitly[NaturalPair[Type2, Type0]]
//  val result3sub1Wrong     = implicitly[NaturalPair[Type1, Type0]] // doesn't compile because 3 - 1 != 1

Here we send into the implicit ether a request to take a 3 and subtract a 1. This is picked up by our implicit sub function that decrements both 3 and 1 until one or both reach 0. We then assert that a (2,0) pair is out there in the ether and indeed it is. Our code compiles.

The example Brown uses is much more complicated than this but I thought this would be a more accessible introduction.

Thursday, July 9, 2015

Setting up a little Spark/HDFS cluster


Setting up Spark is not too hard but there are a few environment issues you might need to know about. For this, I'm assuming you're using some Linux distro.

The environment

First, set up SSH keys on your boxes so you don't have to type your password all the time. This is a good read. I also use rdist to keep everything in sync. Just be careful about copying the data directory over...

A simple Spark cluster

Once you've installed the various binaries, the documentation covers most of what you need to know. I have two computers, nuc and phillsdell. Somewhat arbitrarily, on nuc, I run the master with:

export SPARK_MASTER_IP=[nuc's IP address]
export SPARK_LOCAL_IP=[nuc's IP address]
$SPARK_INSTALLATION/sbin/start-master.sh

and a slave with

$SPARK_INSTALLATION/sbin/start-slave.sh spark://[nuc's IP address]:7077

On phillsdell, I run:

$SPARK_INSTALLATION/sbin/start-slave.sh spark://[nuc's IP address]:7077

for a slave and:

$SPARK_INSTALLATION/bin/spark-shell --master spark://[nuc's IP address]:7077

for the Spark shell.

If you see lots of errors like this:

15/07/06 08:40:42 INFO Worker: Connecting to master akka.tcp://sparkMaster@nuc:7077/user/Master...
15/07/06 08:40:53 INFO Worker: Retrying connection to master (attempt # 1)

you'll need to configure your firewall to accept all the node chatter. If you're using Ubuntu, this is a good resource.

Anyway, to check everything is OK, let's run:

scala> val myRdd = sc.parallelize(1 to 1000000)
myRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:21

scala> val myRddWKeys = myRdd.map(x => (x % 1000, x))
myRddWKeys: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[5] at map at <console>:23

scala> val myRddReduced = myRddWKeys reduceByKey (_ + _)
myRddReduced: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[6] at reduceByKey at <console>:25

scala> myRddReduced foreach (key => println(s"key = ${key._1} value = ${key._2}") )

15/07/06 10:19:43 INFO SparkContext: Starting job: foreach at <console>:28
.
.

This creates an RDD with a million numbers, divides them into one thousand groups and reduces the numbers in each group by adding them all up. Finally, for each group, we print out the reduced number. The results can be seen at http://phillsdell:8081 and http://nuc:8081 where there are 500 rows printed to stdout on each slave.

HDFS

Now, let's add Hadoop to the mix by starting it on our boxes.

If you see something in the namenode log like:

2015-07-03 12:17:31,300 ERROR org.apache.hadoop.hdfs.server.namenode.NameNode: Failed to start namenode.
org.apache.hadoop.hdfs.server.common.InconsistentFSStateException: Directory /tmp/hadoop-henryp/dfs/name is in an inconsistent state: storage directory does not exist or is not accessible.

then don't forget to run:

hadoop namenode -format

(more information here). I also needed to add this:

  <property>
    <name>fs.default.name</name>
    <value>hdfs://[phillsdell's IP address]</value>
  </property>
  <property>
    <name>dfs.data.dir</name>
    <value>file:///home/henryp/Tools/Cluster/Hadoop/tmp/dfs/name/data</value>
    <final>true</final>
  </property>
  <property>
    <name>dfs.name.dir</name>
    <value>file:///home/henryp/Tools/Cluster/Hadoop/tmp/dfs/name</value>
    <final>true</final>
  </property>

to my ./etc/hadoop/core-site.xml file.

Now, I can go to:

http://phillsdell:50070/dfshealth.html#tab-datanode

and see both data nodes in my cluster :)

Pulling it all together

I downloaded historical data for the FTSE 100 from here. Using this helpful list of commands, I executed:

hadoop-2.7.0/sbin$ ../bin/hadoop fs -mkdir /ftse100
hadoop-2.7.0/sbin$ ../bin/hadoop fs -put ~/Downloads/table.csv /ftse100

and noticed in the web admin page that the file had been copied onto HDFS. Now, in the Spark shell:

scala> val csvRDD = sc.textFile("hdfs://192.168.1.15:8020/ftse100/table.csv")
.
.

scala> csvRDD.count
.
.
res3: Long = 8211

This is, not coincidentally, the line count of the original file:

$ wc -l ~/Downloads/table.csv 
8211 /home/henryp/Downloads/table.csv

So, it appears that my standalone Spark cluster is quite happily talking to my Hadoop HDFS.


Tuesday, June 30, 2015

Scala's conform idiom


This is an odd little fellow in akka.testkit.TestFSMRef:

class TestFSMRef[S, D, T <: Actor](
  system: ActorSystem,
  props: Props,
  supervisor: ActorRef,
  name: String)(implicit ev: T <:< FSM[S, D])

What is that <:< symbol? It turns out that it comes from Predef, thus:

  sealed abstract class <:<[-From, +To] extends (From => To) with Serializable
  private[this] final val singleton_<:< = new <:<[Any,Any] { def apply(x: Any): Any = x }
  implicit def $conforms[A]: A <:< A = singleton_<:<.asInstanceOf[A <:< A]

The documentation says:

Requiring an implicit argument of the type `A <:< B` encodes the generalized constraint `A <: B`.

So, A is a subclass of B. Got it. But how is that enforced for each and every child/parent relationship?

Well, there is a single, implicit $conforms[A] lingering in the ether that says A conforms to A, that is, A is a subtype of (or the same as) A. That's what A <:< A is saying. This is using a type infix operator for syntactic sugar and is equivalent to <:<[A, A].

"A is a subtype of A"? This in itself is not terribly useful. However, <:< is contravariant in its first type parameter (-From) and covariant in its second (+To). Covariance in the second parameter means that <:<[A, A] is also a subtype of <:<[A, B] whenever A is a subtype of B. So when the compiler needs evidence of type A <:< B, the only candidate it has is $conforms[A]: A <:< A, and that candidate is acceptable precisely when A <: B.

So, going back to our original Akka code, requiring an implicit of type T <:< FSM[S, D] enforces T being a subclass of FSM[S, D]: the only implicit in scope is T <:< T, and the only way it satisfies the required type without a compilation error is if T <: FSM[S, D].
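
To make this concrete, here is a small sketch of my own (nothing to do with Akka) using the same trick:

  class Animal
  class Dog extends Animal

  // Evidence can only be found when A is a subtype of Animal, because the
  // only candidate the compiler has is $conforms[A]: A <:< A.
  def asAnimal[A](a: A)(implicit ev: A <:< Animal): Animal = ev(a)

  asAnimal(new Dog)       // compiles: Dog <:< Animal can be satisfied
//asAnimal("not a dog")   // does not compile: no implicit String <:< Animal is available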


Friday, June 5, 2015

Monads are easy too!


A lot of people are frightened of Monads for no reason. Perhaps it's because their origins are in Category Theory. Or that they are used extensively in Haskell. But neither of these needs to be understood by the average Java/Scala programmer.

All that needs to be understood are the Monadic Laws (from Coursera):

Monad Laws

Associativity

(m flatMap f) flatMap g == m flatMap (x => f(x) flatMap g)

Left unit

unit(x) flatMap f == f(x)

Right unit

m flatMap unit == m

(think of unit as just a factory function).

Note: f(x) will return a monadic structure, that is, f is of type X => M[X] (if x is of type X and m is of type M[X] in the above laws).

Like monoids, monads obey a few simple rules and that's that. So, what's so good about them?

Well, because monoids obey some laws, you can make assumptions about them (eg, when folding, you don't care if it's a left or right fold if you're using monoids).
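
For example, integer addition with zero forms a monoid, so a left fold and a right fold give the same answer (a tiny sketch of my own):

    val xs = List(1, 2, 3, 4)
    // Because (Int, +, 0) is a monoid (associative with an identity element),
    // the direction of the fold doesn't matter.
    assert(xs.foldLeft(0)(_ + _) == xs.foldRight(0)(_ + _)) // both are 10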

With monads, you can make similar assumptions. For instance, given these simple functions:

  def addOne(x: Int)            : Int       = x + 1
  def times10(x: Int)           : Int       = x * 10

and this simple monad:

    val option7 = Some(7)

then these two lines are exactly equivalent:

    val outer = option7.flatMap(x => Some(times10(x))).flatMap(x => Some(addOne(x)))
    val inner = option7.flatMap(x => Some(times10(x)).flatMap(x => Some(addOne(x))))

This is useful when dealing with for-comprehensions (which are just syntactic sugar). 
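
For instance, here is a small sketch of my own showing the sugar and what the compiler turns it into (reusing times10 and addOne from above):

    // The for-comprehension...
    val sugared   = for {
      x <- option7
      y <- Some(times10(x))
    } yield addOne(y)

    // ...is rewritten by the compiler into flatMap/map calls:
    val desugared = option7.flatMap(x => Some(times10(x)).map(y => addOne(y)))

    // Both are Some(71).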

Monads are also used a lot in Haskell as it's a lazy language and they make it actually do something (note: this is not my area of expertise).

The Monad API

Neither Scala nor Java 8 has a common trait or interface that all their monads implement, even though they could. That's exactly what is done in Scalaz, for instance.

But, as mentioned, they're used in for-comprehensions in Scala so you might have been using them without even knowing! The compiler doesn't care that the monads don't have a common interface. It knows for-comprehension syntactic sugar can be converted to their respective map and flatMap methods. Then, using the monadic laws above, refactoring for-comprehensions is safe and easy.

Conclusion

So, what monads are is easy. Why they are useful is more subtle. Here is an interesting video on the subject of monads.


Thursday, May 28, 2015

Testing a Lambda Architecture in a single JVM


Lambda Architecture has become very fashionable, but how best to test it in a single JVM?

Basically, Lambda Architecture is composed of three layers.

- a large store of data (eg, HDFS) against which you can run batch jobs
- real time data processing (eg, Spark Streaming)
- a fast access layer (eg, Cassandra)

Ideally, you want to code such that your integration tests can be run in your IDE with no additional systems needed. This can be done as follows.

HDFS

Hadoop has a MiniCluster that facilitates testing. Pulling it into your project is simply a matter of adding a dependency to your pom.xml (see here). Note: I had to do some fiddly hacks to get it working with my Windows machines without installing binaries.

Once you have changed your POM, you can use it with:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hdfs.{DistributedFileSystem, MiniDFSCluster}
import java.nio.file.Files
.
.
    val baseDir           = Files.createTempDirectory("tests").toFile.getAbsoluteFile
    val conf              = new Configuration()
    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath)
    val builder           = new MiniDFSCluster.Builder(conf)
    val hdfsCluster       = builder.build()
    val distributedFS     = hdfsCluster.getFileSystem
    val hdfsUri           = "hdfs://127.0.0.1:" + hdfsCluster.getNameNodePort + "/"
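
As a quick sanity check (my own addition, not part of the original snippet), you can write a file to the mini cluster and make sure it arrives:

import org.apache.hadoop.fs.Path

    val path = new Path("/tmp/smoke-test.txt")
    val out  = distributedFS.create(path)   // returns an FSDataOutputStream
    out.writeUTF("hello, mini HDFS")
    out.close()
    assert(distributedFS.exists(path))      // the file is now visible in the mini cluster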


Spark Streaming

There is an excellent article on how to use Spark Streaming in your integration tests here. Given a SparkContext, you can parallelize any data you have in memory and test against that as in the link.

But if you want to run against a real Kafka instance, you'll want code that looks something like the following. Let's say we're dealing just with Strings; you can add your own compression etc. later. First, let's define the following:

  type KeyType                                  = String
  type ValueType                                = String
  type RddKeyValue                              = (KeyType, ValueType)

Now assume we have Kafka and Zookeeper up and running. We'll need a function that takes the Kafka stream:

import org.apache.spark.streaming.dstream.DStream

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

import org.apache.hadoop.mapred.JobConf
.
.
  def handler(path: String): (DStream[(KeyType, ValueType)]) => Unit = {
    stream =>
      stream.foreachRDD { rdd =>
        if (rdd.count() > 0) {
          // do what you need to do here.
          // After, we save the data to HDFS so:
          rdd.saveAsNewAPIHadoopFile(
            path,
            classOf[KeyType],
            classOf[ValueType],
            classOf[TextOutputFormat[KeyType, ValueType]],
            createHdfsPersistenceJobConf)
        }
      }
  } 

  def createHdfsPersistenceJobConf: JobConf = {
    val jobConf = new JobConf
    jobConf
  }

Note: this createHdfsPersistenceJobConf method must be defined in a separate object, otherwise you'll see runtime messages such as:

NotSerializableException: org.apache.hadoop.mapred.JobConf

because it will otherwise have a reference to its enclosing function.
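
What that separation might look like is something along these lines (the object name is mine, purely illustrative):

import org.apache.hadoop.mapred.JobConf

object HdfsPersistence {
  // Living in a standalone object, this method drags no enclosing class (and
  // hence no non-serializable state) into the closure Spark ships to the executors.
  def createHdfsPersistenceJobConf: JobConf = new JobConf
}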

Now, we need a function that kicks the whole streaming process off:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

import kafka.serializer.StringDecoder
.
.
  def creatingFunc(checkpointFolder: String): () => StreamingContext = () => {
    val conf                           = new SparkConf().setMaster("local[*]").setAppName("myAppName")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val streamingContext               = new StreamingContext(conf, Duration(3000))
    streamingContext.checkpoint(checkpointFolder)

    val dStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[KeyType, ValueType, StringDecoder, StringDecoder](streamingContext,
        kafkaConf,
        Set("my_topic_name"))

    handler(checkpointFolder)(dStream)
    streamingContext
  }

Then tell Spark Streaming to start using this function:

  val checkpointFolder  = hdfsUri + "checkpoint_directory"
  val streamingContext  = StreamingContext.getOrCreate(checkpointFolder, creatingFunc(checkpointFolder))

  streamingContext.start()

The handler will be called first at the StreamingContext.getOrCreate and then as Spark Streaming polls the stream.

Note that HDFS is also used for Spark Streaming's checkpointing. Upon failure, the checkpoint directory holds all information needed to keep going. For this reason, you want your functions to be idempotent as there is a possibility that a message in the stream may be processed again.

One final note: Spark fires up a Jetty instance as an admin tool. This was initially giving me:

class "javax.servlet.FilterRegistration"'s signer information does not match signer information of other classes in the same package

errors when I used:

      <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.3.1</version>
      </dependency>       
      <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.1</version>
      </dependency>  
      <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>1.3.1</version>
      </dependency>  
      <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-minicluster</artifactId>
            <version>2.6.0</version>
      </dependency>

So, you need to make sure your dependency on Jetty (org.eclipse.jetty.orbit:javax.servlet:jar:3.0.0.v201112011016 for me) is the first in your classpath.


Friday, April 17, 2015

Cassandra Table Design


Our customer wants a table that has hundreds of thousands of rows stored in it per day. No problem, Cassandra handles it well. Our performance test shows that with a Replication Factor of 3, writes in a 5-node cluster take a touch less than 1ms and reads about 2ms per row. If you want to pull out more rows, then it's even better - 1000 rows takes about 400ms.

However, our customer wants to slice and dice the data, pivoting on an arbitrary set of columns, aggregating values as they go. Now, Cassandra is great at storing data but does not offer the richness you'd get in a SQL DB for retrieving subsets of it. You really want to search on primary keys. You can create indexes but they just create invisible tables for you. And even if you search on primary keys, you need the right permutation of keys to avoid ALLOW FILTERING.

So, our first stab had us creating way too many tables because of our customer's requirements. The number of columns we can pivot on causes the number of tables to grow combinatorially. If our customer requires us to pivot on, say, 5 columns, then choosing r of them gives a count of

$$\binom{n}{r} = \frac{n!}{r!\,(n-r)!}$$

where n in our case is the 5 potential choices on offer and r is the number the user chooses.

Of course, our user can choose any number of columns, from 1 to 5, so the total number of combinations now looks like:

$$\sum_{r=1}^{5} \frac{n!}{r!\,(n-r)!}$$


This starts to cause the number of tables to balloon, since:

up to 3 choices means 7 tables
up to 4 choices means 15 tables
up to 5 choices means 31 tables
.
.
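
A few lines of Scala (my own sketch, not from the original analysis) confirm those counts; the total for n pivotable columns is just $2^n - 1$:

    // The user may pivot on any non-empty subset of n columns,
    // so we need sum over r = 1..n of C(n, r) tables, i.e. 2^n - 1.
    def factorial(n: Int): Long      = (1 to n).foldLeft(1L)(_ * _)
    def choose(n: Int, r: Int): Long = factorial(n) / (factorial(r) * factorial(n - r))
    def tablesFor(n: Int): Long      = (1 to n).map(choose(n, _)).sum

    (3 to 5).foreach(n => println(s"up to $n choices means ${tablesFor(n)} tables"))
    // up to 3 choices means 7 tables
    // up to 4 choices means 15 tables
    // up to 5 choices means 31 tables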

The number of rows also starts rising, like:

$$\sum_{r=1}^{n} \prod_{p=1}^{r} d_p$$

where r and p are just indexes and $d_p$ is the number of distinct elements for a given dimension p.

Note: this is the worst-case scenario. How many rows there will actually be depends on your data. As it happens, since our data "clumps" around certain regions, our numbers were much smaller than this.

But this article has a very interesting way of querying the data for just what you want, using only one table in which to do it. To leverage it, we need to pre-compute the aggregations. As it happens, we're using Apache Spark for that and then dumping the results into Cassandra.

Friday, March 27, 2015

Not so clever Spark


More playing with new shiny toys...

Apache Spark promises to make handling big data faster. It also interfaces with (among other things) Cassandra. This is very useful as Cassandra has limited commands for processing data.

I tried reading in my data from Cassandra on the Spark CLI, so:

val rdd = sc.cassandraTable("keyspace", "table")

Now, I want to sum all the values for a particular subset of the rows in my table. Cassandra being little more than a key/value store, I can't do this there, so let's try it in Spark:

rdd.filter(_.getString("my_where_field") == "X").map(_.getDouble("my_value")).fold(0)((x, y) => x + y)

Not so fast. Really. It takes minutes. The solution is to be more precise earlier:

val rdd = sc.cassandraTable("keyspace", "table").where("my_where_field = 'X'")
rdd.aggregate(0.0D)((acc, row) => acc + row.getDouble("my_value"), (d1, d2) => d1 + d2)

(The second function ((d1, d2) => d1 + d2) defines how the results from all the jobs are processed.)
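
If aggregate is new to you, here is a tiny sketch of my own on a plain RDD showing the role of the two functions: the first folds each element into a partition-local accumulator, the second merges the per-partition accumulators.

val nums  = sc.parallelize(1 to 100)
// seqOp: add each element to the partition-local running total.
// combOp: add the partition totals together.
val total = nums.aggregate(0.0)((acc, n) => acc + n, (a, b) => a + b)
// total == 5050.0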

This is much faster! About 3s over 10 000 rows out of 100 million.

Spark is very nice but don't expect queries to return in a time the average web user is familiar with. It spins up JVMs on all the boxes in the cluster and executes jobs by default in a serial manner. However, if you want to reduce number crunching from, say, an hour down to minutes, it might be the tool for you.

But do be aware that it is still somewhat immature. I downloaded the latest DataStax bundle, which has version 1.1.0 (Spark is on 1.3 at the moment), and got a ClassCastException deep down in Scala code using Spark SQL (which, to be fair, is not considered production ready).

val results = csc.cassandraSql("select sum(my_value) from table where my_where_field = 'X'")
.
.
results.collect().foreach(println)
15/03/27 19:09:27 WARN TaskSetManager: Lost task 3.0 in stage 4.0 (TID 18, 10.20.166.198): java.lang.ClassCastException: java.math.BigDecimal cannot be cast to scala.math.BigDecimal
    scala.math.Numeric$BigDecimalIsFractional$.plus(Numeric.scala:182)
    org.apache.spark.sql.catalyst.expressions.Add$$anonfun$eval$2.apply(arithmetic.scala:58)
    org.apache.spark.sql.catalyst.expressions.Add$$anonfun$eval$2.apply(arithmetic.scala:58)
    org.apache.spark.sql.catalyst.expressions.Expression.n2(Expression.scala:114)
    org.apache.spark.sql.catalyst.expressions.Add.eval(arithmetic.scala:58)