ScaDaMaLe Course site and book

import org.apache.spark.mllib.random.RandomRDDs
import org.apache.spark.mllib.feature.Normalizer
import org.apache.spark.rdd.RDD
import breeze.linalg._
import breeze.numerics._

Constants required for the method.

// Constants
val N = 250 // train size
val M = 250 // test size
val D = 2 // dimensionality
val T = 500 // number of rays
val one_vs_all = true

assert((!one_vs_all) || N == M)
N: Int = 250
M: Int = 250
D: Int = 2
T: Int = 500
one_vs_all: Boolean = true

Generate points from a standard Gaussian distribution. When one_vs_all is set, the test set is the training set itself.

val train_data = RandomRDDs.normalVectorRDD(sc, N, D).zipWithIndex().map { case (v, i) => (i, new DenseVector(v.toArray)) }
val test_data = if(one_vs_all) train_data else RandomRDDs.normalVectorRDD(sc, M, D).zipWithIndex().map { case (v, i) => (i, new DenseVector(v.toArray)) }
train_data: org.apache.spark.rdd.RDD[(Long, breeze.linalg.DenseVector[Double])] = MapPartitionsRDD[10486] at map at command-1767923094595286:1
test_data: org.apache.spark.rdd.RDD[(Long, breeze.linalg.DenseVector[Double])] = MapPartitionsRDD[10486] at map at command-1767923094595286:1
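As a quick sanity check (optional, not part of the original pipeline), the generated RDD should hold N indexed points of dimension D:

// Optional sanity check on the generated training data.
assert(train_data.count() == N)
assert(train_data.first()._2.length == D)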

Generate T random ray directions, uniformly distributed on the unit sphere (standard Gaussian vectors normalized to unit length).

def get_uni_sphere() = {
  // Normalizing standard Gaussian vectors yields directions uniformly distributed on the unit sphere.
  val u = new Normalizer().transform(RandomRDDs.normalVectorRDD(sc, T, D))
  u.zipWithIndex().map { case (v, i) => (i, new DenseVector(v.toArray)) }
}

val rays = get_uni_sphere()
get_uni_sphere: ()org.apache.spark.rdd.RDD[(Long, breeze.linalg.DenseVector[Double])]
rays: org.apache.spark.rdd.RDD[(Long, breeze.linalg.DenseVector[Double])] = MapPartitionsRDD[10490] at map at command-685894176423986:4
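As a quick check (optional, assuming the rays RDD defined above), each direction should have unit Euclidean norm after normalization:

// Optional: print the norms of a few rays; norm comes from breeze.linalg.
rays.map { case (_, v) => norm(v) }.take(5).foreach(println)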

Precompute shared quantities: all pairwise squared distances between training and test points, and all dot products of the points with the ray direction vectors.

def compute_dst_sq() = { // (N, M)
  // dst[n, m] = |x_n - x'_m|^2 (squared Euclidean distance)
  val dst = train_data.cartesian(test_data).map { case ((n, train_vec), (m, test_vec)) => ((n, m), sum((train_vec - test_vec) *:* (train_vec - test_vec))) }
  dst
}

def compute_pu(data: RDD[(Long, DenseVector[Double])]) = { // (data.N, T)
  // pu[n, t] = <data_n, ray_t>
  val pu = data.cartesian(rays).map { case ((n, data_vec), (t, ray_vec)) => ((n, t), data_vec dot ray_vec) }
  pu
}

val dst = compute_dst_sq()
val pu_train = compute_pu(train_data)
val pu_test = compute_pu(test_data)
compute_dst_sq: ()org.apache.spark.rdd.RDD[((Long, Long), Double)]
compute_pu: (data: org.apache.spark.rdd.RDD[(Long, breeze.linalg.DenseVector[Double])])org.apache.spark.rdd.RDD[((Long, Long), Double)]
dst: org.apache.spark.rdd.RDD[((Long, Long), Double)] = MapPartitionsRDD[10492] at map at command-685894176423990:3
pu_train: org.apache.spark.rdd.RDD[((Long, Long), Double)] = MapPartitionsRDD[10494] at map at command-685894176423990:9
pu_test: org.apache.spark.rdd.RDD[((Long, Long), Double)] = MapPartitionsRDD[10496] at map at command-685894176423990:9
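Purely for illustration, a few entries show the key layout produced above:

// ((n, m), squared distance) and ((n, t), projection) samples.
dst.take(3).foreach(println)
pu_train.take(3).foreach(println)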

Compute the length of every ray: for each test point and each direction, the minimum over all training points of the distance to the corresponding bisecting hyperplane. This is the most expensive step.
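For intuition (our reading of the formula used below; the original does not spell this out): a point on the ray from test point x'_m in direction u_t is x'_m + l*u_t, and the ray crosses the bisecting hyperplane between x'_m and a training point x_n when its distances to the two points coincide. With |u_t| = 1 this gives

\[ \| x'_m + \ell u_t - x_n \|^2 = \ell^2 \;\Rightarrow\; \ell = \frac{\| x_n - x'_m \|^2}{2 \langle u_t,\, x_n - x'_m \rangle} = \frac{dst[n, m]}{2\,(pu\_train[n, t] - pu\_test[m, t])}. \]

A negative length means the hyperplane lies behind the ray and is treated as +infinity; the minimum over n is then the length of the ray segment before it crosses any such hyperplane.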

def compute_ray_lengths() = { // (M, T)
  // For each test point m and ray t, take the minimum over all training points n of
  //   dst[n, m] / (2 * (pu_train[n, t] - pu_test[m, t])),
  // i.e. the distance along ray t to the bisecting hyperplane between x'_m and x_n.
  def compute_length(n: Long, m: Long, dst_val: Double, pu_train_val: Double, pu_test_val: Double) = {
    if (one_vs_all && n == m) {
      Double.PositiveInfinity // skip the point itself when train and test sets coincide
    } else {
      val res = dst_val / (2 * (pu_train_val - pu_test_val))
      if (res < 0) Double.PositiveInfinity else res // negative: hyperplane lies behind the ray
    }
  }

  def my_min(a: Double, b: Double) = { min(a, b) }

  val lengths = dst.cartesian(sc.range(0, T))
    .map { case (((n, m), dst_val), t) => ((n, t), (m, dst_val)) }
    .join(pu_train)
    .map { case ((n, t), ((m, dst_val), pu_train_val)) => ((m, t), (n, dst_val, pu_train_val)) }
    .join(pu_test)
    .map { case ((m, t), ((n, dst_val, pu_train_val), pu_test_val)) => ((m, t), compute_length(n, m, dst_val, pu_train_val, pu_test_val)) }
    .aggregateByKey(Double.PositiveInfinity)(my_min, my_min) // minimum over n
  lengths
}

val lengths = compute_ray_lengths()
compute_ray_lengths: ()org.apache.spark.rdd.RDD[((Long, Long), Double)]
lengths: org.apache.spark.rdd.RDD[((Long, Long), Double)] = ShuffledRDD[10509] at aggregateByKey at command-685894176423991:20
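A small sanity check (optional): there should be exactly one aggregated length per (test point, ray) pair.

// Optional: M * T aggregated ray lengths in total.
assert(lengths.count() == M.toLong * T)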

Compute the approximate weights from the ray lengths.

def compute_weights() = { // (M, )
  // weights[m] = (number of finite rays) / (sum of their lengths), i.e. the reciprocal
  // of the mean finite ray length for test point m; 0.0 if no ray is finite.
  def agg_f(a: (Double, Double), b: (Double, Double)) = { (a._1 + b._1, a._2 + b._2) }

  val weights = lengths.map { case ((m, t), length) => (m, if (!length.isInfinity) (1.0, length) else (0.0, 0.0)) }
    .aggregateByKey((0.0, 0.0))(agg_f, agg_f)
    .map { case (m, (count, len_sum)) => (m, if (count > 0) count / len_sum else 0.0) }
  weights
}

val weights = compute_weights()
compute_weights: ()org.apache.spark.rdd.RDD[(Long, Double)]
weights: org.apache.spark.rdd.RDD[(Long, Double)] = MapPartitionsRDD[10512] at map at command-685894176424002:6
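To inspect a few of the resulting (index, weight) pairs (illustrative only):

// Peek at the first few approximate weights.
weights.take(5).foreach(println)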

Save the obtained data as CSV files.

Note: we repartition each dataset to a single partition so that it is written as one CSV file; the repartition(1) should be removed for larger data.

def save_data(name: String, data: RDD[(Long, DenseVector[Double])]) = {
  data.map { case (k, v) => k.toString() + "," + v.toArray.mkString(",")}
    .toDF.repartition(1).write.format("csv").mode(SaveMode.Overwrite).option("quote", " ").save("dbfs:/FileStore/group17/data/" + name)
}

def save_weights(name: String, data: RDD[(Long, Double)]) = {
  data.map { case (k, v) => k.toString() + "," + v.toString}
    .toDF.repartition(1).write.format("csv").mode(SaveMode.Overwrite).option("quote", " ").save("dbfs:/FileStore/group17/data/" + name)
}

save_data("gaussian_train", train_data)
save_data("gaussian_test", test_data)
save_weights("gaussian_weights", weights)
save_data: (name: String, data: org.apache.spark.rdd.RDD[(Long, breeze.linalg.DenseVector[Double])])Unit
save_weights: (name: String, data: org.apache.spark.rdd.RDD[(Long, Double)])Unit
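As an optional check (a sketch assuming the same DBFS paths used above), the saved weights can be read back with the standard CSV reader:

// Read the written weights back and display a few rows.
val saved_weights = spark.read.format("csv").load("dbfs:/FileStore/group17/data/gaussian_weights")
saved_weights.show(5, false)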