ScaDaMaLe Course site and book

import org.apache.spark.mllib.random.RandomRDDs import org.apache.spark.mllib.feature.Normalizer import org.apache.spark.rdd.RDD import breeze.linalg._ import breeze.numerics._
import org.apache.spark.mllib.random.RandomRDDs import org.apache.spark.mllib.feature.Normalizer import org.apache.spark.rdd.RDD import breeze.linalg._ import breeze.numerics._

Constants required for the method.

// Experiment constants.
val N = 250            // train set size
val M = 250            // test set size
val D = 2              // dimensionality of the points
val T = 500            // number of random rays
val one_vs_all = true  // when true, the test set is the train set itself

// In one-vs-all mode every train point doubles as a test point,
// so the two set sizes must agree.
assert((!one_vs_all) || N == M)
N: Int = 250 M: Int = 250 D: Int = 2 T: Int = 500 one_vs_all: Boolean = true

Generate points from standard Gaussian distribution.

// Draw N train points from a standard Gaussian, key each point by its index,
// and convert the Spark mllib vectors into Breeze DenseVectors for arithmetic.
val train_data = RandomRDDs
  .normalVectorRDD(sc, N, D)
  .zipWithIndex()
  .map { case (v, i) => (i, new DenseVector(v.toArray)) }

// In one-vs-all mode the test set is literally the train RDD (same object);
// otherwise it is an independent Gaussian sample of M points.
val test_data =
  if (one_vs_all) train_data
  else
    RandomRDDs
      .normalVectorRDD(sc, M, D)
      .zipWithIndex()
      .map { case (v, i) => (i, new DenseVector(v.toArray)) }
train_data: org.apache.spark.rdd.RDD[(Long, breeze.linalg.DenseVector[Double])] = MapPartitionsRDD[10486] at map at command-1767923094595286:1 test_data: org.apache.spark.rdd.RDD[(Long, breeze.linalg.DenseVector[Double])] = MapPartitionsRDD[10486] at map at command-1767923094595286:1

Generate T random rays: unit direction vectors obtained by normalizing standard Gaussian samples, which makes them uniformly distributed on the unit sphere.

// Sample T direction vectors uniformly on the unit sphere in R^D:
// draw standard-Gaussian vectors, normalize them to unit length,
// then key each ray by its index as a Breeze DenseVector.
def get_uni_sphere() = {
  val gaussian = RandomRDDs.normalVectorRDD(sc, T, D)
  val unit = new Normalizer().transform(gaussian)
  unit.zipWithIndex().map { case (v, i) => (i, new DenseVector(v.toArray)) }
}
val rays = get_uni_sphere()
get_uni_sphere: ()org.apache.spark.rdd.RDD[(Long, breeze.linalg.DenseVector[Double])] rays: org.apache.spark.rdd.RDD[(Long, breeze.linalg.DenseVector[Double])] = MapPartitionsRDD[10490] at map at command-685894176423986:4

Precompute shared quantities: all pairwise squared distances, and all dot products of points with the ray direction vectors.

// (N, M) pairwise squared Euclidean distances:
// dst[n, m] = |x_n - x'_m|^2
// BUG FIX: the elementwise squared differences were additionally raised to
// the power 2 (via `^:^ 2.0`), so the old code summed diff^4 instead of the
// squared distance that the comment — and the downstream bisector formula
// dst / (2 * <diff, u>) — requires. The extra exponentiation is removed.
def compute_dst_sq() = {
  val dst = train_data.cartesian(test_data).map {
    case ((n, train_vec), (m, test_vec)) =>
      val diff = train_vec - test_vec
      ((n, m), sum(diff *:* diff))
  }
  dst
}

// (data.N, T) projections onto the rays:
// pu[n, t] = <data_n, ray_t>
def compute_pu(data: RDD[(Long, DenseVector[Double])]) = {
  val pu = data.cartesian(rays).map {
    case ((n, data_vec), (t, ray_vec)) => ((n, t), data_vec dot ray_vec)
  }
  pu
}

val dst = compute_dst_sq()
val pu_train = compute_pu(train_data)
val pu_test = compute_pu(test_data)
compute_dst_sq: ()org.apache.spark.rdd.RDD[((Long, Long), Double)] compute_pu: (data: org.apache.spark.rdd.RDD[(Long, breeze.linalg.DenseVector[Double])])org.apache.spark.rdd.RDD[((Long, Long), Double)] dst: org.apache.spark.rdd.RDD[((Long, Long), Double)] = MapPartitionsRDD[10492] at map at command-685894176423990:3 pu_train: org.apache.spark.rdd.RDD[((Long, Long), Double)] = MapPartitionsRDD[10494] at map at command-685894176423990:9 pu_test: org.apache.spark.rdd.RDD[((Long, Long), Double)] = MapPartitionsRDD[10496] at map at command-685894176423990:9

Compute the lengths of all rays. The most expensive step.

// (M, T): for each test point m and ray t, the shortest distance along ray t
// from x'_m to the bisector hyperplane of (x_n, x'_m) over all train points n:
// lengths[m, t] = min_n  dst[n, m] / (2 * (pu_train[n, t] - pu_test[m, t]))
def compute_ray_lengths() = {
  // Candidate length for one (n, m, t) triple; self-pairs (one-vs-all mode)
  // and hyperplanes behind the ray (negative length) are mapped to +inf so
  // they can never win the minimum.
  def candidate(n: Long, m: Long, dst_val: Double, pu_train_val: Double, pu_test_val: Double) = {
    if (one_vs_all && n == m) {
      Double.PositiveInfinity
    } else {
      val res = dst_val / (2 * (pu_train_val - pu_test_val))
      if (res < 0) Double.PositiveInfinity else res
    }
  }
  def shorter(a: Double, b: Double) = min(a, b)
  // Fan dst out over all T rays, attach the two projections via joins,
  // then reduce over n with a running minimum per (m, t) key.
  val lengths = dst.cartesian(sc.range(0, T))
    .map { case (((n, m), dst_val), t) => ((n, t), (m, dst_val)) }
    .join(pu_train)
    .map { case ((n, t), ((m, dst_val), pu_train_val)) => ((m, t), (n, dst_val, pu_train_val)) }
    .join(pu_test)
    .map { case ((m, t), ((n, dst_val, pu_train_val), pu_test_val)) =>
      ((m, t), candidate(n, m, dst_val, pu_train_val, pu_test_val))
    }
    .aggregateByKey(Double.PositiveInfinity)(shorter, shorter)
  lengths
}
val lengths = compute_ray_lengths()
compute_ray_lengths: ()org.apache.spark.rdd.RDD[((Long, Long), Double)] lengths: org.apache.spark.rdd.RDD[((Long, Long), Double)] = ShuffledRDD[10509] at aggregateByKey at command-685894176423991:20

Compute the approximated weights.

// (M, ): per test point m, weight = (#rays with finite length) / (sum of
// those finite lengths), i.e. the reciprocal of the mean finite ray length.
// Points with no finite ray at all get weight 0.0.
def compute_weights() = {
  // Pairwise sum of (count, total-length) accumulators.
  def combine(a: (Double, Double), b: (Double, Double)) = (a._1 + b._1, a._2 + b._2)
  lengths
    .map { case ((m, t), length) =>
      (m, if (!length.isInfinity) (1.0, length) else (0.0, 0.0))
    }
    .aggregateByKey((0.0, 0.0))(combine, combine)
    .map { case (m, (count, total)) => (m, if (count > 0) count / total else 0.0) }
}
val weights = compute_weights()
compute_weights: ()org.apache.spark.rdd.RDD[(Long, Double)] weights: org.apache.spark.rdd.RDD[(Long, Double)] = MapPartitionsRDD[10512] at map at command-685894176424002:6

Save the obtained data as CSV files.

Note: we repartition the tables here to work with one csv only; this should be removed for larger data.

// Shared writer: both save functions previously duplicated the identical
// DataFrame/CSV write pipeline; it is factored out here (DRY). The
// repartition(1) forces a single output csv — remove for larger data.
def save_csv_lines(name: String, lines: RDD[String]) = {
  lines.toDF.repartition(1).write.format("csv").mode(SaveMode.Overwrite)
    .option("quote", " ").save("dbfs:/FileStore/group17/data/" + name)
}

// Write indexed point vectors as "index,coord_1,...,coord_D" rows.
def save_data(name: String, data: RDD[(Long, DenseVector[Double])]) = {
  save_csv_lines(name, data.map { case (k, v) => k.toString() + "," + v.toArray.mkString(",") })
}

// Write indexed scalar weights as "index,weight" rows.
def save_weights(name: String, data: RDD[(Long, Double)]) = {
  save_csv_lines(name, data.map { case (k, v) => k.toString() + "," + v.toString })
}

save_data("gaussian_train", train_data)
save_data("gaussian_test", test_data)
save_weights("gaussian_weights", weights)
save_data: (name: String, data: org.apache.spark.rdd.RDD[(Long, breeze.linalg.DenseVector[Double])])Unit save_weights: (name: String, data: org.apache.spark.rdd.RDD[(Long, Double)])Unit