Claude-code-plugins-plus-skills customerio-load-scale
install
source · Clone the upstream repo
git clone https://github.com/jeremylongshore/claude-code-plugins-plus-skills
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/jeremylongshore/claude-code-plugins-plus-skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/plugins/saas-packs/customerio-pack/skills/customerio-load-scale" ~/.claude/skills/jeremylongshore-claude-code-plugins-plus-skills-customerio-load-scale && rm -rf "$T"
manifest:
plugins/saas-packs/customerio-pack/skills/customerio-load-scale/SKILL.md
source content
Customer.io Load & Scale
Overview
Load testing and scaling strategies for high-volume Customer.io integrations: k6 load test scripts, scaling architecture selection based on volume tier, Kubernetes HPA autoscaling, message queue buffering, and rate-limit-aware batch processing.
Scaling Architecture by Volume
| Daily Events | Architecture | Key Components |
|---|---|---|
| < 100K | Direct API | Singleton client, retry, connection pooling |
| 100K - 1M | Batched API | Event queue, batch processor, rate limiter |
| 1M - 10M | Queue-backed | Redis/Kafka queue, worker pool, backpressure |
| > 10M | Distributed | Multiple workspaces, sharded queues, regional routing |
Customer.io rate limit is ~100 req/sec per workspace. Plan your architecture around this.
Instructions
Step 1: k6 Load Test Script
// load-tests/customerio.js
//
// k6 load test for the Customer.io Track API. Every iteration identifies one
// synthetic user and then tracks one event for that user, i.e. it issues TWO
// HTTP requests. All rates below are therefore expressed in iterations/sec
// and are half of the intended request rate.
//
// Run (do not add --vus/--duration: CLI execution shortcuts take precedence
// over, and discard, the `scenarios` block defined in this script):
//   k6 run --env CUSTOMERIO_SITE_ID="$CUSTOMERIO_SITE_ID" \
//          --env CUSTOMERIO_TRACK_API_KEY="$CUSTOMERIO_TRACK_API_KEY" \
//          load-tests/customerio.js
import http from "k6/http";
import encoding from "k6/encoding";
import { check } from "k6";
import { Counter, Trend } from "k6/metrics";

const SITE_ID = __ENV.CUSTOMERIO_SITE_ID;
const API_KEY = __ENV.CUSTOMERIO_TRACK_API_KEY;

// Fail during init instead of sending "undefined:undefined" credentials.
if (!SITE_ID || !API_KEY) {
  throw new Error(
    "CUSTOMERIO_SITE_ID and CUSTOMERIO_TRACK_API_KEY must be passed via --env"
  );
}

const BASE_URL = "https://track.customer.io/api/v1";
const AUTH = `${SITE_ID}:${API_KEY}`;

// Headers never change between iterations, so build them once at init time.
// b64encode comes from the k6/encoding module imported above.
const HEADERS = {
  "Content-Type": "application/json",
  Authorization: `Basic ${encoding.b64encode(AUTH)}`,
};

const identifyLatency = new Trend("cio_identify_latency");
const trackLatency = new Trend("cio_track_latency");
const errors = new Counter("cio_errors");

export const options = {
  scenarios: {
    identify_load: {
      executor: "ramping-arrival-rate",
      startRate: 5, // 5 iterations/sec = 10 req/sec
      timeUnit: "1s",
      preAllocatedVUs: 20,
      maxVUs: 50,
      stages: [
        { duration: "30s", target: 25 }, // Ramp to 25 it/sec = 50 req/sec
        { duration: "60s", target: 40 }, // Ramp to 40 it/sec = 80 req/sec (near the ~100 req/sec limit)
        { duration: "30s", target: 5 }, // Cool down to 10 req/sec
      ],
    },
  },
  thresholds: {
    cio_identify_latency: ["p(95)<500", "p(99)<2000"],
    cio_track_latency: ["p(95)<500", "p(99)<2000"],
    cio_errors: ["count<50"],
  },
};

// One iteration: identify a unique synthetic user, then track one event for it.
export default function () {
  const userId = `k6-load-${__VU}-${__ITER}`;

  // Identify
  const identifyRes = http.put(
    `${BASE_URL}/customers/${userId}`,
    JSON.stringify({
      email: `${userId}@loadtest.example.com`,
      _load_test: true,
      created_at: Math.floor(Date.now() / 1000),
    }),
    { headers: HEADERS }
  );
  identifyLatency.add(identifyRes.timings.duration);
  if (!check(identifyRes, { "identify 200": (r) => r.status === 200 })) {
    errors.add(1);
  }

  // Track event
  const trackRes = http.post(
    `${BASE_URL}/customers/${userId}/events`,
    JSON.stringify({
      name: "load_test_event",
      data: { iteration: __ITER, vu: __VU },
    }),
    { headers: HEADERS }
  );
  trackLatency.add(trackRes.timings.duration);
  if (!check(trackRes, { "track 200": (r) => r.status === 200 })) {
    errors.add(1);
  }

  // No trailing sleep(): the arrival-rate executor already paces iterations,
  // and sleeping here would only tie up VUs and push the test towards maxVUs.
}

// Runs once after the test. It does NOT delete anything; it only prints a
// reminder that the k6-load-* users must be removed manually.
export function teardown() {
  console.log("Load test complete. Clean up k6-load-* users in CIO dashboard.");
}
Run:
# Forward the workspace credentials from the shell into the script's __ENV.
k6 run \
  --env CUSTOMERIO_SITE_ID="$CUSTOMERIO_SITE_ID" \
  --env CUSTOMERIO_TRACK_API_KEY="$CUSTOMERIO_TRACK_API_KEY" \
  load-tests/customerio.js
Step 2: Queue-Based Architecture
// services/cio-queue-worker.ts
import { Queue, Worker } from "bullmq";
import { TrackClient, RegionUS } from "customerio-node";
import Bottleneck from "bottleneck";

const REDIS_URL = process.env.REDIS_URL ?? "redis://localhost:6379";

// BullMQ rejects queue names containing ":" (it uses ":" as its Redis key
// separator), so the name uses "-" instead.
const QUEUE_NAME = "cio-events";

/** Payload stored on every queued job. */
interface CioJobData {
  userId: string;
  data: Record<string, any>;
  /** Epoch milliseconds at which the producer enqueued the job. */
  enqueuedAt: number;
}

// Rate limiter: 80 requests per second (leave headroom under 100/sec limit).
// NOTE(review): this limiter is constructed without a shared datastore, so it
// appears to be per process. If several worker processes/replicas run at once,
// each gets its own 80 req/sec budget — confirm the combined rate stays under
// the workspace limit.
const limiter = new Bottleneck({
  maxConcurrent: 15,
  reservoir: 80,
  reservoirRefreshAmount: 80,
  reservoirRefreshInterval: 1000,
});

const eventQueue = new Queue<CioJobData>(QUEUE_NAME, {
  connection: { url: REDIS_URL },
  defaultJobOptions: {
    attempts: 5,
    backoff: { type: "exponential", delay: 2000 },
    removeOnComplete: { count: 10000 },
    removeOnFail: { count: 50000 },
  },
});

/** Reads a required environment variable or throws a descriptive error. */
function requireEnv(name: string): string {
  const value = process.env[name];
  if (!value) {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value;
}

/** Logs the current job counts; never rejects, so it is safe to call from a timer. */
async function logQueueCounts(): Promise<void> {
  try {
    const counts = await eventQueue.getJobCounts();
    console.log(
      `CIO queue: waiting=${counts.waiting} active=${counts.active} ` +
        `failed=${counts.failed} completed=${counts.completed}`
    );
  } catch (err: unknown) {
    const reason = err instanceof Error ? err.message : String(err);
    console.error(`CIO queue health check failed: ${reason}`);
  }
}

/**
 * Producer — your application enqueues events here.
 *
 * @param type   Job name; decides whether the worker calls identify or track.
 * @param userId Customer.io person identifier.
 * @param data   Payload forwarded unchanged to the Customer.io client.
 */
export async function enqueueEvent(
  type: "identify" | "track",
  userId: string,
  data: Record<string, any>
): Promise<void> {
  await eventQueue.add(type, { userId, data, enqueuedAt: Date.now() });
}

/**
 * Consumer — starts a BullMQ worker that drains the queue through the rate
 * limiter and logs queue counts every 30 seconds.
 *
 * @param concurrency Number of jobs the worker processes in parallel.
 * @throws Error if CUSTOMERIO_SITE_ID or CUSTOMERIO_TRACK_API_KEY is unset.
 */
export function startEventWorkers(concurrency = 10): void {
  const cio = new TrackClient(
    requireEnv("CUSTOMERIO_SITE_ID"),
    requireEnv("CUSTOMERIO_TRACK_API_KEY"),
    { region: RegionUS }
  );

  const worker = new Worker<CioJobData>(
    QUEUE_NAME,
    async (job) => {
      // A rejection here fails the job, which BullMQ retries per defaultJobOptions.
      await limiter.schedule(async () => {
        if (job.name === "identify") {
          await cio.identify(job.data.userId, job.data.data);
        } else {
          await cio.track(job.data.userId, job.data.data);
        }
      });
    },
    {
      connection: { url: REDIS_URL },
      concurrency,
    }
  );

  worker.on("failed", (job, err) => {
    console.error(`CIO event failed: ${job?.id} — ${err.message}`);
  });

  // Without an "error" listener, connection-level errors emitted by the
  // worker would surface as unhandled "error" events.
  worker.on("error", (err) => {
    console.error(`CIO worker error: ${err.message}`);
  });

  // Monitor queue health
  setInterval(() => {
    void logQueueCounts();
  }, 30000);
}
Step 3: Kubernetes HPA Autoscaling
# k8s/hpa.yaml
# Autoscaler for the cio-event-worker Deployment: scales on CPU utilization
# and on the cio_queue_depth pods metric.
# NOTE(review): if every replica applies its own in-process rate limiter, the
# combined request rate grows with the replica count — confirm maxReplicas
# keeps the total under the workspace rate limit.
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: cio-worker-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: cio-event-worker
  minReplicas: 2
  maxReplicas: 20
  metrics:
    # Scale when average CPU utilization across pods exceeds 70%.
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    # Scale when the per-pod average of cio_queue_depth exceeds 500.
    # NOTE(review): presumably served by a custom-metrics adapter — confirm.
    - type: Pods
      pods:
        metric:
          name: cio_queue_depth
        target:
          type: AverageValue
          averageValue: "500"
  behavior:
    # Add at most 4 pods per 60s, after a 60s stabilization window.
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - type: Pods
          value: 4
          periodSeconds: 60
    # Remove at most 2 pods per 120s, after a 300s stabilization window.
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Pods
          value: 2
          periodSeconds: 120
Step 4: Batch Sender for Bulk Operations
// lib/cio-batch-sender.ts
import { TrackClient, RegionUS } from "customerio-node";
import Bottleneck from "bottleneck";

/**
 * Sends a batch of identify/track operations to Customer.io through a rate
 * limiter and reports how many succeeded and failed.
 *
 * Individual failures do not abort the batch; each one is logged and counted.
 *
 * @param operations Operations to send; `data` is forwarded unchanged to the client.
 * @param ratePerSec Maximum operations started per second (default 80).
 * @returns Counts of succeeded and failed operations.
 * @throws RangeError if `ratePerSec` is not a finite number greater than zero
 *   (such a value would leave the limiter with no capacity and never finish).
 * @throws Error if CUSTOMERIO_SITE_ID or CUSTOMERIO_TRACK_API_KEY is unset.
 */
export async function batchSend(
  operations: Array<{
    type: "identify" | "track";
    userId: string;
    data: Record<string, any>;
  }>,
  ratePerSec = 80
): Promise<{ succeeded: number; failed: number }> {
  if (!Number.isFinite(ratePerSec) || ratePerSec <= 0) {
    throw new RangeError(
      `ratePerSec must be a finite number greater than 0, got ${ratePerSec}`
    );
  }

  // Nothing to send: skip client and limiter construction entirely.
  if (operations.length === 0) {
    return { succeeded: 0, failed: 0 };
  }

  const siteId = process.env.CUSTOMERIO_SITE_ID;
  const apiKey = process.env.CUSTOMERIO_TRACK_API_KEY;
  if (!siteId || !apiKey) {
    throw new Error(
      "CUSTOMERIO_SITE_ID and CUSTOMERIO_TRACK_API_KEY must be set"
    );
  }

  const cio = new TrackClient(siteId, apiKey, { region: RegionUS });

  const limiter = new Bottleneck({
    maxConcurrent: 15,
    reservoir: ratePerSec,
    reservoirRefreshAmount: ratePerSec,
    reservoirRefreshInterval: 1000,
  });

  let succeeded = 0;
  let failed = 0;

  const promises = operations.map((op, i) =>
    limiter.schedule(async () => {
      try {
        if (op.type === "identify") {
          await cio.identify(op.userId, op.data);
        } else {
          await cio.track(op.userId, op.data);
        }
        succeeded++;
      } catch (err: unknown) {
        failed++;
        // Keep going, but leave a trace of which operation failed and why.
        const reason = err instanceof Error ? err.message : String(err);
        console.error(
          `Batch operation ${i} (${op.type}, user ${op.userId}) failed: ${reason}`
        );
      }

      const done = succeeded + failed;
      if (done % 1000 === 0) {
        console.log(`Progress: ${done}/${operations.length}`);
      }
    })
  );

  await Promise.all(promises);
  return { succeeded, failed };
}
Install:
# bullmq provides the Redis-backed queue and worker (Step 2); bottleneck
# provides the rate limiter (Steps 2 and 4).
# NOTE(review): both TypeScript snippets also import customerio-node —
# presumably installed by an earlier step of this pack; confirm.
npm install bottleneck bullmq
Load Test Checklist
- Test against staging workspace (NEVER production)
- Start at 10% of target rate, ramp up gradually
- Monitor 429 error rate during test
- Check Customer.io dashboard for processing lag
- Verify cleanup of test users after load test
- Document baseline latency and throughput numbers
- Set up alerts before running at production scale
Error Handling
| Issue | Solution |
|---|---|
| 429 during load test | Reduce rate, check limiter config |
| Queue backlog growing | Scale workers, increase concurrency |
| Memory pressure | Limit batch and queue sizes, enable GC |
| k6 VU exhaustion | Increase `preAllocatedVUs` and `maxVUs` |
Resources
Next Steps
After load testing, proceed to
customerio-known-pitfalls for anti-patterns to avoid.