008_DiamondsPipeline_01ETLEDA(Scala)

Archived YouTube video of this live unedited lab-lecture:

Diamonds ML Pipeline Workflow - DataFrame ETL and EDA Part

These are the Spark SQL parts focused on the extract-transform-load (ETL) and exploratory-data-analysis (EDA) stages of an end-to-end example of a Machine Learning (ML) workflow.

Why are we using DataFrames? Because of the announcement in the Spark MLlib Main Guide for Spark 2.2 (https://spark.apache.org/docs/latest/ml-guide.html) that the "DataFrame-based API is primary API".

This notebook is a scalarific break-down of the pythonic 'Diamonds ML Pipeline Workflow' from the Databricks Guide.

We will see this example again in the sequel

For this example, we analyze the Diamonds dataset from the R Datasets hosted on DBC.

Later on, we will use the DecisionTree algorithm to predict the price of a diamond from its characteristics.

Here is an outline of our pipeline:

  • Step 1. Load data: Load data as DataFrame
  • Step 2. Understand the data: Compute statistics and create visualizations to get a better understanding of the data.
  • Step 3. Hold out data: Split the data randomly into training and test sets. We will not look at the test data until after learning.
  • Step 4. On the training dataset:
    • Extract features: We will index categorical (String-valued) features so that DecisionTree can handle them.
    • Learn a model: Run DecisionTree to learn how to predict a diamond's price from a description of the diamond.
    • Tune the model: Tune the tree depth (complexity) using the training data. (This process is also called model selection.)
  • Step 5. Evaluate the model: Now look at the test dataset. Compare the initial model with the tuned model to see the benefit of tuning parameters.
  • Step 6. Understand the model: We will examine the learned model and results to gain further insight.

In this notebook, we will only cover Step 1 and Step 2 above. The other steps will be revisited in the sequel.
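Before diving into Steps 1 and 2, here is a hedged sketch of how Steps 3-6 of the outline might look in Spark's DataFrame-based ML API. This is only a preview under assumed column names from the Diamonds dataset (and it indexes only the cut column, for brevity); the actual code appears in the sequel.

```scala
// A sketch of Steps 3-6, assuming diamondsDF from Step 1 below
// and the Spark 2.x DataFrame-based ML API.
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.ml.evaluation.RegressionEvaluator

// Step 3: hold out a test set; we do not look at it until evaluation.
val Array(trainingDF, testDF) = diamondsDF.randomSplit(Array(0.7, 0.3), seed = 12345L)

// Step 4: index a categorical (String) column so DecisionTree can handle it,
// assemble a feature vector, and learn a tree predicting price.
val cutIndexer = new StringIndexer().setInputCol("cut").setOutputCol("cutIndexed")
val assembler = new VectorAssembler()
  .setInputCols(Array("carat", "cutIndexed", "depth", "table", "x", "y", "z"))
  .setOutputCol("features")
val dt = new DecisionTreeRegressor().setLabelCol("price").setFeaturesCol("features")
val pipeline = new Pipeline().setStages(Array(cutIndexer, assembler, dt))
val model = pipeline.fit(trainingDF)

// Step 5: evaluate on the held-out test set using root-mean-squared error.
val rmse = new RegressionEvaluator().setLabelCol("price").setMetricName("rmse")
  .evaluate(model.transform(testDF))
```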

Step 1. Load data as DataFrame

This section loads a dataset as a DataFrame and examines a few rows of it to understand the schema.

For more info, see the DB guide on importing data.

// We'll use the Diamonds dataset from the R datasets hosted on DBC.
val diamondsFilePath = "dbfs:/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv"
diamondsFilePath: String = dbfs:/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv
sc.textFile(diamondsFilePath).take(2) // looks like a csv file as it should
res2: Array[String] = Array("","carat","cut","color","clarity","depth","table","price","x","y","z", "1",0.23,"Ideal","E","SI2",61.5,55,326,3.95,3.98,2.43)
val diamondsRawDF = sqlContext.read    // we can use sqlContext instead of SparkSession for backwards compatibility to 1.x
    .format("com.databricks.spark.csv") // use spark.csv package
    .option("header", "true") // Use first line of all files as header
    .option("inferSchema", "true") // Automatically infer data types
    //.option("delimiter", ",") // Specify the delimiter as comma or ',' DEFAULT
    .load(diamondsFilePath)
diamondsRawDF: org.apache.spark.sql.DataFrame = [_c0: int, carat: double ... 9 more fields]
//There are 10 columns.  We will try to predict the price of diamonds, treating the other 9 columns as features.
diamondsRawDF.printSchema()
root
 |-- _c0: integer (nullable = true)
 |-- carat: double (nullable = true)
 |-- cut: string (nullable = true)
 |-- color: string (nullable = true)
 |-- clarity: string (nullable = true)
 |-- depth: double (nullable = true)
 |-- table: double (nullable = true)
 |-- price: integer (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)

Note: (nullable = true) simply means that the value in that column is allowed to be null.
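To make the nullable flag concrete, here is a hypothetical hand-written schema (not used in this notebook) where nullability is set explicitly per field, instead of being inferred:

```scala
import org.apache.spark.sql.types.{StructType, StructField, DoubleType, StringType}

// A hypothetical explicit schema: nullable = false forbids nulls in a column,
// while nullable = true (what inferSchema produces) allows them.
val exampleSchema = StructType(Seq(
  StructField("carat", DoubleType, nullable = true),
  StructField("cut", StringType, nullable = true),
  StructField("price", DoubleType, nullable = false) // no nulls allowed here
))
exampleSchema.printTreeString() // prints a root |-- ... listing like printSchema
```

Such a schema could be passed to a reader via .schema(exampleSchema) instead of .option("inferSchema", "true"), which also avoids the extra pass over the data that inference requires.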

Let us count the number of rows in diamondsRawDF.

diamondsRawDF.count() // Ctrl+Enter
res4: Long = 53940

So there are 53940 records or rows in the DataFrame.

Use the show(n) method to see the first n rows (the default is 20) of the DataFrame, as follows:

diamondsRawDF.show(10)
+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+
|_c0|carat|      cut|color|clarity|depth|table|price|   x|   y|   z|
+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+
|  1| 0.23|    Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|
|  2| 0.21|  Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|
|  3| 0.23|     Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|
|  4| 0.29|  Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|
|  5| 0.31|     Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75|
|  6| 0.24|Very Good|    J|   VVS2| 62.8| 57.0|  336|3.94|3.96|2.48|
|  7| 0.24|Very Good|    I|   VVS1| 62.3| 57.0|  336|3.95|3.98|2.47|
|  8| 0.26|Very Good|    H|    SI1| 61.9| 55.0|  337|4.07|4.11|2.53|
|  9| 0.22|     Fair|    E|    VS2| 65.1| 61.0|  337|3.87|3.78|2.49|
| 10| 0.23|Very Good|    H|    VS1| 59.4| 61.0|  338| 4.0|4.05|2.39|
+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+
only showing top 10 rows

If you look at the schema of diamondsRawDF you will see that the automatic schema inference of the sqlContext.read method has cast the values in the column price as integer.

To clean up:

  • let's recast the column price as double for downstream ML tasks later and
  • let's also get rid of the first column of row indices.
import org.apache.spark.sql.types.DoubleType
// convert the price column from int to double so we can model, fit and predict in the downstream ML task
val diamondsDF = diamondsRawDF.select($"carat", $"cut", $"color", $"clarity", $"depth", $"table",$"price".cast(DoubleType).as("price"), $"x", $"y", $"z")
diamondsDF.cache() // let's cache it for reuse
diamondsDF.printSchema // print schema
root
 |-- carat: double (nullable = true)
 |-- cut: string (nullable = true)
 |-- color: string (nullable = true)
 |-- clarity: string (nullable = true)
 |-- depth: double (nullable = true)
 |-- table: double (nullable = true)
 |-- price: double (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)
import org.apache.spark.sql.types.DoubleType
diamondsDF: org.apache.spark.sql.DataFrame = [carat: double, cut: string ... 8 more fields]
diamondsDF.show(10,false) // notice that price column has Double values that end in '.0' now
+-----+---------+-----+-------+-----+-----+-----+----+----+----+
|carat|cut      |color|clarity|depth|table|price|x   |y   |z   |
+-----+---------+-----+-------+-----+-----+-----+----+----+----+
|0.23 |Ideal    |E    |SI2    |61.5 |55.0 |326.0|3.95|3.98|2.43|
|0.21 |Premium  |E    |SI1    |59.8 |61.0 |326.0|3.89|3.84|2.31|
|0.23 |Good     |E    |VS1    |56.9 |65.0 |327.0|4.05|4.07|2.31|
|0.29 |Premium  |I    |VS2    |62.4 |58.0 |334.0|4.2 |4.23|2.63|
|0.31 |Good     |J    |SI2    |63.3 |58.0 |335.0|4.34|4.35|2.75|
|0.24 |Very Good|J    |VVS2   |62.8 |57.0 |336.0|3.94|3.96|2.48|
|0.24 |Very Good|I    |VVS1   |62.3 |57.0 |336.0|3.95|3.98|2.47|
|0.26 |Very Good|H    |SI1    |61.9 |55.0 |337.0|4.07|4.11|2.53|
|0.22 |Fair     |E    |VS2    |65.1 |61.0 |337.0|3.87|3.78|2.49|
|0.23 |Very Good|H    |VS1    |59.4 |61.0 |338.0|4.0 |4.05|2.39|
+-----+---------+-----+-------+-----+-----+-----+----+----+----+
only showing top 10 rows
//View DataFrame in databricks
// note this 'display' is a databricks notebook specific command that is quite powerful for visual interaction with the data
// other notebooks like zeppelin have similar commands for interactive visualisation
display(diamondsDF) 
(Interactive table output elided: display(diamondsDF) renders the first 1000 rows of the DataFrame as a sortable, chartable table in the notebook.)

Showing the first 1000 rows.

Step 2. Understand the data

Let's examine the data to get a better understanding of what is there. We only examine a couple of features (columns), but it gives an idea of the type of exploration you might do to understand a new dataset.
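One standard first exploration step is summary statistics on the numeric columns. A minimal sketch, assuming diamondsDF from Step 1:

```scala
// describe computes count, mean, stddev, min and max for the named columns
diamondsDF.describe("depth", "x", "y", "z").show()
```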

For more examples of using Databricks's visualizations (even across languages) see https://docs.databricks.com/user-guide/visualizations/index.html NOW.

We can see that we have a mix of

  • categorical features (cut, color, clarity) and
  • continuous features (depth, x, y, z).
Let's first look at the categorical features.

You can also select one or more individual columns using the DataFrame API.

Let us select the column cut from diamondsDF and create a new DataFrame called cutsDF and then display it as follows:

val cutsDF = diamondsDF.select("cut") // Shift+Enter
cutsDF: org.apache.spark.sql.DataFrame = [cut: string]
cutsDF.show(10) // Ctrl+Enter
+---------+
|      cut|
+---------+
|    Ideal|
|  Premium|
|     Good|
|  Premium|
|     Good|
|Very Good|
|Very Good|
|Very Good|
|     Fair|
|Very Good|
+---------+
only showing top 10 rows

Let us use distinct to find the distinct types of cuts in the dataset.

// View distinct diamond cuts in dataset
val cutsDistinctDF = diamondsDF.select("cut").distinct()
cutsDistinctDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [cut: string]
cutsDistinctDF.show()
+---------+
|      cut|
+---------+
|  Premium|
|    Ideal|
|     Good|
|     Fair|
|Very Good|
+---------+

Clearly, there are just 5 kinds of cuts.
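Beyond distinct, we can also count how many diamonds fall into each kind of cut using groupBy. A small sketch, assuming diamondsDF as above:

```scala
// Frequency of each categorical cut value, most common first
diamondsDF.groupBy("cut").count().orderBy($"count".desc).show()
```

This kind of frequency table is often more informative than distinct alone, since it reveals class imbalance in a categorical feature before any modelling is done.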