Vibecosystem kafka-patterns
Topic design, partition strategies, consumer group patterns, exactly-once processing, and dead letter queue handling.
install
source · Clone the upstream repo
git clone https://github.com/vibeeval/vibecosystem
manifest:
skills/kafka-patterns/skill.md
Kafka Patterns
Event streaming patterns for Apache Kafka in distributed systems.
Topic Design
```yaml
# Topic naming convention: <domain>.<entity>.<event-type>
# Examples:
#   orders.order.created
#   payments.payment.completed
#   inventory.stock.updated

# Topic configuration
topics:
  orders.order.created:
    partitions: 12                  # Match expected consumer parallelism
    replication-factor: 3           # Survive 2 broker failures
    retention.ms: 604800000         # 7 days
    cleanup.policy: delete
  orders.order.changelog:
    partitions: 12
    replication-factor: 3
    retention.ms: -1                # Infinite retention (compacted)
    cleanup.policy: compact         # Keep latest value per key
    min.compaction.lag.ms: 3600000  # 1h before compacting
```
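The naming convention above can be enforced in CI or at publish time with a small validator. A sketch; the segment regex and the three-or-four-segment rule are assumptions derived from the examples (the fourth segment covers suffixes like `.dlq` or `.changelog`):

```typescript
// Validate topic names against the <domain>.<entity>.<event-type> convention.
// Segments are assumed to be lowercase alphanumeric with optional hyphens.
const SEGMENT = /^[a-z][a-z0-9-]*$/

export function isValidTopicName(topic: string): boolean {
  const parts = topic.split('.')
  // Allow a trailing suffix such as ".dlq" or ".changelog" as a 4th segment
  if (parts.length < 3 || parts.length > 4) return false
  return parts.every(p => SEGMENT.test(p))
}
```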
Producer Patterns
```typescript
import { Kafka, Partitioners, CompressionTypes } from 'kafkajs'
import { randomUUID } from 'node:crypto'

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: process.env.KAFKA_BROKERS!.split(','),
})

const producer = kafka.producer({
  idempotent: true,        // Exactly-once producer
  maxInFlightRequests: 1,  // KafkaJS requires at most 1 in-flight request when idempotent
  createPartitioner: Partitioners.DefaultPartitioner,
})

await producer.connect()

// Key-based partitioning: same key always goes to same partition (ordering)
async function publishOrderEvent(order: Order, eventType: string): Promise<void> {
  await producer.send({
    topic: `orders.order.${eventType}`,
    compression: CompressionTypes.LZ4,
    messages: [{
      key: order.id,  // Orders for same ID → same partition → ordered
      value: JSON.stringify({
        eventId: randomUUID(),  // Idempotency key
        eventType,
        timestamp: new Date().toISOString(),
        data: order,
      }),
      headers: {
        'content-type': 'application/json',
        'source': 'order-service',
        'correlation-id': order.correlationId,
      },
    }],
  })
}

// Batch publishing for throughput
async function publishBatch(events: OrderEvent[]): Promise<void> {
  await producer.sendBatch({
    topicMessages: [{
      topic: 'orders.order.created',
      messages: events.map(e => ({
        key: e.orderId,
        value: JSON.stringify(e),
      })),
    }],
  })
}
```
Consumer Group Patterns
```typescript
const consumer = kafka.consumer({
  groupId: 'payment-processor',   // Consumer group: shared topic consumption
  sessionTimeout: 30000,
  heartbeatInterval: 3000,
  maxBytesPerPartition: 1048576,  // 1MB per partition per fetch
  retry: { retries: 5 },
})

await consumer.connect()
await consumer.subscribe({
  topics: ['orders.order.created'],
  fromBeginning: false,           // Start from latest offset
})

await consumer.run({
  autoCommit: false,              // Manual commit: at-least-once, made effectively exactly-once by the dedup check
  eachBatchAutoResolve: false,
  eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary, heartbeat }) => {
    for (const message of batch.messages) {
      try {
        const event = JSON.parse(message.value!.toString())

        // Idempotency check: skip already processed events
        if (await isAlreadyProcessed(event.eventId)) {
          resolveOffset(message.offset)
          continue
        }

        await processOrderPayment(event.data)
        await markAsProcessed(event.eventId)

        resolveOffset(message.offset)
        await commitOffsetsIfNecessary()
        await heartbeat()
      } catch (err) {
        console.error(`Failed to process message at offset ${message.offset}:`, err)
        // Send to DLQ instead of blocking the partition
        await sendToDeadLetterQueue(message, err as Error)
        resolveOffset(message.offset)
      }
    }
  },
})
```
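The `isAlreadyProcessed` / `markAsProcessed` calls above need a dedup store. In production that is typically Redis (`SET key NX EX ttl`) or a database table; a minimal in-memory sketch with the same shape, under the assumption that a single consumer instance owns the check:

```typescript
// In-memory event dedup store with TTL eviction.
// Single-process only: swap for Redis when running multiple consumer instances.
const processed = new Map<string, number>() // eventId → expiry timestamp (ms)
const TTL_MS = 24 * 60 * 60 * 1000          // Keep ids as long as duplicates can plausibly arrive

export async function isAlreadyProcessed(eventId: string): Promise<boolean> {
  const expiry = processed.get(eventId)
  if (expiry === undefined) return false
  if (expiry < Date.now()) {  // Expired entry: treat as unseen
    processed.delete(eventId)
    return false
  }
  return true
}

export async function markAsProcessed(eventId: string): Promise<void> {
  processed.set(eventId, Date.now() + TTL_MS)
  // Opportunistic eviction so the map does not grow without bound
  if (processed.size > 100_000) {
    const now = Date.now()
    for (const [id, exp] of processed) if (exp < now) processed.delete(id)
  }
}
```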
Dead Letter Queue (DLQ)
```typescript
import type { KafkaMessage } from 'kafkajs'

const DLQ_TOPIC = 'orders.order.created.dlq'

async function sendToDeadLetterQueue(
  originalMessage: KafkaMessage,
  error: Error
): Promise<void> {
  await producer.send({
    topic: DLQ_TOPIC,
    messages: [{
      key: originalMessage.key,
      value: originalMessage.value,
      headers: {
        ...originalMessage.headers,
        'dlq-reason': error.message,
        'dlq-timestamp': new Date().toISOString(),
        'dlq-original-topic': 'orders.order.created',
        'dlq-retry-count': '0',
      },
    }],
  })
}

// DLQ consumer: retry or alert
async function processDLQ(): Promise<void> {
  const dlqConsumer = kafka.consumer({ groupId: 'dlq-processor' })
  await dlqConsumer.connect()
  await dlqConsumer.subscribe({ topics: [DLQ_TOPIC] })
  await dlqConsumer.run({
    eachMessage: async ({ message }) => {
      const retryCount = parseInt(
        message.headers?.['dlq-retry-count']?.toString() ?? '0',
        10
      )

      if (retryCount >= 3) {
        // Max retries exceeded: alert ops team
        await alertOps({
          topic: DLQ_TOPIC,
          key: message.key?.toString(),
          reason: message.headers?.['dlq-reason']?.toString(),
          retries: retryCount,
        })
        return
      }

      try {
        const event = JSON.parse(message.value!.toString())
        await processOrderPayment(event.data)
      } catch (err) {
        // Re-enqueue with incremented retry count
        await producer.send({
          topic: DLQ_TOPIC,
          messages: [{
            key: message.key,
            value: message.value,
            headers: {
              ...message.headers,
              'dlq-retry-count': String(retryCount + 1),
            },
          }],
        })
      }
    },
  })
}
```
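DLQ retries are usually delayed rather than immediate, so transient failures (a downstream outage, a rate limit) get time to clear. A small exponential-backoff helper for computing the next delay; the base and cap values are illustrative:

```typescript
// Exponential backoff with a cap: 1s, 2s, 4s, ... up to maxDelayMs.
export function retryDelayMs(
  retryCount: number,
  baseMs = 1_000,
  maxDelayMs = 60_000,
): number {
  if (retryCount < 0) throw new RangeError('retryCount must be >= 0')
  return Math.min(baseMs * 2 ** retryCount, maxDelayMs)
}
```

A DLQ processor would wait `retryDelayMs(retryCount)` (via `setTimeout` or a scheduled delayed-retry topic) before re-attempting.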
Partition Strategy
```typescript
// Custom partitioner: route by region for data locality
const regionalPartitioner = () => ({
  partition: ({ topic, partitionMetadata, message }) => {
    const region = message.headers?.['region']?.toString() ?? 'default'
    const regionMap: Record<string, number> = {
      'us-east': 0,
      'us-west': 1,
      'eu-west': 2,
      'eu-east': 3,
      'ap-southeast': 4,
    }

    const partition = regionMap[region]
    if (partition !== undefined && partition < partitionMetadata.length) {
      return partition
    }

    // Fallback: hash the key
    const numPartitions = partitionMetadata.length
    const hash = murmurHash(message.key?.toString() ?? '')
    return Math.abs(hash) % numPartitions
  }
})
```
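The fallback above assumes a `murmurHash` helper. Any stable string hash distributes keys adequately; a djb2-style stand-in is sketched below. Note it is not wire-compatible with Kafka's murmur2, so co-partitioning with Java-client producers is not guaranteed:

```typescript
// djb2 string hash, standing in for murmur2: fast, deterministic,
// adequate for fallback partitioning within a single producer fleet.
export function murmurHash(key: string): number {
  let h = 5381
  for (let i = 0; i < key.length; i++) {
    h = ((h << 5) + h + key.charCodeAt(i)) | 0 // h * 33 + c, kept in 32-bit range
  }
  return h
}

export function partitionFor(key: string, numPartitions: number): number {
  return Math.abs(murmurHash(key)) % numPartitions
}
```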
Checklist
- Idempotent producer enabled (exactly-once semantics)
- Key-based partitioning for ordered processing per entity
- Manual offset commit (not auto-commit) for at-least-once guarantee
- Dead Letter Queue for failed messages (max 3 retries then alert)
- Consumer idempotency check before processing (eventId dedup)
- Heartbeat during long-running batch processing
- Replication factor >= 3 for production topics
- Monitoring: consumer lag, throughput, error rate per consumer group
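Consumer lag, the first monitoring item above, is the gap between the latest offset in each partition and the group's committed offset. Given the two offset lists (the shapes mirror what the KafkaJS admin client's `fetchTopicOffsets` and `fetchOffsets` return per partition, an assumption worth verifying against your client version), the computation itself is a pure sum:

```typescript
interface PartitionOffset { partition: number; offset: string }

// Total lag = Σ over partitions of (latest offset − committed offset).
// A committed offset of '-1' means the group has no commit yet; count from 0.
export function totalLag(
  latest: PartitionOffset[],    // e.g. from admin.fetchTopicOffsets(topic)
  committed: PartitionOffset[], // e.g. from admin.fetchOffsets({ groupId, topics })
): number {
  const committedByPartition = new Map(
    committed.map(p => [p.partition, Number(p.offset)]),
  )
  let lag = 0
  for (const { partition, offset } of latest) {
    const c = committedByPartition.get(partition) ?? -1
    lag += Number(offset) - (c === -1 ? 0 : c)
  }
  return lag
}
```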
Anti-Patterns
- Auto-commit offsets: message loss if consumer crashes before processing
- Single partition topics: no parallelism, bottleneck
- Unbounded retry: infinite retry loop blocks partition processing
- Large messages (>1MB): use claim-check pattern (store in S3, send reference)
- Skipping idempotency: duplicate processing on consumer restart
- Global ordering requirement: use single partition only when truly needed
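The claim-check fix for large messages can be sketched as a wrapper that stores oversized payloads externally and publishes only a reference. The `BlobStore` interface, the `blob://` reference scheme, and the 1 MB threshold are assumptions for illustration; in practice the store is S3 or similar:

```typescript
// Claim-check: keep Kafka messages small by offloading large payloads.
export interface BlobStore {
  put(key: string, data: string): Promise<void>
}

const MAX_INLINE_BYTES = 1_048_576 // 1 MB

export async function toClaimCheckMessage(
  id: string,
  payload: string,
  store: BlobStore,
): Promise<{ inline: boolean; body: string }> {
  if (Buffer.byteLength(payload, 'utf8') <= MAX_INLINE_BYTES) {
    return { inline: true, body: payload } // Small enough: send as-is
  }
  const ref = `blob://${id}` // Hypothetical reference scheme
  await store.put(ref, payload)
  return { inline: false, body: JSON.stringify({ $ref: ref }) }
}
```

Consumers resolve `$ref` against the store before processing; the blob's lifecycle (TTL, cleanup) must be managed separately from topic retention.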