%run "scalable-data-science/sds-2-x/025_a_extendedTwitterUtils2run"
import twitter4j._
import twitter4j.auth.Authorization
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
defined class ExtendedTwitterReceiver
defined class ExtendedTwitterInputDStream
import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
defined object ExtendedTwitterUtils
done running the extendedTwitterUtils2run notebook - ready to stream from twitter
import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming._
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming._
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
val outputDirectory = "/datasets/tweetsStreamTmp" // output directory

// Recompute the top hashtags every N seconds. N = 10
val slideInterval = new Duration(10 * 1000) // 1000 milliseconds is 1 second!

// Compute the top hashtags for the last M seconds. M = 30
val windowLength = new Duration(30 * 1000)

// Wait W seconds before stopping the streaming job. W = 20
val timeoutJobLength = 20 * 1000
outputDirectory: String = /datasets/tweetsStreamTmp
slideInterval: org.apache.spark.streaming.Duration = 10000 ms
windowLength: org.apache.spark.streaming.Duration = 30000 ms
timeoutJobLength: Int = 20000
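The OAuthAuthorization constructed inside creatingFunc below builds its configuration with new ConfigurationBuilder().build(), which picks up Twitter API credentials from twitter4j system properties, so these must be set before the streaming context is created. A minimal sketch of setting them directly (the placeholder strings are assumptions; in the course notebooks the credentials are typically loaded from a separate, private notebook):

// hypothetical placeholder values - substitute your own Twitter API credentials
System.setProperty("twitter4j.oauth.consumerKey", "<YOUR_CONSUMER_KEY>")
System.setProperty("twitter4j.oauth.consumerSecret", "<YOUR_CONSUMER_SECRET>")
System.setProperty("twitter4j.oauth.accessToken", "<YOUR_ACCESS_TOKEN>")
System.setProperty("twitter4j.oauth.accessTokenSecret", "<YOUR_ACCESS_TOKEN_SECRET>")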
var newContextCreated = false
var num = 0

// This is a helper object used for ordering by the second value in a (String, Int) tuple
import scala.math.Ordering
object SecondValueOrdering extends Ordering[(String, Int)] {
  def compare(a: (String, Int), b: (String, Int)) = {
    a._2 compare b._2
  }
}

// This is the function that creates the StreamingContext and sets up the Spark Streaming job.
def creatingFunc(): StreamingContext = {
  // Create a Spark Streaming Context.
  val ssc = new StreamingContext(sc, slideInterval)
  // Create a Twitter Stream for the input source.
  val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
  val twitterStream = ExtendedTwitterUtils.createStream(ssc, auth)

  // Parse the tweets and gather the hashTags.
  val hashTagStream = twitterStream.map(_.getText).flatMap(_.split(" ")).filter(_.startsWith("#"))

  // Compute the counts of each hashtag by window.
  // reduceByKey on a window of length windowLength.
  // Once this is computed, slide the window by slideInterval and calculate reduceByKey again for the second window.
  val windowedhashTagCountStream = hashTagStream.map((_, 1)).reduceByKeyAndWindow((x: Int, y: Int) => x + y, windowLength, slideInterval)

  // For each window, calculate the top hashtags for that time period.
  windowedhashTagCountStream.foreachRDD(hashTagCountRDD => {
    val topEndpoints = hashTagCountRDD.top(20)(SecondValueOrdering)
    dbutils.fs.put(s"${outputDirectory}/top_hashtags_${num}", topEndpoints.mkString("\n"), true)
    println(s"------ TOP HASHTAGS For window ${num}")
    println(topEndpoints.mkString("\n"))
    num = num + 1
  })
  newContextCreated = true
  ssc
}
newContextCreated: Boolean = false
num: Int = 0
import scala.math.Ordering
defined object SecondValueOrdering
creatingFunc: ()org.apache.spark.streaming.StreamingContext
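The next cell starts a StreamingContext named ssc, which is not created in the cells shown here; a minimal sketch of obtaining it from creatingFunc, assuming the usual getActiveOrCreate pattern (which reuses an already-active context if one exists):

val ssc = StreamingContext.getActiveOrCreate(creatingFunc)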
ssc.start()
ssc.awaitTerminationOrTimeout(timeoutJobLength)
ssc.stop(stopSparkContext = false)
Wrote 299 bytes.
------ TOP HASHTAGS For window 0
(#BTS,2)
(#เตนล์ชิตพล,1)
(#TEN,1)
(#ชาวบ้าน,1)
(#WayV,1)
(#NCT127,1)
(#เตนล์,1)
(#FoodtruckBattleSeason1,1)
(#ترحيل_اي_اجنبي_سخر_او_حرض
عن,1)
(#NCT,1)
(#質問箱,1)
(#LoveBTS20M,1)
(#HAECHAN,1)
(#โพก้า,1)
(#محارمك,1)
Wrote 423 bytes.
------ TOP HASHTAGS For window 1
(#BTS,4)
(#WE_ARE_SUPERHUMAN,2)
(#WayV,2)
(#เตนล์,2)
(#영화추천,2)
(#MaBalle,2)
(#ترحيل_اي_اجنبي_سخر_او_حرض
عن,1)
(#大阪市,1)
(#최신예능,1)
(#NUEST,1)
(#HayırlıCumalar,1)
(#jinkook,1)
(#tumwarawut,1)
(#ヴァリアー,1)
(#บอทใหม่หาเพื่อนคุย,1)
(#daily
,1)
(#東京五輪,1)
(#slotciti,1)
(#スクアーロ
明日は……💣,1)
(#biobet,1)
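Each window's top hashtags are also written to outputDirectory via dbutils.fs.put; a hedged sketch of inspecting those files afterwards on Databricks (assuming dbutils is available, as in the cells above):

// list the per-window files and print the first window's top hashtags
display(dbutils.fs.ls(outputDirectory))
println(dbutils.fs.head(s"${outputDirectory}/top_hashtags_0"))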