Claude-skill-registry customerio-load-scale
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/customerio-load-scale" ~/.claude/skills/majiayu000-claude-skill-registry-customerio-load-scale && rm -rf "$T"
manifest:
skills/data/customerio-load-scale/SKILL.mdsource content
Customer.io Load & Scale
Overview
Load testing and scaling strategies for high-volume Customer.io integrations.
Prerequisites
- Customer.io integration working
- Load testing tools (k6, Artillery)
- Staging environment with test workspace
Capacity Planning
Customer.io Rate Limits
| Endpoint | Limit | Notes |
|---|---|---|
| Track API (identify/track) | 100 req/sec | Per workspace |
| App API (transactional) | 100 req/sec | Per workspace |
| Webhooks (outbound) | Varies | Based on plan |
Scaling Targets
| Volume | Architecture | Notes |
|---|---|---|
| < 1M events/day | Single service | Direct API calls |
| 1-10M events/day | Queue-based | Message queue buffer |
| > 10M events/day | Distributed | Multiple workers |
Instructions
Step 1: Load Test Script (k6)
// load-tests/customerio.js import http from 'k6/http'; import { check, sleep } from 'k6'; import { Rate, Trend } from 'k6/metrics'; const errorRate = new Rate('errors'); const identifyDuration = new Trend('identify_duration'); const trackDuration = new Trend('track_duration'); const BASE_URL = 'https://track.customer.io/api/v1'; const AUTH = __ENV.CUSTOMERIO_AUTH; // base64(site_id:api_key) export const options = { scenarios: { identify_load: { executor: 'ramping-rate', startRate: 10, timeUnit: '1s', preAllocatedVUs: 50, stages: [ { target: 50, duration: '1m' }, { target: 100, duration: '2m' }, { target: 100, duration: '5m' }, { target: 0, duration: '1m' }, ], exec: 'identifyScenario', }, track_load: { executor: 'ramping-rate', startRate: 10, timeUnit: '1s', preAllocatedVUs: 50, stages: [ { target: 50, duration: '1m' }, { target: 100, duration: '2m' }, { target: 100, duration: '5m' }, { target: 0, duration: '1m' }, ], exec: 'trackScenario', }, }, thresholds: { 'errors': ['rate<0.01'], 'identify_duration': ['p95<500'], 'track_duration': ['p95<500'], }, }; export function identifyScenario() { const userId = `load-test-${__VU}-${__ITER}`; const payload = JSON.stringify({ email: `${userId}@loadtest.com`, _load_test: true, created_at: Math.floor(Date.now() / 1000), }); const start = new Date(); const res = http.post( `${BASE_URL}/customers/${userId}`, payload, { headers: { 'Authorization': `Basic ${AUTH}`, 'Content-Type': 'application/json', }, } ); identifyDuration.add(new Date() - start); const success = check(res, { 'identify status is 200': (r) => r.status === 200, }); errorRate.add(!success); sleep(0.1); } export function trackScenario() { const userId = `load-test-${__VU}-${__ITER}`; const payload = JSON.stringify({ name: 'load_test_event', data: { source: 'k6', timestamp: new Date().toISOString(), }, }); const start = new Date(); const res = http.post( `${BASE_URL}/customers/${userId}/events`, payload, { headers: { 'Authorization': `Basic ${AUTH}`, 'Content-Type': 'application/json', }, } ); trackDuration.add(new Date() - start); const success = check(res, { 'track status is 200': (r) => r.status === 200, }); errorRate.add(!success); sleep(0.1); }
Step 2: Horizontal Scaling
# k8s/scaled-deployment.yaml apiVersion: apps/v1 kind: Deployment metadata: name: customerio-worker spec: replicas: 3 selector: matchLabels: app: customerio-worker template: metadata: labels: app: customerio-worker spec: containers: - name: worker image: customerio-worker:latest resources: requests: cpu: "500m" memory: "256Mi" limits: cpu: "1000m" memory: "512Mi" env: - name: CONCURRENCY value: "10" --- apiVersion: autoscaling/v2 kind: HorizontalPodAutoscaler metadata: name: customerio-worker-hpa spec: scaleTargetRef: apiVersion: apps/v1 kind: Deployment name: customerio-worker minReplicas: 3 maxReplicas: 20 metrics: - type: Resource resource: name: cpu target: type: Utilization averageUtilization: 70 - type: External external: metric: name: pubsub.googleapis.com|subscription|num_undelivered_messages selector: matchLabels: resource.labels.subscription_id: customerio-events target: type: AverageValue averageValue: 1000
Step 3: Message Queue Architecture
// lib/scaled-processor.ts import { Kafka, Consumer, EachMessagePayload } from 'kafkajs'; import { TrackClient, RegionUS } from '@customerio/track'; const kafka = new Kafka({ clientId: 'customerio-worker', brokers: process.env.KAFKA_BROKERS!.split(',') }); const consumer = kafka.consumer({ groupId: 'customerio-workers', sessionTimeout: 30000, heartbeatInterval: 3000 }); const client = new TrackClient( process.env.CUSTOMERIO_SITE_ID!, process.env.CUSTOMERIO_API_KEY!, { region: RegionUS } ); interface CustomerIOEvent { type: 'identify' | 'track'; userId: string; payload: any; } async function processMessage(message: EachMessagePayload): Promise<void> { const event: CustomerIOEvent = JSON.parse(message.message.value!.toString()); if (event.type === 'identify') { await client.identify(event.userId, event.payload); } else if (event.type === 'track') { await client.track(event.userId, { name: event.payload.event, data: event.payload.properties }); } } async function start(): Promise<void> { await consumer.connect(); await consumer.subscribe({ topic: 'customerio-events', fromBeginning: false }); await consumer.run({ partitionsConsumedConcurrently: 10, eachMessage: async (payload) => { try { await processMessage(payload); } catch (error) { console.error('Processing error:', error); // Dead letter or retry logic } } }); } start().catch(console.error);
Step 4: Rate Limiter for Fair Usage
// lib/rate-limiter.ts import Bottleneck from 'bottleneck'; // Respect Customer.io's 100 req/sec limit // Leave headroom for other services const limiter = new Bottleneck({ reservoir: 80, // 80 tokens reservoirRefreshAmount: 80, reservoirRefreshInterval: 1000, // per second maxConcurrent: 20, minTime: 10 // Minimum 10ms between requests }); // Track rate limit events limiter.on('depleted', () => { console.warn('Rate limiter depleted, requests queued'); }); limiter.on('error', (error) => { console.error('Rate limiter error:', error); }); export async function rateLimitedIdentify( client: TrackClient, userId: string, attributes: Record<string, any> ): Promise<void> { return limiter.schedule(() => client.identify(userId, attributes)); } export async function rateLimitedTrack( client: TrackClient, userId: string, event: string, data?: Record<string, any> ): Promise<void> { return limiter.schedule(() => client.track(userId, { name: event, data }) ); } // Get limiter stats export function getLimiterStats() { return { running: limiter.running(), queued: limiter.queued(), done: limiter.done(), reservoir: limiter.reservoir }; }
Step 5: Batch Processing
// lib/batch-sender.ts interface BatchConfig { maxBatchSize: number; maxWaitMs: number; concurrency: number; } class BatchSender { private batch: Array<{ userId: string; operation: 'identify' | 'track'; data: any }> = []; private timer: NodeJS.Timer | null = null; private processing = false; constructor( private client: TrackClient, private config: BatchConfig = { maxBatchSize: 100, maxWaitMs: 1000, concurrency: 10 } ) {} add(userId: string, operation: 'identify' | 'track', data: any): void { this.batch.push({ userId, operation, data }); if (this.batch.length >= this.config.maxBatchSize) { this.flush(); } else if (!this.timer) { this.timer = setTimeout(() => this.flush(), this.config.maxWaitMs); } } async flush(): Promise<void> { if (this.processing || this.batch.length === 0) return; if (this.timer) { clearTimeout(this.timer); this.timer = null; } this.processing = true; const items = this.batch.splice(0, this.config.maxBatchSize); // Process in parallel with limited concurrency for (let i = 0; i < items.length; i += this.config.concurrency) { const chunk = items.slice(i, i + this.config.concurrency); await Promise.allSettled(chunk.map(item => this.processItem(item))); } this.processing = false; } private async processItem(item: { userId: string; operation: string; data: any }): Promise<void> { if (item.operation === 'identify') { await this.client.identify(item.userId, item.data); } else { await this.client.track(item.userId, { name: item.data.event, data: item.data.properties }); } } }
Step 6: Load Test Execution
#!/bin/bash # scripts/run-load-test.sh # Set credentials export CUSTOMERIO_AUTH=$(echo -n "$CIO_SITE_ID:$CIO_API_KEY" | base64) # Run k6 load test k6 run \ --out json=results.json \ --out influxdb=http://localhost:8086/k6 \ load-tests/customerio.js # Generate report k6 run --summary-export=summary.json load-tests/customerio.js echo "Load test complete. Results in results.json"
Scaling Checklist
- Rate limits understood
- Load tests written
- Horizontal scaling configured
- Message queue buffering
- Rate limiting implemented
- Batch processing enabled
- Monitoring during tests
Error Handling
| Issue | Solution |
|---|---|
| Rate limited (429) | Reduce concurrency |
| Timeout errors | Increase timeout |
| Queue backlog | Scale workers |
Resources
Next Steps
After load testing, proceed to
customerio-known-pitfalls for anti-patterns.