Resilient Distributed Datasets (RDDs)
- In almost all scenarios you should prefer Spark's Structured APIs.
- Only in some cases do we need Spark's low-level APIs, specifically RDDs, the SparkContext, and distributed shared variables like accumulators and broadcast variables.
What Are the Low-Level APIs
There are two sets of low-level APIs:
- one for manipulating distributed data (RDDs)
- one for distributing and manipulating shared variables (broadcast variables and accumulators)
When to Use Low-Level APIs
You should use the low-level APIs in the following three situations:
- You need functionality that the higher-level APIs do not provide, for example tight control over data placement across the cluster.
- You need to maintain a legacy codebase written using RDDs.
- You need custom manipulation of shared variables.
We might need to drop down to these APIs to use legacy code, implement a custom data partitioner, or update and track shared variables over a pipeline. These tools give more fine-grained control at the expense of the safeguards that keep you from shooting yourself in the foot.
How to Use the Low-Level APIs
A SparkContext is the entry point for the low-level APIs and can be accessed through the SparkSession.
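A minimal sketch (in Python; the app name is just an illustrative placeholder) of getting at the SparkContext from an existing SparkSession:
from pyspark.sql import SparkSession
# build (or reuse) a SparkSession, then grab its underlying SparkContext
spark = SparkSession.builder.appName("low-level-apis").getOrCreate()
sc = spark.sparkContext  # entry point for RDDs, accumulators, and broadcast variables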
About RDDs
- All the code we run (DataFrames, Datasets, ...) eventually compiles down to RDDs.
- An RDD represents an immutable, partitioned collection of records that can be operated on in parallel. In DataFrames each record is a structured row conforming to a known schema; here the records can be arbitrary Java, Scala, or Python objects.
- This flexibility comes at the cost of implementing everything from scratch and optimizing it yourself.
- The RDD API is similar to the Dataset API, which we saw in the previous part of the book, except that RDDs are not stored in, or manipulated with, the structured data engine.
Types of RDDs
- There are many subclasses of RDD. Generally you will use mainly two: the generic RDD type and the key-value RDD (which provides aggregation by key).
- An RDD is characterized by five main properties:
- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (often the main reason to use RDDs)
- Optionally, a list of preferred locations on which to compute each split
- RDDs provide all the Spark paradigms such as transformations, lazy evaluation, and actions. However, there is no concept of rows in RDDs; individual records are just objects.
- NOTE: Python loses a substantial amount of performance when using RDDs; it is effectively similar to running a UDF row by row. Instead of operating on raw JVM objects, Spark must serialize the data to the Python process, operate on it in Python, and then serialize it back to the JVM.
When to Use RDDs
- For the vast majority of use cases, DataFrames will be more efficient, more stable, and more expressive.
- You should not manually create RDDs unless you have a very specific need; you gain fine-grained control over partitioning but give up the optimizations built into the Structured APIs.
Datasets and RDDs of Case Classes
- What is the difference between RDDs of case classes and Datasets? The difference is that Datasets can still take advantage of the wealth of functions and optimizations that the Structured APIs have to offer.
Creating RDDs
Interoperating Between DataFrames, Datasets, and RDDs
# in Scala: converts a Dataset[Long] to an RDD[Long]
spark.range(500).rdd
# in Python: converts a DataFrame to an RDD of Row objects
spark.range(10).rdd
spark.range(10).rdd.toDF()  # back to a DataFrame
# extracting data from the Row type
# in Scala: spark.range(10).toDF().rdd.map(rowObject => rowObject.getLong(0))
spark.range(10).toDF("id").rdd.map(lambda row: row[0])
From a Local Collection
myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
words = spark.sparkContext.parallelize(myCollection, 2)
# naming it so it's visible in the Spark UI
words.setName("myWords")
words.name()  # myWords
From Data Sources
- NOTE: RDDs do not have a notion of Data Source APIs the way DataFrames do; they primarily define their dependency structures and lists of partitions.
spark.sparkContext.textFile("/some/path/withTextFiles")  # each line of each file becomes a record
spark.sparkContext.wholeTextFiles("/some/path/withTextFiles")  # each file becomes a (filename, contents) record
Manipulating RDDs
You manipulate RDDs in much the same way that you manipulate DataFrames. As mentioned, the core difference is that you manipulate raw Java, Scala, or Python objects instead of Spark types. There is also a dearth of "helper" methods or functions that you can draw upon to simplify calculations.
Transformations
Distinct
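The notes give no example here; a minimal sketch of removing duplicates from the words RDD defined earlier:
words.distinct().count()  # number of distinct words in the collection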
filter
# This function returns a Boolean type to be used as a filter function.
def startsWithS(individual):
    return individual.startswith("S")
words.filter(lambda word: startsWithS(word)).collect()
map
# map the current word to a tuple of the word, its starting letter, and whether the word begins with "S"
words2 = words.map(lambda word: (word, word[0], word.startswith("S")))
# filter on the Boolean in the third position of each tuple
words2.filter(lambda record: record[2]).take(5)
# returns tuples like ("Spark", "S", True), ...
flatMap
# Sometimes each current row should return multiple rows.
# flatMap requires that the output of the map function be an iterable that can be expanded.
words.flatMap(lambda word: list(word)).take(5)  # the first five characters: ['S', 'p', 'a', 'r', 'k']
sort
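No example in the notes; a small sketch of sorting words by length, longest first:
words.sortBy(lambda word: len(word) * -1).take(2)  # the two longest words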
Random Splits
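Again no example in the notes; randomSplit breaks an RDD into a list of RDDs given a list of weights (and an optional seed):
fiftyFiftySplit = words.randomSplit([0.5, 0.5])  # returns a list of two RDDs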
Actions
reduce
# reduce an RDD of any kind of value to a single value
spark.sparkContext.parallelize(range(1, 21)).reduce(lambda x, y: x + y)  # 210
# get the longest word in our set of words
def wordLengthReducer(leftWord, rightWord):
    if len(leftWord) > len(rightWord):
        return leftWord
    else:
        return rightWord
words.reduce(wordLengthReducer)
Because the reduce operation on the partitions is not deterministic, you can get either "definitive" or "processing" (both of length 10) as the "left" word. This means that sometimes you end up with one, and other times with the other.
count
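No example in the notes; the call is simply:
words.count()  # 10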
countApprox
// in Scala: an approximation of the count method we just looked at, but it must execute
// within a timeout (and can return incomplete results if it exceeds the timeout)
val confidence = 0.95
val timeoutMilliseconds = 400
words.countApprox(timeoutMilliseconds, confidence)
countApproxDistinct
There are two implementations of this, both based on streamlib's implementation of "HyperLogLog in Practice: Algorithmic Engineering of a State-of-the-Art Cardinality Estimation Algorithm."
# In the first implementation, the argument we pass into the function is the relative accuracy
words.countApproxDistinct(0.05)
# In the second, you specify the relative accuracy via two parameters: one for "regular" data and another for a sparse representation
words.countApproxDistinct(4, 10)
countByValue
- This counts the number of occurrences of each value in the RDD. However, it does so by loading the result set into the memory of the driver. You should use this method only if the resulting map is expected to be small, because the entire thing is loaded into the driver's memory.
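A minimal usage sketch on the words RDD:
words.countByValue()  # a map of each word to how many times it appears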
countByValueApprox
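The notes give no example; in the Scala RDD API this follows the same pattern as countApprox, taking a timeout in milliseconds and a confidence:
// in Scala
words.countByValueApprox(1000, 0.95)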
first
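first returns the first value in the dataset:
words.first()  # "Spark"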
max and min
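max and min return the maximum and minimum values, respectively:
spark.sparkContext.parallelize(range(1, 21)).max()  # 20
spark.sparkContext.parallelize(range(1, 21)).min()  # 1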
take
take and its derivative methods take a number of values from your RDD. This works by first scanning one partition and then using the results from that partition to estimate the number of additional partitions needed to satisfy the limit. There are many variations on this function, such as takeOrdered, takeSample, and top.
words.take(5)
words.takeOrdered(5)
words.top(5)
// in Scala
val withReplacement = true
val numberToTake = 6
val randomSeed = 100L
words.takeSample(withReplacement, numberToTake, randomSeed)
Saving Files
Saving files means writing to plain-text files. With RDDs, you cannot actually "save" to a data source in the conventional sense. You must iterate over the partitions in order to save the contents of each partition to some external database.
saveAsTextFile
words.saveAsTextFile("file:/tmp/bookTitle")
// in Scala: we can specify a compression codec as well
import org.apache.hadoop.io.compress.BZip2Codec
words.saveAsTextFile("file:/tmp/bookTitleCompressed", classOf[BZip2Codec])
Sequence Files
Spark originally grew out of the Hadoop ecosystem, so it has a fairly tight integration with a variety of Hadoop tools. A sequenceFile is a flat file consisting of binary key-value pairs. It is extensively used in MapReduce as an input/output format.
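A hedged sketch of writing one: saveAsSequenceFile needs a key-value RDD, so we pair each word with a count first (the output path is just a placeholder):
words.map(lambda word: (word, 1)).saveAsSequenceFile("file:/tmp/mySequenceFilePath")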
Hadoop Files
There are a variety of different Hadoop file formats to which you can save. These allow you to specify classes, output formats, Hadoop configurations, and compression schemes.
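A hedged sketch using the new Hadoop API writer on a key-value RDD; the output path is a placeholder and the format, key, and value classes are only illustrative choices:
words.map(lambda word: (word, 1)).saveAsNewAPIHadoopFile(
    "file:/tmp/hadoopFileExample",
    outputFormatClass="org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
    keyClass="org.apache.hadoop.io.Text",
    valueClass="org.apache.hadoop.io.IntWritable")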
Caching
The same principles apply for caching RDDs as for DataFrames and Datasets: we can either cache or persist an RDD. We can specify a storage level as any of the storage levels in the singleton object org.apache.spark.storage.StorageLevel, which are combinations of memory only, disk only, and, separately, off heap.
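A minimal sketch of both options in Python, where the storage levels are exposed through pyspark.StorageLevel:
from pyspark import StorageLevel
words.cache()  # equivalent to persist() with the default MEMORY_ONLY level for RDDs
words.getStorageLevel()  # inspect the level currently assigned
# an already-persisted RDD cannot be reassigned a different level, so unpersist first
words.unpersist()
words.persist(StorageLevel.DISK_ONLY)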
Checkpointing
One feature not available in the DataFrame API is the concept of checkpointing. Checkpointing is the act of saving an RDD to disk so that future references to this RDD point to those intermediate partitions on disk rather than recomputing the RDD from its original source.
It is similar to caching except that the data is not stored in memory, only on disk. This can be helpful when performing iterative computation, similar to the use cases for caching:
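A minimal sketch, with a placeholder checkpoint directory (in production this should be a reliable location such as HDFS):
spark.sparkContext.setCheckpointDir("/some/path/for/checkpointing")
words.checkpoint()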
Pipe RDDs to System Commands
With pipe, you can return an RDD created by piping elements to forked external processes. The resulting RDD is computed by executing the given process once per partition. All elements of each input partition are written to the process's stdin as lines of input separated by a newline. The resulting partition consists of the process's stdout output, with each line of stdout resulting in one element of the output partition. A process is invoked even for empty partitions.
The print behavior can be customized by providing two functions.
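For example, piping each partition of words through the wc -l command counts the lines (elements) per partition:
words.pipe("wc -l").collect()  # two partitions, so something like ['5', '5']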
mapPartitions
- Spark operates on a per-partition basis when it comes to actually executing code. Notice that the return type of the map function on an RDD is actually a MapPartitionsRDD: map is just a row-wise alias for mapPartitions, which makes it possible for you to map over an individual partition.
# this example creates the value 1 for every partition in our data, so summing the result counts the number of partitions we have
words.mapPartitions(lambda part: [1]).sum()  # 2
This allows us to perform an operation on an entire subdataset. For example, we could pipe a partition through some machine learning algorithm and train an individual model for that portion of the dataset.
# mapPartitionsWithIndex additionally gives you the index of the partition you are operating on
def indexedFunc(partitionIndex, withinPartIterator):
    return ["partition: {} => {}".format(partitionIndex, x)
            for x in withinPartIterator]
words.mapPartitionsWithIndex(indexedFunc).collect()
foreachPartition
foreachPartition simply iterates over all the partitions of the data without any return value. This makes it a good fit for writing each partition out to a database.
// in Scala: write each partition to a randomly named text file
words.foreachPartition { iter =>
  import java.io._
  import scala.util.Random
  val randomFileName = new Random().nextInt()
  val pw = new PrintWriter(new File(s"/tmp/random-file-${randomFileName}.txt"))
  while (iter.hasNext) {
    pw.write(iter.next())
  }
  pw.close()
}
Glom
glom takes every partition in your dataset and converts it to an array. This is useful if you are going to collect the data to the driver and want an array for each partition. However, it can cause serious stability issues: with large partitions or a large number of partitions, it is easy to crash the driver.
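A minimal sketch, parallelizing two words across two partitions and turning each partition into an array:
spark.sparkContext.parallelize(["Hello", "World"], 2).glom().collect()  # [['Hello'], ['World']]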