How Spark Runs On A Cluster
The Architecture of a Spark Application
Review from Chapter 2
- The Spark driver : controls the execution of a Spark Application and maintains all of the state of the Spark cluster. It interfaces with the cluster manager to obtain physical resources and launch executors.
- The Spark executors : processes that perform the tasks assigned by the Spark driver and report their state (success or failure) and results back to it.
- The cluster manager : maintains the cluster of machines that will run your Spark Application. Somewhat confusingly, a cluster manager has its own driver and worker abstractions, which are tied to physical machines rather than to processes (as they are in Spark).
To run a Spark Application, we request resources from the cluster manager. Depending on how the application is configured, this can include a place to run the Spark driver, or it might be just resources for the executors of our Spark Application. Over the course of the application's execution, the cluster manager is responsible for managing the underlying machines that the application runs on.
Spark currently supports three cluster managers: a simple built-in standalone cluster manager, Apache Mesos, and Hadoop YARN.
Execution Modes
- An execution mode determines where the resources described above are physically located when we run our application.
- In the diagrams for each mode, circles represent daemon processes running on and managing each of the individual worker nodes. Rectangles represent the actual processes of the submitted application.
- Cluster Mode : The most common way to run a Spark Application. The user submits a pre-compiled JAR, Python script, or R script to a cluster manager. The cluster manager then launches the driver process on a worker node inside the cluster, in addition to the executor processes.
- Client Mode : Same as cluster mode, except that the Spark driver remains on the client machine that submitted the application. This means that the client machine is responsible for maintaining the Spark driver process, and the cluster manager maintains the executor processes. Such client machines are commonly referred to as gateway machines or edge nodes.
- Local Mode : Runs the entire Spark Application on a single machine and achieves parallelism through threads. It is a common way to learn Spark or to test applications, but it is not suited to production applications.
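As a rough illustration of how the three modes are selected in practice, the sketch below assembles a spark-submit command line for each one. The --master and --deploy-mode flags are real spark-submit options; the helper name build_submit_command, the choice of YARN as the cluster manager, and the script name app.py are assumptions made only for this example.

```python
def build_submit_command(mode, app="app.py"):
    """Return a spark-submit argument list for the given execution mode.

    `build_submit_command` and `app.py` are hypothetical; YARN is assumed
    as the cluster manager for the two distributed modes.
    """
    if mode == "cluster":
        # Driver and executors are both launched inside the cluster.
        return ["spark-submit", "--master", "yarn", "--deploy-mode", "cluster", app]
    if mode == "client":
        # Driver stays on the submitting (gateway) machine.
        return ["spark-submit", "--master", "yarn", "--deploy-mode", "client", app]
    if mode == "local":
        # Everything runs on one machine; local[*] uses one thread per core.
        return ["spark-submit", "--master", "local[*]", app]
    raise ValueError(f"unknown execution mode: {mode!r}")


for mode in ("cluster", "client", "local"):
    print(" ".join(build_submit_command(mode)))
```

Only the two distributed modes carry a --deploy-mode flag here, because in local mode there is no separate cluster on which to place the driver.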
The Life Cycle of a Spark Application (Outside Spark)
Client Request
- The client submits a pre-compiled JAR or library, making a request to the cluster manager driver node. At this point it is explicitly asking for resources for the Spark driver process only. The cluster manager places the driver onto a node in the cluster. The client process that submitted the job then exits, and the application is running on the cluster.
Launch
- The driver process now begins running user code. This code must include a SparkSession that initializes a Spark cluster (i.e., driver + executors).
- The SparkSession communicates with the cluster manager, asking it to launch Spark executor processes across the cluster. The number of executors and their configuration are set through command-line arguments to spark-submit.
- The cluster manager launches the executor processes on the cluster and sends the relevant information about their locations to the driver process.
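Executor count and sizing are typically passed to spark-submit as configuration properties. The sketch below flattens a dictionary of properties into --conf arguments. spark.executor.instances, spark.executor.cores, and spark.executor.memory are standard Spark properties; the helper conf_args, the specific values, and app.py are illustrative assumptions, and whether spark.executor.instances is honored depends on the cluster manager and on dynamic allocation settings.

```python
def conf_args(conf):
    """Flatten a {property: value} dict into spark-submit --conf arguments."""
    args = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args


# Illustrative values only; tune them for the actual cluster.
executor_conf = {
    "spark.executor.instances": 4,   # how many executor processes to request
    "spark.executor.cores": 2,       # concurrent task slots per executor
    "spark.executor.memory": "4g",   # JVM heap size per executor
}

command = ["spark-submit", "--deploy-mode", "cluster",
           *conf_args(executor_conf), "app.py"]
print(" ".join(command))
```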
Execution
- Now that we have a "Spark cluster," Spark goes about its merry way executing code. The driver and the executors communicate among themselves, executing code and moving data around. The driver schedules tasks onto each worker, and each worker responds with the results and status of those tasks.
Completion
- After the Spark Application completes, the driver process exits with either success or failure. The cluster manager then shuts down the executors in that Spark cluster for the driver. At this point, we can ask the cluster manager whether the Spark Application succeeded or failed.
The Life Cycle of a Spark Application (Inside Spark)
The SparkSession
- Creating a SparkSession is the first step of any Spark Application. In many interactive modes this is done for you automatically, but in an application you must do it manually.
# use newer builder method, more robustly instantiates Spark and SQL contexts ensuring
# no context conflict.
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("Word Count")\
.config("spark.some.config.option", "some-value")\
.getOrCreate()
The SparkContext
- A SparkContext object within the SparkSession represents the connection to the Spark cluster. This class is how you communicate with some of Spark's lower-level APIs, such as RDDs. It is commonly stored as the variable sc in older examples and documentation. Through a SparkContext, you can create RDDs, accumulators, and broadcast variables, and you can run code on the cluster.
Logical Instruction
- Spark code essentially consists of transformations and actions. How we build these (Spark SQL, RDDs, machine learning) is up to us. Understanding how declarative instructions like DataFrames are converted into physical execution plans is an important step toward understanding how Spark runs on a cluster.
df1 = spark.range(2, 10000000, 2)
df2 = spark.range(2, 10000000, 4)
step1 = df1.repartition(5)
step12 = df2.repartition(6)
step2 = step1.selectExpr("id * 5 as id")
step3 = step2.join(step12, ["id"])
step4 = step3.selectExpr("sum(id)")
step4.collect() # 2500000000000
step4.explain()
== Physical Plan ==
*HashAggregate(keys=[], functions=[sum(id#15L)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_sum(id#15L)])
      +- *Project [id#15L]
         +- *SortMergeJoin [id#15L], [id#10L], Inner
            :- *Sort [id#15L ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(id#15L, 200)
            :     +- *Project [(id#7L * 5) AS id#15L]
            :        +- Exchange RoundRobinPartitioning(5)
            :           +- *Range (2, 10000000, step=2, splits=8)
            +- *Sort [id#10L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(id#10L, 200)
                  +- Exchange RoundRobinPartitioning(6)
                     +- *Range (2, 10000000, step=4, splits=8)
A Spark Job
- In general, there should be one Spark job for one action. Actions always return results. Each job breaks down into a series of stages, the number of which depends on how many shuffle operations need to take place.
- The job above breaks down as follows:
- Stage 1 with 8 Tasks
- Stage 2 with 8 Tasks
- Stage 3 with 6 Tasks
- Stage 4 with 5 Tasks
- Stage 5 with 200 Tasks
- Stage 6 with 1 Task
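Each stage runs one task per partition it processes, so these counts follow from the partition counts visible in the physical plan: splits=8 for each range, RoundRobinPartitioning(5) and RoundRobinPartitioning(6) for the repartition calls, 200 for the hashpartitioning exchange before the join, and SinglePartition for the final aggregate. The snippet below is a toy calculation that reproduces the list; tasks_per_stage is a hypothetical helper, not a Spark API, and it only models this particular job shape.

```python
def tasks_per_stage(range_splits, repartitions, shuffle_partitions):
    """Toy model: one task per partition in each stage of the example job."""
    stages = []
    stages += [range_splits] * len(repartitions)  # one range stage per DataFrame
    stages += list(repartitions)                  # stages reading repartitioned data
    stages.append(shuffle_partitions)             # join after the hash-partitioned shuffle
    stages.append(1)                              # final sum over a single partition
    return stages


counts = tasks_per_stage(range_splits=8, repartitions=[6, 5], shuffle_partitions=200)
for number, tasks in enumerate(counts, start=1):
    print(f"Stage {number}: {tasks} tasks")
print("total tasks:", sum(counts))
```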
Stages
- Stages are groups of tasks that can be executed together to compute the same operation on multiple machines. In general, Spark tries to pack as much work as possible into the same stage, but the engine starts new stages after operations called shuffles.
- A shuffle represents a physical repartitioning of the data, for example sorting a DataFrame, or grouping data that was loaded from a file by key.
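- The first two stages in the example above correspond to the two range calls that create the DataFrames. The plan shows eight splits for each range, hence eight tasks per stage.
- Stages 3 and 4 run on each of those DataFrames after repartition has shuffled them into six and five partitions, and the end of these stages represents the join (a shuffle).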
- Suddenly, we have 200 tasks. This is because of a Spark SQL configuration: the spark.sql.shuffle.partitions default value is 200, which means that when a shuffle is performed during execution, it outputs 200 shuffle partitions by default.
- A good rule of thumb is that the number of partitions should be larger than the number of executors on your cluster, potentially by multiple factors depending on the workload. If you are running code on your local machine, it would behoove you to set this value lower because your local machine is unlikely to be able to execute that number of tasks in parallel.
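One way to apply this rule of thumb is to derive the setting from the parallelism actually available. The sketch below is an assumption-laden helper, not a recommendation from Spark itself: configure_shuffle_partitions and its factor argument are hypothetical, and it uses total executor cores rather than executor count as the base. The only Spark call it makes is spark.conf.set, which sets a runtime configuration value on a SparkSession.

```python
def configure_shuffle_partitions(spark, total_executor_cores, factor=3):
    """Set spark.sql.shuffle.partitions to a multiple of the available cores.

    `factor` is an arbitrary illustrative multiplier, not a Spark default.
    """
    partitions = max(1, total_executor_cores * factor)
    spark.conf.set("spark.sql.shuffle.partitions", str(partitions))
    return partitions


# Usage with a real session (requires PySpark to be installed):
#   from pyspark.sql import SparkSession
#   spark = SparkSession.builder.master("local[4]").getOrCreate()
#   configure_shuffle_partitions(spark, total_executor_cores=4)
```

On a laptop with four cores and the illustrative factor of 3, a shuffle would then produce 12 partitions instead of 200.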
Tasks
- Stages in Spark consist of tasks. Each task corresponds to a combination of blocks of data and a set of transformations that will run on a single executor.
- A task is just a unit of computation applied to a unit of data (the partition). Partitioning your data into a greater number of partitions means that more can be executed in parallel.
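The relationship between partitions and tasks can be mimicked in plain Python. This is a toy model and not how Spark is implemented: threads stand in for executor task slots, and split_into_partitions and run_stage are hypothetical names. It shows one task (here, sum) being applied once to each partition, with the partition count bounding how much can run in parallel.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, num_partitions):
    """Distribute records round-robin into `num_partitions` lists."""
    return [data[i::num_partitions] for i in range(num_partitions)]


def run_stage(partitions, task, max_parallel_tasks):
    """Run `task` once per partition, at most `max_parallel_tasks` at a time."""
    with ThreadPoolExecutor(max_workers=max_parallel_tasks) as pool:
        # pool.map preserves partition order in the returned results.
        return list(pool.map(task, partitions))


data = list(range(1, 101))
partitions = split_into_partitions(data, num_partitions=4)
partial_sums = run_stage(partitions, task=sum, max_parallel_tasks=4)
print(len(partitions), "tasks ->", partial_sums, "->", sum(partial_sums))
```

With a single partition there would be only one task, and the extra worker threads would sit idle, which is the sense in which more partitions allow more parallelism.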
Execution Details
Tasks and stages in Spark have some important properties. First, Spark automatically pipelines stages and tasks that can be done together, such as a map operation followed by another map operation. Second, for all shuffle operations, Spark writes the data to stable storage and can reuse it across multiple jobs.
Pipelining
An important part of what makes Spark an "in-memory computation tool" is that unlike the tools that came before it (e.g., MapReduce), Spark performs as many steps as it can at one point in time before writing data to memory or disk. One of the key optimizations that Spark performs is pipelining, which occurs at and below the RDD level.
With pipelining, any sequence of operations that feed data directly into each other, without needing to move it across nodes, is collapsed into a single stage of tasks that do all the operations together.
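Python's lazy map gives a small-scale analogy for this behavior. The sketch below is only an analogy, with hypothetical names (pipelined_task, double, add_one): two chained maps are applied to one partition, and a trace records the order of calls. Each record passes through both functions before the next record is touched, so no intermediate collection is built between the two steps.

```python
def pipelined_task(partition, trace):
    """Apply two chained maps to one partition in a single pass."""
    def double(x):
        trace.append(("double", x))
        return x * 2

    def add_one(x):
        trace.append(("add_one", x))
        return x + 1

    doubled = map(double, partition)   # lazy: nothing runs yet
    shifted = map(add_one, doubled)    # still lazy
    return list(shifted)               # records are pulled through here


trace = []
result = pipelined_task([1, 2, 3], trace)
print(result)
print(trace)
```

The trace alternates between double and add_one rather than listing all the double calls first, which is the single-pass behavior that collapsing operations into one stage provides.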
Shuffle Persistence
When Spark needs to run an operation that has to move data across nodes, such as a reduce-by-key operation (where input data for each key needs to first be brought together from many nodes), the engine can't perform pipelining anymore, and instead it performs a cross-network shuffle.
Spark always executes shuffles by first having the "source" tasks (those sending data) write shuffle files to their local disks during their execution stage. Then, the stage that does the grouping and reduction launches and runs tasks that fetch their corresponding records from each shuffle file and perform that computation (e.g., fetch and process the data for a specific range of keys). Saving the shuffle files to disk lets Spark run this stage later in time than the source stage (e.g., if there are not enough executors to run both at the same time), and also lets the engine re-launch reduce tasks on failure without rerunning all the input tasks.
One side effect you'll see for shuffle persistence is that running a new job over data that's already been shuffled does not rerun the "source" side of the shuffle. Because the shuffle files were already written to disk earlier, Spark knows that it can use them to run the later stages of the job, and it need not redo the earlier ones. In the Spark UI and logs, you will see the pre-shuffle stages marked as "skipped". This automatic optimization can save time in a workload that runs multiple jobs over the same data, but of course, for even better performance you can perform your own caching with the DataFrame or RDD cache method, which lets you control exactly which data is saved and where.
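The write-then-fetch mechanism and the skipped source stage can be imitated with ordinary files. The following is a deliberately simplified toy, not Spark's shuffle implementation: every function name, the JSON file layout, and the crc32-based partitioner are assumptions chosen for the example. Map-side tasks write one shuffle file each, reduce-side tasks fetch their bucket from every file, and a second job over the same data reuses the files instead of rerunning the map side.

```python
import json
import tempfile
import zlib
from collections import defaultdict
from pathlib import Path


def partition_for(key, num_reducers):
    """Deterministic hash partitioner (crc32 avoids Python's salted str hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def map_side(shuffle_dir, map_id, records, num_reducers):
    """'Source' task: bucket records by reducer and write one shuffle file."""
    buckets = defaultdict(list)
    for key, value in records:
        buckets[partition_for(key, num_reducers)].append([key, value])
    path = Path(shuffle_dir) / f"shuffle_map{map_id}.json"
    path.write_text(json.dumps({str(r): rows for r, rows in buckets.items()}))
    return path


def reduce_side(shuffle_files, reducer_id):
    """Reduce task: fetch this reducer's bucket from every file, sum by key."""
    totals = defaultdict(int)
    for path in shuffle_files:
        buckets = json.loads(Path(path).read_text())
        for key, value in buckets.get(str(reducer_id), []):
            totals[key] += value
    return dict(totals)


def run_job(shuffle_dir, inputs, num_reducers, stats):
    """Run the map stage only if its shuffle files are missing, then reduce."""
    files = sorted(Path(shuffle_dir).glob("shuffle_map*.json"))
    if len(files) == len(inputs):
        stats["skipped_map_stages"] += 1   # reuse files written by an earlier job
    else:
        files = [map_side(shuffle_dir, i, recs, num_reducers)
                 for i, recs in enumerate(inputs)]
    merged = {}
    for reducer_id in range(num_reducers):
        merged.update(reduce_side(files, reducer_id))
    return merged


inputs = [[("a", 1), ("b", 2)], [("a", 3), ("c", 4)]]
stats = {"skipped_map_stages": 0}
with tempfile.TemporaryDirectory() as shuffle_dir:
    first = run_job(shuffle_dir, inputs, num_reducers=2, stats=stats)
    second = run_job(shuffle_dir, inputs, num_reducers=2, stats=stats)
print(first, second, stats)
```

Both jobs produce the same per-key sums, and the counter records that the second one skipped its map stage, loosely mirroring the "skipped" stages shown in the Spark UI.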