ScaDaMaLe Course site and book

Introduction to Spark Streaming

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams.

This is a walk-through of excerpts from the following resources:

Overview

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams.

Data can be ingested from many sources like

and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window.

Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark's

Spark Streaming architecture

Internally, it works as follows:

  • Spark Streaming receives live input data streams and
  • divides the data into batches,
  • which are then processed by the Spark engine
  • to generate the final stream of results in batches.

Spark Streaming data flow

Discretized Streams (DStreams)

Discretized Stream or DStream is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from source (for eg. Kafka, Flume, and Kinesis) or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs, which is Spark's abstraction of an immutable, distributed dataset (see Spark Programming Guide for more details). Each RDD in a DStream contains data from a certain interval, as shown in the following figure.

Spark Streaming


This guide shows you how to start writing Spark Streaming programs with DStreams. You can write Spark Streaming programs in Scala, Java or Python (introduced in Spark 1.2), all of which are presented in this guide.

Here, we will focus on Streaming in Scala.


Spark Streaming is a near-real-time micro-batch stream processing engine as opposed to other real-time stream processing frameworks like Apache Storm. Typically 'near-real-time' in Spark Streaming can be in the order of seconds as opposed to milliseconds, for example.

Three Quick Examples

Before we go into the details of how to write your own Spark Streaming program, let us take a quick look at what a simple Spark Streaming program looks like.

We will choose the first two examples in Databricks notebooks below.

Spark Streaming Hello World Examples

These are adapted from several publicly available Databricks Notebooks

  1. Streaming Word Count (Scala)
  • Tweet Collector for Capturing Live Tweets
  • Twitter Hashtag Count (Scala)

Other examples we won't try here:

  • Kinesis Word Count (Scala)
  • Kafka Word Count (Scala)
  • FileStream Word Count (Python)
  • etc.
  1. Streaming Word Count

This is a hello world example of Spark Streaming which counts words on 1 second batches of streaming data.

It uses an in-memory string generator as a dummy source for streaming data.

Setting up a streaming source

Configurations

Configurations that control the streaming app in the notebook

// === Configuration to control the flow of the application ===
val stopActiveContext = true	 
// "true"  = stop if any existing StreamingContext is running;              
// "false" = dont stop, and let it run undisturbed, but your latest code may not be used

// === Configurations for Spark Streaming ===
val batchIntervalSeconds = 1 
val eventsPerSecond = 1000    // For the dummy source

// Verify that the attached Spark cluster is 1.4.0+
require(sc.version.replace(".", "").toInt >= 140, "Spark 1.4.0+ is required to run this notebook. Please attach it to a Spark 1.4.0+ cluster.")
stopActiveContext: Boolean = true
batchIntervalSeconds: Int = 1
eventsPerSecond: Int = 1000

Imports

Import all the necessary libraries. If you see any error here, you have to make sure that you have attached the necessary libraries to the attached cluster.

import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming._
import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming._

Setup: Define the function that sets up the StreamingContext

In this we will do two things.

  • Define a custom receiver as the dummy source (no need to understand this)

    • this custom receiver will have lines that end with a random number between 0 and 9 and read:
    I am a dummy source 2
    I am a dummy source 8
    ...
    

This is the dummy source implemented as a custom receiver. No need to understand this now.

// This is the dummy source implemented as a custom receiver. No need to fully understand this.

import scala.util.Random
import org.apache.spark.streaming.receiver._

class DummySource(ratePerSec: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Dummy Source") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
   // There is nothing much to do as the thread calling receive()
   // is designed to stop by itself isStopped() returns false
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private def receive() {
    while(!isStopped()) {      
      store("I am a dummy source " + Random.nextInt(10))
      Thread.sleep((1000.toDouble / ratePerSec).toInt)
    }
  }
}
import scala.util.Random
import org.apache.spark.streaming.receiver._
defined class DummySource

Transforming and Acting on the DStream of lines

Any operation applied on a DStream translates to operations on the underlying RDDs. For converting a stream of lines to words, the flatMap operation is applied on each RDD in the lines DStream to generate the RDDs of the wordStream DStream. This is shown in the following figure.

Spark Streaming

These underlying RDD transformations are computed by the Spark engine. The DStream operations hide most of these details and provide the developer with a higher-level API for convenience.

Next reduceByKey is used to get wordCountStream that counts the words in wordStream.

Finally, this is registered as a temporary table for each RDD in the DStream.

Let's try to understand the following creatingFunc to create a new StreamingContext and setting it up for word count and registering it as temp table for each batch of 1000 lines per second in the stream.

batchIntervalSeconds
res1: Int = 1
var newContextCreated = false      // Flag to detect whether new context was created or not

// Function to create a new StreamingContext and set it up
def creatingFunc(): StreamingContext = {
    
  // Create a StreamingContext - starting point for a Spark Streaming job
  val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds))
  
  // Create a stream that generates 1000 lines per second
  val stream = ssc.receiverStream(new DummySource(eventsPerSecond))  
  
  // Split the lines into words, and then do word count
  val wordStream = stream.flatMap { _.split(" ")  }
  val wordCountStream = wordStream.map(word => (word, 1)).reduceByKey(_ + _)

  // Create temp table at every batch interval
  wordCountStream.foreachRDD { rdd => 
    rdd.toDF("word", "count").createOrReplaceTempView("batch_word_count")    
  }
  
  stream.foreachRDD { rdd =>
    System.out.println("# events = " + rdd.count())
    System.out.println("\t " + rdd.take(10).mkString(", ") + ", ...")
  }
  
  ssc.remember(Minutes(1))  // To make sure data is not deleted by the time we query it interactively
  
  println("Creating function called to create new StreamingContext")
  newContextCreated = true  
  ssc
}
newContextCreated: Boolean = false
creatingFunc: ()org.apache.spark.streaming.StreamingContext

Start Streaming Job

First it is important to stop existing StreamingContext if any and then start/restart the new one.

Here we are going to use the configurations at the top of the notebook to decide whether to stop any existing StreamingContext, and start a new one, or recover one from existing checkpoints.

// Stop any existing StreamingContext 
// The getActive function is proviced by Databricks to access active Streaming Contexts
if (stopActiveContext) {	
  StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }
} 

// Get or create a streaming context
val ssc = StreamingContext.getActiveOrCreate(creatingFunc)
if (newContextCreated) {
  println("New context created from currently defined creating function") 
} else {
  println("Existing context running or recovered from checkpoint, may not be running currently defined creating function")
}

// Start the streaming context in the background.
ssc.start()

// This is to ensure that we wait for some time before the background streaming job starts. This will put this cell on hold for 5 times the batchIntervalSeconds.
ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 5 * 1000)
Creating function called to create new StreamingContext
New context created from currently defined creating function
# events = 0
	 , ...
# events = 803
	 I am a dummy source 6, I am a dummy source 6, I am a dummy source 5, I am a dummy source 3, I am a dummy source 7, I am a dummy source 2, I am a dummy source 8, I am a dummy source 1, I am a dummy source 7, I am a dummy source 5, ...
# events = 879
	 I am a dummy source 7, I am a dummy source 5, I am a dummy source 3, I am a dummy source 0, I am a dummy source 2, I am a dummy source 3, I am a dummy source 1, I am a dummy source 2, I am a dummy source 7, I am a dummy source 7, ...
# events = 888
	 I am a dummy source 9, I am a dummy source 0, I am a dummy source 1, I am a dummy source 2, I am a dummy source 7, I am a dummy source 3, I am a dummy source 3, I am a dummy source 0, I am a dummy source 1, I am a dummy source 9, ...
# events = 883
	 I am a dummy source 6, I am a dummy source 5, I am a dummy source 4, I am a dummy source 6, I am a dummy source 4, I am a dummy source 2, I am a dummy source 3, I am a dummy source 6, I am a dummy source 1, I am a dummy source 6, ...
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@50c3571
res2: Boolean = false

Interactive Querying

Now let's try querying the table. You can run this command again and again, you will find the numbers changing.

select * from batch_word_count
word count
8 107.0
0 100.0
dummy 890.0
a 890.0
I 890.0
9 70.0
1 89.0
2 87.0
source 890.0
3 91.0
4 79.0
am 890.0
5 96.0
6 82.0
7 89.0

Try again for current table.

select * from batch_word_count 
word count
8 109.0
0 77.0
dummy 888.0
a 888.0
I 888.0
9 84.0
1 85.0
2 89.0
source 888.0
3 86.0
4 91.0
am 888.0
5 74.0
6 84.0
7 109.0

Finally, if you want stop the StreamingContext, you can uncomment and execute the following code:

StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }

StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) } // please do this if you are done!

Next two examples Spark Streaming is with live tweets.

ScaDaMaLe Course site and book

Dear Researcher,

The main form of assessment for day 01 of module 01 and both days of module 03 is most likely going to be a fairly open-ended project in groups of appropriate sizes (with some structure TBD) that is close to your research interests. The exact details will be given at the end of day 02 of module 03. There will most likely be dedicated Office Hours after module 03 to support you with the project (admin, infrastructure, etc).

Towards this, as one possibility for project, I strongly encourage you to apply for Twitter Developer Account (you need a Twitter user account first). This process can take couple of weeks. With Twitter developer account you can do your own experiments in Twitter and it would be an interesting application of streaming.

The instructions are roughly as follows (Twitter will ask different questions to different users... rules keep evolving... just make it clear you are just wanting credentials for learning Spark streaming. Keep it simple.): https://lamastex.github.io/scalable-data-science/sds/basics/instructions/getTwitterDevCreds/ (Links to an external site.)

You can still follow the lab/lectures without your own Twitter Developer credentials IF I/we dynamically decide to go through Twitter examples of Streaming (provided at least one of you has applied for the developer credentials and is interested in such experiments), but then you will not be able to do your own experiments live in Twitter as I will be doing mine.

Twitter can be a fun source of interesting projects from uniformly sampled social media interactions in the world (but there are many other projects, especially those coming from your own research questions).

Cheers!

Raaz

ScaDaMaLe Course site and book

Twitter Utility Functions

Here we develop a few notebooks starting with 025_* to help us with Twitter experiments.

Extended TwitterUtils

We extend twitter utils from Spark to allow for filtering by user-ids using .follow and strings in the tweet using .track method of twitter4j.

This is part of Project MEP: Meme Evolution Programme and supported by databricks, AWS and a Swedish VR grant.

The analysis is available in the following databricks notebook: * http://lamastex.org/lmse/mep/src/extendedTwitterUtils.html

Copyright 2016-2020 Ivan Sadikov and Raazesh Sainudiin

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
import twitter4j._
import twitter4j.auth.Authorization
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization

import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import twitter4j._
import twitter4j.auth.Authorization
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
class ExtendedTwitterReceiver(
    twitterAuth: Authorization,
    filters: Seq[String],
    userFilters: Seq[Long],
    storageLevel: StorageLevel
  ) extends Receiver[Status](storageLevel) {

  @volatile private var twitterStream: TwitterStream = _
  @volatile private var stopped = false

  def onStart() {
    try {
      val newTwitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
      newTwitterStream.addListener(new StatusListener {
        def onStatus(status: Status): Unit = {
          store(status)
        }
        // Unimplemented
        def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
        def onTrackLimitationNotice(i: Int) {}
        def onScrubGeo(l: Long, l1: Long) {}
        def onStallWarning(stallWarning: StallWarning) {}
        def onException(e: Exception) {
          if (!stopped) {
            restart("Error receiving tweets", e)
          }
        }
      })

      // do filtering only when filters are available
      if (filters.nonEmpty || userFilters.nonEmpty) {
        val query = new FilterQuery()
        if (filters.nonEmpty) {
          query.track(filters.mkString(","))
        }

        if (userFilters.nonEmpty) {
          query.follow(userFilters: _*)
        }
        
        newTwitterStream.filter(query)
      } else {
        newTwitterStream.sample()
      }
      setTwitterStream(newTwitterStream)
      println("Twitter receiver started")
      stopped = false
    } catch {
      case e: Exception => restart("Error starting Twitter stream", e)
    }
  }

  def onStop() {
    stopped = true
    setTwitterStream(null)
    println("Twitter receiver stopped")
  }

  private def setTwitterStream(newTwitterStream: TwitterStream) = synchronized {
    if (twitterStream != null) {
      twitterStream.shutdown()
    }
    twitterStream = newTwitterStream
  }
}
defined class ExtendedTwitterReceiver
class ExtendedTwitterInputDStream(
    ssc_ : StreamingContext,
    twitterAuth: Option[Authorization],
    filters: Seq[String],
    userFilters: Seq[Long],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[Status](ssc_)  {

  private def createOAuthAuthorization(): Authorization = {
    new OAuthAuthorization(new ConfigurationBuilder().build())
  }

  private val authorization = twitterAuth.getOrElse(createOAuthAuthorization())

  override def getReceiver(): Receiver[Status] = {
    new ExtendedTwitterReceiver(authorization, filters, userFilters, storageLevel)
  }
}
defined class ExtendedTwitterInputDStream
import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}

object ExtendedTwitterUtils {
  def createStream(
      ssc: StreamingContext,
      twitterAuth: Option[Authorization],
      filters: Seq[String] = Nil,
      userFilters: Seq[Long] = Nil,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[Status] = {
    new ExtendedTwitterInputDStream(ssc, twitterAuth, filters, userFilters, storageLevel)
  }
}
import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
defined module ExtendedTwitterUtils
println("done running the extendedTwitterUtils2run notebook - ready to stream from twitter")

ScaDaMaLe Course site and book

Tweet Transmission Tree Function

This is part of Project MEP: Meme Evolution Programme and supported by databricks, AWS and a Swedish VR grant.

Please see the following notebook to understand the rationale for the Tweet Transmission Tree Functions: * http://lamastex.org/lmse/mep/src/TweetAnatomyAndTransmissionTree.html

Copyright 2016-2020 Akinwande Atanda and Raazesh Sainudiin

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
import org.apache.spark.sql.types.{StructType, StructField, StringType};
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.ColumnName
import org.apache.spark.sql.DataFrame

spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

def fromParquetFile2DF(InputDFAsParquetFilePatternString: String): DataFrame = {
      sqlContext.
        read.parquet(InputDFAsParquetFilePatternString)
}

def tweetsJsonStringDF2TweetsDF(tweetsAsJsonStringInputDF: DataFrame): DataFrame = {
      sqlContext
        .read
        .json(tweetsAsJsonStringInputDF.map({case Row(val1: String) => val1}))
      }

def tweetsIDLong_JsonStringPairDF2TweetsDF(tweetsAsIDLong_JsonStringInputDF: DataFrame): DataFrame = {
      sqlContext
        .read
        .json(tweetsAsIDLong_JsonStringInputDF.map({case Row(val0:Long, val1: String) => val1}))
      }

def tweetsDF2TTTDF(tweetsInputDF: DataFrame): DataFrame = {
 tweetsInputDF.select(
  unix_timestamp($"createdAt", """MMM dd, yyyy hh:mm:ss a""").cast(TimestampType).as("CurrentTweetDate"),
  $"id".as("CurrentTwID"),
  $"lang".as("lang"),
  $"geoLocation.latitude".as("lat"),
  $"geoLocation.longitude".as("lon"),
  unix_timestamp($"retweetedStatus.createdAt", """MMM dd, yyyy hh:mm:ss a""").cast(TimestampType).as("CreationDateOfOrgTwInRT"), 
  $"retweetedStatus.id".as("OriginalTwIDinRT"),  
  unix_timestamp($"quotedStatus.createdAt", """MMM dd, yyyy hh:mm:ss a""").cast(TimestampType).as("CreationDateOfOrgTwInQT"), 
  $"quotedStatus.id".as("OriginalTwIDinQT"), 
  $"inReplyToStatusId".as("OriginalTwIDinReply"), 
  $"user.id".as("CPostUserId"),
  unix_timestamp($"user.createdAt", """MMM dd, yyyy hh:mm:ss a""").cast(TimestampType).as("userCreatedAtDate"),
  $"retweetedStatus.user.id".as("OPostUserIdinRT"), 
  $"quotedStatus.user.id".as("OPostUserIdinQT"),
  $"inReplyToUserId".as("OPostUserIdinReply"),
  $"user.name".as("CPostUserName"), 
  $"retweetedStatus.user.name".as("OPostUserNameinRT"), 
  $"quotedStatus.user.name".as("OPostUserNameinQT"), 
  $"user.screenName".as("CPostUserSN"), 
  $"retweetedStatus.user.screenName".as("OPostUserSNinRT"), 
  $"quotedStatus.user.screenName".as("OPostUserSNinQT"),
  $"inReplyToScreenName".as("OPostUserSNinReply"),
  $"user.favouritesCount",
  $"user.followersCount",
  $"user.friendsCount",
  $"user.isVerified",
  $"user.isGeoEnabled",
  $"text".as("CurrentTweet"), 
  $"retweetedStatus.userMentionEntities.id".as("UMentionRTiD"), 
  $"retweetedStatus.userMentionEntities.screenName".as("UMentionRTsN"), 
  $"quotedStatus.userMentionEntities.id".as("UMentionQTiD"), 
  $"quotedStatus.userMentionEntities.screenName".as("UMentionQTsN"), 
  $"userMentionEntities.id".as("UMentionASiD"), 
  $"userMentionEntities.screenName".as("UMentionASsN")
 ).withColumn("TweetType",
    when($"OriginalTwIDinRT".isNull && $"OriginalTwIDinQT".isNull && $"OriginalTwIDinReply" === -1,
      "Original Tweet")
    .when($"OriginalTwIDinRT".isNull && $"OriginalTwIDinQT".isNull && $"OriginalTwIDinReply" > -1,
      "Reply Tweet")
    .when($"OriginalTwIDinRT".isNotNull &&$"OriginalTwIDinQT".isNull && $"OriginalTwIDinReply" === -1,
      "ReTweet")
    .when($"OriginalTwIDinRT".isNull && $"OriginalTwIDinQT".isNotNull && $"OriginalTwIDinReply" === -1,
      "Quoted Tweet")
    .when($"OriginalTwIDinRT".isNotNull && $"OriginalTwIDinQT".isNotNull && $"OriginalTwIDinReply" === -1,
      "Retweet of Quoted Tweet")
    .when($"OriginalTwIDinRT".isNotNull && $"OriginalTwIDinQT".isNull && $"OriginalTwIDinReply" > -1,
      "Retweet of Reply Tweet")
    .when($"OriginalTwIDinRT".isNull && $"OriginalTwIDinQT".isNotNull && $"OriginalTwIDinReply" > -1,
      "Reply of Quoted Tweet")
    .when($"OriginalTwIDinRT".isNotNull && $"OriginalTwIDinQT".isNotNull && $"OriginalTwIDinReply" > -1,
      "Retweet of Quoted Rely Tweet")
      .otherwise("Unclassified"))
.withColumn("MentionType", 
    when($"UMentionRTid".isNotNull && $"UMentionQTid".isNotNull, "RetweetAndQuotedMention")
    .when($"UMentionRTid".isNotNull && $"UMentionQTid".isNull, "RetweetMention")
    .when($"UMentionRTid".isNull && $"UMentionQTid".isNotNull, "QuotedMention")
    .when($"UMentionRTid".isNull && $"UMentionQTid".isNull, "AuthoredMention")
    .otherwise("NoMention"))
.withColumn("Weight", lit(1L))
}

def tweetsDF2TTTDFWithURLsAndHashtags(tweetsInputDF: DataFrame): DataFrame = {
 tweetsInputDF.select(
  unix_timestamp($"createdAt", """MMM dd, yyyy hh:mm:ss a""").cast(TimestampType).as("CurrentTweetDate"),
  $"id".as("CurrentTwID"),
  $"lang".as("lang"),
  $"geoLocation.latitude".as("lat"),
  $"geoLocation.longitude".as("lon"),
  unix_timestamp($"retweetedStatus.createdAt", """MMM dd, yyyy hh:mm:ss a""").cast(TimestampType).as("CreationDateOfOrgTwInRT"), 
  $"retweetedStatus.id".as("OriginalTwIDinRT"),  
  unix_timestamp($"quotedStatus.createdAt", """MMM dd, yyyy hh:mm:ss a""").cast(TimestampType).as("CreationDateOfOrgTwInQT"), 
  $"quotedStatus.id".as("OriginalTwIDinQT"), 
  $"inReplyToStatusId".as("OriginalTwIDinReply"), 
  $"user.id".as("CPostUserId"),
  unix_timestamp($"user.createdAt", """MMM dd, yyyy hh:mm:ss a""").cast(TimestampType).as("userCreatedAtDate"),
  $"retweetedStatus.user.id".as("OPostUserIdinRT"), 
  $"quotedStatus.user.id".as("OPostUserIdinQT"),
  $"inReplyToUserId".as("OPostUserIdinReply"),
  $"user.name".as("CPostUserName"), 
  $"retweetedStatus.user.name".as("OPostUserNameinRT"), 
  $"quotedStatus.user.name".as("OPostUserNameinQT"), 
  $"user.screenName".as("CPostUserSN"), 
  $"retweetedStatus.user.screenName".as("OPostUserSNinRT"), 
  $"quotedStatus.user.screenName".as("OPostUserSNinQT"),
  $"inReplyToScreenName".as("OPostUserSNinReply"),
  $"user.favouritesCount",
  $"user.followersCount",
  $"user.friendsCount",
  $"user.isVerified",
  $"user.isGeoEnabled",
  $"text".as("CurrentTweet"), 
  $"retweetedStatus.userMentionEntities.id".as("UMentionRTiD"), 
  $"retweetedStatus.userMentionEntities.screenName".as("UMentionRTsN"), 
  $"quotedStatus.userMentionEntities.id".as("UMentionQTiD"), 
  $"quotedStatus.userMentionEntities.screenName".as("UMentionQTsN"), 
  $"userMentionEntities.id".as("UMentionASiD"), 
  $"userMentionEntities.screenName".as("UMentionASsN"),
  $"urlEntities.expandedURL".as("URLs"),
  $"hashtagEntities.text".as("hashTags")
 ).withColumn("TweetType",
    when($"OriginalTwIDinRT".isNull && $"OriginalTwIDinQT".isNull && $"OriginalTwIDinReply" === -1,
      "Original Tweet")
    .when($"OriginalTwIDinRT".isNull && $"OriginalTwIDinQT".isNull && $"OriginalTwIDinReply" > -1,
      "Reply Tweet")
    .when($"OriginalTwIDinRT".isNotNull &&$"OriginalTwIDinQT".isNull && $"OriginalTwIDinReply" === -1,
      "ReTweet")
    .when($"OriginalTwIDinRT".isNull && $"OriginalTwIDinQT".isNotNull && $"OriginalTwIDinReply" === -1,
      "Quoted Tweet")
    .when($"OriginalTwIDinRT".isNotNull && $"OriginalTwIDinQT".isNotNull && $"OriginalTwIDinReply" === -1,
      "Retweet of Quoted Tweet")
    .when($"OriginalTwIDinRT".isNotNull && $"OriginalTwIDinQT".isNull && $"OriginalTwIDinReply" > -1,
      "Retweet of Reply Tweet")
    .when($"OriginalTwIDinRT".isNull && $"OriginalTwIDinQT".isNotNull && $"OriginalTwIDinReply" > -1,
      "Reply of Quoted Tweet")
    .when($"OriginalTwIDinRT".isNotNull && $"OriginalTwIDinQT".isNotNull && $"OriginalTwIDinReply" > -1,
      "Retweet of Quoted Rely Tweet")
      .otherwise("Unclassified"))
.withColumn("MentionType", 
    when($"UMentionRTid".isNotNull && $"UMentionQTid".isNotNull, "RetweetAndQuotedMention")
    .when($"UMentionRTid".isNotNull && $"UMentionQTid".isNull, "RetweetMention")
    .when($"UMentionRTid".isNull && $"UMentionQTid".isNotNull, "QuotedMention")
    .when($"UMentionRTid".isNull && $"UMentionQTid".isNull, "AuthoredMention")
    .otherwise("NoMention"))
.withColumn("Weight", lit(1L))
}

println("""USAGE: val df = tweetsDF2TTTDF(tweetsJsonStringDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
                  val df = tweetsDF2TTTDF(tweetsIDLong_JsonStringPairDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
                  """)
// try to modify the function tweetsDF2TTTDF so some fields are not necessarily assumed to be available
// there are better ways - https://stackoverflow.com/questions/35904136/how-do-i-detect-if-a-spark-dataframe-has-a-column

def tweetsDF2TTTDFLightWeight(tweetsInputDF: DataFrame): DataFrame = {
 tweetsInputDF.select(
  unix_timestamp($"createdAt", """MMM dd, yyyy hh:mm:ss a""").cast(TimestampType).as("CurrentTweetDate"),
  $"id".as("CurrentTwID"),
  $"lang".as("lang"),
  //$"geoLocation.latitude".as("lat"),
  //$"geoLocation.longitude".as("lon"),
  unix_timestamp($"retweetedStatus.createdAt", """MMM dd, yyyy hh:mm:ss a""").cast(TimestampType).as("CreationDateOfOrgTwInRT"), 
  $"retweetedStatus.id".as("OriginalTwIDinRT"),  
  unix_timestamp($"quotedStatus.createdAt", """MMM dd, yyyy hh:mm:ss a""").cast(TimestampType).as("CreationDateOfOrgTwInQT"), 
  $"quotedStatus.id".as("OriginalTwIDinQT"), 
  $"inReplyToStatusId".as("OriginalTwIDinReply"), 
  $"user.id".as("CPostUserId"),
  unix_timestamp($"user.createdAt", """MMM dd, yyyy hh:mm:ss a""").cast(TimestampType).as("userCreatedAtDate"),
  $"retweetedStatus.user.id".as("OPostUserIdinRT"), 
  $"quotedStatus.user.id".as("OPostUserIdinQT"),
  $"inReplyToUserId".as("OPostUserIdinReply"),
  $"user.name".as("CPostUserName"), 
  $"retweetedStatus.user.name".as("OPostUserNameinRT"), 
  $"quotedStatus.user.name".as("OPostUserNameinQT"), 
  $"user.screenName".as("CPostUserSN"), 
  $"retweetedStatus.user.screenName".as("OPostUserSNinRT"), 
  $"quotedStatus.user.screenName".as("OPostUserSNinQT"),
  $"inReplyToScreenName".as("OPostUserSNinReply"),
  $"user.favouritesCount",
  $"user.followersCount",
  $"user.friendsCount",
  $"user.isVerified",
  $"user.isGeoEnabled",
  $"text".as("CurrentTweet"), 
  $"retweetedStatus.userMentionEntities.id".as("UMentionRTiD"), 
  $"retweetedStatus.userMentionEntities.screenName".as("UMentionRTsN"), 
  $"quotedStatus.userMentionEntities.id".as("UMentionQTiD"), 
  $"quotedStatus.userMentionEntities.screenName".as("UMentionQTsN"), 
  $"userMentionEntities.id".as("UMentionASiD"), 
  $"userMentionEntities.screenName".as("UMentionASsN")
 ).withColumn("TweetType",
    when($"OriginalTwIDinRT".isNull && $"OriginalTwIDinQT".isNull && $"OriginalTwIDinReply" === -1,
      "Original Tweet")
    .when($"OriginalTwIDinRT".isNull && $"OriginalTwIDinQT".isNull && $"OriginalTwIDinReply" > -1,
      "Reply Tweet")
    .when($"OriginalTwIDinRT".isNotNull &&$"OriginalTwIDinQT".isNull && $"OriginalTwIDinReply" === -1,
      "ReTweet")
    .when($"OriginalTwIDinRT".isNull && $"OriginalTwIDinQT".isNotNull && $"OriginalTwIDinReply" === -1,
      "Quoted Tweet")
    .when($"OriginalTwIDinRT".isNotNull && $"OriginalTwIDinQT".isNotNull && $"OriginalTwIDinReply" === -1,
      "Retweet of Quoted Tweet")
    .when($"OriginalTwIDinRT".isNotNull && $"OriginalTwIDinQT".isNull && $"OriginalTwIDinReply" > -1,
      "Retweet of Reply Tweet")
    .when($"OriginalTwIDinRT".isNull && $"OriginalTwIDinQT".isNotNull && $"OriginalTwIDinReply" > -1,
      "Reply of Quoted Tweet")
    .when($"OriginalTwIDinRT".isNotNull && $"OriginalTwIDinQT".isNotNull && $"OriginalTwIDinReply" > -1,
      "Retweet of Quoted Rely Tweet")
      .otherwise("Unclassified"))
.withColumn("MentionType", 
    when($"UMentionRTid".isNotNull && $"UMentionQTid".isNotNull, "RetweetAndQuotedMention")
    .when($"UMentionRTid".isNotNull && $"UMentionQTid".isNull, "RetweetMention")
    .when($"UMentionRTid".isNull && $"UMentionQTid".isNotNull, "QuotedMention")
    .when($"UMentionRTid".isNull && $"UMentionQTid".isNull, "AuthoredMention")
    .otherwise("NoMention"))
.withColumn("Weight", lit(1L))
}

def tweetsDF2TTTDFWithURLsAndHashtagsLightWeight(tweetsInputDF: DataFrame): DataFrame = {
 tweetsInputDF.select(
  unix_timestamp($"createdAt", """MMM dd, yyyy hh:mm:ss a""").cast(TimestampType).as("CurrentTweetDate"),
  $"id".as("CurrentTwID"),
  $"lang".as("lang"),
  //$"geoLocation.latitude".as("lat"),
  //$"geoLocation.longitude".as("lon"),
  unix_timestamp($"retweetedStatus.createdAt", """MMM dd, yyyy hh:mm:ss a""").cast(TimestampType).as("CreationDateOfOrgTwInRT"), 
  $"retweetedStatus.id".as("OriginalTwIDinRT"),  
  unix_timestamp($"quotedStatus.createdAt", """MMM dd, yyyy hh:mm:ss a""").cast(TimestampType).as("CreationDateOfOrgTwInQT"), 
  $"quotedStatus.id".as("OriginalTwIDinQT"), 
  $"inReplyToStatusId".as("OriginalTwIDinReply"), 
  $"user.id".as("CPostUserId"),
  unix_timestamp($"user.createdAt", """MMM dd, yyyy hh:mm:ss a""").cast(TimestampType).as("userCreatedAtDate"),
  $"retweetedStatus.user.id".as("OPostUserIdinRT"), 
  $"quotedStatus.user.id".as("OPostUserIdinQT"),
  $"inReplyToUserId".as("OPostUserIdinReply"),
  $"user.name".as("CPostUserName"), 
  $"retweetedStatus.user.name".as("OPostUserNameinRT"), 
  $"quotedStatus.user.name".as("OPostUserNameinQT"), 
  $"user.screenName".as("CPostUserSN"), 
  $"retweetedStatus.user.screenName".as("OPostUserSNinRT"), 
  $"quotedStatus.user.screenName".as("OPostUserSNinQT"),
  $"inReplyToScreenName".as("OPostUserSNinReply"),
  $"user.favouritesCount",
  $"user.followersCount",
  $"user.friendsCount",
  $"user.isVerified",
  $"user.isGeoEnabled",
  $"text".as("CurrentTweet"), 
  $"retweetedStatus.userMentionEntities.id".as("UMentionRTiD"), 
  $"retweetedStatus.userMentionEntities.screenName".as("UMentionRTsN"), 
  $"quotedStatus.userMentionEntities.id".as("UMentionQTiD"), 
  $"quotedStatus.userMentionEntities.screenName".as("UMentionQTsN"), 
  $"userMentionEntities.id".as("UMentionASiD"), 
  $"userMentionEntities.screenName".as("UMentionASsN"),
  $"urlEntities.expandedURL".as("URLs"),
  $"hashtagEntities.text".as("hashTags")
 ).withColumn("TweetType",
    when($"OriginalTwIDinRT".isNull && $"OriginalTwIDinQT".isNull && $"OriginalTwIDinReply" === -1,
      "Original Tweet")
    .when($"OriginalTwIDinRT".isNull && $"OriginalTwIDinQT".isNull && $"OriginalTwIDinReply" > -1,
      "Reply Tweet")
    .when($"OriginalTwIDinRT".isNotNull &&$"OriginalTwIDinQT".isNull && $"OriginalTwIDinReply" === -1,
      "ReTweet")
    .when($"OriginalTwIDinRT".isNull && $"OriginalTwIDinQT".isNotNull && $"OriginalTwIDinReply" === -1,
      "Quoted Tweet")
    .when($"OriginalTwIDinRT".isNotNull && $"OriginalTwIDinQT".isNotNull && $"OriginalTwIDinReply" === -1,
      "Retweet of Quoted Tweet")
    .when($"OriginalTwIDinRT".isNotNull && $"OriginalTwIDinQT".isNull && $"OriginalTwIDinReply" > -1,
      "Retweet of Reply Tweet")
    .when($"OriginalTwIDinRT".isNull && $"OriginalTwIDinQT".isNotNull && $"OriginalTwIDinReply" > -1,
      "Reply of Quoted Tweet")
    .when($"OriginalTwIDinRT".isNotNull && $"OriginalTwIDinQT".isNotNull && $"OriginalTwIDinReply" > -1,
      "Retweet of Quoted Rely Tweet")
      .otherwise("Unclassified"))
.withColumn("MentionType", 
    when($"UMentionRTid".isNotNull && $"UMentionQTid".isNotNull, "RetweetAndQuotedMention")
    .when($"UMentionRTid".isNotNull && $"UMentionQTid".isNull, "RetweetMention")
    .when($"UMentionRTid".isNull && $"UMentionQTid".isNotNull, "QuotedMention")
    .when($"UMentionRTid".isNull && $"UMentionQTid".isNull, "AuthoredMention")
    .otherwise("NoMention"))
.withColumn("Weight", lit(1L))
}

ScaDaMaLe Course site and book

Extended TwitterUtils with Language

We extend twitter utils from Spark to allow for filtering by user-ids using .follow and strings in the tweet using .track method of twitter4j.

This notebook is mainly left to show how one can adapt one of the exsiting notebooks for twitter experiemnts to cahnge the design of the experiment itself. Similarly, one can extend further into the twitter4j functions and methods we are wrapping into Scala here.

This is part of Project MEP: Meme Evolution Programme and supported by databricks academic partners program.

The analysis is available in the following databricks notebook: * http://lamastex.org/lmse/mep/src/extendedTwitterUtils.html

Copyright 2016 Ivan Sadikov and Raazesh Sainudiin

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
import twitter4j._
import twitter4j.auth.Authorization
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization

import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import twitter4j._
import twitter4j.auth.Authorization
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
class ExtendedTwitterReceiver(
    twitterAuth: Authorization,
    filters: Seq[String],
    userFilters: Seq[Long],
    langFilters: Seq[String],
    storageLevel: StorageLevel
  ) extends Receiver[Status](storageLevel) {

  @volatile private var twitterStream: TwitterStream = _
  @volatile private var stopped = false

  def onStart() {
    try {
      val newTwitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
      newTwitterStream.addListener(new StatusListener {
        def onStatus(status: Status): Unit = {
          store(status)
        }
        // Unimplemented
        def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
        def onTrackLimitationNotice(i: Int) {}
        def onScrubGeo(l: Long, l1: Long) {}
        def onStallWarning(stallWarning: StallWarning) {}
        def onException(e: Exception) {
          if (!stopped) {
            restart("Error receiving tweets", e)
          }
        }
      })

      // do filtering only when filters are available
      if (filters.nonEmpty || userFilters.nonEmpty || langFilters.nonEmpty) {
        val query = new FilterQuery()
        if (filters.nonEmpty) {
          query.track(filters.mkString(","))
        }

        if (userFilters.nonEmpty) {
          query.follow(userFilters: _*)
        }
        
        if (langFilters.nonEmpty) {
          query.language(langFilters.mkString(","))
        }
        
        newTwitterStream.filter(query)
      } else {
        newTwitterStream.sample()
      }
      setTwitterStream(newTwitterStream)
      println("Twitter receiver started")
      stopped = false
    } catch {
      case e: Exception => restart("Error starting Twitter stream", e)
    }
  }

  def onStop() {
    stopped = true
    setTwitterStream(null)
    println("Twitter receiver stopped")
  }

  private def setTwitterStream(newTwitterStream: TwitterStream) = synchronized {
    if (twitterStream != null) {
      twitterStream.shutdown()
    }
    twitterStream = newTwitterStream
  }
}
defined class ExtendedTwitterReceiver
class ExtendedTwitterInputDStream(
    ssc_ : StreamingContext,
    twitterAuth: Option[Authorization],
    filters: Seq[String],
    userFilters: Seq[Long],
    langFilters: Seq[String],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[Status](ssc_)  {

  private def createOAuthAuthorization(): Authorization = {
    new OAuthAuthorization(new ConfigurationBuilder().build())
  }

  private val authorization = twitterAuth.getOrElse(createOAuthAuthorization())

  override def getReceiver(): Receiver[Status] = {
    new ExtendedTwitterReceiver(authorization, filters, userFilters, langFilters, storageLevel)
  }
}
defined class ExtendedTwitterInputDStream
import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}

object ExtendedTwitterUtils {
  def createStream(
      ssc: StreamingContext,
      twitterAuth: Option[Authorization],
      filters: Seq[String] = Nil,
      userFilters: Seq[Long] = Nil,
      langFilters: Seq[String] = Nil,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[Status] = {
    new ExtendedTwitterInputDStream(ssc, twitterAuth, filters, userFilters, langFilters, storageLevel)
  }
}
import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
defined object ExtendedTwitterUtils
println("done running the extendedTwitterUtils2run notebook - ready to stream from twitter by filtering on strings, users and language")
done running the extendedTwitterUtils2run notebook - ready to stream from twitter by filtering on strings, users and language

ScaDaMaLe Course site and book

Tweet Streaming Collector

Let us build a system to collect live tweets using Spark streaming.

Here are the main steps in this notebook:

  • let's collect from the public twitter stream and write to DBFS as json strings in a boiler-plate manner to understand the componets better.
  • Then we will turn the collector into a function and use it
  • Finally we will use some DataFrame-based pipelines to convert the raw tweets into other structured content.

Note that capturing tweets from the public streams for free using Twitter's Streaming API has some caveats. We are supposed to have access to a uniformly random sample of roughly 1% of all Tweets across the globe, but what's exactly available in the sample from the full twitter social media network, i.e. all status updates in the planet, for such free collection is not exactly known in terms of sub-sampling strategies like starification layers, etc. This latter is Twitter's proprietary information. However, we are supposed to be able to assume that it is indeed a random sample of roughly 1% of all tweets.

We will call extendedTwitterUtils notebook from here.

But first install the following libraries from maven central and attach to this cluster:

  • gson with maven coordinates com.google.code.gson:gson:2.8.6
  • twitter4j-examples with maven coordinates org.twitter4j:twitter4j-examples:4.0.7
"./025_a_extendedTwitterUtils2run"
import twitter4j._
import twitter4j.auth.Authorization
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

Go to SparkUI and see if a streaming job is already running. If so you need to terminate it before starting a new streaming job. Only one streaming job can be run on the DB CE.

defined class ExtendedTwitterReceiver
defined class ExtendedTwitterInputDStream
// this will make sure all streaming job in the cluster are stopped
StreamingContext.getActive.foreach{ _.stop(stopSparkContext = false) }
import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
defined object ExtendedTwitterUtils
done running the extendedTwitterUtils2run notebook - ready to stream from twitter

Let's create a directory in dbfs for storing tweets in the cluster's distributed file system.

val outputDirectoryRoot = "/datasets/tweetsStreamTmp" // output directory
outputDirectoryRoot: String = /datasets/tweetsStreamTmp
dbutils.fs.mkdirs("/datasets/tweetsStreamTmp")
res2: Boolean = true
//display(dbutils.fs.ls(outputDirectoryRoot))
// to remove a pre-existing directory and start from scratch uncomment next line and evaluate this cell
dbutils.fs.rm("/datasets/tweetsStreamTmp", true) 
res4: Boolean = true

Capture tweets in every sliding window of slideInterval many milliseconds.

val slideInterval = new Duration(1 * 1000) // 1 * 1000 = 1000 milli-seconds = 1 sec
slideInterval: org.apache.spark.streaming.Duration = 1000 ms

Recall that Discretized Stream or DStream is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from source, or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs, which is Spark?s abstraction of an immutable, distributed dataset (see Spark Programming Guide for more details). Each RDD in a DStream contains data from a certain interval, as shown in the following figure.

Spark Streaming

Let's import google's json library next.

import com.google.gson.Gson 
import com.google.gson.Gson

Our goal is to take each RDD in the twitter DStream and write it as a json file in our dbfs.

// Create a Spark Streaming Context.
val ssc = new StreamingContext(sc, slideInterval)
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@7a8fbb86

CAUTION

Extracting knowledge from tweets is "easy" using techniques shown here, but one has to take legal responsibility for the use of this knowledge and conform to the rules and policies linked below.

Remeber that the use of twitter itself comes with various strings attached. Read:

Crucially, the use of the content from twitter by you (as done in this worksheet) comes with some strings. Read:

Enter your own Twitter API Credentials.

  • Go to https://apps.twitter.com and look up your Twitter API Credentials, or create an app to create them.
  • Get your own Twitter API Credentials: consumerKey, consumerSecret, accessToken and accessTokenSecret and enter them in the cell below.

Ethical/Legal Aspects

See Background Readings/Viewings in Project MEP:

Tweet Collector

There are several steps to make a streaming twitter collector. We will do them one by one so you learn all the components. In the sequel we will make a function out of the various steps.

1. Twitter Credentials

First step towards doing your own experiments in twitter is to enter your Twitter API credentials.

  • Go to https://apps.twitter.com and look up your Twitter API Credentials, or create an app to create them.
  • Run the code in a cell to Enter your own credentials.
// put your own twitter developer credentials below instead of xxx
// instead of the '%run ".../secrets/026_secret_MyTwitterOAuthCredentials"' below
// you need to copy-paste the following code-block with your own Twitter credentials replacing XXXX


// put your own twitter developer credentials below 

import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder


// These have been regenerated!!! - need to chane them

def myAPIKey       = "XXXX" // APIKey 
def myAPISecret    = "XXXX" // APISecretKey
def myAccessToken          = "XXXX" // AccessToken
def myAccessTokenSecret    = "XXXX" // AccessTokenSecret


System.setProperty("twitter4j.oauth.consumerKey", myAPIKey)
System.setProperty("twitter4j.oauth.consumerSecret", myAPISecret)
System.setProperty("twitter4j.oauth.accessToken", myAccessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", myAccessTokenSecret)

println("twitter OAuth Credentials loaded")

The cell-below will not expose my Twitter API Credentials: myAPIKey, myAPISecret, myAccessToken and myAccessTokenSecret. Use the code above to enter your own credentials in a scala cell.

"Users/raazesh.sainudiin@math.uu.se/scalable-data-science/secrets/026_secret_MyTwitterOAuthCredentials"
// Create a Twitter Stream for the input source. 
val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
val twitterStream = ExtendedTwitterUtils.createStream(ssc, auth)
auth: Some[twitter4j.auth.OAuthAuthorization] = Some(OAuthAuthorization{consumerKey='8uN0N9RTLT1viaR811yyG7xwk', consumerSecret='******************************************', oauthToken=AccessToken{screenName='null', userId=4173723312}})
twitterStream: org.apache.spark.streaming.dstream.ReceiverInputDStream[twitter4j.Status] = ExtendedTwitterInputDStream@7f635e6d

2. Mapping Tweets to JSON

Let's map the tweets into JSON formatted string (one tweet per line). We will use Google GSON library for this.

val twitterStreamJson = twitterStream.map(
                                            x => { val gson = new Gson();
                                                 val xJson = gson.toJson(x)
                                                 xJson
                                                 }
                                          ) 
twitterStreamJson: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@b874a46
twitter OAuth Credentials loaded
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
myAPIKey: String
myAPISecret: String
myAccessToken: String
myAccessTokenSecret: String
outputDirectoryRoot
res8: String = /datasets/tweetsStreamTmp
var numTweetsCollected = 0L // track number of tweets collected
val partitionsEachInterval = 1 // This tells the number of partitions in each RDD of tweets in the DStream.

twitterStreamJson.foreachRDD( 
  (rdd, time) => { // for each RDD in the DStream
      val count = rdd.count()
      if (count > 0) {
        val outputRDD = rdd.repartition(partitionsEachInterval) // repartition as desired
        outputRDD.saveAsTextFile(outputDirectoryRoot + "/tweets_" + time.milliseconds.toString) // save as textfile
        numTweetsCollected += count // update with the latest count
      }
  }
)
numTweetsCollected: Long = 0
partitionsEachInterval: Int = 1

3. Start the Stream

Nothing has actually happened yet.

Let's start the spark streaming context we have created next.

ssc.start()

Let's look at the spark UI now and monitor the streaming job in action! Go to Clusters on the left and click on UI and then Streaming.

numTweetsCollected // number of tweets collected so far
res11: Long = 468

Let's try seeing again in a few seconds how many tweets have been collected up to now.

numTweetsCollected // number of tweets collected so far
res12: Long = 859

4. Stop the Stream

Note that you could easilt fill up disk space!!!

So let's stop the streaming job next.

ssc.stop(stopSparkContext = false) // gotto stop soon!!!

Let's make sure that the Streaming UI is not active in the Clusters UI.

StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) } // extra cautious stopping of all active streaming contexts

5. Examine Collected Tweets

Next let's examine what was saved in dbfs.

display(dbutils.fs.ls(outputDirectoryRoot))
path name size
dbfs:/datasets/tweetsStreamTmp/tweets_1605867067000/ tweets_1605867067000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867068000/ tweets_1605867068000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867069000/ tweets_1605867069000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867070000/ tweets_1605867070000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867071000/ tweets_1605867071000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867072000/ tweets_1605867072000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867073000/ tweets_1605867073000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867074000/ tweets_1605867074000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867075000/ tweets_1605867075000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867076000/ tweets_1605867076000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867077000/ tweets_1605867077000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867078000/ tweets_1605867078000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867079000/ tweets_1605867079000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867080000/ tweets_1605867080000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867081000/ tweets_1605867081000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867082000/ tweets_1605867082000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867083000/ tweets_1605867083000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867084000/ tweets_1605867084000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867085000/ tweets_1605867085000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867086000/ tweets_1605867086000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867087000/ tweets_1605867087000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867088000/ tweets_1605867088000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867089000/ tweets_1605867089000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867090000/ tweets_1605867090000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867091000/ tweets_1605867091000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867092000/ tweets_1605867092000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867093000/ tweets_1605867093000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867094000/ tweets_1605867094000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867095000/ tweets_1605867095000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867096000/ tweets_1605867096000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867097000/ tweets_1605867097000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867098000/ tweets_1605867098000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867099000/ tweets_1605867099000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867100000/ tweets_1605867100000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867101000/ tweets_1605867101000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867102000/ tweets_1605867102000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867103000/ tweets_1605867103000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867104000/ tweets_1605867104000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867105000/ tweets_1605867105000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867106000/ tweets_1605867106000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867107000/ tweets_1605867107000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867108000/ tweets_1605867108000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867109000/ tweets_1605867109000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867110000/ tweets_1605867110000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867111000/ tweets_1605867111000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867112000/ tweets_1605867112000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867113000/ tweets_1605867113000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867114000/ tweets_1605867114000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867115000/ tweets_1605867115000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867116000/ tweets_1605867116000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867117000/ tweets_1605867117000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867118000/ tweets_1605867118000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867119000/ tweets_1605867119000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867120000/ tweets_1605867120000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867121000/ tweets_1605867121000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867122000/ tweets_1605867122000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867123000/ tweets_1605867123000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867124000/ tweets_1605867124000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867125000/ tweets_1605867125000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867126000/ tweets_1605867126000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867127000/ tweets_1605867127000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867128000/ tweets_1605867128000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867129000/ tweets_1605867129000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867130000/ tweets_1605867130000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867131000/ tweets_1605867131000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867132000/ tweets_1605867132000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867133000/ tweets_1605867133000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867134000/ tweets_1605867134000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867135000/ tweets_1605867135000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867136000/ tweets_1605867136000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867137000/ tweets_1605867137000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867138000/ tweets_1605867138000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867139000/ tweets_1605867139000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867140000/ tweets_1605867140000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867141000/ tweets_1605867141000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867142000/ tweets_1605867142000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867143000/ tweets_1605867143000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867144000/ tweets_1605867144000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867145000/ tweets_1605867145000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867146000/ tweets_1605867146000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867147000/ tweets_1605867147000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867148000/ tweets_1605867148000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867149000/ tweets_1605867149000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867150000/ tweets_1605867150000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867151000/ tweets_1605867151000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867152000/ tweets_1605867152000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867153000/ tweets_1605867153000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867154000/ tweets_1605867154000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867155000/ tweets_1605867155000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867156000/ tweets_1605867156000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867157000/ tweets_1605867157000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867158000/ tweets_1605867158000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867159000/ tweets_1605867159000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867160000/ tweets_1605867160000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867161000/ tweets_1605867161000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867162000/ tweets_1605867162000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867163000/ tweets_1605867163000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867164000/ tweets_1605867164000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867165000/ tweets_1605867165000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867166000/ tweets_1605867166000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867167000/ tweets_1605867167000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867168000/ tweets_1605867168000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867169000/ tweets_1605867169000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867170000/ tweets_1605867170000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867171000/ tweets_1605867171000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867172000/ tweets_1605867172000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867173000/ tweets_1605867173000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867174000/ tweets_1605867174000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867175000/ tweets_1605867175000/ 0.0
val tweetsDir = outputDirectoryRoot+"/tweets_1605867068000/" // use an existing file, may have to rename folder based on output above!
tweetsDir: String = /datasets/tweetsStreamTmp/tweets_1605867068000/
display(dbutils.fs.ls(tweetsDir)) 
path name size
dbfs:/datasets/tweetsStreamTmp/tweets_1605867068000/_SUCCESS _SUCCESS 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1605867068000/part-00000 part-00000 122395.0
sc.textFile(tweetsDir+"part-00000").count()
res17: Long = 31
val outJson = sqlContext.read.json(tweetsDir+"part-00000")
outJson: org.apache.spark.sql.DataFrame = [contributorsIDs: array<string>, createdAt: string ... 26 more fields]
outJson.printSchema()
root
 |-- contributorsIDs: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- createdAt: string (nullable = true)
 |-- currentUserRetweetId: long (nullable = true)
 |-- displayTextRangeEnd: long (nullable = true)
 |-- displayTextRangeStart: long (nullable = true)
 |-- favoriteCount: long (nullable = true)
 |-- hashtagEntities: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- start: long (nullable = true)
 |    |    |-- text: string (nullable = true)
 |-- id: long (nullable = true)
 |-- inReplyToScreenName: string (nullable = true)
 |-- inReplyToStatusId: long (nullable = true)
 |-- inReplyToUserId: long (nullable = true)
 |-- isFavorited: boolean (nullable = true)
 |-- isPossiblySensitive: boolean (nullable = true)
 |-- isRetweeted: boolean (nullable = true)
 |-- isTruncated: boolean (nullable = true)
 |-- lang: string (nullable = true)
 |-- mediaEntities: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- displayURL: string (nullable = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- expandedURL: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- mediaURL: string (nullable = true)
 |    |    |-- mediaURLHttps: string (nullable = true)
 |    |    |-- sizes: struct (nullable = true)
 |    |    |    |-- 0: struct (nullable = true)
 |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |-- 1: struct (nullable = true)
 |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |-- 2: struct (nullable = true)
 |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |-- 3: struct (nullable = true)
 |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |-- width: long (nullable = true)
 |    |    |-- start: long (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- videoAspectRatioHeight: long (nullable = true)
 |    |    |-- videoAspectRatioWidth: long (nullable = true)
 |    |    |-- videoDurationMillis: long (nullable = true)
 |    |    |-- videoVariants: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |-- quotedStatus: struct (nullable = true)
 |    |-- contributorsIDs: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- createdAt: string (nullable = true)
 |    |-- currentUserRetweetId: long (nullable = true)
 |    |-- displayTextRangeEnd: long (nullable = true)
 |    |-- displayTextRangeStart: long (nullable = true)
 |    |-- favoriteCount: long (nullable = true)
 |    |-- hashtagEntities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- id: long (nullable = true)
 |    |-- inReplyToScreenName: string (nullable = true)
 |    |-- inReplyToStatusId: long (nullable = true)
 |    |-- inReplyToUserId: long (nullable = true)
 |    |-- isFavorited: boolean (nullable = true)
 |    |-- isPossiblySensitive: boolean (nullable = true)
 |    |-- isRetweeted: boolean (nullable = true)
 |    |-- isTruncated: boolean (nullable = true)
 |    |-- lang: string (nullable = true)
 |    |-- mediaEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- displayURL: string (nullable = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- expandedURL: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- mediaURL: string (nullable = true)
 |    |    |    |-- mediaURLHttps: string (nullable = true)
 |    |    |    |-- sizes: struct (nullable = true)
 |    |    |    |    |-- 0: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- 1: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- 2: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- 3: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |    |-- url: string (nullable = true)
 |    |    |    |-- videoAspectRatioHeight: long (nullable = true)
 |    |    |    |-- videoAspectRatioWidth: long (nullable = true)
 |    |    |    |-- videoDurationMillis: long (nullable = true)
 |    |    |    |-- videoVariants: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |-- place: struct (nullable = true)
 |    |    |-- boundingBoxCoordinates: array (nullable = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- latitude: double (nullable = true)
 |    |    |    |    |    |-- longitude: double (nullable = true)
 |    |    |-- boundingBoxType: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- countryCode: string (nullable = true)
 |    |    |-- fullName: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- placeType: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |-- quotedStatusId: long (nullable = true)
 |    |-- retweetCount: long (nullable = true)
 |    |-- source: string (nullable = true)
 |    |-- symbolEntities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- text: string (nullable = true)
 |    |-- urlEntities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- user: struct (nullable = true)
 |    |    |-- createdAt: string (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- descriptionURLEntities: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- favouritesCount: long (nullable = true)
 |    |    |-- followersCount: long (nullable = true)
 |    |    |-- friendsCount: long (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- isContributorsEnabled: boolean (nullable = true)
 |    |    |-- isDefaultProfile: boolean (nullable = true)
 |    |    |-- isDefaultProfileImage: boolean (nullable = true)
 |    |    |-- isFollowRequestSent: boolean (nullable = true)
 |    |    |-- isGeoEnabled: boolean (nullable = true)
 |    |    |-- isProtected: boolean (nullable = true)
 |    |    |-- isVerified: boolean (nullable = true)
 |    |    |-- listedCount: long (nullable = true)
 |    |    |-- location: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- profileBackgroundColor: string (nullable = true)
 |    |    |-- profileBackgroundImageUrl: string (nullable = true)
 |    |    |-- profileBackgroundImageUrlHttps: string (nullable = true)
 |    |    |-- profileBackgroundTiled: boolean (nullable = true)
 |    |    |-- profileBannerImageUrl: string (nullable = true)
 |    |    |-- profileImageUrl: string (nullable = true)
 |    |    |-- profileImageUrlHttps: string (nullable = true)
 |    |    |-- profileLinkColor: string (nullable = true)
 |    |    |-- profileSidebarBorderColor: string (nullable = true)
 |    |    |-- profileSidebarFillColor: string (nullable = true)
 |    |    |-- profileTextColor: string (nullable = true)
 |    |    |-- profileUseBackgroundImage: boolean (nullable = true)
 |    |    |-- screenName: string (nullable = true)
 |    |    |-- showAllInlineMedia: boolean (nullable = true)
 |    |    |-- statusesCount: long (nullable = true)
 |    |    |-- translator: boolean (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- utcOffset: long (nullable = true)
 |    |-- userMentionEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- screenName: string (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |-- quotedStatusId: long (nullable = true)
 |-- quotedStatusPermalink: struct (nullable = true)
 |    |-- displayURL: string (nullable = true)
 |    |-- end: long (nullable = true)
 |    |-- expandedURL: string (nullable = true)
 |    |-- start: long (nullable = true)
 |    |-- url: string (nullable = true)
 |-- retweetCount: long (nullable = true)
 |-- retweetedStatus: struct (nullable = true)
 |    |-- contributorsIDs: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- createdAt: string (nullable = true)
 |    |-- currentUserRetweetId: long (nullable = true)
 |    |-- displayTextRangeEnd: long (nullable = true)
 |    |-- displayTextRangeStart: long (nullable = true)
 |    |-- favoriteCount: long (nullable = true)
 |    |-- hashtagEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- inReplyToScreenName: string (nullable = true)
 |    |-- inReplyToStatusId: long (nullable = true)
 |    |-- inReplyToUserId: long (nullable = true)
 |    |-- isFavorited: boolean (nullable = true)
 |    |-- isPossiblySensitive: boolean (nullable = true)
 |    |-- isRetweeted: boolean (nullable = true)
 |    |-- isTruncated: boolean (nullable = true)
 |    |-- lang: string (nullable = true)
 |    |-- mediaEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- displayURL: string (nullable = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- expandedURL: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- mediaURL: string (nullable = true)
 |    |    |    |-- mediaURLHttps: string (nullable = true)
 |    |    |    |-- sizes: struct (nullable = true)
 |    |    |    |    |-- 0: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- 1: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- 2: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- 3: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |    |-- url: string (nullable = true)
 |    |    |    |-- videoAspectRatioHeight: long (nullable = true)
 |    |    |    |-- videoAspectRatioWidth: long (nullable = true)
 |    |    |    |-- videoDurationMillis: long (nullable = true)
 |    |    |    |-- videoVariants: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |-- quotedStatusId: long (nullable = true)
 |    |-- retweetCount: long (nullable = true)
 |    |-- source: string (nullable = true)
 |    |-- symbolEntities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- text: string (nullable = true)
 |    |-- urlEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- displayURL: string (nullable = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- expandedURL: string (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |    |    |    |-- url: string (nullable = true)
 |    |-- user: struct (nullable = true)
 |    |    |-- createdAt: string (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- descriptionURLEntities: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- favouritesCount: long (nullable = true)
 |    |    |-- followersCount: long (nullable = true)
 |    |    |-- friendsCount: long (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- isContributorsEnabled: boolean (nullable = true)
 |    |    |-- isDefaultProfile: boolean (nullable = true)
 |    |    |-- isDefaultProfileImage: boolean (nullable = true)
 |    |    |-- isFollowRequestSent: boolean (nullable = true)
 |    |    |-- isGeoEnabled: boolean (nullable = true)
 |    |    |-- isProtected: boolean (nullable = true)
 |    |    |-- isVerified: boolean (nullable = true)
 |    |    |-- listedCount: long (nullable = true)
 |    |    |-- location: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- profileBackgroundColor: string (nullable = true)
 |    |    |-- profileBackgroundImageUrl: string (nullable = true)
 |    |    |-- profileBackgroundImageUrlHttps: string (nullable = true)
 |    |    |-- profileBackgroundTiled: boolean (nullable = true)
 |    |    |-- profileBannerImageUrl: string (nullable = true)
 |    |    |-- profileImageUrl: string (nullable = true)
 |    |    |-- profileImageUrlHttps: string (nullable = true)
 |    |    |-- profileLinkColor: string (nullable = true)
 |    |    |-- profileSidebarBorderColor: string (nullable = true)
 |    |    |-- profileSidebarFillColor: string (nullable = true)
 |    |    |-- profileTextColor: string (nullable = true)
 |    |    |-- profileUseBackgroundImage: boolean (nullable = true)
 |    |    |-- screenName: string (nullable = true)
 |    |    |-- showAllInlineMedia: boolean (nullable = true)
 |    |    |-- statusesCount: long (nullable = true)
 |    |    |-- translator: boolean (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- utcOffset: long (nullable = true)
 |    |-- userMentionEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- screenName: string (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |-- source: string (nullable = true)
 |-- symbolEntities: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- text: string (nullable = true)
 |-- urlEntities: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- displayURL: string (nullable = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- expandedURL: string (nullable = true)
 |    |    |-- start: long (nullable = true)
 |    |    |-- url: string (nullable = true)
 |-- user: struct (nullable = true)
 |    |-- createdAt: string (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- descriptionURLEntities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- favouritesCount: long (nullable = true)
 |    |-- followersCount: long (nullable = true)
 |    |-- friendsCount: long (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- isContributorsEnabled: boolean (nullable = true)
 |    |-- isDefaultProfile: boolean (nullable = true)
 |    |-- isDefaultProfileImage: boolean (nullable = true)
 |    |-- isFollowRequestSent: boolean (nullable = true)
 |    |-- isGeoEnabled: boolean (nullable = true)
 |    |-- isProtected: boolean (nullable = true)
 |    |-- isVerified: boolean (nullable = true)
 |    |-- listedCount: long (nullable = true)
 |    |-- location: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- profileBackgroundColor: string (nullable = true)
 |    |-- profileBackgroundImageUrl: string (nullable = true)
 |    |-- profileBackgroundImageUrlHttps: string (nullable = true)
 |    |-- profileBackgroundTiled: boolean (nullable = true)
 |    |-- profileBannerImageUrl: string (nullable = true)
 |    |-- profileImageUrl: string (nullable = true)
 |    |-- profileImageUrlHttps: string (nullable = true)
 |    |-- profileLinkColor: string (nullable = true)
 |    |-- profileSidebarBorderColor: string (nullable = true)
 |    |-- profileSidebarFillColor: string (nullable = true)
 |    |-- profileTextColor: string (nullable = true)
 |    |-- profileUseBackgroundImage: boolean (nullable = true)
 |    |-- screenName: string (nullable = true)
 |    |-- showAllInlineMedia: boolean (nullable = true)
 |    |-- statusesCount: long (nullable = true)
 |    |-- translator: boolean (nullable = true)
 |    |-- url: string (nullable = true)
 |    |-- utcOffset: long (nullable = true)
 |-- userMentionEntities: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- screenName: string (nullable = true)
 |    |    |-- start: long (nullable = true)
//outJson.select("id","text").show(false) // output not displayed to comply with Twitter Developer rules
//display(outJson)  // output not displayed to comply with Twitter Developer rules

Now, let's be good at house-keeping and clean-up the unnecessary data in dbfs, our distributed file system (in databricks).

// to remove a pre-existing directory and start from scratch uncomment next line and evaluate this cell
dbutils.fs.rm(outputDirectoryRoot, true) 
res24: Boolean = true

Clearly there is a lot one can do with tweets!

Enspecially, after you can get a few more primitives under your belt from the following areas:

  • Natural Language Processing (MLlib, beyond word counts of course),
  • Distributed vertex programming (Graph Frames, which you already know), and
  • Scalable geospatial computing with location data on open street maps (roughly a third of tweets are geo-enabled with Latitude and Longitude of the tweet location) - we will get into this.

Method for Spark Streaming Collector

Let's try to throw the bits and bobs of code in the above 5 steps into a method called streamFunc for simplicity and modularity.

import com.google.gson.Gson 
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val outputDirectoryRoot = "/datasets/tweetsStreamTmp" // output directory
val batchInterval = 1 // in minutes
val timeoutJobLength =  batchInterval * 5

var newContextCreated = false
var numTweetsCollected = 0L // track number of tweets collected
//val conf = new SparkConf().setAppName("TrackedTweetCollector").setMaster("local")
// This is the function that creates the SteamingContext and sets up the Spark Streaming job.
def streamFunc(): StreamingContext = {
  // Create a Spark Streaming Context.
  val ssc = new StreamingContext(sc, Minutes(batchInterval))
  // Create the OAuth Twitter credentials 
  val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
  // Create a Twitter Stream for the input source.  
  val twitterStream = ExtendedTwitterUtils.createStream(ssc, auth)
  // Transform the discrete RDDs into JSON
  val twitterStreamJson = twitterStream.map(x => { val gson = new Gson();
                                                 val xJson = gson.toJson(x)
                                                 xJson
                                               }) 
  // take care
  val partitionsEachInterval = 1 // This tells the number of partitions in each RDD of tweets in the DStream.
  
  // what we want done with each discrete RDD tuple: (rdd, time)
  twitterStreamJson.foreachRDD((rdd, time) => { // for each filtered RDD in the DStream
      val count = rdd.count()
      if (count > 0) {
        val outputRDD = rdd.repartition(partitionsEachInterval) // repartition as desired
        // to write to parquet directly in append mode in one directory per 'time'------------       
        val outputDF = outputRDD.toDF("tweetAsJsonString")
        // get some time fields from current `.Date()`
        val year = (new java.text.SimpleDateFormat("yyyy")).format(new java.util.Date())
        val month = (new java.text.SimpleDateFormat("MM")).format(new java.util.Date())
        val day = (new java.text.SimpleDateFormat("dd")).format(new java.util.Date())
        val hour = (new java.text.SimpleDateFormat("HH")).format(new java.util.Date())
        // write to a file with a clear time-based hierarchical directory structure for example
        outputDF.write.mode(SaveMode.Append)
                .parquet(outputDirectoryRoot+ "/"+ year + "/" + month + "/" + day + "/" + hour + "/" + time.milliseconds) 
        // end of writing as parquet file-------------------------------------
        numTweetsCollected += count // update with the latest count
      }
  })
  newContextCreated = true
  ssc
}
import com.google.gson.Gson
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
outputDirectoryRoot: String = /datasets/tweetsStreamTmp
batchInterval: Int = 1
timeoutJobLength: Int = 5
newContextCreated: Boolean = false
numTweetsCollected: Long = 0
streamFunc: ()org.apache.spark.streaming.StreamingContext
// Now just use the function to create a Spark Streaming Context
val ssc = StreamingContext.getActiveOrCreate(streamFunc)
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@400b6153
// you only need one of these to start
ssc.start()
//ssc.awaitTerminationOrTimeout(timeoutJobLength)
// this will make sure all streaming job in the cluster are stopped
// but let' run it for a few minutes before stopping it
//StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) } 
display(dbutils.fs.ls("/datasets/tweetsStreamTmp/2020/11/20/10")) // outputDirectoryRoot
path name size
dbfs:/datasets/tweetsStreamTmp/2020/11/20/10/1605867660000/ 1605867660000/ 0.0
dbfs:/datasets/tweetsStreamTmp/2020/11/20/10/1605867720000/ 1605867720000/ 0.0
dbfs:/datasets/tweetsStreamTmp/2020/11/20/10/1605867780000/ 1605867780000/ 0.0

Tweet Transmission Tree Tables

Next, let us take a quick peek at the notebook ./025_b_TTTDFfunctions to see how we have pipelined the JSON tweets into tabular or structured data as DataFrames.

Please see http://lamastex.org/lmse/mep/src/TweetAnatomyAndTransmissionTree.html to understand more deeply.

Note that the fundamental issue here is that we need to define what we exactly mean by a particular type of status update, i.e.:

  • How do we categorize a particular status update, based on the data contained in it, as one of the following?
    • Original Tweet
    • Retweet
    • Quoted tweet
    • retweet of a Quoted Tweet
    • etc.

Answers to the above question are exactly answered by the methods in ./025_b_TTTDFfunctions.

"./025_b_TTTDFfunctions"
USAGE: val df = tweetsDF2TTTDF(tweetsJsonStringDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
                  val df = tweetsDF2TTTDF(tweetsIDLong_JsonStringPairDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
                  
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.ColumnName
import org.apache.spark.sql.DataFrame
fromParquetFile2DF: (InputDFAsParquetFilePatternString: String)org.apache.spark.sql.DataFrame
tweetsJsonStringDF2TweetsDF: (tweetsAsJsonStringInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsIDLong_JsonStringPairDF2TweetsDF: (tweetsAsIDLong_JsonStringInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDF: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDFWithURLsAndHashtags: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDFLightWeight: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDFWithURLsAndHashtagsLightWeight: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
val rawDF = fromParquetFile2DF("/datasets/tweetsStreamTmp/2020/11/*/*/*/*") //.cache()
val TTTsDF = tweetsDF2TTTDF(tweetsJsonStringDF2TweetsDF(rawDF)).cache()
rawDF: org.apache.spark.sql.DataFrame = [tweetAsJsonString: string]
TTTsDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CurrentTweetDate: timestamp, CurrentTwID: bigint ... 35 more fields]
TTTsDF.count()
res34: Long = 14686
//display(TTTsDF)  // output not displayed to comply with Twitter Developer rules
TTTsDF.printSchema
root
 |-- CurrentTweetDate: timestamp (nullable = true)
 |-- CurrentTwID: long (nullable = true)
 |-- lang: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- CreationDateOfOrgTwInRT: timestamp (nullable = true)
 |-- OriginalTwIDinRT: long (nullable = true)
 |-- CreationDateOfOrgTwInQT: timestamp (nullable = true)
 |-- OriginalTwIDinQT: long (nullable = true)
 |-- OriginalTwIDinReply: long (nullable = true)
 |-- CPostUserId: long (nullable = true)
 |-- userCreatedAtDate: timestamp (nullable = true)
 |-- OPostUserIdinRT: long (nullable = true)
 |-- OPostUserIdinQT: long (nullable = true)
 |-- OPostUserIdinReply: long (nullable = true)
 |-- CPostUserName: string (nullable = true)
 |-- OPostUserNameinRT: string (nullable = true)
 |-- OPostUserNameinQT: string (nullable = true)
 |-- CPostUserSN: string (nullable = true)
 |-- OPostUserSNinRT: string (nullable = true)
 |-- OPostUserSNinQT: string (nullable = true)
 |-- OPostUserSNinReply: string (nullable = true)
 |-- favouritesCount: long (nullable = true)
 |-- followersCount: long (nullable = true)
 |-- friendsCount: long (nullable = true)
 |-- isVerified: boolean (nullable = true)
 |-- isGeoEnabled: boolean (nullable = true)
 |-- CurrentTweet: string (nullable = true)
 |-- UMentionRTiD: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- UMentionRTsN: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- UMentionQTiD: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- UMentionQTsN: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- UMentionASiD: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- UMentionASsN: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- TweetType: string (nullable = false)
 |-- MentionType: string (nullable = false)
 |-- Weight: long (nullable = false)
display(TTTsDF.groupBy($"tweetType").count().orderBy($"count".desc))
tweetType count
ReTweet 5956.0
Reply Tweet 4187.0
Original Tweet 3326.0
Quoted Tweet 754.0
Retweet of Quoted Tweet 416.0
Reply of Quoted Tweet 47.0
// this will make sure all streaming job in the cluster are stopped
StreamingContext.getActive.foreach{ _.stop(stopSparkContext = false) } 
// this will delete what we collected to keep the disk usage tight and tidy
dbutils.fs.rm(outputDirectoryRoot, true) 
res40: Boolean = true

Writing to public clouds

Next, let's write the tweets into a scalable commercial cloud storage system in AWS (similarly into Azure blobstore, Google cloud storage, etc ).

We will make sure to write the tweets to AWS's simple storage service or S3, a scalable storage system in the cloud. See https://aws.amazon.com/s3/.

skip this if you don't have an AWS account.

But all the main syntactic bits are here for your future convenience :)

// Replace with your AWS S3 credentials
//
// NOTE: Set the access to this notebook appropriately to protect the security of your keys.
// Or you can delete this cell after you run the mount command below once successfully.

val AccessKey = getArgument("1. ACCESS_KEY", "REPLACE_WITH_YOUR_ACCESS_KEY")
val SecretKey = getArgument("2. SECRET_KEY", "REPLACE_WITH_YOUR_SECRET_KEY")
val EncodedSecretKey = SecretKey.replace("/", "%2F")
val AwsBucketName = getArgument("3. S3_BUCKET", "REPLACE_WITH_YOUR_S3_BUCKET")
val MountName = getArgument("4. MNT_NAME", "REPLACE_WITH_YOUR_MOUNT_NAME")
val s3Filename = "tweetDump"

Now just mount s3 as follows:

dbutils.fs.mount(s"s3a://$AccessKey:$EncodedSecretKey@$AwsBucketName", s"/mnt/$MountName")

Now you can use the dbutils commands freely to access data in the mounted S3.

dbutils.fs.help()

copying:

// to copy all the tweets to s3
dbutils.fs.cp("dbfs:/rawTweets",s"/mnt/$MountName/rawTweetsInS3/",recurse=true) 

deleting:

// to remove all the files from s3
dbutils.fs.rm(s"/mnt/$MountName/rawTweetsInS3",recurse=true) 

unmounting:

// finally unmount when done - IMPORTANT!
dbutils.fs.unmount(s"/mnt/$MountName") 

ScaDaMaLe Course site and book

Tweet Streaming Collector - Track & Follow

In the previous notebook we were capturing tweets from the public streams under the assumption that it is a random sample of roughly 1% of all tweets.

In this notebook, we can modify the collector to focus on specific communications of interest to us. Specifically, by including:

  • a list of strings to track and
  • a list of twitter user-IDs of interest to follow.

For this we will first %run the ExtendedTwitterUtils and TTTDFfunctions notebooks.

"./025_a_extendedTwitterUtils2run"
"./025_b_TTTDFfunctions"
import twitter4j._
import twitter4j.auth.Authorization
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

Go to SparkUI and see if a streaming job is already running. If so you need to terminate it before starting a new streaming job. Only one streaming job can be run on the DB CE.

defined class ExtendedTwitterReceiver
defined class ExtendedTwitterInputDStream
// this will make sure all streaming job in the cluster are stopped
StreamingContext.getActive.foreach{ _.stop(stopSparkContext = false) } 
USAGE: val df = tweetsDF2TTTDF(tweetsJsonStringDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
                  val df = tweetsDF2TTTDF(tweetsIDLong_JsonStringPairDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
                  
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.ColumnName
import org.apache.spark.sql.DataFrame
fromParquetFile2DF: (InputDFAsParquetFilePatternString: String)org.apache.spark.sql.DataFrame
tweetsJsonStringDF2TweetsDF: (tweetsAsJsonStringInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsIDLong_JsonStringPairDF2TweetsDF: (tweetsAsIDLong_JsonStringInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDF: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDFWithURLsAndHashtags: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
defined object ExtendedTwitterUtils
done running the extendedTwitterUtils2run notebook - ready to stream from twitter
tweetsDF2TTTDFLightWeight: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDFWithURLsAndHashtagsLightWeight: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame

Load your twitter credentials (secretly!).

Enter your Twitter API Credentials.

  • Go to https://apps.twitter.com and look up your Twitter API Credentials, or create an app to create them.
  • Run the code in a cell to Enter your own credentials.
// put your own twitter developer credentials below instead of xxx
// instead of the '%run ".../secrets/026_secret_MyTwitterOAuthCredentials"' below
// you need to copy-paste the following code-block with your own Twitter credentials replacing XXXX


// put your own twitter developer credentials below 

import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder


// These have been regenerated!!! - need to chane them

def myAPIKey       = "XXXX" // APIKey 
def myAPISecret    = "XXXX" // APISecretKey
def myAccessToken          = "XXXX" // AccessToken
def myAccessTokenSecret    = "XXXX" // AccessTokenSecret


System.setProperty("twitter4j.oauth.consumerKey", myAPIKey)
System.setProperty("twitter4j.oauth.consumerSecret", myAPISecret)
System.setProperty("twitter4j.oauth.accessToken", myAccessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", myAccessTokenSecret)

println("twitter OAuth Credentials loaded")

The cell-below will not expose my Twitter API Credentials: myAPIKey, myAPISecret, myAccessToken and myAccessTokenSecret. Use the code above to enter your own credentials in a scala cell.

"Users/raazesh.sainudiin@math.uu.se/scalable-data-science/secrets/026_secret_MyTwitterOAuthCredentials"

Using Twitter REST API

Next we import and instantiate for Twitter REST API, which allows us to obtain data from Twitter that is not in the live stream but in Twitter's storage layers containing archives of historcial events (including past status updates).

// SOME IMPORTTS
import scala.collection.mutable.ArrayBuffer
import twitter4j._
import twitter4j.conf._
import scala.collection.JavaConverters._ 

import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.{StructType, StructField, StringType};
import twitter4j.RateLimitStatus;
import twitter4j.ResponseList;
import com.google.gson.Gson
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import com.google.gson.Gson
import org.apache.spark.sql.DataFrame

val cb = new ConfigurationBuilder()       

val twitter = {
  val c = new ConfigurationBuilder
    c.setDebugEnabled(false)
    .setOAuthConsumerKey(myAPIKey)
    .setOAuthConsumerSecret(myAPISecret)
    .setOAuthAccessToken(myAccessToken)
    .setOAuthAccessTokenSecret(myAccessTokenSecret);

  new TwitterFactory(c.build()).getInstance()
}
import scala.collection.mutable.ArrayBuffer
import twitter4j._
import twitter4j.conf._
import scala.collection.JavaConverters._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import twitter4j.RateLimitStatus
import twitter4j.ResponseList
import com.google.gson.Gson
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import com.google.gson.Gson
import org.apache.spark.sql.DataFrame
cb: twitter4j.conf.ConfigurationBuilder = twitter4j.conf.ConfigurationBuilder@4559b2a7
twitter: twitter4j.Twitter = TwitterImpl{INCLUDE_MY_RETWEET=HttpParameter{name='include_my_retweet', value='true', jsonObject=null, file=null, fileBody=null}}

Testing REST API

Let's quickly test that the REST API calls can be made.

twitter.showUser("@raazozone").getId() // quick test that REST API works - should get 4173723312
res4: Long = 4173723312
twitter.showUser("@realDonaldTrump").getId() // quick test that REST API works - should get 25073877
res5: Long = 25073877
twitter.showUser("@WASP_Research").getId() // quick test that REST API works - should get ?
res6: Long = 1124265687755755520

Creating ScreenNames of Interest

Let's import a list of twitterIDS of interest to us...

val screenNamesOfInterest = List("raazozone","realDonaldTrump","WASP_Research") // could be done from a large list of up to 4,000 or so accounts
screenNamesOfInterest: List[String] = List(raazozone, realDonaldTrump, WASP_Research)
def lookupUserSNs(Retweeterids:Seq[String])={
  val grouped=Retweeterids.grouped(100).toList 
  for {group<-grouped  
       users=twitter.lookupUsers(group:_*)
       user<-users.asScala 
   } yield user     
}// we loose some suspended accounts...

def lookupUsers(Retweeterids:Seq[Long])={
  val grouped=Retweeterids.grouped(100).toList 
  for {group<-grouped  
       users=twitter.lookupUsers(group:_*)
       user<-users.asScala 
   } yield user     
}// we loose some suspended accounts...
lookupUserSNs: (Retweeterids: Seq[String])List[twitter4j.User]
lookupUsers: (Retweeterids: Seq[Long])List[twitter4j.User]
val twitterUsersOfInterest = lookupUserSNs(screenNamesOfInterest) // not displayed
println(twitterUsersOfInterest.size, screenNamesOfInterest.size) // we could lose users due to suspended accounts etc...
(3,3)
// just get their IDs
val seedIDs = twitterUsersOfInterest.map(u => u.getId()).toSet.toSeq.filter(_.isValidLong) // just get the IDs of the seed users who are valid
seedIDs: Seq[Long] = Vector(4173723312, 25073877, 1124265687755755520)

Extending streamFunc to Track & Follow

Now, let's extend our function to be able to track and follow.

import com.google.gson.Gson 
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val outputDirectoryRoot = "/datasets/tweetsStreamTmp" // output directory
val batchInterval = 1 // in minutes
val timeoutJobLength =  batchInterval * 5

var newContextCreated = false
var numTweetsCollected = 0L // track number of tweets collected
//val conf = new SparkConf().setAppName("TrackedTweetCollector").setMaster("local")
// This is the function that creates the SteamingContext and sets up the Spark Streaming job.
def streamFunc(): StreamingContext = {
  // Create a Spark Streaming Context.
  val ssc = new StreamingContext(sc, Minutes(batchInterval))
  // Create the OAuth Twitter credentials 
  val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
  
  val track = List("WASP rules!", "#MakeDataGreatAgain","sds-3-x rules!")//, "Hi")// just added for some live tests
  //val track = List.empty // if you do not want to track by any string
  
  val follow = seedIDs // who to follow in Twitter
  //val follow = List.empty // if you do not want to folow any specific twitter user
  
  // Create a Twitter Stream for the input source.  
  val twitterStream = ExtendedTwitterUtils.createStream(ssc, auth, track, follow)
  // Transform the discrete RDDs into JSON
  val twitterStreamJson = twitterStream.map(x => { val gson = new Gson();
                                                 val xJson = gson.toJson(x)
                                                 xJson
                                               }) 
  // take care
  val partitionsEachInterval = 1 // This tells the number of partitions in each RDD of tweets in the DStream.
  
  // what we want done with each discrete RDD tuple: (rdd, time)
  twitterStreamJson.foreachRDD((rdd, time) => { // for each filtered RDD in the DStream
      val count = rdd.count()
      if (count > 0) {
        val outputRDD = rdd.repartition(partitionsEachInterval) // repartition as desired
        // to write to parquet directly in append mode in one directory per 'time'------------       
        val outputDF = outputRDD.toDF("tweetAsJsonString")
        // get some time fields from current `.Date()`
        val year = (new java.text.SimpleDateFormat("yyyy")).format(new java.util.Date())
        val month = (new java.text.SimpleDateFormat("MM")).format(new java.util.Date())
        val day = (new java.text.SimpleDateFormat("dd")).format(new java.util.Date())
        val hour = (new java.text.SimpleDateFormat("HH")).format(new java.util.Date())
        // write to a file with a clear time-based hierarchical directory structure for example
        outputDF.write.mode(SaveMode.Append)
                .parquet(outputDirectoryRoot+ "/"+ year + "/" + month + "/" + day + "/" + hour + "/" + time.milliseconds) 
        // end of writing as parquet file-------------------------------------
        numTweetsCollected += count // update with the latest count
      }
  })
  newContextCreated = true
  ssc
}
import com.google.gson.Gson
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
outputDirectoryRoot: String = /datasets/tweetsStreamTmp
batchInterval: Int = 1
timeoutJobLength: Int = 5
newContextCreated: Boolean = false
numTweetsCollected: Long = 0
streamFunc: ()org.apache.spark.streaming.StreamingContext
twitter OAuth Credentials loaded
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
myAPIKey: String
myAPISecret: String
myAccessToken: String
myAccessTokenSecret: String

Collect, Verify and Explore

Let us collect data, store it and then explore it to see how our new experimental design changes to the Tweet collector actually performs.

You should ideally be doing live Tweets using your Twitter accounts as you are doing the experiment so you can confirm that your collector actually captures what you inted to. For example, retweet a userID in the list of userIDs you are following to check if this retweet ends up in your collector as expected. You can also tweet string of interest in the list of strings you are tracking.

val ssc = StreamingContext.getActiveOrCreate(streamFunc)
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@54fb3d50
ssc.start()
//ssc.awaitTerminationOrTimeout(timeoutJobLength) // you only need one of these to start
display(dbutils.fs.ls("/datasets/tweetsStreamTmp/2020/11/20/10/"))
path name size
dbfs:/datasets/tweetsStreamTmp/2020/11/20/10/1605868860000/ 1605868860000/ 0.0
dbfs:/datasets/tweetsStreamTmp/2020/11/20/10/1605868920000/ 1605868920000/ 0.0
dbfs:/datasets/tweetsStreamTmp/2020/11/20/10/1605868980000/ 1605868980000/ 0.0
display(dbutils.fs.ls(outputDirectoryRoot+"/2020/11/20/10/1605868860000/")) // keep adding sub-dirs and descent into time-tree'd directory hierarchy
path name size
dbfs:/datasets/tweetsStreamTmp/2020/11/20/10/1605868860000/_SUCCESS _SUCCESS 0.0
dbfs:/datasets/tweetsStreamTmp/2020/11/20/10/1605868860000/_committed_7377493726042826738 _committed_7377493726042826738 125.0
dbfs:/datasets/tweetsStreamTmp/2020/11/20/10/1605868860000/_started_7377493726042826738 _started_7377493726042826738 0.0
dbfs:/datasets/tweetsStreamTmp/2020/11/20/10/1605868860000/part-00000-tid-7377493726042826738-363f9903-0ada-41ff-92fd-c87b1349727d-6412-1-c000.snappy.parquet part-00000-tid-7377493726042826738-363f9903-0ada-41ff-92fd-c87b1349727d-6412-1-c000.snappy.parquet 95863.0
val rawDF = fromParquetFile2DF(outputDirectoryRoot+"/2020/11/*/*/*/*") //.cache()
rawDF.count
rawDF: org.apache.spark.sql.DataFrame = [tweetAsJsonString: string]
res15: Long = 437
val TTTsDF = tweetsDF2TTTDF(tweetsJsonStringDF2TweetsDF(rawDF)).cache()

Collect for a few minutes so all fields are availabale in the Tweets... otherwise you will get errors like this (which may be unavidable if what you are tracking has no geoLocation information for example, only a small fraction of Tweets have this information):

"org.apache.spark.sql.AnalysisException: cannot resolve 'geoLocation.latitude' given input columns: [contributorsIDs, createdAt, currentUserRetweetId, displayTextRangeEnd, displayTextRangeStart, favoriteCount, hashtagEntities, id, inReplyToScreenName, inReplyToStatusId, inReplyToUserId, isFavorited, isPossiblySensitive, isRetweeted, isTruncated, lang, mediaEntities, place, quotedStatus, quotedStatusId, quotedStatusPermalink, retweetCount, retweetedStatus, source, symbolEntities, text, urlEntities, user, userMentionEntities, withheldInCountries];;"

We can parse more robustly... but let's go and modify the function so it does not look for the missing fields in these tweeets...

NOTE In certain experiments all the user-IDs being tracked may not have enabled geoLocation information. In such cases, you can use the tweetsDF2TTTDFLightWeight to obtain the TTTDF from the raw tweets.

"./025_b_TTTDFfunctions"
val TTTsDF = tweetsDF2TTTDFLightWeight(tweetsJsonStringDF2TweetsDF(rawDF)).cache()
TTTsDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CurrentTweetDate: timestamp, CurrentTwID: bigint ... 33 more fields]
display(TTTsDF)  // output not displayed to comply with Twitter Developer rules
// this will make sure all streaming job in the cluster are stopped 
StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) } 
USAGE: val df = tweetsDF2TTTDF(tweetsJsonStringDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
                  val df = tweetsDF2TTTDF(tweetsIDLong_JsonStringPairDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
                  
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.ColumnName
import org.apache.spark.sql.DataFrame
fromParquetFile2DF: (InputDFAsParquetFilePatternString: String)org.apache.spark.sql.DataFrame
tweetsJsonStringDF2TweetsDF: (tweetsAsJsonStringInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsIDLong_JsonStringPairDF2TweetsDF: (tweetsAsIDLong_JsonStringInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDF: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDFWithURLsAndHashtags: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
val a = TTTsDF.filter($"CurrentTweet" contains "WASP rules!")//.collect()
a: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CurrentTweetDate: timestamp, CurrentTwID: bigint ... 33 more fields]
tweetsDF2TTTDFLightWeight: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDFWithURLsAndHashtagsLightWeight: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
display(a)
val b = TTTsDF.filter($"CurrentTweet" contains "#MakeDataGreatAgain")//.collect()
display(b) // output not displayed to comply with Twitter Developer rules
// this will make sure all streaming job in the cluster are stopped - raaz
StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) } 
// this will delete what we collected to keep the disk usage tight and tidy
dbutils.fs.rm(outputDirectoryRoot, true) 
res25: Boolean = true

ScaDaMaLe Course site and book

Twitter Hashtag Count

Using Twitter Streaming is a great way to learn Spark Streaming if you don't have your streaming datasource and want a great rich input dataset to try Spark Streaming transformations on.

In this example, we show how to calculate the top hashtags seen in the last X window of time every Y time unit.

Top Hashtags in 4 Easy Steps

We will now show quickly how to compute the top hashtags in a few easy steps after running some utility functions and importing needed libraries.

"./025_a_extendedTwitterUtils2run"
import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming._


import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming._
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

Step 1: Enter your Twitter API Credentials.

  • Go to https://apps.twitter.com and look up your Twitter API Credentials, or create an app to create them.
  • Run the code in a cell to Enter your own credentials.
// put your own twitter developer credentials below instead of xxx
// instead of the '%run ".../secrets/026_secret_MyTwitterOAuthCredentials"' below
// you need to copy-paste the following code-block with your own Twitter credentials replacing XXXX


// put your own twitter developer credentials below 

import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder


// These have been regenerated!!! - need to chane them

def myAPIKey       = "XXXX" // APIKey 
def myAPISecret    = "XXXX" // APISecretKey
def myAccessToken          = "XXXX" // AccessToken
def myAccessTokenSecret    = "XXXX" // AccessTokenSecret


System.setProperty("twitter4j.oauth.consumerKey", myAPIKey)
System.setProperty("twitter4j.oauth.consumerSecret", myAPISecret)
System.setProperty("twitter4j.oauth.accessToken", myAccessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", myAccessTokenSecret)

println("twitter OAuth Credentials loaded")

The cell-below will not expose my Twitter API Credentials: myAPIKey, myAPISecret, myAccessToken and myAccessTokenSecret. Use the code above to enter your own credentials in a scala cell.

import twitter4j._
import twitter4j.auth.Authorization
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
"Users/raazesh.sainudiin@math.uu.se/scalable-data-science/secrets/026_secret_MyTwitterOAuthCredentials"

Step 2: Configure where to output the top hashtags and how often to compute them.

val outputDirectory = "/datasets/tweetsStreamTmp" // output directory

//Recompute the top hashtags every N seconds. N=1
val slideInterval = new Duration(10 * 1000) // 1000 milliseconds is 1 second!

//Compute the top hashtags for the last M seconds. M=5
val windowLength = new Duration(30 * 1000)

// Wait W seconds before stopping the streaming job. W=100
val timeoutJobLength = 20 * 1000
outputDirectory: String = /datasets/tweetsStreamTmp
slideInterval: org.apache.spark.streaming.Duration = 10000 ms
windowLength: org.apache.spark.streaming.Duration = 30000 ms
timeoutJobLength: Int = 20000

Step 3: Run the Twitter Streaming job.

Go to SparkUI and see if a streaming job is already running. If so you need to terminate it before starting a new streaming job. Only one streaming job can be run on the DB CE.

// this will make sure all streaming job in the cluster are stopped
StreamingContext.getActive.foreach{ _.stop(stopSparkContext = false) } 

Clean up any old files.

dbutils.fs.mkdirs("dbfs:/datasets/tweetsStreamTmp/")
res3: Boolean = true
display(dbutils.fs.ls("dbfs:/datasets/tweetsStreamTmp/"))

Let us write the function that creates the Streaming Context and sets up the streaming job.

var newContextCreated = false
var num = 0

// This is a helper class used for ordering by the second value in a (String, Int) tuple
import scala.math.Ordering
object SecondValueOrdering extends Ordering[(String, Int)] {
  def compare(a: (String, Int), b: (String, Int)) = {
    a._2 compare b._2
  }
}

// This is the function that creates the SteamingContext and sets up the Spark Streaming job.
def creatingFunc(): StreamingContext = {
  // Create a Spark Streaming Context.
  val ssc = new StreamingContext(sc, slideInterval)
  // Create a Twitter Stream for the input source. 
  val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
  val twitterStream = ExtendedTwitterUtils.createStream(ssc, auth)
  
  // Parse the tweets and gather the hashTags.
  val hashTagStream = twitterStream.map(_.getText).flatMap(_.split(" ")).filter(_.startsWith("#"))
  
  // Compute the counts of each hashtag by window.
  // reduceByKey on a window of length windowLength
  // Once this is computed, slide the window by slideInterval and calculate reduceByKey again for the second window
  val windowedhashTagCountStream = hashTagStream.map((_, 1)).reduceByKeyAndWindow((x: Int, y: Int) => x + y, windowLength, slideInterval)

  // For each window, calculate the top hashtags for that time period.
  windowedhashTagCountStream.foreachRDD(hashTagCountRDD => {
    val topEndpoints = hashTagCountRDD.top(20)(SecondValueOrdering)
    dbutils.fs.put(s"${outputDirectory}/top_hashtags_${num}", topEndpoints.mkString("\n"), true)
    println(s"------ TOP HASHTAGS For window ${num}")
    println(topEndpoints.mkString("\n"))
    num = num + 1
  })
  
  newContextCreated = true
  ssc
}
newContextCreated: Boolean = false
num: Int = 0
import scala.math.Ordering
defined object SecondValueOrdering
creatingFunc: ()org.apache.spark.streaming.StreamingContext

Create the StreamingContext using getActiveOrCreate, as required when starting a streaming job in Databricks.

val ssc = StreamingContext.getActiveOrCreate(creatingFunc)
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@3961e605

Start the Spark Streaming Context and return when the Streaming job exits or return with the specified timeout.

ssc.start()
ssc.awaitTerminationOrTimeout(timeoutJobLength)
ssc.stop(stopSparkContext = false)
Wrote 498 bytes.
------ TOP HASHTAGS For window 0
(#1212SHOPEESTRAYKIDS
MAU,8)
(#PS5,2)
(#LifeGoesOnWithBTS,2)
(#BTS_BE,2)
(#아미트친소,1)
(#rechtsterrorismus,1)
(#EXO,1)
(#Pourlamesse,,1)
(#MeralAkşener,1)
(#뷔,1)
(#赤いヨーグリーナ発売記念
#この冬の体調管理に​

体調管理をして風邪など引かない様にしなきゃ😤,1)
(#SoundCloud,1)
(#weareoneEXO
#开
#음…,1)
(#kilicdaroglu,1)
(#Pride2020,1)
(#icisleribakanlığı,1)
(#Vialli?,1)
(#Cuma,1)
(#アリス・マーガトロイド,1)
(#FridayVibes,1)
Wrote 423 bytes.
------ TOP HASHTAGS For window 1
(#1212SHOPEESTRAYKIDS
MAU,8)
(#Monster 
MONSTER,7)
(#FridayLivestream

SB19,4)
(#BTS_BE,3)
(#LifeGoesOnWithBTS,3)
(#EXO,2)
(#Unlock_GOLIVEINLIFE,2)
(#PS5,2)
(#redvelvet,2)
(#SUNGHOON,1)
(#omniscient_reader,1)
(#小学5年生より賢いの,1)
(#takasbükücüxhanthos
#GLYHO,1)
(#FCKNZS,1)
(#DAMI,1)
(#CHOICE,1)
(#fahrettinkoca,1)
(#FridayThoughts,1)
(#Lausanne.,1)
(#트레저
@treasuremembers

https://t.co/lo6QhtLCBB,1)

Check out the Clusters Streaming UI as the job is running.

It should automatically stop the streaming job after timeoutJobLength.

If not, then stop any active Streaming Contexts, but don't stop the spark contexts they are attached to using the following command.

StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }

Step 4: View the Results.

display(dbutils.fs.ls(outputDirectory))
path name size
dbfs:/datasets/tweetsStreamTmp/top_hashtags_0 top_hashtags_0 498.0
dbfs:/datasets/tweetsStreamTmp/top_hashtags_1 top_hashtags_1 423.0

There should be 100 intervals for each second and the top hashtags for each of them should be in the file top_hashtags_N for N in 0,1,2,...,99 and the top hashtags should be based on the past 5 seconds window.

dbutils.fs.head(s"${outputDirectory}/top_hashtags_0")
res8: String =
(#1212SHOPEESTRAYKIDS
MAU,8)
(#PS5,2)
(#LifeGoesOnWithBTS,2)
(#BTS_BE,2)
(#아미트친소,1)
(#rechtsterrorismus,1)
(#EXO,1)
(#Pourlamesse,,1)
(#MeralAkşener,1)
(#뷔,1)
(#赤いヨーグリーナ発売記念
#この冬の体調管理に​

体調管理をして風邪など引かない様にしなきゃ😤,1)
(#SoundCloud,1)
(#weareoneEXO
#开
#음…,1)
(#kilicdaroglu,1)
(#Pride2020,1)
(#icisleribakanlığı,1)
(#Vialli?,1)
(#Cuma,1)
(#アリス・マーガトロイド,1)
(#FridayVibes,1)
defined class ExtendedTwitterReceiver
defined class ExtendedTwitterInputDStream
import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
defined object ExtendedTwitterUtils
twitter OAuth Credentials loaded
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
myAPIKey: String
myAPISecret: String
myAccessToken: String
myAccessTokenSecret: String
done running the extendedTwitterUtils2run notebook - ready to stream from twitter

Let's brainstorm

What could you do with this type of streaming capability?

  • marketing?
  • pharmaceutical vigilance?
  • linking twitter activity to mass media activity?
  • data quality and integrity measures...

Note that there are various Spark Streaming ML algorithms that one could easily throw at such reduceByKeyAndWindow tweet streams:

Student Project or Volunteer for next Meetup - let's check it out now:

HOME-WORK:

Responsible Experiments

Extracting knowledge from tweets is "easy" using techniques shown here, but one has to take responsibility for the use of this knowledge and conform to the rules and policies linked below.

Remeber that the use of twitter itself comes with various strings attached. Read:

Crucially, the use of the content from twitter by you (as done in this worksheet) comes with some strings. Read: - Developer Agreement & Policy Twitter Developer Agreement

ScaDaMaLe Course site and book

Twitter Streaming Language Classifier

This is a databricksification of https://databricks.gitbooks.io/databricks-spark-reference-applications/content/twitter_classifier/index.html by Amendra Shreshta.

Note that you need to change the fields in background notebook 025_a_extendedTwitterUtils2run so more fields for lang in Tweets are exposed. This is a good example of how one has to go deeper into the java code of twitter4j as new needs arise.

"./025_c_extendedTwitterUtils2runWithLangs"
import twitter4j._
import twitter4j.auth.Authorization
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
defined class ExtendedTwitterReceiver
import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming._

import scala.math.Ordering

import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming._
import scala.math.Ordering
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
defined class ExtendedTwitterInputDStream

Enter your Twitter API Credentials. * Go to https://apps.twitter.com and look up your Twitter API Credentials, or create an app to create them. * Run the code in a cell to Enter your own credentials.

// put your own twitter developer credentials below instead of xxx
// instead of the '%run ".../secrets/026_secret_MyTwitterOAuthCredentials"' below
// you need to copy-paste the following code-block with your own Twitter credentials replacing XXXX


// put your own twitter developer credentials below 

import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder


// These have been regenerated!!! - need to chane them

def myAPIKey       = "XXXX" // APIKey 
def myAPISecret    = "XXXX" // APISecretKey
def myAccessToken          = "XXXX" // AccessToken
def myAccessTokenSecret    = "XXXX" // AccessTokenSecret


System.setProperty("twitter4j.oauth.consumerKey", myAPIKey)
System.setProperty("twitter4j.oauth.consumerSecret", myAPISecret)
System.setProperty("twitter4j.oauth.accessToken", myAccessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", myAccessTokenSecret)

println("twitter OAuth Credentials loaded")

The cell-below will not expose my Twitter API Credentials: myAPIKey, myAPISecret, myAccessToken and myAccessTokenSecret. Use the code above to enter your own credentials in a scala cell.

import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
defined object ExtendedTwitterUtils
done running the extendedTwitterUtils2run notebook - ready to stream from twitter by filtering on strings, users and language
"Users/raazesh.sainudiin@math.uu.se/scalable-data-science/secrets/026_secret_MyTwitterOAuthCredentials"
twitter OAuth Credentials loaded
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
myAPIKey: String
myAPISecret: String
myAccessToken: String
myAccessTokenSecret: String

Step 1. Collect Data

Start downloading tweets in order to start building a simple model for language classification.

// ## Let's create a directory in dbfs for storing tweets in the cluster's distributed file system.
val outputDirectoryRoot = "/datasets/tweetsStreamTmp" // output directory
outputDirectoryRoot: String = /datasets/tweetsStreamTmp
// to remove a pre-existing directory and start from scratch uncomment next line and evaluate this cell
dbutils.fs.rm(outputDirectoryRoot, true) 
res2: Boolean = true
// ## Capture tweets in every sliding window of slideInterval many milliseconds.
val slideInterval = new Duration(1 * 1000) // 1 * 1000 = 1000 milli-seconds = 1 sec
slideInterval: org.apache.spark.streaming.Duration = 1000 ms
// Our goal is to take each RDD in the twitter DStream and write it as a json file in our dbfs.
// Create a Spark Streaming Context.
val ssc = new StreamingContext(sc, slideInterval)
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@466128fb
// Create a Twitter Stream for the input source. 
val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
val twitterStream = ExtendedTwitterUtils.createStream(ssc, auth)
auth: Some[twitter4j.auth.OAuthAuthorization] = Some(OAuthAuthorization{consumerKey='8uN0N9RTLT1viaR811yyG7xwk', consumerSecret='******************************************', oauthToken=AccessToken{screenName='null', userId=4173723312}})
twitterStream: org.apache.spark.streaming.dstream.ReceiverInputDStream[twitter4j.Status] = ExtendedTwitterInputDStream@206a254d
// Let's import google's json library next.
import com.google.gson.Gson 
//Let's map the tweets into json formatted string (one tweet per line).
val twitterStreamJson = twitterStream.map(
                                            x => { val gson = new Gson();
                                                 val xJson = gson.toJson(x)
                                                 xJson
                                                 }
                                          ) 
import com.google.gson.Gson
twitterStreamJson: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@20405bce
val partitionsEachInterval = 1 

val batchInterval = 1 // in minutes
val timeoutJobLength =  batchInterval * 5

var newContextCreated = false
var numTweetsCollected = 0L // track number of tweets collected

twitterStreamJson.foreachRDD((rdd, time) => { // for each filtered RDD in the DStream
      val count = rdd.count()
      if (count > 0) {
        val outputRDD = rdd.repartition(partitionsEachInterval) // repartition as desired
        // to write to parquet directly in append mode in one directory per 'time'------------       
        val outputDF = outputRDD.toDF("tweetAsJsonString")
        // get some time fields from current `.Date()`
        val year = (new java.text.SimpleDateFormat("yyyy")).format(new java.util.Date())
        val month = (new java.text.SimpleDateFormat("MM")).format(new java.util.Date())
        val day = (new java.text.SimpleDateFormat("dd")).format(new java.util.Date())
        val hour = (new java.text.SimpleDateFormat("HH")).format(new java.util.Date())
        // write to a file with a clear time-based hierarchical directory structure for example
        outputDF.write.mode(SaveMode.Append)
                .parquet(outputDirectoryRoot+ "/"+ year + "/" + month + "/" + day + "/" + hour + "/" + time.milliseconds) 
        // end of writing as parquet file-------------------------------------
        numTweetsCollected += count // update with the latest count
      }
  })
partitionsEachInterval: Int = 1
batchInterval: Int = 1
timeoutJobLength: Int = 5
newContextCreated: Boolean = false
numTweetsCollected: Long = 0
// ## Let's start the spark streaming context we have created next.
ssc.start()
// total tweets downloaded
numTweetsCollected
res14: Long = 6936
// ## Go to SparkUI and see if a streaming job is already running. If so you need to terminate it before starting a new streaming job. Only one streaming job can be run on the DB CE.
// #  let's stop the streaming job next.
ssc.stop(stopSparkContext = false) 
StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) } 
"./025_b_TTTDFfunctions"
// #Let's examine what was saved in dbfs
display(dbutils.fs.ls(outputDirectoryRoot))
path name size
dbfs:/datasets/tweetsStreamTmp/2020/ 2020/ 0.0
// Replace the date with current date
val date = "/2020/11/*"
val rawDF = fromParquetFile2DF(outputDirectoryRoot + date +"/*/*") //.cache()
date: String = /2020/11/*
rawDF: org.apache.spark.sql.DataFrame = [tweetAsJsonString: string]
val TTTsDF = tweetsDF2TTTDF(tweetsJsonStringDF2TweetsDF(rawDF)).cache()
TTTsDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CurrentTweetDate: timestamp, CurrentTwID: bigint ... 35 more fields]
// Creating SQL table 
TTTsDF.createOrReplaceTempView("tbl_tweet")

Step 3. Build Model

Let us use the structured data in tbl_tweet to build a simple classifier of the language using K-means.

sqlContext.sql("SELECT lang, CPostUserName, CurrentTweet FROM tbl_tweet LIMIT 10").collect.foreach(println)
[ja,古泉幕僚総長,#hour_14#あ、あの…ここで…は、ちょっと…んっ…!]
[ja,もしよき,それをよしとする空間もわりぃよ結局。だから声が大きいやつがのさばるんだよ]
[ja,雨山,あさひちゃん表情かわいすぎぃ]
[en,HourGMT5,It's November 20, 2020 at 06:01AM!]
[fr,𝘈 ₇,Meghan kali et bts ont drop leurs albums 😳😳]
[en,Mika,Another hour! It's November 20, 2020 at 08:01PM]
[ja,ベリー,RT @Sdan_sanrio: ずっと楽しくサンリオ男子していきたいです!!みんなも一緒にサンリオ活しましょう~!康太 #サンリオ男子5周年 https://t.co/Wnel4BWKAR]
[pt,TRIGGERED LOLI,@ItachinMr Opa como tá]
[th,ING. ไม่มูฟออนจากคาโอชิน,แล้วด้อมดันกันนี่ คนชิปฆาตกรกับเหยื่อเยอะมากนะ อ้ย ทุกคน😭😭😭😭]
[tr,𝐛𝐞 𝐭𝐡𝐚𝐧𝐤𝐟𝐮𝐥,@soudadesky hadi gelin]
// Checking the language of tweets
sqlContext.sql(
    "SELECT lang, COUNT(*) as cnt FROM tbl_tweet " +
    "GROUP BY lang ORDER BY cnt DESC limit 1000")
    .collect.foreach(println)
[ja,1985]
[en,1868]
[und,532]
[th,467]
[ko,338]
[in,325]
[es,281]
[pt,280]
[ar,279]
[tr,200]
[fr,160]
[tl,109]
[ru,57]
[pl,45]
[de,42]
[hi,41]
[fa,31]
[it,31]
[nl,23]
[ur,22]
[et,20]
[zh,18]
[ht,11]
[el,9]
[ta,9]
[iw,8]
[ro,6]
[cs,6]
[eu,5]
[lt,5]
[ca,5]
[da,4]
[sr,4]
[gu,4]
[no,4]
[is,3]
[uk,3]
[vi,3]
[sl,3]
[fi,3]
[te,2]
[sv,2]
[cy,2]
[bg,2]
[mr,1]
[or,1]
[sd,1]
[kn,1]
[km,1]
[bn,1]
[ne,1]
// extracting just tweets from the table and converting it to String
val texts = sqlContext
      .sql("SELECT CurrentTweet from tbl_tweet")
      .map(_.toString)
texts: org.apache.spark.sql.Dataset[String] = [value: string]
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.{Vector, Vectors}

Featurize as bigrams

Create feature vectors by turning each tweet into bigrams of characters (an n-gram model) and then hashing those to a length-1000 feature vector that we can pass to MLlib.

def featurize(s: String): Vector = {
  val n = 1000
  val result = new Array[Double](n)
  val bigrams = s.sliding(2).toArray
  for (h <- bigrams.map(_.hashCode % n)) {
    result(h) += 1.0 / bigrams.length
  }
  Vectors.sparse(n, result.zipWithIndex.filter(_._1 != 0).map(_.swap))
}
featurize: (s: String)org.apache.spark.mllib.linalg.Vector
//Cache the vectors RDD since it will be used for all the KMeans iterations.
val vectors = texts.rdd
      .map(featurize)
      .cache()
vectors: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[3608] at map at command-2972105651607107:3
// cache is lazy so count will force the data to store in memory
vectors.count()
res22: Long = 7264
vectors.first()
res23: org.apache.spark.mllib.linalg.Vector = (1000,[189,227,263,313,335,344,351,358,372,382,389,411,439,500,501,529,558,565,568,571,629,647,658,741,804,856,872,892,994],[0.03333333333333333,0.03333333333333333,0.03333333333333333,0.03333333333333333,0.03333333333333333,0.03333333333333333,0.03333333333333333,0.03333333333333333,0.03333333333333333,0.03333333333333333,0.03333333333333333,0.03333333333333333,0.03333333333333333,0.03333333333333333,0.03333333333333333,0.03333333333333333,0.03333333333333333,0.03333333333333333,0.03333333333333333,0.03333333333333333,0.03333333333333333,0.03333333333333333,0.03333333333333333,0.03333333333333333,0.03333333333333333,0.03333333333333333,0.06666666666666667,0.03333333333333333,0.03333333333333333])

K-Means model trained with 10 clusters and 10 iterations.

// Training model with 10 cluster and 10 iteration
val model = KMeans.train(vectors, k=10, maxIterations = 10)
model: org.apache.spark.mllib.clustering.KMeansModel = org.apache.spark.mllib.clustering.KMeansModel@61f49262
// Sample 100 of the original set
val some_tweets = texts.take(100)
some_tweets: Array[String] =
Array([#hour_14#あ、あの…ここで…は、ちょっと…んっ…!], [それをよしとする空間もわりぃよ結局。だから声が大きいやつがのさばるんだよ], [あさひちゃん表情かわいすぎぃ], [It's November 20, 2020 at 06:01AM!], [Meghan kali et bts ont drop leurs albums 😳😳], [Another hour! It's November 20, 2020 at 08:01PM], [RT @Sdan_sanrio: ずっと楽しくサンリオ男子していきたいです!!みんなも一緒にサンリオ活しましょう~!康太 #サンリオ男子5周年 https://t.co/Wnel4BWKAR], [@ItachinMr Opa como tá], [แล้วด้อมดันกันนี่ คนชิปฆาตกรกับเหยื่อเยอะมากนะ อ้ย ทุกคน😭😭😭😭], [@soudadesky hadi gelin], [RT @p8h33: 5000 آلاف ريال 💸💸💸💸💸
1000 💸
1000 💸
1000 💸
1000 💸
1000 💸
السحب غدا ان شاء الله من الرتويت موثق], [RT @doktongchangyed: นับถือไอดอลหญิงมากอะ ถ้าปวดท้องเมนส์ก็ยังต้องยิ้มแย้ม แอคทีฟงานปกติ เพื่อนกุปวดท้องเมนส์คือโทรมาร้องไห้ โทรมาบ่นกับกู…], [RT @selahattingrkn: Hoş geldik evet çünkü hoş’a geldik.

Biz hizmet üretmeye, gönüllere dokunmaya geldik.

Biz bu şehrin her karış toprağın…], [@OmarNassar01 💙💙❤💙], [1週間がんばり女だったのにテストで休み2日潰されるのキレそうだし殺す  しね…………………………], [RT @ministop_fan: \フォロー&RTで当たる/

本日から新発売★
その名も「世界のライスぼうや」
世界の食べ物をちまきでアレンジしたミミ!
ちょこっとお腹がすいた時にもおすすめだYO

フォロー&RTしてくれた方の中から抽選で
1,000名さまに鶏五目ちまき無…], [@Kero_kero_pad ただいまー!(≧∇≦)], [@pradverse Mera no. ni aane wala ab😂😭], [voltar a assistir spn só pra saber o final], [RT @aztvresmi: Azərbaycan Ordusunun bölmələri Ağdam rayonuna daxil olub ✊🇦🇿

Müdafiə Nazirliyi https://t.co/OGXPqdoalB], [「コロナラン」「ノンコロン」くらいまでなら薬事法OKかな。 #小林製薬 #もしも小林製薬がコロナワクチンを作ったら], [RT @_G3WP3R: เคี้ยวงั้ม ๆ แต่มันร้อน หนูก็สู้เคี้ยวไม่หยุด เอ็นดูววว https://t.co/ibjFPqHBCM], [RT @mkodchannel: 相方の誕生日にヒカキンさん
@hikakin プレゼントしてみた🎁🎉
https://t.co/47YXJxeaNp https://t.co/fjsNWCCUI9], [RT @istopeverything: ทุกคนรู้มั้ยคะว่าตอนนี้มีผู้ป่วยด้วยโรคไตเรื้อรังระยะสุดท้ายรักษาไม่หายจำนวนเท่าไหร่ แล้วที่มีการโปรโมทข้าวกล่องแช่แข็…], [RT @TheDeverakonda: I told you it comes with full approval ❤️

Good opportunity to do something together with your entire family - get tog…], [RT @merlinx26: ก็ดีใจด้วยที่เธอมูฟออนได้ แต่เรายังว่ะ], [@Sarashina_BB277 なんてえちえちな尻なんだ], [@steph_sunandar HAHAHA gue ud ep 7 tp gue tim han ji pyeong bgt sun], [RT @anuxaaaaa: oi mudei o cabelo 💋 https://t.co/u3KKQ8rhAS], [RT @OficialSala12: Flamengo precisando de gol:

Michael: https://t.co/FdEyuUbqzZ], [RT @c_emre11: https://t.co/JijOPNwfJH], [RT @bts_bighit: [#오늘의방탄] 어느 날 세상이 멈췄어..
방탄을 본 그 순간부터..💜
아미와 함께해서 즐거웠던 Live Goes On!
아미들에게도 즐거운 시간이었길🥰

#인터내셔널팝케이센세이션썬샤인레인보우트레디셔널트랜스퍼USB허브쉬림…], [It’s possible you'll fall head over heels for a new lover or f... More for Cancer https://t.co/y5ZArDaO0u], [@uwith_b อาจจะไปร้านนั้นแทน อย่า จวพ เลย รำคาญแอดมิน 55555555555555555555], [RT @telugufilmnagar: All Smiles ❤️❤️
@urstrulymahesh shares a lovely picture wishing #ShilpaShirodkar on her birthday!

#MaheshBabu #Namrat…], [RT @parkxminxmin: 아 긋냥 해요 ~ >▽<
https://t.co/9NjzwRziRP

#BTS #JIMIN #지민 @BTS_twt https://t.co/TYBgv3lmAP], [RT @tarouame96: 次男がプラレールに餌をやっていた。 https://t.co/gSmqJRrd7i], [Украина испугалась подрыва мифа об ОУН-УПА и дивизии СС «Галичина».  https://t.co/8VHbtT8edk], [RT @Anime_ABEMA: 🏫 #よつば音楽学院
第2回の配信が決定!👏

📣 11月27日(金) よる10時30分〜

今回はゲスト講師👨‍🏫として
「 #KENTHE390 」 (@KENTHE390 )
を迎えヒップホップ講義‼️

音楽をテーマにしたABEMA…], [@jam_cauilan Opo ate], [free hunter icons
https://t.co/tAZ5loOKbZ], [@omanai_on اجلس في بيتك وما رايح تسمع شيء، أما مازلت تبحث عن الأخبار ستسمع ما يسرك وما يزعجك وعليك أن تتحمل], [RT @pineapplebreads: Here, I summarized the #Supernatural finale so you don't have to watch it https://t.co/LJ9jDHFbSm], [RT @WinterBbang: No one:
Literally no one:
Bit and Hangyul: (ability of thhe other members that you wanna have)
DOHYON'S HEIGHT https://t.…], [RT @WaseemPatriot: #SaveChildrenOfIIOJK
During this phase of the armed conflict, the scale of the violence made documentation difficult. A…], [RT @Olanmoveforward: #EndSARS
#EndNorthBanditry
#EndSarsNow
#EndBadGovernmentinNIGERIA
#LekkiMassacre
#LekkiTollGateShooting
#EndPoliceB…], [@thatginamiller https://t.co/uQTIWSzELG], [RT @deryloops: his reaction 😭😭 https://t.co/aDAGeYOuOf], [RT @Promise_Royalz: @AlbumTalksHQ @yemialadee https://t.co/e8mwpyQmK2], [RT @bleksip: 2020 masih takut mencari pertolongan untuk masalah kesehatan mentalmu?

Emang ngapain sih kalau ke psikiater?
Apa akan disetru…], [RT @JorgeMoruno: Al margen de que su abuelo era un nazi, el mengele español, me ha recordado a esto. Cuando los millonarios empiezan de cer…], [Aku giniin sendiri kok s*nge yaa, apakah ada yg salah (?) wkwk], [RT @SNOOPYZ3US: jungwoo preview for vogue

a thread: https://t.co/v7xiKXvsTG], [Vraiment 🤣🤣🤣🤣 même mon entourage ne va pas me croire 🤣🤣], [RT @aespasmiling: music bank interview!
#aespa #에스파 https://t.co/TSmezo1OX3], [Nigerian politicians], [mass indoctrination day24
#PureDoctrineOfChrist], [RT @Primacaque: เราก็ไม่เคยคิดจะทำแท้ง แต่ก็สนับสนุนทำแท้งเสรีค่า

เหล้าก็ไม่ชอบกิน มันไม่อร่อย แต่ก็ไม่สนับสนุนการผูกขาดของนายทุน

เป็นเซ็…], [Coronavirus vaccine: Moderna completes phase 3 of vaccine study https://t.co/YAylsaf8uq via @YouTube], [미프프리스톤의 과격한 낙태약반대론자들의 각종 위협에 시달렸던 임신여성들은 보다 은밀하게 낙태문제를 해결할 수 있게 됐다. https://t.co/fcG9s71RR3 미페프리스톤은 수정란이 자궁에 접착할 수 없도록 방해하는약품으로 자궁수축 미프진인 미소프로스톨과 함께 사용], [Was it not a few weeks ago that the TL was up in arms and ready to protect young girlhood after that awful video surfaced?

Why am I now seeing justifications for a grown men sleeping with teenagers?], [consegui dormir e acordar cedo pela primeira vez em meses e tô exatamente assim agora https://t.co/ATcfUH8CK9], [サンプル画像👉https://t.co/HyTgIpFsF0 https://t.co/gjCnoludHB], [ᏴᎬ : 𝐋𝐢𝐟𝐞 𝐠𝐨𝐞𝐬 𝐨𝐧, 𝐋𝐞𝐭’𝐬 𝐥𝐢𝐯𝐞 𝐨𝐧.
X 𝐔𝐍 𝐆𝐞𝐧𝐞𝐫𝐚𝐥 𝐀𝐬𝐬𝐞𝐦𝐛𝐥𝐲

선입금 특전안내

[ 𝐔𝐍 연설 스티커]
기존 3종에서 디자인 변경 후 8종 제공

✔️ 8종류 각 1장씩, 총 8장
✔️ 1인 1세트 (1개당 1세트 X)

🔗 추가 및 구매 링크
https://t.co/NUpv3GZqpc https://t.co/7WW7kNKjFQ], [Soluciones prueba 8: JK 🐰 https://t.co/jXVheWj8YO], [@Pkhetarpal1 @junkie_for_news @ANI Madar chod], [Weitere angekündigte Features für #Twitter um die Plattform konkurrenzfähig zu halten:

-In Zukunft nur noch quadratische Fotos
-Tweets verschwinden nach einmaligem Ansehen
-Verifikationshaken wechselt Farbe je nach Followerzahl
-Videotweets (max 12s) ab 2021 standard], [日刊ポケモンソードシールド 5日目-1
エンジンシティ到着からジムチャレンジ申込まで

今作のライバルとなるアノ人が登場
ジムチャレンジ開会式はこの街のスタジアムで行われるけれど、バトルに挑戦できるのは2つのバッジを手に入れたあと
いわゆるミアレシティポジションの街ですね
#ポケモン剣盾 https://t.co/x8nT9lG88V], [“In the light of the appalling abuse of human rights perpetrated on the people of Hong Kong, surely Essex should consider closing down all relationship with China?"

https://t.co/8SRDUUvDPn], [·
     🐸かえるモチーフの女の子🐸
· https://t.co/ZV0gosDPji], [うつくし… https://t.co/OxqUo4l98T], [⚽️ #PORRALEGA | Acierta resultado y goleadores pepineros del #MálagaLeganés y entra en el sorteo de una mascarilla oficial 😷. ¡Suerte a todos! https://t.co/qrEHXyjs4w], [Woman miraculously survives being shoved onto train tracks by crazed NYC subway rider https://t.co/xt80mV7VQr], [Sevimli miniklerle eğlence devam ediyor!

#ÇocuktanAlHaberi yeni bölümüyle yarın 17.00'de Show TV'de! @cocuktnalshowtv https://t.co/jdZZ6f8gix], [❤🥺], [\📢セミナー告知/
10月にもご登壇いただいた橋元恵一氏(@atjam_hashimoto)がカムバック!

社会人の方だけでなく、
エンタメ業界を志望している学生さん必見!
是非奮ってご参加ください!

詳細/お申込みはコチラ↓
 https://t.co/BXnn2YScdF
#エンターズ #エンタメ #就活  #就活支援 #22卒 #21卒 https://t.co/lnJvxwExzA], [135日目①  画像提供:むくろさん( @2m5k2 )
侑ちゃん
#ラブライブ #lovelive #虹ヶ咲 #ニジガクアニメ #ラブライブ壁紙遊戯 #高咲侑 https://t.co/CqkUUaZB1Q], [<朝活どうでしょう~解答編~>
I_1側の抵抗の値が分からないので、まずはI_2を計算します
次にI_2とR_2から電圧V_2を求めます

電験三種の直流回路では、このように数回の計算を経て解にたどり着ける問題が多いです。詰将棋を解くように先を見据えて計算ができる力をつけていきましょう!

#電験朝活 https://t.co/RFYRwrxjUS], [=TEAM 51% VOTING=

ONCEs! Here's the mass voting schedule. We are in need of more voters. Encourage more ONCEs in different SNS and close the gap!

Please follow your regional fanbase and @billboard_twice for more information.

@JYPETWICE #TWICE #MAMAVOTE #51_Percent https://t.co/ZklwziCFXo], [How does Coronavirus compare to Ebola  SARS  etc  https://t.co/AT484Gd9kc https://t.co/klm1KUQQC7], [Arhiepiscopia Târgoviștei anunță acțiuni caritabile de 1 milion de lei în preajma Crăciunului.

https://t.co/o9QwTPhoua https://t.co/lIJS9iHk4W], [Это комплимент, тк я мало кому доверяю], [#どうぶつ #ねこ

なになに♪なになに♪

(´・ω・`)っ💕 https://t.co/yVSos0wW8d], [On 21 November 1920, Michael Feery and Jerome O'Leary attended a football match in Croke Park. They never came home. Michael and Jerome were two of 14 people who were killed on Bloody Sunday.

#B100dySunday - the GAA remembers.

Learn more at https://t.co/0YyrIyygwQ https://t.co/ufmb7aZk9B], [信頼している人に言われたことを、愚直に、ただ素直にやり続ける。
簡単なことのようで、難しいものです。

「勝利を引き寄せる実践的成功法」とは。
https://t.co/i9pTilGJxk
#YouTube #実践 #成功 #動画編集 #動画制作 #動画 https://t.co/wQYljzmi3E], [En sus últimos lanzamientos le podíamos ver de la mano de @omarmontesSr o @lolaindigomusic, pero ahora @Rvfvrxiz reclama protagonismo en solitario con nuevo material bajo el brazo.

Descúbrelo en https://t.co/4qpCcEh0rw https://t.co/SW6eylT1mR], [@Nefie26 中腰めちゃ可愛い(*´艸`*)], [Register https://t.co/EhcY9eoAUb https://t.co/AbgyyqMErV], [Playing chess… and a few other things ✌️ https://t.co/iEOqPaysO1], [@miinyan_hanabi おつかれ!], [TIC y educación en Tweeted Times @lmggr https://t.co/4u0k8Q7DKG], [@dc_maru_ 何枚買うか悩み中でごわす], [RT @narendramodi: Neutralising of 4 terrorists belonging to Pakistan-based terrorist organisation Jaish-e-Mohammed and the presence of larg…], [no #Gears5], [@NestT4ku あんがと!寝る😴], [Allahım sen utandırma🤲 gönlümden geçeni hakkımda hayırlı eyle bu son olsun en güzeli olsun🌸], [RT @ATEEZofficial: [#윤호] 자연과 함께💐
#ATEEZ #에이티즈 https://t.co/EivLdPVLit], [@_itsbunnie @GivingAnimeboo doneee], [RT @IHK_FFM_INT: Die AHK #Peru 🇵🇪 lädt Sie zu den 3. Deutsch-Peruanischen Wirtschaftstagen unter dem Motto "Gemeinsame Ziele vereinen" ein.…], [RT @LeireOlmeda: C's dice que la M-50 la pague Rivas, por rojos o algo así. Y el PP que palmaditas en la espalda las que hagan falta, pero…])
// iterate through the 100 samples and show which cluster they are in
for (i <- 0 until 10) {
  println(s"\nCLUSTER $i:")
  some_tweets.foreach { t =>
    if (model.predict(featurize(t)) == i) {
      println(t)
    }
  }
}
CLUSTER 0:
[RT @mkodchannel: 相方の誕生日にヒカキンさん
@hikakin プレゼントしてみた🎁🎉
https://t.co/47YXJxeaNp https://t.co/fjsNWCCUI9]
[RT @anuxaaaaa: oi mudei o cabelo 💋 https://t.co/u3KKQ8rhAS]
[RT @c_emre11: https://t.co/JijOPNwfJH]
[RT @parkxminxmin: 아 긋냥 해요 ~ >▽<
https://t.co/9NjzwRziRP

#BTS #JIMIN #지민 @BTS_twt https://t.co/TYBgv3lmAP]
[RT @tarouame96: 次男がプラレールに餌をやっていた。 https://t.co/gSmqJRrd7i]
[free hunter icons
https://t.co/tAZ5loOKbZ]
[@thatginamiller https://t.co/uQTIWSzELG]
[RT @deryloops: his reaction 😭😭 https://t.co/aDAGeYOuOf]
[RT @Promise_Royalz: @AlbumTalksHQ @yemialadee https://t.co/e8mwpyQmK2]
[サンプル画像👉https://t.co/HyTgIpFsF0 https://t.co/gjCnoludHB]
[Soluciones prueba 8: JK 🐰 https://t.co/jXVheWj8YO]
[·
     🐸かえるモチーフの女の子🐸
· https://t.co/ZV0gosDPji]
[うつくし… https://t.co/OxqUo4l98T]
[How does Coronavirus compare to Ebola  SARS  etc  https://t.co/AT484Gd9kc https://t.co/klm1KUQQC7]
[#どうぶつ #ねこ

なになに♪なになに♪

(´・ω・`)っ💕 https://t.co/yVSos0wW8d]
[信頼している人に言われたことを、愚直に、ただ素直にやり続ける。
簡単なことのようで、難しいものです。

「勝利を引き寄せる実践的成功法」とは。
https://t.co/i9pTilGJxk
#YouTube #実践 #成功 #動画編集 #動画制作 #動画 https://t.co/wQYljzmi3E]
[Register https://t.co/EhcY9eoAUb https://t.co/AbgyyqMErV]
[RT @ATEEZofficial: [#윤호] 자연과 함께💐
#ATEEZ #에이티즈 https://t.co/EivLdPVLit]

CLUSTER 1:
[It's November 20, 2020 at 06:01AM!]
[Meghan kali et bts ont drop leurs albums 😳😳]
[Another hour! It's November 20, 2020 at 08:01PM]
[RT @TheDeverakonda: I told you it comes with full approval ❤️ 

Good opportunity to do something together with your entire family - get tog…]
[@steph_sunandar HAHAHA gue ud ep 7 tp gue tim han ji pyeong bgt sun]
[RT @telugufilmnagar: All Smiles ❤️❤️
@urstrulymahesh shares a lovely picture wishing #ShilpaShirodkar on her birthday!

#MaheshBabu #Namrat…]
[RT @WinterBbang: No one:
Literally no one:
Bit and Hangyul: (ability of thhe other members that you wanna have) 
DOHYON'S HEIGHT https://t.…]
[RT @WaseemPatriot: #SaveChildrenOfIIOJK 
During this phase of the armed conflict, the scale of the violence made documentation difficult. A…]
[mass indoctrination day24
#PureDoctrineOfChrist]
[Was it not a few weeks ago that the TL was up in arms and ready to protect young girlhood after that awful video surfaced?

Why am I now seeing justifications for a grown men sleeping with teenagers?]
[Weitere angekündigte Features für #Twitter um die Plattform konkurrenzfähig zu halten: 

-In Zukunft nur noch quadratische Fotos
-Tweets verschwinden nach einmaligem Ansehen
-Verifikationshaken wechselt Farbe je nach Followerzahl
-Videotweets (max 12s) ab 2021 standard]
[“In the light of the appalling abuse of human rights perpetrated on the people of Hong Kong, surely Essex should consider closing down all relationship with China?"

https://t.co/8SRDUUvDPn]
[=TEAM 51% VOTING=

ONCEs! Here's the mass voting schedule. We are in need of more voters. Encourage more ONCEs in different SNS and close the gap! 

Please follow your regional fanbase and @billboard_twice for more information.

@JYPETWICE #TWICE #MAMAVOTE #51_Percent https://t.co/ZklwziCFXo]
[RT @narendramodi: Neutralising of 4 terrorists belonging to Pakistan-based terrorist organisation Jaish-e-Mohammed and the presence of larg…]
[RT @IHK_FFM_INT: Die AHK #Peru 🇵🇪 lädt Sie zu den 3. Deutsch-Peruanischen Wirtschaftstagen unter dem Motto "Gemeinsame Ziele vereinen" ein.…]

CLUSTER 2:
[#hour_14#あ、あの…ここで…は、ちょっと…んっ…!]
[それをよしとする空間もわりぃよ結局。だから声が大きいやつがのさばるんだよ]
[あさひちゃん表情かわいすぎぃ]
[RT @Sdan_sanrio: ずっと楽しくサンリオ男子していきたいです!!みんなも一緒にサンリオ活しましょう~!康太 #サンリオ男子5周年 https://t.co/Wnel4BWKAR]
[แล้วด้อมดันกันนี่ คนชิปฆาตกรกับเหยื่อเยอะมากนะ อ้ย ทุกคน😭😭😭😭]
[RT @p8h33: 5000 آلاف ريال 💸💸💸💸💸
1000 💸
1000 💸
1000 💸
1000 💸
1000 💸
السحب غدا ان شاء الله من الرتويت موثق]
[RT @doktongchangyed: นับถือไอดอลหญิงมากอะ ถ้าปวดท้องเมนส์ก็ยังต้องยิ้มแย้ม แอคทีฟงานปกติ เพื่อนกุปวดท้องเมนส์คือโทรมาร้องไห้ โทรมาบ่นกับกู…]
[1週間がんばり女だったのにテストで休み2日潰されるのキレそうだし殺す  しね…………………………]
[RT @ministop_fan: \フォロー&RTで当たる/

本日から新発売★
その名も「世界のライスぼうや」
世界の食べ物をちまきでアレンジしたミミ!
ちょこっとお腹がすいた時にもおすすめだYO

フォロー&RTしてくれた方の中から抽選で
1,000名さまに鶏五目ちまき無…]
[「コロナラン」「ノンコロン」くらいまでなら薬事法OKかな。 #小林製薬 #もしも小林製薬がコロナワクチンを作ったら]
[RT @_G3WP3R: เคี้ยวงั้ม ๆ แต่มันร้อน หนูก็สู้เคี้ยวไม่หยุด เอ็นดูววว https://t.co/ibjFPqHBCM]
[RT @istopeverything: ทุกคนรู้มั้ยคะว่าตอนนี้มีผู้ป่วยด้วยโรคไตเรื้อรังระยะสุดท้ายรักษาไม่หายจำนวนเท่าไหร่ แล้วที่มีการโปรโมทข้าวกล่องแช่แข็…]
[RT @merlinx26: ก็ดีใจด้วยที่เธอมูฟออนได้ แต่เรายังว่ะ]
[RT @bts_bighit: [#오늘의방탄] 어느 날 세상이 멈췄어..
방탄을 본 그 순간부터..💜
아미와 함께해서 즐거웠던 Live Goes On!
아미들에게도 즐거운 시간이었길🥰

#인터내셔널팝케이센세이션썬샤인레인보우트레디셔널트랜스퍼USB허브쉬림…]
[Украина испугалась подрыва мифа об ОУН-УПА и дивизии СС «Галичина».  https://t.co/8VHbtT8edk]
[RT @Anime_ABEMA: 🏫 #よつば音楽学院 
第2回の配信が決定!👏

📣 11月27日(金) よる10時30分〜

今回はゲスト講師👨‍🏫として
「 #KENTHE390 」 (@KENTHE390 ) 
を迎えヒップホップ講義‼️

音楽をテーマにしたABEMA…]
[@omanai_on اجلس في بيتك وما رايح تسمع شيء، أما مازلت تبحث عن الأخبار ستسمع ما يسرك وما يزعجك وعليك أن تتحمل]
[RT @Olanmoveforward: #EndSARS 
#EndNorthBanditry 
#EndSarsNow 
#EndBadGovernmentinNIGERIA
#LekkiMassacre
#LekkiTollGateShooting
#EndPoliceB…]
[RT @Primacaque: เราก็ไม่เคยคิดจะทำแท้ง แต่ก็สนับสนุนทำแท้งเสรีค่า

เหล้าก็ไม่ชอบกิน มันไม่อร่อย แต่ก็ไม่สนับสนุนการผูกขาดของนายทุน

เป็นเซ็…]
[미프프리스톤의 과격한 낙태약반대론자들의 각종 위협에 시달렸던 임신여성들은 보다 은밀하게 낙태문제를 해결할 수 있게 됐다. https://t.co/fcG9s71RR3 미페프리스톤은 수정란이 자궁에 접착할 수 없도록 방해하는약품으로 자궁수축 미프진인 미소프로스톨과 함께 사용]
[ᏴᎬ : 𝐋𝐢𝐟𝐞 𝐠𝐨𝐞𝐬 𝐨𝐧, 𝐋𝐞𝐭’𝐬 𝐥𝐢𝐯𝐞 𝐨𝐧. 
X 𝐔𝐍 𝐆𝐞𝐧𝐞𝐫𝐚𝐥 𝐀𝐬𝐬𝐞𝐦𝐛𝐥𝐲 

선입금 특전안내

[ 𝐔𝐍 연설 스티커]
기존 3종에서 디자인 변경 후 8종 제공 

✔️ 8종류 각 1장씩, 총 8장 
✔️ 1인 1세트 (1개당 1세트 X) 

🔗 추가 및 구매 링크
https://t.co/NUpv3GZqpc https://t.co/7WW7kNKjFQ]
[日刊ポケモンソードシールド 5日目-1
エンジンシティ到着からジムチャレンジ申込まで

今作のライバルとなるアノ人が登場
ジムチャレンジ開会式はこの街のスタジアムで行われるけれど、バトルに挑戦できるのは2つのバッジを手に入れたあと
いわゆるミアレシティポジションの街ですね
#ポケモン剣盾 https://t.co/x8nT9lG88V]
[❤🥺]
[\📢セミナー告知/
10月にもご登壇いただいた橋元恵一氏(@atjam_hashimoto)がカムバック!

社会人の方だけでなく、
エンタメ業界を志望している学生さん必見!
是非奮ってご参加ください!

詳細/お申込みはコチラ↓
 https://t.co/BXnn2YScdF
#エンターズ #エンタメ #就活  #就活支援 #22卒 #21卒 https://t.co/lnJvxwExzA]
[135日目①  画像提供:むくろさん( @2m5k2 )
侑ちゃん
#ラブライブ #lovelive #虹ヶ咲 #ニジガクアニメ #ラブライブ壁紙遊戯 #高咲侑 https://t.co/CqkUUaZB1Q]
[<朝活どうでしょう~解答編~>
I_1側の抵抗の値が分からないので、まずはI_2を計算します
次にI_2とR_2から電圧V_2を求めます

電験三種の直流回路では、このように数回の計算を経て解にたどり着ける問題が多いです。詰将棋を解くように先を見据えて計算ができる力をつけていきましょう!

#電験朝活 https://t.co/RFYRwrxjUS]
[Это комплимент, тк я мало кому доверяю]

CLUSTER 3:
[@soudadesky hadi gelin]
[RT @selahattingrkn: Hoş geldik evet çünkü hoş’a geldik.

Biz hizmet üretmeye, gönüllere dokunmaya geldik.

Biz bu şehrin her karış toprağın…]
[@OmarNassar01 💙💙❤💙]
[@Kero_kero_pad ただいまー!(≧∇≦)]
[@Sarashina_BB277 なんてえちえちな尻なんだ]
[@jam_cauilan Opo ate]
[Nigerian politicians]
[@Pkhetarpal1 @junkie_for_news @ANI Madar chod]
[@Nefie26 中腰めちゃ可愛い(*´艸`*)]
[@miinyan_hanabi おつかれ!]
[@dc_maru_ 何枚買うか悩み中でごわす]
[@NestT4ku あんがと!寝る😴]
[@_itsbunnie @GivingAnimeboo doneee]

CLUSTER 4:
[@ItachinMr Opa como tá]
[@pradverse Mera no. ni aane wala ab😂😭]
[voltar a assistir spn só pra saber o final]
[RT @bleksip: 2020 masih takut mencari pertolongan untuk masalah kesehatan mentalmu?

Emang ngapain sih kalau ke psikiater?
Apa akan disetru…]
[RT @JorgeMoruno: Al margen de que su abuelo era un nazi, el mengele español, me ha recordado a esto. Cuando los millonarios empiezan de cer…]
[Aku giniin sendiri kok s*nge yaa, apakah ada yg salah (?) wkwk]
[Vraiment 🤣🤣🤣🤣 même mon entourage ne va pas me croire 🤣🤣]
[consegui dormir e acordar cedo pela primeira vez em meses e tô exatamente assim agora https://t.co/ATcfUH8CK9]
[⚽️ #PORRALEGA | Acierta resultado y goleadores pepineros del #MálagaLeganés y entra en el sorteo de una mascarilla oficial 😷. ¡Suerte a todos! https://t.co/qrEHXyjs4w]
[En sus últimos lanzamientos le podíamos ver de la mano de @omarmontesSr o @lolaindigomusic, pero ahora @Rvfvrxiz reclama protagonismo en solitario con nuevo material bajo el brazo.

Descúbrelo en https://t.co/4qpCcEh0rw https://t.co/SW6eylT1mR]
[no #Gears5]
[Allahım sen utandırma🤲 gönlümden geçeni hakkımda hayırlı eyle bu son olsun en güzeli olsun🌸]
[RT @LeireOlmeda: C's dice que la M-50 la pague Rivas, por rojos o algo así. Y el PP que palmaditas en la espalda las que hagan falta, pero…]

CLUSTER 5:
[@uwith_b อาจจะไปร้านนั้นแทน อย่า จวพ เลย รำคาญแอดมิน 55555555555555555555]

CLUSTER 6:
[RT @aztvresmi: Azərbaycan Ordusunun bölmələri Ağdam rayonuna daxil olub ✊🇦🇿

Müdafiə Nazirliyi https://t.co/OGXPqdoalB]
[RT @OficialSala12: Flamengo precisando de gol:

Michael: https://t.co/FdEyuUbqzZ]
[It’s possible you'll fall head over heels for a new lover or f... More for Cancer https://t.co/y5ZArDaO0u]
[RT @pineapplebreads: Here, I summarized the #Supernatural finale so you don't have to watch it https://t.co/LJ9jDHFbSm]
[RT @SNOOPYZ3US: jungwoo preview for vogue

a thread: https://t.co/v7xiKXvsTG]
[RT @aespasmiling: music bank interview! 
#aespa #에스파 https://t.co/TSmezo1OX3]
[Coronavirus vaccine: Moderna completes phase 3 of vaccine study https://t.co/YAylsaf8uq via @YouTube]
[Woman miraculously survives being shoved onto train tracks by crazed NYC subway rider https://t.co/xt80mV7VQr]
[Sevimli miniklerle eğlence devam ediyor!

#ÇocuktanAlHaberi yeni bölümüyle yarın 17.00'de Show TV'de! @cocuktnalshowtv https://t.co/jdZZ6f8gix]
[Arhiepiscopia Târgoviștei anunță acțiuni caritabile de 1 milion de lei în preajma Crăciunului.

https://t.co/o9QwTPhoua https://t.co/lIJS9iHk4W]
[On 21 November 1920, Michael Feery and Jerome O'Leary attended a football match in Croke Park. They never came home. Michael and Jerome were two of 14 people who were killed on Bloody Sunday.

#B100dySunday - the GAA remembers.

Learn more at https://t.co/0YyrIyygwQ https://t.co/ufmb7aZk9B]
[Playing chess… and a few other things ✌️ https://t.co/iEOqPaysO1]
[TIC y educación en Tweeted Times @lmggr https://t.co/4u0k8Q7DKG]

CLUSTER 7:

CLUSTER 8:

CLUSTER 9:
dbutils.fs.ls("/datasets/model/")
res25: Seq[com.databricks.backend.daemon.dbutils.FileInfo] = WrappedArray(FileInfo(dbfs:/datasets/model/_SUCCESS, _SUCCESS, 0), FileInfo(dbfs:/datasets/model/part-00000, part-00000, 8266), FileInfo(dbfs:/datasets/model/part-00001, part-00001, 8266), FileInfo(dbfs:/datasets/model/part-00002, part-00002, 8266), FileInfo(dbfs:/datasets/model/part-00003, part-00003, 16282), FileInfo(dbfs:/datasets/model/part-00004, part-00004, 8266), FileInfo(dbfs:/datasets/model/part-00005, part-00005, 8266), FileInfo(dbfs:/datasets/model/part-00006, part-00006, 8266), FileInfo(dbfs:/datasets/model/part-00007, part-00007, 16282))
// to remove a pre-existing model and start from scratch
dbutils.fs.rm("/datasets/model", true) 
res26: Boolean = true
// save the model
sc.makeRDD(model.clusterCenters).saveAsObjectFile("/datasets/model")
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.clustering.KMeansModel
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.clustering.KMeansModel
USAGE: val df = tweetsDF2TTTDF(tweetsJsonStringDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
                  val df = tweetsDF2TTTDF(tweetsIDLong_JsonStringPairDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
                  
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.ColumnName
import org.apache.spark.sql.DataFrame
fromParquetFile2DF: (InputDFAsParquetFilePatternString: String)org.apache.spark.sql.DataFrame
tweetsJsonStringDF2TweetsDF: (tweetsAsJsonStringInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsIDLong_JsonStringPairDF2TweetsDF: (tweetsAsIDLong_JsonStringInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDF: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDFWithURLsAndHashtags: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
// Checking if the model works
val clusterNumber = 5

val modelFile = "/datasets/model"

val model: KMeansModel = new KMeansModel(sc.objectFile[Vector](modelFile).collect)
model.predict(featurize("واحد صاحبى لو حد يعرف اكونت وزير التعليم ")) == clusterNumber
clusterNumber: Int = 5
modelFile: String = /datasets/model
model: org.apache.spark.mllib.clustering.KMeansModel = org.apache.spark.mllib.clustering.KMeansModel@45bb929a
res28: Boolean = false
tweetsDF2TTTDFLightWeight: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
model.predict(featurize("ご参加ありがとうございます❣")) == 2
res29: Boolean = true
model.predict(featurize("واحد صاحبى لو حد يعرف اكونت وزير التعليم ")) == 2
res30: Boolean = true

Loading model and printing tweets that matched the desired cluster.

var newContextCreated = false
var num = 0

// Create a Spark Streaming Context.
@transient val ssc = new StreamingContext(sc, slideInterval)
// Create a Twitter Stream for the input source. 
@transient val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
@transient val twitterStream = ExtendedTwitterUtils.createStream(ssc, auth)

//Replace the cluster number as you desire between 0 to 9
val clusterNumber = 2

//model location
val modelFile = "/datasets/model"

// Get tweets from twitter
val Tweet = twitterStream.map(_.getText)
//Tweet.print()

println("Initalizaing the the KMeans model...")
val model: KMeansModel = new KMeansModel(sc.objectFile[Vector](modelFile).collect)

//printing tweets that match our choosen cluster
Tweet.foreachRDD(rdd => {  
rdd.collect().foreach(i =>
    {
       val record = i
       if (model.predict(featurize(record)) == clusterNumber) {
       println(record)
    }
    })
})

// Start the streaming computation
println("Initialization complete.")
ssc.start()
ssc.awaitTermination()
Initalizaing the the KMeans model...
Initialization complete.
せえな氏…
RT @chieri_kakyoin: ((( _( _'ω')_ ((( _( _'ω')_ コソコソ
ああドレワン早く見たい~~~~~~~~
美味しい焼肉が食べたい(´・∀・`)!
RT @1_yanny: Feliz Viernes🍀🌻
Activos👇
#InnovaciónYSoberaníaTecnológica
@ElizTorrres @kerny70
@Puerto462 @BaenaDegas @luiscarrillo66 @jdmend…
يارب تخليلي ♥️♥️.
RT @crn_ru: Вкалывают роботы? Каковы перспективы рынка цифровых аватаров в России https://t.co/1AXuN336dY Цифровые аватары успешно «перерос…
RT @FilipeRet: B O M   D I A   !
#نحن_نلهم_العالم برؤيتنا الطموحة، وقيادتنا الحكيمة، وهمة أبناء وبنات الوطن. #مجموعة_العشرين #G20
コロナの1日あたりの感染者、仮に1日二億人だったとしても、医療機関がそれを捌き切る事が出来るなら問題ないし、仮に1日に1人だったとしても、医療機関が捌き切れないなら、それは医療崩壊なわけで、人数だけドーーーーン発表するのは話題作りというか、それは「報道」と違うぞ、という感じ。
RT @mechocola: ทีม Gen.Gเค้าโควทตอบน้องแบคน่ารักจัง ไม่รู้ถูกไหมนะ กดแปลภาษาอะ

สั่นเพราะเกม ... หรือสั่นเพราะการสนับสนุนที่อบอุ่นของแบคฮยอ…
@yuu294 美人さんだからなってます
していいのか?
@FzWWCPMYu1k0EKr 今、まさに
同じ格好でいますー🤭
RT @gsushma55: फिर हुआ उनका ज़िक्र .. 
फिर हुआ मन उदास .. 

फिर याद आया वो बातों का सिलसिला ..
फिर हुई आँखें नम .. !! 

#Sush!
@BTS_History613 @BTS_twt YES
@euyuule 公式がこの解釈なので安心して不埒な妄想しよう(?)
RT @Rmlove09127: "7시3분기준 22,117,000회 13분기준 22,753,094회 10분동안 조회수 636,094회 늘었고 뮤비 나온지는 5시간15분이 되었습니다"
23시간내로 1억뷰는 이 추이대로라면 조금 힘들지만 내일은 주말이고…
RT @meniichigo: 残り2分‼️

参加してない人急いで参加💨
RT @ArmyYourselfBR: 📰| Todas as músicas do álbum ‘BE’ estão no Top 30 do iTunes Brasil. 

#1 - #LifeGoesOn  
#5 - Blue & Grey
#6 - Fly to M…
RT @BFidr: トランプ大統領は70%以上の得票率で大勝し、400人以上の選挙人票を勝ち取っていたと確信している——リン・ウッド弁護士がラジオ番組で語る

https://t.co/8ChsUGqX4M
RT @Hyuga_aipara: 瑠唯さん、ハッピーバースデー!!

#八潮瑠唯生誕祭2020 
#八潮瑠唯誕生祭2020 
#バンドリーマーさんと繋がりたい https://t.co/biygw0ExzT
みんなからの匿名質問を募集中!

こんな質問に答えてるよ
● 一目惚れってしたことある?…
● 彼氏いがいの男性と二人きりでド…
● 皆さんはどこからが浮気と感じま…
● 彼氏と大体いつも3ヶ月で別れる…
#質問箱 #匿名質問募集中

https://t.co/dssTEuSiGL
RT @DONNYSQUADPH: Trust in You...                   amidst the   
                                            darkness.

#WalkWithYouMV htt…
@shashikant88857 हरे रंग को खुश करने में तुमने #भगवा को नीलाम किया..... 🙄🙄
 अरे #सियासी_कलमुहो ये बहुत घिनोना काम किया..!!!

 खत्म करनी तुम्हारी नौटंकी है  😡😡😡
#भगवा मेरी जान है !!!!

#भारत_माँगे_हिन्दू_राष्ट्र

#भारत_माँगे_हिन्दू_राष्ट्र 

#जय_जय_श्री__राम 
🚩🚩🚩🚩🚩
RT @monsterzmate: 【新作動画投稿】

📺ドキッ!男だらけの体力測定 ポロリもあるかも!アレ?あったっけ...?あれ?僕は誰?...君の名は? 後編📺

🔻🔻フルはこちら🔻🔻
https://t.co/Hl3N5ByO4Y

舞元さん( @maimoto_k )…
RT @KharkhariAmit: अनिल जी बेशर्मी @RahulGandhi से सीखी है क्या? हजारों हिन्दू पूजा स्थलों को नष्ठ करने वाला,हजारों हिंदुओ का जबरदस्ती धर्म…
RT @bluesherbet_: เราเคยเขียนเธรดยาวๆ เกี่ยวกับวิกฤตIMF ที่เกาหลีต้องผลักดัน KPOP เป็นสินค้าออกทั้งที่ไม่พร้อม (อ่านได้ที่นี่)
https://t.co…
薔薇 ケンジントンガーデン

咲くにつれ花の形を変えていく
スクエアから丸くへ

既視感はイソギンチャク
ワイルドな感じもして

友達から「あいつはやめておけ」
って言われたことあるよね?
あるでしょ?
忠告は間違っていない…

あいつは、私だったこともある
人も丸くなるのかな https://t.co/ypNVkpElUv
誰か遊ぼ♪巨乳体型の甘えん坊です(〃▽〃)お洒落できれいなお姉さんタイプですょ♪今日確実に会える人限定で連絡ちょーだい♪DM来るかな。。。
らぶりつしてくれると嬉しい
盾子もってるから罪木一緒に遊んで
DMしたいなぁ♪Mの方限定です!今からホテルで乳○責め・目隠し・言葉責め・ア○ル責め・前○腺責めで虐○ちゃいます♪我こそは!!!って人いたらいいなぁ♪DM来るのたのしみぃ~♪ 
あっ、らぶりつもよろしくね
基樹さんと左遷さんがやってたのは覚えてる!もちろん守護星も(記憶にございます)
@BTS_LS0618 당근
@ippy1pandshot73 おつありです!
RT @m0momochan: 7話のラストで示されたようにあの姉妹はこれからもやっぱり私が…というようなことを繰り返して、その度お互いがお互いを大好きということを確認し、二人で支え合っていくんだよな…
@lfsswt @0llyfyb Yes.
@pc3589_ え?簡単だよ
明日の飲酒が無しになったので今日🍷のもう
@Poketoru0820 そーいや今更だけど誕おめ
ハヤテ、オレはな、
お前が次期族長の有力候補だったことをこの前初めて知ったんだ。   

そうか……、ハヤテもヤンチャしてたんだなあ。驚いたぜ、マジで。
でも交通規則は守れよな、一歩間違えたら大事故だぞ。

え?そっちの族長じゃない?
RT @furukawa_staff: 第二部~Special Live~は完全無観客生配信で実施!
古川のオリジナル楽曲はもちろん、今年、残念ながら舞台上では披露できなかったあのミュージカル曲まで…!
画面越しに熱いステージをお届けします!

内容の異なる2ステージをお見逃し…
RT @wdmnss: คือแบบมันต้องตัวติดกันขนาดไหนอ่ะถึงมีรูปคู่กันปล่อยออกมา ทั้งๆที่ในหนังสือก็ไม่ได้ถ่ายคู่กัน😳
#ปัญเจนนิษฐ์ #เฌอสิค https://t.co…
RT @fernhaeforY: ทุกอย่างมาจากภาษีประชาชน เราต้องนั่งรถเมล์อายุเกือบ 100 ปี แต่รถตำรวจเป็น BMW ว้าววววว
すやなまは
【ク】クロスワードが得意で
【リ】理解力があって
【ス】好き嫌いがなく
【マ】枚挙に暇がないほど魅力がある
【ス】素晴らしい人です。
#shindanmaker
https://t.co/ZhVMhNijiW
正反対なんですけど...
通話したい気分…
まずは友達!DMから!
だれやねん!とおもったらわこ?!みこ??みこ!!!
RT @ayln69: İnfazdaAyrım ÖlümeTerketmek
👆🏼🕊
長くお世話になってる医院で予防注射行ったら診察室の方から全集中!ってきこえたんだけど、先生が子供に打つときにやってるみたいでなんだかほっこりしたな
何の話で、オートレーサーって、中居さんが言ったのか🤭
#金スマ
RT @usagi_kitty_: GoTo批判うっとおしい💢!!!!!

人が動くことで経済も動くんだよ?
全ては繋がっているの!
批判するならせめてまともな代案を出したらどうなんだ??
こっちはもうボロボロで自殺寸前だ!!!!

どうせまた雰囲気だけで東京除外とか札幌自粛と…
@KARMARALONSO Muchas gracias!!!🙏🙋⚘⚘⚘😘
@smile_slow_life 早く素がみたい😂😂😂
@Q97455 حلو كلهم نفس الاستايل
@LoveOrPoor 酒は?
@yellowkitten_ 키튼깅 다치게한 새* 다 패줌
@n_aygod 調べてみてください!、
応援してます…!
@O9XnISYteURdtUX 言論で勝った…
口だけは良いって事ね😓
@FonproyA LISA LALISA MANOBAN
블랙핑크 리사
I vote for #LISA from @BLACKPINK for #BousnidStars2020 #Bousnid #4thBousnidStars2020
観ま〜す😊💕
@shionsiphon0530 おっぱいのすげぇ!!
@ferruh270370 - Hele hele   DIŞ KÜRESEL GÜÇLERE SATILMIŞ  DARBECİLERE ARKA DURANLARLA HİÇ İŞİM OLMAZ. BÖYLE BİR GÖRÜNTÜNÜN YANINDA OLMAM..PARTİLERLE MENFAAT İLİŞKİSİ HİÇ  YAŞAMADIM..!
RT @LLinWood: TRUTH.
僕はブロックされてませんでした😃
@sumokyokai 東ではなく西の15枚目、超ギリギリ
若隆景のように東の16枚目で七戦全勝優勝で幕下筆頭止まりだった力士もいますし
RT @AsreVishwakarma: सभी देशवासियों को धनतेरस व दीपावली पर्व की हार्दिक हार्दिक बधाई एवं शुभकामनाएं। @yadavakhilesh @mulayamyadav @proframg…
生首ドラマすき #虹ヶ咲
画竜点睛〜
프레임 언제 정리하구 트친소 열어!
ジャージできたんですかぁ?って言っちゃうとこすごい好きよwww
@Devendara_ පොලිසියකට ගියත් හරි විදියට ප්‍රතිකාර වලට යොමු කරන්නෙ නෑ මම දන්න විදියට.ඒකයි ලොකුම ප්‍රශ්නෙ
RT @eh12__: - وفي يوم الجُمعة ؛ "اللهُم بدل اقدارنا الى الأجمل"❤️
@chukwudigamer Cry
베댓 고지전하다가 탈모올듯;
@moemii えっ!行く
RT @ffluffyb: อันนี้เล่นตามรีล่า คนอื่นชอบmvไหนที่สุดของน้องเค้กกันบ้างคะ 🥺 https://t.co/UQ5PvuNqA9
フォロー&いいね&RTありがとうございます!
こちらのアカウントは大量にRTいたしますので、気になる方は『RTミュート』をお願いいたします!
@KanittaK5 เอารูปมาจากไหนคะเตง
RT @OfficialMonstaX: 📸
#몬스타엑스 #셔누
#뷰티쁠 BEAUTY+ 
12월호에서 곧 만나요!

@beautyplmania
#MONSTAX #MONSTA_X
#SHOWNU #beautyplmagazine https://t.co/R87…
@richan__xx 大丈夫です👍
@x9r1WdRq4O9malO 素敵な時間が、伝わって来ます。
   幸せは笑顔と感謝の多さ^ - ^
RT @NOS_PROJECT: 【新色】「ジゼル・スカート」
品格、華やか、女性らしい美しさ、3つのエレメントを持ったドレッシーなスカート。
タック分量を最大限までたっぷりといれることで、他にはないフレアシルエットに。 
クラシカルなボルドースカートでレディライクなコーデをお…
@luvxiaos ITS CURSED GN
RT @Fushigur0Megumi: "ยังกล้าไปสอนคนอื่นอีกหรอครับ.."

เห็นเคยบอกว่าคุณไม่เลี้ยงลูกตัวเองนี่ น่าสงสารลูกเขาจริงๆเลย.. https://t.co/evkvTJAt…
RT @Nassr_Almai: @coach__nawaf 👌👌👌👀
طبيعي لان الطرفين مصريين !!!!!؟
官僚とは名ばかりなの?
それでよし、とされる何か力があったの?
フェイスシールドに意味があるなんて思えないんですけど。
100歩譲ってマスク併用必須。
でも必要なのは医療職とか店員さんとかだよ…

それにしても街中で急にフェイスシールドオンリーで歩いている人見ると、ドッキリかと思う…
ADOROO ADOORO ADOROOO LA NUEVA CANCION DE MILEY CYRUS Y DUA LIPAA QUE REINASSSS 🤩🤩🤩😍😍 #PRISONER #DuaLipa #MileyDuaPrisioner #MileyCyrus https://t.co/jWwtyfGcfb
RT @DELTAGG058: ㊗️㊗️\\1万人感謝祭//㊗️㊗️

フォロワー1万人を記念してプレゼント企画開催します❗️❗️

第一弾は Apexpro TKL🎮

※応募条件
@DELTAGG058  をフォロー
このツイートをRT

是非みんな参加してね❗️
あと拡散…
けんmちのkざき呼びで天に召した
ペーパー頑張る気力ガガガ
素組みでもわかる素性の良さ。いいプロポ。(積んでるけど
@Avokado62228 ну-ну
RT @yupe_1123: @choco6rabi たこさんウィンナーぺろぺろ🐙( '-' 🐙 )タコサン
青オーラ外れたよ外れたよ外れたよ外れたよ外れたよ外れたよ外れたよ外れたよ外れたよ外れたよ外れたよ外れたよ外れたよ外れたよ外れたよ外れたよ外れたよ外れたよ外れたよ外れたよ外れたよ外れたよ外れたよ外れたよ外れたよ外れたよ
RT @kanintr_: ทำไมเอ้ดเอมมันขายเด็กไม่ตรงปกเลยอะ เปิดมาลุคเฟียสๆแบบจงอิน แทยง คาริน่า ของจริงคือ แห่ะๆหวัดดีกั๊บกันทุกคนเลยอะ 5555555555555…
@haluokun この漫画がすごい!の一位になったやつですよ!
@soda_rafinsky そうね〜時間が経つと色々と変わっていくもんなあ… ゲームしてた方が楽しいのもわかるわ〜
RT @7AHaWClulx8cjzM: الله خلقنا ناس ما نقبل الهون هذا بلانا يا مدور بلانا 
ما نلتفت للي ورانا يهدون ميزه من الله ما ناظر ورانا
#عوده_الرويع…
RT @heicat_movie_jp: 劇場版『羅小黒戦記』(ロシャオヘイセンキ)制作スタジオのスタッフ陣が歌う「羅小黒大電影之歌」をご紹介します。監督のMTJJを始めた主力アニメーターたちの歌声と、ムゲン師匠のキレキレダンスをご堪能できますよ🥰
https://t.co/s…
RT @amocafe_ikeb01: 【超勇者展×AMOCAFE池袋店】勇者シリーズ30周年を
記念したコラボカフェ開催が決定!かっこいいデフォルメ
勇者ロボとかわいい主人公ミニキャラのイラストが登場✨
グッズやメニューは来週発表🚅展示会連動企画も!予約開始致します☆ご予約は…
RT @boonlert1512: "ชุดไม่แดงไม่มีแรงเดิน"❤คิดถึงนะครับคุณคณาวุฒิ🌻☺😌💕
#GulfKanawut 🌻☀
#คิดถึงคุณคนสวยคะน้าน้อย https://t.co/XvCPioPNyE
RT @neuron1204: 坂道研修生東名阪Zeppツアーお疲れ様でした。ありがとう坂道研修生、ありがとう黒見明香さん。
@saaalsheikh 😭😭😭😭😭😭😭😭😭
@iili4p @Mllk_11 حطيت كل الاجابات ههههههههههههههههههههههههههههههههههههههههههههههههههههههههههههههههههههههههههههههههههههههههههههههههه
ハヤシさんまじ最高
RT @coscos_makeup: コス用、普段用どっちにも使える
【パーフェクトHDファンデーション N03】
肌なじみする”ナチュラルオークルカラー”✨

高カバー&ひと塗りでツルンと陶器肌に💛

【@coscos_makeup をフォロー&RT】で抽選で30名の方にプレ…
RT @megido72: 皆さま!アートチームから素敵なイラストが届いたので公開します🎨子犬も思わず振り返る、キリっとした表情のウァプラさんとガミジンさん…いったいどちらへ向かうのでしょうか✨

#メギド #メギド72 https://t.co/JS4LJmPRwm
RT @ma1ahot: 롱패딩 이제 유행 지났다고 하는 거 정말 이해할 수 없어... 롱패딩의 시작은 유행이 아니라 발명이었으며 롱패딩은 이제 생존의 문제라니까... 너는 유행 챙기면서 맨날 숏패딩 입고 당기다가 궁디 얼어붙으세요 저는 유행 지난 롱…
①デュエルランカー毎週15名前後
②終末戦争はボーナスエリアを選択し順位は3-5位
③アウライベントは前回2位
④ファクションチャット活発に利用中
⑤メンバー全員がTwitterグループに参加しており情報共有もスムーズ
その他ご不明点やご質問等はご遠慮なくDM下さいませ♡
次のリプに続きます
인별 파바박 게시물에 댓글 보니까 피티 받는거 같더니 그 결과물이 이것인가•• 𝙒𝙝𝙮𝙧𝙖𝙣𝙤.. 𝙒𝙝𝙮𝙧𝙖𝙣𝙤..
بِسْم الله وش هذا
آج پھر درد و غم کے دھاگے میں 
ہم پرو کر ترے خیال کے پھول 
ترک الفت کے دشت سے چن کر 
آشنائی کے ماہ و سال کے پھول 
تیری دہلیز پر سجا آئے 
پھر تری یاد پر چڑھا آئے 
باندھ کر آرزو کے پلے میں 
ہجر کی راکھ اور وصال کے پھول 

#فیض_احمد_فیض
زاد جوعك وتبغى تطبيق سريع يجيب لك الأكل اللي في خاطرك ؟😋👀

-حمّل تطبيق شقردي 📲
-اكتب طلبك ✅
-الشقردي عند بابك 🚙
-شفت كيف شقردي يضّبطك 😉

-وكمان اكواد خصم ⬅️ wow
Em11 
للتحميل 👇
💛https://t.co/gfAGQfd8pL
-الشقردي يوصل في جميع أنحاء المملكة 
جده مرسول هنقرستيشن ماك بارك https://t.co/MxCyFmaRVV
タコのおふんふん………
学生の時から付き合って結婚するって

すごい憧れだよね。
RT @bint_alshmrii: اللهم في يوم الجمعة إرحم من توسدت أجسادهم الأكفان و اختلطت عظامهم بالتراب ، اللهم إجعل قبورهم خير مسكن تغفو به أعينهم.
RT @DUCKYWORLD_KR: 🎉단독 출시 기념 RT 이벤트🎉
🎁산리오 포토카드 홀더 10개 증정🎁
-
✔️기간
11월 20일~11월 26일까지
✔️참여방법
더키월드 이벤트 게시물 리트윗
✔️당첨인원 : 10명
✔️당첨발표 : 11월 26일
-…
RT @aitcsudip: আবার বাড়লো পেট্রোল-ডিজেলের দাম!ভারতবর্ষে শুধু মানুষের দাম কমছে, নিত্য প্রয়োজনীয় জিনিসের দাম বাড়ছে দিন দিন। দেশ জুড়ে @BJ…
RT @ssarawatlismm: @Tine__chicchic ไหนๆเล่ยให้ฟังหน่อย
RT @hypnosismic_RA: BDDVD発売記念トークイベント第2弾発表🎉

🟡3巻
2021/6/13開催
【出演】白井悠介、斉藤壮馬、野津山幸宏

⚪️4巻
2021/7/3開催
【出演】速水 奨、木島隆一、伊東健人

100名限定無料招待✨
イベント当日はBDD…
@atiab_alia1 بالضبط..
نفس سناب وستوري الأنستغرام
@Feel_free_ 아미친 ㅠ 아몬드의 와랄라 
ㅜ
ㅠ
ㅠ
괜찮은거같아요 투명....쿠키 커스텀은 이케하셧구나 아하 파일은 편한대루 써주새요! 냐냐묘~~~
RT @SerieTV46: #BTS "BE" songs on MelOn hits at 20:00 KST:

#1 Dynamite (=)
#15 Life Goes On (+6)
#44 Fly To My Room (+10)
#50 Blue & Grey…
#விடியலைநோக்கி_ஸ்டாலினின்குரல்
かわいい 成仏する
にょんさんの歌は安定に天才なのですが、絵も天才なのでぜひ…😿
🔥 #まんが王国お得爆発DAY 🔥 #全マンガ最大80パーセント還元 11/22まで実施中🎉「📘お得に一気読みしたいマンガをココに入力📘」読みたい😆💕 https://t.co/6HvM0OfZbn @manga_okokuより
RT @vminggukx: SPRING DAY                  LIFE GOES ON

#LifeGoesOnWithBTS #BETODAY https://t.co/zSBS0uCQnA
RT @queblick: 11/7に出演したHUL OVERが
4th Single『SAIEN』feat.田中聖/JUU (UZMK)

のMVを公開したそうです🎊

🔽要チェック❗️
Tomorrowbytogether은 망신살이 몇명이나 잇는거지
アバッキオ「ブチャラティは今朝から腰が痛いって言うから今日は休ませたぜ。…何だよその目は。」
@AhmedEl93905010 فالح 🙃😂
Yieee
بقى كيكه وادخل 400 🤭؟
RT @Kehlani: im so icéeeeee
RT @dj_kiko28: うんこが漏れそうな時に流れる曲ランキング
1位 Shiny Kung-fu Revival
2位 FIRE FIRE
3位 snow storm
장소있는분?

#일산 #파주 #게이 #오프 #번개
وباعد ياالله بيننا وبين القاسية قلوبهم وقرب الينا من يحفظ الود...
RT @ancle7chill: ถ้าGoing crazy คือเพลงชาติ 
Orange ก็คือเพลงสรรเสริญพระบารมี
@Kalaage_27 焦げ感、チーズの色、レタスの大きさ…美味しいでしょう!!🤤
RT @GoncharenkoUa: Якщо Зеленський у Феофанії, то чому ніщо не вказує на його присутність в лікарні? На території тиша, спокій, і ні одного…
RT @Moonlover_Miyu: ยัยน้องสึกกี้ก็คือตัวสูง 190 กว่าแต่กลัวความสูงนะ ดูทำหน้าเข้าสิ กำมือแน่นอีก น้องก็ตัวเท่านิ้วก้อยเนี่ย😭💘💖 https://t.c…
@TOGth_Dowon ม่ายช่าย ให้โอกาสอีกรอบเพราะเราลำเอียง คิกคิก
RT @zlem88369215: @Zeynepe37572607 @AYMBASKANLIGI @NumanKurtulmus İnfazdaAyrım ÖlümeTerketmek
RT @ministop_fan: \フォロー&RTで当たる/

本日から新発売★
その名も「世界のライスぼうや」
世界の食べ物をちまきでアレンジしたミミ!
ちょこっとお腹がすいた時にもおすすめだYO

フォロー&RTしてくれた方の中から抽選で
1,000名さまに鶏五目ちまき無…
@ucpzwDpHuIwso9d ステラの時も、表紙を内側に丸めて、必要ないパンだのお菓子だのを大量買いしてカモフラージュさせてようやく買えたのに…_(:3」∠)_💕💕💕
@tausy72 栃木-東北新幹線→盛岡-秋田新幹線→秋田-五能線→五所川原-五能線・奥羽本線→新青森-北海道新幹線→新函館北斗-函館本線→札幌
RT @bts_bighit: [#오늘의방탄] 어느 날 세상이 멈췄어..
방탄을 본 그 순간부터..💜
아미와 함께해서 즐거웠던 Live Goes On!
아미들에게도 즐거운 시간이었길🥰

#인터내셔널팝케이센세이션썬샤인레인보우트레디셔널트랜스퍼USB허브쉬림…
@BKF94 お疲れさまです〜\(^o^)/
ありがとうございます!オヤスンさんも早く帰れると良いですね🤗(無理か)
RT @BLINKVotingPage: DAILY REMINDER for Social50:

@BLACKPINK #BLACKPINK #블랙핑크 #JISOO #지수 #JENNIE #제니 #ROSÉ #로제 #LISA #리사
@magda9666 يسعده احلا من الف منسف
感染に一番よくないのはさ、カリカリして冷静さを欠いて不潔な行動をしても気をつけなくなること。ガタガタ騒がないで各自やれることをやれって話。誰がハンドリングしたって同じよ、ここまで感染が世界規模で広がったらさ。唯一言えるのは冷静な人が多い国が優勝、ってことじゃない?
@DPY332820 Nice
RT @mthai: ผลการวิเคราะห์น้ำที่เก็บได้จากแนวปะทะที่เกียกกาย  โดยอ.อ๊อด พบว่า

*** WARNING: skipped 561051281 bytes of output ***

ดีงามมากเพลงโคตรดีอ่าาาา
@nctyarchive 90's Love MV

#RESONANCE_PT2
#NCT_RESONANCE
#90sLove @NCTsmtown
♥️♥️♥️♥️♥️
𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅 𐌅  https://t.co/TISDfkDBIh
【エグプリ結成のキッカケの場所】さようなら!!たまご街道!! https://t.co/8SNHgEd9FR @YouTubeより

この養鶏場が…
なかったら→今のエグプリはなかったかもしれない重要な動画です❣️

エグプリ結成して…
初の撮影です❣️

初々しいエグプリちゃん達→見てね❣️ https://t.co/36M6iGI7Sb
温度差で風邪引くわ
#アイマス三昧
RT @UncleCafe: ความระยำของการปราบคนเสื้อแดง คือมันไม่ใช่การปราบ มันเป็นการล่า ยิงให้บาดเจ็บ เพื่อให้คนมาช่วย และยิงคนช่วยอีก วนๆ ซ้ำๆ ไปอย่…
RT @GZB7IVtxmjcW26b: أصبحنا و أصبح الملك لله

أللهم بك أصبحنا و بك أمسينا و بك نحيا و بك نموت و إليك النشور

صباح السعادة و السرور
RT @jxmoods: แพ้คนใจเย็น พูดอะไรก็ใช้เหตุผลอยู่เสมอ
@qnnP1PaLYz35uEo よろ
RT @BaisleyKendra: 念願の裏アカ女子でびゅーっ🤗
田舎で募集しても誰もいないからね、この度田舎から出てきて、念願を叶える事がようやく出来ました!

性欲強くて困っちゃうんですけど
何人かセフレ欲しいです!

生とかも未経験だから興味あったり…💕

RT後にDM…
友から「知らないですまされるほど甘くないぞ」とおすすめ()された呪術廻戦を一気見しました。五条先生すごいな。
RT @lololo46925975: คนที่น่าสงสารที่สุดคือนางเอก ดีใจสุดได้เป็นทีมชนะวางแผนพร้อมท่องโลก พอถึงเวลาถูกปลด ตั้งสติได้ปรึกษาพระรองไม่ใช่เพื่อตั…
@halup_o 제발,,,,제발 몸 챙겨주세요.,,!!! 간곡히 부탁드립니다 ㅠㅠㅠㅠ!!!!
RT @_satoreika: 展示される絵に関してですが、どちらも自信作ですので、是非、展示を見に行って頂けると嬉しいです。番組で見て頂いた方は分かるかもしれませんが、色使いにもこだわっています🙂!!
#計算中
RT @norystch: นอกเหนือจากนั้นคือMV ดีนะคะ ดูมีความลงทุน มีการไปถ่ายข้างนอกที่ไม่ใช่สตูอีกต่อไปแล้ว และพร้อบเยอะมาก เป็นอะไรที่ไม่ได้เห็นมาน…
@sunagimotter いや、人だったんだなって…
RT @FIFAQ8_: سحب سريع على ستور 50$والشروط :
تابع @huda_fut وسو رتويت
* بالكثير 5 ساعات واسحب https://t.co/8C3FgGipI5
【予約状況について】
11/24(火)は、12時~閉店まで予約に空きがございます🤗来店予約をして頂くと優先的にご案内出来ますので、お気軽にお問い合わせください☺️👍
また、#ドコモのロング学割 もスタートしましたのでぜひご相談にいらしてください🎉
☎️0120-022-767(10:00~19:00/第三火曜日以外)
@0pUhczppMfMPsuI 😂
RT @NCTsmtown: #2020ENQUETE20📝

#WINWIN #JUNGWOO #LUCAS #NCT_MARK
#NCT #RESONANCE
#RESONANCE_Pt2
#NCT_RESONANCE https://t.co/fGq0tXQ4AQ
محاضرات الظهر محد يسمع لها الا الدكتور نفسه
RT @reiou0106: !音量注意!
夜の人が大声で囁いてるよ
気をつけてね皆んな
@BobaKaTea_ Doneeepo
いいショットです👌
やっぱりエイトはこれが無いと🐝
撮可あってのチーム📷
RT @shima_s2: さかたさ~~~N!
@mood_sakura จริงพี่ชอบๆๆ
RT @bIazedmark: mark lee being                  but also
cute as hell                  what the hell https://t.co/Zb6sSqVokI
白ロンTとか
頭ぐしゃぐしゃって
タオルで拭いちゃうとことか
…萌えポイントすぎて死ぬ😍😇

続き見なくちゃ👀
これはもう悔しさを積みゲーにぶつけるしかない
RT @bellechans: เอสเอ็มมึงทำงานชุ่ยอีกละอีเหี้ย ทำไมมันมีปัญหาทุกการคัมแบคเลย กุพูดทุกครั้งว่ากุรักที่ตัวศิลปินแต่ชังอีค่าย ผิดหวังซ้ำแล้ว…
RT @CaramelCorn1971: /
今日は、勤労感謝の日!
みんないつもお仕事お疲れ様!!
\
今は在宅勤務の人も増えているみたいだけれど、お仕事のときにはスーツもいいよね✨
ボクもビシッと青いスーツできめてみたよ🎶似合ってるかな!? https://t.co/5WA…
書き込みボード返信できずごめんなさい😭もうしばらくおまちくださいいいいい
@Sakura_050218 じゃあLINE交換するかぁ
@t_o_e_m_u わー❗生クリーム絶対美味しい😋⭐️
体重減らなくなったら生クリームも摂取しよっと🤤

1ヶ月の辛抱です☺️
その後は自分なりに調整しながら行きますよ❗
RT @ern_konpeito: 朝からふざけたことばっかしてないで
こっちにも真面目に載せときます🥴🤍
似てないとかのクレームは受け付けてません🥺🤍笑
スマホリメイクにどうぞ🤍🤍ˎˊ˗ https://t.co/lADfcMmaTd
RT @AlMosahf: وَوَهَبْنَا لَهُ إِسْحَاقَ وَيَعْقُوبَ نَافِلَةً ۖ وَكُلًّا جَعَلْنَا صَالِحِينَ
RT @topazine: ที่ไม่ด่าก้อยแบบ slut-shaming, body-shaming เพราะจะได้ไม่ต้องมาตอบคำถามตัวเองว่าทำไมคนนี้ด่าได้ ทีกลับคนอื่นทำเป็นรณรงค์ไม่ใ…
@sari_111 @Hamedallah_9 الله يجعل النصر حليفكم🌹
@1009_1126 👍👍👍
感染判明の市長、都内で国交省幹部との懇親会に参加 https://t.co/rgltA1e6VF #スマートニュース
これ宮城の行政の長の資質が問われる…村井県知事云々関係なく!
@JINF4IRY 😭😭😭😭 ALE TO JA NIE WYSLALAM ZALOSNE
RT @25hmm__: 일을 이딴식으로 하면서 뭘 또 전세계로 확장시키겟다고...
RT @rysyrys: 谷地中という最も悪質なレイシストが、今回暴力事件を引き起したことは、極めて危険な兆候。
次の流血事の前兆です。
もし「被害者が日本人だから」という理由からレイシズムとして無視・軽視するならば、私たち全員が手痛いしっぺ返しを食うことになる。
これを反レイ…
✨23時半スク✨

交換枠ご所望です🎩✨
主催される方は是非行ってらっしゃいです(つ✧ω✧)つ

#23日 #23時半 #スク 
#スクワッド #賞金ルーム
RT @wanghabits: @gyeommine BREATH OUT NOW
#GOT7_BreathRelease
#믿듣아르스_Breath_6시선공개 #GOT7_Breath @GOT7official
@jjaejaepeach 90's Love MV

#RESONANCE_PT2
#NCT_RESONANCE
#90sLove @NCTsmtown
RT @dztp: 女性の胸の画像にED治療の広告を載っけたビルボードが撤去されて、ED治療の広告がプリントされたTシャツのみの画像に差し替えられたのを見て、「こんな世の中で満足ですか」みたいなことを言ってる人がいるんですけど、いや普通に満足でしょ。
RT @thelittle_CB: 🍪เธอชื่อ : ว่าน
🍯ดูฮอตกี่% :50%
🎂มีแฟนยัง : ยังแน่เลย
💗คิดว่าเธอสูงเท่าไหร่ : 168
🍧ตรงสเป็คกี่% : 40%
🐻emojiสำหรับเธอ : 🍪
@Kristin99350413 👏 #JusticeForJohnnyDepp #ThankYouDior
sis same #GOT7_BreathRelease
#GOT7_Breath  #GOT7  #갓세븐 @GOT7Official
本当に坂田さんのおかげで人生が楽しくなりました✨今の私があるのはお世辞抜きに坂田さんのおかげで感謝してもしてもしきれないです。今までもこれからもずっと応援しています✨大好きです!!❤️ https://t.co/cYkgXPpbVM
あってるwwwwwwww
できればノンストップで繋げてほしかったけどなかなか難しいかな #アイマス三昧
RT @imyounumber1fan: เอนซีทีเดี๋ยวนี้มาไกลเว่อ เดี๋ยวนี้ปล่อยเพลงได้ขึ้นกราฟเมล่อนตลอด
水族館いきてえ
RT @0949_LQS: ˗ˏˋ 連戦 デュオゲリラ ˎˊ˗

⏰|11月23日 18:30 18時30分

🗝| ジャスト @0949_LQS 

👤|デュオ

💸|1500×2 ( a ) 1週間後
 
▫️参加条件 ( 全員 )
Follow @0949_LQS RT…
RT @topazine: ที่ไม่ด่าก้อยแบบ slut-shaming, body-shaming เพราะจะได้ไม่ต้องมาตอบคำถามตัวเองว่าทำไมคนนี้ด่าได้ ทีกลับคนอื่นทำเป็นรณรงค์ไม่ใ…
RT @beingarun28: आज मैं नेटफ्लिक्स को बैन करवाने के लिए आवाज उठा रहा हूँ 

क्या आप सभी भगवाधारी  साथ हो.. ❓

#BanNetflix
私は、11月24日(火)発売のお得な #丸亀ランチセット 3種類のうち #神コスパ500円かけうどんセット が食べたい! #丸亀製麺
RT @yamanow: 22日朝食 殆どパンを食べる事は無いが、卵消費の為にサンドを作ってみた。卵サンド、フリッジ、イチゴジャム(冷凍イチゴ、砂糖、レモン)、初めての、家で土鍋石焼き芋(石をネットで、522円購入) #石焼き芋 #サンドイッチ #土鍋 https://t.co…
RT @mankai_company: ★HAPPY BIRTHDAY★
今日は秋組・古市左京さんの誕生日です!
本日の主役から一言:左京「やけにでかくて軽い箱を貰ったが、中身は駄菓子の詰め合わせだった。この年になっても駄菓子を送られるとはな…まあ、あいつらなりに考えて用意して…
@xponochka и «она лезби потому что у неё короткая стрижка»
@t_Ace_ZERO つぅたぁあああん!
RT @Casiosusu: 只系告「行為失當」。
@Nozu_Rato ほ、本人アイコンとか知らんし~(口笛)
@S_standflower りっかはグクミンシパですか?
#원영메일 
원영이 손이 많이 시렵구낭 ㅠㅠ 그렇지만 톡톡 타자 치고있구낭 ㅎㅎ 지금 아아 시킨거 후회하고 있구낭 따뜻한거 마셔야되는뎅 ㅠㅠ 난 라떼 추천이면 무조건 초코라떼징 ㅎㅎ 난 초코라떼가 좋앙 ㅎㅎ 그랭 원영아 오늘 같이 화이팅하장 ㅎㅎ 원영이 너무 이쁘당💕💕 좋은 하루 보냉💕💕
RT @heymooooooom: แกกกกกกกก มันเกินไปไหมมมมม ปล่อยเพลงมาซับต้องเยอะขนาดนี้ไหม//รับความแมสไม่ไหว #GOT7_Breath https://t.co/zCLNc5kHgc
Вообще, прогресс в моей гигиене сна очевиден, хотя пока мигрени все равно меня мучают почти каждый день, так как это состояние, когда меняешь режим, похоже на jet lag.
Надеюсь, что в общей сложности за месяц организм привыкнет 🙏🏻
@AJELNEWS24 حتى يطمئن قلبك  

وينجلي همك 

وينشرح صدرك 

ويتسع رزقك

أكثر من 

أَسْتَغْفِرُ الله الْعَظِيمَ

 الَّذي لَا إلَهَ إلاَّ هُوَ الْحَيُّ الْقَيُّوم

 وَأَتُوبُ إلَيْهِ  

❤️❤️
RT @seoulcuteclub: ที่ Watsons ตอนนี้มีโปรลดราคากันแดด Bioreอยู่น้า รุ่น Aqua rich กลิ่น Blooming Blossom🌷เหลือ 347 บาท ตัวนี้หอมแนวๆดอกไม้…
RT @Littleminn: SM แจ้งว่าอัลบั้ม Departure ver. มีปัญหาเหี่ยวกับการพิมพ์ทำให้ต้องเลื่อนการวางขายอัลบั้มออกไปแบบหลีกเลี่ยงไม่ได้ แต่จะพยายา…
الاستاذة تنادي وحدة اسمها جواهر مو موجودة عندنا وتقول مين جواهر هذي اللي دخلت علينا😭😭😭😭😭قرت اسمها تحسب طالبه دخلت ااااه دايخة من الضحك https://t.co/dZPKo0k3aH
RLP वालो हनुमान जी बेनीवाल  से पुछो क्या राजस्थान में Rlp की सरकार आई तो ये RTO परेशान करना छोड़ देगा क्या भ्रस्टाचार खत्म करेगे हनुमान जी बेनिवाल तो मे भी rlp का समर्थन शुरु करु फिर ।बाड़मेर जिला संपर्क प्रमुख नानकाराम कड़वासरा @hanumanbeniwal  @UmmedaRamBaytu  @NarayanBeniwal7 https://t.co/KvmZauT4fw
A BEYBLADE FUCKS UP A DISH AND KAI TRYIN TO ACT LIKE THAT DONT SCARE HIM LIKE BOIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII https://t.co/y5R7mk3cr9
こ、ここで!
иногда меня пугает то как много у меня мью которых я знаю ирл но с другой стороны это круто
---
ちょーちょーほんときも!
ほんとキモ!
にしか聞こえない
 #アイマス三昧
こんにちは!YDAイラスト部っ!です。YDAイラスト部っ!公式サイトは、イラスト部の活動情報や部員イラスト等を発信しています!是非みてください! YDAイラスト部っ!公式サイト→[ https://t.co/OR1S0iiVse]
RT @yukiyuki0_0r6s: 🎁総額100万円企画🎁

第2弾 🧸Switch liteを1名様🧸

 ギフカ・PayPay・現金対応

〜応募方法〜

🔸ゆきまると@LemoGoBo1118 
       @ore_1919 
🔸3人のフォローRT

締め切り 3…
スマホケースが待てど暮らせど届かないから繋ぎ用に100均でケース買ったら思った以上に良くて草
@chry142w 먼데
夏の化身ビーチブレイバー
MY EVERYTHING IS SO AAAAAAAAAAA IM GONNA CRY RIGHT NOW
RT @star_isshhh: ภาพบรรยากาศ ' ออฟ จุมพล - กัน อรรถพันธ์ ' ในงานคอนเสิร์ต ไทยประกันชีวิต Presents " FANTOPIA " ที่ Impact Arena & Challenge…
「なんで俺のプリン食うの?」「どこにもしょウたくんのなんて書いてなかったもん」「いや普通わかんだろ」「わかんないでしょ!それに同じの2個あったし!」「俺が2個食べる予定だったんだよ!」「そしたら名前書いといてよ!」「書いたし!」「書いてあったら私食べないから!」
RT @Ammume02: ทุกคัมเลยค้างช่วงสี่แสนวิวตลอด ปั่นวนไปค้าบบบ
#GOT7_Breath https://t.co/zxi5Li9kzJ
RT @JOOXTH: #GOT7_BreathRelease 
เพลงปล่อยแล้วน้าาา 💚

ฟัง 'Breath' พรีรีลิสซิงเกิลจากอัลบั้มใหม่ Breath of Love : Last Piece ได้แล้วที่ #J…
RT @alwaysmendex: GO WATCH THE DOCUMENTARY #InWonderWatchParty
@TuTi8HiThuZi WAO…SOULEATER…
RT @BTS_twt: 💜💜💜💜💜💜💜💜💜
ワタシジュンちゃんオシナノカモシレナイ
激しく感情を揺さぶるんじゃないよ
RT最高、こうであって…泣
RT @sfkkfs_: ต่อให้ปากบอกว่าไม่คาดหวัง แม่งก็คาดหวังอยู่ดีอะ
RT @taeyongelvisx: สงสารคนที่โดนทักเรื่องการแต่งตัวละเสียความมั่นใจ จนไม่กล้าแต่งตัวในแบบที่ชอบไปเลยอะ มันแย่นะ แค่เพราะการพูดไม่คิดของคนอะ…
@haruhidemasaki えっ…!?
(´×ω×`)(´×ω×`)(´×ω×`)
バイオ(実写じゃない方)の映画が全部入ったとのことなので、ヴェンデッタまず吹き替えで見る。その後字幕見るーん。
@shoki_jazz_tp まあそうね
@re_1230_ 入れた入れた💜
@FragileHeart96 سلام آدم عاشق همیشه عاشق تا وقتی به بیپولی بخوره اون وقت یادش میره
RT @Mohamed34461331: مصنع لمسه للاثاث الحديث 
تفصيل على حسب الطلب صناعه وطنى خامات تايلاندى  ضمان ١٠ سنوات ضد عيوب الصناعه 
خشب تايلاندى وا…
RT @na2key_: 左目はもう君を映さなくなって
左手はもう君を満足に触れなくなって
今度は君の料理も受け付けなくなってしまった

それでも“私“として君を見守り続けよう https://t.co/6PrKC0pO8a
@smtakemymoney 지금 저도 듣는 중인데 사기입니다 .. 🥺
@143Viceyy HAHAHHAHA KYOT!!! NAKAUWI KA NA?? INGAT KA HAA?? WUV U
RT @LOH_information: ■ 오벨리스크 구성 변경

“ 신비한 고대의 건축물은 주기적으로 그 구조가 변경됩니다.

▶ 모든 층의 주인과 클리어 조건이 변경됩니다.

※ 변경된 구성은 12월 1일(화) 00:00 (로컬 시간)부터 적용 h…
@kaede7728 ありがとうございますwwwww
RT @sourcemusicFS: #여자친구스탭

🎉 #여자친구 #GFRIEND ONLINE CONCERT
[ GFRIEND C:ON ] 이벤트 (2)  당첨자 안내

📍 <G C:ON 보고 왔지> 당첨자 발표
📍 당첨자 명단을 확인해주세요

자세한…
น้องซองชานเยี่ยวมากกกกก
RT @monamourjd: @BUNNYMYE0N @weareoneEXO omg😭

WE ARE ONE

#AMAs   #AMAsTNT EXO #EXOL 
@weareoneEXO
Diego Alves, Rafinha, Rodrigo Caio, Pablo Mari, Filipe Luis, Willian Arão, Gerson, Everton Ribeiro, Arrascaeta, Bruno Henrique, Gabriel Barbosa.
Jorge Jesus. 

NINGUÉM ME CONTOU, EU VI.

#1AnoDaGloriaEterna
可愛い相方ちゃんです(๑•̀ㅂ•́)و✧いつも暇してるので構ってあげて♡
RT @immalilbbb: rt; #รีลั่น clip-0:34sec: ลองเอาด้ามแปรงสีฟันแหย่🐚#imgforลั่น #ลั่น #imgforเงี่ยน #หมีฟ้าน่ะนะ #ลั่นเจ้าหมีฟ้า
//ว่างเดี๋ยว…
瓜江:俺と六月と米林との3人でご飯食べに行ったんだが、 いろいろ話してて、俺と米林が楽しげに話すたびに六月が机の下で足をこつこつ蹴ってきたときは萌えた
┏┛墓┗┓三
だから!!!!
振れ幅が  すごい!!!!!
#アイマス三昧
今更ながらこの回の現場猫安達が好き過ぎる…>RT
単行本の方ではもう少し可愛く書き直されたよね😂
どっちも好きだー
@icxrea 저기요, 제 팔자•ㅔ 유투브는 업문대요
RT @I3amBam1A: ถูกจริตอินเตอร์ที่แท้ทรู!! #GOT7_BreathRelease #GOT7_Breath  #GOT7  #갓세븐 @GOT7Official
RT @8AIcTgxKbubUGD2: おはようございます
テレビつけたら コロナ祭りやまた。それしかないのか!
@j_myHOPE218 岸くんの力では禊に太刀打ちできんかったか…

自力で頑張るw
まぁ周りにコロナ脳的な人が誰もいないだけ幸せか
RT @NCTsmtown: ‘NCT – The 2nd Album RESONANCE Pt.2’ 피지컬 앨범 발매 연기 관련 안내 말씀 드립니다.  

https://t.co/xwRFkCCxbA
RT @nn_ss99a: 500人突破〜
みんなありがとう!

リプしてくれた人に手書きメッセージ書く!!
※字は汚いです

顔は載せらんないからDMくれた人(知ってる人)にだけ見せるかも!かも https://t.co/VbYjvIXvBB
@M98cocco 希望出すの忘れちゃったどん🥺w
@mae_saeng2 엽떡 착한맛 먹는 맵찔이는 찍먹으로 갑니다..⭐️
@Ging10622755 @LukasZackeff 아.니. 아니. 깅님.봐봐. 루카스님이 먼저 자기 작업물 기대하지말랬다니까?
RT @retokani: 動画投稿!
ついに完全初見部分!イケメン少年がその後どうなったのかのIFストーリーです!!次回はBLOODモード!

追加されたIFストーリー 少年はその後どうなった?『マッドファーザー 完全リメイク版 』 https://t.co/ztJg0hGI…
RT @AA55388: *يُنبت آلله من قآع آلحزن*
*فرحًا نسعد به ولو بعد حين ."*

فقط كن مع الله 😊 https://t.co/0MubGIxAUA
あーだめ……集めな…… RT
BRAZIL LOVES YOU
RT @yeonsek498: https://t.co/SAWnTQNtSo

링크 접속 후 로그인, 회의 시작하기 > 지금 참여하기로 들어가실 수 있고, 마찬가지로 초대 링크 배부 가능합니다. 화면 공유 기능은 우측 하단에 '발표시작'을 눌러서 전체 화…
RT @Shin_Kurose: デモすらできないのが本物の独裁国家であり、「アベは独裁だー!」とどんちゃん騒ぎしてるサヨクは本当に矛盾してますよね。

しかし日本も中国に飲み込まれそうなのに、呑気に政府の揚げ足取りばかりしてる連中は本当に抜けている。 https://t.co…
RT @AAairty: ใครเอาจีซองไปหมุนกาอน โคตรน่ารัก555555
@lirx111 م اتحممملل😭😭😭😭
@DCulee هههههههههههههههههههههههههههههههههههههههههههههههههههههههههههههههههه هههههههههههههههههههههههههههههههههههههههههههههههههههههههههههههههههه
หนูก็ไมรู้ออมม่า หนูเช็คไม่เป็น🤣🤣🤣

I vote for #Taehyung #BTSV from #BTS @BTS_twt for #BousnidStars2020 #Bousnid #4thBousnidStars2020
เอากูไปเผาเลยจ่ะ ไม่ต้องทอดไม่ต้องเวฟ ไม่ต้องต้มยำทำแกงไรทั้งนั้น เผาไปเลย เผาเท่านั้น เผาชุ้นไปเลย !!!!!! #GOT7 #갓세븐 @GOT7Official #IGOT7 #아가새 #GOT7_BreathofLove_LastPiece #GOT7_Breath #GOT7_LastPiece
#JacksongWang https://t.co/z3mJAqb3fq
JOIN NOW DELAY IS DANGEROUS
CONTINUE DOUBTING WE CONTINUE WINNING
😍😍😍😍😍😍😍😍😍 https://t.co/ATfhHSLWnC
感情の落差!!ちょっと!!
#アイマス三昧
تظن أنك مُت ، ولكن تعيش .
RT @mymtuan__: ทีมสองได้เงินเดือนวันนี้เหรอ เอ็มวีปล่อยปุ้ป ทำงานหนักเลยนะ ทวิตทุกนาที ทรศสั่นเป็นเจ้าเข้าแล้วแม่
RT @Riaqx_: #りあからのお年玉
プレゼント企画🎁します!!
内容は好きなプリペ3000円分×2名様です‼️
条件
リプにコメントとRT
こちらの人を全員フォロー😎😎
@Riaqx_  @order_a_ll  @RuruaZeros  @meruxg 
引用RTと…
// ## Go to SparkUI and see if a streaming job is already running. If so you need to terminate it before starting a new streaming job. Only one streaming job can be run on the DB CE.
// #  let's stop the streaming job next.
ssc.stop(stopSparkContext = false) 
StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) } 

ScaDaMaLe Course site and book

Visualizations of Status Updates in Twitterverse

  • https://github.com/olofbjorck/twitterVisualizations
    • The notebook 029_Viz_1_GraphNetworkTimeline has three visualizations of Tweet Transmission Trees.
  • The notebook 029_Viz_2_... has some possiblilities by leveraging work from a hackathon in 2020.

ScaDaMaLe Course site and book

Interactive Exploration in Twitter as Graph, Network and Timeline

by Olof Björck, Joakim Johansson, Rania Sahioun, Raazesh Sainudiin and Ivan Sadikov

This is part of Project MEP: Meme Evolution Programme and supported by databricks, AWS and a Swedish VR grant.

Copyright 2016-2020 Olof Björck, Joakim Johansson, Rania Sahioun, Ivan Sadikov and Raazesh Sainudiin

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Run notebooks

These notebooks have needed variables and functions.

./025_b_TTTDFfunctions

Read twitter data as TTTDF

We cache and count the data as a TTTDF or Twwet-Transmission-Tree dataFrame.

Make sure the count is not too big as the driver may crash. One can do a time-varying version that truncates the visualized data to an appropriate finite number.

USAGE: val df = tweetsDF2TTTDF(tweetsJsonStringDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
                  val df = tweetsDF2TTTDF(tweetsIDLong_JsonStringPairDF2TweetsDF(fromParquetFile2DF("parquetFileName")))
                  
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.ColumnName
import org.apache.spark.sql.DataFrame
fromParquetFile2DF: (InputDFAsParquetFilePatternString: String)org.apache.spark.sql.DataFrame
tweetsJsonStringDF2TweetsDF: (tweetsAsJsonStringInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsIDLong_JsonStringPairDF2TweetsDF: (tweetsAsIDLong_JsonStringInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDF: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDFWithURLsAndHashtags: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDFLightWeight: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
tweetsDF2TTTDFWithURLsAndHashtagsLightWeight: (tweetsInputDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
// Get the TTT  
val rawTweetsDir = "PATH_To_TWEETS_From_StreamingJOB"
val TTTDF = tweetsDF2TTTDFWithURLsAndHashtagsLightWeight(tweetsJsonStringDF2TweetsDF(fromParquetFile2DF(rawTweetsDir))).cache()
TTTDF.count // Check how much we got (need to check size and auto-truncate to not crash driver program... all real for now)
TTTDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CurrentTweetDate: timestamp, CurrentTwID: bigint ... 35 more fields]
res2: Long = 14451
TTTDF.rdd.getNumPartitions
res14: Int = 8

Next we first write the TTTDF to a parquet file and then read it into the visualizer. This will usually be done via a fast database.

TTTDF.write.mode("overwrite")
     .parquet("PATH_TO_PARQUET_TTTDF.parquet")
val TTTDFpq = spark.read.parquet("PATH_TO_PARQUET_TTTDF.parquet")
TTTDFpq: org.apache.spark.sql.DataFrame = [CurrentTweetDate: timestamp, CurrentTwID: bigint ... 35 more fields]
TTTDFpq.rdd.getNumPartitions
res16: Int = 2

Visualize a Twitter Graph

The graph shows the twitter users in the collection organized by their popularity in terms of various status updates.

See the function visualizeGraph for details. More sophisticated D3 interactive graphs are possible with the TTTDF, especially when filtered by time intervals or interactions of interest.

./029_Viz_x_VizGraphFunction
visualizeGraph(TTTDFpq, 0, 700, 700)
*** WARNING: ***

THERE IS NO SIZE CHECKING! Driver might crash as DataFrame.collect is used.
Also, the display can only handle so and so many items at once. Too large
dataset and the visualization might be very laggy or crash.

***  USAGE:  ***

visualizeGraph(TTTDF)

****************

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
visualizeGraph: (TTTDF: org.apache.spark.sql.DataFrame, tupleWeightCutOff: Int, width: Int, height: Int)Unit

Visualizing Retweet Network

Let us interactively explore the retweet network to identify tweets in terms of the number of retweets versus the number of its unique retweeeters.

./029_Viz_x_VizNetworkFunction
*** WARNING: ***

THERE IS NO SIZE CHECKING! Driver might crash as DataFrame.collect is used.
Also, the display can only handle so and so many items at once. Too large
dataset and the visualization might be very laggy or crash.

***  USAGE:  ***

visualizeNetwork(TTTDF)

****************

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
visualizeNetwork: (TTTDF: org.apache.spark.sql.DataFrame, width: Int, height: Int)Unit
visualizeNetwork(TTTDFpq)

Interactively Visualize a twitter Timeline

Search through Tweets over a timeline.

./029_Viz_x_VizTimelineFunction
*** WARNING: ***

BUG: The first Tweet in the dataset doesn't display for some reason.

THERE IS NO SIZE CHECKING! Driver might crash as DataFrame.collect is used.
Also, the display can only handle so and so many items at once. Too large
dataset and the visualization might be very laggy or crash.

***  USAGE:  ***

visualizeTimeline(TTTDF)
 
****************

import org.apache.spark.sql.DataFrame
visualizeTimeline: (TTTDF: org.apache.spark.sql.DataFrame, width: Int, height: Int)Unit
visualizeTimeline(TTTDFpq) 

What next?

Ideally the TTTDF and its precursor raw tweets are in silver and bronze delta.io tables and one can then use an interactive dashboard such as:

Superset is fast, lightweight, intuitive, and loaded with options that make it easy for users of all skill sets to explore and visualize their data, from simple line charts to highly detailed geospatial charts.

ScaDaMaLe Course site and book

Visualizations of Status Updates in Twitterverse as Emotional Dimensions

The following codes can be leveraged to obtain emotional dimensions on display_HTML in this notebook and later for dashboards, say via Apache Superset or others.

  • https://github.com/lamastex/CoSENSE
    • Video Presented to hackthecrisis.se
    • CoSENSE Presentation to hackthecrisis.se

ScaDaMaLe Course site and book

Function for visualizing a Twitter graph

by Olof Björck, Joakim Johansson, Rania Sahioun, Raazesh Sainudiin and Ivan Sadikov

This is part of Project MEP: Meme Evolution Programme and supported by databricks, AWS and a Swedish VR grant.

Copyright 2016-2020 Olof Björck, Joakim Johansson, Rania Sahioun, Ivan Sadikov and Raazesh Sainudiin

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

/** Displays an interactive visualization of a Twitter graph.
 *
 *  This function takes a DataFrame from TTTDFfunctions function tweetsDF2TTTDF()
 *  and displays it in an interactive visualization. The DataFrame content should
 *  be a collection of Twitter users. 
 *  
 *  Visualization description:
 *
 *  This is a graph visualization of a Twitter graph. Each circle represents a 
 *  Twitter user (that is, a unique Twitter account). The radius of the user circle 
 *  represents how many Retweets the user has within this specific Twitter network 
 *  from small radius (fewest Retweets) to large radius (most Retweets). If you 
 *  hover a user circle, the user screen name (as specified in the data) will appear 
 *  by the user circle. The line thickness connecting user circles represents how 
 *  many Retweets the user circle tuple tuple has between each other in total.
 *  Retweet direction is not accounted for. Connecting lines only appear if the user 
 *  circle tuple has at least tupleWeightCutOff Retweets.
 *  
 *  @param TTTDF A DataFrame from TTTDFfunctions function tweetsDF2TTTDF() containing
 *               the data to be displayed.
 *  @param tupleWeightCutOff The display width in pixels.
 *  @param width The display width in pixels.
 *  @param height The display height in pixels.
 */
def visualizeGraph(TTTDF: DataFrame, tupleWeightCutOff: Int = 15, width: Int = 900, height: Int = 400): Unit = {

  // Get user info from the "primary" users in our network. They are the users we've specified.
  val usersInfo = TTTDF
                  .select("CPostUserID", "CPostUserSN", "followersCount")
                  .groupBy("CPostUserID", "CPostUserSN")
                  .agg(max("followersCount").as("followersCount"))
                  .toDF(Seq("id", "ScreenName", "followersCount"): _*)
                  .select("id", "ScreenName")
  
  // Get all retweets and their weights (weights == 1 as we haven't done anything yet)
  val RTNetworkDF = TTTDF
                    .filter($"TweetType"==="ReTweet") // TweetType PARAM
                    .select("OPostUserSNinRT", "CPostUserSN", "OPostUserIdinRT","CPostUserId","Weight")
  
  // Get all (directed) retweet tuples // PARAM - case by case depending on the TweetType PARAM since SN RT QT etc are Akin's TTTDF SQL names
  val weightedRTNetworkDF = RTNetworkDF
                            .groupBy("OPostUserSNinRT", "CPostUserSN", "OPostUserIdinRT","CPostUserId")
                            .agg(sum("Weight").as("Weight"))
                            .orderBy($"Weight".desc)
                            .toDF("source", "target", "sourceId", "targetId", "weight")
  
  // Get the out degree of each user. That is, get only the number of times a user is retweeted. We're now losing the tuples.
  val outDegreeOfOPostUserIdinRTNetwork = weightedRTNetworkDF
                                          .select("sourceId","weight")
                                          .groupBy("sourceId")
                                          .agg(sum("weight").as("weight"))
                                          .orderBy($"weight".desc)
                                          .toDF(Seq("id", "weight"): _*)

  // Get the nodes
  val nodes = usersInfo
              .join(outDegreeOfOPostUserIdinRTNetwork, Seq("id")).withColumn("group", lit(1))
              .toDF(Seq("idNr", "id", "weight", "group"): _*)
              .where("id is not null")
              .toDF(Seq("idNr", "id", "weight", "group"): _*)

  // Get links
  val linksSource = nodes.select("idNr")
                    .join(weightedRTNetworkDF, $"idNr" === $"sourceId", "left_outer")
                    .select("source", "target", "sourceId", "targetId", "weight")
  val links = nodes.select("idNr")
              .join(linksSource, $"idNr" === $"targetId", "left_outer")
              .select("source", "target", "weight")
              .where("source is not null")
              .where($"weight" >= tupleWeightCutOff)
              .toDF(Seq("source", "target", "weight"): _*)
 
  // Get JSON data
  val nodesData = nodes.toJSON.collect;
  val linksData = links.toJSON.collect;

  // CSS code
  val visualizeGraphCSS: String = 
  s"""
  /*
  General styling
  */

  body {
      font-family: Sans-Serif;
      width: 100%;
      height: 100%;
      margin: 0;
      margin-bottom: 100px;
      background-color: #FFF;
      overflow: scroll;
  }

  /*
  Page content
  */

  div.start {
      text-align: center;
  }

  ul.start {
      margin-top: 0;
      display: inline-block;
      text-align: left;
  }

  div.text {
      text-align: left;
      margin-left: 19.1%;
      margin-right: 19.1%;
  }

  /*
  For visualizations
  */

  .visualization {
      display: block;
      position: relative;
      align: center;
  }

  .hidden {
      visibility: hidden;
  }

  .visible {
      visibility: visible;
  }

  .zoom {
      cursor: move;
      fill: none;
      pointer-events: fill;
  }

  #graphVisualizationDiv {
      width: auto;
      height: auto;
      position: relative;
      align: center;
  }

  .links line {
      stroke: #999;
      stroke-opacity: 1;
  }

  .nodes circle {
      fill: black;
  }

  circle.clicked {
      fill: #1da1f2;
  }

  #tweet {
      z-index: 100;
      position: absolute;
      left: 80%;
      height: auto;
  }
  """
  
  // JavaScript code
  val visualizeGraphJS: String = 
  s"""
  /*******************************************************************************

  This visualisation is a Twitter graph.

  *******************************************************************************/

  var circleEnlargeConstant = 2;
  var circleClickedStrokeWidth = 5;
  var maxRadius = 10;
  
  // Specify display sizes.
  var width = ${width}, 
      height = ${height},
      margin = {
                top: 0.1 * height,
                right: 0 * width, 
                bottom: 0.1 * height,
                left: 0 * width
            };
            
  width = width - margin.left - margin.right;
  height = height - margin.top - margin.bottom;

  // Get div
  var div = d3.select("#graphVisualizationDiv");

  // Create svg
  var svg = div.append("svg")
      .attr("class", "visualization")
      .attr("width", width + margin.left + margin.right)
      .attr("height", height + margin.top + margin.bottom)
      .append("g")
          .attr("transform",
              "translate(" + margin.left + "," + margin.top + ")");

  // Create zoom
  var zoom = d3.zoom()
      .on("zoom", zoomed);

  // Create zoomable area
  var zoomView = svg.append("rect")
      .attr("class", "zoom")
      .attr("width", width)
      .attr("height", height)
      .on("click", clickView)
      .call(zoom)

  // Create simulation
  var simulation = d3.forceSimulation()
      .force("charge", d3.forceManyBody().strength(-10))
      .force("center", d3.forceCenter(width / 2, height / 2))

  // Create loading text
  var loading = svg.append("text")
      .attr("y", height / 2)
      .attr("x", width / 2)
      .attr("text-anchor", "middle")
      .text("Loading graph... Takes a couple of seconds");
 
  var nodes = ${nodesData.mkString("[", ",\n", "]")};
  var links = ${linksData.mkString("[", ",\n", "]")};

  // Create links
  var link = svg.append("g")
      .attr("class", "links")
      .selectAll("line")
      .data(links)
      .enter().append("line")
          .attr("stroke-width", function(d) { return Math.sqrt(d.weight / 1000); });

   // Create nodes
   var node = svg.append("g")
       .attr("class", "nodes")
       .selectAll("circle")
       .data(nodes)
       .enter().append("circle")
           //.attr("fill", function(d) { return color(d.group); })
           .attr("r", function(d) { return Math.sqrt(d.weight / 100) + 2 })
           .on("mouseover", mouseoverCircle)
           .on("mouseout", mouseoutCircle)
           .on("click", clickCircle)
           .call(d3.drag()
               .on("start", dragstarted)
               .on("drag", dragged)
               .on("end", dragended));

    // Add title as child to circle
    node.append("title")
        .text(function(d) { return d.id; });

    // Link nodes and links to the simulation
    simulation
        .nodes(nodes)
        .on("tick", ticked)
        .force('link', d3.forceLink(links).id(function(d) { return d.id; }));

    // Updates for each simulation tick
    function ticked() {
        link
            .attr("x1", function(d) { return d.source.x; })
            .attr("y1", function(d) { return d.source.y; })
            .attr("x2", function(d) { return d.target.x; })
            .attr("y2", function(d) { return d.target.y; });

         node
            .attr("cx", function(d) { return d.x; })
            .attr("cy", function(d) { return d.y; })
     }

     // Compute several steps before rendering
     loading.remove(); // Remove loading text
     for (var i = 0, n = 150; i < n; ++i) {
         simulation.tick();
     }

  /**
   * Handle mouse hover on circle. Enlarge circle.
   */
  function mouseoverCircle() {

      // Get circle
      var circle = d3.select(this);

      // Display activated circle
      circle.attr("r", circle.attr("r") * circleEnlargeConstant);
  }

  /**
   * Handle mouse out on circle. Resize circle.
   */
  function mouseoutCircle() {

      // Get circle
      var circle = d3.select(this);

      // Display idle circle
      circle.attr("r", circle.attr("r") / circleEnlargeConstant);

  }

  /**
   * Handle circle drag start.
   */
  function dragstarted(d) {
      if (!d3.event.active) simulation.alphaTarget(0.3).restart();
      d.fx = d.x;
      d.fy = d.y;
  }

  /**
   * Handle circle drag.
   */
  function dragged(d) {
      d.fx = d3.event.x;
      d.fy = d3.event.y;
  }

  /**
   * Handle circle drag end.
   */
  function dragended(d) {
      if (!d3.event.active) simulation.alphaTarget(0);
      d.fx = null;
      d.fy = null;
  }

  /**
   * Handle zoom. Zoom both x-axis and y-axis.
   */
  function zoomed() {
      d3.selectAll(".nodes").attr("transform", d3.event.transform)
      d3.selectAll(".links").attr("transform", d3.event.transform)
  }

  /**
   * Handle click on zoomable area. That is, handle click outside a node which
   * is considered a deselecting click => deselect previously clicked node
   * and remove displayed tweets.
   */
  function clickView() {

      // Remove clicked status on clicked nodes
      d3.selectAll(".clicked")
          .attr("stroke-width", "0")
          .classed("clicked", false)

      // Remove timeline
      document.getElementById("tweet").innerHTML = ""
  }

  /**
   * Handle click on a tweet circle. Display the clicked tweet and let the tweet
   * appear selected by adding a stroke to it.
   */
  function clickCircle(d) {

      // Remove results from old click
      clickView();

      // Add stroke width and set clicked class
      d3.select(this)
          .attr("stroke-width", circleClickedStrokeWidth)
          .classed("clicked", true);

      // Display tweet
      twttr.widgets.createTimeline(
          {
              sourceType: "profile",
              userId: d.idNr
          },
          document.getElementById("tweet"), // Tweet div
          {
              height: height
          }
      )
  }
  """

  // HTML code
  displayHTML(s"""
  <!DOCTYPE html>
  <html lang="en">

      <head>

          <meta charset="utf-8">
          <meta name="author" content="Olof Björck">
          <title>Graph</title>

          <style>
            ${visualizeGraphCSS}
          </style>

      </head>

      <body>

          <div id="graphVisualizationDiv" class="visualization" align="center">
              <div id="tweet"></div>
          </div>

          <script sync src="https://platform.twitter.com/widgets.js"></script>
          <script src="https://d3js.org/d3.v4.min.js"></script>
          <script>
            ${visualizeGraphJS}
          </script>

      </body>
  </html>
""")
}

// Info
println("""
*** WARNING: ***

THERE IS NO SIZE CHECKING! Driver might crash as DataFrame.collect is used.
Also, the display can only handle so and so many items at once. Too large
dataset and the visualization might be very laggy or crash.

***  USAGE:  ***

visualizeGraph(TTTDF)

****************
""")
*** WARNING: ***

THERE IS NO SIZE CHECKING! Driver might crash as DataFrame.collect is used.
Also, the display can only handle so and so many items at once. Too large
dataset and the visualization might be very laggy or crash.

***  USAGE:  ***

visualizeGraph(TTTDF)

****************

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
visualizeGraph: (TTTDF: org.apache.spark.sql.DataFrame, tupleWeightCutOff: Int, width: Int, height: Int)Unit

ScaDaMaLe Course site and book

Function for visualizing a Twitter network

by Olof Björck, Joakim Johansson, Rania Sahioun, Raazesh Sainudiin and Ivan Sadikov

This is part of Project MEP: Meme Evolution Programme and supported by databricks, AWS and a Swedish VR grant.

Copyright 2016-2020 Olof Björck, Joakim Johansson, Rania Sahioun, Ivan Sadikov and Raazesh Sainudiin

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

/** Displays an interactive visualization of a Twitter network.
 *
 *  This function takes a DataFrame from TTTDFfunctions function tweetsDF2TTTDF()
 *  and displays it in an interactive visualization. The DataFrame content should
 *  be a collection of Twitter users.
 *  
 *  Visualization description:
 *
 *  This is a visualization of a Twitter network. Each circle represents a Twitter 
 *  user (that is, a unique Twitter account). The color of the user circle represents 
 *  how many followers the user has from blue (fewest) to red (most). The radius of 
 *  the user circle also represents how many followers the user has from small radius 
 *  (fewest) to large radius (most) on a log scale. The x-axis shows the number of 
 *  Retweets of a user within the Twitter network (that is, the number of times the 
 *  user was Retweeted by users in this specific Twitter network). The y-axis shows 
 *  the number of unique Retweeters within the Twitter network (that is, the number 
 *  of unique users in this specific Twitter network that Retweeted a user). If you 
 *  hover over a user circle, the user screen name (as specified in the data) will 
 *  appear by the user circle and the user circle you're hovering will become enlarged.
 *  
 *  @param TTTDF A DataFrame from TTTDFfunctions function tweetsDF2TTTDF() containing
 *               the data to be displayed.
 *  @param width The display width in pixels.
 *  @param height The display height in pixels.
 */
def visualizeNetwork(TTTDF: DataFrame, width: Int = 900, height: Int = 400): Unit = {
  
  // Get all retweets and their weights (weights == 1 as we haven't done anything yet)
  val RTNetworkDF = TTTDF.filter($"TweetType"==="ReTweet").select("OPostUserIdinRT","CPostUserId","Weight")
  
  // Get all (directed) retweet tuples
  val weightedRTNetworkDF = RTNetworkDF
                            .groupBy("OPostUserIdinRT","CPostUserId")
                            .agg(sum("Weight").as("Weight"))
                            .orderBy($"Weight".desc)
  
  // Get the out degree of each user. That is, get only the number of times a user is retweeted. We're now losing the tuples.
  val outDegreeOfOPostUserIdinRTNetwork = weightedRTNetworkDF
                                          .select("OPostUserIdinRT","Weight")
                                          .groupBy("OPostUserIdinRT")
                                          .agg(sum("Weight").as("Weight"))
                                          .orderBy($"Weight".desc)
  
  // Get number of unique retweeters (out degree neighborhood):
  // Use weightedRTNetwork, drop weight, add new weight of 1. 
  // We got all the retweet tuples with a weight of 1, then group by UserID to count number of unique retweeters
  val outNgbhdOfOPostUserIdinRTNetwork = weightedRTNetworkDF
                                          .drop("Weight")
                                          .withColumn("Weight",lit(1L))
                                          .groupBy("OPostUserIdinRT")
                                          .agg(sum("Weight").as("Weight"))
                                          .orderBy($"Weight".desc) 
  
  // Combine retweets and retweeters count and rename columns
  val tweetsAndRetweets = outDegreeOfOPostUserIdinRTNetwork
                          .toDF(Seq("UserID", "NrOfRetweets"): _*)
                          .join(outNgbhdOfOPostUserIdinRTNetwork
                                .toDF(Seq("UserID", "NrOfRetweeters"): _*), "UserID"
                               ).orderBy($"NrOfRetweets".desc)
  
  // Get user info from the "primary" users in our network. They are the users we've specified.
  val usersInfo = TTTDF.filter($"TweetType"==="ReTweet")
                  .select("CPostUserID", "CPostUserSN", "followersCount")
                  .groupBy("CPostUserID", "CPostUserSN")
                  .agg(max("followersCount").as("followersCount"))
                  .toDF(Seq("UserID", "ScreenName", "followersCount"): _*)
  
  // Get the final data to visualize
  val data = usersInfo.join(tweetsAndRetweets, Seq("UserID")).distinct.toJSON.collect
  
  // CSS code
  val visualizeNetworkCSS: String = 
  s"""
  /*
  General styling
  */

  body {
      font-family: Sans-Serif;
      width: 100%;
      height: 100%;
      margin: 0;
      margin-bottom: 100px;
      background-color: #FFF;
      overflow: scroll;
  }

  /*
  Page content
  */

  div.start {
      text-align: center;
  }

  ul.start {
      margin-top: 0;
      display: inline-block;
      text-align: left;
  }

  div.text {
      text-align: left;
      margin-left: 19.1%;
      margin-right: 19.1%;
  }

  /*
  For visualizations
  */

  .visualization {
      display: block;
      position: relative;
      align: center;
  }

  .hidden {
      visibility: hidden;
  }

  .visible {
      visibility: visible;
  }

  .zoom {
      cursor: move;
      fill: none;
      pointer-events: fill;
  }
  
  .title {
      fill: black;
      font-family: sans-serif;
      text-anchor: middle;
      text-align: center;
      font-size: 1.5em;
  }

  #tweet {
      position: absolute;
      z-index: 100;
      left: 80%;
      height: auto;
  }
  """;
  
  // JavaScript code
  val visualizeNetworkJS: String = 
  s"""
  /*******************************************************************************

  This visualization is an overview of a Twitter network.

  Contained in the visualisation is, for each user:

   - Number of retweets
   - Number of individual retweeters
   - Number of followers

  *******************************************************************************/

  // TODO: Add search option

  /*
  Create accessors that specify data from the csv-file
  */
  function x(d) { return d.NrOfRetweets; }
  function y(d) { return d.NrOfRetweeters; }
  function radius(d) { return d.followersCount; }
  function color(d) { return d.followersCount; } // What to do here?
  function name(d) { return d.ScreenName; }
  function id(d) { return d.UserID; }

  /*
  Create id-functions
  */
  function getCircleId(d) { return "circ" + id(d); }
  function getTextId(d) { return "text" + id(d); }

  /*
  Specify circle constants
  */
  var circleMaxRadius = 8;
  var circleMinRadius = 3;
  var circleEnlargeConstant = 2;
  var circleIdleOpacity = 0.2;
  var circleActiveOpacity = 1;
  var circleClickedStrokeWidth = 4;

  /*
  Create svg and specify display sizes
  */
      
  // Specify display sizes.
  var width = ${width}, 
      height = ${height},
      margin = {
                top: 0.1 * height,
                right: 0.05 * width, 
                bottom: 0.1 * height,
                left: 0.05 * width
            };
            
  width = width - margin.left - margin.right;
  height = height - margin.top - margin.bottom;

  // Get div
  var div = d3.select("#treeVisualizationDiv");

  // Create svg
  var svg = div.append('svg')
      .attr("class", "visualization")
      .attr('width', width + margin.left + margin.right)
      .attr('height', height + margin.top + margin.bottom)
      .append("g").attr("id", "inner-space")
          .attr("transform",
              "translate(" + margin.left + "," + margin.top + ")");

  /*
  Create title
  */
  var title = svg.append("text")
      .attr("class", "title") // style in css
      .attr("x", width / 2)
      .attr("y", 0)
      .text("Twitter network");

  /*
  Create x-axis
  */

  // Create x-scale
  var xScale = d3.scaleLog()
      .range([0, width]);

  // Create x-axis
  var xAxis = d3.axisBottom(xScale)
      .ticks(5, d3.format(",d"))

  // Create "g" for displaying of x-axis
  var gXAxis = svg.append("g")
      .attr("class", "x axis")
      // Position at bottom
      .attr("transform", "translate(" + 0 + "," + height + ")")

  // Create x-axis label.
  var xAxisLabel = svg.append("text")
      .attr("class", "x label")
      .attr("text-anchor", "end")
      .attr("x", width)
      .attr("y", height - 6)
      .text("Number of retweets");

  /*
  Create y-axis
  */

  // Create y-scale
  var yScale = d3.scaleLinear()
      .range([height, 0]);

  // Create y-axis
  var yAxis = d3.axisLeft(yScale)

  // Create "g" for displaying of y-axis
  var gYAxis = svg.append("g")
      .attr("class", "y axis")

  // Create y-axis label
  var yAxisLabel = svg.append("text")
      .attr("class", "y label")
      .attr("text-anchor", "end")
      .attr("y", 6)
      .attr("dy", ".75em")
      .attr("transform", "rotate(-90)")
      .text("Number of unique retweeters");


  /*
  Create scale for radius
  */
  var radiusScale = d3.scaleLog()
      .base(10)
      .range([circleMinRadius, circleMaxRadius])

  /*
  Create scale for color
  */
  var colorScale = d3.scaleLinear()
      .range(["blue", "red"])

  /*
  Create zoom
  */
  var zoom = d3.zoom()
      .scaleExtent([0.5, Infinity])
      .on("zoom", zoomed);

  // Create zoomable area
  var zoomView = svg.append("rect")
      .attr("class", "zoom")
      .attr("width", width)
      .attr("height", height)
      .call(zoom)
      .on("click", clickView)

  // Add data. Each row represented as a "g" of class "node" inside the svg.
  var data = ${data.mkString("[", ",\n", "]")};

  xScale.domain([1, getMaxX(data)])
  gXAxis.call(xAxis)
  yScale.domain([0, getMaxY(data)])
  gYAxis.call(yAxis)
  radiusScale.domain([1, getMaxRadius(data)])
  colorScale.domain([1, getMaxColor(data)])

  // Enter the data
  var nodes = svg.append("g").selectAll("g")
      .data(data)
      .enter()

  // Create circles to display the data
  nodes.append("circle")
      .call(setCircleAttributes)
      .call(setCircleMouseEvents)
      .sort(orderLargestBelow)
  // Create tooltip that shows username
  nodes.append("text")
      .call(setTextAttributes)

  /**
   * Set attributes for circles (Twitter account nodes).
   */
  function setCircleAttributes(circle) {
      circle
          .attr("class", "nodeCircle")
          .attr("data-id", id)
          .attr("id", getCircleId)
          .attr("opacity", circleIdleOpacity)
          .attr("fill", function(d) { return colorScale(color(d)); })
          .attr("stroke", "black")
          .attr("stroke-width", 0)
          .attr("r", function(d) { return radiusScale(radius(d)); })
          .attr("cx", function(d) { return xScale(x(d)); })
          .attr("cy", function(d) { return yScale(y(d)); })
  }

  /**
   * Set mouse events for circles.
   */
  function setCircleMouseEvents(circle) {
      circle
          // Add tooltip and enlarge circle on mouse hover
          .on("mouseover", mouseoverCircle)
          // Remove tooltip and restore circle on mouseout
          .on("mouseout", mouseoutCircle)
          // Display timeline on click
          .on("click", clickCircle)
  }

  /**
   * Set attributes for tooltip (showing screen name) text.
   */
  function setTextAttributes(text) {
      text
          .attr("class", "hidden nodeText") // Set class to hidden upon creation
          .attr("data-id", id)
          .attr("id", getTextId)
          .attr("x", function(d) { return xScale(x(d)); })
          .attr("y", function(d) { return yScale(y(d)); })
          .attr("dy", function(d) { return - (circleMaxRadius * circleEnlargeConstant * 1.5); })
          .attr("text-anchor", "beginning")
          .text(function(d) { return name(d); })
  }

  /**
   * Order so that largest circle gets placed deepest.
   */
  function orderLargestBelow(a, b) {
      return radius(b) - radius(a);
  }

  /**
   * Handle mouse hover on circle. Display circle's screen name.
   */
  function mouseoverCircle() {

      // Get circle
      var circle = d3.select(this);

      // Display activated circle
      circle.attr("r", circle.attr("r") * circleEnlargeConstant);
      circle.attr("opacity", circleActiveOpacity);

      // Set text class to visible
      svg.select("#text" + circle.attr("data-id"))
          .classed("hidden", false)
          .classed("visible", true)

  }

  /**
   * Handle zoom. Zoom both x-axis and y-axis.
   */
  function mouseoutCircle() {

      // Get circle
      var circle = d3.select(this);

      // Display idle circle
      circle.attr("r", circle.attr("r") / circleEnlargeConstant);
      circle.attr("opacity", circleIdleOpacity);

      // Set text class to hidden
      svg.select("#text" + circle.attr("data-id"))
          .classed("visible", false)
          .classed("hidden", true)

  }

  /**
   * Handle zoom. Zoom both x-axis and y-axis.
   */
  function zoomed() {

      // Create new x- and y-scale
      var new_xScale = d3.event.transform.rescaleX(xScale);
      var new_yScale = d3.event.transform.rescaleY(yScale);

      // Display new axes
      gXAxis.call(xAxis.scale(new_xScale));
      gYAxis.call(yAxis.scale(new_yScale));

      // Reposition circles
      d3.selectAll(".nodeCircle")
          .attr("cx", function(d) { return new_xScale(x(d)); })
          .attr("cy", function(d) { return new_yScale(y(d)); })
          // To force constant circle radius on zoom:
          //.attr("r", function(d) { return d3.event.transform.scale(radiusScale(radius(d))).k; })

      // Reposition texts
      d3.selectAll(".nodeText")
          .attr("x", function(d) { return new_xScale(x(d)); })
          .attr("y", function(d) { return new_yScale(y(d)); })

  };

  /**
   * Handle click on zoomable area. That is, handle click outside a node which
   * is considered a deselecting click => deselect previously clicked node
   * and remove displayed tweets.
   */
  function clickView() {

      // Remove clicked status on clicked nodes
      d3.selectAll(".clicked")
          .attr("stroke-width", "0")
          .classed("clicked", false)

      // Remove timeline
      document.getElementById("tweet").innerHTML = ""
  }

  /**
   * Handle click on a tweet circle. Display the clicked tweet and let the tweet
   * appear selected by adding a stroke to it.
   */
  function clickCircle(d) {

      // Remove results from old click
      clickView();

      // Add stroke width and set clicked class
      d3.select(this)
          .attr("stroke-width", circleClickedStrokeWidth)
          .classed("clicked", true);

      // Display tweet
      twttr.widgets.createTimeline(
          {
              sourceType: "profile",
              userId: id(d)
          },
          document.getElementById("tweet"), // Tweet div
          {
              height: height
          }
      )
  }

  /**
   * Returns the largest x-value in the data.
   */
  function getMaxX(data) {

      return Math.max(...data.map(d => x(d)));
  }

  /**
   * Returns the largest y-value in the data.
   */
  function getMaxY(data) {

      return Math.max(...data.map(d => y(d)));
  }

  /**
   * Returns the largest radius in the data.
   */
  function getMaxRadius(data) {

      return Math.max(...data.map(d => radius(d)));
  }

  /**
   * Returns the "largest" color in the data.
   */
  function getMaxColor(data) {

      var maxColor = Math.max(...data.map(d => color(d)));
      var cutOff = 10000;

      if (maxColor > cutOff) return cutOff;
      return maxColor;
  }

  """
  
  // HTML code
  displayHTML(s"""
  <!DOCTYPE html>
  <html lang="en">

      <head>

          <meta charset="utf-8">
          <meta name="author" content="Olof Björck">
          <title>Network</title>

          <style>
            ${visualizeNetworkCSS}
          </style>

      </head>

      <body>

          <div id="treeVisualizationDiv" class="visualization" align="center">
              <div id="tweet"></div>
          </div>

          <script sync src="https://platform.twitter.com/widgets.js"></script>
          <script src="https://d3js.org/d3.v4.min.js"></script>
          <script>
            ${visualizeNetworkJS}
          </script>

      </body>
  </html>
  """)
}

// Info
println("""
*** WARNING: ***

THERE IS NO SIZE CHECKING! Driver might crash as DataFrame.collect is used.
Also, the display can only handle so and so many items at once. Too large
dataset and the visualization might be very laggy or crash.

***  USAGE:  ***

visualizeNetwork(TTTDF)

****************
""")
*** WARNING: ***

THERE IS NO SIZE CHECKING! Driver might crash as DataFrame.collect is used.
Also, the display can only handle so and so many items at once. Too large
dataset and the visualization might be very laggy or crash.

***  USAGE:  ***

visualizeNetwork(TTTDF)

****************

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
visualizeNetwork: (TTTDF: org.apache.spark.sql.DataFrame, width: Int, height: Int)Unit

ScaDaMaLe Course site and book

Function for visualizing a Twitter timeline

by Olof Björck, Joakim Johansson, Rania Sahioun, Raazesh Sainudiin and Ivan Sadikov

This is part of Project MEP: Meme Evolution Programme and supported by databricks, AWS and a Swedish VR grant.

Copyright 2016-2020 Olof Björck, Joakim Johansson, Rania Sahioun, Ivan Sadikov and Raazesh Sainudiin

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
import org.apache.spark.sql.DataFrame

/** Displays an interactive visualization of a Twitter timeline.
 *
 *  This function takes a DataFrame from TTTDFfunctions function tweetsDF2TTTDF()
 *  and displays it in an interactive visualization. The DataFrame content should
 *  be a Twitter user timeline.
 *  
 *  Visualization description:
 *
 *
 *  This is a visualization of a Twitter user timeline.
 *  Each circle represents a Tweet. The x-axis shows the time a Tweet was posted and 
 *  the y-axis shows the Tweet type of the Tweet. The displayed time interval can be 
 *  zoomed by scrolling. If you hover over a Tweet circle, the Tweet text content 
 *  will appear in the upper left corner and the Tweet circle you're hovering will 
 *  become enlarged. Above the Tweet text in the upper left corner, there's a search 
 *  box that let's you search all Tweet text content. If the character sequence you're 
 *  searching for appears in a Tweet text, Tweet circle where the character sequence 
 *  appears is colored yellow.
 *  
 *  @param TTTDF A DataFrame from TTTDFfunctions function tweetsDF2TTTDF() containing
 *               the data to be displayed.
 *  @param width The display width in pixels.
 *  @param height The display height in pixels.
 */
def visualizeTimeline(TTTDF: DataFrame, width: Int = 900, height: Int = 400): Unit = {
  
  // The CSS code
  val visualizeTimelineCSS: String = 
  s"""
  /*
  General styling
  */

  body {
      font-family: Sans-Serif;
      width: 100%;
      height: 100%;
      margin: 0;
      margin-bottom: 100px;
      background-color: #FFF;
      overflow: scroll;
  }

  /*
  Page content
  */

  div.start {
      text-align: center;
  }

  ul.start {
      margin-top: 0;
      display: inline-block;
      text-align: left;
  }

  div.text {
      text-align: left;
      margin-left: 19.1%;
      margin-right: 19.1%;
  }

  /*
  For visualizations
  */

  .visualization {
      display: block;
      position: relative;
      align: center;
  }

  .hidden {
      visibility: hidden;
  }

  .visible {
      visibility: visible;
  }

  .zoom {
      cursor: move;
      fill: none;
      pointer-events: fill;
  }

  .tweet {
	fill: #1da1f2;
	stroke: black;
	opacity: 0.5;
  }

  circle.clicked {
      stroke-width: 5;
  }

  .searchedTweet {
      fill: yellow;
      opacity: 0.9;
  }

  .infoDisplay {
      margin: 1%;
  }

  #tweetTextDiv {
      position: absolute;
      font-family: Helvetica;
      text-align: left;
      width: 20%;
  }

  #searchDiv {
      height: 20px;
  }

  .highlight {
    background-color: yellow;
  }

  path, line {
    fill: none;
    stroke: black;
  }

  .axis {
      position:relative;
      z-index:1000000;
  }

  .axis text {
      fill: black;
      font-size: 1em;
  }

  #userTimelineVisualizationDiv {
      width: auto;
      height: auto;
      position: relative;
      align: center;
  }

  .title {
      fill: black;
      font-family: sans-serif;
      text-anchor: middle;
      text-align: center;
      font-size: 1.5em;
  }

  .searchField {
      font-family: sans-serif;
  }

  #tweet {
      position: absolute;
      left: 60%;
      height: auto;
  }
  """;
  
  // Convert the timeline TTT DataFrame to a JSON object
  val data = TTTDF.toJSON.collect

  // The JavaScript code
  val visualizeTimelineJS: String = 
  s"""
  /*******************************************************************************

  This user timeline visualisation let's you explore a Twitter timeline.

  Contained in the visualisation is:

   - All Tweets in the data
   - A search function to search Tweets
   - A click option to view the original Tweet on Twitter

  *******************************************************************************/

  // Specify display sizes.
  var width = ${width}, 
      height = ${height},
      margin = {
                top: 0.1 * height,
                right: 0.05 * width, 
                bottom: 0.2 * height,
                left: 0.05 * width
            };

  // Get div.
  var div = d3.select("#userTimelineVisualizationDiv");

  // Create svg.
  var svg = div.append('svg')
      //.attr("class", "visualization")
      .style("z-index", "-1")
      .attr('width', width + margin.left + margin.right)
      .attr('height', height + margin.top + margin.bottom)
      .append("g")
          .attr("transform",
              "translate(" + margin.left + "," + margin.top + ")");

  // Declare global searched string.
  var searchedStr = "";

  // Create zoom object. Zooms x-axis.
  var zoom = d3.zoom()
      .on("zoom", zoomed);

  // Create zoomable area. Basically just an overlaid rectangle.
  var view = svg.append("rect")
      .attr("class", "zoom")
      .attr("width", width)
      .attr("height", height)
      // Allow for zoom while hovering x-axis
      .attr("transform",
          "translate(" + 0 + "," + margin.top + ")")
      // Remove currently displayed tweet on click
      .on("click", function() { clickView(); })
      // Link to zoom
      .call(zoom);

  // Set various tweet radius
  var idleTweetRadius = 15;
  var activeTweetRadius = idleTweetRadius * 1.618;
  var highlightedActiveTweetRadius = activeTweetRadius * 1.618;

  // Add title to the figure.
  svg.append("text")
      .attr("class", "title") // style in css
      .attr("x", width / 2)
      .attr("y", margin.top)
      .text("Twitter Timeline Visualization");

  // Create x-scale and set x-range.
  var xScale = d3.scaleTime()
      .range([0, width]);

  // Create xAxis.
  var xAxis = d3.axisBottom(xScale)
      .tickFormat(d3.timeFormat("%c")) // Set tick format date and time

  // Create x-axis g.
  var gXAxis = svg.append("g")
      .attr("class", "x axis")
      .attr("transform", "translate(" + 0 + "," + height + ")")

  // Create x-axis label.
  svg.append("text")
      .attr("class", "x label")
      .attr("text-anchor", "end")
      .attr("x", width)
      .attr("y", height - 6)
      .text("Time of Tweet");

  // y-range. Sets data placement along y-axis.
  // y-axis is divided in 6 lines, including top/bottom of chart,
  // and data is placed in the middle, lines 2 to 5.
  var yRange = [2, 3, 4, 5].map(function(x) { return x * height / 6; });

  // y-domain. Specifies which data should be placed where along y-axis.
  // Important: Matches with data from file.
  var yDomain = ["ReplyTweet",
                  "QuotedTweet",
                  "ReTweet",
                  "OriginalTweet"];

  // y-ticks to be displayed.
  var yTickValues = ["Reply",
                      "Quote",
                      "Retweet",
                      "Tweet"];

  // Create the y-scale and set y-range
  var yScale = d3.scaleOrdinal()
      .range(yRange)
      .domain(yDomain);

  // Create y-axis.
  var yAxis = d3.axisLeft(yScale)
      .tickValues(yTickValues); // Set y-axis tick values

  // Display y-axis (and label) after circles are placed to put y-axis above the circles

  // Read data. Note: file needs to be chronologically structured so that
  // data[0] is newest and data[length - 1] is oldest
  var data = ${data.mkString("[", ",\n", "]")};
        
  // Create and display the x-axis
  createAndDisplayXAxis(data);

  // Create circle for each tweet
  svg.selectAll("g")
      .data(data)
      .enter().append("g").attr("id", function(d) {
          return getGID(d.CurrentTwID);
      })
          .append("circle")
          // Set class to tweet ID
          .attr("id", function(d) {
              return getTweetID(d.CurrentTwID);
          })
          // Set position
          .attr("cy", function(d) {
              return yScale(d.TweetType.replace(/\\s/g, ''));
          })
          .attr("cx", function(d) { // x-position by tweet date
              return xScale(new Date(d.CurrentTweetDate));
          })
          // Set circle radius
          .attr("r", idleTweetRadius)
          // Set stroke
          .attr("stroke", "black")
          .attr("stroke-width", "0")
          // Set color by tweet type
          .attr("class", function(d) {
              // remove whitespace and return TweetType
              return "tweet " + d.TweetType.replace(/\\s/g, '');
            })
            // Add tooltip and enlarge tweet on mouse hover
            .on("mouseover", mouseoverTweet)
            // Restore tweet on mouseout
            .on("mouseout", mouseoutTweet)
            // Show actual tweet on click
            .on("click", clickTweet);

      // Display y-axis.
      var gYAxis = svg.append("g")
          .attr("class", "y axis") // Set class to y and axis
          .call(yAxis);

      // Create y-axis label.
      svg.append("text")
          .attr("class", "y label")
          .attr("text-anchor", "end")
          .attr("x", - 2 * height / 6)
          .attr("y", 6)
          .attr("dy", ".75em")
          .attr("transform", "rotate(-90)")
          .text("Type of Tweet");
          
      // display x-axis
      xAxis.ticks(5);
      gXAxis.call(xAxis);

      // Handle input search
      d3.select("#searchInput").on("input",
          function() {
              searchedStr = this.value.toLowerCase();
              searchTweets(data);
          });

  /**
   * Searches all tweets for a specific string.
   *
   * @param {string} searchStr - The string to search for
   */
  function searchTweets(data) {

      // Perform search if searched string is at least 3 chars long
      if (searchedStr.length > 2) {

          // Loop through all rows
          for (i = 0; i < data.length; i++) {

              // Get tweet text
              var tweetText = data[i].CurrentTweet;
              var tweet = d3.select("#" + getTweetID(data[i].CurrentTwID));

              // If tweet includes search string, display
              if (tweetText.toLowerCase().includes(searchedStr)) {
                  // Set class to searched tweet and enlarge
                  tweet
                      .classed("searchedTweet", true)
                      .attr("r", activeTweetRadius)
              // else, restore tweet to normal
              } else {
                  tweet
                      .classed("searchedTweet", false)
                      .attr("r", idleTweetRadius)
              }
          }

          // Highlight the searched string
          highlight();

      // else, restore tweets and dehighlight
      } else {
          // Restore tweets
          d3.selectAll(".tweet")
              .classed("searchedTweet", false)
              .attr("r", idleTweetRadius)

          // Dehighlight the displayed tweet
          dehighlight();
      }
  }

  /**
   * Create and display x-axis based on newest
   * and oldest dates in the dataset. Also sets the x-scale domain.
   *
   * @param data - Twitter dataset
   */
  function createAndDisplayXAxis(data) {

      // Get oldest date (that is, date of first tweet in the data)
      var oldestDate = new Date(data[data.length - 1].CurrentTweetDate);
      // Get newest date (that is, date of latest tweet in the data)
      var newestDate = new Date(data[0].CurrentTweetDate);
      // Add 2 weeks at beginning and end of axis for prettier display
      oldestDate.setDate(oldestDate.getDate() - 14); // go back 14 days
      newestDate.setDate(newestDate.getDate() + 14); // go forward 14 days

      // Set x-scale domain from newest and oldest date
      xScale.domain([oldestDate, newestDate]);
  }

  /**
   * Handle mouseover for Tweet.
   *
   * @param {list} d - Row from Twitter dataset
   */
  function mouseoverTweet(d) {

      // Get tweet
      var tweet = d3.select(this)

      // Get tweet text div
      var tweetTextDiv = d3.select("#tweetTextDiv");

      // Remove old tweet
      tweetTextDiv.selectAll("span")
          .remove();

      // Display tweet text
      tweetTextDiv.append("span")
          .text(d.CurrentTweet);

      // Enlarge tweet
      tweet.attr("r", activeTweetRadius);

      // If the tweet is searched, highlight and enlarge it
      if (tweet.classed("searchedTweet")) {

          // Enlarge the tweet to active and highlighted
          tweet.attr("r", highlightedActiveTweetRadius);

          // Highlight the tweet
          highlight();

      // else (that is, tweet is not searched), just enlarge the tweet
      } else {

          // Enlarge tweet to active
          tweet.attr("r", activeTweetRadius);
      }
  }

  /**
   * Highlights the searched part of the tweet text.
   */
  function highlight() {

      // Get tweet text div
      var tweetTextDiv = d3.select("#tweetTextDiv");

      // Get tweet text (works although text is inside a <span>)
      var tweetText = tweetTextDiv.text();
      // Get tweet text in lower case (used to highlight without case sensitivity)
      var tweetTextLowerCase = tweetText.toLowerCase();

      // Highlight if string to highlight is currently displayed
      if (tweetTextLowerCase.includes(searchedStr)) {

          // Get string before the string to highlight
          var strBefore = tweetText.substr(0, (tweetTextLowerCase.indexOf(searchedStr)));
          // Get string after the string to highlight
          var strAfter = tweetText.substr((tweetTextLowerCase.indexOf(searchedStr) +
                                      searchedStr.length),
                                      (tweetText.length - 1));

          // Remove non highlighted tweet text (the old tweet text with 1 <span>)
          tweetTextDiv.selectAll("span").remove();

          // Append string before highlight
          tweetTextDiv.append("span")
              .text(strBefore);
          // Append highlighted string
          tweetTextDiv.append("span")
              .attr("class", "highlight")
              .text(searchedStr);
          // Append string after highlight
          tweetTextDiv.append("span")
              .text(strAfter);
      }
  }

  /**
   * Dehighlights the tweet text.
   */
  function dehighlight() {

      // Get tweet text div
      var tweetTextDiv = d3.select("#tweetTextDiv");

      // Get tweet text
      var tweetText = tweetTextDiv.text();

      // Remove highlighted text (the old tweet text with 3 <span>s)
      tweetTextDiv.selectAll("span").remove();

      // Add non highlighted text
      tweetTextDiv.append("span").text(tweetText);

      // Add actual tweet

  }

  /**
   * Handle mouseout for Tweet.
   * Removes the tooltip displaying the tweet.
   *
   * @param {list} d - Row from Twitter dataset
   */
  function mouseoutTweet(d) {

      // Get tweet
      var tweet = d3.select(this)

      // Restore tweet to idle unless the tweet is searched
      if (!tweet.classed("searchedTweet")) {

          // Restore tweet
          tweet.attr("r", idleTweetRadius);

      // else (that is, tweet is searched), restore to active radius
      } else {
          // Restore tweet
          tweet.attr("r", activeTweetRadius);
      }
  }

  /**
   * Removes tooltip by ID.
   *
   * @param {string} id - The tooltip ID.
   */
  function removeTooltip(id) {
      d3.select("#" + id).remove();
  }

  /**
   * Creates a tooltip ID from a raw data tweet ID.
   *
   * @param {string} id - The tweet ID.
   */
  function getTooltipID(currentTwID) {
      return "tt" + currentTwID;
  }

  /**
   * Creates a tweet ID from a raw data tweet ID.
   *
   * @param {string} id - The tweet ID.
   */
  function getTweetID(currentTwID) {
      return "tw" + currentTwID;
  }

  function getGID(currentTwID) {
      return "g" + currentTwID;
  }

  /**
   * Handle zoom: Zoom the x-axis.
   */
  function zoomed() {

      // Create new x-scale based on zoom
      var new_xScale = d3.event.transform.rescaleX(xScale);

      // Display new x-scale. .ticks(3) to prettify
      gXAxis.call(xAxis.ticks(5).scale(new_xScale));

      // Reposition tweets based on zoom
      var tweets = d3.selectAll(".tweet");
      tweets.attr("cx", function(d) {
          return new_xScale(new Date(d.CurrentTweetDate));
      });
  };

  /**
   * Handle click on zoomable area. That is, handle click outside a tweet which
   * is considered a deselecting click. So, deselect previously clicked tweets
   * and remove displayed tweets.
   */
  function clickView() {

      // Get all clicked tweets
      var clicked = d3.selectAll(".clicked");

      // Remove clicked status on clicked tweets
      clicked.attr("stroke-width", "0");
      clicked.classed("clicked", false);

      // Remove tweet
      document.getElementById("tweet").innerHTML = ""
  }

  /**
   * Handle click on a tweet circle. Display the clicked tweet and let the tweet
   * appear selected by adding a stroke to it.
   */
  function clickTweet(d) {

      // Remove results from old click
      clickView();

      // Get tweet
      var tweet = d3.select(this)

      // Set tweet to clicked
      tweet.classed("clicked", true);

      // Get tweet div
      // Cannot do d3.select because twttr doesn't handle D3 selections
      var tweetDiv = document.getElementById("tweet");

      // Display tweet
      twttr.widgets.createTweet(d.CurrentTwID, tweetDiv)
  }
  """
  
  // The HTML code
  displayHTML(s"""
  <!DOCTYPE html>
  <html lang="en">

      <head>

          <meta charset="utf-8">
          <meta name="author" content="Olof Björck">
          <title>User Timeline</title>

          <style>
            ${visualizeTimelineCSS}
          </style>

      </head>

      <body>

          <!-- The input where you can search the Tweets -->
          <div id="searchDiv" class="infoDisplay">
              Search: <input name="searchStr" type="text" id="searchInput" class="searchField">
          </div>

          <!-- The place where Tweet texts are displayed -->
          <div id="tweetTextDiv" class="infoDisplay">
          </div>

          <!-- The D3 visualization -->
          <div id="userTimelineVisualizationDiv" class="visualization">
              <div id="tweet"></div>
          </div>

          <script sync src="https://platform.twitter.com/widgets.js"></script>
          <script src="https://d3js.org/d3.v4.min.js"></script>
          <script>
            ${visualizeTimelineJS}
          </script>

      </body>
  </html>

  """)
}

// Info
println("""
*** WARNING: ***

BUG: The first Tweet in the dataset doesn't display for some reason.

THERE IS NO SIZE CHECKING! Driver might crash as DataFrame.collect is used.
Also, the display can only handle so and so many items at once. Too large
dataset and the visualization might be very laggy or crash.

***  USAGE:  ***

visualizeTimeline(TTTDF)
 
****************
""")
*** WARNING: ***

THERE IS NO SIZE CHECKING! Driver might crash as DataFrame.collect is used.
Also, the display can only handle so and so many items at once. Too large
dataset and the visualization might be very laggy or crash.

***  USAGE:  ***

visualizeTimeline(TTTDF)
 
****************

import org.apache.spark.sql.DataFrame
visualizeTimeline: (TTTDF: org.apache.spark.sql.DataFrame, width: Int, height: Int)Unit