Docs Menu
Docs Home
/
MongoDB Atlas
/

Manage Stream Processors

On this page

An Atlas Stream Processing stream processor applies the logic of a uniquely named stream aggregation pipeline to your streaming data. Atlas Stream Processing saves each stream processor definition to persistent storage so that it can be reused. You can only use a given stream processor in the stream processing instance its definition is stored in. Atlas Stream Processing supports up to 4 stream processors per worker. For additional processors that exceed this limit, Atlas Stream Processing allocates a new resource.

To create and manage a stream processor, you must have:

Many stream processor commands require you to specify the name of the relevant stream processor in the method invocation. The syntax described in the following sections assumes strictly alphanumeric names. If your stream processor's name includes non-alphanumeric characters such as hyphens (-) or full stops (.), you must enclose the name in square brackets ([]) and double quotes ("") in the method invocation, as in sp.["special-name-stream"].stats().

You can create a stream processor interactively with the sp.process() method. Stream processors that you create interactively exhibit the following behavior:

  • Write output and dead letter queue documents to the shell

  • Begin running immediately upon creation

  • Run for either 10 minutes or until the user stops them

  • Don't persist after stopping

Stream processors that you create interactively are intended for prototyping. To create a persistent stream processor, see Create a Stream Processor.

sp.process() has the following syntax:

sp.process(<pipeline>)
Field
Type
Necessity
Description
pipeline
array
Required
Stream aggregation pipeline you want to apply to your streaming data.
1

Use the connection string associated with your stream processing instance to connect using mongosh.

Example

The following command connects to a stream processing instance as a user named streamOwner using SCRAM-SHA-256 authentication:

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

Provide your user password when prompted.

2

In the mongosh prompt, assign an array containing the aggregation stages you want to apply to a variable named pipeline.

The following example uses the stuff topic in the myKafka connection in the connection registry as the $source, matches records where the temperature field has a value of 46 and emits the processed messages to the output topic of the mySink connection in the connection registry:

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

The following command creates a stream processor that applies the logic defined in pipeline.

sp.process(pipeline)

To create a new stream processor with mongosh, use the sp.createStreamProcessor() method. It has the following syntax:

sp.createStreamProcessor(<name>, <pipeline>, <options>)
Argument
Type
Necessity
Description
name
string
Required
Logical name for the stream processor. This must be unique within the stream processing instance. This name should contain only alphanumeric characters.
pipeline
array
Required
Stream aggregation pipeline you want to apply to your streaming data.
options
object
Optional
Object defining various optional settings for your stream processor.
options.dlq
object
Conditional
Object assigning a dead letter queue for your stream processing instance. This field is necessary if you define the options field.
options.dlq.connectionName
string
Conditional
Human-readable label that identifies a connection in your connection registry. This connection must reference an Atlas cluster. This field is necessary if you define the options.dlq field.
options.dlq.db
string
Conditional
Name of an Atlas database on the cluster specified in options.dlq.connectionName. This field is necessary if you define the options.dlq field.
options.dlq.coll
string
Conditional
Name of a collection in the database specified in options.dlq.db. This field is necessary if you define the options.dlq field.
1

Use the connection string associated with your stream processing instance to connect using mongosh.

Example

The following command connects to a stream processing instance as a user named streamOwner using SCRAM-SHA-256 authentication:

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

Provide your user password when prompted.

2

In the mongosh prompt, assign an array containing the aggregation stages you want to apply to a variable named pipeline.

The following example uses the stuff topic in the myKafka connection in the connection registry as the $source, matches records where the temperature field has a value of 46 and emits the processed messages to the output topic of the mySink connection in the connection registry:

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

In the mongosh prompt, assign an object containing the following properties of your DLQ:

  • Connection name

  • Database name

  • Collection name

The following example defines a DLQ over the cluster01 connection, in the metadata.dlq database collection.

deadLetter = {
dlq: {
connectionName: "cluster01",
db: "metadata",
coll: "dlq"
}
}
4

The following command creates a stream processor named proc01 that applies the logic defined in pipeline. Documents that throw errors in processing are written to the DLQ defined in deadLetter.

sp.createStreamProcessor("proc01", pipeline, deadLetter)

To start an existing stream processor with mongosh, use the sp.<streamprocessor>.start() method. <streamprocessor> must be the name of a stream processor defined for the current stream processing instance.

For example, to start a stream processor named proc01, run the following command:

sp.proc01.start()

This method returns:

  • true if the stream processor exists and isn't currently running.

  • false if you try to start a stream processor that doesn't exist, or exists and is currently running.

To stop an existing stream processor with mongosh, use the sp.<streamprocessor>.stop() method. <streamprocessor> must be the name of a currently running stream processor defined for the current stream processing instance.

For example, to stop a stream processor named proc01, run the following command:

sp.proc01.stop()

This method returns:

  • true if the stream processor exists and is currently running.

  • false if the stream processor doesn't exist, or if the stream processor isn't currently running.

To delete an existing stream processor with mongosh, use the sp.<streamprocessor>.drop() method. <streamprocessor> must be the name of a stream processor defined for the current stream processing instance.

For example, to drop a stream processor named proc01, run the following command:

sp.proc01.drop()

This method returns:

  • true if the stream processor exists.

  • false if the stream processor doesn't exist.

When you drop a stream processor, all resources that Atlas Stream Processing provisioned for it are destroyed, along with all saved state.

To list all available stream processors on the current stream processing instance with mongosh, use the sp.listStreamProcessors() method. It returns a list of documents containing the name, start time, current state, and pipeline associated with each stream processor. It has the following syntax:

sp.listStreamProcessors(<filter>)

<filter> is a document specifying which field(s) to filter the list by.

Example

The following example shows a return value for an unfiltered request:

sp.listStreamProcessors()
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27},
28{
29 id: '0218',
30 name: "proc02",
31 last_modified: ISODate("2023-03-21T20:17:33.601Z"),
32 state: "STOPPED",
33 error_msg: '',
34 pipeline: [
35 {
36 $source: {
37 connectionName: "myKafka",
38 topic: "things"
39 }
40 },
41 {
42 $match: {
43 temperature: 41
44 }
45 },
46 {
47 $emit: {
48 connectionName: "mySink",
49 topic: "results",
50 }
51 }
52 ],
53 lastStateChange: ISODate("2023-03-21T20:18:26.139Z")
54}

If you run the command again on the same stream processing instance, filtering for a "state" of "running", you see the following output:

sp.listStreamProcessors({"state": "running"})
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27}

To return an array of sampled results from an existing stream processor to STDOUT with mongosh, use the sp.<streamprocessor>.sample() method. <streamprocessor> must be the name of a currently running stream processor defined for the current stream processing instance. For example, the following command samples from a stream processor named proc01.

sp.proc01.sample()

This command runs continuously until you cancel it by using CTRL-C, or until the returned samples cumulatively reach 40 MB in size. The stream processor reports invalid documents in the sample in a _dlqMessage document of the following form:

{
_dlqMessage: {
_stream_meta: {
source: {
type: "<type>"
}
},
errInfo: {
reason: "<reasonForError>"
},
doc: {
_id: ObjectId('<group-id>'),
...
},
processorName: '<procName>'
}
}

You can use these messages to diagnose data hygiene issues without defining a dead letter queue collection.

To return a document summarizing the current status of an existing stream processor with mongosh, use the sp.<streamprocessor>.stats() method. streamprocessor must be the name of a currently running stream processor defined for the current stream processing instance. It has the following syntax:

sp.<streamprocessor>.stats({options: {<options>}})

Where options is an optional document with the following fields:

Field
Type
Description
scale
integer
Unit to use for the size of items in the output. By default, Atlas Stream Processing displays item size in bytes. To display in KB, specify a scale of 1024.
verbose
boolean
Flag that specifies the verbosity level of the output document. If set to true, the output document contains a subdocument that reports the statistics of each individual operator in your pipeline. Defaults to false.

The output document has the following fields:

Field
Type
Description
ns
string
The namespace the stream processor is defined in.
stats
object
A document describing the operational state of the stream processor.
stats.name
string
The name of the stream processor.
stats.status
string

The status of the stream processor. This field can have the following values:

  • starting

  • running

  • error

  • stopping

stats.scaleFactor
integer
The scale in which the size field displays. If set to 1, sizes display in bytes. If set to 1024, sizes display in kilobytes.
stats.inputMessageCount
integer
The number of documents published to the stream. A document is considered 'published' to the stream once it passes through the $source stage, not when it passes through the entire pipeline.
stats.inputMessageSize
integer
The number of bytes or kilobytes published to the stream. Bytes are considered 'published' to the stream once they pass through the $source stage, not when it passes through the entire pipeline.
stats.outputMessageCount
integer
The number of documents processed by the stream. A document is considered 'processed' by the stream once it passes through the entire pipeline.
stats.outputMessageSize
integer
The number of bytes or kilobytes processed by the stream. Bytes are considered 'processed' by the stream once they pass through the entire pipeline.
stats.dlqMessageCount
integer
The number of documents sent to the Dead Letter Queue.
stats.dlqMessageSize
integer
The number of bytes or kilobytes sent to the Dead Letter Queue.
stats.stateSize
integer
The number of bytes used by windows to store processor state.
stats.watermark
integer
The timestamp of the current watermark.
stats.operatorStats
array

The statistics for each operator in the processor pipeline. Atlas Stream Processing returns this field only if you pass in the verbose option.

stats.operatorStats provides per-operator versions of many core stats fields:

  • stats.operatorStats.name

  • stats.operatorStats.inputMessageCount

  • stats.operatorStats.inputMessageSize

  • stats.operatorStats.outputMessageCount

  • stats.operatorStats.outputMessageSize

  • stats.operatorStats.dlqMessageCount

  • stats.operatorStats.dlqMessageSize

  • stats.operatorStats.stateSize

Additionally, stats.operatorStats includes the following unique fields:

  • stats.operatorStats.maxMemoryUsage

  • stats.operatorStats.executionTime

stats.operatorStats.maxMemoryUsage
integer
The maximum memory usage of the operator in bytes or kilobytes.
stats.operatorStats.executionTime
integer
The total execution time of the operator in seconds.
stats.kafkaPartitions
array
Offset information for an Apache Kafka broker's partitions. kafkaPartitions applies only to connections using an Apache Kafka source.
stats.kafkaPartitions.partition
integer
The Apache Kafka topic partition number.
stats.kafkaPartitions.currentOffset
integer
The offset that the stream processor is on for the specified partition. This value equals the previous offset that the stream processor processed plus 1.
stats.kafkaPartitions.checkpointOffset
integer
The offset that the stream processor last committed to the Apache Kafka broker and the checkpoint for the specified partition. All messages through this offset are recorded in the last checkpoint.

For example, the following shows the status of a stream processor named proc01 on a stream processing instance named inst01 with item sizes displayed in KB:

sp.proc01.stats(1024)
{
ok: 1,
ns: 'inst01',
stats: {
name: 'proc01',
status: 'running',
scaleFactor: Long("1"),
inputMessageCount: Long("706028"),
inputMessageSize: 958685236,
outputMessageCount: Long("46322"),
outputMessageSize: 85666332,
dlqMessageCount: Long("0"),
dlqMessageSize: Long("0"),
stateSize: Long("2747968"),
watermark: ISODate("2023-12-14T14:35:32.417Z"),
ok: 1
},
}

Back

Manage Connections

Next

Aggregation Pipelines