res2: Int = 240
powerPlantRDD: org.apache.spark.rdd.RDD[String] = /databricks-datasets/power-plant/data/Sheet1.tsv MapPartitionsRDD[10575] at textFile at command-45638284503054:1
AT V AP RH PE
14.96 41.76 1024.07 73.17 463.26
25.18 62.96 1020.04 59.08 444.37
5.11 39.4 1012.16 92.14 488.56
20.86 57.32 1010.24 76.64 446.48
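The RDD above and the sample rows just printed would typically come from a cell along these lines (a sketch; the original cell is not shown, and only the path is taken from the output above):

// Sketch (assumed): read the raw TSV as an RDD of lines
val powerPlantRDD = sc.textFile("/databricks-datasets/power-plant/data/Sheet1.tsv")
// Peek at the header and the first few records
powerPlantRDD.take(5).foreach(println)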
powerPlantDF: org.apache.spark.sql.DataFrame = [AT: double, V: double ... 3 more fields]
root
|-- AT: double (nullable = true)
|-- V: double (nullable = true)
|-- AP: double (nullable = true)
|-- RH: double (nullable = true)
|-- PE: double (nullable = true)
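A cell roughly like the following (an assumption; the exact read options are not shown) would produce the powerPlantDF DataFrame and the schema printed above:

// Sketch (assumed): read the tab-separated file into a DataFrame with typed double columns
val powerPlantDF = spark.read
  .option("header", "true")
  .option("delimiter", "\t")
  .option("inferSchema", "true")
  .csv("/databricks-datasets/power-plant/data/Sheet1.tsv")
powerPlantDF.printSchema()
powerPlantDF.count()  // the count output below suggests 9568 rows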
res9: Long = 9568
+-----+-----+-------+-----+------+
| AT| V| AP| RH| PE|
+-----+-----+-------+-----+------+
|14.96|41.76|1024.07|73.17|463.26|
|25.18|62.96|1020.04|59.08|444.37|
| 5.11| 39.4|1012.16|92.14|488.56|
|20.86|57.32|1010.24|76.64|446.48|
|10.82| 37.5|1009.23|96.62| 473.9|
|26.27|59.44|1012.23|58.77|443.67|
|15.89|43.96|1014.02|75.24|467.35|
| 9.48|44.71|1019.12|66.43|478.42|
|14.64| 45.0|1021.78|41.25|475.98|
|11.74|43.56|1015.14|70.72| 477.5|
+-----+-----+-------+-----+------+
only showing top 10 rows
res12: Long = 9568
+--------+--------------------+-----------+
|database| tableName|isTemporary|
+--------+--------------------+-----------+
| default| adult| false|
| default| business_csv_csv| false|
| default| checkin_table| false|
| default| diamonds| false|
| default| inventory| false|
| default|item_merchant_cat...| false|
| default| items_left_csv| false|
| default| logistic_detail| false|
| default| merchant_ratings| false|
| default| order_data| false|
| default| order_ids_left_csv| false|
| default| repeat_csv| false|
| default| review_2019_csv| false|
| default|sample_logistic_t...| false|
| default| sentimentlex_csv| false|
| default| simple_range| false|
| default| social_media_usage| false|
| default| tip_json| false|
| default| tips_csv_csv| false|
| default| users_csv| false|
+--------+--------------------+-----------+
+------------------------+--------+-----------+---------+-----------+
|name |database|description|tableType|isTemporary|
+------------------------+--------+-----------+---------+-----------+
|adult |default |null |EXTERNAL |false |
|business_csv_csv |default |null |EXTERNAL |false |
|checkin_table |default |null |MANAGED |false |
|diamonds |default |null |EXTERNAL |false |
|inventory |default |null |MANAGED |false |
|item_merchant_categories|default |null |MANAGED |false |
|items_left_csv |default |null |EXTERNAL |false |
|logistic_detail |default |null |MANAGED |false |
|merchant_ratings |default |null |MANAGED |false |
|order_data |default |null |MANAGED |false |
|order_ids_left_csv |default |null |EXTERNAL |false |
|repeat_csv |default |null |MANAGED |false |
|review_2019_csv |default |null |EXTERNAL |false |
|sample_logistic_table |default |null |EXTERNAL |false |
|sentimentlex_csv |default |null |EXTERNAL |false |
|simple_range |default |null |MANAGED |false |
|social_media_usage |default |null |MANAGED |false |
|tip_json |default |null |EXTERNAL |false |
|tips_csv_csv |default |null |EXTERNAL |false |
|users_csv |default |null |EXTERNAL |false |
+------------------------+--------+-----------+---------+-----------+
+---------+---------------------+--------------------------------------+
|name |description |locationUri |
+---------+---------------------+--------------------------------------+
|db_ad_gcs| |dbfs:/user/hive/warehouse/db_ad_gcs.db|
|default |Default Hive database|dbfs:/user/hive/warehouse |
+---------+---------------------+--------------------------------------+
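The table and database listings above would typically come from the Spark 2.x Catalog API, roughly as follows (a sketch, not the original cells):

// Sketch (assumed): inspect the metastore via the Catalog API
spark.catalog.listTables().show(false)      // table names, types and databases
spark.catalog.listDatabases().show(false)   // databases and their locations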
+--------+--------------------+-----------+
|database| tableName|isTemporary|
+--------+--------------------+-----------+
| default| adult| false|
| default| business_csv_csv| false|
| default| checkin_table| false|
| default| diamonds| false|
| default| inventory| false|
| default|item_merchant_cat...| false|
| default| items_left_csv| false|
| default| logistic_detail| false|
| default| merchant_ratings| false|
| default| order_data| false|
| default| order_ids_left_csv| false|
| default| repeat_csv| false|
| default| review_2019_csv| false|
| default|sample_logistic_t...| false|
| default| sentimentlex_csv| false|
| default| simple_range| false|
| default| social_media_usage| false|
| default| tip_json| false|
| default| tips_csv_csv| false|
| default| users_csv| false|
+--------+--------------------+-----------+
only showing top 20 rows
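The next cell reads from a table called power_plant_table, which is not created anywhere in the output shown here; one plausible way it could have been registered (an assumption) is:

// Sketch (assumed): expose the DataFrame under the table name used in the next cell
powerPlantDF.createOrReplaceTempView("power_plant_table")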
import org.apache.spark.ml.feature.VectorAssembler

// make a DataFrame called dataset from the table
val dataset = sqlContext.table("power_plant_table")

val vectorizer = new VectorAssembler()
  .setInputCols(Array("AT", "V", "AP", "RH"))
  .setOutputCol("features")
import org.apache.spark.ml.feature.VectorAssembler
dataset: org.apache.spark.sql.DataFrame = [AT: double, V: double ... 3 more fields]
vectorizer: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_a6baa233f655
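As a quick sanity check (not part of the original output), the assembler can be applied directly; this sketch assumes the dataset defined above:

// Sketch: VectorAssembler packs the four predictor columns into a single 'features' vector
vectorizer.transform(dataset).select("features", "PE").show(3, false)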
// First let's hold out 20% of our data for testing and leave 80% for training
var Array(split20, split80) = dataset.randomSplit(Array(0.20, 0.80), 1800009193L)
split20: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [AT: double, V: double ... 3 more fields]
split80: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [AT: double, V: double ... 3 more fields]
// Let's cache these datasets for performance
val testSet = split20.cache()
val trainingSet = split80.cache()
testSet: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [AT: double, V: double ... 3 more fields]
trainingSet: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [AT: double, V: double ... 3 more fields]
// ***** LINEAR REGRESSION MODEL *****
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.regression.LinearRegressionModel
import org.apache.spark.ml.Pipeline

// Let's initialize our linear regression learner
val lr = new LinearRegression()
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.regression.LinearRegressionModel
import org.apache.spark.ml.Pipeline
lr: org.apache.spark.ml.regression.LinearRegression = linReg_ba1ada380272
// We use explainParams() to list the parameters we can set
lr.explainParams()
res34: String =
aggregationDepth: suggested depth for treeAggregate (>= 2) (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty (default: 0.0)
epsilon: The shape parameter to control the amount of robustness. Must be > 1.0. (default: 1.35)
featuresCol: features column name (default: features)
fitIntercept: whether to fit an intercept term (default: true)
labelCol: label column name (default: label)
loss: The loss function to be optimized. Supported options: squaredError, huber. (Default squaredError) (default: squaredError)
maxIter: maximum number of iterations (>= 0) (default: 100)
predictionCol: prediction column name (default: prediction)
regParam: regularization parameter (>= 0) (default: 0.0)
solver: The solver algorithm for optimization. Supported options: auto, normal, l-bfgs. (Default auto) (default: auto)
standardization: whether to standardize the training features before fitting the model (default: true)
tol: the convergence tolerance for iterative algorithms (>= 0) (default: 1.0E-6)
weightCol: weight column name. If this is not set or empty, we treat all instance weights as 1.0 (undefined)
// Now we set the parameters for the method
lr.setPredictionCol("Predicted_PE")
  .setLabelCol("PE")
  .setMaxIter(100)
  .setRegParam(0.1)

// We will use the new spark.ml pipeline API. If you have worked with scikit-learn this will be very familiar.
val lrPipeline = new Pipeline()
lrPipeline.setStages(Array(vectorizer, lr))

// Let's first train on the training set to see what we get
val lrModel = lrPipeline.fit(trainingSet)
lrPipeline: org.apache.spark.ml.Pipeline = pipeline_97dfd6f4e2e5
lrModel: org.apache.spark.ml.PipelineModel = pipeline_97dfd6f4e2e5
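The next cell refers to intercept and coefficentFeaturePairs, which are defined in a cell not shown in this output; one plausible reconstruction from the fitted pipeline (an assumption) is:

// Sketch (assumed): pull the fitted LinearRegressionModel out of the last pipeline stage
val lrm = lrModel.stages.last.asInstanceOf[LinearRegressionModel]
val intercept = lrm.intercept
// Pair each learned weight with its input feature name, as an RDD so it can be sorted by key
val coefficentFeaturePairs = sc.parallelize(lrm.coefficients.toArray.zip(vectorizer.getInputCols))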
// Now let's sort the coefficients from the largest to the smallest
var equation = s"y = $intercept "
//var variables = Array
coefficentFeaturePairs.sortByKey().collect().foreach({
  case (weight, feature) => {
    val symbol = if (weight > 0) "+" else "-"
    val absWeight = Math.abs(weight)
    equation += (s" $symbol (${absWeight} * ${feature})")
  }
})
equation: String = y = 427.9139822165837 - (1.9083064919040942 * AT) - (0.25381293007161654 * V) - (0.1474651301033126 * RH) + (0.08739350304730673 * AP)
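The evaluation cell below uses a predictionsAndLabels DataFrame that is not defined in the output shown; it would plausibly be the fitted pipeline applied to the held-out test set, along these lines:

// Sketch (assumed): score the test set; the pipeline adds the Predicted_PE column
val predictionsAndLabels = lrModel.transform(testSet)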
// Now let's compute some evaluation metrics against our test dataset
import org.apache.spark.mllib.evaluation.RegressionMetrics

val metrics = new RegressionMetrics(
  predictionsAndLabels.select("Predicted_PE", "PE").rdd
    .map(r => (r(0).asInstanceOf[Double], r(1).asInstanceOf[Double]))
)
import org.apache.spark.mllib.evaluation.RegressionMetrics
metrics: org.apache.spark.mllib.evaluation.RegressionMetrics = org.apache.spark.mllib.evaluation.RegressionMetrics@7290c3ef
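The next cell interpolates an rmse value; RegressionMetrics exposes it (and related metrics) directly, for example:

// Root mean squared error used to scale the residuals below
val rmse = metrics.rootMeanSquaredError
// Other metrics available on the same object
val explainedVariance = metrics.explainedVariance
val r2 = metrics.r2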
// Compute the residual error for each prediction and scale it by the RMSE,
// then register the result as a temporary table Power_Plant_RMSE_Evaluation
predictionsAndLabels.selectExpr(
  "PE",
  "Predicted_PE",
  "PE - Predicted_PE AS Residual_Error",
  s"(PE - Predicted_PE) / $rmse AS Within_RSME"
).createOrReplaceTempView("Power_Plant_RMSE_Evaluation")
%sql
SELECT
  CASE WHEN Within_RSME <= 1.0 AND Within_RSME >= -1.0 THEN 1
       WHEN Within_RSME <= 2.0 AND Within_RSME >= -2.0 THEN 2
       ELSE 3
  END AS RSME_Multiple,
  COUNT(*) AS count
FROM Power_Plant_RMSE_Evaluation
GROUP BY
  CASE WHEN Within_RSME <= 1.0 AND Within_RSME >= -1.0 THEN 1
       WHEN Within_RSME <= 2.0 AND Within_RSME >= -2.0 THEN 2
       ELSE 3
  END
// Let's set up our evaluator class to judge the model based on the best root mean squared error
import org.apache.spark.ml.evaluation.RegressionEvaluator

val regEval = new RegressionEvaluator()
regEval.setLabelCol("PE")
  .setPredictionCol("Predicted_PE")
  .setMetricName("rmse")
regEval: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_7e20af62a956
res44: regEval.type = regEval_7e20af62a956
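With the evaluator configured, a simple usage (a sketch, not shown in the original output) is to score the test-set predictions:

// Sketch: RMSE of the pipeline model on the held-out test set
val testRMSE = regEval.evaluate(lrModel.transform(testSet))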
SDS-2.x, Scalable Data Engineering Science