
$validate


The $validate stage checks streaming documents for conformity to a schema of expected ranges, values, or datatypes.

$validate

A $validate pipeline stage has the following prototype form:

{
  "$validate": {
    "validator": { <filter> },
    "validationAction": "discard" | "dlq"
  }
}

The $validate stage takes a document with the following fields:

Field: validator
Type: document
Necessity: Required
Description: Document of expressions used to validate incoming messages against a user-defined schema. You can use all but the following query operators to define validation expressions:

  • $near
  • $nearSphere
  • $text
  • $where

Field: validationAction
Type: string
Necessity: Optional
Description: Specifies the action to take when a message violates the user-defined schema. You can specify one of the following values:

  • discard: Discards the message. If you do not specify a value for validationAction, this is the default behavior.
  • dlq: Logs the violation to the collection defined in your Stream Processor configuration and performs a best-effort discard without transactional guarantees.
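The field rules above can be sketched as a small client-side check that runs before a stage is submitted. This is a minimal illustration, not part of Atlas Stream Processing: the helper names `findUnsupportedOperators` and `buildValidateStage` are hypothetical, and the check only looks for the four excluded operator names and the two documented validationAction values.

```javascript
// Operators that the validator document may not contain, per the list above.
const UNSUPPORTED_OPERATORS = new Set(["$near", "$nearSphere", "$text", "$where"]);

// Hypothetical helper: recursively collects unsupported operator keys
// found anywhere inside a validator document.
function findUnsupportedOperators(node, found = []) {
  if (Array.isArray(node)) {
    for (const item of node) findUnsupportedOperators(item, found);
  } else if (node !== null && typeof node === "object") {
    for (const [key, value] of Object.entries(node)) {
      if (UNSUPPORTED_OPERATORS.has(key)) found.push(key);
      findUnsupportedOperators(value, found);
    }
  }
  return found;
}

// Hypothetical helper: builds a $validate stage document.
// validationAction falls back to "discard", mirroring the documented default.
function buildValidateStage(validator, validationAction = "discard") {
  if (!["discard", "dlq"].includes(validationAction)) {
    throw new Error(`Unsupported validationAction: ${validationAction}`);
  }
  const bad = findUnsupportedOperators(validator);
  if (bad.length > 0) {
    throw new Error(`Unsupported operators in validator: ${bad.join(", ")}`);
  }
  return { $validate: { validator, validationAction } };
}
```

Calling `buildValidateStage({ lap: { $gte: 1 } })` returns a stage that uses the discard action, while passing a validator that contains `$where` throws before the stage ever reaches the stream processor.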

You can use $validate at any point in a pipeline after the $source stage, and before the $emit or $merge stage.
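As an illustration of this placement rule, the sketch below defines a three-stage pipeline and a check that every $validate stage sits after $source and before $emit or $merge. The connection names, topic, database, and collection are placeholders, and `validatePlacementOk` is a hypothetical helper written for this example only.

```javascript
// Placeholder connection, topic, database, and collection names.
const pipeline = [
  { $source: { connectionName: "kafkaProd", topic: "race_events" } },
  { $validate: { validator: { lap: { $gte: 1 } }, validationAction: "discard" } },
  { $merge: { into: { connectionName: "atlasCluster", db: "racing", coll: "laps" } } },
];

// Hypothetical helper: returns true when every $validate stage comes
// after the $source stage and before the first $emit or $merge stage.
function validatePlacementOk(stages) {
  const names = stages.map((stage) => Object.keys(stage)[0]);
  const sourceIdx = names.indexOf("$source");
  const sinkIdx = names.findIndex((name) => name === "$emit" || name === "$merge");
  return names.every((name, i) => {
    if (name !== "$validate") return true;
    const afterSource = sourceIdx !== -1 && i > sourceIdx;
    const beforeSink = sinkIdx === -1 || i < sinkIdx;
    return afterSource && beforeSink;
  });
}
```

Here `validatePlacementOk(pipeline)` evaluates to true. Moving the $validate stage ahead of $source, or behind $merge, makes the check return false.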

If you specify either discard or dlq for the validationAction field, Atlas Stream Processing logs messages that fail validation in the following format:

{
  "t": <datetime>,
  "s": "<severity-level>",
  "c": "streams-<job-name>",
  "ctx": "<processed-pipeline>",
  "msg": "<message-body>",
  "attrs": {
    <result-of-logAttributes-evaluation>
  },
  "tags": <array-of-strings>,
  "truncated": {
    <truncation-description>
  },
  "size": <size-of-entry>
}

The following table describes the log entry fields:

Field: attrs
Type: document
Description: Document containing the results of evaluating the logAttributes field in the $validate definition. The result is a list of fields.

Field: c
Type: string
Description: Name of the specific stream processing job in which the failure occurred.

Field: ctx
Type: string
Description: Name of the streaming data pipeline being processed.

Field: msg
Type: string
Description: Body of the message that failed validation.

Atlas Stream Processing supports only JSON Schema Draft 4 or earlier.

The following document shows an example validator expression using $and to perform a logical AND operation:

{
  $validate: {
    validator: {
      $and: [
        {
          $expr: {
            $ne: [ "$Racer_Name", "Pace Car" ]
          }
        },
        {
          $jsonSchema: {
            required: [ "Racer_Num", "Racer_Name", "lap", "Corner_Num", "timestamp" ],
            properties: {
              Racer_Num: {
                bsonType: "int",
                description: "'Racer_Num' is the integer number of the race car and is required"
              },
              Racer_Name: {
                bsonType: "string",
                description: "'Racer_Name' must be a string and is required"
              },
              lap: {
                bsonType: "int",
                minimum: 1,
                description: "'lap' must be an int and is required"
              },
              Corner_Num: {
                bsonType: "int",
                minimum: 1,
                maximum: 4,
                description: "'Corner_Num' must be an int between 1 and 4 and is required"
              },
              timestamp: {
                bsonType: "string",
                pattern: "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}$",
                description: "'timestamp' must be a string matching iso date pattern and is required"
              }
            }
          }
        }
      ]
    },
    validationAction: "dlq"
  }
}
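To make the effect of this validator concrete, the following sketch re-implements its rules as a plain JavaScript predicate that can be run locally against sample documents. It is an approximation under stated assumptions, not how Atlas Stream Processing evaluates the stage: `passesExampleValidator` is a hypothetical function, and `Number.isInteger` only stands in for the BSON "int" type check.

```javascript
// Same pattern as the `timestamp` property in the $jsonSchema above.
const TIMESTAMP_PATTERN = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}$/;

const REQUIRED_FIELDS = ["Racer_Num", "Racer_Name", "lap", "Corner_Num", "timestamp"];

// Hypothetical local approximation of the example validator.
// Assumption: Number.isInteger stands in for bsonType "int".
function passesExampleValidator(doc) {
  const has = (field) => Object.prototype.hasOwnProperty.call(doc, field);
  // $jsonSchema `required`: every listed field must be present.
  if (!REQUIRED_FIELDS.every(has)) return false;
  // $expr / $ne: reject messages emitted by the pace car.
  if (doc.Racer_Name === "Pace Car") return false;
  // Type and range checks from `properties`.
  if (!Number.isInteger(doc.Racer_Num)) return false;
  if (typeof doc.Racer_Name !== "string") return false;
  if (!Number.isInteger(doc.lap) || doc.lap < 1) return false;
  if (!Number.isInteger(doc.Corner_Num) || doc.Corner_Num < 1 || doc.Corner_Num > 4) return false;
  // `timestamp` must be a string matching the pattern.
  return typeof doc.timestamp === "string" && TIMESTAMP_PATTERN.test(doc.timestamp);
}

// Illustrative sample document; the values are made up for this sketch.
const sample = {
  Racer_Num: 7,
  Racer_Name: "Ada",
  lap: 3,
  Corner_Num: 2,
  timestamp: "2024-05-01T12:30:45.123456",
};
```

With these rules, `sample` passes, while the same document with Racer_Name set to "Pace Car", Corner_Num set to 5, or a timestamp that lacks the six fractional-second digits fails. Because the stage sets validationAction to dlq, such failing messages would be logged to the configured dead letter queue rather than silently dropped.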
