
ScaDaMaLe Course site and book

//Imports
import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.ml.feature.RegexTokenizer
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
  1. Load the data

/scalable-data-science/000_0-sds-3-x-projects/student-project-01_group-TheTwoCultures/01_load_data
import org.apache.spark.sql.functions.{col, concat_ws, udf, flatten, explode, collect_list, collect_set, lit}
import org.apache.spark.sql.types.{ArrayType, StructType, StructField, StringType, IntegerType}
import com.databricks.spark.xml._
import org.apache.spark.sql.functions._
read_xml: (file_name: String)org.apache.spark.sql.DataFrame
get_dataset: (file_name: String)org.apache.spark.sql.DataFrame
save_df: (df: org.apache.spark.sql.DataFrame, filePath: String)Unit
load_df: (filePath: String)org.apache.spark.sql.DataFrame
no_forums: (df: org.apache.spark.sql.DataFrame)Long
dbfs:/datasets/student-project-01/flashback/familjeliv-allmanna-ekonomi_df familjeliv-allmanna-ekonomi_df already exists!
dbfs:/datasets/student-project-01/flashback/familjeliv-sexsamlevnad_df familjeliv-sexsamlevnad_df already exists!
dbfs:/datasets/student-project-01/flashback/flashback-ekonomi_df flashback-ekonomi_df already exists!
dbfs:/datasets/student-project-01/flashback/flashback-sex_df flashback-sex_df already exists!
fl_root: String = dbfs:/datasets/student-project-01/familjeliv/
fb_root: String = dbfs:/datasets/student-project-01/flashback/
fl_data: Array[String] = Array(familjeliv-allmanna-ekonomi, familjeliv-sexsamlevnad)
fb_data: Array[String] = Array(flashback-ekonomi, flashback-sex)
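The helper functions above (read_xml, get_dataset, save_df, load_df, no_forums) are defined in the 01_load_data notebook, which is not reproduced here. Purely as a hypothetical sketch of what the save/load pair is assumed to do, a minimal Parquet-based version might look like this (the real helpers may differ):

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical sketch only: the actual helpers live in 01_load_data.
// Assumption: the parsed forum threads are persisted as DataFrames on DBFS.
def save_df(df: DataFrame, filePath: String): Unit =
  df.write.mode("overwrite").parquet(filePath)

def load_df(filePath: String): DataFrame =
  spark.read.parquet(filePath)
```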
//Load dataframes
val file_path_familjeliv = "dbfs:/datasets/student-project-01/familjeliv/familjeliv-sexsamlevnad_df"
val file_path_flashback = "dbfs:/datasets/student-project-01/flashback/flashback-sex_df"
val df_familjeliv = load_df(file_path_familjeliv)
val df_flashback = load_df(file_path_flashback)
file_path_familjeliv: String = dbfs:/datasets/student-project-01/familjeliv/familjeliv-sexsamlevnad_df
file_path_flashback: String = dbfs:/datasets/student-project-01/flashback/flashback-sex_df
df_familjeliv: org.apache.spark.sql.DataFrame = [thread_id: string, thread_title: string ... 5 more fields]
df_flashback: org.apache.spark.sql.DataFrame = [thread_id: string, thread_title: string ... 5 more fields]
//Extract the text
val df_text_flashback = df_flashback.select("w")
val df_text_familjeliv = df_familjeliv.select("w")
df_text_flashback: org.apache.spark.sql.DataFrame = [w: string]
df_text_familjeliv: org.apache.spark.sql.DataFrame = [w: string]
  2. Add labels

//Add label columns and make sure that we have exactly the same amount of data from both forums
val df_text_flashback_c = df_text_flashback.withColumn("c", lit(0))
val df_text_familjeliv_c = df_text_familjeliv.orderBy(rand()).limit(df_text_flashback_c.count().toInt).withColumn("c", lit(1))
val df_text_full = df_text_flashback_c.union(df_text_familjeliv_c)

//Check the counts
println(df_text_flashback_c.count())
println(df_text_familjeliv_c.count())
println(df_text_full.count())
56621
56621
113242
df_text_flashback_c: org.apache.spark.sql.DataFrame = [w: string, c: int]
df_text_familjeliv_c: org.apache.spark.sql.DataFrame = [w: string, c: int]
df_text_full: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [w: string, c: int]
  3. Extract single words

val tokenizer = new RegexTokenizer()
  .setPattern("(?U),")   // split on commas (the "w" column stores comma-separated tokens)
  .setMinTokenLength(5)  // filter away tokens with length < 5
  .setInputCol("w")      // name of the input column
  .setOutputCol("text")  // name of the output column
val tokenized_df = tokenizer.transform(df_text_full).select("c", "text")
tokenized_df.show(3, false)
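As a quick illustration of what the pattern above does, here is a small sketch with a made-up input string (not part of the project's pipeline), assuming the tokenizer defined in the previous cell:

```scala
import spark.implicits._

// Hypothetical one-row example: the tokenizer splits on "," and drops tokens
// shorter than 5 characters.
val demo = Seq("hejsan,hej,ekonomi,att,sparpengar").toDF("w")
tokenizer.transform(demo).select("text").show(false)
// expected tokens: [hejsan, ekonomi, sparpengar]  ("hej" and "att" are < 5 chars)
```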
  4. Remove stopwords

//StopWordsRemover (similar to the LDA notebook)
val stoppord = sc.textFile("dbfs:/datasets/student-project-01/stoppord.csv").collect()
val stopwordList = Array("bara","lite","finns","vill","samt","inga","även","finns","ganska","också","igen","just","that","with","http","jpg",
  "kanske","tycker","gillar","bra","000","måste","tjej","tjejer","tjejen","tjejerna","kvinna","kvinnor","kille","killar","killen","män","rätt","män","com","and","html","många","aldrig","www","mpg","avi","wmv","riktigt","känner","väldigt","font","size","mms","2008","2009",
  "flashback", "familjeliv").union(stoppord).union(StopWordsRemover.loadDefaultStopWords("swedish"))
val remover = new StopWordsRemover()
  .setStopWords(stopwordList)
  .setInputCol("text")
  .setOutputCol("filtered")
//Run the StopWordsRemover
val removed_df = remover.transform(tokenized_df).select("c", "filtered")
removed_df: org.apache.spark.sql.DataFrame = [c: int, filtered: array<string>]
  5. Count words and create vocabulary vector

//Unlimited size vocabulary just to see how much there is
val vectorizerall = new CountVectorizer()
  .setInputCol("filtered")
  .setOutputCol("features")
  .setMinDF(5)       // Only count words that occur in at least 5 threads
  .fit(removed_df)   // returns CountVectorizerModel

//This is the one we use, limit size of vocabulary
val vectorizer = new CountVectorizer()
  .setInputCol("filtered")
  .setOutputCol("features")
  .setVocabSize(1000) // Size of dictionary
  .setMinDF(5)        // Only count words that occur in at least 5 threads
  .fit(removed_df)    // returns CountVectorizerModel
vectorizerall: org.apache.spark.ml.feature.CountVectorizerModel = CountVectorizerModel: uid=cntVec_e9a01f8ad0fe, vocabularySize=129204
vectorizer: org.apache.spark.ml.feature.CountVectorizerModel = CountVectorizerModel: uid=cntVec_9d51ff227f19, vocabularySize=1000
// Let's take a look at the vocabulary
vectorizer.vocabulary
// Count the word frequencies
val tf = vectorizer.transform(removed_df.select("c", "filtered")).select("c", "features").cache()

//Print the feature vector to show what it looks like
tf.take(1).foreach(println)
[0,(1000,[0,4,5,6,12,16,33,34,48,53,56,60,64,66,68,73,83,84,91,100,101,105,107,119,123,125,127,141,143,163,171,201,205,210,212,261,273,302,325,338,341,348,361,367,383,414,424,453,454,491,571,621,632,635,667,684,693,701,829,849,933,981],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,2.0,2.0,1.0,1.0,3.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])]
tf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [c: int, features: vector]
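Each printed row starts with the label c, followed by a sparse feature vector written as (size, [indices of non-zero terms], [their counts]). A small sketch, assuming the tf Dataset and vectorizer from above, that maps the non-zero entries of the first row back to vocabulary words:

```scala
import org.apache.spark.ml.linalg.Vector

// Sketch (assumes `tf` and `vectorizer` from above): print each non-zero
// term of the first feature vector as word -> count.
val firstVec = tf.select("features").head.getAs[Vector]("features")
firstVec.foreachActive { (idx, count) =>
  println(s"${vectorizer.vocabulary(idx)} -> $count")
}
```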
  6. Split data into training and test data

//Train test split
val random_order = tf.orderBy(rand())
val splits = random_order.randomSplit(Array(0.8, 0.2), seed = 1337)
val training = splits(0)
val test = splits(1)
println(training.count())
println(test.count())
90818
22424
random_order: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [c: int, features: vector]
splits: Array[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]] = Array([c: int, features: vector], [c: int, features: vector])
training: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [c: int, features: vector]
test: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [c: int, features: vector]
  7. Logistic Regression model

$$P(y=1) = \frac{1}{1 + \exp(-\beta X^T)}$$

$$X = [1, x_1, \dots, x_m], \qquad \beta = [\beta_0, \beta_1, \dots, \beta_m]$$

where $x_i$ is the occurrence count of word $i$ and $m = 1000$.

//Logistic regression
val lr = new LogisticRegression()
  .setLabelCol("c")
  .setMaxIter(100)         //Run for 100 iterations (not necessary, but let's stay on the safe side)
  .setRegParam(0.0001)     //Just a tiny bit of regularization to avoid overfitting
  .setElasticNetParam(0.5) //50-50 between L1 and L2 loss

// Fit the model
val lrModel = lr.fit(training)

// Print the coefficients and intercept for logistic regression
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")
// Check the training progress
lrModel.binarySummary.objectiveHistory.foreach(loss => println(loss))
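To connect the fitted model back to the formula above, here is a small sketch (assuming lrModel and test as defined in this notebook) that recomputes $P(y=1)$ for one test row directly from the intercept and coefficients:

```scala
import org.apache.spark.ml.linalg.Vector

// Sketch (assumes `lrModel` and `test` from above): recompute
// P(y=1) = 1 / (1 + exp(-(beta_0 + beta . x))) for a single test row.
val x = test.select("features").head.getAs[Vector]("features")
var margin = lrModel.intercept
x.foreachActive { (idx, value) => margin += lrModel.coefficients(idx) * value }
val pClass1 = 1.0 / (1.0 + math.exp(-margin))
println(s"P(y=1) for the first test row: $pClass1")
```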
//Ugly code to look up the maximum and minimum coefficient values
var maxarray = Array.ofDim[Double](5,2)
def findmax(idx: Int, value: Double) = {
  if (value > maxarray(4)(1)) {
    maxarray(4)(0) = idx
    maxarray(4)(1) = value
    maxarray = maxarray.sortBy(- _(1))
  }
}

var minarray = Array.ofDim[Double](5,2)
def findmin(idx: Int, value: Double) = {
  if (value < minarray(4)(1)) {
    minarray(4)(0) = idx
    minarray(4)(1) = value
    minarray = minarray.sortBy(_(1))
  }
}
maxarray: Array[Array[Double]] = Array(Array(0.0, 0.0), Array(0.0, 0.0), Array(0.0, 0.0), Array(0.0, 0.0), Array(0.0, 0.0))
findmax: (idx: Int, value: Double)Unit
minarray: Array[Array[Double]] = Array(Array(0.0, 0.0), Array(0.0, 0.0), Array(0.0, 0.0), Array(0.0, 0.0), Array(0.0, 0.0))
findmin: (idx: Int, value: Double)Unit
//Let's check which words are considered most important for classification
lrModel.coefficients.foreachActive((idx, value) => findmax(idx, value))

//First check familjeliv
println(maxarray.deep.foreach(println))
println(vectorizer.vocabulary(maxarray(0)(0).toInt))
println(vectorizer.vocabulary(maxarray(1)(0).toInt))
println(vectorizer.vocabulary(maxarray(2)(0).toInt))
println(vectorizer.vocabulary(maxarray(3)(0).toInt))
println(vectorizer.vocabulary(maxarray(4)(0).toInt))

lrModel.coefficients.foreachActive((idx, value) => findmin(idx, value))

//Check for flashback
println(minarray.deep.foreach(println))
println(vectorizer.vocabulary(minarray(0)(0).toInt))
println(vectorizer.vocabulary(minarray(1)(0).toInt))
println(vectorizer.vocabulary(minarray(2)(0).toInt))
println(vectorizer.vocabulary(minarray(3)(0).toInt))
println(vectorizer.vocabulary(minarray(4)(0).toInt))
Array(46.0, 1.8647655096872564)
Array(885.0, 1.275281418044729)
Array(950.0, 1.224376631679196)
Array(380.0, 0.9577079595234373)
Array(32.0, 0.8432880748567715)
()
anonym
förlossningen
maken
sambon
sambo
Array(990.0, -2.2314223680319945)
Array(664.0, -1.8291269454715258)
Array(857.0, -1.4232104863197035)
Array(275.0, -1.3427561053936439)
Array(173.0, -1.1857533047141897)
()
topic
bruden
vafan
brudar
jävligt
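A more idiomatic alternative to the mutable min/max arrays above is to pair each coefficient with its vocabulary word and sort once. This is only a sketch (assuming lrModel and vectorizer from above), not the code that produced the printed output:

```scala
// Sketch: rank vocabulary words by their logistic-regression weight.
// Large positive weights point towards class 1 (familjeliv),
// large negative weights towards class 0 (flashback).
val coefByWord = lrModel.coefficients.toArray.zipWithIndex
  .map { case (weight, idx) => (vectorizer.vocabulary(idx), weight) }

val topFamiljeliv = coefByWord.sortBy(-_._2).take(5)
val topFlashback  = coefByWord.sortBy(_._2).take(5)

topFamiljeliv.foreach(println)
topFlashback.foreach(println)
```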
  8. Predict on test data

val predictions = lrModel.transform(test)
predictions.orderBy(rand()).select("c", "prediction", "probability").show(30, false)
+---+----------+-------------------------------------------+
|c  |prediction|probability                                |
+---+----------+-------------------------------------------+
|0  |0.0       |[0.9721401017870042,0.027859898212995764]  |
|0  |0.0       |[0.975623998737009,0.02437600126299096]    |
|1  |1.0       |[4.6730789993111417E-7,0.9999995326921002] |
|0  |0.0       |[0.933249707175278,0.066750292824722]      |
|0  |0.0       |[0.9902085789245901,0.009791421075409966]  |
|0  |0.0       |[0.5279677569376853,0.47203224306231467]   |
|0  |0.0       |[0.9932461412304279,0.00675385876957205]   |
|1  |1.0       |[2.43269453308815E-5,0.9999756730546691]   |
|1  |1.0       |[8.266454051870882E-10,0.9999999991733546] |
|0  |0.0       |[0.9997151003194746,2.8489968052548283E-4] |
|1  |0.0       |[0.5514931570249911,0.44850684297500887]   |
|0  |0.0       |[0.5858664716477586,0.41413352835224143]   |
|1  |1.0       |[0.002100566198113697,0.9978994338018863]  |
|1  |1.0       |[0.07917634407205193,0.920823655927948]    |
|0  |0.0       |[0.9970675008521647,0.0029324991478353007] |
|0  |0.0       |[0.9999595461915014,4.045380849869759E-5]  |
|0  |1.0       |[0.33337692071405434,0.6666230792859457]   |
|1  |1.0       |[0.36761800025826114,0.6323819997417389]   |
|1  |1.0       |[0.3245585295503879,0.6754414704496121]    |
|1  |1.0       |[0.2355899833856519,0.7644100166143482]    |
|0  |0.0       |[0.9999999999997755,2.2452150253864004E-13]|
|1  |1.0       |[0.18608690603389255,0.8139130939661074]   |
|0  |0.0       |[0.740890026139782,0.25910997386021795]    |
|0  |0.0       |[0.963586227883629,0.036413772116371014]   |
|1  |1.0       |[0.0021508873399861557,0.9978491126600139] |
|1  |1.0       |[0.3858439417926455,0.6141560582073544]    |
|1  |1.0       |[0.4517753939274335,0.5482246060725665]    |
|1  |1.0       |[0.02573645474447229,0.9742635452555276]   |
|0  |0.0       |[0.8022052550237544,0.19779474497624555]   |
|1  |1.0       |[0.042382658471976975,0.9576173415280229]  |
+---+----------+-------------------------------------------+
only showing top 30 rows

predictions: org.apache.spark.sql.DataFrame = [c: int, features: vector ... 3 more fields]
//Check the AUROC value
val evaluator = new BinaryClassificationEvaluator().setLabelCol("c")
evaluator.evaluate(predictions)
evaluator: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = BinaryClassificationEvaluator: uid=binEval_0812f13ed2be, metricName=areaUnderROC, numBins=1000
res19: Double = 0.928445521562674
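The area under the ROC curve on the held-out test data is about 0.93. As a complementary check, a small sketch (assuming the predictions DataFrame above) could also report plain accuracy:

```scala
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// Sketch (assumes `predictions` from above): report simple accuracy
// alongside the area under the ROC curve.
val accuracyEvaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("c")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
println(s"Test accuracy: ${accuracyEvaluator.evaluate(predictions)}")
```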