007c_SparkSQLProgGuide_HW(Scala)

This is an elaboration of the http://spark.apache.org/docs/latest/sql-programming-guide.html by Ivan Sadikov and Raazesh Sainudiin.

Getting Started

Spark SQL Programming Guide

  • Getting Started
    • Starting Point: SQLContext
    • Creating DataFrames
    • DataFrame Operations
    • Running SQL Queries Programmatically
    • Creating Datasets
    • Interoperating with RDDs
      • Inferring the Schema Using Reflection
      • Programmatically Specifying the Schema

Getting Started

Starting Point: SQLContext

The entry point into all functionality in Spark SQL is the SparkSession class and/or SQLContext/HiveContext. A Spark session is created for you as spark when you start spark-shell or pyspark. You will usually need to create the SparkSession yourself when building an application (running on a production-like on-premises cluster). In that case, follow the code below to create a Spark session.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("Spark SQL basic example").getOrCreate()

// you could get SparkContext and SQLContext from SparkSession
val sc = spark.sparkContext
val sqlContext = spark.sqlContext

// This is used to implicitly convert an RDD or Seq to a DataFrame (see examples below)
import spark.implicits._

But in Databricks notebook (similar to spark-shell) SparkSession is already created for you and is available as spark.

// Evaluation of the cell by Ctrl+Enter will print spark session available in notebook
spark
res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@2d0c6c9

After evaluation you should see something like this:

res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@2d0c6c9

In order to enable Hive support, use the enableHiveSupport() method on the builder when constructing the Spark session. This provides richer functionality over the standard Spark SQL context, for example, the use of Hive user-defined functions or loading and writing data from/into Hive. Note that most of the SQL functionality is available regardless of Hive support.
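
For example, here is a minimal sketch of building a Hive-enabled session (you would not normally run this in Databricks, where spark is already configured; the appName string is arbitrary):

import org.apache.spark.sql.SparkSession

val sparkWithHive = SparkSession.builder()
  .appName("Spark SQL with Hive support")
  .enableHiveSupport()  // enables Hive UDFs and reading/writing Hive tables
  .getOrCreate()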

Creating DataFrames

With a SparkSession, applications can create a Dataset or DataFrame from an existing RDD, from a Hive table, or from various data sources.

Just to recap, a DataFrame is a distributed collection of data organized into named columns. You can think of it as an RDD of the case class Row organized into a table (which is not exactly true). Compared to RDDs, DataFrames are backed by rich optimizations, including tracking their own schema, adaptive query execution, code generation including whole-stage codegen, the extensible Catalyst optimizer, and project Tungsten.

A Dataset provides type-safety when working with SQL, since each Row is mapped to a case class, so that each column can be referenced as a property of that class.
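
As a small illustration, here is a sketch of a typed Dataset built from a local Seq; the Visit case class and its values are made up purely for this example:

// Hypothetical case class used only for illustration
case class Visit(platform: String, visits: Long)

import spark.implicits._  // provides .toDS() and the encoders behind it

// Build a small typed Dataset from a local collection
val visitsDS = Seq(Visit("Twitter", 389L), Visit("Facebook", 36L)).toDS()

// Columns can now be referenced as properties of the case class
visitsDS.map(v => v.visits * 2).show()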

Note that performance for Datasets/DataFrames is the same across the Scala, Java, Python, and R languages. This is because the planning phase is the only language-specific part: only the logical plan is constructed in Python, and all the physical execution is compiled and executed as JVM bytecode.

// Spark has some of the pre-built methods to create simple Dataset/DataFrame

// 1. Empty Dataset/DataFrame, not really interesting, is it?
println(spark.emptyDataFrame)
println(spark.emptyDataset[Int])
[]
[value: int]
// 2. Range of numbers, note that Spark automatically names column as "id"
val range = spark.range(0, 10)

// In order to get a preview of data in DataFrame use "show()"
range.show(3)
+---+
| id|
+---+
|  0|
|  1|
|  2|
+---+
only showing top 3 rows

range: org.apache.spark.sql.Dataset[Long] = [id: bigint]

You can also use different data sources, which will be shown later, or load Hive tables directly into Spark.
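
For a rough idea of what that looks like, here is a sketch of the generic reader interface; the file paths below are hypothetical placeholders:

// JSON files (hypothetical path)
val jsonDF = spark.read.json("/tmp/some/path/data.json")

// CSV files (hypothetical path)
val csvDF = spark.read
  .option("header", "true")        // first line contains column names
  .option("inferSchema", "true")   // let Spark guess the column types
  .csv("/tmp/some/path/data.csv")

// a table registered in the metastore
val hiveDF = spark.table("social_media_usage")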

We have already created a table of social media usage from NYC

See the Appendix section below to create this social_media_usage table from raw data.

First let's make sure this table is available for us.

// Let's find out what tables are already available for loading
spark.catalog.listTables.show()
+--------------------+--------+-----------+---------+-----------+
|                name|database|description|tableType|isTemporary|
+--------------------+--------+-----------+---------+-----------+
|          cities_csv| default|       null| EXTERNAL|      false|
|       cleaned_taxes| default|       null|  MANAGED|      false|
|commdettrumpclint...| default|       null|  MANAGED|      false|
|   donaldtrumptweets| default|       null| EXTERNAL|      false|
|             linkage| default|       null| EXTERNAL|      false|
|             nations| default|       null| EXTERNAL|      false|
|           newmplist| default|       null| EXTERNAL|      false|
|       ny_baby_names| default|       null|  MANAGED|      false|
|       nzmpsandparty| default|       null| EXTERNAL|      false|
|    pos_neg_category| default|       null| EXTERNAL|      false|
|                 rna| default|       null|  MANAGED|      false|
|                samh| default|       null| EXTERNAL|      false|
|  social_media_usage| default|       null| EXTERNAL|      false|
|              table1| default|       null| EXTERNAL|      false|
|          test_table| default|       null| EXTERNAL|      false|
|             uscites| default|       null| EXTERNAL|      false|
+--------------------+--------+-----------+---------+-----------+

It looks like the table social_media_usage is available as a permanent table (isTemporary set as false).

Next let us do the following:

  • load this table as a DataFrame
  • print its schema and
  • show the first 20 rows.
val df = spark.table("social_media_usage") // Ctrl+Enter
df: org.apache.spark.sql.DataFrame = [agency: string, platform: string ... 3 more fields]

As you can see the immutable value df is a DataFrame and more specifically it is:

org.apache.spark.sql.DataFrame = [agency: string, platform: string, url: string, visits: int].

Now let us print schema of the DataFrame df and have a look at the actual data:

// Ctrl+Enter
df.printSchema() // prints schema of the DataFrame
df.show() // shows first n (default is 20) rows
root
 |-- agency: string (nullable = true)
 |-- platform: string (nullable = true)
 |-- url: string (nullable = true)
 |-- date: string (nullable = true)
 |-- visits: integer (nullable = true)

+----------+----------+--------------------+--------------------+------+
|    agency|  platform|                 url|                date|visits|
+----------+----------+--------------------+--------------------+------+
|       OEM|       SMS|                null|02/17/2012 12:00:...| 61652|
|       OEM|       SMS|                null|11/09/2012 12:00:...| 44547|
|       EDC|    Flickr|http://www.flickr...|05/09/2012 12:00:...|  null|
|     NYCHA|Newsletter|                null|05/09/2012 12:00:...|  null|
|       DHS|   Twitter|www.twitter.com/n...|06/13/2012 12:00:...|   389|
|       DHS|   Twitter|www.twitter.com/n...|08/02/2012 12:00:...|   431|
|       DOH|   Android|       Condom Finder|08/08/2011 12:00:...|  5026|
|       DOT|   Android|         You The Man|08/08/2011 12:00:...|  null|
|      MOME|   Android|      MiNY Venor app|08/08/2011 12:00:...|   313|
|       DOT|Broadcastr|                null|08/08/2011 12:00:...|  null|
|       DPR|Broadcastr|http://beta.broad...|08/08/2011 12:00:...|  null|
|     ENDHT|  Facebook|http://www.facebo...|08/08/2011 12:00:...|     3|
|       VAC|  Facebook|https://www.faceb...|08/08/2011 12:00:...|    36|
|    PlaNYC|  Facebook|http://www.facebo...|08/08/2011 12:00:...|    47|
|      DFTA|  Facebook|http://www.facebo...|08/08/2011 12:00:...|    90|
| energyNYC|  Facebook|http://www.facebo...|08/08/2011 12:00:...|   105|
|      MOIA|  Facebook|http://www.facebo...|08/08/2011 12:00:...|   123|
|City Store|  Facebook|http://www.facebo...|08/08/2011 12:00:...|   119|
|      OCDV|  Facebook|http://www.facebo...|08/08/2011 12:00:...|   148|
|       HIA|  Facebook|http://www.facebo...|08/08/2011 12:00:...|   197|
+----------+----------+--------------------+--------------------+------+
only showing top 20 rows

Note that (nullable = true) simply means that the value is allowed to be null.

Let us count the number of rows in df.

df.count() // Ctrl+Enter
res7: Long = 5899

So there are 5899 records or rows in the DataFrame df. Pretty good! You can also select individual columns using the so-called DataFrame API, as follows:

val platforms = df.select("platform") // Shift+Enter
platforms: org.apache.spark.sql.DataFrame = [platform: string]
platforms.count() // Shift+Enter to count the number of rows
res8: Long = 5899
platforms.show(5) // Ctrl+Enter to show top 5 rows
+----------+
|  platform|
+----------+
|       SMS|
|       SMS|
|    Flickr|
|Newsletter|
|   Twitter|
+----------+
only showing top 5 rows

You can also apply .distinct() to extract only unique entries as follows:

val uniquePlatforms = df.select("platform").distinct() // Shift+Enter
uniquePlatforms: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [platform: string]
uniquePlatforms.count() // Ctrl+Enter to count the number of distinct platforms
res10: Long = 23

Let's see all the rows of the DataFrame uniquePlatforms.

Note that display(uniquePlatforms), unlike uniquePlatforms.show(), displays all rows of the DataFrame and gives you the ability to select different views, e.g. charts.

display(uniquePlatforms) // Ctrl+Enter to show all rows; use the scroll-bar on the right of the display to see all platforms
nyc.gov
Flickr
Vimeo
iPhone
YouTube
WordPress
SMS
iPhone App
Youtube
Instagram
iPhone app
Linked-In
Twitter
TOTAL
Tumblr
Newsletter
Pinterest
Broadcastr
Android
Foursquare
Google+
Foursquare (Badge Unlock)
Facebook

Spark SQL and DataFrame API

Spark SQL provides a DataFrame API that can perform relational operations on both external data sources and internal collections. It is similar to the widely used data frame concept in R, but evaluates operations lazily (remember RDDs?), so that it can perform relational optimizations. This API is also available in Java, Python and R, but some functionality may not be available there, although with every release of Spark this gap gets smaller.
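
To see the lazy evaluation at work, here is a small sketch using the df defined above: the transformations only build a plan, and explain(true) prints the parsed, analyzed, optimized and physical plans produced by the Catalyst optimizer.

// Nothing is computed here -- these transformations only build a logical plan
val lazyQuery = df.select($"platform", $"visits").filter($"visits" > 100)

// Print the plans; execution happens only when an action such as count() or show() is called
lazyQuery.explain(true)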

So we give some examples of how to query data in Python and R, but then continue with Scala. You can do all of the DataFrame operations in this notebook using Python or R.

%py
# Ctrl+Enter to evaluate this python cell; recall '#' is the comment character in Python
# Using Python to query our "social_media_usage" table
pythonDF = spark.table("social_media_usage").select("platform").distinct()
pythonDF.show(3)
+--------+
|platform|
+--------+
| nyc.gov|
|  Flickr|
|   Vimeo|
+--------+
only showing top 3 rows
%sql
-- Ctrl+Enter to achieve the same result using standard SQL syntax!
select distinct platform from social_media_usage
nyc.gov
Flickr
Vimeo
iPhone
YouTube
WordPress
SMS
iPhone App
Youtube
Instagram
iPhone app
Linked-In
Twitter
TOTAL
Tumblr
Newsletter
Pinterest
Broadcastr
Android
Foursquare
Google+
Foursquare (Badge Unlock)
Facebook

Now it is time for some tips on how to use select and on the difference between $"a", col("a") and df("a").

As you have probably noticed by now, you can specify individual columns to select by providing String values in the select statement. But sometimes you need to:

  • distinguish between columns with the same name
  • use a column to filter (though you can still filter using a full String expression)
  • do some "magic" with joins and user-defined functions (this will be shown later)

So Spark gives you the ability to specify actual columns when you select. Now the difference between all those notations is ... none; they are just different ways to construct a Column in Spark SQL, which means the following expressions yield the same result:

// Using string expressions
df.select("agency", "visits")

// Using "$" alias for column
df.select($"agency", $"visits")

// Using "col" alias for column
df.select(col("agency"), col("visits"))

// Using DataFrame name for column
df.select(df("agency"), df("visits"))

This "same-difference" applies to filtering, i.e. you can either use full expression to filter, or column as shown in the following example:

// Using column to filter
df.select("visits").filter($"visits" > 100)

// Or you can use full expression as string
df.select("visits").filter("visits > 100")

Note that $"visits" > 100 expression looks amazing, but under the hood it is just another column, and it equals to df("visits").>(100), where, thanks to Scala paradigm > is just another function that you can define.

val sms = df.select($"agency", $"platform", $"visits").filter($"platform" === "SMS")
sms.show() // Ctrl+Enter
+------+--------+------+
|agency|platform|visits|
+------+--------+------+
|   OEM|     SMS| 61652|
|   OEM|     SMS| 44547|
|   DOE|     SMS|   382|
| NYCHA|     SMS|  null|
|   OEM|     SMS| 61652|
|   DOE|     SMS|   382|
| NYCHA|     SMS|  null|
|   OEM|     SMS| 61652|
|   OEM|     SMS|  null|
|   DOE|     SMS|  null|
| NYCHA|     SMS|  null|
|   OEM|     SMS|  null|
|   DOE|     SMS|  null|
| NYCHA|     SMS|  null|
|   DOE|     SMS|   382|
| NYCHA|     SMS|  null|
|   OEM|     SMS| 61652|
|   DOE|     SMS|   382|
| NYCHA|     SMS|  null|
|   OEM|     SMS| 61652|
+------+--------+------+
only showing top 20 rows

sms: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [agency: string, platform: string ... 1 more field]

Again, you could have written the query above using any of the column notations or String names, or even by writing the SQL query directly.

For example, we can do it using String names, as follows:

// Ctrl+Enter Note that we are using "platform = 'SMS'" since it will be evaluated as actual SQL
val sms = df.select(df("agency"), df("platform"), df("visits")).filter("platform = 'SMS'")
sms.show(5)
+------+--------+------+
|agency|platform|visits|
+------+--------+------+
|   OEM|     SMS| 61652|
|   OEM|     SMS| 44547|
|   DOE|     SMS|   382|
| NYCHA|     SMS|  null|
|   OEM|     SMS| 61652|
+------+--------+------+
only showing top 5 rows

sms: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [agency: string, platform: string ... 1 more field]

Refer to the DataFrame API documentation for more details. In addition to simple column references and expressions, DataFrames also have a rich library of functions, including string manipulation, date arithmetic, common math operations and more. The complete list is available in the DataFrame Function Reference.
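
As a small taste of that library, here is a sketch applying a few of the built-in functions to df (the output column names are arbitrary):

import org.apache.spark.sql.functions.{length, round, upper}

df.select(
    upper($"platform").as("platform_uc"),         // string function: upper-case the platform name
    length($"url").as("url_length"),              // string function: number of characters in the url
    round($"visits" / 7.0, 2).as("visits_div_7")  // math functions: divide and round to 2 decimals
  ).show(3)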

Let's next explore some of the functionality that is available by transforming this DataFrame df into a new DataFrame called fixedDF.

  • First, note that some columns are not exactly what we want them to be.

    • For example, the date column should be a standard Date/Timestamp SQL column, and
    • visits should not contain null values, but 0s instead.
  • Let us fix this using some code that is briefly explained here (don't worry if you don't get it completely now, you will get the hang of it by playing more):

    • The coalesce function is similar to an if-else statement, i.e., if the first column in the expression is null, then the value of the second column is used, and so on.
    • lit just means a column of constant value (literally speaking!).
    • the "funky" time conversion is essentially a conversion from the current format -> unix timestamp as a number -> Spark SQL Date format.
    • we also remove the TOTAL value from the platform column.
// Ctrl+Enter to make fixedDF

// import the needed sql functions
import org.apache.spark.sql.functions.{coalesce, from_unixtime, lit, to_date, unix_timestamp}

// make the fixedDF DataFrame
val fixedDF = df.
   select(
     $"agency", 
     $"platform", 
     $"url", 
     to_date(from_unixtime(unix_timestamp($"date", "MM/dd/yyyy hh:mm:ss aaa"))).as("date"), 
     coalesce($"visits", lit(0)).as("visits")).
   filter($"platform" !== "TOTAL")

fixedDF.printSchema() // print its schema 
// and show the top 20 records fully
fixedDF.show(false) // the false argument does not truncate the rows, so you will not see something like this "anot..."
root
 |-- agency: string (nullable = true)
 |-- platform: string (nullable = true)
 |-- url: string (nullable = true)
 |-- date: date (nullable = true)
 |-- visits: integer (nullable = false)

+----------+----------+----------------------------------------------------------------------------------------+----------+------+
|agency    |platform  |url                                                                                     |date      |visits|
+----------+----------+----------------------------------------------------------------------------------------+----------+------+
|OEM       |SMS       |null                                                                                    |2012-02-17|61652 |
|OEM       |SMS       |null                                                                                    |2012-11-09|44547 |
|EDC       |Flickr    |http://www.flickr.com/nycedc                                                            |2012-05-09|0     |
|NYCHA     |Newsletter|null                                                                                    |2012-05-09|0     |
|DHS       |Twitter   |www.twitter.com/nycdhs                                                                  |2012-06-13|389   |
|DHS       |Twitter   |www.twitter.com/nycdhs                                                                  |2012-08-02|431   |
|DOH       |Android   |Condom Finder                                                                           |2011-08-08|5026  |
|DOT       |Android   |You The Man                                                                             |2011-08-08|0     |
|MOME      |Android   |MiNY Venor app                                                                          |2011-08-08|313   |
|DOT       |Broadcastr|null                                                                                    |2011-08-08|0     |
|DPR       |Broadcastr|http://beta.broadcastr.com/Echo.html?audioId=670026-4001                                |2011-08-08|0     |
|ENDHT     |Facebook  |http://www.facebook.com/pages/NYC-Lets-End-Human-Trafficking/125730490795659?sk=wall    |2011-08-08|3     |
|VAC       |Facebook  |https://www.facebook.com/pages/NYC-Voter-Assistance-Commission/110226709012110          |2011-08-08|36    |
|PlaNYC    |Facebook  |http://www.facebook.com/pages/New-York-NY/PlaNYC/160454173971169?ref=ts                 |2011-08-08|47    |
|DFTA      |Facebook  |http://www.facebook.com/pages/NYC-Department-for-the-Aging/109028655823590              |2011-08-08|90    |
|energyNYC |Facebook  |http://www.facebook.com/EnergyNYC?sk=wall                                               |2011-08-08|105   |
|MOIA      |Facebook  |http://www.facebook.com/ihwnyc                                                          |2011-08-08|123   |
|City Store|Facebook  |http://www.facebook.com/citystorenyc                                                    |2011-08-08|119   |
|OCDV      |Facebook  |http://www.facebook.com/pages/NYC-Healthy-Relationship-Training-Academy/134637829901065 |2011-08-08|148   |
|HIA       |Facebook  |http://www.facebook.com/pages/New-York-City-Health-Insurance-Link/145920551598          |2011-08-08|197   |
+----------+----------+----------------------------------------------------------------------------------------+----------+------+
only showing top 20 rows

import org.apache.spark.sql.functions.{coalesce, from_unixtime, lit, to_date, unix_timestamp}
fixedDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [agency: string, platform: string ... 3 more fields]

Okay, this is better, but urls are still inconsistent.

Let's fix this by writing our own UDF (user-defined function) to deal with special cases.

Note that if you can use the Spark functions library, do so. But for the sake of the example, a custom UDF is shown below.

We take the value of a column as a String and return the same String type, but ignore values that do not start with http.

// Ctrl+Enter to evaluate this UDF, which takes an input String called "value"
// and converts it to lower-case if it begins with http, and otherwise returns null, so we effectively remove invalid web-urls
import org.apache.spark.sql.functions.udf

val cleanUrl = udf((value: String) => if (value != null && value.startsWith("http")) value.toLowerCase() else null)
cleanUrl: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

Let us apply our UDF on fixedDF to create a new DataFrame called cleanedDF as follows:

// Ctrl+Enter
val cleanedDF = fixedDF.select($"agency", $"platform", cleanUrl($"url").as("url"), $"date", $"visits")
cleanedDF: org.apache.spark.sql.DataFrame = [agency: string, platform: string ... 3 more fields]

Now, let's check that it actually worked by seeing the first 5 rows of the cleanedDF whose url isNull and isNotNull, as follows:

// Shift+Enter
cleanedDF.filter($"url".isNull).show(5)
+------+----------+----+----------+------+
|agency|  platform| url|      date|visits|
+------+----------+----+----------+------+
|   OEM|       SMS|null|2012-02-17| 61652|
|   OEM|       SMS|null|2012-11-09| 44547|
| NYCHA|Newsletter|null|2012-05-09|     0|
|   DHS|   Twitter|null|2012-06-13|   389|
|   DHS|   Twitter|null|2012-08-02|   431|
+------+----------+----+----------+------+
only showing top 5 rows
// Ctrl+Enter
cleanedDF.filter($"url".isNotNull).show(5, false) // false in .show(5, false) shows rows untruncated
+------+----------+-------------------------------------------------------------------------------------+----------+------+
|agency|platform  |url                                                                                  |date      |visits|
+------+----------+-------------------------------------------------------------------------------------+----------+------+
|EDC   |Flickr    |http://www.flickr.com/nycedc                                                         |2012-05-09|0     |
|DPR   |Broadcastr|http://beta.broadcastr.com/echo.html?audioid=670026-4001                             |2011-08-08|0     |
|ENDHT |Facebook  |http://www.facebook.com/pages/nyc-lets-end-human-trafficking/125730490795659?sk=wall |2011-08-08|3     |
|VAC   |Facebook  |https://www.facebook.com/pages/nyc-voter-assistance-commission/110226709012110       |2011-08-08|36    |
|PlaNYC|Facebook  |http://www.facebook.com/pages/new-york-ny/planyc/160454173971169?ref=ts              |2011-08-08|47    |
+------+----------+-------------------------------------------------------------------------------------+----------+------+
only showing top 5 rows

Now there is a suggestion from your manager's manager's manager that, due to some perceived privacy concerns, we should replace agency with some unique identifier.

So we need to do the following:

  • create unique list of agencies with ids and
  • join them with main DataFrame.

Sounds easy, right? Let's do it.

// Ctrl+Enter
// Import the Spark SQL function that will give us unique ids across all the records in this DataFrame
import org.apache.spark.sql.functions.monotonically_increasing_id

// We append a column computed by a SQL function that creates unique ids across all records in the DataFrame
val agencies = cleanedDF.select(cleanedDF("agency"))
                        .distinct()
                        .withColumn("id", monotonically_increasing_id())
agencies.show(5)
+--------------------+-----------+
|              agency|         id|
+--------------------+-----------+
|              PlaNYC|34359738368|
|                 HIA|34359738369|
|NYC Digital: exte...|34359738370|
|           NYCGLOBAL|42949672960|
|              nycgov|68719476736|
+--------------------+-----------+
only showing top 5 rows

import org.apache.spark.sql.functions.monotonically_increasing_id
agencies: org.apache.spark.sql.DataFrame = [agency: string, id: bigint]

Those who want to understand left/right inner/outer joins can see the video lectures in Module 3 of Anthony Joseph's Introduction to Big data edX course.
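
As a quick sketch of the difference (for illustration only; we use the inner join below), note that only the join-type string changes:

// "inner" keeps only agencies present in both DataFrames;
// "left_outer" would keep every row of cleanedDF, filling the missing id with null
val innerJoined = cleanedDF.join(agencies, cleanedDF("agency") === agencies("agency"), "inner")
val leftJoined  = cleanedDF.join(agencies, cleanedDF("agency") === agencies("agency"), "left_outer")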

// Ctrl+Enter
// And join with the rest of the data; note how the join condition is specified
val anonym = cleanedDF.join(agencies, cleanedDF("agency") === agencies("agency"), "inner").select("id", "platform", "url", "date", "visits")

// We also cache the DataFrame since it can be quite expensive to recompute the join
anonym.cache()

// Display result
anonym.show(5)
+-------------+----------+--------------------+----------+------+
|           id|  platform|                 url|      date|visits|
+-------------+----------+--------------------+----------+------+
|1580547964928|       SMS|                null|2012-02-17| 61652|
|1580547964928|       SMS|                null|2012-11-09| 44547|
| 412316860416|    Flickr|http://www.flickr...|2012-05-09|     0|
|1649267441664|Newsletter|                null|2012-05-09|     0|
|1529008357376|   Twitter|                null|2012-06-13|   389|
+-------------+----------+--------------------+----------+------+
only showing top 5 rows

anonym: org.apache.spark.sql.DataFrame = [id: bigint, platform: string ... 3 more fields]
spark.catalog.listTables().show() // look at the available tables
+--------------------+--------+-----------+---------+-----------+
|                name|database|description|tableType|isTemporary|
+--------------------+--------+-----------+---------+-----------+
|          cities_csv| default|       null| EXTERNAL|      false|
|       cleaned_taxes| default|       null|  MANAGED|      false|
|commdettrumpclint...| default|       null|  MANAGED|      false|
|   donaldtrumptweets| default|       null| EXTERNAL|      false|
|             linkage| default|       null| EXTERNAL|      false|
|             nations| default|       null| EXTERNAL|      false|
|           newmplist| default|       null| EXTERNAL|      false|
|       ny_baby_names| default|       null|  MANAGED|      false|
|       nzmpsandparty| default|       null| EXTERNAL|      false|
|    pos_neg_category| default|       null| EXTERNAL|      false|
|                 rna| default|       null|  MANAGED|      false|
|                samh| default|       null| EXTERNAL|      false|
|  social_media_usage| default|       null| EXTERNAL|      false|
|              table1| default|       null| EXTERNAL|      false|
|          test_table| default|       null| EXTERNAL|      false|
|             uscites| default|       null| EXTERNAL|      false|
+--------------------+--------+-----------+---------+-----------+
%sql 
-- to remove a TempTable if it exists already
drop table if exists anonym
OK
// Register the DataFrame as a temp view for Spark SQL; we also import the "month" function
import org.apache.spark.sql.functions.month

anonym.createOrReplaceTempView("anonym")
import org.apache.spark.sql.functions.month
%sql
-- Interesting. Now let's do some aggregation. Display platform, month, visits
-- Date column allows us to extract month directly

select platform, month(date) as month, sum(visits) as visits from anonym group by platform, month(date)
[Databricks chart of the query result: visits summed by month for each platform; series include Instagram, Linked-In, Foursquare (Badge Unlock), iPhone, Twitter, Vimeo, iPhone app, SMS, YouTube, Newsletter, Google+, Android, Youtube, iPhone App, Facebook, Tumblr, Foursquare, Flickr, WordPress, Pinterest.]

Only showing the first twenty series.

Note that we could have done the aggregation using the DataFrame API instead of Spark SQL, as shown below.
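
As a sketch, a roughly equivalent aggregation with the DataFrame API could look like this (the value name is arbitrary):

import org.apache.spark.sql.functions.{month, sum}

// Roughly the same aggregation as the SQL cell above, via the DataFrame API
val visitsPerPlatformMonth = anonym
  .groupBy($"platform", month($"date").as("month"))
  .agg(sum($"visits").as("visits"))

visitsPerPlatformMonth.show(5)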

Alright, now let's see some cool operations with window functions.

Our next task is to compute (daily visits / monthly average) for all platforms.

import org.apache.spark.sql.functions.{dayofmonth, month, row_number, sum}
import org.apache.spark.sql.expressions.Window

val coolDF = anonym.select($"id", $"platform", dayofmonth($"date").as("day"), month($"date").as("month"), $"visits").
  groupBy($"id", $"platform", $"day", $"month").agg(sum("visits").as("visits"))

// Run window aggregation on visits per month and platform
val window = coolDF.select($"id", $"day", $"visits", sum($"visits").over(Window.partitionBy("platform", "month")).as("monthly_visits"))

// Create and register percent table
val percent = window.select($"id", $"day", ($"visits" / $"monthly_visits").as("percent"))

percent.createOrReplaceTempView("percent")
import org.apache.spark.sql.functions.{dayofmonth, month, row_number, sum}
import org.apache.spark.sql.expressions.Window
coolDF: org.apache.spark.sql.DataFrame = [id: bigint, platform: string ... 3 more fields]
window: org.apache.spark.sql.DataFrame = [id: bigint, day: int ... 2 more fields]
percent: org.apache.spark.sql.DataFrame = [id: bigint, day: int ... 1 more field]
%sql

-- A little bit of visualization as result of our efforts
select id, day, `percent` from percent where `percent` > 0.3 and day = 2

[Databricks chart of the query result: percent of monthly visits by id, for rows with percent > 0.3 and day = 2.]