Friday, June 17, 2016

Spark, HBase and security


Getting Spark and HBase to talk to each other is clearly non-trivial if Jiras are anything to go by (see here and here).

My situation is this: I'm building TF-IDF statistics from a corpus of text (almost a terabyte of data). The dictionary is far too large to pass around as a broadcast variable, as is typically done in most examples (e.g., chapter 6 of Advanced Analytics with Spark). So, an external data store is needed. Because of various constraints, it has to be HBase 0.98.18 and Spark 1.3.1.
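
To give an idea of the shape of this: instead of broadcasting a dictionary, each partition opens its own HBase connection and looks up document frequencies on demand. The following is only a sketch against the 0.98-era client API; the table name ("tfidf_dictionary"), column family and layout are entirely hypothetical:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Get, HConnectionManager}
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical per-partition lookup of document frequencies from an HBase
// table, in place of a broadcast dictionary. Names are made up for illustration.
def documentFrequencies(terms: Iterator[String]): Iterator[(String, Long)] = {
  val conf       = HBaseConfiguration.create()
  val connection = HConnectionManager.createConnection(conf) // 0.98-era API
  val table      = connection.getTable("tfidf_dictionary")
  try {
    terms.map { term =>
      val result = table.get(new Get(Bytes.toBytes(term)))
      val cell   = result.getValue(Bytes.toBytes("stats"), Bytes.toBytes("df"))
      val df     = if (cell == null) 0L else Bytes.toLong(cell)
      (term, df)
    }.toList.iterator // materialise before the connection is closed
  } finally {
    table.close()
    connection.close()
  }
}

This would then be called from something like corpus.mapPartitions(documentFrequencies), so each Executor talks to HBase directly.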

The data is highly confidential and must be protected by Kerberos. The system is set up by simply using su to start a new shell, thus giving a fresh ticket. The application runs under YARN; the Driver connects to HBase and initialises the tables before outsourcing the work to the Executors. And it's there, when the Executors try to access these tables, that we see "Failed to find any Kerberos tgt" [TGT: Ticket Granting Ticket].
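
A quick way to see the mismatch is to print who each side thinks it is. This is just a diagnostic sketch (whoAmI is a made-up helper) using the standard UserGroupInformation API; run it on the Driver and again inside a task, e.g. in a foreachPartition:

import org.apache.hadoop.security.UserGroupInformation

// Prints the current security context. If the Executors report no Kerberos
// credentials while the Driver does, that's the problem described below.
def whoAmI(where: String): Unit = {
  val ugi = UserGroupInformation.getCurrentUser
  println(s"[$where] user=${ugi.getUserName}, " +
    s"auth=${ugi.getAuthenticationMethod}, " +
    s"hasKerberosCredentials=${ugi.hasKerberosCredentials}")
}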

What is happening looks like this:

From "Hadoop: The Definitive Guide" by Tom White
The exact stack trace I was seeing is described on StackOverflow here but unfortunately the solution there didn't work for me.

After a lot of head-scratching, I looked at the new(ish) hbase-spark codebase and, after some judicious thievery from the HBaseContext code, wrote something like:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
import org.apache.spark.{SerializableWritable, SparkContext}
import org.apache.spark.broadcast.Broadcast

object HBaseKerberosSecurity {

  def callThisFromTheDriverCode(config: Configuration, sc: SparkContext): Broadcast[SerializableWritable[Credentials]] = {
    // Have HBase issue an authentication token for the current (Kerberos-authenticated)
    // user, attach it to the job's credentials and broadcast them to the Executors.
    val job = Job.getInstance(config)
    TableMapReduceUtil.initCredentials(job)
    sc.broadcast(new SerializableWritable(job.getCredentials))
  }

  def callThisFromYourExecutorCode(credentialConf: Broadcast[SerializableWritable[Credentials]]): Unit = {
    val ugi = UserGroupInformation.getCurrentUser
    val credentials = ugi.getCredentials // see YarnSparkHadoopUtils
    if (credentials != null) {
      ugi.addCredentials(credentials)
      ugi.setAuthenticationMethod(AuthenticationMethod.PROXY)
    }
    // Apply the token the Driver obtained, whether or not we already had credentials.
    ugi.addCredentials(credentialConf.value.value)
  }

}

and success! The Executors, too, were now allowed to read from and write to HBase.

Note: make sure callThisFromYourExecutorCode is called before you ever need a connection. If you call it only once and an Executor dies, a new one will come up and carry on from where the last one left off; if this code is not called again on the replacement, it won't be authenticated.
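
For completeness, the way the two calls fit together looks roughly like this (hbaseConf and rdd are placeholders). Calling the Executor-side method at the top of every partition-level operation, rather than once per JVM, means a freshly restarted Executor authenticates itself too:

// Driver side: obtain and broadcast the HBase credentials once.
val credsBroadcast = HBaseKerberosSecurity.callThisFromTheDriverCode(hbaseConf, sc)

// Executor side: re-apply the credentials at the start of every partition,
// before any connection to HBase is opened.
rdd.foreachPartition { rows =>
  HBaseKerberosSecurity.callThisFromYourExecutorCode(credsBroadcast)
  // ... now safe to talk to HBase ...
}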