// Databricks notebook source exported at Sun, 26 Jun 2016 01:43:20 UTC

Scalable Data Science

Course Project by Akinwande Atanda

The HTML source URL of this Databricks notebook and its recorded Uji (Dogen's Time-Being) video:

sds/uji/studentProjects/02_AkinwandeAtanda/Tweet_Analytics/043_TA02_ETL_Tweets

# Tweet Analytics

Presentation contents.

Extract-Transform-Load (ETL) Processing of Streamed Tweets

This notebook should be run after the tweets have been collected in batches. The operations performed in this notebook are:

  • Read/Load the Streamed Tweets in batches of RDD
  • Read/Load the Streamed Tweets in merged batches of RDDs
  • Save the Tweets in Parquet format, convert to a DataFrame table and run SQL queries
  • Explore the Streamed Tweets using SQL queries: Filter, Plot and Re-Shape
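Before the Spark cells below, here is a minimal plain-Scala sketch (no cluster needed, sample strings made up for illustration) of the core idea behind the "merged batches" step: each DStream interval lands as one batch of raw JSON strings, and reading `filteredTweet/*` is conceptually just flattening those batches into one collection.

```scala
// Each streamed interval produces one batch of raw tweet JSON strings
// (hypothetical sample data standing in for the files under filteredTweet/).
val batch1 = Seq("""{"text":"tweet a"}""", """{"text":"tweet b"}""")
val batch2 = Seq("""{"text":"tweet c"}""")

// Merging the batches is a flatten; sqlContext.read.json("…/filteredTweet/*")
// does the same thing at DataFrame level across all batch files.
val merged = Seq(batch1, batch2).flatten
```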

display(dbutils.fs.ls(s"/mnt/s3Data/filteredTweet"))


val tweet = sc.textFile(s"dbfs:/mnt/s3Data/filteredTweet/tweets_1463600700000").take(1)


val tweetDF = sqlContext.read.json(s"dbfs:/mnt/s3Data/filteredTweet/tweets_1463600700000")



tweetDF.show()


tweetDF.printSchema()


tweetDF.select("text").show


tweetDF.select("createdAt", "text", "favoriteCount","retweetCount").show


tweetDF.select(tweetDF("createdAt"), tweetDF("text"), tweetDF("favoriteCount")+1,tweetDF("retweetCount")+2).show


tweetDF.select(tweetDF("createdAt"), tweetDF("text"), tweetDF("favoriteCount")>1,tweetDF("retweetCount")).show


tweetDF.groupBy("favoriteCount").count().show


tweetDF.groupBy("retweetCount").count().show


tweetDF.groupBy("createdAt").count().show


tweetDF.groupBy("user").count().show


tweetDF.filter(tweetDF("favoriteCount")<1).show


tweetDF.filter(tweetDF("text") === "trump").show // note: exact match on the whole text field; use contains for substring search


import sqlContext.implicits._


val tweetDS = tweetDF.select("text").as[String] // Dataset[String] of the text field (uses the implicits imported above)
tweetDS.show

Merged RDD Streams


val tweetDF2 = sqlContext.read.json(s"dbfs:/mnt/s3Data/filteredTweet/*")



tweetDF2.show


tweetDF2.select("createdAt","text").show


tweetDF2.groupBy("createdAt").count.show


tweetDF2.groupBy("favoriteCount").count.show


tweetDF2.groupBy("retweetCount").count.show


tweetDF2.groupBy("text").count.show

Saving the Tweets as a Table and Running SQL Queries

  • Unmerged Tweet (tweetDF)

tweetDF.registerTempTable("tweetTable") // Register the DataFrames as a table.


val tweetText = sqlContext.sql("SELECT text FROM tweetTable") //Run SQL query


tweetText.take(1).foreach(println)


tweetText.map(t=>(t,1)).take(1).foreach(println)

Merged RDD Streams


val tweetDF2 = sqlContext.read.json(s"dbfs:/mnt/s3Data/filteredTweet/*")

Saving the Tweets as a Table and Running SQL Queries

Parquet Data format: Save, Preview from the Tables menu, and Query directly without transforming to DataFrame


tweetDF2.select("createdAt", "text", "favoriteCount", "retweetCount").write.save("filterTweet.parquet")
// Save the filtered tweets in Parquet format; they can then be loaded from this directory via the Create Table menu.


val mergedTweets = sqlContext.read.format("parquet").load("dbfs:/filterTweet.parquet") 
//This reads all the tweets in the parquet data format


mergedTweets.registerTempTable("mergedTweetsTable") // Save as a Table


val mergedTweetQuery = sqlContext.sql("SELECT * FROM mergedTweetsTable")

Use SQL syntax to extract the required fields from the registered table


%sql SELECT * FROM mergedTweetsTable LIMIT 1 


mergedTweetQuery.cache


mergedTweetQuery.map(c => c(1)).foreach(println) // note: foreach(println) runs on the executors, so output appears in executor logs, not in the notebook


mergedTweetQuery.take(1)


val mergedTextQuery = sqlContext.sql("SELECT text FROM mergedTweetsTable").cache


mergedTextQuery.map(c=>c).take(1).foreach(println)


dbutils.fs.help


display(dbutils.fs.ls("dbfs:/"))


dbutils.fs.rm("dbfs:/filterTweet.parquet",recurse=true)


display(dbutils.fs.ls("dbfs:/"))


//display(dbutils.fs.ls("dbfs:/mnt/s3Data/filteredTweet/Trumps.txt"))


//dbutils.fs.rm("dbfs:/mnt/s3Data/filteredTweet/Trumps.txt",recurse=true)

ETL Operationalization: Actual Tweets Project


//dbutils.fs.rm(s"dbfs:/mnt/s3Data/TrumpTweetText.parquet",recurse=true)


display(dbutils.fs.ls(s"dbfs:/mnt/s3Data"))


dbutils.fs.rm(s"dbfs:/mnt/s3Data/tweetAll.parquet",recurse=true)


val tweetAll = sqlContext.read.json(s"dbfs:/mnt/s3Data/twitterNew/*")
tweetAll.cache


// Save the filtered tweets in Parquet format; they can then be loaded from this directory via the Create Table menu.
tweetAll.select("createdAt", "text", "favoriteCount", "retweetCount").write.save(s"dbfs:/mnt/s3Data/tweetAll.parquet")



val mergedTweetAll = sqlContext.read.format("parquet").load(s"dbfs:/mnt/s3Data/tweetAll.parquet") //This reads all the tweets in the parquet data format


mergedTweetAll.registerTempTable("mergedTweetTable")


%sql SELECT * FROM mergedTweetTable LIMIT 1


mergedTweetAll.show

Returns the number of Tweets in the merged dataset (26.3 million Tweets as at 3.30 p.m. today)


%sql SELECT COUNT(text) as TweetCount FROM mergedTweetTable 

Returns the number of Tweets in the merged dataset, grouped and sorted by the creation time of the Tweets
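The GROUP BY / ORDER BY done in the SQL cells below can be sketched in plain Scala (with a hypothetical `Tweet` case class and made-up timestamps) to show what the query computes:

```scala
// Hypothetical stand-in for one row of mergedTweetTable.
case class Tweet(createdAt: String, text: String)

val tweets = Seq(
  Tweet("2016-05-18 20:25:00", "first"),
  Tweet("2016-05-18 20:25:00", "second"),
  Tweet("2016-05-18 20:30:00", "third")
)

// Equivalent of:
//   SELECT COUNT(text) AS TweetCount, createdAt FROM mergedTweetTable
//   GROUP BY createdAt ORDER BY createdAt ASC
val countsByDate = tweets
  .groupBy(_.createdAt)                       // group rows by creation time
  .map { case (date, ts) => (date, ts.size) } // count rows per group
  .toSeq
  .sortBy(_._1)                               // lexicographic sort works for this timestamp format
```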


%sql SELECT COUNT(text) as TweetCount, createdAt FROM mergedTweetTable group by createdAt order by createdAt asc


%sql SELECT COUNT(text) as TweetCount, createdAt as DStreamTime FROM mergedTweetTable group by createdAt order by createdAt asc


%sql SELECT distinct createdAt FROM mergedTweetTable where favoriteCount == 1

Filter Query by Keywords
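The SQL predicate `text LIKE '%trump%'` used in the cells below is a case-sensitive substring match; in plain Scala the same test is `String.contains`. A small sketch with made-up sample tweets:

```scala
// Equivalent of the SQL predicate  text LIKE '%trump%'  (case-sensitive).
def matchesKeyword(text: String, keyword: String): Boolean =
  text.contains(keyword)

// Hypothetical sample tweets; note "Trump rally" is NOT matched because
// both LIKE and contains are case-sensitive here.
val sample = Seq("I saw trump on TV", "unrelated tweet", "Trump rally")
val hits = sample.filter(matchesKeyword(_, "trump"))
```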


%sql SELECT count(*) as TrumpsTweet FROM mergedTweetTable where text like "%trump%"


%sql SELECT COUNT(text) as TrumpsTweetCount, createdAt as DStreamTime FROM mergedTweetTable where text like "%trump%" group by createdAt order by createdAt asc


%sql SELECT createdAt as DStreamTime, text as TrumpsTweet FROM mergedTweetTable where text like "%trump%" order by createdAt asc limit 1


val TrumpTextQuery = sqlContext.sql("SELECT createdAt as date, text as review, favoriteCount as category FROM mergedTweetTable where text like '%trump%' order by createdAt asc").cache


val TrumpQuery = sqlContext.sql("SELECT createdAt as date, text as review, CAST(favoriteCount as FLOAT) as category FROM mergedTweetTable where text like '%trump%' order by createdAt asc").cache


TrumpTextQuery.select("date", "review", "category").write.save(s"dbfs:/mnt/s3Data/TrumpTweet.parquet")


TrumpQuery.select("date", "review", "category").write.save(s"dbfs:/mnt/s3Data/TrumpSentiment.parquet")


TrumpTextQuery.registerTempTable("TrumpTweetTable")


%sql -- ALTER TABLE ... ALTER COLUMN is not supported on a temporary table; the favoriteCount-to-FLOAT conversion is instead done with CAST(...) in the TrumpQuery cell above


%sql SELECT * FROM TrumpTweetTable limit 3


TrumpTextQuery.count //Returns the number of tweets 


TrumpTextQuery.take(3).foreach(println)
