Tuesday, March 20, 2018

Fighting Orcs


ORC has an amazing ability to compress data. A 1.1TB Parquet file shrank to 15.7GB when saved as ORC, although note that the ORC output is itself compressed. "The default compression format for ORC is set to snappy" (StackOverflow). This does indeed seem to be the case, as you can see 'snappy' in the names of the files in the HDFS directory.
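
Incidentally, the codec can be set explicitly when writing; a minimal sketch (the DataFrame and output path here are hypothetical):

// Sketch: write out as ORC with an explicit compression codec
// (snappy appears to be the default; zlib and none are alternatives).
df.write.option("compression", "snappy").orc("/some/hdfs/path")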

But how fast is querying it?

Well, there was an initial pause when first reading the file with:

val orcFile = spark.read.orc(HDFS_DIR_NAME)

It seemed that the driver was doing all the work while the 60 executors I had asked my shell for were doing nothing. Running jstack showed that the driver was busy inferring the schema (see org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema). Supplying the schema explicitly would avoid this.
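
A sketch of supplying the schema up front (the columns shown are hypothetical; the real table has 292 of them):

import org.apache.spark.sql.types._

// Hypothetical fragment of the schema; passing it to the reader skips the inference pass.
val schema = StructType(Seq(
  StructField("dax", IntegerType),
  StructField("mandant", IntegerType)
  // ... plus the remaining ~290 columns
))
val orcFile = spark.read.schema(schema).orc(HDFS_DIR_NAME)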

Now, let's group by a field called dax.

import org.apache.spark.sql.functions.count
orcFile.groupBy('dax).agg(count('dax)).show()

(There are 36 different values for dax over nearly 890 million rows with 292 columns).

This groupBy took about 50s when the underlying format was Parquet and 37s when it was ORC (not including the perfectly avoidable large read time), so not bad. How does ORC do it? It has 'indexes'. Sort of. "The term 'index' is rather inappropriate. Basically it's just min/max information persisted in the stripe footer at write time, then used at read time for skipping all stripes that are clearly not meeting the WHERE requirements, drastically reducing I/O in some cases" (StackOverflow).

In fact, you can see this metadata using the Hive command line:

hive --orcfiledump HDFS_DIR_NAME
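
An aside I have not verified in this particular setup: for Spark to actually exploit those min/max statistics it needs ORC predicate pushdown enabled, and I believe it is off by default in Spark 2.x:

// Assumed config; lets Spark push WHERE clauses down to the ORC reader so stripes
// whose min/max statistics cannot match are skipped.
spark.conf.set("spark.sql.orc.filterPushdown", "true")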

Can we make it even faster? I thought sorting and repartitioning might make a difference, so I executed this:

import org.apache.spark.sql.SaveMode
orcFile.sort("dax").repartition(200).write.partitionBy("dax").mode(SaveMode.Overwrite).orc(HDFS_DIR)

Note that caching the original DataFrame caused OOMEs for reasons unknown to me at the moment. And without repartitioning, the third (of four) stages took so long that I had to kill the job. Repartitioning helped, but I did notice a huge amount of shuffle (hundreds of GB when the original file itself was only 16GB).

Also, I had to keep increasing the driver's memory until I stopped getting OOMEs in the last stage. For reasons I don't currently understand, it had to reach 20GB before the job finished. Even then, the job took 2.5 hours on 30 executors with 5 cores each, and the resulting directory took up 146GB of disk space.
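
In what follows, partitionedOrc is that partitioned output read back in; the variable name and the read-back step are my reconstruction:

// Read the partitioned directory back; Spark discovers the dax=... subdirectories
// and exposes dax as a partition column.
val partitionedOrc = spark.read.orc(HDFS_DIR)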

However, the speed of a query was staggeringly fast. This:

partitionedOrc.where('dax === 201501).groupBy('dax).agg(count('dax)).show(10)

took a mere 2s. By comparison, the same query on the unpartitioned and unsorted ORC file took about 32s.

This speed is comparable to Apache Impala, which "can be awesome for small ad-hoc queries" (StackOverflow).

Note that Impala is "highly memory intensive (MPP)" and so "it is not a good fit for tasks that require heavy data operations like joins etc., as you just can't fit everything into the memory. This is where Hive is a better fit" (ibid).

Note that this speed increase appears to be available only when filtering on the column the data is partitioned by. For instance, let's take another field, mandant, and run a slightly modified version of the query above:

partitionedOrc.where('mandant === 201501).groupBy('dax).agg(count('dax)).show(10)

This took 32s as well, no faster than querying the unpartitioned file.
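
An obvious, though untested, workaround would be to partition by both columns when writing, assuming mandant also has a modest number of distinct values. Roughly:

// Sketch: partition by both columns so a query filtering on either can prune directories.
orcFile.repartition(200).write.partitionBy("dax", "mandant").mode(SaveMode.Overwrite).orc(HDFS_DIR)

The cost is many more directories (one per dax/mandant combination), so this only makes sense for low-cardinality columns.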
