Skip to content

How Spark Runs On A Cluster๐Ÿ”—

The Architecture of a Spark Application๐Ÿ”—

Review from Chapter 2

  • The Spark Driver : controller of the execution of spark Application and maintains all the state of Spark Clusters. Interfaces with cluster manager in order to actually get physical resouces and launch executors.
  • The Spark Executors : processes that perform the tasks assigned by Spark driver and report their state(success/failure) and results.
  • The cluster Manager : Maintains cluster of machines that will run your Spark Application. Somewhat confusingly, a cluster manager will have its own driver and worker abstractions which are actually tied to physical machines rather than process in Spark.

To run a Spark application, we request resources from cluster manager to run it. Depending on config, this can include a place to run the Spark driver or might be just resources for the executor for our Spark Applications. Over the course of Spark Application execution, the cluster manager will be responsible for managing the underlying machines that our application is running on.

Spark currently supports 3 cluster managers : Apache Mesos, and Hadoop YARN.

Execution Modes๐Ÿ”—

  • determine resources are physically located when we run our application.
  • Circle represents daemon processes running on and managing each of individual worker nodes. Rectangles are actual processes running that are submitted.
  1. Cluster Mode

Most common way to run Spark Application. User submits a pre-compiled JAR, Python Script, or R script to a cluster Manager. The cluster manager then launches the driver process on a worker node inside the cluster, in addition to the executor processes.

image

  1. Client Mode : Same as cluster mode except that Spark driver remains on client machine that submitted the application. This means that the client machine is responsible for maintaining the Spark driver process, and the cluster manager maintains the executor processses. These machines are commonly referred to as gateway machines or edge nodes.

image

  1. Local Mode : It runs entire Spark Application on a single machine. It achieves parallelism through threads. Common way to learn Spark, test application but not for production application.

The Life Cycle of a Spark Application (Outside Spark)๐Ÿ”—

Client Request๐Ÿ”—

  • Client submits a pre-compiled JAR/library making request to cluster manager driver. Explicitly asking for resources for Spark driver process only. Cluster manager places driver onto a node in cluster. Client exits and application is now on cluster.

Launch๐Ÿ”—

  • Now driver process begins running user code. This code must include SparkSession that initialises a Spark Cluster (e.g. driver +executors).
  • SparkSession communicates with cluster manager asking to launch Spark executor processes across cluster. These cluster and their numbers are configured based on command-line args during spark-submit
  • Cluster manager launches executor processes on cluster and send relevent information about their location to driver process.

Execution๐Ÿ”—

  • Now we have a โ€œSpark Cluster,โ€ Spark goes about its merry way executing code. Now driver and executor processes communicate among themselves executing code, moving data. The driver schedules tasks on each worker and workers respond with results and status.

Completion๐Ÿ”—

  • After Spark Application completes, driver exits with either success or failures. The cluster manager shuts down executors in that Spark cluster for the driver. Now we can ask cluster manager about success or failure of Spark Application.

The Life Cycle of a Spark Application (Inside Spark)๐Ÿ”—

The SparkSession๐Ÿ”—

  • first step of any Spark Application. In many interactive modes this is done automatically but in an application we must do it manually.
# use newer builder method, more robustly instantiates Spark and SQL contexts ensuring
# no context conflict.

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("Word Count")\
    .config("spark.some.config.option", "some-value")\
    .getOrCreate()

The SparkContext๐Ÿ”—

  • A SparkContext object within the SparkSession represents the connection to spark cluster.This class is how you communicate with some of Sparkโ€™s lower-level APIs, such as RDDs. It is commonly stored as the variable sc in older examples and documentation. Through a SparkContext, you can create RDDs, accumulators, and broadcast variables, and you can run code on the cluster.
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

Logical Instruction๐Ÿ”—

  • Spark code consists of transformations and actions. How we build (SparkSQL, RDDs, ML) it its upto us. Understanding how we take declarative instructions like DataFrames and convert them into physical execution plans is an important step to understanding how Spark runs on a cluster
df1 = spark.range(2, 10000000, 2)
df2 = spark.range(2, 10000000, 4)
step1 = df1.repartition(5)
step12 = df2.repartition(6)
step2 = step1.selectExpr("id * 5 as id")
step3 = step2.join(step12, ["id"])
step4 = step3.selectExpr("sum(id)")

step4.collect() # 2500000000000

step4.explain()

== Physical Plan ==
*HashAggregate(keys=[], functions=[sum(id#15L)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_sum(id#15L)])
      +- *Project [id#15L]
         +- *SortMergeJoin [id#15L], [id#10L], Inner
            :- *Sort [id#15L ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(id#15L, 200)
            :     +- *Project [(id#7L * 5) AS id#15L]
            :        +- Exchange RoundRobinPartitioning(5)
            :           +- *Range (2, 10000000, step=2, splits=8)
            +- *Sort [id#10L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(id#10L, 200)
                  +- Exchange RoundRobinPartitioning(6)
                     +- *Range (2, 10000000, step=4, splits=8)

A Spark Job๐Ÿ”—

  • In general there should be one spark job for one action. Actions always return results. Each job breaks into series of stages, the number of which depends on shuffle operations need to take place
  • Above job is:
    • Stage 1 with 8 Tasks
    • Stage 2 with 8 Tasks
    • Stage 3 with 6 Tasks
    • Stage 4 with 5 Tasks
    • Stage 5 with 200 Tasks
    • Stage 6 with 1 Task

Stages๐Ÿ”—

  • group of tasks that can be executed together to compute the same operations on multiple machines. Generally Spark tries to pack as much work as possible into same stage, but the engine starts new stages after operations called shuffles.
  • A Shuffle represents physical repartitioning of data. For example - sorting a DataFrame, or grouping data that was loaded from a file by key.
  • First two stages in above example corresponds to range
  • Stage3, 4 perform on each of those DataFrames and end of stage represents the join (a shuffle)
  • Suddenly, we have 200 tasks. This is because of a Spark SQL configuration. The spark.sql.shuffle.partitions default value is 200, which means that when there is a shuffle performed during execution, it outputs 200 shuffle partitions by default.
  • A good rule of thumb is that the number of partitions should be larger than the number of executors on your cluster, potentially by multiple factors depending on the workload. If you are running code on your local machine, it would behoove you to set this value lower because your local machine is unlikely to be able to execute that number of tasks in parallel.

Tasks๐Ÿ”—

  • Stages in spark consists of tasks. Each task corresponds to a combination of blocks of data and a transformation that will run on a single executor.
  • A task is just a unit of computation applied to a unit of data (the partition). Partitioning your data into a greater number of partitions means that more can be executed in parallel.

Execution Details๐Ÿ”—

Tasks and Stages have important property. First, Spark automatically pipelines stages and tasks that can be done together, such as map followed by another map. Second, for all shuffle operation, Spark writes the data to stable storage, and can reuse it across multiple jobs.

Pipelining๐Ÿ”—

An important part of what makes Spark an โ€œin-memory computation toolโ€ is that unlike the tools that came before it (e.g., MapReduce), Spark performs as many steps as it can at one point in time before writing data to memory or disk. One of the key optimizations that Spark performs is pipelining, which occurs at and below the RDD level.

With pipelining, any sequence of operations that feed data directly into each other, without needing to move it across nodes, is collapsed into a single stage of tasks that do all the operations together.

Shuffle Persistence๐Ÿ”—

When Spark needs to run an operation that has to move data across nodes, such as a reduce-by-key operation (where input data for each key needs to first be brought together from many nodes), the engine canโ€™t perform pipelining anymore, and instead it performs a cross-network shuffle.

Spark always executes shuffles by first having the โ€œsourceโ€ tasks (those sending data) write shuffle files to their local disks during their execution stage. Then, the stage that does the grouping and reduction launches and runs tasks that fetch their corresponding records from each shuffle file and performs that computation (e.g., fetches and processes the data for a specific range f keys). Saving the shuffle files to disk lets Spark run this stage later in time than the source stage (e.g., if there are not enough executors to run both at the same time), and also lets the engine re-launch reduce tasks on failure without rerunning all the input tasks.

One side effect youโ€™ll see for shuffle persistence is that running a new job over data thatโ€™s already been shuffled does not rerun the โ€œsourceโ€ side of the shuffle. Because the shuffle files were already written to disk earlier, Spark knows that it can use them to run the later stages of the job, and it need not redo the earlier ones. In the Spark UI and logs, you will see the pre-shuffle stages marked as โ€œskippedโ€. This automatic optimization can save time in a workload that runs multiple jobs over the same data, but of course, for even better performance you can perform your own caching with the DataFrame or RDD cache method, which lets you control exactly which data is saved and where.