import twitter4j._
import twitter4j.auth.Authorization
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import twitter4j._
import twitter4j.auth.Authorization
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
/**
 * Spark Streaming receiver that pushes tweets from a twitter4j `TwitterStream` into Spark.
 *
 * Keyword filters (`track`) and user-id filters (`follow`) may be combined. When both are
 * empty, the receiver consumes twitter4j's `sample()` stream instead of a filtered one.
 *
 * @param twitterAuth  authorization used to open the twitter4j stream
 * @param filters      keywords to track; joined with "," before being handed to `FilterQuery.track`
 * @param userFilters  user ids passed to `FilterQuery.follow`
 * @param storageLevel storage level for the statuses handed to `store`
 */
class ExtendedTwitterReceiver(
    twitterAuth: Authorization,
    filters: Seq[String],
    userFilters: Seq[Long],
    storageLevel: StorageLevel
) extends Receiver[Status](storageLevel) {

  // Currently running stream, if any. Only accessed inside `setTwitterStream`, which is
  // `synchronized`, so the monitor already provides visibility for this field.
  private var twitterStream: Option[TwitterStream] = None

  // Written by onStart/onStop but read in the listener's onException. twitter4j (not Spark's
  // receiver supervisor) invokes that callback, so it may run on another thread.
  // @volatile makes the latest write visible there.
  @volatile private var stopped = false

  def onStart(): Unit = {
    // Reset before any listener can fire. If this were done only after the stream is running,
    // an error delivered in between (e.g. right after a restart) would still see the `true`
    // left by onStop, be ignored, and leave the receiver silently dead.
    stopped = false
    try {
      val newTwitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
      newTwitterStream.addListener(new StatusListener {
        def onStatus(status: Status): Unit = store(status)

        // Notifications this receiver deliberately ignores.
        def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice): Unit = ()
        def onTrackLimitationNotice(i: Int): Unit = ()
        def onScrubGeo(l: Long, l1: Long): Unit = ()
        def onStallWarning(stallWarning: StallWarning): Unit = ()

        def onException(e: Exception): Unit = {
          // Once onStop() has run, errors are no longer turned into restarts.
          if (!stopped) {
            restart("Error receiving tweets", e)
          }
        }
      })

      // Only build a filter query when there is something to filter on.
      if (filters.isEmpty && userFilters.isEmpty) {
        newTwitterStream.sample()
      } else {
        val query = new FilterQuery()
        if (filters.nonEmpty) query.track(filters.mkString(","))
        if (userFilters.nonEmpty) query.follow(userFilters: _*)
        newTwitterStream.filter(query)
      }

      setTwitterStream(Some(newTwitterStream))
      println("Twitter receiver started")
    } catch {
      case e: Exception => restart("Error starting Twitter stream", e)
    }
  }

  def onStop(): Unit = {
    stopped = true
    setTwitterStream(None)
    println("Twitter receiver stopped")
  }

  /** Shuts down the currently active stream (if any) and records `newTwitterStream` in its place. */
  private def setTwitterStream(newTwitterStream: Option[TwitterStream]): Unit = synchronized {
    twitterStream.foreach(_.shutdown())
    twitterStream = newTwitterStream
  }
}
defined class ExtendedTwitterReceiver
/**
 * Receiver-backed input stream of tweets, producing one `ExtendedTwitterReceiver` per request.
 *
 * @param ssc_         streaming context this stream is registered with
 * @param twitterAuth  explicit twitter4j authorization; when `None`, an `OAuthAuthorization`
 *                     built from `new ConfigurationBuilder().build()` is used instead
 * @param filters      keywords forwarded to the receiver
 * @param userFilters  user ids forwarded to the receiver
 * @param storageLevel storage level forwarded to the receiver
 */
class ExtendedTwitterInputDStream(
    ssc_ : StreamingContext,
    twitterAuth: Option[Authorization],
    filters: Seq[String],
    userFilters: Seq[Long],
    storageLevel: StorageLevel
) extends ReceiverInputDStream[Status](ssc_) {

  // `getOrElse` takes its fallback by name, so the default OAuth authorization is only
  // constructed when the caller did not supply one.
  private val authorization: Authorization =
    twitterAuth.getOrElse(new OAuthAuthorization(new ConfigurationBuilder().build()))

  override def getReceiver(): Receiver[Status] =
    new ExtendedTwitterReceiver(
      twitterAuth = authorization,
      filters = filters,
      userFilters = userFilters,
      storageLevel = storageLevel
    )
}
defined class ExtendedTwitterInputDStream
import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
/** Entry point for creating `ExtendedTwitterInputDStream`s. */
object ExtendedTwitterUtils {

  /**
   * Creates a receiver-based input stream of tweets.
   *
   * @param ssc          streaming context to attach the stream to
   * @param twitterAuth  twitter4j authorization; `None` makes the stream fall back to an
   *                     `OAuthAuthorization` built from a default `ConfigurationBuilder`
   * @param filters      keywords to track (default: none)
   * @param userFilters  user ids to follow (default: none)
   * @param storageLevel storage level for received tweets (default: MEMORY_AND_DISK_SER_2)
   * @return the stream of `twitter4j.Status` values
   */
  def createStream(
      ssc: StreamingContext,
      twitterAuth: Option[Authorization],
      filters: Seq[String] = Nil,
      userFilters: Seq[Long] = Nil,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[Status] = {
    val tweetStream = new ExtendedTwitterInputDStream(
      ssc_ = ssc,
      twitterAuth = twitterAuth,
      filters = filters,
      userFilters = userFilters,
      storageLevel = storageLevel
    )
    tweetStream
  }
}
import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
defined module ExtendedTwitterUtils
ScaDaMaLe Course site and book