Scalable Data Science

prepared by Raazesh Sainudiin and Sivanand Sivaram

Deep learning with H2O.ai and Spark

  • This notebook provides an introduction to the use of Deep Learning algorithms with H2O.ai and Spark
  • It shows an example deep learning application written with H2O.ai (Sparkling Water) and Spark

Spam classification of SMS data

  Reworked from the Databricks guide and from https://github.com/h2oai/sparkling-water/blob/master/examples/src/main/scala/org/apache/spark/examples/h2o/HamOrSpamDemo.scala

  1. Explore the dataset
  2. Extract features
    • Tokenize
    • Remove stop words
    • Hash
    • TF-IDF
  3. Train a deep learning model
  4. Predict

Explore the dataset

%fs ls /databricks-datasets/sms_spam_collection/data-001

// Getting the data if you are not on Databricks

import java.net.URL
import java.io.File
import org.apache.commons.io.FileUtils
val SMSDATA_FILE = new File("/tmp/smsData.csv")
FileUtils.copyURLToFile(new URL("https://raw.githubusercontent.com/h2oai/sparkling-water/master/examples/smalldata/smsData.txt"), SMSDATA_FILE)

Exploring the data


Convert the data to a DataFrame

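val data = sqlContext.read
    .format("com.databricks.spark.csv") // spark-csv data source (Spark 2.x also accepts this name for its built-in CSV reader)
    .option("header", "false")
    .option("delimiter", "\t") // Use \t (tab) as delimiter
    .option("inferSchema", "true")
    .load("file:///tmp/smsData.csv") // driver-local copy downloaded above; on Databricks, point this at the dataset listed by %fs ls
val df = data.toDF("hamOrSpam", "message")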
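Each line of the file holds a label, a tab, and the message text, which the reader turns into the `hamOrSpam` and `message` columns. The plain-Scala sketch below (no Spark) shows that mapping for a single line; `ParseSketch` and `parseLine` are hypothetical names, and unlike the CSV reader, which splits on every tab, it keeps any later tabs inside the message.

```scala
// Hypothetical helper: plain-Scala illustration of the line format assumed above
// (label, tab, message). Not part of Spark; the CSV reader does the real parsing.
object ParseSketch {
  def parseLine(line: String): Option[(String, String)] =
    line.split("\t", 2) match {            // limit 2: later tabs stay in the message
      case Array(label, message) => Some((label, message))
      case _                     => None   // no tab on the line: not a usable record
    }
}
```

For example, `ParseSketch.parseLine("ham\tSee you later")` gives `Some(("ham", "See you later"))`.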





import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.feature.RegexTokenizer

def tokenize(df: DataFrame): DataFrame = {
  // Set params for RegexTokenizer
  val tokenizer = new RegexTokenizer().
   setPattern("[\\W_]+"). // split on runs of non-word characters and underscores
   setMinTokenLength(2). // Filter away tokens with length < 2
   setInputCol("message"). // name of the input column
   setOutputCol("tokens") // name of the output column

  // Tokenize document
  tokenizer.transform(df)
}

val tokenized_df = tokenize(df)
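In its default "gaps" mode, RegexTokenizer treats the pattern as the separator and then drops tokens shorter than the minimum length. A minimal plain-Scala sketch of that behaviour, assuming the same pattern and minimum length as above (`TokenizeSketch` is a hypothetical name); it does not lowercase, whereas Spark 2.x's RegexTokenizer lowercases by default via `toLowercase`.

```scala
// Hypothetical illustration of RegexTokenizer's "gaps" mode in plain Scala:
// the regex matches separators, then tokens shorter than minTokenLength are dropped.
object TokenizeSketch {
  def tokenize(text: String,
               pattern: String = "[\\W_]+", // same pattern as setPattern above
               minTokenLength: Int = 2): Seq[String] =
    text.split(pattern).toSeq.filter(_.length >= minTokenLength)
}
```

For example, `TokenizeSketch.tokenize("Call 0800_123 now, U won!")` yields `Seq("Call", "0800", "123", "now", "won")`: the underscore and punctuation act as separators, and the one-letter token "U" is filtered out.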

Remove stop words

//%sh wget http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words -O /tmp/stopwords # to download the list, run this line without the leading '//' in a %sh cell; it is only needed once

val stopwords = sc.textFile("/tmp/stopwords").collect() ++ Array(",", ":", ";", "/", "<", ">", "\"", ".", "(", ")", "?", "-", "'", "!", "0", "1")

import org.apache.spark.ml.feature.StopWordsRemover

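def removeStopwords(df: DataFrame): DataFrame = {

  // Set params for StopWordsRemover
  val remover = new StopWordsRemover().
   setStopWords(stopwords). // custom list loaded above
   setInputCol("tokens"). // output column of tokenize
   setOutputCol("filtered") // name of the output column

  remover.transform(df)
}

// Create new DF with Stopwords removed
val filtered_df = removeStopwords(tokenized_df)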
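Stop-word removal simply drops every token that appears in the list. The plain-Scala sketch below mirrors StopWordsRemover's default `caseSensitive = false` by comparing lowercased strings; `StopWordsSketch` is a hypothetical name, not part of Spark.

```scala
// Hypothetical plain-Scala illustration of stop-word filtering.
// Comparison ignores case, like StopWordsRemover with its default caseSensitive = false.
object StopWordsSketch {
  def removeStopwords(tokens: Seq[String], stopwords: Set[String]): Seq[String] = {
    val stop = stopwords.map(_.toLowerCase)               // normalize the list once
    tokens.filterNot(t => stop.contains(t.toLowerCase))   // keep only non-stop-words
  }
}
```

For example, removing `Set("you", "at", "the")` from `Seq("See", "you", "AT", "the", "meetup")` leaves `Seq("See", "meetup")`.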

Hash - for term frequency

import org.apache.spark.ml.feature.HashingTF

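def hasher(df: DataFrame): DataFrame = {
  val hashingTF = new HashingTF().
   setNumFeatures(1024). // number of features to retain
   setInputCol("filtered"). // tokens left after stop-word removal
   setOutputCol("hashed") // term-frequency vectors

  hashingTF.transform(df)
}

val hashed_df = hasher(filtered_df)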


display(hashed_df.select("hamOrSpam", "message", "hashed").take(10))
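HashingTF uses the hashing trick: each term is mapped to an index in `[0, numFeatures)` and the counts of all terms landing on the same index are summed, so no vocabulary has to be stored (distinct terms can collide). A minimal plain-Scala sketch, assuming `String.hashCode` with a non-negative modulo as the hash; Spark's actual hash function differs between versions, and `HashingSketch` is a hypothetical name.

```scala
// Hypothetical plain-Scala illustration of the hashing trick behind HashingTF.
// Assumption: String.hashCode as the hash; Spark's real hash function may differ.
object HashingSketch {
  // Returns a sparse term-frequency vector as (index -> count)
  def termFrequencies(tokens: Seq[String], numFeatures: Int = 1024): Map[Int, Double] =
    tokens
      .groupBy(t => Math.floorMod(t.hashCode, numFeatures)) // non-negative bucket index
      .map { case (index, terms) => index -> terms.size.toDouble } // colliding terms add up
}
```

With 1024 buckets, `Seq("win", "cash", "win")` yields a vector whose counts sum to 3, with at least 2 in the bucket for "win".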

TF-IDF (Term Frequency - Inverse Document Frequency) is a numerical statistic intended to reflect how important a word is to a document in a collection or corpus: a term scores high when it is frequent in that document but rare across the other documents.
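Spark's IDF uses a smoothed formula, documented as IDF(t, D) = log((|D| + 1) / (DF(t, D) + 1)), where |D| is the number of documents and DF(t, D) the number of documents containing t; TF-IDF is then TF(t, d) · IDF(t, D). The plain-Scala sketch below applies that formula to sparse term-frequency vectors stored as `Map[Int, Double]`; `TfIdfSketch`, `fitIdf` and `transform` are hypothetical names, and this is an illustration of the formula, not Spark's implementation.

```scala
// Hypothetical plain-Scala illustration of smoothed IDF and TF-IDF weighting.
object TfIdfSketch {
  type SparseVec = Map[Int, Double] // feature index -> term frequency

  // Learn idf(i) = log((m + 1) / (df(i) + 1)) from m documents
  def fitIdf(docs: Seq[SparseVec]): Int => Double = {
    val m = docs.size.toDouble
    // document frequency: number of documents with a non-zero count at each index
    val df: Map[Int, Int] = docs
      .flatMap(_.filter(_._2 != 0.0).keys)
      .groupBy(identity)
      .map { case (i, occurrences) => i -> occurrences.size }
    i => math.log((m + 1) / (df.getOrElse(i, 0) + 1))
  }

  // Scale each term frequency by its idf weight
  def transform(tf: SparseVec, idf: Int => Double): SparseVec =
    tf.map { case (i, count) => i -> count * idf(i) }
}
```

A term present in every document gets weight log(1) = 0, so it contributes nothing to the features; a term in one of three documents gets log(4/2) = log 2.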

import org.apache.spark.ml.feature.{IDF, IDFModel}

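def getIDFModel(df: DataFrame): IDFModel = {

  val idf = new IDF().
   setInputCol("hashed"). // term-frequency vectors from hasher
   setOutputCol("features") // TF-IDF vectors

  idf.fit(df) // learn document frequencies from the corpus
}

val idfModel = getIDFModel(hashed_df)
val idf_df = idfModel.transform(hashed_df)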

display(idf_df.select("hamOrSpam", "message", "hashed", "features").take(10))

Helper function that puts all the featurizers together

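import sqlContext.implicits._

def featurizer(message: String): DataFrame = {
  // One-row DataFrame with a placeholder label, so the same featurizers can be reused
  val initialDF = sc.parallelize(Seq(message)).toDF("message").
    select(org.apache.spark.sql.functions.lit("?").as("hamOrSpam"), $"message")
  val hashedDF = hasher(removeStopwords(tokenize(initialDF)))
  // Apply the IDF model fitted on the corpus and keep the label and feature columns
  idfModel.transform(hashedDF).select("hamOrSpam", "features")
}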

// Attach the H2O library to the cluster first (Maven artifact ai.h2o:sparkling-water-examples_2.10:1.6.3)

import org.apache.spark.h2o._
// Create H2O Context
val h2oContext = H2OContext.getOrCreate(sc)

// Import h2oContext implicits. These help convert between RDD, DataFrame and H2OFrame
import h2oContext.implicits._

// Implicitly convert DataFrame to H2O Frame
val table: H2OFrame = idf_df.select("hamOrSpam", "features")

// http://h2o-release.s3.amazonaws.com/h2o/rel-turchin/3/docs-website/h2o-core/javadoc/index.html

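// Convert the response column to a categorical vector so H2O treats the task as classification;
// replace() returns the old vector, which remove() then frees
table.replace(table.find("hamOrSpam"), table.vec("hamOrSpam").toCategoricalVec).remove()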
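Converting the string labels to a categorical column assigns each distinct label an integer level. The plain-Scala sketch below assumes the levels are ordered lexicographically, which would make "ham" level 0 and "spam" level 1; treat that ordering as an assumption about H2O, and `CategoricalSketch` as a hypothetical name.

```scala
// Hypothetical plain-Scala illustration of string-to-categorical encoding.
// Assumption: levels (the "domain") are sorted lexicographically.
object CategoricalSketch {
  def encode(labels: Seq[String]): (IndexedSeq[String], Seq[Int]) = {
    val domain = labels.distinct.sorted.toIndexedSeq // e.g. IndexedSeq("ham", "spam")
    val index = domain.zipWithIndex.toMap            // label -> level number
    (domain, labels.map(index))
  }
}
```

The level order matters later: per-class probability columns in a prediction frame follow the same order.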

import water.{H2O, Key}
import water.fvec.Frame
import hex.FrameSplitter

def split(df: H2OFrame, keys: Seq[String], ratios: Seq[Double]): Array[Frame] = {
    val ks = keys.map(Key.make[Frame](_)).toArray
    val splitter = new FrameSplitter(df, ratios.toArray, ks, null)
    H2O.submitTask(splitter) // run the split as an H2O task
    // return results
    splitter.getResult
}
// Split table
val keys = Array[String]("train.hex", "valid.hex")
val ratios = Array[Double](0.8)
val frs = split(table, keys, ratios)
val (train, valid) = (frs(0), frs(1))
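Passing one ratio (0.8) with two keys produces two frames: n ratios give n + 1 parts, and the last part receives the remaining rows. A plain-Scala sketch of that size arithmetic, assuming simple truncation; FrameSplitter's exact rounding and row assignment may differ, and `SplitSketch` is a hypothetical name.

```scala
// Hypothetical illustration of how a ratios array maps to part sizes.
// Assumption: truncation for each ratio; the last part gets the remainder.
object SplitSketch {
  def partSizes(nRows: Long, ratios: Seq[Double]): Seq[Long] = {
    val heads = ratios.map(r => (nRows * r).toLong) // one part per ratio
    heads :+ (nRows - heads.sum)                    // final part: whatever is left
  }
}
```

For example, 100 rows with ratios `Seq(0.8)` split into 80 training and 20 validation rows.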

What deep learning parameters can we set?

Deep learning parameters

import hex.deeplearning.DeepLearning
import hex.deeplearning.DeepLearningModel
import hex.deeplearning.DeepLearningModel.DeepLearningParameters
import DeepLearningParameters.Activation

val dlParams = new DeepLearningParameters()

dlParams._train = train
dlParams._valid = valid
dlParams._activation = Activation.RectifierWithDropout
dlParams._response_column = 'hamOrSpam
dlParams._epochs = 10
dlParams._l1 = 0.001
dlParams._hidden = Array[Int](200, 200)

// Create a job
val dl = new DeepLearning(dlParams, Key.make("dlModel.hex"))
val dlModel = dl.trainModel.get // trainModel submits a job to H2O; get blocks until the job is finished
                                // and returns the trained DeepLearningModel


Dropout, used by the RectifierWithDropout activation above, is explained in this short video (1:43):

Udacity: Deep Learning by Vincent Vanhoucke - Dropouts

Video credit: Udacity's Deep Learning course by Arpan Chakraborthy and Vincent Vanhoucke
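During training, dropout randomly zeroes hidden units so the network cannot rely on any single unit. The plain-Scala sketch below shows the generic "inverted dropout" variant, where surviving activations are scaled by 1 / (1 - p) so their expected value is unchanged; it illustrates the technique only, H2O's internal implementation may differ, and `DropoutSketch` is a hypothetical name.

```scala
import scala.util.Random

// Hypothetical plain-Scala illustration of inverted dropout on one layer (training time).
object DropoutSketch {
  def dropout(activations: Array[Double], p: Double, rng: Random): Array[Double] = {
    require(p >= 0.0 && p < 1.0, "dropout probability must be in [0, 1)")
    // zero each unit with probability p; rescale survivors to keep the expectation
    activations.map(a => if (rng.nextDouble() < p) 0.0 else a / (1.0 - p))
  }
}
```

With p = 0.5, each output is either 0 or twice its input; at prediction time no units are dropped.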

import water.app.ModelMetricsSupport
import hex.ModelMetricsBinomial

val trainMetrics = ModelMetricsSupport.modelMetrics[ModelMetricsBinomial](dlModel, train)
println(s"Training AUC: ${trainMetrics.auc}")

val validMetrics = ModelMetricsSupport.modelMetrics[ModelMetricsBinomial](dlModel, valid)
println(s"Validation AUC: ${validMetrics.auc}")
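AUC can be read as the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one, with ties counting one half; a training AUC well above the validation AUC suggests overfitting. The pairwise plain-Scala sketch below illustrates that definition (`AucSketch` is a hypothetical name); it is O(P · N) and not how H2O computes AUC at scale.

```scala
// Hypothetical plain-Scala illustration of AUC as a pairwise ranking probability.
object AucSketch {
  def auc(positiveScores: Seq[Double], negativeScores: Seq[Double]): Double = {
    require(positiveScores.nonEmpty && negativeScores.nonEmpty)
    // score every (positive, negative) pair: 1 if ranked correctly, 0.5 on a tie
    val pairs = for (p <- positiveScores; n <- negativeScores)
      yield if (p > n) 1.0 else if (p == n) 0.5 else 0.0
    pairs.sum / pairs.size
  }
}
```

For positives scored (0.9, 0.4) and negatives (0.6, 0.1), three of the four pairs are ordered correctly, giving an AUC of 0.75.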

import org.apache.spark.ml.feature.IDFModel
import org.apache.spark.sql.DataFrame

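def isSpam(msg: String,
           hamThreshold: Double = 0.5): Boolean = {
  val msgTable: H2OFrame = featurizer(msg)
  msgTable.remove(0) // drop the placeholder "hamOrSpam" column so only the features remain
  val prediction = dlModel.score(msgTable) // score takes a Frame as input and scores the feature columns the model was trained on
  // Column 0 holds the predicted label; column 1 holds the probability of the first level ("ham")
  prediction.vecs()(1).at(0) < hamThreshold
}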

isSpam("We tried to contact you re your reply to our offer of a Video Handset? 750 anytime any networks mins? UNLIMITED TEXT?")

isSpam("See you at the next Spark meetup")

isSpam("You have won $500,000 from COCA COLA. Contact winner-coco@hotmail.com to claim your prize!")
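`isSpam` flags a message when the model's probability of "ham" falls below `hamThreshold`, so raising the threshold flags more messages as spam. The plain-Scala sketch below isolates that decision rule and counts how many messages would be flagged at several thresholds; `ThresholdSketch` and the probabilities in the example are hypothetical.

```scala
// Hypothetical plain-Scala illustration of the threshold rule used by isSpam.
object ThresholdSketch {
  def isSpamGivenPHam(pHam: Double, hamThreshold: Double = 0.5): Boolean =
    pHam < hamThreshold

  // For each threshold, count how many p(ham) values would be flagged as spam
  def flaggedCounts(pHams: Seq[Double], thresholds: Seq[Double]): Seq[(Double, Int)] =
    thresholds.map(t => t -> pHams.count(p => isSpamGivenPHam(p, t)))
}
```

With made-up p(ham) values `Seq(0.1, 0.45, 0.7, 0.95)`, thresholds 0.3, 0.5 and 0.8 flag 1, 2 and 3 messages respectively.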

More examples


