// Evaluating this cell with Ctrl+Enter will print the Spark session available in the notebook
spark
res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@2d0c6c9
// Spark has some built-in methods to create simple Datasets/DataFrames
// 1. Empty Dataset/DataFrame, not really interesting, is it?
println(spark.emptyDataFrame)
println(spark.emptyDataset[Int])
[]
[value: int]
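Besides these built-ins, a small Dataset/DataFrame can also be created from a local Scala collection; a minimal sketch (the sample data and column names below are made up for illustration):

// Create a DataFrame from a local collection; toDF comes from the session's implicits
import spark.implicits._
val fruitDF = Seq(("apple", 3), ("banana", 5)).toDF("fruit", "count")
fruitDF.show()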
// 2. Range of numbers; note that Spark automatically names the column "id"
val range = spark.range(0, 10)

// To get a preview of the data in a DataFrame, use "show()"
range.show(3)
+---+
| id|
+---+
| 0|
| 1|
| 2|
+---+
only showing top 3 rows
range: org.apache.spark.sql.Dataset[Long] = [id: bigint]
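As an aside, range also has a three-argument variant that takes a step; a quick sketch:

// Every second number in [0, 10), using the step variant of range
val evens = spark.range(0, 10, 2)
evens.show()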
// Let's find out what tables are already available for loading
spark.catalog.listTables.show()
+--------------------+--------+-----------+---------+-----------+
| name|database|description|tableType|isTemporary|
+--------------------+--------+-----------+---------+-----------+
| cities_csv| default| null| EXTERNAL| false|
| cleaned_taxes| default| null| MANAGED| false|
|commdettrumpclint...| default| null| MANAGED| false|
| donaldtrumptweets| default| null| EXTERNAL| false|
| linkage| default| null| EXTERNAL| false|
| nations| default| null| EXTERNAL| false|
| newmplist| default| null| EXTERNAL| false|
| ny_baby_names| default| null| MANAGED| false|
| nzmpsandparty| default| null| EXTERNAL| false|
| pos_neg_category| default| null| EXTERNAL| false|
| rna| default| null| MANAGED| false|
| samh| default| null| EXTERNAL| false|
| social_media_usage| default| null| EXTERNAL| false|
| table1| default| null| EXTERNAL| false|
| test_table| default| null| EXTERNAL| false|
| uscites| default| null| EXTERNAL| false|
+--------------------+--------+-----------+---------+-----------+
val df = spark.table("social_media_usage") // Ctrl+Enter
df: org.apache.spark.sql.DataFrame = [agency: string, platform: string ... 3 more fields]
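If the data were not already registered in the catalog, it could be read straight from a file instead; a hedged sketch (the path below is hypothetical):

// Hypothetical: load the same kind of data from a CSV file rather than a catalog table
val fromFile = spark.read.
  option("header", "true").      // first line contains the column names
  option("inferSchema", "true"). // let Spark guess the column types
  csv("/path/to/social_media_usage.csv")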
// Ctrl+Enter
df.printSchema() // prints the schema of the DataFrame
df.show()        // shows the first n (default is 20) rows
root
|-- agency: string (nullable = true)
|-- platform: string (nullable = true)
|-- url: string (nullable = true)
|-- date: string (nullable = true)
|-- visits: integer (nullable = true)
+----------+----------+--------------------+--------------------+------+
| agency| platform| url| date|visits|
+----------+----------+--------------------+--------------------+------+
| OEM| SMS| null|02/17/2012 12:00:...| 61652|
| OEM| SMS| null|11/09/2012 12:00:...| 44547|
| EDC| Flickr|http://www.flickr...|05/09/2012 12:00:...| null|
| NYCHA|Newsletter| null|05/09/2012 12:00:...| null|
| DHS| Twitter|www.twitter.com/n...|06/13/2012 12:00:...| 389|
| DHS| Twitter|www.twitter.com/n...|08/02/2012 12:00:...| 431|
| DOH| Android| Condom Finder|08/08/2011 12:00:...| 5026|
| DOT| Android| You The Man|08/08/2011 12:00:...| null|
| MOME| Android| MiNY Venor app|08/08/2011 12:00:...| 313|
| DOT|Broadcastr| null|08/08/2011 12:00:...| null|
| DPR|Broadcastr|http://beta.broad...|08/08/2011 12:00:...| null|
| ENDHT| Facebook|http://www.facebo...|08/08/2011 12:00:...| 3|
| VAC| Facebook|https://www.faceb...|08/08/2011 12:00:...| 36|
| PlaNYC| Facebook|http://www.facebo...|08/08/2011 12:00:...| 47|
| DFTA| Facebook|http://www.facebo...|08/08/2011 12:00:...| 90|
| energyNYC| Facebook|http://www.facebo...|08/08/2011 12:00:...| 105|
| MOIA| Facebook|http://www.facebo...|08/08/2011 12:00:...| 123|
|City Store| Facebook|http://www.facebo...|08/08/2011 12:00:...| 119|
| OCDV| Facebook|http://www.facebo...|08/08/2011 12:00:...| 148|
| HIA| Facebook|http://www.facebo...|08/08/2011 12:00:...| 197|
+----------+----------+--------------------+--------------------+------+
only showing top 20 rows
val platforms = df.select("platform") // Shift+Enter
platforms: org.apache.spark.sql.DataFrame = [platform: string]
platforms.show(5) // Ctrl+Enter to show top 5 rows
+----------+
| platform|
+----------+
| SMS|
| SMS|
| Flickr|
|Newsletter|
| Twitter|
+----------+
only showing top 5 rows
val uniquePlatforms = df.select("platform").distinct() // Shift+Enter
uniquePlatforms: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [platform: string]
display(uniquePlatforms) // Ctrl+Enter to show all rows; use the scroll-bar on the right of the display to see all platforms
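Note that display() is a Databricks notebook helper; outside Databricks, the same content can be inspected with plain DataFrame actions, for example:

// Equivalent inspection without display(): count the distinct platforms and show them all
println(uniquePlatforms.count()) // number of distinct platforms
uniquePlatforms.show(100, false) // 100 is just a generous upper bound here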
%py
# Ctrl+Enter to evaluate this python cell; recall '#' is the comment character in python
# Using Python to query our "social_media_usage" table
pythonDF = spark.table("social_media_usage").select("platform").distinct()
pythonDF.show(3)
+--------+
|platform|
+--------+
| nyc.gov|
| Flickr|
| Vimeo|
+--------+
only showing top 3 rows
%sql
-- Ctrl+Enter to achieve the same result using standard SQL syntax!
select distinct platform from social_media_usage
val sms = df.select($"agency", $"platform", $"visits").filter($"platform" === "SMS")
sms.show() // Ctrl+Enter
+------+--------+------+
|agency|platform|visits|
+------+--------+------+
| OEM| SMS| 61652|
| OEM| SMS| 44547|
| DOE| SMS| 382|
| NYCHA| SMS| null|
| OEM| SMS| 61652|
| DOE| SMS| 382|
| NYCHA| SMS| null|
| OEM| SMS| 61652|
| OEM| SMS| null|
| DOE| SMS| null|
| NYCHA| SMS| null|
| OEM| SMS| null|
| DOE| SMS| null|
| NYCHA| SMS| null|
| DOE| SMS| 382|
| NYCHA| SMS| null|
| OEM| SMS| 61652|
| DOE| SMS| 382|
| NYCHA| SMS| null|
| OEM| SMS| 61652|
+------+--------+------+
only showing top 20 rows
sms: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [agency: string, platform: string ... 1 more field]
// Ctrl+Enter
// Note that we use "platform = 'SMS'" here since the string is evaluated as actual SQL
val sms = df.select(df("agency"), df("platform"), df("visits")).filter("platform = 'SMS'")
sms.show(5)
+------+--------+------+
|agency|platform|visits|
+------+--------+------+
| OEM| SMS| 61652|
| OEM| SMS| 44547|
| DOE| SMS| 382|
| NYCHA| SMS| null|
| OEM| SMS| 61652|
+------+--------+------+
only showing top 5 rows
sms: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [agency: string, platform: string ... 1 more field]
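The two filters above are equivalent: $"platform" === "SMS" builds a Column expression, while "platform = 'SMS'" is parsed as a SQL predicate. where is an alias for filter, so this sketch (the name smsAgain is our choice) gives the same result:

// where() is an alias of filter(); both accept a Column or a SQL-string predicate
val smsAgain = df.select($"agency", $"platform", $"visits").where($"platform" === "SMS")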
// Ctrl+Enter to make fixedDF
// import the needed sql functions
import org.apache.spark.sql.functions.{coalesce, from_unixtime, lit, to_date, unix_timestamp}

// make the fixedDF DataFrame
val fixedDF = df.
  select(
    $"agency",
    $"platform",
    $"url",
    to_date(from_unixtime(unix_timestamp($"date", "MM/dd/yyyy hh:mm:ss aaa"))).as("date"),
    coalesce($"visits", lit(0)).as("visits")).
  filter($"platform" =!= "TOTAL") // =!= is the non-deprecated "not equal" Column operator

fixedDF.printSchema() // print its schema

// and show the top 20 records fully
fixedDF.show(false) // the false argument keeps rows untruncated, so you will not see something like "anot..."
root
|-- agency: string (nullable = true)
|-- platform: string (nullable = true)
|-- url: string (nullable = true)
|-- date: date (nullable = true)
|-- visits: integer (nullable = false)
+----------+----------+---------------------------------------------------------------------------------------+----------+------+
|agency |platform |url |date |visits|
+----------+----------+---------------------------------------------------------------------------------------+----------+------+
|OEM |SMS |null |2012-02-17|61652 |
|OEM |SMS |null |2012-11-09|44547 |
|EDC |Flickr |http://www.flickr.com/nycedc |2012-05-09|0 |
|NYCHA |Newsletter|null |2012-05-09|0 |
|DHS |Twitter |www.twitter.com/nycdhs |2012-06-13|389 |
|DHS |Twitter |www.twitter.com/nycdhs |2012-08-02|431 |
|DOH |Android |Condom Finder |2011-08-08|5026 |
|DOT |Android |You The Man |2011-08-08|0 |
|MOME |Android |MiNY Venor app |2011-08-08|313 |
|DOT |Broadcastr|null |2011-08-08|0 |
|DPR |Broadcastr|http://beta.broadcastr.com/Echo.html?audioId=670026-4001 |2011-08-08|0 |
|ENDHT |Facebook |http://www.facebook.com/pages/NYC-Lets-End-Human-Trafficking/125730490795659?sk=wall |2011-08-08|3 |
|VAC |Facebook |https://www.facebook.com/pages/NYC-Voter-Assistance-Commission/110226709012110 |2011-08-08|36 |
|PlaNYC |Facebook |http://www.facebook.com/pages/New-York-NY/PlaNYC/160454173971169?ref=ts |2011-08-08|47 |
|DFTA |Facebook |http://www.facebook.com/pages/NYC-Department-for-the-Aging/109028655823590 |2011-08-08|90 |
|energyNYC |Facebook |http://www.facebook.com/EnergyNYC?sk=wall |2011-08-08|105 |
|MOIA |Facebook |http://www.facebook.com/ihwnyc |2011-08-08|123 |
|City Store|Facebook |http://www.facebook.com/citystorenyc |2011-08-08|119 |
|OCDV |Facebook |http://www.facebook.com/pages/NYC-Healthy-Relationship-Training-Academy/134637829901065|2011-08-08|148 |
|HIA |Facebook |http://www.facebook.com/pages/New-York-City-Health-Insurance-Link/145920551598 |2011-08-08|197 |
+----------+----------+---------------------------------------------------------------------------------------+----------+------+
only showing top 20 rows
import org.apache.spark.sql.functions.{coalesce, from_unixtime, lit, to_date, unix_timestamp}
fixedDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [agency: string, platform: string ... 3 more fields]
// Ctrl+Enter to evaluate this UDF, which takes an input String called "value"
// and converts it to lower-case if it begins with "http", and otherwise returns null,
// so we effectively drop invalid web URLs
import org.apache.spark.sql.functions.udf
val cleanUrl = udf((value: String) => if (value != null && value.startsWith("http")) value.toLowerCase() else null)
cleanUrl: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
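If you also want this function available in SQL cells, it can be registered under a name; a minimal sketch (the SQL name "cleanUrl" is our choice, not something the notebook defines):

// Register the same function under a SQL-callable name, e.g. for %sql cells
spark.udf.register("cleanUrl", (value: String) =>
  if (value != null && value.startsWith("http")) value.toLowerCase() else null)
// now usable as: select cleanUrl(url) from social_media_usage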
// Ctrl+Enter
val cleanedDF = fixedDF.select($"agency", $"platform", cleanUrl($"url").as("url"), $"date", $"visits")
cleanedDF: org.apache.spark.sql.DataFrame = [agency: string, platform: string ... 3 more fields]
// Shift+Enter
cleanedDF.filter($"url".isNull).show(5)
+------+----------+----+----------+------+
|agency| platform| url| date|visits|
+------+----------+----+----------+------+
| OEM| SMS|null|2012-02-17| 61652|
| OEM| SMS|null|2012-11-09| 44547|
| NYCHA|Newsletter|null|2012-05-09| 0|
| DHS| Twitter|null|2012-06-13| 389|
| DHS| Twitter|null|2012-08-02| 431|
+------+----------+----+----------+------+
only showing top 5 rows
// Ctrl+Enter
cleanedDF.filter($"url".isNotNull).show(5, false) // false in .show(5, false) shows rows untruncated
+------+----------+------------------------------------------------------------------------------------+----------+------+
|agency|platform |url |date |visits|
+------+----------+------------------------------------------------------------------------------------+----------+------+
|EDC |Flickr |http://www.flickr.com/nycedc |2012-05-09|0 |
|DPR |Broadcastr|http://beta.broadcastr.com/echo.html?audioid=670026-4001 |2011-08-08|0 |
|ENDHT |Facebook |http://www.facebook.com/pages/nyc-lets-end-human-trafficking/125730490795659?sk=wall|2011-08-08|3 |
|VAC |Facebook |https://www.facebook.com/pages/nyc-voter-assistance-commission/110226709012110 |2011-08-08|36 |
|PlaNYC|Facebook |http://www.facebook.com/pages/new-york-ny/planyc/160454173971169?ref=ts |2011-08-08|47 |
+------+----------+------------------------------------------------------------------------------------+----------+------+
only showing top 5 rows
// Ctrl+Enter
// Import the Spark SQL function that will give us unique ids across all the records in this DataFrame
import org.apache.spark.sql.functions.monotonically_increasing_id

// We append a column of unique ids generated across all records in the DataFrame
val agencies = cleanedDF.select(cleanedDF("agency"))
  .distinct()
  .withColumn("id", monotonically_increasing_id())
agencies.show(5)
+--------------------+-----------+
| agency| id|
+--------------------+-----------+
| PlaNYC|34359738368|
| HIA|34359738369|
|NYC Digital: exte...|34359738370|
| NYCGLOBAL|42949672960|
| nycgov|68719476736|
+--------------------+-----------+
only showing top 5 rows
import org.apache.spark.sql.functions.monotonically_increasing_id
agencies: org.apache.spark.sql.DataFrame = [agency: string, id: bigint]
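As the output above shows, monotonically_increasing_id() guarantees uniqueness but not consecutiveness: the ids jump because they encode the partition. If dense, deterministic ids were needed instead, one alternative sketch is row_number over an unpartitioned window (this pulls all rows into a single partition, so it only suits small dimension tables like this one):

// Alternative: dense ids via row_number (assumes the distinct agency list is small)
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
val denseAgencies = cleanedDF.select("agency").distinct().
  withColumn("id", row_number().over(Window.orderBy("agency")))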
// Ctrl+Enter
// And join with the rest of the data; note how the join condition is specified
val anonym = cleanedDF.join(agencies, cleanedDF("agency") === agencies("agency"), "inner").
  select("id", "platform", "url", "date", "visits")

// We also cache the DataFrame, since it can be quite expensive to recompute the join
anonym.cache()

// Display the result
anonym.show(5)
+-------------+----------+--------------------+----------+------+
| id| platform| url| date|visits|
+-------------+----------+--------------------+----------+------+
|1580547964928| SMS| null|2012-02-17| 61652|
|1580547964928| SMS| null|2012-11-09| 44547|
| 412316860416| Flickr|http://www.flickr...|2012-05-09| 0|
|1649267441664|Newsletter| null|2012-05-09| 0|
|1529008357376| Twitter| null|2012-06-13| 389|
+-------------+----------+--------------------+----------+------+
only showing top 5 rows
anonym: org.apache.spark.sql.DataFrame = [id: bigint, platform: string ... 3 more fields]
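Note that cache() is lazy: it only marks the DataFrame, and the data is actually materialized by the first action (the show(5) above). A quick sketch of how to force the cache explicitly and release it later:

// An action such as count() materializes the cache; unpersist() frees it
anonym.count()     // triggers the join once and fills the cache
// ... use anonym ...
anonym.unpersist() // release the cached data, but only once you are done with anonym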
spark.catalog.listTables().show() // look at the available tables
+--------------------+--------+-----------+---------+-----------+
| name|database|description|tableType|isTemporary|
+--------------------+--------+-----------+---------+-----------+
| cities_csv| default| null| EXTERNAL| false|
| cleaned_taxes| default| null| MANAGED| false|
|commdettrumpclint...| default| null| MANAGED| false|
| donaldtrumptweets| default| null| EXTERNAL| false|
| linkage| default| null| EXTERNAL| false|
| nations| default| null| EXTERNAL| false|
| newmplist| default| null| EXTERNAL| false|
| ny_baby_names| default| null| MANAGED| false|
| nzmpsandparty| default| null| EXTERNAL| false|
| pos_neg_category| default| null| EXTERNAL| false|
| rna| default| null| MANAGED| false|
| samh| default| null| EXTERNAL| false|
| social_media_usage| default| null| EXTERNAL| false|
| table1| default| null| EXTERNAL| false|
| test_table| default| null| EXTERNAL| false|
| uscites| default| null| EXTERNAL| false|
+--------------------+--------+-----------+---------+-----------+
// Register the table for Spark SQL; we also import the "month" function
import org.apache.spark.sql.functions.month

anonym.createOrReplaceTempView("anonym")
import org.apache.spark.sql.functions.month
%sql
-- Interesting. Now let's do some aggregation: display platform, month, visits.
-- The date column allows us to extract the month directly.
select platform, month(date) as month, sum(visits) as visits
from anonym
group by platform, month(date)
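The same query can equally be issued from Scala via spark.sql; a sketch:

// Equivalent to the %sql cell above, but run from Scala against the temp view
spark.sql("""
  select platform, month(date) as month, sum(visits) as visits
  from anonym
  group by platform, month(date)
""").show(5)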
import org.apache.spark.sql.functions.{dayofmonth, month, row_number, sum}
import org.apache.spark.sql.expressions.Window

// Aggregate visits per id, platform, day and month
val coolDF = anonym.select($"id", $"platform", dayofmonth($"date").as("day"), month($"date").as("month"), $"visits").
  groupBy($"id", $"platform", $"day", $"month").agg(sum("visits").as("visits"))

// Run window aggregation on visits per month and platform
val window = coolDF.select($"id", $"day", $"visits",
  sum($"visits").over(Window.partitionBy("platform", "month")).as("monthly_visits"))

// Create and register the percent table
val percent = window.select($"id", $"day", ($"visits" / $"monthly_visits").as("percent"))
percent.createOrReplaceTempView("percent")
import org.apache.spark.sql.functions.{dayofmonth, month, row_number, sum}
import org.apache.spark.sql.expressions.Window
coolDF: org.apache.spark.sql.DataFrame = [id: bigint, platform: string ... 3 more fields]
window: org.apache.spark.sql.DataFrame = [id: bigint, day: int ... 2 more fields]
percent: org.apache.spark.sql.DataFrame = [id: bigint, day: int ... 1 more field]
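The registered percent view can now be queried like any table; for instance, a sketch listing the days with the largest share of their platform's monthly visits (an illustrative query, not from the original notebook):

// Query the freshly registered view, largest shares first
spark.sql("select id, day, percent from percent order by percent desc").show(5)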