// Locations of the sample datasets used throughout this notebook:
// flight departure delays (CSV) and the North American airport-codes lookup (tab-separated).
val tripdelaysFilePath = "/databricks-datasets/flights/departuredelays.csv"
val airportsnaFilePath = "/databricks-datasets/flights/airport-codes-na.txt"
tripdelaysFilePath: String = /databricks-datasets/flights/departuredelays.csv
airportsnaFilePath: String = /databricks-datasets/flights/airport-codes-na.txt
// Obtain airports dataset.
// Note: CSV is a built-in datasource in Spark 2.0+, so the short format name "csv"
// is the canonical spelling (the legacy "com.databricks.spark.csv" is just an alias).
val airportsna = sqlContext.read.format("csv").
  option("header", "true").
  option("inferSchema", "true"). // documented camelCase option name (matching works only because options are case-insensitive)
  option("delimiter", "\t").     // airport-codes-na.txt is tab-separated
  load(airportsnaFilePath)
airportsna.createOrReplaceTempView("airports_na")

// Obtain departure delays data. No schema inference here: columns stay strings
// and are cast explicitly downstream (see the departureDelays_geo query).
val departureDelays = sqlContext.read.format("csv").
  option("header", "true").
  load(tripdelaysFilePath)
departureDelays.createOrReplaceTempView("departureDelays")
departureDelays.cache()

// Available IATA (International Air Transport Association) codes from the
// departuredelays sample dataset: the distinct union of all origins and destinations.
val tripIATA = sqlContext.sql("select distinct iata from (select distinct origin as iata from departureDelays union all select distinct destination as iata from departureDelays) a")
tripIATA.createOrReplaceTempView("tripIATA")

// Only include airports with at least one trip from the departureDelays dataset.
val airports = sqlContext.sql("select f.IATA, f.City, f.State, f.Country from airports_na f join tripIATA t on t.IATA = f.IATA")
airports.createOrReplaceTempView("airports")
airports.cache()
airportsna: org.apache.spark.sql.DataFrame = [City: string, State: string ... 2 more fields]
departureDelays: org.apache.spark.sql.DataFrame = [date: string, delay: string ... 3 more fields]
tripIATA: org.apache.spark.sql.DataFrame = [iata: string]
airports: org.apache.spark.sql.DataFrame = [IATA: string, City: string ... 2 more fields]
res0: airports.type = [IATA: string, City: string ... 2 more fields]
// Build `departureDelays_geo` DataFrame.
// Obtain key attributes such as date of flight, delays, distance, and airport
// information (origin, destination). The MMddHHmm-encoded `date` string is
// reassembled into a 2014 timestamp by the substr/concat chain in the query.
val geoQuery = "select cast(f.date as int) as tripid, cast(concat(concat(concat(concat(concat(concat('2014-', concat(concat(substr(cast(f.date as string), 1, 2), '-')), substr(cast(f.date as string), 3, 2)), ' '), substr(cast(f.date as string), 5, 2)), ':'), substr(cast(f.date as string), 7, 2)), ':00') as timestamp) as `localdate`, cast(f.delay as int), cast(f.distance as int), f.origin as src, f.destination as dst, o.city as city_src, d.city as city_dst, o.state as state_src, d.state as state_dst from departuredelays f join airports o on o.iata = f.origin join airports d on d.iata = f.destination"
val departureDelays_geo = sqlContext.sql(geoQuery)

// Register as a temp view so it is reachable from SQL.
departureDelays_geo.createOrReplaceTempView("departureDelays_geo")

// Cache, then force materialization with a count.
departureDelays_geo.cache()
departureDelays_geo.count()
departureDelays_geo: org.apache.spark.sql.DataFrame = [tripid: int, localdate: timestamp ... 8 more fields]
res3: Long = 1361141
// Note: ensure the GraphFrames spark-package is already installed.
import org.apache.spark.sql.functions._
import org.graphframes._

// Vertices are the airports (IATA code renamed to the `id` column GraphFrames
// expects); edges are the individual flights keyed by src/dst airport codes.
val tripVertices = airports.withColumnRenamed("IATA", "id").distinct()
val tripEdges = departureDelays_geo.select("tripid", "delay", "src", "dst", "city_dst", "state_dst")

// Keep both sides of the graph in memory.
tripVertices.cache()
tripEdges.cache()
import org.apache.spark.sql.functions._
import org.graphframes._
tripVertices: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: string, City: string ... 2 more fields]
tripEdges: org.apache.spark.sql.DataFrame = [tripid: int, delay: int ... 4 more fields]
res5: tripVertices.type = [id: string, City: string ... 2 more fields]
// Build the `tripGraph` GraphFrame from our trip (flight) vertices and edges.
val tripGraph = GraphFrame(tripVertices, tripEdges)
println(tripGraph)

// Build `tripGraphPrime`: the same vertices with a trimmed-down edge set,
// which makes the motif and subgraph examples below easier to display.
val tripEdgesPrime = departureDelays_geo.select("tripid", "delay", "src", "dst")
val tripGraphPrime = GraphFrame(tripVertices, tripEdgesPrime)
GraphFrame(v:[id: string, City: string ... 2 more fields], e:[src: string, dst: string ... 4 more fields])
tripGraph: org.graphframes.GraphFrame = GraphFrame(v:[id: string, City: string ... 2 more fields], e:[src: string, dst: string ... 4 more fields])
tripEdgesPrime: org.apache.spark.sql.DataFrame = [tripid: int, delay: int ... 2 more fields]
tripGraphPrime: org.graphframes.GraphFrame = GraphFrame(v:[id: string, City: string ... 2 more fields], e:[src: string, dst: string ... 2 more fields])
// Basic graph statistics: number of distinct airports and total flights.
val airportCount = tripGraph.vertices.count()
val tripCount = tripGraph.edges.count()
println(s"Airports: ${airportCount}")
println(s"Trips: ${tripCount}")
Airports: 279
Trips: 1361141
// Finding the longest delay across all flights.
// agg(max(...)) over the whole edge set is equivalent to groupBy().max("delay")
// and yields the same single-row result column `max(delay)`.
val longestDelay = tripGraph.edges.agg(max("delay"))
display(longestDelay)
// Determining number of on-time / early flights (delay <= 0) vs. delayed flights (delay > 0).
val onTimeOrEarly = tripGraph.edges.filter("delay <= 0").count()
val delayed = tripGraph.edges.filter("delay > 0").count()
println(s"On-time / Early Flights: ${onTimeOrEarly}")
println(s"Delayed Flights: ${delayed}")
On-time / Early Flights: 780469
Delayed Flights: 580672
SDS-2.2, Scalable Data Science
Last refresh: Never