037b_Mix2NormalsStructStreamingFiles(Scala)

Write files periodically with normal mixture samples for structured streaming

This notebook can be used to write files every few seconds into the distributed file system, where each file contains a timestamp field followed by a randomly drawn sample from a mixture of two normal distributions.

After running the commands in this notebook you should have a set of files named by the minute and second of their creation, making it easy to set up structured streaming jobs in another notebook.

Mixture of 2 Normals

Here we will write some Gaussian mixture samples to files.
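Concretely, with component weight \(w\) = normalWeight, locations \(\mu_1\) = normalLocation and \(\mu_2\) = abnormalLocation, and unit standard deviation for both components, each value is a draw from the two-component mixture density:

$$ f(x) = w \, \mathcal{N}(x \mid \mu_1, 1) + (1 - w) \, \mathcal{N}(x \mid \mu_2, 1). $$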

import scala.util.Random
import scala.util.Random._

// draw one sample from a mixture of two Normals, both with standard deviation 1 but with different location (mean) parameters
def myMixtureOf2Normals( normalLocation: Double, abnormalLocation: Double, normalWeight: Double, r: Random) : (String, Double) = {
  val sample = if (r.nextDouble <= normalWeight) r.nextGaussian + normalLocation
               else r.nextGaussian + abnormalLocation
  Thread.sleep(5L) // sleep 5 milliseconds so consecutive samples get distinct timestamps
  val now = (new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")).format(new java.util.Date())
  (now, sample)
}
import scala.util.Random
import scala.util.Random._
myMixtureOf2Normals: (normalLocation: Double, abnormalLocation: Double, normalWeight: Double, r: scala.util.Random)(String, Double)
val r = new Random(1L)
println(myMixtureOf2Normals(1.0, 10.0, 0.99, r), myMixtureOf2Normals(1.0, 10.0, 0.99, r))
// the sampled values should always be 0.5876430182311466 and -0.34037937678788865 when seed = 1L (the timestamps will vary)
((2019-05-31 15:18:40.210,0.5876430182311466),(2019-05-31 15:18:40.215,-0.34037937678788865))
r: scala.util.Random = scala.util.Random@76d00bb2
display(sc.parallelize(Vector.fill(1000){myMixtureOf2Normals(1.0, 10.0, 0.99, r)}).toDF.select("_2")) // histogram of 1000 samples
[Histogram: density of the 1000 sampled values (_2), with most mass around 1 and a small mode near 10; x-axis from about -2 to 11, density up to about 0.09]
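As a quick sanity check (a sketch that is not part of the original notebook; the 5.0 cutoff is an assumed midpoint between the two locations), the fraction of samples falling far from the normal location should be close to 1 - normalWeight = 0.01:

// draw 1000 fresh samples and estimate the abnormal-component weight empirically
val checkSamples = Vector.fill(1000){ myMixtureOf2Normals(1.0, 10.0, 0.99, r)._2 }
val abnormalFraction = checkSamples.count(_ > 5.0).toDouble / checkSamples.size // 5.0 is an assumed cutoff
println(f"empirical fraction above 5.0: $abnormalFraction%.3f") // should be near 1 - 0.99 = 0.01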
dbutils.fs.rm("/datasets/streamingFilesNormalMixture/",true) // delete the directory (if it exists) before starting the job; returns false when there was nothing to delete
res2: Boolean = false
val r = new Random(12345L) // set seed for reproducibility
var a = 0 // loop counter (the for-comprehension below binds its own a)
// write 5 files to the distributed file system, one every 5 seconds
for (a <- 1 to 5) {
  // make a Dataset of 100 samples from the mixture
  val data = sc.parallelize(Vector.fill(100){myMixtureOf2Normals(1.0, 10.0, 0.99, r)}) // 100 samples from mixture
               .coalesce(1) // make sure there is only one partition, i.e. one CSV part-file per directory
               .toDF.as[(String, Double)]
  val minute = (new java.text.SimpleDateFormat("mm")).format(new java.util.Date())
  val second = (new java.text.SimpleDateFormat("ss")).format(new java.util.Date())
  // write to dbfs, into a directory named by the current minute and second
  data.write.mode(SaveMode.Overwrite).csv("/datasets/streamingFilesNormalMixture/" + minute + "_" + second)
  Thread.sleep(5000L) // sleep 5 seconds
}

r: scala.util.Random = scala.util.Random@2334e6f0
a: Int = 0
display(dbutils.fs.ls("/datasets/streamingFilesNormalMixture/"))
path | name | size
dbfs:/datasets/streamingFilesNormalMixture/20_12/ | 20_12/ | 0
dbfs:/datasets/streamingFilesNormalMixture/20_20/ | 20_20/ | 0
dbfs:/datasets/streamingFilesNormalMixture/20_27/ | 20_27/ | 0
dbfs:/datasets/streamingFilesNormalMixture/20_34/ | 20_34/ | 0
dbfs:/datasets/streamingFilesNormalMixture/20_42/ | 20_42/ | 0
display(dbutils.fs.ls("/datasets/streamingFilesNormalMixture/20_20/"))
path | name | size
dbfs:/datasets/streamingFilesNormalMixture/20_20/_SUCCESS | _SUCCESS | 0
dbfs:/datasets/streamingFilesNormalMixture/20_20/_committed_1614649133386675530 | _committed_1614649133386675530 | 113
dbfs:/datasets/streamingFilesNormalMixture/20_20/_started_1614649133386675530 | _started_1614649133386675530 | 0
dbfs:/datasets/streamingFilesNormalMixture/20_20/part-00000-tid-1614649133386675530-764394c6-ba84-4689-991d-93f79ce07af0-21144-c000.csv | part-00000-tid-1614649133386675530-764394c6-ba84-4689-991d-93f79ce07af0-21144-c000.csv | 4315

The _started, _committed, and _SUCCESS entries are commit-protocol marker files; only the part-*.csv file holds the 100 samples.

Take a peek at what was written.

val df_csv = spark.read.option("inferSchema", "true").csv("/datasets/streamingFilesNormalMixture/20_20/*.csv")
df_csv: org.apache.spark.sql.DataFrame = [_c0: timestamp, _c1: double]
df_csv.count() // 100 samples per file
res7: Long = 100
df_csv.show(10,false) // first 10
+-----------------------+-------------------+
|_c0                    |_c1                |
+-----------------------+-------------------+
|2019-05-31 15:20:19.483|-0.5190278662580565|
|2019-05-31 15:20:19.488|1.2549405940975034 |
|2019-05-31 15:20:19.493|2.4267606721380233 |
|2019-05-31 15:20:19.498|0.21858105660909444|
|2019-05-31 15:20:19.503|1.7701229392924476 |
|2019-05-31 15:20:19.508|0.08326770280505069|
|2019-05-31 15:20:19.514|11.539205812425335 |
|2019-05-31 15:20:19.519|0.612370126029857  |
|2019-05-31 15:20:19.524|1.299073306785623  |
|2019-05-31 15:20:19.529|2.6939073650678083 |
+-----------------------+-------------------+
only showing top 10 rows
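These files exist to feed a structured streaming job in another notebook. Below is a minimal sketch of such a reader (the column names time and score and the maxFilesPerTrigger value are illustrative choices, not from this notebook); unlike the batch read above, streaming file sources cannot infer a schema, so it must be declared up front:

import org.apache.spark.sql.types.{StructType, TimestampType, DoubleType}

val schema = new StructType()
  .add("time", TimestampType) // assumed name for the timestamp field
  .add("score", DoubleType)   // assumed name for the sampled value

val streamingDF = spark.readStream
  .schema(schema)                  // file streams require an explicit schema
  .option("maxFilesPerTrigger", 1) // replay one file per micro-batch
  .csv("/datasets/streamingFilesNormalMixture/*")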