Atlas Stream Processing Overview

Atlas Stream Processing enables you to process streams of complex data using the same query API used in Atlas databases. Atlas Stream Processing allows you to:

  • Build aggregation pipelines to continuously operate on streaming data without the delays inherent in batch processing.

  • Perform continuous schema validation to check that messages are properly formed, detect message corruption, and detect late-arriving data.

  • Continuously publish results to Atlas collections or Apache Kafka clusters, ensuring up-to-date views and analysis of data.

Atlas Stream Processing components belong directly to Atlas projects and operate independently of Atlas clusters.

A stream is a continuous flow of data originating from one or more sources, taking the form of an append-only log. Examples of data streams include temperature or pressure readings from sensors, records of financial transactions, or change data capture events.

Data streams originate from sources such as Apache Kafka topics or change streams. You can then write processed data to sinks such as Apache Kafka topics or Atlas collections.

Streams of data originate in systems with rapidly changing state. Atlas Stream Processing provides native stream processing capabilities to operate on continuous data without the time and computational constraints of an at-rest database.

The core abstraction of Atlas Stream Processing is the stream processor. A stream processor is a MongoDB aggregation pipeline query that operates continuously on streaming data from a specified source and writes the output to a sink. To learn more, see Structure of a Stream Processor.

Stream processing takes place on stream processing instances. Each stream processing instance is an Atlas namespace that associates the following:

  • One or more workers, which provide the RAM and CPUs necessary to run your stream processors.

  • A cloud provider and cloud region.

  • A connection registry, which stores the list of available sources and sinks of streaming data.

  • A security context in which to define user authorizations.

  • A connection string to the stream processing instance itself.

When you define a stream processor, it becomes available only for the stream processing instance in which you define it. Each worker can host up to four running stream processors; Atlas Stream Processing automatically scales your stream processing instance up as you start stream processors by provisioning workers as needed. You can deprovision a worker by stopping all stream processors on it. Atlas Stream Processing always prefers to assign a stream processor to an existing worker over provisioning new workers.

Example

You have a stream processing instance running eight stream processors, named proc01 through proc08. proc01 through proc04 run on one worker, and proc05 through proc08 run on a second worker. You start a new stream processor named proc09. Atlas Stream Processing provisions a third worker to host proc09.

Later, you stop proc03 on the first worker. When you stop proc09 and restart it, Atlas Stream Processing reassigns proc09 to the first worker and deprovisions the third worker.

If you start a new stream processor named proc10 before you stop and restart proc09, Atlas Stream Processing assigns proc10 to the first worker in the slot previously allocated to proc03.

When scaling, Atlas Stream Processing only considers the number of currently running stream processors; it doesn't count defined stream processors that aren't running. The tier of the stream processing instance determines the RAM and CPU allotment of its workers.
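
The scaling behavior described above can be modeled with a small simulation. This is only an illustrative sketch, not how Atlas is implemented: the Instance class, its method names, and the choice to fill the lowest-numbered worker with a free slot first are all assumptions made for the example. Only the four-processor limit and the preference for existing workers come from the text.

```python
MAX_PROCESSORS_PER_WORKER = 4  # from the text: up to four running processors per worker


class Instance:
    """Toy model of a stream processing instance's worker pool (illustrative only)."""

    def __init__(self):
        # worker number -> set of names of running stream processors
        self.workers = {}

    def start(self, name):
        # Prefer an existing worker with a free slot.
        # Trying the lowest-numbered worker first is an assumption.
        for worker_id in sorted(self.workers):
            if len(self.workers[worker_id]) < MAX_PROCESSORS_PER_WORKER:
                self.workers[worker_id].add(name)
                return worker_id
        # No free slot anywhere: provision a new worker.
        worker_id = max(self.workers, default=0) + 1
        self.workers[worker_id] = {name}
        return worker_id

    def stop(self, name):
        for worker_id, running in list(self.workers.items()):
            if name in running:
                running.remove(name)
                if not running:
                    # A worker with no running processors is deprovisioned.
                    del self.workers[worker_id]
                return
        raise KeyError(name)


def run_example():
    instance = Instance()
    for i in range(1, 9):
        instance.start(f"proc{i:02d}")  # fills workers 1 and 2
    placements = {"proc09": instance.start("proc09")}
    instance.stop("proc03")  # frees a slot on worker 1
    instance.stop("proc09")  # worker 3 is now empty
    placements["proc09 restarted"] = instance.start("proc09")
    return placements, sorted(instance.workers)
```

Calling run_example() replays the scenario from the example: proc09 first lands on a third worker, and after the restart it moves to the first worker while the third worker disappears. Stopped processors never appear in the model, which mirrors the rule that only running stream processors count toward scaling.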

Connection registries store one or more connections. Each connection assigns a name to the combination of networking and security details that allow a stream processor to interact with external services. Connections exhibit the following behavior:

  • Only a connection defined in a given stream processing instance's connection registry can service stream processors hosted on that stream processing instance.

  • Each connection can service an arbitrary number of stream processors.

  • Only a single connection can serve as a given stream processor's source.

  • Only a single connection can serve as a given stream processor's sink.

  • A connection is not innately defined as either a source or a sink. Any given connection can serve either function depending on how a stream processor invokes that connection.
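
To make these rules concrete, the sketch below models a connection registry as a plain Python dictionary and writes two pipeline definitions as lists of dictionaries. The connection names (kafkaProd, atlasCluster), the topics, the registry fields, and both helper functions are hypothetical. The stage field names (connectionName, topic, into, db, coll) reflect our understanding of the stage syntax and should be checked against the reference documentation.

```python
# Hypothetical registry contents; the names and fields are illustrative only.
registry = {
    "kafkaProd": {"type": "Kafka"},
    "atlasCluster": {"type": "Cluster"},
}

# kafkaProd acts as the source here, and atlasCluster as the sink ...
to_atlas = [
    {"$source": {"connectionName": "kafkaProd", "topic": "orders"}},
    {"$merge": {"into": {"connectionName": "atlasCluster", "db": "shop", "coll": "orders"}}},
]

# ... while here kafkaProd acts as both the source and the sink.
to_kafka = [
    {"$source": {"connectionName": "kafkaProd", "topic": "orders"}},
    {"$emit": {"connectionName": "kafkaProd", "topic": "orders-clean"}},
]


def connections_used(pipeline):
    """Return the (source, sink) connection names that a pipeline invokes."""
    source = pipeline[0]["$source"]["connectionName"]
    last = pipeline[-1]
    if "$emit" in last:
        sink = last["$emit"]["connectionName"]
    else:
        sink = last["$merge"]["into"]["connectionName"]
    return source, sink


def unknown_connections(pipeline, registry):
    """List the connection names a pipeline uses that the registry lacks."""
    return [name for name in connections_used(pipeline) if name not in registry]
```

Both pipelines draw on the same registry entry, and nothing in the registry marks kafkaProd as a source or a sink. Its role depends only on which stage names it.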

Atlas Stream Processing runs stream processing workers in dedicated customer containers, on multi-tenant infrastructure. For more information on MongoDB security and compliance, see the MongoDB Trust Center.

Stream processors take the form of an aggregation pipeline. Each processor begins with a $source stage, which connects to a source and begins receiving a continuous stream of data in the form of documents. These documents must be valid JSON or EJSON. The aggregation stages after $source consume each record from the stream in turn and fall into three types:

  • Validation: The $validate stage lets you perform schema validation on ingested documents so that only correctly formatted documents continue to further processing, and lets you determine what happens to incorrectly formatted documents. Validation is optional.

  • Stateless Operations: Aggregation stages or operators that can act directly on the incoming data stream. These aggregations consume, transform, and pass along each document in turn, and can appear at any point between the $source stage and the $emit or $merge stage.

  • Stateful Operations: Aggregation stages or operators that can act only on bounded sets of documents. These aggregations consume, transform, and pass along entire sets of documents at once, and can appear only inside windows.

Windows are pipeline stages that consume streaming data and partition it into time-delimited sets so that you can apply stages and operators, such as $group and $avg, that are inapplicable to infinite data. Each stream processor can have only one window stage.

After processing the ingested data, the stream processor writes it either to a streaming data platform using the $emit stage or to an Atlas database using the $merge stage. These stages are mutually exclusive, and a stream processor can have only one such stage.
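
As a rough illustration of this structure, the sketch below writes a pipeline as a list of Python dictionaries and checks it against the rules stated in this section. The connection names, topic, field names, and the check_structure helper are hypothetical. The stage options shown (validator, validationAction, interval, pipeline, into) reflect our understanding of the stage syntax and should be verified against the stage reference before use.

```python
pipeline = [
    # Every stream processor begins with $source.
    {"$source": {"connectionName": "kafkaProd", "topic": "sensor-readings"}},
    # Optional validation of each ingested document.
    {"$validate": {
        "validator": {"$jsonSchema": {"required": ["sensorId", "temperature"]}},
        "validationAction": "dlq",
    }},
    # Stateless operation: acts on each document in turn.
    {"$match": {"temperature": {"$gt": -50}}},
    # Window: stateful operations such as $group appear only inside it.
    {"$tumblingWindow": {
        "interval": {"size": 10, "unit": "second"},
        "pipeline": [
            {"$group": {"_id": "$sensorId", "avgTemperature": {"$avg": "$temperature"}}},
        ],
    }},
    # Output: either $emit or $merge, never both.
    {"$merge": {"into": {"connectionName": "atlasCluster", "db": "iot", "coll": "averages"}}},
]

WINDOW_STAGES = {"$tumblingWindow", "$hoppingWindow"}
OUTPUT_STAGES = {"$emit", "$merge"}


def stage_name(stage):
    (name,) = stage  # each stage is a dict with exactly one key
    return name


def check_structure(pipeline):
    """Return a list of violations of the structural rules described above."""
    names = [stage_name(stage) for stage in pipeline]
    problems = []
    if not names or names[0] != "$source":
        problems.append("pipeline must begin with $source")
    if sum(name in WINDOW_STAGES for name in names) > 1:
        problems.append("only one window stage is allowed")
    if sum(name in OUTPUT_STAGES for name in names) > 1:
        problems.append("only one $emit or $merge stage is allowed")
    return problems
```

check_structure(pipeline) returns an empty list for the pipeline above. Appending an $emit stage after the $merge stage would make it report the output-stage violation.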

Atlas Stream Processing captures the state of a stream processor using checkpoint documents. These documents have unique IDs and are subject to the flow of your stream processor logic. When the last operator of a stream processor finishes acting on a checkpoint document, Atlas Stream Processing commits the checkpoint, generating two types of records:

  • A single record that validates the checkpoint ID and the stream processor to which it belongs.

  • A set of records describing the state of each stateful operation in the relevant stream processor at the instant Atlas Stream Processing committed the checkpoint.

When you restart a stream processor after an interruption, Atlas Stream Processing queries the last committed checkpoint and resumes operation from the described state.
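
The commit-and-resume cycle can be illustrated with a toy model. Everything in this sketch is an assumption made for illustration: the CheckpointStore class, the record layout, and the "seen" counter that stands in for a position in the source. The text does not describe how Atlas Stream Processing stores checkpoints or tracks source positions, and the sketch does not model checkpoint documents flowing through the pipeline.

```python
import copy


class CheckpointStore:
    """Toy checkpoint storage (illustrative only)."""

    def __init__(self):
        self.committed = []  # oldest first

    def commit(self, processor, checkpoint_id, operator_states):
        self.committed.append({
            # One record tying the checkpoint ID to its stream processor.
            "header": {"checkpointId": checkpoint_id, "processor": processor},
            # State of each stateful operation, snapshotted at commit time.
            "states": copy.deepcopy(operator_states),
        })

    def last_committed(self, processor):
        for checkpoint in reversed(self.committed):
            if checkpoint["header"]["processor"] == processor:
                return checkpoint
        return None


def run(processor, readings, store, checkpoint_every=2):
    """Sum readings, resuming from the last committed checkpoint if one exists."""
    last = store.last_committed(processor)
    if last:
        state = copy.deepcopy(last["states"])
        next_id = last["header"]["checkpointId"] + 1
    else:
        state = {"sum": {"total": 0, "seen": 0}}
        next_id = 1
    # Skip the readings that the restored state already accounts for.
    for value in readings[state["sum"]["seen"]:]:
        state["sum"]["total"] += value
        state["sum"]["seen"] += 1
        if state["sum"]["seen"] % checkpoint_every == 0:
            store.commit(processor, next_id, state)
            next_id += 1
    return state["sum"]["total"]
```

If a run over the first three of five readings is treated as an interruption, a second call to run with all five readings and the same store resumes from the checkpoint committed after the second reading instead of starting from zero.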

Atlas Stream Processing supports the use of an Atlas database collection as a dead letter queue (DLQ). When Atlas Stream Processing cannot process a document from your data stream, it writes the content of the document to the DLQ along with details of the processing failure. You can assign a collection as a DLQ in your stream processor definitions.

To learn more, see Create a Stream Processor.
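
As a hedged illustration, the sketch below shows what a DLQ assignment might look like in a stream processor's options, followed by a toy routine that mimics the routing behavior. The connection, database, and collection names are hypothetical, the dlq option shape reflects our understanding and should be confirmed in Create a Stream Processor, and the fields of the dead letter records here (doc, errInfo) are illustrative rather than the exact format that Atlas writes.

```python
# Assumed shape of the DLQ assignment in a stream processor's options.
options = {
    "dlq": {"connectionName": "atlasCluster", "db": "iot", "coll": "deadLetters"},
}


def to_fahrenheit(doc):
    # Raises KeyError when the document has no "temperature" field.
    return {**doc, "temperatureF": doc["temperature"] * 9 / 5 + 32}


def process(documents, transform):
    """Apply transform to each document; route failures to a dead letter list."""
    results, dead_letters = [], []
    for doc in documents:
        try:
            results.append(transform(doc))
        except Exception as exc:
            # Keep the original document content plus details of the failure.
            dead_letters.append({
                "doc": doc,
                "errInfo": {"reason": f"{type(exc).__name__}: {exc}"},
            })
    return results, dead_letters


documents = [{"sensorId": "a", "temperature": 20}, {"sensorId": "b"}]
results, dead_letters = process(documents, to_fahrenheit)
```

The first document is transformed normally, while the second, which lacks a temperature field, lands in dead_letters together with the reason for the failure. Processing continues past the bad document instead of stopping the stream.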

For more detailed information on core Atlas Stream Processing concepts, read the following:
