%run "scalable-data-science/sds-2-x/025_a_extendedTwitterUtils2run"
import twitter4j._
import twitter4j.auth.Authorization
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
defined class ExtendedTwitterReceiver
defined class ExtendedTwitterInputDStream
import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
defined object ExtendedTwitterUtils
done running the extendedTwitterUtils2run notebook - ready to stream from twitter
// Twitter developer credentials.
//
// Put your own twitter developer credentials below instead of xxx,
// instead of the '%run "scalable-data-science/secrets/026_secret_MyTwitterOAuthCredentials"' above.
// The notebook we just ran contains the following commented code block:
/*
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

def MyconsumerKey    = "???"
def MyconsumerSecret = "???"
def Mytoken          = "???"
def MytokenSecret    = "???"
*/

// Publish the four OAuth values as JVM system properties under the twitter4j.oauth.* keys.
// NOTE(review): presumably twitter4j reads these when a default configuration is built — confirm.
System.setProperty("twitter4j.oauth.consumerKey", MyconsumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", MyconsumerSecret)
System.setProperty("twitter4j.oauth.accessToken", Mytoken)
System.setProperty("twitter4j.oauth.accessTokenSecret", MytokenSecret)
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
MyconsumerKey: String
MyconsumerSecret: String
Mytoken: String
MytokenSecret: String
res10: String = null
var numTweetsCollected = 0L     // running total of tweets collected
val partitionsEachInterval = 1  // number of partitions in each RDD of tweets in the DStream

// For each (RDD, batch time) pair in the DStream: ignore empty batches; otherwise
// repartition the batch, save it as a text file in a directory named after the
// batch time in milliseconds, and add the batch size to the running total.
twitterStreamJson.foreachRDD { (batch, batchTime) =>
  val batchSize = batch.count()
  if (batchSize > 0) {
    val targetDir = s"$outputDirectoryRoot/tweets_${batchTime.milliseconds}"
    batch.repartition(partitionsEachInterval).saveAsTextFile(targetDir)
    numTweetsCollected += batchSize
  }
}
numTweetsCollected: Long = 0
partitionsEachInterval: Int = 1
// Print the schema of outJson as a tree (the output follows this cell).
// outJson itself is defined outside this excerpt.
outJson.printSchema()
root
|-- contributorsIDs: array (nullable = true)
| |-- element: string (containsNull = true)
|-- createdAt: string (nullable = true)
|-- currentUserRetweetId: long (nullable = true)
|-- displayTextRangeEnd: long (nullable = true)
|-- displayTextRangeStart: long (nullable = true)
|-- favoriteCount: long (nullable = true)
|-- hashtagEntities: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- end: long (nullable = true)
| | |-- start: long (nullable = true)
| | |-- text: string (nullable = true)
|-- id: long (nullable = true)
|-- inReplyToScreenName: string (nullable = true)
|-- inReplyToStatusId: long (nullable = true)
|-- inReplyToUserId: long (nullable = true)
|-- isFavorited: boolean (nullable = true)
|-- isPossiblySensitive: boolean (nullable = true)
|-- isRetweeted: boolean (nullable = true)
|-- isTruncated: boolean (nullable = true)
|-- lang: string (nullable = true)
|-- mediaEntities: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- displayURL: string (nullable = true)
| | |-- end: long (nullable = true)
| | |-- expandedURL: string (nullable = true)
| | |-- id: long (nullable = true)
| | |-- mediaURL: string (nullable = true)
| | |-- mediaURLHttps: string (nullable = true)
| | |-- sizes: struct (nullable = true)
| | | |-- 0: struct (nullable = true)
| | | | |-- height: long (nullable = true)
| | | | |-- resize: long (nullable = true)
| | | | |-- width: long (nullable = true)
| | | |-- 1: struct (nullable = true)
| | | | |-- height: long (nullable = true)
| | | | |-- resize: long (nullable = true)
| | | | |-- width: long (nullable = true)
| | | |-- 2: struct (nullable = true)
| | | | |-- height: long (nullable = true)
| | | | |-- resize: long (nullable = true)
| | | | |-- width: long (nullable = true)
| | | |-- 3: struct (nullable = true)
| | | | |-- height: long (nullable = true)
| | | | |-- resize: long (nullable = true)
| | | | |-- width: long (nullable = true)
| | |-- start: long (nullable = true)
| | |-- type: string (nullable = true)
| | |-- url: string (nullable = true)
| | |-- videoAspectRatioHeight: long (nullable = true)
| | |-- videoAspectRatioWidth: long (nullable = true)
| | |-- videoDurationMillis: long (nullable = true)
| | |-- videoVariants: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- bitrate: long (nullable = true)
| | | | |-- contentType: string (nullable = true)
| | | | |-- url: string (nullable = true)
|-- quotedStatus: struct (nullable = true)
| |-- contributorsIDs: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- createdAt: string (nullable = true)
| |-- currentUserRetweetId: long (nullable = true)
| |-- displayTextRangeEnd: long (nullable = true)
| |-- displayTextRangeStart: long (nullable = true)
| |-- favoriteCount: long (nullable = true)
| |-- hashtagEntities: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- id: long (nullable = true)
| |-- inReplyToScreenName: string (nullable = true)
| |-- inReplyToStatusId: long (nullable = true)
| |-- inReplyToUserId: long (nullable = true)
| |-- isFavorited: boolean (nullable = true)
| |-- isPossiblySensitive: boolean (nullable = true)
| |-- isRetweeted: boolean (nullable = true)
| |-- isTruncated: boolean (nullable = true)
| |-- lang: string (nullable = true)
| |-- mediaEntities: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- displayURL: string (nullable = true)
| | | |-- end: long (nullable = true)
| | | |-- expandedURL: string (nullable = true)
| | | |-- id: long (nullable = true)
| | | |-- mediaURL: string (nullable = true)
| | | |-- mediaURLHttps: string (nullable = true)
| | | |-- sizes: struct (nullable = true)
| | | | |-- 0: struct (nullable = true)
| | | | | |-- height: long (nullable = true)
| | | | | |-- resize: long (nullable = true)
| | | | | |-- width: long (nullable = true)
| | | | |-- 1: struct (nullable = true)
| | | | | |-- height: long (nullable = true)
| | | | | |-- resize: long (nullable = true)
| | | | | |-- width: long (nullable = true)
| | | | |-- 2: struct (nullable = true)
| | | | | |-- height: long (nullable = true)
| | | | | |-- resize: long (nullable = true)
| | | | | |-- width: long (nullable = true)
| | | | |-- 3: struct (nullable = true)
| | | | | |-- height: long (nullable = true)
| | | | | |-- resize: long (nullable = true)
| | | | | |-- width: long (nullable = true)
| | | |-- start: long (nullable = true)
| | | |-- type: string (nullable = true)
| | | |-- url: string (nullable = true)
| | | |-- videoAspectRatioHeight: long (nullable = true)
| | | |-- videoAspectRatioWidth: long (nullable = true)
| | | |-- videoDurationMillis: long (nullable = true)
| | | |-- videoVariants: array (nullable = true)
| | | | |-- element: string (containsNull = true)
| |-- place: struct (nullable = true)
| | |-- boundingBoxCoordinates: array (nullable = true)
| | | |-- element: array (containsNull = true)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- latitude: double (nullable = true)
| | | | | |-- longitude: double (nullable = true)
| | |-- boundingBoxType: string (nullable = true)
| | |-- country: string (nullable = true)
| | |-- countryCode: string (nullable = true)
| | |-- fullName: string (nullable = true)
| | |-- id: string (nullable = true)
| | |-- name: string (nullable = true)
| | |-- placeType: string (nullable = true)
| | |-- url: string (nullable = true)
| |-- quotedStatusId: long (nullable = true)
| |-- retweetCount: long (nullable = true)
| |-- source: string (nullable = true)
| |-- symbolEntities: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- text: string (nullable = true)
| |-- urlEntities: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- displayURL: string (nullable = true)
| | | |-- end: long (nullable = true)
| | | |-- expandedURL: string (nullable = true)
| | | |-- start: long (nullable = true)
| | | |-- url: string (nullable = true)
| |-- user: struct (nullable = true)
| | |-- createdAt: string (nullable = true)
| | |-- description: string (nullable = true)
| | |-- descriptionURLEntities: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- favouritesCount: long (nullable = true)
| | |-- followersCount: long (nullable = true)
| | |-- friendsCount: long (nullable = true)
| | |-- id: long (nullable = true)
| | |-- isContributorsEnabled: boolean (nullable = true)
| | |-- isDefaultProfile: boolean (nullable = true)
| | |-- isDefaultProfileImage: boolean (nullable = true)
| | |-- isFollowRequestSent: boolean (nullable = true)
| | |-- isGeoEnabled: boolean (nullable = true)
| | |-- isProtected: boolean (nullable = true)
| | |-- isVerified: boolean (nullable = true)
| | |-- listedCount: long (nullable = true)
| | |-- location: string (nullable = true)
| | |-- name: string (nullable = true)
| | |-- profileBackgroundColor: string (nullable = true)
| | |-- profileBackgroundImageUrl: string (nullable = true)
| | |-- profileBackgroundImageUrlHttps: string (nullable = true)
| | |-- profileBackgroundTiled: boolean (nullable = true)
| | |-- profileBannerImageUrl: string (nullable = true)
| | |-- profileImageUrl: string (nullable = true)
| | |-- profileImageUrlHttps: string (nullable = true)
| | |-- profileLinkColor: string (nullable = true)
| | |-- profileSidebarBorderColor: string (nullable = true)
| | |-- profileSidebarFillColor: string (nullable = true)
| | |-- profileTextColor: string (nullable = true)
| | |-- profileUseBackgroundImage: boolean (nullable = true)
| | |-- screenName: string (nullable = true)
| | |-- showAllInlineMedia: boolean (nullable = true)
| | |-- statusesCount: long (nullable = true)
| | |-- translator: boolean (nullable = true)
| | |-- url: string (nullable = true)
| | |-- utcOffset: long (nullable = true)
| |-- userMentionEntities: array (nullable = true)
| | |-- element: string (containsNull = true)
|-- quotedStatusId: long (nullable = true)
|-- quotedStatusPermalink: struct (nullable = true)
| |-- displayURL: string (nullable = true)
| |-- end: long (nullable = true)
| |-- expandedURL: string (nullable = true)
| |-- start: long (nullable = true)
| |-- url: string (nullable = true)
|-- retweetCount: long (nullable = true)
|-- retweetedStatus: struct (nullable = true)
| |-- contributorsIDs: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- createdAt: string (nullable = true)
| |-- currentUserRetweetId: long (nullable = true)
| |-- displayTextRangeEnd: long (nullable = true)
| |-- displayTextRangeStart: long (nullable = true)
| |-- favoriteCount: long (nullable = true)
| |-- hashtagEntities: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- end: long (nullable = true)
| | | |-- start: long (nullable = true)
| | | |-- text: string (nullable = true)
| |-- id: long (nullable = true)
| |-- inReplyToStatusId: long (nullable = true)
| |-- inReplyToUserId: long (nullable = true)
| |-- isFavorited: boolean (nullable = true)
| |-- isPossiblySensitive: boolean (nullable = true)
| |-- isRetweeted: boolean (nullable = true)
| |-- isTruncated: boolean (nullable = true)
| |-- lang: string (nullable = true)
| |-- mediaEntities: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- displayURL: string (nullable = true)
| | | |-- end: long (nullable = true)
| | | |-- expandedURL: string (nullable = true)
| | | |-- id: long (nullable = true)
| | | |-- mediaURL: string (nullable = true)
| | | |-- mediaURLHttps: string (nullable = true)
| | | |-- sizes: struct (nullable = true)
| | | | |-- 0: struct (nullable = true)
| | | | | |-- height: long (nullable = true)
| | | | | |-- resize: long (nullable = true)
| | | | | |-- width: long (nullable = true)
| | | | |-- 1: struct (nullable = true)
| | | | | |-- height: long (nullable = true)
| | | | | |-- resize: long (nullable = true)
| | | | | |-- width: long (nullable = true)
| | | | |-- 2: struct (nullable = true)
| | | | | |-- height: long (nullable = true)
| | | | | |-- resize: long (nullable = true)
| | | | | |-- width: long (nullable = true)
| | | | |-- 3: struct (nullable = true)
| | | | | |-- height: long (nullable = true)
| | | | | |-- resize: long (nullable = true)
| | | | | |-- width: long (nullable = true)
| | | |-- start: long (nullable = true)
| | | |-- type: string (nullable = true)
| | | |-- url: string (nullable = true)
| | | |-- videoAspectRatioHeight: long (nullable = true)
| | | |-- videoAspectRatioWidth: long (nullable = true)
| | | |-- videoDurationMillis: long (nullable = true)
| | | |-- videoVariants: array (nullable = true)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- bitrate: long (nullable = true)
| | | | | |-- contentType: string (nullable = true)
| | | | | |-- url: string (nullable = true)
| |-- quotedStatus: struct (nullable = true)
| | |-- contributorsIDs: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- createdAt: string (nullable = true)
| | |-- currentUserRetweetId: long (nullable = true)
| | |-- displayTextRangeEnd: long (nullable = true)
| | |-- displayTextRangeStart: long (nullable = true)
| | |-- favoriteCount: long (nullable = true)
| | |-- hashtagEntities: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- id: long (nullable = true)
| | |-- inReplyToStatusId: long (nullable = true)
| | |-- inReplyToUserId: long (nullable = true)
| | |-- isFavorited: boolean (nullable = true)
| | |-- isPossiblySensitive: boolean (nullable = true)
| | |-- isRetweeted: boolean (nullable = true)
| | |-- isTruncated: boolean (nullable = true)
| | |-- lang: string (nullable = true)
| | |-- mediaEntities: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- displayURL: string (nullable = true)
| | | | |-- end: long (nullable = true)
| | | | |-- expandedURL: string (nullable = true)
| | | | |-- id: long (nullable = true)
| | | | |-- mediaURL: string (nullable = true)
| | | | |-- mediaURLHttps: string (nullable = true)
| | | | |-- sizes: struct (nullable = true)
| | | | | |-- 0: struct (nullable = true)
| | | | | | |-- height: long (nullable = true)
| | | | | | |-- resize: long (nullable = true)
| | | | | | |-- width: long (nullable = true)
| | | | | |-- 1: struct (nullable = true)
| | | | | | |-- height: long (nullable = true)
| | | | | | |-- resize: long (nullable = true)
| | | | | | |-- width: long (nullable = true)
| | | | | |-- 2: struct (nullable = true)
| | | | | | |-- height: long (nullable = true)
| | | | | | |-- resize: long (nullable = true)
| | | | | | |-- width: long (nullable = true)
| | | | | |-- 3: struct (nullable = true)
| | | | | | |-- height: long (nullable = true)
| | | | | | |-- resize: long (nullable = true)
| | | | | | |-- width: long (nullable = true)
| | | | |-- start: long (nullable = true)
| | | | |-- type: string (nullable = true)
| | | | |-- url: string (nullable = true)
| | | | |-- videoAspectRatioHeight: long (nullable = true)
| | | | |-- videoAspectRatioWidth: long (nullable = true)
| | | | |-- videoDurationMillis: long (nullable = true)
| | | | |-- videoVariants: array (nullable = true)
| | | | | |-- element: string (containsNull = true)
| | |-- place: struct (nullable = true)
| | | |-- boundingBoxCoordinates: array (nullable = true)
| | | | |-- element: array (containsNull = true)
| | | | | |-- element: struct (containsNull = true)
| | | | | | |-- latitude: double (nullable = true)
| | | | | | |-- longitude: double (nullable = true)
| | | |-- boundingBoxType: string (nullable = true)
| | | |-- country: string (nullable = true)
| | | |-- countryCode: string (nullable = true)
| | | |-- fullName: string (nullable = true)
| | | |-- id: string (nullable = true)
| | | |-- name: string (nullable = true)
| | | |-- placeType: string (nullable = true)
| | | |-- url: string (nullable = true)
| | |-- quotedStatusId: long (nullable = true)
| | |-- retweetCount: long (nullable = true)
| | |-- source: string (nullable = true)
| | |-- symbolEntities: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- text: string (nullable = true)
| | |-- urlEntities: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- displayURL: string (nullable = true)
| | | | |-- end: long (nullable = true)
| | | | |-- expandedURL: string (nullable = true)
| | | | |-- start: long (nullable = true)
| | | | |-- url: string (nullable = true)
| | |-- user: struct (nullable = true)
| | | |-- createdAt: string (nullable = true)
| | | |-- description: string (nullable = true)
| | | |-- descriptionURLEntities: array (nullable = true)
| | | | |-- element: string (containsNull = true)
| | | |-- favouritesCount: long (nullable = true)
| | | |-- followersCount: long (nullable = true)
| | | |-- friendsCount: long (nullable = true)
| | | |-- id: long (nullable = true)
| | | |-- isContributorsEnabled: boolean (nullable = true)
| | | |-- isDefaultProfile: boolean (nullable = true)
| | | |-- isDefaultProfileImage: boolean (nullable = true)
| | | |-- isFollowRequestSent: boolean (nullable = true)
| | | |-- isGeoEnabled: boolean (nullable = true)
| | | |-- isProtected: boolean (nullable = true)
| | | |-- isVerified: boolean (nullable = true)
| | | |-- listedCount: long (nullable = true)
| | | |-- location: string (nullable = true)
| | | |-- name: string (nullable = true)
| | | |-- profileBackgroundColor: string (nullable = true)
| | | |-- profileBackgroundImageUrl: string (nullable = true)
| | | |-- profileBackgroundImageUrlHttps: string (nullable = true)
| | | |-- profileBackgroundTiled: boolean (nullable = true)
| | | |-- profileBannerImageUrl: string (nullable = true)
| | | |-- profileImageUrl: string (nullable = true)
| | | |-- profileImageUrlHttps: string (nullable = true)
| | | |-- profileLinkColor: string (nullable = true)
| | | |-- profileSidebarBorderColor: string (nullable = true)
| | | |-- profileSidebarFillColor: string (nullable = true)
| | | |-- profileTextColor: string (nullable = true)
| | | |-- profileUseBackgroundImage: boolean (nullable = true)
| | | |-- screenName: string (nullable = true)
| | | |-- showAllInlineMedia: boolean (nullable = true)
| | | |-- statusesCount: long (nullable = true)
| | | |-- translator: boolean (nullable = true)
| | | |-- url: string (nullable = true)
| | | |-- utcOffset: long (nullable = true)
| | |-- userMentionEntities: array (nullable = true)
| | | |-- element: string (containsNull = true)
| |-- quotedStatusId: long (nullable = true)
| |-- quotedStatusPermalink: struct (nullable = true)
| | |-- displayURL: string (nullable = true)
| | |-- end: long (nullable = true)
| | |-- expandedURL: string (nullable = true)
| | |-- start: long (nullable = true)
| | |-- url: string (nullable = true)
| |-- retweetCount: long (nullable = true)
| |-- source: string (nullable = true)
| |-- symbolEntities: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- text: string (nullable = true)
| |-- urlEntities: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- displayURL: string (nullable = true)
| | | |-- end: long (nullable = true)
| | | |-- expandedURL: string (nullable = true)
| | | |-- start: long (nullable = true)
| | | |-- url: string (nullable = true)
| |-- user: struct (nullable = true)
| | |-- createdAt: string (nullable = true)
| | |-- description: string (nullable = true)
| | |-- descriptionURLEntities: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- favouritesCount: long (nullable = true)
| | |-- followersCount: long (nullable = true)
| | |-- friendsCount: long (nullable = true)
| | |-- id: long (nullable = true)
| | |-- isContributorsEnabled: boolean (nullable = true)
| | |-- isDefaultProfile: boolean (nullable = true)
| | |-- isDefaultProfileImage: boolean (nullable = true)
| | |-- isFollowRequestSent: boolean (nullable = true)
| | |-- isGeoEnabled: boolean (nullable = true)
| | |-- isProtected: boolean (nullable = true)
| | |-- isVerified: boolean (nullable = true)
| | |-- listedCount: long (nullable = true)
| | |-- location: string (nullable = true)
| | |-- name: string (nullable = true)
| | |-- profileBackgroundColor: string (nullable = true)
| | |-- profileBackgroundImageUrl: string (nullable = true)
| | |-- profileBackgroundImageUrlHttps: string (nullable = true)
| | |-- profileBackgroundTiled: boolean (nullable = true)
| | |-- profileBannerImageUrl: string (nullable = true)
| | |-- profileImageUrl: string (nullable = true)
| | |-- profileImageUrlHttps: string (nullable = true)
| | |-- profileLinkColor: string (nullable = true)
| | |-- profileSidebarBorderColor: string (nullable = true)
| | |-- profileSidebarFillColor: string (nullable = true)
| | |-- profileTextColor: string (nullable = true)
| | |-- profileUseBackgroundImage: boolean (nullable = true)
| | |-- screenName: string (nullable = true)
| | |-- showAllInlineMedia: boolean (nullable = true)
| | |-- statusesCount: long (nullable = true)
| | |-- translator: boolean (nullable = true)
| | |-- url: string (nullable = true)
| | |-- utcOffset: long (nullable = true)
| |-- userMentionEntities: array (nullable = true)
| | |-- element: string (containsNull = true)
|-- source: string (nullable = true)
|-- symbolEntities: array (nullable = true)
| |-- element: string (containsNull = true)
|-- text: string (nullable = true)
|-- urlEntities: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- displayURL: string (nullable = true)
| | |-- end: long (nullable = true)
| | |-- expandedURL: string (nullable = true)
| | |-- start: long (nullable = true)
| | |-- url: string (nullable = true)
|-- user: struct (nullable = true)
| |-- createdAt: string (nullable = true)
| |-- description: string (nullable = true)
| |-- descriptionURLEntities: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- favouritesCount: long (nullable = true)
| |-- followersCount: long (nullable = true)
| |-- friendsCount: long (nullable = true)
| |-- id: long (nullable = true)
| |-- isContributorsEnabled: boolean (nullable = true)
| |-- isDefaultProfile: boolean (nullable = true)
| |-- isDefaultProfileImage: boolean (nullable = true)
| |-- isFollowRequestSent: boolean (nullable = true)
| |-- isGeoEnabled: boolean (nullable = true)
| |-- isProtected: boolean (nullable = true)
| |-- isVerified: boolean (nullable = true)
| |-- listedCount: long (nullable = true)
| |-- location: string (nullable = true)
| |-- name: string (nullable = true)
| |-- profileBackgroundColor: string (nullable = true)
| |-- profileBackgroundImageUrl: string (nullable = true)
| |-- profileBackgroundImageUrlHttps: string (nullable = true)
| |-- profileBackgroundTiled: boolean (nullable = true)
| |-- profileBannerImageUrl: string (nullable = true)
| |-- profileImageUrl: string (nullable = true)
| |-- profileImageUrlHttps: string (nullable = true)
| |-- profileLinkColor: string (nullable = true)
| |-- profileSidebarBorderColor: string (nullable = true)
| |-- profileSidebarFillColor: string (nullable = true)
| |-- profileTextColor: string (nullable = true)
| |-- profileUseBackgroundImage: boolean (nullable = true)
| |-- screenName: string (nullable = true)
| |-- showAllInlineMedia: boolean (nullable = true)
| |-- statusesCount: long (nullable = true)
| |-- translator: boolean (nullable = true)
| |-- url: string (nullable = true)
| |-- utcOffset: long (nullable = true)
|-- userMentionEntities: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- end: long (nullable = true)
| | |-- id: long (nullable = true)
| | |-- name: string (nullable = true)
| | |-- screenName: string (nullable = true)
| | |-- start: long (nullable = true)
import com.google.gson.Gson
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val outputDirectoryRoot = "/datasets/tweetsStreamTmp" // output directory
val batchInterval = 1 // in minutes
val timeoutJobLength = batchInterval * 5
var newContextCreated = false
var numTweetsCollected = 0L // track number of tweets collected

//val conf = new SparkConf().setAppName("TrackedTweetCollector").setMaster("local")

/** Creates the StreamingContext and sets up the Spark Streaming job.
  *
  * The job reads tweets through `ExtendedTwitterUtils.createStream`, serialises each one
  * to a JSON string with Gson, and for every non-empty batch appends the strings as a
  * single-column (`tweetAsJsonString`) parquet dataset under
  * `outputDirectoryRoot/yyyy/MM/dd/HH/<batch time in ms>`.
  *
  * Side effects: sets `newContextCreated` to true and, as batches are processed,
  * adds each batch's size to `numTweetsCollected`.
  *
  * @return the configured (not yet started) StreamingContext
  */
def streamFunc(): StreamingContext = {
  // Create a Spark Streaming Context with one batch every `batchInterval` minutes.
  val ssc = new StreamingContext(sc, Minutes(batchInterval))

  // Create the OAuth Twitter credentials from twitter4j's default configuration.
  // NOTE(review): presumably this picks up the twitter4j.oauth.* system properties set earlier — confirm.
  val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))

  // Create a Twitter Stream for the input source.
  val twitterStream = ExtendedTwitterUtils.createStream(ssc, auth)

  // Transform the discrete RDDs into JSON.
  // One Gson instance per partition instead of one per tweet; the instance is created
  // inside the closure so it never has to be serialised.
  val twitterStreamJson = twitterStream.mapPartitions { statuses =>
    val gson = new Gson()
    statuses.map(status => gson.toJson(status))
  }

  // take care
  val partitionsEachInterval = 1 // This tells the number of partitions in each RDD of tweets in the DStream.

  // what we want done with each discrete RDD tuple: (rdd, time)
  twitterStreamJson.foreachRDD((rdd, time) => { // for each filtered RDD in the DStream
    val count = rdd.count()
    if (count > 0) {
      val outputRDD = rdd.repartition(partitionsEachInterval) // repartition as desired

      // to write to parquet directly in append mode in one directory per 'time'------------
      val outputDF = outputRDD.toDF("tweetAsJsonString")

      // Derive year/month/day/hour from ONE wall-clock snapshot. Taking a fresh Date for
      // each field could straddle an hour/day/month/year boundary and yield a path that
      // corresponds to no real instant.
      val hourPath = new java.text.SimpleDateFormat("yyyy/MM/dd/HH").format(new java.util.Date())

      // write to a file with a clear time-based hierarchical directory structure
      outputDF.write.mode(SaveMode.Append)
        .parquet(outputDirectoryRoot + "/" + hourPath + "/" + time.milliseconds)
      // end of writing as parquet file-------------------------------------

      numTweetsCollected += count // update with the latest count
    }
  })

  newContextCreated = true
  ssc
}
import com.google.gson.Gson
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
outputDirectoryRoot: String = /datasets/tweetsStreamTmp
batchInterval: Int = 1
timeoutJobLength: Int = 5
newContextCreated: Boolean = false
numTweetsCollected: Long = 0
streamFunc: ()org.apache.spark.streaming.StreamingContext
%run "scalable-data-science/sds-2-x/025_b_TTTDFfunctions"
USAGE: val df = tweetsDF2TTTDF(tweetsJsonStringDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
val df = tweetsDF2TTTDF(tweetsIDLong_JsonStringPairDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.ColumnName
import org.apache.spark.sql.DataFrame
fromParquetFile2DF: (InputDFAsParquetFilePatternString: String)org.apache.spark.sql.DataFrame
tweetsJsonStringDF2TweetsDF: (tweetsAsJsonStringInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsIDLong_JsonStringPairDF2TweetsDF: (tweetsAsIDLong_JsonStringInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDF: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDFWithURLsAndHastags: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
// Read the parquet files matching the pattern below via the fromParquetFile2DF helper.
// NOTE(review): the four trailing wildcards presumably match day / hour / batch-time / part-file,
// mirroring the layout produced by the streaming writer — confirm.
val rawDF = fromParquetFile2DF("/datasets/tweetsStreamTmp/2019/05/*/*/*/*") //.cache()

// Run the raw JSON-string DataFrame through the two conversion helpers, then cache the result.
val TTTsDF = tweetsDF2TTTDF(tweetsJsonStringDF2TweetsDF(rawDF)).cache()
rawDF: org.apache.spark.sql.DataFrame = [tweetAsJsonString: string]
TTTsDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CurrentTweetDate: timestamp, CurrentTwID: bigint ... 32 more fields]
// Print the schema of TTTsDF as a tree (the output follows this cell).
TTTsDF.printSchema
root
|-- CurrentTweetDate: timestamp (nullable = true)
|-- CurrentTwID: long (nullable = true)
|-- CreationDateOfOrgTwInRT: timestamp (nullable = true)
|-- OriginalTwIDinRT: long (nullable = true)
|-- CreationDateOfOrgTwInQT: timestamp (nullable = true)
|-- OriginalTwIDinQT: long (nullable = true)
|-- OriginalTwIDinReply: long (nullable = true)
|-- CPostUserId: long (nullable = true)
|-- userCreatedAtDate: timestamp (nullable = true)
|-- OPostUserIdinRT: long (nullable = true)
|-- OPostUserIdinQT: long (nullable = true)
|-- OPostUserIdinReply: long (nullable = true)
|-- CPostUserName: string (nullable = true)
|-- OPostUserNameinRT: string (nullable = true)
|-- OPostUserNameinQT: string (nullable = true)
|-- CPostUserSN: string (nullable = true)
|-- OPostUserSNinRT: string (nullable = true)
|-- OPostUserSNinQT: string (nullable = true)
|-- OPostUserSNinReply: string (nullable = true)
|-- favouritesCount: long (nullable = true)
|-- followersCount: long (nullable = true)
|-- friendsCount: long (nullable = true)
|-- isVerified: boolean (nullable = true)
|-- isGeoEnabled: boolean (nullable = true)
|-- CurrentTweet: string (nullable = true)
|-- UMentionRTiD: array (nullable = true)
| |-- element: long (containsNull = true)
|-- UMentionRTsN: array (nullable = true)
| |-- element: string (containsNull = true)
|-- UMentionQTiD: array (nullable = true)
| |-- element: long (containsNull = true)
|-- UMentionQTsN: array (nullable = true)
| |-- element: string (containsNull = true)
|-- UMentionASiD: array (nullable = true)
| |-- element: long (containsNull = true)
|-- UMentionASsN: array (nullable = true)
| |-- element: string (containsNull = true)
|-- TweetType: string (nullable = false)
|-- MentionType: string (nullable = false)
|-- Weight: long (nullable = false)
SDS-2.x, Scalable Data Engineering Science
Last refresh: Never