// Databricks notebook source exported at Sat, 18 Jun 2016 05:09:42 UTC
Scalable Data Science
prepared by Raazesh Sainudiin and Sivanand Sivaram
The HTML source URL of this databricks notebook and its recorded Uji:
Word Count on US State of the Union (SoU) Addresses
- Word Count in big data is the equivalent of ‘Hello World’ in programming
- We count the number of occurrences of each word in the first and last (2016) SoU addresses.
An interesting analysis of the textual content of the State of the Union (SoU) addresses by all US presidents was done in:
Fig. 5. A river network captures the flow across history of US political discourse, as perceived by contemporaries. Time moves along the x axis. Clusters on semantic networks of 300 most frequent terms for each of 10 historical periods are displayed as vertical bars. Relations between clusters of adjacent periods are indexed by gray flows, whose density reflects their degree of connection. Streams that connect at any point in history may be considered to be part of the same system, indicated with a single color.
Let us investigate this dataset ourselves!
- We first get the source text data by scraping and parsing http://stateoftheunion.onetwothree.net/texts/index.html as explained in scraping and parsing SoU addresses.
- This data is already made available in DBFS, our distributed file system.
- We only do the simplest word count with this data in this notebook and will do more sophisticated analyses in the sequel (including topic modeling, etc).
Key Data Management Concepts
The Structure Spectrum
(watch now 1:10):
Here we will be working with unstructured or schema-never data (plain text files).
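As a hedged illustration of the two ends of this spectrum (not part of the original notebook): the same lines can stay schema-never as an RDD of Strings, or be given a schema after the fact. The column names below are purely illustrative; sc and sqlContext are the pre-defined contexts in a Databricks notebook of this vintage.
// schema-never: an RDD of plain lines (made-up lines, for illustration only)
val rawLines = sc.parallelize(Seq("Fellow-Citizens of the Senate", "and House of Representatives:"))
// imposing structure afterwards: pair each line with its position and name the columns
import sqlContext.implicits._ // provides .toDF on RDDs of tuples
val linesDF = rawLines
  .zipWithIndex()
  .map { case (text, lineNumber) => (lineNumber, text) }
  .toDF("lineNumber", "text")
linesDF.printSchema() // the same data now carries a schema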
Files
(watch later 1:43):
DBFS and dbutils - where is this dataset in our distributed file system?
- Since we are on the databricks cloud, it has a file system called DBFS
- DBFS is similar to HDFS, the Hadoop distributed file system
- dbutils allows us to interact with dbfs.
- The ‘display’ command renders the list of files returned by dbutils.fs.ls for a given directory in the file system.
display(dbutils.fs.ls("dbfs:/datasets/sou")) // Ctrl+Enter to display the files in dbfs:/datasets/sou
Let us display the head, or the first few lines, of the file dbfs:/datasets/sou/17900108.txt to see what it contains, using the dbutils.fs.head method.
head(file: String, maxBytes: int = 65536): String
-> Returns up to the first ‘maxBytes’ bytes of the given file as a String encoded in UTF-8
as follows:
dbutils.fs.head("dbfs:/datasets/sou/17900108.txt",673) // Cntrl+Enter to get the first 673 bytes of the file (which corresponds to the first five lines)
You Try!
Modify xxxx in the cell below to read the first 1000 bytes from the file.
dbutils.fs.head("dbfs:/datasets/sou/17900108.txt", xxxx) // Ctrl+Enter to get the first 1000 bytes of the file
Read the file into Spark Context as an RDD of Strings
- The textFile method on the available SparkContext sc can read the text file dbfs:/datasets/sou/17900108.txt into Spark and create an RDD of Strings named sou17900108
- but this is done lazily until an action is taken on the RDD sou17900108!
val sou17900108 = sc.textFile("dbfs:/datasets/sou/17900108.txt") // Ctrl+Enter to read in the text file as RDD[String]
Perform some actions on the RDD
- Each String in the RDD sou17900108 represents one line of data from the file. We can perform the following actions on the RDD:
  - count the number of elements in the RDD sou17900108 (i.e., the number of lines in the text file dbfs:/datasets/sou/17900108.txt) using sou17900108.count()
  - display the contents of the RDD using take or collect.
sou17900108.count() // <Shift+Enter> to count the number of elements in the RDD
sou17900108.take(5) // <Shift+Enter> to display the first 5 elements of RDD
sou17900108.take(5).foreach(println) // <Shift+Enter> to display the first 5 elements of RDD line by line
sou17900108.collect // <Ctrl+Enter> to display all the elements of RDD
Cache the RDD in (distributed) memory to avoid recreating it for each action
- Above, every time we took an action on the same RDD, the RDD was reconstructed from the text file.
- Spark’s advantage compared to Hadoop MapReduce is the ability to cache or store the RDD in distributed memory across the nodes.
- Let’s use .cache() after creating an RDD so that it is in memory after the first action (and thus avoid reconstruction for subsequent actions), and then repeat the same actions:
  - count the number of elements in the RDD sou17900108 (i.e., the number of lines in the text file dbfs:/datasets/sou/17900108.txt) using sou17900108.count()
  - display the contents of the RDD using take or collect.
// Shift+Enter to read in the textfile as RDD[String] and cache it in distributed memory
val sou17900108 = sc.textFile("dbfs:/datasets/sou/17900108.txt")
sou17900108.cache() // cache the RDD in memory
sou17900108.count() // Shift+Enter during this count action the RDD is constructed from the text file and cached
sou17900108.count() // Shift+Enter during this count action the cached RDD is used (notice less time taken by the same command)
sou17900108.take(5) // <Ctrl+Enter> to display the first 5 elements of the cached RDD
Lifecycle of a Spark Program
(watch now 0:23):
Summary
- create RDDs from:
- some external data source (such as a distributed file system)
- parallelized collection in your driver program
- lazily transform these RDDs into new RDDs
- cache some of those RDDs for future reuse
- perform actions to execute parallel computation and produce results (see the sketch below)
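A minimal sketch of this lifecycle on a tiny parallelized collection (the data below is made up purely for illustration):
// create an RDD from a parallelized collection in the driver program
val quotesRDD = sc.parallelize(Seq("to be or not to be", "that is the question"))
// lazily transform it into a new RDD of words (nothing is computed yet)
val wordsRDD = quotesRDD.flatMap(line => line.split(" "))
// mark it for caching so it can be reused across actions
wordsRDD.cache()
// actions trigger the parallel computation and return results to the driver
wordsRDD.count()   // first action: builds and caches the RDD
wordsRDD.collect() // second action: served from the cache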
Transform lines to words
- We need to loop through each line and split the line into words
- For now, let us split using whitespace
- More sophisticated regular expressions can be used to split the line (as we will see soon)
sou17900108
.flatMap(line => line.split(" "))
.take(100)
Naive word count
At a first glance, to do a word count of George Washington’s SoU address, we are tempted to do the following:
- just break each line by the whitespace character " " and find the words using a flatMap
- then do the map with the closure word => (word, 1) to initialize each word with an integer count of 1
  - i.e., transform each word to a (key, value) pair or Tuple such as (word, 1)
- then count all values with the same key (word is the key in our case) by doing a reduceByKey(_+_)
- and finally collect() to display the results.
sou17900108
.flatMap( line => line.split(" ") )
.map( word => (word, 1) )
.reduceByKey(_+_)
.collect()
Unfortunately, as you can see from the collect above:
- the words have punctuation at the end, which means that the same words are being counted as different words (e.g., importance)
- empty words are being counted
So we need a bit of regex’ing or regular-expression matching (all readily available from Scala via Java String types).
We will cover the three things we want to do with a simple example from Middle Earth!
- replace all multiple whitespace characters with one white space character " "
- replace all punctuation characters we specify within [ and ], such as [,?.!:;], by the empty string "" (i.e., remove these punctuation characters)
- convert everything to lower-case.
val example = "Master, Master! It's me, Sméagol... mhrhm*%* But they took away our precious, they wronged us. Gollum will protect us..., Master, it's me Sméagol."
example.replaceAll("\\s+", " ") //replace multiple whitespace characters (including space, tab, new line, etc.) with one whitespace " "
.replaceAll("""([,?.!:;])""", "") // replace the following punctions characters: , ? . ! : ; . with the empty string ""
.toLowerCase() // converting to lower-case
More sophisticated word count
We are now ready to do a word count of George Washington’s SoU on January 8th 1790 as follows:
val wordCount_sou17900108 =
sou17900108
.flatMap(line =>
line.replaceAll("\\s+", " ") //replace multiple whitespace characters (including space, tab, new line, etc.) with one whitespace " "
.replaceAll("""([,?.!:;])""", "") // replace the following punctions characters: , ? . ! : ; . with the empty string ""
.toLowerCase() // converting to lower-case
.split(" "))
.map(x => (x, 1))
.reduceByKey(_+_)
wordCount_sou17900108.collect()
val top10 = wordCount_sou17900108.sortBy(_._2, false).collect() // sort by descending count; note that this collects all (word, count) pairs, not just the top 10
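The collect earlier also showed empty words being counted; the pipeline above removes punctuation but can still leave empty tokens (for example, when a line begins with a space). A hedged refinement, not part of the original notebook, is to filter such tokens out after splitting:
val wordCountNoEmpties_sou17900108 =
  sou17900108
    .flatMap(line =>
      line.replaceAll("\\s+", " ")          // replace multiple whitespace characters with one whitespace " "
          .replaceAll("""([,?.!:;])""", "") // remove the listed punctuation characters
          .toLowerCase()                    // convert to lower-case
          .split(" "))
    .filter(word => word.nonEmpty)          // drop empty tokens left over after splitting
    .map(word => (word, 1))
    .reduceByKey(_+_)
wordCountNoEmpties_sou17900108.sortBy(_._2, false).take(10) // the ten most frequent non-empty words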
Doing it all together for George Washington and Barack Obama
//sc.textFile("dbfs:/datasets/sou/17900108.txt") // George Washington's first SoU
sc.textFile("dbfs:/datasets/sou/20160112.txt") // Barrack Obama's second SoU
.flatMap(line =>
line.replaceAll("\\s+", " ") //replace multiple whitespace characters (including space, tab, new line, etc.) with one whitespace " "
.replaceAll("""([,?.!:;])""", "") // replace the following punctions characters: , ? . ! : ; . with the empty string ""
.toLowerCase() // converting to lower-case
.split(" "))
.map(x => (x,1))
.reduceByKey(_+_)
.sortBy(_._2, false)
.collect()
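Since exactly the same pipeline is applied to both addresses, it can be convenient to wrap it in a small helper function. This is only a sketch; the name wordCountOf is ours, not from the original notebook:
// a reusable helper (the name wordCountOf is an assumption, not part of the original notebook)
def wordCountOf(path: String) =
  sc.textFile(path)
    .flatMap(line =>
      line.replaceAll("\\s+", " ")          // replace multiple whitespace characters with one whitespace " "
          .replaceAll("""([,?.!:;])""", "") // remove the listed punctuation characters
          .toLowerCase()                    // convert to lower-case
          .split(" "))
    .map(word => (word, 1))
    .reduceByKey(_ + _)
    .sortBy(_._2, false)

wordCountOf("dbfs:/datasets/sou/17900108.txt").take(10) // George Washington's first SoU
wordCountOf("dbfs:/datasets/sou/20160112.txt").take(10) // Barack Obama's 2016 SoU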
Reading all SoUs at once using wholeTextFiles
Let us next read all text files (ending with .txt) in the directory dbfs:/datasets/sou/ at once!
SparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs of strings.
This is in contrast with textFile, which would return one record per line in each file.
val souAll = sc.wholeTextFiles("dbfs:/datasets/sou/*.txt") // Shift+Enter to read all text files in dbfs:/datasets/sou/
souAll.cache() // let's cache this RDD for efficient reuse
souAll.count() // Shift+enter to count the number of entries in RDD[(String,String)]
souAll.count() // Ctrl+Enter to count the number of entries in the cached RDD[(String,String)] again (much faster!)
Let’s examine the first two elements of the RDD souAll
.
souAll.take(2) // Ctrl+Enter to see the first two elements of souAll
Clearly, each element is a pair of Strings, where the first String gives the filename and the second String gives the contents of the file. This can be very helpful for simply looping through the files and taking an action, such as counting the number of words per address, as follows:
// this just collects the file names which is the first element of the tuple given by "._1"
souAll.map( fileContentsPair => fileContentsPair._1).collect()
Let us find the number of words in each of the SoU addresses next (we need to work with Strings inside the closure!).
val wcs = souAll.map( fileContentsPair =>
{
val wc = fileContentsPair._2
.replaceAll("\\s+", " ") //replace multiple whitespace characters (including space, tab, new line, etc.) with one whitespace " "
.replaceAll("""([,?.!:;])""", "") // replace the following punctions characters: , ? . ! : ; . with the empty string ""
.toLowerCase() // converting to lower-case
.split(" ") // split each word separated by white space
.size // find the length of array
wc
}
)
wcs.collect()
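Note that wcs keeps only the counts, so it is not obvious which count belongs to which address. A hedged variant (the name wcsByFile is ours) that retains the filename alongside each count:
// keep (filename, wordCount) pairs so each count can be traced back to its address
val wcsByFile = souAll.map { case (filename, contents) =>
  val count = contents
    .replaceAll("\\s+", " ")          // replace multiple whitespace characters with one whitespace " "
    .replaceAll("""([,?.!:;])""", "") // remove the listed punctuation characters
    .toLowerCase()                    // convert to lower-case
    .split(" ")                       // split each word separated by white space
    .size                             // number of words in this address
  (filename, count)
}
wcsByFile.collect()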
HOMEWORK
- HOMEWORK WordCount 1: sortBy
- HOMEWORK WordCount 2: dbutils.fs
HOMEWORK WordCount 1. sortBy
Let’s understand sortBy a bit more carefully.
val example = "Master, Master! It's me, Sméagol... mhrhm*%* But they took away our precious, they wronged us. Gollum will protect us..., Master, it's me Sméagol."
val words = example.replaceAll("\\s+", " ") //replace multiple whitespace characters (including space, tab, new line, etc.) with one whitespace " "
.replaceAll("""([,?.!:;])""", "") // replace the following punctions characters: , ? . ! : ; . with the empty string ""
.toLowerCase() // converting to lower-case
.split(" ")
val rddWords = sc.parallelize(words)
rddWords.take(10)
val wordCounts = rddWords
.map(x => (x,1))
.reduceByKey(_+_)
val top10 = wordCounts.sortBy(_._2, false).take(10)
Make your code easy to read for other developers ;)
Use pattern matching with ‘case’ and well-defined variable names that everyone can understand
val top10 = wordCounts.sortBy({
case (word, count) => count
}, false).take(10)
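As a side note, sorting the entire RDD just to keep ten elements can be avoided: RDDs also provide top, which takes the number of elements and an Ordering. A hedged sketch using the same wordCounts RDD:
// top returns the 10 largest elements under the given Ordering, without a full sort of the RDD
val top10ByCount = wordCounts.top(10)(Ordering.by { case (word, count) => count })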
If you just want a total count of all the words in the RDD:
rddWords.count
HOMEWORK WordCount 2: dbutils.fs
Have a brief look at what other commands dbutils.fs supports. We will introduce them as needed.
dbutils.fs.help // some of these were used to ETL this data into dbfs:/datasets/sou