026_TweetCollector(Scala)

Tweet Collector - capture live tweets

Here are the main steps in this notebook:

  1. First, we collect tweets from the public Twitter stream and write them to DBFS as JSON strings, in a boilerplate manner, to understand the components better.
  2. Then we turn the collector into a function and use it.
  3. Finally, we use some DataFrame-based pipelines to convert the raw tweets into other structured content.

We will call the extendedTwitterUtils notebook from here.

But first install the following libraries:

  • gson
  • twitter4j-examples
%run "scalable-data-science/sds-2-x/025_a_extendedTwitterUtils2run"
import twitter4j._
import twitter4j.auth.Authorization
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
defined class ExtendedTwitterReceiver
defined class ExtendedTwitterInputDStream
import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
defined object ExtendedTwitterUtils
done running the extendedTwitterUtils2run notebook - ready to stream from twitter

Go to the Spark UI and check whether a streaming job is already running. If so, you need to terminate it before starting a new one: only one streaming job can run at a time on Databricks Community Edition (DB CE).

// this makes sure that all streaming jobs in the cluster are stopped
StreamingContext.getActive.foreach{ _.stop(stopSparkContext = false) }

Let's create a directory in DBFS, the cluster's distributed file system, for storing tweets.

val outputDirectoryRoot = "/datasets/tweetsStreamTmp" // output directory
outputDirectoryRoot: String = /datasets/tweetsStreamTmp
dbutils.fs.mkdirs("/datasets/tweetsStreamTmp")
res8: Boolean = true
display(dbutils.fs.ls(outputDirectoryRoot))
OK
// to remove a pre-existing directory and start from scratch uncomment next line and evaluate this cell
//dbutils.fs.rm("/datasets/tweetsStreamTmp", true) 
res6: Boolean = true

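We capture tweets in batches: the tweets received during each interval of slideInterval milliseconds form one batch. (Despite its name, slideInterval is used below as the batch interval of the StreamingContext, not as the slide of a windowed operation.)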

val slideInterval = new Duration(1 * 1000) // 1 * 1000 = 1000 milli-seconds = 1 sec
slideInterval: org.apache.spark.streaming.Duration = 1000 ms
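With a 1-second batch interval, Spark Streaming produces one batch per second, and the batch times appear to be multiples of the interval (the directory names listed later in this notebook all end in 000). The following plain-Scala sketch, with the hypothetical helper `batchTimes` (not a Spark API), enumerates the batch times between a first and a last batch; applied to the first run in that listing, it gives 37 batches.

```scala
// Plain-Scala sketch (no Spark): batch times for a fixed batch interval.
// Assumption: batch times are consecutive multiples of the interval.
object BatchTimesSketch {
  /** Batch times in ms from firstMs to lastMs inclusive, one per interval. */
  def batchTimes(firstMs: Long, lastMs: Long, intervalMs: Long): Seq[Long] = {
    require(intervalMs > 0, "interval must be positive")
    require(lastMs >= firstMs, "last batch must not precede the first")
    firstMs to lastMs by intervalMs
  }
}

// First and last batch times of the first run in the directory listing
val firstRun = BatchTimesSketch.batchTimes(1558684285000L, 1558684321000L, 1000L)
println(s"${firstRun.size} batches")
```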

Recall that a Discretized Stream, or DStream, is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from a source or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs, which are Spark's abstraction of an immutable, distributed dataset (see the Spark Programming Guide for more details). Each RDD in a DStream contains data from a certain interval, as shown in the following figure.

[Figure: Spark Streaming]
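To make the idea of discretization concrete, here is a toy, single-machine model in plain Scala (no Spark): timestamped events are cut into consecutive, non-overlapping batches of a fixed interval, and each batch stands in for one RDD of the DStream. The names `DiscretizeSketch`, `Event` and `discretize` are hypothetical, and keying each batch by its start time is a simplifying assumption; Spark's exact assignment of received data to batches differs in detail.

```scala
// Toy model of a DStream: a stream of timestamped events is discretized
// into batches of `intervalMs`; each batch plays the role of one RDD.
object DiscretizeSketch {
  final case class Event(timeMs: Long, payload: String)

  /** Groups events into batches keyed by batch start time, in time order. */
  def discretize(events: Seq[Event], intervalMs: Long): Seq[(Long, Seq[String])] = {
    require(intervalMs > 0, "interval must be positive")
    events
      .groupBy(e => (e.timeMs / intervalMs) * intervalMs) // batch start time
      .toSeq
      .sortBy(_._1)
      .map { case (start, es) => (start, es.sortBy(_.timeMs).map(_.payload)) }
  }
}

val events = Seq(
  DiscretizeSketch.Event(100L, "a"),
  DiscretizeSketch.Event(900L, "b"),
  DiscretizeSketch.Event(1000L, "c"),
  DiscretizeSketch.Event(2500L, "d")
)
val batches = DiscretizeSketch.discretize(events, 1000L)
batches.foreach { case (start, ps) => println(s"batch@$start ms -> $ps") }
```

Events at 100 ms and 900 ms land in the same batch, while 1000 ms starts the next one, just as each RDD of a DStream holds only the data of its own interval.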

Next, let's import Google's JSON library, Gson.

import com.google.gson.Gson 
import com.google.gson.Gson

Our goal is to take each RDD in the twitter DStream and write it as a json file in our dbfs.

// Create a Spark Streaming Context.
val ssc = new StreamingContext(sc, slideInterval)
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@19807790

CAUTION

Extracting knowledge from tweets is "easy" using techniques shown here, but one has to take legal responsibility for the use of this knowledge and conform to the rules and policies linked below.

Remember that the use of Twitter itself comes with various strings attached. Read:

Crucially, the use of the content from Twitter by you (as done in this worksheet) comes with some strings. Read:

Enter your own Twitter API Credentials.

  • Go to https://apps.twitter.com and look up your Twitter API Credentials, or create an app to create them.
  • Get your own Twitter API Credentials: consumerKey, consumerSecret, accessToken and accessTokenSecret and enter them in the cell below.

Ethical/Legal Aspects

See Background Readings/Viewings in Project MEP:

%run "scalable-data-science/secrets/026_secret_MyTwitterOAuthCredentials"
twitter OAuth Credentials loaded
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
MyconsumerKey: String
MyconsumerSecret: String
Mytoken: String
MytokenSecret: String
// Put your own Twitter developer credentials below in place of ???,
// instead of using the '%run "scalable-data-science/secrets/026_secret_MyTwitterOAuthCredentials"' above.
// The notebook we just ran contains the following commented code block:

/*
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

def MyconsumerKey       = "???"
def MyconsumerSecret    = "???"
def Mytoken             = "???"
def MytokenSecret       = "???"
*/

System.setProperty("twitter4j.oauth.consumerKey", MyconsumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", MyconsumerSecret)
System.setProperty("twitter4j.oauth.accessToken", Mytoken)
System.setProperty("twitter4j.oauth.accessTokenSecret", MytokenSecret)

import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
MyconsumerKey: String
MyconsumerSecret: String
Mytoken: String
MytokenSecret: String
res10: String = null
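The `res10: String = null` above is the return value of the last `System.setProperty` call, which returns the property's previous value (`null` if it was unset). `new ConfigurationBuilder().build()` is expected to pick up these `twitter4j.oauth.*` system properties, so it can help to fail fast when one is missing or still a `???` placeholder. A minimal sketch of such a check; `CredentialsSketch` is a hypothetical helper, not part of twitter4j.

```scala
// Hypothetical fail-fast check (not a twitter4j API): lists the twitter4j
// OAuth system properties that are unset, blank or still the "???" placeholder.
object CredentialsSketch {
  val RequiredKeys: Seq[String] = Seq(
    "twitter4j.oauth.consumerKey",
    "twitter4j.oauth.consumerSecret",
    "twitter4j.oauth.accessToken",
    "twitter4j.oauth.accessTokenSecret"
  )

  /** Names of required properties that are not usable yet. */
  def missing(): Seq[String] =
    RequiredKeys.filter { key =>
      val value = System.getProperty(key)
      value == null || value.trim.isEmpty || value == "???"
    }
}
```

In the notebook, one could call `require(CredentialsSketch.missing().isEmpty, ...)` before creating the Twitter stream, so that a forgotten credential shows up as a clear error rather than as a failing receiver.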
// Create a Twitter Stream for the input source. 
val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
val twitterStream = ExtendedTwitterUtils.createStream(ssc, auth)

Let's map the tweets into JSON-formatted strings (one tweet per line).

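val twitterStreamJson = twitterStream.map { status =>
  // Gson is not serializable, so a Gson instance is created inside the closure, on the executors
  val gson = new Gson()
  gson.toJson(status) // one compact JSON string per tweet
}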
twitterStreamJson: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@4b8198a2
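Writing one tweet per line matters because `sqlContext.read.json` expects, by default, one complete JSON object per line. Gson's default `toJson` output is compact, with any newline inside a tweet's text escaped as `\n`, so each tweet stays on a single line of the saved text file. The sketch below illustrates that property with a hypothetical, deliberately minimal encoder for flat string fields; it is not Gson and handles far less than Gson does.

```scala
// Hypothetical minimal JSON encoder (NOT Gson), only to show why each
// record stays on one line: control characters inside values are escaped.
object JsonLinesSketch {
  /** Encodes a string as a JSON string literal. */
  def quote(s: String): String = {
    val sb = new StringBuilder("\"")
    s.foreach {
      case '"'  => sb.append("\\\"")
      case '\\' => sb.append("\\\\")
      case '\n' => sb.append("\\n")
      case '\r' => sb.append("\\r")
      case '\t' => sb.append("\\t")
      case c if c < ' ' => sb.append('\\').append('u').append(f"${c.toInt}%04x") // other control chars
      case c => sb.append(c)
    }
    sb.append('"').toString
  }

  /** Encodes a flat record of string fields as one compact JSON object. */
  def toJsonLine(fields: Seq[(String, String)]): String =
    fields.map { case (k, v) => s"${quote(k)}:${quote(v)}" }.mkString("{", ",", "}")
}

val example = JsonLinesSketch.toJsonLine(Seq("text" -> "first line\nsecond line", "lang" -> "en"))
println(example)
```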
outputDirectoryRoot
res23: String = /datasets/tweetsStreamTmp
var numTweetsCollected = 0L // track number of tweets collected
val partitionsEachInterval = 1 // number of partitions (and hence part-files) written for each RDD of tweets in the DStream

twitterStreamJson.foreachRDD(
  (rdd, time) => { // for each RDD in the DStream; this function runs on the driver
      val count = rdd.count()
      if (count > 0) { // skip empty batches, so no empty directories are written
        val outputRDD = rdd.repartition(partitionsEachInterval) // repartition as desired
        outputRDD.saveAsTextFile(outputDirectoryRoot + "/tweets_" + time.milliseconds.toString) // save as text file, one JSON string per line
        numTweetsCollected += count // update the driver-side counter with the latest count
      }
  }
)
numTweetsCollected: Long = 0
partitionsEachInterval: Int = 1
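The per-batch logic of the `foreachRDD` cell can be modelled on a single machine in plain Scala. In this hypothetical sketch, a batch is a `(batchTimeMs, records)` pair and an in-memory map stands in for DBFS; as in the cell, empty batches are skipped and each non-empty batch goes to its own `tweets_<batchTimeMs>` directory. In the notebook itself, `numTweetsCollected += count` works because the function passed to `foreachRDD` runs on the driver, so it updates a driver-side variable.

```scala
import scala.collection.mutable

// Simplified model of the foreachRDD logic (no Spark): an in-memory map
// stands in for DBFS, keyed by output directory.
object CollectorSketch {
  def outputDir(root: String, batchTimeMs: Long): String =
    root + "/tweets_" + batchTimeMs.toString

  /** Saves each non-empty batch under its own directory; returns the number of records saved. */
  def collect(root: String,
              batches: Seq[(Long, Seq[String])],
              fs: mutable.Map[String, Seq[String]]): Long = {
    var numCollected = 0L
    for ((timeMs, records) <- batches) {
      val count = records.size.toLong
      if (count > 0) { // empty batches produce no directory
        fs(outputDir(root, timeMs)) = records
        numCollected += count
      }
    }
    numCollected
  }
}

val fakeDbfs = mutable.Map.empty[String, Seq[String]]
val total = CollectorSketch.collect(
  "/datasets/tweetsStreamTmp",
  Seq(1000L -> Seq("{}", "{}"), 2000L -> Seq.empty, 3000L -> Seq("{}")),
  fakeDbfs
)
println(s"$total records in ${fakeDbfs.size} directories")
```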

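Nothing has actually happened yet: so far we have only defined what should be done with each batch of tweets, and no tweets are received until the streaming context is started.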
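As a loose analogy (this is not Spark code), Scala collection views are lazy in a similar way: mapping over a view only records the function, which runs when the view is forced. In the notebook, defining the stream, the `map` and the `foreachRDD` output operation only describes the work, and `ssc.start()` plays the role of forcing it. The object name `LazySketch` is hypothetical.

```scala
// Analogy only: defining a transformation on a view runs nothing
// until the view is forced with toList.
object LazySketch {
  /** Returns (calls before forcing, calls after forcing, forced result). */
  def run(): (Int, Int, List[Int]) = {
    var calls = 0
    val pipeline = (1 to 3).view.map { x => calls += 1; x * 10 } // nothing runs yet
    val before = calls
    val result = pipeline.toList // forcing the view runs the map
    (before, calls, result)
  }
}

val (callsBefore, callsAfter, out) = LazySketch.run()
println(s"before: $callsBefore, after: $callsAfter, result: $out")
```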

Next, let's start the Spark streaming context we created.

ssc.start()

Let's look at the Spark UI now and monitor the streaming job in action! Go to Clusters on the left, click on UI and then Streaming.

numTweetsCollected // number of tweets collected so far
res27: Long = 4027

Let's check again in a few seconds how many tweets have been collected so far.

numTweetsCollected // number of tweets collected so far
res28: Long = 4155

Note that you could easily fill up disk space!!!
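To get a feel for how fast disk space can fill up, here is some back-of-the-envelope arithmetic in plain Scala. The part-file inspected later in this notebook holds 28 tweets in 129269 bytes, i.e. about 4.6 KB per tweet; the rate of 50 tweets per second is a hypothetical assumption, not a measurement from this notebook.

```scala
// Back-of-the-envelope disk usage estimate (plain Scala).
object DiskEstimateSketch {
  /** Average bytes per tweet, from one saved part-file (integer division). */
  def bytesPerTweet(fileBytes: Long, numTweets: Long): Long = fileBytes / numTweets

  /** Bytes written at a given tweet rate over a given duration. */
  def projectedBytes(tweetsPerSecond: Long, seconds: Long, bytesPerTweet: Long): Long =
    tweetsPerSecond * seconds * bytesPerTweet
}

val perTweet = DiskEstimateSketch.bytesPerTweet(129269L, 28L)           // size and count of one part-file
val perHour  = DiskEstimateSketch.projectedBytes(50L, 3600L, perTweet)  // 50 tweets/s is hypothetical
println(s"~$perTweet bytes per tweet, ~${perHour / 1000000L} MB per hour")
```

At that assumed rate, one hour of collection would write roughly 0.83 GB, which is why the stream should be stopped soon.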

So let's stop the streaming job next.

ssc.stop(stopSparkContext = false) // stop soon, before the disk fills up!!!

Let's make sure that the Streaming UI is not active in the Clusters UI.

StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) } // extra cautious stopping of all active streaming contexts

Let's examine what was saved in DBFS.

display(dbutils.fs.ls(outputDirectoryRoot))
path | name | size
dbfs:/datasets/tweetsStreamTmp/tweets_1558684285000/ | tweets_1558684285000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684286000/ | tweets_1558684286000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684287000/ | tweets_1558684287000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684288000/ | tweets_1558684288000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684289000/ | tweets_1558684289000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684290000/ | tweets_1558684290000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684291000/ | tweets_1558684291000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684292000/ | tweets_1558684292000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684293000/ | tweets_1558684293000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684294000/ | tweets_1558684294000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684295000/ | tweets_1558684295000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684296000/ | tweets_1558684296000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684297000/ | tweets_1558684297000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684298000/ | tweets_1558684298000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684299000/ | tweets_1558684299000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684300000/ | tweets_1558684300000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684301000/ | tweets_1558684301000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684302000/ | tweets_1558684302000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684303000/ | tweets_1558684303000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684304000/ | tweets_1558684304000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684305000/ | tweets_1558684305000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684306000/ | tweets_1558684306000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684307000/ | tweets_1558684307000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684308000/ | tweets_1558684308000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684309000/ | tweets_1558684309000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684310000/ | tweets_1558684310000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684311000/ | tweets_1558684311000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684312000/ | tweets_1558684312000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684313000/ | tweets_1558684313000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684314000/ | tweets_1558684314000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684315000/ | tweets_1558684315000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684316000/ | tweets_1558684316000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684317000/ | tweets_1558684317000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684318000/ | tweets_1558684318000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684319000/ | tweets_1558684319000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684320000/ | tweets_1558684320000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684321000/ | tweets_1558684321000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684403000/ | tweets_1558684403000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684404000/ | tweets_1558684404000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684405000/ | tweets_1558684405000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684406000/ | tweets_1558684406000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684407000/ | tweets_1558684407000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684408000/ | tweets_1558684408000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684409000/ | tweets_1558684409000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684410000/ | tweets_1558684410000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684411000/ | tweets_1558684411000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684412000/ | tweets_1558684412000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684413000/ | tweets_1558684413000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684414000/ | tweets_1558684414000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684415000/ | tweets_1558684415000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684416000/ | tweets_1558684416000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684417000/ | tweets_1558684417000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684418000/ | tweets_1558684418000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684419000/ | tweets_1558684419000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684420000/ | tweets_1558684420000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684421000/ | tweets_1558684421000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684422000/ | tweets_1558684422000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684423000/ | tweets_1558684423000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684424000/ | tweets_1558684424000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684425000/ | tweets_1558684425000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684426000/ | tweets_1558684426000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684427000/ | tweets_1558684427000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684428000/ | tweets_1558684428000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684429000/ | tweets_1558684429000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684430000/ | tweets_1558684430000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684431000/ | tweets_1558684431000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684432000/ | tweets_1558684432000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684433000/ | tweets_1558684433000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684434000/ | tweets_1558684434000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684435000/ | tweets_1558684435000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684436000/ | tweets_1558684436000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684437000/ | tweets_1558684437000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684438000/ | tweets_1558684438000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684439000/ | tweets_1558684439000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684440000/ | tweets_1558684440000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684441000/ | tweets_1558684441000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684442000/ | tweets_1558684442000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684443000/ | tweets_1558684443000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684444000/ | tweets_1558684444000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684445000/ | tweets_1558684445000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684446000/ | tweets_1558684446000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684447000/ | tweets_1558684447000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684448000/ | tweets_1558684448000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684449000/ | tweets_1558684449000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684450000/ | tweets_1558684450000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684451000/ | tweets_1558684451000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684452000/ | tweets_1558684452000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684453000/ | tweets_1558684453000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684454000/ | tweets_1558684454000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684455000/ | tweets_1558684455000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684457000/ | tweets_1558684457000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684458000/ | tweets_1558684458000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684459000/ | tweets_1558684459000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684460000/ | tweets_1558684460000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684461000/ | tweets_1558684461000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684462000/ | tweets_1558684462000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684463000/ | tweets_1558684463000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684464000/ | tweets_1558684464000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684465000/ | tweets_1558684465000/ | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684466000/ | tweets_1558684466000/ | 0
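The listing has one directory per saved batch. It also shows a jump from tweets_1558684321000 to tweets_1558684403000, which suggests two separate collection runs, and tweets_1558684456000 is absent, presumably because that batch was empty and the collector skips empty batches. A small plain-Scala sketch (the helpers `batchTimes` and `gaps` are hypothetical) that parses directory names into batch times and reports gaps longer than one batch interval:

```scala
// Parses "tweets_<ms>/" directory names and finds gaps between consecutive batches.
object BatchGapSketch {
  private val Pattern = """tweets_(\d+)/?""".r

  /** Batch times (ms), sorted, extracted from directory names like "tweets_1558684285000/". */
  def batchTimes(names: Seq[String]): Seq[Long] =
    names.collect { case Pattern(ms) => ms.toLong }.sorted

  /** (previous, next) pairs of consecutive batch times that are more than one interval apart. */
  def gaps(times: Seq[Long], intervalMs: Long): Seq[(Long, Long)] =
    times.zip(times.drop(1)).filter { case (a, b) => b - a > intervalMs }
}

// A few directory names taken from the listing
val names = Seq("tweets_1558684321000/", "tweets_1558684320000/", "tweets_1558684403000/", "tweets_1558684404000/")
val found = BatchGapSketch.gaps(BatchGapSketch.batchTimes(names), 1000L)
println(found)
```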
val tweetsDir = outputDirectoryRoot+"/tweets_1558684290000/" // use an existing directory; you may have to change the folder name based on the output above!
tweetsDir: String = /datasets/tweetsStreamTmp/tweets_1558684290000/
display(dbutils.fs.ls(tweetsDir)) 
path | name | size
dbfs:/datasets/tweetsStreamTmp/tweets_1558684290000/_SUCCESS | _SUCCESS | 0
dbfs:/datasets/tweetsStreamTmp/tweets_1558684290000/part-00000 | part-00000 | 129269
sc.textFile(tweetsDir+"part-00000").count()
res33: Long = 28
val outJson = sqlContext.read.json(tweetsDir+"part-00000")
outJson: org.apache.spark.sql.DataFrame = [contributorsIDs: array<string>, createdAt: string ... 26 more fields]
outJson.printSchema()
root
 |-- contributorsIDs: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- createdAt: string (nullable = true)
 |-- currentUserRetweetId: long (nullable = true)
 |-- displayTextRangeEnd: long (nullable = true)
 |-- displayTextRangeStart: long (nullable = true)
 |-- favoriteCount: long (nullable = true)
 |-- hashtagEntities: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- start: long (nullable = true)
 |    |    |-- text: string (nullable = true)
 |-- id: long (nullable = true)
 |-- inReplyToScreenName: string (nullable = true)
 |-- inReplyToStatusId: long (nullable = true)
 |-- inReplyToUserId: long (nullable = true)
 |-- isFavorited: boolean (nullable = true)
 |-- isPossiblySensitive: boolean (nullable = true)
 |-- isRetweeted: boolean (nullable = true)
 |-- isTruncated: boolean (nullable = true)
 |-- lang: string (nullable = true)
 |-- mediaEntities: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- displayURL: string (nullable = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- expandedURL: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- mediaURL: string (nullable = true)
 |    |    |-- mediaURLHttps: string (nullable = true)
 |    |    |-- sizes: struct (nullable = true)
 |    |    |    |-- 0: struct (nullable = true)
 |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |-- 1: struct (nullable = true)
 |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |-- 2: struct (nullable = true)
 |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |-- 3: struct (nullable = true)
 |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |-- width: long (nullable = true)
 |    |    |-- start: long (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- videoAspectRatioHeight: long (nullable = true)
 |    |    |-- videoAspectRatioWidth: long (nullable = true)
 |    |    |-- videoDurationMillis: long (nullable = true)
 |    |    |-- videoVariants: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- bitrate: long (nullable = true)
 |    |    |    |    |-- contentType: string (nullable = true)
 |    |    |    |    |-- url: string (nullable = true)
 |-- quotedStatus: struct (nullable = true)
 |    |-- contributorsIDs: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- createdAt: string (nullable = true)
 |    |-- currentUserRetweetId: long (nullable = true)
 |    |-- displayTextRangeEnd: long (nullable = true)
 |    |-- displayTextRangeStart: long (nullable = true)
 |    |-- favoriteCount: long (nullable = true)
 |    |-- hashtagEntities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- id: long (nullable = true)
 |    |-- inReplyToScreenName: string (nullable = true)
 |    |-- inReplyToStatusId: long (nullable = true)
 |    |-- inReplyToUserId: long (nullable = true)
 |    |-- isFavorited: boolean (nullable = true)
 |    |-- isPossiblySensitive: boolean (nullable = true)
 |    |-- isRetweeted: boolean (nullable = true)
 |    |-- isTruncated: boolean (nullable = true)
 |    |-- lang: string (nullable = true)
 |    |-- mediaEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- displayURL: string (nullable = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- expandedURL: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- mediaURL: string (nullable = true)
 |    |    |    |-- mediaURLHttps: string (nullable = true)
 |    |    |    |-- sizes: struct (nullable = true)
 |    |    |    |    |-- 0: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- 1: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- 2: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- 3: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |    |-- url: string (nullable = true)
 |    |    |    |-- videoAspectRatioHeight: long (nullable = true)
 |    |    |    |-- videoAspectRatioWidth: long (nullable = true)
 |    |    |    |-- videoDurationMillis: long (nullable = true)
 |    |    |    |-- videoVariants: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |-- place: struct (nullable = true)
 |    |    |-- boundingBoxCoordinates: array (nullable = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- latitude: double (nullable = true)
 |    |    |    |    |    |-- longitude: double (nullable = true)
 |    |    |-- boundingBoxType: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- countryCode: string (nullable = true)
 |    |    |-- fullName: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- placeType: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |-- quotedStatusId: long (nullable = true)
 |    |-- retweetCount: long (nullable = true)
 |    |-- source: string (nullable = true)
 |    |-- symbolEntities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- text: string (nullable = true)
 |    |-- urlEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- displayURL: string (nullable = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- expandedURL: string (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |    |    |    |-- url: string (nullable = true)
 |    |-- user: struct (nullable = true)
 |    |    |-- createdAt: string (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- descriptionURLEntities: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- favouritesCount: long (nullable = true)
 |    |    |-- followersCount: long (nullable = true)
 |    |    |-- friendsCount: long (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- isContributorsEnabled: boolean (nullable = true)
 |    |    |-- isDefaultProfile: boolean (nullable = true)
 |    |    |-- isDefaultProfileImage: boolean (nullable = true)
 |    |    |-- isFollowRequestSent: boolean (nullable = true)
 |    |    |-- isGeoEnabled: boolean (nullable = true)
 |    |    |-- isProtected: boolean (nullable = true)
 |    |    |-- isVerified: boolean (nullable = true)
 |    |    |-- listedCount: long (nullable = true)
 |    |    |-- location: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- profileBackgroundColor: string (nullable = true)
 |    |    |-- profileBackgroundImageUrl: string (nullable = true)
 |    |    |-- profileBackgroundImageUrlHttps: string (nullable = true)
 |    |    |-- profileBackgroundTiled: boolean (nullable = true)
 |    |    |-- profileBannerImageUrl: string (nullable = true)
 |    |    |-- profileImageUrl: string (nullable = true)
 |    |    |-- profileImageUrlHttps: string (nullable = true)
 |    |    |-- profileLinkColor: string (nullable = true)
 |    |    |-- profileSidebarBorderColor: string (nullable = true)
 |    |    |-- profileSidebarFillColor: string (nullable = true)
 |    |    |-- profileTextColor: string (nullable = true)
 |    |    |-- profileUseBackgroundImage: boolean (nullable = true)
 |    |    |-- screenName: string (nullable = true)
 |    |    |-- showAllInlineMedia: boolean (nullable = true)
 |    |    |-- statusesCount: long (nullable = true)
 |    |    |-- translator: boolean (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- utcOffset: long (nullable = true)
 |    |-- userMentionEntities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- quotedStatusId: long (nullable = true)
 |-- quotedStatusPermalink: struct (nullable = true)
 |    |-- displayURL: string (nullable = true)
 |    |-- end: long (nullable = true)
 |    |-- expandedURL: string (nullable = true)
 |    |-- start: long (nullable = true)
 |    |-- url: string (nullable = true)
 |-- retweetCount: long (nullable = true)
 |-- retweetedStatus: struct (nullable = true)
 |    |-- contributorsIDs: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- createdAt: string (nullable = true)
 |    |-- currentUserRetweetId: long (nullable = true)
 |    |-- displayTextRangeEnd: long (nullable = true)
 |    |-- displayTextRangeStart: long (nullable = true)
 |    |-- favoriteCount: long (nullable = true)
 |    |-- hashtagEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- inReplyToStatusId: long (nullable = true)
 |    |-- inReplyToUserId: long (nullable = true)
 |    |-- isFavorited: boolean (nullable = true)
 |    |-- isPossiblySensitive: boolean (nullable = true)
 |    |-- isRetweeted: boolean (nullable = true)
 |    |-- isTruncated: boolean (nullable = true)
 |    |-- lang: string (nullable = true)
 |    |-- mediaEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- displayURL: string (nullable = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- expandedURL: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- mediaURL: string (nullable = true)
 |    |    |    |-- mediaURLHttps: string (nullable = true)
 |    |    |    |-- sizes: struct (nullable = true)
 |    |    |    |    |-- 0: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- 1: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- 2: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- 3: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |    |-- url: string (nullable = true)
 |    |    |    |-- videoAspectRatioHeight: long (nullable = true)
 |    |    |    |-- videoAspectRatioWidth: long (nullable = true)
 |    |    |    |-- videoDurationMillis: long (nullable = true)
 |    |    |    |-- videoVariants: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- bitrate: long (nullable = true)
 |    |    |    |    |    |-- contentType: string (nullable = true)
 |    |    |    |    |    |-- url: string (nullable = true)
 |    |-- quotedStatus: struct (nullable = true)
 |    |    |-- contributorsIDs: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- createdAt: string (nullable = true)
 |    |    |-- currentUserRetweetId: long (nullable = true)
 |    |    |-- displayTextRangeEnd: long (nullable = true)
 |    |    |-- displayTextRangeStart: long (nullable = true)
 |    |    |-- favoriteCount: long (nullable = true)
 |    |    |-- hashtagEntities: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- inReplyToStatusId: long (nullable = true)
 |    |    |-- inReplyToUserId: long (nullable = true)
 |    |    |-- isFavorited: boolean (nullable = true)
 |    |    |-- isPossiblySensitive: boolean (nullable = true)
 |    |    |-- isRetweeted: boolean (nullable = true)
 |    |    |-- isTruncated: boolean (nullable = true)
 |    |    |-- lang: string (nullable = true)
 |    |    |-- mediaEntities: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- displayURL: string (nullable = true)
 |    |    |    |    |-- end: long (nullable = true)
 |    |    |    |    |-- expandedURL: string (nullable = true)
 |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |-- mediaURL: string (nullable = true)
 |    |    |    |    |-- mediaURLHttps: string (nullable = true)
 |    |    |    |    |-- sizes: struct (nullable = true)
 |    |    |    |    |    |-- 0: struct (nullable = true)
 |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |    |-- 1: struct (nullable = true)
 |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |    |-- 2: struct (nullable = true)
 |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |    |-- 3: struct (nullable = true)
 |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- start: long (nullable = true)
 |    |    |    |    |-- type: string (nullable = true)
 |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |-- videoAspectRatioHeight: long (nullable = true)
 |    |    |    |    |-- videoAspectRatioWidth: long (nullable = true)
 |    |    |    |    |-- videoDurationMillis: long (nullable = true)
 |    |    |    |    |-- videoVariants: array (nullable = true)
 |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |-- place: struct (nullable = true)
 |    |    |    |-- boundingBoxCoordinates: array (nullable = true)
 |    |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- latitude: double (nullable = true)
 |    |    |    |    |    |    |-- longitude: double (nullable = true)
 |    |    |    |-- boundingBoxType: string (nullable = true)
 |    |    |    |-- country: string (nullable = true)
 |    |    |    |-- countryCode: string (nullable = true)
 |    |    |    |-- fullName: string (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- placeType: string (nullable = true)
 |    |    |    |-- url: string (nullable = true)
 |    |    |-- quotedStatusId: long (nullable = true)
 |    |    |-- retweetCount: long (nullable = true)
 |    |    |-- source: string (nullable = true)
 |    |    |-- symbolEntities: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- text: string (nullable = true)
 |    |    |-- urlEntities: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- displayURL: string (nullable = true)
 |    |    |    |    |-- end: long (nullable = true)
 |    |    |    |    |-- expandedURL: string (nullable = true)
 |    |    |    |    |-- start: long (nullable = true)
 |    |    |    |    |-- url: string (nullable = true)
 |    |    |-- user: struct (nullable = true)
 |    |    |    |-- createdAt: string (nullable = true)
 |    |    |    |-- description: string (nullable = true)
 |    |    |    |-- descriptionURLEntities: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- favouritesCount: long (nullable = true)
 |    |    |    |-- followersCount: long (nullable = true)
 |    |    |    |-- friendsCount: long (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- isContributorsEnabled: boolean (nullable = true)
 |    |    |    |-- isDefaultProfile: boolean (nullable = true)
 |    |    |    |-- isDefaultProfileImage: boolean (nullable = true)
 |    |    |    |-- isFollowRequestSent: boolean (nullable = true)
 |    |    |    |-- isGeoEnabled: boolean (nullable = true)
 |    |    |    |-- isProtected: boolean (nullable = true)
 |    |    |    |-- isVerified: boolean (nullable = true)
 |    |    |    |-- listedCount: long (nullable = true)
 |    |    |    |-- location: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- profileBackgroundColor: string (nullable = true)
 |    |    |    |-- profileBackgroundImageUrl: string (nullable = true)
 |    |    |    |-- profileBackgroundImageUrlHttps: string (nullable = true)
 |    |    |    |-- profileBackgroundTiled: boolean (nullable = true)
 |    |    |    |-- profileBannerImageUrl: string (nullable = true)
 |    |    |    |-- profileImageUrl: string (nullable = true)
 |    |    |    |-- profileImageUrlHttps: string (nullable = true)
 |    |    |    |-- profileLinkColor: string (nullable = true)
 |    |    |    |-- profileSidebarBorderColor: string (nullable = true)
 |    |    |    |-- profileSidebarFillColor: string (nullable = true)
 |    |    |    |-- profileTextColor: string (nullable = true)
 |    |    |    |-- profileUseBackgroundImage: boolean (nullable = true)
 |    |    |    |-- screenName: string (nullable = true)
 |    |    |    |-- showAllInlineMedia: boolean (nullable = true)
 |    |    |    |-- statusesCount: long (nullable = true)
 |    |    |    |-- translator: boolean (nullable = true)
 |    |    |    |-- url: string (nullable = true)
 |    |    |    |-- utcOffset: long (nullable = true)
 |    |    |-- userMentionEntities: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |-- quotedStatusId: long (nullable = true)
 |    |-- quotedStatusPermalink: struct (nullable = true)
 |    |    |-- displayURL: string (nullable = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- expandedURL: string (nullable = true)
 |    |    |-- start: long (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |-- retweetCount: long (nullable = true)
 |    |-- source: string (nullable = true)
 |    |-- symbolEntities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- text: string (nullable = true)
 |    |-- urlEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- displayURL: string (nullable = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- expandedURL: string (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |    |    |    |-- url: string (nullable = true)
 |    |-- user: struct (nullable = true)
 |    |    |-- createdAt: string (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- descriptionURLEntities: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- favouritesCount: long (nullable = true)
 |    |    |-- followersCount: long (nullable = true)
 |    |    |-- friendsCount: long (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- isContributorsEnabled: boolean (nullable = true)
 |    |    |-- isDefaultProfile: boolean (nullable = true)
 |    |    |-- isDefaultProfileImage: boolean (nullable = true)
 |    |    |-- isFollowRequestSent: boolean (nullable = true)
 |    |    |-- isGeoEnabled: boolean (nullable = true)
 |    |    |-- isProtected: boolean (nullable = true)
 |    |    |-- isVerified: boolean (nullable = true)
 |    |    |-- listedCount: long (nullable = true)
 |    |    |-- location: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- profileBackgroundColor: string (nullable = true)
 |    |    |-- profileBackgroundImageUrl: string (nullable = true)
 |    |    |-- profileBackgroundImageUrlHttps: string (nullable = true)
 |    |    |-- profileBackgroundTiled: boolean (nullable = true)
 |    |    |-- profileBannerImageUrl: string (nullable = true)
 |    |    |-- profileImageUrl: string (nullable = true)
 |    |    |-- profileImageUrlHttps: string (nullable = true)
 |    |    |-- profileLinkColor: string (nullable = true)
 |    |    |-- profileSidebarBorderColor: string (nullable = true)
 |    |    |-- profileSidebarFillColor: string (nullable = true)
 |    |    |-- profileTextColor: string (nullable = true)
 |    |    |-- profileUseBackgroundImage: boolean (nullable = true)
 |    |    |-- screenName: string (nullable = true)
 |    |    |-- showAllInlineMedia: boolean (nullable = true)
 |    |    |-- statusesCount: long (nullable = true)
 |    |    |-- translator: boolean (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- utcOffset: long (nullable = true)
 |    |-- userMentionEntities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- source: string (nullable = true)
 |-- symbolEntities: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- text: string (nullable = true)
 |-- urlEntities: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- displayURL: string (nullable = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- expandedURL: string (nullable = true)
 |    |    |-- start: long (nullable = true)
 |    |    |-- url: string (nullable = true)
 |-- user: struct (nullable = true)
 |    |-- createdAt: string (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- descriptionURLEntities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- favouritesCount: long (nullable = true)
 |    |-- followersCount: long (nullable = true)
 |    |-- friendsCount: long (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- isContributorsEnabled: boolean (nullable = true)
 |    |-- isDefaultProfile: boolean (nullable = true)
 |    |-- isDefaultProfileImage: boolean (nullable = true)
 |    |-- isFollowRequestSent: boolean (nullable = true)
 |    |-- isGeoEnabled: boolean (nullable = true)
 |    |-- isProtected: boolean (nullable = true)
 |    |-- isVerified: boolean (nullable = true)
 |    |-- listedCount: long (nullable = true)
 |    |-- location: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- profileBackgroundColor: string (nullable = true)
 |    |-- profileBackgroundImageUrl: string (nullable = true)
 |    |-- profileBackgroundImageUrlHttps: string (nullable = true)
 |    |-- profileBackgroundTiled: boolean (nullable = true)
 |    |-- profileBannerImageUrl: string (nullable = true)
 |    |-- profileImageUrl: string (nullable = true)
 |    |-- profileImageUrlHttps: string (nullable = true)
 |    |-- profileLinkColor: string (nullable = true)
 |    |-- profileSidebarBorderColor: string (nullable = true)
 |    |-- profileSidebarFillColor: string (nullable = true)
 |    |-- profileTextColor: string (nullable = true)
 |    |-- profileUseBackgroundImage: boolean (nullable = true)
 |    |-- screenName: string (nullable = true)
 |    |-- showAllInlineMedia: boolean (nullable = true)
 |    |-- statusesCount: long (nullable = true)
 |    |-- translator: boolean (nullable = true)
 |    |-- url: string (nullable = true)
 |    |-- utcOffset: long (nullable = true)
 |-- userMentionEntities: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- screenName: string (nullable = true)
 |    |    |-- start: long (nullable = true)
outJson.select("id","text").show(false) // output not displayed to comply with Twitter Developer rules
display(outJson)  // output not displayed to comply with Twitter Developer rules

Now, let's be good at housekeeping and clean up the unnecessary data in DBFS, our distributed file system in Databricks.

// to remove a pre-existing directory and start from scratch, uncomment the next line and evaluate this cell
//dbutils.fs.rm(outputDirectoryRoot, true) 
res19: Boolean = true

Clearly there is a lot one can do with tweets!

Especially once you have a few more primitives under your belt from the following areas:

  • Natural Language Processing (MLlib, beyond word counts of course),
  • Distributed vertex programming (GraphFrames, which you already know), and
  • Scalable geospatial computing with location data on OpenStreetMap (roughly a third of tweets are geo-enabled with the latitude and longitude of the tweet location) - we will get into this.

Making a function for the Spark Streaming job

Let's try to throw the bits and bobs of code above into a function called streamFunc for simplicity and modularity.

import com.google.gson.Gson 
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val outputDirectoryRoot = "/datasets/tweetsStreamTmp" // output directory
val batchInterval = 1 // in minutes
val timeoutJobLength =  batchInterval * 5

var newContextCreated = false
var numTweetsCollected = 0L // track number of tweets collected
//val conf = new SparkConf().setAppName("TrackedTweetCollector").setMaster("local")
// This is the function that creates the StreamingContext and sets up the Spark Streaming job.
def streamFunc(): StreamingContext = {
  // Create a Spark Streaming Context.
  val ssc = new StreamingContext(sc, Minutes(batchInterval))
  // Create the OAuth Twitter credentials 
  val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
  // Create a Twitter Stream for the input source.  
  val twitterStream = ExtendedTwitterUtils.createStream(ssc, auth)
  // Serialize each tweet (a twitter4j Status) into a JSON string
  val twitterStreamJson = twitterStream.map(status => new Gson().toJson(status))
  // Number of partitions in each RDD of tweets written per batch; 1 gives a single parquet part file per batch directory.
  val partitionsEachInterval = 1
  
  // what we want done with each discrete RDD tuple: (rdd, time)
  twitterStreamJson.foreachRDD((rdd, time) => { // for each RDD (one per batch interval) in the DStream
      val count = rdd.count()
      if (count > 0) {
        val outputRDD = rdd.repartition(partitionsEachInterval) // repartition as desired
        // to write to parquet directly in append mode in one directory per 'time'------------       
        val outputDF = outputRDD.toDF("tweetAsJsonString")
        // get the year, month, day and hour fields from one snapshot of the current time,
        // rather than four separate calls that could straddle an hour or day boundary
        val now = new java.util.Date()
        val year = (new java.text.SimpleDateFormat("yyyy")).format(now)
        val month = (new java.text.SimpleDateFormat("MM")).format(now)
        val day = (new java.text.SimpleDateFormat("dd")).format(now)
        val hour = (new java.text.SimpleDateFormat("HH")).format(now)
        // write to a file with a clear time-based hierarchical directory structure for example
        outputDF.write.mode(SaveMode.Append)
                .parquet(outputDirectoryRoot+ "/"+ year + "/" + month + "/" + day + "/" + hour + "/" + time.milliseconds) 
        // end of writing as parquet file-------------------------------------
        numTweetsCollected += count // update with the latest count
      }
  })
  newContextCreated = true
  ssc
}
import com.google.gson.Gson
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
outputDirectoryRoot: String = /datasets/tweetsStreamTmp
batchInterval: Int = 1
timeoutJobLength: Int = 5
newContextCreated: Boolean = false
numTweetsCollected: Long = 0
streamFunc: ()org.apache.spark.streaming.StreamingContext
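The write step in streamFunc places each non-empty batch under outputDirectoryRoot/yyyy/MM/dd/HH/<batch time in milliseconds>. As a minimal sketch, that path-building step can be pulled out into a pure function. Two assumptions differ from the notebook code: the date fields are derived from the batch time (`time.milliseconds`) rather than the driver's current wall-clock time, and they are formatted in UTC rather than the cluster's default time zone. The object name `TweetPaths` is hypothetical.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

object TweetPaths {
  // Formats an instant as year/month/day/hour directories, e.g. 2019/05/24/08.
  // DateTimeFormatter is immutable and thread-safe, unlike java.text.SimpleDateFormat.
  // Assumption: UTC is used instead of the cluster's default time zone.
  private val hourlyDirs: DateTimeFormatter =
    DateTimeFormatter.ofPattern("yyyy/MM/dd/HH").withZone(ZoneOffset.UTC)

  // Assumption: the batch time in epoch milliseconds (what `time.milliseconds` gives
  // inside foreachRDD) drives both the date directories and the leaf directory name.
  def batchDir(root: String, batchTimeMillis: Long): String = {
    val datePart = hourlyDirs.format(Instant.ofEpochMilli(batchTimeMillis))
    s"$root/$datePart/$batchTimeMillis"
  }
}
```

Inside foreachRDD the write would then become `outputDF.write.mode(SaveMode.Append).parquet(TweetPaths.batchDir(outputDirectoryRoot, time.milliseconds))`. For the batch time 1558687080000 (2019-05-24 08:38 UTC) that appears in the directory listing below, this yields /datasets/tweetsStreamTmp/2019/05/24/08/1558687080000, so every batch lands in the hour it belongs to regardless of when the write actually happens.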
// Now just use the function to create a Spark Streaming Context
val ssc = StreamingContext.getActiveOrCreate(streamFunc)
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@53f2da21
// you only need one of these to start
ssc.start()
//ssc.awaitTerminationOrTimeout(timeoutJobLength * 60 * 1000L) // the timeout is in milliseconds; timeoutJobLength is in minutes
// this will make sure all streaming jobs in the cluster are stopped
// but let's run it for a few minutes before stopping it
StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) } 
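`StreamingContext.awaitTerminationOrTimeout` takes its timeout in milliseconds, whereas `timeoutJobLength` is measured in minutes (`batchInterval * 5`), so passing it directly would wait only 5 ms. A small sketch of the unit conversion using the standard `scala.concurrent.duration` DSL; the helper name `StreamTimeout.timeoutMillis` is hypothetical.

```scala
import scala.concurrent.duration._

object StreamTimeout {
  // Total run time for `numBatches` batches of `batchIntervalMinutes` minutes each,
  // converted to the milliseconds expected by awaitTerminationOrTimeout.
  def timeoutMillis(batchIntervalMinutes: Int, numBatches: Int): Long =
    (batchIntervalMinutes * numBatches).minutes.toMillis
}
```

With the notebook's values, `ssc.awaitTerminationOrTimeout(StreamTimeout.timeoutMillis(batchInterval, 5))` blocks the cell for up to 5 minutes (300000 ms) before the `stop` call runs, which lets the collector gather about five batches without manual intervention.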
display(dbutils.fs.ls("/datasets/tweetsStreamTmp/2019/05/24/08/1558687080000/")) // outputDirectoryRoot
path | name | size
dbfs:/datasets/tweetsStreamTmp/2019/05/24/08/1558687080000/_SUCCESS | _SUCCESS | 0
dbfs:/datasets/tweetsStreamTmp/2019/05/24/08/1558687080000/_committed_6722461357882571064 | _committed_6722461357882571064 | 123
dbfs:/datasets/tweetsStreamTmp/2019/05/24/08/1558687080000/_started_6722461357882571064 | _started_6722461357882571064 | 0
dbfs:/datasets/tweetsStreamTmp/2019/05/24/08/1558687080000/part-00000-tid-6722461357882571064-9416bc35-ec6f-4306-9dff-efd97af25f17-1861-c000.snappy.parquet | part-00000-tid-6722461357882571064-9416bc35-ec6f-4306-9dff-efd97af25f17-1861-c000.snappy.parquet | 2030013

Next, let us take a quick peek at the notebook scalable-data-science/sds-2-x/025_b_TTTDFfunctions to see how we have pipelined the JSON tweets into DataFrames.

See http://lamastex.org/lmse/mep/src/TweetAnatomyAndTransmissionTree.html for a deeper explanation of tweet anatomy and transmission trees.

%run "scalable-data-science/sds-2-x/025_b_TTTDFfunctions"
USAGE: val df = tweetsDF2TTTDF(tweetsJsonStringDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
       val df = tweetsDF2TTTDF(tweetsIDLong_JsonStringPairDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.ColumnName
import org.apache.spark.sql.DataFrame
fromParquetFile2DF: (InputDFAsParquetFilePatternString: String)org.apache.spark.sql.DataFrame
tweetsJsonStringDF2TweetsDF: (tweetsAsJsonStringInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsIDLong_JsonStringPairDF2TweetsDF: (tweetsAsIDLong_JsonStringInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDF: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDFWithURLsAndHastags: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
val rawDF = fromParquetFile2DF("/datasets/tweetsStreamTmp/2019/05/*/*/*/*") //.cache()
val TTTsDF = tweetsDF2TTTDF(tweetsJsonStringDF2TweetsDF(rawDF)).cache()
rawDF: org.apache.spark.sql.DataFrame = [tweetAsJsonString: string]
TTTsDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CurrentTweetDate: timestamp, CurrentTwID: bigint ... 32 more fields]
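The pattern passed to fromParquetFile2DF, "/datasets/tweetsStreamTmp/2019/05/*/*/*/*", has one `*` per level below the month: day, hour, batch-time directory, and the files inside it. Spark resolves such patterns with Hadoop's glob syntax; as a rough stand-in that runs without a cluster, the sketch below uses Java NIO's glob `PathMatcher`, where `*` likewise matches within a single path component only. The object name `TweetGlob` is hypothetical, and the check assumes Unix-style paths.

```scala
import java.nio.file.{FileSystems, PathMatcher, Paths}

object TweetGlob {
  // One "*" each for day, hour, batch-time directory and file name.
  val pattern = "/datasets/tweetsStreamTmp/2019/05/*/*/*/*"

  private val matcher: PathMatcher =
    FileSystems.getDefault.getPathMatcher(s"glob:$pattern")

  // "*" never crosses a "/", so only paths exactly four levels below 05/ match.
  def matches(path: String): Boolean = matcher.matches(Paths.get(path))
}
```

The bookkeeping files in each batch directory (_SUCCESS, _committed_<id>, _started_<id>) also match the glob; Spark's file listing normally skips names that start with `_` or `.`, so only the part files are read into rawDF.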
TTTsDF.count()
res46: Long = 5762
display(TTTsDF)  // output not displayed to comply with Twitter Developer rules
TTTsDF.printSchema
root
 |-- CurrentTweetDate: timestamp (nullable = true)
 |-- CurrentTwID: long (nullable = true)
 |-- CreationDateOfOrgTwInRT: timestamp (nullable = true)
 |-- OriginalTwIDinRT: long (nullable = true)
 |-- CreationDateOfOrgTwInQT: timestamp (nullable = true)
 |-- OriginalTwIDinQT: long (nullable = true)
 |-- OriginalTwIDinReply: long (nullable = true)
 |-- CPostUserId: long (nullable = true)
 |-- userCreatedAtDate: timestamp (nullable = true)
 |-- OPostUserIdinRT: long (nullable = true)
 |-- OPostUserIdinQT: long (nullable = true)
 |-- OPostUserIdinReply: long (nullable = true)
 |-- CPostUserName: string (nullable = true)
 |-- OPostUserNameinRT: string (nullable = true)
 |-- OPostUserNameinQT: string (nullable = true)
 |-- CPostUserSN: string (nullable = true)
 |-- OPostUserSNinRT: string (nullable = true)
 |-- OPostUserSNinQT: string (nullable = true)
 |-- OPostUserSNinReply: string (nullable = true)
 |-- favouritesCount: long (nullable = true)
 |-- followersCount: long (nullable = true)
 |-- friendsCount: long (nullable = true)
 |-- isVerified: boolean (nullable = true)
 |-- isGeoEnabled: boolean (nullable = true)
 |-- CurrentTweet: string (nullable = true)
 |-- UMentionRTiD: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- UMentionRTsN: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- UMentionQTiD: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- UMentionQTsN: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- UMentionASiD: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- UMentionASsN: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- TweetType: string (nullable = false)
 |-- MentionType: string (nullable = false)
 |-- Weight: long (nullable = false)
display(TTTsDF.groupBy($"tweetType").count().orderBy($"count".desc))
Pie chart of counts by tweetType: ReTweet 48%, Original Tweet 24%, Reply Tweet 21%, Retweet of Quoted Tweet 4%, Quoted Tweet 3%, Reply of Quoted Tweet 0%.
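The six TweetType values in the chart hinge on which of the original-tweet id columns (OriginalTwIDinRT, OriginalTwIDinQT, OriginalTwIDinReply) are set for a tweet. A minimal sketch of one plausible rule set over those three columns, with a null column modelled as `None`. The precedence shown is an assumption for illustration and may not match the exact logic in 025_b_TTTDFfunctions; `TweetIds` and `TweetTypeSketch` are hypothetical names.

```scala
// Hypothetical simplified record holding only the three "original tweet id" columns of TTTsDF.
final case class TweetIds(
  originalTwIDinRT: Option[Long],
  originalTwIDinQT: Option[Long],
  originalTwIDinReply: Option[Long]
)

object TweetTypeSketch {
  // Assumed precedence: retweets first, then quotes (with or without a reply), then plain replies.
  def tweetType(t: TweetIds): String =
    (t.originalTwIDinRT, t.originalTwIDinQT, t.originalTwIDinReply) match {
      case (Some(_), Some(_), _)    => "Retweet of Quoted Tweet"
      case (Some(_), None, _)       => "ReTweet"
      case (None, Some(_), Some(_)) => "Reply of Quoted Tweet"
      case (None, Some(_), None)    => "Quoted Tweet"
      case (None, None, Some(_))    => "Reply Tweet"
      case (None, None, None)       => "Original Tweet"
    }
}
```

In Spark the same case analysis would typically be a chain of `when(...).otherwise(...)` column expressions from `org.apache.spark.sql.functions`; the pattern match simply makes the rules easy to check in isolation and guarantees every combination of the three columns maps to exactly one type.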
// this will make sure all streaming jobs in the cluster are stopped
StreamingContext.getActive.foreach{ _.stop(stopSparkContext = false) } 
// this will delete what we collected to keep the disk usage tight and tidy
dbutils.fs.rm(outputDirectoryRoot, true) 
res43: Boolean = true

Next, let's write the tweets into a scalable commercial cloud storage system

We will write the tweets to Amazon Simple Storage Service (S3), a scalable storage system in the cloud. See https://aws.amazon.com/s3/.

Skip this section if you don't have an AWS account.

But all the main syntactic bits are here for your future convenience :)

// Replace with your AWS S3 credentials
//
// NOTE: Set the access to this notebook appropriately to protect the security of your keys.
// Or you can delete this cell after you run the mount command below once successfully.

val AccessKey = getArgument("1. ACCESS_KEY", "REPLACE_WITH_YOUR_ACCESS_KEY")
val SecretKey = getArgument("2. SECRET_KEY", "REPLACE_WITH_YOUR_SECRET_KEY")
val EncodedSecretKey = SecretKey.replace("/", "%2F")
val AwsBucketName = getArgument("3. S3_BUCKET", "REPLACE_WITH_YOUR_S3_BUCKET")
val MountName = getArgument("4. MNT_NAME", "REPLACE_WITH_YOUR_MOUNT_NAME")
val s3Filename = "tweetDump"

Now mount the S3 bucket as follows:

dbutils.fs.mount(s"s3a://$AccessKey:$EncodedSecretKey@$AwsBucketName", s"/mnt/$MountName")
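The secret key is percent-encoded before it goes into the mount URI because AWS secret keys can contain "/", and an unescaped "/" would end the authority part of the URI (the `access:secret@bucket` section). A small sketch that builds the same source string as the mount cell and checks it with `java.net.URI`; `S3MountSketch` and the example key and bucket values are made up for illustration.

```scala
import java.net.URI

object S3MountSketch {
  // Same construction as the mount cell: only "/" in the secret key is percent-encoded.
  def s3aSource(accessKey: String, secretKey: String, bucket: String): String = {
    val encodedSecretKey = secretKey.replace("/", "%2F")
    s"s3a://$accessKey:$encodedSecretKey@$bucket"
  }

  def mountPoint(mountName: String): String = s"/mnt/$mountName"

  // Host component as java.net.URI parses it; for a well-formed source this is the bucket name.
  def parsedBucket(source: String): String = new URI(source).getHost
}
```

In the notebook this corresponds to `dbutils.fs.mount(S3MountSketch.s3aSource(AccessKey, SecretKey, AwsBucketName), S3MountSketch.mountPoint(MountName))`; note that the raw `SecretKey` is passed, since the helper does the encoding itself.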

Now you can use the dbutils commands freely to access data in the mounted S3.

dbutils.fs.help()

copying:

// to copy all the tweets to s3
dbutils.fs.cp("dbfs:/rawTweets",s"/mnt/$MountName/rawTweetsInS3/",recurse=true)

deleting:

// to remove all the files from s3
dbutils.fs.rm(s"/mnt/$MountName/rawTweetsInS3",recurse=true)

unmounting:

// finally unmount when done - IMPORTANT!
dbutils.fs.unmount(s"/mnt/$MountName")
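Since unmounting matters, one way to make sure it runs even when the copy fails is the loan pattern: mount, run the work, and unmount in a `finally`. A sketch with a hypothetical `MountGuard.withMount` helper that takes the mount and unmount actions as plain functions, so it can be exercised without Databricks.

```scala
object MountGuard {
  // Hypothetical loan-pattern helper: run `body` between `mount` and `unmount`,
  // calling `unmount` even if `body` throws; the body's result (or exception) is passed through.
  def withMount[A](mount: () => Unit, unmount: () => Unit)(body: => A): A = {
    mount()
    try body
    finally unmount()
  }
}
```

In the notebook the actions would be the dbutils calls from this section, for example `MountGuard.withMount(() => dbutils.fs.mount(s"s3a://$AccessKey:$EncodedSecretKey@$AwsBucketName", s"/mnt/$MountName"), () => dbutils.fs.unmount(s"/mnt/$MountName")) { dbutils.fs.cp("dbfs:/rawTweets", s"/mnt/$MountName/rawTweetsInS3/", recurse = true) }`.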