029_TweetLanguageClassifier(Scala)


ScaDaMaLe Course site and book

Twitter Streaming Language Classifier

This is a databricksification of https://databricks.gitbooks.io/databricks-spark-reference-applications/content/twitter_classifier/index.html by Amendra Shreshta.

Note that you need to change the fields in the background notebook 025_a_extendedTwitterUtils2run so that more fields, including the lang of each Tweet, are exposed. This is a good example of how one has to go deeper into the Java code of twitter4j as new needs arise.
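For orientation: twitter4j's Status interface exposes the tweet's machine-detected language code via getLang(), which is the field the extended utilities need to surface. A minimal sketch, assuming a twitter4j.Status value obtained from the stream, of reading that field:

// a minimal sketch, assuming `status` is a twitter4j.Status obtained from the stream;
// Status.getLang returns the BCP-47 language code Twitter attached to the tweet
import twitter4j.Status
def langOf(status: Status): String = status.getLang
// e.g. twitterStream.map(status => (status.getLang, status.getText))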

%run "./025_c_extendedTwitterUtils2runWithLangs"
import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming._
 
import scala.math.Ordering
 
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

Enter your Twitter API Credentials.

  • Go to https://apps.twitter.com and look up your Twitter API Credentials, or create an app to create them.
  • Run the code in the cell below after entering your own credentials.
// put your own Twitter developer credentials below instead of the XXXX placeholders
// instead of the '%run ".../secrets/026_secret_MyTwitterOAuthCredentials"' below,
// you need to copy-paste the following code-block with your own Twitter credentials, replacing XXXX


// put your own twitter developer credentials below 

import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder


// These have been regenerated!!! - need to change them

def myAPIKey       = "XXXX" // APIKey 
def myAPISecret    = "XXXX" // APISecretKey
def myAccessToken          = "XXXX" // AccessToken
def myAccessTokenSecret    = "XXXX" // AccessTokenSecret


System.setProperty("twitter4j.oauth.consumerKey", myAPIKey)
System.setProperty("twitter4j.oauth.consumerSecret", myAPISecret)
System.setProperty("twitter4j.oauth.accessToken", myAccessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", myAccessTokenSecret)

println("twitter OAuth Credentials loaded")

The cell below will not expose my Twitter API Credentials: myAPIKey, myAPISecret, myAccessToken and myAccessTokenSecret. Use the code above to enter your own credentials in a Scala cell.

%run "Users/raazesh.sainudiin@math.uu.se/scalable-data-science/secrets/026_secret_MyTwitterOAuthCredentials"
twitter OAuth Credentials loaded
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
myAPIKey: String
myAPISecret: String
myAccessToken: String
myAccessTokenSecret: String

Step 1. Collect Data

Start downloading tweets so that we can begin building a simple model for language classification.

// ## Let's create a directory in dbfs for storing tweets in the cluster's distributed file system.
val outputDirectoryRoot = "/datasets/tweetsStreamTmp" // output directory
outputDirectoryRoot: String = /datasets/tweetsStreamTmp
// to remove a pre-existing directory and start from scratch, evaluate this cell (skip it to keep previously collected tweets)
dbutils.fs.rm(outputDirectoryRoot, true) 
res2: Boolean = true
// ## Capture tweets in every sliding window of slideInterval many milliseconds.
val slideInterval = new Duration(1 * 1000) // 1 * 1000 = 1000 milli-seconds = 1 sec
slideInterval: org.apache.spark.streaming.Duration = 1000 ms
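Equivalently, Spark Streaming's helper constructors can express the same interval more readably; a small sketch:

import org.apache.spark.streaming.{Seconds, Minutes}
val oneSecond = Seconds(1) // same as new Duration(1000)
val oneMinute = Minutes(1) // same as new Duration(60 * 1000)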
// Our goal is to take each RDD in the twitter DStream and write it as a json file in our dbfs.
// Create a Spark Streaming Context.
val ssc = new StreamingContext(sc, slideInterval)
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@466128fb
// Create a Twitter Stream for the input source. 
val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
val twitterStream = ExtendedTwitterUtils.createStream(ssc, auth)
auth: Some[twitter4j.auth.OAuthAuthorization] = Some(OAuthAuthorization{consumerKey='8uN0N9RTLT1viaR811yyG7xwk', consumerSecret='******************************************', oauthToken=AccessToken{screenName='null', userId=4173723312}})
twitterStream: org.apache.spark.streaming.dstream.ReceiverInputDStream[twitter4j.Status] = ExtendedTwitterInputDStream@206a254d
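The OAuthAuthorization above picks up the four twitter4j.oauth.* JVM system properties that were set earlier. If you prefer not to rely on system properties, one alternative (a sketch, assuming the myAPIKey, myAPISecret, myAccessToken and myAccessTokenSecret values defined above are in scope) is to set the credentials explicitly on the ConfigurationBuilder:

// a sketch of building the authorization explicitly, without JVM system properties;
// assumes myAPIKey, myAPISecret, myAccessToken and myAccessTokenSecret are in scope
val explicitConf = new ConfigurationBuilder()
  .setOAuthConsumerKey(myAPIKey)
  .setOAuthConsumerSecret(myAPISecret)
  .setOAuthAccessToken(myAccessToken)
  .setOAuthAccessTokenSecret(myAccessTokenSecret)
  .build()
val explicitAuth = Some(new OAuthAuthorization(explicitConf))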
// Let's import google's json library next.
import com.google.gson.Gson 
//Let's map the tweets into json formatted string (one tweet per line).
val twitterStreamJson = twitterStream.map(
                                            x => { val gson = new Gson();
                                                 val xJson = gson.toJson(x)
                                                 xJson
                                                 }
                                          ) 
import com.google.gson.Gson
twitterStreamJson: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@20405bce
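To sanity-check the strings we are about to store, each element of twitterStreamJson can be parsed back with Gson. A minimal sketch, using a hand-written sample string rather than real tweet data, of pulling out the lang field:

// a minimal sketch with a hand-written sample string (not real tweet data):
// parse a JSON-serialised tweet and read its "lang" field back out
import com.google.gson.JsonParser
val sampleTweetJson = """{"text": "hello world", "lang": "en"}"""
val sampleLang = new JsonParser().parse(sampleTweetJson).getAsJsonObject.get("lang").getAsString
// sampleLang: String = en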
val partitionsEachInterval = 1 
 
val batchInterval = 1 // in minutes
val timeoutJobLength =  batchInterval * 5
 
var newContextCreated = false
var numTweetsCollected = 0L // track number of tweets collected
 
twitterStreamJson.foreachRDD((rdd, time) => { // for each filtered RDD in the DStream
      val count = rdd.count()
      if (count > 0) {
        val outputRDD = rdd.repartition(partitionsEachInterval) // repartition as desired
        // to write to parquet directly in append mode in one directory per 'time'------------       
        val outputDF = outputRDD.toDF("tweetAsJsonString")
        // get some time fields from current `.Date()`
        val year = (new java.text.SimpleDateFormat("yyyy")).format(new java.util.Date())
        val month = (new java.text.SimpleDateFormat("MM")).format(new java.util.Date())
        val day = (new java.text.SimpleDateFormat("dd")).format(new java.util.Date())
        val hour = (new java.text.SimpleDateFormat("HH")).format(new java.util.Date())
        // write to a file with a clear time-based hierarchical directory structure for example
        outputDF.write.mode(SaveMode.Append)
                .parquet(outputDirectoryRoot+ "/"+ year + "/" + month + "/" + day + "/" + hour + "/" + time.milliseconds) 
        // end of writing as parquet file-------------------------------------
        numTweetsCollected += count // update with the latest count
      }
  })
partitionsEachInterval: Int = 1
batchInterval: Int = 1
timeoutJobLength: Int = 5
newContextCreated: Boolean = false
numTweetsCollected: Long = 0
// ## Let's start the spark streaming context we have created next.
ssc.start()
// total tweets downloaded
numTweetsCollected
res14: Long = 6936
// ## Go to the Spark UI and see if a streaming job is already running. If so, you need to terminate it before starting a new streaming job. Only one streaming job can be run on the Databricks Community Edition (DB CE).
// ## Let's stop the streaming job next.
ssc.stop(stopSparkContext = false) 
StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) } 
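The call to StreamingContext.getActive above returns an Option, so it can also be inspected directly to check programmatically, instead of via the Spark UI, whether a streaming context is still active before starting a new one; a small sketch:

// a small sketch: report whether any StreamingContext is still active on this cluster
StreamingContext.getActive match {
  case Some(active) => println(s"A streaming context is still running: $active")
  case None         => println("No active streaming context.")
}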

Step 2. Explore Data

%run "./025_b_TTTDFfunctions"
// #Let's examine what was saved in dbfs
display(dbutils.fs.ls(outputDirectoryRoot))
 
path                                  name   size
dbfs:/datasets/tweetsStreamTmp/2020/  2020/  0

Showing all 1 rows.
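Since each batch was written to its own parquet directory under year/month/day/hour/batch-time, one way to read everything back and peek at the language distribution is to glob over that hierarchy. A hedged sketch (the five wildcards are an assumption matching the five directory levels used when writing above; lang is the language code serialised by twitter4j/Gson):

// a sketch of reading all collected batches back and inspecting the language field;
// the five wildcards match the year/month/day/hour/batch-time directory levels written above
val tweetsRawDF = spark.read.parquet(outputDirectoryRoot + "/*/*/*/*/*")

// each row holds one tweet as a JSON string; let Spark infer a schema from the JSON
// and count tweets per language (twitter4j serialises the language code as lang)
import spark.implicits._
val tweetsDF = spark.read.json(tweetsRawDF.select("tweetAsJsonString").as[String])
tweetsDF.groupBy("lang").count().orderBy($"count".desc).show(20)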