ScaDaMaLe Course site and book

Tweet Streaming Collector - Track & Follow

In the previous notebook we were capturing tweets from the public streams under the assumption that they form a random sample of roughly 1% of all tweets.

In this notebook, we modify the collector to focus on specific communications of interest to us. Specifically, we do this by including the following (a small illustrative sketch follows the list):

  • a list of strings to track and
  • a list of Twitter user-IDs of interest to follow.
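
Roughly speaking, Twitter's filtered stream combines these two criteria with an OR: a status is delivered if its text matches any tracked string or if it is by (or interacting with) any followed user. As a small illustrative sketch, the two inputs we will hand to the stream later in streamFunc look like this (the values here are only examples):

// illustrative values only - the actual lists used in this notebook are set inside streamFunc further below
val trackExample  = List("#MakeDataGreatAgain", "WASP rules!") // deliver tweets whose text contains any of these strings
val followExample = List(4173723312L, 25073877L)               // ... or tweets by (or interacting with) these Twitter user-IDs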

For this we will first %run the ExtendedTwitterUtils and TTTDFfunctions notebooks.

"./025_a_extendedTwitterUtils2run"
"./025_b_TTTDFfunctions"
import twitter4j._
import twitter4j.auth.Authorization
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

Go to the Spark UI and see if a streaming job is already running. If so, you need to terminate it before starting a new streaming job. Only one streaming job can be run on the Databricks Community Edition (DB CE).
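
If you prefer to check programmatically rather than via the Spark UI, a minimal sketch:

// StreamingContext.getActive returns Some(ssc) if a streaming context is currently active on this cluster
println(s"Is a streaming job already running? ${StreamingContext.getActive.isDefined}")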

defined class ExtendedTwitterReceiver
defined class ExtendedTwitterInputDStream
// this will make sure all streaming jobs in the cluster are stopped
StreamingContext.getActive.foreach{ _.stop(stopSparkContext = false) } 
USAGE: val df = tweetsDF2TTTDF(tweetsJsonStringDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
                  val df = tweetsDF2TTTDF(tweetsIDLong_JsonStringPairDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
                  
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.ColumnName
import org.apache.spark.sql.DataFrame
fromParquetFile2DF: (InputDFAsParquetFilePatternString: String)org.apache.spark.sql.DataFrame
tweetsJsonStringDF2TweetsDF: (tweetsAsJsonStringInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsIDLong_JsonStringPairDF2TweetsDF: (tweetsAsIDLong_JsonStringInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDF: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDFWithURLsAndHashtags: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
defined object ExtendedTwitterUtils
done running the extendedTwitterUtils2run notebook - ready to stream from twitter
tweetsDF2TTTDFLightWeight: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDFWithURLsAndHashtagsLightWeight: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame

Load your Twitter credentials (secretly!).

Enter your Twitter API Credentials.

  • Go to https://apps.twitter.com and look up your Twitter API Credentials, or create an app to generate them.
  • Run the code in a cell to enter your own credentials.
// put your own twitter developer credentials below instead of XXXX
// instead of the '%run ".../secrets/026_secret_MyTwitterOAuthCredentials"' used below,
// you need to copy-paste the following code-block with your own Twitter credentials replacing XXXX


// put your own twitter developer credentials below 

import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder


// These have been regenerated!!! - need to change them

def myAPIKey       = "XXXX" // APIKey 
def myAPISecret    = "XXXX" // APISecretKey
def myAccessToken          = "XXXX" // AccessToken
def myAccessTokenSecret    = "XXXX" // AccessTokenSecret


System.setProperty("twitter4j.oauth.consumerKey", myAPIKey)
System.setProperty("twitter4j.oauth.consumerSecret", myAPISecret)
System.setProperty("twitter4j.oauth.accessToken", myAccessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", myAccessTokenSecret)

println("twitter OAuth Credentials loaded")

The cell below will not expose my Twitter API credentials: myAPIKey, myAPISecret, myAccessToken and myAccessTokenSecret. Use the code above to enter your own credentials in a Scala cell.

"Users/raazesh.sainudiin@math.uu.se/scalable-data-science/secrets/026_secret_MyTwitterOAuthCredentials"

Using Twitter REST API

Next we import the required classes and instantiate a client for the Twitter REST API, which allows us to obtain data from Twitter that is not in the live stream but in Twitter's storage layers containing archives of historical events (including past status updates).

// SOME IMPORTS
import scala.collection.mutable.ArrayBuffer
import twitter4j._
import twitter4j.conf._
import scala.collection.JavaConverters._ 

import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.{StructType, StructField, StringType};
import twitter4j.RateLimitStatus;
import twitter4j.ResponseList;
import com.google.gson.Gson
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import com.google.gson.Gson
import org.apache.spark.sql.DataFrame

val cb = new ConfigurationBuilder()       

val twitter = {
  val c = new ConfigurationBuilder
    c.setDebugEnabled(false)
    .setOAuthConsumerKey(myAPIKey)
    .setOAuthConsumerSecret(myAPISecret)
    .setOAuthAccessToken(myAccessToken)
    .setOAuthAccessTokenSecret(myAccessTokenSecret);

  new TwitterFactory(c.build()).getInstance()
}
import scala.collection.mutable.ArrayBuffer
import twitter4j._
import twitter4j.conf._
import scala.collection.JavaConverters._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import twitter4j.RateLimitStatus
import twitter4j.ResponseList
import com.google.gson.Gson
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import com.google.gson.Gson
import org.apache.spark.sql.DataFrame
cb: twitter4j.conf.ConfigurationBuilder = twitter4j.conf.ConfigurationBuilder@4559b2a7
twitter: twitter4j.Twitter = TwitterImpl{INCLUDE_MY_RETWEET=HttpParameter{name='include_my_retweet', value='true', jsonObject=null, file=null, fileBody=null}}

Testing REST API

Let's quickly test that the REST API calls can be made.

twitter.showUser("@raazozone").getId() // quick test that REST API works - should get 4173723312
res4: Long = 4173723312
twitter.showUser("@realDonaldTrump").getId() // quick test that REST API works - should get 25073877
res5: Long = 25073877
twitter.showUser("@WASP_Research").getId() // quick test that REST API works - should get ?
res6: Long = 1124265687755755520
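
While testing, it can also help to keep an eye on the REST API rate limits. A minimal sketch using twitter4j's RateLimitStatus, which every REST response carries:

// check how many user-lookup calls remain before the rate limit resets
val u = twitter.showUser("@raazozone")
val rl = u.getRateLimitStatus()
println(s"remaining: ${rl.getRemaining} of ${rl.getLimit}, resets in ${rl.getSecondsUntilReset} seconds")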

Creating ScreenNames of Interest

Let's create a list of Twitter screen names of interest to us and look up their user-IDs...

val screenNamesOfInterest = List("raazozone","realDonaldTrump","WASP_Research") // could be done from a large list of up to 4,000 or so accounts
screenNamesOfInterest: List[String] = List(raazozone, realDonaldTrump, WASP_Research)
def lookupUserSNs(Retweeterids: Seq[String]) = {
  val grouped = Retweeterids.grouped(100).toList // the REST API looks up users in batches of at most 100
  for {group <- grouped
       users = twitter.lookupUsers(group: _*)
       user <- users.asScala
   } yield user
} // we lose some suspended accounts...

def lookupUsers(Retweeterids: Seq[Long]) = {
  val grouped = Retweeterids.grouped(100).toList // the REST API looks up users in batches of at most 100
  for {group <- grouped
       users = twitter.lookupUsers(group: _*)
       user <- users.asScala
   } yield user
} // we lose some suspended accounts...
lookupUserSNs: (Retweeterids: Seq[String])List[twitter4j.User]
lookupUsers: (Retweeterids: Seq[Long])List[twitter4j.User]
val twitterUsersOfInterest = lookupUserSNs(screenNamesOfInterest) // not displayed
println(twitterUsersOfInterest.size, screenNamesOfInterest.size) // we could lose users due to suspended accounts etc...
(3,3)
// just get their IDs
val seedIDs = twitterUsersOfInterest.map(u => u.getId()).toSet.toSeq.filter(_.isValidLong) // just get the IDs of the seed users who are valid
seedIDs: Seq[Long] = Vector(4173723312, 25073877, 1124265687755755520)
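
As a quick sanity check, we can map the seed IDs back to screen names with the lookupUsers helper defined above (suspended or deleted accounts would simply be dropped):

// resolve the seed user-IDs back to screen names via the REST API
lookupUsers(seedIDs).foreach(u => println(s"${u.getId}\t@${u.getScreenName}"))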

Extending streamFunc to Track & Follow

Now, let's extend our function to be able to track and follow.

import com.google.gson.Gson 
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val outputDirectoryRoot = "/datasets/tweetsStreamTmp" // output directory
val batchInterval = 1 // in minutes
val timeoutJobLength =  batchInterval * 5

var newContextCreated = false
var numTweetsCollected = 0L // track number of tweets collected
//val conf = new SparkConf().setAppName("TrackedTweetCollector").setMaster("local")
// This is the function that creates the StreamingContext and sets up the Spark Streaming job.
def streamFunc(): StreamingContext = {
  // Create a Spark Streaming Context.
  val ssc = new StreamingContext(sc, Minutes(batchInterval))
  // Create the OAuth Twitter credentials 
  val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
  
  val track = List("WASP rules!", "#MakeDataGreatAgain","sds-3-x rules!")//, "Hi")// just added for some live tests
  //val track = List.empty // if you do not want to track by any string
  
  val follow = seedIDs // who to follow in Twitter
  //val follow = List.empty // if you do not want to follow any specific Twitter user
  
  // Create a Twitter Stream for the input source.  
  val twitterStream = ExtendedTwitterUtils.createStream(ssc, auth, track, follow)
  // Transform the discrete RDDs into JSON
  val twitterStreamJson = twitterStream.map(x => { val gson = new Gson();
                                                 val xJson = gson.toJson(x)
                                                 xJson
                                               }) 
  // take care: partitionsEachInterval below also controls how many output files get written per batch
  val partitionsEachInterval = 1 // This tells the number of partitions in each RDD of tweets in the DStream.
  
  // what we want done with each discrete RDD tuple: (rdd, time)
  twitterStreamJson.foreachRDD((rdd, time) => { // for each filtered RDD in the DStream
      val count = rdd.count()
      if (count > 0) {
        val outputRDD = rdd.repartition(partitionsEachInterval) // repartition as desired
        // to write to parquet directly in append mode in one directory per 'time'------------       
        val outputDF = outputRDD.toDF("tweetAsJsonString")
        // get some time fields from current `.Date()`
        val year = (new java.text.SimpleDateFormat("yyyy")).format(new java.util.Date())
        val month = (new java.text.SimpleDateFormat("MM")).format(new java.util.Date())
        val day = (new java.text.SimpleDateFormat("dd")).format(new java.util.Date())
        val hour = (new java.text.SimpleDateFormat("HH")).format(new java.util.Date())
        // write to a file with a clear time-based hierarchical directory structure for example
        outputDF.write.mode(SaveMode.Append)
                .parquet(outputDirectoryRoot+ "/"+ year + "/" + month + "/" + day + "/" + hour + "/" + time.milliseconds) 
        // end of writing as parquet file-------------------------------------
        numTweetsCollected += count // update with the latest count
      }
  })
  newContextCreated = true
  ssc
}
import com.google.gson.Gson
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
outputDirectoryRoot: String = /datasets/tweetsStreamTmp
batchInterval: Int = 1
timeoutJobLength: Int = 5
newContextCreated: Boolean = false
numTweetsCollected: Long = 0
streamFunc: ()org.apache.spark.streaming.StreamingContext
twitter OAuth Credentials loaded
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
myAPIKey: String
myAPISecret: String
myAccessToken: String
myAccessTokenSecret: String

Collect, Verify and Explore

Let us collect data, store it and then explore it to see how our new experimental-design changes to the Tweet collector actually perform.

You should ideally be posting live Tweets from your own Twitter accounts as you run the experiment, so you can confirm that your collector actually captures what you intend it to. For example, retweet a user-ID in the list of user-IDs you are following to check if this retweet ends up in your collector as expected. You can also tweet a string of interest from the list of strings you are tracking.
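
If your Twitter app has write permission, you can even post such a test status programmatically with the REST client created earlier, once the stream below is running. A minimal sketch (the status will be publicly visible on your timeline):

// post a test status containing one of the tracked strings (requires write access for your app);
// it should then show up in the collector within a batch interval or two
val testStatus = twitter.updateStatus("WASP rules! -- collector test at " + new java.util.Date())
println("posted test status with id " + testStatus.getId())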

val ssc = StreamingContext.getActiveOrCreate(streamFunc)
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@54fb3d50
ssc.start()
//ssc.awaitTerminationOrTimeout(timeoutJobLength) // you only need one of these to start
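
If you would rather have the collector stop by itself, note that awaitTerminationOrTimeout expects a timeout in milliseconds, so the timeoutJobLength defined above (in units of the minute-long batch interval) needs converting. A minimal sketch:

// let the streaming job run for roughly timeoutJobLength minutes and then stop it
ssc.awaitTerminationOrTimeout(timeoutJobLength * 60 * 1000L)
StreamingContext.getActive.foreach(_.stop(stopSparkContext = false))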
display(dbutils.fs.ls("/datasets/tweetsStreamTmp/2020/11/20/10/"))
path name size
dbfs:/datasets/tweetsStreamTmp/2020/11/20/10/1605868860000/ 1605868860000/ 0.0
dbfs:/datasets/tweetsStreamTmp/2020/11/20/10/1605868920000/ 1605868920000/ 0.0
dbfs:/datasets/tweetsStreamTmp/2020/11/20/10/1605868980000/ 1605868980000/ 0.0
display(dbutils.fs.ls(outputDirectoryRoot+"/2020/11/20/10/1605868860000/")) // keep adding sub-dirs and descend into the time-tree'd directory hierarchy
path name size
dbfs:/datasets/tweetsStreamTmp/2020/11/20/10/1605868860000/_SUCCESS _SUCCESS 0.0
dbfs:/datasets/tweetsStreamTmp/2020/11/20/10/1605868860000/_committed_7377493726042826738 _committed_7377493726042826738 125.0
dbfs:/datasets/tweetsStreamTmp/2020/11/20/10/1605868860000/_started_7377493726042826738 _started_7377493726042826738 0.0
dbfs:/datasets/tweetsStreamTmp/2020/11/20/10/1605868860000/part-00000-tid-7377493726042826738-363f9903-0ada-41ff-92fd-c87b1349727d-6412-1-c000.snappy.parquet part-00000-tid-7377493726042826738-363f9903-0ada-41ff-92fd-c87b1349727d-6412-1-c000.snappy.parquet 95863.0
val rawDF = fromParquetFile2DF(outputDirectoryRoot+"/2020/11/*/*/*/*") //.cache()
rawDF.count
rawDF: org.apache.spark.sql.DataFrame = [tweetAsJsonString: string]
res15: Long = 437
val TTTsDF = tweetsDF2TTTDF(tweetsJsonStringDF2TweetsDF(rawDF)).cache()

Collect for a few minutes so that all fields are available in the Tweets... otherwise you will get errors like the one below (this may be unavoidable if, for example, what you are tracking has no geoLocation information; only a small fraction of Tweets carry this information):

"org.apache.spark.sql.AnalysisException: cannot resolve 'geoLocation.latitude' given input columns: [contributorsIDs, createdAt, currentUserRetweetId, displayTextRangeEnd, displayTextRangeStart, favoriteCount, hashtagEntities, id, inReplyToScreenName, inReplyToStatusId, inReplyToUserId, isFavorited, isPossiblySensitive, isRetweeted, isTruncated, lang, mediaEntities, place, quotedStatus, quotedStatusId, quotedStatusPermalink, retweetCount, retweetedStatus, source, symbolEntities, text, urlEntities, user, userMentionEntities, withheldInCountries];;"

We can parse more robustly (one such approach is sketched below)... but let's go and use a variant of the function that does not look for the missing fields in these tweets...
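
A minimal sketch of one such robustness check, assuming the geo fields are parsed into a top-level geoLocation column (as the error message above suggests) and using only the functions from the TTTDFfunctions notebook:

// pick the full or the light-weight conversion depending on whether geoLocation was parsed at all
val parsedTweetsDF = tweetsJsonStringDF2TweetsDF(rawDF)
val TTTsDF = if (parsedTweetsDF.schema.fieldNames.contains("geoLocation"))
               tweetsDF2TTTDF(parsedTweetsDF)            // full version, includes latitude/longitude
             else
               tweetsDF2TTTDFLightWeight(parsedTweetsDF) // skips the geoLocation fields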

NOTE: In certain experiments none of the user-IDs being followed may have enabled geoLocation information. In such cases, you can use tweetsDF2TTTDFLightWeight to obtain the TTTDF from the raw tweets.

"./025_b_TTTDFfunctions"
val TTTsDF = tweetsDF2TTTDFLightWeight(tweetsJsonStringDF2TweetsDF(rawDF)).cache()
TTTsDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CurrentTweetDate: timestamp, CurrentTwID: bigint ... 33 more fields]
display(TTTsDF)  // output not displayed to comply with Twitter Developer rules
// this will make sure all streaming jobs in the cluster are stopped 
StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) } 
USAGE: val df = tweetsDF2TTTDF(tweetsJsonStringDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
                  val df = tweetsDF2TTTDF(tweetsIDLong_JsonStringPairDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
                  
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.ColumnName
import org.apache.spark.sql.DataFrame
fromParquetFile2DF: (InputDFAsParquetFilePatternString: String)org.apache.spark.sql.DataFrame
tweetsJsonStringDF2TweetsDF: (tweetsAsJsonStringInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsIDLong_JsonStringPairDF2TweetsDF: (tweetsAsIDLong_JsonStringInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDF: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDFWithURLsAndHashtags: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
val a = TTTsDF.filter($"CurrentTweet" contains "WASP rules!")//.collect()
a: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CurrentTweetDate: timestamp, CurrentTwID: bigint ... 33 more fields]
tweetsDF2TTTDFLightWeight: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDFWithURLsAndHashtagsLightWeight: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
display(a)
val b = TTTsDF.filter($"CurrentTweet" contains "#MakeDataGreatAgain")//.collect()
display(b) // output not displayed to comply with Twitter Developer rules
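
A simple way to summarise how well the tracking worked is to count the matches for each tracked string; this sketch only assumes the CurrentTweet column we already filtered on above:

// count how many of the collected tweets contain each tracked string
val trackedStrings = Seq("WASP rules!", "#MakeDataGreatAgain", "sds-3-x rules!")
trackedStrings.foreach { s =>
  println(s"'$s' matched ${TTTsDF.filter($"CurrentTweet" contains s).count()} collected tweets")
}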
// this will make sure all streaming jobs in the cluster are stopped - raaz
StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) } 
// this will delete what we collected to keep the disk usage tight and tidy
dbutils.fs.rm(outputDirectoryRoot, true) 
res25: Boolean = true