Claude-skill-registry backpressure
Manage data flow when producers outpace consumers. Bounded buffers, adaptive flushing, and graceful degradation prevent OOM crashes and data loss.
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/backpressure" ~/.claude/skills/majiayu000-claude-skill-registry-backpressure && rm -rf "$T"
manifest:
skills/data/backpressure/SKILL.mdsource content
Backpressure Management
Prevent OOM crashes when producers outpace consumers.
When to Use This Skill
- Database writes slower than event ingestion
- Memory filling up with queued items
- Need to handle traffic spikes gracefully
- Want to drop low-priority data under load
Core Concepts
- Bounded buffer - Fixed-size queue prevents unbounded growth
- Watermarks - Thresholds trigger state changes
- Strategies - Block, drop oldest, drop newest, or sample
- Adaptive flushing - Adjust rate based on downstream health
State Machine
NORMAL (< 50%) → ELEVATED (50-80%) → CRITICAL (80-100%) → BLOCKED (100%) ↑ │ └──────────────────────────────────────────────────────────┘ (buffer drains)
TypeScript Implementation
Types
// types.ts export enum BackpressureState { NORMAL = 'normal', ELEVATED = 'elevated', CRITICAL = 'critical', BLOCKED = 'blocked', DRAINING = 'draining', } export enum BackpressureStrategy { BLOCK = 'block', DROP_OLDEST = 'drop_oldest', DROP_NEWEST = 'drop_newest', SAMPLE = 'sample', } export interface BackpressureConfig { maxBufferSize: number; highWatermark: number; // 0-1 lowWatermark: number; // 0-1 strategy: BackpressureStrategy; sampleRate?: number; maxBlockTimeMs?: number; batchSize: number; minFlushIntervalMs: number; maxFlushIntervalMs: number; targetLatencyMs: number; } export interface FlushResult { success: number; failed: number; errors: Error[]; } export type FlushFunction<T> = (items: T[]) => Promise<FlushResult>;
Bounded Buffer
// buffer.ts export class BoundedBuffer<T> { private items: T[] = []; constructor(private readonly maxSize: number) {} get size(): number { return this.items.length; } get capacity(): number { return this.maxSize; } get utilization(): number { return this.items.length / this.maxSize; } isFull(): boolean { return this.items.length >= this.maxSize; } isEmpty(): boolean { return this.items.length === 0; } push(item: T): boolean { if (this.isFull()) return false; this.items.push(item); return true; } pushWithEviction(item: T): T | null { const evicted = this.isFull() ? this.items.shift() ?? null : null; this.items.push(item); return evicted; } take(count: number): T[] { return this.items.splice(0, Math.min(count, this.items.length)); } clear(): T[] { const all = this.items; this.items = []; return all; } }
Backpressure Controller
// controller.ts import { BoundedBuffer } from './buffer'; import { BackpressureState, BackpressureStrategy, BackpressureConfig, FlushFunction, } from './types'; const DEFAULT_CONFIG: BackpressureConfig = { maxBufferSize: 10000, highWatermark: 0.8, lowWatermark: 0.5, strategy: BackpressureStrategy.DROP_OLDEST, batchSize: 100, minFlushIntervalMs: 100, maxFlushIntervalMs: 30000, targetLatencyMs: 500, }; export class BackpressureController<T> { private buffer: BoundedBuffer<T>; private state: BackpressureState = BackpressureState.NORMAL; private config: BackpressureConfig; private flushFn: FlushFunction<T>; private flushInterval: NodeJS.Timeout | null = null; private currentFlushIntervalMs: number; private running = false; // Metrics private eventsAccepted = 0; private eventsDropped = 0; private eventsFlushed = 0; private lastFlushLatencyMs = 0; constructor(flushFn: FlushFunction<T>, config: Partial<BackpressureConfig> = {}) { this.config = { ...DEFAULT_CONFIG, ...config }; this.buffer = new BoundedBuffer(this.config.maxBufferSize); this.flushFn = flushFn; this.currentFlushIntervalMs = this.config.minFlushIntervalMs; } start(): void { if (this.running) return; this.running = true; this.scheduleFlush(); } stop(): void { this.running = false; if (this.flushInterval) { clearTimeout(this.flushInterval); this.flushInterval = null; } } async push(item: T): Promise<boolean> { switch (this.config.strategy) { case BackpressureStrategy.BLOCK: if (this.state === BackpressureState.BLOCKED) { const waited = await this.waitForSpace(); if (!waited) { this.eventsDropped++; return false; } } break; case BackpressureStrategy.DROP_NEWEST: if (this.buffer.isFull()) { this.eventsDropped++; return false; } break; case BackpressureStrategy.DROP_OLDEST: if (this.buffer.isFull()) { this.buffer.pushWithEviction(item); this.eventsDropped++; this.eventsAccepted++; this.updateState(); return true; } break; case BackpressureStrategy.SAMPLE: if (this.state !== BackpressureState.NORMAL) { const sampleRate = this.config.sampleRate || 10; if (this.eventsAccepted % sampleRate !== 0) { this.eventsDropped++; return false; } } break; } const accepted = this.buffer.push(item); if (accepted) { this.eventsAccepted++; } else { this.eventsDropped++; } this.updateState(); return accepted; } async drain(): Promise<void> { this.state = BackpressureState.DRAINING; while (!this.buffer.isEmpty()) { await this.flush(); } } getMetrics() { return { state: this.state, bufferSize: this.buffer.size, bufferUtilization: this.buffer.utilization, eventsAccepted: this.eventsAccepted, eventsDropped: this.eventsDropped, eventsFlushed: this.eventsFlushed, lastFlushLatencyMs: this.lastFlushLatencyMs, }; } private async flush(): Promise<void> { if (this.buffer.isEmpty()) return; const batch = this.buffer.take(this.config.batchSize); const startTime = Date.now(); try { const result = await this.flushFn(batch); this.eventsFlushed += result.success; this.lastFlushLatencyMs = Date.now() - startTime; this.adaptFlushInterval(); } catch (error) { console.error('[Backpressure] Flush error:', error); } this.updateState(); } private scheduleFlush(): void { if (!this.running) return; this.flushInterval = setTimeout(async () => { await this.flush(); this.scheduleFlush(); }, this.currentFlushIntervalMs); } private adaptFlushInterval(): void { const { targetLatencyMs, minFlushIntervalMs, maxFlushIntervalMs } = this.config; if (this.lastFlushLatencyMs > targetLatencyMs * 1.5) { this.currentFlushIntervalMs = Math.min( this.currentFlushIntervalMs * 1.5, maxFlushIntervalMs ); } else if (this.lastFlushLatencyMs < targetLatencyMs * 0.5) { this.currentFlushIntervalMs = Math.max( this.currentFlushIntervalMs * 0.8, minFlushIntervalMs ); } } private updateState(): void { const util = this.buffer.utilization; if (this.state === BackpressureState.DRAINING) return; if (util >= 1.0) { this.state = BackpressureState.BLOCKED; } else if (util >= this.config.highWatermark) { this.state = BackpressureState.CRITICAL; } else if (util >= this.config.lowWatermark) { this.state = BackpressureState.ELEVATED; } else { this.state = BackpressureState.NORMAL; } } private async waitForSpace(): Promise<boolean> { const maxWait = this.config.maxBlockTimeMs || 5000; const startTime = Date.now(); while (Date.now() - startTime < maxWait) { if (!this.buffer.isFull()) return true; await new Promise(r => setTimeout(r, 50)); } return false; } }
Usage Examples
// Create controller const controller = new BackpressureController( async (items) => { const result = await db.batchInsert('events', items); return { success: result.inserted, failed: 0, errors: [] }; }, { strategy: BackpressureStrategy.DROP_OLDEST, maxBufferSize: 10000, batchSize: 100, } ); // Start processing controller.start(); // Push events await controller.push(event); // On shutdown await controller.drain(); controller.stop();
Strategy Selection
| Strategy | Use Case | Trade-off |
|---|---|---|
| BLOCK | Critical data | Producers slow down |
| DROP_OLDEST | Time-series | Lose historical data |
| DROP_NEWEST | Batch jobs | Reject new work |
| SAMPLE | Telemetry | Statistical accuracy |
Best Practices
- Size buffers for memory - Don't exceed available RAM
- Match strategy to data - Critical data = BLOCK
- Monitor drop rates - Alert on high drops
- Drain on shutdown - Don't lose buffered data
- Combine with circuit breaker - Protect flush function
Common Mistakes
- Unbounded queues (OOM crash)
- No metrics on drops
- Not draining on shutdown
- Wrong strategy for data criticality
- No adaptive rate adjustment