// Databricks notebook source exported at Sun, 26 Jun 2016 01:42:00 UTC

Scalable Data Science

Course Project by Akinwande Atanda

supported by and

The HTML source URL of this Databricks notebook and its recorded Uji:


#Tweet Analytics

Presentation contents.

Filtered Generic Twitter Collector by Selected Retrieved Keys

Remember that the use of Twitter itself comes with various strings attached.

Crucially, your use of content from Twitter (as done in this worksheet) also comes with strings attached.

import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter.TwitterUtils

import scala.math.Ordering

import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

Step 1: Enter your Twitter API Credentials.

  • Go to https://apps.twitter.com and look up your Twitter API Credentials, or create an app to create them.
  • Run this cell for the input cells to appear.
  • Enter your credentials.
  • Run the cell again to pick up your defaults.

The cell below is hidden so as not to expose the Twitter API credentials: consumerKey, consumerSecret, accessToken, and accessTokenSecret.

System.setProperty("twitter4j.oauth.consumerKey", getArgument("1. Consumer Key (API Key)", ""))
System.setProperty("twitter4j.oauth.consumerSecret", getArgument("2. Consumer Secret (API Secret)", ""))
System.setProperty("twitter4j.oauth.accessToken", getArgument("3. Access Token", ""))
System.setProperty("twitter4j.oauth.accessTokenSecret", getArgument("4. Access Token Secret", ""))

If you see warnings, ignore them for now: https://forums.databricks.com/questions/6941/change-in-getargument-for-notebook-input.html.
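Because getArgument falls back to an empty string when a widget is left blank, a missing credential typically surfaces only later, when the stream tries to authenticate. A minimal sketch of a fail-fast check, assuming it runs after the credentials cell; CredentialCheck and missing are hypothetical helpers, not part of twitter4j or Databricks:

```scala
// Hypothetical fail-fast check (not part of twitter4j or Databricks):
// report which twitter4j OAuth system properties are unset or blank.
object CredentialCheck {
  // The four property keys set by the credentials cell.
  val keys: Seq[String] = Seq(
    "twitter4j.oauth.consumerKey",
    "twitter4j.oauth.consumerSecret",
    "twitter4j.oauth.accessToken",
    "twitter4j.oauth.accessTokenSecret"
  )

  // A key counts as missing if its property is null or only whitespace.
  def missing(): Seq[String] =
    keys.filter(k => Option(System.getProperty(k)).forall(_.trim.isEmpty))
}

// Usage sketch: stop early instead of failing at stream start-up.
// require(CredentialCheck.missing().isEmpty, "Missing: " + CredentialCheck.missing().mkString(", "))
```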

Step 2: Configure where to output each batch of the unfiltered tweet stream and how often to compute the batches.

  • Run this cell for the input cells to appear.
  • Enter your credentials.
  • Run the cell again to pick up your defaults.

60*60*24*7*1 // number of seconds: (a * b * c * d * e), where a = 60 seconds per minute; b = 60 minutes per hour; c = number of hours in a day (1 to 24); d = number of days (1 to 7); e = number of weeks (1 or more)
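The product above is the number of seconds in one week. The widget cells convert seconds to milliseconds by multiplying by 1000; in Int arithmetic that product silently overflows once it exceeds Int.MaxValue (2,147,483,647 ms, roughly 24.8 days). A small sketch of the arithmetic; the object and method names are illustrative only:

```scala
object TimeoutArithmetic {
  // 60 s/min * 60 min/h * 24 h/day * 7 days * 1 week = seconds in one week
  val secondsPerWeek: Int = 60 * 60 * 24 * 7 * 1

  // Seconds -> milliseconds in Int arithmetic: wraps around silently
  // once the result exceeds Int.MaxValue (about 24.8 days' worth of ms).
  def millisInt(seconds: Int): Int = seconds * 1000

  // Widening to Long before multiplying avoids the overflow.
  def millisLong(seconds: Int): Long = seconds.toLong * 1000L
}
```

One week (604,800,000 ms) still fits in an Int; four weeks (2,419,200,000 ms) does not.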

val outputDirectory = getArgument("1. Output Directory", "/twitterNew2")
val slideInterval = new Duration(getArgument("2. Recompute the top hashtags every N seconds", "1").toInt * 1000)
val windowLength = new Duration(getArgument("3. Compute the top hashtags for the last N seconds", "5").toInt * 1000) // not used now
val timeoutJobLength = getArgument("4. Wait this many seconds before stopping the streaming job", "100").toLong * 1000 // Long avoids Int overflow for timeouts longer than about 24.8 days
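The slide interval above is also the batch interval of the StreamingContext created in Step 3. If the currently unused windowLength is wired in through a windowed DStream operation, Spark Streaming expects the window length to be a multiple of the batch interval. A sketch of that check in plain Scala, using scala.concurrent.duration instead of Spark's Duration; isValidWindow is a hypothetical helper:

```scala
import scala.concurrent.duration._

object WindowCheck {
  // Hypothetical helper: a window length is usable only if it is a positive
  // multiple of the batch interval (here, the notebook's slideInterval).
  def isValidWindow(window: FiniteDuration, batch: FiniteDuration): Boolean =
    batch.toMillis > 0 && window.toMillis > 0 && window.toMillis % batch.toMillis == 0

  // The widget defaults: a 5 s window over a 1 s slide/batch interval.
  val defaultsOk: Boolean = isValidWindow(5.seconds, 1.second)
}
```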

// Replace with your AWS S3 credentials
// NOTE: Set the access to this notebook appropriately to protect the security of your keys.
// Or you can delete this cell after you run the mount command below once successfully.

val AccessKey = getArgument("1. ACCESS_KEY", "REPLACE_WITH_YOUR_ACCESS_KEY")
val SecretKey = getArgument("2. SECRET_KEY", "REPLACE_WITH_YOUR_SECRET_KEY")
val EncodedSecretKey = SecretKey.replace("/", "%2F")
val AwsBucketName = getArgument("3. S3_BUCKET", "REPLACE_WITH_YOUR_S3_BUCKET")
val MountName = getArgument("4. MNT_NAME", "REPLACE_WITH_YOUR_MOUNT_NAME")
val s3Filename = "tweetDump"

dbutils.fs.unmount(s"/mnt/$MountName") // unmount an existing mount before re-mounting, and again when finished

dbutils.fs.mount(s"s3a://$AccessKey:$EncodedSecretKey@$AwsBucketName", s"/mnt/$MountName") // mount for the first time or after unmounting

A directory can be created to save the tweet stream.

//dbutils.fs.rm(s"/mnt/$MountName/NAME_OF_DIRECTORY/", recurse = true) // Remove the directory if it was previously created and is no longer needed.
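Two details of these path strings are easy to get wrong. Without the s prefix, Scala keeps $MountName as literal text, so a path passed to dbutils.fs.rm has to be written s"/mnt/$MountName/...". And because outputDirectory defaults to "/twitterNew2", the save path used in the streaming job, s"/mnt/$MountName/${outputDirectory}", contains a double slash. A sketch assuming the mount name "s3Data" seen in Step 4; batchDir is a hypothetical helper that strips the leading slash:

```scala
object OutputPaths {
  val MountName = "s3Data" // the mount name used in Step 4's paths

  // Without the s prefix, "$MountName" is kept as literal text.
  val literal: String = "/mnt/$MountName/"
  // With the s prefix, the value of MountName is substituted.
  val interpolated: String = s"/mnt/$MountName/"

  // Hypothetical helper: the directory saveAsTextFile writes for one batch,
  // dropping outputDirectory's leading "/" to avoid "//" in the path.
  def batchDir(mountName: String, outputDirectory: String, timeMs: Long): String =
    s"/mnt/$mountName/${outputDirectory.stripPrefix("/")}/tweets_$timeMs.txt"
}
```

With the notebook's defaults, batchDir("s3Data", "/twitterNew2", 1463704200000L) gives the same directory that Step 4 reads back.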


Step 3: Run the Twitter Streaming job.

Create the function that sets up the StreamingContext and the streaming job.

###Filter Set-up

  • Create a case class holding the keys to retrieve from each streamed tweet (in JSON format).
  • Define a function (“convertToFilter”) that converts each full streamed tweet (a twitter4j.Status) into the filtered tweet.
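The conversion can be tried without a Twitter connection by swapping twitter4j.Status for small stand-ins. In this sketch RawStatus and RawUser are hypothetical case classes (not twitter4j types), with hashtags given directly as strings instead of HashtagEntity objects; the null check mirrors the one in convertToFilter:

```scala
object FilterSketch {
  // Hypothetical stand-ins for twitter4j.User and twitter4j.Status.
  final case class RawUser(screenName: String, name: String, followersCount: Int)
  final case class RawStatus(id: Long, text: String, favoriteCount: Int, retweetCount: Int,
                             hashtags: Array[String], user: RawUser) // hashtags may be null

  // Same fields as the notebook's FilterStatus.
  case class FilterStatus(screenName: String, userName: String, followersCount: Long, id: String,
                          tweets: String, favoriteCount: Long, retweetCount: Long,
                          hashTags: Array[String] = Array[String]())

  // Keep only the selected keys; a null hashtag array becomes an empty one.
  def convertToFilter(status: RawStatus): FilterStatus = {
    val hashTags = Option(status.hashtags).getOrElse(Array.empty[String])
    val user = status.user
    FilterStatus(
      screenName = user.screenName,
      userName = user.name,
      followersCount = user.followersCount.toLong,
      id = status.id.toString,
      tweets = status.text,
      favoriteCount = status.favoriteCount.toLong,
      retweetCount = status.retweetCount.toLong,
      hashTags = hashTags)
  }
}
```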

import com.google.gson.Gson // the Library has already been attached to this cluster (show live how to do this from scratch?)

var newContextCreated = false
var num = 0
var numTweetsCollected = 0L // track number of tweets collected

// Case class holding the keys retained from each tweet.
// It is defined before creatingFunc, which refers to it.
case class FilterStatus(screenName: String, userName: String, followersCount: Long, id: String, tweets: String, favoriteCount: Long, retweetCount: Long, hashTags: Array[String] = Array[String]())

// Helper ordering that compares (String, Int) pairs by their second element (e.g. hashtag counts).
object SecondValueOrdering extends Ordering[(String, Int)] {
  def compare(a: (String, Int), b: (String, Int)) = {
    a._2 compare b._2
  }
}

// This is the function that creates the StreamingContext and sets up the Spark Streaming job.
def creatingFunc(): StreamingContext = {
  // Create a Spark Streaming Context.
  val ssc = new StreamingContext(sc, slideInterval)
  // Create a Twitter Stream for the input source.
  val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
  val twitterStream = TwitterUtils.createStream(ssc, auth)
  // Serialize each full twitter4j.Status to JSON (this stream is not saved below).
  val twitterStreamJson = twitterStream.map(x => {
    val gson = new Gson()
    val xJson = gson.toJson(x)
    xJson
  })

  // Convert a full twitter4j.Status into the filtered FilterStatus record.
  def convertToFilter(status: twitter4j.Status): FilterStatus = {
    val hashTags: Array[String] =
      if (status.getHashtagEntities != null) status.getHashtagEntities().map(eachHT => eachHT.getText())
      else Array[String]()
    val user = status.getUser()
    FilterStatus(id = status.getId.toString, tweets = status.getText, favoriteCount = status.getFavoriteCount,
      retweetCount = status.getRetweetCount, hashTags = hashTags, screenName = user.getScreenName,
      userName = user.getName, followersCount = user.getFollowersCount)
  }

  val partitionsEachInterval = 1 // number of partitions in each RDD of tweets in the DStream

  twitterStream.map(convertToFilter).foreachRDD((rdd, time) => { // for each filtered RDD in the DStream
    val count = rdd.count()
    if (count > 0) {
      val outputRDD = rdd.repartition(partitionsEachInterval) // repartition as desired
      //outputRDD.saveAsTextFile(s"${outputDirectory}/tweets_" + time.milliseconds.toString) // save as text file
      outputRDD.saveAsTextFile(s"/mnt/$MountName/${outputDirectory}" + "/tweets_" + time.milliseconds.toString + ".txt") // save as text file in S3
      numTweetsCollected += count // update with the latest count
    }
  })

  newContextCreated = true
  ssc // return the new context to getActiveOrCreate
}
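SecondValueOrdering is defined but not used by the cells shown. Given the widget labels about top hashtags, one plausible use is ranking (hashtag, count) pairs by count; the sketch below assumes that, works on a local Seq rather than an RDD, and topK is a hypothetical helper:

```scala
object TopHashtags {
  // Same comparison as the notebook's SecondValueOrdering: by count only.
  object SecondValueOrdering extends Ordering[(String, Int)] {
    def compare(a: (String, Int), b: (String, Int)): Int = a._2 compare b._2
  }

  // Hypothetical helper: count hashtags and keep the k most frequent.
  // Pairs with equal counts come back in no particular order.
  def topK(hashTags: Seq[String], k: Int): Seq[(String, Int)] =
    hashTags
      .groupBy(identity)
      .map { case (tag, occurrences) => (tag, occurrences.size) }
      .toSeq
      .sorted(SecondValueOrdering.reverse)
      .take(k)
}
```

On a Spark RDD[(String, Int)] of counts, the same ordering could be supplied as rdd.top(k)(SecondValueOrdering), since RDD.top takes its Ordering in a second parameter list.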

Create the StreamingContext using getActiveOrCreate, as required when starting a streaming job in Databricks.

val ssc = StreamingContext.getActiveOrCreate(creatingFunc)
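getActiveOrCreate returns the currently active StreamingContext (started and not yet stopped) if there is one, and only otherwise calls creatingFunc; re-running this cell therefore does not build a second streaming job on top of a running one. The toy model below imitates that contract with a hypothetical Ctx class; it is an illustration, not Spark's implementation:

```scala
object ActiveOrCreateSketch {
  // Toy stand-in for a streaming context (not Spark's class).
  final class Ctx

  private var active: Option[Ctx] = None // started and not yet stopped
  var creations = 0                       // how often creatingFunc ran

  // Return the active context if any; otherwise call creatingFunc.
  def getActiveOrCreate(creatingFunc: () => Ctx): Ctx =
    active.getOrElse { creations += 1; creatingFunc() }

  def start(c: Ctx): Unit = active = Some(c)
  def stop(c: Ctx): Unit = if (active.contains(c)) active = None
}
```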

Start the Spark Streaming Context, then return when the streaming job exits or when the specified timeout elapses.

ssc.start()
ssc.awaitTerminationOrTimeout(timeoutJobLength)

Stop the streaming job. Passing stopSparkContext = true would also terminate the underlying SparkContext; with false, as below, the SparkContext keeps running.

ssc.stop(stopSparkContext = false)

Check out the cluster's ‘Streaming’ UI while the job is running.

Stop any active StreamingContexts, but don’t stop the SparkContexts they are attached to.

StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }

Step 4: View the Results.


Read one saved RDD of the DStream (a single batch) as a text file.

val rdd1 = sc.textFile(s"/mnt/s3Data/twitterNew2/tweets_1463704200000.txt/")




5. Read all the saved RDDs together as one text RDD

//val dStream = sc.wholeTextFiles(s"/mnt/$MountName/${outputDirectory}")
val dStream = sc.textFile(s"/mnt/s3Data/twitterNew2/*.txt/")


dStream.count // number of lines across all saved RDDs; equals the number of tweets only if no tweet text contains a line break
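saveAsTextFile writes each FilterStatus as one line produced by its toString, so two caveats follow from the code as written: the count exceeds the number of tweets whenever a tweet's text contains a line break, and the hashTags array prints as a JVM object reference ("[Ljava.lang.String;@" followed by a hash code) rather than its contents. A sketch showing both effects, plus a hypothetical tab-separated alternative (asReadableLine) that keeps the hashtags and flattens newlines:

```scala
object SavedLineFormat {
  // Same fields as the notebook's FilterStatus case class.
  case class FilterStatus(screenName: String, userName: String, followersCount: Long, id: String,
                          tweets: String, favoriteCount: Long, retweetCount: Long,
                          hashTags: Array[String] = Array[String]())

  // saveAsTextFile writes each record's toString.
  def asSavedText(s: FilterStatus): String = s.toString

  // Lines one record turns into when read back (this sketch only splits on "\n").
  def linesWhenReadBack(s: FilterStatus): Int = asSavedText(s).split("\n", -1).length

  // Hypothetical alternative: tab-separated fields, hashtags joined by commas,
  // newlines in the tweet text replaced by spaces.
  def asReadableLine(s: FilterStatus): String =
    Seq(s.screenName, s.userName, s.followersCount, s.id, s.tweets.replace("\n", " "),
        s.favoriteCount, s.retweetCount, s.hashTags.mkString(",")).mkString("\t")
}
```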

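A better way of merging the files: wholeTextFiles returns one (path, content) pair per file, so each batch's file path stays attached to its content.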

val dStreamw = sc.wholeTextFiles(s"/mnt/s3Data/twitterNew2/*.txt/") // RDD of (file path, file content) pairs

val dStreamTitle = dStreamw.keys.collect() // file paths, collected to the driver

val dStreamContent = dStreamw.values // file contents, one string per file
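Each path returned by wholeTextFiles points into one batch directory, so the batch time can be recovered from the tweets_<milliseconds>.txt part of the name. A sketch, assuming paths shaped like the one read in Step 4 (the dbfs: prefix in the test path is illustrative); batchTimeMs, batchInstant, and records are hypothetical helpers:

```scala
import java.time.Instant

object BatchFiles {
  // Matches the tweets_<milliseconds>.txt directory name written per batch.
  private val BatchTime = """tweets_(\d+)\.txt""".r

  // Hypothetical helper: batch time (ms since the epoch) from a file path.
  def batchTimeMs(path: String): Option[Long] =
    BatchTime.findFirstMatchIn(path).map(_.group(1).toLong)

  // Same value as a java.time.Instant, for readability.
  def batchInstant(path: String): Option[Instant] =
    batchTimeMs(path).map(ms => Instant.ofEpochMilli(ms))

  // Hypothetical helper: split one file's content into non-empty lines.
  def records(content: String): Seq[String] =
    content.split("\n").toSeq.filter(_.nonEmpty)
}
```

For example, the batch read in Step 4, tweets_1463704200000.txt, corresponds to 2016-05-20T00:30:00Z.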

