027_TweetCollectorTrackAndFollow(Scala)

ScaDaMaLe Course site and book

Tweet Streaming Collector - Track & Follow

In the previous notebook we captured tweets from the public stream under the assumption that it is a random sample of roughly 1% of all tweets.

In this notebook, we modify the collector to focus on specific communications of interest to us by including:

  • a list of strings to track and
  • a list of Twitter user IDs of interest to follow.
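
As a minimal sketch of what these two inputs might look like, the cell below cleans a list of track strings and a list of user IDs before they are handed to a collector. The helper names `cleanTrackTerms` and `cleanFollowIds` and the sample track strings are hypothetical and not part of the course notebooks. The limits of 400 track terms and 5000 follow IDs are an assumption based on Twitter's documentation for the standard filter stream and should be checked against the current documentation.

```scala
// Hypothetical helpers (not part of the course notebooks) for preparing the two filter inputs.
// Assumed limits for the standard filter stream; verify against the current Twitter documentation.
val maxTrackTerms = 400
val maxFollowIds  = 5000

// Trim whitespace, drop empty strings and duplicates, and cap the number of terms.
def cleanTrackTerms(terms: Seq[String]): Seq[String] =
  terms.map(_.trim).filter(_.nonEmpty).distinct.take(maxTrackTerms)

// Drop non-positive IDs and duplicates, and cap the number of IDs.
def cleanFollowIds(ids: Seq[Long]): Seq[Long] =
  ids.filter(_ > 0L).distinct.take(maxFollowIds)

// Sample inputs: the track strings are made up, the IDs are the ones used in the REST API test in this notebook.
val trackTerms = cleanTrackTerms(Seq(" #spark ", "scala", "", "scala"))
val followIds  = cleanFollowIds(Seq(4173723312L, 25073877L, 4173723312L))
```

In twitter4j, lists like these would typically be passed to a `FilterQuery` through its `track` and `follow` methods; how the collector in this course wires them in is defined in the ExtendedTwitterUtils notebook.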

For this we will first %run the ExtendedTwitterUtils and TTTDFfunctions notebooks.

%run "./025_a_extendedTwitterUtils2run"
%run "./025_b_TTTDFfunctions"

Go to the Spark UI and check whether a streaming job is already running. If so, you need to terminate it before starting a new streaming job, because only one streaming job can run on Databricks Community Edition (DB CE).

// this will make sure all streaming jobs in the cluster are stopped
StreamingContext.getActive.foreach{ _.stop(stopSparkContext = false) } 

Load your Twitter credentials (secretly!).

Enter your Twitter API Credentials.

  • Go to https://apps.twitter.com and look up your Twitter API Credentials, or create an app to create them.
  • Run the code below in a cell to enter your own credentials.

// Put your own Twitter developer credentials below, replacing XXXX.
// Copy-paste this code block into a cell of your own instead of using the
// '%run ".../secrets/026_secret_MyTwitterOAuthCredentials"' cell further below.

import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder


// These have been regenerated!!! - need to change them

def myAPIKey       = "XXXX" // APIKey 
def myAPISecret    = "XXXX" // APISecretKey
def myAccessToken          = "XXXX" // AccessToken
def myAccessTokenSecret    = "XXXX" // AccessTokenSecret


System.setProperty("twitter4j.oauth.consumerKey", myAPIKey)
System.setProperty("twitter4j.oauth.consumerSecret", myAPISecret)
System.setProperty("twitter4j.oauth.accessToken", myAccessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", myAccessTokenSecret)

println("twitter OAuth Credentials loaded")

The cell below will not expose my Twitter API Credentials: myAPIKey, myAPISecret, myAccessToken and myAccessTokenSecret. Use the code above to enter your own credentials in a Scala cell.

%run "Users/raazesh.sainudiin@math.uu.se/scalable-data-science/secrets/026_secret_MyTwitterOAuthCredentials"
twitter OAuth Credentials loaded
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
myAPIKey: String
myAPISecret: String
myAccessToken: String
myAccessTokenSecret: String
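
An alternative to typing credentials into a cell is to read them from environment variables, so that they never appear in the notebook source. The following is only a sketch under that assumption: the variable names and the helper `loadCredentials` are hypothetical, and whether environment variables can be set on your cluster depends on your setup.

```scala
// Hypothetical environment-variable names; choose whatever your own setup uses.
val requiredKeys = Seq(
  "TWITTER_API_KEY", "TWITTER_API_SECRET",
  "TWITTER_ACCESS_TOKEN", "TWITTER_ACCESS_TOKEN_SECRET")

// Returns Right(the four credential values, in the order of requiredKeys),
// or Left(the names of the variables that are missing or blank).
def loadCredentials(env: Map[String, String]): Either[Seq[String], Seq[String]] = {
  val missing = requiredKeys.filterNot(k => env.get(k).exists(_.trim.nonEmpty))
  if (missing.nonEmpty) Left(missing) else Right(requiredKeys.map(env))
}

// Check the real environment and report only variable names, never the values.
loadCredentials(sys.env) match {
  case Right(_)      => println("twitter OAuth Credentials found in environment")
  case Left(missing) => println("missing environment variables: " + missing.mkString(", "))
}
```

The values returned in the `Right` case could then be passed to `System.setProperty` in the same way as `myAPIKey` and the other three definitions above.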

Using Twitter REST API

Next we import the required classes and instantiate a client for the Twitter REST API, which allows us to obtain data from Twitter that is not in the live stream but in Twitter's storage layers containing archives of historical events (including past status updates).

// SOME IMPORTS
import scala.collection.mutable.ArrayBuffer
import twitter4j._
import twitter4j.conf._
import scala.collection.JavaConverters._ 
 
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.{StructType, StructField, StringType};
import twitter4j.RateLimitStatus;
import twitter4j.ResponseList;
import com.google.gson.Gson
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import com.google.gson.Gson
import org.apache.spark.sql.DataFrame
 
val cb = new ConfigurationBuilder()       
 
// Build a REST API client from the OAuth credentials loaded above
val twitter = {
  val c = new ConfigurationBuilder
  c.setDebugEnabled(false)
    .setOAuthConsumerKey(myAPIKey)
    .setOAuthConsumerSecret(myAPISecret)
    .setOAuthAccessToken(myAccessToken)
    .setOAuthAccessTokenSecret(myAccessTokenSecret)

  new TwitterFactory(c.build()).getInstance()
}
import scala.collection.mutable.ArrayBuffer
import twitter4j._
import twitter4j.conf._
import scala.collection.JavaConverters._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import twitter4j.RateLimitStatus
import twitter4j.ResponseList
import com.google.gson.Gson
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import com.google.gson.Gson
import org.apache.spark.sql.DataFrame
cb: twitter4j.conf.ConfigurationBuilder = twitter4j.conf.ConfigurationBuilder@4559b2a7
twitter: twitter4j.Twitter = TwitterImpl{INCLUDE_MY_RETWEET=HttpParameter{name='include_my_retweet', value='true', jsonObject=null, file=null, fileBody=null}}

Testing REST API

Let's quickly test that the REST API calls can be made.

twitter.showUser("@raazozone").getId() // quick test that REST API works - should get 4173723312
res4: Long = 4173723312
twitter.showUser("@realDonaldTrump").getId() // quick test that REST API works - should get 25073877
res5: Long = 25073877
twitter.showUser("@WASP_Research").getId() // quick test that REST API works - should get 1124265687755755520
res6: Long = 1124265687755755520
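
A REST call like the ones above throws an exception when the screen name does not exist or the request fails. The sketch below shows one way such lookups could be wrapped so that a failure becomes a value instead of aborting the cell. The helper `safeUserId` and the stand-in `fakeLookup` are hypothetical; in the notebook, `lookup` would be something like `name => twitter.showUser(name).getId()`.

```scala
import scala.util.{Try, Success, Failure}

// Hypothetical wrapper: `lookup` stands in for a REST call such as
// name => twitter.showUser(name).getId() from the cells above.
def safeUserId(lookup: String => Long)(screenName: String): Either[String, Long] =
  Try(lookup(screenName)) match {
    case Success(id) => Right(id)
    case Failure(e)  => Left(s"lookup failed for $screenName: ${e.getMessage}")
  }

// Stand-in lookup so that the sketch runs without network access or credentials.
val fakeLookup: String => Long = {
  case "@raazozone" => 4173723312L
  case other        => throw new NoSuchElementException(s"unknown user $other")
}

val found    = safeUserId(fakeLookup)("@raazozone")
val notFound = safeUserId(fakeLookup)("@no_such_user")
```

Returning an `Either` keeps the error message next to the screen name that caused it, which is convenient when resolving a whole list of screen names into user IDs to follow.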