Distributed Shared Variables

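  • The second kind of low-level API in Spark is distributed shared variables, of which there are two types: broadcast variables and accumulators. These are variables with special properties when running on a cluster, and we can use them inside user-defined functions (e.g. in a map over an RDD or DataFrame).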

Broadcast Variables

  • Broadcast variables are a way to share an immutable value efficiently across the cluster without encapsulating that variable in a function closure (e.g. in map).
  • When we use a variable in a closure, it must be deserialized on the worker nodes many times (once per task). Moreover, if you use the same variable in multiple Spark actions and jobs, it will be re-sent to the workers with every job instead of once.
  • Broadcast variables are shared, immutable variables that are cached on every machine in the cluster instead of serialized with every single task.
  • The standard use case is to pass around a large lookup table that fits in memory on the executors and use it in a function.
my_collection = "Spark The Definitive Guide : Big Data Processing Made Simple"\
  .split(" ")
words = spark.sparkContext.parallelize(my_collection, 2)

# to supplement list of words with extra information
supplementalData = {"Spark":1000, "Definitive":200,
                    "Big":-300, "Simple":100}

# broadcasting this structure across Spark and referencing it
suppBroadcast = spark.sparkContext.broadcast(supplementalData)
suppBroadcast.value # reference

# now we can transform our RDD using this value.
words.map(lambda word: (word, suppBroadcast.value.get(word, 0)))\
  .sortBy(lambda wordPair: wordPair[1])\
  .collect()
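
To make the cost argument above concrete, here is a rough back-of-the-envelope model in plain Python; no Spark is involved. It pickles the lookup table from the example and compares shipping it once per task against shipping it once per executor. The task and executor counts are made-up numbers, and pickle only stands in for whatever serializer a real cluster uses.

```python
import pickle

# Same lookup table as in the example above.
supplemental_data = {"Spark": 1000, "Definitive": 200, "Big": -300, "Simple": 100}
payload = pickle.dumps(supplemental_data)

# Hypothetical cluster shape, chosen only for illustration.
num_tasks = 200
num_executors = 4

# Closure-style: the value travels with every task.
bytes_per_task_shipping = len(payload) * num_tasks
# Broadcast-style: the value is sent to each executor once and cached there.
bytes_broadcast_shipping = len(payload) * num_executors

# The ratio is simply num_tasks / num_executors.
print(bytes_per_task_shipping // bytes_broadcast_shipping)  # 50
```

In this simplified model the saving grows with the number of tasks per executor, which is why the pattern is usually applied to large lookup tables rather than small values.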

Accumulators

  • Accumulators are a way of updating a value inside a variety of transformations and propagating that value to the driver node in an efficient and fault-tolerant way.
  • Accumulators provide a mutable variable that a Spark cluster can safely update on a per-row basis. You can use these for debugging purposes (say to track the values of a certain variable per partition in order to intelligently use it over time) or to create low-level aggregation.
  • For accumulator updates performed inside actions only, Spark guarantees that each task's update to the accumulator will be applied only once, meaning that restarted tasks will not update the value. In transformations, you should be aware that each task's update can be applied more than once if tasks or job stages are re-executed.
  • Accumulator updates are not guaranteed to be executed when made within a lazy transformation like map(); they happen only once an action forces that transformation to be computed.
# read the parquet file
flights = spark.read\
  .parquet("/data/flight-data/parquet/2010-summary.parquet")

# an accumulator to count the number of flights to or from China
# this is an unnamed accumulator, so it is not visible in the Spark UI
accChina = spark.sparkContext.accumulator(0)

# declaring a named accumulator so that it shows in the Spark UI
# (Scala only; kept as comments so this Python snippet still runs)
#   val accChina = new LongAccumulator
#   val accChina2 = spark.sparkContext.longAccumulator("China")
#   spark.sparkContext.register(accChina, "China")

def accChinaFunc(flight_row):
  destination = flight_row["DEST_COUNTRY_NAME"]
  origin = flight_row["ORIGIN_COUNTRY_NAME"]
  if destination == "China":
    accChina.add(flight_row["count"])
  if origin == "China":
    accChina.add(flight_row["count"])
# iterate over every row of the flights DataFrame
flights.foreach(lambda flight_row: accChinaFunc(flight_row))

accChina.value # 953
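
The caveats about transformations can be illustrated without a cluster. The following is a minimal plain-Python simulation, not PySpark: `SketchAccumulator`, `lazy_map`, and the three rows are hypothetical stand-ins invented for this sketch. It models an uncached lazy transformation as a function that recomputes its result every time an "action" calls it.

```python
class SketchAccumulator:
    """Hypothetical stand-in for a Spark accumulator; not the pyspark class."""

    def __init__(self, initial=0):
        self.value = initial

    def add(self, amount):
        self.value += amount


# Made-up rows: (destination, origin, count).
rows = [
    ("China", "United States", 10),
    ("United States", "China", 7),
    ("Canada", "United States", 3),
]

acc = SketchAccumulator()


def tag_row(row):
    """Update the accumulator as a side effect, then pass the row through."""
    dest, origin, count = row
    if dest == "China":
        acc.add(count)
    if origin == "China":
        acc.add(count)
    return row


def lazy_map(fn, data):
    # Returns a thunk: nothing runs until it is called,
    # which models an uncached lazy transformation.
    return lambda: [fn(r) for r in data]


mapped = lazy_map(tag_row, rows)
before_any_action = acc.value    # 0, the map has not run yet

mapped()                         # first "action" evaluates the map
after_first_action = acc.value   # 17

mapped()                         # second "action" recomputes the same map
after_second_action = acc.value  # 34, every update was applied twice
```

This is why the example above updates the accumulator inside foreach, which is an action, rather than inside a map.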

Custom Accumulators

Although Spark does provide some default accumulator types, sometimes you might want to build your own custom accumulator. In order to do this you need to subclass the AccumulatorV2 class.

// in Scala
import org.apache.spark.util.AccumulatorV2

class EvenAccumulator extends AccumulatorV2[BigInt, BigInt] {
  private var num:BigInt = 0
  def reset(): Unit = {
    this.num = 0
  }
  def add(intValue: BigInt): Unit = {
    if (intValue % 2 == 0) {
        this.num += intValue
    }
  }
  def merge(other: AccumulatorV2[BigInt,BigInt]): Unit = {
    this.num += other.value
  }
  def value():BigInt = {
    this.num
  }
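  def copy(): AccumulatorV2[BigInt,BigInt] = {
    // a copy must carry over the current state rather than start from zero
    val copied = new EvenAccumulator
    copied.num = this.num
    copied
  }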
  def isZero():Boolean = {
    this.num == 0
  }
}
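val acc = new EvenAccumulator
// register returns Unit, so there is nothing useful to assign
sc.register(acc, "evenAcc")

acc.value // 0
// assumes flights is a typed Dataset whose rows expose a BigInt count field
flights.foreach(flight_row => acc.add(flight_row.count))
acc.value // 31390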
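
To show how the reset, add, merge, copy, and isZero methods fit together, here is a plain-Python mirror of the Scala class above. It is only a sketch of the contract, not a Spark or PySpark API: `EvenAccumulatorSketch` and the partition data are hypothetical, and the loop is a simplified, single-process model of per-task copies being merged back on the driver.

```python
class EvenAccumulatorSketch:
    """Plain-Python mirror of the Scala EvenAccumulator; not a Spark class."""

    def __init__(self):
        self.num = 0

    def reset(self):
        self.num = 0

    def add(self, value):
        # Only even inputs contribute to the running sum.
        if value % 2 == 0:
            self.num += value

    def merge(self, other):
        self.num += other.num

    def copy(self):
        duplicate = EvenAccumulatorSketch()
        duplicate.num = self.num
        return duplicate

    def is_zero(self):
        return self.num == 0


# Made-up per-partition values.
partitions = [[1, 2, 3, 4], [10, 11], [6, 7, 8]]

driver_acc = EvenAccumulatorSketch()

for partition in partitions:
    # Each task starts from a zeroed copy of the driver's accumulator.
    task_acc = driver_acc.copy()
    task_acc.reset()
    for value in partition:
        task_acc.add(value)
    # The driver folds each finished task's partial result into its own.
    driver_acc.merge(task_acc)

print(driver_acc.num)  # 30
```

Because every task works on its own zeroed copy, merge has to be a pure combination of two partial results; here that is a sum, so the order in which tasks finish does not matter.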