Claude-skill-registry Kafka Streaming
Comprehensive guide to Apache Kafka for real-time data streaming, covering topics, producers, consumers, stream processing, and production best practices
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/kafka-streaming" ~/.claude/skills/majiayu000-claude-skill-registry-kafka-streaming && rm -rf "$T"
manifest:
skills/data/kafka-streaming/SKILL.md
Kafka Streaming
What is Kafka?
Apache Kafka: Distributed event streaming platform for high-throughput, fault-tolerant, real-time data pipelines.
Core Concepts
Producer → Topic (Partitions) → Consumer

Producer: Writes events
Topic: Category of events
Partition: Ordered log within a topic
Consumer: Reads events
Use Cases
- Event streaming: User clicks, transactions
- Log aggregation: Application logs
- Metrics collection: System metrics
- Data integration: CDC, ETL pipelines
- Messaging: Microservices communication
Architecture
Components
Producers → Kafka Cluster (Brokers) → Consumers
                    ↓
          ZooKeeper (metadata)
Broker: Kafka server (a node in the cluster)
ZooKeeper: Coordination service (being replaced by KRaft)
Topic: Logical channel for events
Partition: Append-only log on disk (the unit of parallelism)
Consumer Group: Set of consumers working together
Topics and Partitions
Topic
Topic: "user-events" Partitions: 3 Partition 0: [event1, event4, event7, ...] Partition 1: [event2, event5, event8, ...] Partition 2: [event3, event6, event9, ...]
Partitioning Strategy
# Key-based partitioning (same key → same partition)
producer.send('user-events', key='user123', value=event)

# Round-robin (no key)
producer.send('user-events', value=event)

# Custom partitioner: kafka-python accepts a callable via the
# `partitioner` config (illustrative; the default uses murmur2)
def custom_partitioner(key_bytes, all_partitions, available_partitions):
    return hash(key_bytes) % len(all_partitions)

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    partitioner=custom_partitioner
)
Why Partitions?
- Parallelism: Multiple consumers read simultaneously
- Scalability: Add partitions to increase throughput
- Ordering: Events with the same key stay in order within their partition (see the sketch below)
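A minimal sketch of the ordering guarantee, using the kafka-python producer introduced in the next section (topic and key are illustrative): sends that share a key report the same partition, with strictly increasing offsets.

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Same key → same partition → read back in send order
for action in ['login', 'click', 'logout']:
    meta = producer.send('user-events', key='user123',
                         value={'action': action}).get(timeout=10)
    print(f"{action} → partition {meta.partition}, offset {meta.offset}")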
Producers
Basic Producer (Python)
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None
)

# Send event
event = {
    'user_id': '123',
    'action': 'click',
    'timestamp': '2024-01-15T10:00:00Z'
}

future = producer.send(
    topic='user-events',
    key='user123',
    value=event
)

# Wait for confirmation
record_metadata = future.get(timeout=10)
print(f"Sent to partition {record_metadata.partition} at offset {record_metadata.offset}")

producer.close()
Producer Configuration
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],

    # Serialization
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),

    # Acknowledgments (reliability)
    acks='all',        # Wait for all in-sync replicas (most reliable)
    # acks=1           # Wait for leader only
    # acks=0           # Don't wait (fastest, least reliable)

    # Retries
    retries=3,

    # Batching (performance)
    batch_size=16384,  # 16KB
    linger_ms=10,      # Wait up to 10ms to fill a batch

    # Compression
    compression_type='gzip',  # or 'snappy', 'lz4', 'zstd'

    # Idempotence (exactly-once; requires a recent kafka-python release)
    enable_idempotence=True
)
Consumers
Basic Consumer (Python)
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    group_id='analytics-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',  # Start from the beginning if no committed offset
    enable_auto_commit=True
)

try:
    for message in consumer:
        event = message.value
        print(f"Received: {event}")
        process_event(event)  # Your processing logic
finally:
    consumer.close()
Consumer Groups
Topic: user-events (3 partitions)
Consumer Group: analytics-group (3 consumers)

Consumer 1 → Partition 0
Consumer 2 → Partition 1
Consumer 3 → Partition 2

Each partition is consumed by exactly one consumer in the group.
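To watch the assignment happen, a small kafka-python sketch: start it in three terminals with the same group_id and the three partitions are rebalanced across the running instances.

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    group_id='analytics-group'
)

# poll() joins the group and triggers partition assignment
consumer.poll(timeout_ms=5000)
print(f"Assigned partitions: {consumer.assignment()}")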
Offset Management
# Auto-commit (default)
consumer = KafkaConsumer(
    'user-events',
    enable_auto_commit=True,
    auto_commit_interval_ms=5000  # Commit every 5 seconds
)

# Manual commit
consumer = KafkaConsumer(
    'user-events',
    enable_auto_commit=False
)

for message in consumer:
    process_event(message.value)
    consumer.commit()  # Commit after processing (at-least-once)
Stream Processing
Kafka Streams (Java)
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("user-events");

// Filter
KStream<String, String> filtered = source.filter(
    (key, value) -> value.contains("click")
);

// Map
KStream<String, String> mapped = source.mapValues(
    value -> value.toUpperCase()
);

// Aggregate
KTable<String, Long> counts = source
    .groupByKey()
    .count();

// Join (stream1 and stream2 are two co-partitioned KStreams)
KStream<String, String> joined = stream1.join(
    stream2,
    (value1, value2) -> value1 + value2,
    JoinWindows.of(Duration.ofMinutes(5))
);

// Output
filtered.to("filtered-events");
ksqlDB (SQL for Kafka)
-- Create stream
CREATE STREAM user_events (
    user_id VARCHAR,
    action VARCHAR,
    timestamp BIGINT
) WITH (
    KAFKA_TOPIC='user-events',
    VALUE_FORMAT='JSON'
);

-- Filter
CREATE STREAM click_events AS
    SELECT * FROM user_events
    WHERE action = 'click';

-- Aggregate
CREATE TABLE user_click_counts AS
    SELECT user_id, COUNT(*) AS click_count
    FROM click_events
    GROUP BY user_id;

-- Join
CREATE STREAM enriched_events AS
    SELECT e.user_id, e.action, u.name, u.email
    FROM user_events e
    LEFT JOIN users u ON e.user_id = u.user_id;
Schemas and Serialization
Avro with Schema Registry
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

# Define schema
value_schema = avro.loads('''
{
    "type": "record",
    "name": "UserEvent",
    "fields": [
        {"name": "user_id", "type": "string"},
        {"name": "action", "type": "string"},
        {"name": "timestamp", "type": "long"}
    ]
}
''')

# Producer with schema
producer = AvroProducer({
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081'
}, default_value_schema=value_schema)

# Send event
event = {
    'user_id': '123',
    'action': 'click',
    'timestamp': 1705315200000
}

producer.produce(topic='user-events', value=event)
producer.flush()
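The matching consumer side, sketched with the same (now legacy) confluent_kafka.avro helper; the group id is illustrative. The deserializer fetches the writer's schema from the registry, so msg.value() is already a dict.

from confluent_kafka.avro import AvroConsumer

consumer = AvroConsumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'avro-analytics',
    'schema.registry.url': 'http://localhost:8081'
})
consumer.subscribe(['user-events'])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print(f"Consumer error: {msg.error()}")
        continue
    print(msg.value())  # Avro already decoded to a Python dict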
Exactly-Once Semantics
Producer Idempotence
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    enable_idempotence=True,  # Broker deduplicates retried sends
    acks='all',
    retries=3
)
Transactions
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    transactional_id='my-transactional-id',
    enable_idempotence=True
)

producer.init_transactions()

try:
    producer.begin_transaction()
    # Send multiple messages atomically
    producer.send('topic1', value=event1)
    producer.send('topic2', value=event2)
    producer.commit_transaction()
except Exception:
    producer.abort_transaction()
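Transactional writes only pay off end-to-end if readers skip aborted data. A minimal consumer sketch, assuming a kafka-python release recent enough to support the isolation_level option (group name is hypothetical):

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'topic1',
    bootstrap_servers=['localhost:9092'],
    group_id='txn-readers',
    isolation_level='read_committed'  # Skip uncommitted/aborted transactional messages
)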
Monitoring and Operations
Key Metrics
Throughput:
- Messages/sec
- Bytes/sec

Latency:
- Producer latency
- End-to-end latency

Consumer lag:
- How far behind consumers are
- Critical metric!

Broker metrics:
- CPU, memory, disk usage
- Network throughput
Consumer Lag
# Check consumer lag
kafka-consumer-groups --bootstrap-server localhost:9092 \
    --group analytics-group --describe

# Output:
# TOPIC        PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# user-events  0          1000            1500            500
# user-events  1          2000            2100            100
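Lag can also be computed programmatically for alerting. A sketch with kafka-python's admin client (group and topic follow the example above): committed offsets come from the group, end offsets from the broker, and lag is the difference.

from kafka import KafkaConsumer
from kafka.admin import KafkaAdminClient

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])

# Committed offset per partition for the group
committed = admin.list_consumer_group_offsets('analytics-group')

# Latest offset per partition on the broker
end_offsets = consumer.end_offsets(list(committed.keys()))

for tp, meta in committed.items():
    lag = end_offsets[tp] - meta.offset
    print(f"{tp.topic}[{tp.partition}] lag={lag}")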
Best Practices
1. Choose Partition Count Carefully
Too few: Limited parallelism
Too many: Per-partition overhead

Rule of thumb:
- Start with 3-6 partitions
- Increase based on throughput needs
- Sizing: partitions = max(number of consumers, target throughput / per-partition throughput)
2. Use Appropriate Replication Factor
Replication factor = 3 (recommended)

Provides:
- Fault tolerance (up to 2 brokers can fail without data loss)
- High availability
- Data durability
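Both settings are applied when the topic is created. A sketch using kafka-python's admin client (names and counts are illustrative; replication factor 3 needs at least three brokers):

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

admin.create_topics([
    NewTopic(
        name='user-events',
        num_partitions=6,       # Parallelism ceiling for one consumer group
        replication_factor=3    # Tolerates two broker failures
    )
])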
3. Monitor Consumer Lag
Alert if lag > threshold

Growing lag indicates:
- Slow consumers
- Too few consumers
- Processing issues
4. Use Schema Registry
Benefits:
- Schema evolution
- Backward compatibility
- Type safety
5. Tune Batch Size and Linger
# For throughput
producer = KafkaProducer(
    batch_size=32768,  # 32KB batches
    linger_ms=100      # Wait up to 100ms to fill a batch
)

# For latency
producer = KafkaProducer(
    batch_size=1,      # Effectively disable batching
    linger_ms=0        # Send immediately
)
Common Patterns
Event Sourcing
Store all state changes as events.
Rebuild state by replaying events.

Example:
- user.created
- user.updated
- user.deleted

Replay → current user state
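A minimal replay sketch in Python (topic name and event shapes are hypothetical): read the topic from offset 0 and fold each event into a state map.

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'user-lifecycle',             # Hypothetical event-sourced topic
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest', # Replay from the beginning
    enable_auto_commit=False,
    consumer_timeout_ms=5000,     # Stop iterating when the log is exhausted
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

users = {}  # Rebuilt state: user_id → latest attributes
for message in consumer:
    event = message.value
    if event['type'] == 'user.created':
        users[event['user_id']] = event['data']
    elif event['type'] == 'user.updated':
        users.setdefault(event['user_id'], {}).update(event['data'])
    elif event['type'] == 'user.deleted':
        users.pop(event['user_id'], None)

print(f"Rebuilt state for {len(users)} users")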
CDC (Change Data Capture)
Capture database changes → Kafka

Tools:
- Debezium (MySQL, PostgreSQL, MongoDB)
- Maxwell (MySQL)
- Kafka Connect

Flow: Database → CDC → Kafka → Consumers
CQRS (Command Query Responsibility Segregation)
Write model: Commands → Kafka → Write DB
Read model: Events → Kafka → Read DB (denormalized)

Benefits:
- Reads and writes optimized independently
- Scalability
Kafka Connect
Source Connector (Database → Kafka)
{ "name": "mysql-source", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "localhost", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "mysql", "table.include.list": "inventory.customers", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "schema-changes.inventory" } }
Sink Connector (Kafka → Database)
{ "name": "postgres-sink", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url": "jdbc:postgresql://localhost:5432/analytics", "connection.user": "postgres", "connection.password": "password", "topics": "user-events", "auto.create": "true", "insert.mode": "upsert", "pk.mode": "record_key" } }
Summary
Kafka: Distributed event streaming platform
Core Concepts:
- Topics: Logical channels
- Partitions: Physical logs (parallelism)
- Producers: Write events
- Consumers: Read events
- Consumer Groups: Parallel processing
Guarantees:
- At-least-once (default)
- Exactly-once (with idempotence + transactions)
Stream Processing:
- Kafka Streams (Java)
- ksqlDB (SQL)
Best Practices:
- Choose partition count carefully
- Replication factor = 3
- Monitor consumer lag
- Use schema registry
- Tune batch size and linger
Common Patterns:
- Event sourcing
- CDC
- CQRS
Tools:
- Kafka Connect (integrations)
- Schema Registry (Avro schemas)
- ksqlDB (SQL queries)