007c_SparkSQLProgGuide_HW(Scala)

This is an elaboration, by Ivan Sadikov and Raazesh Sainudiin, of the Spark SQL programming guide at http://spark.apache.org/docs/latest/sql-programming-guide.html.

Getting Started

Spark SQL Programming Guide

  • Getting Started
    • Starting Point: SQLContext
    • Creating DataFrames
    • DataFrame Operations
    • Running SQL Queries Programmatically
    • Creating Datasets
    • Interoperating with RDDs
      • Inferring the Schema Using Reflection
      • Programmatically Specifying the Schema

Getting Started

Starting Point: SQLContext

The entry point into all functionality in Spark SQL is the SparkSession class and/or SQLContext/HiveContext. A Spark session is created for you as spark when you start spark-shell or pyspark. You usually need to create a SparkSession yourself only when building an application (for example, one running on a production-like on-premises cluster). In this case, follow the code below to create a Spark session.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("Spark SQL basic example").getOrCreate()

// you could get SparkContext and SQLContext from SparkSession
val sc = spark.sparkContext
val sqlContext = spark.sqlContext

// This is used to implicitly convert an RDD or Seq to a DataFrame (see examples below)
import spark.implicits._

But in a Databricks notebook (similar to spark-shell) the SparkSession is already created for you and is available as spark.

// Evaluation of the cell by Ctrl+Enter will print spark session available in notebook
spark
res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@2d0c6c9

After evaluation you should see something like this:

res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@2d0c6c9

To enable Hive support, call the enableHiveSupport() method on the builder when constructing the Spark session. This provides richer functionality than the standard Spark SQL context, for example the use of Hive user-defined functions, or reading data from and writing data into Hive. Note that most of the SQL functionality is available regardless of Hive support.
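As a minimal sketch of the builder call described above: the application name and the local master are assumptions for illustration, and the call only succeeds when Hive classes are on the classpath (otherwise the builder throws an IllegalArgumentException). If a session already exists, as in a notebook, getOrCreate() returns it unchanged.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: Hive classes are available on the classpath
val hiveSpark = SparkSession.builder()
  .master("local[*]")                      // assumption: local run, not needed in a notebook
  .appName("Spark SQL with Hive support")  // hypothetical application name
  .enableHiveSupport()
  .getOrCreate()

// Reports which catalog implementation the session is using
println(hiveSpark.conf.get("spark.sql.catalogImplementation"))
```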

Creating DataFrames

With a SparkSession, applications can create a Dataset or DataFrame from an existing RDD, from a Hive table, or from various data sources.
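The following is a small sketch of two of these routes, building a DataFrame from a local Seq and from an RDD. The local master, the application name and the value names are assumptions for illustration; the rows are borrowed from the social media table used later in this notebook.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")            // assumption: local run; a notebook already provides `spark`
  .appName("create-df-sketch")   // hypothetical application name
  .getOrCreate()
import spark.implicits._         // enables .toDF on Seq and RDD

// From a local collection
val fromSeq = Seq(("OEM", "SMS", 61652), ("DHS", "Twitter", 389))
  .toDF("agency", "platform", "visits")

// From an existing RDD
val rdd = spark.sparkContext.parallelize(Seq(("VAC", "Facebook", 36)))
val fromRdd = rdd.toDF("agency", "platform", "visits")

val combined = fromSeq.union(fromRdd)
combined.show()
```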

Just to recap, a DataFrame is a distributed collection of data organized into named columns. You can think of it as an RDD of case class Row organized into a table (which is not exactly true). DataFrames, in comparison to RDDs, are backed by rich optimizations, including tracking their own schema, adaptive query execution, code generation including whole-stage codegen, the extensible Catalyst optimizer, and project Tungsten.

A Dataset provides type safety when working with SQL, since each Row is mapped to a case class, so that each column can be referenced by a property of that class.
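To make the type-safety point concrete, here is a minimal sketch. The case class Usage and the value names are hypothetical and not part of the notebook; the rows are borrowed from the social media table used later.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical record type for illustration
case class Usage(agency: String, platform: String, visits: Int)

val spark = SparkSession.builder()
  .master("local[*]")                // assumption: local run; a notebook already provides `spark`
  .appName("typed-dataset-sketch")   // hypothetical application name
  .getOrCreate()
import spark.implicits._

val usage: Dataset[Usage] = Seq(
  Usage("OEM", "SMS", 61652),
  Usage("DHS", "Twitter", 389),
  Usage("VAC", "Facebook", 36)
).toDS()

// Columns are accessed as properties of Usage, so a misspelled
// field name is rejected by the compiler rather than at run time
val busyAgencies: Array[String] =
  usage.filter(_.visits > 100).map(_.agency).collect()
```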

Note that performance for Datasets/DataFrames is the same across the languages Scala, Java, Python, and R. This is because only the planning phase is language-specific: the logical plan is constructed in, say, Python, while all the physical execution is compiled and executed as JVM bytecode.
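One way to look at the plans mentioned above is explain(true), which prints the logical plans and the physical plan of a query. This is only a sketch on a toy range; the local master and the names are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")          // assumption: local run; a notebook already provides `spark`
  .appName("explain-sketch")   // hypothetical application name
  .getOrCreate()
import spark.implicits._

val evens = spark.range(0, 10).filter($"id" % 2 === 0)

// Prints the parsed, analyzed and optimized logical plans, then the physical plan
evens.explain(true)

val evenCount = evens.count()
```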

// Spark has some of the pre-built methods to create simple Dataset/DataFrame

// 1. Empty Dataset/DataFrame, not really interesting, is it?
println(spark.emptyDataFrame)
println(spark.emptyDataset[Int])
[]
[value: int]
// 2. Range of numbers, note that Spark automatically names column as "id"
val range = spark.range(0, 10)

// In order to get a preview of data in DataFrame use "show()"
range.show(3)
+---+
| id|
+---+
|  0|
|  1|
|  2|
+---+
only showing top 3 rows

range: org.apache.spark.sql.Dataset[Long] = [id: bigint]

You can also use different datasources that will be shown later or load Hive tables directly into Spark.

We have already created a table of social media usage from NYC.

See the Appendix section below to create this social_media_usage table from raw data.

First let's make sure this table is available for us.

// Let's find out what tables are already available for loading
spark.catalog.listTables.show()
+--------------------+--------+-----------+---------+-----------+
|                name|database|description|tableType|isTemporary|
+--------------------+--------+-----------+---------+-----------+
|          cities_csv| default|       null| EXTERNAL|      false|
|       cleaned_taxes| default|       null|  MANAGED|      false|
|commdettrumpclint...| default|       null|  MANAGED|      false|
|   donaldtrumptweets| default|       null| EXTERNAL|      false|
|             linkage| default|       null| EXTERNAL|      false|
|             nations| default|       null| EXTERNAL|      false|
|           newmplist| default|       null| EXTERNAL|      false|
|       ny_baby_names| default|       null|  MANAGED|      false|
|       nzmpsandparty| default|       null| EXTERNAL|      false|
|    pos_neg_category| default|       null| EXTERNAL|      false|
|                 rna| default|       null|  MANAGED|      false|
|                samh| default|       null| EXTERNAL|      false|
|  social_media_usage| default|       null| EXTERNAL|      false|
|              table1| default|       null| EXTERNAL|      false|
|          test_table| default|       null| EXTERNAL|      false|
|             uscites| default|       null| EXTERNAL|      false|
+--------------------+--------+-----------+---------+-----------+

It looks like the table social_media_usage is available as a permanent table (isTemporary set as false).
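For contrast with a permanent table, the sketch below registers a temporary view and checks how the catalog reports it. The view name tmp_numbers, the local master and the value names are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")          // assumption: local run; a notebook already provides `spark`
  .appName("catalog-sketch")   // hypothetical application name
  .getOrCreate()

// Register a temporary view (hypothetical name); it lives only in this session
spark.range(0, 3).createOrReplaceTempView("tmp_numbers")

// Check that the catalog knows about it
val exists: Boolean = spark.catalog.tableExists("tmp_numbers")

// Look up its isTemporary flag in the table listing
val tempFlag: Option[Boolean] = spark.catalog.listTables().collect()
  .find(_.name == "tmp_numbers")
  .map(_.isTemporary)
```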

Next let us do the following:

  • load this table as a DataFrame
  • print its schema and
  • show the first 20 rows.
val df = spark.table("social_media_usage") // Ctrl+Enter
df: org.apache.spark.sql.DataFrame = [agency: string, platform: string ... 3 more fields]

As you can see the immutable value df is a DataFrame and more specifically it is:

org.apache.spark.sql.DataFrame = [agency: string, platform: string, url: string, date: string, visits: int].

Now let us print the schema of the DataFrame df and have a look at the actual data:

// Ctrl+Enter
df.printSchema() // prints schema of the DataFrame
df.show() // shows first n (default is 20) rows
root
 |-- agency: string (nullable = true)
 |-- platform: string (nullable = true)
 |-- url: string (nullable = true)
 |-- date: string (nullable = true)
 |-- visits: integer (nullable = true)

+----------+----------+--------------------+--------------------+------+
|    agency|  platform|                 url|                date|visits|
+----------+----------+--------------------+--------------------+------+
|       OEM|       SMS|                null|02/17/2012 12:00:...| 61652|
|       OEM|       SMS|                null|11/09/2012 12:00:...| 44547|
|       EDC|    Flickr|http://www.flickr...|05/09/2012 12:00:...|  null|
|     NYCHA|Newsletter|                null|05/09/2012 12:00:...|  null|
|       DHS|   Twitter|www.twitter.com/n...|06/13/2012 12:00:...|   389|
|       DHS|   Twitter|www.twitter.com/n...|08/02/2012 12:00:...|   431|
|       DOH|   Android|       Condom Finder|08/08/2011 12:00:...|  5026|
|       DOT|   Android|         You The Man|08/08/2011 12:00:...|  null|
|      MOME|   Android|      MiNY Venor app|08/08/2011 12:00:...|   313|
|       DOT|Broadcastr|                null|08/08/2011 12:00:...|  null|
|       DPR|Broadcastr|http://beta.broad...|08/08/2011 12:00:...|  null|
|     ENDHT|  Facebook|http://www.facebo...|08/08/2011 12:00:...|     3|
|       VAC|  Facebook|https://www.faceb...|08/08/2011 12:00:...|    36|
|    PlaNYC|  Facebook|http://www.facebo...|08/08/2011 12:00:...|    47|
|      DFTA|  Facebook|http://www.facebo...|08/08/2011 12:00:...|    90|
| energyNYC|  Facebook|http://www.facebo...|08/08/2011 12:00:...|   105|
|      MOIA|  Facebook|http://www.facebo...|08/08/2011 12:00:...|   123|
|City Store|  Facebook|http://www.facebo...|08/08/2011 12:00:...|   119|
|      OCDV|  Facebook|http://www.facebo...|08/08/2011 12:00:...|   148|
|       HIA|  Facebook|http://www.facebo...|08/08/2011 12:00:...|   197|
+----------+----------+--------------------+--------------------+------+
only showing top 20 rows

Note that (nullable = true) simply means that the value in that column is allowed to be null.
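The nullable flag can also be read from the schema in code. The sketch below builds a tiny DataFrame, under the assumption of a local session, where a primitive Int column is not nullable while String and Option[Int] columns are. The column name row_id and the value names are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")           // assumption: local run; a notebook already provides `spark`
  .appName("nullable-sketch")   // hypothetical application name
  .getOrCreate()
import spark.implicits._

// row_id is a primitive Int, visits is an Option[Int] that may be missing
val sample = Seq(
  ("OEM", 1, Option(61652)),
  ("EDC", 2, Option.empty[Int])
).toDF("agency", "row_id", "visits")

sample.printSchema()

// Map each column name to its nullable flag
val nullability: Map[String, Boolean] =
  sample.schema.fields.map(f => f.name -> f.nullable).toMap
```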

Let us count the number of rows in df.

df.count() // Ctrl+Enter
res7: Long = 5899

So there are 5899 records or rows in the DataFrame df. Pretty good! You can also select individual columns using the so-called DataFrame API, as follows:

val platforms = df.select("platform") // Shift+Enter
platforms: org.apache.spark.sql.DataFrame = [platform: string]
platforms.count() // Shift+Enter to count the number of rows
res8: Long = 5899
platforms.show(5) // Ctrl+Enter to show top 5 rows
+----------+
|  platform|
+----------+
|       SMS|
|       SMS|
|    Flickr|
|Newsletter|
|   Twitter|
+----------+
only showing top 5 rows

You can also apply .distinct() to extract only unique entries as follows:

val uniquePlatforms = df.select("platform").distinct() // Shift+Enter
uniquePlatforms: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [platform: string]
uniquePlatforms.count() // Ctrl+Enter to count the number of distinct platforms
res10: Long = 23

Let's see all the rows of the DataFrame uniquePlatforms.

Note that display(uniquePlatforms), unlike uniquePlatforms.show(), displays all rows of the DataFrame and gives you the ability to select a different view, e.g. charts.

display(uniquePlatforms) // Ctrl+Enter to show all rows; use the scroll-bar on the right of the display to see all platforms
nyc.gov
Flickr
Vimeo
iPhone
YouTube
WordPress
SMS
iPhone App
Youtube
Instagram
iPhone app
Linked-In
Twitter
TOTAL
Tumblr
Newsletter
Pinterest
Broadcastr
Android
Foursquare
Google+
Foursquare (Badge Unlock)
Facebook
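display is a notebook function, so outside such a notebook a rough substitute is to call show with a large row limit and truncation switched off, or to collect a small result to the driver. The sketch below assumes a local session and uses a three-row stand-in for uniquePlatforms; the value names are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")            // assumption: local run; a notebook already provides `spark`
  .appName("show-all-sketch")    // hypothetical application name
  .getOrCreate()
import spark.implicits._

// Small stand-in for uniquePlatforms
val somePlatforms = Seq("Foursquare (Badge Unlock)", "Twitter", "Facebook").toDF("platform")

// Show up to 100 rows without truncating long values
somePlatforms.show(100, false)

// For small results, bring the values to the driver as a local array
val localPlatforms: Array[String] = somePlatforms.as[String].collect()
```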

Spark SQL and DataFrame API

Spark SQL provides a DataFrame API that can perform relational operations on both external data sources and internal collections. It is similar to the widely used data frame concept in R, but it evaluates operations lazily (remember RDDs?), so that it can perform relational optimizations. This API is also available in Java, Python and R, although some functionality may be missing there; with every release of Spark this gap gets smaller.

So we give some examples of how to query data in Python and R, but continue with Scala. You can do all DataFrame operations in this notebook using Python or R.

%py
# Ctrl+Enter to evaluate this python cell, recall '#' is the comment character in python
# Using Python to query our "social_media_usage" table
pythonDF = spark.table("social_media_usage").select("platform").distinct()
pythonDF.show(3)
+--------+
|platform|
+--------+
| nyc.gov|
|  Flickr|
|   Vimeo|
+--------+
only showing top 3 rows
%sql
-- Ctrl+Enter to achieve the same result using standard SQL syntax!
select distinct platform from social_media_usage
nyc.gov
Flickr
Vimeo
iPhone
YouTube
WordPress
SMS
iPhone App
Youtube
Instagram
iPhone app
Linked-In
Twitter
TOTAL
Tumblr
Newsletter
Pinterest
Broadcastr
Android
Foursquare
Google+
Foursquare (Badge Unlock)
Facebook
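The same SQL statement can also be run from Scala with spark.sql, which returns a DataFrame. The sketch below assumes a local session and a small temporary view named usage_sample (a hypothetical name) standing in for the social_media_usage table.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")             // assumption: local run; a notebook already provides `spark`
  .appName("sql-from-scala")      // hypothetical application name
  .getOrCreate()
import spark.implicits._

// Small stand-in for the social_media_usage table
val usageSample = Seq(("OEM", "SMS"), ("DHS", "Twitter"), ("DHS", "Twitter"))
  .toDF("agency", "platform")
usageSample.createOrReplaceTempView("usage_sample")

// SQL string and DataFrame API express the same query
val viaSql = spark.sql("select distinct platform from usage_sample")
val viaApi = usageSample.select("platform").distinct()

val sqlResult: Set[String] = viaSql.as[String].collect().toSet
val apiResult: Set[String] = viaApi.as[String].collect().toSet
```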

Now it is time for some tips around how you use select and what the difference is between $"a", col("a"), df("a").

As you have probably noticed by now, you can specify individual columns to select by providing String values in the select statement. But sometimes you need to:

  • distinguish between columns with the same name
  • use it to filter (actually you can still filter using full String expression)
  • do some "magic" with joins and user-defined functions (this will be shown later)

So Spark gives you the ability to specify actual columns when you select. Now the difference between those three notations is ... none: they are all just ways of referring to a Column in Spark SQL, which means the following expressions yield the same result:

import org.apache.spark.sql.functions.col // needed for col("...")
import spark.implicits._ // needed for the $"..." syntax

// Using string expressions
df.select("agency", "visits")

// Using "$" alias for column
df.select($"agency", $"visits")

// Using "col" alias for column
df.select(col("agency"), col("visits"))

// Using DataFrame name for column
df.select(df("agency"), df("visits"))
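The DataFrame-qualified form becomes useful when two DataFrames share a column name, the first case in the list above. The following is a sketch under the assumption of a local session, with two small hypothetical DataFrames that both have an agency column.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")               // assumption: local run; a notebook already provides `spark`
  .appName("same-name-columns")     // hypothetical application name
  .getOrCreate()
import spark.implicits._

// Two hypothetical DataFrames that both contain an "agency" column
val visitsDF = Seq(("OEM", 61652), ("DHS", 389)).toDF("agency", "visits")
val platformDF = Seq(("OEM", "SMS"), ("DHS", "Twitter")).toDF("agency", "platform")

// After the join a plain "agency" string would be ambiguous,
// so each column is qualified with the DataFrame it comes from
val joined = visitsDF
  .join(platformDF, visitsDF("agency") === platformDF("agency"))
  .select(visitsDF("agency"), platformDF("platform"), visitsDF("visits"))

joined.show()
```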

This "same-difference" applies to filtering, i.e. you can either use full expression to filter, or column as shown in the following example:

// Using column to filter
df.select("visits").filter($"visits" > 100)

// Or you can use full expression as string
df.select("visits").filter("visits > 100")

Note that the expression $"visits" > 100 looks amazing, but under the hood it is just another column: it is equal to df("visits").>(100), where, thanks to Scala, > is just another method that you can define.
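To see that > really is an ordinary method, here is a toy sketch in plain Scala with no Spark involved. MiniColumn is a hypothetical class made up for illustration; Spark's own Column is far richer.

```scala
// Hypothetical miniature of a column expression
final case class MiniColumn(expr: String) {
  // `>` is an ordinary method name in Scala; it returns a new expression
  def >(value: Int): MiniColumn = MiniColumn(s"($expr > $value)")
}

// Infix notation and explicit method-call notation are the same call
val infix = MiniColumn("visits") > 100
val explicit = MiniColumn("visits").>(100)

println(infix.expr) // prints (visits > 100)
```

Both values hold the same expression, which mirrors how $"visits" > 100 produces a new Column rather than a Boolean.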