# Kafka Consumer Implementation

Skill source: `skills/data/kafka-consumer-implementation/SKILL.md` in [claude-skill-registry](https://github.com/majiayu000/claude-skill-registry).

Install into your local skills directory:

```bash
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/kafka-consumer-implementation" ~/.claude/skills/majiayu000-claude-skill-registry-kafka-consumer-implementation && rm -rf "$T"
```
## Purpose

Implement production-grade Kafka consumers that reliably consume and process domain events with high performance, type safety, and comprehensive error recovery. Covers msgspec deserialization, confluent-kafka configuration, offset management, OpenTelemetry tracing, and anti-corruption-layer patterns for translating message schemas to domain models.
## When to Use This Skill

Use when building event-driven systems that consume domain events from Kafka topics; typical triggers include "implement Kafka consumer", "consume events from Kafka", "process order messages", or "set up event consumer".

Do NOT use when mocking Kafka consumers in unit tests (use pytest-adapter-integration-testing), implementing producers (use kafka-producer-implementation), or testing with testcontainers (use kafka-integration-testing).
## Quick Start

Create a high-performance Kafka consumer in three steps:
1. Define the message schema:

   ```python
   import msgspec


   class OrderEventMessage(msgspec.Struct, frozen=True):
       order_id: str
       created_at: str
       customer_name: str
       total_price: float
   ```
2. Implement the consumer:

   ```python
   import msgspec
   from confluent_kafka import Consumer


   class OrderEventConsumer:
       def __init__(self, brokers: list[str], topic: str, group_id: str) -> None:
           config = {
               "bootstrap.servers": ",".join(brokers),
               "group.id": group_id,
               "auto.offset.reset": "earliest",
               "enable.auto.commit": False,
           }
           self.consumer = Consumer(config)
           self.consumer.subscribe([topic])
           self.decoder = msgspec.json.Decoder(OrderEventMessage)

       def consume(self, timeout: float = 1.0) -> OrderEventMessage | None:
           msg = self.consumer.poll(timeout)
           if msg is None or msg.error():
               return None
           return self.decoder.decode(msg.value())

       def commit(self) -> None:
           self.consumer.commit(asynchronous=False)

       def close(self) -> None:
           # Leave the consumer group and release assigned partitions.
           self.consumer.close()
   ```
3. Use in the application:

   ```python
   consumer = OrderEventConsumer(["localhost:9092"], "orders", "loader")
   message = consumer.consume()
   if message:
       process(message)
       consumer.commit()  # Commit only after successful processing
   ```
## Implementation Steps
### 1. Consumer Configuration

Key configuration for at-least-once processing with manual offset commits:

```python
config = {
    "bootstrap.servers": ",".join(brokers),
    "group.id": group_id,
    "auto.offset.reset": "earliest",   # Start from the beginning when no offset is committed
    "enable.auto.commit": False,       # Manual offset management
    "session.timeout.ms": 300000,      # 5-minute session timeout
    "max.poll.interval.ms": 300000,    # Allow 5 minutes of processing between polls
}
```
### 2. Consumer Adapter

Implement the consumer with error handling (a tracing sketch follows the list):

- msgspec deserialization (10-20x faster than Pydantic)
- OpenTelemetry distributed tracing
- Manual offset management for at-least-once semantics
- Comprehensive error logging

See references/detailed-implementation.md for the complete consumer adapter code.
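As a rough sketch of the tracing piece, reusing the Quick Start consumer and assuming an OpenTelemetry SDK and exporter are configured elsewhere (`consume_traced` and the span attributes are illustrative, not part of the reference implementation):

```python
from opentelemetry import trace

tracer = trace.get_tracer("order-event-consumer")


def consume_traced(consumer: OrderEventConsumer) -> OrderEventMessage | None:
    # Wrap each poll/decode cycle in a span so slow polls and decode
    # failures are visible in distributed traces.
    with tracer.start_as_current_span("kafka.consume") as span:
        message = consumer.consume()
        if message is not None:
            span.set_attribute("messaging.system", "kafka")
            span.set_attribute("order.id", message.order_id)
        return message
```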
### 3. Anti-Corruption Layer

Translate Kafka messages to domain entities:

```python
from datetime import datetime
from decimal import Decimal


class OrderEventTranslator:
    @staticmethod
    def to_domain_order(message: OrderEventMessage) -> Order:
        # Validate
        if not message.order_id:
            raise ValueError("order_id is required")
        # Convert types (str -> OrderId, float -> Money)
        created_at = datetime.fromisoformat(message.created_at)
        order_id = OrderId(message.order_id)
        total_price = Money(Decimal(str(message.total_price)))
        return Order(order_id, created_at, message.customer_name, total_price, [])
```
### 4. Processing Loop

Main consumer loop pattern (see the sketch after this list):

- Poll for messages (5s timeout)
- Translate to domain objects
- Process (load into storage)
- Commit offset (only after success)
- Handle errors without stopping the loop
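A minimal sketch of this loop, reusing the Quick Start consumer and the translator above (`repository` is a hypothetical storage port, not part of the reference code):

```python
import structlog

log = structlog.get_logger()


def run_loop(consumer: OrderEventConsumer, repository) -> None:
    while True:
        try:
            message = consumer.consume(timeout=5.0)                # Poll (5s timeout)
            if message is None:
                continue
            order = OrderEventTranslator.to_domain_order(message)  # Translate
            repository.save(order)                                 # Process / load into storage
            consumer.commit()                                      # Commit only after success
        except Exception:
            # Log and keep consuming; the uncommitted offset is retried later.
            log.exception("order_processing_failed")
```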
See references/detailed-implementation.md for the complete processing loop code.
### 5. Lifecycle Management

Use context managers for clean shutdown:

```python
from contextlib import asynccontextmanager


@asynccontextmanager
async def managed_consumer(brokers, topic, group_id):
    consumer = OrderEventConsumer(brokers, topic, group_id)
    try:
        yield consumer
    finally:
        consumer.close()  # Always leave the group cleanly, even on error
```
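For example, in a hypothetical async entrypoint:

```python
async def main() -> None:
    async with managed_consumer(["localhost:9092"], "orders", "loader") as consumer:
        message = consumer.consume()
        # ... translate, process, and commit as in the loop above
```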
## Requirements

- `confluent-kafka>=2.3.0`: production-grade Kafka client
- `msgspec>=0.18.6`: ultra-fast deserialization
- `structlog>=23.2.0`: structured logging
- `opentelemetry-api>=1.22.0`: distributed tracing
- Kafka/Redpanda broker running (3.x or later)
- Python 3.11+ with type checking enabled
## Consumer Groups and Offset Management

Consumer Groups: consumers in the same group share responsibility for topic partitions. Kafka automatically rebalances when members join or leave (see the callback sketch below).

Manual Offset Management (at-least-once: a message is redelivered if the process restarts before its offset is committed):

- Disable auto-commit: `"enable.auto.commit": False`
- Commit only after successful processing: `consumer.commit()`
- Offset reset behavior: `"auto.offset.reset": "earliest"` starts from the beginning of the partition when no committed offset exists
See references/detailed-implementation.md for complete offset management patterns, consumer lag monitoring, and rebalancing behavior.
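To observe rebalances, confluent-kafka accepts assignment callbacks on `subscribe`; a sketch (the logger and callback names are illustrative):

```python
import structlog
from confluent_kafka import Consumer

log = structlog.get_logger()


def on_assign(consumer: Consumer, partitions) -> None:
    # Invoked after a rebalance assigns partitions to this member.
    log.info("partitions_assigned", partitions=[p.partition for p in partitions])


def on_revoke(consumer: Consumer, partitions) -> None:
    # Invoked before partitions are taken away during a rebalance.
    log.info("partitions_revoked", partitions=[p.partition for p in partitions])


consumer = Consumer({"bootstrap.servers": "localhost:9092", "group.id": "loader"})
consumer.subscribe(["orders"], on_assign=on_assign, on_revoke=on_revoke)
```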
## Error Handling

Key error handling strategies (combined in the sketch below):

- Deserialization failures: log the error, then commit the offset to skip the poison pill
- Processing failures: don't commit; the message will be retried on restart
- Commit failures: log the error and continue (the commit is retried with the next message)
See references/error-handling.md for comprehensive error handling strategies and dead letter queue patterns.
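A sketch of how these rules combine, reusing the Quick Start consumer (`process` is a placeholder for your domain logic; note that `consume()` raises `msgspec.DecodeError` on malformed payloads):

```python
import msgspec
import structlog

log = structlog.get_logger()


def handle_one(consumer: OrderEventConsumer) -> None:
    try:
        message = consumer.consume(timeout=5.0)
    except msgspec.DecodeError:
        # Poison pill: log it and commit the offset so it is skipped.
        log.exception("deserialization_failed")
        consumer.commit()
        return
    if message is None:
        return
    try:
        process(message)  # Placeholder for translation + storage
    except Exception:
        # No commit: the message is redelivered after restart/rebalance.
        log.exception("processing_failed")
        return
    try:
        consumer.commit()
    except Exception:
        # Keep consuming; the next successful commit advances the offset.
        log.exception("commit_failed")
```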
## Testing

Use testcontainers for integration tests:

```python
import pytest
from testcontainers.kafka import KafkaContainer


@pytest.fixture
def kafka_container():
    with KafkaContainer() as kafka:
        yield kafka


def test_consumer_roundtrip(kafka_container):
    brokers = [kafka_container.get_bootstrap_server()]
    # Test consumer/producer workflow
```
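A sketch of what the roundtrip body might look like, producing with confluent-kafka and polling until delivery (the topic, group, and field values are illustrative):

```python
import msgspec
from confluent_kafka import Producer


def test_consumer_roundtrip(kafka_container):
    brokers = [kafka_container.get_bootstrap_server()]
    producer = Producer({"bootstrap.servers": brokers[0]})
    event = OrderEventMessage(
        order_id="o-1",
        created_at="2024-01-01T00:00:00",
        customer_name="Ada",
        total_price=9.99,
    )
    producer.produce("orders", msgspec.json.encode(event))
    producer.flush()

    consumer = OrderEventConsumer(brokers, "orders", "test-group")
    message = None
    for _ in range(30):  # Poll repeatedly; the first assignment can take seconds
        message = consumer.consume(timeout=1.0)
        if message is not None:
            break
    assert message == event
    consumer.commit()
```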
See examples/integration-examples.md for complete integration test patterns.
## See Also

- references/detailed-implementation.md: complete consumer adapter and processing loop code
- references/error-handling.md: comprehensive error handling strategies
- examples/integration-examples.md: real-world integration patterns
- kafka-producer-implementation skill: for producing events
- kafka-schema-management skill: for schema design