// Databricks notebook source exported at Sun, 26 Jun 2016 01:43:20 UTC
Scalable Data Science
Course Project by Akinwande Atanda
The html source url of this databricks notebook and its recorded Uji :
#Tweet Analytics
Extract-Transform-Load (ETL) Processing of Streamed Tweets
This notebook should be runned after the tweets have been collected in batches. The operations performed in this notebooks are:
- Read/Load the Streamed Tweets in batches of RDD
- Read/Load the Streamed Tweets in merged batches of RDDs
- Save the Tweets in Parquet format, convert to Dataframe Table and run SQL queries
- Explore the Streamed Tweets using SQL queries: Filter, Plot and Re-Shape
display(dbutils.fs.ls(s"/mnt/s3Data/filteredTweet"))
val tweet = sc.textFile(s"dbfs:/mnt/s3Data/filteredTweet/tweets_1463600700000").take(1)
val tweetDF = sqlContext.read.json(s"dbfs:/mnt/s3Data/filteredTweet/tweets_1463600700000")
tweetDF.show()
tweetDF.printSchema()
tweetDF.select("text").show
tweetDF.select("createdAt", "text", "favoriteCount","retweetCount").show
tweetDF.select(tweetDF("createdAt"), tweetDF("text"), tweetDF("favoriteCount")+1,tweetDF("retweetCount")+2).show
tweetDF.select(tweetDF("createdAt"), tweetDF("text"), tweetDF("favoriteCount")>1,tweetDF("retweetCount")).show
tweetDF.groupBy("favoriteCount").count().show
tweetDF.groupBy("retweetCount").count().show
tweetDF.groupBy("createdAt").count().show
tweetDF.groupBy("user").count().show
tweetDF.filter(tweetDF("favoriteCount")<1).show
tweetDF.filter(tweetDF("tex")==="trump").show
import sqlContext.implicits._
val tweedDS = tweetDF.select("text").show
Merged RDD Streams
val tweetDF2 = sqlContext.read.json(s"dbfs:/mnt/s3Data/filteredTweet/*")
tweetDF2.show
tweetDF2.select("createdAt","text").show
tweetDF2.groupBy("createdAt").count.show
tweetDF2.groupBy("favoriteCount").count.show
tweetDF2.groupBy("retweetCount").count.show
tweetDF2.groupBy("text").count.show
Saving the Tweet as a Table and Run SQL Queries
- Unmerged Tweet (tweetDF)
tweetDF.registerTempTable("tweetTable") // Register the DataFrames as a table.
val tweetText = sqlContext.sql("SELECT text FROM tweetTable") //Run SQL query
tweetText.take(1).foreach(println)
tweetText.map(t=>(t,1)).take(1).foreach(println)
Merged RDD Streams
val tweetDF2 = sqlContext.read.json(s"dbfs:/mnt/s3Data/filteredTweet/*")
Saving the Tweet as a Table and Run SQL Queries
Parquet Data format: Save, Preview from the Tables menu, and Query directly without transforming to DataFrame
tweetDF2.select("createdAt", "text", "favoriteCount","retweetCount").write.save("filterTweet.parquet")
//Save the filter Tweet as parquest data format and go to create table to load it from the directory.
val mergedTweets = sqlContext.read.format("parquet").load("dbfs:/filterTweet.parquet")
//This reads all the tweets in the parquet data format
mergedTweets.registerTempTable("mergedTweetsTable") // Save as a Table
val mergedTweetQuery = sqlContext.sql("SELECT * FROM mergedTweetsTable")
Use SQL syntax to extract required fileds from the registered table
%sql SELECT * FROM mergedTweetsTable LIMIT 1
mergedTweetQuery.cache
mergedTweetQuery.map(c=>c(1)).foreach(println)
mergedTweetQuery.take(1)
val mergedTextQuery = sqlContext.sql("SELECT text FROM mergedTweetsTable").cache
mergedTextQuery.map(c=>c).take(1).foreach(println)
dbutils.fs.help
display(dbutils.fs.ls("dbfs:/"))
dbutils.fs.rm("dbfs:/filterTweet.parquet",recurse=true)
display(dbutils.fs.ls("dbfs:/"))
//display(dbutils.fs.ls("dbfs:/mnt/s3Data/filteredTweet/Trumps.txt"))
//dbutils.fs.rm("dbfs:/mnt/s3Data/filteredTweet/Trumps.txt",recurse=true)
ETL Operationalization: Actual Tweeets Project
//dbutils.fs.rm(s"dbfs:/mnt/s3Data/TrumpTweetText.parquet",recurse=true)
display(dbutils.fs.ls(s"dbfs:/mnt/s3Data"))
dbutils.fs.rm(s"dbfs:/mnt/s3Data/tweetAll.parquet",recurse=true)
val tweetAll = sqlContext.read.json(s"dbfs:/mnt/s3Data/twitterNew/*")
tweetAll.cache
//Save the filter Tweet as parquest data format and go to create table to load it from the directory.
tweetAll.select("createdAt", "text", "favoriteCount","retweetCount").write.save(s"dbfs:/mnt/s3Data/tweetAll.parquet")
val mergedTweetAll = sqlContext.read.format("parquet").load(s"dbfs:/mnt/s3Data/tweetAll.parquet") //This reads all the tweets in the parquet data format
mergedTweetAll.registerTempTable("mergedTweetTable")
%sql SELECT * FROM mergedTweetTable LIMIT 1
mergedTweetAll.show
Returns the number of Tweets in the merged dataset (26.3million Tweets as at 3.30p.m today)
%sql SELECT COUNT(text) as TweetCount FROM mergedTweetTable
Returns the number of Tweets in the merged dataset group and sort by Date of Created Tweet
%sql SELECT COUNT(text) as TweetCount, createdAt FROM mergedTweetTable group by createdAt order by createdAt asc
%sql SELECT COUNT(text) as TweetCount, createdAt as DStreamTime FROM mergedTweetTable group by createdAt order by createdAt asc
%sql SELECT distinct createdAt FROM mergedTweetTable where favoriteCount == 1
Filter Query by Keywords
%sql SELECT count(*) as TrumpsTweet FROM mergedTweetTable where text like "%trump%"
%sql SELECT COUNT(text) as TrumpsTweetCount, createdAt as DStreamTime FROM mergedTweetTable where text like "%trump%" group by createdAt order by createdAt asc
%sql SELECT createdAt as DStreamTime, text as TrumpsTweet FROM mergedTweetTable where text like "%trump%" order by createdAt asc limit 1
val TrumpTextQuery = sqlContext.sql("SELECT createdAt as date, text as review, favoriteCount as category FROM mergedTweetTable where text like '%trump%' order by createdAt asc").cache
val TrumpQuery = sqlContext.sql("SELECT createdAt as date, text as review, CAST(favoriteCount as FLOAT) as category FROM mergedTweetTable where text like '%trump%' order by createdAt asc").cache
TrumpTextQuery.select("date", "review", "category").write.save(s"dbfs:/mnt/s3Data/TrumpTweet.parquet")
TrumpQuery.select("date", "review", "category").write.save(s"dbfs:/mnt/s3Data/TrumpSentiment.parquet")
TrumpTextQuery.registerTempTable("TrumpTweetTable")
%sql ALTER TABLE TrumpTweetTable ALTER COLUMN category
%sql SELECT * FROM TrumpTweetTable limit 3
TrumpTextQuery.count //Returns the number of tweets
TrumpTextQuery.take(3).foreach(println)