ScaDaMaLe Course site and book

val k_mers_df_train = spark.read.parquet("dbfs:/FileStore/shared_uploads/caylak@kth.se/data_train").cache()
val k_mers_df_test = spark.read.parquet("dbfs:/FileStore/shared_uploads/caylak@kth.se/data_test").cache()
k_mers_df_train: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [genome: string, label: string ... 1 more field]
k_mers_df_test: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [genome: string, label: string ... 1 more field]
// Alternatively, load the non-overlapping k-mer version of the data
val k_mers_df_train = spark.read.parquet("dbfs:/FileStore/shared_uploads/caylak@kth.se/data_train_nonoverlapping").cache()
val k_mers_df_test = spark.read.parquet("dbfs:/FileStore/shared_uploads/caylak@kth.se/data_test_nonoverlapping").cache()
k_mers_df_train: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [genome: string, label: string ... 1 more field]
k_mers_df_test: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [genome: string, label: string ... 1 more field]
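Before tokenizing, it can help to confirm that the loaded DataFrames have the expected columns (genome, label, id) and to note the row counts. A minimal sanity-check sketch, assuming the DataFrames loaded above:

// Sketch: inspect schema and row counts of the loaded data
k_mers_df_train.printSchema()
k_mers_df_train.show(5, truncate = 60)
println(s"train rows: ${k_mers_df_train.count()}, test rows: ${k_mers_df_test.count()}")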

Format data

Generate word count vectors

import org.apache.spark.ml.feature.RegexTokenizer

// Set params for RegexTokenizer
val tokenizer = new RegexTokenizer()
  .setPattern("[\\W_]+")   // split on runs of non-word characters (here, the whitespace between k-mers)
  .setInputCol("genome")   // name of the input column
  .setOutputCol("tokens")  // name of the output column

// Tokenize the genome strings
val tokenized_df_train = tokenizer.transform(k_mers_df_train)
val tokenized_df_test = tokenizer.transform(k_mers_df_test)
import org.apache.spark.ml.feature.RegexTokenizer
tokenizer: org.apache.spark.ml.feature.RegexTokenizer = RegexTokenizer: uid=regexTok_5df744efa843, minTokenLength=1, gaps=true, pattern=[\W_]+, toLowercase=true
tokenized_df_train: org.apache.spark.sql.DataFrame = [genome: string, label: string ... 2 more fields]
tokenized_df_test: org.apache.spark.sql.DataFrame = [genome: string, label: string ... 2 more fields]
display(tokenized_df_train.select("tokens"))
import org.apache.spark.ml.feature.CountVectorizer

val vectorizer = new CountVectorizer()
  .setInputCol("tokens")
  .setOutputCol("features")
  .setMinDF(10)            // keep only k-mers that occur in at least 10 genomes
  .fit(tokenized_df_train)
import org.apache.spark.ml.feature.CountVectorizer
vectorizer: org.apache.spark.ml.feature.CountVectorizerModel = CountVectorizerModel: uid=cntVec_9226a16a835f, vocabularySize=241
val vocabList = vectorizer.vocabulary
vocabList: Array[String] = Array(ttt, tgt, aaa, tta, aca, ttg, taa, att, aat, ctt, caa, tga, gtt, atg, act, aga, tat, tac, aac, tgg, tgc, aag, tca, cta, ttc, tct, gtg, agt, gaa, cat, gct, ctg, cac, gta, ata, tag, gat, ggt, cag, acc, gca, cca, atc, agg, gac, cct, agc, gag, gga, ctc, gtc, ggc, tcc, gcc, acg, cgt, ggg, ccc, tcg, cgc, cga, gcg, cgg, ccg, nnn, nna, naa, ntt, tnn, cnn, gnn, ann, nnt, nng, nnc, agn, ttn, aan, acn, tan, nat, tcn, ngt, nct, can, gtn, ctn, nta, atn, tgn, ana, nca, ggn, nga, tna, nac, ntg, gcn, gan, tgk, ngc, ccn, ncc, ngg, tnt, ntc, nag, agk, yta, cnt, ktt, aya, gkt, kta, gnt, nan, ytt, ktg, gkc, tty, ayt, tay, yaa, acy, gsc, aay, tgy, ggk, ant, tyt, yac, tya, yat, anc, ang, cay, tkt, cak, cna, cng, rcc, akt, ggw, aty, cgn, gyt, tng, acw, aak, cyt, ytg, raa, yca, ntn, gay, gna, kat, kct, gnc, ngn, cty, tkg, tnc, yag, ayc, kca, aka, tyg, gka, ygt, cnc, cya, ttk, ayg, tka, gng, maa, tth, yga, ncn, ama, aar, kag, ytc, gtk, cch, ncg, kaa, ctk, gty, yct, ara, rtg, ckt, tar, gya, kac, tgr, crc, ccy, tkc, tak, akg, tyc, grt, gcy, trt, ggr, ygg, gak, gcr, kgt, ygc, cgk, wga, wtc, tma, tck, atr, waa, vcc, cwt, tcy, tgs, rgc, rac, agy, kgc, gam, haa, agr, rgt, tha, cyc, atk, gtr, tra, nam, gkg, gwg, cra)
vocabList.size
res17: Int = 241
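The vocabulary holds 241 k-mers rather than the 64 canonical 3-mers over {a, c, g, t}, most likely because the genomes contain IUPAC ambiguity codes (n, y, k, r, w, s, m, h, v appear in the list above). A quick check, sketched against the vocabList just computed:

// Sketch: count vocabulary k-mers containing characters other than a/c/g/t
val nonCanonical = vocabList.count(kmer => kmer.exists(c => !"acgt".contains(c)))
println(s"$nonCanonical of ${vocabList.size} k-mers contain ambiguity codes")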
// Create vectors of token counts
val countVectors_train = vectorizer.transform(tokenized_df_train).select("id", "features")
val countVectors_test = vectorizer.transform(tokenized_df_test).select("id", "features")
countVectors_train: org.apache.spark.sql.DataFrame = [id: bigint, features: vector]
countVectors_test: org.apache.spark.sql.DataFrame = [id: bigint, features: vector]
import org.apache.spark.sql.Row
import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.mllib.{linalg => mllib}
import org.apache.spark.ml.{linalg => ml}

// Convert the ML count vectors to MLlib vectors
val lda_countVector_train = countVectors_train.map { case Row(id: Long, countVector: MLVector) => (id, mllib.Vectors.fromML(countVector)) }.cache()
val lda_countVector_test = countVectors_test.map { case Row(id: Long, countVector: MLVector) => (id, mllib.Vectors.fromML(countVector)) }.cache()
import org.apache.spark.ml.linalg.{Vector=>MLVector}
import org.apache.spark.mllib.{linalg=>mllib}
import org.apache.spark.ml.{linalg=>ml}
lda_countVector_train: org.apache.spark.sql.Dataset[(Long, org.apache.spark.mllib.linalg.Vector)] = [_1: bigint, _2: vector]
lda_countVector_test: org.apache.spark.sql.Dataset[(Long, org.apache.spark.mllib.linalg.Vector)] = [_1: bigint, _2: vector]
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import org.apache.spark.ml.linalg.{Vectors => NewVectors}

// Convert the MLlib vectors back to ML vectors for the DataFrame-based API
val lda_countVector_train_1 = lda_countVector_train.map { case (a, b) => (a, b.asML) }
val lda_countVector_test_1 = lda_countVector_test.map { case (a, b) => (a, b.asML) }
import org.apache.spark.mllib.linalg.{Vectors=>OldVectors}
import org.apache.spark.ml.linalg.{Vectors=>NewVectors}
lda_countVector_train_1: org.apache.spark.sql.Dataset[(Long, org.apache.spark.ml.linalg.Vector)] = [_1: bigint, _2: vector]
lda_countVector_test_1: org.apache.spark.sql.Dataset[(Long, org.apache.spark.ml.linalg.Vector)] = [_1: bigint, _2: vector]
val trainDF = lda_countVector_train_1.toDF()
val testDF = lda_countVector_test_1.toDF()
trainDF: org.apache.spark.sql.DataFrame = [_1: bigint, _2: vector]
testDF: org.apache.spark.sql.DataFrame = [_1: bigint, _2: vector]
// Disable broadcast joins before joining the feature vectors back to the labels
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val mergedTrainingData = trainDF.join(k_mers_df_train, trainDF("_1") === k_mers_df_train("id"), "inner")
  .withColumnRenamed("_2", "features")
  .drop("_1")
val mergedTestData = testDF.join(k_mers_df_test, testDF("_1") === k_mers_df_test("id"), "inner")
  .withColumnRenamed("_2", "features")
  .drop("_1")
mergedTrainingData: org.apache.spark.sql.DataFrame = [features: vector, genome: string ... 2 more fields]
mergedTestData: org.apache.spark.sql.DataFrame = [features: vector, genome: string ... 2 more fields]
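Since the join key is the id column, a quick check that the inner join did not drop any rows may be worthwhile; a sketch, assuming the DataFrames defined above:

// Sketch: compare row counts before and after the join
println(s"train: ${k_mers_df_train.count()} -> ${mergedTrainingData.count()}")
println(s"test:  ${k_mers_df_test.count()} -> ${mergedTestData.count()}")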

Classification

The k-mer count vectors are used as features for a random forest classifier.

import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.linalg.Vector

// Encode the string labels as numeric indices
val transformers = Array(new StringIndexer().setInputCol("label").setOutputCol("label_id"))

// Train a RandomForest model
val rf = new RandomForestClassifier()
  .setLabelCol("label_id")
  .setFeaturesCol("features")
  .setNumTrees(500)
  .setFeatureSubsetStrategy("auto")
  .setImpurity("gini")
  .setMaxDepth(20)
  .setMaxBins(32)
  .setSeed(12345)

val model = new Pipeline().setStages(transformers :+ rf).fit(mergedTrainingData)
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.linalg.Vector
transformers: Array[org.apache.spark.ml.feature.StringIndexer] = Array(strIdx_e8cf65204547)
rf: org.apache.spark.ml.classification.RandomForestClassifier = rfc_0a45a46b7366
model: org.apache.spark.ml.PipelineModel = pipeline_14ad91398432
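The fitted forest also exposes per-feature importances, which can be mapped back to vocabList to see which k-mers drive the classification. A minimal sketch, assuming the pipeline layout above (StringIndexer followed by the RandomForestClassifier as the last stage):

import org.apache.spark.ml.classification.RandomForestClassificationModel

// Sketch: map random-forest feature importances back to k-mers
val rfModel = model.stages.last.asInstanceOf[RandomForestClassificationModel]
val topKmers = rfModel.featureImportances.toArray.zip(vocabList)
  .sortBy(-_._1)
  .take(10)
topKmers.foreach { case (importance, kmer) => println(f"$kmer%s: $importance%.4f") }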
import org.apache.spark.mllib.evaluation.MulticlassMetrics

def evaluateModel(model: org.apache.spark.ml.PipelineModel, df: org.apache.spark.sql.DataFrame): Unit = {
  val predictionsOnData = model.transform(df)
  val predictionAndLabelsRdd = predictionsOnData.select("prediction", "label_id").as[(Double, Double)].rdd

  val metricsMulti = new MulticlassMetrics(predictionAndLabelsRdd)

  val accuracy = metricsMulti.accuracy
  val fm0 = metricsMulti.fMeasure(0)
  val fm1 = metricsMulti.fMeasure(1)
  val fm2 = metricsMulti.fMeasure(2)
  val fm3 = metricsMulti.fMeasure(3)
  val fm4 = metricsMulti.fMeasure(4)
  val fm5 = metricsMulti.fMeasure(5)

  println("Confusion matrix:")
  println(metricsMulti.confusionMatrix)

  println("Summary Statistics")
  println(s"Accuracy = $accuracy")
  println(s"fm0 = $fm0")
  println(s"fm1 = $fm1")
  println(s"fm2 = $fm2")
  println(s"fm3 = $fm3")
  println(s"fm4 = $fm4")
  println(s"fm5 = $fm5")
}
evaluateModel(model, mergedTrainingData)
Confusion matrix:
12776.0  0.0     0.0     0.0    0.0    0.0
70.0     6955.0  3.0     0.0    0.0    0.0
139.0    1.0     1002.0  1.0    0.0    0.0
133.0    0.0     4.0     756.0  0.0    0.0
36.0     0.0     0.0     0.0    175.0  0.0
33.0     0.0     0.0     0.0    0.0    71.0
Summary Statistics
Accuracy = 0.981042654028436
fm0 = 0.9841697800716405
fm1 = 0.99470823798627
fm2 = 0.9312267657992566
fm3 = 0.9163636363636364
fm4 = 0.9067357512953368
fm5 = 0.8114285714285714
evaluateModel(model, mergedTestData)
Confusion matrix:
5450.0  8.0     6.0    4.0    0.0   0.0
140.0   2743.0  8.0    0.0    0.0   0.0
189.0   7.0     314.0  6.0    0.0   0.0
116.0   1.0     7.0    269.0  0.0   0.0
44.0    2.0     1.0    3.0    46.0  0.0
9.0     0.0     0.0    0.0    0.0   22.0
Summary Statistics
Accuracy = 0.9413517828632251
fm0 = 0.9548002803083393
fm1 = 0.9706298655343242
fm2 = 0.7370892018779341
fm3 = 0.7970370370370371
fm4 = 0.647887323943662
fm5 = 0.8301886792452831
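Rather than hard-coding fm0 through fm5, MulticlassMetrics can also report per-label precision, recall, and F-measure generically over whatever labels are present. A sketch of an alternative evaluation loop, assuming the same prediction/label pairing used in evaluateModel:

// Sketch: per-label metrics without hard-coding the number of classes
val predictionsOnTest = model.transform(mergedTestData)
val predAndLabels = predictionsOnTest.select("prediction", "label_id").as[(Double, Double)].rdd
val metrics = new MulticlassMetrics(predAndLabels)
metrics.labels.foreach { l =>
  println(f"label $l%.0f  precision=${metrics.precision(l)}%.4f  recall=${metrics.recall(l)}%.4f  f1=${metrics.fMeasure(l)}%.4f")
}
println(s"weighted F1 = ${metrics.weightedFMeasure}")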