041_SketchingWithTDigest(Scala)


ScaDaMaLe Course site and book

Sketching with T-digest for quantiles

A Toy Anomaly Detector

In the early 1900s, Fisher noticed a fundamental computational difference between statistics such as the mean and covariance on the one hand, and the median and other quantiles on the other.

The former are today called recursively computable statistics: they can be kept up to date in constant memory as each new observation arrives. Once we ask about the memory footprint needed to keep statistics such as quantiles updated, we get into the world of probabilistic data structures...
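To make the distinction concrete, here is a minimal, self-contained Scala sketch (our illustration, not from the notebook): the running mean is recursively computable with O(1) state, while the exact median needs all observations seen so far before it can answer.

// Running mean: recursively computable with O(1) state,
// using the update mean_n = mean_{n-1} + (x_n - mean_{n-1}) / n.
def runningMean(xs: Seq[Double]): Double =
  xs.zipWithIndex.foldLeft(0.0) { case (m, (x, i)) => m + (x - m) / (i + 1) }

// Exact median: needs all n observations in memory before answering.
def exactMedian(xs: Seq[Double]): Double = {
  val s = xs.sorted
  val n = s.length
  if (n % 2 == 1) s(n / 2) else (s(n / 2 - 1) + s(n / 2)) / 2.0
}

println(runningMean(Seq(1.0, 2.0, 3.0, 10.0))) // 4.0
println(exactMedian(Seq(1.0, 2.0, 3.0, 10.0))) // 2.5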

The basic idea of sketching is formally conveyed in Chapter 6 of Foundations of Data Science.

Let's get a more informal view from the following sources.

Here we focus on a specific sketch called T-Digest for approximating extreme quantiles (an informal illustration of why it is accurate in the tails follows the pointers below):

Pointers:

NOTE:

  • Ted Dunning's explanation of t-digest could once be seen here:
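To see informally why t-digest is accurate at extreme quantiles, here is a back-of-the-envelope Scala sketch (our illustration, not from the notebook) based on the k1 scale function in Dunning's t-digest paper: a cluster may only span a quantile range over which k changes by at most 1, so the allowed cluster width shrinks like sqrt(q*(1-q)) toward q = 0 and q = 1.

import scala.math.{Pi, sqrt}

// k1 scale function from the t-digest paper:
//   k(q) = (delta / (2*Pi)) * asin(2*q - 1)
// so k'(q) = delta / (2*Pi*sqrt(q*(1-q))), and a cluster allowed to change
// k by at most 1 can span a quantile width of roughly 1 / k'(q):
def maxClusterWidth(q: Double, delta: Double): Double =
  2 * Pi * sqrt(q * (1 - q)) / delta

// Cluster widths shrink toward the tails, giving extreme quantiles
// much finer resolution than the median region.
for (q <- Seq(0.001, 0.01, 0.5, 0.99, 0.999))
  println(f"q = $q%1.3f  allowed cluster width ~ ${maxClusterWidth(q, 100.0)}%.6f")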

Demonstration of t-digest to detect anomalous scores

Let us import the following Scala implementation of t-digest:

  • for Spark 3.0.1 use maven coordinates: org.isarnproject:isarn-sketches-spark_2.12:0.5.0-sp3.0

See the library: https://github.com/isarn/isarn-sketches-spark

import org.isarnproject.sketches.java.TDigest
import org.isarnproject.sketches.spark.tdigest._
import scala.util.Random
import scala.util.Random._
// Sample from a mixture of two Normal RVs with standard deviation 1
// but with different location (mean) parameters.
def myMixtureOf2Normals(normalLocation: Double, abnormalLocation: Double, normalWeight: Double, r: Random): Double = {
  if (r.nextDouble <= normalWeight) r.nextGaussian + normalLocation
  else r.nextGaussian + abnormalLocation
}
myMixtureOf2Normals: (normalLocation: Double, abnormalLocation: Double, normalWeight: Double, r: scala.util.Random)Double

Here is a quick demonstration of the simple mixture of two Normal (Gaussian) random variables we will be simulating from.

val r = new Random(1L)
println(myMixtureOf2Normals(1.0, 10.0, 0.99, r), myMixtureOf2Normals(1.0, 10.0, 0.99, r))
// should always produce (0.5876430182311466,-0.34037937678788865) when seed = 1L
(0.5876430182311466,-0.34037937678788865) r: scala.util.Random = scala.util.Random@2ac6c652
val r = new Random(12345L)
val data = sc.parallelize(Vector.fill(10000){myMixtureOf2Normals(1.0, 10.0, 0.99, r)}).toDF.as[Double]
r: scala.util.Random = scala.util.Random@568f8382 data: org.apache.spark.sql.Dataset[Double] = [value: double]
data.show(5)
+--------------------+
|               value|
+--------------------+
|  0.2576188264990721|
|-0.13149698512045327|
|  1.4139063973267458|
|-0.02383387596851...|
|  0.7274784426774964|
+--------------------+
only showing top 5 rows
display(data)
[Density plot of the value column (x from about -2 to 14): the bulk of the mass sits near 1.0, with a small bump near 10.0 from the anomalous component. Aggregated (by count) in the backend.]

Let's t-digest this data using the user-defined aggregation function udf evaluated below (roughly, compression controls the sketch's size/accuracy trade-off, and maxDiscrete lets columns with few distinct values be tracked exactly).

val udf = TDigestAggregator.udf[Double](compression = 0.2, maxDiscrete = 25)
 
udf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedAggregator(org.isarnproject.sketches.spark.tdigest.TDigestAggregator@4ff546b7,class[value[0]: double],None,true,true)

We can now agg, i.e. aggregate, the value column of Doubles in the data DataFrame that contains our data, as follows.
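The aggregation cell itself follows in the notebook; as a hedged preview (following the isarn-sketches-spark README, so the exact names here are assumptions), it could look like this, where cdfInverse queries a quantile of the aggregated digest and cdf gives the approximate distribution function:

// Aggregate the value column into a single TDigest (a sketch, assuming the
// README-style API of isarn-sketches-spark).
val tdAgg = data.agg(udf($"value"))
val td = tdAgg.first.getAs[TDigest](0)

// Approximate median should be near 1.0 (the normal mode); an extreme
// upper quantile gets pulled toward the anomalous mode at 10.0.
println(td.cdfInverse(0.5), td.cdfInverse(0.999))

// cdf(x) approximates P(X <= x); scores whose cdf is very close to 1
// are candidate anomalies for our toy detector.
println(td.cdf(10.0))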