// Databricks notebook source exported at Tue, 28 Jun 2016 08:45:47 UTC

Scalable Data Science

prepared by Raazesh Sainudiin and Sivanand Sivaram

supported by and

The html source url of this databricks notebook and its recorded Uji Image of Uji, Dogen's Time-Being:


This is an augmentation of http://go.databricks.com/hubfs/notebooks/3-GraphFrames-User-Guide-scala.html


//This allows easy embedding of publicly available information into any other notebook
//when viewing in git-book just ignore this block - you may have to manually chase the URL in frameIt("URL").
//Example usage:
// displayHTML(frameIt("https://en.wikipedia.org/wiki/Latent_Dirichlet_allocation#Topics_in_LDA",250))
def frameIt( u:String, h:Int ) : String = {
 src=""""+ u+""""
 width="95%" height="""" + h + """"
    <a href="http://spark.apache.org/docs/latest/index.html">
      Fallback link for browsers that, unlikely, don't support frames


#GraphFrames User Guide (Scala)

GraphFrames is a package for Apache Spark which provides DataFrame-based Graphs. It provides high-level APIs in Scala, Java, and Python. It aims to provide both the functionality of GraphX and extended functionality taking advantage of Spark DataFrames. This extended functionality includes motif finding, DataFrame-based serialization, and highly expressive graph queries.

The GraphFrames package is available from Spark Packages.

This notebook demonstrates examples from the GraphFrames User Guide.


import org.apache.spark.sql._
import org.apache.spark.sql.functions._

import org.graphframes._

##Creating GraphFrames

Let us try to create an example social network from the blog:


Users can create GraphFrames from vertex and edge DataFrames.

  • Vertex DataFrame: A vertex DataFrame should contain a special column named id which specifies unique IDs for each vertex in the graph.
  • Edge DataFrame: An edge DataFrame should contain two special columns: src (source vertex ID of edge) and dst (destination vertex ID of edge).

Both DataFrames can have arbitrary other columns. Those columns can represent vertex and edge attributes.

In our example, we can use a GraphFrame can store data or properties associated with each vertex and edge.

In our social network, each user might have an age and name, and each connection might have a relationship type.

Create the vertices and edges

// Vertex DataFrame
val v = sqlContext.createDataFrame(List(
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)
)).toDF("id", "name", "age")
// Edge DataFrame
val e = sqlContext.createDataFrame(List(
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend")
)).toDF("src", "dst", "relationship")

Let’s create a graph from these vertices and these edges:

val g = GraphFrame(v, e)

Let’s use the d3.graphs to visualise graphs (recall the D3 graphs in wiki-click example).

package d3
// We use a package object so that we can define top level classes like Edge that need to be used in other cells

import org.apache.spark.sql._
import com.databricks.backend.daemon.driver.EnhancedRDDFunctions.displayHTML

case class Edge(src: String, dest: String, count: Long)

case class Node(name: String)
case class Link(source: Int, target: Int, value: Long)
case class Graph(nodes: Seq[Node], links: Seq[Link])

object graphs {
val sqlContext = SQLContext.getOrCreate(org.apache.spark.SparkContext.getOrCreate())  
import sqlContext.implicits._
def force(clicks: Dataset[Edge], height: Int = 100, width: Int = 960): Unit = {
  val data = clicks.collect()
  val nodes = (data.map(_.src) ++ data.map(_.dest)).map(_.replaceAll("_", " ")).toSet.toSeq.map(Node)
  val links = data.map { t =>
    Link(nodes.indexWhere(_.name == t.src.replaceAll("_", " ")), nodes.indexWhere(_.name == t.dest.replaceAll("_", " ")), t.count / 20 + 1)
  showGraph(height, width, Seq(Graph(nodes, links)).toDF().toJSON.first())

 * Displays a force directed graph using d3
 * input: {"nodes": [{"name": "..."}], "links": [{"source": 1, "target": 2, "value": 0}]}
def showGraph(height: Int, width: Int, graph: String): Unit = {

<!DOCTYPE html>
  <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
  <title>Polish Books Themes - an Interactive Map</title>
  <meta charset="utf-8">

.node_circle {
  stroke: #777;
  stroke-width: 1.3px;

.node_label {
  pointer-events: none;

.link {
  stroke: #777;
  stroke-opacity: .2;

.node_count {
  stroke: #777;
  stroke-width: 1.0px;
  fill: #999;

text.legend {
  font-family: Verdana;
  font-size: 13px;
  fill: #000;

.node text {
  font-family: "Helvetica Neue","Helvetica","Arial",sans-serif;
  font-size: 17px;
  font-weight: 200;


<script src="//d3js.org/d3.v3.min.js"></script>

var graph = $graph;

var width = $width,
    height = $height;

var color = d3.scale.category20();

var force = d3.layout.force()
    .size([width, height]);

var svg = d3.select("body").append("svg")
    .attr("width", width)
    .attr("height", height);

var link = svg.selectAll(".link")
    .attr("class", "link")
    .style("stroke-width", function(d) { return Math.sqrt(d.value); });

var node = svg.selectAll(".node")
    .attr("class", "node")

    .attr("r", 10)
    .style("fill", function (d) {
    if (d.name.startsWith("other")) { return color(1); } else { return color(2); };

      .attr("dx", 10)
      .attr("dy", ".35em")
      .text(function(d) { return d.name });
//Now we are giving the SVGs co-ordinates - the force layout is generating the co-ordinates which this code is using to update the attributes of the SVG elements
force.on("tick", function () {
    link.attr("x1", function (d) {
        return d.source.x;
        .attr("y1", function (d) {
        return d.source.y;
        .attr("x2", function (d) {
        return d.target.x;
        .attr("y2", function (d) {
        return d.target.y;
    d3.selectAll("circle").attr("cx", function (d) {
        return d.x;
        .attr("cy", function (d) {
        return d.y;
    d3.selectAll("text").attr("x", function (d) {
        return d.x;
        .attr("y", function (d) {
        return d.y;
  def help() = {
Produces a force-directed graph given a collection of edges of the following form:</br>
<tt><font color="#a71d5d">case class</font> <font color="#795da3">Edge</font>(<font color="#ed6a43">src</font>: <font color="#a71d5d">String</font>, <font color="#ed6a43">dest</font>: <font color="#a71d5d">String</font>, <font color="#ed6a43">count</font>: <font color="#a71d5d">Long</font>)</tt>
<tt><font color="#a71d5d">import</font> <font color="#ed6a43">d3._</font></tt><br/>
<tt><font color="#795da3">graphs.force</font>(</br>
&nbsp;&nbsp;<font color="#ed6a43">height</font> = <font color="#795da3">500</font>,<br/>
&nbsp;&nbsp;<font color="#ed6a43">width</font> = <font color="#795da3">500</font>,<br/>
&nbsp;&nbsp;<font color="#ed6a43">clicks</font>: <font color="#795da3">Dataset</font>[<font color="#795da3">Edge</font>])</tt>


import org.apache.spark.sql.functions.lit // import the lit function in sql
val gE= g.edges.select($"src", $"dst".as("dest"), lit(1L).as("count")) // for us the column count is just an edge incidence


  height = 500,
  width = 500,
  clicks = gE.as[d3.Edge])

// This example graph also comes with the GraphFrames package.
val g0 = examples.Graphs.friends

d3.graphs.force( // let us see g0 now in one cell
  height = 500,
  width = 500,
  clicks = g0.edges.select($"src", $"dst".as("dest"), lit(1L).as("count")).as[d3.Edge])

Basic graph and DataFrame queries

GraphFrames provide several simple graph queries, such as node degree.

Also, since GraphFrames represent graphs as pairs of vertex and edge DataFrames, it is easy to make powerful queries directly on the vertex and edge DataFrames. Those DataFrames are made available as vertices and edges fields in the GraphFrame.

Simple queries are simple

GraphFrames make it easy to express queries over graphs. Since GraphFrame vertices and edges are stored as DataFrames, many queries are just DataFrame (or SQL) queries.


display(g0.vertices) // this is the same query on the graph loaded as an example from GraphFrame package


The incoming degree of the vertices:


The outgoing degree of the vertices:


The degree of the vertices:


You can run queries directly on the vertices DataFrame. For example, we can find the age of the youngest person in the graph:

val youngest = g.vertices.groupBy().min("age")

Likewise, you can run queries on the edges DataFrame.

For example, let us count the number of ‘follow’ relationships in the graph:

val numFollows = g.edges.filter("relationship = 'follow'").count()

##Motif finding

More complex relationships involving edges and vertices can be built using motifs.

The following cell finds the pairs of vertices with edges in both directions between them.

The result is a dataframe, in which the column names are given by the motif keys.

Check out the GraphFrame User Guide for more details on the API.

// Search for pairs of vertices with edges in both directions between them, i.e., find undirected or bidirected edges.
val motifs = g.find("(a)-[e1]->(b); (b)-[e2]->(a)")

Since the result is a DataFrame, more complex queries can be built on top of the motif.

Let us find all the reciprocal relationships in which one person is older than 30:

val filtered = motifs.filter("b.age > 30")

You Try!

//Search for all "directed triangles" or triplets of vertices: a,b,c with edges: a->b, b->c and c->a
//uncomment the next 2 lines and replace the "..." below
val motifs3 = g.find("(a)-[e1]->(b); (b)-[e2]->(c); (c)-[e3]->(a) ")

Stateful queries

Many motif queries are stateless and simple to express, as in the examples above. The next examples demonstrate more complex queries which carry state along a path in the motif. These queries can be expressed by combining GraphFrame motif finding with filters on the result, where the filters use sequence operations to construct a series of DataFrame Columns.

For example, suppose one wishes to identify a chain of 4 vertices with some property defined by a sequence of functions. That is, among chains of 4 vertices a->b->c->d, identify the subset of chains matching this complex filter:

  • Initialize state on path.
  • Update state based on vertex a.
  • Update state based on vertex b.
  • Etc. for c and d.
  • If final state matches some condition, then the chain is accepted by the filter.

The below code snippets demonstrate this process, where we identify chains of 4 vertices such that at least 2 of the 3 edges are friend relationships. In this example, the state is the current count of friend edges; in general, it could be any DataFrame Column.

// Find chains of 4 vertices.
val chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")

// Query on sequence, with state (cnt)
//  (a) Define method for updating state given the next element of the motif.
def sumFriends(cnt: Column, relationship: Column): Column = {
  when(relationship === "friend", cnt + 1).otherwise(cnt)
//  (b) Use sequence operation to apply method to sequence of elements in motif.
//      In this case, the elements are the 3 edges.
val condition = Seq("ab", "bc", "cd").
  foldLeft(lit(0))((cnt, e) => sumFriends(cnt, col(e)("relationship")))
//  (c) Apply filter to DataFrame.
val chainWith2Friends2 = chain4.where(condition >= 2)


Subgraphs are built by filtering a subset of edges and vertices. For example, the following subgraph only contains people who are friends and who are more than 30 years old.

// Select subgraph of users older than 30, and edges of type "friend"
val v2 = g.vertices.filter("age > 30")
val e2 = g.edges.filter("relationship = 'friend'")
val g2 = GraphFrame(v2, e2)



d3.graphs.force( // let us see g2 now in one cell
  height = 500,
  width = 500,
  clicks = g2.edges.select($"src", $"dst".as("dest"), lit(1L).as("count")).as[d3.Edge])

Complex triplet filters

The following example shows how to select a subgraph based upon triplet filters which operate on:

  • an edge and
  • its src and
  • dst vertices.

This example could be extended to go beyond triplets by using more complex motifs.

// Select subgraph based on edges "e" of type "follow"
// pointing from a younger user "a" to an older user "b".
val paths = g.find("(a)-[e]->(b)")
  .filter("e.relationship = 'follow'")
  .filter("a.age < b.age")
// "paths" contains vertex info. Extract the edges.
val e2 = paths.select("e.src", "e.dst", "e.relationship")
// In Spark 1.5+, the user may simplify this call:
//  val e2 = paths.select("e.*")

// Construct the subgraph
val g2 = GraphFrame(g.vertices, e2)



Standard graph algorithms

GraphFrames comes with a number of standard graph algorithms built in:

  • Breadth-first search (BFS)
  • Connected components
  • Strongly connected components
  • Label Propagation Algorithm (LPA)
  • PageRank
  • Shortest paths
  • Triangle count

Breadth-first search (BFS)


Search from “Esther” for users of age < 32.

val paths: DataFrame = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32").run()

The search may also be limited by edge filters and maximum path lengths.

val filteredPaths = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32")
  .edgeFilter("relationship != 'friend'")

###Connected components

Compute the connected component membership of each vertex and return a graph with each vertex assigned a component ID.


val result = g.connectedComponents.run() // doesn't work on Spark 1.4

Fun Exercise: Try to modify the d3.graph function to allow a visualisation of a given Sequence of component ids in the above result.

##Strongly connected components

Compute the strongly connected component (SCC) of each vertex and return a graph with each vertex assigned to the SCC containing that vertex.


val result = g.stronglyConnectedComponents.maxIter(10).run()

##Label propagation

Run static Label Propagation Algorithm for detecting communities in networks.

Each node in the network is initially assigned to its own community. At every superstep, nodes send their community affiliation to all neighbors and update their state to the mode community affiliation of incoming messages.

LPA is a standard community detection algorithm for graphs. It is very inexpensive computationally, although

  • (1) convergence is not guaranteed and
  • (2) one can end up with trivial solutions (all nodes are identified into a single community).


val result = g.labelPropagation.maxIter(5).run()


Identify important vertices in a graph based on connections.


// Run PageRank until convergence to tolerance "tol".
val results = g.pageRank.resetProbability(0.15).tol(0.01).run()


// Run PageRank for a fixed number of iterations.
val results2 = g.pageRank.resetProbability(0.15).maxIter(10).run()

// Run PageRank personalized for vertex "a"
val results3 = g.pageRank.resetProbability(0.15).maxIter(10).sourceId("a").run()

##Shortest paths

Computes shortest paths to the given set of landmark vertices, where landmarks are specified by vertex ID.


val paths = g.shortestPaths.landmarks(Seq("a", "d")).run()

###Triangle count

Computes the number of triangles passing through each vertex.


val results = g.triangleCount.run()

There is a lot more… dig into the docs to find out about belief propogation algorithm now!


Scalable Data Science

prepared by Raazesh Sainudiin and Sivanand Sivaram

supported by and