Friday, June 5, 2015

Monads are easy too!


A lot of people are frightened of Monads for no reason. Perhaps it's because their origins lie in Category Theory. Or that they are used extensively in Haskell. But neither of these needs to be understood by the average Java/Scala programmer.

All that needs to be understood are the Monadic Laws (from Coursera):

Monad Laws

Associativity

(m flatMap f) flatMap g == m flatMap (x => f(x) flatMap g)

Left unit

unit(x) flatMap f == f(x)

Right unit

m flatMap unit == m

(think of unit as just a factory function).

Note: f(x) will return a monadic structure, that is: f is of type X => M[X] (where x is of type X and m is of type M[X] in the above laws).

Like monoids, monads obey a few simple rules and that's that. So, what's so good about them?

Well, because monoids obey some laws, you can make assumptions about them (eg, when folding, you don't care if it's a left or right fold if you're using monoids).

With monads, you can make similar assumptions. For instance, given these simple functions:

  def addOne(x: Int)            : Int       = x + 1
  def times10(x: Int)           : Int       = x * 10

and this simple monad:

    val option7 = Some(7)

then these two lines are exactly equivalent:

    val outer = option7.flatMap(x => Some(times10(x))).flatMap(x => Some(addOne(x)))
    val inner = option7.flatMap(x => Some(times10(x)).flatMap(x => Some(addOne(x))))
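
Both evaluate to Some(71), which is easy to check:

    assert(outer == inner)                                   // both are Some(71)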

This is useful when dealing with for-comprehensions (which are just syntactic sugar). 

Monads are also used a lot in Haskell: it's a lazy, pure language and monads are how it sequences effects so that it actually does something (note: this is not my area of expertise).

The Monad API

Neither Scala nor Java 8 has a common trait or interface that all their monads implement, even though they could. For instance, that's exactly what is done in Scalaz.
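
There is nothing stopping you writing one, though. A sketch of what such a trait could look like (along the lines of what Scalaz provides; the names here are illustrative):

  import scala.language.higherKinds

  trait Monad[M[_]] {
    def unit[A](a: A): M[A]                          // the factory function from the laws
    def flatMap[A, B](m: M[A])(f: A => M[B]): M[B]
  }

  // an instance for Option, for example:
  val optionMonad = new Monad[Option] {
    def unit[A](a: A): Option[A]                                  = Some(a)
    def flatMap[A, B](m: Option[A])(f: A => Option[B]): Option[B] = m.flatMap(f)
  }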

But, as mentioned, they're used in for-comprehensions in Scala so you might have been using them without even knowing! The compiler doesn't care that the monads don't have a common interface. It knows for-comprehension syntactic sugar can be converted to their respective map and flatMap methods. Then, using the monadic laws above, refactoring for-comprehensions is safe and easy.
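
For example, these two definitions are equivalent; the second is roughly what the compiler generates from the first:

    val viaFor = for {
      x <- Some(7)
      y <- Some(times10(x))
    } yield addOne(y)                                        // Some(71)

    val desugared = Some(7).flatMap(x => Some(times10(x)).map(y => addOne(y)))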

Conclusion

So, what monads are is easy. Why they are useful is more subtle. Here is an interesting video on the subject of monads.


Thursday, May 28, 2015

Testing a Lambda Architecture in a single JVM


Lambda Architecture has become very fashionable, but how best to test it in a single JVM?

Basically, Lambda Architecture is composed of three layers.

- a large store of data (eg, HDFS) against which you can run batch jobs
- real time data processing (eg, Spark Streaming)
- a fast access layer (eg, Cassandra)

Ideally, you want to code such that your integration tests can be run in your IDE with no additional systems needed. This can be done as follows.

HDFS

Hadoop has a MiniCluster that facilitates testing. Pulling it into your project is simply a matter of adding a dependency to your pom.xml (see here). Note: I had to do some fiddly hacks to get it working with my Windows machines without installing binaries.

Once you have changed your POM, you can use it with:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hdfs.{DistributedFileSystem, MiniDFSCluster}
import java.nio.file.Files
.
.
    val baseDir           = Files.createTempDirectory("tests").toFile.getAbsoluteFile
    val conf              = new Configuration()
    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath)
    val builder           = new MiniDFSCluster.Builder(conf)
    val hdfsCluster       = builder.build()
    val distributedFS     = hdfsCluster.getFileSystem
    val hdfsUri           = "hdfs://127.0.0.1:" + hdfsCluster.getNameNodePort + "/"
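
To convince yourself the mini cluster works, you can write and read a small file through the usual FileSystem API (a minimal smoke test; the path and contents are arbitrary):

import org.apache.hadoop.fs.Path
.
.
    val path = new Path("/tmp/smoke_test.txt")
    val out  = distributedFS.create(path)
    out.writeUTF("hello HDFS")
    out.close()

    val in = distributedFS.open(path)
    println(in.readUTF())                 // prints "hello HDFS"
    in.close()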


Spark Streaming

There is an excellent article on how to use Spark Streaming in your integration tests here. Given a SparkContext, you can parallelize any data you have in memory and test against that as in the link.

But if you want to run against a real Kafka instance, you'll want code that looks something like the following. Let's say we're dealing just with Strings (you can add your own compression etc. later) and define:

  type KeyType                                  = String
  type ValueType                                = String
  type RddKeyValue                              = (KeyType, ValueType)

Now assume we have Kafka and Zookeeper up and running. We'll need a function that takes the Kafka stream:

import org.apache.spark.streaming.dstream.DStream
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.mapred.JobConf
.
.
  def handler(path: String): (DStream[(KeyType, ValueType)]) => Unit = {
    stream =>
      stream.foreachRDD { rdd =>
        if (rdd.count() > 0) {
          // do what you need to do here.
          // After, we save the data to HDFS so:
          rdd.saveAsNewAPIHadoopFile(
            path,
            classOf[KeyType],
            classOf[ValueType],
            classOf[TextOutputFormat[KeyType, ValueType]],
            createHdfsPersistenceJobConf)
        }
      }
  } 

  def createHdfsPersistenceJobConf: JobConf = {
    val jobConf = new JobConf
    jobConf
  }

Note: this createHdfsPersistenceJobConf method must be a separate method in an object otherwise you'll see runtime messages such as:

NotSerializableException: org.apache.hadoop.mapred.JobConf

because the closure passed to foreachRDD would otherwise capture a reference to the enclosing class, which is not serializable.
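
One way to do that is to move the factory into a standalone object (a sketch; the object name is arbitrary), so the closure captures nothing but the JobConf it creates:

object HdfsPersistence {
  def createHdfsPersistenceJobConf: JobConf = {
    val jobConf = new JobConf
    // add any output settings you need here
    jobConf
  }
}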

Now, we need a function that kicks the whole streaming process off:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
.
.
  def creatingFunc(checkpointFolder: String): () => StreamingContext = () => {
    val conf             = new SparkConf().setMaster("local[*]").setAppName("myAppName")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val streamingContext = new StreamingContext(conf, Duration(3000))
    streamingContext.checkpoint(checkpointFolder)

    // kafkaConf is the usual Kafka parameter map (eg, "metadata.broker.list"), defined elsewhere
    val dStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[KeyType, ValueType, StringDecoder, StringDecoder](
      streamingContext,
      kafkaConf,
      Set("my_topic_name"))

    handler(checkpointFolder)(dStream)
    streamingContext
  }

Then tell Spark Streaming to start using this function:

  val checkpointFolder  = hdfsUri + "checkpoint_directory"
  val streamingContext  = StreamingContext.getOrCreate(checkpointFolder, creatingFunc(checkpointFolder))

  streamingContext.start()

The handler is wired up when StreamingContext.getOrCreate calls creatingFunc; its foreachRDD body is then invoked for each batch as Spark Streaming polls the stream.

Note that HDFS is also used for Spark Streaming's checkpointing (which, for the direct stream, includes the Kafka offsets). Upon failure, the checkpoint directory holds all the information needed to keep going. For this reason, you want your functions to be idempotent as there is a possibility that a message in the stream may be processed again.
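
Putting the pieces together, an end-to-end test then looks roughly like this (a sketch only: the broker address, timeout and final assertion are assumptions, and the assertion is just a smoke check that something was written):

import java.util.Properties
import org.apache.hadoop.fs.Path
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
.
.
    // push a test message onto the topic the direct stream is reading
    val props = new Properties()
    props.put("bootstrap.servers", "127.0.0.1:9092")   // assumption: wherever your test broker listens
    props.put("key.serializer",    "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer",  "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[KeyType, ValueType](props)
    producer.send(new ProducerRecord[KeyType, ValueType]("my_topic_name", "some_key", "some_value"))
    producer.close()

    // give Spark Streaming a few batch intervals to pull the message and write it to HDFS...
    streamingContext.awaitTerminationOrTimeout(10000)

    // ...then check that something landed there
    val statuses = distributedFS.listStatus(new Path(checkpointFolder))
    assert(statuses.nonEmpty)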

One final note: Spark fires up a Jetty instance as an admin tool. This was initially giving me:

class "javax.servlet.FilterRegistration"'s signer information does not match signer information of other classes in the same package

errors when I used:

      <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.3.1</version>
      </dependency>       
      <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.1</version>
      </dependency>  
      <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>1.3.1</version>
      </dependency>  
      <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-minicluster</artifactId>
            <version>2.6.0</version>
      </dependency>

So, you need to make sure your dependency on Jetty (org.eclipse.jetty.orbit:javax.servlet:jar:3.0.0.v201112011016 for me) is the first in your classpath.


Friday, April 17, 2015

Cassandra Table Design


Our customer wants a table that has hundreds of thousands of rows stored in it per day. No problem, Cassandra handles it well. Our performance test shows that with a Replication Factor of 3, writes in a 5-node cluster take a touch less than 1ms and reads about 2ms per row. If you want to pull out more rows, then it's even better - 1000 rows takes about 400ms.

However, our customer wants to slice and dice the data, pivoting on an arbitrary set of columns, aggregating values as they go. Now, Cassandra is great at storing data but does not offer the richness you'd get in a SQL DB for retrieving subsets of it. You really want to search on primary keys. You can create secondary indexes but they just create invisible tables for you. And even if you search on primary keys, you need the right permutation of keys to avoid ALLOW FILTERING.

So, our first stab had us creating way too many tables because of our customer's requirements. The number of columns we can pivot on causes the number of tables to grow combinatorially. If our customer requires us to pivot on, say, 5 columns, then for a given choice of r columns the number of combinations is

n! / (r! (n - r)!)

where n in our case is the 5 potential choices on offer and r is the number the user chooses.

Of course, our user can choose any number from 1 to 5, so the total number of combinations now looks like:

Σ (r = 1 to 5)  n! / (r! (n - r)!)


Which is starting to cause the number of tables to balloon since

up to 3 choices means 7 tables
up to 4 choices means 15 tables
up to 5 choices means 31 tables
.
.
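
A throwaway Scala snippet to sanity-check those numbers (the total is 2^n - 1 in general):

    def choose(n: Int, r: Int): Long =
      (1 to r).foldLeft(1L)((acc, i) => acc * (n - i + 1) / i)   // binomial coefficient

    def tables(n: Int): Long = (1 to n).map(choose(n, _)).sum    // = 2^n - 1

    (3 to 5).foreach(n => println(s"up to $n choices means ${tables(n)} tables"))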

The number of rows also starts rising like:

Σ (r = 1 to 5)  Π (p = 1 to r)  d_p

where r and p are just indexes and d_p is the number of distinct elements for a given dimension p.

Note: this is the worst-case scenario. How many rows there actually are depends on your data. As it happens, since our data "clumps" around certain regions, our numbers were much smaller than this.

But this article has a very interesting way of querying the data for just what you want, using only one table to do it. To leverage it, we need to pre-compute the aggregations. As it happens, we're using Apache Spark for that and then dumping the results into Cassandra.

Friday, March 27, 2015

Not so clever Spark


More playing with new shiny toys...

Apache Spark promises to make handling big data faster. It also interfaces with (among other things) Cassandra. This is very useful as Cassandra has limited commands for processing data.

I tried reading in my data from Cassandra on the Spark CLI, so:

val rdd = sc.cassandraTable("keyspace", "table")

Now, I want to sum up all the values for a particular subset of my rows in the table. Cassandra being little more than a key/value store, I can't do this server-side, so let's try it in Spark:

rdd.filter(_.getString("my_where_field") == "X").map(_.getDouble("my_value")).fold(0)((x, y) => x + y)

Not so fast. Really. It takes minutes. The solution is to be more precise earlier:

val rdd = sc.cassandraTable("keyspace", "table").where("my_where_field = 'X'")
rdd.aggregate(0.0D)((acc, row) => acc + row.getDouble("my_value"), (d1, d2) => d1 + d2)

(The second function ((d1, d2) => d1 + d2) defines how the results from all the jobs are processed.)

This is much faster! About 3s over 10 000 rows out of 100 million.
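
For a plain sum, an equivalent formulation (a sketch; it gives the same result as the aggregate above because the where clause has already pushed the filtering down to Cassandra) is:

    val total = rdd.map(_.getDouble("my_value")).sum()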

Spark is very nice but don't expect queries to run within a time the average web user is familiar with. It spins up JVMs on all the boxes in the cluster and by default executes jobs serially. However, if you want to reduce number crunching from, say, an hour down to minutes, it might be the tool for you.

But do be aware that it is still somewhat immature. I downloaded the latest DataStax bundle, which ships Spark 1.1.0 (Spark itself is on 1.3 at the moment), and got a ClassCastException deep down in Scala code when using Spark SQL (which, to be fair, is not considered production ready).

val results = csc.cassandraSql("select sum(my_value) from table where my_where_field = 'X'")
.
.
results.collect().foreach(println)
15/03/27 19:09:27 WARN TaskSetManager: Lost task 3.0 in stage 4.0 (TID 18, 10.20.166.198): java.lang.ClassCastException: java.math.BigDecimal cannot be cast to scala.math.BigDecimal
    scala.math.Numeric$BigDecimalIsFractional$.plus(Numeric.scala:182)
    org.apache.spark.sql.catalyst.expressions.Add$$anonfun$eval$2.apply(arithmetic.scala:58)
    org.apache.spark.sql.catalyst.expressions.Add$$anonfun$eval$2.apply(arithmetic.scala:58)
    org.apache.spark.sql.catalyst.expressions.Expression.n2(Expression.scala:114)
    org.apache.spark.sql.catalyst.expressions.Add.eval(arithmetic.scala:58)



Sunday, March 22, 2015

Interesting syntax but does it make a bad API?


There's yet another interesting syntax in Scala for methods that look like this:

  def intOrErr(x: String): Int Or ErrorMessage = { ...

where it appears that the return type can really be one of two types. This actually isn't special language support, just Scala's syntactic sugar for infix types. It is equivalent to:

  def intOrErr(x: String): Or[Int, ErrorMessage] = { ...

but reads more nicely.

I came across this syntax while looking at a colleague's use of the Scalactic library. Production code returned an Or and he mapped over it in a test. The mistake: what if the code had taken the unhappy path? The assertions in his map function would never have executed and the test would have given a green bar, leaving the coder none the wiser that his code was running unhappily.

He should have used a fold (or a pattern match) so both paths are handled, but that's not immediately obvious (unlike Java code that throws a checked Exception, where you must either catch it or explicitly declare that the calling method must deal with it).
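
For instance, a pattern match on Good/Bad (or equivalently a fold) makes both paths explicit; a sketch only, with the expected value and the failure handling purely illustrative:

  import org.scalactic.{Bad, ErrorMessage, Good, Or}

  def check(result: Int Or ErrorMessage): Unit = result match {
    case Good(n)  => assert(n == 42)                                    // happy-path assertion
    case Bad(err) => throw new AssertionError(s"unhappy path: $err")    // a bare map would silently skip this
  }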

I posted on this forum to get opinions as to how worrying this is. There are some interesting responses, ranging from "it's less than ideal" to "assertions in test code suck and there are libraries to help do it better".

Thursday, February 26, 2015

Cassandra for the Busy - Part 2


Interesting problem I had this week that's obvious when you think about it.

The Config

I have a cluster of 5 Cassandra (version 2.0.11.83) nodes. I execute:

DROP KEYSPACE IF EXISTS my_keyspace;
CREATE KEYSPACE my_keyspace WITH REPLICATION = { 
    'class' :              'SimpleStrategy',
    'replication_factor' : 2 
};
CREATE TABLE my_table (
    my_int_id    varint,
    my_string_id varchar,
    my_value     decimal,
    PRIMARY KEY (my_int_id, my_string_id)
);

I then start hammering my cluster with some CQL like:

INSERT INTO my_table 
    (my_int_id, my_string_id, my_value)
    VALUES (1, '2', 42.0);

or such like. The code I use to write to the DB looks something like:

RegularStatement  toPrepare         = (RegularStatement) (new SimpleStatement(cql).setConsistencyLevel(ConsistencyLevel.TWO));
PreparedStatement preparedStatement = cassandraSession.prepare(toPrepare);
.
.
BoundStatement    bounded           = preparedStatement.bind(new BigInteger(myIntId), myStringId, new BigDecimal("42.0"));
cassandraSession.execute(bounded);


The Error

I then kill a single node and my client starts barfing with:

Not enough replica available for query at consistency TWO (2 required but only 1 alive)

which baffled me initially because there were still 4 nodes in the cluster:

dse$ bin/nodetool -h 192.168.1.15 status my_keyspace
Datacenter: Analytics
=====================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address       Load       Tokens  Owns   Host ID                               Rack
UN  192.168.1.15  83.98 KB   256     40.0%  86df3d65-0e19-49b2-a49e-bc89d6347fa4  rack1
UN  192.168.1.16  82.78 KB   256     42.0%  8fghe485-0e19-49b2-a49e-bc89d6347fa5  rack1
UN  192.168.1.17  84.91 KB   256     38.0%  9834edf1-0e19-49b2-a49e-bc89d6347fa6  rack1
UN  192.168.1.18  88.22 KB   256     41.0%  873fdeab-0e19-49b2-a49e-bc89d6347fa7  rack1
DN  192.168.1.19  86.36 KB   256     39.0%  2bdf9211-0e19-49b2-a49e-bc89d6347fa8  rack1

Of course, the problem is that your tolerance to a node fault is:

    Tolerance = Replication Factor - Consistency Level

For me, that was Tolerance = 2 - 2 = 0 tolerance. Oops.

This is not a problem on all inserts. Running

dse$ bin/cqlsh -uUSER -pPASSWORD
Connected to PhillsCassandraCluster at 192.168.1.15:9160.
[cqlsh 4.1.1 | Cassandra 2.0.11.83 | DSE 4.6.0 | CQL spec 3.1.1 | Thrift protocol 19.39.0]
Use HELP for help.

and inserting worked fine. But this is because cqlsh uses a different consistency level.

cqlsh> consistency
Current consistency level is ONE.

so, let's change that:

cqlsh> consistency TWO

Consistency level set to TWO.

Now do the inserts fail? Well, sometimes. It depends on which nodes own the partition being written, something transparent to the user.


The Problem

Cassandra will try to put a piece of data on REPLICATION_FACTOR number of nodes. These nodes are determined by hashing its partition key. If a node is down, Cassandra will not choose another in its place. That replica simply cannot be written to at this time (of course, it might get the data if/when the node rejoins the cluster).

Upon inserting, the client decides how many nodes must be successfully written to before the INSERT is declared a success. This number is given by your CONSISTENCY_LEVEL. If the contract cannot be fulfilled then Cassandra will barf.
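
For example, with a replication factor of 2, writing at consistency ONE gives you a tolerance of one dead replica (a Scala sketch reusing the cql and cassandraSession from the snippet above; whether ONE is acceptable depends on your consistency requirements):

    val statement = new SimpleStatement(cql).setConsistencyLevel(ConsistencyLevel.ONE)
    // Tolerance = Replication Factor - Consistency Level = 2 - 1 = 1 node
    cassandraSession.execute(statement)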


Further Reading

Application Failure Scenarios with Cassandra

Lightweight Transactions in Cassandra 2.0


Monday, February 23, 2015

Cassandra For the Busy - Part 1


A quick guide to Cassandra admin

From the Cassandra installation directory, running:

bin/nodetool -uUSER -pPASSWORD -hNODE ring KEYSPACE

(where the username, password, node and keyspace correspond to whatever you set up) produces output whose columns read as follows:

Token is a number between Long.MIN_VALUE and Long.MAX_VALUE that is used in hashing the key. (See org.apache.cassandra.dht.Murmur3Partitioner.describeOwnership for how ownership is calculated over a cluster using something like a MurmurHash algorithm. This is the default partitioner for Cassandra.)

Load refers to how much has been flushed to disk. Therefore, this may take some time to register a change after you have truncated a table containing 10 million rows. This appears to correspond to org.apache.cassandra.service.StorageService.getLoad().

Owns represents how much of the ring range a token covers. Note: it might not add up to 100%. "For DC [data centre] unaware replication strategies, ownership without replication will be 100%" (see the code comments for org.apache.cassandra.service.StorageService.effectiveOwnership here). But with replication, it appears to be 100% * the replication factor.