|
| 1 | +package com.github.cloudml.zen.ml |
| 2 | + |
| 3 | +import scala.reflect.ClassTag |
| 4 | +import org.apache.spark.HashPartitioner |
| 5 | +import org.apache.spark.graphx._ |
| 6 | +import org.apache.spark.graphx.impl.GraphImpl |
| 7 | +import org.apache.spark.storage.StorageLevel |
| 8 | + |
| 9 | +object VMBLPPartitioner { |
| 10 | + /** |
| 11 | + * Modified Balanced Label Propogation, see: https://code.facebook.com/posts/274771932683700/large-scale-graph-partitioning-with-apache-giraph/ |
| 12 | + * This is the vertex-cut version (MBLP is an edge-cut algorithm for Apache Giraph) |
| 13 | + */ |
| 14 | + def partitionByVMBLP[VD: ClassTag, ED: ClassTag]( |
| 15 | + inGraph: Graph[VD, ED], |
| 16 | + numIter: Int, |
| 17 | + storageLevel: StorageLevel): Graph[VD, ED] = { |
| 18 | + |
| 19 | + val numPartitions = inGraph.edges.partitions.length |
| 20 | + var tbrGraph = inGraph |
| 21 | + tbrGraph.persist(storageLevel) |
| 22 | + |
| 23 | + for (i <- 0 to numIter) { |
| 24 | + val pidRdd = tbrGraph.vertices.mapPartitionsWithIndex((pid, iter) => iter.map(t => (t._1, pid)), true) |
| 25 | + val pidVertices = VertexRDD(pidRdd) // Get Vertices which v.attr = <partitionId of v> |
| 26 | + |
| 27 | + val pidGraph = GraphImpl(pidVertices, tbrGraph.edges) |
| 28 | + val neiVecVertices = pidGraph.aggregateMessages[Array[Int]](ectx => { |
| 29 | + val vecSrc = Array.fill(numPartitions)(0) |
| 30 | + vecSrc(ectx.dstAttr) += 1 |
| 31 | + val vecDst = Array.fill(numPartitions)(0) |
| 32 | + vecDst(ectx.srcAttr) += 1 |
| 33 | + ectx.sendToSrc(vecSrc) |
| 34 | + ectx.sendToDst(vecDst) |
| 35 | + }, (_, _).zipped.map(_ + _)) // Get Vertices which v.attr = Array[d0, d1, ..., dn] |
| 36 | + |
| 37 | + val wantVertices = neiVecVertices.mapValues(discreteSample(_)) |
| 38 | + // Get Vertices which v.attr = (<partitionId now>, <partitionId to move to>) |
| 39 | + val moveVertices = pidVertices.innerZipJoin(wantVertices)((_, fromPid, toPid) => (fromPid, toPid)) |
| 40 | + |
| 41 | + // Get a matrix which mat(i)(j) = total num of vertices that want to move from i to j |
| 42 | + val moveMat = moveVertices.aggregate(Array.fill(numPartitions, numPartitions)(0))({ |
| 43 | + case (mat, (_, ft)) => { |
| 44 | + if(ft._1 != ft._2) mat(ft._1)(ft._2) += 1 |
| 45 | + mat |
| 46 | + }}, (_, _).zipped.map((_, _).zipped.map(_ + _))) |
| 47 | + |
| 48 | + val newPidRdd = moveVertices.mapPartitions(iter => iter.map({case (vid, (from, to)) => { |
| 49 | + if(from == to) (vid, from) |
| 50 | + else { |
| 51 | + // Move vertices under balance constraints |
| 52 | + val numOut = moveMat(from)(to) |
| 53 | + val numIn = moveMat(to)(from) |
| 54 | + val threshold = math.min(numOut, numIn) |
| 55 | + val r = threshold.asInstanceOf[Float] / numOut |
| 56 | + val u = math.random |
| 57 | + if(u < r) (vid, to) |
| 58 | + else (vid, from) |
| 59 | + } |
| 60 | + }})) |
| 61 | + val newPidVertices = VertexRDD(newPidRdd) // Get Vertices which v.attr = <partitionId after moving> |
| 62 | + |
| 63 | + // Repartition |
| 64 | + val newPidGraph = GraphImpl(newPidVertices, tbrGraph.edges) |
| 65 | + val tempEdges = newPidGraph.triplets.mapPartitions(iter => iter.map{ |
| 66 | + et => (et.srcAttr, Edge(et.srcId, et.dstId, et.attr)) |
| 67 | + }) |
| 68 | + val newEdges = tempEdges.partitionBy(new HashPartitioner(numPartitions)).map(_._2) |
| 69 | + |
| 70 | + val ntbrGraph = GraphImpl(tbrGraph.vertices, newEdges, null.asInstanceOf[VD], storageLevel, storageLevel) |
| 71 | + ntbrGraph.persist(storageLevel) |
| 72 | + tbrGraph.unpersist(false) |
| 73 | + tbrGraph = ntbrGraph |
| 74 | + } // End for |
| 75 | + tbrGraph |
| 76 | + } |
| 77 | + |
| 78 | + def discreteSample(dist: Array[Int]): Int = { |
| 79 | + val s = dist.sum |
| 80 | + val u = math.random * s |
| 81 | + var ps = 0 |
| 82 | + for(p <- dist) { |
| 83 | + ps += p |
| 84 | + if(u < ps) return p |
| 85 | + } |
| 86 | + dist.length - 1 |
| 87 | + } |
| 88 | +} |
0 commit comments