Stream Processor Windows
Atlas Stream Processing windows are aggregation pipeline stages that capture time-bounded subsets of a data stream, allowing you to perform operations that require finite inputs on streaming data.
Consider the example stream processor described here. The $match stage can operate directly on the stream of data pulled in by $source, checking each document against the match criteria as the stream processor ingests it.
By contrast, the $group stage and the various statistical computations contained within it cannot operate on unbounded data, as it is impossible to determine minimum, maximum, average, or median values without first bounding the set of values to consider. Many non-mathematical operators such as $push and $top also require bounded data.
A stream processor provides these bounds with a window. A window opens, and all documents that the stream processor ingests accumulate in that window's state until a predefined interval of time elapses and the window closes. The window batches all documents it captures during that interval, and passes this set through its internal pipeline. From within this pipeline, the batched documents are indistinguishable from data-at-rest.
Atlas Stream Processing provides support for Tumbling Windows and Hopping Windows.
Tumbling Windows
Tumbling windows are windows defined entirely by the time intervals they capture. These time intervals don't overlap.
Example
You define a tumbling window with an interval of 3 seconds. When you start your stream processor:
A window opens for 3 seconds.
The first window captures all documents that the stream generates within those 3 seconds.
After 3 seconds elapse, the window closes and applies your aggregation logic to all the documents in that window.
If you configure allowedLateness, Atlas Stream Processing writes late-arriving messages to the Dead Letter Queue after the close of the window.
A new window opens as soon as the first one closes and captures documents from the stream for the next 3 seconds.
Tumbling windows ensure comprehensive capture of data streams without repeated processing of individual documents.
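The batching behavior of a tumbling window can be sketched in a few lines of Python. This is an illustrative model only, not Atlas Stream Processing code: the helper names `tumbling_window_start` and `batch_by_window` are hypothetical, and the sketch assumes windows are half-open intervals (start inclusive, end exclusive) measured in seconds since the processor started.

```python
from collections import defaultdict


def tumbling_window_start(event_time, interval, origin=0.0):
    """Return the start of the tumbling window that contains event_time."""
    # Windows are [origin + k*interval, origin + (k+1)*interval),
    # so every timestamp falls into exactly one window.
    return origin + ((event_time - origin) // interval) * interval


def batch_by_window(event_times, interval, origin=0.0):
    """Group event times into tumbling windows keyed by window start."""
    batches = defaultdict(list)
    for t in event_times:
        batches[tumbling_window_start(t, interval, origin)].append(t)
    return dict(batches)


# Seconds since the processor started, with a 3-second interval as in the example.
events = [0.5, 1.2, 2.9, 3.0, 4.7, 6.1]
batches = batch_by_window(events, interval=3)
for start in sorted(batches):
    print(f"window [{start}, {start + 3}) holds {batches[start]}")
```

Because each timestamp maps to exactly one window start, no document is dropped and none is processed twice, which is the property the paragraph above describes.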
Hopping Windows
Hopping windows are windows defined by the time interval they capture and the interval between opening each window, called the hop. Since duration is decoupled from frequency, you can configure hopping windows to overlap or to space themselves apart.
To define a hopping window with overlap, set a hop smaller than the interval.
Example
You define a hopping window with an interval of 20 seconds and a hop of 5 seconds. When you start your stream processor:
A window opens for 20 seconds.
The first window captures all documents that the stream generates within those 20 seconds.
5 seconds later, another window opens and captures all documents within the next 20 seconds. Because the first window is still open, all documents that the stream generates for the next 15 seconds are captured by both windows.
20 seconds after the first window opened, it closes and applies your aggregation logic to all the documents in that window.
5 seconds later, the second window closes and applies your aggregation logic to all the documents in that window, including those that were already subject to aggregation logic in the first window.
If you configure allowedLateness, Atlas Stream Processing writes late-arriving messages to the Dead Letter Queue after the close of the window.
To define a hopping window with spacing, set a hop larger than the interval.
Example
You define a hopping window with an interval of 3 seconds and a hop of 5 seconds. When you start a stream processor:
A window opens for 3 seconds.
The first window captures all documents for the next 3 seconds.
After 3 seconds elapse, the window closes and applies your aggregation logic to all the documents in that window.
The next window opens after a further 2 seconds elapse.
Atlas Stream Processing does not process any documents that the stream generates during those 2 seconds.
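The two hopping-window configurations above can be compared with a small Python sketch. It is a simplified model rather than Atlas Stream Processing code: the function name `hopping_windows_for` is hypothetical, times are whole seconds since the processor started, and windows are assumed to be half-open intervals that open at 0, hop, 2 × hop, and so on.

```python
def hopping_windows_for(event_time, interval, hop, origin=0):
    """Return the (start, end) pair of every hopping window containing event_time."""
    windows = []
    start = origin
    # A window can only contain the event if it opened at or before the event.
    while start <= event_time:
        if event_time < start + interval:
            windows.append((start, start + interval))
        start += hop
    return windows


# Overlap: interval 20 s, hop 5 s. An event at 7 s lands in two windows.
overlapping = hopping_windows_for(7, interval=20, hop=5)

# Spacing: interval 3 s, hop 5 s. An event at 4 s falls into the 2-second gap.
in_gap = hopping_windows_for(4, interval=3, hop=5)
after_gap = hopping_windows_for(6, interval=3, hop=5)

print("overlap:", overlapping)
print("gap:", in_gap)
print("after gap:", after_gap)
```

An empty result corresponds to a document that no window captures, which is the behavior described for hops larger than the interval.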
Stream Processing Timing
In streaming data processing, documents are subject to two timing systems:
Event Time - The time at which either the source stream generates a document, or the messaging system (e.g. Apache Kafka) receives the document. This is ascertained by the timestamp of the document.
Processing Time - The time at which the stream processor consumes a document. This is ascertained by the clock of the system hosting the stream processor.
Network latency, upstream processing, and other factors can not only cause discrepancies between these times for a given document, but can also cause documents to arrive in a stream processor out of event-time order. In either case, windows can miss documents that you intend for them to capture. Atlas Stream Processing considers such documents late-arriving, and sends them to your Dead Letter Queue if you configure one.
Atlas Stream Processing offers various mechanisms which change window behavior to mitigate these issues.
Watermarks
A watermark supersedes processing time and updates only when the processor consumes a document with a later event time than any previously-consumed document. All stream processors apply watermarks in Atlas Stream Processing.
Example
You configure a stream processor with 5-minute windows. You start the processor at 12:00, so that the first two windows will have durations of 12:00-12:05 and 12:05-12:10. The following table illustrates which windows will capture which events given varying delays, with and without watermarks.
Event Time | Processing Time | Window Time (No Watermarks) | Window Time (Watermarks) |
---|---|---|---|
12:00 | 12:00 | 12:00-12:05 | 12:00-12:05 |
12:00 | 12:01 | 12:00-12:05 | 12:00-12:05 |
12:01 | 12:03 | 12:00-12:05 | 12:00-12:05 |
12:03 | 12:04 | 12:00-12:05 | 12:00-12:05 |
12:02 | 12:05 | 12:05-12:10 | 12:00-12:05 |
12:01 | 12:06 | 12:05-12:10 | 12:00-12:05 |
12:04 | 12:06 | 12:05-12:10 | 12:00-12:05 |
12:05 | 12:07 | 12:05-12:10 | 12:05-12:10 |
12:06 | 12:07 | 12:05-12:10 | 12:05-12:10 |
12:06 | 12:08 | 12:05-12:10 | 12:05-12:10 |
In the scenario where watermarks don't apply, the 12:00-12:05 window closes at 12:05 according to the system clock of the stream processing instance, and the 12:05-12:10 window opens immediately. As a result, though the source generated seven of the documents during the 12:00-12:05 interval, the relevant window captures only four documents.
In the scenario where watermarks do apply, the 12:00-12:05 window doesn't close at 12:05 because among the documents it ingests up to that point, the latest event time, and thus the watermark value, is 12:03. The 12:00-12:05 window doesn't close until 12:07 on the system clock, when the stream processor ingests a document with an event time of 12:05, advances the watermark to that time, and opens the 12:05-12:10 window. Each window captures all of the appropriate documents.
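The difference between the two scenarios can be reproduced with a short Python simulation. This is a toy model under stated assumptions, not the Atlas Stream Processing implementation: times are minutes after 12:00, the watermark is simply the latest event time seen so far, a window is treated as closed once the watermark reaches its end, and the function names are hypothetical.

```python
WINDOW = 5  # window size in minutes

# (event_time, processing_time) in minutes after 12:00, in ingestion order.
EVENTS = [(0, 0), (0, 1), (1, 3), (3, 4), (2, 5),
          (1, 6), (4, 6), (5, 7), (6, 7), (6, 8)]


def window_start(t):
    """Start of the 5-minute window containing minute t."""
    return (t // WINDOW) * WINDOW


def assign_by_processing_time(events):
    """No watermarks: the system clock alone decides the window."""
    return [window_start(processing) for _, processing in events]


def assign_by_watermark(events):
    """Watermarks: event time decides the window while it is still open."""
    assigned = []
    watermark = None
    for event_time, _ in events:
        # The watermark only moves forward, on a later event time.
        if watermark is None or event_time > watermark:
            watermark = event_time
        start = window_start(event_time)
        closed = start + WINDOW <= watermark
        assigned.append(None if closed else start)  # None marks a late arrival
    return assigned


without_wm = assign_by_processing_time(EVENTS)
with_wm = assign_by_watermark(EVENTS)
print("first window, no watermarks:", without_wm.count(0), "documents")
print("first window, watermarks:   ", with_wm.count(0), "documents")
```

In this model the first window holds four documents when the clock drives it and seven when the watermark does, matching the counts in the explanation above.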
Allowed Lateness
If the differences between event time and processing time vary sufficiently, documents may arrive in a stream processor after the watermark has advanced enough to close the expected window. To mitigate this, Atlas Stream Processing supports Allowed Lateness, a setting which delays a window closing by a set interval relative to the watermark.
While watermarks are properties of stream processors, Allowed Lateness is a property of a window, and only affects when that window closes. If the stream processor's watermark advances to a point that would trigger a new window to open, Allowed Lateness keeps earlier windows open without preventing this.
Example
You configure a stream processor with 5-minute tumbling windows. You start the processor at 12:00, so that the first two windows will have durations of 12:00-12:05 and 12:05-12:10. You set an Allowed Lateness of 2 minutes.
The table below reflects the order in which the stream processor ingests the described documents.
Event Time | Watermark | Allowed Lateness Time | Window Time |
---|---|---|---|
12:00 | 12:00 | 11:58 | 12:00-12:05 |
12:01 | 12:01 | 11:59 | 12:00-12:05 |
12:03 | 12:03 | 12:01 | 12:00-12:05 |
12:02 | 12:03 | 12:01 | 12:00-12:05 |
12:04 | 12:04 | 12:02 | 12:00-12:05 |
12:01 | 12:04 | 12:02 | 12:00-12:05 |
12:05 | 12:05 | 12:03 | 12:00-12:05, 12:05-12:10 |
12:06 | 12:06 | 12:04 | 12:00-12:05, 12:05-12:10 |
12:04 | 12:06 | 12:04 | 12:00-12:05, 12:05-12:10 |
12:07 | 12:07 | 12:05 | 12:05-12:10 |
When the watermark advances to 12:05, the 12:05-12:10 window opens. However, because the Allowed Lateness interval is 2 minutes, from within the 12:00-12:05 window, it is effectively only 12:03, so it stays open. Only when the watermark advances to 12:07 does the adjusted time reach 12:05. At this point, the 12:00-12:05 window closes.
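The interaction between the watermark and Allowed Lateness can be sketched in Python. This is an illustrative model, not Atlas Stream Processing code: times are minutes after 12:00, the helper `open_windows` is hypothetical, a window is assumed to open once the watermark reaches its start, and it is assumed to close once the watermark minus the Allowed Lateness reaches its end.

```python
WINDOW = 5     # tumbling window size in minutes
LATENESS = 2   # Allowed Lateness in minutes
START = 0      # processor start, in minutes after 12:00


def clock(minutes):
    """Format minutes after 12:00 as HH:MM."""
    hours, mins = divmod(12 * 60 + minutes, 60)
    return f"{hours:02d}:{mins:02d}"


def open_windows(watermark):
    """Return every window that has opened and not yet closed."""
    adjusted = watermark - LATENESS  # the "Allowed Lateness Time" column
    windows = []
    start = START
    while start <= watermark:          # the window has opened
        if start + WINDOW > adjusted:  # and has not closed yet
            windows.append((start, start + WINDOW))
        start += WINDOW
    return windows


# Event times in ingestion order, taken from the table above.
event_times = [0, 1, 3, 2, 4, 1, 5, 6, 4, 7]
rows = []
watermark = START
for event_time in event_times:
    watermark = max(watermark, event_time)
    rows.append((event_time, watermark, watermark - LATENESS, open_windows(watermark)))

for event_time, wm, adjusted, windows in rows:
    spans = ", ".join(f"{clock(s)}-{clock(e)}" for s, e in windows)
    print(clock(event_time), clock(wm), clock(adjusted), spans)
```

Under these assumptions, both windows are open while the watermark is at 12:05 or 12:06, and only the second window remains once the watermark reaches 12:07.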
Idleness Timeout
Decoupling windowing behavior from processing time by default improves stream processing correctness in most cases. However, a streaming data source may have periods of extended idleness. In this scenario, a window may capture events prior to the period of idleness and be unable to return processed results while waiting for the watermark to advance enough to close.
Atlas Stream Processing allows users to configure an idleness timeout for windows to mitigate these scenarios using processing time. An idleness timeout is an interval of time that begins when processing time passes the end of an open window's interval, and the stream processor's source is idle. If the source remains idle for an interval equal to the idleness timeout, the window closes and the watermark advances independent of any document ingestion.
Example
You configure a tumbling window with a 3-minute interval and a 1-minute idleness timeout. The following table illustrates the effects of the idleness timeout during and after a window's interval.
Processing Time | Event Time or Status | Watermark | Window Time |
---|---|---|---|
12:00 | 12:00 | 12:00 | 12:00-12:03 |
12:01 | Source idle | 12:00 | 12:00-12:03 |
12:02 | Source idle | 12:00 | 12:00-12:03 |
12:03 | Source idle | 12:00 | 12:00-12:03 |
12:04 | 12:02 | 12:02 | 12:00-12:03 |
12:05 | 12:05 | 12:05 | 12:03-12:06 |
12:06 | Source idle | 12:05 | 12:03-12:06 |
12:07 | Source idle | 12:06 | 12:06-12:09 |
12:08 | Source idle | 12:06 | 12:06-12:09 |
12:09 | 12:09 | 12:09 | 12:09-12:12 |
During the 12:00-12:03 interval, the source idles for three minutes, but the stream processor doesn't close the window because processing time is not past the end of the window's interval, and the source does not remain idle after the window's interval ends. When the watermark advances to 12:05, the window closes normally, and the 12:03-12:06 window opens.
When the source goes idle at 12:06, it remains idle through 12:07, triggering the idleness timeout and advancing the watermark to 12:06.
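The idleness timeout described above can be approximated with a small Python simulation. This is a toy model built only from the description in this section, not the Atlas Stream Processing implementation: times are whole minutes after 12:00, the function name `simulate` is hypothetical, the idle timer is assumed to start at the later of the open window's end and the last ingestion, and the watermark is assumed to jump to the end of the window when the timeout fires.

```python
WINDOW = 3        # tumbling window size in minutes
IDLE_TIMEOUT = 1  # idleness timeout in minutes

# (processing_time, event_time) per minute after 12:00; None means the source is idle.
TICKS = [(0, 0), (1, None), (2, None), (3, None), (4, 2),
         (5, 5), (6, None), (7, None), (8, None), (9, 9)]


def simulate(ticks):
    """Return the watermark after each tick under the assumptions above."""
    watermark = 0
    last_ingest = 0
    watermarks = []
    for now, event_time in ticks:
        if event_time is not None:
            # Normal case: a later event time advances the watermark.
            watermark = max(watermark, event_time)
            last_ingest = now
        else:
            # End of the window that the watermark currently sits in.
            window_end = (watermark // WINDOW) * WINDOW + WINDOW
            # Idleness only counts once processing time has passed the window end.
            idle_since = max(window_end, last_ingest)
            if now - idle_since >= IDLE_TIMEOUT:
                watermark = window_end  # close the window without new input
        watermarks.append(watermark)
    return watermarks


watermarks = simulate(TICKS)
for (now, _), wm in zip(TICKS, watermarks):
    print(f"processing 12:{now:02d} -> watermark 12:{wm:02d}")
```

In this model the three idle minutes inside the first window never trigger the timeout, while the idle minute after 12:06 advances the watermark to 12:06, consistent with the explanation above.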