Structured Streaming Basics

  • Structured Streaming is a stream processing framework built on the Spark SQL engine. It reuses the existing DataFrame API, which simplifies writing streaming code.
  • Structured Streaming provides end-to-end, exactly-once processing as well as fault tolerance through checkpointing and write-ahead logs.

Core Concepts

  • Spark aims to simplify stream processing and avoids overly complex terminology.

Transformations and Actions

  • The same concepts of transformations and actions from the DataFrame API apply.
  • Some transformations carry a few restrictions, because certain types of queries cannot be incrementalized yet.

Input Sources

  • Several input sources are supported:
    • Apache Kafka 0.10
    • Files on a distributed file system such as HDFS or S3
    • A socket source for testing

Sinks

  • Several output sinks are supported:
    • Apache Kafka 0.10
    • Almost any file format
    • A foreach sink for running arbitrary computation on the records
    • A console sink for testing
    • A memory sink for debugging

Output Modes

  • The output mode defines how data is written to the sink, for example by appending or by updating.
  • The supported output modes are:
    • Append : only add new records to the output sink
    • Update : update changed records in place
    • Complete : rewrite the full output

Triggers

  • Triggers define when data is output. By default, Structured Streaming looks for new input records as soon as it has finished processing the last group of input data, giving the lowest latency for new results.
  • However, this behaviour can lead to many small output writes when the sink is a set of files. Instead, we can trigger based on processing time.

Event-Time Processing

  • Event-time processing means processing data based on timestamps included in each record, even when records arrive out of order.

Event Time Data

  • Event time : time fields are embedded in the data, so processing is based on when the data was generated rather than when it arrived.
  • Expressing event-time processing is simple in Structured Streaming. Because the system views the input data as a table, the event time is just another field in that table, and your application can do grouping, aggregation, and windowing using standard SQL operators.
  • When Structured Streaming knows that a column is an event-time field, it can optimize automatically, for example through watermark controls.
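
To make the "event time is just another field" idea concrete, here is a minimal pure-Python sketch (not Spark code) that groups records into tumbling windows by their embedded timestamp. The record values, the 60-second window width, and the helper name window_start are assumptions made for illustration only.

```python
from collections import Counter


def window_start(event_time_s: int, width_s: int) -> int:
    """Return the start of the tumbling window that contains event_time_s."""
    return event_time_s - (event_time_s % width_s)


# Hypothetical records in ARRIVAL order: (event time in seconds, activity).
# Note that the third record arrives late relative to its event time.
records = [(100, "walk"), (130, "sit"), (95, "walk"), (121, "walk")]

# Group by (window, activity) using the event time, not the arrival order.
counts = Counter()
for event_time, activity in records:
    counts[(window_start(event_time, 60), activity)] += 1

for key in sorted(counts):
    print(key, counts[key])
```

The late record with event time 95 still lands in the window starting at 60, which is the behaviour that grouping on an event-time column is meant to give.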

Watermarks

  • Watermarks are a feature of Structured Streaming that lets you specify how late you expect data to arrive in event time.
  • Watermarks also let us set how long to remember old data and when to output a result for a particular time window.
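
The watermark idea can be sketched in plain Python as follows. This is a simplified model, not Spark's implementation: it advances the watermark after every record, whereas Spark updates it per trigger. The function name apply_watermark and the sample event times are assumptions for illustration.

```python
def apply_watermark(event_times, max_delay):
    """Split events into accepted and dropped using a running watermark.

    The watermark trails the largest event time seen so far by max_delay.
    An event older than the current watermark is treated as too late.
    """
    max_seen = float("-inf")
    accepted, dropped = [], []
    for t in event_times:
        watermark = max_seen - max_delay
        if t < watermark:
            dropped.append(t)
        else:
            accepted.append(t)
            max_seen = max(max_seen, t)
    return accepted, dropped


# Hypothetical event times in arrival order, allowing 10 units of lateness.
accepted, dropped = apply_watermark([10, 12, 11, 30, 15, 25, 40, 29], max_delay=10)
print("accepted:", accepted)
print("dropped:", dropped)
```

Here 15 and 29 are dropped because they arrive after the watermark has already passed them, while 11 and 25 are late but still within the allowed delay.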

Structured Streaming in Action

  • The data used here is a set of high-frequency sensor readings: https://github.com/databricks/Spark-The-Definitive-Guide/tree/master/data
static = spark.read.json("/data/activity-data/")
dataSchema = static.schema
static.printSchema()
root
 |-- Arrival_Time: long (nullable = true)
 |-- Creation_Time: long (nullable = true)
 |-- Device: string (nullable = true)
 |-- Index: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- User: string (nullable = true)
 |-- _corrupt_record: string (nullable = true)
 |-- gt: string (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)

Let's create a streaming version of this data:

streaming = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1)\
  .json("/data/activity-data")

# As with the DataFrame API, creating and executing a streaming DataFrame is lazy.
# Define the transformations:

activityCounts = streaming.groupBy("gt").count()

# NOTE: this code runs in local mode, so set the number of shuffle partitions to 5
spark.conf.set("spark.sql.shuffle.partitions", 5)

# specify action to start the query on an output sink

activityQuery = activityCounts.writeStream.queryName("activity_counts")\
  .format("memory").outputMode("complete")\
  .start()

# Wait for the query to terminate; this prevents the driver from exiting while the query is still running
# activityQuery.awaitTermination()


# Now we can list all active streams on the Spark session
spark.streams.active    # each stream has a UUID we could use to select it, but we already hold ours in a variable
  • Now that this stream is running, we can experiment with the results by querying the in-memory table it is maintaining of the current output of our streaming aggregation. This table will be called activity_counts, the same as the stream.
from time import sleep
for x in range(5):
    spark.sql("SELECT * FROM activity_counts").show()
    sleep(1)

Transformations on Streams

  • The limitations on streaming transformations are logical ones: sorting does not make sense on streams that are not aggregated, and you cannot perform multiple levels of aggregation without stateful processing.

Selection and Filtering

  • All select and filter transformations, and all individual column manipulations, are supported.
from pyspark.sql.functions import expr
simpleTransform = streaming.withColumn("stairs", expr("gt like '%stairs%'"))\
  .where("stairs")\
  .where("gt is not null")\
  .select("gt", "model", "arrival_time", "creation_time")\
  .writeStream\
  .queryName("simple_transform")\
  .format("memory")\
  .outputMode("append")\
  .start()

Aggregations

  • We can specify exotic aggregations, such as a cube on the phone model and activity together with the averages of the sensors' x, y, and z values.
deviceModelStats = streaming.cube("gt", "model").avg()\
  .drop("avg(Arrival_time)")\
  .drop("avg(Creation_Time)")\
  .drop("avg(Index)")\
  .writeStream.queryName("device_counts").format("memory")\
  .outputMode("complete")\
  .start()

Joins

historicalAgg = static.groupBy("gt", "model").avg()
deviceModelStats = streaming.drop("Arrival_Time", "Creation_Time", "Index")\
  .cube("gt", "model").avg()\
  .join(historicalAgg, ["gt", "model"])\
  .writeStream.queryName("device_counts").format("memory")\
  .outputMode("complete")\
  .start()

Input and Output

Documentation: http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

Sources and Sinks

File Source and Sink

  • The simplest source is the file source. It works with the formats we have already seen: Parquet, text, JSON, and CSV.
  • The only difference between the streaming file source/sink and Spark's static file source is that with streaming we can control how many files are read during each trigger via the maxFilesPerTrigger option.

Kafka Source and Sink

  • Kafka is a distributed publish-and-subscribe system for data.
  • Each record in Kafka consists of a key, a value, and a timestamp. Topics consist of immutable sequences of records, and the position of a record in a sequence is called an offset. Reading data is called subscribing to a topic; writing data is called publishing to a topic.
  • Spark allows you to read from Kafka with both batch and streaming DataFrames.

Reading from a Kafka Source

To read, you first need to choose one of the following options: assign, subscribe, or subscribePattern.

  • assign : a fine-grained way of specifying not just the topic but also the topic partitions to read from, given as a JSON string such as {"topicA":[0,1],"topicB":[2,4]}
  • subscribe and subscribePattern : ways of subscribing to one or more topics, either by specifying a list of topics or via a pattern
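
Because assign takes a JSON string, one way to avoid quoting mistakes is to build it from a dictionary. The snippet below is a small sketch; the topic names and partition numbers are the hypothetical ones from the bullet above, and the final commented line only indicates where the string would be used.

```python
import json

# Hypothetical topics mapped to the partitions we want to read.
assignment = {"topicA": [0, 1], "topicB": [2, 4]}

# Compact separators produce the same form as the hand-written example.
assign_option = json.dumps(assignment, separators=(",", ":"))
print(assign_option)

# The string would then be passed to the Kafka reader, for example:
# spark.readStream.format("kafka").option("assign", assign_option)
```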

Second, we specify kafka.bootstrap.servers, the list of Kafka servers to connect to.

Other options we can choose :

  • startingOffsets and endingOffsets : startingOffsets is the point at which a query starts reading: earliest, which is from the earliest offsets; latest, which is just from the latest offsets; or a JSON string specifying a starting offset for each TopicPartition. It applies only when a new streaming query is started; a resumed query always picks up from where it left off. Partitions discovered while a query is running start at earliest. endingOffsets sets the ending offsets for a given query.
  • failOnDataLoss : whether to fail the query when it is possible that data has been lost (e.g., topics are deleted, or offsets are out of range). This might be a false alarm, and you can disable it when it does not work as you expect. The default is true.
  • maxOffsetsPerTrigger : the total number of offsets to read in a given trigger
df1 = spark.readStream.format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("subscribe", "topic1")\
  .load()
# Subscribe to multiple topics
df2 = spark.readStream.format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("subscribe", "topic1,topic2")\
  .load()
# Subscribe to a pattern
df3 = spark.readStream.format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("subscribePattern", "topic.*")\
  .load()
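
The offset-related options described above can be collected in one dictionary. This is a sketch under assumptions: the topic names, offsets, and the limit of 1000 are made up, and in the JSON form of startingOffsets the values -2 and -1 stand for earliest and latest respectively. The last line is left as a comment because it needs a running Spark session.

```python
import json

EARLIEST, LATEST = -2, -1  # sentinel offsets understood in the JSON form

# Hypothetical per-partition starting points: topic -> partition -> offset.
starting_offsets = json.dumps(
    {"topic1": {"0": 23, "1": EARLIEST}, "topic2": {"0": LATEST}}
)

kafka_options = {
    "kafka.bootstrap.servers": "host1:port1,host2:port2",
    "subscribe": "topic1,topic2",
    "startingOffsets": starting_offsets,
    "maxOffsetsPerTrigger": "1000",  # assumed cap on offsets read per trigger
    "failOnDataLoss": "true",        # the default, shown for clarity
}

for name, value in kafka_options.items():
    print(name, "=", value)

# df = spark.readStream.format("kafka").options(**kafka_options).load()
```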

Each row in the source will have the following schema:

  • key: binary
  • value: binary
  • topic: string
  • partition: int
  • offset: long
  • timestamp: long

Writing to a Kafka Sink

df1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")\
  .writeStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("checkpointLocation", "/to/HDFS-compatible/dir")\
  .start()
df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")\
  .writeStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("checkpointLocation", "/to/HDFS-compatible/dir")\
  .option("topic", "topic1")\
  .start()

Foreach Sink

  • The foreach sink is similar to foreachPartition in the Dataset API: it allows arbitrary operations to be computed on a per-partition basis, in parallel.
  • To use the foreach sink we need to implement the ForeachWriter interface, which contains three methods: open, process, and close.
  • NOTE:
    • The writer must be Serializable, as if it were a UDF or a Dataset map function.
    • The three methods (open, process, and close) will be called on each executor.
    • The writer must do all of its initialization, such as opening connections or starting transactions, only in the open method.
  • Because the foreach sink runs arbitrary user code, one key issue you must consider when using it is fault tolerance. If Structured Streaming asked your sink to write some data, but then crashed, it cannot know whether your original write succeeded. Therefore, the API provides some additional parameters to help you achieve exactly-once processing.
  • First, the open call on your ForeachWriter receives two parameters that uniquely identify the set of rows that need to be acted on. The version parameter is a monotonically increasing ID that increases on a per-trigger basis, and partitionId is the ID of the partition of the output in your task.
// in Scala
import org.apache.spark.sql.ForeachWriter

datasetOfString.writeStream.foreach(new ForeachWriter[String] {
  def open(partitionId: Long, version: Long): Boolean = {
    // open a database connection
    true // return true to process this partition and version, false to skip it
  }
  def process(record: String): Unit = {
    // write string to connection
  }
  def close(errorOrNull: Throwable): Unit = {
    // close the connection
  }
}).start()
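
The way version and partitionId help with exactly-once output can be sketched in plain Python. This is a toy model, not Spark code: the class IdempotentWriter, its in-memory committed set (a stand-in for an external store), and the run_task driver are all hypothetical. The method names follow the open/process/close contract, with the version called epoch_id here.

```python
class IdempotentWriter:
    """Toy writer that skips a (partition, epoch) pair it has already written."""

    # Hypothetical stand-ins for an external store shared by all writers.
    committed = set()
    written = []

    def open(self, partition_id, epoch_id):
        self.key = (partition_id, epoch_id)
        self.buffer = []
        # Returning False tells the caller that these rows can be skipped.
        return self.key not in IdempotentWriter.committed

    def process(self, row):
        self.buffer.append(row)

    def close(self, error):
        # Commit only on success, and only once per (partition, epoch).
        if error is None and self.key not in IdempotentWriter.committed:
            IdempotentWriter.written.extend(self.buffer)
            IdempotentWriter.committed.add(self.key)


def run_task(writer, partition_id, epoch_id, rows):
    """Drive the writer the way one task would for one partition and epoch."""
    if writer.open(partition_id, epoch_id):
        for row in rows:
            writer.process(row)
    writer.close(None)


run_task(IdempotentWriter(), 0, 1, ["a", "b"])
run_task(IdempotentWriter(), 0, 1, ["a", "b"])  # replay after a simulated failure
run_task(IdempotentWriter(), 0, 2, ["c"])
print(IdempotentWriter.written)
```

The replayed call for partition 0, epoch 1 writes nothing, so each row reaches the store once even though the task ran twice.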

Sources and Sinks for Testing

Spark also includes several test sources and sinks that you can use for prototyping or debugging your streaming queries. These should be used only during development and not in production scenarios.

Socket Source

socketDF = spark.readStream.format("socket")\
  .option("host", "localhost").option("port", 9999).load()

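Now run netcat listening on that port; each line typed into it becomes a record in the stream:

nc -lk 9999

Console sink

activityCounts.writeStream.format("console").outputMode("complete").start()

Memory sink

activityCounts.writeStream.format("memory").queryName("my_device_table")\
  .outputMode("complete").start()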

How Data is Output (Modes)

  • Append Mode : when new rows are added to the result table, they are output to the sink based on the trigger we specify. This ensures that each row is output once, assuming a fault-tolerant sink.
  • Complete Mode : outputs the entire state of the result table to the sink. It is useful when working with stateful data where all rows change over time, or when the sink does not support row-level updates.
  • Update Mode : only the rows that differ from the previous write are written out. The sink must support row-level updates for this to work. If the query has no aggregations, this mode is equivalent to append mode.

There are three output modes, but when should each one be used?

If your query just does a map operation, Structured Streaming will not allow complete mode, because this would require it to remember all inputs since the start of the job and rewrite the whole output table.
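
The difference between the three modes can be illustrated with a toy pure-Python model of a result table before and after one trigger. This is not Spark code; the emit function and the sample counts are assumptions, and the append branch assumes rows never change once they are in the result table.

```python
def emit(mode, previous, current):
    """Return the rows a sink would receive for one trigger.

    previous and current map a key to its aggregated value before and
    after the trigger.
    """
    if mode == "complete":
        return dict(current)  # the whole result table
    if mode == "update":
        return {k: v for k, v in current.items() if previous.get(k) != v}
    if mode == "append":
        return {k: v for k, v in current.items() if k not in previous}
    raise ValueError(f"unknown output mode: {mode}")


# Hypothetical activity counts before and after a trigger.
previous = {"walk": 3, "sit": 5}
current = {"walk": 4, "sit": 5, "bike": 1}

for mode in ("complete", "update", "append"):
    print(mode, emit(mode, previous, current))
```

Complete sends all three rows, update sends only the changed and new rows (walk and bike), and append sends only the new row (bike).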

When Data is Output (Triggers)

To control when data is output to our sink, we set a trigger. By default, Structured Streaming will start processing data as soon as the previous trigger completes processing. You can use triggers to ensure that you do not overwhelm your output sink with too many updates or to try to control file sizes in the output.

Processing Time Trigger

activityCounts.writeStream.trigger(processingTime='5 seconds')\
  .format("console").outputMode("complete").start()

The trigger fires every five seconds.
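
To see why a fixed interval reduces the number of writes, here is a toy pure-Python model that groups record arrival times into the interval each falls in. It is a simplification that assumes every batch finishes within its interval; the function name batches_by_interval and the arrival times are made up for illustration.

```python
def batches_by_interval(arrival_times, interval):
    """Group record arrival times (seconds) by the trigger interval they fall in."""
    batches = {}
    for t in arrival_times:
        batches.setdefault(int(t // interval), []).append(t)
    return [batches[k] for k in sorted(batches)]


# Hypothetical arrival times in seconds since the query started.
arrivals = [0.2, 1.1, 1.7, 4.9, 5.3, 9.8, 12.0]

batches = batches_by_interval(arrivals, interval=5)
print(len(arrivals), "records written in", len(batches), "batches")
for batch in batches:
    print(batch)
```

Seven records that might otherwise cause up to seven small writes are grouped into three batches with a five-second interval.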

Once Trigger

  • Useful in both development and production: in development, we can test the application on just one trigger's worth of data at a time; in production, we can use it to run the job manually at a low rate.
activityCounts.writeStream.trigger(once=True)\
  .format("console").outputMode("complete").start()

Streaming Dataset API

  • Note that Structured Streaming is not limited to the DataFrame API: we can also use Datasets to perform the same computation in a type-safe manner.
// in Scala
case class Flight(DEST_COUNTRY_NAME: String, ORIGIN_COUNTRY_NAME: String,
  count: BigInt)
val dataSchema = spark.read
  .parquet("/data/flight-data/parquet/2010-summary.parquet/")
  .schema
val flightsDF = spark.readStream.schema(dataSchema)
  .parquet("/data/flight-data/parquet/2010-summary.parquet/")
val flights = flightsDF.as[Flight]
def originIsDestination(flight_row: Flight): Boolean = {
  flight_row.ORIGIN_COUNTRY_NAME == flight_row.DEST_COUNTRY_NAME
}
flights.filter(flight_row => originIsDestination(flight_row))
  .groupByKey(x => x.DEST_COUNTRY_NAME).count()
  .writeStream.queryName("device_counts").format("memory").outputMode("complete")
  .start()