%run "scalable-data-science/sds-2-x/025_a_extendedTwitterUtils2run"
import twitter4j._
import twitter4j.auth.Authorization
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
defined class ExtendedTwitterReceiver
defined class ExtendedTwitterInputDStream
import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
defined object ExtendedTwitterUtils
done running the extendedTwitterUtils2run notebook - ready to stream from twitter
%run "scalable-data-science/sds-2-x/025_b_TTTDFfunctions"
USAGE: val df = tweetsDF2TTTDF(tweetsJsonStringDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
val df = tweetsDF2TTTDF(tweetsIDLong_JsonStringPairDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.ColumnName
import org.apache.spark.sql.DataFrame
fromParquetFile2DF: (InputDFAsParquetFilePatternString: String)org.apache.spark.sql.DataFrame
tweetsJsonStringDF2TweetsDF: (tweetsAsJsonStringInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsIDLong_JsonStringPairDF2TweetsDF: (tweetsAsIDLong_JsonStringInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDF: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDFWithURLsAndHastags: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
// put your own twitter developer credentials below in place of the ???? placeholders // instead of the '%run "scalable-data-science/secrets/026_secret_MyTwitterOAuthCredentials"' above // this notebook we just ran contains the following commented code block import twitter4j.auth.OAuthAuthorization import twitter4j.conf.ConfigurationBuilder def MyconsumerKey = "????" def MyconsumerSecret = "???" def Mytoken = "????" def MytokenSecret = "????" System.setProperty("twitter4j.oauth.consumerKey", MyconsumerKey) System.setProperty("twitter4j.oauth.consumerSecret", MyconsumerSecret) System.setProperty("twitter4j.oauth.accessToken", Mytoken) System.setProperty("twitter4j.oauth.accessTokenSecret", MytokenSecret)
// Imports for the Twitter REST client and the DataFrame helpers used in this notebook.
import scala.collection.mutable.ArrayBuffer
import twitter4j._
import twitter4j.conf._
import scala.collection.JavaConverters._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import twitter4j.RateLimitStatus
import twitter4j.ResponseList
import com.google.gson.Gson
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame

// NOTE(review): `cb` is not used in this cell; kept because it is a top-level notebook name
// that other cells may reference — confirm before removing.
val cb = new ConfigurationBuilder()

/** Authenticated twitter4j REST client, built from the OAuth credentials
  * (`MyconsumerKey`, `MyconsumerSecret`, `Mytoken`, `MytokenSecret`) defined in the secrets notebook.
  * Debug logging is switched off.
  */
val twitter: Twitter = {
  val twitterConf = new ConfigurationBuilder()
    .setDebugEnabled(false)
    .setOAuthConsumerKey(MyconsumerKey)
    .setOAuthConsumerSecret(MyconsumerSecret)
    .setOAuthAccessToken(Mytoken)
    .setOAuthAccessTokenSecret(MytokenSecret)
    .build()
  new TwitterFactory(twitterConf).getInstance()
}
import scala.collection.mutable.ArrayBuffer
import twitter4j._
import twitter4j.conf._
import scala.collection.JavaConverters._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import twitter4j.RateLimitStatus
import twitter4j.ResponseList
import com.google.gson.Gson
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import com.google.gson.Gson
import org.apache.spark.sql.DataFrame
cb: twitter4j.conf.ConfigurationBuilder = twitter4j.conf.ConfigurationBuilder@ec70bcd
twitter: twitter4j.Twitter = TwitterImpl{INCLUDE_MY_RETWEET=HttpParameter{name='include_my_retweet', value='true', jsonObject=null, file=null, fileBody=null}}
/** Looks up `ids` in consecutive batches of at most `batchSize`, calling `fetchBatch` once per
  * batch and concatenating the returned users in batch order.
  *
  * Shared by [[lookupUserSNs]] and [[lookupUsers]], which differ only in the key type they send.
  * Users the API does not return (e.g. some suspended accounts) are simply absent from the
  * result, so it may be shorter than `ids`.
  *
  * @param ids        user keys (screen names or numeric ids) to resolve
  * @param batchSize  maximum keys per request; must be positive
  * @param fetchBatch performs one lookup request for a batch of keys
  * @return the users returned by all requests, in order
  */
def lookupInBatches[A](ids: Seq[A], batchSize: Int)(
    fetchBatch: Seq[A] => java.util.List[twitter4j.User]): List[twitter4j.User] = {
  require(batchSize > 0, s"batchSize must be positive, got $batchSize")
  ids.grouped(batchSize).toList.flatMap(batch => fetchBatch(batch).asScala)
}

/** Resolves Twitter screen names to `twitter4j.User`s via the `twitter` client.
  *
  * Requests are batched (default 100 per request; Twitter's users/lookup endpoint does not accept
  * larger batches). We lose some suspended accounts: they are missing from the result.
  *
  * @param Retweeterids screen names to look up
  * @param batchSize    names per request (default 100)
  */
def lookupUserSNs(Retweeterids: Seq[String], batchSize: Int = 100): List[twitter4j.User] =
  lookupInBatches(Retweeterids, batchSize)(screenNames => twitter.lookupUsers(screenNames: _*))

/** Resolves numeric Twitter user ids to `twitter4j.User`s via the `twitter` client.
  *
  * Requests are batched (default 100 per request; Twitter's users/lookup endpoint does not accept
  * larger batches). We lose some suspended accounts: they are missing from the result.
  *
  * @param Retweeterids numeric user ids to look up
  * @param batchSize    ids per request (default 100)
  */
def lookupUsers(Retweeterids: Seq[Long], batchSize: Int = 100): List[twitter4j.User] =
  lookupInBatches(Retweeterids, batchSize)(userIds => twitter.lookupUsers(userIds: _*))
lookupUserSNs: (Retweeterids: Seq[String])List[twitter4j.User]
lookupUsers: (Retweeterids: Seq[Long])List[twitter4j.User]
val twitterUsersOfInterest = lookupUserSNs(screenNamesOfInterest)
twitterUsersOfInterest: List[twitter4j.User] = List(UserJSONImpl{id=4173723312, name='Raazesh Sainudiin', email='null', screenName='raazozone', location='Uppsala, Sverige', description='This is just a laboratory for insights into models of meme evolution!', isContributorsEnabled=false, profileImageUrl='http://pbs.twimg.com/profile_images/890899899008634880/5fwLqkXm_normal.jpg', profileImageUrlHttps='https://pbs.twimg.com/profile_images/890899899008634880/5fwLqkXm_normal.jpg', isDefaultProfileImage=false, url='https://t.co/kq1hQpL1KR', isProtected=false, followersCount=49, status=StatusJSONImpl{createdAt=Sun May 19 09:40:37 UTC 2019, id=1130045586785357824, text='The Brexit Party is winning social media. These numbers prove it https://t.co/J9Y8kKoVjr', source='<a href="http://twitter.com/download/iphone" rel="nofollow">Twitter for iPhone</a>', isTruncated=false, inReplyToStatusId=-1, inReplyToUserId=-1, isFavorited=false, isRetweeted=false, favoriteCount=0, inReplyToScreenName='null', geoLocation=null, place=null, retweetCount=0, isPossiblySensitive=false, lang='en', contributorsIDs=[], retweetedStatus=null, userMentionEntities=[], urlEntities=[URLEntityJSONImpl{url='https://t.co/J9Y8kKoVjr', expandedURL='https://www.wired.co.uk/article/brexit-party-nigel-farage-facebook-social-media', displayURL='wired.co.uk/article/brexit…'}], hashtagEntities=[], mediaEntities=[], symbolEntities=[], currentUserRetweetId=-1, user=null, withHeldInCountries=null, quotedStatusId=-1, quotedStatus=null}, profileBackgroundColor='000000', profileTextColor='000000', profileLinkColor='1B95E0', profileSidebarFillColor='000000', profileSidebarBorderColor='000000', profileUseBackgroundImage=false, isDefaultProfile=false, showAllInlineMedia=false, friendsCount=81, createdAt=Mon Nov 09 00:50:29 UTC 2015, favouritesCount=57, utcOffset=-1, timeZone='null', profileBackgroundImageUrl='http://abs.twimg.com/images/themes/theme1/bg.png', 
profileBackgroundImageUrlHttps='https://abs.twimg.com/images/themes/theme1/bg.png', profileBackgroundTiled=false, lang='null', statusesCount=1245, isGeoEnabled=false, isVerified=false, translator=false, listedCount=9, isFollowRequestSent=false, withheldInCountries=null}, UserJSONImpl{id=25073877, name='Donald J. Trump', email='null', screenName='realDonaldTrump', location='Washington, DC', description='45th President of the United States of America🇺🇸', isContributorsEnabled=false, profileImageUrl='http://pbs.twimg.com/profile_images/874276197357596672/kUuht00m_normal.jpg', profileImageUrlHttps='https://pbs.twimg.com/profile_images/874276197357596672/kUuht00m_normal.jpg', isDefaultProfileImage=false, url='https://t.co/OMxB0x7xC5', isProtected=false, followersCount=60493219, status=StatusJSONImpl{createdAt=Fri May 24 03:49:49 UTC 2019, id=1131769241651302401, text='“Intelligence Agencies were used against an American President.” @DevinNunes @ShannonBream @FoxNews This should NEVER happen to a President again! Dems are furious at Robert Mueller for his findings - NO COLLUSION, NO OBSTRUCTION. 
Now they should go back to work and legislate!', source='<a href="http://twitter.com/download/iphone" rel="nofollow">Twitter for iPhone</a>', isTruncated=false, inReplyToStatusId=-1, inReplyToUserId=-1, isFavorited=false, isRetweeted=false, favoriteCount=26079, inReplyToScreenName='null', geoLocation=null, place=null, retweetCount=8244, isPossiblySensitive=false, lang='en', contributorsIDs=[], retweetedStatus=null, userMentionEntities=[UserMentionEntityJSONImpl{name='Devin Nunes', screenName='DevinNunes', id=54412900}, UserMentionEntityJSONImpl{name='Shannon Bream', screenName='ShannonBream', id=18983793}, UserMentionEntityJSONImpl{name='Fox News', screenName='FoxNews', id=1367531}], urlEntities=[], hashtagEntities=[], mediaEntities=[], symbolEntities=[], currentUserRetweetId=-1, user=null, withHeldInCountries=null, quotedStatusId=-1, quotedStatus=null}, profileBackgroundColor='6D5C18', profileTextColor='333333', profileLinkColor='1B95E0', profileSidebarFillColor='C5CEC0', profileSidebarBorderColor='BDDCAD', profileUseBackgroundImage=true, isDefaultProfile=false, showAllInlineMedia=false, friendsCount=47, createdAt=Wed Mar 18 13:46:38 UTC 2009, favouritesCount=7, utcOffset=-1, timeZone='null', profileBackgroundImageUrl='http://abs.twimg.com/images/themes/theme1/bg.png', profileBackgroundImageUrlHttps='https://abs.twimg.com/images/themes/theme1/bg.png', profileBackgroundTiled=true, lang='null', statusesCount=42042, isGeoEnabled=true, isVerified=true, translator=false, listedCount=103351, isFollowRequestSent=false, withheldInCountries=null}, UserJSONImpl{id=22152077, name='Aon', email='null', screenName='Aon_plc', location='120 countries around the globe', description='Aon plc (NYSE:AON) is a leading global professional services firm providing a broad range of #risk, #retirement and #health solutions.', isContributorsEnabled=false, profileImageUrl='http://pbs.twimg.com/profile_images/827249559046909954/SyaBPcH8_normal.jpg', 
profileImageUrlHttps='https://pbs.twimg.com/profile_images/827249559046909954/SyaBPcH8_normal.jpg', isDefaultProfileImage=false, url='http://t.co/QVMna9rFn0', isProtected=false, followersCount=36896, status=StatusJSONImpl{createdAt=Thu May 23 21:21:53 UTC 2019, id=1131671616142938112, text='Managing volatility on the long and heavily bunkered 11th hole @SchwabCupFinale will be key on this week’s #AonRiskReward Challenge https://t.co/SdN02a6Bob', source='<a href="https://mobile.twitter.com" rel="nofollow">Twitter Web App</a>', isTruncated=false, inReplyToStatusId=-1, inReplyToUserId=-1, isFavorited=false, isRetweeted=false, favoriteCount=2, inReplyToScreenName='null', geoLocation=null, place=null, retweetCount=0, isPossiblySensitive=false, lang='en', contributorsIDs=[], retweetedStatus=null, userMentionEntities=[UserMentionEntityJSONImpl{name='Charles Schwab Cup Championship', screenName='SchwabCupFinale', id=1330543374}], urlEntities=[], hashtagEntities=[HashtagEntityJSONImpl{text='AonRiskReward'}], mediaEntities=[MediaEntityJSONImpl{id=1131670718557630468, url='https://t.co/SdN02a6Bob', mediaURL='http://pbs.twimg.com/ext_tw_video_thumb/1131670718557630468/pu/img/fGIxN8EL-ERyBLpa.jpg', mediaURLHttps='https://pbs.twimg.com/ext_tw_video_thumb/1131670718557630468/pu/img/fGIxN8EL-ERyBLpa.jpg', expandedURL='https://twitter.com/Aon_plc/status/1131671616142938112/video/1', displayURL='pic.twitter.com/SdN02a6Bob', sizes={0=Size{width=150, height=150, resize=101}, 1=Size{width=680, height=383, resize=100}, 2=Size{width=1200, height=675, resize=100}, 3=Size{width=1280, height=720, resize=100}}, type='video', videoAspectRatioWidth=16, videoAspectRatioHeight=9, videoDurationMillis=53587, videoVariants=4, extAltText='null'}], symbolEntities=[], currentUserRetweetId=-1, user=null, withHeldInCountries=null, quotedStatusId=-1, quotedStatus=null}, profileBackgroundColor='FFFFFF', profileTextColor='333333', profileLinkColor='0084B4', profileSidebarFillColor='80B864', 
profileSidebarBorderColor='FFFFFF', profileUseBackgroundImage=false, isDefaultProfile=false, showAllInlineMedia=false, friendsCount=1285, createdAt=Fri Feb 27 15:43:53 UTC 2009, favouritesCount=532, utcOffset=-1, timeZone='null', profileBackgroundImageUrl='http://abs.twimg.com/images/themes/theme1/bg.png', profileBackgroundImageUrlHttps='https://abs.twimg.com/images/themes/theme1/bg.png', profileBackgroundTiled=false, lang='null', statusesCount=4337, isGeoEnabled=true, isVerified=true, translator=false, listedCount=715, isFollowRequestSent=false, withheldInCountries=null})
import com.google.gson.Gson
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val outputDirectoryRoot = "/datasets/tweetsStreamTmp" // output directory
val batchInterval = 1 // in minutes
val timeoutJobLength = batchInterval * 5

var newContextCreated = false
var numTweetsCollected = 0L // track number of tweets collected

//val conf = new SparkConf().setAppName("TrackedTweetCollector").setMaster("local")

/** Creates the StreamingContext and sets up the Spark Streaming job that collects tweets.
  *
  * Every `batchInterval` minutes, the statuses received in that batch are serialised to JSON
  * (one string per status). A non-empty batch is appended as parquet, in a column named
  * `tweetAsJsonString`, under `outputDirectoryRoot/yyyy/MM/dd/HH/<batch time in ms>`.
  *
  * Driver-side effects: adds each batch's count to `numTweetsCollected` and sets
  * `newContextCreated = true`.
  *
  * The stream tracks the strings in `track` and follows the users in `seedIDs`. The OAuth
  * authorization is built from a default `ConfigurationBuilder`; presumably it picks up the
  * `twitter4j.oauth.*` system properties set in the secrets notebook — confirm.
  *
  * @return the configured (not yet started) StreamingContext
  */
def streamFunc(): StreamingContext = {
  // Create a Spark Streaming Context.
  val ssc = new StreamingContext(sc, Minutes(batchInterval))
  // Create the OAuth Twitter credentials
  val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))

  val track = List("aon rules!", "#MakeDataGreatAgain", "sds-2-x rules!") //, "Hi")// just added for some live tests
  //val track = List.empty // if you do not want to track by any string
  val follow = seedIDs // who to follow in Twitter
  //val follow = List.empty // if you do not want to follow any specific twitter user

  // Create a Twitter Stream for the input source.
  val twitterStream = ExtendedTwitterUtils.createStream(ssc, auth, track, follow)

  // Serialise each status to JSON. Gson is not Serializable, so it cannot be shipped in the
  // closure. Build one per partition on the executor rather than one per tweet.
  val twitterStreamJson = twitterStream.mapPartitions { statuses =>
    val gson = new Gson()
    statuses.map(status => gson.toJson(status))
  }

  // take care
  val partitionsEachInterval = 1 // number of partitions (hence parquet part-files) written per batch

  twitterStreamJson.foreachRDD { (rdd, time) =>
    val count = rdd.count()
    if (count > 0) {
      val outputDF = rdd.repartition(partitionsEachInterval).toDF("tweetAsJsonString")
      // Derive every time field from ONE timestamp. Separate `new Date()` calls can straddle an
      // hour/day/year boundary and yield a mismatched year/month/day/hour directory path.
      val datePath = new java.text.SimpleDateFormat("yyyy/MM/dd/HH").format(new java.util.Date())
      // write to a file with a clear time-based hierarchical directory structure
      outputDF.write.mode(SaveMode.Append)
        .parquet(s"$outputDirectoryRoot/$datePath/${time.milliseconds}")
      numTweetsCollected += count // update with the latest count
    }
  }

  newContextCreated = true
  ssc
}
import com.google.gson.Gson
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
outputDirectoryRoot: String = /datasets/tweetsStreamTmp
batchInterval: Int = 1
timeoutJobLength: Int = 5
newContextCreated: Boolean = false
numTweetsCollected: Long = 0
streamFunc: ()org.apache.spark.streaming.StreamingContext
// Raw JSON-string tweets from all parquet batches under the 2019/05/24 date directory
// (every hour, every batch-time subdirectory).
val rawDF = fromParquetFile2DF(s"$outputDirectoryRoot/2019/05/24/*/*/*") //.cache()

// Parse the JSON strings into tweets, convert them to the TTT DataFrame, and cache the result.
val TTTsDF = {
  val parsedTweetsDF = tweetsJsonStringDF2TweetsDF(rawDF)
  tweetsDF2TTTDF(parsedTweetsDF).cache()
}
rawDF: org.apache.spark.sql.DataFrame = [tweetAsJsonString: string]
TTTsDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CurrentTweetDate: timestamp, CurrentTwID: bigint ... 32 more fields]
SDS-2.x, Scalable Data Engineering Science
Last refresh: Never