import com.graphhopper.matching._ import com.graphhopper._ import com.graphhopper.routing.util.{EncodingManager, CarFlagEncoder} import com.graphhopper.storage.index.LocationIndexTree import com.graphhopper.util.GPXEntry import magellan.Point import scala.collection.JavaConverters._ import spray.json._ import DefaultJsonProtocol._ import scala.util.{Try, Success, Failure} import org.apache.spark.sql.functions._
import com.graphhopper.matching._
import com.graphhopper._
import com.graphhopper.routing.util.{EncodingManager, CarFlagEncoder}
import com.graphhopper.storage.index.LocationIndexTree
import com.graphhopper.util.GPXEntry
import magellan.Point
import scala.collection.JavaConverters._
import spray.json._
import DefaultJsonProtocol._
import scala.util.{Try, Success, Failure}
import org.apache.spark.sql.functions._
def genLeafletHTML(features: Array[String]): String = { val featureArray = features.reduce(_ + "," + _) // get your own access-token from https://leafletjs.com/examples/quick-start/ // see request-access token link above at: https://account.mapbox.com/auth/signin/?route-to=%22/access-tokens/%22 val accessToken = "pk.eyJ1IjoiZHRnIiwiYSI6ImNpaWF6MGdiNDAwanNtemx6MmIyNXoyOWIifQ.ndbNtExCMXZHKyfNtEN0Vg" val generatedHTML = f"""<!DOCTYPE html> <html> <head> <title>Maps</title> <meta charset="utf-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/leaflet/0.7.7/leaflet.css"> <style> #map {width: 600px; height:400px;} </style> </head> <body> <div id="map" style="width: 1000px; height: 600px"></div> <script src="https://cdnjs.cloudflare.com/ajax/libs/leaflet/0.7.7/leaflet.js"></script> <script type="text/javascript"> var map = L.map('map').setView([37.77471008393265, -122.40422604391485], 14); L.tileLayer('https://api.tiles.mapbox.com/v4/{id}/{z}/{x}/{y}.png?access_token=$accessToken', { maxZoom: 18, attribution: 'Map data © <a href="http://openstreetmap.org">OpenStreetMap</a> contributors, ' + '<a href="http://creativecommons.org/licenses/by-sa/2.0/">CC-BY-SA</a>, ' + 'Imagery © <a href="http://mapbox.com">Mapbox</a>', id: 'mapbox.streets' }).addTo(map); var features = [$featureArray]; colors = features.map(function (_) {return rainbow(100, Math.floor(Math.random() * 100)); }); for (var i = 0; i < features.length; i++) { console.log(i); L.geoJson(features[i], { pointToLayer: function (feature, latlng) { return L.circleMarker(latlng, { radius: 4, fillColor: colors[i], color: colors[i], weight: 1, opacity: 1, fillOpacity: 0.8 }); } }).addTo(map); } function rainbow(numOfSteps, step) { // This function generates vibrant, "evenly spaced" colours (i.e. no clustering). This is ideal for creating easily distinguishable vibrant markers in Google Maps and other apps. // Adam Cole, 2011-Sept-14 // HSV to RBG adapted from: http://mjijackson.com/2008/02/rgb-to-hsl-and-rgb-to-hsv-color-model-conversion-algorithms-in-javascript var r, g, b; var h = step / numOfSteps; var i = ~~(h * 6); var f = h * 6 - i; var q = 1 - f; switch(i %% 6){ case 0: r = 1; g = f; b = 0; break; case 1: r = q; g = 1; b = 0; break; case 2: r = 0; g = 1; b = f; break; case 3: r = 0; g = q; b = 1; break; case 4: r = f; g = 0; b = 1; break; case 5: r = 1; g = 0; b = q; break; } var c = "#" + ("00" + (~ ~(r * 255)).toString(16)).slice(-2) + ("00" + (~ ~(g * 255)).toString(16)).slice(-2) + ("00" + (~ ~(b * 255)).toString(16)).slice(-2); return (c); } </script> </body> """ generatedHTML }
genLeafletHTML: (features: Array[String])String
case class UberRecord(tripId: Int, time: String, latlon: Array[Double]) val uberData = sc.textFile("dbfs:/datasets/magellan/all.tsv").map { line => val parts = line.split("\t" ) val tripId = parts(0).toInt val time = parts(1) val latlon = Array(parts(3).toDouble, parts(2).toDouble) UberRecord(tripId, time, latlon) }. repartition(100). toDF(). select($"tripId", to_utc_timestamp($"time", "yyyy-MM-dd'T'HH:mm:ss").as("timeStamp"), $"latlon"). cache()
defined class UberRecord
uberData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [tripId: int, timeStamp: timestamp ... 1 more field]
uberData.show(5,false)
+------+-------------------+------------------------+
|tripId|timeStamp |latlon |
+------+-------------------+------------------------+
|12481 |2007-01-01 16:10:56|[-122.4129, 37.762637] |
|12483 |2007-01-07 07:42:32|[-122.428597, 37.743612]|
|12486 |2007-01-07 20:36:33|[-122.41125, 37.771483] |
|12488 |2007-01-06 20:43:51|[-122.407304, 37.763531]|
|12490 |2007-01-06 07:45:29|[-122.424798, 37.802722]|
+------+-------------------+------------------------+
only showing top 5 rows
val uberOSMMapBoundingBoxFiltered = uberData .filter($"latlon"(0) >= minLatInOSMMap && $"latlon"(0) <= maxLatInOSMMap && $"latlon"(1) >= minLonInOSMMap && $"latlon"(1) <= maxLonInOSMMap) .cache() uberOSMMapBoundingBoxFiltered.count()
uberOSMMapBoundingBoxFiltered: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [tripId: int, timeStamp: timestamp ... 1 more field]
res3: Long = 253696
uberOSMMapBoundingBoxFiltered.show(5,false)
+------+-------------------+------------------------+
|tripId|timeStamp |latlon |
+------+-------------------+------------------------+
|12481 |2007-01-01 16:10:56|[-122.4129, 37.762637] |
|12486 |2007-01-07 20:36:33|[-122.41125, 37.771483] |
|12488 |2007-01-06 20:43:51|[-122.407304, 37.763531]|
|12497 |2007-01-02 23:49:01|[-122.41817, 37.748174] |
|12499 |2007-01-07 00:20:41|[-122.432002, 37.75287] |
+------+-------------------+------------------------+
only showing top 5 rows
val uberValidData = uberOSMMapBoundingBoxFiltered .join(uberCountsFiltered, uberOSMMapBoundingBoxFiltered("tripId") === uberCountsFiltered("validTripId")) // Only want trips with more than 2 data points .drop("validTripId").cache
uberValidData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [tripId: int, timeStamp: timestamp ... 1 more field]
uberValidData.show(5,false)
+------+-------------------+------------------------+
|tripId|timeStamp |latlon |
+------+-------------------+------------------------+
|12481 |2007-01-01 16:10:56|[-122.4129, 37.762637] |
|12486 |2007-01-07 20:36:33|[-122.41125, 37.771483] |
|12488 |2007-01-06 20:43:51|[-122.407304, 37.763531]|
|12497 |2007-01-02 23:49:01|[-122.41817, 37.748174] |
|12499 |2007-01-07 00:20:41|[-122.432002, 37.75287] |
+------+-------------------+------------------------+
only showing top 5 rows
val ubers = uberValidData.select($"tripId", $"latlon", $"timeStamp") .map( row => { val id = row.get(0).asInstanceOf[Integer] val time = row.get(2).asInstanceOf[java.sql.Timestamp].getTime // Array(lat, lon) val latlon = row.get(1).asInstanceOf[scala.collection.mutable.WrappedArray[Double]] val entry = Array((latlon(0), latlon(1), time)) (id, entry) } ) .rdd.reduceByKey( (e1, e2) => e1 ++ e2) // Sequence of timespace tuples .cache
ubers: org.apache.spark.rdd.RDD[(Integer, Array[(Double, Double, Long)])] = ShuffledRDD[94] at reduceByKey at command-1940103467373508:11
ubers.take(1) // first of 8,321 trip ids prepped and ready for map-matching
res14: Array[(Integer, Array[(Double, Double, Long)])] = Array((2100,Array((-122.430456,37.766368,1168142819000), (-122.430874,37.766065,1168142873000), (-122.43189,37.76524,1168142885000), (-122.432537,37.764759,1168142897000), (-122.432833,37.764534,1168142903000), (-122.433421,37.764042,1168142909000), (-122.434094,37.763526,1168142915000), (-122.406148,37.769156,1168142399000), (-122.407442,37.768924,1168142405000), (-122.409003,37.76907,1168142411000), (-122.410424,37.76931,1168142417000), (-122.419548,37.770068,1168142447000), (-122.420887,37.770144,1168142453000), (-122.422174,37.770579,1168142459000), (-122.422506,37.770788,1168142465000), (-122.423186,37.771882,1168142507000), (-122.423467,37.771914,1168142639000), (-122.42389,37.771557,1168142645000), (-122.424858,37.770832,1168142705000), (-122.425321,37.770462,1168142711000), (-122.426489,37.769485,1168142723000), (-122.42671,37.769349,1168142729000), (-122.42785,37.768438,1168142753000), (-122.427882,37.768396,1168142759000), (-122.428191,37.76816,1168142783000), (-122.428687,37.767765,1168142789000), (-122.430268,37.766517,1168142813000), (-122.430588,37.766267,1168142825000), (-122.431452,37.765596,1168142879000), (-122.432244,37.764965,1168142891000), (-122.406513,37.771497,1168142387000), (-122.40595,37.770276,1168142393000), (-122.412292,37.769523,1168142423000), (-122.414228,37.769585,1168142429000), (-122.416105,37.769652,1168142435000), (-122.4181,37.76988,1168142441000), (-122.422915,37.771149,1168142471000), (-122.422932,37.771242,1168142477000), (-122.423008,37.771513,1168142483000), (-122.423167,37.771772,1168142489000), (-122.424358,37.771214,1168142651000), (-122.42451,37.771112,1168142657000), (-122.424577,37.77107,1168142699000), (-122.425981,37.769912,1168142717000), (-122.426785,37.769304,1168142735000), (-122.427076,37.769072,1168142741000), (-122.427508,37.768713,1168142747000), (-122.427958,37.768333,1168142765000), (-122.429273,37.767284,1168142795000), (-122.429752,37.766923,1168142801000), (-122.430132,37.766626,1168142807000))))
def extractLatLong(mr: MatchResult): Array[(Double, Double)] = { val pointsList = mr.getEdgeMatches.asScala.zipWithIndex .map{ case (e, i) => if (i == 0) e.getEdgeState.fetchWayGeometry(3) // FetchWayGeometry returns vertices on graph if 2, else e.getEdgeState.fetchWayGeometry(2) } // and edges if 3 .map{case pointList => pointList.asScala.toArray} .flatMap{ case e => e} val latLongs = pointsList.map(point => (point.lon, point.lat)).toArray latLongs }
extractLatLong: (mr: com.graphhopper.matching.MatchResult)Array[(Double, Double)]
SDS-2.x, Scalable Data Engineering Science
This is part of Project MEP: Meme Evolution Programme and supported by databricks academic partners program.
Map-matching Noisy Spatial Trajectories of Vehicles to Roadways in Open Street Map
Dillon George, Dan Lilja and Raazesh Sainudiin
Last refresh: Never