Thursday, May 28, 2015

Testing a Lambda Architecture in a single JVM


Lambda Architecture has become very fashionable, but how best to test it in a single JVM?

Basically, a Lambda Architecture is composed of three layers:

- a large store of data (e.g. HDFS) against which you can run batch jobs
- real-time data processing (e.g. Spark Streaming)
- a fast access layer (e.g. Cassandra)

Ideally, you want to code such that your integration tests can be run in your IDE with no additional systems needed. This can be done as follows.

HDFS

Hadoop has a MiniCluster that facilitates testing. Pulling it into your project is simply a matter of adding a dependency to your pom.xml (see here). Note: I had to do some fiddly hacks to get it working with my Windows machines without installing binaries.

Once you have changed your POM, you can use it with:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hdfs.{DistributedFileSystem, MiniDFSCluster}
import java.nio.file.Files
.
.
    // spin up a single-process HDFS cluster rooted in a temporary directory
    val baseDir           = Files.createTempDirectory("tests").toFile.getAbsoluteFile
    val conf              = new Configuration()
    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath)
    val builder           = new MiniDFSCluster.Builder(conf)
    val hdfsCluster       = builder.build()
    val distributedFS     = hdfsCluster.getFileSystem
    val hdfsUri           = "hdfs://127.0.0.1:" + hdfsCluster.getNameNodePort + "/"
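
As a quick sanity check (a minimal sketch; the file name and contents are arbitrary), you can write a file to the mini cluster and read it straight back through the FileSystem handle:

import org.apache.hadoop.fs.Path
.
.
    // round-trip a small file through the in-JVM HDFS
    val testFile = new Path("/smoke_test.txt")
    val out      = distributedFS.create(testFile)
    out.writeUTF("hello, mini cluster")
    out.close()

    val in = distributedFS.open(testFile)
    assert(in.readUTF() == "hello, mini cluster")
    in.close()

    hdfsCluster.shutdown()   // tear the cluster down when the test finishes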


Spark Streaming

There is an excellent article on how to use Spark Streaming in your integration tests here. Given a SparkContext, you can parallelize any data you have in memory and test against that as in the link.
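
For example (a minimal sketch with made-up data), you can run the same per-batch logic you will later hang off the stream against an in-memory RDD:

import org.apache.spark.{SparkConf, SparkContext}
.
.
    val sc  = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("inMemoryTest"))
    val rdd = sc.parallelize(Seq(("key1", "value1"), ("key2", "value2")))

    // exercise whatever per-batch logic you want to verify against this RDD
    assert(rdd.count() == 2)
    sc.stop()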

But if you want to run against a real Kafka instance, you want code that looks something like the following. Let's say we're dealing just with Strings (you can add your own compression etc. later) and define the following:

  type KeyType                                  = String
  type ValueType                                = String
  type RddKeyValue                              = (KeyType, ValueType)

Now assume we have Kafka and Zookeeper up and running. We'll need a function that takes the Kafka stream:

import org.apache.spark.streaming.dstream.DStream

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

import org.apache.hadoop.mapred.JobConf
.
.
  def handler(path: String): (DStream[(KeyType, ValueType)]) => Unit = {
    stream =>
      stream.foreachRDD { rdd =>
        if (rdd.count() > 0) {
          // do what you need to do here.
          // Afterwards, we save the data to HDFS like so:
          rdd.saveAsNewAPIHadoopFile(
            path,
            classOf[KeyType],
            classOf[ValueType],
            classOf[TextOutputFormat[KeyType, ValueType]],
            createHdfsPersistenceJobConf)
        }
      }
  } 

  def createHdfsPersistenceJobConf: JobConf = {
    val jobConf = new JobConf
    jobConf
  }

Note: this createHdfsPersistenceJobConf method must be a separate method in a standalone object; if it isn't, it keeps a reference to its enclosing scope and you'll see runtime messages such as:

NotSerializableException: org.apache.hadoop.mapred.JobConf
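
For example (a sketch; the object name is arbitrary), putting the factory in a standalone object means the foreachRDD closure doesn't drag in a non-serializable enclosing instance:

import org.apache.hadoop.mapred.JobConf

object HdfsPersistence {
  // lives in a top-level object, so calling it from the closure captures nothing else
  def createHdfsPersistenceJobConf: JobConf = new JobConf
}

The handler would then call HdfsPersistence.createHdfsPersistenceJobConf (or import it).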

Now, we need a function that kicks the whole streaming process off:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

import kafka.serializer.StringDecoder
.
.
  // Kafka connection details for the direct stream (the broker address shown is just an example)
  val kafkaConf = Map("metadata.broker.list" -> "localhost:9092")

  def creatingFunc(checkpointFolder: String): () => StreamingContext = () => {
    val conf                           = new SparkConf().setMaster("local[*]").setAppName("myAppName")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val streamingContext               = new StreamingContext(conf, Duration(3000))
    streamingContext.checkpoint(checkpointFolder)

    val dStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[KeyType, ValueType, StringDecoder, StringDecoder](streamingContext,
        kafkaConf,
        Set("my_topic_name"))

    handler(checkpointFolder)(dStream)
    streamingContext
  }

Then tell Spark Streaming to start using this function:

  val checkpointFolder  = hdfsUri + "checkpoint_directory"
  val streamingContext  = StreamingContext.getOrCreate(checkpointFolder, creatingFunc(checkpointFolder))

  streamingContext.start()

The handler is wired up when StreamingContext.getOrCreate builds the context, and its foreachRDD block then runs on each micro-batch as Spark Streaming polls the stream.

Note that HDFS is also used for Spark Streaming's checkpointing (which, with the direct Kafka stream, includes the offsets). Upon failure, the checkpoint directory holds all the information needed to keep going. For this reason, you want your functions to be idempotent as there is a possibility that a message in the stream may be processed again.
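
One simple way to get that (a sketch only, which glosses over partially written output) is to derive each batch's output directory from its batch time and skip any batch whose directory already exists, so a replay becomes a no-op:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.streaming.dstream.DStream
.
.
  def idempotentHandler(basePath: String): DStream[RddKeyValue] => Unit = { stream =>
    stream.foreachRDD { (rdd, time) =>
      // deterministic, per-batch output directory
      val batchDir = new Path(basePath + "/batch_" + time.milliseconds)
      val fs       = batchDir.getFileSystem(rdd.sparkContext.hadoopConfiguration)
      if (rdd.count() > 0 && !fs.exists(batchDir)) {
        rdd.saveAsNewAPIHadoopFile(
          batchDir.toString,
          classOf[KeyType],
          classOf[ValueType],
          classOf[TextOutputFormat[KeyType, ValueType]],
          createHdfsPersistenceJobConf)
      }
    }
  }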

One final note: Spark fires up an embedded Jetty instance to serve its web UI. This was initially giving me:

class "javax.servlet.FilterRegistration"'s signer information does not match signer information of other classes in the same package

errors when I used:

      <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.3.1</version>
      </dependency>       
      <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.1</version>
      </dependency>  
      <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>1.3.1</version>
      </dependency>  
      <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-minicluster</artifactId>
            <version>2.6.0</version>
      </dependency>

So you need to make sure your dependency on the Jetty-provided servlet API (org.eclipse.jetty.orbit:javax.servlet:jar:3.0.0.v201112011016 for me) comes first on your classpath.
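
In Maven terms (a sketch; pin the version to whatever your dependency tree actually resolves), that means declaring the servlet artifact explicitly, ahead of the Spark, Kafka and Hadoop dependencies:

      <!-- declared before spark-streaming, kafka and hadoop-minicluster so that this
           copy of the javax.servlet classes wins on the classpath -->
      <dependency>
            <groupId>org.eclipse.jetty.orbit</groupId>
            <artifactId>javax.servlet</artifactId>
            <version>3.0.0.v201112011016</version>
      </dependency>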