# Source repository: git clone https://github.com/vibeforge1111/vibeship-spawner-skills
# backend/queue-workers/skill.yaml — Queue & Background Workers Skill
# Message queues, job processing, async workflows
id: queue-workers name: Queue & Background Workers category: backend complexity: advanced requires_skills:
- backend
description: | Patterns for building reliable background job processing systems. Covers message queues, worker pools, retries, dead letter handling, and async workflow orchestration.
# ============================================================================
# CORE PATTERNS
# ============================================================================
patterns:
# --- Job Queue Fundamentals ---
job_definition: name: Structured Job Definition description: Define jobs with clear contracts and metadata when: "Creating any background job" pattern: | // Job definition with full context interface Job<T = unknown> { id: string; // Unique job ID (for idempotency) type: string; // Job type for routing data: T; // Job-specific payload metadata: { createdAt: Date; attempts: number; maxAttempts: number; priority: number; // 1-10, higher = more urgent timeout: number; // Max execution time in ms retryDelay: number; // Base delay between retries correlationId?: string; // For tracing across systems }; }
// Job handler contract interface JobHandler<T> { type: string; handle(job: Job<T>): Promise<void>; onFailed?(job: Job<T>, error: Error): Promise<void>; onCompleted?(job: Job<T>, result: unknown): Promise<void>; } // Example: Email sending job interface SendEmailJob { to: string; subject: string; templateId: string; variables: Record<string, unknown>; } const emailHandler: JobHandler<SendEmailJob> = { type: 'send-email', async handle(job) { await emailService.send(job.data); }, async onFailed(job, error) { await alerting.notify('Email failed', { jobId: job.id, error: error.message }); } }; why: "Strong typing catches errors at compile time, metadata enables debugging"
bullmq_setup: name: BullMQ Queue Configuration description: Production-ready BullMQ setup with Redis when: "Using BullMQ for job processing" pattern: | import { Queue, Worker, QueueScheduler } from 'bullmq'; import Redis from 'ioredis';
// Shared Redis connection config const redisConfig = { host: process.env.REDIS_HOST, port: parseInt(process.env.REDIS_PORT || '6379'), password: process.env.REDIS_PASSWORD, maxRetriesPerRequest: null, // Required for BullMQ enableReadyCheck: false, }; // Create queue const emailQueue = new Queue('emails', { connection: redisConfig, defaultJobOptions: { attempts: 3, backoff: { type: 'exponential', delay: 1000, // 1s, 2s, 4s }, removeOnComplete: { age: 24 * 3600, // Keep completed jobs for 24h count: 1000, // Keep last 1000 }, removeOnFail: { age: 7 * 24 * 3600, // Keep failed jobs for 7 days }, }, }); // Queue scheduler for delayed jobs const scheduler = new QueueScheduler('emails', { connection: redisConfig, }); // Worker with concurrency control const worker = new Worker('emails', async (job) => { console.log(`Processing job ${job.id}: ${job.name}`); await processEmail(job.data); }, { connection: redisConfig, concurrency: 5, // Process 5 jobs simultaneously limiter: { max: 100, // Max 100 jobs duration: 60000, // Per minute }, }); // Event handlers worker.on('completed', (job) => { console.log(`Job ${job.id} completed`); }); worker.on('failed', (job, err) => { console.error(`Job ${job?.id} failed:`, err.message); }); // Graceful shutdown process.on('SIGTERM', async () => { await worker.close(); await scheduler.close(); await emailQueue.close(); }); why: "BullMQ provides reliable job processing with Redis backing"
retry_strategies: name: Retry Strategy Patterns description: Different retry approaches for different failures when: "Handling job failures" pattern: | // Retry strategy types type RetryStrategy = | { type: 'immediate' } | { type: 'fixed'; delay: number } | { type: 'exponential'; base: number; max: number } | { type: 'polynomial'; coefficient: number; max: number };
function calculateDelay(strategy: RetryStrategy, attempt: number): number { switch (strategy.type) { case 'immediate': return 0; case 'fixed': return strategy.delay; case 'exponential': // 1s, 2s, 4s, 8s... capped at max return Math.min(strategy.base * Math.pow(2, attempt - 1), strategy.max); case 'polynomial': // 1s, 4s, 9s, 16s... (attempt^2) return Math.min(strategy.coefficient * Math.pow(attempt, 2), strategy.max); } } // Add jitter to prevent thundering herd function addJitter(delay: number, jitterPercent = 0.2): number { const jitter = delay * jitterPercent; return delay + (Math.random() * jitter * 2) - jitter; } // Error-specific retry decisions function shouldRetry(error: Error, attempt: number, maxAttempts: number): boolean { // Never retry these if (error.name === 'ValidationError') return false; if (error.message.includes('not found')) return false; if (error.message.includes('unauthorized')) return false; // Always retry these (up to max) if (error.name === 'NetworkError' && attempt < maxAttempts) return true; if (error.message.includes('timeout') && attempt < maxAttempts) return true; if (error.message.includes('rate limit') && attempt < maxAttempts) return true; // Default: retry if under max return attempt < maxAttempts; } // BullMQ custom backoff const worker = new Worker('queue', processor, { connection: redis, settings: { backoffStrategies: { // Custom strategy for rate-limited APIs rateLimitBackoff: (attemptsMade) => { // Extract retry-after from error if available return Math.min(60000 * attemptsMade, 300000); // Max 5 min } } } }); why: "Different errors need different retry behaviors"
dead_letter_queue: name: Dead Letter Queue Pattern description: Handle permanently failed jobs when: "Jobs exhaust all retries" pattern: | import { Queue, Worker } from 'bullmq';
// Main queue const mainQueue = new Queue('orders', { connection: redis }); // Dead letter queue for failed jobs const dlq = new Queue('orders-dlq', { connection: redis }); const worker = new Worker('orders', async (job) => { try { await processOrder(job.data); } catch (error) { // Check if this is the last attempt if (job.attemptsMade >= (job.opts.attempts || 3) - 1) { // Move to DLQ before final failure await dlq.add('failed-order', { originalJob: { id: job.id, name: job.name, data: job.data, opts: job.opts, }, error: { message: error.message, stack: error.stack, }, failedAt: new Date().toISOString(), attempts: job.attemptsMade + 1, }); // Alert on DLQ entry await alerting.send({ channel: 'ops', severity: 'warning', message: `Order ${job.data.orderId} moved to DLQ after ${job.attemptsMade + 1} attempts`, }); } throw error; // Re-throw to trigger BullMQ retry } }, { connection: redis }); // DLQ processor - manual review or auto-remediation const dlqWorker = new Worker('orders-dlq', async (job) => { const { originalJob, error } = job.data; // Attempt auto-remediation based on error type if (error.message.includes('inventory')) { // Retry with inventory refresh await inventoryService.refresh(originalJob.data.productId); await mainQueue.add(originalJob.name, originalJob.data, { ...originalJob.opts, attempts: 3, // Reset attempts }); return { remediation: 'inventory-refresh' }; } // Otherwise, create ticket for manual review await ticketService.create({ type: 'dlq-review', priority: 'high', data: job.data, }); return { remediation: 'manual-review' }; }, { connection: redis }); why: "DLQs prevent losing failed jobs and enable recovery"
# --- Idempotency ---
idempotent_processing: name: Idempotent Job Processing description: Ensure jobs can be safely retried when: "Any job that modifies state" pattern: | import { Redis } from 'ioredis';
const redis = new Redis(); // Idempotency key generator function getIdempotencyKey(job: Job): string { // Combine job type and unique identifier return `idempotency:${job.type}:${job.id}`; } // Idempotent job wrapper async function processIdempotent<T>( job: Job<T>, processor: (job: Job<T>) => Promise<unknown>, ttlSeconds = 86400 // 24 hours ): Promise<unknown> { const key = getIdempotencyKey(job); // Check if already processed const existing = await redis.get(key); if (existing) { console.log(`Job ${job.id} already processed, returning cached result`); return JSON.parse(existing); } // Acquire lock to prevent concurrent processing const lockKey = `${key}:lock`; const lockAcquired = await redis.set(lockKey, '1', 'NX', 'EX', 300); // 5 min lock if (!lockAcquired) { throw new Error('Job is being processed by another worker'); } try { // Process the job const result = await processor(job); // Store result for idempotency await redis.setex(key, ttlSeconds, JSON.stringify(result)); return result; } finally { // Release lock await redis.del(lockKey); } } // Usage const worker = new Worker('payments', async (job) => { return processIdempotent(job, async (j) => { // This is safe to retry - won't double-charge const payment = await paymentService.charge({ idempotencyKey: j.id, // Also use for Stripe amount: j.data.amount, customerId: j.data.customerId, }); return { paymentId: payment.id }; }); }, { connection: redis }); why: "Idempotency prevents double-processing on retries"
# --- Concurrency Control ---
rate_limited_queue: name: Rate-Limited Queue Processing description: Respect external API rate limits when: "Processing jobs that call rate-limited APIs" pattern: | import { Queue, Worker } from 'bullmq';
// Queue with rate limiting const apiQueue = new Queue('external-api', { connection: redis, }); // Worker with global rate limit const worker = new Worker('external-api', async (job) => { return callExternalApi(job.data); }, { connection: redis, concurrency: 1, // One at a time for strict ordering limiter: { max: 30, // 30 requests duration: 60000, // Per minute }, }); // More sophisticated: Per-tenant rate limiting const perTenantQueue = new Queue('tenant-api', { connection: redis }); const perTenantWorker = new Worker('tenant-api', async (job) => { const { tenantId } = job.data; // Check tenant-specific rate limit const key = `ratelimit:tenant:${tenantId}`; const current = await redis.incr(key); if (current === 1) { await redis.expire(key, 60); // Reset every minute } if (current > 10) { // 10 requests per minute per tenant // Re-queue with delay throw new Error('Rate limited'); // Will trigger backoff } return callApi(job.data); }, { connection: redis, concurrency: 10, }); why: "Rate limiting prevents API bans and ensures fair resource usage"
priority_queues: name: Priority Queue Processing description: Process high-priority jobs first when: "Some jobs are more urgent than others" pattern: | import { Queue, Worker } from 'bullmq';
const queue = new Queue('notifications', { connection: redis }); // Add jobs with different priorities // Lower number = higher priority (processed first) await queue.add('critical-alert', data, { priority: 1 }); await queue.add('standard-notification', data, { priority: 5 }); await queue.add('batch-digest', data, { priority: 10 }); // Alternatively: Separate queues with weighted processing const criticalQueue = new Queue('notifications:critical', { connection: redis }); const standardQueue = new Queue('notifications:standard', { connection: redis }); const batchQueue = new Queue('notifications:batch', { connection: redis }); // Processor that checks queues in priority order async function processPriorityQueues() { // Always check critical first let job = await criticalQueue.getNextJob(); if (job) return processJob(job); // Then standard job = await standardQueue.getNextJob(); if (job) return processJob(job); // Finally batch job = await batchQueue.getNextJob(); if (job) return processJob(job); return null; } // BullMQ Worker with priority groups const worker = new Worker('notifications', async (job) => { await sendNotification(job.data); }, { connection: redis, // Process higher priority jobs first within concurrency limit concurrency: 10, }); why: "Priority ensures urgent work isn't blocked by batch operations"
# --- Scheduled Jobs ---
scheduled_jobs: name: Scheduled and Recurring Jobs description: Cron-like job scheduling when: "Jobs need to run on a schedule" pattern: | import { Queue, QueueScheduler } from 'bullmq';
const queue = new Queue('scheduled', { connection: redis }); const scheduler = new QueueScheduler('scheduled', { connection: redis }); // One-time delayed job await queue.add('reminder', { userId: 'user123' }, { delay: 24 * 60 * 60 * 1000, // 24 hours from now }); // Recurring job with repeat await queue.add('daily-report', {}, { repeat: { pattern: '0 9 * * *', // Every day at 9 AM tz: 'America/New_York', }, jobId: 'daily-report', // Prevent duplicates }); // Recurring with fixed interval await queue.add('health-check', {}, { repeat: { every: 60000, // Every minute }, jobId: 'health-check', }); // List and manage repeatable jobs const repeatableJobs = await queue.getRepeatableJobs(); // Remove a repeatable job await queue.removeRepeatableByKey(repeatableJobs[0].key); // Alternative: Use node-cron for simpler scheduling import cron from 'node-cron'; cron.schedule('0 9 * * *', async () => { await queue.add('daily-report', { generatedAt: new Date().toISOString(), }); }, { timezone: 'America/New_York', }); why: "Scheduled jobs automate recurring tasks reliably"
# --- Workflow Orchestration ---
job_chaining: name: Job Chaining and Workflows description: Sequence of dependent jobs when: "Jobs depend on results of previous jobs" pattern: | import { Queue, FlowProducer } from 'bullmq';
const flowProducer = new FlowProducer({ connection: redis }); // Define a workflow as a tree of jobs const orderWorkflow = await flowProducer.add({ name: 'complete-order', queueName: 'orders', data: { orderId: 'order123' }, children: [ { name: 'send-confirmation', queueName: 'emails', data: { type: 'order-confirmation' }, children: [ { name: 'charge-payment', queueName: 'payments', data: { amount: 99.99 }, }, { name: 'reserve-inventory', queueName: 'inventory', data: { productId: 'prod123', quantity: 1 }, }, ], }, ], }); // Jobs execute bottom-up: // 1. charge-payment and reserve-inventory (parallel) // 2. send-confirmation (after both complete) // 3. complete-order (final) // Access parent/children data in workers const worker = new Worker('emails', async (job) => { // Get results from child jobs const childrenValues = await job.getChildrenValues(); // { 'payments:charge-payment:123': { success: true }, ... } await sendEmail(job.data.type, childrenValues); }, { connection: redis }); // Saga pattern for compensating transactions async function orderSaga(orderId: string) { const steps = []; try { // Step 1: Reserve inventory const reservation = await inventoryService.reserve(orderId); steps.push({ service: 'inventory', action: 'reserve', id: reservation.id }); // Step 2: Charge payment const payment = await paymentService.charge(orderId); steps.push({ service: 'payment', action: 'charge', id: payment.id }); // Step 3: Send confirmation await emailService.sendConfirmation(orderId); steps.push({ service: 'email', action: 'send' }); return { success: true }; } catch (error) { // Compensate in reverse order for (const step of steps.reverse()) { await compensate(step); } throw error; } } async function compensate(step: { service: string; action: string; id?: string }) { switch (step.service) { case 'inventory': await inventoryService.release(step.id); break; case 'payment': await paymentService.refund(step.id); break; // email doesn't need compensation } } why: 
"Workflows coordinate complex multi-step processes with rollback"
# --- Observability ---
queue_monitoring: name: Queue Monitoring and Metrics description: Track queue health and performance when: "Operating queues in production" pattern: | import { Queue, QueueEvents } from 'bullmq'; import { Counter, Histogram, Gauge } from 'prom-client';
// Prometheus metrics const jobsProcessed = new Counter({ name: 'queue_jobs_processed_total', help: 'Total jobs processed', labelNames: ['queue', 'status'], }); const jobDuration = new Histogram({ name: 'queue_job_duration_seconds', help: 'Job processing duration', labelNames: ['queue', 'job_type'], buckets: [0.1, 0.5, 1, 5, 10, 30, 60], }); const queueDepth = new Gauge({ name: 'queue_depth', help: 'Number of jobs waiting', labelNames: ['queue', 'state'], }); // Queue events for metrics const queueEvents = new QueueEvents('orders', { connection: redis }); queueEvents.on('completed', ({ jobId, returnvalue }) => { jobsProcessed.inc({ queue: 'orders', status: 'completed' }); }); queueEvents.on('failed', ({ jobId, failedReason }) => { jobsProcessed.inc({ queue: 'orders', status: 'failed' }); }); // Periodic queue depth check async function updateQueueMetrics() { const queue = new Queue('orders', { connection: redis }); const waiting = await queue.getWaitingCount(); const active = await queue.getActiveCount(); const delayed = await queue.getDelayedCount(); const failed = await queue.getFailedCount(); queueDepth.set({ queue: 'orders', state: 'waiting' }, waiting); queueDepth.set({ queue: 'orders', state: 'active' }, active); queueDepth.set({ queue: 'orders', state: 'delayed' }, delayed); queueDepth.set({ queue: 'orders', state: 'failed' }, failed); // Alert on high queue depth if (waiting > 1000) { await alerting.send({ severity: 'warning', message: `Queue orders has ${waiting} waiting jobs`, }); } } setInterval(updateQueueMetrics, 30000); // Every 30 seconds // Structured logging worker.on('completed', (job, result) => { logger.info('Job completed', { queue: 'orders', jobId: job.id, jobName: job.name, duration: Date.now() - job.timestamp, attempts: job.attemptsMade, }); }); worker.on('failed', (job, err) => { logger.error('Job failed', { queue: 'orders', jobId: job?.id, jobName: job?.name, error: err.message, stack: err.stack, attempts: job?.attemptsMade, }); }); why: 
"Visibility into queue health prevents silent failures"
# ============================================================================
# ANTI-PATTERNS
# ============================================================================
anti_patterns:
no_idempotency: name: Jobs Without Idempotency description: Jobs that cause duplicate side effects on retry problem: | await queue.add('send-email', { to: 'user@example.com' });
// Worker doesn't check if email already sent async function process(job) { await sendEmail(job.data.to); // Sends again on retry! } solution: | async function process(job) { const sentKey = `email:sent:${job.id}`; if (await redis.exists(sentKey)) { return { alreadySent: true }; } await sendEmail(job.data.to); await redis.setex(sentKey, 86400, '1'); } impact: "Duplicate emails, double charges, inconsistent data"
no_timeout: name: Jobs Without Timeout description: Jobs that can run forever problem: | const worker = new Worker('api-calls', async (job) => { // No timeout - hangs forever if API is slow await callExternalApi(job.data); }); solution: | const worker = new Worker('api-calls', async (job) => { // Abort after 30 seconds const controller = new AbortController(); const timeout = setTimeout(() => controller.abort(), 30000);
try { await callExternalApi(job.data, { signal: controller.signal }); } finally { clearTimeout(timeout); } }, { connection: redis, lockDuration: 60000, // Lock expires after 60s }); impact: "Workers blocked, queue backup, no error handling"
fire_and_forget: name: Fire and Forget Without Monitoring description: Adding jobs without tracking success/failure problem: | // In request handler await queue.add('process-upload', { fileId }); res.json({ status: 'processing' }); // No way for user to know if it failed solution: | const job = await queue.add('process-upload', { fileId });
// Store job ID for status checking await db.upload.update({ where: { id: fileId }, data: { jobId: job.id, status: 'processing' }, }); // Endpoint to check status app.get('/upload/:id/status', async (req, res) => { const upload = await db.upload.findUnique({ where: { id: req.params.id } }); const job = await queue.getJob(upload.jobId); res.json({ status: upload.status, progress: await job?.progress, failedReason: await job?.failedReason, }); }); impact: "Users can't track progress, silent failures"
no_graceful_shutdown: name: No Graceful Shutdown description: Workers killed mid-job problem: | const worker = new Worker('queue', process); // Process crashes, jobs are lost mid-processing solution: | const worker = new Worker('queue', process, { connection: redis });
async function shutdown() { console.log('Shutting down gracefully...'); // Stop accepting new jobs await worker.close(); // Wait for active jobs to complete (with timeout) const timeout = setTimeout(() => { console.log('Timeout - forcing shutdown'); process.exit(1); }, 30000); clearTimeout(timeout); process.exit(0); } process.on('SIGTERM', shutdown); process.on('SIGINT', shutdown); impact: "Lost jobs, inconsistent state, data corruption"
polling_for_results: name: Polling for Job Results description: Busy-waiting for job completion problem: | const job = await queue.add('process', data);
// Polling wastes resources while (true) { const status = await queue.getJob(job.id); if (status.finishedOn) break; await sleep(1000); } solution: | import { QueueEvents } from 'bullmq'; const queueEvents = new QueueEvents('process', { connection: redis }); const job = await queue.add('process', data); // Wait for completion event const result = await job.waitUntilFinished(queueEvents, 60000); // Or use pub/sub for real-time updates queueEvents.on('completed', ({ jobId, returnvalue }) => { if (jobId === job.id) { handleResult(returnvalue); } }); impact: "Wasted CPU, increased Redis load, delayed results"
unbounded_retries: name: Unbounded Retry Loops description: Jobs that retry forever problem: | await queue.add('job', data, { attempts: Infinity, // Never gives up backoff: { type: 'fixed', delay: 1000 }, }); solution: | await queue.add('job', data, { attempts: 5, // Reasonable limit backoff: { type: 'exponential', delay: 1000, }, });
// Move to DLQ after exhausting retries worker.on('failed', async (job, err) => { if (job.attemptsMade >= job.opts.attempts) { await dlq.add('failed-job', { originalJob: job.data, error: err.message, }); } }); impact: "Zombie jobs, resource waste, noise in monitoring"
# ============================================================================
# LIBRARY COMPARISON
# ============================================================================
library_comparison:
bullmq: name: BullMQ use_when: "Node.js, need reliable job processing with Redis" pros: - "Excellent reliability with Redis persistence" - "Built-in rate limiting and concurrency" - "Workflows and job dependencies" - "Great TypeScript support" cons: - "Requires Redis" - "Node.js only" install: "npm install bullmq ioredis"
rabbitmq: name: RabbitMQ (amqplib) use_when: "Need message broker patterns, multiple consumers" pros: - "Mature, battle-tested" - "Complex routing (exchanges, bindings)" - "Multi-language support" - "Management UI built-in" cons: - "More operational overhead" - "Steeper learning curve" install: "npm install amqplib"
aws_sqs: name: AWS SQS use_when: "AWS ecosystem, serverless, need managed service" pros: - "Fully managed, no ops" - "Scales automatically" - "Pay per message" - "Dead letter queue built-in" cons: - "AWS lock-in" - "No priority queues" - "Max 14 day retention" install: "npm install @aws-sdk/client-sqs"
google_pubsub: name: Google Cloud Pub/Sub use_when: "GCP ecosystem, event-driven architecture" pros: - "Global, serverless" - "Real-time messaging" - "Exactly-once delivery option" cons: - "GCP lock-in" - "Complex ordering guarantees" install: "npm install @google-cloud/pubsub"
agenda: name: Agenda use_when: "MongoDB already in stack, simple job scheduling" pros: - "Uses MongoDB (no Redis needed)" - "Simple API" - "Cron scheduling" cons: - "Less performant than Redis-based" - "MongoDB required" install: "npm install agenda"
pg_boss: name: pg-boss use_when: "PostgreSQL already in stack, simple needs" pros: - "Uses PostgreSQL (no Redis needed)" - "ACID guarantees" - "Simple setup" cons: - "Lower throughput than Redis" - "PostgreSQL required" install: "npm install pg-boss"
# ============================================================================
# DECISION FRAMEWORK
# ============================================================================
decision_tree: start: "What's your infrastructure?" nodes: infrastructure: question: "What database/cache do you already have?" options: - answer: "Redis available" next: "Use BullMQ - best Node.js queue library" - answer: "Only PostgreSQL" next: "Use pg-boss - queue in your database" - answer: "Only MongoDB" next: "Use Agenda - queue in your database" - answer: "AWS native" next: "Use SQS + Lambda - serverless, managed" - answer: "Need multi-language" next: "Use RabbitMQ - mature message broker"
scale: question: "Expected job throughput?" options: - answer: "< 1000/hour" next: "Database-backed (pg-boss, Agenda) is fine" - answer: "1000-100000/hour" next: "Redis-backed (BullMQ) recommended" - answer: "> 100000/hour" next: "Managed service (SQS, Pub/Sub) or Kafka"
# ============================================================================
# HANDOFFS
# ============================================================================
handoffs:
-
to: rate-limiting when: "Queue is calling external APIs" pass: "Rate limit requirements, API limits"
-
to: observability-sre when: "Need queue monitoring" pass: "Queue metrics, alerting needs"
-
to: infrastructure-as-code when: "Deploying queue infrastructure" pass: "Redis/RabbitMQ requirements, scaling needs"
-
to: database-schema-design when: "Need to persist job results" pass: "Job result schema, retention requirements"
ecosystem: core_tools: - "BullMQ - Redis-based job queue" - "ioredis - Redis client" - "node-cron - Job scheduling" - "prom-client - Prometheus metrics"
cloud_services: - "AWS SQS - Managed message queue" - "Google Pub/Sub - Event streaming" - "Azure Service Bus - Message broker"
monitoring: - "Bull Board - BullMQ dashboard" - "Arena - Bull queue UI" - "Flower - Celery monitoring"