Claude-skill-registry job-state-machine
Async job processing with validated state transitions, progress tracking, and asset linking. Ensure jobs always reach terminal states with proper error handling.
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/job-state-machine" ~/.claude/skills/majiayu000-claude-skill-registry-job-state-machine && rm -rf "$T"
manifest:
skills/data/job-state-machine/SKILL.mdsource content
Job State Machine
Validated state transitions for async jobs.
When to Use This Skill
- Processing async jobs (image generation, exports, etc.)
- Need progress tracking for long operations
- Want to link created assets to jobs
- Need reliable state management
Core Concepts
- Defined states - QUEUED → PROCESSING → COMPLETED/FAILED/PARTIAL
- Valid transitions - Only allowed state changes
- Terminal states - Jobs must reach an end state
- Asset linking - Track outputs created by jobs
State Machine
QUEUED │ ▼ PROCESSING / | \ ▼ ▼ ▼ COMPLETED PARTIAL FAILED (terminal states)
TypeScript Implementation
Types
// types.ts export enum JobStatus { QUEUED = 'queued', PROCESSING = 'processing', COMPLETED = 'completed', FAILED = 'failed', PARTIAL = 'partial', } export const VALID_TRANSITIONS: Record<JobStatus, JobStatus[]> = { [JobStatus.QUEUED]: [JobStatus.PROCESSING], [JobStatus.PROCESSING]: [JobStatus.COMPLETED, JobStatus.PARTIAL, JobStatus.FAILED], [JobStatus.COMPLETED]: [], [JobStatus.FAILED]: [], [JobStatus.PARTIAL]: [], }; export function isTerminalState(status: JobStatus): boolean { return VALID_TRANSITIONS[status].length === 0; } export interface Job { id: string; userId: string; jobType: string; status: JobStatus; progress: number; errorMessage?: string; parameters?: Record<string, unknown>; result?: Record<string, unknown>; createdAt: Date; updatedAt: Date; completedAt?: Date; } export interface Asset { id: string; jobId: string; userId: string; assetType: string; url: string; storagePath: string; fileSize: number; createdAt: Date; }
Job Service
// job-service.ts import { Job, JobStatus, Asset, VALID_TRANSITIONS, isTerminalState } from './types'; export class InvalidStateTransitionError extends Error { constructor(public currentStatus: JobStatus, public targetStatus: JobStatus) { super(`Cannot transition from '${currentStatus}' to '${targetStatus}'`); this.name = 'InvalidStateTransitionError'; } } export class JobService { constructor(private db: Database) {} async createJob( userId: string, jobType: string, parameters?: Record<string, unknown> ): Promise<Job> { const job: Job = { id: crypto.randomUUID(), userId, jobType, status: JobStatus.QUEUED, progress: 0, parameters, createdAt: new Date(), updatedAt: new Date(), }; await this.db.jobs.insert(job); return job; } async getJob(jobId: string, userId: string): Promise<Job> { const job = await this.db.jobs.findOne({ id: jobId }); if (!job) throw new Error('Job not found'); if (job.userId !== userId) throw new Error('Unauthorized'); return job; } async transitionStatus( jobId: string, targetStatus: JobStatus, options: { progress?: number; errorMessage?: string; result?: Record<string, unknown>; } = {} ): Promise<Job> { const job = await this.db.jobs.findOne({ id: jobId }); if (!job) throw new Error('Job not found'); // Allow same-state for progress updates if (job.status !== targetStatus) { if (!VALID_TRANSITIONS[job.status].includes(targetStatus)) { throw new InvalidStateTransitionError(job.status, targetStatus); } } const updates: Partial<Job> = { status: targetStatus, progress: options.progress ?? job.progress, updatedAt: new Date(), }; if (options.errorMessage !== undefined) { updates.errorMessage = options.errorMessage; } if (options.result !== undefined) { updates.result = options.result; } if (isTerminalState(targetStatus)) { updates.completedAt = new Date(); } await this.db.jobs.update({ id: jobId }, updates); return { ...job, ...updates }; } async updateProgress(jobId: string, progress: number): Promise<Job> { const job = await this.db.jobs.findOne({ id: jobId }); if (!job) throw new Error('Job not found'); if (job.status !== JobStatus.PROCESSING) { console.warn(`Ignoring progress update for job ${jobId} in ${job.status} state`); return job; } return this.transitionStatus(jobId, JobStatus.PROCESSING, { progress }); } async markCompleted(jobId: string, result?: Record<string, unknown>): Promise<Job> { return this.transitionStatus(jobId, JobStatus.COMPLETED, { progress: 100, result }); } async markFailed(jobId: string, errorMessage: string): Promise<Job> { return this.transitionStatus(jobId, JobStatus.FAILED, { errorMessage }); } async markPartial( jobId: string, result?: Record<string, unknown>, errorMessage?: string ): Promise<Job> { return this.transitionStatus(jobId, JobStatus.PARTIAL, { progress: 100, result, errorMessage, }); } async createAsset( jobId: string, userId: string, assetType: string, url: string, storagePath: string, fileSize: number ): Promise<Asset> { const asset: Asset = { id: crypto.randomUUID(), jobId, userId, assetType, url, storagePath, fileSize, createdAt: new Date(), }; await this.db.assets.insert(asset); return asset; } async getJobAssets(jobId: string, userId: string): Promise<Asset[]> { await this.getJob(jobId, userId); // Verify ownership return this.db.assets.find({ jobId }); } }
Python Implementation
# job_service.py from enum import Enum from dataclasses import dataclass from datetime import datetime from typing import Optional, Dict, Any, List from uuid import uuid4 class JobStatus(str, Enum): QUEUED = "queued" PROCESSING = "processing" COMPLETED = "completed" FAILED = "failed" PARTIAL = "partial" VALID_TRANSITIONS = { JobStatus.QUEUED: [JobStatus.PROCESSING], JobStatus.PROCESSING: [JobStatus.COMPLETED, JobStatus.PARTIAL, JobStatus.FAILED], JobStatus.COMPLETED: [], JobStatus.FAILED: [], JobStatus.PARTIAL: [], } def is_terminal_state(status: JobStatus) -> bool: return len(VALID_TRANSITIONS.get(status, [])) == 0 @dataclass class Job: id: str user_id: str job_type: str status: JobStatus progress: int created_at: datetime updated_at: datetime error_message: Optional[str] = None parameters: Optional[Dict[str, Any]] = None result: Optional[Dict[str, Any]] = None completed_at: Optional[datetime] = None class InvalidStateTransitionError(Exception): def __init__(self, current: JobStatus, target: JobStatus): self.current = current self.target = target super().__init__(f"Cannot transition from '{current}' to '{target}'") class JobService: def __init__(self, db): self.db = db async def create_job( self, user_id: str, job_type: str, parameters: Optional[Dict[str, Any]] = None, ) -> Job: now = datetime.utcnow() job = Job( id=str(uuid4()), user_id=user_id, job_type=job_type, status=JobStatus.QUEUED, progress=0, parameters=parameters, created_at=now, updated_at=now, ) await self.db.jobs.insert(job) return job async def transition_status( self, job_id: str, target_status: JobStatus, progress: Optional[int] = None, error_message: Optional[str] = None, result: Optional[Dict[str, Any]] = None, ) -> Job: job = await self.db.jobs.find_one(id=job_id) if not job: raise ValueError("Job not found") if job.status != target_status: if target_status not in VALID_TRANSITIONS.get(job.status, []): raise InvalidStateTransitionError(job.status, target_status) job.status = target_status job.updated_at = datetime.utcnow() if progress is not None: job.progress = progress if error_message is not None: job.error_message = error_message if result is not None: job.result = result if is_terminal_state(target_status): job.completed_at = datetime.utcnow() await self.db.jobs.update(job) return job async def mark_completed(self, job_id: str, result: Optional[Dict] = None) -> Job: return await self.transition_status( job_id, JobStatus.COMPLETED, progress=100, result=result ) async def mark_failed(self, job_id: str, error_message: str) -> Job: return await self.transition_status( job_id, JobStatus.FAILED, error_message=error_message )
Worker Integration
async function processJob(jobId: string) { const jobService = getJobService(); try { // Transition to PROCESSING await jobService.transitionStatus(jobId, JobStatus.PROCESSING, { progress: 0 }); const job = await jobService.getJobInternal(jobId); // Process with progress updates await jobService.updateProgress(jobId, 25); const result1 = await doStep1(job.parameters); await jobService.updateProgress(jobId, 50); const result2 = await doStep2(result1); await jobService.updateProgress(jobId, 75); const assetUrl = await uploadResult(result2); // Create asset await jobService.createAsset( jobId, job.userId, job.jobType, assetUrl, `${job.userId}/${jobId}/result.png`, result2.length ); // Mark completed await jobService.markCompleted(jobId, { assetCount: 1 }); } catch (error) { console.error(`Job ${jobId} failed:`, error); await jobService.markFailed(jobId, error.message); } }
Database Schema
CREATE TABLE jobs ( id UUID PRIMARY KEY, user_id UUID NOT NULL REFERENCES users(id), job_type VARCHAR(50) NOT NULL, status VARCHAR(20) NOT NULL DEFAULT 'queued', progress INTEGER NOT NULL DEFAULT 0, error_message TEXT, parameters JSONB, result JSONB, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), completed_at TIMESTAMPTZ, CONSTRAINT valid_status CHECK (status IN ('queued', 'processing', 'completed', 'failed', 'partial')), CONSTRAINT valid_progress CHECK (progress >= 0 AND progress <= 100) ); CREATE INDEX idx_jobs_user_status ON jobs(user_id, status); CREATE TABLE assets ( id UUID PRIMARY KEY, job_id UUID NOT NULL REFERENCES jobs(id) ON DELETE CASCADE, user_id UUID NOT NULL REFERENCES users(id), asset_type VARCHAR(50) NOT NULL, url TEXT NOT NULL, storage_path TEXT NOT NULL, file_size INTEGER NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); CREATE INDEX idx_assets_job_id ON assets(job_id);
Best Practices
- Validate all transitions - Never skip state machine validation
- Always reach terminal - Jobs must complete, fail, or partial
- Track progress - Provide feedback for long operations
- Link assets - Maintain relationship for cleanup
- Log transitions - Essential for debugging
Common Mistakes
- Allowing invalid state transitions
- Jobs stuck in non-terminal states
- Not tracking progress for long jobs
- Orphaned assets without job links
- Missing error messages on failure