Sunday, November 27, 2016

GraphX


Spark tries to abstract the data away from you so you can write beautiful, functional code. The truth is that you still have to get your hands dirty if you don't want it to run like a dog.

I'm processing a graph of over 100 million edges and about 20 million vertices. It's been a learning experience to make it perform. It first took over 24 hours to execute the Connected Components algorithm. I finally managed to run it in less than an hour. But it wasn't easy.

Cluster configuration

I didn't find GraphX very memory-hungry, at least not for the executors (the entire graph was only some 4GB on HDFS). After 24 hours on my first attempt, the job looked far from over. Using jstat, I noticed that the driver was GCing a lot. It had 8GB, but when I increased that to 20GB it was much happier.

Although the usual advice is to run a larger number of executors with fewer cores each, this might not be the best configuration for GraphX. Facebook found:

"Even though both systems can run multiple workers on a single physical machine, we found that it is more efficient when we start only one worker process per machine and allocate all the available memory to it. In the following experiments, the number of workers is also the number of physical machines used for each job. We allocated 80 GB of RAM to each worker."

The shape of the graph

What your graph looks like makes a difference to the performance of GraphX. Looking at the maximum "Shuffle Write Size / Records" in each superstep (mapPartitions at GraphImpl.scala:207 in Spark 2.0.2) in the Spark web GUI, we see the numbers steadily decreasing.

This depends on the shape of the graph.

"GraphFrames and GraphX both use an algorithm which runs in d iterations, where d is the largest diameter of any connected component (i.e., the max number of hops between any 2 nodes in a component). So the running time will vary significantly depending on the your graph's structure. Tuning the vertices and edges DataFrames by making sure to cache them and possibly adjust the partitioning beforehand can help." (from DataBricks forum)

As somebody pointed out on StackOverflow:

"If you have a cluster where each of the vertexes is connected to every other vertex, then after one round of messages each one knows who the lowest VertexID is, and they all go silent the next round. One sequential step, the entire cluster.

"If, on the other hand, each vertex in the cluster is only connected to at most 2 other vertices, then it could take N sequential steps before all the vertices know who what the minimum VertexID is."

I tested this by making a graph that was just a chain of numbers 1, 2, 3, 4 ... 20. I found that after 10 super-steps, there were 10 resolved connected components: {1 to 11}, {12}, {13}, {14} ... {20}.
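The experiment is easy to reproduce. A rough sketch, assuming an active SparkContext called sc (the vertex and edge attributes are just placeholders):

import org.apache.spark.graphx.{Edge, Graph}

// A chain 1 -> 2 -> ... -> 20: the pathological case described above, with diameter 19.
val chainEdges = sc.parallelize((1L until 20L).map(i => Edge(i, i + 1, ())))
val chain = Graph.fromEdges(chainEdges, defaultValue = ())

// Run to convergence: every vertex eventually learns the minimum VertexId, which is 1.
// The partially-resolved components show up in the intermediate supersteps (watch the web GUI).
chain.connectedComponents().vertices.collect().sorted.foreach(println)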

Partitioning

How do you partition a graph? Facebook said that they got good results using EdgePartition2D. The trick here is to imagine the graph as an adjacency matrix where, for a given vertex X, all non-zero elements in row X indicate outgoing edges and all non-zero elements in column X indicate incoming edges.

Therefore, all the edges touching a given vertex lie in a single row and a single column. So, for an NxN matrix, all of a vertex's data sits in at most 2N elements (i.e. one row plus one column). EdgePartition2D exploits this by laying the P edge partitions out as a roughly √P x √P grid of blocks over that matrix, so the row and column for X cut through at most about 2√P blocks: each vertex ends up replicated to O(√P) partitions rather than potentially all of them. (This picture assumes at most one edge per pair of vertices, which is not a problem for the connected components algorithm.)
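In code, that strategy is a single call on the graph. A sketch, where graph is a cached Graph like the one built in the earlier sketch and the partition count is whatever suits your cluster:

import org.apache.spark.graphx.PartitionStrategy

// Spread the edges over the 2D grid described above, then run the algorithm.
val numPartitions = 360 // the figure that worked here; see the discussion below
val partitioned = graph.partitionBy(PartitionStrategy.EdgePartition2D, numPartitions)
val cc = partitioned.connectedComponents()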

But the biggest saving came from reducing the number of partitions. The previous stage of the pipeline had deliberately and explicitly increased the number of partitions to avoid running out of memory when performing matrix multiplication. This can be a disaster for GraphX.

The previous job had increased the number of partitions to some 18k. What's going on is described here:
As the graph algorithm progresses, it is common for less and less [sic] of the vertices to remain active. Therefore a full scan of all triplets during a map-reduce becomes less and less effective. “For example, in the last iteration of connected components on the Twitter graph, only a few of the vertices are still active. However, to execute mrTriplets we still must sequentially scan 1.5 billion edges and check whether their vertices are in the active set.”
Although GraphX has clever optimizations here, there was still a huge amount of overhead in constantly mapping and reducing.

After coalescing the 18 000 partitions to a mere 360 on a cluster of 8 boxes with 24 cores each, the running time dropped to a healthy 10-40 minutes, depending on the structure of the sub-graphs.
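A sketch of the fix, with the file path and edge format made up purely for illustration:

import org.apache.spark.graphx.{Edge, Graph}

// Coalesce the over-partitioned edge data down to a few hundred partitions
// *before* GraphX ever sees it; shuffle = false just merges existing partitions.
val rawEdges = sc.textFile("hdfs:///path/to/edges") // illustrative path
  .map { line =>
    val Array(src, dst) = line.split("\t") // illustrative format: tab-separated vertex ids
    Edge(src.toLong, dst.toLong, ())
  }
val graph = Graph.fromEdges(rawEdges.coalesce(360, shuffle = false), defaultValue = ()).cache()
val components = graph.connectedComponents()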

