Skip to content

Performance Tuning๐Ÿ”—

Indirect Performance Enhancements๐Ÿ”—

Design Choices๐Ÿ”—

  • Although its an obvious way to optimize performance, often we donโ€™t prioritize this step. It helps in writing better Spark applications which run stable and consistent manner.

Scala vs Java vs Python vs R๐Ÿ”—

  • Its nearly impossible to choose one over other, but depends on use cases.
  • If we want to perform some single-node machine learning after performing a large ETL job, run ETL as SparkR and then use Rโ€™s massive machine learning ecosystem to run single-node machine learning algorithms.
  • Sparkโ€™s Structured API are consistent across language in terms of stability and speed.
  • When you start delving in RDDs or UDFs, then R or Python are not best choice simple because how its executed, additionally its hard to provide type guarantees

DataFrames vs SQL vs Datasets vs RDDs๐Ÿ”—

  • Across all languages, DataFrames, Datasets and SQL are equivalent, however while using UDFs R or Python will take a hit on performance due to serialisation.
  • Although everything boils down to RDDs Sparkโ€™s optimization engine will write โ€œbetterโ€ RDD code than you can manually.

Object Serialization in RDDs๐Ÿ”—

  • When working with custom data type you should use Kyro for serialisation because its more compact and more efficient than Javaโ€™s default serialisations. However you will need to register classes that you will be using in your application.
  • You can use Kryo serialization by setting spark.serializer to org.apache.spark.serializer.KryoSerializer. You will also need to explicitly register the classes that you would like to register with the Kryo serializer via the spark.kryo.classesToRegister configuration.

Cluster Configurations๐Ÿ”—

Cluster/Application Sizing and Sharing๐Ÿ”—

  • This boils down to a resource sharing and scheduling problem, however there are lot of options for how we want to share resources at cluster level or application level

Dynamic Allocation๐Ÿ”—

  • Application can give resources back to the cluster if they are no longer used, and request them again later when there is demand
  • This feature is disabled by default and available on all coarse-grained cluster managers, that is standalone mode, YARN mode, or Mesos coarse-grained mode.
  • set spark.dynamicAllocation.enabled to true for enabling this feature

Scheduling๐Ÿ”—

  • We can take advantage of running Spark jobs in parallel with scheduler pools or help Spark applications run in parallel with something like dynamic allocation or setting max-executor-cores
  • Scheduling optimizations do involve some research and experimentation, and unfortunately there are not super-quick fixes beyond setting spark.scheduler.mode to FAIR to allow better sharing of resources across multiple users, or setting --max-executor-cores, which specifies the maximum number of executor cores that your application will need. Specifying this value can ensure that your application does not take up all the resources on the cluster.

Data at Rest๐Ÿ”—

  • More often Data we write, in a large organisation is accessed by multiple parties and we should make sure that our data is efficient while reading.
  • This involves settling for a storage system, data format and taking advantage or data partitioning in some storage formats.

File-based long-term data storage๐Ÿ”—

  • There are many file formats available, one simple best practice is to use most efficient storage format possible
  • Generally prefer structured, binary types to store data, especially when you will be accessing it frequently. CSVs are slow to parse, and can cause problem when reading multiple files. You should use Apache Parquet
  • Parquet stores data in binary files with column-oriented storage and also tracks statistics about each file that make it possible to quickly skip data not needed for query.

Splittable file types and compression๐Ÿ”—

  • Make sure your file types is splittable, allowing different tasks can read different parts of file in parallel. File types like malformed JSON types require read on single machine reducing parallelism
  • Main place splittability comes in is compression formats. A ZIP or Tar canโ€™t be split. If files compressed using gzip, bzip2 or lz4 are generally splittable if the are written by Hadoop or Spark.
  • Make sure multiple files should not be very small, neither too large. Prefer several files on few hundred megabytes.

Table Partitioning๐Ÿ”—

  • Table partitioning refers to storing files in separate directories based on a key, such as the date field in the data. Storage managers like Apache Hive support this concept, as do many of Sparkโ€™s built-in data sources.
  • Partitioning allows Spark to skip many irrelevant files when it only requires data with a specific range of keys.
  • NOTE tho, donโ€™t prefer partition if it increases granularity as it can split data in multiple files, which is not ideal.

Bucketing๐Ÿ”—

  • Bucketing allows Spark to pre-partition data according to how joins or aggregation are likely to be performed by readers.
  • This improves performance and stability because data can be consistently distributed across partitions as opposed to skew distribution.
  • If join is done on a column immediately after read, we can se bucketing to ensure data is well partitioned according to those values, saving shuffle before join.

The number of files๐Ÿ”—

  • Having lots of small files is going to make the scheduler work much harder to locate the data and launch all of the read tasks increasing the network and scheduling overhead of the job.
  • Having fewer large files eases the pain off the scheduler but it will also make tasks run longer. In this case, though, you can always launch more tasks than there are input files if you want more parallelismโ€”Spark will split each file across multiple tasks assuming you are using a splittable format.
  • To control how many records go into each file, you can specify the maxRecordsPerFile option to the write operation.

Data Locality๐Ÿ”—

  • Data locality basically specifies a preference for certain nodes that hold certain data, rather than having to exchange these blocks of data over the network.
  • If you run your storage system on the same nodes as Spark, and the system supports locality hints, Spark will try to schedule tasks close to each input block of data.

Statistics Collection๐Ÿ”—

  • Spark has a cost based query optimizer that plans queries based on the properties of the input data when using the Structured APIs. This will require storing statistics about your tables.
  • There are two types of statistics: table-level and Column level
  • Statistics collection is available only on named tables, not on arbitrary DataFrames or RDDs.
# table-level statistics
ANALYZE TABLE table_name COMPUTE STATISTICS

# column-level statistics
ANALYZE TABLE table_name COMPUTE STATISTICS FOR
COLUMNS column_name1, column_name2, ...

Shuffle Configurations๐Ÿ”—

  • Sparkโ€™s External Shuffle service allows other nodes to read shuffle data from remote machines even when executor on those machines are busy (garbage collection). This comes at the cost of complexity and maintenance.
  • Optimize number of concurrent connection per executor (usually good defaults)
  • for RDD job, serialisation has large impact on shuffle, always use Kyro
  • Optimize number of partition to aim few tens of megabytes of data per output partition in your shuffle

Memory Pressure and Garbage Collections๐Ÿ”—

  • Garbage collection hits performance when there are large number of objects in memory.
  • One strategy to avoid above situation is to use Structured APIs as they reduce memory pressure because JVM objects are never realized and Spark SQL simply performs computation on internal formats

Measuring Impact of Garbage Collection๐Ÿ”—

  • Find out how frequently garbage collection happens and amount it takes.
  • Change Sparkโ€™s JVM Options using spark.executor.extraJavaOptions to -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps

Garbage Collection tuning๐Ÿ”—

  • Understand how memory is organized in JVM
    • Java Heap Space is divided into : Young and Old. The Young generation is meant to hold short-lived objects whereas the Old generation is intended for objects with longer lifetimes.
    • The Young generation is further divided into three regions: Eden, Survivor1, and Survivor2.
  • Simplified description of garbage collection procedure
    • When Eden is full, a minor garbage collection is run on Eden and objects that are alive from Eden and Survivor1 are copied to Survivor2.
    • The Survivor regions are swapped.
    • If an object is old enough or if Survivor2 is full, that object is moved to Old.
    • Finally, when Old is close to full, a full garbage collection is invoked. This involves tracing through all the objects on the heap, deleting the unreferenced ones, and moving the others to fill up unused space, so it is generally the slowest garbage collection operation.
  • Gather garbage collection statistics to determine whether it is being run too often. If a full garbage collection is invoked multiple times before a task completes, it means that there isnโ€™t enough memory available for executing tasks, so you should decrease the amount of memory Spark uses for caching (spark.memory.fraction).
  • If there are too many minor collections but not many major garbage collections, allocating more memory for Eden would help. You can set the size of the Eden to be an over-estimate of how much memory each task will need. If the size of Eden is determined to be E, you can set the size of the Young generation using the option -Xmn=4/3*E. (The scaling up by 4/3 is to account for space used by survivor regions, as well.)
  • Try the G1GC garbage collector with -XX:+UseG1GC. It can improve performance in some situations in which garbage collection is a bottleneck and you donโ€™t have a way to reduce it further by sizing the generations. Note that with large executor heap sizes, it can be important to increase the G1 region size with -XX:G1HeapRegionSize.

Direct Performance Enchancements๐Ÿ”—

Parallelism๐Ÿ”—

  • First thing to speedup a stage is to increase degree of parallelism. At least 2-3 tasks per CPU core in your cluster if the stage processes a large amount of data.
  • set this via the spark.default.parallelism property as well as tuning the spark.sql.shuffle.partitions according to the number of cores in your cluster.

Improved Filtering๐Ÿ”—

  • Move filters to earliest part of Spark job. Sometimes filters are pushed into data sources themselves and this means we can avoid reading/writing data is irrelevant to end result.
  • Enabling Partitioning and Bucketing helps achieve this

Repartitioning and Coalescing๐Ÿ”—

  • Repartition calls can incur a shuffle. However it optimize overall execution by balacing data across the cluster.
  • Generally shuffle least amount of data possible, If reducing number of overall partitions in a DataFrame or RDD, first try coalesce method, which will not perform a shuffle but rather merge partitions on the same node into one partition.
  • The slower repartition method will also shuffle data across the network to achieve even load balancing. Repartitions can be particularly helpful when performing joins or prior to a cache call.

Custom Partitioning๐Ÿ”—

If your jobs are still slow or unstable, you might want to explore performing custom partitioning at the RDD level. This allows you to define a custom partition function that will organize the data across the cluster to a finer level of precision than is available at the DataFrame level.

User-Defined Functions๐Ÿ”—

  • generally avoiding UDFs is a good optimization :)
  • UDFs are expensive because they force representing data as objects in JVM and sometimes do this multiple times per record in query. Use structured APIs as much possible.

Temporary Data Storage (Caching)๐Ÿ”—

  • NOTE: Although caching sounds cool it incurs serialization, deserialization, and storage cost. So use it with caution and not too much
  • In applications that reuse the same datasets over and over, most useful optimizations is caching. Caching will place a DataFrame, table, or RDD into temporary storage (either memory or disk) across the executors in your cluster, and make subsequent reads faster.
  • Caching is a lazy operation, meaning that things will be cached only as they are accessed. The RDD API and the Structured API differ in how they actually perform caching
  • Caching an RDD involves actual data (bits/bytes) When this is accesed again Spark returns proper data. This is done using RDD reference.
  • Structured API caching is done based on physical plan. This means that we effectively store the physical plan as our key (as opposed to the object reference) and perform a lookup prior to the execution of a Structured job
  • there are different storage levels to cahce data : MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER (Java/Scala), MEMORY_AND_DISK_SER(Java/Scala), DISK_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK_2 (2 means replication), OFF_HEAP(Experimental)

Joins๐Ÿ”—

  • Joins are common operation and biggest optimization opportunity is simply understanding joins :)
  • Equi-joins are easiest for Spark to optimize, therefore should be preffered.
  • Simple things like trying to use filtering ability of inner joins by changing join ordering can yield large speedups.
  • Broadcast join hints can help Spark make intelligent planning decisions when it comes to creating query plans.
  • Avoid cartesian joins or even full outer joins as they often low-hanging fruit for optimization and stability because they can optimized into different filtering style joins when we look at entire data flow.
  • Collecting statistics before join always help Spark
  • Bucketing helps Spark avoid large shuffle when joins are performed

Aggregations๐Ÿ”—

  • there are not too many ways that you can optimize specific aggregations beyond filtering data before the aggregation having a sufficiently high number of partitions.
  • If youโ€™re using RDDs, controlling exactly how these aggregations are performed (e.g., using reduceByKey when possible over groupByKey) can be very helpful and improve the speed and stability of your code.

Broadcast Variables๐Ÿ”—

  • The basic premise is that if some large piece of data will be used across multiple UDF calls in your program, you can broadcast it to save just a single read-only copy on each node and avoid re-sending this data with each job.
  • Broadcast variables may be useful to save a lookup table or a machine learning model. You can also broadcast arbitrary objects by creating broadcast variables using your SparkContext, and then simply refer to those variables in your tasks