import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming._
import scala.math.Ordering
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
%run "Users/raazesh.sainudiin@math.uu.se/scalable-data-science/secrets/026_secret_MyTwitterOAuthCredentials"
twitter OAuth Credentials loaded
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
myAPIKey: String
myAPISecret: String
myAccessToken: String
myAccessTokenSecret: String
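// The secrets notebook run above defines the four credential values printed here and,
// presumably, also exports them as twitter4j system properties so that the bare
// ConfigurationBuilder().build() below picks them up. A hypothetical sketch of what
// such a secrets notebook contains (the real values are, of course, kept private):
System.setProperty("twitter4j.oauth.consumerKey", myAPIKey)
System.setProperty("twitter4j.oauth.consumerSecret", myAPISecret)
System.setProperty("twitter4j.oauth.accessToken", myAccessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", myAccessTokenSecret)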
// Create a Twitter stream as the input source. ConfigurationBuilder().build() picks up
// the OAuth credentials, presumably from twitter4j system properties set by the secrets
// notebook; `ssc` and `ExtendedTwitterUtils` are assumed to be defined earlier in this notebook.
val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
val twitterStream = ExtendedTwitterUtils.createStream(ssc, auth)
auth: Some[twitter4j.auth.OAuthAuthorization] = Some(OAuthAuthorization{consumerKey='*************************', consumerSecret='******************************************', oauthToken=AccessToken{screenName='null', userId=4173723312}})
twitterStream: org.apache.spark.streaming.dstream.ReceiverInputDStream[twitter4j.Status] = ExtendedTwitterInputDStream@206a254d
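// For reference: a minimal sketch of how a StreamingContext like `ssc` is typically
// created (hypothetical name `sscExample`; `sc` is the notebook's SparkContext, and the
// actual batch duration used earlier in this notebook may differ):
val sscExample = new StreamingContext(sc, Minutes(1))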
// Let's import Google's Gson JSON library next.
import com.google.gson.Gson

// Map each tweet (a twitter4j.Status) into a JSON-formatted string, one tweet per line.
// Gson is not serializable, so it is instantiated inside the closure, on the executors.
val twitterStreamJson = twitterStream.map { x =>
  val gson = new Gson()
  gson.toJson(x)
}
import com.google.gson.Gson
twitterStreamJson: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@20405bce
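// As a quick sanity check on the format, one such stored string can be parsed back with
// Gson's tree model. A minimal sketch, using a made-up `tweetJson` value and assuming a
// Gson version that has the static JsonParser.parseString (2.8.6 or later):
import com.google.gson.JsonParser
val tweetJson = """{"text": "hello streaming world", "id": 1}"""
val tweetText = JsonParser.parseString(tweetJson).getAsJsonObject.get("text").getAsString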
val partitionsEachInterval = 1            // number of output partitions (files) written per batch
val batchInterval = 1                     // streaming batch interval, in minutes
val timeoutJobLength = batchInterval * 5  // how long to let the streaming job run, in minutes
var newContextCreated = false             // flag used when (re)creating the StreamingContext
var numTweetsCollected = 0L               // track the total number of tweets collected
// The imports below are needed for toDF and SaveMode; they are already in scope in a
// Databricks notebook but must be added in a plain Spark application.
import org.apache.spark.sql.SaveMode
import spark.implicits._

twitterStreamJson.foreachRDD((rdd, time) => { // for each RDD in the DStream
  val count = rdd.count()
  if (count > 0) { // only write out non-empty batches
    val outputRDD = rdd.repartition(partitionsEachInterval) // repartition as desired
    // To write to parquet directly, in append mode, in one directory per batch `time`:
    val outputDF = outputRDD.toDF("tweetAsJsonString")
    // Get some time fields from the current `java.util.Date()`.
    val now = new java.util.Date()
    val year = new java.text.SimpleDateFormat("yyyy").format(now)
    val month = new java.text.SimpleDateFormat("MM").format(now)
    val day = new java.text.SimpleDateFormat("dd").format(now)
    val hour = new java.text.SimpleDateFormat("HH").format(now)
    // Write to a file with a clear time-based hierarchical directory structure, for example
    // <outputDirectoryRoot>/<year>/<month>/<day>/<hour>/<batch-time-in-ms>.
    // `outputDirectoryRoot` is assumed to be defined earlier in the notebook.
    outputDF.write.mode(SaveMode.Append)
      .parquet(outputDirectoryRoot + "/" + year + "/" + month + "/" + day + "/" + hour + "/" + time.milliseconds)
    // End of writing as parquet file.
    numTweetsCollected += count // update the running count of collected tweets
  }
})
partitionsEachInterval: Int = 1
batchInterval: Int = 1
timeoutJobLength: Int = 5
newContextCreated: Boolean = false
numTweetsCollected: Long = 0
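// Nothing above actually runs until the StreamingContext is started. A minimal sketch of
// the start-and-wait step, assuming `timeoutJobLength` is in minutes as defined above
// (awaitTerminationOrTimeout expects milliseconds):
ssc.remember(Minutes(batchInterval)) // optionally keep each batch's RDDs around for interactive queries
ssc.start()
ssc.awaitTerminationOrTimeout(timeoutJobLength * 60 * 1000)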
// Go to the Spark UI and check whether a streaming job is already running; if so, terminate
// it before starting a new one, since only one streaming job can run at a time on the
// Databricks Community Edition.
// Let's stop the streaming job next (keeping the SparkContext alive).
ssc.stop(stopSparkContext = false)
// Also stop any other active StreamingContext, just in case.
StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }
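// Once collection has stopped, the stored tweets can be read back for analysis. A minimal
// sketch, assuming the same `outputDirectoryRoot` as above (Spark's parquet reader accepts
// globs over the year/month/day/hour/batch directories):
val tweetsDF = spark.read.parquet(outputDirectoryRoot + "/*/*/*/*/*")
val tweetsAsJson = spark.read.json(tweetsDF.select("tweetAsJsonString").as[String])
tweetsAsJson.printSchema()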