install
source · Clone the upstream repo
git clone https://github.com/Intense-Visions/harness-engineering
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/Intense-Visions/harness-engineering "$T" && mkdir -p ~/.claude/skills && cp -r "$T/agents/skills/codex/events-message-queue" ~/.claude/skills/intense-visions-harness-engineering-events-message-queue-46287c && rm -rf "$T"
manifest:
agents/skills/codex/events-message-queue/SKILL.md
Events: Message Queue
Use message queues for reliable async delivery with competing consumers and dead letter queues.
When to Use
- You need reliable, at-least-once delivery — messages must not be lost even if the consumer crashes
- You have work that should be processed by exactly one consumer (competing consumers pattern)
- You need decoupling between producer and consumer throughput
- You want dead letter queues (DLQ) for failed message handling
- Background job processing, email sending, report generation, order fulfillment
Instructions
BullMQ (Redis-backed queue — recommended for Node.js):
```typescript
import { Queue, Worker, Job } from 'bullmq';
import { Redis } from 'ioredis';

// BullMQ requires maxRetriesPerRequest: null on connections used by Workers
const connection = new Redis({ host: 'localhost', port: 6379, maxRetriesPerRequest: null });

// Define job data types
interface SendEmailJobData {
  to: string;
  subject: string;
  body: string;
  templateId?: string;
}

// Producer — enqueue jobs
const emailQueue = new Queue<SendEmailJobData>('email', { connection });

async function scheduleWelcomeEmail(userId: string, email: string): Promise<void> {
  await emailQueue.add(
    'welcome',
    { to: email, subject: 'Welcome!', body: 'Thanks for joining.', templateId: 'welcome-v2' },
    {
      delay: 5_000,  // wait 5s before processing
      attempts: 3,   // retry up to 3 times
      backoff: { type: 'exponential', delay: 2_000 }, // 2s, 4s, 8s
      removeOnComplete: { count: 100 }, // keep last 100 completed jobs
      removeOnFail: { count: 50 },      // keep last 50 failed jobs
    }
  );
}

// Consumer — process jobs
const emailWorker = new Worker<SendEmailJobData>(
  'email',
  async (job: Job<SendEmailJobData>) => {
    console.log(`Processing job ${job.id}: send email to ${job.data.to}`);
    await sendEmail(job.data.to, job.data.subject, job.data.body);
    // Returning from the processor marks the job complete
  },
  {
    connection,
    concurrency: 5, // process 5 jobs in parallel
  }
);

emailWorker.on('completed', (job) => {
  console.log(`Job ${job.id} completed`);
});

emailWorker.on('failed', (job, err) => {
  console.error(`Job ${job?.id} failed after all retries:`, err.message);
  // At this point, the job moves to the failed set (acts as a DLQ)
});
```
RabbitMQ with amqplib:
```typescript
import amqp from 'amqplib';

const QUEUE = 'order.processing';
const DLQ = 'order.processing.dlq';

async function setupQueue(): Promise<amqp.Channel> {
  const conn = await amqp.connect('amqp://localhost');
  const channel = await conn.createChannel();

  // Dead letter queue
  await channel.assertQueue(DLQ, { durable: true });

  // Main queue with DLQ routing
  await channel.assertQueue(QUEUE, {
    durable: true,
    arguments: {
      'x-dead-letter-exchange': '',
      'x-dead-letter-routing-key': DLQ,
      'x-message-ttl': 3_600_000, // messages expire after 1h
    },
  });

  await channel.prefetch(1); // process one message at a time per consumer
  return channel;
}

// Producer
async function publishOrder(orderId: string, amount: number): Promise<void> {
  const channel = await setupQueue();
  const message = JSON.stringify({ orderId, amount, timestamp: new Date() });
  channel.sendToQueue(QUEUE, Buffer.from(message), { persistent: true });
}

// Consumer
async function startConsumer(): Promise<void> {
  const channel = await setupQueue();
  await channel.consume(QUEUE, async (msg) => {
    if (!msg) return;
    try {
      const data = JSON.parse(msg.content.toString());
      await processOrder(data.orderId, data.amount);
      channel.ack(msg); // acknowledge on success
    } catch (err) {
      console.error('Processing failed:', err);
      // nack with requeue=false — routes the message to the DLQ immediately
      channel.nack(msg, false, false);
    }
  });
}
```
Competing consumers pattern:
```typescript
// Run multiple workers — each message is processed by exactly ONE worker.
// Scale by adding more worker instances.

// Worker 1 (process 1)
const worker1 = new Worker('orders', processOrder, { connection, concurrency: 3 });

// Worker 2 (process 2, or a separate machine)
const worker2 = new Worker('orders', processOrder, { connection, concurrency: 3 });

// BullMQ guarantees only one worker processes each job
```
Details
Delivery guarantees:
- At-most-once: Fire and forget. Publisher sends, never tracks. Simple but messages can be lost.
- At-least-once: Publisher gets confirmation; consumer acks after processing. May process twice — handlers must be idempotent.
- Exactly-once: At-least-once + deduplication. Hard to achieve; usually simulated with idempotency keys.
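The idempotency-key approach can be sketched with a dedupe store keyed by a producer-chosen message ID. The names here (`OrderMessage`, `handleOrder`, `processedIds`) are illustrative, not part of any queue library, and a real system would back the store with something durable such as a unique-keyed database table rather than process memory:

```typescript
// In-memory stand-in for a durable dedupe store
const processedIds = new Set<string>();

interface OrderMessage {
  messageId: string; // idempotency key chosen by the producer
  orderId: string;
  amount: number;
}

// Returns true if the message did real work, false if it was a duplicate.
function handleOrder(
  msg: OrderMessage,
  charge: (orderId: string, amount: number) => void
): boolean {
  if (processedIds.has(msg.messageId)) {
    return false; // already processed; safe to ack and drop the redelivery
  }
  charge(msg.orderId, msg.amount);
  processedIds.add(msg.messageId);
  return true;
}
```

With this in place, an at-least-once broker can redeliver the same message any number of times and the side effect (the charge) still happens exactly once.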
Dead letter queue strategy:
- Job fails after all retries
- Moved to DLQ automatically
- Monitoring alerts on DLQ depth
- Operators inspect, fix the bug, and replay from DLQ
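The replay step can be sketched against a minimal queue interface. The `QueueLike` and `FailedJob` shapes below are assumptions for illustration, chosen to mirror BullMQ's `queue.getFailed()` and `job.retry()` calls:

```typescript
interface FailedJob {
  id: string;
  failedReason: string;
  retry(): Promise<void>;
}

interface QueueLike {
  getFailed(): Promise<FailedJob[]>;
}

// After the fix ships, re-enqueue only the failed jobs matching the fixed bug.
async function replayDlq(
  queue: QueueLike,
  reasonFilter: (reason: string) => boolean
): Promise<number> {
  const failed = await queue.getFailed();
  let replayed = 0;
  for (const job of failed) {
    if (reasonFilter(job.failedReason)) {
      await job.retry(); // moves the job back to the waiting state
      replayed++;
    }
  }
  return replayed;
}
```

Filtering by failure reason matters: a DLQ usually mixes several distinct failure modes, and replaying everything blindly re-triggers the ones you have not fixed yet.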
Anti-patterns:
- Consumers that ack before processing (risk losing work on crash)
- Long-running consumers without heartbeats (broker requeues "stalled" jobs)
- Unbounded queue depth (apply backpressure via rate limiting producers)
- Processing heavy jobs in HTTP request handlers (enqueue them instead)
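The last anti-pattern can be sketched as an "enqueue, don't process" handler. `EnqueueQueue` and `handleReportRequest` are hypothetical names; the `add(name, data)` shape mirrors BullMQ's `Queue.add`:

```typescript
interface EnqueueQueue {
  add(name: string, data: unknown): Promise<void>;
}

// The handler returns 202 Accepted immediately; the heavy work runs in a worker.
async function handleReportRequest(
  queue: EnqueueQueue,
  userId: string
): Promise<{ status: number; body: string }> {
  await queue.add('generate-report', { userId, requestedAt: Date.now() });
  return { status: 202, body: 'Report generation queued' };
}
```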
Monitoring queues:
```typescript
// BullMQ — check queue health
const counts = await emailQueue.getJobCounts('waiting', 'active', 'completed', 'failed', 'delayed');
console.log(counts);
// Expose as a /metrics endpoint for Prometheus
```
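Turning those counts into Prometheus scrape output is a small formatting step. `renderQueueMetrics` is a hypothetical helper emitting the Prometheus text exposition format, with the metric name `bullmq_jobs` chosen for this sketch:

```typescript
// Render BullMQ job counts as a Prometheus gauge per queue state.
function renderQueueMetrics(queueName: string, counts: Record<string, number>): string {
  const lines = ['# TYPE bullmq_jobs gauge'];
  for (const [state, value] of Object.entries(counts)) {
    lines.push(`bullmq_jobs{queue="${queueName}",state="${state}"} ${value}`);
  }
  return lines.join('\n');
}
```

Serving this string from a `/metrics` route lets Prometheus alert on DLQ depth (the `failed` state) as described above.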
Source
microservices.io/patterns/communication-style/messaging.html
Process
- Read the instructions and examples in this document.
- Apply the patterns to your implementation, adapting to your specific context.
- Verify your implementation against the details and edge cases listed above.
Harness Integration
- Type: knowledge — this skill is a reference document, not a procedural workflow.
- No tools or state — consumed as context by other skills and agents.
Success Criteria
- The patterns described in this document are applied correctly in the implementation.
- Edge cases and anti-patterns listed in this document are avoided.