Skip to content

Structured Streaming in Production๐Ÿ”—

  • Structured Streaming was marked as production-ready in Apache Spark 2.2.0, meaning that this release has all the features required for production use and stabilizes the API.

Fault Tolerance and Checkpointing๐Ÿ”—

  • failure recovery is a important operational concern, faults are inevitable like losing connection to a cluster, schema changes or intentional restart.
  • Structured Streaming allows recovering an application by just restarting it. To do this enable checkpointing and write-ahead logs, both of which are handled automatically by the engine.
  • We must configure a query to write to a checkpoint location on a reliable file system which is used for periodic saving of all relevant progress information as well as current intermediate state values to the checkpoint location.
static = spark.read.json("/data/activity-data")
streaming = spark\
  .readStream\
  .schema(static.schema)\
  .option("maxFilesPerTrigger", 10)\
  .json("/data/activity-data")\
  .groupBy("gt")\
  .count()
query = streaming\
  .writeStream\
  .outputMode("complete")\
  .option("checkpointLocation", "/some/python/location/")\
  .queryName("test_python_stream")\
  .format("memory")\
  .start()

Updating Your Application๐Ÿ”—

  • checkpointing helps us with information about stream processed thus far and what intermediate state it may be storing but it has small catch - we might reason our old checkpoint when we update our streaming application. Make sure that update is not a breaking change to avoid such situations.

Updating your streaming application code๐Ÿ”—

  • Structured Streaming allows certain types of chagnes to application code between restarts.
    • you can change UDFs as long as they have same type signature.
    • adding a new columns is also not a breaking change for checkpoint directory.
  • If you update your streaming application to add new aggregation key or fundamental changes to query itself, Spark cannot construct the required state for query using checkpoint directory.

Updating Your Spark Version๐Ÿ”—

  • Structured Streaming applications should be able to restart from an old checkpoint directory across patch version upgrades to spark. (2.2.0 -> 2.2.1 -> 2.2.2).
  • Checkpoint format is designed to forward-compatible, so the only way it breaks due to critical bug fixes. Check Spark notes before upgrading.

Sizing and Rescaling your Applications๐Ÿ”—

  • Ideally, cluster should be big enough to comfortable handle bursts above your data rate. The metrics you should be monitoring for are :
    • Input rate is much higher than processing rate (elaborated or momentarily)
    • You can dynamically add executors to your application. When it comes time you can scale-down your application same way.
  • These change usually require a restart of application or stream with a new configuration. For example spark.sql.shuffle.partitions can not be updated while a stream is currently running.

Metrics and Monitoring๐Ÿ”—

  • Mostly Metric and Monitoring for streaming application is same as general spark applications, but there are more specifics to help you beter understand state of your application.
  • There are two key APIs you can leverage to query the status of a streaming query

Query Status๐Ÿ”—

  • It answers : โ€œWhat processing is my stream performing right nowโ€. This is reported in the status field of the query object returned by startStream
query.status

{
  "message" : "Getting offsets from ...",
  "isDataAvailable" : true,
  "isTriggerActive" : true
}

Recent Progress๐Ÿ”—

query.recentProgress

# Scala Version
Array({
  "id" : "d9b5eac5-2b27-4655-8dd3-4be626b1b59b",
  "runId" : "f8da8bc7-5d0a-4554-880d-d21fe43b983d",
  "name" : "test_stream",
  "timestamp" : "2017-08-06T21:11:21.141Z",
  "numInputRows" : 780119,
  "processedRowsPerSecond" : 19779.89350912779,
  "durationMs" : {
    "addBatch" : 38179,
    "getBatch" : 235,
    "getOffset" : 518,
    "queryPlanning" : 138,
    "triggerExecution" : 39440,
    "walCommit" : 312
  },
  "stateOperators" : [ {
    "numRowsTotal" : 7,
    "numRowsUpdated" : 7
  } ],
  "sources" : [ {
    "description" : "FileStreamSource[/some/stream/source/]",
    "startOffset" : null,
    "endOffset" : {
      "logOffset" : 0
    },
    "numInputRows" : 780119,
    "processedRowsPerSecond" : 19779.89350912779
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
})
  • Input rate and processing rate
  • Batch Duration : all streaming systems utilize batching to operate at any reasonable throughput. Structured Streaming achieves both.

Spark UI๐Ÿ”—

The Spark web UI, covered in earlier topics, also shows tasks, jobs, and data processing metrics for Structured Streaming applications. On the Spark UI, each streaming application will appear as a sequence of short jobs, one for each trigger.

Alerting๐Ÿ”—

Advanced Monitoring with the Streaming Listener๐Ÿ”—

  • we can use status and query Progress APIs to output monitoring events into your organizationโ€™s monitoring platform (Prometheus, etc.). There is also a lower-level but more powerful way to observe an applicationโ€™s execution using StreamingQueryListener class
  • This class will allow you to receive asynchronous updates from the streaming query in roder to automatically output this information to other systems and implement robust monitoring and alerting mechanisms.
val spark: SparkSession = ...

spark.streams.addListener(new StreamingQueryListener() {
    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
        println("Query started: " + queryStarted.id)
    }
    override def onQueryTerminated(
      queryTerminated: QueryTerminatedEvent): Unit = {
        println("Query terminated: " + queryTerminated.id)
    }
    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
        println("Query made progress: " + queryProgress.progress)
    }
})
  • the following code for a StreamingQueryListener that will forward all query progress information to Kafka. Youโ€™ll have to parse this JSON string once you read data from Kafka in order to access the actual metrics
class KafkaMetrics(servers: String) extends StreamingQueryListener {
  val kafkaProperties = new Properties()
  kafkaProperties.put(
    "bootstrap.servers",
    servers)
  kafkaProperties.put(
    "key.serializer",
    "kafkashaded.org.apache.kafka.common.serialization.StringSerializer")
  kafkaProperties.put(
    "value.serializer",
    "kafkashaded.org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](kafkaProperties)

  import org.apache.spark.sql.streaming.StreamingQueryListener
  import org.apache.kafka.clients.producer.KafkaProducer

  override def onQueryProgress(event:
    StreamingQueryListener.QueryProgressEvent): Unit = {
    producer.send(new ProducerRecord("streaming-metrics",
      event.progress.json))
  }
  override def onQueryStarted(event:
    StreamingQueryListener.QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event:
    StreamingQueryListener.QueryTerminatedEvent): Unit = {}
}