ScaDaMaLe Course site and book

Million Song Dataset - Kaggle Challenge

Predict which songs a user will listen to.

SOURCE: This is just a Scala-rification of the Python notebook published in Databricks Community Edition in 2016.

CAUTION: This notebook intentionally produces an error in command 28 (Cmd 28 in the Databricks notebook). You are meant to learn how to fix this error with simple exception handling and thereby become a better data scientist. So do not be alarmed when the failure appears.

Stage 1: Parsing songs data

This is the first notebook in this tutorial. In this notebook we will read data from DBFS (Databricks File System), parse it, and load it as a table that can be readily used in the following notebooks.

By going through this notebook you can expect to learn how to read distributed data as an RDD, how to transform RDDs, and how to construct a Spark DataFrame from an RDD and register it as a table.

We first explore the different files in our distributed file system. We use a header file to construct a Spark schema object. We write a function that takes the header and casts the strings in each line of our data to the corresponding types. Once we run this function on the data we find that it fails on some corner cases. We update our function and finally get a parsed RDD. We combine that RDD and the schema to construct a DataFrame and register it as a temporary table in SparkSQL.

Datasets

Text data files that are a bit larger than the ones in /datasets/sds/songs/data-001/ are stored in dbfs:/databricks-datasets/songs/data-001 in the Databricks shard.

You can conveniently list files on a distributed file system (DBFS, S3, or HDFS) using %fs commands in Databricks.
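
Equivalently, from a Scala cell you can use dbutils.fs.ls, which is Databricks-specific and returns the listing as a Scala collection (a minimal sketch):

// dbutils.fs.ls returns FileInfo objects (path, name, size), so the
// listing can be filtered or sorted programmatically.
val files = dbutils.fs.ls("/datasets/sds/songs/data-001/")
files.filter(_.name.startsWith("part-")).take(3).foreach(f => println(f.name, f.size))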

NOTE: we will work first with the dataset in /datasets/sds/songs/data-001/.

%fs ls /datasets/sds/songs/data-001/
path name size modificationTime
dbfs:/datasets/sds/songs/data-001/header.txt header.txt 377.0 1.664295987e12
dbfs:/datasets/sds/songs/data-001/part-00000 part-00000 52837.0 1.664295981e12
dbfs:/datasets/sds/songs/data-001/part-00001 part-00001 52469.0 1.664295985e12
dbfs:/datasets/sds/songs/data-001/part-00002 part-00002 51778.0 1.664295981e12
dbfs:/datasets/sds/songs/data-001/part-00003 part-00003 50551.0 1.664295993e12
dbfs:/datasets/sds/songs/data-001/part-00004 part-00004 53449.0 1.664295988e12
dbfs:/datasets/sds/songs/data-001/part-00005 part-00005 53301.0 1.664295983e12
dbfs:/datasets/sds/songs/data-001/part-00006 part-00006 54184.0 1.664295991e12
dbfs:/datasets/sds/songs/data-001/part-00007 part-00007 50924.0 1.664295989e12
dbfs:/datasets/sds/songs/data-001/part-00008 part-00008 52533.0 1.664295984e12
dbfs:/datasets/sds/songs/data-001/part-00009 part-00009 54570.0 1.664295981e12
dbfs:/datasets/sds/songs/data-001/part-00010 part-00010 54338.0 1.664295986e12
dbfs:/datasets/sds/songs/data-001/part-00011 part-00011 51836.0 1.664295993e12
dbfs:/datasets/sds/songs/data-001/part-00012 part-00012 52297.0 1.664295986e12
dbfs:/datasets/sds/songs/data-001/part-00013 part-00013 52044.0 1.664295993e12
dbfs:/datasets/sds/songs/data-001/part-00014 part-00014 50704.0 1.664295985e12
dbfs:/datasets/sds/songs/data-001/part-00015 part-00015 54158.0 1.664295995e12
dbfs:/datasets/sds/songs/data-001/part-00016 part-00016 50080.0 1.664295987e12
dbfs:/datasets/sds/songs/data-001/part-00017 part-00017 47708.0 1.66429598e12
dbfs:/datasets/sds/songs/data-001/part-00018 part-00018 8858.0 1.664295987e12
dbfs:/datasets/sds/songs/data-001/part-00019 part-00019 53323.0 1.66429599e12
dbfs:/datasets/sds/songs/data-001/part-00020 part-00020 57877.0 1.664295994e12
dbfs:/datasets/sds/songs/data-001/part-00021 part-00021 52491.0 1.664295988e12
dbfs:/datasets/sds/songs/data-001/part-00022 part-00022 54791.0 1.66429598e12
dbfs:/datasets/sds/songs/data-001/part-00023 part-00023 50682.0 1.664295982e12
dbfs:/datasets/sds/songs/data-001/part-00024 part-00024 52863.0 1.664295988e12
dbfs:/datasets/sds/songs/data-001/part-00025 part-00025 47416.0 1.664295987e12
dbfs:/datasets/sds/songs/data-001/part-00026 part-00026 50130.0 1.664295991e12
dbfs:/datasets/sds/songs/data-001/part-00027 part-00027 53462.0 1.664295989e12
dbfs:/datasets/sds/songs/data-001/part-00028 part-00028 54179.0 1.664295984e12
dbfs:/datasets/sds/songs/data-001/part-00029 part-00029 52738.0 1.664295991e12
dbfs:/datasets/sds/songs/data-001/part-00030 part-00030 54159.0 1.664295993e12
dbfs:/datasets/sds/songs/data-001/part-00031 part-00031 51247.0 1.664295986e12
dbfs:/datasets/sds/songs/data-001/part-00032 part-00032 51610.0 1.664295983e12
dbfs:/datasets/sds/songs/data-001/part-00033 part-00033 53895.0 1.664295981e12
dbfs:/datasets/sds/songs/data-001/part-00034 part-00034 53125.0 1.664295991e12
dbfs:/datasets/sds/songs/data-001/part-00035 part-00035 54066.0 1.664295991e12
dbfs:/datasets/sds/songs/data-001/part-00036 part-00036 54265.0 1.664295993e12
dbfs:/datasets/sds/songs/data-001/part-00037 part-00037 54264.0 1.664295992e12
dbfs:/datasets/sds/songs/data-001/part-00038 part-00038 50540.0 1.664295983e12
dbfs:/datasets/sds/songs/data-001/part-00039 part-00039 55193.0 1.664295994e12
dbfs:/datasets/sds/songs/data-001/part-00040 part-00040 54537.0 1.664295983e12
dbfs:/datasets/sds/songs/data-001/part-00041 part-00041 52402.0 1.664295987e12
dbfs:/datasets/sds/songs/data-001/part-00042 part-00042 54673.0 1.664295988e12
dbfs:/datasets/sds/songs/data-001/part-00043 part-00043 53009.0 1.66429598e12
dbfs:/datasets/sds/songs/data-001/part-00044 part-00044 51789.0 1.664295982e12
dbfs:/datasets/sds/songs/data-001/part-00045 part-00045 52986.0 1.664295988e12
dbfs:/datasets/sds/songs/data-001/part-00046 part-00046 54442.0 1.664295993e12
dbfs:/datasets/sds/songs/data-001/part-00047 part-00047 52971.0 1.664295991e12
dbfs:/datasets/sds/songs/data-001/part-00048 part-00048 53331.0 1.664295985e12
dbfs:/datasets/sds/songs/data-001/part-00049 part-00049 44263.0 1.664295994e12
dbfs:/datasets/sds/songs/data-001/part-00050 part-00050 54841.0 1.664295993e12
dbfs:/datasets/sds/songs/data-001/part-00051 part-00051 54306.0 1.664295982e12
dbfs:/datasets/sds/songs/data-001/part-00052 part-00052 53610.0 1.664295986e12
dbfs:/datasets/sds/songs/data-001/part-00053 part-00053 53573.0 1.66429598e12
dbfs:/datasets/sds/songs/data-001/part-00054 part-00054 53854.0 1.664295994e12
dbfs:/datasets/sds/songs/data-001/part-00055 part-00055 54236.0 1.664295989e12
dbfs:/datasets/sds/songs/data-001/part-00056 part-00056 54455.0 1.664295989e12
dbfs:/datasets/sds/songs/data-001/part-00057 part-00057 52307.0 1.664295983e12
dbfs:/datasets/sds/songs/data-001/part-00058 part-00058 52313.0 1.664295992e12
dbfs:/datasets/sds/songs/data-001/part-00059 part-00059 52446.0 1.664295992e12
dbfs:/datasets/sds/songs/data-001/part-00060 part-00060 51958.0 1.664295986e12
dbfs:/datasets/sds/songs/data-001/part-00061 part-00061 53859.0 1.664295987e12
dbfs:/datasets/sds/songs/data-001/part-00062 part-00062 53698.0 1.664295981e12
dbfs:/datasets/sds/songs/data-001/part-00063 part-00063 54482.0 1.66429599e12
dbfs:/datasets/sds/songs/data-001/part-00064 part-00064 40182.0 1.664295979e12
dbfs:/datasets/sds/songs/data-001/part-00065 part-00065 54410.0 1.66429598e12
dbfs:/datasets/sds/songs/data-001/part-00066 part-00066 49123.0 1.66429598e12
dbfs:/datasets/sds/songs/data-001/part-00067 part-00067 50796.0 1.664295992e12
dbfs:/datasets/sds/songs/data-001/part-00068 part-00068 49561.0 1.664295986e12
dbfs:/datasets/sds/songs/data-001/part-00069 part-00069 52294.0 1.664295994e12
dbfs:/datasets/sds/songs/data-001/part-00070 part-00070 51250.0 1.66429598e12
dbfs:/datasets/sds/songs/data-001/part-00071 part-00071 58942.0 1.664295986e12
dbfs:/datasets/sds/songs/data-001/part-00072 part-00072 54589.0 1.664295986e12
dbfs:/datasets/sds/songs/data-001/part-00073 part-00073 54233.0 1.66429599e12
dbfs:/datasets/sds/songs/data-001/part-00074 part-00074 54725.0 1.664295982e12
dbfs:/datasets/sds/songs/data-001/part-00075 part-00075 54877.0 1.66429599e12
dbfs:/datasets/sds/songs/data-001/part-00076 part-00076 54333.0 1.664295984e12
dbfs:/datasets/sds/songs/data-001/part-00077 part-00077 51927.0 1.664295984e12
dbfs:/datasets/sds/songs/data-001/part-00078 part-00078 51744.0 1.664295994e12
dbfs:/datasets/sds/songs/data-001/part-00079 part-00079 53187.0 1.664295986e12
dbfs:/datasets/sds/songs/data-001/part-00080 part-00080 43246.0 1.664295985e12
dbfs:/datasets/sds/songs/data-001/part-00081 part-00081 54269.0 1.664295993e12
dbfs:/datasets/sds/songs/data-001/part-00082 part-00082 48464.0 1.66429599e12
dbfs:/datasets/sds/songs/data-001/part-00083 part-00083 52144.0 1.664295982e12
dbfs:/datasets/sds/songs/data-001/part-00084 part-00084 53375.0 1.664295981e12
dbfs:/datasets/sds/songs/data-001/part-00085 part-00085 55139.0 1.664295983e12
dbfs:/datasets/sds/songs/data-001/part-00086 part-00086 50924.0 1.664295985e12
dbfs:/datasets/sds/songs/data-001/part-00087 part-00087 52013.0 1.664295982e12
dbfs:/datasets/sds/songs/data-001/part-00088 part-00088 54262.0 1.664295992e12
dbfs:/datasets/sds/songs/data-001/part-00089 part-00089 53007.0 1.664295989e12
dbfs:/datasets/sds/songs/data-001/part-00090 part-00090 55142.0 1.664295979e12
dbfs:/datasets/sds/songs/data-001/part-00091 part-00091 52049.0 1.664295991e12
dbfs:/datasets/sds/songs/data-001/part-00092 part-00092 54714.0 1.664295987e12
dbfs:/datasets/sds/songs/data-001/part-00093 part-00093 52906.0 1.664295991e12
dbfs:/datasets/sds/songs/data-001/part-00094 part-00094 52188.0 1.664295985e12
dbfs:/datasets/sds/songs/data-001/part-00095 part-00095 50768.0 1.664295988e12
dbfs:/datasets/sds/songs/data-001/part-00096 part-00096 55242.0 1.66429598e12
dbfs:/datasets/sds/songs/data-001/part-00097 part-00097 52059.0 1.664295988e12
dbfs:/datasets/sds/songs/data-001/part-00098 part-00098 52982.0 1.664295989e12
dbfs:/datasets/sds/songs/data-001/part-00099 part-00099 52015.0 1.664295982e12
dbfs:/datasets/sds/songs/data-001/part-00100 part-00100 51467.0 1.664295983e12
dbfs:/datasets/sds/songs/data-001/part-00101 part-00101 50926.0 1.664295984e12
dbfs:/datasets/sds/songs/data-001/part-00102 part-00102 55018.0 1.664295994e12
dbfs:/datasets/sds/songs/data-001/part-00103 part-00103 50043.0 1.664295981e12
dbfs:/datasets/sds/songs/data-001/part-00104 part-00104 51936.0 1.664295992e12
dbfs:/datasets/sds/songs/data-001/part-00105 part-00105 57311.0 1.664295984e12
dbfs:/datasets/sds/songs/data-001/part-00106 part-00106 55090.0 1.66429599e12
dbfs:/datasets/sds/songs/data-001/part-00107 part-00107 54396.0 1.664295984e12
dbfs:/datasets/sds/songs/data-001/part-00108 part-00108 56594.0 1.664295987e12
dbfs:/datasets/sds/songs/data-001/part-00109 part-00109 53260.0 1.664295992e12
dbfs:/datasets/sds/songs/data-001/part-00110 part-00110 42007.0 1.664295984e12
dbfs:/datasets/sds/songs/data-001/part-00119 part-00119 0.0 1.664295989e12

As you can see in the listing, we have data files and a single header file. The header file seems interesting and worth a first look. It is only 377 bytes, so it is safe to collect its entire content in the notebook.

sc.textFile("/datasets/sds/songs/data-001/header.txt").collect() 
res1: Array[String] = Array(artist_id:string, artist_latitude:double, artist_longitude:double, artist_location:string, artist_name:string, duration:double, end_of_fade_in:double, key:int, key_confidence:double, loudness:double, release:string, song_hotnes:double, song_id:string, start_of_fade_out:double, tempo:double, time_signature:double, time_signature_confidence:double, title:string, year:double, partial_sequence:int)

Remember that you can collect() a huge RDD and crash the driver program, so it is good practice to take() a few lines and count() the number of lines first, especially if you have no idea what file you are trying to read.

sc.textFile("/datasets/sds/songs/data-001/header.txt").take(2)
res2: Array[String] = Array(artist_id:string, artist_latitude:double)
sc.textFile("/datasets/sds/songs/data-001/header.txt").count()
res3: Long = 20
sc.textFile("/datasets/sds/songs/data-001/header.txt").collect.map(println) // uncomment to see line-by-line
artist_id:string
artist_latitude:double
artist_longitude:double
artist_location:string
artist_name:string
duration:double
end_of_fade_in:double
key:int
key_confidence:double
loudness:double
release:string
song_hotnes:double
song_id:string
start_of_fade_out:double
tempo:double
time_signature:double
time_signature_confidence:double
title:string
year:double
partial_sequence:int
res4: Array[Unit] = Array((), (), (), (), (), (), (), (), (), (), (), (), (), (), (), (), (), (), (), ())

As seen above, each line in the header consists of a name and a type separated by a colon. We will need to parse the header file as follows:

val header = sc.textFile("/datasets/sds/songs/data-001/header.txt").map(
      line => {
                val headerElement = line.split(":")
                (headerElement(0), headerElement(1))
            }
           ).collect()
header: Array[(String, String)] = Array((artist_id,string), (artist_latitude,double), (artist_longitude,double), (artist_location,string), (artist_name,string), (duration,double), (end_of_fade_in,double), (key,int), (key_confidence,double), (loudness,double), (release,string), (song_hotnes,double), (song_id,string), (start_of_fade_out,double), (tempo,double), (time_signature,double), (time_signature_confidence,double), (title,string), (year,double), (partial_sequence,int))
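
As the introduction mentioned, these (name, type) pairs could also be used to build an explicit Spark schema object. A minimal sketch (below we will use a case class instead):

import org.apache.spark.sql.types._

// Map the header's type names onto Spark SQL types.
val schema = StructType(header.map { case (name, typeName) =>
  val dataType = typeName match {
    case "double" => DoubleType
    case "int"    => IntegerType
    case _        => StringType
  }
  StructField(name, dataType, nullable = true)
})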

Let's define a case class called Song that will be used to represent each row of data in the files:

  • /databricks-datasets/songs/data-001/part-00000 through /databricks-datasets/songs/data-001/part-00119 or the last .../part-***** file.
case class Song(artist_id: String, artist_latitude: Double, artist_longitude: Double, artist_location: String, artist_name: String, duration: Double, end_of_fade_in: Double, key: Int, key_confidence: Double, loudness: Double, release: String, song_hotness: Double, song_id: String, start_of_fade_out: Double, tempo: Double, time_signature: Double, time_signature_confidence: Double, title: String, year: Double, partial_sequence: Int)
defined class Song

Now we turn to the data files. The first step is to inspect the first few lines of data to understand their format.

// this loads all the data - a subset of the 1M songs dataset
val dataRDD = sc.textFile("/datasets/sds/songs/data-001/part-*") 
dataRDD: org.apache.spark.rdd.RDD[String] = /datasets/sds/songs/data-001/part-* MapPartitionsRDD[209] at textFile at command-2971213210276292:2
dataRDD.count // number of songs
res5: Long = 31369
dataRDD.take(3)
res6: Array[String] = Array(AR81V6H1187FB48872	nan	nan		Earl Sixteen	213.7073	0.0	11	0.419	-12.106	Soldier of Jah Army	nan	SOVNZSZ12AB018A9B8	208.289	125.882	1	0.0	Rastaman	2003	--, ARVVZQP11E2835DBCB	nan	nan		Wavves	133.25016	0.0	0	0.282	0.596	Wavvves	0.471578247701	SOJTQHQ12A8C143C5F	128.116	89.519	1	0.0	I Want To See You (And Go To The Movies)	2009	--, ARFG9M11187FB3BBCB	nan	nan	Nashua USA	C-Side	247.32689	0.0	9	0.612	-4.896	Santa Festival Compilation 2008 vol.1	nan	SOAJSQL12AB0180501	242.196	171.278	5	1.0	Loose on the Dancefloor	0	225261)

Each line of data consists of multiple fields separated by \t. With that information and what we learned from the header file, we set out to parse our data.
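
Before parsing it all, a quick sanity check (a sketch) that every line splits into the expected 20 tab-separated fields:

// Sanity check: count tab-separated tokens per line.
// split with limit -1 keeps trailing empty fields.
dataRDD.map(_.split("\t", -1).length).distinct.collect()
// ideally this returns Array(20), matching the 20 header fields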

  • We have already created a case class based on the header (which seems to agree with the 3 lines above).
  • Next, we will create a function that takes each line as input and returns the case class as output.
// let's do this 'by hand' to re-flex our RDD-muscles :)
// although, from a data engineering perspective, this is not a robust way to read data (no exception handling)
def parseLine(line: String): Song = {
  
  val tokens = line.split("\t")
  Song(tokens(0), tokens(1).toDouble, tokens(2).toDouble, tokens(3), tokens(4), tokens(5).toDouble, tokens(6).toDouble, tokens(7).toInt, tokens(8).toDouble, tokens(9).toDouble, tokens(10), tokens(11).toDouble, tokens(12), tokens(13).toDouble, tokens(14).toDouble, tokens(15).toDouble, tokens(16).toDouble, tokens(17), tokens(18).toDouble, tokens(19).toInt)
}
parseLine: (line: String)Song

With this function we can transform dataRDD into another RDD consisting of Song case classes:

val parsedRDD = dataRDD.map(parseLine)
parsedRDD: org.apache.spark.rdd.RDD[Song] = MapPartitionsRDD[210] at map at command-2971213210276298:1

To convert an RDD of case classes to a DataFrame, we just need to call the toDF method:

val df = parsedRDD.toDF
df: org.apache.spark.sql.DataFrame = [artist_id: string, artist_latitude: double ... 18 more fields]
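
A note in passing: toDF on an RDD of case classes relies on Spark's implicit encoders, which Databricks notebooks pre-import. In a standalone application you would need something along these lines (a sketch):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("songs").getOrCreate()
import spark.implicits._ // brings toDF into scope for RDDs of case classes

val df = parsedRDD.toDF()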

Once we get a DataFrame we can register it as a temporary table. That will allow us to use its name in SQL queries.

df.createOrReplaceTempView("songsTable")
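
You can confirm the registration through the catalog API, for example:

// The temporary view should now appear in the session catalog.
spark.catalog.listTables().show()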

We can now cache our table. So far all operations have been lazy. This is the first time Spark will attempt to actually read all our data and apply the transformations.

If you are running Spark 1.6+, the next command will throw a parsing error: caching forces evaluation, and our naive parseLine fails on lines with missing values.

%sql cache table songsTable

The error means that we are trying to convert a missing value (e.g., the string nan, which Scala's toDouble cannot parse) to a Double. Here is an updated version of the parseLine function that deals with missing values.

// good data engineering practice: handle malformed fields by returning defaults
def parseLine(line: String): Song = {

  def toDouble(value: String, defaultVal: Double): Double = {
    try {
      value.toDouble
    } catch {
      case e: Exception => defaultVal
    }
  }

  def toInt(value: String, defaultVal: Int): Int = {
    try {
      value.toInt
    } catch {
      case e: Exception => defaultVal
    }
  }

  val tokens = line.split("\t")
  Song(tokens(0), toDouble(tokens(1), 0.0), toDouble(tokens(2), 0.0), tokens(3), tokens(4), toDouble(tokens(5), 0.0), toDouble(tokens(6), 0.0), toInt(tokens(7), -1), toDouble(tokens(8), 0.0), toDouble(tokens(9), 0.0), tokens(10), toDouble(tokens(11), 0.0), tokens(12), toDouble(tokens(13), 0.0), toDouble(tokens(14), 0.0), toDouble(tokens(15), 0.0), toDouble(tokens(16), 0.0), tokens(17), toDouble(tokens(18), 0.0), toInt(tokens(19), -1))
}
parseLine: (line: String)Song
val df = dataRDD.map(parseLine).toDF
df.createOrReplaceTempView("songsTable")
df: org.apache.spark.sql.DataFrame = [artist_id: string, artist_latitude: double ... 18 more fields]
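
As an aside, such try/catch helpers are often written more idiomatically with scala.util.Try; an equivalent sketch:

import scala.util.Try

// Try wraps the conversion; getOrElse supplies the default on failure.
def toDouble(value: String, defaultVal: Double): Double =
  Try(value.toDouble).getOrElse(defaultVal)

def toInt(value: String, defaultVal: Int): Int =
  Try(value.toInt).getOrElse(defaultVal)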

And let's try caching the table again. We are going to access this data multiple times in the following notebooks, so it is a good idea to cache it in memory for faster subsequent access.

%sql cache table songsTable
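
The same caching can be requested from Scala (a sketch); note that SQL's CACHE TABLE is eager, while the catalog call is lazy:

spark.sql("CACHE TABLE songsTable")    // eager: materializes the cache now
// or
spark.catalog.cacheTable("songsTable") // lazy: cached on first access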

From now on we can easily query our data using the temporary table we just created and cached in memory. Since it is registered as a table, we can conveniently use SQL as well as the Spark API to access it.

%sql select * from songsTable limit 10
artist_id artist_latitude artist_longitude artist_location artist_name duration end_of_fade_in key key_confidence loudness release song_hotness song_id start_of_fade_out tempo time_signature time_signature_confidence title year partial_sequence
AR81V6H1187FB48872 0.0 0.0 Earl Sixteen 213.7073 0.0 11.0 0.419 -12.106 Soldier of Jah Army 0.0 SOVNZSZ12AB018A9B8 208.289 125.882 1.0 0.0 Rastaman 2003.0 -1.0
ARVVZQP11E2835DBCB 0.0 0.0 Wavves 133.25016 0.0 0.0 0.282 0.596 Wavvves 0.471578247701 SOJTQHQ12A8C143C5F 128.116 89.519 1.0 0.0 I Want To See You (And Go To The Movies) 2009.0 -1.0
ARFG9M11187FB3BBCB 0.0 0.0 Nashua USA C-Side 247.32689 0.0 9.0 0.612 -4.896 Santa Festival Compilation 2008 vol.1 0.0 SOAJSQL12AB0180501 242.196 171.278 5.0 1.0 Loose on the Dancefloor 0.0 225261.0
ARK4Z2O1187FB45FF0 0.0 0.0 Harvest 337.05751 0.247 4.0 0.46 -9.092 Underground Community 0.0 SOTDRVW12AB018BEB9 327.436 84.986 4.0 0.673 No Return 0.0 101619.0
AR4VQSG1187FB57E18 35.25082 -91.74015 Searcy, AR Gossip 430.23628 0.0 2.0 3.4e-2 -6.846 Yr Mangled Heart 0.0 SOTVOCL12A8AE478DD 424.06 121.998 4.0 0.847 Yr Mangled Heart 2006.0 740623.0
ARNBV1X1187B996249 0.0 0.0 Alex 186.80118 0.0 4.0 0.641 -16.108 Jolgaledin 0.0 SODTGRY12AB0182438 166.156 140.735 4.0 5.5e-2 Mariu Sonur Jesus 0.0 673970.0
ARXOEZX1187B9B82A1 0.0 0.0 Elie Attieh 361.89995 0.0 7.0 0.863 -4.919 ELITE 0.0 SOIINTJ12AB0180BA6 354.476 128.024 4.0 0.399 Fe Yom We Leila 0.0 280304.0
ARXPUIA1187B9A32F1 0.0 0.0 Rome, Italy Simone Cristicchi 220.00281 2.119 4.0 0.486 -6.52 Dall'Altra Parte Del Cancello 0.484225272411 SONHXJK12AAF3B5290 214.761 99.954 1.0 0.928 L'Italiano 2007.0 745962.0
ARNPPTH1187B9AD429 51.4855 -0.37196 Heston, Middlesex, England Jimmy Page 156.86485 0.334 7.0 0.493 -9.962 No Introduction Necessary [Deluxe Edition] 0.0 SOGUHGW12A58A80E06 149.269 162.48 4.0 0.534 Wailing Sounds 2004.0 599250.0
AROGWRA122988FEE45 0.0 0.0 Christos Dantis 256.67873 2.537 9.0 0.742 -13.404 Daktilika Apotipomata 0.0 SOJJOYI12A8C13399D 248.912 134.944 4.0 0.162 Stin Proigoumeni Zoi 0.0 611396.0
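
The same query via the DataFrame API, for comparison (a sketch):

// DataFrame API equivalent of the SQL query above:
spark.table("songsTable").limit(10).show()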

Next up is exploring this data. Click on the Exploration notebook to continue the tutorial.