Developing Spark Applications

Writing Spark Applications

A Spark application is a combination of two things: a Spark cluster and your code.

Writing Python Applications

PySpark has no build step: you simply write Python scripts and execute them against the cluster.

To facilitate code reuse, it’s common to package multiple Python files of Spark code into egg or ZIP files. To include those files, use the --py-files argument of spark-submit to add .py, .zip or .egg files to be distributed with your application.
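
One way to build such an archive is with Python’s standard zipfile module. The following is a minimal sketch, not a prescribed workflow: the function name build_py_archive, the package directory mylib and the archive name deps.zip are all hypothetical and chosen only for illustration.

```python
import zipfile
from pathlib import Path


def build_py_archive(package_dir, archive_path):
    """Zip every .py file under package_dir.

    The package name is kept as the top-level folder inside the archive
    (for example mylib/util.py) so that `import mylib` can resolve once
    the archive is on the Python path.
    """
    package_dir = Path(package_dir)
    with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for source in sorted(package_dir.rglob("*.py")):
            # Store paths relative to the package's parent directory.
            arcname = source.relative_to(package_dir.parent).as_posix()
            archive.write(source, arcname)
    return archive_path
```

After calling build_py_archive("mylib", "deps.zip"), the archive could be shipped with spark-submit --py-files deps.zip main.py.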

from __future__ import print_function
# entry point
if __name__ == '__main__':
    from pyspark.sql import SparkSession
    # always share this inside your application rather than instantiating it with every python class.
    spark = SparkSession.builder \
        .master("local") \
        .appName("Word Count") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()

    print(spark.range(5000).where("id > 500").selectExpr("sum(id)").collect())

Running the application:

$SPARK_HOME/bin/spark-submit --master local pyspark_template/main.py

Testing Spark Applications

Strategic Principles

Input data resilience

  • Resilience to different kinds of input data is fundamental to how you write data pipelines. Data changes with business requirements, so design your application to tolerate some degree of change in its input, or otherwise to fail gracefully.

Business logic resilience and evolution

  • Do robust logical testing with realistic data to ensure that you are actually getting what you want.
  • Don’t write a bunch of “Spark Unit Tests” that just test Spark’s functionality; instead, focus on testing your business logic.

Resilience in output and atomicity

  • Ensure that the output structure of your pipeline is what you expect. This means you need to handle output schema resolution gracefully. Most of the time, your Spark pipeline will be feeding other Spark pipelines.

Tactical Takeaways

The highest-value approach is to verify that your business logic is correct through proper unit testing, and to ensure that you are resilient to changing input data or have structured it so that schema evolution will not become unwieldy in the future.

Managing SparkSessions

  • Testing your Spark code using a unit test framework like JUnit or ScalaTest is relatively easy because of Spark’s local mode.
  • Perform dependency injection as much as possible when managing SparkSessions in your code.
  • Initialize the SparkSession only once and pass it around to relevant functions and classes at runtime in a way that makes it easy to substitute during testing.
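
The injection pattern in the points above can be sketched in PySpark roughly as follows. This is only an illustration under stated assumptions: the function names create_session, sum_ids_above and main are hypothetical, and the query simply mirrors the earlier template script.

```python
def create_session(app_name="Word Count", master="local"):
    """Build the single SparkSession for the application (requires pyspark)."""
    from pyspark.sql import SparkSession  # imported lazily so tests can skip it

    return SparkSession.builder.master(master).appName(app_name).getOrCreate()


def sum_ids_above(spark, limit, threshold):
    """Business logic that receives its session instead of creating one."""
    rows = (
        spark.range(limit)
        .where("id > {}".format(int(threshold)))
        .selectExpr("sum(id) AS total")
        .collect()
    )
    return rows[0]["total"]


def main():
    spark = create_session()  # created once, at the entry point
    try:
        print(sum_ids_above(spark, 5000, 500))
    finally:
        spark.stop()
```

Because sum_ids_above only uses the session it is handed, a test can pass in a local session, or a lightweight stand-in, without touching the entry point.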

Which Spark API to Use?

The right API depends on your team and its needs: some teams and projects will need the less strict SQL and DataFrame APIs for speed of development, while others will want to use type-safe Datasets or RDDs.

Connecting to Unit Testing Frameworks

To unit test your code, we recommend using the standard frameworks in your language (e.g., JUnit or ScalaTest), and setting up your test harnesses to create and clean up a SparkSession for each test.

Connecting to Data Sources

You should make sure your testing code does not connect to production data sources, so that developers can easily run it in isolation if these data sources change.

One easy way to make this happen is to have all your business logic functions take DataFrames or Datasets as input instead of directly connecting to various sources; after all, subsequent code will work the same way no matter what the data source was.
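
A rough sketch of that separation is shown below. The function names load_orders and keep_large_orders, the Parquet source, and the columns order_id and amount are all hypothetical; they stand in for whatever your pipeline actually reads.

```python
def load_orders(spark, path):
    """I/O boundary: the only function that knows where the data lives."""
    return spark.read.parquet(path)


def keep_large_orders(orders, min_amount):
    """Pure transformation: DataFrame in, DataFrame out, no connections."""
    condition = "amount >= {}".format(float(min_amount))
    return orders.where(condition).select("order_id", "amount")
```

In a test, keep_large_orders can be fed a small in-memory DataFrame, for example one built with spark.createDataFrame([(1, 50.0), (2, 150.0)], ["order_id", "amount"]), so the production source is never contacted.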

The Development Process

First, maintain a scratch space, such as an interactive notebook or some equivalent. Then, as you build key components and algorithms, move them to a more permanent location such as a library or package.

When running on your local machine, the spark-shell and its various language-specific implementations are probably the best way to develop applications. For the most part, the shell is for interactive applications, whereas spark-submit is for production applications on your Spark cluster.

Launching Applications

  • The most common way to submit an application is through spark-submit:
./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar-or-script> \
  [application-arguments]
  • Favour cluster mode where possible: the driver then runs inside the cluster, which reduces latency between the executors and the driver.
  • To enumerate all options yourself, run spark-submit with --help.

Application Launch Examples

./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --executor-memory 20G \
  --total-executor-cores 100 \
  replace/with/path/to/examples.jar \
  1000

# or

./bin/spark-submit \
  --master spark://207.184.161.138:7077 \
  examples/src/main/python/pi.py \
  1000
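
When submissions are scripted, the same command line can be assembled in Python before being handed to a process runner. The helper below is a sketch only: build_submit_command is a hypothetical name, and it covers just the --master, --deploy-mode and --conf options shown above.

```python
import shlex


def build_submit_command(app, master, deploy_mode=None, conf=None, app_args=()):
    """Assemble a spark-submit argument list from keyword settings."""
    command = ["./bin/spark-submit", "--master", master]
    if deploy_mode:
        command += ["--deploy-mode", deploy_mode]
    # Sort the properties so the generated command is deterministic.
    for key, value in sorted((conf or {}).items()):
        command += ["--conf", "{}={}".format(key, value)]
    command.append(app)
    command += [str(arg) for arg in app_args]
    return command


command = build_submit_command(
    "examples/src/main/python/pi.py",
    master="spark://207.184.161.138:7077",
    conf={"spark.executor.memory": "20G"},
    app_args=[1000],
)
print(shlex.join(command))  # shell-quoted form, useful for logging
```

Keeping the command as a list avoids shell-quoting problems; it could then be run with subprocess.run(command, check=True) from the Spark home directory.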

Configuring Applications

The majority of configurations fall into the following categories:

  • Application properties
  • Runtime environment
  • Shuffle behavior
  • Spark UI
  • Compression and serialization
  • Memory management
  • Execution behavior
  • Networking
  • Scheduling
  • Dynamic allocation
  • Security
  • Encryption
  • Spark SQL
  • Spark streaming
  • SparkR

Spark provides three locations to configure the system:

  • Spark properties control most application parameters and can be set by using a SparkConf object
  • Java system properties
  • Hardcoded configuration files

You can find several templates under the conf directory of your Spark home. Logging is configured through log4j.properties.

SparkConf

SparkConf manages all of an application’s configuration. Import the class and create an instance, as in the following example.

from pyspark import SparkConf

# Configuration built in code
conf = SparkConf().setMaster("local[2]").setAppName("DefinitiveGuide") \
    .set("some.conf", "to.some.value")

The application name and master can also be supplied on the command line:

./bin/spark-submit --name "DefinitiveGuide" --master local[4] ...

Application Properties

  • Application properties are set either from spark-submit or when creating the application.
Property name | Default | Meaning
--- | --- | ---
spark.app.name | (none) | The name of your application. This will appear in the UI and in log data.
spark.driver.cores | 1 | Number of cores to use for the driver process, only in cluster mode.
spark.driver.maxResultSize | 1g | Limit of total size of serialized results of all partitions for each Spark action (e.g., collect). Should be at least 1M, or 0 for unlimited. Jobs will be aborted if the total size exceeds this limit. Having a high limit can cause OutOfMemoryErrors in the driver (depends on spark.driver.memory and memory overhead of objects in JVM). Setting a proper limit can protect the driver from OutOfMemoryErrors.
spark.driver.memory | 1g | Amount of memory to use for the driver process, where SparkContext is initialized (e.g., 1g, 2g). Note: in client mode, this must not be set through the SparkConf directly in your application, because the driver JVM has already started at that point. Instead, set this through the --driver-memory command-line option or in your default properties file.
spark.executor.memory | 1g | Amount of memory to use per executor process (e.g., 2g, 8g).
spark.extraListeners | (none) | A comma-separated list of classes that implement SparkListener; when initializing SparkContext, instances of these classes will be created and registered with Spark’s listener bus. If a class has a single-argument constructor that accepts a SparkConf, that constructor will be called; otherwise, a zero-argument constructor will be called. If no valid constructor can be found, the SparkContext creation will fail with an exception.
spark.logConf | false | Logs the effective SparkConf as INFO when a SparkContext is started.
spark.master | (none) | The cluster manager to connect to. See the list of allowed master URLs.
spark.submit.deployMode | (none) | The deploy mode of the Spark driver program, either “client” or “cluster,” which means to launch driver program locally (“client”) or remotely (“cluster”) on one of the nodes inside the cluster.
spark.log.callerContext | (none) | Application information that will be written into Yarn RM log/HDFS audit log when running on Yarn/HDFS. Its length depends on the Hadoop configuration hadoop.caller.context.max.size. It should be concise, and typically can have up to 50 characters.
spark.driver.supervise | false | If true, restarts the driver automatically if it fails with a non-zero exit status. Only has effect in Spark standalone mode or Mesos cluster deploy mode.
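
As an illustration of setting a few of these properties while creating the application, the sketch below keeps them in a dictionary and applies them to a session builder. The names apply_properties, APP_PROPERTIES and create_session are hypothetical, and the values are placeholders rather than recommendations.

```python
def apply_properties(builder, properties):
    """Apply each key/value pair to a SparkSession builder and return it."""
    for key, value in properties.items():
        builder = builder.config(key, value)
    return builder


# spark.driver.memory is deliberately left out: in client mode it has to be
# set with --driver-memory or in the default properties file instead.
APP_PROPERTIES = {
    "spark.app.name": "DefinitiveGuide",
    "spark.executor.memory": "2g",
    "spark.logConf": "true",
}


def create_session():
    from pyspark.sql import SparkSession  # requires pyspark

    builder = SparkSession.builder.master("local[2]")
    return apply_properties(builder, APP_PROPERTIES).getOrCreate()
```

Keeping the properties in one mapping makes it straightforward to load them from a file or to override them in tests.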

More description available at : https://spark.apache.org/docs/latest/configuration.html

Runtime Properties : configure the runtime environment of your application

Execution Properties : finer-grained control on actual execution.

Configuring Memory Management : There are times when you might need to manually manage the memory options to try and optimize your applications.

Configuring Shuffle Behaviour : We’ve emphasized how shuffles can be a bottleneck in Spark jobs because of their high communication overhead. Therefore there are a number of low-level configurations for controlling shuffle behavior.

Environment Variables

We can configure Spark settings through environment variables, which are read from the conf/spark-env.sh script.

The following variables can be set:

  • JAVA_HOME : Location where Java is installed (if it’s not on your default PATH).
  • PYSPARK_PYTHON : Python binary executable to use for PySpark in both driver and workers (default is python2.7 if available; otherwise, python). Property spark.pyspark.python takes precedence if it is set.
  • PYSPARK_DRIVER_PYTHON : Python binary executable to use for PySpark in driver only (default is PYSPARK_PYTHON). Property spark.pyspark.driver.python takes precedence if it is set.
  • SPARKR_DRIVER_R : R binary executable to use for SparkR shell (default is R). Property spark.r.shell.command takes precedence if it is set.
  • SPARK_LOCAL_IP : IP address of the machine to which to bind.
  • SPARK_PUBLIC_DNS : Hostname your Spark program will advertise to other machines.
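
When a PySpark script is started directly with a Python interpreter rather than through conf/spark-env.sh, one of these variables can also be set from the script itself. The sketch below assumes that PYSPARK_PYTHON is read when the SparkContext is created, so it must run before any session is built; the function name pin_worker_python is hypothetical, and reusing the driver’s interpreter path only makes sense when the workers can see the same path (for example in local mode).

```python
import os
import sys


def pin_worker_python(environ=None, interpreter=sys.executable):
    """Set PYSPARK_PYTHON unless it is already defined, and return its value."""
    environ = os.environ if environ is None else environ
    # setdefault leaves an existing value (e.g. from spark-env.sh) untouched.
    environ.setdefault("PYSPARK_PYTHON", interpreter)
    return environ["PYSPARK_PYTHON"]
```

Calling pin_worker_python() at the top of the entry point, before the SparkSession is created, is one way to keep driver and workers on the same interpreter.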

Job Scheduling within an Application

  • Within a given Spark application, multiple parallel jobs can run simultaneously if they are submitted from separate threads. Spark’s scheduler is fully thread-safe and supports this use case to enable applications that serve multiple requests.
  • By default, the scheduler runs jobs in FIFO fashion. If the jobs at the head of the queue don’t need to use the entire cluster, later jobs can begin to run right away, but if the jobs at the head of the queue are large, later jobs might be delayed significantly.
  • It is also possible to configure fair sharing between jobs. Under fair sharing, Spark assigns tasks between jobs in a round-robin fashion so that all jobs get a roughly equal share of cluster resources. This means that short jobs submitted while a long job is running can begin receiving resources right away and still achieve good response times without waiting for the long job to finish. This mode is best for multiuser settings.
  • To enable the fair scheduler, set the spark.scheduler.mode property to FAIR when configuring a SparkContext.
  • The fair scheduler also supports grouping jobs into pools, and setting different scheduling options, or weights, for each pool. This can be useful to create a high-priority pool for more important jobs or to group the jobs of each user together and give users equal shares regardless of how many concurrent jobs they have instead of giving jobs equal shares. This approach is modeled after the Hadoop Fair Scheduler.
sc.setLocalProperty("spark.scheduler.pool", "pool1")
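
Submitting jobs from separate threads, each tagged with its own pool, might look roughly like the sketch below. It assumes that spark.scheduler.mode has already been set to FAIR and that a local property set in a Python thread applies to the jobs submitted from that thread, which can depend on your PySpark version and threading settings. The helper names run_in_pool and run_concurrently are hypothetical.

```python
import threading


def run_in_pool(sc, pool_name, job):
    """Tag the calling thread with a scheduler pool, then run the job."""
    sc.setLocalProperty("spark.scheduler.pool", pool_name)
    return job()


def run_concurrently(sc, jobs_by_pool):
    """Start one thread per (pool, job) pair and wait for all to finish."""
    results = {}

    def worker(pool_name, job):
        results[pool_name] = run_in_pool(sc, pool_name, job)

    threads = [
        threading.Thread(target=worker, args=(pool, job))
        for pool, job in jobs_by_pool.items()
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results
```

With a real session, a call might be run_concurrently(spark.sparkContext, {"pool1": lambda: spark.range(1000).count(), "pool2": lambda: spark.range(10).count()}), where pool2 is a second, hypothetical pool name.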