ScaDaMaLe Course site and book

Extending spark.graphx.lib.ShortestPaths to GraphXShortestWeightedPaths

2016-2020, Ivan Sadikov and Raazesh Sainudiin

We extend the Shortest Paths algorithm in Spark's GraphX library to allow for user-specified edge weights, given as an edge attribute.

This is part of Project MEP: Meme Evolution Programme and is supported by the Databricks Academic Partners Program.

The analysis is available in the following Databricks notebook: * http://lamastex.org/lmse/mep/src/GraphXShortestWeightedPaths.html

Copyright 2016 Ivan Sadikov and Raazesh Sainudiin

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Let's modify the shortest paths algorithm to allow for user-specified edge weights.

We update the shortest paths algorithm to work over an edge attribute of edge weights as Double. The key concepts are:

* we increment the map with a delta, which is edge.attr
* the edge attribute can be anything numeric; it has been tested on Double
* the infinity value is not actual infinity, but Int.MaxValue converted to Double

Modifying the following code: * https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala

Explained here: * http://note.yuhc.me/2015/03/graphx-pregel-shortest-path/

import scala.reflect.ClassTag
import org.apache.spark.graphx._

/**
 * Computes shortest weighted paths to the given set of landmark vertices, returning a graph where each
 * vertex attribute is a map containing the shortest-path distance to each reachable landmark.
 * Currently supports only Graph[VD, Double], where VD is an arbitrary vertex type.
 */
object GraphXShortestWeightedPaths extends Serializable {
  /** Stores a map from the vertex id of a landmark to the distance to that landmark. */
  type SPMap = Map[VertexId, Double]
  // initial and infinity values, used to relax edges
  private val INITIAL = 0.0
  private val INFINITY = Int.MaxValue.toDouble

  private def makeMap(x: (VertexId, Double)*) = Map(x: _*)

  private def incrementMap(spmap: SPMap, delta: Double): SPMap = {
    spmap.map { case (v, d) => v -> (d + delta) }
  }

  private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap = {
    (spmap1.keySet ++ spmap2.keySet).map {
      k => k -> math.min(spmap1.getOrElse(k, INFINITY), spmap2.getOrElse(k, INFINITY))
    }.toMap
  }
  
  // at this point it does not really matter what the vertex type is
  def run[VD](graph: Graph[VD, Double], landmarks: Seq[VertexId]): Graph[SPMap, Double] = {
    val spGraph = graph.mapVertices { (vid, attr) =>
      // a landmark's initial distance to itself is 0.0 (as Double)
      if (landmarks.contains(vid)) makeMap(vid -> INITIAL) else makeMap()
    }

    val initialMessage = makeMap()

    def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = {
      addMaps(attr, msg)
    }

    def sendMessage(edge: EdgeTriplet[SPMap, Double]): Iterator[(VertexId, SPMap)] = {
      val newAttr = incrementMap(edge.dstAttr, edge.attr)
      if (edge.srcAttr != addMaps(newAttr, edge.srcAttr)) Iterator((edge.srcId, newAttr))
      else Iterator.empty
    }

    Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)
  }
}

println("Usage: val result = GraphXShortestWeightedPaths.run(graph, Seq(4L, 0L, 9L))")
Usage: val result = GraphXShortestWeightedPaths.run(graph, Seq(4L, 0L, 9L))
import scala.reflect.ClassTag
import org.apache.spark.graphx._
defined object GraphXShortestWeightedPaths
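
To see what the private addMaps helper does during the Pregel iterations, here is a minimal local sketch using plain Scala Maps (the names INF, m1, m2 and merged are ours, for illustration only): merging two landmark-distance maps keeps the minimum known distance per landmark, and a landmark missing from a map is treated as being at distance INFINITY.

// local illustration of the addMaps semantics (the helper itself is private)
val INF = Int.MaxValue.toDouble
val m1 = Map(0L -> 0.4, 9L -> 1.2)   // distances known at one vertex
val m2 = Map(0L -> 0.7, 4L -> 0.3)   // distances arriving in a message
val merged = (m1.keySet ++ m2.keySet).map { k =>
  k -> math.min(m1.getOrElse(k, INF), m2.getOrElse(k, INF))
}.toMap
// merged == Map(0L -> 0.4, 9L -> 1.2, 4L -> 0.3)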

Generate a test graph

Generate a simple graph with Double weights on its edges.

import scala.util.Random

import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators

// A graph with edge attributes containing distances
val graph: Graph[Long, Double] = GraphGenerators.logNormalGraph(sc, numVertices = 10, seed = 123L).mapEdges { e =>
  // to make things nicer we assign distance 0.0 to self-loops
  if (e.srcId == e.dstId) 0.0 else Random.nextDouble()
}
import scala.util.Random
import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators
graph: org.apache.spark.graphx.Graph[Long,Double] = org.apache.spark.graphx.impl.GraphImpl@6a98b503
val landMarkVertexIds = Seq(4L, 0L, 9L)
val result = GraphXShortestWeightedPaths.run(graph, landMarkVertexIds)
landMarkVertexIds: Seq[Long] = List(4, 0, 9)
result: org.apache.spark.graphx.Graph[GraphXShortestWeightedPaths.SPMap,Double] = org.apache.spark.graphx.impl.GraphImpl@763902d8
// The shortest weighted path distances found from each vertex to each landmark
println(result.vertices.collect.mkString("\n"))
(4,Map(4 -> 0.0, 0 -> 0.4978771374144447, 9 -> 0.0039659650390762025))
(0,Map(0 -> 0.0, 4 -> 0.6026164718020462, 9 -> 0.6065824368411225))
(6,Map(0 -> 0.18329441371794564, 4 -> 0.7501236631976119, 9 -> 0.5876411577224736))
(8,Map(0 -> 1.0175525403647847, 4 -> 0.8260424329285025, 9 -> 0.6635599274533642))
(2,Map(0 -> 0.27424380755189526, 4 -> 0.43261521868768604, 9 -> 0.43658118372676225))
(1,Map(0 -> 0.8561112576964152, 4 -> 1.4229405071760814, 9 -> 1.2604580017009432))
(3,Map(0 -> 0.45813682659496957, 4 -> 0.2666267191586873, 9 -> 0.10414421368354898))
(7,Map(4 -> 0.2073486603716349, 9 -> 0.04486615489649659, 0 -> 0.42470420866790193))
(9,Map(9 -> 0.0, 0 -> 0.5045977394480895, 4 -> 0.3302516601623805))
(5,Map(0 -> 0.7760991789332677, 4 -> 0.278222041518823, 9 -> 0.2821880065578992))
// edges with weights; check a couple of the shortest paths above against these
result.edges.toDF.show
+-----+-----+-------------------+
|srcId|dstId|               attr|
+-----+-----+-------------------+
|    0|    0|                0.0|
|    0|    1| 0.3685042379337995|
|    0|    4| 0.6026164718020462|
|    1|    1|                0.0|
|    1|    6| 0.6728168439784695|
|    1|    6| 0.7852936724902796|
|    2|    0|0.27424380755189526|
|    2|    1| 0.7870284262414232|
|    2|    4|0.43261521868768604|
|    2|    6| 0.3595658109787053|
|    2|    6|0.27647602206577304|
|    2|    9|  0.841095651455322|
|    3|    2| 0.1838930190430743|
|    3|    3|                0.0|
|    3|    5|0.29401237129990443|
|    3|    7|0.05927805878705239|
|    3|    8| 0.6194270974815277|
|    4|    0| 0.4978771374144447|
|    4|    3|  0.895324471945953|
|    4|    5| 0.3082168662674215|
+-----+-----+-------------------+
only showing top 20 rows
graph.edges.toDF.show // these are the directed weighted edges of the graph
+-----+-----+--------------------+
|srcId|dstId|                attr|
+-----+-----+--------------------+
|    0|    0|                 0.0|
|    0|    1| 0.18179342573305257|
|    0|    4| 0.10499884753471289|
|    1|    1|                 0.0|
|    1|    6| 0.06902878615829067|
|    1|    6| 0.31916888663632115|
|    2|    0|   0.994117390558936|
|    2|    1|  0.3483496903814788|
|    2|    4|  0.6637120585508148|
|    2|    6|  0.7903591616520668|
|    2|    6| 0.15547330775766222|
|    2|    9| 0.20548813724247617|
|    3|    2| 0.30236814876856666|
|    3|    3|                 0.0|
|    3|    5|  0.8097988524182909|
|    3|    7|  0.7296892168314497|
|    3|    8|  0.7622160701710208|
|    4|    0| 0.19303017905390474|
|    4|    3| 0.09317663461190051|
|    4|    5|0.013527517923307197|
+-----+-----+--------------------+
only showing top 20 rows
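
Note that the weights printed by result.edges and graph.edges above differ, even though the Pregel run never changes edge attributes. This is because mapEdges draws weights with Random.nextDouble() and the graph is not cached, so the weights are re-drawn every time an action re-evaluates the underlying RDDs. A minimal sketch of a fix, under the same generator settings as above (the name reproducibleGraph is ours), is to derive each weight deterministically from the edge and cache the graph:

// sketch: make the edge weights reproducible across actions by seeding a
// per-edge random number generator from the edge endpoints, and cache the
// graph so it is not needlessly recomputed
val reproducibleGraph: Graph[Long, Double] = GraphGenerators
  .logNormalGraph(sc, numVertices = 10, seed = 123L)
  .mapEdges { e =>
    if (e.srcId == e.dstId) 0.0
    else new Random(e.srcId * 65537L + e.dstId).nextDouble()
  }
  .cache()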
// now let us collect the shortest distance between every vertex and every landmark vertex
// to manipulate the Scala maps that form the vertex attributes of the result, see: http://docs.scala-lang.org/overviews/collections/maps.html
// a quick point on mapping a Map to a list of tuples: http://stackoverflow.com/questions/28769367/scala-map-a-map-to-list-of-tuples
val shortestDistsVertex2Landmark = result.vertices.flatMap(GxSwpSPMap => {
  GxSwpSPMap._2.toSeq.map(x => (GxSwpSPMap._1, x._1, x._2)) // to get triples: vertex, landmarkVertex, shortest_distance
})
shortestDistsVertex2Landmark: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.VertexId, Double)] = MapPartitionsRDD[112] at flatMap at command-2971213210276591:4
shortestDistsVertex2Landmark.collect.mkString("\n")
res6: String =
(4,4,0.0)
(4,0,0.4978771374144447)
(4,9,0.0039659650390762025)
(0,0,0.0)
(0,4,0.6026164718020462)
(0,9,0.6065824368411225)
(6,0,0.18329441371794564)
(6,4,0.7501236631976119)
(6,9,0.5876411577224736)
(8,0,1.0175525403647847)
(8,4,0.8260424329285025)
(8,9,0.6635599274533642)
(2,0,0.27424380755189526)
(2,4,0.43261521868768604)
(2,9,0.43658118372676225)
(1,0,0.8561112576964152)
(1,4,1.4229405071760814)
(1,9,1.2604580017009432)
(3,0,0.45813682659496957)
(3,4,0.2666267191586873)
(3,9,0.10414421368354898)
(7,4,0.2073486603716349)
(7,9,0.04486615489649659)
(7,0,0.42470420866790193)
(9,9,0.0)
(9,0,0.5045977394480895)
(9,4,0.3302516601623805)
(5,0,0.7760991789332677)
(5,4,0.278222041518823)
(5,9,0.2821880065578992)
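
As an aside, this RDD of triples can also be turned directly into a long-format DataFrame with one row per (vertex, landmark) pair; a minimal sketch (the column names here are ours):

// sketch: a long-format DataFrame, one row per (vertex, landmark) pair
import sqlContext.implicits._
val longFormatDF = shortestDistsVertex2Landmark.toDF("srcVertexId", "landmarkId", "shortestDistance")
longFormatDF.show(5)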

Let's make a DataFrame for visualizing pairwise matrix plots

We want to make 4 columns in this example, as follows (note that the actual values change with each realisation of the graph!):

landmarkId1 ("0")   landmarkId2 ("4")   landmarkId3 ("9")   srcVertexId
-----------------------------------------------------------------------
0.0                 0.7425..            0.8718..            0
0.924..             1.2464..            1.0472..            1
...
// http://alvinalexander.com/scala/how-to-sort-map-in-scala-key-value-sortby-sortwith
// we need this to make sure that the maps are ordered by the keys for ensuring unique column values
import scala.collection.immutable.ListMap
import sqlContext.implicits._
import scala.collection.immutable.ListMap
import sqlContext.implicits._
// recall our landmark vertices in landMarkVertexIds; let's use their String representations as names
val unorderedNamedLandmarkVertices = landMarkVertexIds.map(id => (id, id.toString) )
val orderedNamedLandmarkVertices = ListMap(unorderedNamedLandmarkVertices.sortBy(_._1):_*)
val orderedLandmarkVertexNames = orderedNamedLandmarkVertices.toSeq.map(x => x._2)
orderedLandmarkVertexNames.mkString(", ")
unorderedNamedLandmarkVertices: Seq[(Long, String)] = List((4,4), (0,0), (9,9))
orderedNamedLandmarkVertices: scala.collection.immutable.ListMap[Long,String] = ListMap(0 -> 0, 4 -> 4, 9 -> 9)
orderedLandmarkVertexNames: Seq[String] = Vector(0, 4, 9)
res7: String = 0, 4, 9
// these are going to be our column names
val columnNames:Seq[String] = orderedLandmarkVertexNames :+ "srcVertexId"
columnNames: Seq[String] = Vector(0, 4, 9, srcVertexId)
// a case class to make a DataFrame quickly from the result
case class SeqOfDoublesAndsrcVertexId(shortestDistances: Seq[Double], srcVertexId: VertexId)
defined class SeqOfDoublesAndsrcVertexId
val shortestDistsSeqFromVertex2Landmark2DF = result.vertices.map(GxSwpSPMap => {
  val v = GxSwpSPMap._1 // the source vertex id
  // the shortest distances, ordered by landmark vertex id to ensure a consistent column order
  val a = ListMap(GxSwpSPMap._2.toSeq.sortBy(_._1):_*).toSeq.map(x => x._2)
  (a, v)
}).map(x => SeqOfDoublesAndsrcVertexId(x._1, x._2)).toDF()
shortestDistsSeqFromVertex2Landmark2DF: org.apache.spark.sql.DataFrame = [shortestDistances: array<double>, srcVertexId: bigint]
shortestDistsSeqFromVertex2Landmark2DF.show // but this DataFrame needs the first column exploded into 3 columns
+--------------------+-----------+
|   shortestDistances|srcVertexId|
+--------------------+-----------+
|[0.49787713741444...|          4|
|[0.0, 0.602616471...|          0|
|[0.18329441371794...|          6|
|[1.01755254036478...|          8|
|[0.27424380755189...|          2|
|[0.85611125769641...|          1|
|[0.45813682659496...|          3|
|[0.42470420866790...|          7|
|[0.50459773944808...|          9|
|[0.77609917893326...|          5|
+--------------------+-----------+

Now we want to make a separate column for each distance in the sequence in the column 'shortestDistances'.

Let us use the following ideas for this: * https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/3741049972324885/2662535171379268/4413065072037724/latest.html

// this is from https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/3741049972324885/2662535171379268/4413065072037724/latest.html
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{lit, udf}

// UDF to extract i-th element from array column
//val elem = udf((x: Seq[Int], y: Int) => x(y))
val elem = udf((x: Seq[Double], y: Int) => x(y)) // modified for Sequence of Doubles

// Method to apply the 'elem' UDF to each element; requires knowing the length of the sequence in advance
def split(col: Column, len: Int): Seq[Column] = {
  for (i <- 0 until len) yield { elem(col, lit(i)).as(s"$col($i)") }
}

// Implicit conversion to make things nicer to use, e.g. 
// select(Column, Seq[Column], Column) is converted into select(Column*) flattening sequences
implicit class DataFrameSupport(df: DataFrame) {
  def select(cols: Any*): DataFrame = {
    var buffer: Seq[Column] = Seq.empty
    for (col <- cols) {
      if (col.isInstanceOf[Seq[_]]) {
        buffer = buffer ++ col.asInstanceOf[Seq[Column]]
      } else {
        buffer = buffer :+ col.asInstanceOf[Column]
      }
    }
    df.select(buffer:_*)
  }
}
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{lit, udf}
elem: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$6726/1752149086@9f2dc3d,DoubleType,List(Some(class[value[0]: array<double>]), Some(class[value[0]: int])),Some(class[value[0]: double]),None,false,true)
split: (col: org.apache.spark.sql.Column, len: Int)Seq[org.apache.spark.sql.Column]
defined class DataFrameSupport
val shortestDistsFromVertex2Landmark2DF = shortestDistsSeqFromVertex2Landmark2DF.select(split($"shortestDistances", 3), $"srcVertexId")
shortestDistsFromVertex2Landmark2DF: org.apache.spark.sql.DataFrame = [shortestDistances(0): double, shortestDistances(1): double ... 2 more fields]
shortestDistsFromVertex2Landmark2DF.show
+--------------------+--------------------+--------------------+-----------+
|shortestDistances(0)|shortestDistances(1)|shortestDistances(2)|srcVertexId|
+--------------------+--------------------+--------------------+-----------+
|  0.4978771374144447|                 0.0|0.003965965039076...|          4|
|                 0.0|  0.6026164718020462|  0.6065824368411225|          0|
| 0.18329441371794564|  0.7501236631976119|  0.5876411577224736|          6|
|  1.0175525403647847|  0.8260424329285025|  0.6635599274533642|          8|
| 0.27424380755189526| 0.43261521868768604| 0.43658118372676225|          2|
|  0.8561112576964152|  1.4229405071760814|  1.2604580017009432|          1|
| 0.45813682659496957|  0.2666267191586873| 0.10414421368354898|          3|
| 0.42470420866790193|  0.2073486603716349| 0.04486615489649659|          7|
|  0.5045977394480895|  0.3302516601623805|                 0.0|          9|
|  0.7760991789332677|   0.278222041518823|  0.2821880065578992|          5|
+--------------------+--------------------+--------------------+-----------+
// now let's name the columns using the landmark vertex ids
val shortestDistsFromVertex2Landmark2DF = shortestDistsSeqFromVertex2Landmark2DF.select(split($"shortestDistances", 3), $"srcVertexId").toDF(columnNames:_*)
shortestDistsFromVertex2Landmark2DF: org.apache.spark.sql.DataFrame = [0: double, 4: double ... 2 more fields]
shortestDistsFromVertex2Landmark2DF.show
+-------------------+-------------------+--------------------+-----------+
|                  0|                  4|                   9|srcVertexId|
+-------------------+-------------------+--------------------+-----------+
| 0.4978771374144447|                0.0|0.003965965039076...|          4|
|                0.0| 0.6026164718020462|  0.6065824368411225|          0|
|0.18329441371794564| 0.7501236631976119|  0.5876411577224736|          6|
| 1.0175525403647847| 0.8260424329285025|  0.6635599274533642|          8|
|0.27424380755189526|0.43261521868768604| 0.43658118372676225|          2|
| 0.8561112576964152| 1.4229405071760814|  1.2604580017009432|          1|
|0.45813682659496957| 0.2666267191586873| 0.10414421368354898|          3|
|0.42470420866790193| 0.2073486603716349| 0.04486615489649659|          7|
| 0.5045977394480895| 0.3302516601623805|                 0.0|          9|
| 0.7760991789332677|  0.278222041518823|  0.2821880065578992|          5|
+-------------------+-------------------+--------------------+-----------+
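
As an aside, the same wide format can be obtained without a user-defined function by indexing into the array column with Column.getItem, which Spark supports natively; a minimal sketch using the columnNames defined above (the name viaGetItem is ours):

// sketch: index into the array column directly with getItem instead of a UDF
val viaGetItem = shortestDistsSeqFromVertex2Landmark2DF.select(
  ((0 until 3).map(i => $"shortestDistances".getItem(i).as(columnNames(i))) :+ $"srcVertexId"): _*
)
viaGetItem.show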
display(shortestDistsFromVertex2Landmark2DF.select($"0",$"4",$"9"))
0                     4                     9
0.4978771374144447    0.0                   3.9659650390762025e-3
0.0                   0.6026164718020462    0.6065824368411225
0.18329441371794564   0.7501236631976119    0.5876411577224736
1.0175525403647847    0.8260424329285025    0.6635599274533642
0.27424380755189526   0.43261521868768604   0.43658118372676225
0.8561112576964152    1.4229405071760814    1.2604580017009432
0.45813682659496957   0.2666267191586873    0.10414421368354898
0.42470420866790193   0.2073486603716349    4.486615489649659e-2
0.5045977394480895    0.3302516601623805    0.0
0.7760991789332677    0.278222041518823     0.2821880065578992