import scala.util.matching.Regex
import org.apache.spark.rdd.RDD

// regex pattern to extract the region name (the label) from the complete path name (must be changed accordingly if the path follows a different structure)
val pattern: Regex = "/[a-zA-Z]+_([a-zA-Z]+)\\.".r
def read_datasets(paths: List[String]): List[RDD[(String, String)]] = {
  if (paths.size < 1) {
    return List.fill(0)(sc.emptyRDD) // no paths left: return an empty list
  }
  else {
    pattern.findFirstMatchIn(paths.head) match { // extract the label based on the pattern defined above
      case Some(x) => {
        val label: String = x.group(1) // create the label based on the path name
        // read the file at this path, drop empty records, and attach the label to each sample; prepend the resulting RDD to the list built from the remaining paths
        return (sc.textFile(paths.head).filter(_ != "").map(_.trim()).map(s => (s, label))) :: read_datasets(paths.tail)
      }
      case None => throw new RuntimeException("no label found")
    }
  }
}
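// Sanity check (not part of the original pipeline): the label extraction applied to a
// hypothetical file name, assuming the expected "<prefix>_<label>.<extension>" structure.
val examplePath = "/FileStore/tables/sequences_Europe.fasta" // hypothetical path
pattern.findFirstMatchIn(examplePath).map(_.group(1)) // expected: Some(Europe)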
// read data and set the delimiter as ">", which separates each sample in FASTA format
sc.hadoopConfiguration.set("textinputformat.record.delimiter",">")
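// For illustration only (hypothetical in-memory string, not part of the pipeline): with ">" as the
// record delimiter, each element read from a FASTA file is one sample, i.e. a header line followed
// by its sequence lines, without the leading ">".
val exampleFasta = ">sampleA|ACC00001|2020-03-01\nACGTACGT\nACGT\n>sampleB|ACC00002|2020-04-01\nTTGGCCAA\n"
exampleFasta.split(">").filter(_.nonEmpty).foreach { record =>
  val lines = record.split("\n")
  println(s"header: ${lines.head}, sequence length: ${lines.tail.mkString("").length}")
}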
val datasets = read_datasets(paths)
datasets: List[org.apache.spark.rdd.RDD[(String, String)]] = List(MapPartitionsRDD[202189] at map at command-3103574048361361:13, MapPartitionsRDD[202196] at map at command-3103574048361361:13, MapPartitionsRDD[202205] at map at command-3103574048361361:13, MapPartitionsRDD[202210] at map at command-3103574048361361:13, MapPartitionsRDD[202215] at map at command-3103574048361361:13, MapPartitionsRDD[202220] at map at command-3103574048361361:13)
datasets.length
res0: Int = 6
datasets(0).take(1)
// combine the datasets into one and cache for optimization
val data = datasets.reduce( (a, b) => a ++ b ).cache()
data: org.apache.spark.rdd.RDD[(String, String)] = UnionRDD[202273] at $plus$plus at command-3103574048361373:1
data.take(1)
// get the headers for each sample (the first line of each sample is a header)
val headers = data.map({ case (genome, label) => (genome.split("\n").head.split('|'), label) })
headers.count
headers: org.apache.spark.rdd.RDD[(Array[String], String)] = MapPartitionsRDD[35] at map at command-3103574048360661:1
res2: Long = 31550
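// For illustration only: the FASTA headers are "|"-separated; the exact fields depend on how the
// files were exported, so the header below is hypothetical.
val exampleHeader = "virus/Region/001/2020|ACC00001|2020-03-01" // hypothetical header line
exampleHeader.split('|') // expected: Array(virus/Region/001/2020, ACC00001, 2020-03-01)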
// remove the headers and keep only the genome sequences of the samples
val samples = data.map({ case (genome, label) => (genome.split("\n").tail.mkString(""), label) }).cache()
samples.count
samples: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[202309] at map at command-3103574048360662:2
res0: Long = 31550
// get the genome length per sample (just to check for extreme cases so that they can be removed)
val genome_length_per_s = samples.map({ case (genome, label) => genome.length() })
genome_length_per_s: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[14298] at map at command-3103574048360664:1
// check the statistics if there is any significant variation
genome_length_per_s.stats
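// Note: the `train` and `test` DataFrames written below are produced by steps not shown in this
// section (e.g. converting the samples into non-overlapping subsequences, as the output paths
// suggest, and splitting into training and test sets). A minimal sketch of the split mechanics,
// assuming a plain random split of `samples` converted to a DataFrame with assumed column names:
import spark.implicits._
val samplesDF = samples.toDF("genome", "label")                                         // column names assumed
val Array(trainSketch, testSketch) = samplesDF.randomSplit(Array(0.8, 0.2), seed = 42)  // 80/20 split and seed assumed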
// save the results for the next notebook
dbutils.fs.rm("/FileStore/shared_uploads/caylak@kth.se/data_test_nonoverlapping", recurse=true) // remove existing folder
test.write.parquet("dbfs:/FileStore/shared_uploads/caylak@kth.se/data_test_nonoverlapping")
dbutils.fs.rm("/FileStore/shared_uploads/caylak@kth.se/data_train_nonoverlapping", recurse=true) // remove existing folder
train.write.parquet("dbfs:/FileStore/shared_uploads/caylak@kth.se/data_train_nonoverlapping")
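// Sketch (assumed, not from the original notebook): the next notebook can read the saved data back
// from the same paths with spark.read.parquet.
val trainBack = spark.read.parquet("dbfs:/FileStore/shared_uploads/caylak@kth.se/data_train_nonoverlapping")
val testBack  = spark.read.parquet("dbfs:/FileStore/shared_uploads/caylak@kth.se/data_test_nonoverlapping")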