import scala.util.Random
import scala.util.Random._
import scala.util.{Success, Failure}

// make a sample to produce a mixture of two normal RVs with standard deviation 1 but with different location or mean parameters
def myMixtureOf2NormalsReg(normalLocation: Double, abnormalLocation: Double, normalWeight: Double, r: Random): (String, Double) = {
  val sample = if (r.nextDouble <= normalWeight) r.nextGaussian + normalLocation
               else r.nextGaussian + abnormalLocation
  Thread.sleep(5L) // sleep 5 milliseconds
  val now = (new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")).format(new java.util.Date())
  (now, sample)
}
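To see what the sampler produces, a quick local check could look like the following sketch (an addition for illustration, not part of the original notebook; names like rTest are made up). With weight 0.99 most draws should land near location 1.0 and roughly one in a hundred near the abnormal location 10.0.

// Illustrative local check (not in the original notebook)
val rTest = new Random(0L)
val fewSamples = Vector.fill(5)(myMixtureOf2NormalsReg(1.0, 10.0, 0.99, rTest))
fewSamples.foreach { case (timestamp, value) => println(s"$timestamp -> $value") }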
dbutils.fs.rm("/datasets/streamingFiles/", true) // delete the directory before starting the job
//dbutils.fs.rm("/tmp", true)

val r = new Random(12345L)
var a = 0

import org.apache.spark.sql.SaveMode
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// for loop execution to write files to the distributed file system
// We have made a Future out of this, which means it runs concurrently with what we do next, i.e. essentially in a separate thread.
val writeStreamFuture = Future {
  for (a <- 1 to 10) {
    val data = sc.parallelize(Vector.fill(1000){myMixtureOf2NormalsReg(1.0, 10.0, 0.99, r)}).coalesce(1).toDF.as[(String, Double)]
    val minute = (new java.text.SimpleDateFormat("mm")).format(new java.util.Date())
    val second = (new java.text.SimpleDateFormat("ss")).format(new java.util.Date())
    data.write.mode(SaveMode.Overwrite).csv("/datasets/streamingFiles/" + minute + "_" + second + ".csv")
    Thread.sleep(50000L) // sleep 50 seconds between writes
  }
}
r: scala.util.Random = scala.util.Random@27d25df7
a: Int = 0
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
writeStreamFuture: scala.concurrent.Future[Unit] = List()
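Since the writer runs in the background, one way to be notified when it finishes is to attach a callback to the Future. This is a minimal sketch (an addition, not in the original notebook), reusing the Success/Failure imports from the first cell and the implicit global execution context:

// Minimal sketch (assumption, not from the notebook): report when the
// background file-writing Future completes or fails.
import scala.util.{Success, Failure}
writeStreamFuture.onComplete {
  case Success(_)  => println("finished writing all 10 files")
  case Failure(ex) => println(s"file-writing job failed: ${ex.getMessage}")
}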
import org.apache.spark.sql.types._
import java.sql.{Date, Timestamp}

/**
 * timedScore is the SQL schema for timedScoreCC and for the files written by the code above
 */
val timedScore = new StructType().add("time", "timestamp").add("score", "Double")
case class timedScoreCC(time: Timestamp, score: Double)

val streamingLinesDS = spark
  .readStream
  .option("sep", ",")
  .schema(timedScore)              // specify the schema of the CSV files
  .option("MaxFilesPerTrigger", 1) // maximum number of new files to be considered in every trigger (default: no max)
  .csv("/datasets/streamingFiles/*").as[timedScoreCC]
import org.apache.spark.sql.types._
import java.sql.{Date, Timestamp}
timedScore: org.apache.spark.sql.types.StructType = StructType(StructField(time,TimestampType,true), StructField(score,DoubleType,true))
defined class timedScoreCC
streamingLinesDS: org.apache.spark.sql.Dataset[timedScoreCC] = [time: timestamp, score: double]
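Before starting the streaming query below, the same directory can also be read statically with the identical schema to eyeball a few rows. This is a sketch added for illustration, not part of the original notebook:

// Sketch (not in the original notebook): static read of the same files to sanity-check the schema
val staticCheck = spark.read
  .option("sep", ",")
  .schema(timedScore)
  .csv("/datasets/streamingFiles/*")
  .as[timedScoreCC]
staticCheck.show(5, false)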
import org.isarnproject.sketches._
import org.isarnproject.sketches.udaf._
import org.apache.spark.isarnproject.sketches.udt._
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, GroupState}

case class TdigAndAnomaly(tDigSql: TDigestSQL, tDigThreshold: Double, time: Timestamp, score: Double)

// State update: for each input, absorb its score into the TDigest held in the state
def updateTDIG(state: TDigestSQL, input: timedScoreCC): TDigestSQL = {
  TDigestSQL(state.tdigest + input.score)
}

// Update function: takes a key, an iterator of events and the previous state, and returns an iterator
// representing the rows of the output from flatMapGroupsWithState
def updateAcrossBatch(dummy: Int, inputs: Iterator[timedScoreCC], oldState: GroupState[TDigestSQL]): Iterator[TdigAndAnomaly] = {
  // state is the old state if it exists, otherwise we create an empty state to start from
  var state: TDigestSQL = if (oldState.exists) oldState.get else TDigestSQL(TDigest.empty())
  // We copy the TraversableOnce iterator inputs into inputs1 and inputs2; this means we must discard inputs
  val (inputs1, inputs2) = inputs.duplicate
  // Loop to update the state, i.e. the TDigest
  for (input <- inputs1) {
    state = updateTDIG(state, input)
    oldState.update(state)
  }
  // Precompute the threshold above which inputs are flagged as anomalies
  val cdfInv: Double = state.tdigest.cdfInverse(0.99)
  // Yields an iterator of anomalies
  val anomalies: Iterator[TdigAndAnomaly] =
    for (input <- inputs2; if input.score > cdfInv)
      yield TdigAndAnomaly(state, cdfInv, input.time, input.score)
  // Return the anomalies iterator; each item in the iterator gives a row in the output
  anomalies
}

val query = streamingLinesDS
  .groupByKey(x => 1)
  .flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.NoTimeout)(updateAcrossBatch)
  .writeStream
  .outputMode("append")
  .format("console")
  .start()

query.awaitTermination()
-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+------------------+--------------------+------------------+
| tDigSql| tDigThreshold| time| score|
+--------------------+------------------+--------------------+------------------+
|TDigestSQL(TDiges...|7.9098819334928265|2018-01-30 07:18:...| 9.639219241219372|
|TDigestSQL(TDiges...|7.9098819334928265|2018-01-30 07:18:...|11.539205812425335|
|TDigestSQL(TDiges...|7.9098819334928265|2018-01-30 07:18:...| 9.423175513609095|
|TDigestSQL(TDiges...|7.9098819334928265|2018-01-30 07:18:...| 8.99959554980265|
|TDigestSQL(TDiges...|7.9098819334928265|2018-01-30 07:18:...|10.174199861232976|
|TDigestSQL(TDiges...|7.9098819334928265|2018-01-30 07:18:...|10.442627838980057|
|TDigestSQL(TDiges...|7.9098819334928265|2018-01-30 07:18:...|10.460772141286911|
|TDigestSQL(TDiges...|7.9098819334928265|2018-01-30 07:18:...|11.260505056159252|
|TDigestSQL(TDiges...|7.9098819334928265|2018-01-30 07:18:...| 9.905282503779972|
|TDigestSQL(TDiges...|7.9098819334928265|2018-01-30 07:18:...| 9.102639076417908|
+--------------------+------------------+--------------------+------------------+
-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+-----------------+--------------------+------------------+
| tDigSql| tDigThreshold| time| score|
+--------------------+-----------------+--------------------+------------------+
|TDigestSQL(TDiges...|9.553157173102415|2018-01-30 07:19:...| 9.695132992174205|
|TDigestSQL(TDiges...|9.553157173102415|2018-01-30 07:19:...|10.439052640762693|
|TDigestSQL(TDiges...|9.553157173102415|2018-01-30 07:19:...| 10.02254460606071|
|TDigestSQL(TDiges...|9.553157173102415|2018-01-30 07:19:...| 9.87803253322451|
|TDigestSQL(TDiges...|9.553157173102415|2018-01-30 07:19:...| 9.858438409632281|
|TDigestSQL(TDiges...|9.553157173102415|2018-01-30 07:19:...| 10.45683581285141|
+--------------------+-----------------+--------------------+------------------+
-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+-----------------+--------------------+------------------+
| tDigSql| tDigThreshold| time| score|
+--------------------+-----------------+--------------------+------------------+
|TDigestSQL(TDiges...|9.185194249546159|2018-01-30 07:20:...| 10.13608393266294|
|TDigestSQL(TDiges...|9.185194249546159|2018-01-30 07:20:...| 9.562663532092044|
|TDigestSQL(TDiges...|9.185194249546159|2018-01-30 07:20:...| 10.50152359072326|
|TDigestSQL(TDiges...|9.185194249546159|2018-01-30 07:20:...|10.061968291873699|
|TDigestSQL(TDiges...|9.185194249546159|2018-01-30 07:20:...|10.242131495863143|
|TDigestSQL(TDiges...|9.185194249546159|2018-01-30 07:20:...| 9.535096094790836|
|TDigestSQL(TDiges...|9.185194249546159|2018-01-30 07:20:...|11.012797937983356|
|TDigestSQL(TDiges...|9.185194249546159|2018-01-30 07:20:...| 9.841120163403126|
|TDigestSQL(TDiges...|9.185194249546159|2018-01-30 07:20:...|11.569770306228012|
|TDigestSQL(TDiges...|9.185194249546159|2018-01-30 07:20:...|10.947191786184677|
|TDigestSQL(TDiges...|9.185194249546159|2018-01-30 07:20:...|10.380284632322022|
|TDigestSQL(TDiges...|9.185194249546159|2018-01-30 07:20:...|10.399812080160988|
|TDigestSQL(TDiges...|9.185194249546159|2018-01-30 07:20:...| 10.47155413079559|
+--------------------+-----------------+--------------------+------------------+
-------------------------------------------
Batch: 3
-------------------------------------------
+--------------------+-----------------+--------------------+------------------+
| tDigSql| tDigThreshold| time| score|
+--------------------+-----------------+--------------------+------------------+
|TDigestSQL(TDiges...|9.111097583328926|2018-01-30 07:21:...|11.028282567178604|
|TDigestSQL(TDiges...|9.111097583328926|2018-01-30 07:21:...| 9.801446956198197|
|TDigestSQL(TDiges...|9.111097583328926|2018-01-30 07:21:...| 9.349642991847796|
|TDigestSQL(TDiges...|9.111097583328926|2018-01-30 07:21:...|10.446018187089411|
|TDigestSQL(TDiges...|9.111097583328926|2018-01-30 07:21:...|10.735315117514041|
|TDigestSQL(TDiges...|9.111097583328926|2018-01-30 07:21:...|11.160788156092288|
|TDigestSQL(TDiges...|9.111097583328926|2018-01-30 07:21:...| 9.741913362611065|
|TDigestSQL(TDiges...|9.111097583328926|2018-01-30 07:21:...|10.031203472330613|
|TDigestSQL(TDiges...|9.111097583328926|2018-01-30 07:21:...| 9.310488974576659|
|TDigestSQL(TDiges...|9.111097583328926|2018-01-30 07:21:...|10.669624608178813|
+--------------------+-----------------+--------------------+------------------+
-------------------------------------------
Batch: 4
-------------------------------------------
Cancelled
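The tDigThreshold column above is simply the sketch's approximate 0.99 quantile at the time of each batch. The same idea can be seen standalone, without Spark or streaming, in the following sketch (an illustrative addition, not from the notebook; it only assumes isarn-sketches on the classpath and uses simulated scores):

// Standalone sketch (assumption, not from the notebook): build a TDigest from
// simulated mixture scores and recover the 0.99-quantile threshold, the same
// statistic as state.tdigest.cdfInverse(0.99) in the streaming query above.
import org.isarnproject.sketches.TDigest
import scala.util.Random

val rng = new Random(0L)
val scores = Vector.fill(10000) {
  if (rng.nextDouble <= 0.99) rng.nextGaussian + 1.0 else rng.nextGaussian + 10.0
}
var td = TDigest.empty()
for (s <- scores) td = td + s          // absorb each score into the sketch
val threshold = td.cdfInverse(0.99)    // approximate 99th percentile
println(s"threshold = $threshold, anomalies = ${scores.count(_ > threshold)}")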
SDS-2.x, Scalable Data Engineering Science