032b_UberMapMatchingAndVisualization(Scala)

SDS-2.x, Scalable Data Engineering Science

This is part of Project MEP: Meme Evolution Programme and supported by databricks academic partners program.

Map-matching Noisy Spatial Trajectories of Vehicles to Roadways in Open Street Map

Dillon George, Dan Lilja and Raazesh Sainudiin

Copyright 2016-2019 Dillon George, Dan Lilja and Raazesh Sainudiin

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

This is the precursor 2016 presentation by Dillon George as part of Scalable Data Science from Middle Earth student project.

sds/uji/studentProjects/01_DillonGeorge/038_UberMapMatchingAndVisualization

Here we are updating it to more recent versions of the needed libraries.

What is map-matching?

Map matching is the problem of how to match recorded geographic coordinates to a logical model of the real world, typically using some form of Geographic Information System. See https://en.wikipedia.org/wiki/Map_matching.

Show code

Why are we interested in map-matching?

Mainly because we can naturally deal with noise in raw GPS trajectories of entities moving along mapped ways, such as, vehicles, pedestrians or cyclists.

  • Trajectories from sources like Uber are typically noisy and we will map-match such trajectories in this worksheet.
  • Often, such trajectories lead to significant graph-dimensionality reduction as you will see below.
  • More importantly, map-matching is a natural first step towards learning distributions over historical trajectories of an entity.
  • Moreover, a set of map-matched trajectories (with additional work using kNN operations) can be turned into a graphX graph that can be vertex-programmed and joined with other graphX representations of the map itself.

How are we map-matching?

We are using graphHopper for this for now. See https://en.wikipedia.org/wiki/GraphHopper.

Show code

The following alternatives need exploration:

Steps in this worksheet

The basic steps are the following:

  1. Preliminaries: 0. Attach needed libraries, load osm data and initialize graphhopper
    • the two steps 0.1 and 0.2 need to be done only once per cluster
  2. Setting up leaflet for visualisation
  3. Load table of Uber Data from earlier analysis. Then convert to an RDD for mapmatching
  4. Start Map Matching
  5. Display Results of a map-matched trajectory

1. Preliminaries

Loading required libraries

  1. Launch a cluster using spark 2.4.3 (this is for compatibility with magellan built from the forked repos; see first notebook in this folder!).
  2. Attach following libraries if you have not already done so:
    • map_matching - com.graphhopper:map-matching:0.6.0 (more recent libraries may work but are note tested yet!)
    • magellan - import custom-built jar by downloading locally from https://github.com/lamastex/scalable-data-science/blob/master/custom-builds/jars/magellan/forks/ and then uploading to databricks
    • If needed only (this is already in databricks): spray-json io.spray:spray-json_2.11:1.3.4
import com.graphhopper.matching._
import com.graphhopper._
import com.graphhopper.routing.util.{EncodingManager, CarFlagEncoder}
import com.graphhopper.storage.index.LocationIndexTree
import com.graphhopper.util.GPXEntry

import magellan.Point

import scala.collection.JavaConverters._
import spray.json._
import DefaultJsonProtocol._

import scala.util.{Try, Success, Failure}

import org.apache.spark.sql.functions._
import com.graphhopper.matching._ import com.graphhopper._ import com.graphhopper.routing.util.{EncodingManager, CarFlagEncoder} import com.graphhopper.storage.index.LocationIndexTree import com.graphhopper.util.GPXEntry import magellan.Point import scala.collection.JavaConverters._ import spray.json._ import DefaultJsonProtocol._ import scala.util.{Try, Success, Failure} import org.apache.spark.sql.functions._

Do Step 0 at the bottom of the notebook

Only once in shard per OSM file (ignore this step the second time!):

  • follow section below on Step 0.1: Loading our OSM Data
  • follow section below on Step 0.2: Initialising GraphHopper

NOTE

If you loaded a smaller map so as to be able to analyze in the community edition, then you need the bounding box of this map to filter those trajectories that fall within this smaller map.

For example SanfranciscoSmall OSM map has the following bounding box:

  • -122.449,37.747 and -122.397,37.772

Let's put them in Scala vals as follows:

val minLatInOSMMap = -122.449
val minLonInOSMMap = 37.747
val maxLatInOSMMap = -122.397
val maxLonInOSMMap = 37.772
minLatInOSMMap: Double = -122.449 minLonInOSMMap: Double = 37.747 maxLatInOSMMap: Double = -122.397 maxLonInOSMMap: Double = 37.772

2. Setting up leaflet and visualisation

2.1 Setting up leaflet

You need to go to the following URL and set-up access-token in map-box to use leaflet independently:

2.2 Visualising with leaflet

Take an array of Strings in 'GeoJson' format, then insert this into a prebuild html string that contains all the code neccesary to display these features using Leaflet. The resulting html can be displayed in databricks using the displayHTML function.

See http://leafletjs.com/examples/geojson.html for a detailed example of using GeoJson with Leaflet.

def genLeafletHTML(features: Array[String]): String = {

  val featureArray = features.reduce(_ + "," +  _)
  // get your own access-token from https://leafletjs.com/examples/quick-start/
  // see request-access token link above at: https://account.mapbox.com/auth/signin/?route-to=%22/access-tokens/%22
  val accessToken = "pk.eyJ1IjoiZHRnIiwiYSI6ImNpaWF6MGdiNDAwanNtemx6MmIyNXoyOWIifQ.ndbNtExCMXZHKyfNtEN0Vg"

  val generatedHTML = f"""<!DOCTYPE html>
  <html>
  <head>
  <title>Maps</title>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/leaflet/0.7.7/leaflet.css">
  <style>
  #map {width: 600px; height:400px;}
  </style>

  </head>
  <body>
  <div id="map" style="width: 1000px; height: 600px"></div>
  <script src="https://cdnjs.cloudflare.com/ajax/libs/leaflet/0.7.7/leaflet.js"></script>
  <script type="text/javascript">
  var map = L.map('map').setView([37.77471008393265, -122.40422604391485], 14);

  L.tileLayer('https://api.tiles.mapbox.com/v4/{id}/{z}/{x}/{y}.png?access_token=$accessToken', {
  maxZoom: 18,
  attribution: 'Map data &copy; <a href="http://openstreetmap.org">OpenStreetMap</a> contributors, ' +
  '<a href="http://creativecommons.org/licenses/by-sa/2.0/">CC-BY-SA</a>, ' +
  'Imagery © <a href="http://mapbox.com">Mapbox</a>',
  id: 'mapbox.streets'
  }).addTo(map);

  var features = [$featureArray];

 colors = features.map(function (_) {return rainbow(100, Math.floor(Math.random() * 100)); });

  for (var i = 0; i < features.length; i++) {
      console.log(i);
      L.geoJson(features[i], {
          pointToLayer: function (feature, latlng) {
              return L.circleMarker(latlng, {
                  radius: 4,
                  fillColor: colors[i],
                  color: colors[i],
                  weight: 1,
                  opacity: 1,
                  fillOpacity: 0.8
              });
          }
      }).addTo(map);
  }


  function rainbow(numOfSteps, step) {
  // This function generates vibrant, "evenly spaced" colours (i.e. no clustering). This is ideal for creating easily distinguishable vibrant markers in Google Maps and other apps.
  // Adam Cole, 2011-Sept-14
  // HSV to RBG adapted from: http://mjijackson.com/2008/02/rgb-to-hsl-and-rgb-to-hsv-color-model-conversion-algorithms-in-javascript
  var r, g, b;
  var h = step / numOfSteps;
  var i = ~~(h * 6);
  var f = h * 6 - i;
  var q = 1 - f;
  switch(i %% 6){
  case 0: r = 1; g = f; b = 0; break;
  case 1: r = q; g = 1; b = 0; break;
  case 2: r = 0; g = 1; b = f; break;
  case 3: r = 0; g = q; b = 1; break;
  case 4: r = f; g = 0; b = 1; break;
  case 5: r = 1; g = 0; b = q; break;
  }
  var c = "#" + ("00" + (~ ~(r * 255)).toString(16)).slice(-2) + ("00" + (~ ~(g * 255)).toString(16)).slice(-2) + ("00" + (~ ~(b * 255)).toString(16)).slice(-2);
  return (c);
  }
  </script>


  </body>
  """
  generatedHTML
}
genLeafletHTML: (features: Array[String])String

3. Load Uber Data as in earlier analysis.

Then convert to an RDD for mapmatching

case class UberRecord(tripId: Int, time: String, latlon: Array[Double])

val uberData = sc.textFile("dbfs:/datasets/magellan/all.tsv").map { line =>
  val parts = line.split("\t" )
  val tripId = parts(0).toInt
  val time  = parts(1)
  val latlon = Array(parts(3).toDouble, parts(2).toDouble)
  UberRecord(tripId, time, latlon)
}.
repartition(100).
toDF().
select($"tripId", to_utc_timestamp($"time", "yyyy-MM-dd'T'HH:mm:ss").as("timeStamp"), $"latlon").
cache()
defined class UberRecord uberData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [tripId: int, timeStamp: timestamp ... 1 more field]
uberData.count()
res1: Long = 1128663
uberData.show(5,false)
+------+-------------------+------------------------+ |tripId|timeStamp |latlon | +------+-------------------+------------------------+ |12481 |2007-01-01 16:10:56|[-122.4129, 37.762637] | |12483 |2007-01-07 07:42:32|[-122.428597, 37.743612]| |12486 |2007-01-07 20:36:33|[-122.41125, 37.771483] | |12488 |2007-01-06 20:43:51|[-122.407304, 37.763531]| |12490 |2007-01-06 07:45:29|[-122.424798, 37.802722]| +------+-------------------+------------------------+ only showing top 5 rows
val uberOSMMapBoundingBoxFiltered = uberData
                                      .filter($"latlon"(0) >= minLatInOSMMap &&
                                              $"latlon"(0) <= maxLatInOSMMap &&
                                              $"latlon"(1) >= minLonInOSMMap &&
                                              $"latlon"(1) <= maxLonInOSMMap)
                                      .cache()
uberOSMMapBoundingBoxFiltered.count()
uberOSMMapBoundingBoxFiltered: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [tripId: int, timeStamp: timestamp ... 1 more field] res3: Long = 253696
uberOSMMapBoundingBoxFiltered.show(5,false)
+------+-------------------+------------------------+ |tripId|timeStamp |latlon | +------+-------------------+------------------------+ |12481 |2007-01-01 16:10:56|[-122.4129, 37.762637] | |12486 |2007-01-07 20:36:33|[-122.41125, 37.771483] | |12488 |2007-01-06 20:43:51|[-122.407304, 37.763531]| |12497 |2007-01-02 23:49:01|[-122.41817, 37.748174] | |12499 |2007-01-07 00:20:41|[-122.432002, 37.75287] | +------+-------------------+------------------------+ only showing top 5 rows

The number of trajectory points that are not within our bounding box of the OSM is:

uberData.count() - uberOSMMapBoundingBoxFiltered.count()
res6: Long = 874967

We will consider a trip to be invalid when it contains less that two data points, as this is required by Graph Hopper. First identify the all trips that are valid.

val uberCountsFiltered = uberOSMMapBoundingBoxFiltered
                          .groupBy($"tripId".alias("validTripId"))
                          .count.filter($"count" > 1)
                          .drop("count")
uberCountsFiltered: org.apache.spark.sql.DataFrame = [validTripId: int]
uberCountsFiltered.show(5, false)
+-----------+ |validTripId| +-----------+ |833 | |1342 | |1829 | |3175 | |5300 | +-----------+ only showing top 5 rows

Next is to join this list of valid Ids with the original data set, only the entries for those trips contained in uberCountsFiltered.

val uberValidData = uberOSMMapBoundingBoxFiltered
  .join(uberCountsFiltered, uberOSMMapBoundingBoxFiltered("tripId") === uberCountsFiltered("validTripId")) // Only want trips with more than 2 data points
  .drop("validTripId").cache 
uberValidData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [tripId: int, timeStamp: timestamp ... 1 more field]

Now seeing how many data points were dropped:

uberOSMMapBoundingBoxFiltered.count - uberValidData.count
res10: Long = 221
uberValidData.show(5,false)
+------+-------------------+------------------------+ |tripId|timeStamp |latlon | +------+-------------------+------------------------+ |12481 |2007-01-01 16:10:56|[-122.4129, 37.762637] | |12486 |2007-01-07 20:36:33|[-122.41125, 37.771483] | |12488 |2007-01-06 20:43:51|[-122.407304, 37.763531]| |12497 |2007-01-02 23:49:01|[-122.41817, 37.748174] | |12499 |2007-01-07 00:20:41|[-122.432002, 37.75287] | +------+-------------------+------------------------+ only showing top 5 rows

Graphopper considers a trip to be a sequence of (latitude, longitude, time) tuples. First the relevant columns are selected from the DataFrame, and then the rows are mapped to key-value pairs with the tripId as the key. After this is done the reduceByKey step merges all the (lat, lon, time) arrays for each key (trip Id) so that there is one entry for each trip id containing all the relevant data points.

// To use sql api instead of rdd api
// val ubers = uberValidData.
//   select($"tripId", struct($"latlon"(0), $"latLon"(1), $"timeStamp").as("coord"))
//   .groupBy($"tripId")
//   .agg(collect_set("coord").as("coords"))
val ubers = uberValidData.select($"tripId", $"latlon", $"timeStamp")
  .map( row => {
        val id = row.get(0).asInstanceOf[Integer]
        val time = row.get(2).asInstanceOf[java.sql.Timestamp].getTime
        // Array(lat, lon)
        val latlon = row.get(1).asInstanceOf[scala.collection.mutable.WrappedArray[Double]] 
        val entry = Array((latlon(0), latlon(1), time))
        (id, entry)
        }
      )
.rdd.reduceByKey( (e1, e2) => e1 ++ e2) // Sequence of timespace tuples
.cache
ubers: org.apache.spark.rdd.RDD[(Integer, Array[(Double, Double, Long)])] = ShuffledRDD[94] at reduceByKey at command-1940103467373508:11
ubers.count
res13: Long = 8321
ubers.take(1) // first of 8,321 trip ids prepped and ready for map-matching
res14: Array[(Integer, Array[(Double, Double, Long)])] = Array((2100,Array((-122.430456,37.766368,1168142819000), (-122.430874,37.766065,1168142873000), (-122.43189,37.76524,1168142885000), (-122.432537,37.764759,1168142897000), (-122.432833,37.764534,1168142903000), (-122.433421,37.764042,1168142909000), (-122.434094,37.763526,1168142915000), (-122.406148,37.769156,1168142399000), (-122.407442,37.768924,1168142405000), (-122.409003,37.76907,1168142411000), (-122.410424,37.76931,1168142417000), (-122.419548,37.770068,1168142447000), (-122.420887,37.770144,1168142453000), (-122.422174,37.770579,1168142459000), (-122.422506,37.770788,1168142465000), (-122.423186,37.771882,1168142507000), (-122.423467,37.771914,1168142639000), (-122.42389,37.771557,1168142645000), (-122.424858,37.770832,1168142705000), (-122.425321,37.770462,1168142711000), (-122.426489,37.769485,1168142723000), (-122.42671,37.769349,1168142729000), (-122.42785,37.768438,1168142753000), (-122.427882,37.768396,1168142759000), (-122.428191,37.76816,1168142783000), (-122.428687,37.767765,1168142789000), (-122.430268,37.766517,1168142813000), (-122.430588,37.766267,1168142825000), (-122.431452,37.765596,1168142879000), (-122.432244,37.764965,1168142891000), (-122.406513,37.771497,1168142387000), (-122.40595,37.770276,1168142393000), (-122.412292,37.769523,1168142423000), (-122.414228,37.769585,1168142429000), (-122.416105,37.769652,1168142435000), (-122.4181,37.76988,1168142441000), (-122.422915,37.771149,1168142471000), (-122.422932,37.771242,1168142477000), (-122.423008,37.771513,1168142483000), (-122.423167,37.771772,1168142489000), (-122.424358,37.771214,1168142651000), (-122.42451,37.771112,1168142657000), (-122.424577,37.77107,1168142699000), (-122.425981,37.769912,1168142717000), (-122.426785,37.769304,1168142735000), (-122.427076,37.769072,1168142741000), (-122.427508,37.768713,1168142747000), (-122.427958,37.768333,1168142765000), (-122.429273,37.767284,1168142795000), (-122.429752,37.766923,1168142801000), (-122.430132,37.766626,1168142807000))))

4. Start Map Matching

Now stepping into GraphHopper land we first define some utility functions for interfacing with the GraphHopper map matching library. Attaching the following artefact:

  • com.graphhopper:map-matching:0.6.0

This function takes a MatchResult from graphhopper and converts it into an Array of LON,LAT points.

def extractLatLong(mr: MatchResult): Array[(Double, Double)] = {
  val pointsList = mr.getEdgeMatches.asScala.zipWithIndex
                    .map{ case  (e, i) =>
                              if (i == 0) e.getEdgeState.fetchWayGeometry(3) // FetchWayGeometry returns vertices on graph if 2,
                              else e.getEdgeState.fetchWayGeometry(2) }      // and edges if 3 
                    .map{case pointList => pointList.asScala.toArray}
                    .flatMap{ case e => e}
  val latLongs = pointsList.map(point => (point.lon, point.lat)).toArray

  latLongs   
}
extractLatLong: (mr: com.graphhopper.matching.MatchResult)Array[(Double, Double)]

The following creates returns a new GraphHopper object and encoder. It reads the pre generated graphhopper 'database' from the dbfs, this way multiple graphHopper objects can be created on the workers all reading from the same shared database.

Currently the documentation is scattered all over the place if it exists at all. The method to create the Graph as specified in the map-matching repository differs from the main GraphHopper repository. The API should hopefully converge as GraphHopper matures

See the main graphHopper documentation here, and the map-matching documentation here.