
Event-Time And Stateful Processing

Event Time

  • NOTE: Spark's DStream API doesn't support processing information with respect to event time.
  • There are two relevant times:
    • Event Time: the time embedded in the data itself. The challenge is that event data can arrive late or out of order, so the stream processing system must be able to handle out-of-order or late data.
    • Processing Time: the time at which the stream processing system actually receives data. It is usually less important and can never be out of order, because it is a property of the streaming system at a certain point in time.
  • The order of a series of events in the processing system does not guarantee an ordering in event time. Computer networks are unreliable: events can be dropped, slowed down, repeated, or delivered without issue.

Stateful Processing

  • Stateful processing is only necessary when you need to use or update intermediate information (state) over longer periods of time (in either a micro-batch or a record-at-a-time approach). This can happen when you are using event time or when you are performing an aggregation on a key, whether or not that involves event time.
  • Spark handles all the complexity entailed in stateful processing for you; we just need to specify the logic. Spark stores the state in a state store.
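  • Stateful queries persist this state under a checkpoint location so it can be recovered after a failure, and the state store backend itself is configurable. A minimal sketch (the RocksDB provider setting and the checkpoint path below are illustrative assumptions, not something this chapter's examples require):

// illustrative: switch the state store backend to RocksDB (available in Spark 3.2+);
// if unset, Spark uses the default HDFS-backed in-memory state store provider
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

// illustrative: a stateful query would also set a checkpoint location, e.g.
//   .writeStream.option("checkpointLocation", "/tmp/checkpoints/my-query")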

Arbitrary Stateful Processing

  • Sometimes we need fine-grained control over what state should be stored, how it is updated, and when it should be removed, either explicitly or via a time-out.
  • Some scenarios where it might be used:
    • Recording information about user sessions on an ecommerce site: track what a user visits over the course of a session in order to provide recommendations in real time during the next session.
    • Reporting on errors in a web application, but only if five events occur during a user's session: count-based windows that emit a result if five events of some type occur.
    • Deduplicating records over time, which requires tracking every record seen before deduplicating it.

Event-Time Basics

# contd. from previous chapter
spark.conf.set("spark.sql.shuffle.partitions", 5)
static = spark.read.json("/data/activity-data")
streaming = spark\
  .readStream\
  .schema(static.schema)\
  .option("maxFilesPerTrigger", 10)\
  .json("/data/activity-data")

streaming.printSchema()
  • Notice how this dataset has Creation_Time defining event time, whereas Arrival_Time defines when an event hit our servers somewhere upstream.

Windows on Event Time

The first step is to convert the timestamp column into the proper Spark SQL timestamp type.

withEventTime = streaming.selectExpr(
  "*",
  "cast(cast(Creation_Time as double)/1000000000 as timestamp) as event_time")

Tumbling Windows

  • count the number of occurrences of an event in a given window.

(figure: tumbling windows)

  • we perform an aggregation of keys over a window of time, updating the result with the data received since the last trigger.
  • for this dataset we will use 10-minute windows.
from pyspark.sql.functions import window, col
withEventTime.groupBy(window(col("event_time"), "10 minutes")).count()\
  .writeStream\
  .queryName("pyevents_per_window")\
  .format("memory")\
  .outputMode("complete")\
  .start()
  • we're writing out to the in-memory sink for debugging, so we can query it with SQL once the stream is running
spark.sql("SELECT * FROM pyevents_per_window").printSchema()
# or
# SELECT * FROM pyevents_per_window

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)
  • Note how the window column is actually a struct (a complex type). Using this, we can query the struct for the start and end times of a particular window (see the query sketch after the next code block).
  • Importantly, we can also perform an aggregation on multiple columns, including the event-time column. Just as in the previous chapter, we can even perform these aggregations using methods like cube.
from pyspark.sql.functions import window, col
withEventTime.groupBy(window(col("event_time"), "10 minutes"), "User").count()\
  .writeStream\
  .queryName("pyevents_per_window")\
  .format("memory")\
  .outputMode("complete")\
  .start()
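  • Because window is a struct, its start and end fields can be selected directly when querying the in-memory table. A small Scala sketch (the table name matches the queryName used above):

// pull the window boundaries out of the struct column
spark.table("pyevents_per_window")
  .select("window.start", "window.end", "count")
  .show(5, false)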

Sliding Windows

  • sliding windows decouple the window interval from how frequently a new window starts, so windows can overlap.

(figure: sliding windows)

  • here we use a sliding window: 10-minute windows that start every 5 minutes, so each event can fall into multiple overlapping windows.
from pyspark.sql.functions import window, col
withEventTime.groupBy(window(col("event_time"), "10 minutes", "5 minutes"))\
  .count()\
  .writeStream\
  .queryName("pyevents_per_window")\
  .format("memory")\
  .outputMode("complete")\
  .start()

# SELECT * FROM pyevents_per_window

Handling Late Data with Watermarks

  • In all of the previous examples we never defined how late we expect to see data, which means Spark will keep intermediate state around indefinitely, because we never specified a watermark: a time after which we don't expect to see any more data for a given event time.
  • We must define a watermark in order to age out state in the stream so that we don't overwhelm the system over a long period of time. The DStream API had no robust way of handling late data, so late events could end up being counted in other processing batches.
from pyspark.sql.functions import window, col
withEventTime\
  .withWatermark("event_time", "30 minutes")\
  .groupBy(window(col("event_time"), "10 minutes", "5 minutes"))\
  .count()\
  .writeStream\
  .queryName("pyevents_per_window")\
  .format("memory")\
  .outputMode("complete")\
  .start()

# SELECT * FROM pyevents_per_window
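
  • In complete mode Spark must keep all aggregation state (the full result table is rewritten on every trigger), so the watermark cannot actually drop that state; in append mode, a window's row is emitted only once the watermark passes the end of the window, after which its state can be dropped. A sketch of the same query in append mode (Scala, using the Scala withEventTime defined earlier; the query name is illustrative):

import org.apache.spark.sql.functions.{window, col}

withEventTime
  .withWatermark("event_time", "30 minutes")
  .groupBy(window(col("event_time"), "10 minutes", "5 minutes"))
  .count()
  .writeStream
  .queryName("append_events_per_window")  // illustrative query name
  .format("memory")
  .outputMode("append")  // each window appears once, after the watermark passes its end
  .start()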

Dropping Duplicates in a Stream

  • Publishing the same record multiple times is common in IoT applications, and deduplication requires high coordination, so it is an important task.
  • Structured Streaming makes it easy to take a messaging system that provides at-least-once semantics and convert it into exactly-once by dropping duplicate messages as they come in, based on arbitrary keys. To deduplicate data, Spark maintains state for the user-specified keys and ensures that duplicates are ignored.
from pyspark.sql.functions import expr

withEventTime\
  .withWatermark("event_time", "5 seconds")\
  .dropDuplicates(["User", "event_time"])\
  .groupBy("User")\
  .count()\
  .writeStream\
  .queryName("pydeduplicated")\
  .format("memory")\
  .outputMode("complete")\
  .start()

Arbitrary Stateful Processing

  • While performing stateful processing, we might want to do the following:
    • create a window based on counts of a given key
    • emit an alert if there is a number of events within a certain time frame
    • maintain user sessions of an undetermined amount of time and save those sessions to perform analysis on later
  • Effectively we will want to do one of two things:
    • Map over groups in the data, operate on each group, and generate at most a single row for each group: the mapGroupsWithState API.
    • Map over groups in the data, operate on each group, and generate one or more rows for each group: the flatMapGroupsWithState API.

Time-Outs

  • A time-out specifies how long we should wait before timing out some intermediate state. The time-out type is a global parameter across all groups, while the actual time-out duration or timestamp is configured on a per-group basis.
  • Time-outs can be based on either processing time or event time.
  • When using time-outs, check for the time-out first, before processing the values, by using state.hasTimedOut or by checking whether the values iterator is empty (see the sketch after this list).
  • We can set a time-out duration using GroupState.setTimeoutDuration. The time-out will occur once the clock has advanced by the set duration D:
    • The time-out will never occur before the clock time has advanced by D ms.
    • The time-out will occur eventually, at a trigger in the query after D ms have passed; there is therefore no strict upper bound on when the time-out will occur.
  • Since this time-out is based on clock time, it is affected by variations in the system clock and timezone.
  • When using a time-out based on event time, the user must also specify the event-time watermark in the query using withWatermark. When set, data older than the watermark is filtered out. As the developer, you set the timestamp that the watermark should be compared against using the GroupState.setTimeoutTimestamp(...) API.
  • There is no strict upper bound on the delay before the time-out actually occurs: the watermark can advance only when there is new data in the stream and the event time of the data has actually advanced.
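  • A minimal sketch of this pattern with a processing-time time-out (the Event and SessionCount types are hypothetical, just for illustration; the corresponding query would pass GroupStateTimeout.ProcessingTimeTimeout to mapGroupsWithState):

import org.apache.spark.sql.streaming.GroupState

// hypothetical input and state types for this sketch
case class Event(user: String, timestamp: java.sql.Timestamp)
case class SessionCount(user: String, count: Long)

def updateWithTimeout(
    user: String,
    events: Iterator[Event],
    state: GroupState[SessionCount]): SessionCount = {
  // check for a time-out before reading the (possibly empty) values iterator
  if (state.hasTimedOut) {
    val expired = state.get   // state must exist for a time-out to fire
    state.remove()            // age out the state for this group
    expired                   // emit a final value for the group
  } else {
    val previous = if (state.exists) state.get else SessionCount(user, 0L)
    val updated = SessionCount(user, previous.count + events.size)
    state.update(updated)
    // the time-out can fire only after the clock advances by 30 minutes
    // and a trigger runs without new data for this group
    state.setTimeoutDuration("30 minutes")
    updated
  }
}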

Output Modes

NOTE: mapGroupsWithState supports update mode only, while flatMapGroupsWithState supports append and update modes.

Both are still marked experimental as of Spark 3.5.1. Check the documentation for more information.

mapGroupsWithState

  • We can control arbitrary state by creating it, updating it over time, and removing it, using the following:
    • Three class definitions: an input definition, a state definition, and optionally an output definition.
    • A function to update state based on a key, an iterator of events and a previous state.
    • A time-out parameter.
  • Example: simply update keys based on a certain amount of state in the sensor data. The keys we group on (map over) here are a user and activity combination, and the state we maintain is the first and last timestamp seen for that combination.
// input, state and output definitions
case class InputRow(user:String, timestamp:java.sql.Timestamp, activity:String)
case class UserState(user:String,
  var activity:String,
  var start:java.sql.Timestamp,
  var end:java.sql.Timestamp)

// set up the function that defines how you will update your state based on a given row
def updateUserStateWithEvent(state:UserState, input:InputRow):UserState = {
  if (Option(input.timestamp).isEmpty) {
    return state
  }
  if (state.activity == input.activity) {

    if (input.timestamp.after(state.end)) {
      state.end = input.timestamp
    }
    if (input.timestamp.before(state.start)) {
      state.start = input.timestamp
    }
  } else {
    if (input.timestamp.after(state.end)) {
      state.start = input.timestamp
      state.end = input.timestamp
      state.activity = input.activity
    }
  }

  state
}

// define a function defining the way state is updated based on an epoch of rows
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, GroupState}
def updateAcrossEvents(user:String,
  inputs: Iterator[InputRow],
  oldState: GroupState[UserState]):UserState = {
  var state:UserState = if (oldState.exists) oldState.get else UserState(user,
        "",
        new java.sql.Timestamp(6284160000000L),
        new java.sql.Timestamp(6284160L)
    )
  // we simply specify an old date that we can compare against and
  // immediately update based on the values in our data

  for (input <- inputs) {
    state = updateUserStateWithEvent(state, input)
    oldState.update(state)
  }
  state
}
// usually you would also set a time-out for a given group's state (it's omitted here)
// run the stream (queried with SQL below)
import org.apache.spark.sql.streaming.GroupStateTimeout
withEventTime
  .selectExpr("User as user",
    "cast(Creation_Time/1000000000 as timestamp) as timestamp", "gt as activity")
  .as[InputRow]
  .groupByKey(_.user)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateAcrossEvents)
  .writeStream
  .queryName("events_per_window")
  .format("memory")
  .outputMode("update")
  .start()
SELECT * FROM events_per_window order by user, start

+----+--------+--------------------+--------------------+
|user|activity|               start|                 end|
+----+--------+--------------------+--------------------+
|   a|    bike|2015-02-23 13:30:...|2015-02-23 14:06:...|
|   a|    bike|2015-02-23 13:30:...|2015-02-23 14:06:...|
...
|   d|    bike|2015-02-24 13:07:...|2015-02-24 13:42:...|
+----+--------+--------------------+--------------------+

Example: Count-Based Windows

  • Usually, window operations revolve around a start and end time and counting or summing within that window. Sometimes we may instead wish to create windows based purely on counts, regardless of state and event times, and perform some aggregation on that window.
  • Example: output the average reading of each device periodically by creating a window based on a count of events and emitting a result each time there are 500 events for that device.
// input, state and output definitions
case class InputRow(device: String, timestamp: java.sql.Timestamp, x: Double)
case class DeviceState(device: String, var values: Array[Double],
  var count: Int)
case class OutputRow(device: String, previousAverage: Double)

// update function
def updateWithEvent(state:DeviceState, input:InputRow):DeviceState = {
  state.count += 1
  // maintain an array of the x-axis values
  state.values = state.values ++ Array(input.x)
  state
}

// function that updates states across a series of input rows
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode,
  GroupState}

def updateAcrossEvents(device:String, inputs: Iterator[InputRow],
  oldState: GroupState[DeviceState]):Iterator[OutputRow] = {
  inputs.toSeq.sortBy(_.timestamp.getTime).toIterator.flatMap { input =>
    val state = if (oldState.exists) oldState.get
      else DeviceState(device, Array(), 0)

    val newState = updateWithEvent(state, input)
    if (newState.count >= 500) {
      // One of our windows is complete; replace our state with an empty
      // DeviceState and output the average for the past 500 items from
      // the old state
      oldState.update(DeviceState(device, Array(), 0))
      Iterator(OutputRow(device,
        newState.values.sum / newState.values.length.toDouble))
    }
    else {
      // Update the current DeviceState object in place and output no
      // records
      oldState.update(newState)
      Iterator()
    }
  }
}
// run stream
import org.apache.spark.sql.streaming.GroupStateTimeout

withEventTime
  .selectExpr("Device as device",
    "cast(Creation_Time/1000000000 as timestamp) as timestamp", "x")
  .as[InputRow]
  .groupByKey(_.device)
  .flatMapGroupsWithState(OutputMode.Append,
    GroupStateTimeout.NoTimeout)(updateAcrossEvents)
  .writeStream
  .queryName("count_based_device")
  .format("memory")
  .outputMode("append")
  .start()
SELECT * FROM count_based_device

+--------+--------------------+
|  device|     previousAverage|
+--------+--------------------+
|nexus4_1|      4.660034012E-4|
|nexus4_1|0.001436279298199...|
...
|nexus4_1|1.049804683999999...|
|nexus4_1|-0.01837188737960...|
+--------+--------------------+

flatMapGroupsWithState

  • Rather than having a single key with at most one output row, a single key may have many output rows, providing more flexibility compared to mapGroupsWithState.
  • The things we need to define remain the same as with mapGroupsWithState.

Example: Sessionization

  • Sessions are simply unspecified time windows with a series of events that occur.
  • We want to record these different events in an array in order to compare these sessions to other sessions in the future.
  • In a session, you will likely have arbitrary logic to maintain and update your state over time as well as certain actions to define when state ends (like a count) or a simple time-out.
  • Oftentimes there will be a session ID that we can use, which makes this easier; since there is no session ID here, let's create sessions on the fly.
// input, state, and output
case class InputRow(uid:String, timestamp:java.sql.Timestamp, x:Double,
  activity:String)
case class UserSession(val uid:String, var timestamp:java.sql.Timestamp,
  var activities: Array[String], var values: Array[Double])
case class UserSessionOutput(val uid:String, var activities: Array[String],
  var xAvg:Double)

// a function to change state
def updateWithEvent(state:UserSession, input:InputRow):UserSession = {
  // handle malformed dates
  if (Option(input.timestamp).isEmpty) {
    return state
  }

  state.timestamp = input.timestamp
  state.values = state.values ++ Array(input.x)
  if (!state.activities.contains(input.activity)) {
    state.activities = state.activities ++ Array(input.activity)
  }
  state
}


// method to work on epoch of rows
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode,
  GroupState}

def updateAcrossEvents(uid:String,
  inputs: Iterator[InputRow],
  oldState: GroupState[UserSession]):Iterator[UserSessionOutput] = {

  inputs.toSeq.sortBy(_.timestamp.getTime).toIterator.flatMap { input =>
    val state = if (oldState.exists) oldState.get else UserSession(
    uid,
    new java.sql.Timestamp(6284160000000L),
    Array(),
    Array())
    val newState = updateWithEvent(state, input)

    if (oldState.hasTimedOut) {
      val state = oldState.get
      oldState.remove()
      Iterator(UserSessionOutput(uid,
      state.activities,
      newState.values.sum / newState.values.length.toDouble))
    } else if (state.values.length > 1000) {
      val state = oldState.get
      oldState.remove()
      Iterator(UserSessionOutput(uid,
      state.activities,
      newState.values.sum / newState.values.length.toDouble))
    } else {
      oldState.update(newState)
      oldState.setTimeoutTimestamp(newState.timestamp.getTime(), "5 seconds")
      Iterator()
    }

  }
}
// create a query
import org.apache.spark.sql.streaming.GroupStateTimeout

withEventTime.where("x is not null")
  .selectExpr("user as uid",
    "cast(Creation_Time/1000000000 as timestamp) as timestamp",
    "x", "gt as activity")
  .as[InputRow]
  .withWatermark("timestamp", "5 seconds")
  .groupByKey(_.uid)
  .flatMapGroupsWithState(OutputMode.Append,
    GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
  .writeStream
  .queryName("count_based_device")
  .format("memory")
  .start()
SELECT * FROM count_based_device

+---+--------------------+--------------------+
|uid|          activities|                xAvg|
+---+--------------------+--------------------+
|  a|  [stand, null, sit]|-9.10908533566433...|
|  a|   [sit, null, walk]|-0.00654280428601...|
...
|  c|[null, stairsdown...|-0.03286657789999995|
+---+--------------------+--------------------+
  • Notice how sessions that have a number of activities in them have a higher x-axis gyroscope value than ones that have fewer activities.