023_OnTimeFlightPerformance(Scala)


ScaDaMaLe Course site and book

This is a Scala version of the Python notebook from the following talk:

Homework:

See https://www.brighttalk.com/webcast/12891/199003 (you need to sign up for a free BrightTALK account first). Then go through this Scala version of the notebook from the talk.

On-Time Flight Performance with GraphFrames for Apache Spark

This notebook provides an analysis of On-Time Flight Performance and Departure Delays data using GraphFrames for Apache Spark.


Preparation

Extract the Airports and Departure Delays information from S3 / DBFS

// Set File Paths
val tripdelaysFilePath = "/databricks-datasets/flights/departuredelays.csv"
val airportsnaFilePath = "/databricks-datasets/flights/airport-codes-na.txt"
tripdelaysFilePath: String = /databricks-datasets/flights/departuredelays.csv
airportsnaFilePath: String = /databricks-datasets/flights/airport-codes-na.txt
// Obtain airports dataset
// Note that the "spark-csv" package is a built-in data source since Spark 2.0, so the short name "csv" can be used
val airportsna = sqlContext.read.format("csv").
  option("header", "true").
  option("inferSchema", "true").
  option("delimiter", "\t").
  load(airportsnaFilePath)
 
airportsna.createOrReplaceTempView("airports_na")
 
// Obtain departure delays data (no schema inference, so every column is read as a string and cast later)
val departureDelays = sqlContext.read.format("csv").option("header", "true").load(tripdelaysFilePath)
departureDelays.createOrReplaceTempView("departureDelays")
departureDelays.cache()
 
// Available IATA (International Air Transport Association) codes from the departuredelays sample dataset
val tripIATA = sqlContext.sql("select distinct iata from (select distinct origin as iata from departureDelays union all select distinct destination as iata from departureDelays) a")
tripIATA.createOrReplaceTempView("tripIATA")
 
// Only include airports with at least one trip in the departureDelays dataset
val airports = sqlContext.sql("select f.IATA, f.City, f.State, f.Country from airports_na f join tripIATA t on t.IATA = f.IATA")
airports.createOrReplaceTempView("airports")
airports.cache()
airportsna: org.apache.spark.sql.DataFrame = [City: string, State: string ... 2 more fields]
departureDelays: org.apache.spark.sql.DataFrame = [date: string, delay: string ... 3 more fields]
tripIATA: org.apache.spark.sql.DataFrame = [iata: string]
airports: org.apache.spark.sql.DataFrame = [IATA: string, City: string ... 2 more fields]
res0: airports.type = [IATA: string, City: string ... 2 more fields]
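The cell above reads a header-bearing, tab-delimited airports file and then keeps only airports that appear as an origin or a destination of some trip. A minimal plain-Scala sketch of that logic on in-memory data follows, assuming the airports file has City, State, Country and IATA columns. The object, case class and function names are hypothetical, and the split-based parser is much simpler than Spark's CSV reader (no quoting, escaping or schema inference).

```scala
object AirportsPrepSketch {
  final case class Airport(iata: String, city: String, state: String, country: String)

  // Rough analogue of option("header", "true") with option("delimiter", "\t"):
  // the first line supplies column names and every value stays a String.
  // Unlike Spark's CSV reader, this ignores quoting, escaping and schema inference.
  def parseTsv(text: String): Seq[Map[String, String]] = {
    val lines = text.split("\n").toSeq.filter(_.nonEmpty)
    val header = lines.head.split("\t").toSeq
    lines.tail.map(line => header.zip(line.split("\t", -1).toSeq).toMap)
  }

  // Distinct codes seen as an origin or a destination
  // (cf. the `union all` + `distinct` query that builds tripIATA)
  def tripIata(trips: Seq[(String, String)]): Set[String] =
    trips.flatMap { case (origin, destination) => Seq(origin, destination) }.toSet

  // Airports with at least one trip (cf. the join that builds `airports`)
  def airportsWithTrips(rows: Seq[Map[String, String]], trips: Seq[(String, String)]): Seq[Airport] = {
    val codes = tripIata(trips)
    rows.collect {
      case r if codes.contains(r("IATA")) => Airport(r("IATA"), r("City"), r("State"), r("Country"))
    }
  }
}
```

For example, with hypothetical rows for Minneapolis (MSP), International Falls (INL) and a made-up Nowhere (ZZZ), and a single MSP to INL trip, only MSP and INL are kept.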
// Build `departureDelays_geo` DataFrame
// Obtain key attributes such as flight date, delay, distance, and origin/destination airport information.
// `date` is a string of the form MMddHHmm (e.g. "01011111"); the year 2014 is prepended to build a timestamp.
val departureDelays_geo = sqlContext.sql("""
  select cast(f.date as int) as tripid,
    cast(concat('2014-', substr(f.date, 1, 2), '-', substr(f.date, 3, 2), ' ',
      substr(f.date, 5, 2), ':', substr(f.date, 7, 2), ':00') as timestamp) as localdate,
    cast(f.delay as int) as delay, cast(f.distance as int) as distance,
    f.origin as src, f.destination as dst,
    o.city as city_src, d.city as city_dst, o.state as state_src, d.state as state_dst
  from departuredelays f
  join airports o on o.iata = f.origin
  join airports d on d.iata = f.destination""")
 
// Register as a temporary view
departureDelays_geo.createOrReplaceTempView("departureDelays_geo")
 
// Cache and count
departureDelays_geo.cache()
departureDelays_geo.count()
departureDelays_geo: org.apache.spark.sql.DataFrame = [tripid: int, localdate: timestamp ... 8 more fields]
res2: Long = 1361141
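The concat/substr expression in the query above simply reassembles the MMddHHmm `date` string into a timestamp in 2014, and `tripid` is the same string cast to an integer. A minimal plain-Scala sketch of both conversions using java.time is shown below. It assumes, as the query does, that every trip falls in 2014; it uses `LocalDateTime` (no time zone), whereas Spark's timestamp also depends on the session time zone. The object and method names are hypothetical.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object TripDateSketch {
  // The year is prepended to the raw MMddHHmm string, giving yyyyMMddHHmm
  private val rawFormat = DateTimeFormatter.ofPattern("yyyyMMddHHmm")

  // Same idea as cast(f.date as int): the leading zero is lost, so "01011111" becomes 1011111
  def tripId(date: String): Int = date.toInt

  // Same idea as the concat/substr/cast(... as timestamp) expression, with 2014 assumed as the year
  def localDate(date: String): LocalDateTime = LocalDateTime.parse("2014" + date, rawFormat)
}
```

For example, the raw date "01011111" yields tripid 1011111 and local time 2014-01-01T11:11, which matches the first row of the display that follows.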
display(departureDelays_geo)
 
tripid   localdate                     delay  distance  src  dst  city_src     city_dst             state_src  state_dst
1011111  2014-01-01T11:11:00.000+0000  -5     221       MSP  INL  Minneapolis  International Falls  MN         MN
1021111  2014-01-02T11:11:00.000+0000  7      221       MSP  INL  Minneapolis  International Falls  MN         MN
1031111  2014-01-03T11:11:00.000+0000  0      221       MSP  INL  Minneapolis  International Falls  MN         MN
1041925  2014-01-04T19:25:00.000+0000  0      221       MSP  INL  Minneapolis  International Falls  MN         MN
1061115  2014-01-06T11:15:00.000+0000  33     221       MSP  INL  Minneapolis  International Falls  MN         MN
1071115  2014-01-07T11:15:00.000+0000  23     221       MSP  INL  Minneapolis  International Falls  MN         MN
1081115  2014-01-08T11:15:00.000+0000  -9     221       MSP  INL  Minneapolis  International Falls  MN         MN
1091115  2014-01-09T11:15:00.000+0000  11     221       MSP  INL  Minneapolis  International Falls  MN         MN
1101115  2014-01-10T11:15:00.000+0000  -3     221       MSP  INL  Minneapolis  International Falls  MN         MN
1112015  2014-01-11T20:15:00.000+0000  -7     221       MSP  INL  Minneapolis  International Falls  MN         MN
1121925  2014-01-12T19:25:00.000+0000  -5     221       MSP  INL  Minneapolis  International Falls  MN         MN
1131115  2014-01-13T11:15:00.000+0000  -3     221       MSP  INL  Minneapolis  International Falls  MN         MN
1141115  2014-01-14T11:15:00.000+0000  -6     221       MSP  INL  Minneapolis  International Falls  MN         MN
1151115  2014-01-15T11:15:00.000+0000  -7     221       MSP  INL  Minneapolis  International Falls  MN         MN
1161115  2014-01-16T11:15:00.000+0000  -3     221       MSP  INL  Minneapolis  International Falls  MN         MN
1171115  2014-01-17T11:15:00.000+0000  4      221       MSP  INL  Minneapolis  International Falls  MN         MN
1182015  2014-01-18T20:15:00.000+0000  -5     221       MSP  INL  Minneapolis  International Falls  MN         MN

Truncated results, showing first 1000 rows.

Building the Graph

Now that we've imported our data, we need to build our graph. This takes two steps: building the structure of the vertices (or nodes) and building the structure of the edges. What's great about GraphFrames is that this process is very simple:

  • Rename the IATA airport code column to id in the vertices table (airports)
  • Rename the start and end airports to src and dst in the edges table (flights)

These names are required by GraphFrames: the vertices DataFrame must have an id column, and the edges DataFrame must have src and dst columns.
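The two renaming rules can be illustrated without Spark. The following is a minimal plain-Scala sketch on in-memory rows, not GraphFrames itself: airports become vertices keyed by `id`, flights become edges keyed by `src` and `dst`. The case classes and function names are hypothetical, and `danglingEndpoints` is an extra sanity check of our own, not something the naming convention enforces.

```scala
object GraphNamingSketch {
  // Input shapes, loosely mirroring the `airports` and `departureDelays` columns
  final case class AirportRow(iata: String, city: String, state: String, country: String)
  final case class FlightRow(tripid: Int, delay: Int, origin: String, destination: String)

  // Output shapes following the GraphFrames naming convention
  final case class Vertex(id: String, city: String, state: String, country: String)
  final case class Edge(src: String, dst: String, tripid: Int, delay: Int)

  // IATA code becomes `id`; duplicates are dropped, like .distinct() on tripVertices
  def toVertices(airports: Seq[AirportRow]): Seq[Vertex] =
    airports.map(a => Vertex(a.iata, a.city, a.state, a.country)).distinct

  // Origin and destination become `src` and `dst`
  def toEdges(flights: Seq[FlightRow]): Seq[Edge] =
    flights.map(f => Edge(f.origin, f.destination, f.tripid, f.delay))

  // Sanity check of our own: edge endpoints that do not match any vertex id
  def danglingEndpoints(vertices: Seq[Vertex], edges: Seq[Edge]): Set[String] = {
    val ids = vertices.map(_.id).toSet
    edges.flatMap(e => Seq(e.src, e.dst)).filterNot(ids.contains).toSet
  }
}
```

Because `airports` was already restricted to codes that occur in the trips, every edge endpoint in the real data should find a matching vertex, so the dangling set is expected to be empty there.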

WARNING: If the graphframes package, required in the cell below, is not installed, follow the instructions here.

// Note: make sure the GraphFrames Spark package is already installed
import org.apache.spark.sql.functions._
import org.graphframes._
 
// Create Vertices (airports) and Edges (flights)
val tripVertices = airports.withColumnRenamed("IATA", "id").distinct()
val tripEdges = departureDelays_geo.select("tripid", "delay", "src", "dst", "city_dst", "state_dst")
 
// Cache Vertices and Edges
tripEdges.cache()
tripVertices.cache()
import org.apache.spark.sql.functions._
import org.graphframes._
tripVertices: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: string, City: string ... 2 more fields]
tripEdges: org.apache.spark.sql.DataFrame = [tripid: int, delay: int ... 4 more fields]
res5: tripVertices.type = [id: string, City: string ... 2 more fields]
// Vertices
// The vertices of our graph are the airports
display(tripVertices)
 
id   City              State  Country
FAT  Fresno            CA     USA
CMH  Columbus          OH     USA
PHX  Phoenix           AZ     USA
PAH  Paducah           KY     USA
COS  Colorado Springs  CO     USA
RNO  Reno              NV     USA
MYR  Myrtle Beach      SC     USA
VLD  Valdosta          GA     USA
BPT  Beaumont          TX     USA
CAE  Columbia          SC     USA
PSC  Pasco             WA     USA
SRQ  Sarasota          FL     USA
LAX  Los Angeles       CA     USA
DAY  Dayton            OH     USA
AVP  Wilkes-Barre      PA     USA
MFR  Medford           OR     USA
JFK  New York          NY     USA

Showing all 279 rows.

// Edges
// The edges of our graph are the flights between airports
display(tripEdges)
 
tripid   delay  src  dst  city_dst             state_dst
1011111  -5     MSP  INL  International Falls  MN
1021111  7      MSP  INL  International Falls  MN
1031111  0      MSP  INL  International Falls  MN
1041925  0      MSP  INL  International Falls  MN
1061115  33     MSP  INL  International Falls  MN
1071115  23     MSP  INL  International Falls  MN
1081115  -9     MSP  INL  International Falls  MN
1091115  11     MSP  INL  International Falls  MN
1101115  -3     MSP  INL  International Falls  MN
1112015  -7     MSP  INL  International Falls  MN
1121925  -5     MSP  INL  International Falls  MN
1131115  -3     MSP  INL  International Falls  MN
1141115  -6     MSP  INL  International Falls  MN
1151115  -7     MSP  INL  International Falls  MN
1161115  -3     MSP  INL  International Falls  MN
1171115  4      MSP  INL  International Falls  MN
1182015  -5     MSP  INL  International Falls  MN

Truncated results, showing first 1000 rows.

// Build `tripGraph` GraphFrame
// This GraphFrame is built from the vertices (airports) and edges (trips/flights) defined above
val tripGraph = GraphFrame(tripVertices, tripEdges)
println(tripGraph)
 
// Build `tripGraphPrime` GraphFrame
// This GraphFrame has the same vertices but keeps fewer edge columns (tripid, delay, src, dst), which makes motifs and subgraphs (below) easier to display
val tripEdgesPrime = departureDelays_geo.select("tripid", "delay", "src", "dst")
val tripGraphPrime = GraphFrame(tripVertices, tripEdgesPrime)
GraphFrame(v:[id: string, City: string ... 2 more fields], e:[src: string, dst: string ... 4 more fields])
tripGraph: org.graphframes.GraphFrame = GraphFrame(v:[id: string, City: string ... 2 more fields], e:[src: string, dst: string ... 4 more fields])
tripEdgesPrime: org.apache.spark.sql.DataFrame = [tripid: int, delay: int ... 2 more fields]
tripGraphPrime: org.graphframes.GraphFrame = GraphFrame(v:[id: string, City: string ... 2 more fields], e:[src: string, dst: string ... 2 more fields])

Simple Queries

Let's start with a set of simple graph queries to understand flight performance and departure delays.
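As a small, Spark-free illustration of the kind of question these queries answer, the sketch below runs three simple queries (distinct airports touched, late departures, longest delay) over the 17 sample edges shown in the tripEdges display above. It uses only those displayed rows, not the full dataset, and assumes that a positive delay value means a late departure. The object, case class and function names are hypothetical.

```scala
object SampleEdgeQueries {
  final case class Edge(tripid: Int, delay: Int, src: String, dst: String)

  // (tripid, delay) pairs copied from the tripEdges display; every sample row is MSP -> INL
  val sample: Seq[Edge] = Seq(
    1011111 -> -5, 1021111 -> 7, 1031111 -> 0, 1041925 -> 0, 1061115 -> 33,
    1071115 -> 23, 1081115 -> -9, 1091115 -> 11, 1101115 -> -3, 1112015 -> -7,
    1121925 -> -5, 1131115 -> -3, 1141115 -> -6, 1151115 -> -7, 1161115 -> -3,
    1171115 -> 4, 1182015 -> -5
  ).map { case (tripid, delay) => Edge(tripid, delay, "MSP", "INL") }

  // Number of distinct airports that appear as an edge endpoint
  def airportCount(edges: Seq[Edge]): Int =
    edges.flatMap(e => Seq(e.src, e.dst)).distinct.size

  // Number of flights with a positive delay (assumed to mean a late departure)
  def delayedCount(edges: Seq[Edge]): Int = edges.count(_.delay > 0)

  // The flight with the largest delay
  def mostDelayed(edges: Seq[Edge]): Edge = edges.maxBy(_.delay)
}
```

On this sample, only two airports are involved, five of the 17 flights have a positive delay, and the largest delay (33) belongs to tripid 1061115.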