Claude-skill-registry customerio-load-scale

install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/customerio-load-scale" ~/.claude/skills/majiayu000-claude-skill-registry-customerio-load-scale && rm -rf "$T"
manifest: skills/data/customerio-load-scale/SKILL.md
source content

Customer.io Load & Scale

Overview

Load testing and scaling strategies for high-volume Customer.io integrations.

Prerequisites

  • Customer.io integration working
  • Load testing tools (k6, Artillery)
  • Staging environment with test workspace

Capacity Planning

Customer.io Rate Limits

EndpointLimitNotes
Track API (identify/track)100 req/secPer workspace
App API (transactional)100 req/secPer workspace
Webhooks (outbound)VariesBased on plan

Scaling Targets

VolumeArchitectureNotes
< 1M events/daySingle serviceDirect API calls
1-10M events/dayQueue-basedMessage queue buffer
> 10M events/dayDistributedMultiple workers

Instructions

Step 1: Load Test Script (k6)

// load-tests/customerio.js
import http from 'k6/http';
import { check, sleep } from 'k6';
import { Rate, Trend } from 'k6/metrics';

const errorRate = new Rate('errors');
const identifyDuration = new Trend('identify_duration');
const trackDuration = new Trend('track_duration');

const BASE_URL = 'https://track.customer.io/api/v1';
const AUTH = __ENV.CUSTOMERIO_AUTH; // base64(site_id:api_key)

export const options = {
  scenarios: {
    identify_load: {
      executor: 'ramping-rate',
      startRate: 10,
      timeUnit: '1s',
      preAllocatedVUs: 50,
      stages: [
        { target: 50, duration: '1m' },
        { target: 100, duration: '2m' },
        { target: 100, duration: '5m' },
        { target: 0, duration: '1m' },
      ],
      exec: 'identifyScenario',
    },
    track_load: {
      executor: 'ramping-rate',
      startRate: 10,
      timeUnit: '1s',
      preAllocatedVUs: 50,
      stages: [
        { target: 50, duration: '1m' },
        { target: 100, duration: '2m' },
        { target: 100, duration: '5m' },
        { target: 0, duration: '1m' },
      ],
      exec: 'trackScenario',
    },
  },
  thresholds: {
    'errors': ['rate<0.01'],
    'identify_duration': ['p95<500'],
    'track_duration': ['p95<500'],
  },
};

export function identifyScenario() {
  const userId = `load-test-${__VU}-${__ITER}`;
  const payload = JSON.stringify({
    email: `${userId}@loadtest.com`,
    _load_test: true,
    created_at: Math.floor(Date.now() / 1000),
  });

  const start = new Date();
  const res = http.post(
    `${BASE_URL}/customers/${userId}`,
    payload,
    {
      headers: {
        'Authorization': `Basic ${AUTH}`,
        'Content-Type': 'application/json',
      },
    }
  );
  identifyDuration.add(new Date() - start);

  const success = check(res, {
    'identify status is 200': (r) => r.status === 200,
  });
  errorRate.add(!success);

  sleep(0.1);
}

export function trackScenario() {
  const userId = `load-test-${__VU}-${__ITER}`;
  const payload = JSON.stringify({
    name: 'load_test_event',
    data: {
      source: 'k6',
      timestamp: new Date().toISOString(),
    },
  });

  const start = new Date();
  const res = http.post(
    `${BASE_URL}/customers/${userId}/events`,
    payload,
    {
      headers: {
        'Authorization': `Basic ${AUTH}`,
        'Content-Type': 'application/json',
      },
    }
  );
  trackDuration.add(new Date() - start);

  const success = check(res, {
    'track status is 200': (r) => r.status === 200,
  });
  errorRate.add(!success);

  sleep(0.1);
}

Step 2: Horizontal Scaling

# k8s/scaled-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: customerio-worker
spec:
  replicas: 3
  selector:
    matchLabels:
      app: customerio-worker
  template:
    metadata:
      labels:
        app: customerio-worker
    spec:
      containers:
        - name: worker
          image: customerio-worker:latest
          resources:
            requests:
              cpu: "500m"
              memory: "256Mi"
            limits:
              cpu: "1000m"
              memory: "512Mi"
          env:
            - name: CONCURRENCY
              value: "10"
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: customerio-worker-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: customerio-worker
  minReplicas: 3
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: External
      external:
        metric:
          name: pubsub.googleapis.com|subscription|num_undelivered_messages
          selector:
            matchLabels:
              resource.labels.subscription_id: customerio-events
        target:
          type: AverageValue
          averageValue: 1000

Step 3: Message Queue Architecture

// lib/scaled-processor.ts
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
import { TrackClient, RegionUS } from '@customerio/track';

const kafka = new Kafka({
  clientId: 'customerio-worker',
  brokers: process.env.KAFKA_BROKERS!.split(',')
});

const consumer = kafka.consumer({
  groupId: 'customerio-workers',
  sessionTimeout: 30000,
  heartbeatInterval: 3000
});

const client = new TrackClient(
  process.env.CUSTOMERIO_SITE_ID!,
  process.env.CUSTOMERIO_API_KEY!,
  { region: RegionUS }
);

interface CustomerIOEvent {
  type: 'identify' | 'track';
  userId: string;
  payload: any;
}

async function processMessage(message: EachMessagePayload): Promise<void> {
  const event: CustomerIOEvent = JSON.parse(message.message.value!.toString());

  if (event.type === 'identify') {
    await client.identify(event.userId, event.payload);
  } else if (event.type === 'track') {
    await client.track(event.userId, {
      name: event.payload.event,
      data: event.payload.properties
    });
  }
}

async function start(): Promise<void> {
  await consumer.connect();
  await consumer.subscribe({ topic: 'customerio-events', fromBeginning: false });

  await consumer.run({
    partitionsConsumedConcurrently: 10,
    eachMessage: async (payload) => {
      try {
        await processMessage(payload);
      } catch (error) {
        console.error('Processing error:', error);
        // Dead letter or retry logic
      }
    }
  });
}

start().catch(console.error);

Step 4: Rate Limiter for Fair Usage

// lib/rate-limiter.ts
import Bottleneck from 'bottleneck';

// Respect Customer.io's 100 req/sec limit
// Leave headroom for other services
const limiter = new Bottleneck({
  reservoir: 80, // 80 tokens
  reservoirRefreshAmount: 80,
  reservoirRefreshInterval: 1000, // per second
  maxConcurrent: 20,
  minTime: 10 // Minimum 10ms between requests
});

// Track rate limit events
limiter.on('depleted', () => {
  console.warn('Rate limiter depleted, requests queued');
});

limiter.on('error', (error) => {
  console.error('Rate limiter error:', error);
});

export async function rateLimitedIdentify(
  client: TrackClient,
  userId: string,
  attributes: Record<string, any>
): Promise<void> {
  return limiter.schedule(() => client.identify(userId, attributes));
}

export async function rateLimitedTrack(
  client: TrackClient,
  userId: string,
  event: string,
  data?: Record<string, any>
): Promise<void> {
  return limiter.schedule(() =>
    client.track(userId, { name: event, data })
  );
}

// Get limiter stats
export function getLimiterStats() {
  return {
    running: limiter.running(),
    queued: limiter.queued(),
    done: limiter.done(),
    reservoir: limiter.reservoir
  };
}

Step 5: Batch Processing

// lib/batch-sender.ts
interface BatchConfig {
  maxBatchSize: number;
  maxWaitMs: number;
  concurrency: number;
}

class BatchSender {
  private batch: Array<{ userId: string; operation: 'identify' | 'track'; data: any }> = [];
  private timer: NodeJS.Timer | null = null;
  private processing = false;

  constructor(
    private client: TrackClient,
    private config: BatchConfig = { maxBatchSize: 100, maxWaitMs: 1000, concurrency: 10 }
  ) {}

  add(userId: string, operation: 'identify' | 'track', data: any): void {
    this.batch.push({ userId, operation, data });

    if (this.batch.length >= this.config.maxBatchSize) {
      this.flush();
    } else if (!this.timer) {
      this.timer = setTimeout(() => this.flush(), this.config.maxWaitMs);
    }
  }

  async flush(): Promise<void> {
    if (this.processing || this.batch.length === 0) return;

    if (this.timer) {
      clearTimeout(this.timer);
      this.timer = null;
    }

    this.processing = true;
    const items = this.batch.splice(0, this.config.maxBatchSize);

    // Process in parallel with limited concurrency
    for (let i = 0; i < items.length; i += this.config.concurrency) {
      const chunk = items.slice(i, i + this.config.concurrency);
      await Promise.allSettled(chunk.map(item => this.processItem(item)));
    }

    this.processing = false;
  }

  private async processItem(item: { userId: string; operation: string; data: any }): Promise<void> {
    if (item.operation === 'identify') {
      await this.client.identify(item.userId, item.data);
    } else {
      await this.client.track(item.userId, {
        name: item.data.event,
        data: item.data.properties
      });
    }
  }
}

Step 6: Load Test Execution

#!/bin/bash
# scripts/run-load-test.sh

# Set credentials
export CUSTOMERIO_AUTH=$(echo -n "$CIO_SITE_ID:$CIO_API_KEY" | base64)

# Run k6 load test
k6 run \
  --out json=results.json \
  --out influxdb=http://localhost:8086/k6 \
  load-tests/customerio.js

# Generate report
k6 run --summary-export=summary.json load-tests/customerio.js

echo "Load test complete. Results in results.json"

Scaling Checklist

  • Rate limits understood
  • Load tests written
  • Horizontal scaling configured
  • Message queue buffering
  • Rate limiting implemented
  • Batch processing enabled
  • Monitoring during tests

Error Handling

IssueSolution
Rate limited (429)Reduce concurrency
Timeout errorsIncrease timeout
Queue backlogScale workers

Resources

Next Steps

After load testing, proceed to

customerio-known-pitfalls
for anti-patterns.