Claude-skill-registry-data message_queues
Async communication patterns using message brokers and task queues. Use when building event-driven systems, background job processing, or service decoupling. Covers Kafka (event streaming), RabbitMQ (complex routing), NATS (cloud-native), Redis Streams, Celery (Python), BullMQ (TypeScript), Temporal (workflows), and event sourcing patterns.
```bash
git clone https://github.com/majiayu000/claude-skill-registry-data
```

```bash
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry-data "$T" && mkdir -p ~/.claude/skills && cp -r "$T/data/message-queues" ~/.claude/skills/majiayu000-claude-skill-registry-data-message-queues && rm -rf "$T"
```
data/message-queues/SKILL.md

Message Queues
Implement asynchronous communication patterns for event-driven architectures, background job processing, and service decoupling.
When to Use This Skill
Use message queues when:
- Long-running operations block HTTP requests (report generation, video processing)
- Service decoupling required (microservices, event-driven architecture)
- Guaranteed delivery needed (payment processing, order fulfillment)
- Event streaming for analytics (log aggregation, metrics pipelines)
- Workflow orchestration for complex processes (multi-step sagas, human-in-the-loop)
- Background job processing (email sending, image resizing)
Broker Selection Decision Tree
Choose message broker based on primary need:
Event Streaming / Log Aggregation
→ Apache Kafka
- Throughput: 500K-1M msg/s
- Replay events (event sourcing)
- Exactly-once semantics
- Long-term retention
- Use: Analytics pipelines, CQRS, event sourcing
Simple Background Jobs
→ Task Queues
- Python → Celery + Redis
- TypeScript → BullMQ + Redis
- Go → Asynq + Redis
- Use: Email sending, report generation, webhooks
Complex Workflows / Sagas
→ Temporal
- Durable execution (survives restarts)
- Saga pattern support
- Human-in-the-loop workflows
- Use: Order processing, AI agent orchestration
Request-Reply / RPC Patterns
→ NATS
- Built-in request-reply
- Sub-millisecond latency
- Cloud-native, simple operations
- Use: Microservices RPC, IoT command/control
Complex Message Routing
→ RabbitMQ
- Exchanges (direct, topic, fanout, headers)
- Dead letter exchanges
- Message TTL, priorities
- Use: Multi-consumer patterns, pub/sub
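The exchange routing rules above can be illustrated with a small matcher. This is only a sketch of topic-exchange wildcard semantics (`*` matches exactly one word, `#` matches zero or more words), not RabbitMQ's actual implementation, and `topic_matches` is a hypothetical helper:

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Sketch of RabbitMQ topic-exchange matching: '*' = one word, '#' = zero or more."""
    def match(b: list[str], r: list[str]) -> bool:
        if not b:
            return not r
        if b[0] == "#":
            # '#' can absorb zero or more routing-key words
            return any(match(b[1:], r[i:]) for i in range(len(r) + 1))
        if not r:
            return False
        return (b[0] == "*" or b[0] == r[0]) and match(b[1:], r[1:])
    return match(binding_key.split("."), routing_key.split("."))
```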
Already Using Redis
→ Redis Streams
- No new infrastructure
- Simple consumer groups
- Moderate throughput (100K+ msg/s)
- Use: Notification queues, simple job queues
Performance Comparison
| Broker | Throughput | Latency (p99) | Best For |
|---|---|---|---|
| Kafka | 500K-1M msg/s | 10-50ms | Event streaming |
| NATS JetStream | 200K-400K msg/s | Sub-ms to 5ms | Cloud-native microservices |
| RabbitMQ | 50K-100K msg/s | 5-20ms | Task queues, complex routing |
| Redis Streams | 100K+ msg/s | Sub-ms | Simple queues, caching |
Quick Start Examples
Kafka Producer/Consumer (Python)
See examples/kafka-python/ for working code.

```python
from confluent_kafka import Producer, Consumer

# Producer
producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('orders', key='order_123', value='{"status": "created"}')
producer.flush()

# Consumer
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processors',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['orders'])
while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        continue  # confluent-kafka surfaces broker errors as messages
    process_order(msg.value())
```
Celery Background Jobs (Python)
See examples/celery-image-processing/ for full implementation.

```python
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379')

@app.task(bind=True, max_retries=3)
def process_image(self, image_url: str):
    try:
        result = expensive_image_processing(image_url)
        return result
    except RecoverableError as e:
        raise self.retry(exc=e, countdown=60)
```
BullMQ Job Processing (TypeScript)
See examples/bullmq-webhook-processor/ for full implementation.

```typescript
import { Queue, Worker } from 'bullmq'

const queue = new Queue('webhooks', { connection: { host: 'localhost', port: 6379 } })

// Enqueue job
await queue.add('send-webhook', {
  url: 'https://example.com/webhook',
  payload: { event: 'order.created' }
})

// Process jobs
const worker = new Worker('webhooks', async job => {
  await fetch(job.data.url, {
    method: 'POST',
    body: JSON.stringify(job.data.payload)
  })
}, { connection: { host: 'localhost', port: 6379 } })
```
Temporal Workflow Orchestration
See examples/temporal-order-saga/ for saga pattern implementation.

```python
from datetime import timedelta
from temporalio import workflow, activity

@workflow.defn
class OrderSagaWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> str:
        # Step 1: Reserve inventory
        inventory_id = await workflow.execute_activity(
            reserve_inventory, order_id,
            start_to_close_timeout=timedelta(seconds=10),
        )
        # Step 2: Charge payment
        payment_id = await workflow.execute_activity(
            charge_payment, order_id,
            start_to_close_timeout=timedelta(seconds=30),
        )
        return f"Order {order_id} completed"
```
Core Patterns
Event Naming Convention
Use: Domain.Entity.Action.Version

Examples:
- order.created.v1
- user.profile.updated.v2
- payment.failed.v1
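A minimal validator for this convention might look like the following sketch. `parse_event_type` is a hypothetical helper (not part of this skill's scripts); it accepts both three-part names like `order.created.v1` and longer entity paths like `user.profile.updated.v2`:

```python
import re

def parse_event_type(name: str) -> dict:
    """Split an event name into entity path, action, and version number."""
    parts = name.split(".")
    if len(parts) < 3 or not re.fullmatch(r"v\d+", parts[-1]):
        raise ValueError(f"invalid event type: {name!r}")
    return {
        "entity": ".".join(parts[:-2]),  # e.g. "user.profile"
        "action": parts[-2],             # e.g. "updated"
        "version": int(parts[-1][1:]),   # e.g. 2
    }
```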
Event Schema Structure
```json
{
  "event_type": "order.created.v2",
  "event_id": "uuid-here",
  "timestamp": "2025-12-02T10:00:00Z",
  "version": "2.0",
  "data": {
    "order_id": "ord_123",
    "customer_id": "cus_456"
  },
  "metadata": {
    "producer": "order-service",
    "trace_id": "abc123",
    "correlation_id": "xyz789"
  }
}
```
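A producer-side builder for this envelope could be sketched as follows. `make_event` is hypothetical, and deriving the `version` field from the `v2` suffix is an assumption about the schema:

```python
import uuid
from datetime import datetime, timezone

def make_event(event_type: str, data: dict, producer: str,
               trace_id: str, correlation_id: str) -> dict:
    """Build an event envelope matching the schema structure above."""
    return {
        "event_type": event_type,
        "event_id": str(uuid.uuid4()),
        "timestamp": datetime.now(timezone.utc).isoformat(),
        # Assumption: "order.created.v2" -> version "2.0"
        "version": event_type.rsplit(".v", 1)[-1] + ".0",
        "data": data,
        "metadata": {
            "producer": producer,
            "trace_id": trace_id,
            "correlation_id": correlation_id,
        },
    }
```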
Dead Letter Queue Pattern
Route failed messages to dead letter queue (DLQ) after max retries:
```python
from celery.exceptions import Reject

@app.task(bind=True, max_retries=3)
def process_order(self, order_id: str):
    try:
        result = perform_processing(order_id)
        return result
    except UnrecoverableError as e:
        send_to_dlq(order_id, str(e))
        raise Reject(e, requeue=False)
```
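Broker-agnostic, the retry-then-park flow looks roughly like this sketch, with a plain list standing in for the DLQ and `process_with_dlq` as a hypothetical helper:

```python
# In-memory stand-in for a real dead letter queue.
dead_letter_queue: list[dict] = []

def process_with_dlq(message: dict, handler, max_retries: int = 3) -> bool:
    """Try `handler` up to max_retries times; park the message on final failure."""
    last_error = None
    for _attempt in range(max_retries):
        try:
            handler(message)
            return True
        except Exception as e:  # broad on purpose: this is a sketch
            last_error = e
    dead_letter_queue.append({"message": message, "error": str(last_error)})
    return False
```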
Idempotency for Exactly-Once Processing
```python
from fastapi import Header

@app.post("/process")
async def process_payment(
    payment_data: dict,
    idempotency_key: str = Header(None)
):
    # Check if already processed
    cached_result = redis_client.get(f"idempotency:{idempotency_key}")
    if cached_result:
        return {"status": "already_processed"}
    result = process_payment_logic(payment_data)
    redis_client.setex(f"idempotency:{idempotency_key}", 86400, result)
    return {"status": "processed", "result": result}
```
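The same check-then-record logic can be factored into a decorator. This sketch uses an in-process dict where production code needs shared storage (e.g. Redis) plus a TTL; `idempotent` is a hypothetical helper:

```python
import functools

# In-process stand-in for the Redis-backed result cache.
_results: dict[str, object] = {}

def idempotent(func):
    """Run `func` at most once per idempotency key; replay the cached result after that."""
    @functools.wraps(func)
    def wrapper(idempotency_key: str, *args, **kwargs):
        if idempotency_key in _results:
            return _results[idempotency_key]
        result = func(*args, **kwargs)
        _results[idempotency_key] = result
        return result
    return wrapper

@idempotent
def charge(amount: int) -> str:
    charge.calls = getattr(charge, "calls", 0) + 1  # count real executions
    return f"charged {amount}"
```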
Frontend Integration
Job Status Updates via SSE
```python
import asyncio
from sse_starlette.sse import EventSourceResponse

# FastAPI endpoint for real-time job status
@app.get("/status/{task_id}")
async def task_status_stream(task_id: str):
    async def event_generator():
        while True:
            task = celery_app.AsyncResult(task_id)
            if task.state == 'PROGRESS':
                yield {"event": "progress", "data": task.info.get('progress', 0)}
            elif task.state == 'SUCCESS':
                yield {"event": "complete", "data": task.result}
                break
            await asyncio.sleep(0.5)
    return EventSourceResponse(event_generator())
```
React Component
```tsx
import { useEffect, useState } from 'react'

export function JobStatus({ jobId }: { jobId: string }) {
  const [progress, setProgress] = useState(0)

  useEffect(() => {
    const eventSource = new EventSource(`/api/status/${jobId}`)
    eventSource.addEventListener('progress', (e) => {
      setProgress(JSON.parse(e.data))
    })
    eventSource.addEventListener('complete', (e) => {
      toast({ title: 'Job complete', description: JSON.parse(e.data) })
      eventSource.close()
    })
    return () => eventSource.close()
  }, [jobId])

  return <ProgressBar value={progress} />
}
```
Detailed Guides
For comprehensive documentation, see reference files:
Broker-Specific Guides
- Kafka: See references/kafka.md for partitioning, consumer groups, exactly-once semantics
- RabbitMQ: See references/rabbitmq.md for exchanges, bindings, routing patterns
- NATS: See references/nats.md for JetStream, request-reply patterns
- Redis Streams: See references/redis-streams.md for consumer groups, acknowledgments
Task Queue Guides
- Celery: See references/celery.md for periodic tasks, canvas (workflows), monitoring
- BullMQ: See references/bullmq.md for job prioritization, flows, Bull Board monitoring
- Temporal: See references/temporal-workflows.md for saga patterns, signals, queries
Pattern Guides
- Event Patterns: See references/event-patterns.md for event sourcing, CQRS, outbox pattern
Common Anti-Patterns to Avoid
1. Synchronous API for Long Operations
```python
# ❌ BAD: Blocks request thread
@app.post("/generate-report")
def generate_report(user_id: str):
    report = expensive_computation(user_id)  # 5 minutes!
    return report

# ✅ GOOD: Enqueue background job
@app.post("/generate-report")
async def generate_report(user_id: str):
    task = generate_report_task.delay(user_id)
    return {"task_id": task.id}
```
2. Non-Idempotent Consumers
```python
# ❌ BAD: Processes duplicates
@app.task
def send_email(email: str):
    send_email_service(email)  # Sends twice if retried!

# ✅ GOOD: Idempotent with deduplication
@app.task
def send_email(email: str, idempotency_key: str):
    if redis.exists(f"sent:{idempotency_key}"):
        return "already_sent"
    send_email_service(email)
    redis.setex(f"sent:{idempotency_key}", 86400, "1")
```
3. Ignoring Dead Letter Queues
```python
# ❌ BAD: Failed messages lost forever
@app.task(max_retries=3)
def risky_task(data):
    process(data)  # If all retries fail, data disappears

# ✅ GOOD: DLQ for manual inspection (bind=True gives access to self.request)
@app.task(bind=True, max_retries=3)
def risky_task(self, data):
    try:
        process(data)
    except Exception as e:
        if self.request.retries >= 3:
            send_to_dlq(data, str(e))
        raise
```
4. Using Kafka for Request-Reply
```python
# ❌ BAD: Kafka is not designed for RPC
def get_user_profile(user_id: str):
    kafka_producer.send("user_requests", {"user_id": user_id})
    # How to correlate the response? Kafka is asynchronous!

# ✅ GOOD: Use NATS request-reply or HTTP/gRPC
response = await nats.request("user.profile", user_id.encode())
```
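What a request-reply layer like NATS does under the hood is roughly the correlation-ID bookkeeping in this sketch: pending futures keyed by correlation ID, resolved when a matching reply arrives. The `request`/`on_reply` names are illustrative, not a real API:

```python
import asyncio
import uuid

# Futures for in-flight requests, keyed by correlation ID.
pending: dict[str, asyncio.Future] = {}

async def request(send, payload: dict, timeout: float = 1.0) -> dict:
    """Send a message with a correlation ID and await the matching reply."""
    corr_id = str(uuid.uuid4())
    fut = asyncio.get_running_loop().create_future()
    pending[corr_id] = fut
    await send({"correlation_id": corr_id, **payload})
    try:
        return await asyncio.wait_for(fut, timeout)
    finally:
        pending.pop(corr_id, None)

def on_reply(message: dict) -> None:
    """Resolve the pending future whose correlation ID matches the reply."""
    fut = pending.get(message["correlation_id"])
    if fut and not fut.done():
        fut.set_result(message)

async def main() -> dict:
    async def echo_responder(msg: dict) -> None:
        # Stand-in for a remote service replying on another channel.
        on_reply({"correlation_id": msg["correlation_id"],
                  "profile": {"user_id": msg["user_id"]}})
    return await request(echo_responder, {"user_id": "u1"})
```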
Library Recommendations
Context7 Research
Confluent Kafka (Python)
- Context7 ID: /confluentinc/confluent-kafka-python
- Trust Score: 68.8/100
- Code Snippets: 192+
- Production-ready Python Kafka client
Temporal
- Context7 ID: /websites/temporal_io
- Trust Score: 80.9/100
- Code Snippets: 3,769+
- Workflow orchestration for durable execution
Installation
Python:
```bash
pip install confluent-kafka "celery[redis]" temporalio aio-pika redis
```
TypeScript/Node.js:
```bash
npm install kafkajs bullmq @temporalio/client amqplib ioredis
```
Rust:
```bash
cargo add rdkafka lapin async-nats redis
```
Go:
```bash
go get github.com/confluentinc/confluent-kafka-go
go get github.com/hibiken/asynq
go get go.temporal.io/sdk
```
Utilities
Use scripts for setup automation:
- Kafka setup: Run python scripts/kafka_producer_consumer.py for test utilities
- Schema validation: Run python scripts/validate_message_schema.py to validate event schemas
Related Skills
- api-patterns: API design for async job submission
- realtime-sync: WebSocket/SSE for job status updates
- feedback: Toast notifications for job completion
- databases-*: Persistent storage for event logs

Message Queues v1.1 - Enhanced
🔄 Workflow
Source: Enterprise Integration Patterns & Confluent Kafka Guide
Phase 1: Design
- Pattern Selection: Decide between point-to-point (queue) and pub-sub (topic).
- Schema Registry: Define the message format (Avro/Protobuf) and versioning strategy up front.
- Partitioning: Plan data distribution (choose a partition key to guarantee ordering).
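Key-based partition assignment can be sketched as below. This mirrors the idea behind Kafka's default partitioner (same key, same partition, hence per-key ordering), not its exact murmur2 hash; `partition_for` is a hypothetical helper:

```python
import hashlib

def partition_for(key: str, num_partitions: int) -> int:
    """Deterministically map a message key to a partition index."""
    digest = hashlib.md5(key.encode()).digest()
    # Same key always hashes to the same partition, preserving per-key order.
    return int.from_bytes(digest[:4], "big") % num_partitions
```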
Phase 2: Implementation Checklist
- Idempotency: Establish an exactly-once or at-least-once strategy on the consumer side.
- DLQ: Set up a dead letter queue and alerting for messages that cannot be processed.
- Backpressure: Plan a mechanism to slow the producer when consumers fall behind.
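The backpressure point can be sketched with a bounded queue: once consumers fall behind, the producer blocks or gets an explicit signal instead of buffering without limit. `produce` and the tiny `maxsize` are illustrative:

```python
import queue

# Bounded buffer between producer and consumer; tiny maxsize for illustration.
buf: queue.Queue = queue.Queue(maxsize=2)

def produce(item, wait: bool = True) -> bool:
    """Enqueue an item; return False when the buffer is full (backpressure signal)."""
    try:
        buf.put(item, block=wait, timeout=0.1 if wait else None)
        return True
    except queue.Full:
        return False  # caller can retry, shed load, or slow down
```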
Phase 3: Operations
- Lag Monitoring: Track consumer lag (production rate vs. consumption rate).
- Retention: Configure a retention policy (time- or size-based) to prevent disk exhaustion.
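Consumer lag itself is simple arithmetic: per partition, the log-end offset minus the committed offset, as in this sketch (`consumer_lag` is a hypothetical helper; a real monitor would fetch both offset maps from the broker):

```python
def consumer_lag(end_offsets: dict[int, int], committed: dict[int, int]) -> dict[int, int]:
    """Lag per partition = log-end offset minus committed offset (0 if never committed)."""
    return {p: end_offsets[p] - committed.get(p, 0) for p in end_offsets}
```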
Checkpoints
| Phase | Verification |
|---|---|
| 1 | Does message reordering break the business logic? |
| 2 | Can the system survive 24 hours of log loss (durability)? |
| 3 | Does a poison message (malformed message) lock up the system? |