// Let's quickly recall the schema and make sure our table is here now
table("power_plant_table").printSchema
root
|-- AT: double (nullable = true)
|-- V: double (nullable = true)
|-- AP: double (nullable = true)
|-- RH: double (nullable = true)
|-- PE: double (nullable = true)
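// (A sketch, not from the original notebook) Beyond the schema, a quick row count
// confirms the table actually contains data before we start modelling; this assumes
// the same power_plant_table registered earlier.
println(s"power_plant_table rows: ${table("power_plant_table").count()}")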
powerPlantDF // make sure we have the DataFrame too
res23: org.apache.spark.sql.DataFrame = [AT: double, V: double ... 3 more fields]
import org.apache.spark.ml.feature.VectorAssembler

// make a DataFrame called dataset from the table
val dataset = sqlContext.table("power_plant_table")

val vectorizer = new VectorAssembler()
  .setInputCols(Array("AT", "V", "AP", "RH"))
  .setOutputCol("features")
import org.apache.spark.ml.feature.VectorAssembler
dataset: org.apache.spark.sql.DataFrame = [AT: double, V: double ... 3 more fields]
vectorizer: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_bca323dc00ef
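// (A sketch, not from the original notebook) The VectorAssembler merges the four
// input columns into a single "features" vector column; applying it to a few rows
// shows the shape of the data the learner will see. This uses the dataset and
// vectorizer defined above.
vectorizer.transform(dataset)
  .select("features", "PE")
  .show(3, false)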
// First let's hold out 20% of our data for testing and leave 80% for training
var Array(split20, split80) = dataset.randomSplit(Array(0.20, 0.80), 1800009193L)
split20: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [AT: double, V: double ... 3 more fields]
split80: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [AT: double, V: double ... 3 more fields]
// Let's cache these datasets for performance
val testSet = split20.cache()
val trainingSet = split80.cache()
testSet: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [AT: double, V: double ... 3 more fields]
trainingSet: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [AT: double, V: double ... 3 more fields]
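// (A sketch, not from the original notebook) cache() is lazy, so counting the splits
// both materializes the cache and confirms the roughly 20/80 proportions. Exact
// counts depend on the random seed passed to randomSplit above.
val testCount = testSet.count()
val trainCount = trainingSet.count()
println(s"test: $testCount rows, training: $trainCount rows")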
dataset.take(3)
res24: Array[org.apache.spark.sql.Row] = Array([14.96,41.76,1024.07,73.17,463.26], [25.18,62.96,1020.04,59.08,444.37], [5.11,39.4,1012.16,92.14,488.56])
testSet.take(3)
res25: Array[org.apache.spark.sql.Row] = Array([1.81,39.42,1026.92,76.97,490.55], [3.2,41.31,997.67,98.84,489.86], [3.38,41.31,998.79,97.76,489.11])
trainingSet.take(3)
res26: Array[org.apache.spark.sql.Row] = Array([2.34,39.42,1028.47,69.68,490.34], [2.58,39.42,1028.68,69.03,488.69], [2.64,39.64,1011.02,85.24,481.29])
// ***** LINEAR REGRESSION MODEL *****
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.regression.LinearRegressionModel
import org.apache.spark.ml.Pipeline

// Let's initialize our linear regression learner
val lr = new LinearRegression()
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.regression.LinearRegressionModel
import org.apache.spark.ml.Pipeline
lr: org.apache.spark.ml.regression.LinearRegression = linReg_161e726c1f38
// We use explainParams to dump the parameters we can set
lr.explainParams()
res29: String =
aggregationDepth: suggested depth for treeAggregate (>= 2) (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty (default: 0.0)
featuresCol: features column name (default: features)
fitIntercept: whether to fit an intercept term (default: true)
labelCol: label column name (default: label)
maxIter: maximum number of iterations (>= 0) (default: 100)
predictionCol: prediction column name (default: prediction)
regParam: regularization parameter (>= 0) (default: 0.0)
solver: the solver algorithm for optimization. If this is not set or empty, default value is 'auto' (default: auto)
standardization: whether to standardize the training features before fitting the model (default: true)
tol: the convergence tolerance for iterative algorithms (>= 0) (default: 1.0E-6)
weightCol: weight column name. If this is not set or empty, we treat all instance weights as 1.0 (undefined)
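// (A sketch, not from the original notebook) The defaults above show labelCol is
// "label", but our label column is PE, while the vectorizer already emits the
// default featuresCol "features". One plausible way to wire the pieces together is
// a Pipeline; "Predicted_PE" is a hypothetical name for the prediction column.
lr.setLabelCol("PE")
  .setPredictionCol("Predicted_PE")
val lrPipeline = new Pipeline().setStages(Array(vectorizer, lr))
val lrModel = lrPipeline.fit(trainingSet)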