Advanced RDDs

  • This section covers advanced RDD operations, focusing on key-value RDDs, a powerful abstraction for manipulating data.
myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple"\
  .split(" ")
words = spark.sparkContext.parallelize(myCollection, 2)

Key-Value Basics (Key-Value RDDs)

  • Many <someoperation>ByKey methods rely on the data being in key-value format.
# The easiest way to create key-value pairs is to map each record to a tuple
words.map(lambda word: (word.lower(), 1))

keyBy

# creating key-value pairs using keyBy
keyword = words.keyBy(lambda word: word.lower()[0])

Mapping over Values

  • After creating key-value pairs, we can manipulate them. If we have a tuple, Spark assumes that the first element is the key and the second is the value. With data in this format, you can explicitly choose to map over the values and ignore the keys.
keyword.mapValues(lambda word: word.upper()).collect()

We can flatMap over each row's values as well.

keyword.flatMapValues(lambda word: word.upper()).collect()
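
To make the difference concrete, here is a minimal pure-Python sketch (no Spark required) of what mapValues and flatMapValues do to a list of (key, value) pairs. The helpers map_values and flat_map_values are hypothetical stand-ins for illustration, not Spark APIs.

```python
# Pure-Python model of mapValues / flatMapValues on (key, value) pairs.
# map_values and flat_map_values are hypothetical helpers, not Spark APIs.

def map_values(pairs, func):
    """Apply func to each value, keeping the key: one output per input."""
    return [(key, func(value)) for key, value in pairs]

def flat_map_values(pairs, func):
    """Apply func to each value and emit one pair per element of the result."""
    return [(key, item) for key, value in pairs for item in func(value)]

pairs = [("s", "Spark"), ("t", "The")]

mapped = map_values(pairs, lambda word: word.upper())
# [('s', 'SPARK'), ('t', 'THE')]

flattened = flat_map_values(pairs, lambda word: word.upper())
# A string is iterable, so each character becomes its own pair:
# [('s', 'S'), ('s', 'P'), ('s', 'A'), ('s', 'R'), ('s', 'K'),
#  ('t', 'T'), ('t', 'H'), ('t', 'E')]
```

This is why flatMapValues with word.upper() returns one row per character rather than one row per word.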

Extracting Keys and Values

keyword.keys().collect()
keyword.values().collect()

lookup

keyword.lookup("s") # ['Spark', 'Simple']

sampleByKey

There are two ways to sample an RDD by a set of keys: approximately or exactly. Both operations can sample with or without replacement, and both sample by a fraction given for each key. This is done via simple random sampling with one pass over the RDD, which produces a sample whose size is approximately equal to the sum of math.ceil(numItems * samplingRate) over all key values.

import random
distinctChars = words.flatMap(lambda word: list(word.lower())).distinct()\
  .collect()
sampleMap = dict(map(lambda c: (c, random.random()), distinctChars))
words.map(lambda word: (word.lower()[0], word))\
  .sampleByKey(True, sampleMap, 6).collect()
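
The approximate sample size quoted above can be worked out by hand. The sketch below is plain Python (no Spark) and assumes a fixed, hypothetical sampling rate of 0.5 for every key, rather than the random rates used in the example, so that the arithmetic is reproducible.

```python
import math
from collections import Counter

words = "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")

# Key each word by its lowercased first character, as in the example above.
items_per_key = Counter(word.lower()[0] for word in words)

# Hypothetical per-key sampling rates (an assumption for illustration).
sampling_rates = {key: 0.5 for key in items_per_key}

# Sum of math.ceil(numItems * samplingRate) over all keys.
expected_size = sum(
    math.ceil(count * sampling_rates[key])
    for key, count in items_per_key.items()
)
# Eight distinct keys, each contributing ceil(1 * 0.5) or ceil(2 * 0.5) = 1.
```

With these assumed rates, the ten words fall under eight keys and the expected sample size is 8. An actual sampleByKey call is randomized, so its result only approximates this figure.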

Aggregation

  • We can perform aggregations on plain RDDs or on PairRDDs, depending on the method we are using.
chars = words.flatMap(lambda word: word.lower())    # exploded words
KVcharacters = chars.map(lambda letter: (letter, 1))
def maxFunc(left, right):
  return max(left, right)
def addFunc(left, right):
  return left + right
nums = spark.sparkContext.parallelize(range(1, 31), 5)

With this set up, we can use countByKey, which counts the items for each key.

KVcharacters.countByKey()

Understanding Aggregation Implementations

There are several ways to create your key-value PairRDDs; however, the implementation is actually quite important for job stability.

groupByKey

from functools import reduce  # a builtin on Python 2; must be imported on Python 3

KVcharacters.groupByKey().map(lambda row: (row[0], reduce(addFunc, row[1])))\
  .collect()

You might think that groupByKey with a map over each grouping is the best way to sum up the counts for each key. However, in most cases this is the wrong approach. The fundamental issue is that each executor must hold all values for a given key in memory before applying the function to them, so a heavily skewed key can cause OutOfMemoryErrors.

The preferred approach is described below.

reduceByKey

Because we are performing a simple count, a much more stable approach is to perform the same flatMap, map each letter instance to the number one, and then perform a reduceByKey with a summation function to collect back the result.

This implementation is much more stable because the reduce happens within each partition first and doesn't need to hold all values for a key in memory. A shuffle still occurs, but only the partially reduced values are sent across the network; each worker combines its own data before the final reduce.

KVcharacters.reduceByKey(addFunc).collect()

# Output (shown in Scala-style formatting; the order of keys may vary):
# Array((d,4), (p,3), (t,3), (b,1), (h,1), (n,2),
# ...
# (a,4), (i,7), (k,1), (u,1), (o,1), (g,3), (m,2), (c,1))
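
The contrast between the two approaches can be modeled without Spark. The following pure-Python sketch treats partitions as plain lists; group_then_reduce and combine_then_reduce are hypothetical names that only imitate the data flow of groupByKey and reduceByKey, not their actual implementation.

```python
from collections import defaultdict
from functools import reduce

def add_func(left, right):
    return left + right

# Two hypothetical partitions of (letter, 1) pairs.
partitions = [
    [("a", 1), ("b", 1), ("a", 1)],
    [("a", 1), ("b", 1), ("c", 1)],
]

def group_then_reduce(partitions, func):
    """groupByKey-style: collect every value per key, then reduce each group."""
    groups = defaultdict(list)
    for partition in partitions:
        for key, value in partition:
            groups[key].append(value)  # all values for a key are held at once
    return {key: reduce(func, values) for key, values in groups.items()}

def combine_then_reduce(partitions, func):
    """reduceByKey-style: reduce inside each partition, then merge partials."""
    partials = []
    for partition in partitions:
        local = {}
        for key, value in partition:
            local[key] = func(local[key], value) if key in local else value
        partials.append(local)  # at most one value per key per partition
    merged = {}
    for local in partials:
        for key, value in local.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged

grouped = group_then_reduce(partitions, add_func)
combined = combine_then_reduce(partitions, add_func)
```

Both functions return {'a': 3, 'b': 2, 'c': 1}, but the second never holds more than one partial value per key per partition, which is the property that makes reduceByKey the more stable choice.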

Other Aggregation Methods

aggregate

  • This function requires a start (zero) value and two functions. The first aggregates within partitions, and the second aggregates across partitions. The start value is used at both levels.
nums.aggregate(0, maxFunc, addFunc)
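
To see how the two functions interact, the call above can be traced in plain Python. The sketch below assumes the numbers 1 to 30 are split into five contiguous partitions of six elements each; simulate_aggregate is a hypothetical helper, not a Spark API.

```python
from functools import reduce

def max_func(left, right):
    return max(left, right)

def add_func(left, right):
    return left + right

def simulate_aggregate(partitions, zero, seq_op, comb_op):
    """Fold each partition with seq_op, then fold the partials with comb_op."""
    partials = [reduce(seq_op, partition, zero) for partition in partitions]
    return reduce(comb_op, partials, zero)

# Assumption: 1..30 split into 5 contiguous partitions of 6 elements each.
numbers = list(range(1, 31))
partitions = [numbers[i:i + 6] for i in range(0, 30, 6)]

result = simulate_aggregate(partitions, 0, max_func, add_func)
# Per-partition maxima are 6, 12, 18, 24, 30; their sum is 90.
```

Under that partitioning assumption the result is 90: the maximum of each partition, summed across partitions.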

NOTE: aggregate can have performance implications because it performs the final aggregation on the driver.

# use the tree-based implementation to avoid an OutOfMemoryError on the driver
depth = 3
nums.treeAggregate(0, maxFunc, addFunc, depth)

aggregateByKey

# instead of aggregating by partition, we aggregate by key
KVcharacters.aggregateByKey(0, addFunc, maxFunc).collect()

combineByKey

# Combiner operates on a given key and merges the values according to some function. It 
# then goes to merge the different outputs of the combiners to give us our result.
def valToCombiner(value):
  return [value]
def mergeValuesFunc(vals, valToAppend):
  vals.append(valToAppend)
  return vals
def mergeCombinerFunc(vals1, vals2):
  return vals1 + vals2
outputPartitions = 6
KVcharacters\
  .combineByKey(
    valToCombiner,
    mergeValuesFunc,
    mergeCombinerFunc,
    outputPartitions)\
  .collect()
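
The three functions passed to combineByKey can be traced with a small pure-Python model. In the sketch below, simulate_combine_by_key is a hypothetical helper and the two partitions are made-up data; it only illustrates when each function is called.

```python
def val_to_combiner(value):
    return [value]

def merge_values_func(vals, val_to_append):
    vals.append(val_to_append)
    return vals

def merge_combiner_func(vals1, vals2):
    return vals1 + vals2

def simulate_combine_by_key(partitions, create, merge_value, merge_combiners):
    partials = []
    for partition in partitions:
        local = {}
        for key, value in partition:
            if key in local:
                # Key already seen in this partition: fold the value in.
                local[key] = merge_value(local[key], value)
            else:
                # First value for this key in this partition.
                local[key] = create(value)
        partials.append(local)
    result = {}
    for local in partials:
        for key, combiner in local.items():
            if key in result:
                # Same key seen in an earlier partition: merge the combiners.
                result[key] = merge_combiners(result[key], combiner)
            else:
                result[key] = combiner
    return result

# Two hypothetical partitions of (letter, 1) pairs.
partitions = [
    [("a", 1), ("b", 1), ("a", 1)],
    [("a", 1), ("c", 1)],
]
combined = simulate_combine_by_key(
    partitions, val_to_combiner, merge_values_func, merge_combiner_func)
# {'a': [1, 1, 1], 'b': [1], 'c': [1]}
```

With list-building functions like these, the result is every value for a key gathered into one list.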

foldByKey

foldByKey merges the values for each key using an associative function and a neutral "zero value," which can be added to the result an arbitrary number of times, and must not change the result (e.g., 0 for addition, or 1 for multiplication).

KVcharacters.foldByKey(0, addFunc).collect()
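
The requirement that the zero value be neutral is easiest to see when it is violated. The following pure-Python sketch models a per-partition fold followed by a merge; simulate_fold_by_key is a hypothetical helper and the data is made up for illustration.

```python
def add_func(left, right):
    return left + right

def simulate_fold_by_key(partitions, zero, func):
    """Fold values per key inside each partition, starting from zero,
    then merge the per-partition partial results with func."""
    merged = {}
    for partition in partitions:
        local = {}
        for key, value in partition:
            local[key] = func(local.get(key, zero), value)
        for key, partial in local.items():
            merged[key] = func(merged[key], partial) if key in merged else partial
    return merged

pairs = [("a", 1), ("a", 1), ("a", 1), ("a", 1)]
one_partition = [pairs]
two_partitions = [pairs[:2], pairs[2:]]

# A neutral zero value gives the same answer for any partitioning.
neutral_1 = simulate_fold_by_key(one_partition, 0, add_func)   # {'a': 4}
neutral_2 = simulate_fold_by_key(two_partitions, 0, add_func)  # {'a': 4}

# A non-neutral zero value is applied once per partition holding the key,
# so the answer changes with the partitioning.
skewed_1 = simulate_fold_by_key(one_partition, 10, add_func)   # {'a': 14}
skewed_2 = simulate_fold_by_key(two_partitions, 10, add_func)  # {'a': 24}
```

In this model a zero value of 10 yields 14 with one partition and 24 with two, which is why only a true identity element is safe.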

CoGroups

CoGroups give you the ability to group together up to three key-value RDDs in Scala and two in Python. This joins the given values by key.

This is effectively just a group-based join on an RDD. When doing this, you can also specify a number of output partitions or a custom partitioning function to control exactly how this data is distributed across the cluster.

import random
distinctChars = words.flatMap(lambda word: word.lower()).distinct()
charRDD = distinctChars.map(lambda c: (c, random.random()))
charRDD2 = distinctChars.map(lambda c: (c, random.random()))
charRDD.cogroup(charRDD2).take(5)
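
The shape of a cogroup result can be shown with a pure-Python sketch. Here simulate_cogroup is a hypothetical helper working on plain lists; a real cogroup returns iterables rather than lists and does not guarantee key order, so the sorting below is only for readability.

```python
def simulate_cogroup(left, right):
    """For every key in either input, pair up the values from each side."""
    keys = sorted({k for k, _ in left} | {k for k, _ in right})
    return [
        (key,
         ([v for k, v in left if k == key],
          [v for k, v in right if k == key]))
        for key in keys
    ]

# Hypothetical inputs for illustration.
left = [("a", 1), ("b", 2), ("a", 3)]
right = [("a", "x"), ("c", "y")]

cogrouped = simulate_cogroup(left, right)
# [('a', ([1, 3], ['x'])), ('b', ([2], [])), ('c', ([], ['y']))]
```

Keys missing from one side still appear in the output, paired with an empty group for that side.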

Joins

Inner Join

# in Python
keyedChars = distinctChars.map(lambda c: (c, random.random()))
outputPartitions = 10
KVcharacters.join(keyedChars).count()
KVcharacters.join(keyedChars, outputPartitions).count()
  • Other types of joins
    • fullOuterJoin
    • leftOuterJoin
    • rightOuterJoin
    • cartesian
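
The difference between the inner and outer joins listed above can be sketched in plain Python. The helper simulate_join and its how argument are hypothetical, chosen for illustration only; cartesian is not modeled here. Missing sides are represented as None.

```python
def simulate_join(left, right, how="inner"):
    """Join two lists of (key, value) pairs by key."""
    left_keys = {k for k, _ in left}
    right_keys = {k for k, _ in right}
    if how == "inner":
        keys = left_keys & right_keys
    elif how == "left":
        keys = left_keys
    elif how == "right":
        keys = right_keys
    elif how == "full":
        keys = left_keys | right_keys
    else:
        raise ValueError("unknown join type: %s" % how)
    joined = []
    for key in sorted(keys):
        # A side with no match contributes a single None placeholder.
        left_vals = [v for k, v in left if k == key] or [None]
        right_vals = [v for k, v in right if k == key] or [None]
        joined.extend((key, (lv, rv)) for lv in left_vals for rv in right_vals)
    return joined

# Hypothetical inputs for illustration.
left = [("a", 1), ("b", 2)]
right = [("a", "x"), ("c", "y")]

inner = simulate_join(left, right, "inner")  # [('a', (1, 'x'))]
left_outer = simulate_join(left, right, "left")
# [('a', (1, 'x')), ('b', (2, None))]
right_outer = simulate_join(left, right, "right")
# [('a', (1, 'x')), ('c', (None, 'y'))]
full_outer = simulate_join(left, right, "full")
# [('a', (1, 'x')), ('b', (2, None)), ('c', (None, 'y'))]
```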

zips

The final type of join isn't really a join at all, but it does combine two RDDs, so it's worth labeling it as a join. zip allows you to "zip" together two RDDs, assuming that they have the same length. This creates a PairRDD. The two RDDs must have the same number of partitions as well as the same number of elements.

numRange = spark.sparkContext.parallelize(range(10), 2)
words.zip(numRange).collect()

# Output
[('Spark', 0),
 ('The', 1),
 ('Definitive', 2),
 ('Guide', 3),
 (':', 4),
 ('Big', 5),
 ('Data', 6),
 ('Processing', 7),
 ('Made', 8),
 ('Simple', 9)]
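
The partition and length requirements can be illustrated with a pure-Python model in which each RDD is a list of partitions. simulate_zip is a hypothetical helper, and the sketch assumes the element counts must match partition by partition.

```python
def simulate_zip(left_partitions, right_partitions):
    """Pair elements position by position, partition by partition."""
    if len(left_partitions) != len(right_partitions):
        raise ValueError("different number of partitions")
    zipped = []
    for left_part, right_part in zip(left_partitions, right_partitions):
        if len(left_part) != len(right_part):
            raise ValueError("different number of elements in a partition")
        zipped.extend(zip(left_part, right_part))
    return zipped

words = "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")

# Assumption: both inputs are split into two partitions of five elements.
word_partitions = [words[:5], words[5:]]
num_partitions = [list(range(0, 5)), list(range(5, 10))]

pairs = simulate_zip(word_partitions, num_partitions)
# First pair is ('Spark', 0) and last pair is ('Simple', 9).
```

Passing inputs whose partition counts or partition sizes differ raises a ValueError in this sketch, mirroring the constraint described above.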

Controlling Partitions

With RDDs, you have control over exactly how data is physically distributed across the cluster. Some of these methods are basically the same as those in the Structured APIs, but the key addition (which does not exist in the Structured APIs) is the ability to specify a partitioning function (formally a custom Partitioner, which we discuss later when we look at basic methods).

coalesce

coalesce effectively collapses partitions on the same worker in order to avoid a shuffle of the data when repartitioning.

words.coalesce(1).getNumPartitions() # 1

repartition

The repartition operation allows you to repartition your data up or down but performs a shuffle across nodes in the process. Increasing the number of partitions can increase the level of parallelism when operating in map- and filter-type operations.

words.repartition(10)  # gives us 10 partitions

repartitionAndSortWithinPartitions

This operation gives you the ability to repartition as well as specify the ordering of each one of those output partitions.

Custom Partitioning

Custom partitioners are not available in the Structured APIs because they don't really have a logical counterpart. They're a low-level implementation detail that can have a significant effect on whether your jobs run successfully.

The canonical example to motivate custom partitioning is PageRank, whereby we seek to control the layout of the data on the cluster and avoid shuffles. In our shopping dataset, this might mean partitioning by each customer ID.

In short, the sole goal of custom partitioning is to even out the distribution of your data across the cluster so that you can work around problems like data skew.

df = spark.read.option("header", "true").option("inferSchema", "true")\
  .csv("/data/retail-data/all/")
rdd = df.coalesce(10).rdd

df.printSchema()

Spark has two built-in Partitioners that you can leverage in the RDD API: a HashPartitioner for discrete values and a RangePartitioner for continuous values. Spark's Structured APIs already use these, although we can use the same thing in RDDs. The following example uses a custom partitioning function:

def partitionFunc(key):
  import random
  if key == 17850 or key == 12583:
    return 0
  else:
    return random.randint(1,2)

keyedRDD = rdd.keyBy(lambda row: row[6])
keyedRDD\
  .partitionBy(3, partitionFunc)\
  .map(lambda x: x[0])\
  .glom()\
  .map(lambda x: len(set(x)))\
  .take(5)

The result shows the number of distinct keys in each partition. The second and third numbers will vary because those keys are distributed randomly, but the same principles apply.
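
The effect of the partitioning function can be reproduced without a cluster. The sketch below is pure Python: simulate_partition_by is a hypothetical helper that assigns each pair to the partition index returned by the function (taken modulo the partition count), and the customer IDs other than 17850 and 12583 are made up for illustration.

```python
import random

def partition_func(key):
    # Send the two heavy keys to partition 0; spread the rest over 1 and 2.
    if key == 17850 or key == 12583:
        return 0
    return random.randint(1, 2)

def simulate_partition_by(pairs, num_partitions, func):
    """Place each (key, value) pair in the partition chosen by func."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        partitions[func(key) % num_partitions].append((key, value))
    return partitions

# Hypothetical customer IDs standing in for the retail dataset.
random.seed(0)
customer_ids = [17850, 12583, 13047, 14688, 15100, 17850, 12583, 13047]
pairs = [(cid, "row-%d" % i) for i, cid in enumerate(customer_ids)]

partitions = simulate_partition_by(pairs, 3, partition_func)

# Equivalent of glom() followed by counting distinct keys per partition.
distinct_keys_per_partition = [len({k for k, _ in p}) for p in partitions]
```

Partition 0 always holds exactly the two isolated keys. Because the remaining keys are assigned randomly on every call, the same key can land in different partitions, which is why the other counts vary from run to run.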

Custom Serialization

  • This section covers the issue of Kryo serialization. Any object that you hope to parallelize must be serializable.
  • The default serialization can be quite slow. Spark can use the Kryo library (version 2) to serialize objects more quickly. Kryo is significantly faster and more compact than Java serialization (often as much as 10x), but does not support all serializable types and requires you to register the classes you'll use in the program in advance for best performance.
  • You can use Kryo by initializing your job with a SparkConf and setting the value of "spark.serializer" to "org.apache.spark.serializer.KryoSerializer" (we discuss this in the next part of the book). This setting configures the serializer used for shuffling data between worker nodes and serializing RDDs to disk. The only reason Kryo is not the default is the custom registration requirement, but we recommend trying it in any network-intensive application.
// To register your own custom classes with Kryo, use the registerKryoClasses method in scala
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)