Docs Menu
Docs Home
/
MongoDB Atlas
/

Get Started with Atlas Stream Processing

On this page

  • Prerequisites
  • Procedure
  • In Atlas, go to the Stream Processing page for your project.
  • Create a Stream Processing Instance.
  • Get the stream processing instance connection string.
  • Add the sample stream connection to the connection registry.
  • Add a MongoDB Atlas connection to the connection registry.
  • Verify that your streaming data source emits messages.
  • Create a persistent stream processor.
  • Start the stream processor.
  • Verify the output of the stream processor.
  • Drop the stream processor.
  • Next Steps

This tutorial takes you through the steps of setting up Atlas Stream Processing and running your first stream processor.

To complete this tutorial you need:

  • An Atlas project

  • mongosh version 2.0 or higher

  • An Atlas user with the Project Owner or the Project Stream Processing Owner role to manage a Stream Processing Instance and Connection Registry

    Note

    The Project Owner role allows you to create database deployments, manage project access and project settings, manage IP Access List entries, and more.

    The Project Stream Processing Owner role enables Atlas Stream Processing actions such as viewing, creating, deleting, and editing stream processing instances, and viewing, adding, modifying, and deleting connections in the connection registry.

    See Project Roles to learn more about the differences between the two roles.

  • A database user with the atlasAdmin role to create and run stream processors

  • An Atlas cluster

1
  1. If it's not already displayed, select the organization that contains your project from the Organizations menu in the navigation bar.

  2. If it's not already displayed, select your project from the Projects menu in the navigation bar.

  3. In the sidebar, click Stream Processing under the Services heading.

2
  1. Click Get Started in the lower-right corner. Atlas provides a brief explanation of core Atlas Stream Processing components.

  2. Click the Create instance button.

  3. On the Create a stream processing instance page, configure your instance as follows:

    • Tier: SP30

    • Provider: AWS

    • Region: us-east-1

    • Instance Name: tutorialInstance

  4. Click Create.

3
  1. Locate the overview panel of your stream processing instance and click Connect.

  2. Select I have the MongoDB shell installed.

  3. From the Select your mongo shell version dropdown menu, select the latest version of mongosh.

  4. Copy the connection string provided under Run your connection string in your command line. You will need this in a later step.

  5. Click Close.

4

This connection serves as your streaming data source.

  1. In the pane for your stream processing instance, click Configure.

  2. Click Add sample connection. This creates a sample_stream_solar connection of type Sample Stream. Click on the Connection Registry tab to verify that the connection exists.

5

This connection serves as our streaming data sink.

  1. In the pane for your stream processing instance, click Configure.

  2. In the Connection Registry tab, click + Add Connection in the upper right.

  3. Click Atlas Database. In the Connection Name field, enter mongodb1. From the Atlas Cluster drop down, select an Atlas cluster without any data stored on it.

  4. Click Add connection.

6

To do this, create a minimal stream processor.

  1. Open a terminal application of your choice.

  2. Connect to your stream processing instance with mongosh.

    Paste the mongosh connection string that you copied in a previous step into your terminal, where <atlas-stream-processing-url> is the URL of your stream processing instance and <username> is a user with the atlasAdmin role.

    mongosh "mongodb://<atlas-stream-processing-url>/"
    --tls --authenticationDatabase admin --username <username>

    Enter your password when prompted.

  3. Create the stream processor.

    Copy the following code into your mongosh prompt:

    sp.process([{"$source": {
    "connectionName": "sample_stream_solar"
    }}])

    Verify that data from the sample_stream_solar connection displays to the console, and terminate the process.

    Stream processors you create with sp.process() don't persist after you terminate them.

7

Copy the following code into your mongosh prompt:

let s = {
$source: {
connectionName: "sample_stream_solar",
timeField: {
$dateFromString: {
dateString: '$timestamp'
}
}
}
}
let u = {
$unwind: {
path: "$obs"
}
}
let g = {
$group: {
_id: "$group_id",
max: {
$max: "$obs.watts"
},
avg: {
$avg: "$obs.watts"
}
}
}
let t = {
$tumblingWindow: {
interval: {
size: NumberInt(1),
unit: "second"
},
pipeline: [u, g]
}
}
let m = {
$merge: {
into: {
connectionName: "mongodb1",
db: "MySolar",
coll: "solar"
}
}
}
sp.createStreamProcessor("avgWatts", [s, t, m])

This creates a stream processor named avgWatts that applies the previously defined query and writes the processed data to the solar collection of the MySolar database on the cluster you connected to. It returns the average and peak wattage of all observed solar panels over 1 hour intervals.

To learn more about how Atlas Stream Processing writes to at-rest databases, see $merge.

8

Run the following command in mongosh:

sp.avgWatts.start()
9

To verify that the processor is active, run the following command in mongosh:

sp.avgWatts.stats()

This command reports operational statistics of the avgWatts stream processor.

To verify that the stream processor is writing data to your Atlas cluster:

  1. In Atlas, go to the Clusters page for your project.

    1. If it is not already displayed, select the organization that contains your desired project from the Organizations menu in the navigation bar.

    2. If it is not already displayed, select your desired project from the Projects menu in the navigation bar.

    3. If the Clusters page is not already displayed, click Database in the sidebar.

  2. Click the Browse Collections button for your cluster.

  3. View the MySolar collection.

10

Run the following command in mongosh:

sp.avgWatts.drop()

To confirm that you have dropped avgWatts, list all your available stream processors:

sp.listStreamProcessors()

Learn how to:

Back

Overview

Next

Stream Processor Windows