ScaDaMaLe Course site and book

val k_mers_df_train = spark.read.parquet("dbfs:/FileStore/shared_uploads/caylak@kth.se/data_train_nonoverlapping").cache()
val k_mers_df_test = spark.read.parquet("dbfs:/FileStore/shared_uploads/caylak@kth.se/data_test_nonoverlapping").cache()
k_mers_df_train: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [genome: string, label: string ... 1 more field]
k_mers_df_test: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [genome: string, label: string ... 1 more field]

This part is adapted from the LDA course tutorial '034LDA20NewsGroupsSmall'.

import org.apache.spark.ml.feature.RegexTokenizer

// Set params for RegexTokenizer
val tokenizer = new RegexTokenizer()
  .setPattern("[\\W_]+") // split on non-word characters (here, the whitespace between k-mers)
  .setInputCol("genome") // name of the input column
  .setOutputCol("tokens") // name of the output column

// Tokenize train and test documents
val tokenized_df_train = tokenizer.transform(k_mers_df_train)
val tokenized_df_test = tokenizer.transform(k_mers_df_test)
import org.apache.spark.ml.feature.RegexTokenizer
tokenizer: org.apache.spark.ml.feature.RegexTokenizer = RegexTokenizer: uid=regexTok_b3a99fc4c607, minTokenLength=1, gaps=true, pattern=[\W_]+, toLowercase=true
tokenized_df_train: org.apache.spark.sql.DataFrame = [genome: string, label: string ... 2 more fields]
tokenized_df_test: org.apache.spark.sql.DataFrame = [genome: string, label: string ... 2 more fields]
display(tokenized_df_train.select("tokens"))
display(tokenized_df_test.select("tokens"))

Since there are 64 = 4 × 4 × 4 possible words (3-mers) over the four-letter genome alphabet (A, T, G, C), we initially planned to use a fixed vocabulary.
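
As a quick sanity check on that count, all 4^3 = 64 possible 3-mers can be enumerated with a plain cartesian product (a minimal sketch, not part of the original notebook; the names bases and kmers3 are illustrative):

// Enumerate every 3-mer over the four-letter alphabet and confirm there are 4^3 = 64 of them
val bases = Seq("a", "t", "g", "c")
val kmers3 = for (a <- bases; b <- bases; c <- bases) yield a + b + c
assert(kmers3.size == 64)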

import org.apache.spark.ml.feature.CountVectorizerModel

// Create a dictionary array from all possible k-mers
// https://stackoverflow.com/questions/38406959/creating-all-permutations-of-a-list-with-a-limited-range-in-scala
val k = 3
val fixed_vocab = List.fill(k)(List("a","t","g","c")).flatten.combinations(k).flatMap(_.permutations).toArray.map(_.mkString(""))

val fixed_vectorizer = new CountVectorizerModel(fixed_vocab)
  .setInputCol("tokens")
  .setOutputCol("features")
import org.apache.spark.ml.feature.CountVectorizerModel
k: Int = 3
fixed_vocab: Array[String] = Array(aaa, aat, ata, taa, aag, aga, gaa, aac, aca, caa, att, tat, tta, atg, agt, tag, tga, gat, gta, atc, act, tac, tca, cat, cta, agg, gag, gga, agc, acg, gac, gca, cag, cga, acc, cac, cca, ttt, ttg, tgt, gtt, ttc, tct, ctt, tgg, gtg, ggt, tgc, tcg, gtc, gct, ctg, cgt, tcc, ctc, cct, ggg, ggc, gcg, cgg, gcc, cgc, ccg, ccc)
fixed_vectorizer: org.apache.spark.ml.feature.CountVectorizerModel = CountVectorizerModel: uid=cntVecModel_4bd2f56ddf2e, vocabularySize=64

However, we observed some unexpected rare k-mers, such as aay and ktg, in the genome sequences. If such a k-mer is really rare (appearing in fewer than 10 documents), we eliminate it. But if it is more common, we keep it, with the intuition that this sequencing error (we could not find documentation indicating what these extra letters refer to, so we assumed they are errors) might indicate some pattern for that sample. This approach gave us better topic diversity and results.

import org.apache.spark.ml.feature.CountVectorizer

// Create a dictionary of k-mers
val vectorizer = new CountVectorizer()
  .setInputCol("tokens")
  .setOutputCol("features")
  .setMinDF(10) // a term must appear in at least 10 documents to be included in the vocabulary
  .fit(tokenized_df_train) // create the vocabulary based on the train data
import org.apache.spark.ml.feature.CountVectorizer
vectorizer: org.apache.spark.ml.feature.CountVectorizerModel = CountVectorizerModel: uid=cntVec_cf83465b3abd, vocabularySize=241
// Vocabulary of k-mers, which contains some weird nucleotides
val vocabList = vectorizer.vocabulary
vocabList: Array[String] = Array(ttt, tgt, aaa, tta, aca, ttg, taa, att, aat, ctt, caa, tga, gtt, atg, act, aga, tat, tac, aac, tgg, tgc, aag, tca, cta, ttc, tct, gtg, agt, gaa, cat, gct, ctg, cac, gta, ata, tag, gat, ggt, cag, acc, gca, cca, atc, agg, gac, cct, agc, gag, gga, ctc, gtc, ggc, tcc, gcc, acg, cgt, ggg, ccc, tcg, cgc, cga, gcg, cgg, ccg, nnn, nna, naa, ntt, tnn, cnn, gnn, ann, nnt, nng, nnc, agn, ttn, aan, acn, tan, nat, tcn, ngt, nct, can, gtn, ctn, nta, atn, ana, tgn, nca, ggn, nga, tna, nac, ntg, gcn, gan, tgk, ngc, ccn, ncc, ngg, tnt, ntc, nag, agk, yta, cnt, ktt, aya, gkt, kta, gnt, nan, ytt, ktg, gkc, tty, ayt, tay, yaa, acy, gsc, aay, tgy, ggk, ant, tyt, yac, yat, tya, ang, anc, cay, tkt, cng, cak, rcc, cna, cgn, aty, akt, ggw, gyt, tng, raa, cyt, acw, ytg, aak, yca, ntn, gna, gay, cty, kat, kct, tkg, gnc, ngn, yag, tnc, kca, ayc, tyg, gka, ygt, aka, cnc, cya, ayg, ttk, gng, tth, maa, ncn, yga, tka, ama, aar, ytc, gtk, kag, cch, ncg, ctk, kaa, gty, yct, ara, rtg, ckt, tar, gya, tkc, tak, tgr, ccy, akg, kac, crc, grt, ggr, trt, gcy, tyc, ygg, gak, wga, ygc, cgk, gcr, kgt, wtc, tck, cwt, waa, tcy, vcc, tma, atr, agy, rgc, rac, tgs, kgc, gam, atk, cyc, haa, agr, tha, rgt, gwg, tra, cra, gtr, gkg, nam)
vocabList.size
res4: Int = 241
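
To see how much of the learned vocabulary consists of such non-standard k-mers, a short check like the following can be run (an illustrative sketch, not from the original notebook):

// Count vocabulary terms containing at least one character outside a/t/g/c (e.g. n, y, k)
val nonStandardCount = vocabList.count(_.exists(ch => !"atgc".contains(ch)))
println(s"$nonStandardCount of ${vocabList.size} k-mers contain non-ACGT characters")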
// Create vectors of token counts
val countVectors_train = vectorizer.transform(tokenized_df_train).select("id", "features")
val countVectors_test = vectorizer.transform(tokenized_df_test).select("id", "features")
countVectors_train: org.apache.spark.sql.DataFrame = [id: bigint, features: vector]
countVectors_test: org.apache.spark.sql.DataFrame = [id: bigint, features: vector]
tokenized_df_train.take(1)
countVectors_train.take(5)
countVectors_test.take(5)

Fix the incompatibility between mllib Vector and ml Vector, which causes a conflict when the LDA topic distributions are given as input to the RandomForestClassifier.

import org.apache.spark.sql.Row
import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.mllib.{linalg => mllib}
import org.apache.spark.ml.{linalg => ml}

// Convert each sample from ml to mllib vectors (because this causes problems in the classifier step)
val lda_countVector_train = countVectors_train.map { case Row(id: Long, countVector: MLVector) => (id, mllib.Vectors.fromML(countVector)) }.cache()
val lda_countVector_test = countVectors_test.map { case Row(id: Long, countVector: MLVector) => (id, mllib.Vectors.fromML(countVector)) }.cache()
import org.apache.spark.ml.linalg.{Vector=>MLVector}
import org.apache.spark.mllib.{linalg=>mllib}
import org.apache.spark.ml.{linalg=>ml}
lda_countVector_train: org.apache.spark.sql.Dataset[(Long, org.apache.spark.mllib.linalg.Vector)] = [_1: bigint, _2: vector]
lda_countVector_test: org.apache.spark.sql.Dataset[(Long, org.apache.spark.mllib.linalg.Vector)] = [_1: bigint, _2: vector]
// Format: Array(id, (VocabSize, Array(indexedTokens), Array(Token Frequency)))
lda_countVector_test.take(1)
res10: Array[(Long, org.apache.spark.mllib.linalg.Vector)] = Array((13340,(241,[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63],[334.0,211.0,327.0,282.0,288.0,277.0,95.0,278.0,315.0,259.0,245.0,70.0,310.0,311.0,291.0,129.0,213.0,181.0,189.0,99.0,87.0,221.0,189.0,183.0,136.0,217.0,181.0,152.0,251.0,136.0,265.0,150.0,108.0,174.0,183.0,89.0,213.0,245.0,163.0,99.0,158.0,138.0,107.0,78.0,142.0,146.0,53.0,116.0,98.0,93.0,101.0,90.0,56.0,73.0,42.0,67.0,34.0,31.0,39.0,29.0,28.0,37.0,20.0,19.0])))
// The number of topics
val num_topics = 20
num_topics: Int = 20
import org.apache.spark.mllib.clustering.{LDA, EMLDAOptimizer, DistributedLDAModel}

val lda = new LDA()
  .setOptimizer(new EMLDAOptimizer())
  .setK(num_topics)
  .setMaxIterations(20000)
  .setDocConcentration(-1) // use default values
  .setTopicConcentration(-1) // use default values
import org.apache.spark.mllib.clustering.{LDA, EMLDAOptimizer, DistributedLDAModel} lda: org.apache.spark.mllib.clustering.LDA = org.apache.spark.mllib.clustering.LDA@53a22046
// Run the LDA based on the model described
val lda_countVector_train_mllib = lda_countVector_train.rdd
val lda_countVector_test_mllib = lda_countVector_test.rdd
val ldaModel = lda.run(lda_countVector_train_mllib)
lda_countVector_train_mllib: org.apache.spark.rdd.RDD[(Long, org.apache.spark.mllib.linalg.Vector)] = MapPartitionsRDD[399] at rdd at command-685894176420037:2
lda_countVector_test_mllib: org.apache.spark.rdd.RDD[(Long, org.apache.spark.mllib.linalg.Vector)] = MapPartitionsRDD[404] at rdd at command-685894176420037:3
ldaModel: org.apache.spark.mllib.clustering.LDAModel = org.apache.spark.mllib.clustering.DistributedLDAModel@4e5ac90e
// Cast to distributed LDA model (which is possible through EMLDAOptimizer in the model) so we can get topic distributions
val distLDAModel = ldaModel.asInstanceOf[DistributedLDAModel]
distLDAModel: org.apache.spark.mllib.clustering.DistributedLDAModel = org.apache.spark.mllib.clustering.DistributedLDAModel@4e5ac90e
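
As an optional diagnostic (a sketch, not in the original notebook), the EM-trained DistributedLDAModel exposes the log likelihood of the training corpus and the log prior of the estimated parameters; these can be used to compare runs with different numbers of topics or iterations.

// Higher (less negative) values generally indicate a better fit to the training data
println(s"Training log-likelihood: ${distLDAModel.logLikelihood}")
println(s"Log prior of the estimated distributions: ${distLDAModel.logPrior}")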
val topicIndices = distLDAModel.describeTopics(maxTermsPerTopic = 10)
// https://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.mllib.clustering.DistributedLDAModel
// Get the topic distributions for each train document, which we will use as features in the classification step
val topicDistributions_train = distLDAModel.topicDistributions.cache()
topicDistributions_train: org.apache.spark.rdd.RDD[(Long, org.apache.spark.mllib.linalg.Vector)] = MapPartitionsRDD[828598] at map at LDAModel.scala:768
lda_countVector_test_mllib.take(1)
res11: Array[(Long, org.apache.spark.mllib.linalg.Vector)] = Array((13340,(241,[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63],[334.0,211.0,327.0,282.0,288.0,277.0,95.0,278.0,315.0,259.0,245.0,70.0,310.0,311.0,291.0,129.0,213.0,181.0,189.0,99.0,87.0,221.0,189.0,183.0,136.0,217.0,181.0,152.0,251.0,136.0,265.0,150.0,108.0,174.0,183.0,89.0,213.0,245.0,163.0,99.0,158.0,138.0,107.0,78.0,142.0,146.0,53.0,116.0,98.0,93.0,101.0,90.0,56.0,73.0,42.0,67.0,34.0,31.0,39.0,29.0,28.0,37.0,20.0,19.0])))
// Get the topic distributions for each test document, which we will use as features in the classification step
val topicDistributions_test = distLDAModel.toLocal.topicDistributions(lda_countVector_test_mllib).cache()
topicDistributions_test: org.apache.spark.rdd.RDD[(Long, org.apache.spark.mllib.linalg.Vector)] = MapPartitionsRDD[828874] at map at LDAModel.scala:373
assert (topicDistributions_train.take(1)(0)._2.size == num_topics)
topicDistributions_train.take(1)
res13: Array[(Long, org.apache.spark.mllib.linalg.Vector)] = Array((31198,[0.050970660127433086,0.03854914539182694,0.04233208760127245,0.05520192993035042,0.04529229810049971,0.044746575004622195,0.04738680077580458,0.04917176116296172,0.028216072817023714,0.06235014298552372,0.07282634237929085,0.05367477019497316,0.02834755979614659,0.007634148695007966,0.0747900052332216,0.07432157320344387,0.05619372014685311,0.05917612984447815,0.059889692006826215,0.048928584602440005]))
topicDistributions_test.take(1)
res14: Array[(Long, org.apache.spark.mllib.linalg.Vector)] = Array((13340,[0.05125119795748651,0.05590356021965471,0.057337790514388406,0.053942804029345516,0.03184697426952284,0.054473799789446706,0.055602712254395316,0.07875351104584705,0.05294681251741617,0.03326797507323466,0.04996448701691076,0.06303737755619679,0.07611049117795103,0.009242706964001621,0.017554379171811085,0.026326662949499827,0.040104268572631795,0.09709248658013904,0.03851877943786023,0.05672122290225994]))
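
Each of these vectors should be a proper probability distribution over the num_topics topics; a quick check (illustrative, not part of the original notebook) is that its entries sum to approximately one.

// The topic mixture of a document is a probability vector, so its components should sum to ~1
val firstTestDist = topicDistributions_test.take(1)(0)._2.toArray
assert(math.abs(firstTestDist.sum - 1.0) < 1e-6)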
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import org.apache.spark.ml.linalg.{Vectors => NewVectors}

// Convert each sample from mllib to ml vectors (because this causes problems in the classifier step)
val n_topicDistributions_train = topicDistributions_train.map({ case (a, b) => (a, b.asML) })
val n_topicDistributions_test = topicDistributions_test.map({ case (a, b) => (a, b.asML) })
import org.apache.spark.mllib.linalg.{Vectors=>OldVectors}
import org.apache.spark.ml.linalg.{Vectors=>NewVectors}
n_topicDistributions_train: org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.Vector)] = MapPartitionsRDD[828876] at map at command-685894176420046:4
n_topicDistributions_test: org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.Vector)] = MapPartitionsRDD[828877] at map at command-685894176420046:5
// Save the topic distributions for train and test with partitioning for the next notebook
dbutils.fs.rm("/FileStore/shared_uploads/caylak@kth.se/topic_dist_train_t20_i20k_no_cv", recurse=true) // remove existing folder
n_topicDistributions_train.toDF.write.parquet("dbfs:/FileStore/shared_uploads/caylak@kth.se/topic_dist_train_t20_i20k_no_cv")
dbutils.fs.rm("/FileStore/shared_uploads/caylak@kth.se/topic_dist_test_t20_i20k_no_cv", recurse=true) // remove existing folder
n_topicDistributions_test.toDF.write.parquet("dbfs:/FileStore/shared_uploads/caylak@kth.se/topic_dist_test_t20_i20k_no_cv")
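
For reference, the next notebook can load these persisted features back with a plain parquet read (a sketch using the same paths as above; since the tuples are written via toDF without column names, the resulting DataFrames have the document id and topic-distribution vector as columns _1 and _2):

// Read the saved LDA topic-distribution features back as DataFrames
val topicDistTrainDF = spark.read.parquet("dbfs:/FileStore/shared_uploads/caylak@kth.se/topic_dist_train_t20_i20k_no_cv")
val topicDistTestDF = spark.read.parquet("dbfs:/FileStore/shared_uploads/caylak@kth.se/topic_dist_test_t20_i20k_no_cv")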
// Get the top word distributions for each topic
val topics = topicIndices.map { case (terms, termWeights) =>
  terms.map(vocabList(_)).zip(termWeights)
}

println(s"$num_topics topics:")
topics.zipWithIndex.foreach { case (topic, i) =>
  println(s"TOPIC $i")
  topic.foreach { case (term, weight) => println(s"$term\t$weight") }
  println(s"==========")
}
// Zip topic terms with topic IDs
val termArray = topics.zipWithIndex
// Transform data into the form (term, probability, topicId)
val termRDD = sc.parallelize(termArray)
val termRDD2 = termRDD.flatMap( (x: (Array[(String, Double)], Int)) => {
  val arrayOfTuple = x._1
  val topicId = x._2
  arrayOfTuple.map(el => (el._1, el._2, topicId))
})
termRDD: org.apache.spark.rdd.RDD[(Array[(String, Double)], Int)] = ParallelCollectionRDD[829330] at parallelize at command-685894176420051:2
termRDD2: org.apache.spark.rdd.RDD[(String, Double, Int)] = MapPartitionsRDD[829331] at flatMap at command-685894176420051:3
// Create a DF with proper column names
val termDF = termRDD2.toDF.withColumnRenamed("_1", "term").withColumnRenamed("_2", "probability").withColumnRenamed("_3", "topicId")
termDF: org.apache.spark.sql.DataFrame = [term: string, probability: double ... 1 more field]
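
With term, probability, and topicId columns in place, termDF can also be queried directly, for example to list the highest-probability terms of a single topic (an illustrative query, not part of the original notebook):

// Show the terms of topic 0 ordered by their within-topic probability
import org.apache.spark.sql.functions.col
display(termDF.filter(col("topicId") === 0).orderBy(col("probability").desc))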
// Create JSON data
val rawJson = termDF.toJSON.collect().mkString(",\n")
displayHTML(s"""
<!DOCTYPE html>
<meta charset="utf-8">
<style>
circle {
  fill: rgb(31, 119, 180);
  fill-opacity: 0.5;
  stroke: rgb(31, 119, 180);
  stroke-width: 1px;
}
.leaf circle {
  fill: #ff7f0e;
  fill-opacity: 1;
}
text {
  font: 14px sans-serif;
}
</style>
<body>
<script src="https://cdnjs.cloudflare.com/ajax/libs/d3/3.5.5/d3.min.js"></script>
<script>
var json = {
  "name": "data",
  "children": [
    {
      "name": "topics",
      "children": [
        ${rawJson}
      ]
    }
  ]
};

var r = 1000,
    format = d3.format(",d"),
    fill = d3.scale.category20c();

var bubble = d3.layout.pack()
  .sort(null)
  .size([r, r])
  .padding(1.5);

var vis = d3.select("body").append("svg")
  .attr("width", r)
  .attr("height", r)
  .attr("class", "bubble");

var node = vis.selectAll("g.node")
  .data(bubble.nodes(classes(json))
  .filter(function(d) { return !d.children; }))
  .enter().append("g")
  .attr("class", "node")
  .attr("transform", function(d) { return "translate(" + d.x + "," + d.y + ")"; })

color = d3.scale.category20();

node.append("title")
  .text(function(d) { return d.className + ": " + format(d.value); });

node.append("circle")
  .attr("r", function(d) { return d.r; })
  .style("fill", function(d) { return color(d.topicName); });

var text = node.append("text")
  .attr("text-anchor", "middle")
  .attr("dy", ".3em")
  .text(function(d) { return d.className.substring(0, d.r / 3)});

text.append("tspan")
  .attr("dy", "1.2em")
  .attr("x", 0)
  .text(function(d) { return Math.ceil(d.value * 10000) / 10000; });

// Returns a flattened hierarchy containing all leaf nodes under the root.
function classes(root) {
  var classes = [];
  function recurse(term, node) {
    if (node.children) node.children.forEach(function(child) { recurse(node.term, child); });
    else classes.push({topicName: node.topicId, className: node.term, value: node.probability});
  }
  recurse(null, root);
  return {children: classes};
}
</script>
""")