Tuesday, November 29, 2016

An Alternative Approach to Matrices in Spark


I've been writing some code that performs TF-IDF on a corpus of text made up of M records. Given the frequency of each term, I can build a normalized vector for each record from the terms it contains. To reduce complexity, I ignore terms that appear more than W times.
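
As a minimal sketch of this set-up (assuming the corpus has already been tokenised into an RDD[(Long, Seq[String])] of record ID to terms, and treating W as a cap on the number of records a term may appear in; the function and variable names here are illustrative, not taken from my actual code):

    import org.apache.spark.rdd.RDD

    // Hypothetical helper: builds a unit-length TF-IDF vector for each record,
    // dropping terms whose document frequency exceeds W.
    def tfIdfVectors(corpus: RDD[(Long, Seq[String])], W: Int): RDD[(Long, Map[String, Double])] = {
      val numDocs = corpus.count().toDouble

      // Document frequency per term, keeping only terms within the W limit.
      val docFreq: Map[String, Long] = corpus
        .flatMap { case (_, terms) => terms.distinct.map(t => (t, 1L)) }
        .reduceByKey(_ + _)
        .filter { case (_, df) => df <= W }
        .collectAsMap()
        .toMap

      val dfBroadcast = corpus.sparkContext.broadcast(docFreq)

      corpus.mapValues { terms =>
        val df = dfBroadcast.value
        // Term frequencies restricted to the surviving terms.
        val tf = terms.filter(df.contains).groupBy(identity).map { case (t, ts) => t -> ts.size.toDouble }
        val tfIdf = tf.map { case (t, f) => t -> f * math.log(numDocs / df(t)) }
        // Normalise so that each record becomes a unit vector.
        val norm = math.sqrt(tfIdf.values.map(x => x * x).sum)
        if (norm == 0.0) tfIdf else tfIdf.map { case (t, x) => t -> x / norm }
      }
    }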

Cosine Similarities and Matrix Multiplication

(Note: since Spark 1.6.0, there is a columnSimilarities method on IndexedRowMatrix to compute cosine similarities. However, it merely delegates to an older (1.2.0) method that uses DIMSUM, which blows up with OOMEs on anything remotely large. See here for more information.)

If we have a set of vectors that we want to multiply together to find their cosine similarities, we can imagine them as a matrix where each row in that matrix is one of our vectors.

So, imagine we have M vectors { v1, v2, v3, ... vM }. Each vector is made up of N elements so, for instance, vector v1 can be expressed as { v11, v12, v13, ... v1N }.

We can express this as a matrix that looks like:

v11  v12  v13  ...  v1N
v21  v22  v23  ...  v2N
v31  v32  v33  ...  v3N
 .    .    .         .
vM1  vM2  vM3  ...  vMN

(You'll notice that each row represents a record and each column represents a term. If a term appears in that record then vij is non-zero).

What we want is a matrix that holds every vector multiplied by every other vector. For instance, for v1 and v2 we want

v11v21 + v12v22 + v13v23 + ... + v1Nv2N

Well, this is the same as multiplying the matrix by its own transpose:
v11  v12  v13  ...  v1N       v11  v21  v31  ...  vM1
v21  v22  v23  ...  v2N       v12  v22  v32  ...  vM2
v31  v32  v33  ...  v3N   ×   v13  v23  v33  ...  vM3
 .    .    .         .         .    .    .         .
vM1  vM2  vM3  ...  vMN       v1N  v2N  v3N  ...  vMN
and looking at element (1, 2) of that matrix for this particular pair in this particular example.

So, using the brevity of summation notation, if this matrix is called A then:


       N
Aij =  Σ  vinvjn
      n=1

Or, using Einstein notation, we can state it even more succinctly:

Aij = vinvjn

since the n index is repeated.
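
To see where the cost of computing A comes from, here's what the naive, dense version looks like (a sketch with illustrative names; in practice the data would never fit in a local array):

    // Naive dense computation of A = V * V-transpose: for every pair of rows (i, j),
    // sum the products of their N elements - three nested loops, i.e. O(M * M * N).
    def selfMultiply(v: Array[Array[Double]]): Array[Array[Double]] = {
      val m = v.length
      val n = if (m > 0) v(0).length else 0
      val a = Array.ofDim[Double](m, m)
      for (i <- 0 until m; j <- 0 until m) {
        var sum = 0.0
        for (k <- 0 until n) sum += v(i)(k) * v(j)(k)
        a(i)(j) = sum
      }
      a
    }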

Given two matrices, one m × n and the other n × p, the complexity of multiplying them is O(mnp). In our case, we're multiplying a matrix by its transpose, so p = m. Therefore, the complexity is O(m²n) - pretty bad.

The good news is that our matrix is sparse. How sparse depends on our variable W, which determines at what point we start discarding terms in our TF-IDF stage.

Spark Implementation

Since we are just multiplying a matrix by its transpose, we might want to use BlockMatrix.multiply. However, the documentation tells us:
"The output BlockMatrix will only consist of blocks of DenseMatrix. This may cause some performance issues until support for multiplying two sparse matrices is added."
Since M and N for us are typically in the hundreds of millions, this is a problem.
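
For illustration, this ruled-out route would look something like the following sketch (assuming the TF-IDF matrix is available as an RDD[MatrixEntry] of (row, column, value) triples; the names and block sizes are purely illustrative):

    import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}
    import org.apache.spark.rdd.RDD

    // Multiply the matrix by its transpose using BlockMatrix - the result is made
    // of DenseMatrix blocks, which is exactly what makes this infeasible at our scale.
    def naiveSimilarities(entries: RDD[MatrixEntry]): BlockMatrix = {
      val a = new CoordinateMatrix(entries).toBlockMatrix(1024, 1024).cache()
      a.multiply(a.transpose)
    }

An M × M result built from dense blocks is hopeless when M is in the hundreds of millions.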

So, we might want to just manually multiply any two vectors, i and j, in our equation for Aij. The trouble is that if we have two vectors in a Spark RDD, vi and vj, they may very well live in different partitions. And since we want to calculate vi · vj for all i, j, we'll definitely be hitting all partition combinations and there will be O(M²) multiplications.

We want to reduce the amount of inter-partition shuffling and avoid O(M²) multiplications of N-sized vectors. One solution might be to take each vector and, for each non-zero element, attach its column ID, which we'll call k. We can then reduceByKey (where k is the key) and start multiplying out the set of values for each key.

That is, associated with every column, k, there will be a set of vectors that have a non-zero k-th element. There will be a maximum of W of them, since that's the limit we imposed on how often a term may appear before we consider it insignificant. We'll call the set of those k-th elements K, which in set-builder notation is:

K = { vxk | vxk ≠ 0 }, where |K| ≤ W
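
As a sketch of how this first step might look in Spark (assuming the unit vectors are held as an RDD of (row ID, non-zero (column, value) entries); the names are illustrative, and I use groupByKey rather than reduceByKey since we need all of a column's values in one place at the same time):

    import org.apache.spark.rdd.RDD

    type RowId     = Long
    type ColumnId  = Int
    type SparseRow = Seq[(ColumnId, Double)]

    // Group the non-zero elements of every vector by their column ID, k.
    // Each resulting group is the set K for that column and has at most W entries.
    def byColumn(vectors: RDD[(RowId, SparseRow)]): RDD[(ColumnId, Iterable[(RowId, Double)])] =
      vectors
        .flatMap { case (i, entries) =>
          entries.map { case (k, value) => (k, (i, value)) } // emit (k, (i, vik))
        }
        .groupByKey()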

We then take the Cartesian product of this set with itself and attach to each product the tuple (i, j), that is, its co-ordinates, so that we have a set, Dk, where:

Dk = { ((i, j), vikvjk) | vik, vjk ∈ K and i < j }

We ignore the case where i=j since we're not interested in the cosine similarities of a vector with itself. We know it's 1 as they're unit vectors.

Note that this can be computationally expensive, as it scales as O(W²).

Now we can reduceByKey on the co-ordinates (i,j) and we'll have obtained the cosine similarity for all i, j.
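
Continuing the sketch above (again, with illustrative names), the rest of the pipeline might look like this:

    // For each column, form the products vik * vjk for every pair i < j in its group,
    // key them by (i, j), then sum the per-column contributions to get the cosine
    // similarity Aij of vectors i and j.
    def cosineSimilarities(vectors: RDD[(RowId, SparseRow)]): RDD[((RowId, RowId), Double)] =
      byColumn(vectors)
        .flatMap { case (_, column) =>
          val entries = column.toSeq
          for {
            (i, vik) <- entries
            (j, vjk) <- entries
            if i < j                    // skip i = j: a unit vector's similarity with itself is 1
          } yield ((i, j), vik * vjk)   // the contribution from column k to the pair (i, j)
        }
        .reduceByKey(_ + _)             // Aij = the sum over k of vik * vjk

Because the key of the final pairs is the (i, j) tuple, the reduceByKey brings every contribution for a given pair onto the same partition, which is the point of the next section.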

Validity of this approach

Mathematically, if our column index is k and we call the contribution of the term corresponding to k to the cosine similarity of the pair of vectors (i, j) Dk(i,j), then:

Dk(i,j) = vikvjk

Summing these contributions over all k gives exactly our equation above for Aij (with k playing the role of n), which shows that this technique is at least mathematically correct. How well it performs is something else...

By making the co-ordinates (i, j) the first element of a tuple, all of the values for a given pair will live in the same partition: Spark partitions an RDD of Tuple2s according to the first element of the tuple.

Partitions

We can demonstrate this like so:

    import scala.util.Random

    type Pair = (Long, Long)

    val pairs = (1 to 100).map(x => (x.toLong, Random.nextLong()))
    val rdd   = sparkContext.parallelize(pairs)

which gives us an RDD of pairs of Longs, the first element running from 1 to 100 and the second random. Now we create a function that we can map over the RDD's partitions, assigning each element a number unique to the partition it's in:

  import java.util.concurrent.atomic.AtomicInteger
  import scala.collection.mutable.ArrayBuffer

  type Enumerated[T] = (Int, T)

  // Hands out one ID per partition processed in this local test.
  val idGenerator = new AtomicInteger(0)

  def partitionMapper[T]: Iterator[T] => Iterator[Enumerated[T]] = { pairIterator =>
    val partitionId = idGenerator.getAndIncrement()
    val enumerated  = ArrayBuffer[Enumerated[T]]()
    // Tag every element with the ID of the partition it was found in.
    pairIterator.foreach { pair =>
      enumerated += ((partitionId, pair))
    }
    enumerated.toIterator
  }

Finally, we see the output:

    val partitioned  = rdd mapPartitions partitionMapper[Pair]
    val inMem        = partitioned.collect()
    val groupedPairs = inMem.groupBy(_._1).toSeq.sortBy(_._1)
    groupedPairs foreach { case (partitionId, enumerated) =>
      val vals = enumerated.map(_._2._1)
      println(s"partition #$partitionId: ${vals.min}  to ${vals.max} ")
    }

Which produces:

partition #0: 51  to 75
partition #1: 1  to 25
partition #2: 26  to 50
partition #3: 76  to 100

Very neat.

Conclusion

It looks like I am not the only person who is rolling his own distributed matrix multiplication solution using Spark (see this link here). It seems others have found that you don't get it out of the box. There appear to be moves afoot to address this.
