027_TweetCollectorTrackAndFollow(Scala)

Tweet Collector - capture live tweets

by tracking a list of strings and following a list of users

In the previous notebook we captured tweets from the public stream, a global collection of roughly 1% of all tweets. Exactly what fraction of the full Twitter social media network (i.e., all status updates on the planet) this free stream represents, and by what sub-sampling strategy it is drawn, is not publicly known; this is Twitter's proprietary information. However, we can assume it is roughly a random sample of about 1% of all tweets.

In this notebook, we modify the collector to focus on specific communications of interest to us: we include a list of strings to track and a list of Twitter user IDs to follow.
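Under the hood, a track-and-follow collector hands these two lists to Twitter's filter stream. Here is an illustrative sketch of the underlying twitter4j FilterQuery that such a collector builds (an assumption about the mechanism, not the exact internals of ExtendedTwitterUtils; the example phrases and IDs mirror those used later in this notebook). A status is delivered if it matches any tracked phrase or involves any followed user, i.e., the two lists are OR-ed together.

import twitter4j.FilterQuery

// illustrative only: phrases to match in tweet text and user IDs whose activity to follow
val trackStrings = Array("aon rules!", "#MakeDataGreatAgain")
val followIDs    = Array(4173723312L, 25073877L)

val filterQuery = new FilterQuery()
  .track(trackStrings: _*)  // OR-semantics across phrases
  .follow(followIDs: _*)    // OR-semantics across user IDs, and OR-ed with track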

For this we will first %run the ExtendedTwitterUtils and TTTDFfunctions notebooks.

%run "scalable-data-science/sds-2-x/025_a_extendedTwitterUtils2run"
import twitter4j._
import twitter4j.auth.Authorization
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
defined class ExtendedTwitterReceiver
defined class ExtendedTwitterInputDStream
import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
defined object ExtendedTwitterUtils
done running the extendedTwitterUtils2run notebook - ready to stream from twitter
%run "scalable-data-science/sds-2-x/025_b_TTTDFfunctions"
USAGE: val df = tweetsDF2TTTDF(tweetsJsonStringDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
       val df = tweetsDF2TTTDF(tweetsIDLong_JsonStringPairDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.ColumnName
import org.apache.spark.sql.DataFrame
fromParquetFile2DF: (InputDFAsParquetFilePatternString: String)org.apache.spark.sql.DataFrame
tweetsJsonStringDF2TweetsDF: (tweetsAsJsonStringInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsIDLong_JsonStringPairDF2TweetsDF: (tweetsAsIDLong_JsonStringInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDF: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDFWithURLsAndHastags: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame

Go to the Spark UI and check whether a streaming job is already running. If so, you need to terminate it before starting a new streaming job; only one streaming job can run on the Databricks Community Edition (DB CE).

// this will make sure all streaming jobs in the cluster are stopped
StreamingContext.getActive.foreach{ _.stop(stopSparkContext = false) } 

Load your twitter credentials (secretly!).

// put your own twitter developer credentials below instead of the ???? placeholders,
// or %run your own secrets notebook, like the '%run "scalable-data-science/secrets/026_secret_MyTwitterOAuthCredentials"' cell below
// that secrets notebook contains a code block just like the following:


import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

def MyconsumerKey       = "????"
def MyconsumerSecret    = "????"
def Mytoken             = "????"
def MytokenSecret       = "????"


System.setProperty("twitter4j.oauth.consumerKey", MyconsumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", MyconsumerSecret)
System.setProperty("twitter4j.oauth.accessToken", Mytoken)
System.setProperty("twitter4j.oauth.accessTokenSecret", MytokenSecret)

%run "scalable-data-science/secrets/026_secret_MyTwitterOAuthCredentials"
twitter OAuth Credentials loaded
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
MyconsumerKey: String
MyconsumerSecret: String
Mytoken: String
MytokenSecret: String

Using the Twitter REST API

Next we import the required classes and instantiate a client for the Twitter REST API.

// some imports
import scala.collection.mutable.ArrayBuffer
import twitter4j._
import twitter4j.conf._
import scala.collection.JavaConverters._ 

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import twitter4j.RateLimitStatus
import twitter4j.ResponseList
import com.google.gson.Gson
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame

val cb = new ConfigurationBuilder()       

val twitter = {
  val c = new ConfigurationBuilder
    c.setDebugEnabled(false)
    .setOAuthConsumerKey(MyconsumerKey)
    .setOAuthConsumerSecret(MyconsumerSecret)
    .setOAuthAccessToken(Mytoken)
    .setOAuthAccessTokenSecret(MytokenSecret);

  new TwitterFactory(c.build()).getInstance()
}
import scala.collection.mutable.ArrayBuffer
import twitter4j._
import twitter4j.conf._
import scala.collection.JavaConverters._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import twitter4j.RateLimitStatus
import twitter4j.ResponseList
import com.google.gson.Gson
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
cb: twitter4j.conf.ConfigurationBuilder = twitter4j.conf.ConfigurationBuilder@ec70bcd
twitter: twitter4j.Twitter = TwitterImpl{INCLUDE_MY_RETWEET=HttpParameter{name='include_my_retweet', value='true', jsonObject=null, file=null, fileBody=null}}

Let's quickly test that the REST API calls can be made.

twitter.showUser("@raazozone").getId() // quick test that REST API works - should get 4173723312
res4: Long = 4173723312
twitter.showUser("@realDonaldTrump").getId() // quick test that REST API works - should get 25073877
res5: Long = 25073877
twitter.showUser("@aon_plc").getId() // quick test that REST API works - should get ?
res6: Long = 22152077
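These REST calls are rate-limited by Twitter, so before doing bulk lookups it can help to check how much quota remains. A minimal sketch using twitter4j's getRateLimitStatus (passing the "users" resource family is an assumption here; adjust it to the endpoints you actually call):

// sketch: inspect remaining REST quota for the "users" endpoints (assumed resource family)
val rateLimits = twitter.getRateLimitStatus("users").asScala
for ((endpoint, status) <- rateLimits) {
  println(s"$endpoint: ${status.getRemaining}/${status.getLimit} calls left, reset in ${status.getSecondsUntilReset}s")
}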

Let's get the Twitter IDs of a list of accounts of interest to us from their screen names...

val screenNamesOfInterest = List("raazozone","realDonaldTrump","aon_plc") // could be done from a large list of up to 4,000 or so accounts
screenNamesOfInterest: List[String] = List(raazozone, realDonaldTrump, aon_plc)
def lookupUserSNs(screenNames: Seq[String]) = {
  val grouped = screenNames.grouped(100).toList // the REST users/lookup endpoint accepts at most 100 users per call
  for {
    group <- grouped
    users = twitter.lookupUsers(group: _*)
    user <- users.asScala
  } yield user
} // we lose some suspended accounts...

def lookupUsers(ids: Seq[Long]) = {
  val grouped = ids.grouped(100).toList // the REST users/lookup endpoint accepts at most 100 users per call
  for {
    group <- grouped
    users = twitter.lookupUsers(group: _*)
    user <- users.asScala
  } yield user
} // we lose some suspended accounts...
lookupUserSNs: (screenNames: Seq[String])List[twitter4j.User]
lookupUsers: (ids: Seq[Long])List[twitter4j.User]
val twitterUsersOfInterest = lookupUserSNs(screenNamesOfInterest)
twitterUsersOfInterest: List[twitter4j.User] = List(UserJSONImpl{id=4173723312, name='Raazesh Sainudiin', email='null', screenName='raazozone', location='Uppsala, Sverige', description='This is just a laboratory for insights into models of meme evolution!', isContributorsEnabled=false, profileImageUrl='http://pbs.twimg.com/profile_images/890899899008634880/5fwLqkXm_normal.jpg', profileImageUrlHttps='https://pbs.twimg.com/profile_images/890899899008634880/5fwLqkXm_normal.jpg', isDefaultProfileImage=false, url='https://t.co/kq1hQpL1KR', isProtected=false, followersCount=49, status=StatusJSONImpl{createdAt=Sun May 19 09:40:37 UTC 2019, id=1130045586785357824, text='The Brexit Party is winning social media. These numbers prove it https://t.co/J9Y8kKoVjr', source='<a href="http://twitter.com/download/iphone" rel="nofollow">Twitter for iPhone</a>', isTruncated=false, inReplyToStatusId=-1, inReplyToUserId=-1, isFavorited=false, isRetweeted=false, favoriteCount=0, inReplyToScreenName='null', geoLocation=null, place=null, retweetCount=0, isPossiblySensitive=false, lang='en', contributorsIDs=[], retweetedStatus=null, userMentionEntities=[], urlEntities=[URLEntityJSONImpl{url='https://t.co/J9Y8kKoVjr', expandedURL='https://www.wired.co.uk/article/brexit-party-nigel-farage-facebook-social-media', displayURL='wired.co.uk/article/brexit…'}], hashtagEntities=[], mediaEntities=[], symbolEntities=[], currentUserRetweetId=-1, user=null, withHeldInCountries=null, quotedStatusId=-1, quotedStatus=null}, profileBackgroundColor='000000', profileTextColor='000000', profileLinkColor='1B95E0', profileSidebarFillColor='000000', profileSidebarBorderColor='000000', profileUseBackgroundImage=false, isDefaultProfile=false, showAllInlineMedia=false, friendsCount=81, createdAt=Mon Nov 09 00:50:29 UTC 2015, favouritesCount=57, utcOffset=-1, timeZone='null', profileBackgroundImageUrl='http://abs.twimg.com/images/themes/theme1/bg.png', profileBackgroundImageUrlHttps='https://abs.twimg.com/images/themes/theme1/bg.png', profileBackgroundTiled=false, lang='null', statusesCount=1245, isGeoEnabled=false, isVerified=false, translator=false, listedCount=9, isFollowRequestSent=false, withheldInCountries=null}, UserJSONImpl{id=25073877, name='Donald J. Trump', email='null', screenName='realDonaldTrump', location='Washington, DC', description='45th President of the United States of America🇺🇸', isContributorsEnabled=false, profileImageUrl='http://pbs.twimg.com/profile_images/874276197357596672/kUuht00m_normal.jpg', profileImageUrlHttps='https://pbs.twimg.com/profile_images/874276197357596672/kUuht00m_normal.jpg', isDefaultProfileImage=false, url='https://t.co/OMxB0x7xC5', isProtected=false, followersCount=60493219, status=StatusJSONImpl{createdAt=Fri May 24 03:49:49 UTC 2019, id=1131769241651302401, text='“Intelligence Agencies were used against an American President.” @DevinNunes @ShannonBream @FoxNews This should NEVER happen to a President again! Dems are furious at Robert Mueller for his findings - NO COLLUSION, NO OBSTRUCTION. 
Now they should go back to work and legislate!', source='<a href="http://twitter.com/download/iphone" rel="nofollow">Twitter for iPhone</a>', isTruncated=false, inReplyToStatusId=-1, inReplyToUserId=-1, isFavorited=false, isRetweeted=false, favoriteCount=26079, inReplyToScreenName='null', geoLocation=null, place=null, retweetCount=8244, isPossiblySensitive=false, lang='en', contributorsIDs=[], retweetedStatus=null, userMentionEntities=[UserMentionEntityJSONImpl{name='Devin Nunes', screenName='DevinNunes', id=54412900}, UserMentionEntityJSONImpl{name='Shannon Bream', screenName='ShannonBream', id=18983793}, UserMentionEntityJSONImpl{name='Fox News', screenName='FoxNews', id=1367531}], urlEntities=[], hashtagEntities=[], mediaEntities=[], symbolEntities=[], currentUserRetweetId=-1, user=null, withHeldInCountries=null, quotedStatusId=-1, quotedStatus=null}, profileBackgroundColor='6D5C18', profileTextColor='333333', profileLinkColor='1B95E0', profileSidebarFillColor='C5CEC0', profileSidebarBorderColor='BDDCAD', profileUseBackgroundImage=true, isDefaultProfile=false, showAllInlineMedia=false, friendsCount=47, createdAt=Wed Mar 18 13:46:38 UTC 2009, favouritesCount=7, utcOffset=-1, timeZone='null', profileBackgroundImageUrl='http://abs.twimg.com/images/themes/theme1/bg.png', profileBackgroundImageUrlHttps='https://abs.twimg.com/images/themes/theme1/bg.png', profileBackgroundTiled=true, lang='null', statusesCount=42042, isGeoEnabled=true, isVerified=true, translator=false, listedCount=103351, isFollowRequestSent=false, withheldInCountries=null}, UserJSONImpl{id=22152077, name='Aon', email='null', screenName='Aon_plc', location='120 countries around the globe', description='Aon plc (NYSE:AON) is a leading global professional services firm providing a broad range of #risk, #retirement and #health solutions.', isContributorsEnabled=false, profileImageUrl='http://pbs.twimg.com/profile_images/827249559046909954/SyaBPcH8_normal.jpg', profileImageUrlHttps='https://pbs.twimg.com/profile_images/827249559046909954/SyaBPcH8_normal.jpg', isDefaultProfileImage=false, url='http://t.co/QVMna9rFn0', isProtected=false, followersCount=36896, status=StatusJSONImpl{createdAt=Thu May 23 21:21:53 UTC 2019, id=1131671616142938112, text='Managing volatility on the long and heavily bunkered 11th hole @SchwabCupFinale will be key on this week’s #AonRiskReward Challenge https://t.co/SdN02a6Bob', source='<a href="https://mobile.twitter.com" rel="nofollow">Twitter Web App</a>', isTruncated=false, inReplyToStatusId=-1, inReplyToUserId=-1, isFavorited=false, isRetweeted=false, favoriteCount=2, inReplyToScreenName='null', geoLocation=null, place=null, retweetCount=0, isPossiblySensitive=false, lang='en', contributorsIDs=[], retweetedStatus=null, userMentionEntities=[UserMentionEntityJSONImpl{name='Charles Schwab Cup Championship', screenName='SchwabCupFinale', id=1330543374}], urlEntities=[], hashtagEntities=[HashtagEntityJSONImpl{text='AonRiskReward'}], mediaEntities=[MediaEntityJSONImpl{id=1131670718557630468, url='https://t.co/SdN02a6Bob', mediaURL='http://pbs.twimg.com/ext_tw_video_thumb/1131670718557630468/pu/img/fGIxN8EL-ERyBLpa.jpg', mediaURLHttps='https://pbs.twimg.com/ext_tw_video_thumb/1131670718557630468/pu/img/fGIxN8EL-ERyBLpa.jpg', expandedURL='https://twitter.com/Aon_plc/status/1131671616142938112/video/1', displayURL='pic.twitter.com/SdN02a6Bob', sizes={0=Size{width=150, height=150, resize=101}, 1=Size{width=680, height=383, resize=100}, 2=Size{width=1200, height=675, resize=100}, 3=Size{width=1280, height=720, 
resize=100}}, type='video', videoAspectRatioWidth=16, videoAspectRatioHeight=9, videoDurationMillis=53587, videoVariants=4, extAltText='null'}], symbolEntities=[], currentUserRetweetId=-1, user=null, withHeldInCountries=null, quotedStatusId=-1, quotedStatus=null}, profileBackgroundColor='FFFFFF', profileTextColor='333333', profileLinkColor='0084B4', profileSidebarFillColor='80B864', profileSidebarBorderColor='FFFFFF', profileUseBackgroundImage=false, isDefaultProfile=false, showAllInlineMedia=false, friendsCount=1285, createdAt=Fri Feb 27 15:43:53 UTC 2009, favouritesCount=532, utcOffset=-1, timeZone='null', profileBackgroundImageUrl='http://abs.twimg.com/images/themes/theme1/bg.png', profileBackgroundImageUrlHttps='https://abs.twimg.com/images/themes/theme1/bg.png', profileBackgroundTiled=false, lang='null', statusesCount=4337, isGeoEnabled=true, isVerified=true, translator=false, listedCount=715, isFollowRequestSent=false, withheldInCountries=null})
println(twitterUsersOfInterest.size, screenNamesOfInterest.size) // we could lose users due to suspended accounts etc...
(3,3)
// just get their IDs
val seedIDs = twitterUsersOfInterest.map(u => u.getId()).toSet.toSeq // distinct IDs of the valid seed users (getId returns a primitive Long, so no null check is needed)
seedIDs: Seq[Long] = ArrayBuffer(4173723312, 25073877, 22152077)
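The comment above notes that the seed list could scale to a few thousand accounts. Here is a hedged sketch of driving the lookup from a one-column CSV of screen names on DBFS (the file path and format are hypothetical, for illustration only):

// sketch: build seed IDs from a one-column CSV of screen names (hypothetical path)
val screenNamesDF = spark.read.csv("/datasets/mySeedScreenNames.csv")
val manyScreenNames = screenNamesDF.collect().map(_.getString(0)).toList
val manySeedIDs = lookupUserSNs(manyScreenNames).map(_.getId()).toSet.toSeq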

Now, let's extend our collector function from the previous notebook so that it tracks these strings and follows these users.

import com.google.gson.Gson 
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val outputDirectoryRoot = "/datasets/tweetsStreamTmp" // output directory
val batchInterval = 1 // in minutes
val timeoutJobLength =  batchInterval * 5

var newContextCreated = false
var numTweetsCollected = 0L // track number of tweets collected
//val conf = new SparkConf().setAppName("TrackedTweetCollector").setMaster("local")
// This is the function that creates the StreamingContext and sets up the Spark Streaming job.
def streamFunc(): StreamingContext = {
  // Create a Spark Streaming Context.
  val ssc = new StreamingContext(sc, Minutes(batchInterval))
  // Create the OAuth Twitter credentials (twitter4j reads the twitter4j.oauth.* System properties set earlier)
  val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
  
  val track = List("aon rules!", "#MakeDataGreatAgain", "sds-2-x rules!") // just added for some live tests
  //val track = List.empty // if you do not want to track by any string
  
  val follow = seedIDs // who to follow in Twitter
  //val follow = List.empty // if you do not want to follow any specific twitter user
  
  // Create a Twitter Stream for the input source.  
  val twitterStream = ExtendedTwitterUtils.createStream(ssc, auth, track, follow)
  // Transform the discrete RDDs into JSON
  val twitterStreamJson = twitterStream.map(x => {
    val gson = new Gson()
    val xJson = gson.toJson(x)
    xJson
  })
  // take care: this sets the number of partitions in each RDD of tweets in the DStream,
  // and hence the number of files written per batch
  val partitionsEachInterval = 1
  
  // what we want done with each discrete RDD tuple: (rdd, time)
  twitterStreamJson.foreachRDD((rdd, time) => { // for each filtered RDD in the DStream
      val count = rdd.count()
      if (count > 0) {
        val outputRDD = rdd.repartition(partitionsEachInterval) // repartition as desired
        // to write to parquet directly in append mode in one directory per 'time'------------       
        val outputDF = outputRDD.toDF("tweetAsJsonString")
        // get some time fields from the current java.util.Date()
        val year = (new java.text.SimpleDateFormat("yyyy")).format(new java.util.Date())
        val month = (new java.text.SimpleDateFormat("MM")).format(new java.util.Date())
        val day = (new java.text.SimpleDateFormat("dd")).format(new java.util.Date())
        val hour = (new java.text.SimpleDateFormat("HH")).format(new java.util.Date())
        // write to a file with a clear time-based hierarchical directory structure for example
        outputDF.write.mode(SaveMode.Append)
                .parquet(outputDirectoryRoot+ "/"+ year + "/" + month + "/" + day + "/" + hour + "/" + time.milliseconds) 
        // end of writing as parquet file-------------------------------------
        numTweetsCollected += count // update with the latest count
      }
  })
  newContextCreated = true
  ssc
}
import com.google.gson.Gson import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ outputDirectoryRoot: String = /datasets/tweetsStreamTmp batchInterval: Int = 1 timeoutJobLength: Int = 5 newContextCreated: Boolean = false numTweetsCollected: Long = 0 streamFunc: ()org.apache.spark.streaming.StreamingContext
val ssc = StreamingContext.getActiveOrCreate(streamFunc)
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@6c4be45
ssc.start()
//ssc.awaitTerminationOrTimeout(timeoutJobLength) // optional: ssc.start() above is all you need to start the job
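While the job runs, you can poll the collector from another cell. A minimal monitoring sketch (note that timeoutJobLength above is in minutes, so it must be converted to milliseconds before being passed to awaitTerminationOrTimeout):

// block this cell for timeoutJobLength minutes, then report the running count
ssc.awaitTerminationOrTimeout(Minutes(timeoutJobLength).milliseconds)
println(s"number of tweets collected so far = $numTweetsCollected")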
display(dbutils.fs.ls("/datasets/tweetsStreamTmp/2019/05/24/09")) //"dbfs:///datasets/tweetsStreamTmp/"))
path                                                         name            size
dbfs:/datasets/tweetsStreamTmp/2019/05/24/09/1558688400000/  1558688400000/  0
dbfs:/datasets/tweetsStreamTmp/2019/05/24/09/1558688460000/  1558688460000/  0
dbfs:/datasets/tweetsStreamTmp/2019/05/24/09/1558688520000/  1558688520000/  0
display(dbutils.fs.ls(outputDirectoryRoot+"/2019/05/24/09/")) // keep adding sub-dirs and descend into the time-tree'd directory hierarchy
path                                                         name            size
dbfs:/datasets/tweetsStreamTmp/2019/05/24/09/1558688400000/  1558688400000/  0
dbfs:/datasets/tweetsStreamTmp/2019/05/24/09/1558688460000/  1558688460000/  0
dbfs:/datasets/tweetsStreamTmp/2019/05/24/09/1558688520000/  1558688520000/  0
// this will make sure all streaming jobs in the cluster are stopped - raaz
StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) } 
val rawDF = fromParquetFile2DF(outputDirectoryRoot+"/2019/05/24/*/*/*") //.cache()
val TTTsDF = tweetsDF2TTTDF(tweetsJsonStringDF2TweetsDF(rawDF)).cache()
rawDF: org.apache.spark.sql.DataFrame = [tweetAsJsonString: string]
TTTsDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CurrentTweetDate: timestamp, CurrentTwID: bigint ... 32 more fields]
display(TTTsDF)  // output not displayed to comply with Twitter Developer rules
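Since raw tweets cannot be shown here, aggregates are a compliant way to sanity-check the collection. A minimal sketch (assuming the TTT columns TweetType and CurrentTweetDate produced by tweetsDF2TTTDF, as seen in the schema echoed above):

// counts by tweet type, and by one-minute event-time windows
TTTsDF.groupBy($"TweetType").count().orderBy($"count".desc).show(false)
TTTsDF.groupBy(window($"CurrentTweetDate", "60 seconds")).count().orderBy($"window").show(false)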
val a = TTTsDF.filter($"CurrentTweet" contains "aon rules!")//.collect()
a: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CurrentTweetDate: timestamp, CurrentTwID: bigint ... 32 more fields]
display(a)
[one matching row: 2019-05-24T08:58:24 · tweet ID 1131846900552994800 · Raazesh Sainudiin (@raazozone) · "aon rules! Haha hee" · Original Tweet]
val b = TTTsDF.filter($"CurrentTweet" contains "#MakeDataGreatAgain")//.collect()
display(b)
[one matching row: 2019-05-24T08:59:06 · tweet ID 1131847077045121000 · Raazesh Sainudiin (@raazozone) · "#MakeDataGreatAgain from Krakow" · Original Tweet]
// this will make sure all streaming jobs in the cluster are stopped - raaz
StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) } 
// this will delete what we collected to keep the disk usage tight and tidy
dbutils.fs.rm(outputDirectoryRoot, true) 
res28: Boolean = true