Skip to content

Monitoring and Debuggingđź”—

NOTE: This chapter is better explored using a Spark UI

Monitoring Landscapeđź”—

Components we can monitor :

  • Spark Application and Jobs: Debug using Spark UI and Spark Logs.
  • JVM : Executors are run in individual JVM. Use tools like jstack, jmap, jstat and jvisualvm etc to profile Spark Jobs.
  • OS/Machine : We can monitor machines for CPU, Network and I/O using tools like dstat, iostat, iotop
  • Cluster: Monitor using Cluster Manager like YARN, Mesos or standalone cluster. Popular tools include Ganglia and Prometheus.

What to Monitor ?đź”—

We usually monitor two things : processes running your application (CPU usage, memory usage, etc.), and query execution inside it (e.g. jobs and tasks).

Driver and Executor Processesđź”—

  • We should keep an eye on driver because state of application resides here. If you could only monitor one machine : always monitor driver but understanding state of executors is also important for individual Spark jobs.
  • Spark has configurable metrics system based on Dropwizard Metrics Library. Its configured via a configuration file that Spark expects at $SPARK_HOME/conf/metrics.properties or defined via spark.metrics.conf

Queries, Jobs, Stages and Tasksđź”—

  • Spark provides ability to dive into queries, job, stages, and tasks. This allows to know exactly what’s eating up all the resources in case users run catersian joins :)

Spark Logsđź”—

  • Usually with Scala and Java, Spark automatically provides logs out of box, but with python it requires py4j to integrate to Spark’s Java-based logging library. Using logging module or simple print statements will still print results to standard errors, however its hard to find.
# set spark log level
spark.sparkContext.setLogLevel("INFO")

Spark UIđź”—

  • visual way to monitor application while they are running as well as metrics about Spark Workloads, at the Spark and JVM level.
  • Every SparkContext running launches a web UI, by default on port 4040, that displays useful information about the application. When you run Spark in local mode, for example, just navigate to http://localhost:4040 to see the UI when running a Spark Application on your local machine.

Open a new spark-shell and run this code and trace it in UI

spark.read\
  .option("header", "true")\
  .csv("/data/retail-data/all/online-retail-dataset.csv")\
  .repartition(2)\
  .selectExpr("instr(Description, 'GLASS') >= 1 as is_glass")\
  .groupBy("is_glass")\
  .count()\
  .collect()

Notice the aggregate statistics about this query:

Submitted Time: 2017/04/08 16:24:41
Duration: 2 s
Succeeded Jobs: 2

Open the Jobs Menu

The first stage has eight tasks. CSV files are splittable, and Spark broke up the work to be distributed relatively evenly between the different cores on the machine. This happens at the cluster level and points to an important optimization: how you store your files. The following stage has two tasks because we explicitly called a repartition to move the data into two partitions. The last stage has 200 tasks because the default shuffle partitions value is 200.

Spark REST APIđź”—

you can also access Spark’s status and metrics via a REST API. This is is available at http://localhost:4040/api/v1 and is a way of building visualizations and monitoring tools on top of Spark itself.

Spark UI History Serverđź”—

Normally, the Spark UI is only available while a SparkContext is running, so how can you get to it after your application crashes or ends? To do this, Spark includes a tool called the Spark History Server that allows you to reconstruct the Spark UI and REST API, provided that the application was configured to save an event log.

To use the history server, you first need to configure your application to store event logs to a certain location. You can do this by by enabling spark.eventLog.enabled and the event log location with the configuration spark.eventLog.dir. Then, once you have stored the events, you can run the history server as a standalone application, and it will automatically reconstruct the web UI based on these logs. Some cluster managers and cloud services also configure logging automatically and run a history server by default.

Debugging and Spark First Aidđź”—

Common Problem and their solutions

Spark Jobs Not Startingđź”—

This issue can arise frequently, especially when you are just getting started with a fresh deployment or environment.

Signs and Symptomsđź”—

  • Spark jobs don’t start
  • Spark UI doesn’t show any nodes on the cluster except the driver
  • Spark UI seems to be reporting incorrect information.

Potential treatmentsđź”—

  • Occurs due to application’s resource demands are not configured properly and Spark makes assumptions about networks, file system and other resources. Most likely configured something incorrectly, and now the node that runs the driver can’t talk to executor, maybe your forgot to specify what IP and port is open or didn’t open correct one.
  • Ensure machines can communicate properly with one another on ports that you expect
  • Ensure Spark resource configurations are correct and that cluster manager is setup correctly for Spark. Try running a simple application first to see if that works. One common issue maybe that you requested more memory per executor than the cluster manager has free to allocate.

Errors Before Executionđź”—

  • While developing a new application and have previously run code on this cluster, but now some new code won’t work.

Signs and Symptomsđź”—

  • Commands don’t run at all and output error messages
  • Check Spark UI and no jobs, stages or tasks seem to run.

Potential Treatmentsđź”—

  • Take a look at error message to make sure nothing is wrong with your code such as Incorrect file path or column name
  • Double check to verify that cluster has the network connectivity between driver, workers, and the storage systems.
  • There might be issues with libraries or class paths that are causing the wrong version of a library to be loaded for accesing storage.

Errors During Executionđź”—

Signs and Symptomsđź”—

  • One spark job runs successfully on cluster but next one fails
  • A step in multistep query fails
  • Difficult to parse error message

Treatmentsđź”—

  • Check if data is in correct format as expected. Sometimes upstream data may change causing unintended consequences in application
  • If an error pop quickly even before tasks are launched, then its most likely an analysis exception while planning the query. Either misspelled columns are referenced or column, view, or table doesn’t exits
  • Read stack trace for more clues on the problem.
  • Its also possible your own code is failing and Spark will just ouput the excepted error from the program and task will be marked as failed on Spark UI.

Slow Tasks or Stragglersđź”—

Signs and Symtomsđź”—

  • Spark stages seem to execute until there are only a handful of tasks left. Those tasks then take a long time.
  • These slow tasks show up in the Spark UI and occur consistently on the same dataset(s).
  • These occur in stages, one after the other.
  • Scaling up the number of machines given to the Spark Application doesn’t really help—some tasks still take much longer than others.
  • In the Spark metrics, certain executors are reading and writing much more data than others.

potential treatmentsđź”—

Most often the source of this issue is that your data is partitioned unevenly into DataFrame or RDD partitions. When this happens, some executors might need to work on much larger amounts of work than others.

  • Try increasing the number of partitions to have less data per partition
  • Try repartitioning by another combination of columns.
  • Try increasing the memory allocated to your executors if possible.
  • Monitor the executor that is having trouble and see if it is the same machine across jobs; you might also have an unhealthy executor or machine in your cluster—for example, one whose disk is nearly full.
  • Check whether your user-defined functions (UDFs) are wasteful in their object allocation or business logic. Try to convert them to DataFrame code if possible.
  • Ensure that your UDFs or User-Defined Aggregate Functions (UDAFs) are running on a small enough batch of data. Oftentimes an aggregation can pull a lot of data into memory for a common key, leading to that executor having to do a lot more work than other
  • Turning on speculation (Discussed below)
  • Another common issue can arise when you’re working with Datasets. Because Datasets perform a lot of object instantiation to convert records to Java objects for UDFs, they can cause a lot of garbage collection.

Slow Aggregationsđź”—

Signs and symptomsđź”—

  • Slow tasks during a groupBy call.
  • Jobs after the aggregation are slow, as well.

Potential Treatmentsđź”—

Unfortunately, this issue can’t always be solved. Sometimes, the data in your job just has some skewed keys, and the operation you want to run on them needs to be slow.

  • Increase number of partitions before aggregation to reduce number of different keys processed in each task.
  • Increase executor memory as this can help handle the case if a single key has lots of data allowing less spillage to disk.
  • If tasks after aggregation are slow means your dataset is still unbalanced, try a repartition call to partition it randomly.
  • Try to prune filters and SELECT statements to collect the data that you actually need. Spark’s query optimizer does this for structured APIs automatically.
  • Ensure null values are represented correctly and not using “ ” or “EMPTY”. Spark can optimize nulls early in job if possible.
  • Some aggregations are inherently slower than others. For isntance, collect_list and collect_set are very slow aggregation functions because they must return all the matching objects to the driver, and should be avoided in performance-critical code.

Slow Joinsđź”—

Joins and aggregations are both shuffles, so they share some of the same general symptoms as well as treatments.

Signs and symptomsđź”—

  • A join stage seems to be taking a long time. This can be one task or many tasks.
  • Stages before and after the join seem to be operating normally.

Potential treatmentsđź”—

  • Many joins can be optimized (manually or automatically) to other types of joins.
  • Experimenting with different join orderings can really help speed up jobs, especially if some of those joins filter out a large amount of data; do those first.
  • Partitioning a dataset prior to joining can be very helpful for reducing data movement across the cluster, especially if the same dataset will be used in multiple join operations. It’s worth experimenting with different prejoin partitioning. Keep in mind, again, that this isn’t “free” and does come at the cost of a shuffle.
  • Slow joins can also be caused by data skew. There’s not always a lot you can do here, but sizing up the Spark application and/or increasing the size of executors can help, as described in earlier sections.
  • Gather data using filter and select that you actually need.
  • Represent null values correctly.
  • Sometimes Spark can’t properly plan for a broadcast join if it doesn’t know any statistics about the input DataFrame or table. If you know that one of the tables that you are joining is small, you can try to force a broadcast, or use Spark’s statistics collection commands to let it analyze the table.

Slow Reads/Writesđź”—

Slow I/O can be difficult to diagnose, especially with networked file systems.

Signs and Symptomsđź”—

  • Slow reading of data from a distributed file system or external system.
  • Slow writes from network file systems or blob storage.

Potential Treatmentsđź”—

  • Turning on speculation (set spark.speculation to true) can help with slow reads and writes. This will launch additional tasks with the same operation in an attempt to see whether it’s just some transient issue in the first task. Speculation is a powerful tool and works well with consistent file systems. However, it can cause duplicate data writes with some eventually consistent cloud services, such as Amazon S3, so check whether it is supported by the storage system connector you are using.
  • Ensuring sufficient network connectivity can be important—your Spark cluster may simply not have enough total network bandwidth to get to your storage system.
  • For distributed file systems such as HDFS running on the same nodes as Spark, make sure Spark sees the same hostnames for nodes as the file system. This will enable Spark to do locality-aware scheduling.

Driver OutOfMemoryError or Driver Unresponsiveđź”—

This is usually serious issue as it crashes Spark Application. It happens due to collecting too much data back to driver, making it run out of memory

Signs and Symptomsđź”—

  • Spark Application is unresponsive or crashed.
  • OutOfMemoryErrors or garbage collection messages in the driver logs.
  • Commands take a very long time to run or don’t run at all.
  • Interactivity is very low or non-existent.
  • Memory usage is high for the driver JVM.

Potential Treatmentsđź”—

  • Your code should not call collect on a very large dataset
  • Avoid doing broadcast join where the data to be broadcast is too big.
  • A long-running application generated a large number of objects on the driver and is unable to release them. Java’s jmap tool can be useful to see what objects are filling most of the memory of your driver JVM by printing a histogram of the heap.
  • Increase the driver’s memory allocation if possible to make it work with more data
  • Issues with JVMs running out of memory can happen if you are using another language binding, such as Python, due to data conversion between the two requiring too much memory in the JVM.
  • If you are sharing a SparkContext with other users, ensure that people aren’t trying to do something that might be causing large amounts of memory allocation in the driver

Executor OutOfMemoryError or Executor Unresponsiveđź”—

Many Spark Applications can recover from this automatically depending on the issue.

Signs and Symptomsđź”—

  • OutOfMemoryErrors or garbage collection messages in the executor logs. You can find these in the Spark UI.
  • Executors that crash or become unresponsive.
  • Slow tasks on certain nodes that never seem to recover.

Potential treatmentsđź”—

  • Try increasing the memory and number of executors
  • Try increasing PySpark worker size via relevant Python configurations
  • Look for garbage collection error messages in the executor logs. Some of the tasks that are running, especially if you’re using UDFs, can be creating lots of objects that need to be garbage collected. Repartition your data to increase parallelism, reduce the amount of records per task, and ensure that all executors are getting the same amount of work.
  • Ensure nulls are handled correctly
  • Try using fewer UDFs and more of Spark’s structured operations
  • Use Java montioring tools like jmap to get a histogram of heap memory usage on your executors
  • If executors are being placed on nodes that also have other workloads running on them, such as a key-value store, try to isolate your Spark jobs from other jobs.

Unexpected Nulls in Resultsđź”—

Signs and symptomsđź”—

  • Unexpected null values after transformations.
  • Scheduled production jobs that used to work no longer work, or no longer produce the right results.

Potential treatmentsđź”—

  • It’s possible that your data format has changed without adjusting your business logic. This means that code that worked before is no longer valid.
  • Use an accumulator to try to count records or certain types, as well as parsing or processing errors where you skip a record. This can be helpful because you might think that you’re parsing data of a certain format, but some of the data doesn’t. Most often, users will place the accumulator in a UDF when they are parsing their raw data into a more controlled format and perform the counts there. This allows you to count valid and invalid records and then operate accordingly after the fact.
  • Ensure that your transformations actually result in valid query plans. Spark SQL sometimes does implicit type coercions that can cause confusing results.

For instance, the SQL expression SELECT 5*"23" results in 115 because the string “25” converts to an the value 25 as an integer, but the expression SELECT 5 * " " results in null because casting the empty string to an integer gives null. Make sure that your intermediate datasets have the schema you expect them to (try using printSchema on them), and look for any CAST operations in the final query plan.

No Space Left on Disk Errorsđź”—

Signs and symptomsđź”—

  • You see “no space left on disk” errors and your jobs fail.

Potential treatmentsđź”—

  • Add more disk space by sizing up the nodes or attaching external storage from cloud.
  • If there is a limit on storage, Repartition can help not to have skewed nodes with large storage.
  • Try experimenting with different storage configurations like log retain or rolling, etc.

Serialization Errorsđź”—

Signs and symptomsđź”—

  • You see serialization errors and your jobs fail.

Potential treatmentsđź”—

  • This is very uncommon when working with the Structured APIs, but you might be trying to perform some custom logic on executors with UDFs or RDDs and either the task that you’re trying to serialize to these executors or the data you are trying to share cannot be serialized. This often happens when you’re working with either some code or data that cannot be serialized into a UDF or function, or if you’re working with strange data types that cannot be serialized. If you are using (or intend to be using Kryo serialization), verify that you’re actually registering your classes so that they are indeed serialized.
  • Try not to refer to any fields of the enclosing object in your UDFs when creating UDFs inside a Java or Scala class. This can cause Spark to try to serialize the whole enclosing object, which may not be possible. Instead, copy the relevant fields to local variables in the same scope as closure and use those.