Claude-skill-registry erpnext-syntax-scheduler

Scheduler and background jobs syntax for Frappe/ERPNext v14/v15/v16. Use for scheduler_events in hooks.py, frappe.enqueue() for async jobs, queue configuration, job deduplication, error handling, and monitoring. Triggers on questions about scheduled tasks, background processing, cron jobs, RQ workers, job queues, async tasks.

install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/erpnext-syntax-scheduler" ~/.claude/skills/majiayu000-claude-skill-registry-erpnext-syntax-scheduler && rm -rf "$T"
manifest: skills/data/erpnext-syntax-scheduler/SKILL.md
source content

ERPNext Syntax: Scheduler & Background Jobs

Deterministic syntax reference for Frappe scheduler events and background job processing.

Quick Reference

Scheduler Events (hooks.py)

# hooks.py
scheduler_events = {
    "all": ["myapp.tasks.every_tick"],
    "hourly": ["myapp.tasks.hourly_task"],
    "daily": ["myapp.tasks.daily_task"],
    "weekly": ["myapp.tasks.weekly_task"],
    "monthly": ["myapp.tasks.monthly_task"],
    "daily_long": ["myapp.tasks.heavy_daily"],  # Long queue
    "cron": {
        "0 9 * * 1-5": ["myapp.tasks.weekday_9am"],
        "*/15 * * * *": ["myapp.tasks.every_15_min"]
    }
}

CRITICAL: After EVERY change to scheduler_events:

bench migrate

frappe.enqueue Basics

# Simple
frappe.enqueue("myapp.tasks.process", customer="CUST-001")

# With queue and timeout
frappe.enqueue(
    "myapp.tasks.heavy_task",
    queue="long",
    timeout=3600,
    param="value"
)

# With deduplication (v15)
from frappe.utils.background_jobs import is_job_enqueued

job_id = f"import::{doc.name}"
if not is_job_enqueued(job_id):
    frappe.enqueue("myapp.tasks.import_data", job_id=job_id, doc=doc.name)

Scheduler Event Types

EventFrequencyQueue
all
Every tick (v14: 4min, v15: 60s)default
hourly
Per hourdefault
daily
Per daydefault
weekly
Per weekdefault
monthly
Per monthdefault
hourly_long
Per hourlong
daily_long
Per daylong
weekly_long
Per weeklong
monthly_long
Per monthlong
cron
Custom scheduleconfigurable

Version difference scheduler tick:

  • v14: ~240 seconds (4 min)
  • v15: ~60 seconds

Queue Types

QueueTimeoutUsage
short
300s (5 min)Quick tasks, UI responses
default
300s (5 min)Standard tasks
long
1500s (25 min)Heavy processing, imports

frappe.enqueue Parameters

frappe.enqueue(
    method,                      # REQUIRED: function or module path
    queue="default",             # Queue name
    timeout=None,                # Override timeout (seconds)
    is_async=True,               # False = execute directly
    now=False,                   # True = via frappe.call()
    job_id=None,                 # v15: unique ID for deduplication
    enqueue_after_commit=False,  # Wait for DB commit
    at_front=False,              # Place at front of queue
    on_success=None,             # Success callback
    on_failure=None,             # Failure callback
    **kwargs                     # Arguments for method
)

Job Deduplication

v15+ (Recommended)

from frappe.utils.background_jobs import is_job_enqueued

job_id = f"process::{doc.name}"
if not is_job_enqueued(job_id):
    frappe.enqueue(
        "myapp.tasks.process",
        job_id=job_id,
        doc_name=doc.name
    )

v14 (Deprecated)

# DO NOT USE - only for legacy code
from frappe.core.page.background_jobs.background_jobs import get_info
enqueued = [d.get("job_name") for d in get_info()]
if name not in enqueued:
    frappe.enqueue(..., job_name=name)

Error Handling Pattern

def process_records(records):
    for record in records:
        try:
            process_single(record)
            frappe.db.commit()  # Commit per success
        except Exception:
            frappe.db.rollback()  # Rollback on error
            frappe.log_error(
                frappe.get_traceback(),
                f"Process Error: {record}"
            )

Callbacks

def on_success_handler(job, connection, result, *args, **kwargs):
    frappe.publish_realtime("show_alert", {"message": "Done!"})

def on_failure_handler(job, connection, type, value, traceback):
    frappe.log_error(f"Job {job.id} failed: {value}")

frappe.enqueue(
    "myapp.tasks.risky_task",
    on_success=on_success_handler,
    on_failure=on_failure_handler
)

User Context

IMPORTANT: Scheduler jobs run as Administrator!

def scheduled_task():
    # frappe.session.user = "Administrator"
    
    # Set explicit owner:
    doc = frappe.new_doc("ToDo")
    doc.owner = "user@example.com"
    doc.insert(ignore_permissions=True)

Monitoring

ToolDescription
RQ Worker (DocType)Worker status, busy/idle
RQ Job (DocType)Job status, queue filter
bench doctor
Scheduler status overview
Scheduled Job LogExecution history

Version Differences v14 vs v15

Featurev14v15
Tick interval4 min60 sec
Config key
scheduler_interval
scheduler_tick_interval
Deduplication
job_name
job_id
+
is_job_enqueued()

Reference Files

Critical Rules

  1. ALWAYS
    bench migrate
    after hooks.py scheduler_events changes
  2. USE
    job_id
    +
    is_job_enqueued()
    for deduplication (v15)
  3. CHOOSE correct queue: short/default/long based on duration
  4. COMMIT per successful record, rollback on error
  5. REMEMBER that jobs run as Administrator
  6. ENQUEUE heavy tasks from scheduler events, don't execute directly