Vibecosystem mongodb-patterns

Document modeling, aggregation pipeline, indexing strategy, change streams, and multi-document transactions.

Install
Source · clone the upstream repo:
git clone https://github.com/vibeeval/vibecosystem
Manifest: skills/mongodb-patterns/skill.md

Source content

MongoDB Patterns

Document database design and query optimization for MongoDB.

Document Modeling Strategies

// EMBED when: 1:1 or 1:few, data read together, child has no independent lifecycle
interface Order {
  _id: ObjectId
  customerId: ObjectId
  status: 'pending' | 'paid' | 'shipped'
  items: OrderItem[]        // Embedded - always read with order
  shippingAddress: Address  // Embedded - 1:1
  createdAt: Date
}

interface OrderItem {
  productId: ObjectId
  name: string              // Denormalized - avoid join at read time
  price: number             // Snapshot at purchase time
  quantity: number
}
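
// Because items and the address are embedded, the whole order arrives in one
// round trip with no $lookup (orderId below is a placeholder)
const order = await db.orders.findOne({ _id: orderId })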

// REFERENCE when: 1:many (unbounded), independent queries, shared across documents
interface Product {
  _id: ObjectId
  name: string
  price: number
  categoryId: ObjectId     // Reference - category queried independently
  reviews?: never          // DON'T embed - unbounded array; keep reviews in a separate collection (sketch below)
}
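
// Reference-pattern sketch (the Review shape and pagination are illustrative):
// reviews live in their own collection and point back at the product
interface Review {
  _id: ObjectId
  productId: ObjectId      // Reference back to the parent product
  rating: number
  body: string
  createdAt: Date
}

// Query and paginate reviews independently of the product document
const reviews = await db.reviews
  .find({ productId })
  .sort({ createdAt: -1 })
  .limit(20)
  .toArray()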

// Bucket pattern: group time-series data into fixed-size documents
interface SensorBucket {
  _id: ObjectId
  sensorId: string
  startTime: Date
  endTime: Date
  count: number            // Track bucket fullness
  measurements: {          // Embed up to 200 per bucket
    timestamp: Date
    value: number
  }[]
}
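
// A minimal sketch of filling buckets (collection name is illustrative):
// push into the current bucket that still has room, or create a fresh one via upsert
async function recordMeasurement(sensorId: string, value: number): Promise<void> {
  const now = new Date()
  await db.sensorBuckets.updateOne(
    { sensorId, count: { $lt: 200 } },                 // Only match a bucket with room left
    {
      $push: { measurements: { timestamp: now, value } },
      $inc: { count: 1 },
      $setOnInsert: { startTime: now },                // First measurement opens the bucket
      $set: { endTime: now }
    },
    { upsert: true }                                   // No non-full bucket -> create one
  )
}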

Indexing Strategy

// Compound index: field order matters (ESR rule)
// Equality → Sort → Range
db.orders.createIndex({
  status: 1,       // Equality: exact match filter
  createdAt: -1,   // Sort: avoid in-memory sort
  total: 1         // Range: total > 100
})

// Partial index: only index documents matching filter (smaller index)
db.orders.createIndex(
  { customerId: 1, createdAt: -1 },
  { partialFilterExpression: { status: 'pending' } }
)

// Text index for search
db.products.createIndex({ name: 'text', description: 'text' })

// TTL index for auto-expiration
db.sessions.createIndex(
  { createdAt: 1 },
  { expireAfterSeconds: 86400 }  // Auto-delete after 24h
)

// Wildcard index for dynamic schemas
db.events.createIndex({ 'metadata.$**': 1 })
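
// Verify a query actually uses the compound index: the winning plan should show
// IXSCAN and no in-memory SORT stage
db.orders.find({ status: 'paid', total: { $gt: 100 } })
  .sort({ createdAt: -1 })
  .explain('executionStats')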

Aggregation Pipeline

// Sales analytics: top products by revenue per category
const pipeline = [
  // Stage 1: Filter date range
  { $match: {
    createdAt: { $gte: new Date('2025-01-01'), $lt: new Date('2025-02-01') },
    status: 'paid'
  }},

  // Stage 2: Unwind embedded items array
  { $unwind: '$items' },

  // Stage 3: Group by product
  { $group: {
    _id: '$items.productId',
    productName: { $first: '$items.name' },
    totalRevenue: { $sum: { $multiply: ['$items.price', '$items.quantity'] } },
    totalSold: { $sum: '$items.quantity' },
    orderCount: { $addToSet: '$_id' }   // Distinct order ids; collapsed to a count in stage 4
  }},

  // Stage 4: Add computed fields
  { $addFields: {
    orderCount: { $size: '$orderCount' },
    avgOrderValue: { $divide: ['$totalRevenue', { $size: '$orderCount' }] }
  }},

  // Stage 5: Sort by revenue descending
  { $sort: { totalRevenue: -1 } },

  // Stage 6: Limit to top 20
  { $limit: 20 },

  // Stage 7: Lookup the product's categoryId (localField/foreignField combined with pipeline needs MongoDB 5.0+)
  { $lookup: {
    from: 'products',
    localField: '_id',
    foreignField: '_id',
    pipeline: [{ $project: { categoryId: 1 } }],
    as: 'product'
  }}
]

const results = await db.orders.aggregate(pipeline).toArray()

Change Streams (Real-time Reactivity)

async function watchOrderChanges(): Promise<void> {
  const pipeline = [
    { $match: {
      operationType: { $in: ['insert', 'update'] },
      'fullDocument.status': 'paid'
    }}
  ]

  // resumeAfter enables resuming from last processed change (crash recovery)
  const changeStream = db.orders.watch(pipeline, {
    fullDocument: 'updateLookup',  // Include full document on updates
    resumeAfter: await getLastResumeToken()
  })

  changeStream.on('change', async (event) => {
    try {
      await processOrderPayment(event.fullDocument!)
      await saveResumeToken(event._id)  // Persist for crash recovery
    } catch (err) {
      console.error('Change stream processing failed:', err)
    }
  })

  changeStream.on('error', (err) => {
    console.error('Change stream error:', err)
    // Reconnect with resume token
    setTimeout(() => watchOrderChanges(), 5000)
  })
}
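
// One way the two token helpers above could be backed (collection name is illustrative):
// persist the latest resume token so a restarted watcher continues where it left off
async function saveResumeToken(token: unknown): Promise<void> {
  await db.resumeTokens.updateOne(
    { _id: 'orders-watcher' },
    { $set: { token, updatedAt: new Date() } },
    { upsert: true }
  )
}

async function getLastResumeToken(): Promise<unknown | undefined> {
  const doc = await db.resumeTokens.findOne({ _id: 'orders-watcher' })
  return doc?.token
}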

Multi-Document Transactions

async function transferFunds(
  fromAccountId: string,
  toAccountId: string,
  amount: number
): Promise<void> {
  const session = client.startSession()

  try {
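    // withTransaction retries the callback on transient errors and handles commit/abort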
    await session.withTransaction(async () => {
      const from = await db.accounts.findOne(
        { _id: new ObjectId(fromAccountId) },
        { session }
      )
      if (!from || from.balance < amount) {
        throw new Error('Insufficient funds')
      }

      await db.accounts.updateOne(
        { _id: new ObjectId(fromAccountId) },
        { $inc: { balance: -amount } },
        { session }
      )

      await db.accounts.updateOne(
        { _id: new ObjectId(toAccountId) },
        { $inc: { balance: amount } },
        { session }
      )

      await db.transactions.insertOne({
        from: fromAccountId,
        to: toAccountId,
        amount,
        createdAt: new Date()
      }, { session })
    })
  } finally {
    await session.endSession()
  }
}

Checklist

  • Embed for 1:1 and 1:few; reference for 1:many and many:many
  • Follow ESR (Equality-Sort-Range) for compound index field order
  • Use partial indexes to reduce index size on filtered queries
  • Set TTL indexes for session/temp data auto-cleanup
  • Use aggregation pipeline for analytics (not client-side loops)
  • Change streams with resume tokens for crash-safe event processing
  • Keep documents under 16MB (MongoDB limit)
  • Use explain() to verify queries use indexes

Anti-Patterns

  • Unbounded arrays: reviews/comments embedded in parent (grows forever, hits 16MB)
  • Missing indexes: full collection scans on frequently queried fields
  • $lookup in hot paths: use denormalization, not joins, for read-heavy queries
  • Storing related data in separate collections when always read together
  • Using MongoDB as a relational database (normalize everything)
  • Not using write concern majority for critical writes (data loss risk); see the sketch below
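
// Critical writes with majority acknowledgement (sketch; 'order' is a placeholder document)
await db.orders.insertOne(order, { writeConcern: { w: 'majority' } })
// writeConcern can also be set once on the MongoClient and inherited by all operations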