Claude-skill-registry Kafka Streaming

Comprehensive guide to Apache Kafka for real-time data streaming, covering topics, producers, consumers, stream processing, and production best practices.

Install

Source · Clone the upstream repo:

git clone https://github.com/majiayu000/claude-skill-registry

Claude Code · Install into ~/.claude/skills/:

T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/kafka-streaming" ~/.claude/skills/majiayu000-claude-skill-registry-kafka-streaming && rm -rf "$T"

Manifest: skills/data/kafka-streaming/SKILL.md

Source content

Kafka Streaming

What is Kafka?

Apache Kafka: Distributed event streaming platform for high-throughput, fault-tolerant, real-time data pipelines.

Core Concepts

Producer → Topic (Partitions) → Consumer

Producer: Writes events
Topic: Category of events
Partition: Ordered log within topic
Consumer: Reads events

Use Cases

  • Event streaming: User clicks, transactions
  • Log aggregation: Application logs
  • Metrics collection: System metrics
  • Data integration: CDC, ETL pipelines
  • Messaging: Microservices communication

Architecture

Components

Producers → Kafka Cluster (Brokers) → Consumers
                ↓
            ZooKeeper (metadata)

Broker: Kafka server (a node in the cluster)
ZooKeeper: Coordination service (being replaced by KRaft)
Topic: Logical channel for events
Partition: Physical log file
Consumer Group: Set of consumers working together


Topics and Partitions

Topic

Topic: "user-events"
Partitions: 3

Partition 0: [event1, event4, event7, ...]
Partition 1: [event2, event5, event8, ...]
Partition 2: [event3, event6, event9, ...]

Partitioning Strategy

# Key-based partitioning (same key → same partition)
producer.send('user-events', key='user123', value=event)

# Round-robin (no key)
producer.send('user-events', value=event)

# Custom partitioner (kafka-python takes a callable, not a class; it receives
# the serialized key plus partition lists and returns a partition id)
def custom_partitioner(key_bytes, all_partitions, available_partitions):
    return all_partitions[hash(key_bytes) % len(all_partitions)]

producer = KafkaProducer(partitioner=custom_partitioner)

Why Partitions?

  • Parallelism: Multiple consumers read simultaneously
  • Scalability: Add partitions to increase throughput
  • Ordering: Events with same key stay in order
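The key-based strategy can be sketched as a pure function. This is illustrative only: real Kafka clients hash keys with murmur2, not MD5, but the invariant is the same — the same key always maps to the same partition, which is what preserves per-key ordering.

```python
import hashlib

def pick_partition(key: str, num_partitions: int) -> int:
    # Stable hash so the key -> partition mapping is deterministic across runs
    # (Python's built-in hash() is salted per process, so we avoid it here).
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

events = [("user123", "click"), ("user456", "view"), ("user123", "purchase")]
assignments = [(key, pick_partition(key, 3)) for key, _ in events]
# Both user123 events land in the same partition, so their relative order holds.
```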

Producers

Basic Producer (Python)

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None
)

# Send event
event = {
    'user_id': '123',
    'action': 'click',
    'timestamp': '2024-01-15T10:00:00Z'
}

future = producer.send(
    topic='user-events',
    key='user123',
    value=event
)

# Wait for confirmation
record_metadata = future.get(timeout=10)
print(f"Sent to partition {record_metadata.partition} at offset {record_metadata.offset}")

producer.close()

Producer Configuration

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    
    # Serialization
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    
    # Acknowledgments (reliability)
    acks='all',  # Wait for all replicas (most reliable)
    # acks=1     # Wait for leader only
    # acks=0     # Don't wait (fastest, least reliable)
    
    # Retries
    retries=3,
    
    # Batching (performance)
    batch_size=16384,  # 16KB
    linger_ms=10,      # Wait 10ms to batch
    
    # Compression
    compression_type='gzip',  # or 'snappy', 'lz4', 'zstd'
    
    # Idempotence (exactly-once)
    enable_idempotence=True
)

Consumers

Basic Consumer (Python)

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    group_id='analytics-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',  # Start from beginning
    enable_auto_commit=True
)

for message in consumer:
    event = message.value
    print(f"Received: {event}")
    
    # Process event
    process_event(event)

consumer.close()

Consumer Groups

Topic: user-events (3 partitions)
Consumer Group: analytics-group (3 consumers)

Consumer 1 → Partition 0
Consumer 2 → Partition 1
Consumer 3 → Partition 2

Each partition consumed by exactly one consumer in group
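The assignment above can be simulated with a toy round-robin assignor. This is a simplification of the real group rebalance protocol (which the broker-side coordinator runs), but it shows the invariant: every partition goes to exactly one consumer in the group.

```python
# Toy round-robin assignment of partitions to consumers in one group.
def assign_partitions(partitions, consumers):
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(sorted(partitions)):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

print(assign_partitions([0, 1, 2], ["c1", "c2", "c3"]))
# → {'c1': [0], 'c2': [1], 'c3': [2]}

# With more partitions than consumers, some consumers own several;
# with more consumers than partitions, the extras would sit idle.
print(assign_partitions([0, 1, 2, 3], ["c1", "c2"]))
# → {'c1': [0, 2], 'c2': [1, 3]}
```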

Offset Management

# Auto-commit (default)
consumer = KafkaConsumer(
    'user-events',
    enable_auto_commit=True,
    auto_commit_interval_ms=5000  # Commit every 5 seconds
)

# Manual commit
consumer = KafkaConsumer(
    'user-events',
    enable_auto_commit=False
)

for message in consumer:
    process_event(message.value)
    consumer.commit()  # Commit after processing
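Committing after processing gives at-least-once delivery: if the consumer crashes after processing but before committing, the same message is redelivered on restart. A minimal sketch of making the processing side idempotent, assuming duplicates are keyed by (partition, offset) — names here are hypothetical:

```python
# Guard against redelivery by remembering which (partition, offset) pairs
# have already been applied to our state.
processed = set()
state = {"clicks": 0}

def handle(partition: int, offset: int, event: dict) -> None:
    if (partition, offset) in processed:
        return  # duplicate redelivery after a crash: skip
    if event.get("action") == "click":
        state["clicks"] += 1
    processed.add((partition, offset))

handle(0, 42, {"action": "click"})
handle(0, 42, {"action": "click"})  # redelivered: does not double-count
```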

Stream Processing

Kafka Streams (Java)

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> source = builder.stream("user-events");

// Filter
KStream<String, String> filtered = source.filter(
    (key, value) -> value.contains("click")
);

// Map
KStream<String, String> mapped = source.mapValues(
    value -> value.toUpperCase()
);

// Aggregate
KTable<String, Long> counts = source
    .groupByKey()
    .count();

// Join
// (stream1 and stream2 are assumed to be two co-partitioned KStreams)
KStream<String, String> joined = stream1.join(
    stream2,
    (value1, value2) -> value1 + value2,
    JoinWindows.of(Duration.ofMinutes(5))
);

// Output
filtered.to("filtered-events");

ksqlDB (SQL for Kafka)

-- Create stream
CREATE STREAM user_events (
    user_id VARCHAR,
    action VARCHAR,
    timestamp BIGINT
) WITH (
    KAFKA_TOPIC='user-events',
    VALUE_FORMAT='JSON'
);

-- Filter
CREATE STREAM click_events AS
SELECT * FROM user_events
WHERE action = 'click';

-- Aggregate
CREATE TABLE user_click_counts AS
SELECT user_id, COUNT(*) as click_count
FROM click_events
GROUP BY user_id;

-- Join
CREATE STREAM enriched_events AS
SELECT
    e.user_id,
    e.action,
    u.name,
    u.email
FROM user_events e
LEFT JOIN users u ON e.user_id = u.user_id;

Schemas and Serialization

Avro with Schema Registry

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

# Define schema
value_schema = avro.loads('''
{
    "type": "record",
    "name": "UserEvent",
    "fields": [
        {"name": "user_id", "type": "string"},
        {"name": "action", "type": "string"},
        {"name": "timestamp", "type": "long"}
    ]
}
''')

# Producer with schema
producer = AvroProducer({
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081'
}, default_value_schema=value_schema)

# Send event
event = {
    'user_id': '123',
    'action': 'click',
    'timestamp': 1705315200000
}

producer.produce(topic='user-events', value=event)
producer.flush()

Exactly-Once Semantics

Producer Idempotence

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    enable_idempotence=True,  # Prevents duplicates
    acks='all',
    retries=3
)

Transactions

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    transactional_id='my-transactional-id',
    enable_idempotence=True
)

producer.init_transactions()

try:
    producer.begin_transaction()
    
    # Send multiple messages atomically
    producer.send('topic1', value=event1)
    producer.send('topic2', value=event2)
    
    producer.commit_transaction()
except Exception as e:
    producer.abort_transaction()

Monitoring and Operations

Key Metrics

Throughput:
- Messages/sec
- Bytes/sec

Latency:
- Producer latency
- End-to-end latency

Consumer Lag:
- How far behind consumers are
- Critical metric!

Broker Metrics:
- CPU, memory, disk usage
- Network throughput

Consumer Lag

# Check consumer lag
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --group analytics-group --describe

# Output:
# TOPIC         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# user-events   0          1000            1500            500
# user-events   1          2000            2100            100
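The LAG column is simply LOG-END-OFFSET minus CURRENT-OFFSET per partition. Recomputing it from the sample output above:

```python
rows = [
    # (topic, partition, current_offset, log_end_offset)
    ("user-events", 0, 1000, 1500),
    ("user-events", 1, 2000, 2100),
]
# Lag per partition = how many records the consumer still has to catch up on.
lag = {(topic, part): end - cur for topic, part, cur, end in rows}
total_lag = sum(lag.values())
# → {('user-events', 0): 500, ('user-events', 1): 100}, total 600
```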

Best Practices

1. Choose Partition Count Carefully

Too few: Limited parallelism
Too many: Overhead

Rule of thumb: 
- Start with 3-6 partitions
- Increase based on throughput needs
- Consider: partitions = max(consumers, throughput/target_per_partition)
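The rule of thumb can be written down directly; the throughput figures below are made-up assumptions, not recommendations.

```python
import math

# partitions = max(consumers, ceil(total_throughput / target_per_partition))
def suggested_partitions(consumers: int, throughput_mb_s: float,
                         target_per_partition_mb_s: float) -> int:
    return max(consumers, math.ceil(throughput_mb_s / target_per_partition_mb_s))

# e.g. 4 consumers, 50 MB/s total, ~10 MB/s per partition → 5 partitions
print(suggested_partitions(4, 50, 10))
```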

2. Use Appropriate Replication Factor

Replication factor = 3 (recommended)

Provides:
- Fault tolerance (2 brokers can fail)
- High availability
- Data durability

3. Monitor Consumer Lag

Alert if lag > threshold
Indicates:
- Slow consumers
- Insufficient consumers
- Processing issues

4. Use Schema Registry

Benefits:
- Schema evolution
- Backward compatibility
- Type safety

5. Tune Batch Size and Linger

# For throughput
producer = KafkaProducer(
    batch_size=32768,  # 32KB
    linger_ms=100      # Wait 100ms
)

# For latency
producer = KafkaProducer(
    batch_size=1,
    linger_ms=0
)

Common Patterns

Event Sourcing

Store all state changes as events
Rebuild state by replaying events

Example:
- user.created
- user.updated
- user.deleted

Replay → Current user state
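In miniature, replay is a fold over the event log. The event names follow the example above; the state shape is illustrative.

```python
# The log is the source of truth; current state is derived by replaying it.
events = [
    {"type": "user.created", "id": "u1", "name": "Ada"},
    {"type": "user.updated", "id": "u1", "name": "Ada L."},
    {"type": "user.created", "id": "u2", "name": "Bob"},
    {"type": "user.deleted", "id": "u2"},
]

def replay(events):
    state = {}
    for e in events:
        if e["type"] == "user.created":
            state[e["id"]] = {"name": e["name"]}
        elif e["type"] == "user.updated":
            state[e["id"]]["name"] = e["name"]
        elif e["type"] == "user.deleted":
            state.pop(e["id"], None)
    return state

# replay(events) → {"u1": {"name": "Ada L."}}
```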

CDC (Change Data Capture)

Capture database changes → Kafka

Tools:
- Debezium (MySQL, PostgreSQL, MongoDB)
- Maxwell (MySQL)
- Kafka Connect

Flow:
Database → CDC → Kafka → Consumers

CQRS (Command Query Responsibility Segregation)

Write model: Commands → Kafka → Write DB
Read model: Events → Kafka → Read DB (denormalized)

Benefits:
- Optimized for reads and writes separately
- Scalability
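A minimal sketch of the split, with the Kafka topic stood in by a plain list and hypothetical command/event names — the point is that the write side only appends events, and the read side folds them into a query-friendly shape.

```python
write_log = []   # stands in for the Kafka topic between the two models
read_model = {}  # denormalized view, optimized for queries

def handle_command(cmd):
    # Write side: validate the command and emit an event (never reads the view).
    write_log.append({"type": "OrderPlaced",
                      "order_id": cmd["order_id"],
                      "total": cmd["total"]})

def project(event):
    # Read side: consume events and maintain the denormalized view.
    if event["type"] == "OrderPlaced":
        read_model[event["order_id"]] = {"total": event["total"],
                                         "status": "placed"}

handle_command({"order_id": "o1", "total": 99.0})
for e in write_log:
    project(e)
```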

Kafka Connect

Source Connector (Database → Kafka)

{
  "name": "mysql-source",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "mysql",
    "table.include.list": "inventory.customers",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}

Sink Connector (Kafka → Database)

{
  "name": "postgres-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/analytics",
    "connection.user": "postgres",
    "connection.password": "password",
    "topics": "user-events",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_key"
  }
}

Summary

Kafka: Distributed event streaming platform

Core Concepts:

  • Topics: Logical channels
  • Partitions: Physical logs (parallelism)
  • Producers: Write events
  • Consumers: Read events
  • Consumer Groups: Parallel processing

Guarantees:

  • At-least-once (default)
  • Exactly-once (with idempotence + transactions)

Stream Processing:

  • Kafka Streams (Java)
  • ksqlDB (SQL)

Best Practices:

  • Choose partition count carefully
  • Replication factor = 3
  • Monitor consumer lag
  • Use schema registry
  • Tune batch size and linger

Common Patterns:

  • Event sourcing
  • CDC
  • CQRS

Tools:

  • Kafka Connect (integrations)
  • Schema Registry (Avro schemas)
  • ksqlDB (SQL queries)