028_TweetHashtagCount(Scala)

Twitter Hashtag Count

Twitter Streaming is a great way to learn Spark Streaming if you don't have your own streaming data source and want a rich input dataset to try Spark Streaming transformations on.

In this example, we show how to calculate the top hashtags seen in the last X window of time every Y time unit.
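To make the "last X window every Y time unit" idea concrete before involving Spark, here is a minimal sketch on plain Scala collections. It is not the Spark job itself: `WindowedCountSketch`, `topHashtagsPerWindow` and its parameters are hypothetical names, each inner sequence stands for the hashtags seen during one slide interval Y, and the window X is expressed as a number of such batches.

```scala
object WindowedCountSketch {
  // Each inner Seq is one batch: the hashtags seen during one slide interval.
  // windowBatches is the window length expressed in batches (window length / slide interval).
  def topHashtagsPerWindow(
      batches: Seq[Seq[String]],
      windowBatches: Int,
      k: Int): Seq[Seq[(String, Int)]] =
    batches.indices.map { i =>
      // The window ending at batch i covers the last windowBatches batches (fewer at the start).
      val window = batches.slice(math.max(0, i - windowBatches + 1), i + 1).flatten
      window
        .groupBy(identity)
        .map { case (tag, occurrences) => (tag, occurrences.size) }
        .toSeq
        .sortBy { case (tag, count) => (-count, tag) } // highest count first, ties by name
        .take(k)
    }
}
```

For example, with the batches `Seq(Seq("#a", "#b"), Seq("#a"), Seq("#c", "#c", "#c"))`, a window of 2 batches and `k = 2`, the last window covers only the second and third batches, so `#c` leads with a count of 3 and `#b` has dropped out.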

Extracting knowledge from tweets is "easy" using techniques shown here, but one has to take responsibility for the use of this knowledge and conform to the rules and policies linked below.

Remember that the use of Twitter itself comes with various strings attached. Read:

Crucially, your use of content from Twitter (as done in this worksheet) also comes with some strings attached. Read:

%run "scalable-data-science/sds-2-x/025_a_extendedTwitterUtils2run"
import twitter4j._
import twitter4j.auth.Authorization
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
defined class ExtendedTwitterReceiver
defined class ExtendedTwitterInputDStream
import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
defined object ExtendedTwitterUtils
done running the extendedTwitterUtils2run notebook - ready to stream from twitter
import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming._


import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming._
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

Step 1: Enter your Twitter API Credentials.

  • Go to https://apps.twitter.com and look up your Twitter API Credentials, or create an app to create them.
  • Run the code below in a cell to enter your own credentials.
// put your own twitter developer credentials below instead of xxx
// and run this code instead of the '%run "scalable-data-science/secrets/026_secret_MyTwitterOAuthCredentials"' cell below
// the secret notebook run by that cell contains the following code block (commented out here)

/*
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

def MyconsumerKey       = "xxx"
def MyconsumerSecret    = "xxx"
def Mytoken             = "xxx"
def MytokenSecret       = "xxx"

System.setProperty("twitter4j.oauth.consumerKey", MyconsumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", MyconsumerSecret)
System.setProperty("twitter4j.oauth.accessToken", Mytoken)
System.setProperty("twitter4j.oauth.accessTokenSecret", MytokenSecret)
*/

The cell below is hidden so as not to expose my Twitter API credentials: consumerKey, consumerSecret, accessToken and accessTokenSecret. Use the code above to enter your own credentials!

%run "scalable-data-science/secrets/026_secret_MyTwitterOAuthCredentials"
twitter OAuth Credentials loaded
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
MyconsumerKey: String
MyconsumerSecret: String
Mytoken: String
MytokenSecret: String
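If you would rather not keep credentials in a notebook at all, one possible alternative is to read them from environment variables and copy them into the same `twitter4j.oauth.*` system properties used above. This is only a sketch: the environment variable names and the `CredentialsSketch` object are hypothetical and are not part of the course notebooks.

```scala
object CredentialsSketch {
  // twitter4j system property -> hypothetical environment variable holding its value.
  val propertyToEnvVar: Map[String, String] = Map(
    "twitter4j.oauth.consumerKey"       -> "TWITTER_CONSUMER_KEY",
    "twitter4j.oauth.consumerSecret"    -> "TWITTER_CONSUMER_SECRET",
    "twitter4j.oauth.accessToken"       -> "TWITTER_ACCESS_TOKEN",
    "twitter4j.oauth.accessTokenSecret" -> "TWITTER_ACCESS_TOKEN_SECRET"
  )

  // Sets the four system properties only if every variable is present.
  // Returns the names of the missing variables (empty when everything was set).
  def loadCredentials(env: Map[String, String] = sys.env): Seq[String] = {
    val missing = propertyToEnvVar.values.toSeq.filterNot(env.contains).sorted
    if (missing.isEmpty) {
      propertyToEnvVar.foreach { case (property, envVar) =>
        System.setProperty(property, env(envVar))
      }
    }
    missing
  }
}
```

Calling `CredentialsSketch.loadCredentials()` before creating the stream would then play the role of the hidden secrets cell, and a non-empty result tells you which variables still need to be defined.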

Step 2: Configure where to output the top hashtags and how often to compute them.

  • Run this cell for the input cells to appear.
  • Enter your credentials.
  • Run the cell again to pick up your defaults.
val outputDirectory = "/datasets/tweetsStreamTmp" // output directory

// Recompute the top hashtags every N seconds. N=10
val slideInterval = new Duration(10 * 1000) // 1000 milliseconds is 1 second!

// Compute the top hashtags for the last M seconds. M=30
val windowLength = new Duration(30 * 1000)

// Wait W seconds before stopping the streaming job. W=20
val timeoutJobLength = 20 * 1000
outputDirectory: String = /datasets/tweetsStreamTmp
slideInterval: org.apache.spark.streaming.Duration = 10000 ms
windowLength: org.apache.spark.streaming.Duration = 30000 ms
timeoutJobLength: Int = 20000
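The streaming job in Step 3 uses `slideInterval` as the batch interval of the StreamingContext, and Spark Streaming expects window and slide durations to be whole multiples of the batch interval. A small check like the following can catch an inconsistent configuration before the job starts. It is a sketch in plain milliseconds; `WindowConfigSketch` and `batchesPerWindow` are hypothetical helper names.

```scala
object WindowConfigSketch {
  // Both arguments are in milliseconds.
  // Returns how many slide intervals (batches) fit into one window.
  def batchesPerWindow(windowMs: Long, slideMs: Long): Long = {
    require(slideMs > 0, "slide interval must be positive")
    require(windowMs % slideMs == 0, "window length must be a multiple of the slide interval")
    windowMs / slideMs
  }
}
```

With the values set above, `WindowConfigSketch.batchesPerWindow(30 * 1000, 10 * 1000)` gives 3, so every reported count aggregates three consecutive 10-second batches.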

Step 3: Run the Twitter Streaming job.

Go to the Spark UI and check whether a streaming job is already running. If so, you need to terminate it before starting a new one, since only one streaming job can run at a time on the Databricks Community Edition (DB CE).

// this will stop any active StreamingContext without stopping the SparkContext
StreamingContext.getActive.foreach{ _.stop(stopSparkContext = false) } 

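Make sure the output directory exists and list its contents. Note that mkdirs does not delete files left over from earlier runs; to start from an empty directory, first remove it with dbutils.fs.rm("dbfs:/datasets/tweetsStreamTmp/", recurse = true).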

dbutils.fs.mkdirs("dbfs:/datasets/tweetsStreamTmp/")
res9: Boolean = true
display(dbutils.fs.ls("dbfs:/datasets/tweetsStreamTmp/"))
OK

Let us write the function that creates the Streaming Context and sets up the streaming job.

var newContextCreated = false
var num = 0

// This is a helper object used for ordering by the second value in a (String, Int) tuple
import scala.math.Ordering
object SecondValueOrdering extends Ordering[(String, Int)] {
  def compare(a: (String, Int), b: (String, Int)) = {
    a._2 compare b._2
  }
}

// This is the function that creates the StreamingContext and sets up the Spark Streaming job.
def creatingFunc(): StreamingContext = {
  // Create a Spark Streaming Context.
  val ssc = new StreamingContext(sc, slideInterval)
  // Create a Twitter Stream for the input source. 
  val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
  val twitterStream = ExtendedTwitterUtils.createStream(ssc, auth)
  
  // Parse the tweets and gather the hashTags.
  val hashTagStream = twitterStream.map(_.getText).flatMap(_.split(" ")).filter(_.startsWith("#"))
  
  // Compute the counts of each hashtag by window.
  // reduceByKey on a window of length windowLength
  // Once this is computed, slide the window by slideInterval and calculate reduceByKey again for the second window
  val windowedhashTagCountStream = hashTagStream.map((_, 1)).reduceByKeyAndWindow((x: Int, y: Int) => x + y, windowLength, slideInterval)

  // For each window, calculate the top hashtags for that time period.
  windowedhashTagCountStream.foreachRDD(hashTagCountRDD => {
    // Take the 20 hashtags with the highest counts in this window.
    val topHashtags = hashTagCountRDD.top(20)(SecondValueOrdering)
    dbutils.fs.put(s"${outputDirectory}/top_hashtags_${num}", topHashtags.mkString("\n"), true)
    println(s"------ TOP HASHTAGS For window ${num}")
    println(topHashtags.mkString("\n"))
    num = num + 1
  })
  
  newContextCreated = true
  ssc
}
newContextCreated: Boolean = false
num: Int = 0
import scala.math.Ordering
defined object SecondValueOrdering
creatingFunc: ()org.apache.spark.streaming.StreamingContext
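The job extracts hashtags by splitting each tweet on a single space. That leaves other whitespace, such as newlines, attached to tokens, which is probably why the sample output in this notebook contains entries like `(#daily ,1)`. A possible variation, shown here on plain strings rather than on the DStream, splits on any run of whitespace and drops a bare `#`. `HashtagExtractionSketch` and `extractHashtags` are hypothetical names, not part of the notebook.

```scala
object HashtagExtractionSketch {
  // Splits on any run of whitespace (spaces, tabs, newlines) and keeps
  // tokens that start with '#' and have at least one character after it.
  def extractHashtags(text: String): Seq[String] =
    text.split("\\s+").toSeq.filter(token => token.startsWith("#") && token.length > 1)
}
```

In the streaming job the same function could be applied with `twitterStream.map(_.getText).flatMap(HashtagExtractionSketch.extractHashtags)`, assuming the object is defined in the notebook before `creatingFunc`.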

Create the StreamingContext using getActiveOrCreate, as required when starting a streaming job in Databricks.

val ssc = StreamingContext.getActiveOrCreate(creatingFunc)
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@750791ef

Start the Spark Streaming Context and return when the Streaming job exits or return with the specified timeout.

ssc.start()
ssc.awaitTerminationOrTimeout(timeoutJobLength)
ssc.stop(stopSparkContext = false)
Wrote 299 bytes.
------ TOP HASHTAGS For window 0
(#BTS,2)
(#เตนล์ชิตพล,1)
(#TEN,1)
(#ชาวบ้าน,1)
(#WayV,1)
(#NCT127,1)
(#เตนล์,1)
(#FoodtruckBattleSeason1,1)
(#ترحيل_اي_اجنبي_سخر_او_حرض عن,1)
(#NCT,1)
(#質問箱,1)
(#LoveBTS20M,1)
(#HAECHAN,1)
(#โพก้า,1)
(#محارمك,1)
Wrote 423 bytes.
------ TOP HASHTAGS For window 1
(#BTS,4)
(#WE_ARE_SUPERHUMAN,2)
(#WayV,2)
(#เตนล์,2)
(#영화추천,2)
(#MaBalle,2)
(#ترحيل_اي_اجنبي_سخر_او_حرض عن,1)
(#大阪市,1)
(#최신예능,1)
(#NUEST,1)
(#HayırlıCumalar,1)
(#jinkook,1)
(#tumwarawut,1)
(#ヴァリアー,1)
(#บอทใหม่หาเพื่อนคุย,1)
(#daily ,1)
(#東京五輪,1)
(#slotciti,1)
(#スクアーロ 明日は……💣,1)
(#biobet,1)
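The ranking printed above comes from `top(20)(SecondValueOrdering)`, which returns the largest elements under the given ordering, largest first. The same idea can be tried on a plain Scala collection, as in this sketch. `TopHashtagsSketch`, `ByCount` and `top` are hypothetical names, and the local sort is stable, whereas the order of tied hashtags in the Spark job should not be relied on.

```scala
object TopHashtagsSketch {
  // Same comparison as SecondValueOrdering in the job: compare tuples by count only.
  object ByCount extends Ordering[(String, Int)] {
    def compare(a: (String, Int), b: (String, Int)): Int = a._2 compare b._2
  }

  // Returns the n pairs with the highest counts, highest first.
  def top(counts: Seq[(String, Int)], n: Int): Seq[(String, Int)] =
    counts.sorted(ByCount.reverse).take(n)
}
```

For instance, `TopHashtagsSketch.top(Seq(("#a", 1), ("#b", 4), ("#c", 2)), 2)` keeps `#b` and `#c`, in that order.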

Check out the cluster's Streaming UI while the job is running.

It should automatically stop the streaming job after timeoutJobLength.

If it does not, use the following command to stop any active StreamingContexts without stopping the SparkContext they are attached to.

StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }

Step 4: View the Results.

display(dbutils.fs.ls(outputDirectory))
path                                             name            size
dbfs:/datasets/tweetsStreamTmp/top_hashtags_0    top_hashtags_0  299
dbfs:/datasets/tweetsStreamTmp/top_hashtags_1    top_hashtags_1  423

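The job writes one file per slide interval, named top_hashtags_N for N in 0,1,2,..., and each file holds the top hashtags over the preceding window. With the settings used here (a 20-second timeout, a 10-second slide interval and a 30-second window) that gives about two files, top_hashtags_0 and top_hashtags_1, as in the listing above. With a 1-second slide interval, a 5-second window and a 100-second timeout there would instead be about 100 files, for N in 0,1,2,...,99.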
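The number of output files follows from the timeout and the slide interval chosen in Step 2: roughly one file per slide interval that completes before the job is stopped. The sketch below does that arithmetic in plain Scala. `ExpectedOutputSketch` and `expectedFiles` are hypothetical names, and the real count can differ slightly depending on start-up time and on how the final batch is handled when the context stops.

```scala
object ExpectedOutputSketch {
  // Both arguments are in milliseconds.
  // One file is expected for each full slide interval that fits into the timeout.
  def expectedFiles(timeoutMs: Long, slideMs: Long): Seq[String] =
    (0L until timeoutMs / slideMs).map(n => s"top_hashtags_$n")
}
```

With a 20-second timeout and a 10-second slide interval, `ExpectedOutputSketch.expectedFiles(20000L, 10000L)` lists `top_hashtags_0` and `top_hashtags_1`, which matches the two files shown in the directory listing.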

dbutils.fs.head(s"${outputDirectory}/top_hashtags_0")
res14: String =
(#BTS,2)
(#เตนล์ชิตพล,1)
(#TEN,1)
(#ชาวบ้าน,1)
(#WayV,1)
(#NCT127,1)
(#เตนล์,1)
(#FoodtruckBattleSeason1,1)
(#ترحيل_اي_اجنبي_سخر_او_حرض عن,1)
(#NCT,1)
(#質問箱,1)
(#LoveBTS20M,1)
(#HAECHAN,1)
(#โพก้า,1)
(#محارمك,1)
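Each output file stores one `(hashtag,count)` pair per line, so its content can be turned back into Scala tuples for further analysis. The following is a minimal sketch, assuming that line format; `ParseOutputSketch`, `parseLine` and `parse` are hypothetical names. Lines that do not match, for example the two halves of a hashtag that itself contained a newline, are skipped rather than repaired.

```scala
object ParseOutputSketch {
  // Parses one line of the form "(#tag,count)"; returns None if the line does not match.
  def parseLine(line: String): Option[(String, Int)] = {
    val trimmed = line.trim
    val comma = trimmed.lastIndexOf(',') // the count follows the last comma
    if (trimmed.startsWith("(") && trimmed.endsWith(")") && comma > 0)
      scala.util.Try(trimmed.substring(comma + 1, trimmed.length - 1).trim.toInt)
        .toOption
        .map(count => (trimmed.substring(1, comma), count))
    else None
  }

  // Parses the whole file content, skipping lines that cannot be parsed.
  def parse(content: String): Seq[(String, Int)] =
    content.split("\n").toSeq.flatMap(line => parseLine(line).toList)
}
```

In the notebook this could be used as `ParseOutputSketch.parse(dbutils.fs.head(s"${outputDirectory}/top_hashtags_0"))`, keeping in mind that `dbutils.fs.head` only returns the beginning of large files.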

Let's brainstorm a bit now

What could you do with this type of streaming capability?

  • marketing?
  • pharmaceutical vigilance?
  • linking twitter activity to mass media activity?
  • data quality and integrity measures...

Note that there are various Spark Streaming ML algorithms that one could easily throw at such reduceByKeyAndWindow tweet streams:

Student Project or Volunteer for next Meetup - let's check it out now:

HOME-WORK: