ScaDaMaLe Course site and book

// Load the k-mer data (which carries the labels) and the saved LDA topic distributions.

// k-mer DataFrames; data format: [genome: string, label: string, id: long]
val kMersTrainPath = "dbfs:/FileStore/shared_uploads/caylak@kth.se/data_train_nonoverlapping"
val kMersTestPath = "dbfs:/FileStore/shared_uploads/caylak@kth.se/data_test_nonoverlapping"

// Topic-distribution DataFrames; data format: [_1: bigint, _2: vector],
// where the vector part contains the topic distribution.
val topicsTrainPath = "dbfs:/FileStore/shared_uploads/caylak@kth.se/topic_dist_train_t20_i20k_no_cv"
val topicsTestPath = "dbfs:/FileStore/shared_uploads/caylak@kth.se/topic_dist_test_t20_i20k_no_cv"

val k_mers_df_train = spark.read.parquet(kMersTrainPath)
val k_mers_df_test = spark.read.parquet(kMersTestPath)
val trainingData = spark.read.parquet(topicsTrainPath)
val testData = spark.read.parquet(topicsTestPath)
k_mers_df_train: org.apache.spark.sql.DataFrame = [genome: string, label: string ... 1 more field] k_mers_df_test: org.apache.spark.sql.DataFrame = [genome: string, label: string ... 1 more field] trainingData: org.apache.spark.sql.DataFrame = [_1: bigint, _2: vector] testData: org.apache.spark.sql.DataFrame = [_1: bigint, _2: vector]
// Merge the topic distributions with the k-mer data so that every feature vector gets its label.

// A threshold of -1 switches off automatic broadcast joins for this session.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

/** Inner-joins topic distributions (`_1`, `_2`) with the k-mer rows on `_1 === id`,
  * renames the vector column `_2` to `features` and drops the duplicate key `_1`.
  */
def joinWithLabels(
    topics: org.apache.spark.sql.DataFrame,
    kMers: org.apache.spark.sql.DataFrame): org.apache.spark.sql.DataFrame = {
  val sameDocument = topics("_1") === kMers("id")
  topics
    .join(kMers, sameDocument, "inner")
    .withColumnRenamed("_2", "features")
    .drop("_1")
}

val mergedTrainingData = joinWithLabels(trainingData, k_mers_df_train)
val mergedTestData = joinWithLabels(testData, k_mers_df_test)
mergedTrainingData: org.apache.spark.sql.DataFrame = [features: vector, genome: string ... 2 more fields] mergedTestData: org.apache.spark.sql.DataFrame = [features: vector, genome: string ... 2 more fields]
// Peek at the first five merged training rows.
mergedTrainingData.show(numRows = 5)
+--------------------+--------------------+-------+----+ | features| genome| label| id| +--------------------+--------------------+-------+----+ |[0.05124403025197...|CTT GTA GAT CTG T...|oceania| 26| |[0.04866065493877...|CTC TTG TAG ATC T...|oceania| 29| |[0.04856023665158...|ACT TTC GAT CTC T...|oceania| 474| |[0.05114514533282...|CTT GTA GAT CTG T...|oceania| 964| |[0.04856846330546...|ACT TTC GAT CTC T...|oceania|1677| +--------------------+--------------------+-------+----+ only showing top 5 rows
// Peek at the first five merged test rows.
mergedTestData.show(numRows = 5)
+--------------------+--------------------+-------+----+ | features| genome| label| id| +--------------------+--------------------+-------+----+ |[0.04856691976914...|ACT TTC GAT CTC T...|oceania|2250| |[0.04775129705622...|ACT TTC GAT CTC T...|oceania|3091| |[0.04856692420144...|ACT TTC GAT CTC T...|oceania|7279| |[0.04881282010425...|ACT TTC GAT CTC T...|oceania|8075| |[0.05110516016513...|CTT GTA GAT CTG T...|oceania|9458| +--------------------+--------------------+-------+----+ only showing top 5 rows
import org.apache.spark.sql.functions._
import org.apache.spark.ml._

// Split the feature vector into separate columns so that per-component
// summary statistics can be computed.

// UDF turning an ML vector into a plain array of doubles.
val vecToArray = udf((v: linalg.Vector) => v.toArray)

val dfArr = mergedTrainingData.withColumn("featuresArr", vecToArray(col("features")))

// Column names f1 .. f10 for the first ten vector components.
// NOTE(review): the parquet file names ("t20") suggest 20 topics, yet only the
// first 10 components are extracted here -- confirm this is intended.
val elements = (1 to 10).map(i => s"f$i").toArray

// One column expression per name: featuresArr[i] AS f(i+1).
val sqlExpr = Array.tabulate(elements.length) { i =>
  col("featuresArr").getItem(i).as(elements(i))
}

val df_feats = dfArr.select((col("label") +: sqlExpr): _*)
import org.apache.spark.sql.functions._ import org.apache.spark.ml._ vecToArray: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$9301/1088827103@108f1ec1,ArrayType(DoubleType,false),List(Some(class[value[0]: vector])),None,true,true) dfArr: org.apache.spark.sql.DataFrame = [features: vector, genome: string ... 3 more fields] elements: Array[String] = Array(f1, f2, f3, f4, f5, f6, f7, f8, f9, f10) sqlExpr: Array[org.apache.spark.sql.Column] = Array(featuresArr[0] AS `f1`, featuresArr[1] AS `f2`, featuresArr[2] AS `f3`, featuresArr[3] AS `f4`, featuresArr[4] AS `f5`, featuresArr[5] AS `f6`, featuresArr[6] AS `f7`, featuresArr[7] AS `f8`, featuresArr[8] AS `f9`, featuresArr[9] AS `f10`) df_feats: org.apache.spark.sql.DataFrame = [label: string, f1: double ... 9 more fields]
// Summary statistics (count, mean, stddev, min, max) for the label and each extracted component.
val featureSummary = df_feats.describe()
featureSummary.show()
+-------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+ |summary| label| f1| f2| f3| f4| f5| f6| f7| f8| f9| f10| +-------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+ | count| 22155| 22155| 22155| 22155| 22155| 22155| 22155| 22155| 22155| 22155| 22155| | mean| null| 0.04931437716125177| 0.05103219325119624| 0.05043777410343692| 0.04985720003755815| 0.05109828486193199|0.049969419011346175|0.049612622552246397| 0.05172608119915934| 0.05519918725594238|0.050902505322165705| | stddev| null| 0.00376757883941906|0.012025788212709033|0.008575417672641356|0.007216341887817029| 0.017036551424972|0.006744772303600655|0.006141190111960429|0.015925101732795318|0.028248525286887288|0.011750518257958392| | min| africa|7.597652484573448E-5|7.931226826328689E-5|7.472127009799751E-5|8.488001247712824E-5|5.737788781311747E-5|7.510235436716417E-5| 8.01478429209781E-5|1.102376409869290...|7.060888276954563E-5|5.730971677795215...| | max|southamerica| 0.07096255125893133| 0.08483680076143339| 0.07401129608174387| 0.09129137902172121| 0.07569705042586143| 0.09892558758161725| 0.12033929830179531| 0.08811599095688616| 0.09753703174079206| 0.08152012057643862| +-------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
import org.apache.spark.sql.functions.rand

// Draw a 50% sample with replacement and display it in random row order.
val sampledFeats = df_feats.sample(withReplacement = true, fraction = 0.5)
display(sampledFeats.orderBy(rand()))

import org.apache.spark.ml.feature.{StringIndexer,VectorAssembler}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.linalg.Vector

// Pre-processing stage: map the string column "label" to the numeric column "label_id".
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("label_id")
val transformers = Array(labelIndexer)

// Random forest classifier over the topic-distribution vectors in "features".
val rf = new RandomForestClassifier()
  .setFeaturesCol("features")
  .setLabelCol("label_id")
  .setNumTrees(500)
  .setMaxDepth(20)
  .setMaxBins(32)
  .setImpurity("gini")
  .setFeatureSubsetStrategy("auto")
  .setSeed(12345) // fixed seed so that the fit is repeatable

// Chain label indexing and the classifier, then fit on the merged training data.
val pipeline = new Pipeline().setStages(transformers :+ rf)
val model = pipeline.fit(mergedTrainingData)
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler} import org.apache.spark.ml.Pipeline import org.apache.spark.ml.feature.LabeledPoint import org.apache.spark.ml.classification.RandomForestClassifier import org.apache.spark.ml.linalg.Vector transformers: Array[org.apache.spark.ml.feature.StringIndexer] = Array(strIdx_5c4c8beb7d03) rf: org.apache.spark.ml.classification.RandomForestClassifier = rfc_c92cc57e8554 model: org.apache.spark.ml.PipelineModel = pipeline_fd80c4a01f49
import org.apache.spark.mllib.evaluation.MulticlassMetrics

/** Applies a fitted pipeline to `df` and prints multiclass evaluation metrics.
  *
  * Prints the confusion matrix, the overall accuracy and one F-measure line
  * (`fm<label>`) for every class label that occurs in the evaluated data.
  *
  * @param model fitted pipeline whose output contains the columns `prediction` and `label_id`
  * @param df    data to evaluate; it is passed through `model.transform`
  */
def evaluateModel(model: org.apache.spark.ml.PipelineModel, df: org.apache.spark.sql.DataFrame): Unit = {
  val predictionsOnData = model.transform(df)

  // MulticlassMetrics expects an RDD of (prediction, label) pairs.
  val predictionAndLabelsRdd = predictionsOnData
    .select("prediction", "label_id")
    .as[(Double, Double)]
    .rdd
  val metricsMulti = new MulticlassMetrics(predictionAndLabelsRdd)

  println("Confusion matrix:")
  println(metricsMulti.confusionMatrix)
  println("Summary Statistics")
  println(s"Accuracy = ${metricsMulti.accuracy}")

  // Iterate over the labels actually present (sorted ascending) instead of
  // hard-coding classes 0..5, so the function works for any number of classes.
  metricsMulti.labels.foreach { label =>
    println(s"fm${label.toInt} = ${metricsMulti.fMeasure(label)}")
  }
}
import org.apache.spark.mllib.evaluation.MulticlassMetrics evaluateModel: (model: org.apache.spark.ml.PipelineModel, df: org.apache.spark.sql.DataFrame)Unit

Review the fit on the training dataset

evaluateModel(model, mergedTrainingData)
Confusion matrix: 12688.0 80.0 7.0 1.0 0.0 0.0 137.0 6891.0 0.0 0.0 0.0 0.0 53.0 18.0 1071.0 1.0 0.0 0.0 47.0 3.0 0.0 843.0 0.0 0.0 16.0 2.0 0.0 1.0 192.0 0.0 19.0 0.0 0.0 0.0 0.0 85.0 Summary Statistics Accuracy = 0.9826224328593997 fm0 = 0.9860118122474355 fm1 = 0.9828840393667095 fm2 = 0.9644304367402071 fm3 = 0.9695227142035653 fm4 = 0.9528535980148882 fm5 = 0.8994708994708994

Performance on the test dataset

evaluateModel(model, mergedTestData)
Confusion matrix: 5373.0 89.0 3.0 3.0 0.0 0.0 368.0 2522.0 0.0 1.0 0.0 0.0 478.0 15.0 20.0 3.0 0.0 0.0 193.0 8.0 0.0 192.0 0.0 0.0 89.0 5.0 0.0 1.0 1.0 0.0 14.0 0.0 0.0 0.0 0.0 17.0 Summary Statistics Accuracy = 0.8648217136774881 fm0 = 0.896770424768422 fm1 = 0.9121157323688969 fm2 = 0.07421150278293136 fm3 = 0.6475548060708264 fm4 = 0.020618556701030924 fm5 = 0.7083333333333333