ScaDaMaLe Course site and book

Introduction to Spark

Spark Essentials: RDDs, Transformations and Actions

  • This introductory notebook describes how to get started running Spark (Scala) code in Notebooks.
  • Working with Spark's Resilient Distributed Datasets (RDDs)
    • creating RDDs
    • performing basic transformations on RDDs
    • performing basic actions on RDDs

RECOLLECT from the 001_WhySpark notebook and AJ's videos that Spark does fault-tolerant, distributed, in-memory computing.

THEORY CAVEAT This module is focused on getting you to quickly write Spark programs with a high-level appreciation of the underlying concepts.

In the last module, we will spend more time analyzing the core algorithms in the parallel and distributed setting of a typical Spark cluster today -- where several multi-core parallel computers (Spark workers) are networked together to provide a fault-tolerant distributed computing platform.

Spark Cluster Overview:

Driver Program, Cluster Manager and Worker Nodes

The driver does the following:

  1. connects to a cluster manager to allocate resources across applications
  2. acquires executors on cluster nodes
     • executor processes run compute tasks and cache data in memory or on disk on a worker node
  3. sends the application (user program built on Spark) to the executors
  4. sends tasks for the executors to run
     • a task is a unit of work that will be sent to one executor

See http://spark.apache.org/docs/latest/cluster-overview.html for an overview of the spark cluster.

The Abstraction of Resilient Distributed Dataset (RDD)

RDD is a fault-tolerant collection of elements that can be operated on in parallel.

Two types of Operations are possible on an RDD:

  • Transformations
  • Actions

(watch now 2:26):

RDD in Spark by Anthony Joseph in BerkeleyX/CS100.1x


Transformations

(watch now 1:18):

Spark Transformations by Anthony Joseph in BerkeleyX/CS100.1x


Actions

(watch now 0:48):

Spark Actions by Anthony Joseph in BerkeleyX/CS100.1x


Key Points

  • Resilient distributed datasets (RDDs) are the primary abstraction in Spark.
  • RDDs are immutable once created:
    • you can transform it,
    • you can perform actions on it,
    • but you cannot change an RDD once you construct it.
  • Spark tracks each RDD's lineage information or recipe to enable its efficient recomputation if a machine fails.
  • RDDs enable operations on collections of elements in parallel.
  • We can construct RDDs by:
    • parallelizing Scala collections such as lists or arrays,
    • transforming an existing RDD, or
    • reading from files in distributed file systems such as HDFS or S3 (see the short sketch after this list).
  • We can specify the number of partitions for an RDD
  • The more partitions in an RDD, the more opportunities for parallelism
  • There are two types of operations you can perform on an RDD:
    • transformations (are lazily evaluated)
      • map
      • flatMap
      • filter
      • distinct
      • ...
    • actions (actual evaluation happens)
      • count
      • reduce
      • take
      • collect
      • takeOrdered
      • ...
  • Spark transformations enable us to create new RDDs from an existing RDD.
  • RDD transformations are lazy evaluations (results are not computed right away)
  • Spark remembers the set of transformations that are applied to a base data set (this is the lineage graph of RDD)
  • This allows Spark to automatically recover RDDs from failures and slow workers.
  • The lineage graph is a recipe for creating a result and it can be optimized before execution.
  • A transformed RDD is executed only when an action runs on it.
  • You can also persist, or cache, RDDs in memory or on disk (this speeds up iterative ML algorithms that repeatedly transform the initial RDD).
  • Here is a great reference URL for programming guides for Spark that one should try to cover first
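A minimal sketch of the three ways to construct an RDD mentioned above (assuming the pre-made sc in this notebook; the file path is just an illustrative Databricks dataset and may differ on your cluster):

val fromCollection = sc.parallelize(Seq(1, 2, 3, 4))          // 1. parallelize a local Scala collection
val fromTransformation = fromCollection.map(_ * 2)            // 2. transform an existing RDD
val fromFile = sc.textFile("/databricks-datasets/README.md")  // 3. read a file from a (distributed) file system -- illustrative path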

Let's get our hands dirty in Spark!

DO NOW!

In your databricks community edition:

  1. In your WorkSpace create a Folder named scalable-data-science
  2. Import the databricks archive file at the following URL:
  3. This should create a directory structure with path: /Workspace/scalable-data-science/xtraResources/

Let us look at the legend and overview of the visual RDD API by doing the following first:

Running Spark

The variable sc allows you to access a Spark Context to run your Spark programs. Recall SparkContext is in the Driver Program.

NOTE: Do not create the sc variable -- it is already initialized for you in the spark-shell REPL, which includes notebook environments like Databricks, Jupyter, Zeppelin, etc.

We will do the following next:

  1. Create an RDD using sc.parallelize
  2. Perform the collect action on the RDD and find the number of partitions it is made of using the getNumPartitions action
  3. Perform the take action on the RDD
  4. Transform the RDD by map to make another RDD
  5. Transform the RDD by filter to make another RDD
  6. Perform the reduce action on the RDD
  7. Transform the RDD by flatMap to make another RDD
  8. Create a Pair RDD
  9. Perform some transformations on a Pair RDD
  10. Where in the cluster is your computation running?
  11. Shipping Closures, Broadcast Variables and Accumulator Variables
  12. Spark Essentials: Summary
  13. HOMEWORK
  14. Importing Standard Scala and Java libraries

Entry Point

Now we are ready to start programming in Spark!

Our entry point for Spark 2.x applications is the class SparkSession. An instance of this object is already instantiated for us, which can be easily demonstrated by running the next cell.

We will need these docs!

println(spark)
org.apache.spark.sql.SparkSession@69141846

NOTE that since Spark 2.0, SparkSession is a replacement for the other entry points:

  • SparkContext, available in our notebook as sc.
  • SQLContext, or more specifically its subclass HiveContext, available in our notebook as sqlContext.

println(sc)
println(sqlContext)
org.apache.spark.SparkContext@517c9049
org.apache.spark.sql.hive.HiveContext@6c5b5052

We will be using the pre-made SparkContext sc when learning about RDDs.

1. Create an RDD using sc.parallelize

First, let us create an RDD of three elements (of integer type Int) from a Scala Seq (or List or Array) with two partitions by using the parallelize method of the available Spark Context sc as follows:

val x = sc.parallelize(Array(1, 2, 3), 2)    // <Ctrl+Enter> to evaluate this cell (using 2 partitions)
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[33] at parallelize at command-685894176422457:1
//x.  // place the cursor after 'x.' and hit Tab to see the methods available for the RDD x we created

2. Perform the collect action on the RDD and find the number of partitions in it using getNumPartitions action

No action has been taken by sc.parallelize above. To see what is "cooked" by the recipe for RDD x we need to take an action.

The simplest is the collect action which returns all of the elements of the RDD as an Array to the driver program and displays it.

So you have to make sure that all of that data will fit in the driver program if you call collect action!

Let us look at the collect action in detail and return here to try out the example codes.

Let us perform a collect action on RDD x as follows:

x.collect()    // <Ctrl+Enter> to collect (action) elements of rdd; should be (1, 2, 3)
res44: Array[Int] = Array(1, 2, 3)

CAUTION: collect can crash the driver when called on an RDD with a very large number of elements. So it is better to use other displaying actions like take or takeOrdered, as in the sketch below:
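A minimal sketch (assuming the pre-made sc): take returns the first n elements and takeOrdered returns the n smallest elements by the natural ordering, so only n elements are brought back to the driver.

val big = sc.parallelize(1 to 1000000)  // a larger RDD we would not want to collect
big.take(3)                             // first 3 elements: Array(1, 2, 3)
big.takeOrdered(3)                      // 3 smallest elements: Array(1, 2, 3)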

Let us look at the getNumPartitions action in detail and return here to try out the example codes.

// <Ctrl+Enter> to evaluate this cell and find the number of partitions in RDD x
x.getNumPartitions 
res45: Int = 2

We can see which elements of the RDD are in which partition by calling glom() before collect().

glom() coalesces the elements of each partition into an Array.

x.glom().collect() // glom() flattens elements on the same partition
res46: Array[Array[Int]] = Array(Array(1), Array(2, 3))
val a = x.glom().collect()
a: Array[Array[Int]] = Array(Array(1), Array(2, 3))

Thus from the output above, Array[Array[Int]] = Array(Array(1), Array(2, 3)), we know that 1 is in one partition while 2 and 3 are in another partition.

You Try!

Create an RDD x with three elements, 1, 2, 3, and this time do not specify the number of partitions. Then the default number of partitions will be used. Find out what this is for the cluster you are attached to.

The default number of partitions for an RDD depends, among other things, on the cluster this notebook is attached to - see programming-guide.

val x = sc.parallelize(Seq(1, 2, 3))    // <Shift+Enter> to evaluate this cell (using default number of partitions)
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at parallelize at command-685894176422471:1
x.getNumPartitions // <Shift+Enter> to evaluate this cell
res47: Int = 8
x.glom().collect() // <Ctrl+Enter> to evaluate this cell
res48: Array[Array[Int]] = Array(Array(), Array(), Array(1), Array(), Array(), Array(2), Array(), Array(3))

3. Perform the take action on the RDD

The .take(n) action returns an array with the first n elements of the RDD.

x.take(2) // Ctrl+Enter to take two elements from the RDD x
res49: Array[Int] = Array(1, 2)
You Try!

Fill in the parentheses ( ) below in order to take just one element from RDD x.

//x.take(1) // uncomment by removing '//' before x in the cell and fill in the parenthesis to take just one element from RDD x and Cntrl+Enter

4. Transform the RDD by map to make another RDD

The map transformation returns a new RDD that's formed by passing each element of the source RDD through a function (closure). The closure is automatically passed on to the workers for evaluation (when an action is called later).

Let us look at the map transformation in detail and return here to try out the example codes.

// Shift+Enter to make RDD x and RDD y that is mapped from x
val x = sc.parallelize(Array("b", "a", "c")) // make RDD x: [b, a, c]
val y = x.map(z => (z,1))                    // map x into RDD y: [(b, 1), (a, 1), (c, 1)]
x: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[38] at parallelize at command-685894176422480:2
y: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[39] at map at command-685894176422480:3
// Cntrl+Enter to collect and print the two RDDs
println(x.collect().mkString(", "))
println(y.collect().mkString(", "))
b, a, c
(b,1), (a,1), (c,1)

5. Transform the RDD by filter to make another RDD

The filter transformation returns a new RDD that's formed by selecting those elements of the source RDD on which the function returns true.

Let us look at the filter transformation in detail and return here to try out the example codes.

//Shift+Enter to make RDD x and filter it by (n => n%2 == 1) to make RDD y
val x = sc.parallelize(Array(1,2,3))
// the closure (n => n%2 == 1) in the filter will 
// return True if element n in RDD x has remainder 1 when divided by 2 (i.e., if n is odd)
val y = x.filter(n => n%2 == 1) 
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at parallelize at command-685894176422484:2
y: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[41] at filter at command-685894176422484:5
// Cntrl+Enter to collect and print the two RDDs
println(x.collect().mkString(", "))
println(y.collect().mkString(", "))
//y.collect()
1, 2, 3
1, 3

6. Perform the reduce action on the RDD

The reduce action aggregates the elements of an RDD using a function (closure). This function takes two arguments and returns one; it can be seen as a binary operator. The operator has to be commutative and associative so that the result can be computed correctly in parallel (where we have little control over the order of the operations!).

Let us look at the reduce action in detail and return here to try out the example codes.

//Shift+Enter to make RDD x of integers 1,2,3,4 and reduce it to their sum
val x = sc.parallelize(Array(1,2,3,4))
val y = x.reduce((a,b) => a+b)
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[42] at parallelize at command-685894176422488:2
y: Int = 10
//Cntrl+Enter to collect and print RDD x and the Int y, sum of x
println(x.collect.mkString(", "))
println(y)
1, 2, 3, 4
10
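A cautionary sketch (assuming the pre-made sc): a non-commutative, non-associative operator such as subtraction is not a safe argument to reduce, because the grouping of elements into partitions affects the result.

// with one partition the result is ((1-2)-3)-4 = -8
sc.parallelize(Array(1, 2, 3, 4), 1).reduce(_ - _)
// with several partitions the grouping (and hence the result) is not guaranteed,
// which is why the closure passed to reduce must be commutative and associative
sc.parallelize(Array(1, 2, 3, 4), 4).reduce(_ - _)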

7. Transform an RDD by flatMap to make another RDD

flatMap is similar to map, but each element from the input RDD can be mapped to zero or more output elements. Therefore your function should return a sequential collection such as an Array rather than a single element, as shown below.

Let us look at the flatMap transformation in detail and return here to try out the example codes.

//Shift+Enter to make RDD x and flatMap it into RDD by closure (n => Array(n, n*100, 42))
val x = sc.parallelize(Array(1,2,3))
val y = x.flatMap(n => Array(n, n*100, 42))
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[43] at parallelize at command-685894176422492:2
y: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[44] at flatMap at command-685894176422492:3
//Cntrl+Enter to collect and print RDDs x and y
println(x.collect().mkString(", "))
println(y.collect().mkString(", "))
sc.parallelize(Array(1,2,3)).map(n => Array(n,n*100,42)).collect() // compare: map (instead of flatMap) gives an RDD of Arrays, not a flattened RDD
1, 2, 3
1, 100, 42, 2, 200, 42, 3, 300, 42
res54: Array[Array[Int]] = Array(Array(1, 100, 42), Array(2, 200, 42), Array(3, 300, 42))

8. Create a Pair RDD

Let's next work with RDD of (key,value) pairs called a Pair RDD or Key-Value RDD.

// Cntrl+Enter to make RDD words and display it by collect
val words = sc.parallelize(Array("a", "b", "a", "a", "b", "b", "a", "a", "a", "b", "b"))
words.collect()
words: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[47] at parallelize at command-685894176422495:2
res55: Array[String] = Array(a, b, a, a, b, b, a, a, a, b, b)

Let's make a Pair RDD called wordCountPairRDD that is made of (key,value) pairs with key=word and value=1 in order to encode each occurrence of each word in the RDD words, as follows:

// Cntrl+Enter to make and collect Pair RDD wordCountPairRDD
val wordCountPairRDD = words.map(s => (s, 1))
wordCountPairRDD.collect()
wordCountPairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[48] at map at command-685894176422497:2
res56: Array[(String, Int)] = Array((a,1), (b,1), (a,1), (a,1), (b,1), (b,1), (a,1), (a,1), (a,1), (b,1), (b,1))

Wide Transformations and Shuffles

So far we have seen transformations that are narrow -- with no data transfer between partitions. Think of map.

reduceByKey and groupByKey are wide transformations, as data has to be shuffled across partitions in different executors -- this is generally a very expensive operation.

READ the Background about Shuffles in the programming guide below.

In Spark, data is generally not distributed across partitions to be in the necessary place for a specific operation. During computations, a single task will operate on a single partition - thus, to organize all the data for a single reduceByKey reduce task to execute, Spark needs to perform an all-to-all operation. It must read from all partitions to find all the values for all keys, and then bring together values across partitions to compute the final result for each key - this is called the shuffle

READ the Performance Impact about Shuffles in the programming guide below.

The Shuffle is an expensive operation since it involves disk I/O, data serialization, and network I/O. To organize data for the shuffle, Spark generates sets of tasks - map tasks to organize the data, and a set of reduce tasks to aggregate it. This nomenclature comes from MapReduce and does not directly relate to Spark’s map and reduce operations.

Internally, results from individual map tasks are kept in memory until they can’t fit. Then, these are sorted based on the target partition and written to a single file. On the reduce side, tasks read the relevant sorted blocks.

https://spark.apache.org/docs/latest/rdd-programming-guide.html#shuffle-operations
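A minimal sketch (using the wordCountPairRDD defined above): the lineage printed by toDebugString shows the stage boundary that the shuffle introduces.

// reduceByKey produces a ShuffledRDD; the indentation change in the
// printed lineage marks the shuffle (stage) boundary
val shuffled = wordCountPairRDD.reduceByKey(_ + _)
println(shuffled.toDebugString)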

9. Perform some transformations on a Pair RDD

Let's next work with RDD of (key,value) pairs called a Pair RDD or Key-Value RDD.

Now some of the Key-Value transformations that we could perform include the following.

  • reduceByKey transformation
    • which takes an RDD of key-value pairs and returns a new RDD of key-value pairs, such that:
      • the values for each key are aggregated using the given reduce function,
      • and the reduce function has to take two values and return one value.
  • sortByKey transformation
    • this returns a new RDD of key-value pairs that's sorted by keys in ascending order
  • groupByKey transformation
    • this returns a new RDD consisting of key and iterable-valued pairs.

Let's see some concrete examples next.

// Cntrl+Enter to reduceByKey and collect wordcounts RDD
//val wordcounts = wordCountPairRDD.reduceByKey( _ + _ )
val wordcounts = wordCountPairRDD.reduceByKey( (value1, value2) => value1 + value2 )
wordcounts.collect()
wordcounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[49] at reduceByKey at command-685894176422504:3
res58: Array[(String, Int)] = Array((a,6), (b,5))

Now, let us do just the crucial steps and avoid collecting intermediate RDDs (something we should avoid for large datasets anyway, as they may not fit in the driver program).

//Cntrl+Enter to make words RDD and do the word count in two lines
val words = sc.parallelize(Array("a", "b", "a", "a", "b", "b", "a", "a", "a", "b", "b"))
val wordcounts = words
                    .map(s => (s, 1))
                    .reduceByKey(_ + _)
                    .collect() 
words: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[50] at parallelize at command-685894176422506:2
wordcounts: Array[(String, Int)] = Array((a,6), (b,5))
You Try!

You try evaluating sortByKey(), which will make a new RDD that consists of the elements of the original pair RDD sorted by keys.

// Shift+Enter and comprehend code
val words = sc.parallelize(Array("a", "b", "a", "a", "b", "b", "a", "a", "a", "b", "b"))
val wordCountPairRDD = words.map(s => (s, 1))
val wordCountPairRDDSortedByKey = wordCountPairRDD.sortByKey()
words: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[53] at parallelize at command-685894176422508:2
wordCountPairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[54] at map at command-685894176422508:3
wordCountPairRDDSortedByKey: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[57] at sortByKey at command-685894176422508:4
wordCountPairRDD.collect() // Shift+Enter and comprehend code
res59: Array[(String, Int)] = Array((a,1), (b,1), (a,1), (a,1), (b,1), (b,1), (a,1), (a,1), (a,1), (b,1), (b,1))
wordCountPairRDDSortedByKey.collect() // Cntrl+Enter and comprehend code
res60: Array[(String, Int)] = Array((a,1), (a,1), (a,1), (a,1), (a,1), (a,1), (b,1), (b,1), (b,1), (b,1), (b,1))

The next key value transformation we will see is groupByKey

When we apply the groupByKey transformation to wordCountPairRDD we end up with a new RDD that contains two elements. The first element is the key b paired with an iterable CompactBuffer(1, 1, 1, 1, 1) obtained by grouping the value 1 from each of the five key-value pairs (b,1). Similarly, the second element is the key a paired with an iterable CompactBuffer(1, 1, 1, 1, 1, 1) obtained by grouping the value 1 from each of the six key-value pairs (a,1).

CAUTION: groupByKey can cause a large amount of data movement across the network. It can also create very large iterables at a worker. Imagine you have an RDD with 1 billion pairs that all have the key a. All of those values will have to fit in a single worker if you use groupByKey. So instead of groupByKey, consider using reduceByKey.

val wordCountPairRDDGroupByKey = wordCountPairRDD.groupByKey() // <Shift+Enter> CAUTION: this transformation can be very wide!
wordCountPairRDDGroupByKey: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[58] at groupByKey at command-685894176422513:1
wordCountPairRDDGroupByKey.collect()  // Cntrl+Enter
res61: Array[(String, Iterable[Int])] = Array((a,CompactBuffer(1, 1, 1, 1, 1, 1)), (b,CompactBuffer(1, 1, 1, 1, 1)))
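For comparison, a small sketch (using the grouped RDD above): the same counts can be recovered from the groupByKey result, but unlike reduceByKey every (word, 1) pair was first shipped across the network.

// sum the iterable of 1s per key; same counts as reduceByKey above
wordCountPairRDDGroupByKey.mapValues(_.sum).collect() // Array((a,6), (b,5)) -- order may vary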

10. Understanding Closures - Where in the cluster is your computation running?

One of the harder things about Spark is understanding the scope and life cycle of variables and methods when executing code across a cluster. RDD operations that modify variables outside of their scope can be a frequent source of confusion. In the example below we’ll look at code that uses foreach() to increment a counter, but similar issues can occur for other operations as well.

https://spark.apache.org/docs/latest/rdd-programming-guide.html#understanding-closures-

val data = Array(1, 2, 3, 4, 5)
var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)
Counter value: 0
data: Array[Int] = Array(1, 2, 3, 4, 5)
counter: Int = 0
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[59] at parallelize at command-685894176422517:3

From RDD programming guide:

The behavior of the above code is undefined, and may not work as intended. To execute jobs, Spark breaks up the processing of RDD operations into tasks, each of which is executed by an executor. Prior to execution, Spark computes the task’s closure. The closure is those variables and methods which must be visible for the executor to perform its computations on the RDD (in this case foreach()). This closure is serialized and sent to each executor.

The variables within the closure sent to each executor are now copies and thus, when counter is referenced within the foreach function, it’s no longer the counter on the driver node. There is still a counter in the memory of the driver node but this is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of counter will still be zero since all operations on counter were referencing the value within the serialized closure.
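A minimal sketch of the safe alternative, using the sc.longAccumulator API introduced in the next section (the name "sum" below is just illustrative): an accumulator gives a well-defined way to aggregate values from tasks back to the driver, where the naive counter above does not.

val sumAcc = sc.longAccumulator("sum")                           // accumulator lives on the driver
sc.parallelize(Array(1, 2, 3, 4, 5)).foreach(x => sumAcc.add(x)) // tasks add to it on the executors
println("Accumulator value: " + sumAcc.value)                    // 15, readable only on the driver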

11. Shipping Closures, Broadcast Variables and Accumulator Variables

Closures, Broadcast and Accumulator Variables

(watch now 2:06):

Closures, Broadcast and Accumulators by Anthony Joseph in BerkeleyX/CS100.1x

We will use these variables in the sequel.

SUMMARY

Spark automatically creates closures

  • for functions that run on RDDs at workers,
  • and for any global variables that are used by those workers
  • one closure per worker is sent with every task
  • and there's no communication between workers
  • closures go one way, from the driver to the workers
  • any changes that you make to the global variables at the workers
    • are not sent to the driver or
    • are not sent to other workers.

The problem we have is that these closures

  • are automatically created and sent or re-sent with every job
  • with a large global variable it gets inefficient to send/resend lots of data to each worker
  • and we cannot communicate changes back to the driver through them

To address this, Spark provides two types of shared variables.

  • broadcast variables
    • let us efficiently send large read-only values to all of the workers
    • these are saved at the workers for use in one or more Spark operations.
  • accumulator variables
    • These allow us to aggregate values from workers back to the driver.
    • only the driver can access the value of the accumulator
    • for the tasks, the accumulators are basically write-only

Accumulators

Accumulators are variables that are only “added” to through an associative and commutative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types.

Read: https://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators.

A numeric accumulator can be created by calling SparkContext.longAccumulator() or SparkContext.doubleAccumulator() to accumulate values of type Long or Double, respectively. Tasks running on a cluster can then add to it using the add method. However, they cannot read its value. Only the driver program can read the accumulator’s value, using its value method.

The code below shows an accumulator being used to add up the elements of an array:

val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 1891, name: Some(My Accumulator), value: 0)
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
accum.value
res66: Long = 10
Broadcast Variables

From https://spark.apache.org/docs/latest/rdd-programming-guide.html#broadcast-variables:

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

Spark actions are executed through a set of stages, separated by distributed “shuffle” operations. Spark automatically broadcasts the common data needed by tasks within each stage. The data broadcasted this way is cached in serialized form and deserialized before running each task. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.

Broadcast variables are created from a variable v by calling SparkContext.broadcast(v). The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method. The code below shows this in action.

val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(67)
broadcastVar.value
res68: Array[Int] = Array(1, 2, 3)
broadcastVar.value(0)
res69: Int = 1
val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[61] at parallelize at command-685894176422531:1
rdd.collect
res70: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
rdd.map(x => x%3).collect
res71: Array[Int] = Array(1, 2, 0, 1, 2, 0, 1, 2, 0, 1)
rdd.map(x => x+broadcastVar.value(x%3)).collect
res72: Array[Int] = Array(3, 5, 4, 6, 8, 7, 9, 11, 10, 12)

After the broadcast variable is created, it should be used instead of the value v in any functions run on the cluster so that v is not shipped to the nodes more than once. In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later).

To release the resources that the broadcast variable copied onto executors, call .unpersist(). If the broadcast is used again afterwards, it will be re-broadcast. To permanently release all resources used by the broadcast variable, call .destroy(). The broadcast variable can’t be used after that. Note that these methods do not block by default. To block until resources are freed, specify blocking=true when calling them.

broadcastVar.unpersist()
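A short sketch of the related calls mentioned above (standard Broadcast methods; run destroy only when you are completely done with the variable, since it cannot be used afterwards):

broadcastVar.unpersist(blocking = true) // block until the cached copies are removed from the executors
broadcastVar.destroy()                  // permanently release all resources; broadcastVar is unusable after this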
A more interesting example of a broadcast variable

Let us broadcast maps and use them to look up values at each executor. This example is adapted from https://sparkbyexamples.com/spark/spark-broadcast-variables/.

val states = Map(("NY","New York"),("CA","California"),("FL","Florida"))
val countries = Map(("USA","United States of America"),("IN","India"))

val broadcastStates = spark.sparkContext.broadcast(states)
val broadcastCountries = spark.sparkContext.broadcast(countries)

val data = Seq(("James","Smith","USA","CA"),
    ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL"))

val rdd = spark.sparkContext.parallelize(data) // spark.sparkContext is the same as sc in the spark-shell/notebook

val rdd2 = rdd.map(f => {
  val country = f._3
  val state = f._4
  val fullCountry = broadcastCountries.value.get(country).get
  val fullState = broadcastStates.value.get(state).get
  (f._1, f._2, fullCountry, fullState)
})
states: scala.collection.immutable.Map[String,String] = Map(NY -> New York, CA -> California, FL -> Florida)
countries: scala.collection.immutable.Map[String,String] = Map(USA -> United States of America, IN -> India)
broadcastStates: org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Map[String,String]] = Broadcast(71)
broadcastCountries: org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Map[String,String]] = Broadcast(72)
data: Seq[(String, String, String, String)] = List((James,Smith,USA,CA), (Michael,Rose,USA,NY), (Robert,Williams,USA,CA), (Maria,Jones,USA,FL))
rdd: org.apache.spark.rdd.RDD[(String, String, String, String)] = ParallelCollectionRDD[64] at parallelize at command-685894176422538:12
rdd2: org.apache.spark.rdd.RDD[(String, String, String, String)] = MapPartitionsRDD[65] at map at command-685894176422538:14
println(rdd2.collect().mkString("\n"))
(James,Smith,United States of America,California)
(Michael,Rose,United States of America,New York)
(Robert,Williams,United States of America,California)
(Maria,Jones,United States of America,Florida)

12. Spark Essentials: Summary

(watch now: 0:29)

Spark Essentials Summary by Anthony Joseph in BerkeleyX/CS100.1x

NOTE: In the databricks cluster, we (the course coordinators/administrators) set the number of workers for you.
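A quick sketch for inspecting the cluster this notebook is attached to (standard SparkContext fields):

println(sc.master)             // the master URL / cluster manager this notebook runs against
println(sc.defaultParallelism) // default number of partitions used by sc.parallelize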

13. HOMEWORK

See the notebook in this folder named 005_RDDsTransformationsActionsHOMEWORK. This notebook will give you more examples of the operations above as well as others we will be using later, including:

  • Perform the takeOrdered action on the RDD
  • Transform the RDD by distinct to make another RDD, and
  • Do a bunch of transformations to our RDD and perform an action in a single cell.


14. Importing Standard Scala and Java libraries

  • For libraries that are not available by default, you can upload them to the Workspace.
  • Refer to the Libraries guide for more details.
import scala.math._
val x = min(1, 10)
import scala.math._
x: Int = 1
import java.util.HashMap
val map = new HashMap[String, Int]()
map.put("a", 1)
map.put("b", 2)
map.put("c", 3)
map.put("d", 4)
map.put("e", 5)
import java.util.HashMap
map: java.util.HashMap[String,Int] = {a=1, b=2, c=3, d=4, e=5}
res75: Int = 0