ScaDaMaLe Course site and book

// Paths to datasets of different regions.
val paths: List[String] = List("dbfs:/FileStore/shared_uploads/hugower@kth.se/sequences_oceania.fasta",
                               "dbfs:/FileStore/shared_uploads/hugower@kth.se/sequences_northamerica.fasta",
                               "dbfs:/FileStore/shared_uploads/hugower@kth.se/sequences_southamerica.fasta",
                               "dbfs:/FileStore/shared_uploads/hugower@kth.se/sequences_europe.fasta",
                               "dbfs:/FileStore/shared_uploads/hugower@kth.se/sequences_africa.fasta",
                               "dbfs:/FileStore/shared_uploads/hugower@kth.se/sequences_asia.fasta")
paths: List[String] = List(dbfs:/FileStore/shared_uploads/hugower@kth.se/sequences_oceania.fasta, dbfs:/FileStore/shared_uploads/hugower@kth.se/sequences_northamerica.fasta, dbfs:/FileStore/shared_uploads/hugower@kth.se/sequences_southamerica.fasta, dbfs:/FileStore/shared_uploads/hugower@kth.se/sequences_europe.fasta, dbfs:/FileStore/shared_uploads/hugower@kth.se/sequences_africa.fasta, dbfs:/FileStore/shared_uploads/hugower@kth.se/sequences_asia.fasta)
import scala.util.matching.Regex

// regex pattern to take region name, label, from complete path name (Must be changed accordingly if path follows a different structure)
val pattern: Regex = "/[a-zA-Z]+_([a-zA-Z]+)\\.".r 

def read_datasets(paths:List[String]): List[RDD[(String,String)]] = {
  if (paths.size < 1) { // return an empty RDD
    return List.fill(0) (sc.emptyRDD)
  }
  else {
    pattern.findFirstMatchIn(paths.head) match { // extract the label based on the pattern defined above
      case Some(x) => {
        val label:String = x.group(1)  // create the label based on the path name
        return (sc.textFile(paths.head).filter(_ != "").map(_.trim()).map(s => (s,label)))::read_datasets(paths.tail) // read the file in path and attach the data with its label to RDD list
      }
      case None => throw new RuntimeException("no label found")
    }
  }
}
import scala.util.matching.Regex
pattern: scala.util.matching.Regex = /[a-zA-Z]+_([a-zA-Z]+)\.
read_datasets: (paths: List[String])List[org.apache.spark.rdd.RDD[(String, String)]]
// read data and set the delimiter as ">" which seperates each sample in fasta format
sc.hadoopConfiguration.set("textinputformat.record.delimiter",">")
val datasets = read_datasets(paths)
  
datasets: List[org.apache.spark.rdd.RDD[(String, String)]] = List(MapPartitionsRDD[202189] at map at command-3103574048361361:13, MapPartitionsRDD[202196] at map at command-3103574048361361:13, MapPartitionsRDD[202205] at map at command-3103574048361361:13, MapPartitionsRDD[202210] at map at command-3103574048361361:13, MapPartitionsRDD[202215] at map at command-3103574048361361:13, MapPartitionsRDD[202220] at map at command-3103574048361361:13)
datasets.length
res0: Int = 6
datasets(0).take(1)
// combine the datasets into one and cache for optimization
val data = datasets.reduce( (a,b) => a++b).cache()
data: org.apache.spark.rdd.RDD[(String, String)] = UnionRDD[202273] at $plus$plus at command-3103574048361373:1
data.take(1)
// get the headers for each sample (the first line of each sample is a header)
val headers = data.map( {case (genome,label) => (genome.split("\n").head.split('|'),label)})
headers.count
headers: org.apache.spark.rdd.RDD[(Array[String], String)] = MapPartitionsRDD[35] at map at command-3103574048360661:1
res2: Long = 31550
headers.take(5)
res3: Array[(Array[String], String)] = Array((Array("MW320729.1 ", Severe acute respiratory syndrome coronavirus 2 isolate SARS-CoV-2/human/AUS/VIC16982/2020, complete genome),oceania), (Array("MW320730.1 ", Severe acute respiratory syndrome coronavirus 2 isolate SARS-CoV-2/human/AUS/VIC17307/2020, complete genome),oceania), (Array("MW320731.1 ", Severe acute respiratory syndrome coronavirus 2 isolate SARS-CoV-2/human/AUS/VIC17193/2020, complete genome),oceania), (Array("MW320733.1 ", Severe acute respiratory syndrome coronavirus 2 isolate SARS-CoV-2/human/AUS/VIC16732/2020, complete genome),oceania), (Array("MW320735.1 ", Severe acute respiratory syndrome coronavirus 2 isolate SARS-CoV-2/human/AUS/VIC16821/2020, complete genome),oceania))
// remove the headers and only get genome sequences of samples.
val samples = data.map( {case (genome,label) => (genome.split("\n").tail.mkString(""), label)}).cache()
samples.count
samples: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[202309] at map at command-3103574048360662:2
res0: Long = 31550
// get the genome lengths per sample (this is just to check if there are extreme cases so we would remove those)
val genome_length_per_s = samples.map({case (genome,label) => genome.length()})
genome_length_per_s: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[14298] at map at command-3103574048360664:1
// check the statistics if there is any significant variation
genome_length_per_s.stats
res6: org.apache.spark.util.StatCounter = (count: 31550, mean: 29812.288621, stdev: 81.069114, max: 30018.000000, min: 28645.000000)
// A tail recursive overlapping subsequence function 
// ex1: input: ("abcd", 2, true) -> output: "ab bc cd": 
// ex2: input: ("abcd", 2, false) -> output: "ab cd"
def subsequence_str( sequence:String, k:Int, overlapping:Boolean ): String = {
  def helper(seq:String, acc:String): String = {
    if (seq.length < k ) {
      return acc
    }
    else {
      val sub = seq.substring(0,k)
      if(overlapping) helper(seq.tail, acc + sub + " ")
      else helper(seq.substring(k), acc + sub + " ")
    }
  }
  return helper(sequence, "")
}
subsequence_str: (sequence: String, k: Int, overlapping: Boolean)String
// Extract the subsequences, kmers, for each sample
val k_mers = samples.map( {case (genome,label) => (subsequence_str(genome, 3, false),label)} ).cache()
k_mers: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[202801] at map at command-3103574048360668:1
k_mers.take(1)
// index kmers
val kmers_df = k_mers.zipWithIndex.map({case ((a,b),c) => (a,b,c)}).toDF("genome", "label", "id").cache()
kmers_df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [genome: string, label: string ... 1 more field]
kmers_df.take(1)
// split train and test data
val split = kmers_df.randomSplit(Array(0.7, 0.3), seed=42)
split: Array[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]] = Array([genome: string, label: string ... 1 more field], [genome: string, label: string ... 1 more field])
val train = split(0).cache()
train.take(1)
train.count
res6: Long = 22155
val test = split(1).cache()
test.take(1)
test.count
res9: Long = 9395
// save the results for the next notebook
dbutils.fs.rm("/FileStore/shared_uploads/caylak@kth.se/data_test_nonoverlapping", recurse=true) // remove existing folder
test.write.parquet("dbfs:/FileStore/shared_uploads/caylak@kth.se/data_test_nonoverlapping")

dbutils.fs.rm("/FileStore/shared_uploads/caylak@kth.se/data_train_nonoverlapping", recurse=true) // remove existing folder
train.write.parquet("dbfs:/FileStore/shared_uploads/caylak@kth.se/data_train_nonoverlapping")