ScaDaMaLe Course site and book

Twitter Hashtag Count

Using Twitter Streaming is a great way to learn Spark Streaming if you don't have your streaming datasource and want a great rich input dataset to try Spark Streaming transformations on.

In this example, we show how to calculate the top hashtags seen in the last X window of time every Y time unit.

Top Hashtags in 4 Easy Steps

We will now show quickly how to compute the top hashtags in a few easy steps after running some utility functions and importing needed libraries.

"./025_a_extendedTwitterUtils2run"
import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming._


import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming._
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

Step 1: Enter your Twitter API Credentials.

  • Go to https://apps.twitter.com and look up your Twitter API Credentials, or create an app to create them.
  • Run the code in a cell to Enter your own credentials.
// put your own twitter developer credentials below instead of xxx
// instead of the '%run ".../secrets/026_secret_MyTwitterOAuthCredentials"' below
// you need to copy-paste the following code-block with your own Twitter credentials replacing XXXX


// put your own twitter developer credentials below 

import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder


// These have been regenerated!!! - need to chane them

def myAPIKey       = "XXXX" // APIKey 
def myAPISecret    = "XXXX" // APISecretKey
def myAccessToken          = "XXXX" // AccessToken
def myAccessTokenSecret    = "XXXX" // AccessTokenSecret


System.setProperty("twitter4j.oauth.consumerKey", myAPIKey)
System.setProperty("twitter4j.oauth.consumerSecret", myAPISecret)
System.setProperty("twitter4j.oauth.accessToken", myAccessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", myAccessTokenSecret)

println("twitter OAuth Credentials loaded")

The cell-below will not expose my Twitter API Credentials: myAPIKey, myAPISecret, myAccessToken and myAccessTokenSecret. Use the code above to enter your own credentials in a scala cell.

import twitter4j._
import twitter4j.auth.Authorization
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
"Users/raazesh.sainudiin@math.uu.se/scalable-data-science/secrets/026_secret_MyTwitterOAuthCredentials"

Step 2: Configure where to output the top hashtags and how often to compute them.

val outputDirectory = "/datasets/tweetsStreamTmp" // output directory

//Recompute the top hashtags every N seconds. N=1
val slideInterval = new Duration(10 * 1000) // 1000 milliseconds is 1 second!

//Compute the top hashtags for the last M seconds. M=5
val windowLength = new Duration(30 * 1000)

// Wait W seconds before stopping the streaming job. W=100
val timeoutJobLength = 20 * 1000
outputDirectory: String = /datasets/tweetsStreamTmp
slideInterval: org.apache.spark.streaming.Duration = 10000 ms
windowLength: org.apache.spark.streaming.Duration = 30000 ms
timeoutJobLength: Int = 20000

Step 3: Run the Twitter Streaming job.

Go to SparkUI and see if a streaming job is already running. If so you need to terminate it before starting a new streaming job. Only one streaming job can be run on the DB CE.

// this will make sure all streaming job in the cluster are stopped
StreamingContext.getActive.foreach{ _.stop(stopSparkContext = false) } 

Clean up any old files.

dbutils.fs.mkdirs("dbfs:/datasets/tweetsStreamTmp/")
res3: Boolean = true
display(dbutils.fs.ls("dbfs:/datasets/tweetsStreamTmp/"))

Let us write the function that creates the Streaming Context and sets up the streaming job.

var newContextCreated = false
var num = 0

// This is a helper class used for ordering by the second value in a (String, Int) tuple
import scala.math.Ordering
object SecondValueOrdering extends Ordering[(String, Int)] {
  def compare(a: (String, Int), b: (String, Int)) = {
    a._2 compare b._2
  }
}

// This is the function that creates the SteamingContext and sets up the Spark Streaming job.
def creatingFunc(): StreamingContext = {
  // Create a Spark Streaming Context.
  val ssc = new StreamingContext(sc, slideInterval)
  // Create a Twitter Stream for the input source. 
  val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
  val twitterStream = ExtendedTwitterUtils.createStream(ssc, auth)
  
  // Parse the tweets and gather the hashTags.
  val hashTagStream = twitterStream.map(_.getText).flatMap(_.split(" ")).filter(_.startsWith("#"))
  
  // Compute the counts of each hashtag by window.
  // reduceByKey on a window of length windowLength
  // Once this is computed, slide the window by slideInterval and calculate reduceByKey again for the second window
  val windowedhashTagCountStream = hashTagStream.map((_, 1)).reduceByKeyAndWindow((x: Int, y: Int) => x + y, windowLength, slideInterval)

  // For each window, calculate the top hashtags for that time period.
  windowedhashTagCountStream.foreachRDD(hashTagCountRDD => {
    val topEndpoints = hashTagCountRDD.top(20)(SecondValueOrdering)
    dbutils.fs.put(s"${outputDirectory}/top_hashtags_${num}", topEndpoints.mkString("\n"), true)
    println(s"------ TOP HASHTAGS For window ${num}")
    println(topEndpoints.mkString("\n"))
    num = num + 1
  })
  
  newContextCreated = true
  ssc
}
newContextCreated: Boolean = false
num: Int = 0
import scala.math.Ordering
defined object SecondValueOrdering
creatingFunc: ()org.apache.spark.streaming.StreamingContext

Create the StreamingContext using getActiveOrCreate, as required when starting a streaming job in Databricks.

val ssc = StreamingContext.getActiveOrCreate(creatingFunc)
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@3961e605

Start the Spark Streaming Context and return when the Streaming job exits or return with the specified timeout.

ssc.start()
ssc.awaitTerminationOrTimeout(timeoutJobLength)
ssc.stop(stopSparkContext = false)
Wrote 498 bytes.
------ TOP HASHTAGS For window 0
(#1212SHOPEESTRAYKIDS
MAU,8)
(#PS5,2)
(#LifeGoesOnWithBTS,2)
(#BTS_BE,2)
(#아미트친소,1)
(#rechtsterrorismus,1)
(#EXO,1)
(#Pourlamesse,,1)
(#MeralAkşener,1)
(#뷔,1)
(#赤いヨーグリーナ発売記念
#この冬の体調管理に​

体調管理をして風邪など引かない様にしなきゃ😤,1)
(#SoundCloud,1)
(#weareoneEXO
#开
#음…,1)
(#kilicdaroglu,1)
(#Pride2020,1)
(#icisleribakanlığı,1)
(#Vialli?,1)
(#Cuma,1)
(#アリス・マーガトロイド,1)
(#FridayVibes,1)
Wrote 423 bytes.
------ TOP HASHTAGS For window 1
(#1212SHOPEESTRAYKIDS
MAU,8)
(#Monster 
MONSTER,7)
(#FridayLivestream

SB19,4)
(#BTS_BE,3)
(#LifeGoesOnWithBTS,3)
(#EXO,2)
(#Unlock_GOLIVEINLIFE,2)
(#PS5,2)
(#redvelvet,2)
(#SUNGHOON,1)
(#omniscient_reader,1)
(#小学5年生より賢いの,1)
(#takasbükücüxhanthos
#GLYHO,1)
(#FCKNZS,1)
(#DAMI,1)
(#CHOICE,1)
(#fahrettinkoca,1)
(#FridayThoughts,1)
(#Lausanne.,1)
(#트레저
@treasuremembers

https://t.co/lo6QhtLCBB,1)

Check out the Clusters Streaming UI as the job is running.

It should automatically stop the streaming job after timeoutJobLength.

If not, then stop any active Streaming Contexts, but don't stop the spark contexts they are attached to using the following command.

StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }

Step 4: View the Results.

display(dbutils.fs.ls(outputDirectory))
path name size
dbfs:/datasets/tweetsStreamTmp/top_hashtags_0 top_hashtags_0 498.0
dbfs:/datasets/tweetsStreamTmp/top_hashtags_1 top_hashtags_1 423.0

There should be 100 intervals for each second and the top hashtags for each of them should be in the file top_hashtags_N for N in 0,1,2,...,99 and the top hashtags should be based on the past 5 seconds window.

dbutils.fs.head(s"${outputDirectory}/top_hashtags_0")
res8: String =
(#1212SHOPEESTRAYKIDS
MAU,8)
(#PS5,2)
(#LifeGoesOnWithBTS,2)
(#BTS_BE,2)
(#아미트친소,1)
(#rechtsterrorismus,1)
(#EXO,1)
(#Pourlamesse,,1)
(#MeralAkşener,1)
(#뷔,1)
(#赤いヨーグリーナ発売記念
#この冬の体調管理に​

体調管理をして風邪など引かない様にしなきゃ😤,1)
(#SoundCloud,1)
(#weareoneEXO
#开
#음…,1)
(#kilicdaroglu,1)
(#Pride2020,1)
(#icisleribakanlığı,1)
(#Vialli?,1)
(#Cuma,1)
(#アリス・マーガトロイド,1)
(#FridayVibes,1)
defined class ExtendedTwitterReceiver
defined class ExtendedTwitterInputDStream
import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
defined object ExtendedTwitterUtils
twitter OAuth Credentials loaded
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
myAPIKey: String
myAPISecret: String
myAccessToken: String
myAccessTokenSecret: String
done running the extendedTwitterUtils2run notebook - ready to stream from twitter

Let's brainstorm

What could you do with this type of streaming capability?

  • marketing?
  • pharmaceutical vigilance?
  • linking twitter activity to mass media activity?
  • data quality and integrity measures...

Note that there are various Spark Streaming ML algorithms that one could easily throw at such reduceByKeyAndWindow tweet streams:

Student Project or Volunteer for next Meetup - let's check it out now:

HOME-WORK:

Responsible Experiments

Extracting knowledge from tweets is "easy" using techniques shown here, but one has to take responsibility for the use of this knowledge and conform to the rules and policies linked below.

Remeber that the use of twitter itself comes with various strings attached. Read:

Crucially, the use of the content from twitter by you (as done in this worksheet) comes with some strings. Read: - Developer Agreement & Policy Twitter Developer Agreement