// Paths to the flight sample datasets used throughout this notebook
val tripdelaysFilePath: String = "/databricks-datasets/flights/departuredelays.csv" // departure-delay records (CSV)
val airportsnaFilePath: String = "/databricks-datasets/flights/airport-codes-na.txt" // airport codes
tripdelaysFilePath: String = /databricks-datasets/flights/departuredelays.csv
airportsnaFilePath: String = /databricks-datasets/flights/airport-codes-na.txt
// Load the airport-codes file into a DataFrame and expose it to SQL as `airports_na`.
// The file is read as tab-delimited text with a header row; column types are inferred.
// Note: "com.databricks.spark.csv" names the spark-csv package, a built-in datasource since Spark 2.0.
val airportsna = {
  // Reader options gathered in one place and applied with a single `options` call
  val readerOptions = Map(
    "header" -> "true",
    "inferschema" -> "true",
    "delimiter" -> "\t")
  sqlContext.read.
    format("com.databricks.spark.csv").
    options(readerOptions).
    load(airportsnaFilePath)
}
airportsna.createOrReplaceTempView("airports_na")
// Load the departure-delay records. Only the header option is set (no schema
// inference is requested), so the columns come back as strings and are cast later.
val departureDelays = sqlContext.read.
  format("com.databricks.spark.csv").
  option("header", "true").
  load(tripdelaysFilePath)
// Expose the DataFrame to SQL as `departureDelays` and mark it for caching
departureDelays.createOrReplaceTempView("departureDelays")
departureDelays.cache()
// Collect every distinct IATA (International Air Transport Association) code that
// appears as either an origin or a destination in the departuredelays sample dataset
val tripIATA = sqlContext.sql(
  "select distinct iata from (select distinct origin as iata from departureDelays union all select distinct destination as iata from departureDelays) a"
)
// Make the code list available to later SQL as `tripIATA`
tripIATA.createOrReplaceTempView("tripIATA")
// Keep only the airports with at least one trip in the departureDelays dataset:
// inner join of `airports_na` against the IATA codes collected in `tripIATA`
val airports = {
  val airportsWithTripsQuery = "select f.IATA, f.City, f.State, f.Country from airports_na f join tripIATA t on t.IATA = f.IATA"
  sqlContext.sql(airportsWithTripsQuery)
}
// Register the result as the `airports` view and mark it for caching
airports.createOrReplaceTempView("airports")
airports.cache()
airportsna: org.apache.spark.sql.DataFrame = [City: string, State: string ... 2 more fields]
departureDelays: org.apache.spark.sql.DataFrame = [date: string, delay: string ... 3 more fields]
tripIATA: org.apache.spark.sql.DataFrame = [iata: string]
airports: org.apache.spark.sql.DataFrame = [IATA: string, City: string ... 2 more fields]
res0: airports.type = [IATA: string, City: string ... 2 more fields]
// Build the `departureDelays_geo` DataFrame: each flight joined twice against `airports`
// to pick up the city and state of both its origin and its destination.
//   tripid          - the raw `date` field cast to int
//   localdate       - timestamp assembled as '2014-' + date[1..2] + '-' + date[3..4] + ' ' +
//                     date[5..6] + ':' + date[7..8] + ':00' (the year is hard-coded in the query)
//   delay, distance - cast to int
//   src / dst       - origin / destination code, with city_* and state_* taken from `airports`
val departureDelays_geo = {
  val geoQuery = "select cast(f.date as int) as tripid, cast(concat(concat(concat(concat(concat(concat('2014-', concat(concat(substr(cast(f.date as string), 1, 2), '-')), substr(cast(f.date as string), 3, 2)), ' '), substr(cast(f.date as string), 5, 2)), ':'), substr(cast(f.date as string), 7, 2)), ':00') as timestamp) as `localdate`, cast(f.delay as int), cast(f.distance as int), f.origin as src, f.destination as dst, o.city as city_src, d.city as city_dst, o.state as state_src, d.state as state_dst from departuredelays f join airports o on o.iata = f.origin join airports d on d.iata = f.destination"
  sqlContext.sql(geoQuery)
}
// Register the result as the `departureDelays_geo` temp view
departureDelays_geo.createOrReplaceTempView("departureDelays_geo")
// Mark the DataFrame for caching, then count its rows
departureDelays_geo.cache()
departureDelays_geo.count()
departureDelays_geo: org.apache.spark.sql.DataFrame = [tripid: int, localdate: timestamp ... 8 more fields]
res2: Long = 1361141
// Note, ensure you have already installed the GraphFrames spark-package
import org.apache.spark.sql.functions._
import org.graphframes._
// Vertices are the airports: the IATA column is renamed to `id` and duplicate rows are dropped
val tripVertices = airports.
  withColumnRenamed("IATA", "id").
  distinct()
// Edges are the flights: trip id, delay, source and destination codes,
// plus the destination city and state
val tripEdges = departureDelays_geo.select(
  "tripid", "delay", "src", "dst", "city_dst", "state_dst")
// Mark both DataFrames for caching (edges first, then vertices)
tripEdges.cache()
tripVertices.cache()
import org.apache.spark.sql.functions._
import org.graphframes._
tripVertices: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: string, City: string ... 2 more fields]
tripEdges: org.apache.spark.sql.DataFrame = [tripid: int, delay: int ... 4 more fields]
res5: tripVertices.type = [id: string, City: string ... 2 more fields]
// Build the `tripGraph` GraphFrame from the airport vertices and the flight edges,
// then print its summary
val tripGraph: GraphFrame = GraphFrame(tripVertices, tripEdges)
println(tripGraph)
// Build the `tripGraphPrime` GraphFrame: same vertices as `tripGraph`, but its edges carry
// only tripid, delay, src and dst. The smaller edge schema makes motifs and subgraphs
// easier to display (below).
val tripEdgesPrime: org.apache.spark.sql.DataFrame =
  departureDelays_geo.select("tripid", "delay", "src", "dst")
val tripGraphPrime: GraphFrame = GraphFrame(tripVertices, tripEdgesPrime)
GraphFrame(v:[id: string, City: string ... 2 more fields], e:[src: string, dst: string ... 4 more fields])
tripGraph: org.graphframes.GraphFrame = GraphFrame(v:[id: string, City: string ... 2 more fields], e:[src: string, dst: string ... 4 more fields])
tripEdgesPrime: org.apache.spark.sql.DataFrame = [tripid: int, delay: int ... 2 more fields]
tripGraphPrime: org.graphframes.GraphFrame = GraphFrame(v:[id: string, City: string ... 2 more fields], e:[src: string, dst: string ... 2 more fields])
ScaDaMaLe Course site and book