Vibeship-spawner-skills queue-workers

Queue & Background Workers Skill

install
source · Clone the upstream repo
git clone https://github.com/vibeforge1111/vibeship-spawner-skills
manifest: backend/queue-workers/skill.yaml
source content

Queue & Background Workers Skill

Message queues, job processing, async workflows

id: queue-workers name: Queue & Background Workers category: backend complexity: advanced requires_skills:

  • backend

description: | Patterns for building reliable background job processing systems. Covers message queues, worker pools, retries, dead letter handling, and async workflow orchestration.

============================================================================

CORE PATTERNS

============================================================================

patterns:

--- Job Queue Fundamentals ---

job_definition: name: Structured Job Definition description: Define jobs with clear contracts and metadata when: "Creating any background job" pattern: |
  // Job definition with full context
  interface Job<T = unknown> {
    id: string;                // Unique job ID (for idempotency)
    type: string;              // Job type for routing
    data: T;                   // Job-specific payload
    metadata: {
      createdAt: Date;
      attempts: number;
      maxAttempts: number;
      priority: number;        // 1-10, higher = more urgent
      timeout: number;         // Max execution time in ms
      retryDelay: number;      // Base delay between retries
      correlationId?: string;  // For tracing across systems
    };
  }

  /**
   * Job handler contract.
   *
   * Pairs a job `type` string with the function that processes jobs of that
   * type, plus two optional hooks that receive the job together with an error
   * or a result.
   */
  interface JobHandler<T> {
    /** Job type string this handler is associated with. */
    type: string;
    /** Processes one job; the returned promise carries no value. */
    handle(job: Job<T>): Promise<void>;
    /** Optional hook taking the job and the error it failed with. */
    onFailed?(job: Job<T>, error: Error): Promise<void>;
    /** Optional hook taking the job and an untyped result. */
    onCompleted?(job: Job<T>, result: unknown): Promise<void>;
  }

  // Example: Email sending job
  /** Payload shape for the email-sending example job. */
  interface SendEmailJob {
    /** Recipient of the email. */
    to: string;
    /** Subject line. */
    subject: string;
    /** Identifier of the template to send (presumably resolved by the email service). */
    templateId: string;
    /** Free-form values keyed by name; presumably substituted into the template — confirm. */
    variables: Record<string, unknown>;
  }

  // Handler for 'send-email' jobs: forwards the payload to the email service
  // and raises an alert carrying the job id and error message on failure.
  const emailHandler: JobHandler<SendEmailJob> = {
    type: 'send-email',
    handle: async ({ data }) => {
      await emailService.send(data);
    },
    onFailed: async ({ id }, { message }) => {
      await alerting.notify('Email failed', { jobId: id, error: message });
    },
  };
why: "Strong typing catches errors at compile time, metadata enables debugging"

bullmq_setup: name: BullMQ Queue Configuration description: Production-ready BullMQ setup with Redis when: "Using BullMQ for job processing" pattern: | import { Queue, Worker, QueueScheduler } from 'bullmq'; import Redis from 'ioredis';

  // Redis connection settings reused by every queue and worker in this snippet.
  // Host and password come straight from the environment; the port falls back
  // to 6379 when REDIS_PORT is unset or empty.
  const { REDIS_HOST, REDIS_PORT, REDIS_PASSWORD } = process.env;
  const redisConfig = {
    host: REDIS_HOST,
    port: parseInt(REDIS_PORT || '6379'),
    password: REDIS_PASSWORD,
    maxRetriesPerRequest: null,  // Required for BullMQ
    enableReadyCheck: false,
  };

  // Defaults applied to every job added to the 'emails' queue:
  // three attempts with exponential backoff, bounded retention of finished jobs.
  const emailJobDefaults = {
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 },  // 1s, 2s, 4s
    removeOnComplete: {
      age: 24 * 3600,  // Keep completed jobs for 24h
      count: 1000,     // Keep last 1000
    },
    removeOnFail: {
      age: 7 * 24 * 3600,  // Keep failed jobs for 7 days
    },
  };

  // Create queue
  const emailQueue = new Queue('emails', {
    connection: redisConfig,
    defaultJobOptions: emailJobDefaults,
  });

  // Queue scheduler for delayed jobs
  // NOTE(review): QueueScheduler is version-specific — newer BullMQ major
  // versions reportedly handle delayed jobs without it. Confirm against the
  // installed BullMQ version before keeping this instance (it is also closed
  // in the SIGTERM handler below).
  const scheduler = new QueueScheduler('emails', {
    connection: redisConfig,
  });

  // Worker options: up to 5 jobs in flight, and at most 100 jobs per 60 000 ms
  const emailWorkerOptions = {
    connection: redisConfig,
    concurrency: 5,
    limiter: { max: 100, duration: 60000 },
  };

  // Worker for the 'emails' queue: logs each job, then hands its payload to processEmail
  const worker = new Worker(
    'emails',
    async ({ id, name, data }) => {
      console.log(`Processing job ${id}: ${name}`);
      await processEmail(data);
    },
    emailWorkerOptions,
  );

  // Lifecycle logging for the worker
  worker.on('completed', ({ id }) => {
    console.log(`Job ${id} completed`);
  });

  // `job` is accessed defensively because it may be absent on failure events
  worker.on('failed', (job, { message }) => {
    console.error(`Job ${job?.id} failed:`, message);
  });

  // Graceful shutdown: on SIGTERM close the worker, then the scheduler,
  // then the queue, awaiting each before moving on.
  process.on('SIGTERM', async () => {
    for (const closable of [worker, scheduler, emailQueue]) {
      await closable.close();
    }
  });
why: "BullMQ provides reliable job processing with Redis backing"

retry_strategies: name: Retry Strategy Patterns description: Different retry approaches for different failures when: "Handling job failures" pattern: |
  // Retry strategy types
  type RetryStrategy =
    | { type: 'immediate' }
    | { type: 'fixed'; delay: number }
    | { type: 'exponential'; base: number; max: number }
    | { type: 'polynomial'; coefficient: number; max: number };

  /**
   * Computes the delay to wait before retry number `attempt`.
   *
   * - `immediate`: always 0
   * - `fixed`: always `strategy.delay`
   * - `exponential`: `base * 2^(attempt - 1)`, capped at `strategy.max`
   * - `polynomial`: `coefficient * attempt^2`, capped at `strategy.max`
   *
   * @param strategy Retry strategy to apply.
   * @param attempt  Attempt number; the exponential series starts at 1.
   * @returns The delay, in the same unit as the strategy's own fields.
   */
  function calculateDelay(strategy: RetryStrategy, attempt: number): number {
    if (strategy.type === 'immediate') {
      return 0;
    }
    if (strategy.type === 'fixed') {
      return strategy.delay;
    }

    // Only the two growing strategies remain; both share the `max` cap.
    const uncapped =
      strategy.type === 'exponential'
        ? strategy.base * 2 ** (attempt - 1)    // base, 2*base, 4*base, ...
        : strategy.coefficient * attempt ** 2;  // c, 4c, 9c, 16c, ...
    return Math.min(uncapped, strategy.max);
  }

  /**
   * Randomises a delay to prevent a thundering herd of simultaneous retries.
   *
   * The result is drawn uniformly from
   * `[delay - delay * jitterPercent, delay + delay * jitterPercent)`.
   *
   * @param delay         Base delay.
   * @param jitterPercent Fraction of `delay` used as the maximum deviation (default 0.2).
   * @returns The jittered delay (not rounded).
   */
  function addJitter(delay: number, jitterPercent = 0.2): number {
    const spread = delay * jitterPercent;
    const swing = Math.random() * spread * 2;  // 0 .. 2 * spread
    return delay + swing - spread;
  }

  /**
   * Decides whether a failed job should be retried.
   *
   * Permanent failures are never retried: an error named `ValidationError`,
   * or a message containing `not found` or `unauthorized` (case-sensitive).
   * Everything else — network errors, timeouts, rate limits and unclassified
   * errors alike — is retried while `attempt < maxAttempts`.
   *
   * @param error       The error the job failed with.
   * @param attempt     Attempt counter compared against `maxAttempts`.
   * @param maxAttempts Upper bound on attempts.
   */
  function shouldRetry(error: Error, attempt: number, maxAttempts: number): boolean {
    const permanentMarkers = ['not found', 'unauthorized'];
    const isPermanent =
      error.name === 'ValidationError' ||
      permanentMarkers.some((marker) => error.message.includes(marker));

    if (isPermanent) {
      return false;
    }
    return attempt < maxAttempts;
  }

  // BullMQ custom backoff
  // NOTE(review): the `settings.backoffStrategies` map is version-specific —
  // newer BullMQ major versions reportedly take a single
  // `settings.backoffStrategy` function instead. Confirm against the
  // installed version.
  const worker = new Worker('queue', processor, {
    connection: redis,
    settings: {
      backoffStrategies: {
        // Custom strategy for rate-limited APIs
        rateLimitBackoff: (attemptsMade) => {
          // Linear backoff: 60 000 ms per attempt made, capped at 300 000 ms.
          // The error is not inspected here, so no retry-after hint is used.
          return Math.min(60000 * attemptsMade, 300000);  // Max 5 min
        }
      }
    }
  });
why: "Different errors need different retry behaviors"

dead_letter_queue: name: Dead Letter Queue Pattern description: Handle permanently failed jobs when: "Jobs exhaust all retries" pattern: | import { Queue, Worker } from 'bullmq';

  // Main queue
  const mainQueue = new Queue('orders', { connection: redis });

  // Dead letter queue for failed jobs
  const dlq = new Queue('orders-dlq', { connection: redis });

  // Worker for 'orders'. On the final failed attempt the job is copied to the
  // DLQ (with error details) and ops is alerted; the error is always re-thrown
  // so BullMQ records the failure and schedules any remaining retry.
  const worker = new Worker('orders', async (job) => {
    try {
      await processOrder(job.data);
    } catch (error: unknown) {
      // Anything can be thrown; normalise so message/stack are always available.
      const failure = error instanceof Error ? error : new Error(String(error));

      // A job without an `attempts` option (or with 0) runs exactly once, so
      // its first failure is also its last. Defaulting to 3 here would mean
      // such jobs are never dead-lettered.
      const maxAttempts = Math.max(job.opts.attempts ?? 1, 1);
      // attemptsMade counts attempts finished before this one.
      const attemptNumber = job.attemptsMade + 1;

      if (attemptNumber >= maxAttempts) {
        // Move to DLQ before final failure
        await dlq.add('failed-order', {
          originalJob: {
            id: job.id,
            name: job.name,
            data: job.data,
            opts: job.opts,
          },
          error: {
            message: failure.message,
            stack: failure.stack,
          },
          failedAt: new Date().toISOString(),
          attempts: attemptNumber,
        });

        // Alert on DLQ entry
        await alerting.send({
          channel: 'ops',
          severity: 'warning',
          message: `Order ${job.data.orderId} moved to DLQ after ${attemptNumber} attempts`,
        });
      }
      throw error;  // Re-throw to trigger BullMQ retry
    }
  }, { connection: redis });

  // DLQ processor. Inventory-related failures are auto-remediated (refresh the
  // product's inventory, then re-queue the original job with 3 attempts);
  // everything else becomes a high-priority ticket for manual review.
  const dlqWorker = new Worker('orders-dlq', async ({ data: deadLetter }) => {
    const { originalJob, error } = deadLetter;
    const inventoryRelated = error.message.includes('inventory');

    if (!inventoryRelated) {
      await ticketService.create({
        type: 'dlq-review',
        priority: 'high',
        data: deadLetter,
      });
      return { remediation: 'manual-review' };
    }

    await inventoryService.refresh(originalJob.data.productId);
    // Original options are carried over; only the attempt budget is reset.
    const requeueOptions = { ...originalJob.opts, attempts: 3 };
    await mainQueue.add(originalJob.name, originalJob.data, requeueOptions);
    return { remediation: 'inventory-refresh' };
  }, { connection: redis });
why: "DLQs prevent losing failed jobs and enable recovery"

--- Idempotency ---

idempotent_processing: name: Idempotent Job Processing description: Ensure jobs can be safely retried when: "Any job that modifies state" pattern: | import { Redis } from 'ioredis';

  const redis = new Redis();

  /**
   * Builds the Redis key under which a job's result is remembered.
   * Format: `idempotency:<job type>:<job id>`.
   */
  function getIdempotencyKey(job: Job): string {
    const { type, id } = job;
    return 'idempotency:' + type + ':' + id;
  }

  /**
   * Runs `processor` at most once per job and replays the stored result on
   * repeat deliveries.
   *
   * Flow: look up a cached result under the job's idempotency key; if absent,
   * take a 5-minute lock, run the processor, cache its JSON-serialised result
   * for `ttlSeconds`, and release the lock.
   *
   * @param job        Job to process.
   * @param processor  Performs the actual work; its result must be JSON-serialisable.
   * @param ttlSeconds How long the result is remembered (default 24 hours).
   * @returns The processor's result, or the cached result of an earlier run.
   *          A processor that resolves `undefined` is cached and replayed as `null`.
   * @throws Error when another worker currently holds the lock for this job.
   */
  async function processIdempotent<T>(
    job: Job<T>,
    processor: (job: Job<T>) => Promise<unknown>,
    ttlSeconds = 86400  // 24 hours
  ): Promise<unknown> {
    const key = getIdempotencyKey(job);

    // Check if already processed
    const existing = await redis.get(key);
    if (existing) {
      console.log(`Job ${job.id} already processed, returning cached result`);
      return JSON.parse(existing);
    }

    // Acquire lock to prevent concurrent processing. The lock value is a
    // per-call token so that only the holder can release it.
    const lockKey = `${key}:lock`;
    const lockToken = `${process.pid}:${Date.now()}:${Math.random().toString(36).slice(2)}`;
    const lockAcquired = await redis.set(lockKey, lockToken, 'EX', 300, 'NX');  // 5 min lock

    if (!lockAcquired) {
      throw new Error('Job is being processed by another worker');
    }

    try {
      // Process the job
      const result = await processor(job);

      // Store result for idempotency. JSON.stringify(undefined) is not a
      // string, so void results are stored as JSON null instead.
      await redis.setex(key, ttlSeconds, JSON.stringify(result ?? null));

      return result;
    } finally {
      // Release the lock only if we still own it. If processing outlived the
      // 5-minute expiry, another worker may hold the lock now and a plain DEL
      // would remove theirs.
      const releaseScript =
        "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
      await redis.eval(releaseScript, 1, lockKey, lockToken);
    }
  }

  // Usage: wrap the payment charge in processIdempotent so a retried job
  // replays the stored result instead of charging again.
  // NOTE(review): getIdempotencyKey reads `job.type` and `job.id`; confirm the
  // job object this Worker callback receives actually exposes a `type` field,
  // otherwise every key shares the same "undefined" type segment.
  const worker = new Worker('payments', async (job) => {
    return processIdempotent(job, async (j) => {
      // This is safe to retry - won't double-charge
      const payment = await paymentService.charge({
        idempotencyKey: j.id,  // Also use for Stripe
        amount: j.data.amount,
        customerId: j.data.customerId,
      });
      // Only the payment id is returned, so only that is cached for replays
      return { paymentId: payment.id };
    });
  }, { connection: redis });
why: "Idempotency prevents double-processing on retries"

--- Concurrency Control ---

rate_limited_queue: name: Rate-Limited Queue Processing description: Respect external API rate limits when: "Processing jobs that call rate-limited APIs" pattern: | import { Queue, Worker } from 'bullmq';

  // Queue feeding the rate-limited external API
  const apiQueue = new Queue('external-api', { connection: redis });

  // Worker with global rate limit: one job at a time (strict ordering),
  // at most 30 jobs per 60 000 ms.
  const worker = new Worker(
    'external-api',
    async ({ data }) => callExternalApi(data),
    {
      connection: redis,
      concurrency: 1,
      limiter: { max: 30, duration: 60000 },
    },
  );

  // More sophisticated: Per-tenant rate limiting
  const perTenantQueue = new Queue('tenant-api', { connection: redis });

  // Each job increments a per-tenant counter that lives for 60 seconds.
  // Once a tenant exceeds 10 jobs inside the window the job throws, which
  // fails the attempt so the queue's retry/backoff settings apply.
  const perTenantWorker = new Worker('tenant-api', async (job) => {
    const { tenantId } = job.data;

    // Check tenant-specific rate limit
    const key = `ratelimit:tenant:${tenantId}`;
    const current = await redis.incr(key);

    // Start the window on the first hit. INCR and EXPIRE are separate
    // commands, so a crash between them would leave a counter that never
    // expires and throttles the tenant forever; re-arm whenever the key has
    // no TTL (TTL returns -1 in that case).
    if (current === 1 || (await redis.ttl(key)) === -1) {
      await redis.expire(key, 60);  // Reset every minute
    }

    if (current > 10) {  // 10 requests per minute per tenant
      // Re-queue with delay
      throw new Error('Rate limited');  // Will trigger backoff
    }

    return callApi(job.data);
  }, {
    connection: redis,
    concurrency: 10,
  });
why: "Rate limiting prevents API bans and ensures fair resource usage"

priority_queues: name: Priority Queue Processing description: Process high-priority jobs first when: "Some jobs are more urgent than others" pattern: | import { Queue, Worker } from 'bullmq';

  // Single queue; ordering is driven by each job's `priority` option
  const queue = new Queue('notifications', { connection: redis });

  // Add jobs with different priorities
  // Lower number = higher priority (processed first)
  await queue.add('critical-alert', data, { priority: 1 });
  await queue.add('standard-notification', data, { priority: 5 });
  await queue.add('batch-digest', data, { priority: 10 });

  // Alternatively: one queue per priority class, drained in strict priority
  // order by processPriorityQueues below (critical, then standard, then batch)
  const criticalQueue = new Queue('notifications:critical', { connection: redis });
  const standardQueue = new Queue('notifications:standard', { connection: redis });
  const batchQueue = new Queue('notifications:batch', { connection: redis });

  /**
   * Takes one job from the highest-priority queue that has one.
   *
   * Queues are checked in the order critical, standard, batch; the first job
   * found is handed to `processJob` and its result returned. Resolves to
   * `null` when all three queues are empty.
   *
   * NOTE(review): this relies on the queue objects exposing `getNextJob()` —
   * confirm that method exists on the Queue class of the installed BullMQ version.
   */
  async function processPriorityQueues() {
    const queuesByPriority = [criticalQueue, standardQueue, batchQueue];

    for (const source of queuesByPriority) {
      const job = await source.getNextJob();
      if (job) {
        return processJob(job);
      }
    }

    return null;
  }

  // Worker for the single-queue variant: up to 10 notifications in flight;
  // which job comes next is decided by the priorities set when adding jobs.
  const worker = new Worker(
    'notifications',
    async ({ data }) => {
      await sendNotification(data);
    },
    { connection: redis, concurrency: 10 },
  );
why: "Priority ensures urgent work isn't blocked by batch operations"

--- Scheduled Jobs ---

scheduled_jobs: name: Scheduled and Recurring Jobs description: Cron-like job scheduling when: "Jobs need to run on a schedule" pattern: | import { Queue, QueueScheduler } from 'bullmq';

  // Delayed and repeatable jobs are added through the Queue itself. No
  // separate QueueScheduler instance is created: the `repeat.pattern` option
  // used below belongs to BullMQ releases that no longer need one, and nothing
  // in this snippet referenced the scheduler.
  const queue = new Queue('scheduled', { connection: redis });

  // One-time delayed job
  await queue.add('reminder', { userId: 'user123' }, {
    delay: 24 * 60 * 60 * 1000,  // 24 hours from now
  });

  // Recurring job with repeat (cron expression evaluated in the given timezone)
  await queue.add('daily-report', {}, {
    repeat: {
      pattern: '0 9 * * *',  // Every day at 9 AM
      tz: 'America/New_York',
    },
    jobId: 'daily-report',  // Prevent duplicates
  });

  // Recurring with fixed interval (milliseconds)
  await queue.add('health-check', {}, {
    repeat: {
      every: 60000,  // Every minute
    },
    jobId: 'health-check',
  });

  // List and manage repeatable jobs
  const repeatableJobs = await queue.getRepeatableJobs();

  // Remove a repeatable job
  // NOTE(review): indexes the first entry without checking the list is
  // non-empty — guard this in real code.
  await queue.removeRepeatableByKey(repeatableJobs[0].key);

  // Alternative: Use node-cron for simpler scheduling
  import cron from 'node-cron';

  // Enqueues one 'daily-report' job stamped with the current time
  const enqueueDailyReport = async () => {
    const generatedAt = new Date().toISOString();
    await queue.add('daily-report', { generatedAt });
  };

  // Fire every day at 09:00 New York time
  cron.schedule('0 9 * * *', enqueueDailyReport, { timezone: 'America/New_York' });
why: "Scheduled jobs automate recurring tasks reliably"

--- Workflow Orchestration ---

job_chaining: name: Job Chaining and Workflows description: Sequence of dependent jobs when: "Jobs depend on results of previous jobs" pattern: | import { Queue, FlowProducer } from 'bullmq';

  const flowProducer = new FlowProducer({ connection: redis });

  // The workflow is a tree of jobs, built here from the leaves up.

  // Leaves: the two jobs with no children of their own
  const chargePaymentNode = {
    name: 'charge-payment',
    queueName: 'payments',
    data: { amount: 99.99 },
  };
  const reserveInventoryNode = {
    name: 'reserve-inventory',
    queueName: 'inventory',
    data: { productId: 'prod123', quantity: 1 },
  };

  // Middle node: parent of both leaves
  const sendConfirmationNode = {
    name: 'send-confirmation',
    queueName: 'emails',
    data: { type: 'order-confirmation' },
    children: [chargePaymentNode, reserveInventoryNode],
  };

  // Root node: submitted together with the whole tree
  const orderWorkflow = await flowProducer.add({
    name: 'complete-order',
    queueName: 'orders',
    data: { orderId: 'order123' },
    children: [sendConfirmationNode],
  });

  // Jobs execute bottom-up:
  // 1. charge-payment and reserve-inventory (parallel)
  // 2. send-confirmation (after both complete)
  // 3. complete-order (final)

  // Parent jobs can read what their children returned
  const worker = new Worker(
    'emails',
    async (job) => {
      // Child results, keyed per child job,
      // e.g. { 'payments:charge-payment:123': { success: true }, ... }
      const childResults = await job.getChildrenValues();

      await sendEmail(job.data.type, childResults);
    },
    { connection: redis },
  );

  /**
   * Saga pattern for compensating transactions.
   *
   * Runs reserve-inventory, charge-payment and send-confirmation in sequence,
   * recording each completed step. If any step throws, the completed steps
   * are compensated in reverse order and the original error is re-thrown.
   *
   * @param orderId Order to process.
   * @returns `{ success: true }` when every step completed.
   * @throws The error of the step that failed, even if a compensation also fails.
   */
  async function orderSaga(orderId: string) {
    const steps: Array<{ service: string; action: string; id?: string }> = [];

    try {
      // Step 1: Reserve inventory
      const reservation = await inventoryService.reserve(orderId);
      steps.push({ service: 'inventory', action: 'reserve', id: reservation.id });

      // Step 2: Charge payment
      const payment = await paymentService.charge(orderId);
      steps.push({ service: 'payment', action: 'charge', id: payment.id });

      // Step 3: Send confirmation
      await emailService.sendConfirmation(orderId);
      steps.push({ service: 'email', action: 'send' });

      return { success: true };
    } catch (error) {
      // Compensate in reverse order. Each compensation is isolated: one that
      // fails must neither stop the remaining ones (e.g. a failed refund must
      // not leave inventory reserved) nor replace the error that triggered
      // the rollback.
      for (const step of [...steps].reverse()) {
        try {
          await compensate(step);
        } catch (compensationError) {
          console.error(
            `Compensation failed for ${step.service}:${step.action}`,
            compensationError,
          );
        }
      }
      throw error;
    }
  }

  /**
   * Undoes a single completed saga step.
   *
   * Inventory steps are released and payment steps refunded, both by the
   * step's `id`. Any other service (such as email) has nothing to undo.
   */
  async function compensate(step: { service: string; action: string; id?: string }) {
    const { service, id } = step;

    if (service === 'inventory') {
      await inventoryService.release(id);
    } else if (service === 'payment') {
      await paymentService.refund(id);
    }
    // email doesn't need compensation
  }
why: "Workflows coordinate complex multi-step processes with rollback"

--- Observability ---

queue_monitoring: name: Queue Monitoring and Metrics description: Track queue health and performance when: "Operating queues in production" pattern: | import { Queue, QueueEvents } from 'bullmq'; import { Counter, Histogram, Gauge } from 'prom-client';

  // Prometheus metrics

  // Counter of finished jobs, labelled by queue name and outcome status
  const jobsProcessed = new Counter({
    name: 'queue_jobs_processed_total',
    help: 'Total jobs processed',
    labelNames: ['queue', 'status'],
  });

  // Histogram of job durations (seconds, per the metric name), labelled by
  // queue and job type; bucket upper bounds range from 0.1 to 60
  const jobDuration = new Histogram({
    name: 'queue_job_duration_seconds',
    help: 'Job processing duration',
    labelNames: ['queue', 'job_type'],
    buckets: [0.1, 0.5, 1, 5, 10, 30, 60],
  });

  // Gauge of current job counts, labelled by queue and job state
  const queueDepth = new Gauge({
    name: 'queue_depth',
    help: 'Number of jobs waiting',
    labelNames: ['queue', 'state'],
  });

  // Queue-level event stream for the 'orders' queue; every finished job bumps
  // the processed counter under its outcome. The event payload is not needed.
  const queueEvents = new QueueEvents('orders', { connection: redis });

  queueEvents.on('completed', () => {
    jobsProcessed.inc({ queue: 'orders', status: 'completed' });
  });

  queueEvents.on('failed', () => {
    jobsProcessed.inc({ queue: 'orders', status: 'failed' });
  });

  // One Queue handle shared by every poll. Creating a new Queue inside the
  // polling function would open another connection every 30 seconds and
  // never close it.
  const metricsQueue = new Queue('orders', { connection: redis });

  /**
   * Periodic queue depth check.
   *
   * Reads the waiting/active/delayed/failed counts of the 'orders' queue,
   * publishes them on the `queueDepth` gauge, and sends a warning alert when
   * more than 1000 jobs are waiting.
   */
  async function updateQueueMetrics() {
    // The four counts are independent, so fetch them concurrently
    const [waiting, active, delayed, failed] = await Promise.all([
      metricsQueue.getWaitingCount(),
      metricsQueue.getActiveCount(),
      metricsQueue.getDelayedCount(),
      metricsQueue.getFailedCount(),
    ]);

    queueDepth.set({ queue: 'orders', state: 'waiting' }, waiting);
    queueDepth.set({ queue: 'orders', state: 'active' }, active);
    queueDepth.set({ queue: 'orders', state: 'delayed' }, delayed);
    queueDepth.set({ queue: 'orders', state: 'failed' }, failed);

    // Alert on high queue depth
    if (waiting > 1000) {
      await alerting.send({
        severity: 'warning',
        message: `Queue orders has ${waiting} waiting jobs`,
      });
    }
  }

  // Every 30 seconds. A failed poll is logged instead of surfacing as an
  // unhandled promise rejection from the timer callback.
  setInterval(() => {
    updateQueueMetrics().catch((err) => {
      console.error('Failed to update queue metrics', err);
    });
  }, 30000);

  // Structured logging
  worker.on('completed', (job, result) => {
    // Measure execution time, not time since enqueue: `timestamp` is when the
    // job was created, so subtracting it would fold queue wait time (and any
    // configured delay) into the reported duration. Fall back gracefully if
    // the processing timestamps are not populated.
    const finishedAt = job.finishedOn ?? Date.now();
    const startedAt = job.processedOn ?? job.timestamp;

    logger.info('Job completed', {
      queue: 'orders',
      jobId: job.id,
      jobName: job.name,
      duration: finishedAt - startedAt,
      attempts: job.attemptsMade,
    });
  });

  // Log every failure with the error details; `job` may be absent, so its
  // fields are read defensively.
  worker.on('failed', (job, { message, stack }) => {
    const context = {
      queue: 'orders',
      jobId: job?.id,
      jobName: job?.name,
      error: message,
      stack,
      attempts: job?.attemptsMade,
    };
    logger.error('Job failed', context);
  });
why: "Visibility into queue health prevents silent failures"

============================================================================

ANTI-PATTERNS

============================================================================

anti_patterns:

no_idempotency: name: Jobs Without Idempotency description: Jobs that cause duplicate side effects on retry problem: | await queue.add('send-email', { to: 'user@example.com' });

  // ANTI-PATTERN (kept as-is for illustration):
  // Worker doesn't check if email already sent
  // NOTE(review): the function name `process` also shadows Node's global
  // `process` object within this snippet's scope.
  async function process(job) {
    await sendEmail(job.data.to);  // Sends again on retry!
  }
solution: |
  // Sends the email once per job id: a Redis marker keyed by the job id is
  // checked before sending and written (24 h TTL) after a successful send.
  // NOTE(review): the check and the write are separate Redis calls, so two
  // concurrent runs of the same job could both pass the check.
  async function process(job) {
    const sentKey = `email:sent:${job.id}`;

    const alreadySent = await redis.exists(sentKey);
    if (alreadySent) {
      return { alreadySent: true };
    }

    await sendEmail(job.data.to);
    await redis.setex(sentKey, 86400, '1');
  }
impact: "Duplicate emails, double charges, inconsistent data"

no_timeout: name: Jobs Without Timeout description: Jobs that can run forever problem: |
  const worker = new Worker('api-calls', async (job) => {
    // No timeout - hangs forever if API is slow
    await callExternalApi(job.data);
  });
solution: |
  const worker = new Worker('api-calls', async (job) => {
    // Abort after 30 seconds
    const controller = new AbortController();
    const timeout = setTimeout(() => controller.abort(), 30000);

    try {
      await callExternalApi(job.data, { signal: controller.signal });
    } finally {
      clearTimeout(timeout);
    }
  }, {
    connection: redis,
    lockDuration: 60000,  // Lock expires after 60s
  });
impact: "Workers blocked, queue backup, no error handling"

fire_and_forget: name: Fire and Forget Without Monitoring description: Adding jobs without tracking success/failure problem: |
  // In request handler
  await queue.add('process-upload', { fileId });
  res.json({ status: 'processing' });
  // No way for user to know if it failed
solution: |
  const job = await queue.add('process-upload', { fileId });

  // Store job ID for status checking: the upload row records which job is
  // processing it and is marked 'processing'
  await db.upload.update({
    where: { id: fileId },
    data: { jobId: job.id, status: 'processing' },
  });

  // Endpoint to check status: combines the stored upload status with the live
  // progress / failure reason of its job, when that job can still be found.
  app.get('/upload/:id/status', async (req, res) => {
    const upload = await db.upload.findUnique({ where: { id: req.params.id } });

    // Unknown upload id: answer 404 instead of failing on `upload.jobId`
    if (!upload) {
      res.status(404).json({ error: 'Upload not found' });
      return;
    }

    // An upload without a recorded job id has no job to look up
    const job = upload.jobId ? await queue.getJob(upload.jobId) : undefined;

    res.json({
      status: upload.status,
      // Plain property reads; `job` may be missing, hence the optional chaining
      progress: job?.progress,
      failedReason: job?.failedReason,
    });
  });
impact: "Users can't track progress, silent failures"

no_graceful_shutdown: name: No Graceful Shutdown description: Workers killed mid-job problem: |
  const worker = new Worker('queue', process);
  // Process crashes, jobs are lost mid-processing
solution: |
  const worker = new Worker('queue', process, { connection: redis });

  /**
   * Gracefully stops the worker, then exits the process.
   *
   * Exit code 0 when the worker closed cleanly, 1 when closing failed or took
   * longer than 30 seconds.
   */
  async function shutdown() {
    console.log('Shutting down gracefully...');

    // Arm the deadline BEFORE waiting on the worker. If the timer is created
    // only after `worker.close()` has resolved, it can never fire and a hung
    // job blocks shutdown indefinitely.
    const timeout = setTimeout(() => {
      console.log('Timeout - forcing shutdown');
      process.exit(1);
    }, 30000);

    let exitCode = 0;
    try {
      // Stop accepting new jobs and wait for the close to finish
      await worker.close();
    } catch (err) {
      console.error('Error while closing worker', err);
      exitCode = 1;
    } finally {
      clearTimeout(timeout);
    }

    process.exit(exitCode);
  }

  process.on('SIGTERM', shutdown);
  process.on('SIGINT', shutdown);
impact: "Lost jobs, inconsistent state, data corruption"

polling_for_results: name: Polling for Job Results description: Busy-waiting for job completion problem: | const job = await queue.add('process', data);

  // ANTI-PATTERN (kept as-is for illustration):
  // Polling wastes resources — re-fetches the job once per second until it
  // reports a finish time, and has no upper bound on how long it loops
  while (true) {
    const status = await queue.getJob(job.id);
    if (status.finishedOn) break;
    await sleep(1000);
  }
solution: |
  import { QueueEvents } from 'bullmq';

  // Event stream used to learn about job completion without polling
  const queueEvents = new QueueEvents('process', { connection: redis });

  const job = await queue.add('process', data);

  // Option 1: block until this job finishes (second argument: 60000)
  const result = await job.waitUntilFinished(queueEvents, 60000);

  // Option 2: subscribe to completion events and react only to our own job
  queueEvents.on('completed', ({ jobId, returnvalue }) => {
    if (jobId !== job.id) {
      return;
    }
    handleResult(returnvalue);
  });
impact: "Wasted CPU, increased Redis load, delayed results"

unbounded_retries: name: Unbounded Retry Loops description: Jobs that retry forever problem: |
  await queue.add('job', data, {
    attempts: Infinity,  // Never gives up
    backoff: { type: 'fixed', delay: 1000 },
  });
solution: |
  await queue.add('job', data, {
    attempts: 5,  // Reasonable limit
    backoff: {
      type: 'exponential',
      delay: 1000,
    },
  });

  // Move to DLQ after exhausting retries
  worker.on('failed', async (job, err) => {
    // The failed event may arrive without a job object; nothing to
    // dead-letter in that case.
    if (!job) {
      return;
    }

    // A job without an `attempts` option (or with 0) runs exactly once.
    // Comparing against an undefined limit is always false and would keep
    // such jobs out of the DLQ forever.
    const maxAttempts = Math.max(job.opts.attempts ?? 1, 1);

    if (job.attemptsMade >= maxAttempts) {
      await dlq.add('failed-job', {
        originalJob: job.data,
        error: err.message,
      });
    }
  });
impact: "Zombie jobs, resource waste, noise in monitoring"

============================================================================

LIBRARY COMPARISON

============================================================================

library_comparison:

bullmq: name: BullMQ use_when: "Node.js, need reliable job processing with Redis" pros: - "Excellent reliability with Redis persistence" - "Built-in rate limiting and concurrency" - "Workflows and job dependencies" - "Great TypeScript support" cons: - "Requires Redis" - "Node.js only" install: "npm install bullmq ioredis"

rabbitmq: name: RabbitMQ (amqplib) use_when: "Need message broker patterns, multiple consumers" pros: - "Mature, battle-tested" - "Complex routing (exchanges, bindings)" - "Multi-language support" - "Management UI built-in" cons: - "More operational overhead" - "Steeper learning curve" install: "npm install amqplib"

aws_sqs: name: AWS SQS use_when: "AWS ecosystem, serverless, need managed service" pros: - "Fully managed, no ops" - "Scales automatically" - "Pay per message" - "Dead letter queue built-in" cons: - "AWS lock-in" - "No priority queues" - "Max 14 day retention" install: "npm install @aws-sdk/client-sqs"

google_pubsub: name: Google Cloud Pub/Sub use_when: "GCP ecosystem, event-driven architecture" pros: - "Global, serverless" - "Real-time messaging" - "Exactly-once delivery option" cons: - "GCP lock-in" - "Complex ordering guarantees" install: "npm install @google-cloud/pubsub"

agenda: name: Agenda use_when: "MongoDB already in stack, simple job scheduling" pros: - "Uses MongoDB (no Redis needed)" - "Simple API" - "Cron scheduling" cons: - "Less performant than Redis-based" - "MongoDB required" install: "npm install agenda"

pg_boss: name: pg-boss use_when: "PostgreSQL already in stack, simple needs" pros: - "Uses PostgreSQL (no Redis needed)" - "ACID guarantees" - "Simple setup" cons: - "Lower throughput than Redis" - "PostgreSQL required" install: "npm install pg-boss"

============================================================================

DECISION FRAMEWORK

============================================================================

decision_tree: start: "What's your infrastructure?" nodes: infrastructure: question: "What database/cache do you already have?" options: - answer: "Redis available" next: "Use BullMQ - best Node.js queue library" - answer: "Only PostgreSQL" next: "Use pg-boss - queue in your database" - answer: "Only MongoDB" next: "Use Agenda - queue in your database" - answer: "AWS native" next: "Use SQS + Lambda - serverless, managed" - answer: "Need multi-language" next: "Use RabbitMQ - mature message broker"

scale:
  question: "Expected job throughput?"
  options:
    - answer: "< 1000/hour"
      next: "Database-backed (pg-boss, Agenda) is fine"
    - answer: "1000-100000/hour"
      next: "Redis-backed (BullMQ) recommended"
    - answer: "> 100000/hour"
      next: "Managed service (SQS, Pub/Sub) or Kafka"

============================================================================

HANDOFFS

============================================================================

handoffs:

  • to: rate-limiting when: "Queue is calling external APIs" pass: "Rate limit requirements, API limits"

  • to: observability-sre when: "Need queue monitoring" pass: "Queue metrics, alerting needs"

  • to: infrastructure-as-code when: "Deploying queue infrastructure" pass: "Redis/RabbitMQ requirements, scaling needs"

  • to: database-schema-design when: "Need to persist job results" pass: "Job result schema, retention requirements"

ecosystem: core_tools: - "BullMQ - Redis-based job queue" - "ioredis - Redis client" - "node-cron - Job scheduling" - "prom-client - Prometheus metrics"

cloud_services: - "AWS SQS - Managed message queue" - "Google Pub/Sub - Event streaming" - "Azure Service Bus - Message broker"

monitoring: - "Bull Board - BullMQ dashboard" - "Arena - Bull queue UI" - "Flower - Celery monitoring"