Claude-skill-registry kafka-setup
Set up Apache Kafka for event streaming - Strimzi for local Kubernetes, Redpanda Cloud for production. Use when configuring event-driven messaging for Phase 5. (project)
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/kafka-setup" ~/.claude/skills/majiayu000-claude-skill-registry-kafka-setup && rm -rf "$T"
manifest:
skills/data/kafka-setup/SKILL.mdsource content
Kafka Setup Skill
Quick Start
- Read Phase 5 Constitution -
constitution-prompt-phase-5.md - Choose deployment - Strimzi (local) or Redpanda Cloud (production)
- Install Strimzi operator - For Kubernetes deployment
- Create Kafka cluster - Using Strimzi CRDs
- Create topics - task-events, reminder-events, audit-events
- Configure Dapr component - Connect Dapr to Kafka
Deployment Options
| Option | Environment | Use Case |
|---|---|---|
| Strimzi | Minikube/DOKS | Full Kubernetes-native Kafka |
| Redpanda Cloud | Production | Managed Kafka-compatible streaming |
| Docker Compose | Local Dev | Quick local development |
Strimzi Installation (Kubernetes)
Install Strimzi Operator
# Create namespace kubectl create namespace kafka # Install Strimzi operator kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka # Wait for operator to be ready kubectl wait deployment/strimzi-cluster-operator \ --for=condition=available \ --timeout=300s \ -n kafka
Create Kafka Cluster
Create
kafka/kafka-cluster.yaml:
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: todo-kafka namespace: kafka spec: kafka: version: 3.6.0 replicas: 3 listeners: - name: plain port: 9092 type: internal tls: false - name: tls port: 9093 type: internal tls: true config: offsets.topic.replication.factor: 3 transaction.state.log.replication.factor: 3 transaction.state.log.min.isr: 2 default.replication.factor: 3 min.insync.replicas: 2 inter.broker.protocol.version: "3.6" storage: type: jbod volumes: - id: 0 type: persistent-claim size: 10Gi deleteClaim: false resources: requests: memory: 1Gi cpu: "500m" limits: memory: 2Gi cpu: "1" zookeeper: replicas: 3 storage: type: persistent-claim size: 5Gi deleteClaim: false resources: requests: memory: 512Mi cpu: "250m" limits: memory: 1Gi cpu: "500m" entityOperator: topicOperator: {} userOperator: {}
Minikube-Optimized Cluster (Single Node)
Create
kafka/kafka-cluster-minikube.yaml:
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: todo-kafka namespace: kafka spec: kafka: version: 3.6.0 replicas: 1 listeners: - name: plain port: 9092 type: internal tls: false config: offsets.topic.replication.factor: 1 transaction.state.log.replication.factor: 1 transaction.state.log.min.isr: 1 default.replication.factor: 1 min.insync.replicas: 1 storage: type: ephemeral resources: requests: memory: 512Mi cpu: "250m" limits: memory: 1Gi cpu: "500m" zookeeper: replicas: 1 storage: type: ephemeral resources: requests: memory: 256Mi cpu: "100m" limits: memory: 512Mi cpu: "250m" entityOperator: topicOperator: {}
Create Kafka Topics
Create
kafka/kafka-topics.yaml:
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: task-events namespace: kafka labels: strimzi.io/cluster: todo-kafka spec: partitions: 3 replicas: 1 config: retention.ms: 604800000 # 7 days cleanup.policy: delete --- apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: reminder-events namespace: kafka labels: strimzi.io/cluster: todo-kafka spec: partitions: 3 replicas: 1 config: retention.ms: 604800000 cleanup.policy: delete --- apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: audit-events namespace: kafka labels: strimzi.io/cluster: todo-kafka spec: partitions: 3 replicas: 1 config: retention.ms: 2592000000 # 30 days cleanup.policy: delete --- apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: task-updates namespace: kafka labels: strimzi.io/cluster: todo-kafka spec: partitions: 3 replicas: 1 config: retention.ms: 86400000 # 1 day cleanup.policy: delete
Docker Compose (Local Development)
Add to
docker-compose.yaml:
services: zookeeper: image: confluentinc/cp-zookeeper:7.5.0 container_name: zookeeper environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - "2181:2181" networks: - todo-network kafka: image: confluentinc/cp-kafka:7.5.0 container_name: kafka depends_on: - zookeeper ports: - "9092:9092" - "29092:29092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 networks: - todo-network kafka-ui: image: provectuslabs/kafka-ui:latest container_name: kafka-ui depends_on: - kafka ports: - "8080:8080" environment: KAFKA_CLUSTERS_0_NAME: local KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 networks: - todo-network
Redpanda Cloud Setup (Production)
Create Redpanda Cloud Cluster
- Sign up at https://cloud.redpanda.com
- Create a new cluster (Dedicated or Serverless)
- Get connection credentials
Configure Dapr for Redpanda
apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: taskpubsub namespace: todo-app spec: type: pubsub.kafka version: v1 metadata: - name: brokers secretKeyRef: name: redpanda-secrets key: brokers - name: authType value: "password" - name: saslUsername secretKeyRef: name: redpanda-secrets key: username - name: saslPassword secretKeyRef: name: redpanda-secrets key: password - name: saslMechanism value: "SCRAM-SHA-256" - name: tls value: "true"
Dapr Pub/Sub Component
Create
dapr-components/pubsub.yaml:
apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: taskpubsub namespace: todo-app spec: type: pubsub.kafka version: v1 metadata: - name: brokers value: "todo-kafka-kafka-bootstrap.kafka.svc.cluster.local:9092" - name: consumerGroup value: "todo-consumer-group" - name: authType value: "none" - name: disableTls value: "true" - name: maxMessageBytes value: "1048576" - name: consumeRetryInterval value: "100ms" scopes: - backend - notification-service - recurring-service - audit-service - websocket-service
Python Kafka Client (Direct)
Installation
uv add aiokafka
Producer
from aiokafka import AIOKafkaProducer import json class KafkaEventProducer: def __init__(self, bootstrap_servers: str): self.producer = AIOKafkaProducer( bootstrap_servers=bootstrap_servers, value_serializer=lambda v: json.dumps(v).encode('utf-8') ) async def start(self): await self.producer.start() async def stop(self): await self.producer.stop() async def publish(self, topic: str, event: dict): await self.producer.send_and_wait(topic, event)
Consumer
from aiokafka import AIOKafkaConsumer import json class KafkaEventConsumer: def __init__(self, bootstrap_servers: str, topics: list[str], group_id: str): self.consumer = AIOKafkaConsumer( *topics, bootstrap_servers=bootstrap_servers, group_id=group_id, value_deserializer=lambda v: json.loads(v.decode('utf-8')) ) async def start(self): await self.consumer.start() async def stop(self): await self.consumer.stop() async def consume(self): async for msg in self.consumer: yield msg.topic, msg.value
Verification Checklist
- Kafka cluster running (Strimzi or Docker)
- All topics created (task-events, reminder-events, audit-events, task-updates)
- Dapr component configured
- Producer can publish messages
- Consumer receives messages
- Kafka UI accessible (optional)
- Redpanda Cloud configured (for production)
Topic Schema
task-events
{ "event_type": "task.created | task.updated | task.deleted | task.completed", "task_id": "uuid", "user_id": "uuid", "task": { "id": "uuid", "title": "string", "description": "string", "priority": "low | medium | high", "status": "pending | in_progress | completed", "due_date": "ISO8601", "tags": ["string"] }, "timestamp": "ISO8601" }
reminder-events
{ "event_type": "reminder.triggered | reminder.created | reminder.deleted", "reminder_id": "uuid", "task_id": "uuid", "user_id": "uuid", "message": "string", "timestamp": "ISO8601" }
Troubleshooting
| Issue | Cause | Solution |
|---|---|---|
| Broker not reachable | Wrong address | Use internal K8s DNS |
| Topic not found | Not created | Apply KafkaTopic CRD |
| Consumer lag | Slow processing | Scale consumers |
| Auth failed | Wrong credentials | Check secrets |