Claude-skill-registry erpnext-syntax-scheduler
Scheduler and background jobs syntax for Frappe/ERPNext v14/v15/v16. Use for scheduler_events in hooks.py, frappe.enqueue() for async jobs, queue configuration, job deduplication, error handling, and monitoring. Triggers on questions about scheduled tasks, background processing, cron jobs, RQ workers, job queues, async tasks.
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/erpnext-syntax-scheduler" ~/.claude/skills/majiayu000-claude-skill-registry-erpnext-syntax-scheduler && rm -rf "$T"
manifest: skills/data/erpnext-syntax-scheduler/SKILL.md
ERPNext Syntax: Scheduler & Background Jobs
Deterministic syntax reference for Frappe scheduler events and background job processing.
Quick Reference
Scheduler Events (hooks.py)

    # hooks.py
    scheduler_events = {
        "all": ["myapp.tasks.every_tick"],
        "hourly": ["myapp.tasks.hourly_task"],
        "daily": ["myapp.tasks.daily_task"],
        "weekly": ["myapp.tasks.weekly_task"],
        "monthly": ["myapp.tasks.monthly_task"],
        "daily_long": ["myapp.tasks.heavy_daily"],  # Long queue
        "cron": {
            "0 9 * * 1-5": ["myapp.tasks.weekday_9am"],
            "*/15 * * * *": ["myapp.tasks.every_15_min"],
        },
    }

CRITICAL: After EVERY change to scheduler_events, run:

    bench migrate

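To make the two cron entries above easier to reason about, here is a minimal sketch of how a five-field cron expression can be checked against a timestamp. It is not how Frappe evaluates cron strings; `cron_matches` and `_field_matches` are hypothetical helpers written for illustration. The sketch requires all five fields to match, so it only mirrors standard cron for expressions that restrict at most one of day-of-month and day-of-week, and it does not accept `7` for Sunday.

```python
from datetime import datetime


def _field_matches(field: str, value: int, minimum: int) -> bool:
    """Return True if one cron field (e.g. "*/15", "1-5", "0,30") matches value."""
    for part in field.split(","):
        # Split off an optional step, e.g. "*/15" -> base "*", step 15.
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            low, high = minimum, None
        elif "-" in base:
            low_text, high_text = base.split("-")
            low, high = int(low_text), int(high_text)
        else:
            low = int(base)
            # A bare number matches only itself unless a step follows it.
            high = low if not step_text else None
        if value < low or (high is not None and value > high):
            continue
        if (value - low) % step == 0:
            return True
    return False


def cron_matches(expression: str, moment: datetime) -> bool:
    """Check a five-field cron expression against a datetime (simplified)."""
    minute, hour, day, month, weekday = expression.split()
    # Cron counts Sunday as 0; datetime.weekday() counts Monday as 0.
    cron_weekday = (moment.weekday() + 1) % 7
    return (
        _field_matches(minute, moment.minute, 0)
        and _field_matches(hour, moment.hour, 0)
        and _field_matches(day, moment.day, 1)
        and _field_matches(month, moment.month, 1)
        and _field_matches(weekday, cron_weekday, 0)
    )


# 2024-01-01 is a Monday, 2024-01-06 a Saturday.
monday_9am = cron_matches("0 9 * * 1-5", datetime(2024, 1, 1, 9, 0))
saturday_9am = cron_matches("0 9 * * 1-5", datetime(2024, 1, 6, 9, 0))
```

Under these assumptions `monday_9am` is true and `saturday_9am` is false, which matches the intent of the `weekday_9am` entry.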
frappe.enqueue Basics

    import frappe
    from frappe.utils.background_jobs import is_job_enqueued

    # Simple
    frappe.enqueue("myapp.tasks.process", customer="CUST-001")

    # With queue and timeout
    frappe.enqueue(
        "myapp.tasks.heavy_task",
        queue="long",
        timeout=3600,
        param="value",
    )

    # With deduplication (v15); doc is the document being processed
    job_id = f"import::{doc.name}"
    if not is_job_enqueued(job_id):
        frappe.enqueue("myapp.tasks.import_data", job_id=job_id, doc=doc.name)

Scheduler Event Types
| Event | Frequency | Queue |
|---|---|---|
| all | Every tick (v14: 4 min, v15: 60 s) | default |
| hourly | Per hour | default |
| daily | Per day | default |
| weekly | Per week | default |
| monthly | Per month | default |
| hourly_long | Per hour | long |
| daily_long | Per day | long |
| weekly_long | Per week | long |
| monthly_long | Per month | long |
| cron | Custom schedule | configurable |
Scheduler tick interval by version:
- v14: ~240 seconds (4 min)
- v15: ~60 seconds
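The tick interval determines how often handlers registered under "all" are invoked. The arithmetic below is a rough sketch based only on the approximate intervals listed above; `runs_per_day` is a hypothetical helper, and real counts will vary with scheduler load.

```python
SECONDS_PER_DAY = 24 * 60 * 60

# Approximate tick intervals quoted above, in seconds.
TICK_SECONDS = {"v14": 240, "v15": 60}


def runs_per_day(version: str) -> int:
    """Approximate number of "all" invocations per day for a version."""
    return SECONDS_PER_DAY // TICK_SECONDS[version]


v14_runs = runs_per_day("v14")  # 86400 // 240
v15_runs = runs_per_day("v15")  # 86400 // 60
```

An "all" handler therefore runs roughly four times as often on v15 as on v14, which is worth keeping in mind when deciding how much work such a handler does per invocation.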
Queue Types
| Queue | Timeout | Usage |
|---|---|---|
| short | 300s (5 min) | Quick tasks, UI responses |
| default | 300s (5 min) | Standard tasks |
| long | 1500s (25 min) | Heavy processing, imports |
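One way to apply the table is to derive the queue and timeout arguments from an estimate of how long a job will take. The helper below is a hypothetical sketch: `enqueue_options`, the 2x safety margin, and the `interactive` flag are assumptions made for illustration, not Frappe conventions.

```python
# Default timeouts from the table above, in seconds.
QUEUE_TIMEOUTS = {"short": 300, "default": 300, "long": 1500}


def enqueue_options(expected_seconds: int, interactive: bool = False) -> dict:
    """Suggest queue/timeout keyword arguments for frappe.enqueue.

    Hypothetical helper: the 2x margin and the `interactive` flag are
    assumptions of this sketch.
    """
    budget = expected_seconds * 2  # leave headroom over the estimate
    if budget <= QUEUE_TIMEOUTS["short"]:
        # short and default share a timeout; pick by who is waiting.
        return {"queue": "short" if interactive else "default"}
    if budget <= QUEUE_TIMEOUTS["long"]:
        return {"queue": "long"}
    # Beyond the long queue's default timeout: override it explicitly.
    return {"queue": "long", "timeout": budget}


quick_ui_job = enqueue_options(10, interactive=True)
nightly_import = enqueue_options(1800)
```

In an app, the returned dictionary could be unpacked into the call, for example `frappe.enqueue("myapp.tasks.heavy_task", **enqueue_options(1800))`.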
frappe.enqueue Parameters

    frappe.enqueue(
        method,                      # REQUIRED: function or module path
        queue="default",             # Queue name
        timeout=None,                # Override timeout (seconds)
        is_async=True,               # False = execute directly
        now=False,                   # True = via frappe.call()
        job_id=None,                 # v15: unique ID for deduplication
        enqueue_after_commit=False,  # Wait for DB commit
        at_front=False,              # Place at front of queue
        on_success=None,             # Success callback
        on_failure=None,             # Failure callback
        **kwargs                     # Arguments for method
    )

Job Deduplication
v15+ (Recommended)

    import frappe
    from frappe.utils.background_jobs import is_job_enqueued

    # Build a stable ID so the same document is never queued twice
    job_id = f"process::{doc.name}"
    if not is_job_enqueued(job_id):
        frappe.enqueue(
            "myapp.tasks.process",
            job_id=job_id,
            doc_name=doc.name,
        )

v14 (Deprecated)

    # DO NOT USE - only for legacy code
    from frappe.core.page.background_jobs.background_jobs import get_info

    enqueued = [d.get("job_name") for d in get_info()]
    if name not in enqueued:
        frappe.enqueue(..., job_name=name)

Error Handling Pattern

    import frappe

    def process_records(records):
        for record in records:
            try:
                process_single(record)
                frappe.db.commit()  # Commit per success
            except Exception:
                frappe.db.rollback()  # Rollback on error
                # Keyword arguments make title vs. traceback unambiguous
                frappe.log_error(
                    title=f"Process Error: {record}",
                    message=frappe.get_traceback(),
                )

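The same commit-per-record pattern can be written so that it also reports which records failed, which is useful when a job needs to be retried. The sketch below injects the commit, rollback, and logging functions as parameters so it runs without a Frappe site; `process_batch` and its parameter names are hypothetical. In an app you might pass `frappe.db.commit`, `frappe.db.rollback`, and a small wrapper around `frappe.log_error`.

```python
import traceback
from typing import Any, Callable, Iterable


def process_batch(
    records: Iterable[Any],
    handler: Callable[[Any], None],
    commit: Callable[[], None],
    rollback: Callable[[], None],
    log_error: Callable[[str, str], None],
) -> dict:
    """Process records one by one, committing each success separately."""
    summary = {"succeeded": [], "failed": []}
    for record in records:
        try:
            handler(record)
            commit()  # persist this record before moving on
            summary["succeeded"].append(record)
        except Exception:
            rollback()  # discard the partial work of the failed record
            log_error(f"Process Error: {record}", traceback.format_exc())
            summary["failed"].append(record)
    return summary


# Illustration with in-memory stand-ins for the database calls.
calls = {"commit": 0, "rollback": 0}
errors = []


def handle(record: int) -> None:
    if record < 0:
        raise ValueError("negative record")


result = process_batch(
    [1, -2, 3],
    handler=handle,
    commit=lambda: calls.update(commit=calls["commit"] + 1),
    rollback=lambda: calls.update(rollback=calls["rollback"] + 1),
    log_error=lambda title, message: errors.append(title),
)
```

Because each record is committed on its own, the failure of `-2` does not undo the work already done for `1`, and processing continues with `3`.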
Callbacks

    import frappe

    def on_success_handler(job, connection, result, *args, **kwargs):
        frappe.publish_realtime("show_alert", {"message": "Done!"})

    def on_failure_handler(job, connection, type, value, traceback):
        frappe.log_error(f"Job {job.id} failed: {value}")

    frappe.enqueue(
        "myapp.tasks.risky_task",
        on_success=on_success_handler,
        on_failure=on_failure_handler,
    )

User Context
IMPORTANT: Scheduler jobs run as Administrator!

    def scheduled_task():
        # frappe.session.user is "Administrator" here
        # Set explicit owner:
        doc = frappe.new_doc("ToDo")
        doc.owner = "user@example.com"
        doc.insert(ignore_permissions=True)

Monitoring
| Tool | Description |
|---|---|
| RQ Worker (DocType) | Worker status, busy/idle |
| RQ Job (DocType) | Job status, queue filter |
| | Scheduler status overview |
| Scheduled Job Log | Execution history |
Version Differences v14 vs v15
| Feature | v14 | v15 |
|---|---|---|
| Tick interval | 4 min | 60 sec |
| Config key | | |
| Deduplication | job_name | job_id + is_job_enqueued() |
Reference Files
- scheduler-events.md: All event types, cron syntax, configuration
- enqueue-api.md: Complete frappe.enqueue/enqueue_doc API
- queues.md: Queue types, timeouts, custom queues, workers
- examples.md: Complete working examples
- anti-patterns.md: Common mistakes and corrections
Critical Rules
- ALWAYS run bench migrate after hooks.py scheduler_events changes
- USE job_id + is_job_enqueued() for deduplication (v15)
- CHOOSE correct queue: short/default/long based on duration
- COMMIT per successful record, rollback on error
- REMEMBER that jobs run as Administrator
- ENQUEUE heavy tasks from scheduler events, don't execute directly
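The last rule can be sketched as a scheduler handler that only splits the work and hands each piece to a background job. This is an illustration under stated assumptions: `chunked`, `enqueue_in_chunks`, the batch size, and the method path `myapp.tasks.process_batch` are hypothetical, and the `enqueue` parameter is injected so the example runs without a Frappe site. In an app you would pass `frappe.enqueue`.

```python
from typing import Any, Callable, Iterator, List, Sequence


def chunked(items: Sequence[Any], size: int) -> Iterator[List[Any]]:
    """Yield consecutive slices of `items` with at most `size` elements."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


def enqueue_in_chunks(
    enqueue: Callable[..., Any],
    method: str,
    names: Sequence[str],
    size: int = 100,
) -> int:
    """Enqueue one long-queue job per chunk; return the number of jobs."""
    count = 0
    for batch in chunked(names, size):
        enqueue(method, queue="long", names=batch)
        count += 1
    return count


# Illustration with a stand-in that records what would be enqueued.
queued = []


def fake_enqueue(method: str, **kwargs: Any) -> None:
    queued.append((method, kwargs))


invoices = ["INV-0001", "INV-0002", "INV-0003", "INV-0004", "INV-0005"]
job_count = enqueue_in_chunks(
    fake_enqueue, "myapp.tasks.process_batch", invoices, size=2
)
```

With five names and a chunk size of two, three jobs are queued, and the scheduler handler itself returns almost immediately.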