ScaDaMaLe Course site and book

Tweet Streaming Collector

Let us build a system to collect live tweets using Spark streaming.

Here are the main steps in this notebook:

  • Let's collect from the public Twitter stream and write to DBFS as JSON strings in a boilerplate manner, to understand the components better.
  • Then we will turn the collector into a function and use it.
  • Finally, we will use some DataFrame-based pipelines to convert the raw tweets into other structured content.

Note that capturing tweets from the public streams for free using Twitter's Streaming API has some caveats. We are supposed to have access to a uniformly random sample of roughly 1% of all tweets across the globe. However, it is not known exactly how this free sample is drawn from the full Twitter social network, i.e. all status updates on the planet, in terms of sub-sampling strategies such as stratification layers; this is Twitter's proprietary information. Nevertheless, we are supposed to be able to assume that it is indeed a random sample of roughly 1% of all tweets.
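If the free stream really is a uniform random sample of roughly 1% of all tweets, any count taken from the sample can be scaled by the inverse sampling rate to get a rough estimate for the full stream. A minimal sketch; the helper name `estimateFullStreamCount` and the exact 1% default are assumptions for illustration, since Twitter does not publish its sampling scheme.

```scala
// Hypothetical helper: scale a count observed in a uniform random sample
// up to the (unobserved) full stream. The 1% default is an assumption.
def estimateFullStreamCount(sampleCount: Long, samplingRate: Double = 0.01): Double = {
  require(samplingRate > 0.0 && samplingRate <= 1.0, "samplingRate must be in (0, 1]")
  sampleCount / samplingRate
}

// e.g. a sample count of 859 suggests roughly 85,900 tweets in the full stream
val estimate = estimateFullStreamCount(859L)
println(f"estimated full-stream count: $estimate%.0f")
```

The estimate is only as good as the uniformity assumption; any stratification on Twitter's side would bias it.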

We will run the extendedTwitterUtils notebook from here.

But first, install the following libraries from Maven Central and attach them to this cluster:

  • gson with maven coordinates com.google.code.gson:gson:2.8.6
  • twitter4j-examples with maven coordinates org.twitter4j:twitter4j-examples:4.0.7
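Outside Databricks, the same Maven coordinates can be declared in an sbt build instead of being attached to a cluster. A sketch of the `build.sbt` lines, assuming an sbt project; the Spark artifacts themselves are not listed because the Databricks runtime already provides them.

```scala
// build.sbt (sketch): Maven coordinates group:artifact:version become group % artifact % version
libraryDependencies ++= Seq(
  "com.google.code.gson" % "gson"               % "2.8.6",
  "org.twitter4j"        % "twitter4j-examples" % "4.0.7"
)
```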
"./025_a_extendedTwitterUtils2run"
import twitter4j._
import twitter4j.auth.Authorization
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

Go to the Spark UI and check whether a streaming job is already running. If so, you need to terminate it before starting a new streaming job. Only one streaming job can run at a time on Databricks Community Edition (DB CE).

defined class ExtendedTwitterReceiver
defined class ExtendedTwitterInputDStream
// this will make sure all streaming jobs in the cluster are stopped
StreamingContext.getActive.foreach{ _.stop(stopSparkContext = false) }
import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
defined object ExtendedTwitterUtils
done running the extendedTwitterUtils2run notebook - ready to stream from twitter

Let's create a directory in DBFS, the cluster's distributed file system, for storing tweets.

val outputDirectoryRoot = "/datasets/tweetsStreamTmp" // output directory
outputDirectoryRoot: String = /datasets/tweetsStreamTmp
dbutils.fs.mkdirs("/datasets/tweetsStreamTmp")
res2: Boolean = true
//display(dbutils.fs.ls(outputDirectoryRoot))
// to remove a pre-existing directory and start from scratch, evaluate this cell (comment out the next line to keep existing data)
dbutils.fs.rm("/datasets/tweetsStreamTmp", true)
res4: Boolean = true

Capture tweets in every batch of slideInterval milliseconds; this is the batch interval of the streaming context we create below.

val slideInterval = new Duration(1 * 1000) // 1 * 1000 = 1000 milli-seconds = 1 sec
slideInterval: org.apache.spark.streaming.Duration = 1000 ms
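Spark Streaming also ships small helper objects for common time units, so `Seconds(1)` builds the same one-second `Duration` as `new Duration(1 * 1000)`. A quick check, assuming the `spark-streaming` library is on the classpath (it is on Databricks):

```scala
import org.apache.spark.streaming.{Duration, Milliseconds, Minutes, Seconds}

val oneSecond: Duration  = Seconds(1)          // same as new Duration(1000)
val sameSecond: Duration = Milliseconds(1000)
val oneMinute: Duration  = Minutes(1)          // 60 * 1000 ms
println(s"${oneSecond.milliseconds} ms, ${oneMinute.milliseconds} ms")
```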

Recall that a Discretized Stream, or DStream, is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from a source or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs, Spark's abstraction of an immutable, distributed dataset (see the Spark Programming Guide for more details). Each RDD in a DStream contains data from a certain interval, as shown in the following figure.

Figure: Spark Streaming (a DStream as a continuous series of RDDs, each holding the data of one interval)
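To see the "series of RDDs" picture without Twitter, one can feed a DStream from a queue of hand-made RDDs with `StreamingContext.queueStream` and record how many records each batch holds. A minimal sketch for a Databricks notebook, where `sc` is predefined; `demoSsc`, `rddQueue` and `batchCounts` are illustrative names. It stops any active streaming context first, because only one can run at a time, and stops its own context before the real collector's context is created.

```scala
import scala.collection.mutable
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

// Only one StreamingContext can be active, so stop any running one (keeping sc alive).
StreamingContext.getActive.foreach(_.stop(stopSparkContext = false))

val demoSsc = new StreamingContext(sc, Seconds(1))

// With oneAtATime = true, each queued RDD becomes the RDD of exactly one batch.
val rddQueue = mutable.Queue[RDD[String]]()
rddQueue += sc.parallelize(Seq("tweet a", "tweet b"))
rddQueue += sc.parallelize(Seq("tweet c"))

// The function given to foreachRDD runs on the driver, so appending to a local buffer works.
val batchCounts = mutable.ArrayBuffer[Long]()
demoSsc.queueStream(rddQueue, oneAtATime = true).foreachRDD { (rdd: RDD[String], time: Time) =>
  val n = rdd.count()
  batchCounts += n
  println(s"batch at ${time.milliseconds} ms holds $n records")
}

demoSsc.start()
demoSsc.awaitTerminationOrTimeout(5000) // let a few 1-second batches run
demoSsc.stop(stopSparkContext = false)
```

The first two batches carry the two queued RDDs (2 and 1 records); once the queue is drained, later batches carry no data.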

Let's import Google's JSON library, Gson, next.

import com.google.gson.Gson 
import com.google.gson.Gson
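By default `Gson.toJson` produces compact JSON with no line breaks, and newlines inside string values are escaped. This is what makes the "one tweet per line" layout below work: `saveAsTextFile` writes one element per line, and each serialized tweet fits on a single line. A small check, with an ordinary Java map standing in for a tweet (the field names are made up):

```scala
import com.google.gson.Gson

// A stand-in record; LinkedHashMap keeps the insertion order of its keys.
val record = new java.util.LinkedHashMap[String, Object]()
record.put("id", Long.box(1L))
record.put("text", "hello\nworld") // the embedded newline is escaped as \n, not written raw

val json = new Gson().toJson(record)
println(json)
```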

Our goal is to take each RDD in the twitter DStream and write it as a json file in our dbfs.

// Create a Spark Streaming Context.
val ssc = new StreamingContext(sc, slideInterval)
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@7a8fbb86

CAUTION

Extracting knowledge from tweets is "easy" using techniques shown here, but one has to take legal responsibility for the use of this knowledge and conform to the rules and policies linked below.

Remember that the use of Twitter itself comes with various strings attached. Read:

Crucially, your use of content from Twitter (as done in this worksheet) comes with some strings attached. Read:

Enter your own Twitter API Credentials.

  • Go to https://apps.twitter.com and look up your Twitter API credentials, or create an app to generate them.
  • Get your own Twitter API credentials (consumerKey, consumerSecret, accessToken and accessTokenSecret) and enter them in the cell below.

Ethical/Legal Aspects

See Background Readings/Viewings in Project MEP:

Tweet Collector

There are several steps to building a streaming Twitter collector. We will do them one by one so that you learn all the components. Later on, we will turn the various steps into a function.

1. Twitter Credentials

The first step towards doing your own experiments with Twitter is to enter your Twitter API credentials.

  • Go to https://apps.twitter.com and look up your Twitter API credentials, or create an app to generate them.
  • Run the code in a cell to enter your own credentials.
// put your own Twitter developer credentials below instead of XXXX:
// instead of the '%run ".../secrets/026_secret_MyTwitterOAuthCredentials"' below,
// copy-paste the following code block and replace each XXXX with your own Twitter credentials

import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

// replace these placeholders with your own (regenerated) credentials
def myAPIKey            = "XXXX" // APIKey
def myAPISecret         = "XXXX" // APISecretKey
def myAccessToken       = "XXXX" // AccessToken
def myAccessTokenSecret = "XXXX" // AccessTokenSecret


System.setProperty("twitter4j.oauth.consumerKey", myAPIKey)
System.setProperty("twitter4j.oauth.consumerSecret", myAPISecret)
System.setProperty("twitter4j.oauth.accessToken", myAccessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", myAccessTokenSecret)

println("twitter OAuth Credentials loaded")
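Setting JVM system properties affects every twitter4j configuration built on the driver afterwards. As an alternative, twitter4j's `ConfigurationBuilder` accepts the four OAuth values directly, and the resulting `Configuration` can be handed to `OAuthAuthorization`. A sketch with placeholder strings; `explicitConf` and `explicitAuth` are illustrative names, and each `XXXX` must be replaced with your own credentials.

```scala
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

// Placeholder values: replace each XXXX with your own credentials.
val explicitConf = new ConfigurationBuilder()
  .setOAuthConsumerKey("XXXX")
  .setOAuthConsumerSecret("XXXX")
  .setOAuthAccessToken("XXXX")
  .setOAuthAccessTokenSecret("XXXX")
  .build()

// Same kind of value as the `auth` passed to ExtendedTwitterUtils.createStream below.
val explicitAuth = Some(new OAuthAuthorization(explicitConf))
```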

The cell below loads my Twitter API credentials (myAPIKey, myAPISecret, myAccessToken and myAccessTokenSecret) without exposing them. Use the code above to enter your own credentials in a Scala cell.

"Users/raazesh.sainudiin@math.uu.se/scalable-data-science/secrets/026_secret_MyTwitterOAuthCredentials"
// Create a Twitter Stream for the input source. 
val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
val twitterStream = ExtendedTwitterUtils.createStream(ssc, auth)
auth: Some[twitter4j.auth.OAuthAuthorization] = Some(OAuthAuthorization{consumerKey='8uN0N9RTLT1viaR811yyG7xwk', consumerSecret='******************************************', oauthToken=AccessToken{screenName='null', userId=4173723312}})
twitterStream: org.apache.spark.streaming.dstream.ReceiverInputDStream[twitter4j.Status] = ExtendedTwitterInputDStream@7f635e6d

2. Mapping Tweets to JSON

Let's map the tweets into JSON-formatted strings (one tweet per line) using Google's Gson library.

val twitterStreamJson = twitterStream.map { x =>
  val gson = new Gson()
  gson.toJson(x) // serialize each twitter4j.Status into a one-line JSON string
}
twitterStreamJson: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@b874a46
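The `map` above creates a new `Gson` object for every tweet. Since Gson instances are thread-safe and reusable, an alternative is to create one per partition with `mapPartitions`. A sketch; the helper name `toJsonPerPartition` is hypothetical, and the last lines apply the same pattern to a small RDD so it can be tried without a live stream.

```scala
import com.google.gson.Gson
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper: one Gson instance per partition instead of one per record.
def toJsonPerPartition[T](stream: DStream[T]): DStream[String] =
  stream.mapPartitions { records =>
    val gson = new Gson()
    records.map(r => gson.toJson(r))
  }

// e.g. val twitterStreamJsonAlt = toJsonPerPartition(twitterStream)

// The same pattern on a plain RDD of strings, runnable without Twitter:
val jsonRdd = sc.parallelize(Seq("hello", "world"), numSlices = 2).mapPartitions { records =>
  val gson = new Gson()
  records.map(r => gson.toJson(r))
}
val jsonLines = jsonRdd.collect().toSeq
```

With only a handful of tweets per one-second batch the saving is small, so the per-record version above is a reasonable choice for teaching.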
twitter OAuth Credentials loaded
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
myAPIKey: String
myAPISecret: String
myAccessToken: String
myAccessTokenSecret: String
outputDirectoryRoot
res8: String = /datasets/tweetsStreamTmp
var numTweetsCollected = 0L // track number of tweets collected
val partitionsEachInterval = 1 // This tells the number of partitions in each RDD of tweets in the DStream.

twitterStreamJson.foreachRDD( 
  (rdd, time) => { // for each RDD in the DStream
      val count = rdd.count()
      if (count > 0) {
        val outputRDD = rdd.repartition(partitionsEachInterval) // repartition as desired
        outputRDD.saveAsTextFile(outputDirectoryRoot + "/tweets_" + time.milliseconds.toString) // save as textfile
        numTweetsCollected += count // update with the latest count
      }
  }
)
numTweetsCollected: Long = 0
partitionsEachInterval: Int = 1
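Updating the driver-side `var numTweetsCollected` works above because the function given to `foreachRDD` runs on the driver; only the RDD actions inside it (`count`, `saveAsTextFile`) run on the executors. If a count had to be taken inside executor-side code, such as within `rdd.foreach`, a driver variable would not reliably be updated, and an accumulator is the usual tool. A minimal sketch with a `LongAccumulator` on a toy RDD; the name `recordsSeen` is arbitrary.

```scala
import org.apache.spark.util.LongAccumulator

// Accumulators are added to on the executors and read back on the driver.
val recordsSeen: LongAccumulator = sc.longAccumulator("recordsSeen")
sc.parallelize(1 to 100, numSlices = 4).foreach(_ => recordsSeen.add(1L))
println(s"records seen: ${recordsSeen.sum}")
```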

3. Start the Stream

Nothing has actually happened yet.

Next, let's start the Spark streaming context we have created.

ssc.start()

Let's look at the Spark UI now and monitor the streaming job in action! Go to Clusters on the left, click on UI and then on Streaming.

numTweetsCollected // number of tweets collected so far
res11: Long = 468

Let's check again in a few seconds how many tweets have been collected so far.

numTweetsCollected // number of tweets collected so far
res12: Long = 859

4. Stop the Stream

Note that you could easily fill up disk space!!!

So let's stop the streaming job next.

ssc.stop(stopSparkContext = false) // stop soon to avoid filling up disk space!!!
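Rather than remembering to stop the job by hand, one can let the driver block for a fixed time and then stop the context. A sketch of a hypothetical helper `runFor`, built from `start`, `awaitTerminationOrTimeout` and the two-argument `stop`; with `stopGracefully = true`, Spark waits for the data already received to be processed before shutting down.

```scala
import org.apache.spark.streaming.StreamingContext

// Hypothetical helper: run a prepared StreamingContext for at most `millis` ms, then stop it
// while keeping the SparkContext alive for later cells.
def runFor(ssc: StreamingContext, millis: Long): Unit = {
  ssc.start()
  ssc.awaitTerminationOrTimeout(millis) // returns early if the context is stopped elsewhere
  ssc.stop(stopSparkContext = false, stopGracefully = true)
}

// e.g. on a freshly built context whose output operations are registered:
// runFor(newSsc, 10 * 1000) // collect for about 10 seconds
```

A stopped StreamingContext cannot be restarted, so `runFor` needs a new context (with its DStreams and `foreachRDD` set up again) each time, not the `ssc` stopped above.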

Let's make sure that the Streaming UI is not active in the Clusters UI.

StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) } // extra cautious stopping of all active streaming contexts

5. Examine Collected Tweets

Next, let's examine what was saved in DBFS.

display(dbutils.fs.ls(outputDirectoryRoot))
path name size
dbfs:/datasets/tweetsStreamTmp/tweets_1605867067000/ tweets_1605867067000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867068000/ tweets_1605867068000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867069000/ tweets_1605867069000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867070000/ tweets_1605867070000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867071000/ tweets_1605867071000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867072000/ tweets_1605867072000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867073000/ tweets_1605867073000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867074000/ tweets_1605867074000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867075000/ tweets_1605867075000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867076000/ tweets_1605867076000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867077000/ tweets_1605867077000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867078000/ tweets_1605867078000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867079000/ tweets_1605867079000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867080000/ tweets_1605867080000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867081000/ tweets_1605867081000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867082000/ tweets_1605867082000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867083000/ tweets_1605867083000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867084000/ tweets_1605867084000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867085000/ tweets_1605867085000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867086000/ tweets_1605867086000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867087000/ tweets_1605867087000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867088000/ tweets_1605867088000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867089000/ tweets_1605867089000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867090000/ tweets_1605867090000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867091000/ tweets_1605867091000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867092000/ tweets_1605867092000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867093000/ tweets_1605867093000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867094000/ tweets_1605867094000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867095000/ tweets_1605867095000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867096000/ tweets_1605867096000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867097000/ tweets_1605867097000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867098000/ tweets_1605867098000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867099000/ tweets_1605867099000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867100000/ tweets_1605867100000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867101000/ tweets_1605867101000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867102000/ tweets_1605867102000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867103000/ tweets_1605867103000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867104000/ tweets_1605867104000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867105000/ tweets_1605867105000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867106000/ tweets_1605867106000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867107000/ tweets_1605867107000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867108000/ tweets_1605867108000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867109000/ tweets_1605867109000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867110000/ tweets_1605867110000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867111000/ tweets_1605867111000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867112000/ tweets_1605867112000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867113000/ tweets_1605867113000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867114000/ tweets_1605867114000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867115000/ tweets_1605867115000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867116000/ tweets_1605867116000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867117000/ tweets_1605867117000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867118000/ tweets_1605867118000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867119000/ tweets_1605867119000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867120000/ tweets_1605867120000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867121000/ tweets_1605867121000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867122000/ tweets_1605867122000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867123000/ tweets_1605867123000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867124000/ tweets_1605867124000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867125000/ tweets_1605867125000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867126000/ tweets_1605867126000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867127000/ tweets_1605867127000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867128000/ tweets_1605867128000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867129000/ tweets_1605867129000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867130000/ tweets_1605867130000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867131000/ tweets_1605867131000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867132000/ tweets_1605867132000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867133000/ tweets_1605867133000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867134000/ tweets_1605867134000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867135000/ tweets_1605867135000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867136000/ tweets_1605867136000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867137000/ tweets_1605867137000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867138000/ tweets_1605867138000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867139000/ tweets_1605867139000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867140000/ tweets_1605867140000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867141000/ tweets_1605867141000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867142000/ tweets_1605867142000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867143000/ tweets_1605867143000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867144000/ tweets_1605867144000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867145000/ tweets_1605867145000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867146000/ tweets_1605867146000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867147000/ tweets_1605867147000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867148000/ tweets_1605867148000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867149000/ tweets_1605867149000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867150000/ tweets_1605867150000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867151000/ tweets_1605867151000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867152000/ tweets_1605867152000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867153000/ tweets_1605867153000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867154000/ tweets_1605867154000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867155000/ tweets_1605867155000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867156000/ tweets_1605867156000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867157000/ tweets_1605867157000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867158000/ tweets_1605867158000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867159000/ tweets_1605867159000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867160000/ tweets_1605867160000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867161000/ tweets_1605867161000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867162000/ tweets_1605867162000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867163000/ tweets_1605867163000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867164000/ tweets_1605867164000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867165000/ tweets_1605867165000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867166000/ tweets_1605867166000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867167000/ tweets_1605867167000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867168000/ tweets_1605867168000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867169000/ tweets_1605867169000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867170000/ tweets_1605867170000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867171000/ tweets_1605867171000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867172000/ tweets_1605867172000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867173000/ tweets_1605867173000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867174000/ tweets_1605867174000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867175000/ tweets_1605867175000/ 0.0
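Each directory name ends in the batch time in milliseconds since the Unix epoch (the `time.milliseconds` used when saving), and consecutive names above differ by 1000, the 1-second batch interval. A small sketch that turns such a name back into a UTC timestamp with `java.time`; the helper name `batchInstant` is hypothetical.

```scala
import java.time.Instant

// Hypothetical helper: "tweets_1605867068000/" -> the Instant of that batch
def batchInstant(dirName: String): Instant =
  Instant.ofEpochMilli(dirName.stripSuffix("/").stripPrefix("tweets_").toLong)

val t0 = batchInstant("tweets_1605867067000/")
val t1 = batchInstant("tweets_1605867068000/")
println(s"$t0 -> $t1") // 2020-11-20T10:11:07Z -> 2020-11-20T10:11:08Z
```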
val tweetsDir = outputDirectoryRoot+"/tweets_1605867068000/" // use an existing directory; you may have to change the folder name based on the listing above!
tweetsDir: String = /datasets/tweetsStreamTmp/tweets_1605867068000/
display(dbutils.fs.ls(tweetsDir)) 
path name size
dbfs:/datasets/tweetsStreamTmp/tweets_1605867068000/_SUCCESS _SUCCESS 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867068000/part-00000 part-00000 122395.0
sc.textFile(tweetsDir+"part-00000").count()
res17: Long = 31
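The cell above reads a single part file. Since the collector writes one `tweets_<batch time>` directory per non-empty batch, a glob pattern can read all of them at once. A sketch, assuming the directory layout shown above; `readAllBatches` is a hypothetical helper and `spark` is the notebook's SparkSession.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper: all batch directories under `root` as one DataFrame.
// Spark's file listing skips files whose names start with "_" (such as _SUCCESS).
def readAllBatches(root: String): DataFrame =
  spark.read.json(root + "/tweets_*")

// e.g. readAllBatches(outputDirectoryRoot).count() // total tweets over all batches
```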
val outJson = sqlContext.read.json(tweetsDir+"part-00000")
outJson: org.apache.spark.sql.DataFrame = [contributorsIDs: array<string>, createdAt: string ... 26 more fields]
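As a first taste of the DataFrame-based pipelines mentioned at the start, the inferred schema contains `hashtagEntities`, an array of structs with `start`, `end` and `text` fields. A sketch that explodes the hashtag texts into rows and counts them; `hashtagCounts` is a hypothetical helper, not part of the course code.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, explode}

// Hypothetical helper: hashtag frequencies from a DataFrame of raw tweets.
// col("hashtagEntities.text") extracts the text field of every struct in the array.
def hashtagCounts(tweets: DataFrame): DataFrame =
  tweets
    .select(explode(col("hashtagEntities.text")).as("hashtag"))
    .groupBy("hashtag")
    .count()
    .orderBy(col("count").desc, col("hashtag"))

// e.g. display(hashtagCounts(outJson))
```

Tweets without hashtags contribute no rows, since `explode` drops empty or null arrays.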
outJson.printSchema()
root
 |-- contributorsIDs: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- createdAt: string (nullable = true)
 |-- currentUserRetweetId: long (nullable = true)
 |-- displayTextRangeEnd: long (nullable = true)
 |-- displayTextRangeStart: long (nullable = true)
 |-- favoriteCount: long (nullable = true)
 |-- hashtagEntities: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- start: long (nullable = true)
 |    |    |-- text: string (nullable = true)
 |-- id: long (nullable = true)
 |-- inReplyToScreenName: string (nullable = true)
 |-- inReplyToStatusId: long (nullable = true)
 |-- inReplyToUserId: long (nullable = true)
 |-- isFavorited: boolean (nullable = true)
 |-- isPossiblySensitive: boolean (nullable = true)
 |-- isRetweeted: boolean (nullable = true)
 |-- isTruncated: boolean (nullable = true)
 |-- lang: string (nullable = true)
 |-- mediaEntities: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- displayURL: string (nullable = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- expandedURL: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- mediaURL: string (nullable = true)
 |    |    |-- mediaURLHttps: string (nullable = true)
 |    |    |-- sizes: struct (nullable = true)
 |    |    |    |-- 0: struct (nullable = true)
 |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |-- 1: struct (nullable = true)
 |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |-- 2: struct (nullable = true)
 |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |-- 3: struct (nullable = true)
 |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |-- width: long (nullable = true)
 |    |    |-- start: long (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- videoAspectRatioHeight: long (nullable = true)
 |    |    |-- videoAspectRatioWidth: long (nullable = true)
 |    |    |-- videoDurationMillis: long (nullable = true)
 |    |    |-- videoVariants: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |-- quotedStatus: struct (nullable = true)
 |    |-- contributorsIDs: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- createdAt: string (nullable = true)
 |    |-- currentUserRetweetId: long (nullable = true)
 |    |-- displayTextRangeEnd: long (nullable = true)
 |    |-- displayTextRangeStart: long (nullable = true)
 |    |-- favoriteCount: long (nullable = true)
 |    |-- hashtagEntities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- id: long (nullable = true)
 |    |-- inReplyToScreenName: string (nullable = true)
 |    |-- inReplyToStatusId: long (nullable = true)
 |    |-- inReplyToUserId: long (nullable = true)
 |    |-- isFavorited: boolean (nullable = true)
 |    |-- isPossiblySensitive: boolean (nullable = true)
 |    |-- isRetweeted: boolean (nullable = true)
 |    |-- isTruncated: boolean (nullable = true)
 |    |-- lang: string (nullable = true)
 |    |-- mediaEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- displayURL: string (nullable = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- expandedURL: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- mediaURL: string (nullable = true)
 |    |    |    |-- mediaURLHttps: string (nullable = true)
 |    |    |    |-- sizes: struct (nullable = true)
 |    |    |    |    |-- 0: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- 1: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- 2: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- 3: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |    |-- url: string (nullable = true)
 |    |    |    |-- videoAspectRatioHeight: long (nullable = true)
 |    |    |    |-- videoAspectRatioWidth: long (nullable = true)
 |    |    |    |-- videoDurationMillis: long (nullable = true)
 |    |    |    |-- videoVariants: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |-- place: struct (nullable = true)
 |    |    |-- boundingBoxCoordinates: array (nullable = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- latitude: double (nullable = true)
 |    |    |    |    |    |-- longitude: double (nullable = true)
 |    |    |-- boundingBoxType: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- countryCode: string (nullable = true)
 |    |    |-- fullName: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- placeType: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |-- quotedStatusId: long (nullable = true)
 |    |-- retweetCount: long (nullable = true)
 |    |-- source: string (nullable = true)
 |    |-- symbolEntities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- text: string (nullable = true)
 |    |-- urlEntities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- user: struct (nullable = true)
 |    |    |-- createdAt: string (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- descriptionURLEntities: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- favouritesCount: long (nullable = true)
 |    |    |-- followersCount: long (nullable = true)
 |    |    |-- friendsCount: long (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- isContributorsEnabled: boolean (nullable = true)
 |    |    |-- isDefaultProfile: boolean (nullable = true)
 |    |    |-- isDefaultProfileImage: boolean (nullable = true)
 |    |    |-- isFollowRequestSent: boolean (nullable = true)
 |    |    |-- isGeoEnabled: boolean (nullable = true)
 |    |    |-- isProtected: boolean (nullable = true)
 |    |    |-- isVerified: boolean (nullable = true)
 |    |    |-- listedCount: long (nullable = true)
 |    |    |-- location: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- profileBackgroundColor: string (nullable = true)
 |    |    |-- profileBackgroundImageUrl: string (nullable = true)
 |    |    |-- profileBackgroundImageUrlHttps: string (nullable = true)
 |    |    |-- profileBackgroundTiled: boolean (nullable = true)
 |    |    |-- profileBannerImageUrl: string (nullable = true)
 |    |    |-- profileImageUrl: string (nullable = true)
 |    |    |-- profileImageUrlHttps: string (nullable = true)
 |    |    |-- profileLinkColor: string (nullable = true)
 |    |    |-- profileSidebarBorderColor: string (nullable = true)
 |    |    |-- profileSidebarFillColor: string (nullable = true)
 |    |    |-- profileTextColor: string (nullable = true)
 |    |    |-- profileUseBackgroundImage: boolean (nullable = true)
 |    |    |-- screenName: string (nullable = true)
 |    |    |-- showAllInlineMedia: boolean (nullable = true)
 |    |    |-- statusesCount: long (nullable = true)
 |    |    |-- translator: boolean (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- utcOffset: long (nullable = true)
 |    |-- userMentionEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- screenName: string (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |-- quotedStatusId: long (nullable = true)
 |-- quotedStatusPermalink: struct (nullable = true)
 |    |-- displayURL: string (nullable = true)
 |    |-- end: long (nullable = true)
 |    |-- expandedURL: string (nullable = true)
 |    |-- start: long (nullable = true)
 |    |-- url: string (nullable = true)
 |-- retweetCount: long (nullable = true)
 |-- retweetedStatus: struct (nullable = true)
 |    |-- contributorsIDs: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- createdAt: string (nullable = true)
 |    |-- currentUserRetweetId: long (nullable = true)
 |    |-- displayTextRangeEnd: long (nullable = true)
 |    |-- displayTextRangeStart: long (nullable = true)
 |    |-- favoriteCount: long (nullable = true)
 |    |-- hashtagEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- inReplyToScreenName: string (nullable = true)
 |    |-- inReplyToStatusId: long (nullable = true)
 |    |-- inReplyToUserId: long (nullable = true)
 |    |-- isFavorited: boolean (nullable = true)
 |    |-- isPossiblySensitive: boolean (nullable = true)
 |    |-- isRetweeted: boolean (nullable = true)
 |    |-- isTruncated: boolean (nullable = true)
 |    |-- lang: string (nullable = true)
 |    |-- mediaEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- displayURL: string (nullable = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- expandedURL: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- mediaURL: string (nullable = true)
 |    |    |    |-- mediaURLHttps: string (nullable = true)
 |    |    |    |-- sizes: struct (nullable = true)
 |    |    |    |    |-- 0: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- 1: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- 2: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- 3: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |    |-- url: string (nullable = true)
 |    |    |    |-- videoAspectRatioHeight: long (nullable = true)
 |    |    |    |-- videoAspectRatioWidth: long (nullable = true)
 |    |    |    |-- videoDurationMillis: long (nullable = true)
 |    |    |    |-- videoVariants: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |-- quotedStatusId: long (nullable = true)
 |    |-- retweetCount: long (nullable = true)
 |    |-- source: string (nullable = true)
 |    |-- symbolEntities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- text: string (nullable = true)
 |    |-- urlEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- displayURL: string (nullable = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- expandedURL: string (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |    |    |    |-- url: string (nullable = true)
 |    |-- user: struct (nullable = true)
 |    |    |-- createdAt: string (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- descriptionURLEntities: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- favouritesCount: long (nullable = true)
 |    |    |-- followersCount: long (nullable = true)
 |    |    |-- friendsCount: long (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- isContributorsEnabled: boolean (nullable = true)
 |    |    |-- isDefaultProfile: boolean (nullable = true)
 |    |    |-- isDefaultProfileImage: boolean (nullable = true)
 |    |    |-- isFollowRequestSent: boolean (nullable = true)
 |    |    |-- isGeoEnabled: boolean (nullable = true)
 |    |    |-- isProtected: boolean (nullable = true)
 |    |    |-- isVerified: boolean (nullable = true)
 |    |    |-- listedCount: long (nullable = true)
 |    |    |-- location: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- profileBackgroundColor: string (nullable = true)
 |    |    |-- profileBackgroundImageUrl: string (nullable = true)
 |    |    |-- profileBackgroundImageUrlHttps: string (nullable = true)
 |    |    |-- profileBackgroundTiled: boolean (nullable = true)
 |    |    |-- profileBannerImageUrl: string (nullable = true)
 |    |    |-- profileImageUrl: string (nullable = true)
 |    |    |-- profileImageUrlHttps: string (nullable = true)
 |    |    |-- profileLinkColor: string (nullable = true)
 |    |    |-- profileSidebarBorderColor: string (nullable = true)
 |    |    |-- profileSidebarFillColor: string (nullable = true)
 |    |    |-- profileTextColor: string (nullable = true)
 |    |    |-- profileUseBackgroundImage: boolean (nullable = true)
 |    |    |-- screenName: string (nullable = true)
 |    |    |-- showAllInlineMedia: boolean (nullable = true)
 |    |    |-- statusesCount: long (nullable = true)
 |    |    |-- translator: boolean (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- utcOffset: long (nullable = true)
 |    |-- userMentionEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- screenName: string (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |-- source: string (nullable = true)
 |-- symbolEntities: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- text: string (nullable = true)
 |-- urlEntities: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- displayURL: string (nullable = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- expandedURL: string (nullable = true)
 |    |    |-- start: long (nullable = true)
 |    |    |-- url: string (nullable = true)
 |-- user: struct (nullable = true)
 |    |-- createdAt: string (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- descriptionURLEntities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- favouritesCount: long (nullable = true)
 |    |-- followersCount: long (nullable = true)
 |    |-- friendsCount: long (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- isContributorsEnabled: boolean (nullable = true)
 |    |-- isDefaultProfile: boolean (nullable = true)
 |    |-- isDefaultProfileImage: boolean (nullable = true)
 |    |-- isFollowRequestSent: boolean (nullable = true)
 |    |-- isGeoEnabled: boolean (nullable = true)
 |    |-- isProtected: boolean (nullable = true)
 |    |-- isVerified: boolean (nullable = true)
 |    |-- listedCount: long (nullable = true)
 |    |-- location: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- profileBackgroundColor: string (nullable = true)
 |    |-- profileBackgroundImageUrl: string (nullable = true)
 |    |-- profileBackgroundImageUrlHttps: string (nullable = true)
 |    |-- profileBackgroundTiled: boolean (nullable = true)
 |    |-- profileBannerImageUrl: string (nullable = true)
 |    |-- profileImageUrl: string (nullable = true)
 |    |-- profileImageUrlHttps: string (nullable = true)
 |    |-- profileLinkColor: string (nullable = true)
 |    |-- profileSidebarBorderColor: string (nullable = true)
 |    |-- profileSidebarFillColor: string (nullable = true)
 |    |-- profileTextColor: string (nullable = true)
 |    |-- profileUseBackgroundImage: boolean (nullable = true)
 |    |-- screenName: string (nullable = true)
 |    |-- showAllInlineMedia: boolean (nullable = true)
 |    |-- statusesCount: long (nullable = true)
 |    |-- translator: boolean (nullable = true)
 |    |-- url: string (nullable = true)
 |    |-- utcOffset: long (nullable = true)
 |-- userMentionEntities: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- screenName: string (nullable = true)
 |    |    |-- start: long (nullable = true)
//outJson.select("id","text").show(false) // output not displayed to comply with Twitter Developer rules
//display(outJson)  // output not displayed to comply with Twitter Developer rules

Now, let's be good at housekeeping and clean up the unnecessary data in DBFS, our distributed file system in Databricks.

// to remove the output directory and start from scratch, evaluate this cell
dbutils.fs.rm(outputDirectoryRoot, true) 
res24: Boolean = true

Clearly there is a lot one can do with tweets, especially after you get a few more primitives under your belt from the following areas:

  • Natural Language Processing (MLlib, beyond word counts of course),
  • Distributed vertex programming (GraphFrames, which you already know), and
  • Scalable geospatial computing with location data on OpenStreetMap (roughly a third of tweets are geo-enabled with the latitude and longitude of the tweet location) - we will get into this.

Method for Spark Streaming Collector

Let's try to throw the bits and bobs of code in the above 5 steps into a method called streamFunc for simplicity and modularity.

import com.google.gson.Gson
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._ // needed for rdd.toDF (already imported by default in Databricks notebooks)

val outputDirectoryRoot = "/datasets/tweetsStreamTmp" // output directory
val batchInterval = 1 // in minutes
val timeoutJobLength =  batchInterval * 5 // in minutes

var newContextCreated = false
var numTweetsCollected = 0L // track number of tweets collected
//val conf = new SparkConf().setAppName("TrackedTweetCollector").setMaster("local")
// This is the function that creates the StreamingContext and sets up the Spark Streaming job.
def streamFunc(): StreamingContext = {
  // Create a Spark Streaming Context.
  val ssc = new StreamingContext(sc, Minutes(batchInterval))
  // Create the OAuth Twitter credentials 
  val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
  // Create a Twitter Stream for the input source.  
  val twitterStream = ExtendedTwitterUtils.createStream(ssc, auth)
  // Serialize each twitter4j Status in the DStream into a JSON string
  val twitterStreamJson = twitterStream.map(status => new Gson().toJson(status))
  // Number of partitions in each RDD of tweets, i.e. the number of parquet files written per batch
  val partitionsEachInterval = 1
  
  // what we want done with each discrete RDD tuple: (rdd, time)
  twitterStreamJson.foreachRDD((rdd, time) => { // for each RDD in the DStream
      val count = rdd.count()
      if (count > 0) {
        val outputRDD = rdd.repartition(partitionsEachInterval) // repartition as desired
        // to write to parquet directly in append mode in one directory per 'time'------------
        val outputDF = outputRDD.toDF("tweetAsJsonString")
        // take all time fields from a single `java.util.Date()` so they are mutually consistent
        val now = new java.util.Date()
        val year = (new java.text.SimpleDateFormat("yyyy")).format(now)
        val month = (new java.text.SimpleDateFormat("MM")).format(now)
        val day = (new java.text.SimpleDateFormat("dd")).format(now)
        val hour = (new java.text.SimpleDateFormat("HH")).format(now)
        // write with a clear time-based hierarchical directory structure:
        // outputDirectoryRoot/yyyy/MM/dd/HH/<batch time in milliseconds>
        outputDF.write.mode(SaveMode.Append)
                .parquet(outputDirectoryRoot + "/" + year + "/" + month + "/" + day + "/" + hour + "/" + time.milliseconds)
        // end of writing as parquet file-------------------------------------
        numTweetsCollected += count // update with the latest count
      }
  })
  newContextCreated = true
  ssc
}
import com.google.gson.Gson
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
outputDirectoryRoot: String = /datasets/tweetsStreamTmp
batchInterval: Int = 1
timeoutJobLength: Int = 5
newContextCreated: Boolean = false
numTweetsCollected: Long = 0
streamFunc: ()org.apache.spark.streaming.StreamingContext
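The directory layout that `streamFunc` writes (`root/yyyy/MM/dd/HH/<batchTimeMillis>`) can be factored into a small pure function, which makes it easy to test outside a streaming job. The sketch below is hypothetical: `TweetPaths.partitionPath` is not part of the course notebooks, and, unlike `streamFunc`, it derives the date fields from the batch time in UTC instead of the driver's local wall clock. That choice is an assumption; it guarantees that a batch always lands in the hour directory of its own batch time.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical helper (not part of the course code): builds the
// root/yyyy/MM/dd/HH/<batchTimeMillis> path used for one streaming batch.
object TweetPaths {
  // Assumption: date fields are computed in UTC from the batch time itself.
  private val hourDirFormat: DateTimeFormatter =
    DateTimeFormatter.ofPattern("yyyy/MM/dd/HH").withZone(ZoneOffset.UTC)

  def partitionPath(root: String, batchTimeMs: Long): String = {
    val hourDir = hourDirFormat.format(Instant.ofEpochMilli(batchTimeMs))
    s"$root/$hourDir/$batchTimeMs"
  }
}
```

Inside `foreachRDD` one could then write `outputDF.write.mode(SaveMode.Append).parquet(TweetPaths.partitionPath(outputDirectoryRoot, time.milliseconds))`. For the batch time `1605867660000` this yields `.../2020/11/20/10/1605867660000`, consistent with the directory listing shown further below.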
// Now just use the function to create a Spark Streaming Context
val ssc = StreamingContext.getActiveOrCreate(streamFunc)
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@400b6153
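// start the streaming context (only one StreamingContext can be active at a time)
ssc.start()
// awaitTerminationOrTimeout expects milliseconds, so convert timeoutJobLength from minutes
//ssc.awaitTerminationOrTimeout(timeoutJobLength * 60 * 1000L)
// this will make sure all streaming jobs in the cluster are stopped,
// but let's run it for a few minutes before stopping it
//StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) } 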
display(dbutils.fs.ls("/datasets/tweetsStreamTmp/2020/11/20/10")) // outputDirectoryRoot
path                                                          name             size
dbfs:/datasets/tweetsStreamTmp/2020/11/20/10/1605867660000/   1605867660000/   0
dbfs:/datasets/tweetsStreamTmp/2020/11/20/10/1605867720000/   1605867720000/   0
dbfs:/datasets/tweetsStreamTmp/2020/11/20/10/1605867780000/   1605867780000/   0

Tweet Transmission Tree Tables

Next, let us take a quick peek at the notebook ./025_b_TTTDFfunctions to see how we have pipelined the JSON tweets into tabular or structured data as DataFrames.

Please see http://lamastex.org/lmse/mep/src/TweetAnatomyAndTransmissionTree.html to understand more deeply.

Note that the fundamental issue here is that we need to define exactly what we mean by a particular type of status update, i.e.:

  • How do we categorize a particular status update, based on the data contained in it, as one of the following?
    • Original Tweet
    • Retweet
    • Quoted Tweet
    • Retweet of a Quoted Tweet
    • etc.

This question is answered precisely by the methods in ./025_b_TTTDFfunctions.
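To make the idea concrete, here is a hypothetical, simplified decision procedure over a few status-update fields. The case class `StatusFlags`, its field names, and the rule order are assumptions for illustration only; they need not match the exact logic in ./025_b_TTTDFfunctions. The returned labels are the `TweetType` values that appear in the output later in this notebook.

```scala
// Hypothetical, simplified view of the fields that decide a status update's type.
// Field names and semantics are assumptions, not the twitter4j or TTTDF schema.
final case class StatusFlags(
  isRetweet: Boolean,              // the status carries a retweeted status
  retweetedIsQuote: Boolean,       // the retweeted status itself quotes another tweet
  quotedStatusId: Option[Long],    // set when the status quotes another tweet
  inReplyToStatusId: Option[Long]  // set when the status replies to another tweet
)

object TweetTypeSketch {
  // Rules are checked in order; the first match wins (assumed precedence).
  def tweetType(s: StatusFlags): String =
    if (s.isRetweet && s.retweetedIsQuote) "Retweet of Quoted Tweet"
    else if (s.isRetweet) "ReTweet"
    else if (s.inReplyToStatusId.isDefined && s.quotedStatusId.isDefined) "Reply of Quoted Tweet"
    else if (s.inReplyToStatusId.isDefined) "Reply Tweet"
    else if (s.quotedStatusId.isDefined) "Quoted Tweet"
    else "Original Tweet"
}
```

The order of the checks is the real design decision: a retweet of a quoted tweet also "contains" a quote, so the more specific categories must be tested before the general ones.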

"./025_b_TTTDFfunctions"
USAGE: val df = tweetsDF2TTTDF(tweetsJsonStringDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
                  val df = tweetsDF2TTTDF(tweetsIDLong_JsonStringPairDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
                  
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.ColumnName
import org.apache.spark.sql.DataFrame
fromParquetFile2DF: (InputDFAsParquetFilePatternString: String)org.apache.spark.sql.DataFrame
tweetsJsonStringDF2TweetsDF: (tweetsAsJsonStringInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsIDLong_JsonStringPairDF2TweetsDF: (tweetsAsIDLong_JsonStringInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDF: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDFWithURLsAndHashtags: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDFLightWeight: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDFWithURLsAndHashtagsLightWeight: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
val rawDF = fromParquetFile2DF("/datasets/tweetsStreamTmp/2020/11/*/*/*/*") //.cache()
val TTTsDF = tweetsDF2TTTDF(tweetsJsonStringDF2TweetsDF(rawDF)).cache()
rawDF: org.apache.spark.sql.DataFrame = [tweetAsJsonString: string]
TTTsDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CurrentTweetDate: timestamp, CurrentTwID: bigint ... 35 more fields]
TTTsDF.count()
res34: Long = 14686
//display(TTTsDF)  // output not displayed to comply with Twitter Developer rules
TTTsDF.printSchema
root
 |-- CurrentTweetDate: timestamp (nullable = true)
 |-- CurrentTwID: long (nullable = true)
 |-- lang: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- CreationDateOfOrgTwInRT: timestamp (nullable = true)
 |-- OriginalTwIDinRT: long (nullable = true)
 |-- CreationDateOfOrgTwInQT: timestamp (nullable = true)
 |-- OriginalTwIDinQT: long (nullable = true)
 |-- OriginalTwIDinReply: long (nullable = true)
 |-- CPostUserId: long (nullable = true)
 |-- userCreatedAtDate: timestamp (nullable = true)
 |-- OPostUserIdinRT: long (nullable = true)
 |-- OPostUserIdinQT: long (nullable = true)
 |-- OPostUserIdinReply: long (nullable = true)
 |-- CPostUserName: string (nullable = true)
 |-- OPostUserNameinRT: string (nullable = true)
 |-- OPostUserNameinQT: string (nullable = true)
 |-- CPostUserSN: string (nullable = true)
 |-- OPostUserSNinRT: string (nullable = true)
 |-- OPostUserSNinQT: string (nullable = true)
 |-- OPostUserSNinReply: string (nullable = true)
 |-- favouritesCount: long (nullable = true)
 |-- followersCount: long (nullable = true)
 |-- friendsCount: long (nullable = true)
 |-- isVerified: boolean (nullable = true)
 |-- isGeoEnabled: boolean (nullable = true)
 |-- CurrentTweet: string (nullable = true)
 |-- UMentionRTiD: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- UMentionRTsN: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- UMentionQTiD: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- UMentionQTsN: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- UMentionASiD: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- UMentionASsN: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- TweetType: string (nullable = false)
 |-- MentionType: string (nullable = false)
 |-- Weight: long (nullable = false)
display(TTTsDF.groupBy($"tweetType").count().orderBy($"count".desc))
tweetType                  count
ReTweet                     5956
Reply Tweet                 4187
Original Tweet              3326
Quoted Tweet                 754
Retweet of Quoted Tweet      416
Reply of Quoted Tweet         47
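As a quick sanity check on these counts, the six categories sum to 14686, the same number returned by `TTTsDF.count()` above, so every status update was assigned exactly one type. The small sketch below (a hypothetical `TweetTypeShares` object with the counts copied from this particular run; yours will differ) turns them into percentages, showing that retweets make up roughly 40% of the sample.

```scala
// Counts copied from the tweetType table above (this particular run only).
object TweetTypeShares {
  val counts: Seq[(String, Long)] = Seq(
    "ReTweet"                 -> 5956L,
    "Reply Tweet"             -> 4187L,
    "Original Tweet"          -> 3326L,
    "Quoted Tweet"            -> 754L,
    "Retweet of Quoted Tweet" -> 416L,
    "Reply of Quoted Tweet"   -> 47L
  )

  val total: Long = counts.map(_._2).sum

  // Percentage of all collected status updates, rounded to one decimal place.
  def sharesPercent: Seq[(String, Double)] =
    counts.map { case (tweetType, c) => tweetType -> math.round(1000.0 * c / total) / 10.0 }
}
```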
// this will make sure all streaming jobs in the cluster are stopped
StreamingContext.getActive.foreach{ _.stop(stopSparkContext = false) } 
// this will delete what we collected to keep the disk usage tight and tidy
dbutils.fs.rm(outputDirectoryRoot, true) 
res40: Boolean = true

Writing to public clouds

Next, let's write the tweets into a scalable commercial cloud storage system in AWS (and similarly into Azure Blob Storage, Google Cloud Storage, etc.).

We will write the tweets to AWS's Simple Storage Service (S3), a scalable storage system in the cloud. See https://aws.amazon.com/s3/.

Skip this if you don't have an AWS account.

But all the main syntactic bits are here for your future convenience :)

// Replace with your AWS S3 credentials
//
// NOTE: Set the access to this notebook appropriately to protect the security of your keys.
// Or you can delete this cell after you run the mount command below once successfully.

val AccessKey = getArgument("1. ACCESS_KEY", "REPLACE_WITH_YOUR_ACCESS_KEY")
val SecretKey = getArgument("2. SECRET_KEY", "REPLACE_WITH_YOUR_SECRET_KEY")
val EncodedSecretKey = SecretKey.replace("/", "%2F")
val AwsBucketName = getArgument("3. S3_BUCKET", "REPLACE_WITH_YOUR_S3_BUCKET")
val MountName = getArgument("4. MNT_NAME", "REPLACE_WITH_YOUR_MOUNT_NAME")
val s3Filename = "tweetDump"
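The line `SecretKey.replace("/", "%2F")` escapes only slashes, but AWS secret keys can also contain characters such as `+`. A more general, hypothetical alternative percent-encodes every reserved character with `java.net.URLEncoder`. Whether the S3A connector behind the mount decodes `%2B` back to `+` is an assumption you should verify on your own cluster before relying on it.

```scala
import java.net.URLEncoder

// Hypothetical helper: percent-encode a secret key for embedding in an s3a:// URI.
// Assumption: the S3A connector URL-decodes the credentials it finds in the URI.
object S3UriEncoding {
  def encodeSecret(secret: String): String =
    URLEncoder.encode(secret, "UTF-8") // '/' -> %2F, '+' -> %2B; letters and digits unchanged
}
```

Used in place of the `replace` above, this would read `val EncodedSecretKey = S3UriEncoding.encodeSecret(SecretKey)`.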

Now just mount s3 as follows:

dbutils.fs.mount(s"s3a://$AccessKey:$EncodedSecretKey@$AwsBucketName", s"/mnt/$MountName")

Now you can use the dbutils commands freely to access data in the mounted S3.

dbutils.fs.help()

copying:

// to copy all the tweets to s3
dbutils.fs.cp("dbfs:/rawTweets",s"/mnt/$MountName/rawTweetsInS3/",recurse=true) 

deleting:

// to remove all the files from s3
dbutils.fs.rm(s"/mnt/$MountName/rawTweetsInS3",recurse=true) 

unmounting:

// finally unmount when done - IMPORTANT!
dbutils.fs.unmount(s"/mnt/$MountName")