ScaDaMaLe Course site and book

  1. Methods to load data

Preprocessing and loading the relevant data

Each forum comes as an XML-file with the structure given below:

// Data comes in XML-files with the following structure. /* <corpus id="familjeliv-adoption"> <forum id="13-242" title="Adoption &gt; Intresserad" url="http://www.familjeliv. se/forum/13/242"> <thread id="34277335" title="Tips för att välja land" postcount="25" lastpost="2 008-07-08 17:55:14" url="http://www.familjeliv.se/forum/thread/34277335-tips-for -att-valja-land"> <text datefrom="20080707" dateto="20080707" timefrom="220854" timeto="220854" lix="30.55" ovix="60.74" nk="0.51" id="34284994" username="Miss TN" date="2008-07-07 22:08:54" url="http://www.familjeliv.se/forum/thread/34277335-tips-for-att-valja-land/2#anchor-m16"> <sentence id="bacc55562-baca83a75" _geocontext="|"> <w pos="VB" msd="VB.PRS.AKT" lemma="|känna|" lex="|känna..vb.2|känna..vb.1|" sense="|känna..1:0.522|känna..2:0.313|känna..4:0.158|känna..3:0.006|" prefix="|" suffix="|" compwf="|" complemgram="|" ref="01" dephead="07" deprel="AA">Känner</w> <w pos="PN" msd="PN.UTR.SIN.DEF.SUB" lemma="|ni|" lex="|ni..pn.1|" sense="|ni..1:-1.000|" prefix="|" suffix="|" compwf="|" complemgram="|" ref="02" dephead="01" deprel="OO">ni</w> <w pos="PP" msd="PP" lemma="|för|" lex="|för..pp.1|" sense="|för..1:-1.000|för..5:-1.000|för..6:-1.000|för..7:-1.000|för..9:-1.000|" prefix="|" suffix="|" compwf="|" complemgram="|" ref="03" dephead="01" deprel="OA">för</w> <w pos="DT" msd="DT.NEU.SIN.IND" lemma="|en|" lex="|en..al.1|" sense="|den..1:-1.000|en..2:-1.000|" prefix="|" suffix="|" compwf="|" complemgram="|" ref="04" dephead="06" deprel="DT">ett</w> <w pos="JJ" msd="JJ.POS.NEU.SIN.IND.NOM" lemma="|låg|" lex="|låg..av.1|" sense="|låg..1:-1.000|" prefix="|" suffix="|" compwf="|" complemgram="|" ref="05" dephead="06" deprel="AT">lågt</w> <w pos="NN" msd="NN.NEU.SIN.IND.NOM" lemma="|medgivande|" lex="|medgivande..nn.1|" sense="|medgivande..1:0.595|medgivande..2:0.405|" prefix="|medge..vb.1|mede..nn.1|" suffix="|givande..nn.1|ande..nn.1|" compwf="|medgiv+ande|med+givande|med+giv+ande|" 
complemgram="|medge..vb.1+ande..nn.1:4.632e-17|mede..nn.1+givande..nn.1:6.075e-27|mede..nn.1+giv..nn.1+ande..nn.1:5.387e-27|mede..nn.1+ge..vb.1+ande..nn.1:1.257e-29|" ref="06" dephead="03" deprel="PA">medgivande</w> <w pos="VB" msd="VB.PRS.AKT" lemma="|skola|" lex="|skola..vb.2|" sense="|skola..4:-1.000|" prefix="|" suffix="|" compwf="|" complemgram="|" ref="07" deprel="ROOT">ska</w> <w pos="PN" msd="PN.UTR.PLU.DEF.SUB" lemma="|ni|" lex="|ni..pn.1|" sense="|ni..1:-1.000|" prefix="|" suffix="|" compwf="|" complemgram="|" ref="08" dephead="07" deprel="SS">ni</w> <w pos="AB" msd="AB" lemma="|verkligen|" lex="|verkligen..ab.1|" sense="|verkligen..1:-1.000|" prefix="|" suffix="|" compwf="|" complemgram="|" ref="09" dephead="07" deprel="MA">verkligen</w> <w pos="VB" msd="VB.INF.AKT" lemma="|sträva|" lex="|sträva..vb.1|" sense="|sträva..1:-1.000|" prefix="|" suffix="|" compwf="|" complemgram="|" ref="10" dephead="07" deprel="VG">sträva</w> . . . </sentence> </text> </thread> <thread id="42755312" title="Om vi planerar ett barn om sju år, när ska vi dra igång adoptionsprocessen?" postcount="3" lastpost="2009-04-01 18:39:55" url="http://www.familjeliv.se/forum/thread/42755312-om-vi-planerar-ett-barn-om-sju-ar-nar-ska-vi-dra-igang-adoptionsprocessen"> <text datefrom="20090331" dateto="20090331" timefrom="201724" timeto="201724" lix="27.48" ovix="50.60" nk="0.37" id="42755312" username="alvaereva" date="2009-03-31 20:17:24" url="http://www.familjeliv.se/forum/thread/42755312-om-vi-planerar-ett-barn-om-sju-ar-nar-ska-vi-dra-igang-adoptionsprocessen/1"> <sentence id="bac2ec05f-bace4e647" _geocontext="|"> <w pos="KN" msd="KN" lemma="|" lex="|" sense="|" prefix="|" suffix="|" compwf="|" complemgram="|" ref="01" deprel="ROOT">Som</w> . . . */
import org.apache.spark.sql.functions.{col, concat_ws, udf, flatten, explode, collect_list, collect_set, lit}
import org.apache.spark.sql.types.{ArrayType, StructType, StructField, StringType, IntegerType}
import com.databricks.spark.xml._ // Adds the DataFrame.read.xml() method
import org.apache.spark.sql.functions._

/**
 * Reads one forum-corpus XML file (structure shown in the comment above) into a DataFrame.
 *
 * Parsing is rooted at the `<forum>` element (`rowTag = "forum"`), so each row is one
 * forum with its nested array of threads. Only the fields named in the schema are kept;
 * from each `<sentence>` only the array of `<w>` word texts is retained.
 *
 * @param file_name path to the corpus XML file (e.g. on DBFS)
 * @return one row per `<forum>` element, with columns _id, _title, _url, thread
 */
def read_xml(file_name: String): org.apache.spark.sql.DataFrame = {
  // A sentence is an array of <w> word elements; attribute annotations are dropped.
  val sentence_schema = StructType(Array(
    StructField("w", ArrayType(StringType, true), nullable = true)
  ))
  val text_schema = StructType(Array(
    StructField("sentence", ArrayType(sentence_schema), nullable = false)
  ))
  val thread_schema = StructType(Array(
    StructField("_id", StringType, nullable = false),
    StructField("_title", StringType, nullable = false),
    StructField("_url", StringType, nullable = false),
    StructField("text", text_schema, nullable = false)
  ))
  val forum_schema = StructType(Array(
    StructField("_id", StringType, nullable = false),
    StructField("_title", StringType, nullable = false),
    StructField("_url", StringType, nullable = false),
    StructField("thread", ArrayType(thread_schema), nullable = false)
  ))
  // NOTE(review): corpus_schema is defined but never applied — parsing starts at
  // <forum> (rowTag below), so forum_schema is the effective schema. Kept only as
  // documentation of the enclosing <corpus> structure.
  val corpus_schema = StructType(Array(
    StructField("_id", StringType, nullable = false),
    StructField("forum", forum_schema, nullable = false)
  ))
  spark.read
    .option("rowTag", "forum")
    .schema(forum_schema)
    .xml(file_name)
}

/**
 * Builds the flat per-thread dataset from one corpus XML file.
 *
 * Each output row is one thread with all of its word tokens joined into a single
 * comma-separated string, plus identifiers derived from the file path.
 *
 * Output columns: thread_id, thread_title, w (comma-joined tokens),
 * forum_id, forum_title, platform, corpus_id.
 *
 * @param file_name path of the form .../<platform>/<corpus>.xml — the second-to-last
 *                  path segment becomes `platform`, the last becomes `corpus_id`
 * @return one row per thread
 */
def get_dataset(file_name: String): org.apache.spark.sql.DataFrame = {
  val xml_df = read_xml(file_name)
  // One row per thread instead of one row per forum.
  val ds = xml_df.withColumn("thread", explode($"thread"))

  // Derive platform / corpus identifiers from the file path.
  // NOTE(review): `corpus` is the full file name, so it still carries the ".xml"
  // suffix (e.g. "familjeliv-adoption.xml"). Left as-is to keep corpus_id values
  // stable for downstream consumers — confirm whether stripping ".xml" is wanted.
  val splitted_name = file_name.split("/")
  val forum = splitted_name(splitted_name.size - 2)
  val corpus = splitted_name(splitted_name.size - 1)
  // (removed an unused `udf` local that joined a Seq[String]; concat_ws below
  //  performs the join natively)

  ds.select(
      col("_id") as "forum_id",
      col("_title") as "forum_title",
      col("thread._id") as "thread_id",
      col("thread._title") as "thread_title",
      // thread.text.sentence.w is an array of arrays of tokens; flatten to one array.
      flatten(col("thread.text.sentence.w")) as "w")
    .withColumn("w", explode($"w"))
    .groupBy("thread_id")
    .agg(
      first("thread_title") as ("thread_title"),
      collect_list("w") as "w",
      first("forum_id") as "forum_id",
      first("forum_title") as "forum_title")
    .withColumn("w", concat_ws(",", col("w")))
    .withColumn("platform", lit(forum))
    .withColumn("corpus_id", lit(corpus))
}
import org.apache.spark.sql.functions.{col, concat_ws, udf, flatten, explode, collect_list, collect_set, lit} import org.apache.spark.sql.types.{ArrayType, StructType, StructField, StringType, IntegerType} import com.databricks.spark.xml._ import org.apache.spark.sql.functions._ read_xml: (file_name: String)org.apache.spark.sql.DataFrame get_dataset: (file_name: String)org.apache.spark.sql.DataFrame
/**
 * Writes a DataFrame to `filePath` in parquet format.
 * Fails if the target path already exists (default Spark save mode).
 */
def save_df(df: org.apache.spark.sql.DataFrame, filePath: String): Unit = {
  // Fixed deprecated procedure syntax: explicit `: Unit =` instead of `def f(...){}`.
  df.write.format("parquet").save(filePath)
}

/** Loads a parquet dataset previously written by `save_df`. */
def load_df(filePath: String): org.apache.spark.sql.DataFrame = {
  spark.read.format("parquet").load(filePath)
}

/**
 * Prints the distinct forum titles in `df` (untruncated) and returns their count.
 * Side effect: calls `show` on the driver.
 */
def no_forums(df: org.apache.spark.sql.DataFrame): Long = {
  val forums = df.select("forum_title").distinct()
  forums.show(false)
  forums.count()
}
save_df: (df: org.apache.spark.sql.DataFrame, filePath: String)Unit load_df: (filePath: String)org.apache.spark.sql.DataFrame no_forums: (df: org.apache.spark.sql.DataFrame)Long
  1. Save preprocessed data

// Roots of the two corpora on DBFS.
// NOTE(review): kept as `var` in case a later notebook cell reassigns them — prefer `val`.
var fl_root = "dbfs:/datasets/student-project-01/familjeliv/"
var fb_root = "dbfs:/datasets/student-project-01/flashback/"
val fl_data = Array("familjeliv-allmanna-ekonomi", "familjeliv-sexsamlevnad")
val fb_data = Array("flashback-ekonomi", "flashback-sex")

/**
 * For each dataset name, parses `<root><name>.xml` and saves the per-thread
 * DataFrame as parquet at `<root><name>_df`, skipping datasets whose parquet
 * output already exists.
 *
 * Replaces two copy-pasted loops; this also fixes a bug where the familjeliv
 * loop printed paths under `fb_root` while actually checking `fl_root`.
 */
def preprocess_if_missing(root: String, names: Array[String]): Unit = {
  for (name <- names) {
    try {
      println(s"${root}${name}_df")
      // dbutils.fs.ls throws java.io.FileNotFoundException when the path is absent.
      dbutils.fs.ls(s"${root}${name}_df")
      println(s"${name}_df already exists!")
    } catch {
      case e: java.io.FileNotFoundException =>
        val df = get_dataset(s"${root}${name}.xml")
        save_df(df, s"${root}${name}_df")
    }
  }
}

preprocess_if_missing(fl_root, fl_data)
preprocess_if_missing(fb_root, fb_data)
dbfs:/datasets/student-project-01/flashback/familjeliv-allmanna-ekonomi_df familjeliv-allmanna-ekonomi_df already exists! dbfs:/datasets/student-project-01/flashback/familjeliv-sexsamlevnad_df familjeliv-sexsamlevnad_df already exists! dbfs:/datasets/student-project-01/flashback/flashback-ekonomi_df flashback-ekonomi_df already exists! dbfs:/datasets/student-project-01/flashback/flashback-sex_df flashback-sex_df already exists! fl_root: String = dbfs:/datasets/student-project-01/familjeliv/ fb_root: String = dbfs:/datasets/student-project-01/flashback/ fl_data: Array[String] = Array(familjeliv-allmanna-ekonomi, familjeliv-sexsamlevnad) fb_data: Array[String] = Array(flashback-ekonomi, flashback-sex)