MongoDB Atlas Streams
Build, operate, and debug Atlas Stream Processing (ASP) pipelines using four MCP tools from the MongoDB MCP Server.
Prerequisites
This skill requires the MongoDB MCP Server connected with:
- Atlas API credentials (apiClientId and apiClientSecret)
The 4 tools: atlas-streams-discover, atlas-streams-build, atlas-streams-manage, atlas-streams-teardown.
All operations require an Atlas project ID. If unknown, call atlas-list-projects first to find your project ID.
If MCP tools are unavailable
If the MongoDB MCP Server is not connected or the streams tools are missing, see references/mcp-troubleshooting.md for diagnostic steps and fallback options.
Tool Selection Matrix
atlas-streams-discover — ALL read operations
| Action | Use when |
|---|---|
| list-workspaces | See all workspaces in a project |
| inspect-workspace | Review workspace config, state, region |
| list-connections | See all connections in a workspace |
| inspect-connection | Check connection state, config, health |
| list-processors | See all processors in a workspace |
| inspect-processor | Check processor state, pipeline, config |
| diagnose-processor | Full health report: state, stats, errors |
| get-networking | PrivateLink and VPC peering details. Optional: cloudProvider + region to get Atlas account details for PrivateLink setup |
Pagination (all list actions): limit (1-100, default 20), pageNum (default 1).
Response format: responseFormat — "concise" (default for list actions) or "detailed" (default for inspect/diagnose).
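As a rough sketch of the pagination and response-format parameters above, the helper below assembles an argument object for a list action. The function name build_list_args and the "action" key are assumptions made for illustration, as is the lower bound on pageNum; only limit, pageNum, and responseFormat and their defaults come from the text, and the actual tool may validate differently.

```python
def build_list_args(project_id, action, limit=20, page_num=1, response_format="concise"):
    """Assemble arguments for a list action (hypothetical helper, not part of the MCP server)."""
    if not 1 <= limit <= 100:
        raise ValueError("limit must be between 1 and 100")
    if page_num < 1:  # assumed lower bound; the text only states the default
        raise ValueError("pageNum must be at least 1")
    if response_format not in ("concise", "detailed"):
        raise ValueError("responseFormat must be 'concise' or 'detailed'")
    return {
        "projectId": project_id,
        "action": action,  # assumed parameter name
        "limit": limit,
        "pageNum": page_num,
        "responseFormat": response_format,
    }


# Placeholder project ID; list-workspaces needs no workspace name.
args = build_list_args("<project-id>", "list-workspaces", limit=50)
print(args)
```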
atlas-streams-build — ALL create operations
| Resource | Key parameters |
|---|---|
| workspace | cloudProvider, region, tier (default SP10), includeSampleData |
| connection | connectionName, connectionType (Kafka/Cluster/S3/Https/Kinesis/Lambda/SchemaRegistry/Sample), connectionConfig |
| processor | processorName, pipeline (must start with $source, end with $merge/$emit), dlq, autoStart |
| privatelink | privateLinkConfig (project-level, not tied to a specific workspace) |
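The processor row states a shape rule for pipelines: first stage $source, last stage $merge or $emit. A minimal sketch of that check follows. The function name check_pipeline_shape is hypothetical, and the connection, topic, database, and collection names are placeholders. It encodes only the rule in the table, not the full validation the service performs.

```python
def check_pipeline_shape(pipeline):
    """Return a list of shape problems; an empty list means the shape rule is met."""
    if not pipeline:
        return ["pipeline is empty"]
    problems = []
    # Each stage is a single-key dict; the key is the stage operator.
    first_op = next(iter(pipeline[0]), None)
    last_op = next(iter(pipeline[-1]), None)
    if first_op != "$source":
        problems.append("first stage must be $source")
    if last_op not in ("$merge", "$emit"):
        problems.append("last stage must be $merge or $emit")
    return problems


# Placeholder names throughout; substitute your own connections.
pipeline = [
    {"$source": {"connectionName": "kafka-conn", "topic": "events"}},
    {"$match": {"status": "active"}},
    {"$merge": {"into": {"connectionName": "atlas-conn", "db": "demo", "coll": "events"}}},
]
print(check_pipeline_shape(pipeline))
```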
Field mapping — only fill fields for the selected resource type:
- resource = "workspace": Fill: projectId, workspaceName, cloudProvider, region, tier, includeSampleData. Leave empty: all connection and processor fields.
- resource = "connection": Fill: projectId, workspaceName, connectionName, connectionType, connectionConfig. Leave empty: all workspace and processor fields. (See references/connection-configs.md for type-specific schemas.)
- resource = "processor": Fill: projectId, workspaceName, processorName, pipeline, dlq (recommended), autoStart (optional). Leave empty: all workspace and connection fields. (See references/pipeline-patterns.md for pipeline examples.)
- resource = "privatelink": Fill: projectId, privateLinkConfig. Note: PrivateLink is project-level, not workspace-level. workspaceName is not required; omit it. Leave empty: all connection and processor fields.
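One way to apply the field mapping above is to filter a set of candidate values down to the fields that belong to the selected resource. The sketch below assumes a hypothetical helper named build_args. The field lists are copied from the mapping, while the empty privateLinkConfig is only a stand-in for a real config.

```python
# Fields to fill per resource type, copied from the field mapping above.
BUILD_FIELDS = {
    "workspace": ["projectId", "workspaceName", "cloudProvider", "region", "tier", "includeSampleData"],
    "connection": ["projectId", "workspaceName", "connectionName", "connectionType", "connectionConfig"],
    "processor": ["projectId", "workspaceName", "processorName", "pipeline", "dlq", "autoStart"],
    "privatelink": ["projectId", "privateLinkConfig"],
}


def build_args(resource, **candidates):
    """Keep only the fields that apply to the selected resource (hypothetical helper)."""
    args = {"resource": resource}
    for name in BUILD_FIELDS[resource]:
        if candidates.get(name) is not None:
            args[name] = candidates[name]
    return args


# workspaceName is dropped because PrivateLink is project-level.
args = build_args(
    "privatelink",
    projectId="<project-id>",
    workspaceName="not-used-for-privatelink",
    privateLinkConfig={},  # stand-in; the real schema is not shown here
)
print(args)
```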
atlas-streams-manage — ALL update/state operations
| Action | Notes |
|---|---|
| start-processor | Begins billing. Optional tier override, resumeFromCheckpoint |
| stop-processor | Stops billing. Retains state for 45 days |
| modify-processor | Processor must be stopped first. Change pipeline, DLQ, or name |
| update-workspace | Change tier or region |
| update-connection | Update config (networking is immutable; delete and recreate to change it) |
| accept-peering / reject-peering | VPC peering management |
Field mapping: always fill projectId and workspaceName, then add the following by action:
- "start-processor" → resourceName. Optional: tier, resumeFromCheckpoint, startAtOperationTime (ISO 8601 timestamp to resume from a specific point)
- "stop-processor" → resourceName
- "modify-processor" → resourceName. At least one of: pipeline, dlq, newName
- "update-workspace" → newRegion or newTier
- "update-connection" → resourceName, connectionConfig. Exception: networking config (e.g., PrivateLink) cannot be modified after creation; delete and recreate.
- "accept-peering" → peeringId, requesterAccountId, requesterVpcId
- "reject-peering" → peeringId
State pre-checks:
- start-processor → errors if processor is already STARTED
- stop-processor → no-ops if already STOPPED or CREATED (not an error)
- modify-processor → errors if processor is STARTED (must stop first)
Processor states: CREATED → STARTED (via start) → STOPPED (via stop). Can also enter FAILED on runtime errors. Modify requires STOPPED or CREATED state.
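The manage field mapping and the state pre-checks can be combined into a single pre-flight check before a call is sent. This is a sketch under stated assumptions: check_manage_call is a hypothetical helper, the processor state is assumed to have been read beforehand (for example via inspect-processor), and the messages are illustrative rather than the tool's actual error text.

```python
# Required fields per action, on top of projectId and workspaceName.
REQUIRED = {
    "start-processor": ["resourceName"],
    "stop-processor": ["resourceName"],
    "modify-processor": ["resourceName"],
    "update-connection": ["resourceName", "connectionConfig"],
    "accept-peering": ["peeringId", "requesterAccountId", "requesterVpcId"],
    "reject-peering": ["peeringId"],
}
# Actions where at least one of the listed fields must be present.
ANY_OF = {
    "modify-processor": ["pipeline", "dlq", "newName"],
    "update-workspace": ["newRegion", "newTier"],
}


def check_manage_call(action, args, processor_state=None):
    """Return a list of problems; an empty list means the call looks safe to send."""
    problems = []
    for name in ["projectId", "workspaceName"] + REQUIRED.get(action, []):
        if name not in args:
            problems.append(f"missing {name}")
    any_of = ANY_OF.get(action)
    if any_of and not any(name in args for name in any_of):
        problems.append("need at least one of: " + ", ".join(any_of))
    # State pre-checks: stop-processor on STOPPED/CREATED is a no-op, so it is not flagged.
    if action == "start-processor" and processor_state == "STARTED":
        problems.append("processor is already STARTED")
    if action == "modify-processor" and processor_state == "STARTED":
        problems.append("stop the processor before modifying it")
    return problems


base = {"projectId": "<project-id>", "workspaceName": "<workspace>", "resourceName": "<processor>"}
print(check_manage_call("modify-processor", base, processor_state="STARTED"))
```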
Teardown safety checks:
- Processor deletion → auto-stops before deleting (no need to stop manually first)
- Connection deletion → blocks if any running processor references it. Stop/delete referencing processors first.
- Workspace deletion → Inspect the workspace and get user confirmation first; see the workspace deletion workflow in the atlas-streams-teardown section.
atlas-streams-teardown — ALL delete operations
| Resource | Safety behavior |
|---|---|
| processor | Auto-stops before deleting |
| connection | Blocks if referenced by running processor |
| workspace | Cascading delete of all connections and processors |
| privatelink / peering | Remove networking resources |
Field mapping — always fill projectId, resource, then:
- resource: "workspace" → workspaceName
- resource: "connection" or "processor" → workspaceName, resourceName
- resource: "privatelink" or "peering" → resourceName (the ID). These are project-level resources, not tied to a specific workspace.
Before deleting a workspace, inspect it first:
- atlas-streams-discover → inspect-workspace: get connection/processor counts
- Present to user: "Workspace X contains N connections and M processors. Deleting permanently removes all. Proceed?"
- Wait for confirmation before calling atlas-streams-teardown
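The confirmation step above can be sketched as two small helpers: one formats the prompt from the counts returned by inspect-workspace, and one decides whether a reply counts as confirmation. Both function names are hypothetical, and treating only an explicit "yes" or "y" as approval is an assumed policy, not something the tool enforces.

```python
def deletion_prompt(workspace_name, connection_count, processor_count):
    """Format the confirmation message shown before a workspace is deleted."""
    return (
        f"Workspace {workspace_name} contains {connection_count} connections and "
        f"{processor_count} processors. Deleting permanently removes all. Proceed?"
    )


def confirmed(reply):
    """Treat only an explicit yes as confirmation (assumed policy)."""
    return reply.strip().lower() in ("y", "yes")


# Counts would come from inspect-workspace; the values here are placeholders.
print(deletion_prompt("demo", 2, 3))
if confirmed("yes"):
    print("safe to call atlas-streams-teardown")
```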
CRITICAL: Validate Before Creating Processors
You MUST call search-knowledge before composing any processor pipeline. This is not optional.
- Field validation: Query with the sink/source type, e.g. "Atlas Stream Processing $emit S3 fields" or "Atlas Stream Processing Kafka $source configuration". This catches errors like prefix vs path for S3 $emit.
- Pattern examples: Query with dataSources: [{"name": "devcenter"}] for working pipelines, e.g. "Atlas Stream Processing tumbling window example".
Also fetch examples from the official ASP examples repo when building non-trivial processors: https://github.com/mongodb/ASP_example (quickstarts, example processors, Terraform examples). Start with example_processors/README.md for the full pattern catalog.
Key quickstarts:
| Quickstart | Pattern |
|---|---|
| 00_hello_world.json | Inline $source.documents with $match (zero infra, ephemeral) |
| 01_changestream_basic.json | Change stream → tumbling window → $merge to Atlas |
| 03_kafka_to_mongo.json | Kafka source → tumbling window rollup → $merge to Atlas |
| 04_mongo_to_mongo.json | Chained processors: rollup → archive to separate collection |
| 05_kafka_tail.json | Real-time Kafka topic monitoring (sinkless, like tail -f) |
Pipeline Rules & Warnings
Invalid constructs — these are NOT valid in streaming pipelines:
- $$NOW, $$ROOT, $$CURRENT: NOT available in stream processing. NEVER use these. Use the document's own timestamp field or _stream_meta metadata for event time instead of $$NOW.
- HTTPS connections as $source: HTTPS is for $https enrichment or sink only, NOT as a data source
- Kafka $source without topic: topic field is required
- Pipelines without a sink: terminal stage ($merge,