claude-skill-registry · anomaly-detection
Rule-based anomaly detection for production systems with configurable thresholds, cooldown periods to prevent alert storms, and error pattern tracking for repeated failures.
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/anomaly-detection" ~/.claude/skills/majiayu000-claude-skill-registry-anomaly-detection && rm -rf "$T"
manifest:
skills/data/anomaly-detection/SKILL.md
Anomaly Detection
Rule-based anomaly detection with cooldowns and error pattern tracking.
When to Use This Skill
- Detecting slow job degradation before failures
- Tracking error rate creep over time
- Identifying repeated error patterns
- Preventing alert fatigue with cooldowns
Core Concepts
Production systems fail in subtle ways: jobs get slower, error rates creep up, and the same errors keep repeating. This skill addresses that with:
- Configurable rules with severity levels
- Cooldown periods to prevent alert storms
- Error pattern tracking for repeated failures
- Violation decay to reward recovery (see the sketch after this list)
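The reference implementations below cover rules, cooldowns, and error pattern tracking, but they do not implement decay. A minimal sketch of one approach, in TypeScript, assuming the detector's per-worker error counts are reachable (a Map shaped like the private errorCounts field below) and that a periodic sweep halves them:

// Sketch only: periodically halve each tracked error count so a worker that
// stops failing gradually drops back below the repeated-error threshold,
// instead of staying flagged on stale history.
function decayErrorCounts(
  errorCounts: Map<string, Map<string, number>>,
  factor = 0.5
): void {
  for (const [worker, counts] of errorCounts) {
    for (const [key, count] of counts) {
      const decayed = Math.floor(count * factor);
      if (decayed <= 0) {
        counts.delete(key); // fully recovered: forget the error pattern
      } else {
        counts.set(key, decayed); // partially recovered: keep a reduced count
      }
    }
    if (counts.size === 0) errorCounts.delete(worker);
  }
}

// e.g. run the sweep every few minutes alongside the health checks:
// setInterval(() => decayErrorCounts(workerErrorCounts), 5 * 60 * 1000);

Halving rather than resetting means a worker has to stay healthy for a few sweeps before a noisy error pattern is fully forgotten.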
Implementation
TypeScript
enum AnomalyType {
  SLOW_JOB = 'slow_job',
  HIGH_FAILURE_RATE = 'high_failure_rate',
  WORKER_UNHEALTHY = 'worker_unhealthy',
  QUEUE_BACKLOG = 'queue_backlog',
  TIMEOUT_SPIKE = 'timeout_spike',
  REPEATED_ERROR = 'repeated_error',
  MEMORY_SPIKE = 'memory_spike',
  CPU_SPIKE = 'cpu_spike',
}

enum AnomalySeverity {
  CRITICAL = 'critical',
  HIGH = 'high',
  MEDIUM = 'medium',
  LOW = 'low',
}

interface AnomalyAlert {
  id: string;
  anomalyType: AnomalyType;
  severity: AnomalySeverity;
  workerName: string;
  jobId?: string;
  message: string;
  details: Record<string, unknown>;
  detectedAt: Date;
  resolvedAt?: Date;
  resolution?: string;
}

interface RuleContext {
  workerName: string;
  status: string;
  failureRate: number;
  queueDepth: number;
  durationMs: number;
  expectedDurationMs: number;
  timeoutCount: number;
  errorRepeatCount: number;
  errorMessage: string;
  memoryMb: number;
  cpuPercent: number;
}

interface AnomalyRule {
  anomalyType: AnomalyType;
  severity: AnomalySeverity;
  description: string;
  checkFn: (ctx: RuleContext) => boolean;
  messageTemplate: string;
  cooldownSeconds: number;
}

const ANOMALY_RULES: AnomalyRule[] = [
  {
    anomalyType: AnomalyType.SLOW_JOB,
    severity: AnomalySeverity.MEDIUM,
    description: 'Job execution time exceeds expected duration',
    checkFn: (ctx) => ctx.durationMs > ctx.expectedDurationMs * 2,
    messageTemplate: 'Job took {durationMs}ms, expected {expectedDurationMs}ms',
    cooldownSeconds: 300,
  },
  {
    anomalyType: AnomalyType.HIGH_FAILURE_RATE,
    severity: AnomalySeverity.HIGH,
    description: 'Worker failure rate exceeds threshold',
    checkFn: (ctx) => ctx.failureRate > 15,
    messageTemplate: 'Failure rate {failureRate}% exceeds 15% threshold',
    cooldownSeconds: 600,
  },
  {
    anomalyType: AnomalyType.WORKER_UNHEALTHY,
    severity: AnomalySeverity.CRITICAL,
    description: 'Worker health status is unhealthy',
    checkFn: (ctx) => ctx.status === 'unhealthy',
    messageTemplate: 'Worker {workerName} is unhealthy',
    cooldownSeconds: 300,
  },
  {
    anomalyType: AnomalyType.QUEUE_BACKLOG,
    severity: AnomalySeverity.MEDIUM,
    description: 'Queue depth exceeds threshold',
    checkFn: (ctx) => ctx.queueDepth > 50,
    messageTemplate: 'Queue depth {queueDepth} exceeds threshold',
    cooldownSeconds: 300,
  },
  {
    anomalyType: AnomalyType.REPEATED_ERROR,
    severity: AnomalySeverity.HIGH,
    description: 'Same error repeated multiple times',
    checkFn: (ctx) => ctx.errorRepeatCount > 5,
    messageTemplate: 'Error "{errorMessage}" repeated {errorRepeatCount} times',
    cooldownSeconds: 900,
  },
  {
    anomalyType: AnomalyType.MEMORY_SPIKE,
    severity: AnomalySeverity.HIGH,
    description: 'Memory usage exceeds threshold',
    checkFn: (ctx) => ctx.memoryMb > 1024,
    messageTemplate: 'Memory usage {memoryMb}MB exceeds 1GB threshold',
    cooldownSeconds: 300,
  },
];

class AnomalyDetector {
  private alerts = new Map<string, AnomalyAlert>();
  private cooldowns = new Map<string, Date>();
  private errorCounts = new Map<string, Map<string, number>>();
  private timeoutCounts = new Map<string, number>();
  private alertIdCounter = 0;

  checkWorkerHealth(
    workerName: string,
    health: {
      status: string;
      jobsProcessed: number;
      jobsFailed: number;
      queueDepth: number;
      lastDurationMs: number;
      expectedDurationMs: number;
      memoryMb: number;
      cpuPercent: number;
    }
  ): AnomalyAlert[] {
    const detected: AnomalyAlert[] = [];
    const failureRate = health.jobsProcessed > 0
      ? (health.jobsFailed / health.jobsProcessed) * 100
      : 0;

    const ctx: RuleContext = {
      workerName,
      status: health.status,
      failureRate,
      queueDepth: health.queueDepth,
      durationMs: health.lastDurationMs,
      expectedDurationMs: health.expectedDurationMs,
      timeoutCount: this.timeoutCounts.get(workerName) || 0,
      errorRepeatCount: 0,
      errorMessage: '',
      memoryMb: health.memoryMb,
      cpuPercent: health.cpuPercent,
    };

    for (const rule of ANOMALY_RULES) {
      if (this.isOnCooldown(workerName, rule.anomalyType)) continue;
      if (rule.checkFn(ctx)) {
        const alert = this.createAlert(workerName, rule, ctx);
        detected.push(alert);
        this.setCooldown(workerName, rule.anomalyType, rule.cooldownSeconds);
      }
    }
    return detected;
  }

  checkJobExecution(
    workerName: string,
    jobId: string,
    durationMs: number,
    expectedDurationMs: number,
    success: boolean,
    error?: string
  ): AnomalyAlert[] {
    const detected: AnomalyAlert[] = [];

    if (!success && error) {
      this.trackError(workerName, error);
    }

    // Check slow job
    if (durationMs > expectedDurationMs * 2) {
      if (!this.isOnCooldown(workerName, AnomalyType.SLOW_JOB)) {
        const rule = ANOMALY_RULES[0];
        const alert = this.createAlert(workerName, rule, {
          durationMs,
          expectedDurationMs,
        } as RuleContext);
        alert.jobId = jobId;
        detected.push(alert);
        this.setCooldown(workerName, AnomalyType.SLOW_JOB, 300);
      }
    }

    // Check repeated errors
    if (error) {
      const errorCounts = this.errorCounts.get(workerName);
      const count = errorCounts?.get(error.slice(0, 200)) || 0;
      if (count > 5 && !this.isOnCooldown(workerName, AnomalyType.REPEATED_ERROR)) {
        const rule = ANOMALY_RULES.find(r => r.anomalyType === AnomalyType.REPEATED_ERROR)!;
        const alert = this.createAlert(workerName, rule, {
          errorMessage: error.slice(0, 100),
          errorRepeatCount: count,
        } as RuleContext);
        detected.push(alert);
        this.setCooldown(workerName, AnomalyType.REPEATED_ERROR, 900);
      }
    }

    return detected;
  }

  resolveAnomaly(alertId: string, resolution: string): boolean {
    const alert = this.alerts.get(alertId);
    if (!alert || alert.resolvedAt) return false;
    alert.resolvedAt = new Date();
    alert.resolution = resolution;
    return true;
  }

  getActiveAnomalies(): AnomalyAlert[] {
    return Array.from(this.alerts.values())
      .filter(a => !a.resolvedAt)
      .sort((a, b) => {
        const order = { critical: 0, high: 1, medium: 2, low: 3 };
        return order[a.severity] - order[b.severity];
      });
  }

  private trackError(workerName: string, error: string): void {
    if (!this.errorCounts.has(workerName)) {
      this.errorCounts.set(workerName, new Map());
    }
    const counts = this.errorCounts.get(workerName)!;
    const key = error.slice(0, 200);
    counts.set(key, (counts.get(key) || 0) + 1);
  }

  private createAlert(workerName: string, rule: AnomalyRule, ctx: Partial<RuleContext>): AnomalyAlert {
    const id = `anomaly_${++this.alertIdCounter}_${Date.now()}`;
    let message = rule.messageTemplate;
    for (const [key, value] of Object.entries(ctx)) {
      message = message.replace(`{${key}}`, String(value));
    }
    const alert: AnomalyAlert = {
      id,
      anomalyType: rule.anomalyType,
      severity: rule.severity,
      workerName,
      message,
      details: ctx as Record<string, unknown>,
      detectedAt: new Date(),
    };
    this.alerts.set(id, alert);
    return alert;
  }

  private isOnCooldown(workerName: string, anomalyType: AnomalyType): boolean {
    const key = `${workerName}:${anomalyType}`;
    const cooldownEnd = this.cooldowns.get(key);
    return cooldownEnd !== undefined && cooldownEnd > new Date();
  }

  private setCooldown(workerName: string, anomalyType: AnomalyType, seconds: number): void {
    const key = `${workerName}:${anomalyType}`;
    this.cooldowns.set(key, new Date(Date.now() + seconds * 1000));
  }
}

// Singleton
let detector: AnomalyDetector | null = null;

export function getAnomalyDetector(): AnomalyDetector {
  if (!detector) {
    detector = new AnomalyDetector();
  }
  return detector;
}
Python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Callable
from enum import Enum


class AnomalyType(str, Enum):
    SLOW_JOB = "slow_job"
    HIGH_FAILURE_RATE = "high_failure_rate"
    WORKER_UNHEALTHY = "worker_unhealthy"
    QUEUE_BACKLOG = "queue_backlog"
    TIMEOUT_SPIKE = "timeout_spike"
    REPEATED_ERROR = "repeated_error"
    MEMORY_SPIKE = "memory_spike"


class AnomalySeverity(str, Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"


@dataclass
class AnomalyAlert:
    id: str
    anomaly_type: AnomalyType
    severity: AnomalySeverity
    worker_name: str
    message: str
    details: Dict
    detected_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    job_id: Optional[str] = None
    resolved_at: Optional[datetime] = None
    resolution: Optional[str] = None


@dataclass
class RuleContext:
    worker_name: str
    status: str = "healthy"
    failure_rate: float = 0.0
    queue_depth: int = 0
    duration_ms: float = 0.0
    expected_duration_ms: float = 0.0
    timeout_count: int = 0
    error_repeat_count: int = 0
    error_message: str = ""
    memory_mb: float = 0.0
    cpu_percent: float = 0.0


@dataclass
class AnomalyRule:
    anomaly_type: AnomalyType
    severity: AnomalySeverity
    description: str
    check_fn: Callable[[RuleContext], bool]
    message_template: str
    cooldown_seconds: int


ANOMALY_RULES: List[AnomalyRule] = [
    AnomalyRule(
        anomaly_type=AnomalyType.SLOW_JOB,
        severity=AnomalySeverity.MEDIUM,
        description="Job execution time exceeds expected duration",
        check_fn=lambda ctx: ctx.duration_ms > ctx.expected_duration_ms * 2,
        message_template="Job took {duration_ms}ms, expected {expected_duration_ms}ms",
        cooldown_seconds=300,
    ),
    AnomalyRule(
        anomaly_type=AnomalyType.HIGH_FAILURE_RATE,
        severity=AnomalySeverity.HIGH,
        description="Worker failure rate exceeds threshold",
        check_fn=lambda ctx: ctx.failure_rate > 15,
        message_template="Failure rate {failure_rate}% exceeds 15% threshold",
        cooldown_seconds=600,
    ),
    AnomalyRule(
        anomaly_type=AnomalyType.WORKER_UNHEALTHY,
        severity=AnomalySeverity.CRITICAL,
        description="Worker health status is unhealthy",
        check_fn=lambda ctx: ctx.status == "unhealthy",
        message_template="Worker {worker_name} is unhealthy",
        cooldown_seconds=300,
    ),
    AnomalyRule(
        anomaly_type=AnomalyType.REPEATED_ERROR,
        severity=AnomalySeverity.HIGH,
        description="Same error repeated multiple times",
        check_fn=lambda ctx: ctx.error_repeat_count > 5,
        message_template='Error "{error_message}" repeated {error_repeat_count} times',
        cooldown_seconds=900,
    ),
]


class AnomalyDetector:
    def __init__(self):
        self._alerts: Dict[str, AnomalyAlert] = {}
        self._cooldowns: Dict[str, datetime] = {}
        self._error_counts: Dict[str, Dict[str, int]] = {}
        self._timeout_counts: Dict[str, int] = {}
        self._alert_counter = 0

    def check_worker_health(
        self,
        worker_name: str,
        status: str,
        jobs_processed: int,
        jobs_failed: int,
        queue_depth: int,
        last_duration_ms: float,
        expected_duration_ms: float,
        memory_mb: float = 0,
        cpu_percent: float = 0,
    ) -> List[AnomalyAlert]:
        detected: List[AnomalyAlert] = []
        failure_rate = (jobs_failed / jobs_processed * 100) if jobs_processed > 0 else 0

        ctx = RuleContext(
            worker_name=worker_name,
            status=status,
            failure_rate=failure_rate,
            queue_depth=queue_depth,
            duration_ms=last_duration_ms,
            expected_duration_ms=expected_duration_ms,
            timeout_count=self._timeout_counts.get(worker_name, 0),
            memory_mb=memory_mb,
            cpu_percent=cpu_percent,
        )

        for rule in ANOMALY_RULES:
            if self._is_on_cooldown(worker_name, rule.anomaly_type):
                continue
            if rule.check_fn(ctx):
                alert = self._create_alert(worker_name, rule, ctx)
                detected.append(alert)
                self._set_cooldown(worker_name, rule.anomaly_type, rule.cooldown_seconds)

        return detected

    def check_job_execution(
        self,
        worker_name: str,
        job_id: str,
        duration_ms: float,
        expected_duration_ms: float,
        success: bool,
        error: Optional[str] = None,
    ) -> List[AnomalyAlert]:
        detected: List[AnomalyAlert] = []

        if not success and error:
            self._track_error(worker_name, error)

        # Check slow job
        if duration_ms > expected_duration_ms * 2:
            if not self._is_on_cooldown(worker_name, AnomalyType.SLOW_JOB):
                rule = ANOMALY_RULES[0]
                ctx = RuleContext(
                    worker_name=worker_name,
                    duration_ms=duration_ms,
                    expected_duration_ms=expected_duration_ms,
                )
                alert = self._create_alert(worker_name, rule, ctx)
                alert.job_id = job_id
                detected.append(alert)
                self._set_cooldown(worker_name, AnomalyType.SLOW_JOB, 300)

        # Check repeated errors
        if error:
            error_counts = self._error_counts.get(worker_name, {})
            count = error_counts.get(error[:200], 0)
            if count > 5 and not self._is_on_cooldown(worker_name, AnomalyType.REPEATED_ERROR):
                rule = next(r for r in ANOMALY_RULES if r.anomaly_type == AnomalyType.REPEATED_ERROR)
                ctx = RuleContext(
                    worker_name=worker_name,
                    error_message=error[:100],
                    error_repeat_count=count,
                )
                alert = self._create_alert(worker_name, rule, ctx)
                detected.append(alert)
                self._set_cooldown(worker_name, AnomalyType.REPEATED_ERROR, 900)

        return detected

    def resolve_anomaly(self, alert_id: str, resolution: str) -> bool:
        alert = self._alerts.get(alert_id)
        if not alert or alert.resolved_at:
            return False
        alert.resolved_at = datetime.now(timezone.utc)
        alert.resolution = resolution
        return True

    def get_active_anomalies(self) -> List[AnomalyAlert]:
        severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3}
        return sorted(
            [a for a in self._alerts.values() if not a.resolved_at],
            key=lambda a: severity_order[a.severity.value]
        )

    def _track_error(self, worker_name: str, error: str) -> None:
        if worker_name not in self._error_counts:
            self._error_counts[worker_name] = {}
        key = error[:200]
        self._error_counts[worker_name][key] = self._error_counts[worker_name].get(key, 0) + 1

    def _create_alert(self, worker_name: str, rule: AnomalyRule, ctx: RuleContext) -> AnomalyAlert:
        self._alert_counter += 1
        alert_id = f"anomaly_{self._alert_counter}_{int(datetime.now().timestamp() * 1000)}"
        message = rule.message_template
        for key, value in ctx.__dict__.items():
            message = message.replace(f"{{{key}}}", str(value))
        alert = AnomalyAlert(
            id=alert_id,
            anomaly_type=rule.anomaly_type,
            severity=rule.severity,
            worker_name=worker_name,
            message=message,
            details=ctx.__dict__,
        )
        self._alerts[alert_id] = alert
        return alert

    def _is_on_cooldown(self, worker_name: str, anomaly_type: AnomalyType) -> bool:
        key = f"{worker_name}:{anomaly_type.value}"
        cooldown_end = self._cooldowns.get(key)
        return cooldown_end is not None and cooldown_end > datetime.now(timezone.utc)

    def _set_cooldown(self, worker_name: str, anomaly_type: AnomalyType, seconds: int) -> None:
        key = f"{worker_name}:{anomaly_type.value}"
        self._cooldowns[key] = datetime.now(timezone.utc) + timedelta(seconds=seconds)


# Singleton
_detector: Optional[AnomalyDetector] = None


def get_anomaly_detector() -> AnomalyDetector:
    global _detector
    if _detector is None:
        _detector = AnomalyDetector()
    return _detector
Usage Examples
Worker Job Monitoring
const detector = getAnomalyDetector();

async function executeJob(job: Job) {
  const startTime = Date.now();
  try {
    await processJob(job);
    const duration = Date.now() - startTime;
    const alerts = detector.checkJobExecution(
      'data-processor',
      job.id,
      duration,
      30000, // Expected 30s
      true
    );
    for (const alert of alerts) {
      await notifyOps(alert);
    }
  } catch (error) {
    const duration = Date.now() - startTime;
    const alerts = detector.checkJobExecution(
      'data-processor',
      job.id,
      duration,
      30000,
      false,
      (error as Error).message
    );
    for (const alert of alerts) {
      await notifyOps(alert);
    }
    throw error;
  }
}
Periodic Health Checks
setInterval(async () => {
  const health = await getWorkerHealth('data-processor');
  const alerts = detector.checkWorkerHealth('data-processor', health);
  for (const alert of alerts) {
    if (alert.severity === AnomalySeverity.CRITICAL) {
      await pageOnCall(alert);
    } else {
      await notifySlack(alert);
    }
  }
}, 30000);
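Resolving and Reviewing Alerts
The detector also exposes getActiveAnomalies() and resolveAnomaly() for closing the loop after an incident. A short sketch (the resolution text here is only illustrative):

// Review open alerts, sorted critical-first by getActiveAnomalies().
const active = detector.getActiveAnomalies();
for (const alert of active) {
  console.log(`[${alert.severity}] ${alert.workerName}: ${alert.message}`);
}

// After fixing the underlying issue, record the resolution so the alert
// no longer shows up as active on dashboards.
if (active.length > 0) {
  detector.resolveAnomaly(active[0].id, 'Scaled worker pool; queue drained');
}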
Best Practices
- Set cooldowns long enough to prevent alert storms
- Use severity levels to route alerts appropriately
- Track error patterns to catch repeated failures
- Clean up old resolved alerts periodically (see the sketch after this list)
- Tune thresholds based on your baseline metrics
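The reference detector keeps every alert in memory indefinitely, so pruning is left to the caller. A minimal sketch of a cleanup pass, using a hypothetical pruneResolvedAlerts helper that is not part of the implementation above (it could equally be added as a method on AnomalyDetector, since the alerts map is private):

// Hypothetical helper: drop alerts that were resolved more than maxAgeMs ago
// so the alert store does not grow without bound. Returns how many were removed.
function pruneResolvedAlerts(
  alerts: Map<string, AnomalyAlert>,
  maxAgeMs = 24 * 60 * 60 * 1000 // keep one day of resolved history by default
): number {
  const cutoff = Date.now() - maxAgeMs;
  let pruned = 0;
  for (const [id, alert] of alerts) {
    if (alert.resolvedAt && alert.resolvedAt.getTime() < cutoff) {
      alerts.delete(id);
      pruned++;
    }
  }
  return pruned;
}

Running this on the same schedule as the periodic health checks keeps recent history available for dashboards while bounding memory use.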
Common Mistakes
- Cooldowns too short (alert fatigue)
- No error pattern tracking (miss repeated failures)
- Same severity for all alerts (everything becomes noise)
- Not resolving alerts (dashboard becomes useless)
- Thresholds too sensitive (false positives)
Related Patterns
- health-checks - Source of health data for anomaly detection
- logging-observability - Structured logging for alert context
- graceful-degradation - Response to detected anomalies