$source
Definition
The $source stage specifies a connection in the Connection Registry to stream data from. The following connection types are supported:
Apache Kafka broker
MongoDB collection change stream
MongoDB database change stream
Document array
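As a rough illustration of how the four source types differ in shape, the following JavaScript sketch classifies a $source stage by the fields it contains. The sourceKind function and the connection, topic, and database names are hypothetical. Atlas Stream Processing does not expose such a helper, and the rules are only inferred from the field tables on this page.

```javascript
// Hypothetical helper (not part of Atlas Stream Processing): infer which kind of
// source a $source stage describes from the fields documented on this page.
function sourceKind(stage) {
  const src = stage.$source;
  if (src === undefined) {
    throw new Error("not a $source stage");
  }
  if ("documents" in src) return "Document array";
  if ("topic" in src) return "Apache Kafka broker";
  if ("db" in src) {
    // Omitting coll means the processor reads the database change stream.
    return "coll" in src
      ? "MongoDB collection change stream"
      : "MongoDB database change stream";
  }
  throw new Error("cannot determine the source kind");
}

// "kafkaProd", "orders", "atlasCluster", and "sales" are placeholder names.
console.log(sourceKind({ $source: { connectionName: "kafkaProd", topic: "orders" } }));
// Apache Kafka broker
console.log(sourceKind({ $source: { connectionName: "atlasCluster", db: "sales" } }));
// MongoDB database change stream
```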
Note
You can't use Atlas serverless instances as a $source.
Syntax
Apache Kafka Broker
To operate on streaming data from an Apache Kafka broker, the $source stage has the following prototype form:
```
{
  "$source": {
    "connectionName": "<registered-connection>",
    "topic": "<source-topic>",
    "timeField": { $toDate | $dateFromString: <expression> },
    "tsFieldName": "<timestamp>",
    "partitionIdleTimeout": {
      "size": <duration-number>,
      "unit": "<duration-unit>"
    },
    "config": {
      "auto_offset_reset": "<start-event>",
      "group_id": "<group-id>",
      "keyFormat": "<deserialization-type>",
      "keyFormatError": "<error-handling>"
    }
  }
}
```
The $source stage takes a document with the following fields:
Field | Type | Necessity | Description
---|---|---|---
connectionName | string | Required | Label that identifies the connection in the Connection Registry to ingest data from.
topic | string | Required | Name of the Apache Kafka topic to stream messages from.
timeField | document | Optional | Document that defines an authoritative timestamp for incoming messages. If you use timeField, you must define it as either a $toDate expression or a $dateFromString expression that takes a source message field as an argument. If you do not declare a timeField, Atlas Stream Processing creates a timestamp from the message timestamp provided by the source.
tsFieldName | string | Optional | Name that overrides the name of the timestamp field projected by the $source stage. By default, the $source stage in an Atlas Stream Processing pipeline projects a field called _ts that holds the assigned timestamp of each document. Sources of streaming data might also use a field named _ts, so use tsFieldName to prevent a conflict between these fields.
partitionIdleTimeout | document | Optional | Document specifying how long a partition can be idle before it is ignored in watermark calculations.
partitionIdleTimeout.size | integer | Optional | Number specifying the duration of the partition idle timeout.
partitionIdleTimeout.unit | string | Optional | Unit of time for the duration of the partition idle timeout. The value of unit can be one of the following: "ms", "second", "minute", "hour", "day".
config | document | Optional | Document containing fields that override various default values.
config.auto_offset_reset | string | Optional | Specifies which event in the Apache Kafka source topic to begin ingestion with. Use "end", "latest", or "largest" to begin with the latest event in the topic at the time the aggregation is initialized, or "earliest", "beginning", or "smallest" to begin with the earliest event in the topic. Defaults to "latest".
config.group_id | string | Optional | ID of the Kafka consumer group to associate with the stream processor. If omitted, Atlas Stream Processing associates the stream processor with an auto-generated ID in the following format: asp-${streamProcessorId}-consumer. Atlas Stream Processing commits partition offsets to the Apache Kafka broker for the specified consumer group ID after a checkpoint is committed. It commits an offset when messages up through that offset are durably recorded in a checkpoint. This lets you track the offset lag and progress of the stream processor directly from the Kafka broker's consumer group metadata.
config.keyFormat | string | Optional | Data type used to deserialize Apache Kafka key data. Must be one of the following values: "binData", "string", "json", "int", "long". Defaults to "binData".
config.keyFormatError | string | Optional | How to handle errors encountered when deserializing Apache Kafka key data. Must be one of the following values: "dlq" (write the document to your dead letter queue) or "passThrough" (send the document to the next stage without key data).
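For orientation, here is a minimal sketch of a filled-in Kafka $source stage written as a JavaScript object, the form a mongosh session would use. Every concrete value (kafkaProd, orders, eventTime, ingestTime, orders-processor) is a hypothetical placeholder. The idleTimeoutMs helper is also hypothetical: it only shows how size and unit combine into one duration, and it assumes the unit names listed in the table above.

```javascript
// Filled-in Kafka $source stage. All concrete values are hypothetical
// placeholders; substitute your own connection, topic, and field names.
const kafkaSource = {
  $source: {
    connectionName: "kafkaProd",
    topic: "orders",
    // Use the message's eventTime string as the authoritative timestamp.
    timeField: { $dateFromString: { dateString: "$eventTime" } },
    // Project the assigned timestamp as "ingestTime" instead of "_ts".
    tsFieldName: "ingestTime",
    // Ignore a partition in watermark calculations after 30 idle seconds.
    partitionIdleTimeout: { size: 30, unit: "second" },
    config: {
      auto_offset_reset: "earliest", // start from the earliest event in the topic
      group_id: "orders-processor"   // commit offsets under this consumer group
    }
  }
};

// Hypothetical helper: express a partitionIdleTimeout as milliseconds.
const MS_PER_UNIT = { ms: 1, second: 1000, minute: 60000, hour: 3600000, day: 86400000 };

function idleTimeoutMs({ size, unit }) {
  if (!Object.prototype.hasOwnProperty.call(MS_PER_UNIT, unit)) {
    throw new Error(`unknown unit: ${unit}`);
  }
  return size * MS_PER_UNIT[unit];
}

console.log(idleTimeoutMs(kafkaSource.$source.partitionIdleTimeout)); // 30000
```

In mongosh connected to a stream processing instance, a stage like this would typically be the first element of the pipeline array passed to sp.process() or sp.createStreamProcessor().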
Note
Atlas Stream Processing requires that documents in the source data stream be valid JSON or EJSON. Atlas Stream Processing sends documents that don't meet this requirement to your dead letter queue, if you have configured one.
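The routing rule in the note above can be pictured with a simplified local model. This is a sketch under stated assumptions, not how Atlas Stream Processing is implemented: JSON.parse stands in for JSON/EJSON validation, so EJSON-specific syntax is not handled, a plain array stands in for the dead letter queue, and treating only JSON objects as documents is an assumption. The note also does not say what happens without a dead letter queue, so the model simply drops such messages.

```javascript
// Simplified model of the note above, not Atlas Stream Processing internals:
// JSON.parse stands in for JSON/EJSON validation, and a plain array stands in
// for the dead letter queue.
function routeMessages(rawMessages, dlqConfigured) {
  const accepted = [];
  const deadLettered = [];
  for (const raw of rawMessages) {
    let doc;
    try {
      doc = JSON.parse(raw);
    } catch {
      doc = undefined;
    }
    // Assumption: only JSON objects (not arrays or scalars) count as documents.
    const isDocument = doc !== null && typeof doc === "object" && !Array.isArray(doc);
    if (isDocument) {
      accepted.push(doc);
    } else if (dlqConfigured) {
      deadLettered.push(raw);
    }
    // With no dead letter queue configured, this model drops the message.
  }
  return { accepted, deadLettered };
}

const result = routeMessages(['{"orderId": 1}', "not json", "[1, 2]"], true);
console.log(result.accepted.length, result.deadLettered.length); // 1 2
```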
MongoDB Collection Change Stream
To operate on streaming data from an Atlas collection change stream, the $source stage has the following prototype form:
```
{
  "$source": {
    "connectionName": "<registered-connection>",
    "timeField": { $toDate | $dateFromString: <expression> },
    "tsFieldName": "<timestamp>",
    "db": "<source-db>",
    "coll": ["<source-coll>", ...],
    "config": {
      "startAfter": <start-token> | "startAtOperationTime": <timestamp>,
      "fullDocument": "<full-doc-condition>",
      "fullDocumentOnly": <boolean>,
      "fullDocumentBeforeChange": "<before-change-condition>",
      "pipeline": [ { "<aggregation-stage>": { <stage-input>, ... } }, ... ]
    }
  }
}
```
The $source stage takes a document with the following fields:
Field | Type | Necessity | Description
---|---|---|---
connectionName | string | Conditional | Label that identifies the connection in the Connection Registry to ingest data from.
timeField | document | Optional | Document that defines an authoritative timestamp for incoming messages. If you use timeField, you must define it as either a $toDate expression or a $dateFromString expression that takes a source message field as an argument. If you do not declare a timeField, Atlas Stream Processing creates a timestamp from the message timestamp provided by the source.
tsFieldName | string | Optional | Name that overrides the name of default timestamp fields declared by the source. Atlas Stream Processing pipelines internally add a field called _ts to incoming messages to store timestamp information. Sources of streaming data might also use a field named _ts, so use tsFieldName to prevent a conflict between these fields.
db | string | Required | Name of a MongoDB database hosted on the Atlas instance specified by connectionName. The change stream of this database acts as the streaming data source.
coll | string or array of strings | Required | Name of one or more MongoDB collections hosted on the Atlas instance specified by connectionName. The change streams of these collections act as the streaming data source. If you omit this field, your stream processor sources from a MongoDB database change stream.
config | document | Optional | Document containing fields that override various default values.
config.startAfter | token | Conditional | The change event after which the source begins reporting, in the form of a resume token. You can use only one of either startAfter or startAtOperationTime.
config.startAtOperationTime | timestamp | Conditional | The operation time after which the source begins reporting. You can use only one of either startAfter or startAtOperationTime.
config.fullDocument | string | Conditional | Setting that controls whether a change stream source returns a full document, or only the changes, when an update occurs. Must be one of the following: "default", "required", "updateLookup", "whenAvailable". If you do not specify a value for fullDocument, it defaults to "default". To use this field with a collection change stream, you must enable change stream Pre- and Post-Images on that collection.
config.fullDocumentOnly | boolean | Conditional | Setting that controls whether a change stream source returns the entire change event document, including all metadata, or only the contents of fullDocument. To use this field with a collection change stream, you must enable change stream Pre- and Post-Images on that collection.
config.fullDocumentBeforeChange | string | Optional | Specifies whether a change stream source includes the full document in its original "before changes" state in the output. Must be one of the following: "off", "whenAvailable", "required". If you do not specify a value for fullDocumentBeforeChange, it defaults to "off". To use this field with a collection change stream, you must enable change stream Pre- and Post-Images on that collection.
config.pipeline | document | Optional | Specifies an aggregation pipeline to filter change stream output at the point of origin. This pipeline must conform to the parameters described in Modify Change Stream Output in the MongoDB manual.
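A minimal sketch of a collection change stream source, assuming hypothetical names (atlasCluster, sales, orders, returns) and assuming Pre- and Post-Images are enabled on both collections, which the fullDocument and fullDocumentBeforeChange settings require. The $match on operationType is one example of a filter applied at the point of origin. The checkStartOptions function is a hypothetical client-side check of the rule that startAfter and startAtOperationTime are mutually exclusive. It is not an Atlas Stream Processing API.

```javascript
// Collection change stream source. "atlasCluster", "sales", "orders", and
// "returns" are hypothetical names.
const collectionSource = {
  $source: {
    connectionName: "atlasCluster",
    db: "sales",
    coll: ["orders", "returns"],
    config: {
      // Both settings assume Pre- and Post-Images are enabled on each collection.
      fullDocument: "required",
      fullDocumentBeforeChange: "whenAvailable",
      // Filter at the origin so only inserts and updates reach the processor.
      pipeline: [{ $match: { operationType: { $in: ["insert", "update"] } } }]
    }
  }
};

// Hypothetical pre-flight check for the documented rule that config may set
// startAfter or startAtOperationTime, but not both.
function checkStartOptions(stage) {
  const config = stage.$source.config ?? {};
  if ("startAfter" in config && "startAtOperationTime" in config) {
    throw new Error("use only one of startAfter or startAtOperationTime");
  }
  return true;
}

console.log(checkStartOptions(collectionSource)); // true
```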
MongoDB Database Change Stream
To operate on streaming data from an Atlas database change stream, the $source stage has the following prototype form:
```
{
  "$source": {
    "connectionName": "<registered-connection>",
    "timeField": { $toDate | $dateFromString: <expression> },
    "tsFieldName": "<timestamp>",
    "db": "<source-db>",
    "config": {
      "startAfter": <start-token> | "startAtOperationTime": <timestamp>,
      "fullDocument": "<full-doc-condition>",
      "fullDocumentOnly": <boolean>,
      "fullDocumentBeforeChange": "<before-change-condition>",
      "pipeline": [ { "<aggregation-stage>": { <stage-input>, ... } }, ... ]
    }
  }
}
```
The $source stage takes a document with the following fields:
Field | Type | Necessity | Description
---|---|---|---
connectionName | string | Conditional | Label that identifies the connection in the Connection Registry to ingest data from.
timeField | document | Optional | Document that defines an authoritative timestamp for incoming messages. If you use timeField, you must define it as either a $toDate expression or a $dateFromString expression that takes a source message field as an argument. If you do not declare a timeField, Atlas Stream Processing creates a timestamp from the message timestamp provided by the source.
tsFieldName | string | Optional | Name that overrides the name of default timestamp fields declared by the source. Atlas Stream Processing pipelines internally add a field called _ts to incoming messages to store timestamp information. Sources of streaming data might also use a field named _ts, so use tsFieldName to prevent a conflict between these fields.
db | string | Required | Name of a MongoDB database hosted on the Atlas instance specified by connectionName. The change stream of this database acts as the streaming data source.
config | document | Optional | Document containing fields that override various default values.
config.startAfter | token | Conditional | The change event after which the source begins reporting, in the form of a resume token. You can use only one of either startAfter or startAtOperationTime.
config.startAtOperationTime | timestamp | Conditional | The operation time after which the source begins reporting. You can use only one of either startAfter or startAtOperationTime.
config.fullDocument | string | Conditional | Setting that controls whether a change stream source returns a full document, or only the changes, when an update occurs. Must be one of the following: "default", "required", "updateLookup", "whenAvailable". If you do not specify a value for fullDocument, it defaults to "default". To use this field with a database change stream, you must enable change stream Pre- and Post-Images on every collection in that database.
config.fullDocumentOnly | boolean | Conditional | Setting that controls whether a change stream source returns the entire change event document, including all metadata, or only the contents of fullDocument. To use this field with a database change stream, you must enable change stream Pre- and Post-Images on every collection in that database.
config.fullDocumentBeforeChange | string | Optional | Specifies whether a change stream source includes the full document in its original "before changes" state in the output. Must be one of the following: "off", "whenAvailable", "required". If you do not specify a value for fullDocumentBeforeChange, it defaults to "off". To use this field with a database change stream, you must enable change stream Pre- and Post-Images on every collection in that database.
config.pipeline | document | Optional | Specifies an aggregation pipeline to filter change stream output at the point of origin. This pipeline must conform to the parameters described in Modify Change Stream Output in the MongoDB manual.
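The sketch below shows a database change stream source (no coll field) together with a simplified local model of what fullDocumentOnly changes about each emitted message. The names atlasCluster and sales, and the sample change event, are hypothetical. The emitted function is illustrative only. Real output from Atlas Stream Processing also carries fields such as the _ts timestamp, which this model ignores.

```javascript
// Database change stream source: no coll field, so every collection in the
// hypothetical "sales" database feeds the processor.
const databaseSource = {
  $source: {
    connectionName: "atlasCluster", // hypothetical connection name
    db: "sales",
    config: {
      fullDocument: "required",
      fullDocumentOnly: true,
      // Filter at the origin so only insert events are reported.
      pipeline: [{ $match: { operationType: "insert" } }]
    }
  }
};

// Simplified model: with fullDocumentOnly, only the fullDocument contents are
// emitted instead of the whole change event with its metadata.
function emitted(changeEvent, fullDocumentOnly) {
  return fullDocumentOnly ? changeEvent.fullDocument : changeEvent;
}

// Made-up change event in the usual change stream shape.
const sampleEvent = {
  operationType: "insert",
  ns: { db: "sales", coll: "orders" },
  documentKey: { _id: 1 },
  fullDocument: { _id: 1, item: "notebook", qty: 3 }
};

console.log(emitted(sampleEvent, databaseSource.$source.config.fullDocumentOnly));
// { _id: 1, item: 'notebook', qty: 3 }
```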
Document Array
To operate on an array of documents, the $source stage has the following prototype form:
```
{
  "$source": {
    "timeField": { $toDate | $dateFromString: <expression> },
    "tsFieldName": "<timestamp>",
    "documents": [ {source-doc}, ... ] | <expression>
  }
}
```
The $source stage takes a document with the following fields:
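Field | Type | Necessity | Description
---|---|---|---
timeField | document | Optional | Document that defines an authoritative timestamp for incoming messages. If you use timeField, you must define it as either a $toDate expression or a $dateFromString expression that takes a source message field as an argument. If you do not declare a timeField, Atlas Stream Processing creates a timestamp from the message timestamp provided by the source.
tsFieldName | string | Optional | Name that overrides the name of default timestamp fields declared by the source. Atlas Stream Processing pipelines internally add a field called _ts to incoming messages to store timestamp information. Sources of streaming data might also use a field named _ts, so use tsFieldName to prevent a conflict between these fields.
documents | array | Conditional | Array of documents to use as a streaming data source. The value of this field can be either an array of objects or an expression that evaluates to an array of objects. Do not use this field when using the connectionName field.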
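A document array source is convenient for trying a pipeline without a live connection. The sketch below uses made-up sensor readings and a hypothetical withTimestamps function that roughly models the timestamp behavior described above. It copies each document and attaches a Date under tsFieldName, falling back to _ts. It hard-codes the eventTime field rather than evaluating the timeField expression, so treat it as an illustration only.

```javascript
// Document array source. The readings and eventTime values are made-up sample data.
const arraySource = {
  $source: {
    // Derive each document's authoritative timestamp from its eventTime string.
    timeField: { $dateFromString: { dateString: "$eventTime" } },
    documents: [
      { device: "sensor-1", reading: 21.5, eventTime: "2024-01-01T00:00:00Z" },
      { device: "sensor-2", reading: 19.0, eventTime: "2024-01-01T00:00:05Z" }
    ]
  }
};

// Simplified local model, not Atlas Stream Processing itself: copy each document
// and attach a timestamp parsed from its eventTime field under tsFieldName,
// falling back to "_ts". It hard-codes eventTime instead of evaluating timeField.
function withTimestamps(source) {
  const tsField = source.tsFieldName ?? "_ts";
  return source.documents.map((doc) => ({ ...doc, [tsField]: new Date(doc.eventTime) }));
}

const stamped = withTimestamps(arraySource.$source);
console.log(stamped[1]._ts.toISOString()); // 2024-01-01T00:00:05.000Z
```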
Behavior
$source must be the first stage of any pipeline it appears in. You can use only one $source stage per pipeline.
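The two placement rules above are easy to check before submitting a pipeline. The validateSourcePlacement function below is a hypothetical client-side sketch, not an Atlas Stream Processing API. It reports a pipeline with no $source stage as a separate status, because this page does not say whether such a pipeline is valid.

```javascript
// Hypothetical check for the two rules above: $source must be the first stage,
// and a pipeline may contain at most one $source stage.
function validateSourcePlacement(pipeline) {
  const positions = pipeline
    .map((stage, index) => ("$source" in stage ? index : -1))
    .filter((index) => index !== -1);
  if (positions.length === 0) return "no $source stage";
  if (positions.length > 1) return "more than one $source stage";
  if (positions[0] !== 0) return "$source is not the first stage";
  return "ok";
}

// "kafkaProd" and "orders" are placeholder names.
const pipeline = [
  { $source: { connectionName: "kafkaProd", topic: "orders" } },
  { $match: { qty: { $gt: 0 } } }
];
console.log(validateSourcePlacement(pipeline)); // ok
```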