Vibecosystem event-driven-patterns
Message queue patterns with BullMQ, Kafka, RabbitMQ - saga, outbox, dead letter queue, exactly-once semantics.
install
source · Clone the upstream repo
git clone https://github.com/vibeeval/vibecosystem
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/vibeeval/vibecosystem "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/event-driven-patterns" ~/.claude/skills/vibeeval-vibecosystem-event-driven-patterns && rm -rf "$T"
manifest:
skills/event-driven-patterns/SKILL.mdsource content
Event-Driven Patterns
Message queue and event bus patterns for decoupled, reliable async processing.
BullMQ Setup (Producer + Consumer)
import { Queue, Worker, QueueEvents } from 'bullmq' import Redis from 'ioredis' const connection = new Redis(process.env.REDIS_URL!, { maxRetriesPerRequest: null }) // Producer: define queue const emailQueue = new Queue('email', { connection }) const marketQueue = new Queue('market-resolution', { connection }) // Add job with options await emailQueue.add( 'send-welcome', { userId: 'abc', email: 'user@example.com' }, { attempts: 3, backoff: { type: 'exponential', delay: 1000 }, removeOnComplete: { count: 1000 }, removeOnFail: { count: 5000 } } ) // Delayed job (send after 1 hour) await emailQueue.add('send-reminder', { userId: 'abc' }, { delay: 3_600_000 }) // Consumer: named processor const emailWorker = new Worker( 'email', async (job) => { if (job.name === 'send-welcome') { await sendWelcomeEmail(job.data.email) } else if (job.name === 'send-reminder') { await sendReminderEmail(job.data.userId) } // Return value stored in job.returnvalue return { sent: true, at: new Date().toISOString() } }, { connection, concurrency: 10 } ) emailWorker.on('completed', (job, result) => { console.log(`Job ${job.id} completed:`, result) }) emailWorker.on('failed', (job, err) => { console.error(`Job ${job?.id} failed after ${job?.attemptsMade} attempts:`, err.message) })
Retry Policies and Dead Letter Queue
import { Queue, Worker, QueueEvents } from 'bullmq' const dlqQueue = new Queue('dead-letter', { connection }) const processingWorker = new Worker( 'orders', async (job) => { // Attempt processing await processOrder(job.data) }, { connection, concurrency: 5 } ) // Move failed jobs to DLQ after all retries exhausted processingWorker.on('failed', async (job, err) => { if (!job) return const isExhausted = job.attemptsMade >= (job.opts.attempts || 1) if (isExhausted) { await dlqQueue.add('order-failed', { originalJob: job.name, data: job.data, error: err.message, failedAt: new Date().toISOString(), attempts: job.attemptsMade }) console.error(`Job moved to DLQ: ${job.id}`) } }) // DLQ consumer: alert + manual review const dlqWorker = new Worker('dead-letter', async (job) => { await alertOpsTeam({ message: `Job failed permanently: ${job.data.originalJob}`, data: job.data }) }, { connection })
Transactional Outbox Pattern
// Problem: write to DB and publish event atomically (no lost messages) // Solution: write event to outbox table in same transaction, relay worker reads and publishes // DB schema // CREATE TABLE outbox ( // id UUID PRIMARY KEY DEFAULT gen_random_uuid(), // aggregate_type TEXT NOT NULL, // aggregate_id TEXT NOT NULL, // event_type TEXT NOT NULL, // payload JSONB NOT NULL, // published_at TIMESTAMPTZ, // created_at TIMESTAMPTZ DEFAULT now() // ); async function createMarketWithOutbox(data: CreateMarketDto): Promise<Market> { return db.$transaction(async (tx) => { // 1. Write domain entity const market = await tx.market.create({ data }) // 2. Write outbox event in SAME transaction await tx.outbox.create({ data: { aggregateType: 'Market', aggregateId: market.id, eventType: 'MarketCreated', payload: { marketId: market.id, name: market.name, createdAt: market.createdAt } } }) return market }) } // Relay worker: poll outbox and publish (runs separately) async function outboxRelay(): Promise<void> { const unpublished = await db.outbox.findMany({ where: { publishedAt: null }, orderBy: { createdAt: 'asc' }, take: 100 }) for (const event of unpublished) { try { await publishToQueue(event.eventType, event.payload) await db.outbox.update({ where: { id: event.id }, data: { publishedAt: new Date() } }) } catch (err) { console.error(`Outbox relay failed for ${event.id}:`, err) } } } // Poll every second setInterval(outboxRelay, 1000)
Saga Pattern (Orchestration)
// Orchestrator drives the saga steps and handles compensation interface SagaStep<T> { name: string execute: (ctx: T) => Promise<Partial<T>> compensate: (ctx: T) => Promise<void> } class SagaOrchestrator<T extends Record<string, unknown>> { constructor(private steps: SagaStep<T>[]) {} async run(initialContext: T): Promise<T> { const ctx = { ...initialContext } const completed: SagaStep<T>[] = [] for (const step of this.steps) { try { const result = await step.execute(ctx) Object.assign(ctx, result) completed.push(step) console.log(`Saga step '${step.name}' succeeded`) } catch (err) { console.error(`Saga step '${step.name}' failed, compensating...`) // Compensate in reverse order for (const done of completed.reverse()) { try { await done.compensate(ctx) console.log(`Compensated '${done.name}'`) } catch (compensateErr) { console.error(`Compensation '${done.name}' failed:`, compensateErr) // Log to manual intervention queue } } throw err } } return ctx } } // Order fulfillment saga interface OrderContext { orderId: string userId: string amount: number paymentId?: string reservationId?: string } const orderSaga = new SagaOrchestrator<OrderContext>([ { name: 'reserve-inventory', execute: async (ctx) => { const reservationId = await inventory.reserve(ctx.orderId) return { reservationId } }, compensate: async (ctx) => { if (ctx.reservationId) await inventory.release(ctx.reservationId) } }, { name: 'charge-payment', execute: async (ctx) => { const paymentId = await payments.charge(ctx.userId, ctx.amount) return { paymentId } }, compensate: async (ctx) => { if (ctx.paymentId) await payments.refund(ctx.paymentId) } }, { name: 'confirm-order', execute: async (ctx) => { await orders.confirm(ctx.orderId) return {} }, compensate: async (ctx) => { await orders.cancel(ctx.orderId) } } ])
Idempotent Consumers (Exactly-Once Semantics)
// Even if a message is delivered twice, process it only once async function processEventIdempotent( eventId: string, handler: () => Promise<void> ): Promise<void> { const key = `processed:${eventId}` // SET NX: only set if not exists (atomic) const isNew = await redis.set(key, '1', 'EX', 86_400, 'NX') if (!isNew) { console.log(`Event ${eventId} already processed, skipping`) return } try { await handler() } catch (err) { // Release lock so it can be retried await redis.del(key) throw err } } // In BullMQ worker const worker = new Worker('payments', async (job) => { await processEventIdempotent(job.id!, async () => { await processPayment(job.data) }) }, { connection })
Fan-Out Pattern
// One event → multiple consumers in parallel const eventBus = new Queue('events', { connection }) async function publishMarketResolved(marketId: string, outcome: string): Promise<void> { const event = { marketId, outcome, resolvedAt: new Date().toISOString() } // Fan-out to multiple downstream queues await Promise.all([ notificationQueue.add('market-resolved', event), payoutQueue.add('process-payouts', event), analyticsQueue.add('track-resolution', event), feedQueue.add('update-feed', event) ]) } // Each queue has its own worker with appropriate concurrency and retry config
Priority Queue
// Higher priority number = processed first in BullMQ await criticalQueue.add('urgent-payout', data, { priority: 1 }) // highest await normalQueue.add('regular-email', data, { priority: 10 }) await batchQueue.add('report-generation', data, { priority: 100 }) // lowest // Worker respects priority automatically when picking next job
Queue Monitoring
import { QueueEvents } from 'bullmq' const queueEvents = new QueueEvents('email', { connection }) // Track job lifecycle queueEvents.on('waiting', ({ jobId }) => metrics.increment('jobs.waiting')) queueEvents.on('active', ({ jobId }) => metrics.increment('jobs.active')) queueEvents.on('completed', ({ jobId }) => metrics.increment('jobs.completed')) queueEvents.on('failed', ({ jobId, failedReason }) => { metrics.increment('jobs.failed') console.error(`Job ${jobId} failed: ${failedReason}`) }) queueEvents.on('stalled', ({ jobId }) => { metrics.increment('jobs.stalled') console.warn(`Job ${jobId} stalled — worker may have crashed`) }) // Health check: alert if queue depth grows too large async function checkQueueHealth(queue: Queue): Promise<void> { const counts = await queue.getJobCounts('waiting', 'active', 'failed', 'delayed') if (counts.waiting > 1000) { await alertOpsTeam({ queue: queue.name, backlog: counts.waiting }) } if (counts.failed > 100) { await alertOpsTeam({ queue: queue.name, failures: counts.failed }) } } setInterval(() => checkQueueHealth(emailQueue), 30_000)
Remember: Use the outbox pattern whenever a DB write and an event publish must be atomic. Never publish directly inside a transaction — the broker call can fail after the DB commits, causing lost events.