Claude-codex-settings atlas-stream-processing
Manages MongoDB Atlas Stream Processing (ASP) workflows. Handles workspace provisioning, data source/sink connections, processor lifecycle operations, debugging diagnostics, and tier sizing. Supports Kafka, Atlas clusters, S3, HTTPS, and Lambda integrations for streaming data workloads and event processing. NOT for general MongoDB queries or Atlas cluster management. Requires MongoDB MCP Server with Atlas API credentials.
git clone https://github.com/fcakyon/claude-codex-settings
T=$(mktemp -d) && git clone --depth=1 https://github.com/fcakyon/claude-codex-settings "$T" && mkdir -p ~/.claude/skills && cp -r "$T/plugins/mongodb-skills/skills/atlas-stream-processing" ~/.claude/skills/fcakyon-claude-codex-settings-atlas-stream-processing && rm -rf "$T"
plugins/mongodb-skills/skills/atlas-stream-processing/SKILL.md

MongoDB Atlas Streams
Build, operate, and debug Atlas Stream Processing (ASP) pipelines using four MCP tools from the MongoDB MCP Server.
Prerequisites
This skill requires the MongoDB MCP Server connected with:
- Atlas API credentials (`apiClientId` and `apiClientSecret`)
The 4 tools: `atlas-streams-discover`, `atlas-streams-build`, `atlas-streams-manage`, `atlas-streams-teardown`.
All operations require an Atlas project ID. If unknown, call `atlas-list-projects` first to find your project ID.
If MCP tools are unavailable
If the MongoDB MCP Server is not connected or the streams tools are missing, see references/mcp-troubleshooting.md for diagnostic steps and fallback options.
Tool Selection Matrix
atlas-streams-discover — ALL read operations
| Action | Use when |
|---|---|
| `list-workspaces` | See all workspaces in a project |
| `inspect-workspace` | Review workspace config, state, region |
| `list-connections` | See all connections in a workspace |
| `inspect-connection` | Check connection state, config, health |
| `list-processors` | See all processors in a workspace |
| `inspect-processor` | Check processor state, pipeline, config |
| `diagnose-processor` | Full health report: state, stats, errors |
| (networking inspect) | PrivateLink and VPC peering details; can optionally also return Atlas account details for PrivateLink setup |
Pagination (all list actions): `limit` (1-100, default 20), `pageNum` (default 1).
Response format: `responseFormat` — "concise" (default for list actions) or "detailed" (default for inspect/diagnose).
atlas-streams-build — ALL create operations
| Resource | Key parameters |
|---|---|
| workspace | `workspaceName`, `cloudProvider`, `region`, `tier` (default SP10), `includeSampleData` |
| connection | `connectionName`, `connectionType` (Kafka/Cluster/S3/Https/Kinesis/Lambda/SchemaRegistry/Sample), `connectionConfig` |
| processor | `processorName`, `pipeline` (must start with `$source`, end with a sink), `dlq`, `autoStart` |
| privatelink | `privateLinkConfig` (project-level, not tied to a specific workspace) |
Field mapping — only fill fields for the selected resource type:
- resource = "workspace": Fill: `projectId`, `workspaceName`, `cloudProvider`, `region`, `tier`, `includeSampleData`. Leave empty: all connection and processor fields.
- resource = "connection": Fill: `projectId`, `workspaceName`, `connectionName`, `connectionType`, `connectionConfig`. Leave empty: all workspace and processor fields. (See references/connection-configs.md for type-specific schemas.)
- resource = "processor": Fill: `projectId`, `workspaceName`, `processorName`, `pipeline`, `dlq` (recommended), `autoStart` (optional). Leave empty: all workspace and connection fields. (See references/pipeline-patterns.md for pipeline examples.)
- resource = "privatelink": Fill: `projectId`, `privateLinkConfig`. Note: PrivateLink is project-level, not workspace-level — `workspaceName` is not required; omit it. Leave empty: all connection and processor fields.
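As an illustrative sketch of the processor field mapping (the project ID, workspace, connection, and collection names below are hypothetical, not from this skill), a `resource: "processor"` build call might carry arguments shaped like this:

```javascript
// Hypothetical argument object for atlas-streams-build with resource: "processor".
// All names (solarWorkspace, kafkaProd, atlasCluster, ...) are illustrative.
const buildArgs = {
  resource: "processor",
  projectId: "64f1c0ffee0123456789abcd", // from atlas-list-projects
  workspaceName: "solarWorkspace",
  processorName: "kafkaToAtlas",
  pipeline: [
    { $source: { connectionName: "kafkaProd", topic: "device-events" } },
    { $merge: { into: { connectionName: "atlasCluster", db: "iot", coll: "events" } } },
  ],
  dlq: { connectionName: "atlasCluster", db: "iot", coll: "dlq" }, // recommended
  autoStart: false,                                                // optional
  // Workspace- and connection-only fields (cloudProvider, region, tier,
  // connectionType, connectionConfig, ...) are deliberately left unset.
};
```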
atlas-streams-manage — ALL update/state operations
| Action | Notes |
|---|---|
| `start-processor` | Begins billing. Optional `tier` override, `resumeFromCheckpoint` |
| `stop-processor` | Stops billing. Retains state 45 days |
| `modify-processor` | Processor must be stopped first. Change pipeline, DLQ, or name |
| `update-workspace` | Change tier or region |
| `update-connection` | Update config (networking is immutable — must delete and recreate) |
| `accept-peering` / `reject-peering` | VPC peering management |
Field mapping — always fill `projectId`, `workspaceName`, then by action:
- "start-processor" → `resourceName`. Optional: `tier`, `resumeFromCheckpoint`, `startAtOperationTime` (ISO 8601 timestamp to resume from a specific point)
- "stop-processor" → `resourceName`
- "modify-processor" → `resourceName`. At least one of: `pipeline`, `dlq`, `newName`
- "update-workspace" → `newTier` or `newRegion`
- "update-connection" → `resourceName`, `connectionConfig`. Exception: networking config (e.g., PrivateLink) cannot be modified after creation — delete and recreate.
- "accept-peering" → `peeringId`, `requesterAccountId`, `requesterVpcId`
- "reject-peering" → `peeringId`
State pre-checks:
- `start-processor` → errors if processor is already STARTED
- `stop-processor` → no-ops if already STOPPED or CREATED (not an error)
- `modify-processor` → errors if processor is STARTED (must stop first)
Processor states:
CREATED → STARTED (via start) → STOPPED (via stop). Can also enter FAILED on runtime errors. Modify requires STOPPED or CREATED state.
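The state rules above can be sketched as a small guard function (illustrative only — the MCP tools enforce these checks server-side):

```javascript
// Illustrative pre-check mirroring the processor state rules above;
// not part of the MCP tools themselves.
const MODIFIABLE = new Set(["CREATED", "STOPPED"]);

function precheck(action, state) {
  if (action === "start-processor" && state === "STARTED")
    return "error: already STARTED";
  if (action === "stop-processor" && (state === "STOPPED" || state === "CREATED"))
    return "no-op"; // explicitly not an error
  if (action === "modify-processor" && !MODIFIABLE.has(state))
    return "error: stop the processor first";
  return "ok";
}
```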
Teardown safety checks:
- Processor deletion → auto-stops before deleting (no need to stop manually first)
- Connection deletion → blocks if any running processor references it. Stop/delete referencing processors first.
- Workspace deletion → see the detailed workflow below.
atlas-streams-teardown — ALL delete operations
| Resource | Safety behavior |
|---|---|
| processor | Auto-stops before deleting |
| connection | Blocks if referenced by running processor |
| workspace | Cascading delete of all connections and processors |
| privatelink / peering | Remove networking resources |
Field mapping — always fill `projectId`, `resource`, then:
- resource: "workspace" → `workspaceName`
- resource: "connection" or "processor" → `workspaceName`, `resourceName`
- resource: "privatelink" or "peering" → `resourceName` (the ID). These are project-level resources, not tied to a specific workspace.
Before deleting a workspace, inspect it first:
- `atlas-streams-discover` → `inspect-workspace` — get connection/processor counts
- Present to user: "Workspace X contains N connections and M processors. Deleting permanently removes all. Proceed?"
- Wait for confirmation before calling `atlas-streams-teardown`
CRITICAL: Validate Before Creating Processors
You MUST call `search-knowledge` before composing any processor pipeline. This is not optional.
- Field validation: Query with the sink/source type, e.g. "Atlas Stream Processing $emit S3 fields" or "Atlas Stream Processing Kafka $source configuration". This catches errors like `prefix` vs `path` for S3 `$emit`.
- Pattern examples: Query with `dataSources: [{"name": "devcenter"}]` for working pipelines, e.g. "Atlas Stream Processing tumbling window example".
Also fetch examples from the official ASP examples repo when building non-trivial processors: https://github.com/mongodb/ASP_example (quickstarts, example processors, Terraform examples). Start with `example_processors/README.md` for the full pattern catalog.
Key quickstarts:
| Quickstart | Pattern |
|---|---|
| (see repo) | Inline with `sp.process()` (zero infra, ephemeral) |
| (see repo) | Change stream → tumbling window → `$merge` to Atlas |
| (see repo) | Kafka source → tumbling window rollup → `$merge` to Atlas |
| (see repo) | Chained processors: rollup → archive to separate collection |
| (see repo) | Real-time Kafka topic monitoring (sinkless, like `sp.process()`) |
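The tumbling-window rollup pattern in the quickstarts can be sketched roughly like this (connection names, topic, field names, and window size are all hypothetical):

```javascript
// Illustrative Kafka → tumbling window → Atlas rollup pipeline.
// kafkaProd, atlasCluster, and solar-events are hypothetical names.
const rollupPipeline = [
  { $source: { connectionName: "kafkaProd", topic: "solar-events" } },
  {
    $tumblingWindow: {
      interval: { size: 1, unit: "minute" }, // 1-minute windows
      pipeline: [
        { $group: { _id: "$deviceId", avgWatts: { $avg: "$watts" } } },
      ],
    },
  },
  { $merge: { into: { connectionName: "atlasCluster", db: "solar", coll: "rollups" } } },
];
```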
Pipeline Rules & Warnings
Invalid constructs — these are NOT valid in streaming pipelines:
- `$$NOW`, `$$ROOT`, `$$CURRENT` — NOT available in stream processing. NEVER use these. Use the document's own timestamp field or `_stream_meta` metadata for event time instead of `$$NOW`.
- HTTPS connections as `$source` — HTTPS is for `$https` enrichment or sink only, NOT as a data source
- Kafka `$source` without `topic` — topic field is required
- Pipelines without a sink — terminal stage (`$merge`, `$emit`, `$https`, or `$externalFunction` async) required for deployed processors (sinkless only works via `sp.process()`)
- Lambda as `$emit` target — Lambda uses `$externalFunction` (mid-pipeline enrichment), not `$emit`
- `$validate` with `validationAction: "error"` — crashes processor; use `"dlq"` instead
Required fields by stage:
- `$source` (change stream): include `fullDocument: "updateLookup"` to get the full document content
- `$source` (Kinesis): use `stream` (NOT `streamName` or `topic`)
- `$emit` (Kinesis): MUST include `partitionKey`
- `$emit` (S3): use `path` (NOT `prefix`)
- `$https`: must include `connectionName`, `path`, `method`, `as`, `onError: "dlq"`
- `$externalFunction`: must include `connectionName`, `functionName`, `execution`, `as`, `onError: "dlq"`
- `$validate`: must include `validator` with `$jsonSchema` and `validationAction: "dlq"`
- `$lookup`: include `parallelism` setting (e.g., `parallelism: 2`) for concurrent I/O
- AWS connections (S3, Kinesis, Lambda): IAM role ARN must be registered via Atlas Cloud Provider Access first. Always confirm this with user. See references/connection-configs.md for details.
See references/pipeline-patterns.md for stage field examples with JSON syntax.
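Putting those field rules together, a change-stream source with mid-pipeline HTTPS enrichment could be sketched as below (all connection names, databases, and paths are hypothetical; consult references/pipeline-patterns.md for authoritative shapes):

```javascript
// Illustrative pipeline obeying the stage field rules above; names are hypothetical.
const enrichPipeline = [
  // Change-stream source: fullDocument: "updateLookup" returns the full
  // post-update document, not just the delta.
  { $source: { connectionName: "atlasCluster", db: "shop", coll: "orders",
               config: { fullDocument: "updateLookup" } } },
  // Mid-pipeline HTTPS enrichment: connectionName, path, method, as, onError required.
  { $https: { connectionName: "fxRates", path: "/v1/rates", method: "GET",
              as: "rates", onError: "dlq" } },
  // Terminal sink back to Atlas.
  { $merge: { into: { connectionName: "atlasCluster", db: "shop", coll: "enriched" } } },
];
```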
SchemaRegistry connection: `connectionType` must be "SchemaRegistry" (not "Kafka"). Schema type values are case-sensitive (use lowercase `avro`, not `AVRO`). See references/connection-configs.md for required fields and auth types.
MCP Tool Behaviors
Elicitation: When creating connections, the build tool auto-collects missing sensitive fields (passwords, bootstrap servers) via MCP elicitation. Do NOT ask the user for these — let the tool collect them.
Auto-normalization:
- `bootstrapServers` array → auto-converted to comma-separated string
- `schemaRegistryUrls` string → auto-wrapped in array
- `dbRoleToExecute` → defaults to `{role: "readWriteAnyDatabase", type: "BUILT_IN"}` for Cluster connections
Workspace creation: `includeSampleData` defaults to true, which auto-creates the sample_stream_solar connection.
Region naming: The `region` field uses Atlas-specific names that differ by cloud provider — e.g., AWS us-east-1, GCP us-central1, and Azure eastus each map to an Atlas Streams value rather than the raw cloud region name. Using the wrong format returns a cryptic dataProcessRegion error.
See references/connection-configs.md for the full region mapping table. If unsure, inspect an existing workspace with `atlas-streams-discover` → `inspect-workspace` and check `dataProcessRegion.region`.
Connection Capabilities — Source/Sink Reference
Know what each connection type can do before creating pipelines:
| Connection Type | As Source ($source) | As Sink ($merge / $emit) | Mid-Pipeline | Notes |
|---|---|---|---|---|
| Cluster | ✅ Change streams | ✅ $merge to collections | ✅ $lookup | Change streams monitor insert/update/delete/replace operations |
| Kafka | ✅ Topic consumer | ✅ $emit to topics | ❌ | Source MUST include `topic` field |
| Sample Stream | ✅ Sample data | ❌ Not valid | ❌ | Testing/demo only |
| S3 | ❌ Not valid | ✅ $emit to buckets | ❌ | Sink only — follow the `$emit` S3 field rules above (`path`, not `prefix`). Supports AWS PrivateLink. |
| Https | ❌ Not valid | ✅ $https as sink | ✅ $https enrichment | Can be used mid-pipeline for enrichment OR as final sink stage |
| AWSLambda | ❌ Not valid | ✅ $externalFunction (async only) | ✅ $externalFunction (sync or async) | Sink: `execution: "async"` required. Mid-pipeline: `"sync"` or `"async"` |
| AWS Kinesis | ✅ Stream consumer | ✅ $emit to streams | ❌ | Similar to Kafka pattern |
| SchemaRegistry | ❌ Not valid | ❌ Not valid | ✅ Schema resolution | Metadata only — used by Kafka connections for Avro schemas |
Common connection usage mistakes to avoid:
- ❌ Using `$externalFunction` as sink with `execution: "sync"` → Must use `execution: "async"` for sink stage
- ❌ Forgetting change streams exist → Atlas Cluster is a powerful source, not just a sink
- ❌ Using `$merge` with Kafka → Use `$emit` for Kafka sinks
See references/connection-configs.md for detailed connection configuration schemas by type.
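A minimal sketch of the Lambda-as-sink rule (the connection name and function ARN are hypothetical):

```javascript
// Illustrative terminal $externalFunction stage. As a sink, execution must be
// "async"; "sync" is only valid mid-pipeline. Names/ARN are hypothetical.
const lambdaSink = {
  $externalFunction: {
    connectionName: "lambdaConn",
    functionName: "arn:aws:lambda:us-east-1:123456789012:function:ingest",
    execution: "async", // "sync" here would be rejected for a sink stage
    as: "lambdaResult",
    onError: "dlq",
  },
};
```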
Core Workflows
Setup from scratch
1. `atlas-streams-discover` → `list-workspaces` (check existing)
2. `atlas-streams-build` → `resource: "workspace"` (region near data, SP10 for dev)
3. `atlas-streams-build` → `resource: "connection"` (for each source/sink/enrichment)
4. Validate connections: `atlas-streams-discover` → `list-connections` + `inspect-connection` for each — verify names match targets, present summary to user
5. Call `search-knowledge` to validate field names. Fetch relevant examples from https://github.com/mongodb/ASP_example
6. `atlas-streams-build` → `resource: "processor"` (with DLQ configured)
7. `atlas-streams-manage` → `start-processor` (warn about billing)
Workflow Patterns
Incremental pipeline development (recommended): See references/development-workflow.md for the full 5-phase lifecycle.
1. Start with basic `$source` → `$merge` pipeline (validate connectivity)
2. Add `$match` stages (validate filtering)
3. Add `$addFields` / `$project` transforms (validate reshaping)
4. Add windowing or enrichment (validate aggregation logic)
5. Add error handling / DLQ configuration
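The first three phases can be sketched as successive pipeline versions (connection names, topic, and fields are hypothetical):

```javascript
// Illustrative incremental development: grow the pipeline one validated step
// at a time. All names are hypothetical.
const src  = { $source: { connectionName: "kafkaProd", topic: "clicks" } };
const sink = { $merge: { into: { connectionName: "atlasCluster", db: "web", coll: "clicks" } } };

const phase1 = [src, sink];                                 // connectivity only
const phase2 = [src, { $match: { country: "DE" } }, sink];  // add filtering
const phase3 = [src, { $match: { country: "DE" } },
                { $project: { _id: 0, userId: 1, url: 1, ts: 1 } }, // reshape
                sink];
```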
Modify a processor pipeline:
1. `atlas-streams-manage` → `action: "stop-processor"` — processor MUST be stopped first
2. `atlas-streams-manage` → `action: "modify-processor"` — provide new pipeline
3. `atlas-streams-manage` → `action: "start-processor"` — restart
Debug a failing processor:
1. `atlas-streams-discover` → `diagnose-processor` — one-shot health report. Always call this first.
2. Commit to a specific root cause. Match symptoms to diagnostic patterns:
   - Error 419 + "no partitions found" → Kafka topic doesn't exist or is misspelled
   - State: FAILED + multiple restarts → connection-level error (bypasses DLQ), check connection config
   - State: STARTED + zero output + windowed pipeline → likely idle Kafka partitions blocking window closure; add `partitionIdleTimeout` (e.g., `{"size": 30, "unit": "second"}`) to Kafka `$source`
   - State: STARTED + zero output + non-windowed → check if source has data; inspect Kafka offset lag
   - High memoryUsageBytes approaching tier limit → OOM risk; recommend higher tier
   - DLQ count increasing → per-document errors; use MongoDB `find` on DLQ collection. See references/output-diagnostics.md for the full pattern table.
3. Classify processor type before interpreting output volume (alert vs transformation vs filter).
4. Provide concrete, ordered fix steps specific to the diagnosed root cause. Do NOT present a list of hypothetical scenarios.
5. If detailed logs are needed, direct the user to the Atlas UI: Atlas → Stream Processing → Workspace → Processor → Logs tab.
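The idle-partition fix would look roughly like this in the Kafka `$source` (connection and topic names hypothetical):

```javascript
// Illustrative Kafka $source with partitionIdleTimeout, so idle partitions
// stop blocking window closure. Names are hypothetical.
const kafkaSource = {
  $source: {
    connectionName: "kafkaProd",
    topic: "sensor-readings",
    partitionIdleTimeout: { size: 30, unit: "second" },
  },
};
```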
Chained processors (multi-sink pattern)
CRITICAL: A single pipeline can only have ONE terminal sink (
$merge or $emit). When users request multiple output destinations (e.g., "write to Atlas AND emit to Kafka"), you MUST acknowledge the single-sink constraint and propose chained processors using an intermediate destination. See references/pipeline-patterns.md for the full pattern with examples.
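Under the single-sink constraint, the chained pattern might be sketched as two processors joined by an intermediate Atlas collection (all names are hypothetical; see references/pipeline-patterns.md for the authoritative version):

```javascript
// Illustrative chained-processor pattern: processor A writes to an intermediate
// collection; processor B change-streams that collection out to Kafka.
const processorA = [
  { $source: { connectionName: "kafkaProd", topic: "orders" } },
  { $merge: { into: { connectionName: "atlasCluster", db: "pipe", coll: "stage" } } }, // sink 1
];
const processorB = [
  { $source: { connectionName: "atlasCluster", db: "pipe", coll: "stage",
               config: { fullDocument: "updateLookup" } } },
  { $emit: { connectionName: "kafkaProd", topic: "orders-out" } },                     // sink 2
];
```

Each pipeline still has exactly one terminal sink; the two sinks the user asked for are split across the chain.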
Pre-Deploy & Post-Deploy Checklists
See references/development-workflow.md for the complete pre-deploy quality checklist (connection validation, pipeline validation) and post-deploy verification workflow.
Tier Sizing & Performance
See references/sizing-and-parallelism.md for tier specifications, parallelism formulas, complexity scoring, and performance optimization strategies.
Troubleshooting
See references/development-workflow.md for the complete troubleshooting table covering processor failures, API errors, configuration issues, and performance problems.
Billing & Cost
Atlas Stream Processing has no free tier. All deployed processors incur continuous charges while running.
- Charges are per-hour, calculated per-second, only while the processor is running
- `stop-processor` stops billing; stopped processors retain state for 45 days at no charge
- For prototyping without billing: use `sp.process()` in mongosh — runs pipelines ephemerally without deploying a processor
- See references/sizing-and-parallelism.md for tier pricing and cost optimization strategies
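An ephemeral probe with `sp.process()` might look like this (run inside mongosh connected to the workspace; the `$match` field is hypothetical, while sample_stream_solar is the auto-created sample connection mentioned above):

```javascript
// Illustrative ephemeral run: sp.process() streams results to the shell
// without deploying (or billing for) a processor.
const probePipeline = [
  { $source: { connectionName: "sample_stream_solar" } },
  { $match: { group_id: 3 } }, // hypothetical field; sinkless is fine here
];
// In mongosh: sp.process(probePipeline)
```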
Safety Rules
- `atlas-streams-teardown` and `atlas-streams-manage` require user confirmation — do not bypass
- BEFORE calling `atlas-streams-teardown` for a workspace, you MUST first inspect the workspace with `atlas-streams-discover` to count connections and processors, then present this information to the user before requesting confirmation
- BEFORE creating any processor, you MUST validate all connections per the "Pre-Deployment Validation" section in references/development-workflow.md
- Deleting a workspace removes ALL connections and processors permanently
- After stopping a processor, state is preserved 45 days — then checkpoints are discarded
- `resumeFromCheckpoint: false` drops all window state — warn user first
- Moving processors between workspaces is not supported (must recreate)
- Dry-run / simulation is not supported — explain what you would do and ask for confirmation
- Always warn users about billing before starting processors
- Store API authentication credentials in connection settings, never hardcode in processor pipelines
Reference Files
| File | Read when... |
|---|---|
| references/pipeline-patterns.md | Building or modifying processor pipelines |
| references/connection-configs.md | Creating connections (type-specific schemas) |
| references/development-workflow.md | Following lifecycle management or debugging decision trees |
| references/output-diagnostics.md | Processor output is unexpected (zero, low, or wrong) |
| references/sizing-and-parallelism.md | Choosing tiers, tuning parallelism, or optimizing cost |