ScaDaMaLe Course site and book

Robotics Dataset

The estimation of probability density functions in a configuration space is of fundamental importance for many applications in probabilistic robotics and sampling-based robot motion planning.

Why is this application relevant? Standard sampling-based motion planners such as RRT-Connect randomly explore the search space to iteratively build up a solution path. To speed up planning, it is common to use heuristics that explore the space in an informed way. The estimated densities are useful for designing such heuristics: high-density regions can, for example, indicate narrow passages in the search space.

In the following, we apply the previously introduced algorithm to estimate densities of points in the configuration space of a multi-joint articulated robot arm. As shown in the figure below, the robot consists of 10 rotational joints and operates in a 2-dimensional workspace containing several obstacles.
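To make the setting concrete, the following minimal sketch shows how the 2-D joint center positions of such a planar arm would follow from a 10-dimensional joint configuration. It is only an illustration: the unit link lengths, the base at the origin, and the cumulative angle convention are assumptions, and the kinematics used to generate the actual dataset may differ.

import breeze.linalg.DenseVector
import scala.math.{cos, sin}

// Hypothetical forward kinematics for a planar chain: each joint angle is added to
// the running orientation and the chain advances by one (assumed unit-length) link.
def jointCenters(q: DenseVector[Double], linkLength: Double = 1.0): Seq[(Double, Double)] = {
  var x = 0.0; var y = 0.0; var theta = 0.0
  q.toArray.toSeq.map { angle =>
    theta += angle
    x += linkLength * cos(theta)
    y += linkLength * sin(theta)
    (x, y) // position of the next joint center
  }
}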

We ran RRT-Connect (a well-known sampling-based motion planner) for different initial and goal configurations of the robot. The robot always starts in the right half of the workspace, while the goal lies within one of the narrow passages, as depicted in the figure.

To generate a dataset, we stitched the configurations from all planned paths together into one collection of joint configurations. Our goal is to estimate the density of robot configurations as generated by the RRT-Connect motion planner. Originally, we used a dataset of ~125k points in 10 dimensions. Unfortunately, we observed that the current implementation does not scale well to such a large dataset, so for the scope of this project we instead run the method on 1000 points. Scaling the implementation up to larger datasets is left for future work.
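The 1000-point subset could, for example, be obtained by randomly subsampling the full collection. The sketch below is only an illustration of that step; the file path and the sampling fraction are assumptions, not the procedure that was actually used.

// Hypothetical: subsample roughly 1000 of the ~125k configurations
val df_full = spark.read
  .option("inferSchema", "true")
  .option("header", "false")
  .format("csv")
  .load("/FileStore/group17/data/robotics/joint_configs_full.csv") // hypothetical path
val df_sample = df_full.sample(withReplacement = false, fraction = 1000.0 / 125000.0, seed = 42)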

Here are some examples of paths found by the planner (red -> final configuration, black -> initial configuration, green -> intermediate configurations along the path):

# # Read data from file
# joint_centers_all = spark.read.format("csv").load("dbfs:/FileStore/shared_uploads/robert.gieselmann@gmail.com/robotics_dataset_joint_centers.csv",inferSchema =True,header=False)
# joint_centers_to_plot = np.reshape(np.array(joint_centers_all.collect()), (-1, 2))

# # Plot 2d histogram of joint center positions
# f = plt.figure()
# plt.hist2d(joint_centers_to_plot[:,0], joint_centers_to_plot[:,1], bins=250)
# plt.xlabel("x")
# plt.ylabel("y")
# plt.title("Distribution of joint center coordinates")

# # Display figure
# display()
import org.apache.spark.mllib.random.RandomRDDs
import org.apache.spark.mllib.feature.Normalizer
import org.apache.spark.rdd.RDD
import breeze.linalg._
import breeze.numerics._
// Constants
val N = 1000 // train size
val M = 100 // test size
val D = 10 // dimensionality
val T = 100 // number of rays
val one_vs_all = true // if true, evaluate the density at the training points themselves (leave-one-out)
N: Int = 1000
M: Int = 100
D: Int = 10
T: Int = 100
one_vs_all: Boolean = true
// Read the files from csv and convert to RDD
val df_train = spark.read.option("inferSchema", "true").option("header", "false").format("csv").load("/FileStore/group17/data/robotics_train/joint_configs_train.csv")
val df_test = spark.read.option("inferSchema", "true").option("header", "false").format("csv").load("/FileStore/group17/data/robotics_test/joint_configs_test.csv")
df_train: org.apache.spark.sql.DataFrame = [_c0: double, _c1: double ... 8 more fields]
df_test: org.apache.spark.sql.DataFrame = [_c0: double, _c1: double ... 8 more fields]
// Convert to RDD of Array[Double]
val rdd_train = df_train.rdd.map(_.toSeq.toArray.map(_.toString.toDouble))
val rdd_test = df_test.rdd.map(_.toSeq.toArray.map(_.toString.toDouble))

// Convert to RDD[(Long, DenseVector)]; with one_vs_all the test set is the training set itself
val train_data = rdd_train.zipWithIndex().map { case (v, i) => (i, new DenseVector(v.toArray)) }
val test_data = if (one_vs_all) train_data else rdd_test.zipWithIndex().map { case (v, i) => (i, new DenseVector(v.toArray)) }
rdd_train: org.apache.spark.rdd.RDD[Array[Double]] = MapPartitionsRDD[2570] at map at command-3389902380791479:2
rdd_test: org.apache.spark.rdd.RDD[Array[Double]] = MapPartitionsRDD[2571] at map at command-3389902380791479:3
train_data: org.apache.spark.rdd.RDD[(Long, breeze.linalg.DenseVector[Double])] = MapPartitionsRDD[2573] at map at command-3389902380791479:6
test_data: org.apache.spark.rdd.RDD[(Long, breeze.linalg.DenseVector[Double])] = MapPartitionsRDD[2573] at map at command-3389902380791479:6
train_data.collect // materialize and inspect the parsed training data on the driver
def get_uni_sphere() = { // (T, D)
  // Sample T ray directions uniformly on the unit sphere in R^D:
  // normalizing standard Gaussian vectors yields uniformly distributed directions.
  var u = RandomRDDs.normalVectorRDD(sc, T, D)
  u = new Normalizer().transform(u)
  val t = u.zipWithIndex().map { case (v, i) => (i, new DenseVector(v.toArray)) }
  t
}

val rays = get_uni_sphere()
get_uni_sphere: ()org.apache.spark.rdd.RDD[(Long, breeze.linalg.DenseVector[Double])]
rays: org.apache.spark.rdd.RDD[(Long, breeze.linalg.DenseVector[Double])] = MapPartitionsRDD[2577] at map at command-3389902380791431:4
def compute_dst_sq() = { // (N, M)
  // dst[n, m] = |x_n - x'_m|^2  (squared Euclidean distance)
  val dst = train_data.cartesian(test_data).map { case ((n, train_vec), (m, test_vec)) => ((n, m), sum((train_vec - test_vec) *:* (train_vec - test_vec))) }
  dst
}

def compute_pu(data: RDD[(Long, DenseVector[Double])]) = { // (data.N, T)
  // pu[n, t] = <data_n, ray_t>
  val pu = data.cartesian(rays).map { case ((n, data_vec), (t, ray_vec)) => ((n, t), data_vec dot ray_vec) }
  pu
}

val dst = compute_dst_sq()
val pu_train = compute_pu(train_data)
val pu_test = compute_pu(test_data)
compute_dst_sq: ()org.apache.spark.rdd.RDD[((Long, Long), Double)]
compute_pu: (data: org.apache.spark.rdd.RDD[(Long, breeze.linalg.DenseVector[Double])])org.apache.spark.rdd.RDD[((Long, Long), Double)]
dst: org.apache.spark.rdd.RDD[((Long, Long), Double)] = MapPartitionsRDD[2579] at map at command-3389902380791475:3
pu_train: org.apache.spark.rdd.RDD[((Long, Long), Double)] = MapPartitionsRDD[2581] at map at command-3389902380791475:9
pu_test: org.apache.spark.rdd.RDD[((Long, Long), Double)] = MapPartitionsRDD[2583] at map at command-3389902380791475:9
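Geometrically, the ratio dst[n, m] / (2 * (pu_train[n, t] - pu_test[m, t])) = |x_n - x'_m|^2 / (2 * <x_n - x'_m, u_t>) is the distance one has to travel from the test point x'_m along the ray direction u_t before crossing the hyperplane that bisects the segment between x'_m and the training point x_n; a negative value means that hyperplane lies behind the ray and is treated as infinite. Taking the minimum over all training points n therefore yields, for every test point and ray, the distance to the boundary of the region of points closer to x'_m than to any training point (its leave-one-out Voronoi cell in the one-vs-all case). Note also that the Cartesian products below materialize on the order of N * M * T = 10^8 records for the present setting, which is the main reason the implementation does not yet scale to the full ~125k-point dataset.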
def compute_ray_lengths() = { // (M, T)
  // lengths[m, t] = min_n dst[n, m] / (2 * (pu_train[n, t] - pu_test[m, t])),
  // i.e. the distance from test point m along ray t to the closest bisecting hyperplane
  def compute_length(n: Long, m: Long, dst_val: Double, pu_train_val: Double, pu_test_val: Double) = {
    if (one_vs_all && n == m) {
      Double.PositiveInfinity // exclude the test point itself when test set == train set
    } else {
      val res = dst_val / (2 * (pu_train_val - pu_test_val))
      if (res < 0) Double.PositiveInfinity else res // negative: hyperplane lies behind the ray
    }
  }

  def my_min(a: Double, b: Double) = { min(a, b) }

  val lengths = dst.cartesian(sc.range(0, T))
    .map { case (((n, m), dst_val), t) => ((n, t), (m, dst_val)) }
    .join(pu_train)
    .map { case ((n, t), ((m, dst_val), pu_train_val)) => ((m, t), (n, dst_val, pu_train_val)) }
    .join(pu_test)
    .map { case ((m, t), ((n, dst_val, pu_train_val), pu_test_val)) => ((m, t), compute_length(n, m, dst_val, pu_train_val, pu_test_val)) }
    .aggregateByKey(Double.PositiveInfinity)(my_min, my_min) // minimum over all training points n
  lengths
}

val lengths = compute_ray_lengths()
compute_ray_lengths: ()org.apache.spark.rdd.RDD[((Long, Long), Double)]
lengths: org.apache.spark.rdd.RDD[((Long, Long), Double)] = ShuffledRDD[2596] at aggregateByKey at command-3389902380791476:20
def compute_weights() = { // (M, )
  def agg_f(a: (Double, Double), b: (Double, Double)) = { (a._1 + b._1, a._2 + b._2) }

  // weight[m] = (number of rays with a finite length) / (sum of those lengths)
  val weights = lengths.map { case ((m, t), length) => (m, if (!length.isInfinity) (1.0, length) else (0.0, 0.0)) }
    .aggregateByKey((0.0, 0.0))(agg_f, agg_f)
    .map { case (m, (count, total_length)) => (m, if (count > 0) count / total_length else 0.0) }
  weights
}

val weights = compute_weights()
compute_weights: ()org.apache.spark.rdd.RDD[(Long, Double)]
weights: org.apache.spark.rdd.RDD[(Long, Double)] = MapPartitionsRDD[2599] at map at command-3389902380791477:6
//def save_data(name: String, data: RDD[(Long, DenseVector[Double])]) = {
//  data.map { case (k, v) => k.toString() + "," + v.toArray.mkString(",")}
//    .toDF.repartition(1).write.format("csv").mode(SaveMode.Overwrite).option("quote", " ").save("dbfs:/FileStore/group17/data/" + name)
//}

def save_weights(name: String, data: RDD[(Long, Double)]) = {
  data.map { case (k, v) => k.toString() + "," + v.toString}
    .toDF.repartition(1).write.format("csv").mode(SaveMode.Overwrite).option("quote", " ").save("dbfs:/FileStore/group17/data/" + name)
}

save_weights("robotics_weights", weights)
save_weights: (name: String, data: org.apache.spark.rdd.RDD[(Long, Double)])Unit
weights.collect
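The estimated weights can now be related back to the robot configurations they were computed for. Since one_vs_all is true here, the test indices coincide with the training indices, so a simple join suffices. The sketch below is not part of the original notebook and only illustrates one way to inspect the result, listing the five configurations with the highest estimated density:

// Join the weights back onto the training configurations and print the
// highest-weight (highest estimated density) configurations.
weights
  .join(train_data)                    // (index, (weight, configuration))
  .sortBy(_._2._1, ascending = false)  // sort by weight, descending
  .take(5)
  .foreach { case (i, (w, cfg)) =>
    println(s"index=$i weight=$w config=${cfg.toArray.mkString(", ")}")
  }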