998_EX_01_GraphXShortestWeightedPaths(Scala)


ScaDaMaLe Course site and book

Extending spark.graphx.lib.ShortestPaths to GraphXShortestWeightedPaths

2016-2020, Ivan Sadikov and Raazesh Sainudiin

We extend the Shortest Paths algorithm in Spark's GraphX library to allow for user-specified edge weights as an edge attribute.

This is part of Project MEP: Meme Evolution Programme and is supported by the Databricks Academic Partners Program.

The analysis is available in the following Databricks notebook:

Copyright 2016 Ivan Sadikov and Raazesh Sainudiin

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Let's modify the shortest paths algorithm to allow for user-specified edge weights.

We update the shortest paths algorithm to work over an edge attribute of edge weights as Double. The key concepts are:

  • we increment the map with delta, which is edge.attr
  • the edge attribute can be anything numeric; it has been tested on Double
  • the infinity value is not actual infinity, but Integer.MAX_VALUE (as a Double)

A small standalone sketch of these map operations follows this list.
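To see the min-merge semantics concretely, here is a minimal sketch in plain Scala (no Spark needed); the helper names mirror incrementMap and addMaps defined in the object below, with Double.MaxValue standing in for the object's INFINITY:

// distances-to-landmarks map, keyed by landmark vertex id
type SPMap = Map[Long, Double]

// add the edge weight (delta) to every tentative distance in the map
def incrementMap(spmap: SPMap, delta: Double): SPMap =
  spmap.map { case (v, d) => v -> (d + delta) }

// merge two maps, keeping the smaller distance per landmark
def addMaps(m1: SPMap, m2: SPMap): SPMap =
  (m1.keySet ++ m2.keySet).map { k =>
    k -> math.min(m1.getOrElse(k, Double.MaxValue), m2.getOrElse(k, Double.MaxValue))
  }.toMap

incrementMap(Map(0L -> 0.5), 0.25)                    // Map(0 -> 0.75)
addMaps(Map(0L -> 0.75), Map(0L -> 0.6, 9L -> 1.0))   // Map(0 -> 0.6, 9 -> 1.0)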

We modify the following code from Spark's org.apache.spark.graphx.lib.ShortestPaths:

Explained here:

import scala.reflect.ClassTag
import org.apache.spark.graphx._
 
/**
 * Computes shortest weighted paths to the given set of landmark vertices, returning a graph where each
 * vertex attribute is a map containing the shortest-path distance to each reachable landmark.
 * Currently supports only Graph[VD, Double], where VD is an arbitrary vertex attribute type.
 */
object GraphXShortestWeightedPaths extends Serializable {
  /** Stores a map from the vertex id of a landmark to the distance to that landmark. */
  type SPMap = Map[VertexId, Double]
  // initial and infinity values, used to relax edges
  private val INITIAL = 0.0
  private val INFINITY = Int.MaxValue.toDouble
 
  private def makeMap(x: (VertexId, Double)*) = Map(x: _*)
 
  private def incrementMap(spmap: SPMap, delta: Double): SPMap = {
    spmap.map { case (v, d) => v -> (d + delta) }
  }
 
  private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap = {
    (spmap1.keySet ++ spmap2.keySet).map {
      k => k -> math.min(spmap1.getOrElse(k, INFINITY), spmap2.getOrElse(k, INFINITY))
    }.toMap
  }
  
  // the vertex attribute type VD is arbitrary here; only the Double edge weights matter
  def run[VD](graph: Graph[VD, Double], landmarks: Seq[VertexId]): Graph[SPMap, Double] = {
    val spGraph = graph.mapVertices { (vid, attr) =>
      // initial value for itself is 0.0 as Double
      if (landmarks.contains(vid)) makeMap(vid -> INITIAL) else makeMap()
    }
 
    val initialMessage = makeMap()
 
    def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = {
      addMaps(attr, msg)
    }
 
    def sendMessage(edge: EdgeTriplet[SPMap, Double]): Iterator[(VertexId, SPMap)] = {
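      // relax backwards along the edge: candidate distances for src are
      // dst's tentative distances plus the edge weight (edge.attr); a message
      // is sent only if this improves src's landmark map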
      val newAttr = incrementMap(edge.dstAttr, edge.attr)
      if (edge.srcAttr != addMaps(newAttr, edge.srcAttr)) Iterator((edge.srcId, newAttr))
      else Iterator.empty
    }
 
    Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)
  }
}
 
println("Usage: val result = GraphXShortestWeightedPaths.run(graph, Seq(4L, 0L, 9L))")
Usage: val result = GraphXShortestWeightedPaths.run(graph, Seq(4L, 0L, 9L))
import scala.reflect.ClassTag
import org.apache.spark.graphx._
defined object GraphXShortestWeightedPaths

Generate test graph

Generate a simple graph with Double weights for edges.

import scala.util.Random
 
import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators
 
// A graph with edge attributes containing distances
val graph: Graph[Long, Double] = GraphGenerators.logNormalGraph(sc, numVertices = 10, seed=123L).mapEdges { e => 
  // to make things nicer we assign 0 distance to self-loops
  if (e.srcId == e.dstId) 0.0 else Random.nextDouble()
}
import scala.util.Random
import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators
graph: org.apache.spark.graphx.Graph[Long,Double] = org.apache.spark.graphx.impl.GraphImpl@45dcade2
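As a quick sanity check (our addition, not in the original notebook), we can count vertices and edges and peek at a few weighted edges of the generated graph:

// basic sanity checks on the generated graph (assumes `graph` from above)
println(s"vertices = ${graph.numVertices}, edges = ${graph.numEdges}")
graph.edges.take(5).foreach(e => println(s"${e.srcId} -> ${e.dstId} : ${e.attr}"))

Note that because the weights come from Random.nextDouble() inside mapEdges, they can be re-evaluated on each action unless the graph is cached, so different actions may observe different weights (visible below when comparing result.edges to graph.edges).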
val landMarkVertexIds = Seq(4L, 0L, 9L)
val result = GraphXShortestWeightedPaths.run(graph, landMarkVertexIds)
landMarkVertexIds: Seq[Long] = List(4, 0, 9)
result: org.apache.spark.graphx.Graph[GraphXShortestWeightedPaths.SPMap,Double] = org.apache.spark.graphx.impl.GraphImpl@4bc2f186
// Found shortest paths
println(result.vertices.collect.mkString("\n"))
(0,Map(0 -> 0.0, 4 -> 0.9508628404888835, 9 -> 0.9591413283099168))
(8,Map(0 -> 0.2934032163544852, 4 -> 0.9801207594550188, 9 -> 0.8196019765599419))
(1,Map(0 -> 0.5802357924540097, 4 -> 1.2669533355545433, 9 -> 1.2752318233755766))
(9,Map(9 -> 0.0, 0 -> 0.5231864117917981, 4 -> 0.7980365238526225))
(2,Map(0 -> 0.2582682915929926, 4 -> 0.8339520209319522, 9 -> 0.5701009018719322))
(3,Map(0 -> 0.0923559237731384, 4 -> 0.18942157781466173, 9 -> 0.1977000656356951))
(4,Map(4 -> 0.0, 0 -> 0.25873464218225106, 9 -> 0.008278487821033353))
(5,Map(0 -> 0.031742669363998166, 4 -> 0.1288083234055215, 9 -> 0.13708681122655486))
(6,Map(0 -> 0.12059036542225454, 4 -> 0.8073079085227881, 9 -> 0.8155863963438215))
(7,Map(4 -> 0.22844606564222758, 9 -> 0.23672455346326093, 0 -> 0.33818821718284775))
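As a quick consistency check (a sketch we add here; it assumes result and landMarkVertexIds from above), each landmark should be at distance 0.0 from itself:

// verify each landmark's map contains distance 0.0 to itself
result.vertices
  .filter { case (vid, _) => landMarkVertexIds.contains(vid) }
  .collect()
  .foreach { case (vid, spMap) => assert(spMap(vid) == 0.0) }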
// edges with weights; use these to spot-check a couple of the shortest paths above
display(result.edges.toDF)
 
srcId   dstId   attr
0       0       0
0       1       0.6721979874068958
0       4       0.9508628404888835
1       1       0
1       6       0.45964542703175515
1       6       0.7349975451261668
2       0       0.2582682915929926
2       1       0.9280493920155959
2       4       0.8339520209319522
2       6       0.26091386534774674
2       6       0.9481996224496678
2       9       0.5701009018719322
3       2       0.8893068591814861
3       3       0
3       5       0.06061325440914023
3       7       0.26415564719487117
3       8       0.5847037097670544

Showing 17 of the 55 rows.

display(graph.edges.toDF) // these are the directed weighted edges of the graph
 
srcId   dstId   attr
0       0       0
0       1       0.2473870305881326
0       4       0.7593678671727498
1       1       0
1       6       0.4721793531500751
1       6       0.7565157046946671
2       0       0.8601376666144385
2       1       0.15474243507003416
2       4       0.17585136043466876
2       6       0.9762506825428003
2       6       0.8022953138023812
2       9       0.8898859140300339
3       2       0.23688722482284008
3       3       0
3       5       0.6621552329080445
3       7       0.6189436789554128
3       8       0.7648758038805598

Showing 17 of the 55 rows.

// now let us collect the shortest distance between every vertex and every landmark vertex
// to manipulate scala maps that are vertices of the result see: http://docs.scala-lang.org/overviews/collections/maps.html
// a quick point: http://stackoverflow.com/questions/28769367/scala-map-a-map-to-list-of-tuples
val shortestDistsVertex2Landmark = result.vertices.flatMap(GxSwpSPMap => {
  GxSwpSPMap._2.toSeq.map(x => (GxSwpSPMap._1, x._1, x._2)) // to get triples: vertex, landmarkVertex, shortest_distance
})
shortestDistsVertex2Landmark: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.VertexId, Double)] = MapPartitionsRDD[3128] at flatMap at command-2744653148555415:4
shortestDistsVertex2Landmark.collect.mkString("\n")
res4: String =
(0,0,0.0)
(0,4,0.9508628404888835)
(0,9,0.9591413283099168)
(8,0,0.2934032163544852)
(8,4,0.9801207594550188)
(8,9,0.8196019765599419)
(1,0,0.5802357924540097)
(1,4,1.2669533355545433)
(1,9,1.2752318233755766)
(9,9,0.0)
(9,0,0.5231864117917981)
(9,4,0.7980365238526225)
(2,0,0.2582682915929926)
(2,4,0.8339520209319522)
(2,9,0.5701009018719322)
(3,0,0.0923559237731384)
(3,4,0.18942157781466173)
(3,9,0.1977000656356951)
(4,4,0.0)
(4,0,0.25873464218225106)
(4,9,0.008278487821033353)
(5,0,0.031742669363998166)
(5,4,0.1288083234055215)
(5,9,0.13708681122655486)
(6,0,0.12059036542225454)
(6,4,0.8073079085227881)
(6,9,0.8155863963438215)
(7,4,0.22844606564222758)
(7,9,0.23672455346326093)
(7,0,0.33818821718284775)

Let's make a DataFrame for visualizing pairwise matrix plots

We want to make 4 columns in this example as follows (note that the actual values change for each realisation of the graph!):

landmarkId1 ("0")   landmarkId2 ("4")   landmarkId3 ("9")   srcVertexId
-----------------------------------------------------------------------
0.0                 0.7425...           0.8718...           0
0.924...            1.2464...           1.0472...           1
...
// http://alvinalexander.com/scala/how-to-sort-map-in-scala-key-value-sortby-sortwith
// we need this to make sure that the maps are ordered by their keys, ensuring a consistent column order
import scala.collection.immutable.ListMap
import sqlContext.implicits._
import scala.collection.immutable.ListMap
import sqlContext.implicits._
// recall our landmark vertices in landMarkVertexIds; let's use their String representations as column names
val unorderedNamedLandmarkVertices = landMarkVertexIds.map(id => (id, id.toString) )
val orderedNamedLandmarkVertices = ListMap(unorderedNamedLandmarkVertices.sortBy(_._1):_*)
val orderedLandmarkVertexNames = orderedNamedLandmarkVertices.toSeq.map(x => x._2)
orderedLandmarkVertexNames.mkString(", ")
unorderedNamedLandmarkVertices: Seq[(Long, String)] = List((4,4), (0,0), (9,9))
orderedNamedLandmarkVertices: scala.collection.immutable.ListMap[Long,String] = ListMap(0 -> 0, 4 -> 4, 9 -> 9)
orderedLandmarkVertexNames: Seq[String] = Vector(0, 4, 9)
res5: String = 0, 4, 9
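The section stops before assembling the DataFrame itself, so here is a hedged sketch (our addition; it assumes the implicits imported above and the shortestDistsVertex2Landmark RDD) of one way to pivot the triples into the 4-column layout described earlier:

// pivot (srcVertexId, landmarkVertexId, shortestDistance) triples into
// one row per source vertex with one column per landmark vertex
import org.apache.spark.sql.functions.first

val shortestDistsDF = shortestDistsVertex2Landmark
  .toDF("srcVertexId", "landmarkVertexId", "shortestDistance")
  .groupBy("srcVertexId")
  .pivot("landmarkVertexId", landMarkVertexIds.sorted) // columns "0", "4", "9"
  .agg(first("shortestDistance"))

shortestDistsDF.show()

Landmarks unreachable from a vertex are simply absent from its map, so they surface as nulls in the pivoted columns, which is worth handling before plotting.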