install
source · Clone the upstream repo
```sh
git clone https://github.com/ComeOnOliver/skillshub
```
Claude Code · Install into ~/.claude/skills/
```sh
T=$(mktemp -d) \
  && git clone --depth=1 https://github.com/ComeOnOliver/skillshub "$T" \
  && mkdir -p ~/.claude/skills \
  && cp -r "$T/skills/TerminalSkills/skills/kafka-js" ~/.claude/skills/comeonoliver-skillshub-kafka-js \
  && rm -rf "$T"
```
manifest: `skills/TerminalSkills/skills/kafka-js/SKILL.md`
KafkaJS — Apache Kafka Client for Node.js
You are an expert in KafkaJS, the pure JavaScript Apache Kafka client for Node.js. You help developers build event-driven architectures with producers, consumers, consumer groups, exactly-once semantics, SASL authentication, and admin operations — processing millions of events per second for real-time analytics, event sourcing, log aggregation, and microservices communication.
Core Capabilities
Producer
```ts
import { Kafka, Partitioners, CompressionTypes } from "kafkajs";

const kafka = new Kafka({
  clientId: "my-app",
  brokers: ["kafka1:9092", "kafka2:9092", "kafka3:9092"],
  ssl: true,
  sasl: {
    mechanism: "plain",
    username: process.env.KAFKA_USER!,
    password: process.env.KAFKA_PASS!,
  },
  retry: { initialRetryTime: 300, retries: 10 },
});

const producer = kafka.producer({
  createPartitioner: Partitioners.DefaultPartitioner,
  idempotent: true, // Exactly-once delivery
  transactionalId: "order-service",
});

await producer.connect();

// Send single message
await producer.send({
  topic: "orders",
  messages: [
    {
      key: order.userId, // Same user → same partition → ordered
      value: JSON.stringify({ type: "order.created", data: order }),
      headers: { "correlation-id": requestId, source: "order-service" },
    },
  ],
  compression: CompressionTypes.GZIP,
});

// Transactional send (atomic multi-topic)
const transaction = await producer.transaction();
try {
  await transaction.send({
    topic: "orders",
    messages: [{ key: order.id, value: JSON.stringify(order) }],
  });
  await transaction.send({
    topic: "notifications",
    messages: [{ key: order.userId, value: JSON.stringify(notification) }],
  });
  await transaction.commit();
} catch (err) {
  await transaction.abort();
}
```
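Note that a transactional send is only atomic from the reader's side when consumers run with `readUncommitted: false` (the KafkaJS default), which filters out records from aborted transactions.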
Consumer
```ts
const consumer = kafka.consumer({
  groupId: "order-processor",
  sessionTimeout: 30000,
  heartbeatInterval: 3000,
  maxBytesPerPartition: 1048576, // 1MB per partition fetch
});

await consumer.connect();
await consumer.subscribe({ topics: ["orders", "payments"], fromBeginning: false });

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    const event = JSON.parse(message.value!.toString());
    switch (topic) {
      case "orders":
        await processOrder(event);
        break;
      case "payments":
        await processPayment(event);
        break;
    }
  },
});

// Alternative: batch processing for throughput (use instead of eachMessage, not alongside it)
await consumer.run({
  eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
    for (const message of batch.messages) {
      await processMessage(message);
      resolveOffset(message.offset);
      await heartbeat(); // Prevent session timeout on long batches
    }
  },
});

// Graceful shutdown
const shutdown = async () => {
  await consumer.disconnect();
  await producer.disconnect();
  process.exit(0);
};
process.on("SIGTERM", shutdown);
```
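The description above also mentions admin operations. Here is a minimal sketch using the KafkaJS admin client; the topic name, partition count, and replication factor are illustrative:

```ts
// Assumes the `kafka` instance from the producer example above
const admin = kafka.admin();
await admin.connect();

// Create a topic; createTopics resolves to false if it already exists
await admin.createTopics({
  topics: [{ topic: "orders", numPartitions: 6, replicationFactor: 3 }],
  waitForLeaders: true,
});

// Inspect cluster state
console.log(await admin.listTopics());
console.log(await admin.fetchOffsets({ groupId: "order-processor", topics: ["orders"] }));

await admin.disconnect();
```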
Installation
```sh
npm install kafkajs
```
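The snippets above are TypeScript; kafkajs ships its own type declarations, so no separate `@types` package is needed.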
Best Practices
- Idempotent producer — Enable `idempotent: true` for exactly-once delivery; prevents duplicate messages on retries
- Key-based partitioning — Use message keys (userId, orderId) to ensure related events go to the same partition (ordered)
- Consumer groups — Add more consumers to a group for horizontal scaling; Kafka auto-rebalances partitions
- Manual offset commits — Commit offsets after processing, not before; prevents data loss on consumer crashes (see the sketch after this list)
- Heartbeat in batches — Call `heartbeat()` during long batch processing to prevent session timeout
- Dead-letter topics — Send failed messages to a DLT (`topic.DLT`) after retries; don't block the consumer (see the sketch after this list)
- Schema validation — Use Avro/Protobuf with Schema Registry for strong typing across producers/consumers
- Compression — Use GZIP or LZ4 compression; reduces network bandwidth 60-80% for JSON payloads
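A minimal sketch combining manual offset commits with a dead-letter topic, assuming the `kafka` instance from the examples above; `processMessage`, the group ID, and the DLT naming are illustrative:

```ts
const dltProducer = kafka.producer();
const dltConsumer = kafka.consumer({ groupId: "order-processor" });

await dltProducer.connect();
await dltConsumer.connect();
await dltConsumer.subscribe({ topics: ["orders"] });

await dltConsumer.run({
  autoCommit: false, // Commit only after processing succeeds or the message is parked
  eachMessage: async ({ topic, partition, message }) => {
    try {
      await processMessage(message); // Hypothetical handler
    } catch (err) {
      // Park the failed message instead of blocking the partition
      // (a real handler would retry before giving up)
      await dltProducer.send({
        topic: `${topic}.DLT`,
        messages: [
          {
            key: message.key,
            value: message.value,
            headers: { ...message.headers, "dlt-reason": String(err) },
          },
        ],
      });
    }
    // Committed offset is the next offset to consume: last processed + 1
    await dltConsumer.commitOffsets([
      { topic, partition, offset: (Number(message.offset) + 1).toString() },
    ]);
  },
});
```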