Claude-skill-registry kafka-integration-testing
git clone https://github.com/majiayu000/claude-skill-registry
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/kafka-integration-testing" ~/.claude/skills/majiayu000-claude-skill-registry-kafka-integration-testing && rm -rf "$T"
skills/data/kafka-integration-testing/SKILL.mdKafka Integration Testing
Purpose
Write production-grade integration tests for Kafka producers and consumers using testcontainers. Covers setting up temporary test brokers, testing producer/consumer workflows, verifying message ordering guarantees, testing error scenarios, and validating delivery semantics without mocking external services.
When to Use This Skill
Use when testing Kafka producer/consumer workflows end-to-end with "test Kafka integration", "verify message ordering", "test Kafka roundtrip", or "validate exactly-once semantics".
Do NOT use for unit testing with mocked Kafka (use
pytest-adapter-integration-testing), implementing producers/consumers (use respective kafka-*-implementation skills), or schema validation (use kafka-schema-management).
Quick Start
Create a producer/consumer round-trip test in 3 steps:
- Add dependency:
pip install testcontainers[kafka]>=4.0.0
- Write test:
import pytest from testcontainers.kafka import KafkaContainer from app.extraction.adapters.kafka.producer import OrderEventPublisher from app.storage.adapters.kafka.consumer import OrderEventConsumer @pytest.fixture def kafka_container(): with KafkaContainer() as kafka: yield kafka def test_producer_consumer_roundtrip(kafka_container): brokers = [kafka_container.get_bootstrap_server()] # Produce publisher = OrderEventPublisher(brokers, "test-orders") event = OrderEventMessage(order_id="test_123", ...) publisher.publish_order(event) publisher.flush() # Consume consumer = OrderEventConsumer(brokers, "test-orders", "test-group") message = consumer.consume(timeout=5.0) assert message is not None assert message.order_id == "test_123"
- Run:
pytest tests/integration/test_kafka_roundtrip.py -v
Implementation Steps
1. Set Up Test Environment
Configure pytest fixtures for Kafka container management:
# tests/integration/conftest.py import pytest from testcontainers.kafka import KafkaContainer @pytest.fixture(scope="function") def kafka_container() -> KafkaContainer: """Start isolated Kafka container for each test.""" container = KafkaContainer() container.start() try: import time time.sleep(2) # Give broker time to become ready yield container finally: container.stop() @pytest.fixture def kafka_brokers(kafka_container: KafkaContainer) -> list[str]: """Get broker addresses.""" return [kafka_container.get_bootstrap_server()]
2. Test Producer Functionality
def test_publisher_publishes_message_to_kafka(kafka_brokers: list[str]) -> None: """Test publisher successfully publishes message.""" publisher = OrderEventPublisher(brokers=kafka_brokers, topic="orders") event = OrderEventMessage(order_id="order_123", ...) publisher.publish_order(event) publisher.flush()
3. Test Consumer Functionality
def test_consumer_receives_published_message(kafka_brokers: list[str]) -> None: """Test consumer receives published message.""" topic = "test-orders" # Publish publisher = OrderEventPublisher(brokers=kafka_brokers, topic=topic) publisher.publish_order(event) publisher.flush() # Consume consumer = OrderEventConsumer(brokers=kafka_brokers, topic=topic, group_id="test-group") message = consumer.consume(timeout=5.0) assert message is not None assert message.order_id == "order_123" consumer.close()
4. Test Error Scenarios
def test_consumer_handles_malformed_message(kafka_brokers: list[str]) -> None: """Test consumer raises error on malformed JSON.""" from confluent_kafka import Producer producer = Producer({"bootstrap.servers": ",".join(kafka_brokers)}) producer.produce("test-orders", key=b"bad", value=b"not valid json") producer.flush() consumer = OrderEventConsumer(brokers=kafka_brokers, topic="test-orders", group_id="test-group") with pytest.raises(KafkaConsumerException): consumer.consume(timeout=5.0)
5. Test Message Ordering
def test_messages_ordered_within_partition(kafka_brokers: list[str]) -> None: """Test messages with same key maintain order.""" publisher = OrderEventPublisher(brokers=kafka_brokers, topic="ordered-orders") order_id = "order_123" # Publish 5 messages with same order_id (same partition key) for i in range(5): event = OrderEventMessage(order_id=order_id, created_at=f"2024-01-01T12:00:{i:02d}Z", ...) publisher.publish_order(event) publisher.flush() # Consume and verify order consumer = OrderEventConsumer(brokers=kafka_brokers, topic="ordered-orders", group_id="test-group") for i in range(5): message = consumer.consume(timeout=5.0) assert message is not None assert message.created_at == f"2024-01-01T12:00:{i:02d}Z"
Requirements
- Container managementtestcontainers>=4.0.0
- Kafka container supporttestcontainers[kafka]>=4.0.0
- Kafka clientconfluent-kafka>=2.3.0
- Message serializationmsgspec>=0.18.6
- Test frameworkpytest>=7.4.3
- Async test supportpytest-asyncio>=0.21.1- Docker - Required for testcontainers
- Python 3.11+ with type checking
Running Tests
# All integration tests pytest tests/integration/ -v # Specific test class pytest tests/integration/test_kafka_producer_integration.py::TestOrderEventPublisherIntegration -v # With coverage pytest tests/integration/ --cov=app.extraction.adapters.kafka --cov=app.storage.adapters.kafka
Debugging Failed Tests
Container logs:
docker logs <container_id>
Verbose output:
pytest tests/integration/test_kafka.py::test_name -vv -s
Common Issues
Container fails to start: Check Docker is running and has available resources.
Test hangs on consume(): Verify publisher called
flush() before consuming.
Malformed message exceptions: Check message schema matches expected structure.
Port already in use: Testcontainers uses random ports, conflicts are rare.
Intermittent failures: Add 2s sleep after container start for broker readiness.
Best Practices
- Use unique consumer groups per test for isolation
- Always flush producers before consuming
- Clean up resources with try/finally or fixtures
- Use timeouts on consume() to avoid hanging tests
- Test with real Kafka (don't mock) for confidence
Example Test Patterns
See
examples/examples.md for comprehensive examples:
- Basic producer/consumer tests
- Round-trip workflow testing
- Message ordering verification
- Exactly-once semantics validation
- Error handling scenarios
- Async testing patterns
- Custom fixtures for pre-populated topics
Troubleshooting Guide
See
references/reference.md for detailed troubleshooting:
- Container startup issues
- Network configuration
- Performance optimization
- Docker Compose integration
- CI/CD pipeline setup
See Also
skill - Producer implementationkafka-producer-implementation
skill - Consumer implementationkafka-consumer-implementation
skill - Schema designkafka-schema-management
- Complete test examplesexamples/examples.md
- Troubleshooting guidereferences/reference.md