
Resilient Distributed Datasets (RDDs)

  • In almost all scenarios, you should prefer Spark's Structured APIs.
  • Only in some cases do we need Spark's low-level APIs, especially RDDs, the SparkContext, and distributed shared variables such as accumulators and broadcast variables.

What Are the Low-Level APIs?

There are two sets of low-level APIs:

  • one for manipulating distributed data (RDDs)
  • one for distributing and manipulating shared variables (broadcast variables and accumulators); see the sketch below
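
A minimal sketch of both kinds in Python (assuming an existing SparkSession named spark):

rdd = spark.sparkContext.parallelize([1, 2, 3])        # distributed data: an RDD
broadcastVar = spark.sparkContext.broadcast({"a": 1})  # read-only value shipped to every executor
accum = spark.sparkContext.accumulator(0)              # counter that tasks can only add to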

When to Use the Low-Level APIs?

You should use the low-level APIs in the following three situations:

  • You need functionality that the higher-level APIs do not provide, for example control over data placement across the cluster
  • You need to maintain a legacy codebase written using RDDs
  • You need to do some custom shared variable manipulation

We might need to drop down to these APIs to use legacy code, implement a custom data partitioner, or update and track shared variables over a pipeline. These tools give more fine-grained control at the expense of the safeguards that keep you from shooting yourself in the foot.

How to Use the Low-Level APIs?

A SparkContext is the entry point for the low-level APIs and can be accessed through the SparkSession.
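
For example, in Python:

sc = spark.sparkContext  # the SparkContext behind the current SparkSession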

About RDDs

  • All code (DataFrames, Datasets, ...) we run eventually compiles down to RDDs.
  • An RDD represents an immutable, partitioned collection of records that can be operated on in parallel. In DataFrames each record is a structured row conforming to a schema; here the records can be arbitrary Java, Scala, or Python objects.
  • They come at the cost of having to implement everything from scratch and optimize it yourself.
  • The RDD API is similar to the Dataset, which we saw in the previous part of the book, except that RDDs are not stored in, or manipulated with, the structured data engine.

Types of RDDs

  • There are lots of subclasses of RDD. Generally you will use two of them: the generic RDD type or the key-value RDD (which provides additional methods such as aggregation by key).
  • An RDD is characterized by five main properties:
    • A list of partitions
    • A function for computing each split
    • A list of dependencies on other RDDs
    • Optionally, a Partitioner for key-value RDDs (often the main reason to use RDDs)
    • Optionally, a list of preferred locations on which to compute each split
  • RDDs provide all the familiar Spark paradigms: transformations, lazy evaluation, and actions. However, there is no concept of rows in RDDs; individual records are just objects (see the sketch below).
  • NOTE: Python loses a substantial amount of performance when using RDDs; it is effectively similar to running a UDF row by row. Instead of keeping raw objects in the JVM, Spark serializes the data to a Python process, operates on it in Python, and serializes it back to the JVM.
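
A minimal sketch of records as plain Python objects (no Rows, no schema), with the usual lazy transformations and actions:

nums = spark.sparkContext.parallelize([1, 2, 3, 4], 2)  # records are ordinary Python ints
doubled = nums.map(lambda x: x * 2)                     # transformation: nothing executes yet
doubled.collect()                                       # action: [2, 4, 6, 8]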

When to Use RDDs

  • For the vast majority of use cases, DataFrames will be more efficient, more stable, and more expressive.
  • You should not manually create RDDs (unless you have a very specific need); they give up the optimizations available in the Structured APIs in exchange for control over things like partitioning.

Datasets and RDDs of Case Classes

  • What is the difference between RDDs of case classes and Datasets? Datasets can still take advantage of the wealth of functions and optimizations that the Structured APIs have to offer.

Creating RDDs

Interoperating Between DataFrames, Datasets, and RDDs

// in Scala: converts a Dataset[Long] to an RDD[Long]
spark.range(500).rdd

# in Python
spark.range(10).rdd
spark.range(10).rdd.toDF()  # back to DF


# extracting data from the Row type
# in Scala: spark.range(10).toDF().rdd.map(rowObject => rowObject.getLong(0))
# in Python:
spark.range(10).toDF("id").rdd.map(lambda row: row[0])

From Local Collection

myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
words = spark.sparkContext.parallelize(myCollection, 2)

# naming it so it's visible in the Spark UI
words.setName("myWords")
words.name() # myWords

From Data Sources

  • NOTE: RDDs do not have a notion of a Data Source API like DataFrames do; they primarily define their dependency structures and lists of partitions.

spark.sparkContext.textFile("/some/path/withTextFiles")       # each line of the file(s) becomes a record
spark.sparkContext.wholeTextFiles("/some/path/withTextFiles") # each file becomes a (filename, contents) record

Manipulating RDDs

You manipulate RDDs in much the same way that you manipulate DataFrames. As mentioned, the core difference is that you manipulate raw Java, Scala, or Python objects instead of Spark types. There is also a dearth of "helper" methods or functions that you can draw upon to simplify calculations.

Transformations

distinct

# Remove duplicates from the RDD
words.distinct().count()

filter

# This function returns a Boolean type to be used as a filter function.
def startsWithS(individual):
  return individual.startswith("S")
words.filter(lambda word: startsWithS(word)).collect()

map

# map the current word to the word, its starting letter, and whether the word begins with "S"
words2 = words.map(lambda word: (word, word[0], word.startswith("S")))

# filter
words2.filter(lambda record: record[2]).take(5)
# returns tuples like ("Spark", "S", True), ...

flatMap

# Sometimes each current row should produce multiple rows.
# flatMap requires that the output of the map function be an iterable that can be expanded.

words.flatMap(lambda word: list(word)).take(5)

sort

words.sortBy(lambda word: len(word) * -1).take(2)  # longest words first

Random Splits

fiftyFiftySplit = words.randomSplit([0.5, 0.5]) # returns an array of RDDs
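
Each split is itself an RDD; an illustrative check (split sizes are random, but together they cover the original data):

firstHalf, secondHalf = fiftyFiftySplit
firstHalf.count() + secondHalf.count() == words.count()  # True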

Actions

reduce

# reduce an RDD of any kind of value to one value
spark.sparkContext.parallelize(range(1, 21)).reduce(lambda x, y: x + y) # 210

# get longest word in set of words
def wordLengthReducer(leftWord, rightWord):
  if len(leftWord) > len(rightWord):
    return leftWord
  else:
    return rightWord

words.reduce(wordLengthReducer)

Because the reduce operation on the partitions is not deterministic, you can have either "definitive" or "processing" (both of length 10) as the "left" word. This means that sometimes you can end up with one, whereas other times you end up with the other.
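
One way to make the result deterministic is to break ties explicitly; a minimal sketch:

def deterministicWordLengthReducer(leftWord, rightWord):
  # same as above, but ties are broken lexicographically,
  # so the answer no longer depends on partition order
  if len(leftWord) > len(rightWord):
    return leftWord
  elif len(leftWord) < len(rightWord):
    return rightWord
  else:
    return min(leftWord, rightWord)

words.reduce(deterministicWordLengthReducer)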

count

words.count()

countApprox

This is an approximation of the count method we just looked at; it must execute within a timeout (and can return incomplete results if it exceeds the timeout).

// in Scala
val confidence = 0.95
val timeoutMilliseconds = 400
words.countApprox(timeoutMilliseconds, confidence)

countApproxDistinct

There are two implementations of this, both based on streamlib's implementation of "HyperLogLog in Practice: Algorithmic Engineering of a State-of-the-Art Cardinality Estimation Algorithm."

# In the first implementation, the argument we pass into the function is the relative accuracy
words.countApproxDistinct(0.05)
# In the second, you specify the relative accuracy based on two parameters: one for "regular" data and another for a sparse representation
words.countApproxDistinct(4, 10)

countByValue

# counts the number of occurrences of each value in the RDD
words.countByValue()
  • However, it computes the result by loading the final map into the driver's memory, so you should use this method only if the resulting map is expected to be small.

countByValueApprox

# similar to countByValue, but approximate: takes a timeout in milliseconds and a confidence in [0, 1]
words.countByValueApprox(1000, 0.95)

first

words.first()

max and min

# in Python
spark.sparkContext.parallelize(range(1, 21)).max()
spark.sparkContext.parallelize(range(1, 21)).min()

take

take and its derivative methods take a number of values from your RDDs. This works by first scanning one partition and then using the results from that partition to estimate the number of additional partitions needed to satisfy the limit.

There are many variations on this function, such as takeOrdered, takeSample, and top.

words.take(5)
words.takeOrdered(5)
words.top(5)
withReplacement = True
numberToTake = 6
randomSeed = 100
words.takeSample(withReplacement, numberToTake, randomSeed)

Saving Files

Saving files means writing to plain-text files. With RDDs, you cannot actually "save" to a data source in the conventional sense. You must iterate over the partitions in order to save the contents of each partition to some external database.

saveAsTextFile

words.saveAsTextFile("file:/tmp/bookTitle")

// in Scala: we can also specify a compression codec
import org.apache.hadoop.io.compress.BZip2Codec
words.saveAsTextFile("file:/tmp/bookTitleCompressed", classOf[BZip2Codec])

Sequence Files

Spark originally grew out of the Hadoop ecosystem, so it has fairly tight integration with a variety of Hadoop tools. A sequenceFile is a flat file consisting of binary key-value pairs. It is extensively used in MapReduce as an input/output format.

words.saveAsObjectFile("/tmp/my/sequenceFilePath")  # writes a SequenceFile of serialized objects
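
A key-value RDD can also be written directly as a SequenceFile with saveAsSequenceFile; a minimal sketch (pairing each word with its length is just for illustration):

words.map(lambda word: (word, len(word))).saveAsSequenceFile("/tmp/my/sequenceFilePath2")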

Hadoop Files

There are a variety of different Hadoop file formats to which you can save. These allow you to specify classes, output formats, Hadoop configurations, and compression schemes.
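
A hedged sketch using PySpark's saveAsNewAPIHadoopFile (the path, output format, and Writable classes below are illustrative assumptions):

# write (word, length) pairs through the new Hadoop OutputFormat API as a SequenceFile
words.map(lambda word: (word, len(word))).saveAsNewAPIHadoopFile(
  "/tmp/my/hadoopFilePath",
  "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
  keyClass="org.apache.hadoop.io.Text",
  valueClass="org.apache.hadoop.io.IntWritable")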

Caching

The same principles apply for caching RDDs as for DataFrames and Datasets: we can either cache or persist an RDD.

words.cache()

We can specify a storage level as any of the storage levels in the singleton object: org.apache.spark.storage.StorageLevel, which are combinations of memory only; disk only; and separately, off heap.

words.getStorageLevel()
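
To set a level explicitly, persist instead of cache; a minimal sketch (MEMORY_AND_DISK is chosen only as an example):

from pyspark import StorageLevel
words.unpersist()                            # clear the level set by cache() above
words.persist(StorageLevel.MEMORY_AND_DISK)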

Checkpointing

One feature not available in the DataFrame API is the concept of checkpointing. Checkpointing is the act of saving an RDD to disk so that future references to this RDD point to those intermediate partitions on disk rather than recomputing the RDD from its original source.

It is similar to caching, except that the checkpointed data is stored only on disk, not in memory. This can be helpful when performing iterative computation, similar to the use cases for caching:

spark.sparkContext.setCheckpointDir("/some/path/for/checkpointing")
words.checkpoint()

Pipe RDDs to System Commands

With pipe, you can return an RDD created by piping elements to forked external processes. The resulting RDD is computed by executing the given process once per partition. All elements of each input partition are written to a process's stdin as lines of input separated by a newline. The resulting partition consists of the process's stdout output, with each line of stdout resulting in one element of the output partition. A process is invoked even for empty partitions.

The print behavior can be customized by providing two functions.

words.pipe("wc -l").collect()  # with 10 words in 2 partitions, each partition reports 5 lines

mapPartitions

  • Spark operates on a per-partition basis when it comes to actually executing code. Notice that the return type of map on an RDD is actually a MapPartitionsRDD: map is just a row-wise alias for mapPartitions, which makes it possible for you to map an individual partition.

# this creates the value 1 for every partition in our data, so summing the result counts the number of partitions we have
words.mapPartitions(lambda part: [1]).sum() # 2

This allows us to operate on an entire partition of the data at once. For example, we could pipe each partition through a machine learning algorithm and train an individual model for that portion of the dataset (say, one model per company if the data is partitioned by company).
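
A minimal sketch of a per-partition computation (the summary itself is just an illustration):

# emit one summary tuple per partition: (number of words, length of the longest word)
def partitionSummary(withinPartIterator):
  wordsInPartition = list(withinPartIterator)
  yield (len(wordsInPartition), max(len(w) for w in wordsInPartition))

words.mapPartitions(partitionSummary).collect()

mapPartitionsWithIndex additionally passes the partition index to the function: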

def indexedFunc(partitionIndex, withinPartIterator):
  return ["partition: {} => {}".format(partitionIndex,
    x) for x in withinPartIterator]
words.mapPartitionsWithIndex(indexedFunc).collect()

foreachPartition

foreachPartition simply iterates over all the partitions of the data without any return value. This makes it a good fit for writing each partition out to a database or other external sink.

// in Scala
words.foreachPartition { iter =>
  import java.io._
  import scala.util.Random
  val randomFileName = new Random().nextInt()
  val pw = new PrintWriter(new File(s"/tmp/random-file-${randomFileName}.txt"))
  while (iter.hasNext) {
      pw.write(iter.next())
  }
  pw.close()
}

glom

glom takes every partition in the dataset and converts it to an array. This is useful if you are going to collect the data to the driver and want an array per partition. However, it can cause serious stability issues: with large partitions or a large number of partitions, it is easy to crash the driver.

# in Python
spark.sparkContext.parallelize(["Hello", "World"], 2).glom().collect()
# [['Hello'], ['World']]