Atlas Stream Processing Overview
Atlas Stream Processing enables you to process streams of complex data using the same query API used in Atlas databases. Atlas Stream Processing allows you to:
Build aggregation pipelines to continuously operate on streaming data without the delays inherent in batch processing.
Perform continuous schema validation to check that messages are properly formed, detect message corruption, and detect late-arriving data.
Continuously publish results to Atlas collections or Apache Kafka clusters, ensuring up-to-date views and analysis of data.
Atlas Stream Processing components belong directly to Atlas projects and operate independently of Atlas clusters.
Streaming Data
A stream is a continuous flow of data originating from one or more sources, taking the form of an append-only log. Examples of data streams include temperature or pressure readings from sensors, records of financial transactions, or change data capture events.
Data streams originate from sources such as Apache Kafka Topics or change streams. You can then write processed data to sinks such as Apache Kafka Topics or Atlas collections.
Streams of data originate in systems with rapidly changing state. Atlas Stream Processing provides native stream processing capabilities to operate on continuous data without the time and computational constraints of an at-rest database.
Architecture
The core abstraction of Atlas Stream Processing is the stream processor. A stream processor is a MongoDB aggregation pipeline query that operates continuously on streaming data from a specified source and writes the output to a sink. To learn more, see Structure of a Stream Processor.
Stream processing takes place on stream processing instances. Each stream processing instance is an Atlas namespace that associates the following:
One or more workers, which provide the RAM and CPUs necessary to run your stream processors.
A cloud provider and cloud region.
A connection registry, which stores the list of available sources and sinks of streaming data.
A security context in which to define user authorizations.
A connection string to the stream processing instance itself.
When you define a stream processor, it becomes available only for the stream processing instance in which you define it. Each worker can host up to four running stream processors; Atlas Stream Processing automatically scales your stream processing instance up as you start stream processors by provisioning workers as needed. You can deprovision a worker by stopping all stream processors on it. Atlas Stream Processing always prefers to assign a stream processor to an existing worker over provisioning new workers.
Example
You have a stream processing instance running eight stream processors, named proc01 through proc08. proc01 through proc04 run on one worker; proc05 through proc08 run on a second worker. You start a new stream processor named proc09. Atlas Stream Processing provisions a third worker to host proc09.
Later, you stop proc03 on the first worker. When you stop proc09 and restart it, Atlas Stream Processing reassigns proc09 to the first worker and deprovisions the third worker.
If you start a new stream processor named proc10 before you stop and restart proc09, Atlas Stream Processing assigns proc10 to the first worker in the slot previously allocated to proc03.
When scaling, Atlas Stream Processing only considers the number of currently running stream processors; it doesn't count defined stream processors that aren't running. The tier of the stream processing instance determines the RAM and CPU allotment of its workers.
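The scaling rules above can be sketched as a small simulation. This is a toy model, not an Atlas API: the `Instance` class and its `start` and `stop` methods are hypothetical, and choosing the first existing worker with a free slot is an assumption (the text only says that an existing worker is preferred over a new one).

```python
from dataclasses import dataclass, field

SLOTS_PER_WORKER = 4  # from the text: each worker hosts up to four running processors


@dataclass
class Instance:
    """Toy model of a stream processing instance's workers (hypothetical, not an Atlas API)."""
    workers: list = field(default_factory=list)  # each worker is a set of processor names

    def start(self, name):
        # Prefer an existing worker with a free slot (assumption: the first one found).
        for worker in self.workers:
            if len(worker) < SLOTS_PER_WORKER:
                worker.add(name)
                return
        # No free slot anywhere: provision a new worker.
        self.workers.append({name})

    def stop(self, name):
        for worker in self.workers:
            worker.discard(name)
        # A worker with no running processors is deprovisioned.
        self.workers = [w for w in self.workers if w]


inst = Instance()
for i in range(1, 9):
    inst.start(f"proc{i:02d}")   # proc01..proc08 fill two workers
inst.start("proc09")             # no free slot, so a third worker is provisioned
workers_after_proc09 = len(inst.workers)

inst.stop("proc03")              # frees a slot on the first worker
inst.stop("proc09")              # the third worker is now empty and is removed
inst.start("proc09")             # the restart lands in the freed slot on the first worker
```

Only running processors occupy slots in this model, which mirrors the statement that defined but stopped processors are not counted when scaling.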
Connection registries store one or more connections. Each connection assigns a name to the combination of networking and security details that allow a stream processor to interact with external services. Connections exhibit the following behavior:
Only a connection defined in a given stream processing instance's connection registry can service stream processors hosted on that stream processing instance.
Each connection can service an arbitrary number of stream processors.
Only a single connection can serve as a given stream processor's source.
Only a single connection can serve as a given stream processor's sink.
A connection is not innately defined as either a source or a sink. Any given connection can serve either function depending on how a stream processor invokes that connection.
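As a rough illustration of these rules, the sketch below models a connection registry as a dictionary and checks that a processor's source and sink names resolve within it. The registry contents, the field names inside each entry, and the `check_connections` helper are all hypothetical placeholders, not the Atlas connection format.

```python
# Toy connection registry: each name maps to connection details.
# All names and detail fields here are hypothetical placeholders.
registry = {
    "kafkaProd": {"kind": "kafka", "details": "networking and security settings"},
    "atlasCluster": {"kind": "atlas", "details": "networking and security settings"},
}


def check_connections(registry, source_name, sink_name):
    """Return the connection names a processor references that the registry lacks."""
    # A processor names exactly one source connection and one sink connection.
    return [name for name in (source_name, sink_name) if name not in registry]


# The same connection can act as a source for one processor and a sink for another.
missing_a = check_connections(registry, source_name="kafkaProd", sink_name="atlasCluster")
missing_b = check_connections(registry, source_name="atlasCluster", sink_name="kafkaProd")

# A connection that is not in this instance's registry cannot be used.
missing_c = check_connections(registry, source_name="kafkaProd", sink_name="otherKafka")
```

Nothing in a registry entry marks it as a source or a sink; the role comes only from how the processor refers to it.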
Atlas Stream Processing runs stream processing workers in dedicated customer containers, on multi-tenant infrastructure. For more information on MongoDB security and compliance, see the MongoDB Trust Center.
Structure of a Stream Processor
Stream processors take the form of an aggregation pipeline. Each processor begins with a $source stage, which connects to a source and begins receiving a continuous stream of data in the form of documents. These documents must be valid JSON or EJSON. Each aggregation stage after the $source stage consumes each record from the stream in turn. These stages can be grouped into three types:
Validation: The $validate stage allows you to perform schema validation on ingested documents to ensure that only correctly formatted documents continue on to further processing and determine what happens to incorrectly formatted documents. Validation is optional.
Stateless Operations: Aggregation stages or operators which can act directly on the incoming data stream. These aggregations consume, transform, and pass along each document in turn, and can appear at any point between the $source and either $emit or $merge stages.
Stateful Operations: Aggregation stages or operators which can act only on bounded sets of documents. These aggregations consume, transform, and pass along entire sets of documents at once, and can appear only inside windows.
Windows are pipeline stages that consume streaming data and partition it into time-delimited sets so that you can apply stages and operators that are inapplicable to infinite data, such as $group and $avg. Each stream processor can have only one window stage.
After processing the ingested data, the stream processor writes it to either a streaming data platform using the $emit stage, or to an Atlas database using the $merge stage. These stages are mutually exclusive, and a stream processor can have only one such stage.
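To make the structure concrete, here is a minimal sketch of a pipeline written as Python dictionaries, together with a checker for the structural rules stated in this section. The stage field names (`connectionName`, `topic`, `validationAction`, `interval`, and so on) and the window stage names reflect my understanding of the Atlas Stream Processing syntax and should be verified against the stage reference. The connection, topic, database, and collection names are hypothetical, and `structure_errors` is an illustrative helper, not part of any MongoDB library.

```python
# Stage shapes are assumptions to verify against the stage reference;
# connection, topic, database, and collection names are hypothetical.
pipeline = [
    {"$source": {"connectionName": "kafkaProd", "topic": "sensor_readings"}},
    {"$validate": {
        "validator": {"$jsonSchema": {"required": ["sensor_id", "temp"]}},
        "validationAction": "dlq",
    }},
    {"$match": {"temp": {"$gt": -50}}},        # stateless operation
    {"$tumblingWindow": {                      # window stage
        "interval": {"size": 10, "unit": "second"},
        "pipeline": [                          # stateful operations appear only here
            {"$group": {"_id": "$sensor_id", "avg_temp": {"$avg": "$temp"}}},
        ],
    }},
    {"$merge": {"into": {"connectionName": "atlasCluster", "db": "iot", "coll": "avg_temps"}}},
]

WINDOW_STAGES = {"$tumblingWindow", "$hoppingWindow"}  # window stages assumed here
OUTPUT_STAGES = {"$emit", "$merge"}


def structure_errors(pipeline):
    """Check only the structural rules described in this section."""
    names = [next(iter(stage)) for stage in pipeline]  # first key of each stage dict
    errors = []
    if not names or names[0] != "$source":
        errors.append("pipeline must begin with $source")
    if sum(name in WINDOW_STAGES for name in names) > 1:
        errors.append("at most one window stage is allowed")
    outputs = [name for name in names if name in OUTPUT_STAGES]
    if len(outputs) > 1:
        errors.append("only one of $emit or $merge is allowed")
    elif outputs and names[-1] not in OUTPUT_STAGES:
        errors.append("$emit or $merge must be the final stage")
    return errors
```

Calling `structure_errors(pipeline)` on the example returns an empty list. The checker inspects only top-level stage order and counts; it does not validate the contents of any stage.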
Checkpoints
Atlas Stream Processing captures the state of a stream processor using checkpoint documents. These documents have unique IDs and are subject to the flow of your stream processor logic. When the last operator of a stream processor finishes acting on a checkpoint document, Atlas Stream Processing commits the checkpoint, generating two types of records:
A single record that validates the checkpoint ID and the stream processor to which it belongs.
A set of records describing the state of each stateful operation in the relevant stream processor at the instant Atlas Stream Processing committed the checkpoint.
When you restart a stream processor after an interruption, Atlas Stream Processing queries the last committed checkpoint and resumes operation from the described state.
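The commit-and-resume behavior can be illustrated with a toy simulation. This is a sketch under stated assumptions, not how Atlas Stream Processing is implemented: the `run` function is hypothetical, the only stateful operation is a running document count, and the snapshot also stores a stream position so that the toy knows where to resume (the text does not say how the source position is tracked).

```python
def run(stream, snapshot=None, crash_at=None):
    """Count documents in the stream; return (count, last committed snapshot)."""
    count = snapshot["count"] if snapshot else 0
    start = snapshot["position"] if snapshot else 0   # assumption: position is part of the snapshot
    committed = snapshot
    for index in range(start, len(stream)):
        if index == crash_at:
            break                                     # simulated interruption
        event = stream[index]
        if "checkpoint_id" in event:
            # The checkpoint reached the end of the pipeline: commit it.
            committed = {
                "checkpoint_id": event["checkpoint_id"],
                "processor": "proc01",                # hypothetical processor name
                "count": count,                       # state of the stateful operation
                "position": index + 1,
            }
        else:
            count += 1
    return count, committed


stream = [
    {"v": 1}, {"v": 2}, {"checkpoint_id": 1},
    {"v": 3}, {"v": 4}, {"checkpoint_id": 2},
    {"v": 5},
]

# First run is interrupted after checkpoint 1 was committed.
_, snap = run(stream, crash_at=4)

# Restarting from the last committed checkpoint gives the same final count
# as an uninterrupted run.
resumed_count, last = run(stream, snapshot=snap)
full_count, _ = run(stream)
```

The document processed between checkpoint 1 and the interruption is reprocessed after the restart, because the resumed run starts from the committed state rather than from the point of failure.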
Dead Letter Queue
Atlas Stream Processing supports the use of an Atlas database collection as a dead letter queue (DLQ). When Atlas Stream Processing cannot process a document from your data stream, it writes the content of the document to the DLQ along with details of the processing failure. You can assign a collection as a DLQ in your stream processor definitions.
To learn more, see Create a Stream Processor.
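The dead letter queue pattern itself can be sketched in a few lines. In this toy example the DLQ is a plain Python list rather than an Atlas collection, and the `process` and `to_fahrenheit` functions, the field names, and the shape of the DLQ entries are all hypothetical.

```python
def process(documents, transform):
    """Apply transform to each document; route failures to a DLQ list."""
    results, dlq = [], []
    for doc in documents:
        try:
            results.append(transform(doc))
        except (KeyError, TypeError, ValueError) as exc:
            # Keep the original document content plus details of the failure.
            dlq.append({"doc": doc, "error": f"{type(exc).__name__}: {exc}"})
    return results, dlq


def to_fahrenheit(doc):
    """Hypothetical transform: convert a Celsius reading to Fahrenheit."""
    return {"sensor_id": doc["sensor_id"], "temp_f": doc["temp_c"] * 9 / 5 + 32}


docs = [
    {"sensor_id": "a", "temp_c": 20},
    {"sensor_id": "b"},                    # missing temp_c raises KeyError
    {"sensor_id": "c", "temp_c": "warm"},  # non-numeric reading raises TypeError
]

results, dlq = process(docs, to_fahrenheit)
```

Processing continues past the two malformed documents, and each DLQ entry keeps enough context to inspect or replay the failed document later.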
Next Steps
For more detailed information on core Atlas Stream Processing concepts, read the following: