import scala.reflect.ClassTag
import org.apache.spark.graphx._

/**
 * Computes shortest weighted paths to the given set of landmark vertices, returning a graph where each
 * vertex attribute is a map containing the shortest-path distance to each reachable landmark.
 * Currently supports only graphs of type Graph[VD, Double], where VD is an arbitrary vertex type.
 */
object GraphXShortestWeightedPaths extends Serializable {
  /** Stores a map from the vertex id of a landmark to the distance to that landmark. */
  type SPMap = Map[VertexId, Double]

  // Initial and infinity values, used when relaxing edges
  private val INITIAL = 0.0
  private val INFINITY = Int.MaxValue.toDouble

  private def makeMap(x: (VertexId, Double)*) = Map(x: _*)

  /** Adds the edge weight `delta` to every landmark distance in the map. */
  private def incrementMap(spmap: SPMap, delta: Double): SPMap = {
    spmap.map { case (v, d) => v -> (d + delta) }
  }

  /** Merges two maps, keeping the smaller (shorter) distance for each landmark. */
  private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap = {
    (spmap1.keySet ++ spmap2.keySet).map {
      k => k -> math.min(spmap1.getOrElse(k, INFINITY), spmap2.getOrElse(k, INFINITY))
    }.toMap
  }

  // At this point it does not matter what the vertex type VD is, since it is replaced below
  def run[VD](graph: Graph[VD, Double], landmarks: Seq[VertexId]): Graph[SPMap, Double] = {
    val spGraph = graph.mapVertices { (vid, attr) =>
      // a landmark's initial distance to itself is 0.0
      if (landmarks.contains(vid)) makeMap(vid -> INITIAL) else makeMap()
    }

    val initialMessage = makeMap()

    def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = {
      addMaps(attr, msg)
    }

    def sendMessage(edge: EdgeTriplet[SPMap, Double]): Iterator[(VertexId, SPMap)] = {
      // Propagate the destination's landmark distances to the source, adding the
      // edge weight; send a message only if this improves the source's current map
      val newAttr = incrementMap(edge.dstAttr, edge.attr)
      if (edge.srcAttr != addMaps(newAttr, edge.srcAttr)) Iterator((edge.srcId, newAttr))
      else Iterator.empty
    }

    Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)
  }
}

println("Usage: val result = GraphXShortestWeightedPaths.run(graph, Seq(4L, 0L, 9L))")
Usage: val result = GraphXShortestWeightedPaths.run(graph, Seq(4L, 0L, 9L))
import scala.reflect.ClassTag
import org.apache.spark.graphx._
defined object GraphXShortestWeightedPaths
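// To see what the private addMaps helper above computes, here is the same
// min-merge written out on two plain Scala maps (a standalone sketch with
// made-up distances, purely for illustration):
val m1 = Map(0L -> 0.4, 4L -> 1.2)
val m2 = Map(0L -> 0.7, 9L -> 0.3)
val merged = (m1.keySet ++ m2.keySet).map { k =>
  k -> math.min(m1.getOrElse(k, Double.PositiveInfinity), m2.getOrElse(k, Double.PositiveInfinity))
}.toMap
// merged == Map(0L -> 0.4, 4L -> 1.2, 9L -> 0.3): each landmark keeps its smaller distance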
import scala.util.Random
import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators
// A graph with edge attributes containing distances
val graph: Graph[Long, Double] = GraphGenerators.logNormalGraph(sc, numVertices = 10, seed = 123L).mapEdges { e =>
  // to make things nicer we assign distance 0.0 to self-loops
  if (e.srcId == e.dstId) 0.0 else Random.nextDouble()
}
import scala.util.Random
import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators
graph: org.apache.spark.graphx.Graph[Long,Double] = org.apache.spark.graphx.impl.GraphImpl@45dcade2
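// Define the landmark vertices and run the shortest weighted paths algorithm,
// following the usage line printed above; this produces the `result` used below
val landMarkVertexIds = Seq(4L, 0L, 9L)
val result = GraphXShortestWeightedPaths.run(graph, landMarkVertexIds)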
// Print the shortest-path maps found for each vertex
println(result.vertices.collect.mkString("\n"))
(0,Map(0 -> 0.0, 4 -> 0.9508628404888835, 9 -> 0.9591413283099168))
(8,Map(0 -> 0.2934032163544852, 4 -> 0.9801207594550188, 9 -> 0.8196019765599419))
(1,Map(0 -> 0.5802357924540097, 4 -> 1.2669533355545433, 9 -> 1.2752318233755766))
(9,Map(9 -> 0.0, 0 -> 0.5231864117917981, 4 -> 0.7980365238526225))
(2,Map(0 -> 0.2582682915929926, 4 -> 0.8339520209319522, 9 -> 0.5701009018719322))
(3,Map(0 -> 0.0923559237731384, 4 -> 0.18942157781466173, 9 -> 0.1977000656356951))
(4,Map(4 -> 0.0, 0 -> 0.25873464218225106, 9 -> 0.008278487821033353))
(5,Map(0 -> 0.031742669363998166, 4 -> 0.1288083234055215, 9 -> 0.13708681122655486))
(6,Map(0 -> 0.12059036542225454, 4 -> 0.8073079085227881, 9 -> 0.8155863963438215))
(7,Map(4 -> 0.22844606564222758, 9 -> 0.23672455346326093, 0 -> 0.33818821718284775))
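// To read off a single entry, say the shortest distance from vertex 7 to
// landmark 4, we can use the standard pair-RDD lookup (a minimal sketch):
val distFrom7To4: Option[Double] = result.vertices.lookup(7L).head.get(4L)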
// Now let us collect the shortest distance between every vertex and every landmark vertex.
// To manipulate the Scala maps that form the vertices of the result, see: http://docs.scala-lang.org/overviews/collections/maps.html
// A quick point on turning a map into a list of tuples: http://stackoverflow.com/questions/28769367/scala-map-a-map-to-list-of-tuples
val shortestDistsVertex2Landmark = result.vertices.flatMap { case (vertexId, spMap) =>
  spMap.toSeq.map { case (landmarkId, dist) => (vertexId, landmarkId, dist) } // triples: (vertex, landmarkVertex, shortestDistance)
}
shortestDistsVertex2Landmark: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.VertexId, Double)] = MapPartitionsRDD[3128] at flatMap at command-2744653148555415:4
shortestDistsVertex2Landmark.collect.mkString("\n")
res4: String =
(0,0,0.0)
(0,4,0.9508628404888835)
(0,9,0.9591413283099168)
(8,0,0.2934032163544852)
(8,4,0.9801207594550188)
(8,9,0.8196019765599419)
(1,0,0.5802357924540097)
(1,4,1.2669533355545433)
(1,9,1.2752318233755766)
(9,9,0.0)
(9,0,0.5231864117917981)
(9,4,0.7980365238526225)
(2,0,0.2582682915929926)
(2,4,0.8339520209319522)
(2,9,0.5701009018719322)
(3,0,0.0923559237731384)
(3,4,0.18942157781466173)
(3,9,0.1977000656356951)
(4,4,0.0)
(4,0,0.25873464218225106)
(4,9,0.008278487821033353)
(5,0,0.031742669363998166)
(5,4,0.1288083234055215)
(5,9,0.13708681122655486)
(6,0,0.12059036542225454)
(6,4,0.8073079085227881)
(6,9,0.8155863963438215)
(7,4,0.22844606564222758)
(7,9,0.23672455346326093)
(7,0,0.33818821718284775)
// http://alvinalexander.com/scala/how-to-sort-map-in-scala-key-value-sortby-sortwith
// We need ListMap to keep the landmark entries ordered by key, ensuring a deterministic column order downstream
import scala.collection.immutable.ListMap
import sqlContext.implicits._
import scala.collection.immutable.ListMap
import sqlContext.implicits._
// Recall our landmark vertices in landMarkVertexIds; let's use their string representations as names
val unorderedNamedLandmarkVertices = landMarkVertexIds.map(id => (id, id.toString))
val orderedNamedLandmarkVertices = ListMap(unorderedNamedLandmarkVertices.sortBy(_._1):_*)
val orderedLandmarkVertexNames = orderedNamedLandmarkVertices.toSeq.map(x => x._2)
orderedLandmarkVertexNames.mkString(", ")
unorderedNamedLandmarkVertices: Seq[(Long, String)] = List((4,4), (0,0), (9,9))
orderedNamedLandmarkVertices: scala.collection.immutable.ListMap[Long,String] = ListMap(0 -> 0, 4 -> 4, 9 -> 9)
orderedLandmarkVertexNames: Seq[String] = Vector(0, 4, 9)
res5: String = 0, 4, 9
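// With the triples and the ordered landmark ids in hand, a natural next step is
// a DataFrame with one column per landmark. A minimal sketch (the column names
// vertexId, landmarkId and shortestDistance are illustrative, not from above):
val shortestDistsDF = shortestDistsVertex2Landmark.toDF("vertexId", "landmarkId", "shortestDistance")
val wideDF = shortestDistsDF
  .groupBy("vertexId")
  .pivot("landmarkId", landMarkVertexIds.sorted)
  .max("shortestDistance")
wideDF.show()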