006a_PipedRDD(Scala)


ScaDaMaLe Course site and book

Piped RDDs and Bayesian AB Testing

Here we will first take excerpts, with minor modifications, from the end of Chapter 12, Resilient Distributed Datasets (RDDs), of Spark: The Definitive Guide.

Next, we will do Bayesian AB Testing using PipedRDDs.

First, we create the toy RDDs as in The Definitive Guide:

From a Local Collection

To create an RDD from a collection, you will need to use the parallelize method on a SparkContext (within a SparkSession). This turns a single-node collection into a parallel collection. When creating this parallel collection, you can also explicitly state the number of partitions into which you would like to distribute this array. In this case, we are creating two partitions:

// in Scala
val myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
val words = spark.sparkContext.parallelize(myCollection, 2)
%python
# in Python
myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple"\
  .split(" ")
words = spark.sparkContext.parallelize(myCollection, 2)
words
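As a quick sanity check (not from the book), the standard RDD method getNumPartitions confirms that the collection was indeed split into the two partitions we asked for:

// in Scala: confirm the number of partitions we requested above
words.getNumPartitions // should return 2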

glom from The Definitive Guide

glom is an interesting function that takes every partition in your dataset and converts it to an array. This can be useful if you’re going to collect the data to the driver and want to have an array for each partition. However, this can cause serious stability issues because if you have large partitions or a large number of partitions, it’s easy to crash the driver.

Let's use glom to see how our words are distributed among the two partitions we used explicitly.

words.glom.collect 
%python
words.glom().collect()

Checkpointing from The Definitive Guide

One feature not available in the DataFrame API is the concept of checkpointing. Checkpointing is the act of saving an RDD to disk so that future references to this RDD point to those intermediate partitions on disk rather than recomputing the RDD from its original source. This is similar to caching except that it’s not stored in memory, only disk. This can be helpful when performing iterative computation, similar to the use cases for caching:

Let's create a directory in dbfs:/// to checkpoint RDDs into in the sequel. The %fs mkdirs /path_to_dir magic below is a shortcut for creating a directory in dbfs:///.

%fs
mkdirs /datasets/ScaDaMaLe/checkpointing/
spark.sparkContext.setCheckpointDir("dbfs:///datasets/ScaDaMaLe/checkpointing")
words.checkpoint()

Now, when we reference this RDD, it will derive from the checkpoint instead of the source data. This can be a helpful optimization.
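Note that checkpoint() is lazy: the RDD is only written to the checkpoint directory once an action runs on it. A minimal sketch to trigger and then verify the checkpoint, using the standard RDD methods isCheckpointed and getCheckpointFile:

// in Scala: an action triggers the actual write to the checkpoint directory
words.count()
words.isCheckpointed    // should now be true
words.getCheckpointFile // Some(dbfs:/datasets/ScaDaMaLe/checkpointing/...)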

YouTry

Here are just some more words, in haha_words, with \n, the End-Of-Line (EOL) character, embedded in-place.
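As a rough sketch of what such a haha_words RDD could look like (the exact strings below are only an assumption, not the notebook's), we can parallelize a few strings that contain \n in-place and peek at the partitions with glom:

// in Scala: a hypothetical haha_words-style RDD whose elements contain "\n" in-place
val haha_words = spark.sparkContext.parallelize(Seq("ha\nha", "he\nhe\nhe", "ho"), 2)
haha_words.glom.collect // inspect how the strings, EOLs included, are laid out per partition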