Claude-skill-registry checkpoint-resume
Exactly-once processing semantics with distributed coordination for file-based data pipelines. Atomic file claiming, status tracking, and automatic retry with in-memory fallback.
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/checkpoint-resume" ~/.claude/skills/majiayu000-claude-skill-registry-checkpoint-resume && rm -rf "$T"
manifest:
skills/data/checkpoint-resume/SKILL.md · source content
Checkpoint & Resume Processing
Exactly-once processing semantics with distributed coordination for file-based data pipelines.
When to Use This Skill
- Processing large file batches across multiple workers
- Need to resume after worker crashes
- Preventing duplicate processing of the same file
- Tracking processing progress and statistics
Core Concepts
The pattern provides:
- Atomic file claiming (only one worker processes each file)
- Status tracking (pending, processing, completed, failed)
- Automatic retry with configurable limits
- In-memory fallback for development/testing
┌──────────┐      ┌─────────────────┐      ┌──────────┐
│ Worker 1 │────▶│  Checkpoint DB  │◀────│ Worker 2 │
└──────────┘      └─────────────────┘      └──────────┘
     │                    │                     │
     ▼                    ▼                     ▼
 claim_file()       atomic claims          claim_file()
 process()          status tracking        process()
 complete()         retry logic            complete()
Implementation
Database Schema (PostgreSQL/Supabase)
-- Tracks per-file processing state. file_url is the identity key, so the
-- same file is never processed twice regardless of which worker sees it.
CREATE TABLE file_checkpoints (
  file_url TEXT PRIMARY KEY,
  file_type TEXT NOT NULL,
  file_timestamp TIMESTAMPTZ NOT NULL,
  status TEXT NOT NULL DEFAULT 'pending',
  records_total INTEGER DEFAULT 0,
  records_filtered INTEGER DEFAULT 0,
  records_persisted INTEGER DEFAULT 0,
  processing_time_ms INTEGER DEFAULT 0,
  error_message TEXT,
  retry_count INTEGER DEFAULT 0,
  processed_by TEXT,
  started_at TIMESTAMPTZ,
  completed_at TIMESTAMPTZ,
  created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_checkpoints_status ON file_checkpoints(status);

-- Atomically claim a file for one worker.
-- Returns TRUE when the claim succeeds: the file is new, or it previously
-- failed and is under the retry limit, or its 'processing' claim is stale
-- (the claiming worker crashed and never completed or failed it).
--
-- p_max_retries and p_stale_after have defaults, so existing 4-argument
-- callers keep working unchanged.
CREATE OR REPLACE FUNCTION claim_file(
  p_file_url TEXT,
  p_file_type TEXT,
  p_file_timestamp TIMESTAMPTZ,
  p_worker_id TEXT,
  p_max_retries INTEGER DEFAULT 3,
  p_stale_after INTERVAL DEFAULT INTERVAL '30 minutes'
) RETURNS BOOLEAN AS $$
DECLARE
  v_claimed BOOLEAN := FALSE;
BEGIN
  -- Try to insert a fresh claim; ON CONFLICT DO NOTHING makes this a
  -- race-free "first writer wins" on the primary key.
  INSERT INTO file_checkpoints (file_url, file_type, file_timestamp, status, processed_by, started_at)
  VALUES (p_file_url, p_file_type, p_file_timestamp, 'processing', p_worker_id, NOW())
  ON CONFLICT (file_url) DO NOTHING;

  -- FOUND is TRUE only when the INSERT actually inserted a row.
  IF FOUND THEN
    v_claimed := TRUE;
  ELSE
    -- Re-claim either a retryable failure or a stale claim.
    -- BUGFIX: the original only re-claimed 'failed' rows, so a worker that
    -- crashed mid-processing orphaned its file forever. Stale 'processing'
    -- rows (started_at older than p_stale_after) are now reclaimable too;
    -- note a stale reclaim also consumes one retry.
    UPDATE file_checkpoints
    SET status = 'processing',
        processed_by = p_worker_id,
        started_at = NOW(),
        retry_count = retry_count + 1
    WHERE file_url = p_file_url
      AND (
        (status = 'failed' AND retry_count < p_max_retries)
        OR (status = 'processing' AND started_at < NOW() - p_stale_after)
      );
    v_claimed := FOUND;
  END IF;

  RETURN v_claimed;
END;
$$ LANGUAGE plpgsql;
TypeScript
interface FileCheckpoint { fileUrl: string; fileType: string; fileTimestamp: Date; status: 'pending' | 'processing' | 'completed' | 'failed'; recordsTotal: number; recordsFiltered: number; recordsPersisted: number; processingTimeMs: number; errorMessage?: string; retryCount: number; processedBy?: string; } interface ProcessingStats { totalRows: number; filteredRows: number; persistedRows: number; durationMs: number; } class CheckpointManager { private workerId: string; private inMemory = new Map<string, FileCheckpoint>(); private useInMemory = false; constructor( private getClient: () => DatabaseClient | null, workerId?: string ) { this.workerId = workerId || `worker_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`; } async claimFile( fileUrl: string, fileType: string, fileTimestamp: Date ): Promise<boolean> { const client = this.getClient(); if (client) { try { const result = await client.rpc('claim_file', { p_file_url: fileUrl, p_file_type: fileType, p_file_timestamp: fileTimestamp.toISOString(), p_worker_id: this.workerId, }); if (!result.error) return result.data === true; } catch (e) { console.warn('Database unavailable, using in-memory'); } } // Fallback to in-memory (single-worker mode) this.useInMemory = true; if (this.inMemory.has(fileUrl)) { const existing = this.inMemory.get(fileUrl)!; if (existing.status !== 'failed' || existing.retryCount >= 3) { return false; } } this.inMemory.set(fileUrl, { fileUrl, fileType, fileTimestamp, status: 'processing', recordsTotal: 0, recordsFiltered: 0, recordsPersisted: 0, processingTimeMs: 0, retryCount: 0, processedBy: this.workerId, }); return true; } async completeFile(fileUrl: string, stats: ProcessingStats): Promise<void> { const client = this.getClient(); if (client && !this.useInMemory) { await client.rpc('complete_file', { p_file_url: fileUrl, p_records_total: stats.totalRows, p_records_filtered: stats.filteredRows, p_records_persisted: stats.persistedRows, p_processing_time_ms: stats.durationMs, }); 
return; } const checkpoint = this.inMemory.get(fileUrl); if (checkpoint) { checkpoint.status = 'completed'; checkpoint.recordsTotal = stats.totalRows; checkpoint.recordsFiltered = stats.filteredRows; checkpoint.recordsPersisted = stats.persistedRows; checkpoint.processingTimeMs = stats.durationMs; } } async failFile(fileUrl: string, errorMessage: string): Promise<void> { const client = this.getClient(); if (client && !this.useInMemory) { await client .from('file_checkpoints') .update({ status: 'failed', error_message: errorMessage }) .eq('file_url', fileUrl); return; } const checkpoint = this.inMemory.get(fileUrl); if (checkpoint) { checkpoint.status = 'failed'; checkpoint.errorMessage = errorMessage; checkpoint.retryCount++; } } async isProcessed(fileUrl: string): Promise<boolean> { const client = this.getClient(); if (client && !this.useInMemory) { const { data } = await client .from('file_checkpoints') .select('status') .eq('file_url', fileUrl) .single(); return data?.status === 'completed'; } return this.inMemory.get(fileUrl)?.status === 'completed'; } getWorkerId(): string { return this.workerId; } }
Usage Examples
Processing Files
const checkpoint = new CheckpointManager(getDbClient); async function processFiles(fileUrls: string[]) { for (const url of fileUrls) { // Try to claim const claimed = await checkpoint.claimFile(url, 'events', new Date()); if (!claimed) { console.log(`Skipping ${url} - already claimed`); continue; } const startTime = Date.now(); try { const result = await processFile(url); await checkpoint.completeFile(url, { totalRows: result.total, filteredRows: result.filtered, persistedRows: result.persisted, durationMs: Date.now() - startTime, }); } catch (error) { await checkpoint.failFile(url, error.message); } } }
Best Practices
- Use database functions for atomic claims - prevents race conditions
- Always have in-memory fallback for dev/testing
- Track retry count to prevent infinite loops (max 3 retries)
- Include processing stats for observability
- Generate unique worker IDs to track which worker processed what
Common Mistakes
- Not using atomic operations for claiming (race conditions)
- No retry limit (infinite retry loops)
- Forgetting in-memory fallback (breaks local development)
- Not tracking processing statistics (can't debug issues)
- Using file path instead of URL as key (path changes between environments)
Related Patterns
- batch-processing - Batch database operations
- dead-letter-queue - Handle permanently failed files
- distributed-lock - Coordinate between workers