// Load the k-mer data and the pre-computed LDA topic distributions
// data format: org.apache.spark.sql.DataFrame = [genome:string, label:string, id:long]
val k_mers_df_train = spark.read.parquet("dbfs:/FileStore/shared_uploads/caylak@kth.se/data_train_nonoverlapping")
val k_mers_df_test = spark.read.parquet("dbfs:/FileStore/shared_uploads/caylak@kth.se/data_test_nonoverlapping")
// data format: org.apache.spark.sql.DataFrame = [_1: bigint, _2: vector]; the vector column holds the topic distribution for each document
val trainingData = spark.read.parquet("dbfs:/FileStore/shared_uploads/caylak@kth.se/topic_dist_train_t20_i20k_no_cv")
val testData = spark.read.parquet("dbfs:/FileStore/shared_uploads/caylak@kth.se/topic_dist_test_t20_i20k_no_cv")
k_mers_df_train: org.apache.spark.sql.DataFrame = [genome: string, label: string ... 1 more field]
k_mers_df_test: org.apache.spark.sql.DataFrame = [genome: string, label: string ... 1 more field]
trainingData: org.apache.spark.sql.DataFrame = [_1: bigint, _2: vector]
testData: org.apache.spark.sql.DataFrame = [_1: bigint, _2: vector]
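For context, the topic-distribution files loaded above come from an LDA model fit on the k-mer documents. A minimal sketch of how such distributions can be produced with spark.ml (k = 20 is inferred from the t20 file name; the column names, iteration count, and preprocessing here are assumptions, not the notebook's actual pipeline):
import org.apache.spark.sql.functions.split
import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.ml.clustering.LDA
// Split the space-separated k-mer strings into token arrays
val tokenized = k_mers_df_train.withColumn("kmers", split($"genome", " "))
// Build term-count vectors over the k-mer vocabulary
val counts = new CountVectorizer().setInputCol("kmers").setOutputCol("counts").fit(tokenized).transform(tokenized)
// Fit a 20-topic LDA model and keep one topic-mixture vector per document
val ldaModel = new LDA().setK(20).setMaxIter(100).setFeaturesCol("counts").fit(counts)
val topicDist = ldaModel.transform(counts).select($"id".as("_1"), $"topicDistribution".as("_2"))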
// Merge the two sources so each topic-distribution row gets its genome and label
// Disable automatic broadcast joins so the joins below run as shuffle (sort-merge) joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
val mergedTrainingData = trainingData
  .join(k_mers_df_train, trainingData("_1") === k_mers_df_train("id"), "inner")
  .withColumnRenamed("_2", "features")
  .drop("_1")
val mergedTestData = testData
  .join(k_mers_df_test, testData("_1") === k_mers_df_test("id"), "inner")
  .withColumnRenamed("_2", "features")
  .drop("_1")
mergedTrainingData: org.apache.spark.sql.DataFrame = [features: vector, genome: string ... 2 more fields]
mergedTestData: org.apache.spark.sql.DataFrame = [features: vector, genome: string ... 2 more fields]
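A quick sanity check that the inner join did not silently drop rows (this assumes the ids in the two sources align one-to-one):
// Row counts before and after the join should match if every id has a matching label
println(s"train: ${trainingData.count()} -> ${mergedTrainingData.count()}")
println(s"test:  ${testData.count()} -> ${mergedTestData.count()}")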
mergedTrainingData.show(5)
+--------------------+--------------------+-------+----+
| features| genome| label| id|
+--------------------+--------------------+-------+----+
|[0.05124403025197...|CTT GTA GAT CTG T...|oceania| 26|
|[0.04866065493877...|CTC TTG TAG ATC T...|oceania| 29|
|[0.04856023665158...|ACT TTC GAT CTC T...|oceania| 474|
|[0.05114514533282...|CTT GTA GAT CTG T...|oceania| 964|
|[0.04856846330546...|ACT TTC GAT CTC T...|oceania|1677|
+--------------------+--------------------+-------+----+
only showing top 5 rows
mergedTestData.show(5)
+--------------------+--------------------+-------+----+
| features| genome| label| id|
+--------------------+--------------------+-------+----+
|[0.04856691976914...|ACT TTC GAT CTC T...|oceania|2250|
|[0.04775129705622...|ACT TTC GAT CTC T...|oceania|3091|
|[0.04856692420144...|ACT TTC GAT CTC T...|oceania|7279|
|[0.04881282010425...|ACT TTC GAT CTC T...|oceania|8075|
|[0.05110516016513...|CTT GTA GAT CTG T...|oceania|9458|
+--------------------+--------------------+-------+----+
only showing top 5 rows
import org.apache.spark.sql.functions._
import org.apache.spark.ml._
// Split the feature vector into separate columns for summary statistics and plotting
val vecToArray = udf( (xs: linalg.Vector) => xs.toArray )
val dfArr = mergedTrainingData.withColumn("featuresArr" , vecToArray($"features") )
val elements = Array("f1", "f2", "f3", "f4", "f5", "f6","f7", "f8", "f9","f10")
val sqlExpr = elements.zipWithIndex.map{ case (alias, idx) => col("featuresArr").getItem(idx).as(alias) }
val df_feats = dfArr.select((col("label") +: sqlExpr) : _*)
import org.apache.spark.sql.functions._
import org.apache.spark.ml._
vecToArray: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$9301/1088827103@108f1ec1,ArrayType(DoubleType,false),List(Some(class[value[0]: vector])),None,true,true)
dfArr: org.apache.spark.sql.DataFrame = [features: vector, genome: string ... 3 more fields]
elements: Array[String] = Array(f1, f2, f3, f4, f5, f6, f7, f8, f9, f10)
sqlExpr: Array[org.apache.spark.sql.Column] = Array(featuresArr[0] AS `f1`, featuresArr[1] AS `f2`, featuresArr[2] AS `f3`, featuresArr[3] AS `f4`, featuresArr[4] AS `f5`, featuresArr[5] AS `f6`, featuresArr[6] AS `f7`, featuresArr[7] AS `f8`, featuresArr[8] AS `f9`, featuresArr[9] AS `f10`)
df_feats: org.apache.spark.sql.DataFrame = [label: string, f1: double ... 9 more fields]
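On Spark 3.0+ the hand-rolled UDF above can be replaced by the built-in vector_to_array converter, which avoids UDF serialization overhead; an equivalent sketch:
import org.apache.spark.ml.functions.vector_to_array
// Same result as the vecToArray UDF, using the built-in ML function
val dfArrBuiltin = mergedTrainingData.withColumn("featuresArr", vector_to_array($"features"))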
df_feats.describe().show()
+-------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|summary| label| f1| f2| f3| f4| f5| f6| f7| f8| f9| f10|
+-------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| count| 22155| 22155| 22155| 22155| 22155| 22155| 22155| 22155| 22155| 22155| 22155|
| mean| null| 0.04931437716125177| 0.05103219325119624| 0.05043777410343692| 0.04985720003755815| 0.05109828486193199|0.049969419011346175|0.049612622552246397| 0.05172608119915934| 0.05519918725594238|0.050902505322165705|
| stddev| null| 0.00376757883941906|0.012025788212709033|0.008575417672641356|0.007216341887817029| 0.017036551424972|0.006744772303600655|0.006141190111960429|0.015925101732795318|0.028248525286887288|0.011750518257958392|
| min| africa|7.597652484573448E-5|7.931226826328689E-5|7.472127009799751E-5|8.488001247712824E-5|5.737788781311747E-5|7.510235436716417E-5| 8.01478429209781E-5|1.102376409869290...|7.060888276954563E-5|5.730971677795215...|
| max|southamerica| 0.07096255125893133| 0.08483680076143339| 0.07401129608174387| 0.09129137902172121| 0.07569705042586143| 0.09892558758161725| 0.12033929830179531| 0.08811599095688616| 0.09753703174079206| 0.08152012057643862|
+-------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
import org.apache.spark.sql.functions.rand
// Plot a shuffled random sample (roughly half the rows, drawn with replacement) of the per-topic features
display(df_feats.sample(true, 0.5).orderBy(rand()))
import org.apache.spark.ml.feature.{StringIndexer,VectorAssembler}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.linalg.Vector
// Index the string region labels into numeric ids for the classifier
val transformers = Array(
  new StringIndexer().setInputCol("label").setOutputCol("label_id"))
// Train a RandomForest model.
val rf = new RandomForestClassifier()
.setLabelCol("label_id")
.setFeaturesCol("features")
.setNumTrees(500)
.setFeatureSubsetStrategy("auto")
.setImpurity("gini")
.setMaxDepth(20)
.setMaxBins(32)
.setSeed(12345)
val model = new Pipeline().setStages(transformers :+ rf).fit(mergedTrainingData)
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.linalg.Vector
transformers: Array[org.apache.spark.ml.feature.StringIndexer] = Array(strIdx_5c4c8beb7d03)
rf: org.apache.spark.ml.classification.RandomForestClassifier = rfc_c92cc57e8554
model: org.apache.spark.ml.PipelineModel = pipeline_fd80c4a01f49
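Refitting 500 trees of depth 20 is expensive, so the fitted pipeline is worth persisting. A sketch with the standard ML persistence API (the path is illustrative):
import org.apache.spark.ml.PipelineModel
// Save the fitted StringIndexer + RandomForest pipeline and reload it later
model.write.overwrite().save("dbfs:/FileStore/models/rf_topic_dist")  // hypothetical path
val reloaded = PipelineModel.load("dbfs:/FileStore/models/rf_topic_dist")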
import org.apache.spark.mllib.evaluation.MulticlassMetrics
def evaluateModel(model: org.apache.spark.ml.PipelineModel, df: org.apache.spark.sql.DataFrame): Unit = {
  // Score the dataset and pair each prediction with its indexed label
  val predictionsOnData = model.transform(df)
  val predictionAndLabelsRdd = predictionsOnData.select("prediction", "label_id").as[(Double, Double)].rdd
  val metricsMulti = new MulticlassMetrics(predictionAndLabelsRdd)
  println("Confusion matrix:")
  println(metricsMulti.confusionMatrix)
  println("Summary Statistics")
  println(s"Accuracy = ${metricsMulti.accuracy}")
  // Per-class F1 scores, one per label id assigned by the StringIndexer
  metricsMulti.labels.foreach { label =>
    println(s"fm${label.toInt} = ${metricsMulti.fMeasure(label)}")
  }
}
import org.apache.spark.mllib.evaluation.MulticlassMetrics
evaluateModel: (model: org.apache.spark.ml.PipelineModel, df: org.apache.spark.sql.DataFrame)Unit
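The headline numbers can also be computed with the DataFrame-based evaluator from spark.ml, which avoids the RDD conversion; a sketch for the test set:
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
// Accuracy and weighted F1 straight from the predictions DataFrame
val predictions = model.transform(mergedTestData)
val evaluator = new MulticlassClassificationEvaluator().setLabelCol("label_id").setPredictionCol("prediction")
println(s"accuracy    = ${evaluator.setMetricName("accuracy").evaluate(predictions)}")
println(s"weighted F1 = ${evaluator.setMetricName("f1").evaluate(predictions)}")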
Review fit on the training dataset
evaluateModel(model, mergedTrainingData)
Confusion matrix:
12688.0 80.0 7.0 1.0 0.0 0.0
137.0 6891.0 0.0 0.0 0.0 0.0
53.0 18.0 1071.0 1.0 0.0 0.0
47.0 3.0 0.0 843.0 0.0 0.0
16.0 2.0 0.0 1.0 192.0 0.0
19.0 0.0 0.0 0.0 0.0 85.0
Summary Statistics
Accuracy = 0.9826224328593997
fm0 = 0.9860118122474355
fm1 = 0.9828840393667095
fm2 = 0.9644304367402071
fm3 = 0.9695227142035653
fm4 = 0.9528535980148882
fm5 = 0.8994708994708994
Performance on the test dataset
evaluateModel(model, mergedTestData)
Confusion matrix:
5373.0 89.0 3.0 3.0 0.0 0.0
368.0 2522.0 0.0 1.0 0.0 0.0
478.0 15.0 20.0 3.0 0.0 0.0
193.0 8.0 0.0 192.0 0.0 0.0
89.0 5.0 0.0 1.0 1.0 0.0
14.0 0.0 0.0 0.0 0.0 17.0
Summary Statistics
Accuracy = 0.8648217136774881
fm0 = 0.896770424768422
fm1 = 0.9121157323688969
fm2 = 0.07421150278293136
fm3 = 0.6475548060708264
fm4 = 0.020618556701030924
fm5 = 0.7083333333333333
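The drop from 98.3% training accuracy to 86.5% test accuracy, together with the near-zero F1 for classes 2 and 4, indicates that the forest overfits the majority classes: most minority-class test samples land in the first column of the confusion matrix, i.e. they are predicted as class 0. A quick check of the class balance behind this (a sketch):
// Per-class sample counts expose the imbalance behind the weak minority-class F1 scores
mergedTrainingData.groupBy("label").count().orderBy($"count".desc).show()
mergedTestData.groupBy("label").count().orderBy($"count".desc).show()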