ScaDaMaLe Course site and book

// Paths to datasets of different regions.
val paths: List[String] = List(
  "dbfs:/FileStore/shared_uploads/hugower@kth.se/sequences_oceania.fasta",
  "dbfs:/FileStore/shared_uploads/hugower@kth.se/sequences_northamerica.fasta",
  "dbfs:/FileStore/shared_uploads/hugower@kth.se/sequences_southamerica.fasta",
  "dbfs:/FileStore/shared_uploads/hugower@kth.se/sequences_europe.fasta",
  "dbfs:/FileStore/shared_uploads/hugower@kth.se/sequences_africa.fasta",
  "dbfs:/FileStore/shared_uploads/hugower@kth.se/sequences_asia.fasta")
paths: List[String] = List(dbfs:/FileStore/shared_uploads/hugower@kth.se/sequences_oceania.fasta, dbfs:/FileStore/shared_uploads/hugower@kth.se/sequences_northamerica.fasta, dbfs:/FileStore/shared_uploads/hugower@kth.se/sequences_southamerica.fasta, dbfs:/FileStore/shared_uploads/hugower@kth.se/sequences_europe.fasta, dbfs:/FileStore/shared_uploads/hugower@kth.se/sequences_africa.fasta, dbfs:/FileStore/shared_uploads/hugower@kth.se/sequences_asia.fasta)
import scala.util.matching.Regex

// Regex pattern to extract the region name (label) from the full path name
// (must be changed accordingly if the path follows a different structure).
val pattern: Regex = "/[a-zA-Z]+_([a-zA-Z]+)\\.".r

def read_datasets(paths: List[String]): List[RDD[(String, String)]] = {
  if (paths.size < 1) {
    // no paths left: return an empty list of RDDs
    return List.fill(0)(sc.emptyRDD)
  } else {
    pattern.findFirstMatchIn(paths.head) match {
      // extract the label based on the pattern defined above
      case Some(x) => {
        val label: String = x.group(1) // create the label based on the path name
        // read the file at this path and prepend its labelled RDD to the list built from the remaining paths
        return (sc.textFile(paths.head).filter(_ != "").map(_.trim()).map(s => (s, label))) :: read_datasets(paths.tail)
      }
      case None => throw new RuntimeException("no label found")
    }
  }
}
import scala.util.matching.Regex pattern: scala.util.matching.Regex = /[a-zA-Z]+_([a-zA-Z]+)\. read_datasets: (paths: List[String])List[org.apache.spark.rdd.RDD[(String, String)]]
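To make the label extraction concrete, here is a minimal sketch (using one of the paths listed above); the capture group is expected to yield the region name between the underscore and the file extension:

// Sketch: apply the same pattern to one of the paths above.
val examplePath = "dbfs:/FileStore/shared_uploads/hugower@kth.se/sequences_oceania.fasta"
pattern.findFirstMatchIn(examplePath).map(_.group(1)) // expected: Some("oceania")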
// Read the data, setting the record delimiter to ">", which separates samples in FASTA format.
sc.hadoopConfiguration.set("textinputformat.record.delimiter", ">")
val datasets = read_datasets(paths)
datasets: List[org.apache.spark.rdd.RDD[(String, String)]] = List(MapPartitionsRDD[202189] at map at command-3103574048361361:13, MapPartitionsRDD[202196] at map at command-3103574048361361:13, MapPartitionsRDD[202205] at map at command-3103574048361361:13, MapPartitionsRDD[202210] at map at command-3103574048361361:13, MapPartitionsRDD[202215] at map at command-3103574048361361:13, MapPartitionsRDD[202220] at map at command-3103574048361361:13)
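As an illustration of what one record looks like once ">" is the delimiter (a toy record, not real data): the first line is the FASTA header and the remaining lines are the genome sequence. The header/sequence split used in the cells below relies on exactly this layout.

// Toy record (illustrative only): header line followed by sequence lines.
val toyRecord = "MW000000.1 |toy isolate, complete genome\nACGTACGT\nTTGACCGA"
val toyHeader = toyRecord.split("\n").head              // "MW000000.1 |toy isolate, complete genome"
val toySeq    = toyRecord.split("\n").tail.mkString("") // "ACGTACGTTTGACCGA"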
datasets.length
res0: Int = 6
datasets(0).take(1)
// Combine the datasets into one RDD and cache it for optimization.
val data = datasets.reduce( (a, b) => a ++ b ).cache()
data: org.apache.spark.rdd.RDD[(String, String)] = UnionRDD[202273] at $plus$plus at command-3103574048361373:1
data.take(1)
// Get the headers for each sample (the first line of each sample is a header).
val headers = data.map( {case (genome, label) => (genome.split("\n").head.split('|'), label)} )
headers.count
headers: org.apache.spark.rdd.RDD[(Array[String], String)] = MapPartitionsRDD[35] at map at command-3103574048360661:1 res2: Long = 31550
headers.take(5)
res3: Array[(Array[String], String)] = Array((Array("MW320729.1 ", Severe acute respiratory syndrome coronavirus 2 isolate SARS-CoV-2/human/AUS/VIC16982/2020, complete genome),oceania), (Array("MW320730.1 ", Severe acute respiratory syndrome coronavirus 2 isolate SARS-CoV-2/human/AUS/VIC17307/2020, complete genome),oceania), (Array("MW320731.1 ", Severe acute respiratory syndrome coronavirus 2 isolate SARS-CoV-2/human/AUS/VIC17193/2020, complete genome),oceania), (Array("MW320733.1 ", Severe acute respiratory syndrome coronavirus 2 isolate SARS-CoV-2/human/AUS/VIC16732/2020, complete genome),oceania), (Array("MW320735.1 ", Severe acute respiratory syndrome coronavirus 2 isolate SARS-CoV-2/human/AUS/VIC16821/2020, complete genome),oceania))
// Remove the headers and keep only the genome sequence of each sample.
val samples = data.map( {case (genome, label) => (genome.split("\n").tail.mkString(""), label)} ).cache()
samples.count
samples: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[202309] at map at command-3103574048360662:2 res0: Long = 31550
// Get the genome length per sample (just to check whether there are extreme cases that should be removed).
val genome_length_per_s = samples.map({case (genome, label) => genome.length()})
genome_length_per_s: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[14298] at map at command-3103574048360664:1
// Check the statistics to see whether there is any significant variation.
genome_length_per_s.stats
res6: org.apache.spark.util.StatCounter = (count: 31550, mean: 29812.288621, stdev: 81.069114, max: 30018.000000, min: 28645.000000)
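The spread is small (a standard deviation of about 81 bases around a mean of about 29,812), so no samples are dropped here. If extreme cases did show up, a length filter along the following lines could remove them; the thresholds are illustrative assumptions, not values used in this notebook:

// Hypothetical filter (illustrative thresholds): keep samples with genome lengths in a plausible range.
val minLen = 28000
val maxLen = 31000
val filteredSamples = samples.filter({ case (genome, _) => genome.length >= minLen && genome.length <= maxLen })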
// A tail-recursive subsequence (k-mer) function, with or without overlap.
// ex1: input: ("abcd", 2, true)  -> output: "ab bc cd"
// ex2: input: ("abcd", 2, false) -> output: "ab cd"
def subsequence_str(sequence: String, k: Int, overlapping: Boolean): String = {
  def helper(seq: String, acc: String): String = {
    if (seq.length < k) {
      return acc
    } else {
      val sub = seq.substring(0, k)
      if (overlapping) helper(seq.tail, acc + sub + " ")
      else helper(seq.substring(k), acc + sub + " ")
    }
  }
  return helper(sequence, "")
}
subsequence_str: (sequence: String, k: Int, overlapping: Boolean)String
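A quick sanity check of the two cases documented in the comments above (note that the returned string carries a trailing space):

subsequence_str("abcd", 2, true)  // returns "ab bc cd "
subsequence_str("abcd", 2, false) // returns "ab cd "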
// Extract the subsequences (k-mers) for each sample.
val k_mers = samples.map( {case (genome, label) => (subsequence_str(genome, 3, false), label)} ).cache()
k_mers: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[202801] at map at command-3103574048360668:1
k_mers.take(1)
// Index the k-mers and convert them to a DataFrame.
val kmers_df = k_mers.zipWithIndex.map({case ((a, b), c) => (a, b, c)}).toDF("genome", "label", "id").cache()
kmers_df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [genome: string, label: string ... 1 more field]
kmers_df.take(1)
// Split into train and test data.
val split = kmers_df.randomSplit(Array(0.7, 0.3), seed=42)
split: Array[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]] = Array([genome: string, label: string ... 1 more field], [genome: string, label: string ... 1 more field])
val train = split(0).cache()
train.take(1)
train.count
res6: Long = 22155
val test = split(1).cache()
test.take(1)
test.count
res9: Long = 9395
// Save the results for the next notebook.
dbutils.fs.rm("/FileStore/shared_uploads/caylak@kth.se/data_test_nonoverlapping", recurse=true) // remove the existing folder
test.write.parquet("dbfs:/FileStore/shared_uploads/caylak@kth.se/data_test_nonoverlapping")
dbutils.fs.rm("/FileStore/shared_uploads/caylak@kth.se/data_train_nonoverlapping", recurse=true) // remove the existing folder
train.write.parquet("dbfs:/FileStore/shared_uploads/caylak@kth.se/data_train_nonoverlapping")
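In the next notebook, the saved splits can be read back from the same DBFS locations, for example:

// Minimal sketch for the follow-up notebook: read the saved parquet splits back.
val trainData = spark.read.parquet("dbfs:/FileStore/shared_uploads/caylak@kth.se/data_train_nonoverlapping")
val testData = spark.read.parquet("dbfs:/FileStore/shared_uploads/caylak@kth.se/data_test_nonoverlapping")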