// create a points DataFrame
val points = sc.parallelize(Seq((-1.0, -1.0), (-1.0, 1.0), (1.0, -1.0))).toDF("x", "y")
points: org.apache.spark.sql.DataFrame = [x: double, y: double]
// transform an (x, y) pair of coordinates into a Point using a custom user-defined function
import magellan.Point
import org.apache.spark.sql.functions.udf

val toPointUDF = udf{ (x: Double, y: Double) => Point(x, y) }
import magellan.Point
import org.apache.spark.sql.functions.udf
toPointUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,org.apache.spark.sql.types.PointUDT@105fcd11,Some(List(DoubleType, DoubleType)))
// let's show the results of the DataFrame with a new column called point
points.withColumn("point", toPointUDF('x, 'y)).show()
+----+----+-----------------+
| x| y| point|
+----+----+-----------------+
|-1.0|-1.0|Point(-1.0, -1.0)|
|-1.0| 1.0| Point(-1.0, 1.0)|
| 1.0|-1.0| Point(1.0, -1.0)|
+----+----+-----------------+
// Let's instead use the built-in expression to do the same - it's much faster on larger DataFrames due to code-gen
import org.apache.spark.sql.magellan.dsl.expressions._

points.withColumn("point", point('x, 'y)).show()
+----+----+-----------------+
| x| y| point|
+----+----+-----------------+
|-1.0|-1.0|Point(-1.0, -1.0)|
|-1.0| 1.0| Point(-1.0, 1.0)|
| 1.0|-1.0| Point(1.0, -1.0)|
+----+----+-----------------+
import org.apache.spark.sql.magellan.dsl.expressions._
// to generate a sequence of pairs of random numbers we can do:
import util.Random.nextDouble

Seq.fill(10)((-1.0 * nextDouble, +1.0 * nextDouble))
import util.Random.nextDouble
res2: Seq[(Double, Double)] = List((-0.020043427602710828,0.9375053662414891), (-0.994920524839198,0.845271190508138), (-0.1501812761209732,0.10704139325335771), (-0.9891649012229055,0.8031283537358862), (-0.9576677869252214,0.4852309234418518), (-0.3615417292821861,0.026888794684844397), (-0.20066285059225897,0.32093278495843036), (-0.7157377454281582,0.9061198917840395), (-0.1812174392506678,0.19036607653819304), (-0.0999544225947615,0.5381675138406278))
// using the UDF method with 1 million points we can do a count action on the DataFrame with the point column
// don't add too many zeros as it may crash your driver program
sc.parallelize(Seq.fill(1000000)((-1.0 * nextDouble, +1.0 * nextDouble)))
  .toDF("x", "y")
  .withColumn("point", toPointUDF('x, 'y))
  .count()
res3: Long = 1000000
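To see the code-gen claim from above in action, here is a minimal timing sketch; it assumes toPointUDF and the magellan expressions import from the cells above are in scope, and that spark.time (available on the SparkSession since Spark 2.1) is usable in this notebook.

// time the UDF against the code-generated built-in expression on the same cached data
val randomDF = sc.parallelize(Seq.fill(1000000)((-1.0 * nextDouble, +1.0 * nextDouble)))
  .toDF("x", "y")
  .cache()
randomDF.count() // materialize the cache so both timings measure only the point construction
spark.time(randomDF.withColumn("point", toPointUDF('x, 'y)).count()) // UDF: goes through a Scala closure per row
spark.time(randomDF.withColumn("point", point('x, 'y)).count())      // built-in expression: benefits from code-gen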
// Create a Polygon DataFrame
import magellan.Polygon

case class PolygonExample(polygon: Polygon)

val ring = Array(Point(1.0, 1.0), Point(1.0, -1.0), Point(-1.0, -1.0), Point(-1.0, 1.0), Point(1.0, 1.0))
val polygon = Polygon(Array(0), ring)
val polygons = sc.parallelize(Seq(
  PolygonExample(Polygon(Array(0), ring))
)).toDF()
import magellan.Polygon
defined class PolygonExample
ring: Array[magellan.Point] = Array(Point(1.0, 1.0), Point(1.0, -1.0), Point(-1.0, -1.0), Point(-1.0, 1.0), Point(1.0, 1.0))
polygon: magellan.Polygon = magellan.Polygon@1ed26b1
polygons: org.apache.spark.sql.DataFrame = [polygon: polygon]
polygons.show(false)
+-------------------------+
|polygon |
+-------------------------+
|magellan.Polygon@f36f7eca|
+-------------------------+
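The Array(0) argument to Polygon holds the start offsets of the polygon's rings in the point array (compare the PolyLine(Array(0, 2), ...) example further down). As a hedged sketch, assuming a second ring is interpreted as an inner ring, a polygon with a hole could be built like this (outer and hole are our own names, not Magellan's):

// a sketch: a 4x4 square with a 1x1 hole, expressed as two rings in one point array
val outer = Array(Point(2.0, 2.0), Point(2.0, -2.0), Point(-2.0, -2.0), Point(-2.0, 2.0), Point(2.0, 2.0))
val hole = Array(Point(0.5, 0.5), Point(0.5, -0.5), Point(-0.5, -0.5), Point(-0.5, 0.5), Point(0.5, 0.5))
val polygonWithHole = Polygon(Array(0, outer.length), outer ++ hole) // offsets 0 and 5 mark where each ring starts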
// join points with polygons upon intersection
points.withColumn("point", point('x, 'y))
  .join(polygons)
  .where($"point" intersects $"polygon")
  .count()
res13: Long = 3
// join points with polygons upon within (containment) - the three points are corners of the
// polygon, so they intersect its boundary but are not strictly within it, hence the count of 0
points.withColumn("point", point('x, 'y))
  .join(polygons)
  .where($"point" within $"polygon")
  .count()
res14: Long = 0
// creating a line from two points
import magellan.Line

case class LineExample(line: Line)

val line = Line(Point(1.0, 1.0), Point(1.0, -1.0))
val lines = sc.parallelize(Seq(
  LineExample(line)
)).toDF()

display(lines)
// creating a polyline
import magellan.PolyLine

case class PolyLineExample(polyline: PolyLine)

val ring = Array(Point(1.0, 1.0), Point(1.0, -1.0), Point(-1.0, -1.0), Point(-1.0, 1.0))
val polylines1 = sc.parallelize(Seq(
  PolyLineExample(PolyLine(Array(0), ring))
)).toDF()
import magellan.PolyLine
defined class PolyLineExample
ring: Array[magellan.Point] = Array(Point(1.0, 1.0), Point(1.0, -1.0), Point(-1.0, -1.0), Point(-1.0, 1.0))
polylines1: org.apache.spark.sql.DataFrame = [polyline: polyline]
// now let's make a polyline with two or more lines out of the same ring
val polylines2 = sc.parallelize(Seq(
  PolyLineExample(PolyLine(Array(0, 2), ring)) // the first line starts at index 0 and the second one starts at index 2
)).toDF()

display(polylines2)
display(dbutils.fs.ls("dbfs:/datasets/magellan/")) // display the contents of the directory dbfs:/datasets/magellan/
sc.textFile("dbfs:/datasets/magellan/all.tsv").take(5).foreach(println)
00001 2007-01-07T10:54:50+00:00 37.782551 -122.445368
00001 2007-01-07T10:54:54+00:00 37.782745 -122.444586
00001 2007-01-07T10:54:58+00:00 37.782842 -122.443688
00001 2007-01-07T10:55:02+00:00 37.782919 -122.442815
00001 2007-01-07T10:55:06+00:00 37.782992 -122.442112
case class UberRecord(tripId: String, timestamp: String, point: Point) // a case class for each Uber GPS record
defined class UberRecord
val uber = sc.textFile("dbfs:/datasets/magellan/all.tsv")
  .map { line =>
    val parts = line.split("\t")
    val tripId = parts(0)
    val timestamp = parts(1)
    // the file's columns are (tripId, timestamp, lat, lon); Magellan points are (x, y) = (lon, lat),
    // hence parts(3) comes before parts(2)
    val point = Point(parts(3).toDouble, parts(2).toDouble)
    UberRecord(tripId, timestamp, point)
  }
  //.repartition(100) // using the default partitioning instead
  .toDF()
  .cache()
uber: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [tripId: string, timestamp: string ... 1 more field]
// read the San Francisco neighborhood shapefiles with Magellan's data source
val neighborhoods = sqlContext.read.format("magellan")
  .load("dbfs:/datasets/magellan/SFNbhd/")
  .select($"polygon", $"metadata")
  .cache()
neighborhoods: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [polygon: polygon, metadata: map<string,string>]
neighborhoods.printSchema
root
|-- polygon: polygon (nullable = true)
|-- metadata: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
neighborhoods.show(2,false) // see the first two neighbourhoods
+-------------------------+--------------------------------------------+
|polygon |metadata |
+-------------------------+--------------------------------------------+
|magellan.Polygon@e18dd641|Map(neighborho -> Twin Peaks )|
|magellan.Polygon@46d47c8 |Map(neighborho -> Pacific Heights )|
+-------------------------+--------------------------------------------+
only showing top 2 rows
import org.apache.spark.sql.functions._ // needed for SQL functions such as explode
import org.apache.spark.sql.functions._
// names of all 37 neighborhoods of San Francisco
neighborhoods.select(explode($"metadata").as(Seq("k", "v"))).show(37, false)
+----------+-------------------------+
|k |v |
+----------+-------------------------+
|neighborho|Twin Peaks |
|neighborho|Pacific Heights |
|neighborho|Visitacion Valley |
|neighborho|Potrero Hill |
|neighborho|Crocker Amazon |
|neighborho|Outer Mission |
|neighborho|Bayview |
|neighborho|Lakeshore |
|neighborho|Russian Hill |
|neighborho|Golden Gate Park |
|neighborho|Outer Sunset |
|neighborho|Inner Sunset |
|neighborho|Excelsior |
|neighborho|Outer Richmond |
|neighborho|Parkside |
|neighborho|Bernal Heights |
|neighborho|Noe Valley |
|neighborho|Presidio |
|neighborho|Nob Hill |
|neighborho|Financial District |
|neighborho|Glen Park |
|neighborho|Marina |
|neighborho|Seacliff |
|neighborho|Mission |
|neighborho|Downtown/Civic Center |
|neighborho|South of Market |
|neighborho|Presidio Heights |
|neighborho|Inner Richmond |
|neighborho|Castro/Upper Market |
|neighborho|West of Twin Peaks |
|neighborho|Ocean View |
|neighborho|Treasure Island/YBI |
|neighborho|Chinatown |
|neighborho|Western Addition |
|neighborho|North Beach |
|neighborho|Diamond Heights |
|neighborho|Haight Ashbury |
+----------+-------------------------+
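Note that the key neighborho is the shapefile's attribute name truncated to the 10-character DBF column-name limit, and the values carry fixed-width padding, as the table above shows. A small sketch to clean the values up with trim (already imported via org.apache.spark.sql.functions._):

neighborhoods
  .select(explode($"metadata").as(Seq("k", "v")))
  .select(trim($"v").as("neighborhood")) // strip the fixed-width padding
  .show(3, false)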
neighborhoods
  .join(uber)
  .where($"point" within $"polygon")
  .select($"tripId", $"timestamp", explode($"metadata").as(Seq("k", "v")))
  .withColumnRenamed("v", "neighborhood")
  .drop("k")
  .show(5)
+------+---------+------------+
|tripId|timestamp|neighborhood|
+------+---------+------------+
+------+---------+------------+
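The join comes back empty because the two geometries live in different coordinate systems: the neighborhood polygons in the shapefiles are in NAD83 California State Plane (zone 403) coordinates, while the Uber points are plain (longitude, latitude) pairs. We first have to transform the points into the same system, which is what the NAD83 class below provides.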
// This code was removed from magellan in this commit:
// https://github.com/harsha2010/magellan/commit/8df0a62560116f8ed787fc7e86f190f8e2730826
// We bring it back to show how to roll our own transformations.
import magellan.Point

class NAD83(params: Map[String, Any]) {
  val RAD = 180d / Math.PI
  val ER = 6378137.toDouble // semi-major axis for GRS-80
  val RF = 298.257222101 // reciprocal flattening for GRS-80
  val F = 1.toDouble / RF // flattening for GRS-80
  val ESQ = F + F - (F * F)
  val E = StrictMath.sqrt(ESQ)

  private val ZONES = Map(
    401 -> Array(122.toDouble, 2000000.0001016, 500000.0001016001, 40.0, 41.66666666666667, 39.33333333333333),
    403 -> Array(120.5, 2000000.0001016, 500000.0001016001, 37.06666666666667, 38.43333333333333, 36.5)
  )

  // a 6-element zone array selects the Lambert conformal conic path below
  def from() = {
    val zone = params("zone").asInstanceOf[Int]
    ZONES.get(zone) match {
      case Some(x) =>
        if (x.length == 5) toTransverseMercator(x) else toLambertConic(x)
      case None => ???
    }
  }

  def to() = {
    val zone = params("zone").asInstanceOf[Int]
    ZONES.get(zone) match {
      case Some(x) =>
        if (x.length == 5) fromTransverseMercator(x) else fromLambertConic(x)
      case None => ???
    }
  }

  def qqq(e: Double, s: Double) = {
    (StrictMath.log((1 + s) / (1 - s)) - e * StrictMath.log((1 + e * s) / (1 - e * s))) / 2
  }

  def toLambertConic(params: Array[Double]) = {
    val cm = params(0) / RAD // central meridian (CM)
    val eo = params(1) // false easting value at the CM (meters)
    val nb = params(2) // false northing value at southernmost parallel (meters), usually zero
    val fis = params(3) / RAD // latitude of southern standard parallel
    val fin = params(4) / RAD // latitude of northern standard parallel
    val fib = params(5) / RAD // latitude of southernmost parallel

    val sinfs = StrictMath.sin(fis)
    val cosfs = StrictMath.cos(fis)
    val sinfn = StrictMath.sin(fin)
    val cosfn = StrictMath.cos(fin)
    val sinfb = StrictMath.sin(fib)

    val qs = qqq(E, sinfs)
    val qn = qqq(E, sinfn)
    val qb = qqq(E, sinfb)
    val w1 = StrictMath.sqrt(1.toDouble - ESQ * sinfs * sinfs)
    val w2 = StrictMath.sqrt(1.toDouble - ESQ * sinfn * sinfn)
    val sinfo = StrictMath.log(w2 * cosfs / (w1 * cosfn)) / (qn - qs)
    val k = ER * cosfs * StrictMath.exp(qs * sinfo) / (w1 * sinfo)
    val rb = k / StrictMath.exp(qb * sinfo)

    (point: Point) => {
      val (long, lat) = (point.getX(), point.getY())
      val l = - long / RAD
      val f = lat / RAD
      val q = qqq(E, StrictMath.sin(f))
      val r = k / StrictMath.exp(q * sinfo)
      val gam = (cm - l) * sinfo
      val n = rb + nb - (r * StrictMath.cos(gam))
      val e = eo + (r * StrictMath.sin(gam))
      Point(e, n)
    }
  }

  def toTransverseMercator(params: Array[Double]) = {
    (point: Point) => {
      point
    }
  }

  def fromLambertConic(params: Array[Double]) = {
    val cm = params(0) / RAD // central meridian (CM)
    val eo = params(1) // false easting value at the CM (meters)
    val nb = params(2) // false northing value at southernmost parallel (meters), usually zero
    val fis = params(3) / RAD // latitude of southern standard parallel
    val fin = params(4) / RAD // latitude of northern standard parallel
    val fib = params(5) / RAD // latitude of southernmost parallel

    val sinfs = StrictMath.sin(fis)
    val cosfs = StrictMath.cos(fis)
    val sinfn = StrictMath.sin(fin)
    val cosfn = StrictMath.cos(fin)
    val sinfb = StrictMath.sin(fib)

    val qs = qqq(E, sinfs)
    val qn = qqq(E, sinfn)
    val qb = qqq(E, sinfb)
    val w1 = StrictMath.sqrt(1.toDouble - ESQ * sinfs * sinfs)
    val w2 = StrictMath.sqrt(1.toDouble - ESQ * sinfn * sinfn)
    val sinfo = StrictMath.log(w2 * cosfs / (w1 * cosfn)) / (qn - qs)
    val k = ER * cosfs * StrictMath.exp(qs * sinfo) / (w1 * sinfo)
    val rb = k / StrictMath.exp(qb * sinfo)

    (point: Point) => {
      val easting = point.getX()
      val northing = point.getY()
      val npr = rb - northing + nb
      val epr = easting - eo
      val gam = StrictMath.atan(epr / npr)
      val lon = cm - (gam / sinfo)
      val rpt = StrictMath.sqrt(npr * npr + epr * epr)
      val q = StrictMath.log(k / rpt) / sinfo
      val temp = StrictMath.exp(q + q)
      var sine = (temp - 1.toDouble) / (temp + 1.toDouble)
      var f1, f2 = 0.0
      for (i <- 0 until 2) {
        f1 = ((StrictMath.log((1.toDouble + sine) / (1.toDouble - sine)) - E * StrictMath.log((1.toDouble + E * sine) / (1.toDouble - E * sine))) / 2.toDouble) - q
        f2 = 1.toDouble / (1.toDouble - sine * sine) - ESQ / (1.toDouble - ESQ * sine * sine)
        sine -= (f1 / f2)
      }
      Point(StrictMath.toDegrees(lon) * -1, StrictMath.toDegrees(StrictMath.asin(sine)))
    }
  }

  def fromTransverseMercator(params: Array[Double]) = {
    val cm = params(0) // central meridian (CM)
    val fe = params(1) // false easting value at the CM (meters)
    val or = params(2) / RAD // origin latitude
    val sf = 1.0 - (1.0 / params(3)) // scale factor
    val fn = params(4) // false northing

    // translated from the TCONPC subroutine
    val eps = ESQ / (1.0 - ESQ)
    val pr = (1.0 - F) * ER
    val en = (ER - pr) / (ER + pr)
    val en2 = en * en
    val en3 = en * en * en
    val en4 = en2 * en2

    var c2 = -3.0 * en / 2.0 + 9.0 * en3 / 16.0
    var c4 = 15.0d * en2 / 16.0d - 15.0d * en4 / 32.0
    var c6 = -35.0 * en3 / 48.0
    var c8 = 315.0 * en4 / 512.0
    val u0 = 2.0 * (c2 - 2.0 * c4 + 3.0 * c6 - 4.0 * c8)
    val u2 = 8.0 * (c4 - 4.0 * c6 + 10.0 * c8)
    val u4 = 32.0 * (c6 - 6.0 * c8)
    val u6 = 129.0 * c8

    c2 = 3.0 * en / 2.0 - 27.0 * en3 / 32.0
    c4 = 21.0 * en2 / 16.0 - 55.0 * en4 / 32.0d
    c6 = 151.0 * en3 / 96.0
    c8 = 1097.0d * en4 / 512.0
    val v0 = 2.0 * (c2 - 2.0 * c4 + 3.0 * c6 - 4.0 * c8)
    val v2 = 8.0 * (c4 - 4.0 * c6 + 10.0 * c8)
    val v4 = 32.0 * (c6 - 6.0 * c8)
    val v6 = 128.0 * c8

    val r = ER * (1.0 - en) * (1.0 - en * en) * (1.0 + 2.25 * en * en + (225.0 / 64.0) * en4)
    val cosor = StrictMath.cos(or)
    val omo = or + StrictMath.sin(or) * cosor * (u0 + u2 * cosor * cosor + u4 * StrictMath.pow(cosor, 4) + u6 * StrictMath.pow(cosor, 6))
    val so = sf * r * omo

    (point: Point) => {
      val easting = point.getX()
      val northing = point.getY()
      // translated from the TMGEOD subroutine
      val om = (northing - fn + so) / (r * sf)
      val cosom = StrictMath.cos(om)
      val foot = om + StrictMath.sin(om) * cosom * (v0 + v2 * cosom * cosom + v4 * StrictMath.pow(cosom, 4) + v6 * StrictMath.pow(cosom, 6))
      val sinf = StrictMath.sin(foot)
      val cosf = StrictMath.cos(foot)
      val tn = sinf / cosf
      val ts = tn * tn
      val ets = eps * cosf * cosf
      val rn = ER * sf / StrictMath.sqrt(1.0 - ESQ * sinf * sinf)
      val q = (easting - fe) / rn
      val qs = q * q
      val b2 = -tn * (1.0 + ets) / 2.0
      val b4 = -(5.0 + 3.0 * ts + ets * (1.0 - 9.0 * ts) - 4.0 * ets * ets) / 12.0
      val b6 = (61.0 + 45.0 * ts * (2.0 + ts) + ets * (46.0 - 252.0 * ts - 60.0 * ts * ts)) / 360.0
      val b1 = 1.0
      val b3 = -(1.0 + ts + ts + ets) / 6.0
      val b5 = (5.0 + ts * (28.0 + 24.0 * ts) + ets * (6.0 + 8.0 * ts)) / 120.0
      val b7 = -(61.0 + 662.0 * ts + 1320.0 * ts * ts + 720.0 * StrictMath.pow(ts, 3)) / 5040.0
      val lat = foot + b2 * qs * (1.0 + qs * (b4 + b6 * qs))
      val l = b1 * q * (1.0 + qs * (b3 + qs * (b5 + b7 * qs)))
      val lon = -l / cosf + cm
      Point(StrictMath.toDegrees(lon) * -1, StrictMath.toDegrees(lat))
    }
  }
}
import magellan.Point
defined class NAD83
val transformer: Point => Point = (point: Point) => {
  val from = new NAD83(Map("zone" -> 403)).from()
  val p = point.transform(from)
  Point(3.28084 * p.getX, 3.28084 * p.getY) // convert meters to feet (1 m = 3.28084 ft)
}

// add a new column with the points in NAD83 state-plane coordinates
val uberTransformed = uber
  .withColumn("nad83", $"point".transform(transformer))
  .cache()
transformer: magellan.Point => magellan.Point = <function1>
uberTransformed: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [tripId: string, timestamp: string ... 2 more fields]
uberTransformed.show(5,false) // nad83 transformed points
+------+-------------------------+-----------------------------+---------------------------------------------+
|tripId|timestamp |point |nad83 |
+------+-------------------------+-----------------------------+---------------------------------------------+
|00001 |2007-01-07T10:54:50+00:00|Point(-122.445368, 37.782551)|Point(5999523.477715266, 2113253.7290443885) |
|00001 |2007-01-07T10:54:54+00:00|Point(-122.444586, 37.782745)|Point(5999750.8888492435, 2113319.6570987953)|
|00001 |2007-01-07T10:54:58+00:00|Point(-122.443688, 37.782842)|Point(6000011.08106823, 2113349.5785887106) |
|00001 |2007-01-07T10:55:02+00:00|Point(-122.442815, 37.782919)|Point(6000263.898268142, 2113372.3716762937) |
|00001 |2007-01-07T10:55:06+00:00|Point(-122.442112, 37.782992)|Point(6000467.566895697, 2113394.7303657546) |
+------+-------------------------+-----------------------------+---------------------------------------------+
only showing top 5 rows
val joined = neighborhoods
  .join(uberTransformed)
  .where($"nad83" within $"polygon")
  .select($"tripId", $"timestamp", explode($"metadata").as(Seq("k", "v")))
  .withColumnRenamed("v", "neighborhood")
  .drop("k")
  .cache()
joined: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [tripId: string, timestamp: string ... 1 more field]
val UberRecordsInNbhdsCount = joined.count() // about 131 seconds for the first action (a broadcast hash join)
UberRecordsInNbhdsCount: Long = 1085087
joined.show(5,false)
+------+-------------------------+-------------------------+
|tripId|timestamp |neighborhood |
+------+-------------------------+-------------------------+
|00001 |2007-01-07T10:54:50+00:00|Western Addition |
|00001 |2007-01-07T10:54:54+00:00|Western Addition |
|00001 |2007-01-07T10:54:58+00:00|Western Addition |
|00001 |2007-01-07T10:55:02+00:00|Western Addition |
|00001 |2007-01-07T10:55:06+00:00|Western Addition |
+------+-------------------------+-------------------------+
only showing top 5 rows
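(uberRecordCount is not defined in this excerpt; presumably it was computed earlier in the notebook as the total number of Uber records, e.g. val uberRecordCount = uber.count(), which the difference below implies equals 1128663.)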
uberRecordCount - UberRecordsInNbhdsCount // records not inside any of the neighbourhood shapefiles
res48: Long = 43576
joined
  .groupBy($"neighborhood")
  .agg(countDistinct("tripId").as("trips"))
  .orderBy(col("trips").desc)
  .show(5, false)
+-------------------------+-----+
|neighborhood |trips|
+-------------------------+-----+
|South of Market |9891 |
|Western Addition |6794 |
|Downtown/Civic Center |6697 |
|Financial District |6038 |
|Mission |5620 |
+-------------------------+-----+
only showing top 5 rows
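A variant sketch, if instead of distinct trips we want the raw number of GPS points recorded in each neighborhood:

joined
  .groupBy($"neighborhood")
  .count()
  .orderBy($"count".desc)
  .show(5, false)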
SDS-2.2-360-in-525-03: Geospatial Analytics and Big Data