001_GeospatialAnalyticsInMagellan(Scala)

What is Geospatial Analytics?

(watch now 3 minutes and 23 seconds: 111-314 seconds):

Spark Summit East 2016 - What is Geospatial Analytics by Ram Sri Harsha

Some Concrete Examples of Scalable Geospatial Analytics

Let us check out cross-domain data fusion in MSR's Urban Computing Group

Several sciences are naturally geospatial

  • forestry,
  • geography,
  • geology,
  • seismology,
  • etc. etc.

See for example the global EQ data streams from the US Geological Survey below.

For a global data source, see the US Geological Survey's Earthquake Hazards Program: http://earthquake.usgs.gov/data/.
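As a starter for playing with such data in Spark, here is a minimal sketch (not part of the original notebook): it pulls one of the USGS CSV summary feeds into a DataFrame. The feed URL is the publicly documented monthly summary feed, and the /tmp staging path and dbfs target path are our assumptions.

// a minimal sketch: stage the USGS monthly CSV feed locally, copy it to dbfs, and read it
import sys.process._
"wget -P /tmp/ https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_month.csv".!!
dbutils.fs.cp("file:/tmp/all_month.csv", "dbfs:/datasets/usgs/all_month.csv")
val quakes = sqlContext.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("dbfs:/datasets/usgs/all_month.csv")
quakes.select("time", "latitude", "longitude", "mag").show(5)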

Introduction to Magellan for Scalable Geospatial Analytics

This is a minor augmentation of Ram Harsha's Magellan code blogged here:

First you need to attach the following library:

  • the magellan library (maven coordinates harsha2010:magellan:1.0.5-s_2.11)

Do we need one more geospatial analytics library?

From Ram's slide 4 of this Spark Summit East 2016 talk at slideshare:

  • Spatial Analytics at scale is challenging
    • Simplicity + Scalability = Hard
  • Ancient Data Formats
    • metadata, indexing not handled well, inefficient storage
  • Geospatial Analytics is not simply Business Intelligence anymore
    • Statistical + Machine Learning being leveraged in geospatial
  • Now is the time to do it!
    • Explosion of mobile data
    • Finer granularity of data collection for geometries
    • Analytics stretching the limits of traditional approaches
    • Spark SQL + Catalyst + Tungsten makes extensible SQL engines easier than ever before!

Nuts and Bolts of Magellan

Let us go and grab this Databricks notebook:

and look at the magellan README on GitHub:

HOMEWORK: Watch the magellan presentation by Ram Harsha (Hortonworks) in Spark Summit East 2016.

Other resources for magellan:

Let's get our hands dirty with the basics of magellan.

Data Structures

  • Points
  • Polygons
  • Lines
  • Polylines

Predicates

  • within
  • intersects
// create a points DataFrame
val points = sc.parallelize(Seq((-1.0, -1.0), (-1.0, 1.0), (1.0, -1.0))).toDF("x", "y")
points: org.apache.spark.sql.DataFrame = [x: double, y: double]
// transform (x, y) coordinate pairs into a magellan Point using a custom user-defined function (UDF)
import magellan.Point
import org.apache.spark.sql.functions.udf
val toPointUDF = udf{(x:Double,y:Double) => Point(x,y) }
import magellan.Point
import org.apache.spark.sql.functions.udf
toPointUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,org.apache.spark.sql.types.PointUDT@105fcd11,Some(List(DoubleType, DoubleType)))
// let's show the results of the DF with a new column called point
points.withColumn("point", toPointUDF('x, 'y)).show()
+----+----+-----------------+
|   x|   y|            point|
+----+----+-----------------+
|-1.0|-1.0|Point(-1.0, -1.0)|
|-1.0| 1.0| Point(-1.0, 1.0)|
| 1.0|-1.0| Point(1.0, -1.0)|
+----+----+-----------------+
// Let's instead use the built-in expression to do the same - it's much faster on larger DataFrames due to code-gen
import org.apache.spark.sql.magellan.dsl.expressions._

points.withColumn("point", point('x, 'y)).show()
+----+----+-----------------+
|   x|   y|            point|
+----+----+-----------------+
|-1.0|-1.0|Point(-1.0, -1.0)|
|-1.0| 1.0| Point(-1.0, 1.0)|
| 1.0|-1.0| Point(1.0, -1.0)|
+----+----+-----------------+
import org.apache.spark.sql.magellan.dsl.expressions._

Let's verify empirically whether it is indeed faster for larger DataFrames.

// to generate a sequence of pairs of random numbers we can do:
import util.Random.nextDouble
Seq.fill(10)((-1.0*nextDouble,+1.0*nextDouble))
import util.Random.nextDouble
res2: Seq[(Double, Double)] = List((-0.020043427602710828,0.9375053662414891), (-0.994920524839198,0.845271190508138), (-0.1501812761209732,0.10704139325335771), (-0.9891649012229055,0.8031283537358862), (-0.9576677869252214,0.4852309234418518), (-0.3615417292821861,0.026888794684844397), (-0.20066285059225897,0.32093278495843036), (-0.7157377454281582,0.9061198917840395), (-0.1812174392506678,0.19036607653819304), (-0.0999544225947615,0.5381675138406278))
// using the UDF method with 1 million points we can do a count action on the DF with the point column
// don't add too many zeros as it may crash your driver program
sc.parallelize(Seq.fill(1000000)((-1.0*nextDouble,+1.0*nextDouble)))
  .toDF("x", "y")
  .withColumn("point", toPointUDF('x, 'y))
  .count()
res3: Long = 1000000
// seems twice as fast with code-gen
sc.parallelize(Seq.fill(1000000)((-1.0*nextDouble,+1.0*nextDouble)))
  .toDF("x", "y")
  .withColumn("point", point('x, 'y))
  .count()
res4: Long = 1000000
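To make the UDF vs. code-gen comparison explicit, here is a minimal timing sketch (not in the original notebook; the time helper and the xyDF name are ours). On Databricks the per-cell timings above convey the same information.

// a simple wall-clock timing helper (for illustration only)
def time[A](label: String)(block: => A): A = {
  val t0 = System.nanoTime()
  val result = block
  println(s"$label took ${(System.nanoTime() - t0) / 1e9} seconds")
  result
}
// cache one DataFrame so both variants read identical data
val xyDF = sc.parallelize(Seq.fill(1000000)((-1.0*nextDouble, +1.0*nextDouble))).toDF("x", "y").cache()
xyDF.count() // materialise the cache
time("UDF")      { xyDF.withColumn("point", toPointUDF('x, 'y)).count() }
time("code-gen") { xyDF.withColumn("point", point('x, 'y)).count() }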
// Create a Polygon DataFrame
import magellan.Polygon

case class PolygonExample(polygon: Polygon)

val ring = Array(Point(1.0, 1.0), Point(1.0, -1.0), Point(-1.0, -1.0), Point(-1.0, 1.0), Point(1.0, 1.0))
val polygon = Polygon(Array(0), ring)

val polygons = sc.parallelize(Seq(
  PolygonExample(Polygon(Array(0), ring))
)).toDF()

import magellan.Polygon
defined class PolygonExample
ring: Array[magellan.Point] = Array(Point(1.0, 1.0), Point(1.0, -1.0), Point(-1.0, -1.0), Point(-1.0, 1.0), Point(1.0, 1.0))
polygon: magellan.Polygon = magellan.Polygon@1ed26b1
polygons: org.apache.spark.sql.DataFrame = [polygon: polygon]
polygons.show(false)
+-------------------------+
|polygon                  |
+-------------------------+
|magellan.Polygon@f36f7eca|
+-------------------------+
//display(polygons)

Predicates

// join points with polygons upon intersection
points.withColumn("point", point('x, 'y))
      .join(polygons)
      .where($"point" intersects $"polygon")
      .count()
res13: Long = 3
// join points with polygons upon within or containment
points.withColumn("point", point('x, 'y))
      .join(polygons)
      .where($"point" within $"polygon")
      .count()
res14: Long = 0
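The zero count is expected here: the three points are vertices of the polygon, so they lie on its boundary rather than strictly within it. As a quick check (not in the original notebook), a point strictly inside the square, say the origin, should satisfy the within predicate:

// a point strictly inside the unit square, e.g. the origin, should be within the polygon
sc.parallelize(Seq((0.0, 0.0))).toDF("x", "y")
  .withColumn("point", point('x, 'y))
  .join(polygons)
  .where($"point" within $"polygon")
  .count() // expected: 1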
//creating line from two points
import magellan.Line

case class LineExample(line: Line)

val line = Line(Point(1.0, 1.0), Point(1.0, -1.0))

val lines = sc.parallelize(Seq(
  LineExample(line)
)).toDF()

display(lines)
{"empty":false,"mid":{"empty":true,"valid":true,"type":1},"valid":true,"end":{"empty":true,"valid":true,"type":1},"type":2,"start":{"empty":true,"valid":true,"type":1}}
// creating polyline
import magellan.PolyLine

case class PolyLineExample(polyline: PolyLine)

val ring = Array(Point(1.0, 1.0), Point(1.0, -1.0), Point(-1.0, -1.0), Point(-1.0, 1.0))

val polylines1 = sc.parallelize(Seq(
  PolyLineExample(PolyLine(Array(0), ring))
)).toDF()

import magellan.PolyLine
defined class PolyLineExample
ring: Array[magellan.Point] = Array(Point(1.0, 1.0), Point(1.0, -1.0), Point(-1.0, -1.0), Point(-1.0, 1.0))
polylines1: org.apache.spark.sql.DataFrame = [polyline: polyline]
display(polylines1)
{"xcoordinates":[1,1,-1,-1],"indices":[0],"empty":false,"ycoordinates":[1,-1,-1,1],"boundingBox":{"xmin":-1,"ymin":-1,"xmax":1,"ymax":1},"valid":true,"type":3}
// now let's make a polyline with two or more lines out of the same ring
val polylines2 = sc.parallelize(Seq(
  PolyLineExample(PolyLine(Array(0,2), ring)) // first line starts at index 0 and the second one starts at index 2
)).toDF()

display(polylines2)
{"xcoordinates":[1,1,-1,-1],"indices":[0,2],"empty":false,"ycoordinates":[1,-1,-1,1],"boundingBox":{"xmin":-1,"ymin":-1,"xmax":1,"ymax":1},"valid":true,"type":3}

Check out the NYC Taxi Dataset in Magellan

This is a much larger dataset, so we may need access to a larger cluster unless we analyse just a smaller subset of the data (perhaps a single month of taxi rides in NYC). The same concepts can be understood with a much smaller dataset of Uber rides in San Francisco, which we analyse next.

Uber Dataset for the Demo done by Ram Harsha at Spark Summit Europe 2015

First the datasets have to be loaded. To do this anew, see the section below on downloading the datasets and putting them in the distributed file system (this only needs to be done once, since the data is then persisted in the distributed file system); a minimal download sketch also follows the file list below.

After downloading the data, we expect to have the following files in distributed file system (dbfs):

  • all.tsv is the file of all uber trajectories
  • SFNbhd is the directory containing SF neighborhood shape files.
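As a reminder of what that one-off load looks like, here is a minimal sketch (the source URL is hypothetical; the real download steps are in the section referenced above):

// a minimal sketch, assuming a hypothetical source URL for the Uber trajectories
import sys.process._
"wget -P /tmp/magellan/ http://hypothetical-host/uber/all.tsv".!!  // hypothetical URL
dbutils.fs.mkdirs("dbfs:/datasets/magellan/")
dbutils.fs.cp("file:/tmp/magellan/all.tsv", "dbfs:/datasets/magellan/all.tsv")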
display(dbutils.fs.ls("dbfs:/datasets/magellan/")) // display the contents of the dbfs directory "dbfs:/datasets/magellan/"
path                               name      size
dbfs:/datasets/magellan/SFNbhd/    SFNbhd/   0
dbfs:/datasets/magellan/all.tsv    all.tsv   60947802

First five lines or rows of the uber data containing: tripId, timestamp, Lat, Lon

sc.textFile("dbfs:/datasets/magellan/all.tsv").take(5).foreach(println)
00001	2007-01-07T10:54:50+00:00	37.782551	-122.445368
00001	2007-01-07T10:54:54+00:00	37.782745	-122.444586
00001	2007-01-07T10:54:58+00:00	37.782842	-122.443688
00001	2007-01-07T10:55:02+00:00	37.782919	-122.442815
00001	2007-01-07T10:55:06+00:00	37.782992	-122.442112
display(dbutils.fs.ls("dbfs:/datasets/magellan/SFNbhd")) // legacy shape files
path                                                            name                            size
dbfs:/datasets/magellan/SFNbhd/planning_neighborhoods.dbf       planning_neighborhoods.dbf      1028
dbfs:/datasets/magellan/SFNbhd/planning_neighborhoods.prj       planning_neighborhoods.prj      567
dbfs:/datasets/magellan/SFNbhd/planning_neighborhoods.sbn       planning_neighborhoods.sbn      516
dbfs:/datasets/magellan/SFNbhd/planning_neighborhoods.sbx       planning_neighborhoods.sbx      164
dbfs:/datasets/magellan/SFNbhd/planning_neighborhoods.shp       planning_neighborhoods.shp      214576
dbfs:/datasets/magellan/SFNbhd/planning_neighborhoods.shp.xml   planning_neighborhoods.shp.xml  21958
dbfs:/datasets/magellan/SFNbhd/planning_neighborhoods.shx       planning_neighborhoods.shx      396

Homework

First watch the more technical magellan presentation by Ram Sri Harsha (Hortonworks) at Spark Summit Europe 2015:

Ram Sri Harsha's Magellan Spark Summit EU 2015 Talk

Second, carefully repeat Ram's original analysis from the following blog as done below.

Ram's blog at Hortonworks and the ZeppelinHub view of the demo code in the video above

This is just to get you started... You may need to modify this!

case class UberRecord(tripId: String, timestamp: String, point: Point) // a case class for UberRecord 
defined class UberRecord
val uber = sc.textFile("dbfs:/datasets/magellan/all.tsv")
              .map { line =>
                      val parts = line.split("\t" )
                      val tripId = parts(0)
                      val timestamp = parts(1)
                      val point = Point(parts(3).toDouble, parts(2).toDouble) // Point(longitude, latitude): columns 2 and 3 in the tsv are lat and lon
                      UberRecord(tripId, timestamp, point)
                    }
                     //.repartition(100) // using default repartition
                     .toDF()
                     .cache()
uber: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [tripId: string, timestamp: string ... 1 more field]
val uberRecordCount = uber.count() // how many Uber records?
uberRecordCount: Long = 1128663

So there are over a million UberRecords.

val neighborhoods = sqlContext.read.format("magellan") // this may be busted... try to make it work...
                                   .load("dbfs:/datasets/magellan/SFNbhd/")
                                   .select($"polygon", $"metadata")
                                   .cache()
neighborhoods: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [polygon: polygon, metadata: map<string,string>]
neighborhoods.count() // how many neighbourhoods in SF?
res28: Long = 37
neighborhoods.printSchema
root
 |-- polygon: polygon (nullable = true)
 |-- metadata: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
neighborhoods.show(2,false) // see the first two neighbourhoods
+-------------------------+-----------------------------------+
|polygon                  |metadata                           |
+-------------------------+-----------------------------------+
|magellan.Polygon@e18dd641|Map(neighborho -> Twin Peaks      )|
|magellan.Polygon@46d47c8 |Map(neighborho -> Pacific Heights )|
+-------------------------+-----------------------------------+
only showing top 2 rows
import org.apache.spark.sql.functions._ // this is needed for sql functions like explode, etc.
import org.apache.spark.sql.functions._
//names of all 37 neighborhoods of San Francisco
neighborhoods.select(explode($"metadata").as(Seq("k", "v"))).show(37,false)
+----------+-------------------------+
|k         |v                        |
+----------+-------------------------+
|neighborho|Twin Peaks               |
|neighborho|Pacific Heights          |
|neighborho|Visitacion Valley        |
|neighborho|Potrero Hill             |
|neighborho|Crocker Amazon           |
|neighborho|Outer Mission            |
|neighborho|Bayview                  |
|neighborho|Lakeshore                |
|neighborho|Russian Hill             |
|neighborho|Golden Gate Park         |
|neighborho|Outer Sunset             |
|neighborho|Inner Sunset             |
|neighborho|Excelsior                |
|neighborho|Outer Richmond           |
|neighborho|Parkside                 |
|neighborho|Bernal Heights           |
|neighborho|Noe Valley               |
|neighborho|Presidio                 |
|neighborho|Nob Hill                 |
|neighborho|Financial District       |
|neighborho|Glen Park                |
|neighborho|Marina                   |
|neighborho|Seacliff                 |
|neighborho|Mission                  |
|neighborho|Downtown/Civic Center    |
|neighborho|South of Market          |
|neighborho|Presidio Heights         |
|neighborho|Inner Richmond           |
|neighborho|Castro/Upper Market      |
|neighborho|West of Twin Peaks       |
|neighborho|Ocean View               |
|neighborho|Treasure Island/YBI      |
|neighborho|Chinatown                |
|neighborho|Western Addition         |
|neighborho|North Beach              |
|neighborho|Diamond Heights          |
|neighborho|Haight Ashbury           |
+----------+-------------------------+

This join below yields nothing.

So what's going on?

Watch Ram's 2015 Spark Summit talk for details on geospatial formats and transformations.

neighborhoods
  .join(uber)
  .where($"point" within $"polygon")
  .select($"tripId", $"timestamp", explode($"metadata").as(Seq("k", "v")))
  .withColumnRenamed("v", "neighborhood")
  .drop("k")
  .show(5)

+------+---------+------------+
|tripId|timestamp|neighborhood|
+------+---------+------------+
+------+---------+------------+

We need the right transformer to map the (longitude, latitude) points into the coordinate system used by the neighbourhood shape files.

// This code was removed from magellan in this commit:
// https://github.com/harsha2010/magellan/commit/8df0a62560116f8ed787fc7e86f190f8e2730826
// We bring this back to show how to roll our own transformations.
import magellan.Point

class NAD83(params: Map[String, Any]) {
  val RAD = 180d / Math.PI
  val ER  = 6378137.toDouble  // semi-major axis for GRS-80
  val RF  = 298.257222101  // reciprocal flattening for GRS-80
  val F   = 1.toDouble / RF  // flattening for GRS-80
  val ESQ = F + F - (F * F)
  val E   = StrictMath.sqrt(ESQ)

  private val ZONES =  Map(
    401 -> Array(122.toDouble, 2000000.0001016,
      500000.0001016001, 40.0,
      41.66666666666667, 39.33333333333333),
    403 -> Array(120.5, 2000000.0001016,
      500000.0001016001, 37.06666666666667,
      38.43333333333333, 36.5)
  )

  def from() = {
    val zone = params("zone").asInstanceOf[Int]
    ZONES.get(zone) match {
      case Some(x) => if (x.length == 5) {
        toTransverseMercator(x)
      } else {
        toLambertConic(x)
      }
      case None => ???
    }
  }

  def to() = {
    val zone = params("zone").asInstanceOf[Int]
    ZONES.get(zone) match {
      case Some(x) => if (x.length == 5) {
        fromTransverseMercator(x)
      } else {
        fromLambertConic(x)
      }
      case None => ???
    }
  }

  def qqq(e: Double, s: Double) = {
    (StrictMath.log((1 + s) / (1 - s)) - e *
      StrictMath.log((1 + e * s) / (1 - e * s))) / 2
  }

  def toLambertConic(params: Array[Double]) = {
    val cm = params(0) / RAD  // CENTRAL MERIDIAN (CM)
    val eo = params(1)  // FALSE EASTING VALUE AT THE CM (METERS)
    val nb = params(2)  // FALSE NORTHING VALUE AT SOUTHERNMOST PARALLEL (METERS), (USUALLY ZERO)
    val fis = params(3) / RAD  // LATITUDE OF SO. STD. PARALLEL
    val fin = params(4) / RAD  // LATITUDE OF NO. STD. PARALLEL
    val fib = params(5) / RAD // LATITUDE OF SOUTHERNMOST PARALLEL
    val sinfs = StrictMath.sin(fis)
    val cosfs = StrictMath.cos(fis)
    val sinfn = StrictMath.sin(fin)
    val cosfn = StrictMath.cos(fin)
    val sinfb = StrictMath.sin(fib)
    val qs = qqq(E, sinfs)
    val qn = qqq(E, sinfn)
    val qb = qqq(E, sinfb)
    val w1 = StrictMath.sqrt(1.toDouble - ESQ * sinfs * sinfs)
    val w2 = StrictMath.sqrt(1.toDouble - ESQ * sinfn * sinfn)
    val sinfo = StrictMath.log(w2 * cosfs / (w1 * cosfn)) / (qn - qs)
    val k = ER * cosfs * StrictMath.exp(qs * sinfo) / (w1 * sinfo)
    val rb = k / StrictMath.exp(qb * sinfo)

    (point: Point) => {
      val (long, lat) = (point.getX(), point.getY())
      val l = - long / RAD
      val f = lat / RAD
      val q = qqq(E, StrictMath.sin(f))
      val r = k / StrictMath.exp(q * sinfo)
      val gam = (cm - l) * sinfo
      val n = rb + nb - (r * StrictMath.cos(gam))
      val e = eo + (r * StrictMath.sin(gam))
      Point(e, n)
    }
  }

  def toTransverseMercator(params: Array[Double]) = {
    (point: Point) => {
      point
    }
  }

  def fromLambertConic(params: Array[Double]) = {
    val cm = params(0) / RAD  // CENTRAL MERIDIAN (CM)
    val eo = params(1)  // FALSE EASTING VALUE AT THE CM (METERS)
    val nb = params(2)  // FALSE NORTHING VALUE AT SOUTHERNMOST PARALLEL (METERS), (USUALLY ZERO)
    val fis = params(3) / RAD  // LATITUDE OF SO. STD. PARALLEL
    val fin = params(4) / RAD  // LATITUDE OF NO. STD. PARALLEL
    val fib = params(5) / RAD // LATITUDE OF SOUTHERNMOST PARALLEL
    val sinfs = StrictMath.sin(fis)
    val cosfs = StrictMath.cos(fis)
    val sinfn = StrictMath.sin(fin)
    val cosfn = StrictMath.cos(fin)
    val sinfb = StrictMath.sin(fib)

    val qs = qqq(E, sinfs)
    val qn = qqq(E, sinfn)
    val qb = qqq(E, sinfb)
    val w1 = StrictMath.sqrt(1.toDouble - ESQ * sinfs * sinfs)
    val w2 = StrictMath.sqrt(1.toDouble - ESQ * sinfn * sinfn)
    val sinfo = StrictMath.log(w2 * cosfs / (w1 * cosfn)) / (qn - qs)
    val k = ER * cosfs * StrictMath.exp(qs * sinfo) / (w1 * sinfo)
    val rb = k / StrictMath.exp(qb * sinfo)
    (point: Point) => {
      val easting = point.getX()
      val northing = point.getY()
      val npr = rb - northing + nb
      val epr = easting - eo
      val gam = StrictMath.atan(epr / npr)
      val lon = cm - (gam / sinfo)
      val rpt = StrictMath.sqrt(npr * npr + epr * epr)
      val q = StrictMath.log(k / rpt) / sinfo
      val temp = StrictMath.exp(q + q)
      var sine = (temp - 1.toDouble) / (temp + 1.toDouble)
      var f1, f2 = 0.0
      for (i <- 0 until 2) {
        f1 = ((StrictMath.log((1.toDouble + sine) / (1.toDouble - sine)) - E *
          StrictMath.log((1.toDouble + E * sine) / (1.toDouble - E * sine))) / 2.toDouble) - q
        f2 = 1.toDouble / (1.toDouble - sine * sine) - ESQ / (1.toDouble - ESQ * sine * sine)
        sine -= (f1/ f2)
      }
      Point(StrictMath.toDegrees(lon) * -1, StrictMath.toDegrees(StrictMath.asin(sine)))
    }
  }

  def fromTransverseMercator(params: Array[Double]) = {
    val cm = params(0)  // CENTRAL MERIDIAN (CM)
    val fe = params(1)  // FALSE EASTING VALUE AT THE CM (METERS)
    val or = params(2) / RAD  // origin latitude
    val sf = 1.0 - (1.0 / params(3)) // scale factor
    val fn = params(4)  // false northing
    // translated from TCONPC subroutine
    val eps = ESQ / (1.0 - ESQ)
    val pr = (1.0 - F) * ER
    val en = (ER - pr) / (ER + pr)
    val en2 = en * en
    val en3 = en * en * en
    val en4 = en2 * en2

    var c2 = -3.0 * en / 2.0 + 9.0 * en3 / 16.0
    var c4 = 15.0d * en2 / 16.0d - 15.0d * en4 /32.0
    var c6 = -35.0 * en3 / 48.0
    var c8 = 315.0 * en4 / 512.0
    val u0 = 2.0 * (c2 - 2.0 * c4 + 3.0 * c6 - 4.0 * c8)
    val u2 = 8.0 * (c4 - 4.0 * c6 + 10.0 * c8)
    val u4 = 32.0 * (c6 - 6.0 * c8)
    val u6 = 129.0 * c8

    c2 = 3.0 * en / 2.0 - 27.0 * en3 / 32.0
    c4 = 21.0 * en2 / 16.0 - 55.0 * en4 / 32.0d
    c6 = 151.0 * en3 / 96.0
    c8 = 1097.0d * en4 / 512.0
    val v0 = 2.0 * (c2 - 2.0 * c4 + 3.0 * c6 - 4.0 * c8)
    val v2 = 8.0 * (c4 - 4.0 * c6 + 10.0 * c8)
    val v4 = 32.0 * (c6 - 6.0 * c8)
    val v6 = 128.0 * c8

    val r = ER * (1.0 - en) * (1.0 - en * en) * (1.0 + 2.25 * en * en + (225.0 / 64.0) * en4)
    val cosor = StrictMath.cos(or)
    val omo = or + StrictMath.sin(or) * cosor *
      (u0 + u2 * cosor * cosor + u4 * StrictMath.pow(cosor, 4) + u6 * StrictMath.pow(cosor, 6))
    val so = sf * r * omo

    (point: Point) => {
      val easting = point.getX()
      val northing = point.getY()
      // translated from TMGEOD subroutine
      val om = (northing - fn + so) / (r * sf)
      val cosom = StrictMath.cos(om)
      val foot = om + StrictMath.sin(om) * cosom *
        (v0 + v2 * cosom * cosom + v4 * StrictMath.pow(cosom, 4) + v6 * StrictMath.pow(cosom, 6))
      val sinf = StrictMath.sin(foot)
      val cosf = StrictMath.cos(foot)
      val tn = sinf / cosf
      val ts = tn * tn
      val ets = eps * cosf * cosf
      val rn = ER * sf / StrictMath.sqrt(1.0 - ESQ * sinf * sinf)
      val q = (easting - fe) / rn
      val qs = q * q
      val b2 = -tn * (1.0 + ets) / 2.0
      val b4 = -(5.0 + 3.0 * ts + ets * (1.0 - 9.0 * ts) - 4.0 * ets * ets) / 12.0
      val b6 = (61.0 + 45.0 * ts * (2.0 + ts) + ets * (46.0 - 252.0 * ts -60.0 * ts * ts)) / 360.0
      val b1 = 1.0
      val b3 = -(1.0 + ts + ts + ets) / 6.0
      val b5 = (5.0 + ts * (28.0 + 24.0 * ts) + ets * (6.0 + 8.0 * ts)) / 120.0
      val b7 = -(61.0 + 662.0 * ts + 1320.0 * ts * ts + 720.0 * StrictMath.pow(ts, 3)) / 5040.0
      val lat = foot + b2 * qs * (1.0 + qs * (b4 + b6 * qs))
      val l = b1 * q * (1.0 + qs * (b3 + qs * (b5 + b7 * qs)))
      val lon = -l / cosf + cm
      Point(StrictMath.toDegrees(lon) * -1, StrictMath.toDegrees(lat))
    }
  }
}
import magellan.Point
defined class NAD83
val transformer: Point => Point = (point: Point) => {
  val from = new NAD83(Map("zone" -> 403)).from()
  val p = point.transform(from)
  Point(3.28084 * p.getX, 3.28084 * p.getY) // 3.28084 converts metres to feet
}

// add a new column in nad83 coordinates
val uberTransformed = uber
                      .withColumn("nad83", $"point".transform(transformer))
                      .cache()
transformer: magellan.Point => magellan.Point = <function1>
uberTransformed: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [tripId: string, timestamp: string ... 2 more fields]
uberTransformed.count()
res42: Long = 1128663
uberTransformed.show(5,false) // nad83 transformed points
+------+-------------------------+-----------------------------+---------------------------------------------+
|tripId|timestamp                |point                        |nad83                                        |
+------+-------------------------+-----------------------------+---------------------------------------------+
|00001 |2007-01-07T10:54:50+00:00|Point(-122.445368, 37.782551)|Point(5999523.477715266, 2113253.7290443885) |
|00001 |2007-01-07T10:54:54+00:00|Point(-122.444586, 37.782745)|Point(5999750.8888492435, 2113319.6570987953)|
|00001 |2007-01-07T10:54:58+00:00|Point(-122.443688, 37.782842)|Point(6000011.08106823, 2113349.5785887106)  |
|00001 |2007-01-07T10:55:02+00:00|Point(-122.442815, 37.782919)|Point(6000263.898268142, 2113372.3716762937) |
|00001 |2007-01-07T10:55:06+00:00|Point(-122.442112, 37.782992)|Point(6000467.566895697, 2113394.7303657546) |
+------+-------------------------+-----------------------------+---------------------------------------------+
only showing top 5 rows
uberTransformed.select("tripId").distinct().count() // number of unique tripIds
res45: Long = 24999

Let's try the join again after the appropriate transformation of the coordinate system.

val joined = neighborhoods
              .join(uberTransformed)
              .where($"nad83" within $"polygon")
              .select($"tripId", $"timestamp", explode($"metadata").as(Seq("k", "v")))
              .withColumnRenamed("v", "neighborhood")
              .drop("k")
              .cache()
joined: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [tripId: string, timestamp: string ... 1 more field]
val UberRecordsInNbhdsCount = joined.count() // about 131 seconds for first action (doing broadcast hash join)
UberRecordsInNbhdsCount: Long = 1085087
joined.show(5,false)
+------+-------------------------+----------------+
|tripId|timestamp                |neighborhood    |
+------+-------------------------+----------------+
|00001 |2007-01-07T10:54:50+00:00|Western Addition|
|00001 |2007-01-07T10:54:54+00:00|Western Addition|
|00001 |2007-01-07T10:54:58+00:00|Western Addition|
|00001 |2007-01-07T10:55:02+00:00|Western Addition|
|00001 |2007-01-07T10:55:06+00:00|Western Addition|
+------+-------------------------+----------------+
only showing top 5 rows
uberRecordCount - UberRecordsInNbhdsCount // records not in the neighbourhood shape files
res48: Long = 43576
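To actually inspect those unmatched records, a left anti join does the job (a sketch, not in the original notebook; it assumes that (tripId, timestamp) identifies a record):

// records whose nad83 point did not fall within any neighbourhood polygon
val unmatched = uberTransformed.join(joined, Seq("tripId", "timestamp"), "left_anti")
unmatched.count() // should be roughly uberRecordCount - UberRecordsInNbhdsCount
unmatched.show(3, false)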
joined
  .groupBy($"neighborhood")
  .agg(countDistinct("tripId")
  .as("trips"))
  .orderBy(col("trips").desc)
  .show(5,false)
+---------------------+-----+
|neighborhood         |trips|
+---------------------+-----+
|South of Market      |9891 |
|Western Addition     |6794 |
|Downtown/Civic Center|6697 |
|Financial District   |6038 |
|Mission              |5620 |
+---------------------+-----+
only showing top 5 rows