$lookup
Definition
The $lookup stage performs a left outer join of the stream of messages from your $source to an Atlas collection in your Connection Registry.
Depending on your use case, a $lookup pipeline stage uses one of three syntaxes. To learn more, see $lookup Syntax.
Warning
Using $lookup to enrich a stream can reduce stream processing speed.
The following prototype form illustrates all available fields:
{
  "$lookup": {
    "from": {
      "connectionName": "<registered-atlas-connection>",
      "db": "<registered-database-name>",
      "coll": "<atlas-collection-name>"
    },
    "localField": "<field-in-source-messages>",
    "foreignField": "<field-in-from-collection>",
    "let": {
      <var_1>: <expression>,
      <var_2>: <expression>,
      …,
      <var_n>: <expression>
    },
    "pipeline": [ <pipeline to run> ],
    "as": "<output-array-field>"
  }
}
Syntax
The $lookup stage takes a document with the following fields:
Field | Type | Necessity | Description |
---|---|---|---|
from | document | Required | Document that specifies a collection in an Atlas database to join to messages from your $source. You must specify a collection from your Connection Registry. You must specify a value for all fields in this document. |
from.connectionName | string | Required | Name of the connection in your Connection Registry. |
from.db | string | Required | Name of the Atlas database that contains the collection you want to join. |
from.coll | string | Required | Name of the collection you want to join. |
localField | string | Conditional | Field from your $source messages to join on. This field is used by only some of the $lookup syntaxes. |
foreignField | string | Conditional | Field from documents in the from collection to join on. This field is used by only some of the $lookup syntaxes. |
let | document | Conditional | Specifies the variables to use in the pipeline stages. This field is used by only some of the $lookup syntaxes. |
pipeline | document | Conditional | Specifies the pipeline to run on the joined collection. This field is used by only some of the $lookup syntaxes. |
as | string | Required | Name of the new array field to add to the input documents. This new array field contains the matching documents from the from collection. If the specified name already exists as a field in the input document, that field is overwritten. |
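As an illustration of the fields above, the following sketch builds two $lookup stages as plain JavaScript objects: one that joins with localField and foreignField, and one that joins with let and pipeline. The connection name atlasConn, the database inventory, the collection devices, and the field names deviceId, status, and deviceInfo are hypothetical placeholders, not values from this page.

```javascript
// Hypothetical connection, database, and collection names; replace them
// with entries from your own Connection Registry.
const from = { connectionName: "atlasConn", db: "inventory", coll: "devices" };

// Equality join: match each message's deviceId against _id in the collection.
const lookupByField = {
  $lookup: {
    from,
    localField: "deviceId",
    foreignField: "_id",
    as: "deviceInfo",
  },
};

// Pipeline join: bind a variable with let, then reference it as $$id
// inside the pipeline that runs on the joined collection.
const lookupByPipeline = {
  $lookup: {
    from,
    let: { id: "$deviceId" },
    pipeline: [
      { $match: { $expr: { $eq: ["$_id", "$$id"] } } },
      { $project: { _id: 0, status: 1 } },
    ],
    as: "deviceInfo",
  },
};

console.log(JSON.stringify(lookupByField, null, 2));
console.log(JSON.stringify(lookupByPipeline, null, 2));
```

Either object could be placed in a stream processor pipeline after the $source stage, assuming the named connection exists in your Connection Registry.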
Behavior
The Atlas Stream Processing version of $lookup performs a left outer join of messages from your $source and the documents in a specified Atlas collection. This version behaves similarly to the $lookup stage available in a standard MongoDB database. However, this version requires that you specify an Atlas collection from your Connection Registry as the value for the from field.
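To make the left outer join semantics concrete, here is a minimal in-memory sketch in plain JavaScript. It is a simplified model, not the Atlas Stream Processing implementation; the function leftOuterLookup and the sample data are hypothetical. Every message is kept, and a message with no match receives an empty array in the as field.

```javascript
// Simplified in-memory model of a left outer join; not the Atlas implementation.
function leftOuterLookup(messages, foreignDocs, { localField, foreignField, as }) {
  return messages.map((msg) => ({
    ...msg,
    // Every message is kept. Unmatched messages get an empty array, and an
    // existing field with the same name as `as` is overwritten.
    [as]: foreignDocs.filter((doc) => doc[foreignField] === msg[localField]),
  }));
}

// Hypothetical sample data.
const messages = [
  { deviceId: "a1", temp: 21 },
  { deviceId: "zz", temp: 19 },
];
const devices = [{ _id: "a1", site: "north" }];

const joined = leftOuterLookup(messages, devices, {
  localField: "deviceId",
  foreignField: "_id",
  as: "deviceInfo",
});

console.log(JSON.stringify(joined, null, 2));
```

In this sketch the message with deviceId "zz" still appears in the output, with deviceInfo set to an empty array.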
The pipeline can contain a nested $lookup stage. If you include a nested $lookup stage in your pipeline, you must use the standard from syntax to specify a collection in the same remote Atlas connection as the outer $lookup stage.
Example
$lookup: {
  from: { connectionName: "dbsrv1", db: "db1", coll: "coll1" },
  …,
  pipeline: [
    …,
    { $lookup: { from: "coll2", … } },
    …
  ]
}
If your pipeline has both $lookup and $merge on the same collection, Atlas Stream Processing results might vary if you try to maintain an incremental view. Atlas Stream Processing processes multiple source messages simultaneously and then merges them all together. If multiple messages have the same ID, which both $lookup and $merge use, Atlas Stream Processing might return results that haven't yet materialized.
Example
Consider the following input stream:
{ _id: 1, count: 2 }
{ _id: 1, count: 3 }
Suppose your query contains the following inside the pipeline:
{
  ...,
  pipeline: [
    { $lookup on _id == foreignDoc._id from collection A }
    { $project: { _id: 1, count: $count + $foreignDoc.count } }
    { $merge: { into collection A } }
  ]
}
If you are trying to maintain an incremental view, you might expect a result similar to the following:
{ _id: 1, count: 5 }
However, Atlas Stream Processing might return a count of 5 or 3, depending on whether it has already merged the result for the first document when it processes the second.
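The difference between the two outcomes can be sketched with a small in-memory model in plain JavaScript. This is a simplified illustration, not the Atlas Stream Processing implementation. It assumes that collection A starts empty and that a message with no matching document contributes a stored count of 0. The function names are hypothetical.

```javascript
// Simplified model of the example above; not the Atlas Stream Processing
// implementation. Collection A is modeled as a Map keyed by _id.

// Models $lookup + $project: add the stored count to the message count.
// Assumption: a missing foreign document counts as 0.
function enrich(msg, collection) {
  const foreignDoc = collection.get(msg._id);
  return { _id: msg._id, count: msg.count + (foreignDoc ? foreignDoc.count : 0) };
}

// Each message is looked up and merged before the next one starts.
function processSequentially(messages) {
  const collection = new Map();
  for (const msg of messages) {
    const out = enrich(msg, collection);
    collection.set(out._id, out); // models $merge
  }
  return collection;
}

// All lookups in the batch run before any merge is applied.
function processAsBatch(messages) {
  const collection = new Map();
  const outputs = messages.map((msg) => enrich(msg, collection));
  for (const out of outputs) {
    collection.set(out._id, out); // models $merge; the last write wins
  }
  return collection;
}

const input = [
  { _id: 1, count: 2 },
  { _id: 1, count: 3 },
];

const sequentialCount = processSequentially(input).get(1).count;
const batchCount = processAsBatch(input).get(1).count;

console.log(sequentialCount); // 5
console.log(batchCount); // 3
```

In the batch case both lookups read the collection before either result is merged, so the second message never sees the first message's count.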
For more information, see $lookup.