ScaDaMaLe Course site and book

// Parameter for the number of minutes of streaming (can be set from the Jobs feature).
dbutils.widgets.text("nbr_minutes", "3", label = "Minutes of streaming (int)")

/** Number of minutes to stream, parsed from the `nbr_minutes` widget.
  * Fails fast with a descriptive message if the widget does not hold a positive integer.
  */
val nbr_minutes: Int = {
  val rawMinutes = dbutils.widgets.get("nbr_minutes").trim
  val parsedMinutes = scala.util.Try(rawMinutes.toInt).getOrElse(
    throw new IllegalArgumentException(s"Widget 'nbr_minutes' must be an integer, got: '$rawMinutes'"))
  require(parsedMinutes > 0, s"Widget 'nbr_minutes' must be a positive number of minutes, got: $parsedMinutes")
  parsedMinutes
}
nbr_minutes: Int = 2

If the cluster was shut down, then start a new cluster and install the following libraries on it (via maven).

  • gson with maven coordinates com.google.code.gson:gson (pick a version compatible with your cluster, e.g. 2.8.6)
  • twitter4j-examples with maven coordinates org.twitter4j:twitter4j-examples:4.0.7
import twitter4j._ import twitter4j.auth.Authorization import twitter4j.conf.ConfigurationBuilder import twitter4j.auth.OAuthAuthorization import org.apache.spark.streaming._ import org.apache.spark.streaming.dstream._ import import org.apache.spark.streaming.receiver.Receiver
defined class ExtendedTwitterReceiver
defined class ExtendedTwitterInputDStream
import twitter4j.Status import twitter4j.auth.Authorization import import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream} defined object ExtendedTwitterUtils
done running the extendedTwitterUtils2run notebook - ready to stream from twitter
USAGE: val df = tweetsDF2TTTDF(tweetsJsonStringDF2TweetsDF(fromParquetFile2DF("parquetFileName"))) val df = tweetsDF2TTTDF(tweetsIDLong_JsonStringPairDF2TweetsDF(fromParquetFile2DF("parquetFileName"))) import org.apache.spark.sql.types.{StructType, StructField, StringType} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ import org.apache.spark.sql.ColumnName import org.apache.spark.sql.DataFrame fromParquetFile2DF: (InputDFAsParquetFilePatternString: String)org.apache.spark.sql.DataFrame tweetsJsonStringDF2TweetsDF: (tweetsAsJsonStringInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame tweetsIDLong_JsonStringPairDF2TweetsDF: (tweetsAsIDLong_JsonStringInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame tweetsDF2TTTDF: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame tweetsDF2TTTDFWithURLsAndHashtags: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDFLightWeight: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame

Loading Twitter credentials

You need a Twitter developer account to run the data collection. Save your credentials in a notebook called KeysAndTokens in your user home directory (/Users/<your-user>/KeysAndTokens).

// Needs an upgraded Databricks subscription; works on the project shard.
// Runs the user's KeysAndTokens notebook (timeout 100 s) so the Twitter credentials it defines
// are set up before the stream is created.
val usr = dbutils.notebook.getContext.tags("user")
val keys_notebook_location = "/Users/" + usr + "/KeysAndTokens"
dbutils.notebook.run(keys_notebook_location, 100)
Warning: No value returned from the notebook run. To return a value from a notebook, use dbutils.notebook.exit(value) usr: String = keys_notebook_location: String = /Users/ res18: String = null
import com.google.gson.Gson
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
//import org.apache.spark.sql.types._

val outputDirectoryRoot = "/datasets/ScaDaMaLe/twitter/student-project-10_group-Geosmus" // output directory
val batchInterval = 1 // in minutes
val timeoutJobLength = batchInterval * 5

var newContextCreated = false
var numTweetsCollected = 0L // track number of tweets collected

/** Creates the StreamingContext and sets up the Spark Streaming job.
  *
  * Every `batchInterval` minutes, each tweet from the location-filtered Twitter stream is
  * serialized to a JSON string, converted with `tweetsDF2TTTDF(tweetsJsonStringDF2TweetsDF(...))`,
  * filtered to rows whose `countryCode` is non-empty, and appended as parquet under
  * `outputDirectoryRoot/yyyy/MM/dd/HH/<batch time in ms>`.
  *
  * Side effects: sets `newContextCreated = true`; for every non-empty batch adds the batch's
  * tweet count (taken before the `countryCode` filter) to `numTweetsCollected`.
  *
  * @return the configured (not yet started) StreamingContext
  */
def streamFuncWithProcessing(): StreamingContext = {
  // Create a Spark Streaming Context.
  val ssc = new StreamingContext(sc, Minutes(batchInterval))
  // Create the OAuth Twitter credentials.
  val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
  // Create filter
  val locationsQuery = new FilterQuery().locations(Array(-180.0, -90.0), Array(180.0, 90.0)) // all locations
  // Create a Twitter Stream for the input source.
  val twitterStream = ExtendedTwitterUtils.createStream(ssc, auth, Some(locationsQuery))
  // Transform each Status into a JSON string; one Gson instance per partition instead of per tweet.
  val twitterStreamJson = twitterStream.mapPartitions { statuses =>
    val gson = new Gson()
    statuses.map(status => gson.toJson(status))
  }

  // take care
  val partitionsEachInterval = 1 // number of partitions in each RDD of tweets written per batch

  // Date directory computed once from a single timestamp and reused for every batch in the job.
  // A single formatter avoids the year/month/day/hour fields coming from different instants.
  val dateDirectory = new java.text.SimpleDateFormat("yyyy/MM/dd/HH").format(new java.util.Date())

  // What we want done with each discrete RDD tuple: (rdd, time).
  twitterStreamJson.foreachRDD((rdd, time) => {
    val count = rdd.count()
    // We count because the following operations can only be applied to non-empty RDDs.
    if (count > 0) {
      val outputRDD = rdd.repartition(partitionsEachInterval) // repartition as desired
      // Write to parquet in append mode, in one directory per batch 'time'.
      val outputDF = outputRDD.toDF("tweetAsJsonString")
      val processedDF = tweetsDF2TTTDF(tweetsJsonStringDF2TweetsDF(outputDF)).filter($"countryCode" =!= lit(""))
      // Writing the full processed df (probably not needed later, but useful for exploring the data initially).
      processedDF.write.mode(SaveMode.Append)
        .parquet(s"$outputDirectoryRoot/$dateDirectory/${time.milliseconds}")
      numTweetsCollected += count // update with the latest count
    }
  })
  newContextCreated = true
  ssc
}
import import org.apache.spark.sql.functions._ outputDirectoryRoot: String = /datasets/ScaDaMaLe/twitter/student-project-10_group-Geosmus batchInterval: Int = 1 timeoutJobLength: Int = 5 newContextCreated: Boolean = false numTweetsCollected: Long = 0 streamFuncWithProcessing: ()org.apache.spark.streaming.StreamingContext
// Reuse the currently active StreamingContext if one exists; otherwise build a fresh one
// through streamFuncWithProcessing.
val ssc: StreamingContext = StreamingContext.getActiveOrCreate(() => streamFuncWithProcessing())
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@212f8bef
// Start the streaming job.
ssc.start()

// Stream for nbr_minutes, then stop the StreamingContext while keeping the SparkContext alive.
// Unlike Thread.sleep, awaitTerminationOrTimeout returns early and rethrows if the streaming job
// fails; the finally block guarantees the StreamingContext is stopped in every case.
// The timeout is computed in Long to avoid Int overflow for large minute values.
val streamingMillis = nbr_minutes.toLong * 60L * 1000L // time in milliseconds
val terminatedBeforeTimeout =
  try {
    ssc.awaitTerminationOrTimeout(streamingMillis)
  } finally {
    ssc.stop(stopSparkContext = false)
  }

numTweetsCollected // number of tweets collected so far