Claude-skill-registry kafka-setup

Set up Apache Kafka for event streaming - Strimzi for local Kubernetes, Redpanda Cloud for production. Use when configuring event-driven messaging for Phase 5. (project)

install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/kafka-setup" ~/.claude/skills/majiayu000-claude-skill-registry-kafka-setup && rm -rf "$T"
manifest: skills/data/kafka-setup/SKILL.md

Kafka Setup Skill

Quick Start

  1. Read Phase 5 Constitution - constitution-prompt-phase-5.md
  2. Choose deployment - Strimzi (local) or Redpanda Cloud (production)
  3. Install Strimzi operator - For Kubernetes deployment
  4. Create Kafka cluster - Using Strimzi CRDs
  5. Create topics - task-events, reminder-events, audit-events, task-updates
  6. Configure Dapr component - Connect Dapr to Kafka

Deployment Options

Option         | Environment   | Use Case
Strimzi        | Minikube/DOKS | Full Kubernetes-native Kafka
Redpanda Cloud | Production    | Managed Kafka-compatible streaming
Docker Compose | Local Dev     | Quick local development

Strimzi Installation (Kubernetes)

Install Strimzi Operator

# Create namespace
kubectl create namespace kafka

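# Install Strimzi operator. "latest" tracks the newest release, which may not
# support the Kafka version or the ZooKeeper-based layout used below; pin an
# operator release that supports them if the Kafka resource is rejected.
kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka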

# Wait for operator to be ready
kubectl wait deployment/strimzi-cluster-operator \
  --for=condition=available \
  --timeout=300s \
  -n kafka

Create Kafka Cluster

Create kafka/kafka-cluster.yaml:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: todo-kafka
  namespace: kafka
spec:
  kafka:
    version: 3.6.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: "3.6"
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 10Gi
          deleteClaim: false
    resources:
      requests:
        memory: 1Gi
        cpu: "500m"
      limits:
        memory: 2Gi
        cpu: "1"
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 5Gi
      deleteClaim: false
    resources:
      requests:
        memory: 512Mi
        cpu: "250m"
      limits:
        memory: 1Gi
        cpu: "500m"
  entityOperator:
    topicOperator: {}
    userOperator: {}
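
The replication settings in the cluster spec have to fit the broker count: no replication factor can exceed the number of brokers, and min.insync.replicas cannot exceed the replication factor. A minimal sketch of that check follows; the function name check_replication and the plain-dict input are illustrative assumptions, not part of Strimzi.

```python
# Hypothetical helper: sanity-check replication settings against the broker count.
REPLICATION_KEYS = (
    "offsets.topic.replication.factor",
    "transaction.state.log.replication.factor",
    "default.replication.factor",
)


def check_replication(brokers: int, config: dict) -> list:
    """Return a list of problems; an empty list means the settings are consistent."""
    problems = []
    # A partition cannot have more replicas than there are brokers.
    for key in REPLICATION_KEYS:
        value = config.get(key)
        if value is not None and value > brokers:
            problems.append(f"{key}={value} exceeds {brokers} broker(s)")
    # min.insync.replicas above the replication factor can never be satisfied.
    rf = config.get("default.replication.factor", 1)
    min_isr = config.get("min.insync.replicas", 1)
    if min_isr > rf:
        problems.append(
            f"min.insync.replicas={min_isr} exceeds default.replication.factor={rf}"
        )
    return problems


production = {
    "offsets.topic.replication.factor": 3,
    "transaction.state.log.replication.factor": 3,
    "default.replication.factor": 3,
    "min.insync.replicas": 2,
}
print(check_replication(3, production))
```

Running the same settings against a single broker reports each oversized replication factor, which is why a single-node cluster needs every factor set to 1.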

Minikube-Optimized Cluster (Single Node)

Create kafka/kafka-cluster-minikube.yaml:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: todo-kafka
  namespace: kafka
spec:
  kafka:
    version: 3.6.0
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
    storage:
      type: ephemeral
    resources:
      requests:
        memory: 512Mi
        cpu: "250m"
      limits:
        memory: 1Gi
        cpu: "500m"
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral
    resources:
      requests:
        memory: 256Mi
        cpu: "100m"
      limits:
        memory: 512Mi
        cpu: "250m"
  entityOperator:
    topicOperator: {}

Create Kafka Topics

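Create kafka/kafka-topics.yaml. The replicas: 1 setting below fits the single-node Minikube cluster; on the three-broker cluster, where min.insync.replicas is 2, set replicas: 3 so that producers using acks=all are not rejected: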

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: task-events
  namespace: kafka
  labels:
    strimzi.io/cluster: todo-kafka
spec:
  partitions: 3
  replicas: 1
  config:
    retention.ms: 604800000  # 7 days
    cleanup.policy: delete
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: reminder-events
  namespace: kafka
  labels:
    strimzi.io/cluster: todo-kafka
spec:
  partitions: 3
  replicas: 1
  config:
    retention.ms: 604800000
    cleanup.policy: delete
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: audit-events
  namespace: kafka
  labels:
    strimzi.io/cluster: todo-kafka
spec:
  partitions: 3
  replicas: 1
  config:
    retention.ms: 2592000000  # 30 days
    cleanup.policy: delete
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: task-updates
  namespace: kafka
  labels:
    strimzi.io/cluster: todo-kafka
spec:
  partitions: 3
  replicas: 1
  config:
    retention.ms: 86400000  # 1 day
    cleanup.policy: delete
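
The retention.ms values above are day counts expressed in milliseconds. A short sketch of the conversion, with retention_ms as an illustrative helper name:

```python
from datetime import timedelta


def retention_ms(days: int) -> int:
    """Convert a retention period in days to the millisecond value Kafka expects."""
    # Integer division of two timedeltas yields an exact int.
    return timedelta(days=days) // timedelta(milliseconds=1)


# The three retention periods used by the topics above.
for days in (1, 7, 30):
    print(days, retention_ms(days))
```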

Docker Compose (Local Development)

Add to docker-compose.yaml (the todo-network network is assumed to be defined in the file's top-level networks section):

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"
    networks:
      - todo-network

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    networks:
      - todo-network

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
    networks:
      - todo-network
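
With the advertised listeners above, clients on the host connect to localhost:29092, while containers on todo-network use kafka:9092. The sketch below picks a bootstrap address per deployment; the function name, the location keys, and the KAFKA_BOOTSTRAP_SERVERS override variable are assumptions made for illustration, and the Strimzi address is the one used in the Dapr component later in this document.

```python
import os
from typing import Mapping, Optional

# Addresses taken from this document; the dictionary keys are illustrative names.
BOOTSTRAP_SERVERS = {
    "compose-host": "localhost:29092",   # client runs on the Docker host
    "compose-network": "kafka:9092",     # client runs in a container on todo-network
    "strimzi": "todo-kafka-kafka-bootstrap.kafka.svc.cluster.local:9092",
}


def bootstrap_servers(location: str, environ: Optional[Mapping[str, str]] = None) -> str:
    """Return the bootstrap address for a deployment, honoring an explicit override."""
    environ = os.environ if environ is None else environ
    # Hypothetical override variable, useful for Redpanda Cloud or ad-hoc brokers.
    override = environ.get("KAFKA_BOOTSTRAP_SERVERS")
    if override:
        return override
    try:
        return BOOTSTRAP_SERVERS[location]
    except KeyError:
        raise ValueError(f"unknown location: {location!r}") from None
```

Passing the result to the producer or consumer classes in the Python client section keeps the address choice in one place.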

Redpanda Cloud Setup (Production)

Create Redpanda Cloud Cluster

  1. Sign up at https://cloud.redpanda.com
  2. Create a new cluster (Dedicated or Serverless)
  3. Get connection credentials

Configure Dapr for Redpanda

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: taskpubsub
  namespace: todo-app
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    - name: brokers
      secretKeyRef:
        name: redpanda-secrets
        key: brokers
    - name: authType
      value: "password"
    - name: saslUsername
      secretKeyRef:
        name: redpanda-secrets
        key: username
    - name: saslPassword
      secretKeyRef:
        name: redpanda-secrets
        key: password
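    - name: saslMechanism
      value: "SHA-256"  # Dapr's name for SCRAM-SHA-256 ("SHA-512" is the other SCRAM option)
    - name: disableTls
      value: "false"  # keeps TLS on; pubsub.kafka has no "tls" field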
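
The component above reads brokers, username, and password from a secret named redpanda-secrets. A minimal sketch that builds a matching Kubernetes Secret manifest is shown below. It assumes the default Kubernetes secret store and a secret in the same namespace as the component; the function name and the sample values are placeholders.

```python
import base64
import json


def redpanda_secret(brokers: str, username: str, password: str,
                    namespace: str = "todo-app") -> dict:
    """Build a Secret manifest with the keys the Dapr component references."""

    def b64(value: str) -> str:
        # Kubernetes expects base64-encoded strings under "data".
        return base64.b64encode(value.encode("utf-8")).decode("ascii")

    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": {"name": "redpanda-secrets", "namespace": namespace},
        "type": "Opaque",
        "data": {
            "brokers": b64(brokers),
            "username": b64(username),
            "password": b64(password),
        },
    }


# Placeholder values; substitute the credentials from the Redpanda Cloud console.
manifest = redpanda_secret("seed.example:9092", "app-user", "change-me")
print(json.dumps(manifest, indent=2))
```

The printed JSON can be saved to a file and applied with kubectl apply -f, which accepts JSON as well as YAML.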

Dapr Pub/Sub Component

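Create dapr-components/pubsub.yaml for the in-cluster Strimzi broker. It uses the same component name (taskpubsub) as the Redpanda component above, so apply only one of the two per environment: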

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: taskpubsub
  namespace: todo-app
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    - name: brokers
      value: "todo-kafka-kafka-bootstrap.kafka.svc.cluster.local:9092"
    - name: consumerGroup
      value: "todo-consumer-group"
    - name: authType
      value: "none"
    - name: disableTls
      value: "true"
    - name: maxMessageBytes
      value: "1048576"
    - name: consumeRetryInterval
      value: "100ms"
scopes:
  - backend
  - notification-service
  - recurring-service
  - audit-service
  - websocket-service
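
Services listed under scopes can publish through their Dapr sidecar instead of talking to Kafka directly. Dapr's HTTP publish endpoint has the form /v1.0/publish/<pubsub>/<topic> on the sidecar port (3500 by default, exposed to the app as DAPR_HTTP_PORT). The sketch below only builds the request; build_publish_request is an illustrative name.

```python
import json
import os
import urllib.request
from typing import Optional


def build_publish_request(topic: str, event: dict, pubsub: str = "taskpubsub",
                          dapr_port: Optional[int] = None) -> urllib.request.Request:
    """Build a POST request for the Dapr sidecar's publish endpoint."""
    # Fall back to the port Dapr injects into the app, then to the default 3500.
    port = dapr_port or int(os.environ.get("DAPR_HTTP_PORT", "3500"))
    url = f"http://localhost:{port}/v1.0/publish/{pubsub}/{topic}"
    return urllib.request.Request(
        url,
        data=json.dumps(event).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_publish_request("task-events", {"event_type": "task.created"}, dapr_port=3500)
print(request.get_method(), request.full_url)
```

Sending it with urllib.request.urlopen(request) only works from a process that has a running sidecar.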

Python Kafka Client (Direct)

Installation

uv add aiokafka

Producer

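import json

from aiokafka import AIOKafkaProducer


class KafkaEventProducer:
    """Publishes JSON-encoded events to Kafka."""

    def __init__(self, bootstrap_servers: str):
        self.producer = AIOKafkaProducer(
            bootstrap_servers=bootstrap_servers,
            # Serialize each event dict to UTF-8 JSON bytes.
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

    async def start(self):
        # Connect to the cluster; call once before publishing.
        await self.producer.start()

    async def stop(self):
        # Flush pending messages and close the connection.
        await self.producer.stop()

    async def publish(self, topic: str, event: dict):
        # Send the event and wait for the broker to acknowledge it.
        await self.producer.send_and_wait(topic, event)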
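
The topics in this document have three partitions, and Kafka only preserves order within a partition. One common way to keep all events for a task in order is to send them with a key derived from task_id, since messages with the same key go to the same partition while the partition count stays unchanged. The sketch below is an optional extension, not something the producer above does; partition_key is a hypothetical helper.

```python
def partition_key(event: dict) -> bytes:
    """Derive a message key so all events for one task share a partition."""
    # aiokafka expects key bytes when no key_serializer is configured.
    return str(event["task_id"]).encode("utf-8")


# Possible use inside an extended publish method (requires aiokafka):
#     await self.producer.send_and_wait(topic, event, key=partition_key(event))
print(partition_key({"task_id": "3f2c", "event_type": "task.updated"}))
```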

Consumer

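import json

from aiokafka import AIOKafkaConsumer


class KafkaEventConsumer:
    """Consumes JSON-encoded events from one or more topics."""

    def __init__(self, bootstrap_servers: str, topics: list[str], group_id: str):
        self.consumer = AIOKafkaConsumer(
            *topics,
            bootstrap_servers=bootstrap_servers,
            # Consumers sharing a group_id split the partitions between them.
            group_id=group_id,
            # Decode each message value from UTF-8 JSON bytes.
            value_deserializer=lambda v: json.loads(v.decode('utf-8'))
        )

    async def start(self):
        # Connect and join the consumer group; call once before consuming.
        await self.consumer.start()

    async def stop(self):
        # Leave the group and close the connection.
        await self.consumer.stop()

    async def consume(self):
        # Async generator yielding (topic, decoded event) pairs as they arrive.
        async for msg in self.consumer:
            yield msg.topic, msg.value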
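
Both classes above need start() before use and stop() afterwards, including when an error occurs in between. A minimal sketch of a context manager that pairs the two calls is shown here; running is an illustrative name and works with any object that has async start and stop methods.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def running(client):
    """Start a producer or consumer and always stop it on exit."""
    await client.start()
    try:
        yield client
    finally:
        # Runs even if the body raises, so connections are not leaked.
        await client.stop()


# Usage with the classes above (requires a reachable broker):
#     async def main():
#         async with running(KafkaEventProducer("localhost:29092")) as producer:
#             await producer.publish("task-events", {"event_type": "task.created"})
#     asyncio.run(main())
```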

Verification Checklist

  • Kafka cluster running (Strimzi or Docker)
  • All topics created (task-events, reminder-events, audit-events, task-updates)
  • Dapr component configured
  • Producer can publish messages
  • Consumer receives messages
  • Kafka UI accessible (optional)
  • Redpanda Cloud configured (for production)

Topic Schema

task-events

{
  "event_type": "task.created | task.updated | task.deleted | task.completed",
  "task_id": "uuid",
  "user_id": "uuid",
  "task": {
    "id": "uuid",
    "title": "string",
    "description": "string",
    "priority": "low | medium | high",
    "status": "pending | in_progress | completed",
    "due_date": "ISO8601",
    "tags": ["string"]
  },
  "timestamp": "ISO8601"
}
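
An event matching the task-events shape above could be assembled as follows. This is a sketch: make_task_event is a hypothetical helper, and using the current UTC time for the timestamp is an assumption.

```python
import uuid
from datetime import datetime, timezone

# Event types listed in the task-events schema.
TASK_EVENT_TYPES = {"task.created", "task.updated", "task.deleted", "task.completed"}


def make_task_event(event_type: str, user_id: str, task: dict) -> dict:
    """Build a task-events payload from a task dict that already has an id."""
    if event_type not in TASK_EVENT_TYPES:
        raise ValueError(f"unknown event_type: {event_type!r}")
    return {
        "event_type": event_type,
        "task_id": task["id"],
        "user_id": user_id,
        "task": task,
        # ISO 8601 timestamp with an explicit UTC offset.
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }


task = {
    "id": str(uuid.uuid4()),
    "title": "Write report",
    "description": "",
    "priority": "medium",
    "status": "pending",
    "due_date": "2025-01-31T17:00:00+00:00",
    "tags": ["work"],
}
event = make_task_event("task.created", str(uuid.uuid4()), task)
print(event["event_type"], event["task_id"])
```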

reminder-events

{
  "event_type": "reminder.triggered | reminder.created | reminder.deleted",
  "reminder_id": "uuid",
  "task_id": "uuid",
  "user_id": "uuid",
  "message": "string",
  "timestamp": "ISO8601"
}
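
Consumers may want to reject malformed payloads before processing them. The sketch below checks for the top-level fields shown in the two schemas; treating every field as required is an assumption, and missing_fields is an illustrative name.

```python
# Top-level fields from the schemas above, assumed here to all be required.
REQUIRED_FIELDS = {
    "task-events": {"event_type", "task_id", "user_id", "task", "timestamp"},
    "reminder-events": {
        "event_type", "reminder_id", "task_id", "user_id", "message", "timestamp",
    },
}


def missing_fields(topic: str, event: dict) -> list:
    """Return the sorted names of required fields absent from the event."""
    return sorted(REQUIRED_FIELDS[topic] - set(event))


print(missing_fields("reminder-events", {"event_type": "reminder.created"}))
```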

Troubleshooting

Issue                | Cause             | Solution
Broker not reachable | Wrong address     | Use internal K8s DNS
Topic not found      | Not created       | Apply KafkaTopic CRD
Consumer lag         | Slow processing   | Scale consumers
Auth failed          | Wrong credentials | Check secrets
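
For the "Broker not reachable" case, a plain TCP connection attempt can separate network or DNS problems from Kafka-level ones. The sketch below uses only the standard library; the function names are illustrative. A successful connect shows that something is listening on the address, not that the broker's advertised listeners are correct.

```python
import socket


def split_host_port(address: str) -> tuple:
    """Split "host:port" into (host, port)."""
    host, _, port = address.rpartition(":")
    return host, int(port)


def broker_reachable(address: str, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to the bootstrap address succeeds."""
    host, port = split_host_port(address)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts.
        return False


# Example: broker_reachable("localhost:29092") from the Docker host.
```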
