Frappe_Claude_Skill_Package frappe-syntax-scheduler

install
source · Clone the upstream repo
git clone https://github.com/OpenAEC-Foundation/Frappe_Claude_Skill_Package
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/OpenAEC-Foundation/Frappe_Claude_Skill_Package "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/source/syntax/frappe-syntax-scheduler" ~/.claude/skills/openaec-foundation-frappe-claude-skill-package-frappe-syntax-scheduler && rm -rf "$T"
manifest: skills/source/syntax/frappe-syntax-scheduler/SKILL.md
source content

Frappe Scheduler & Background Jobs

Deterministic syntax reference for Frappe scheduler events and background job processing via Redis Queue (RQ).

Decision Tree

Need periodic execution?
├─ Fixed interval (hourly/daily/weekly/monthly) → scheduler_events in hooks.py
├─ Custom cron schedule → scheduler_events.cron in hooks.py
├─ User-configurable interval → Scheduled Job Type DocType
└─ No, triggered by user/event
   ├─ Run method on a specific document → frappe.enqueue_doc()
   ├─ Run standalone function async → frappe.enqueue()
   └─ Run from controller on self → self.queue_action()

Quick Reference: Scheduler Events (hooks.py)

# hooks.py — ALWAYS run bench migrate after changes
scheduler_events = {
    # Standard events (default queue)
    "all": ["myapp.tasks.every_tick"],           # Every tick [v14: 240s, v15+: 60s]
    "hourly": ["myapp.tasks.hourly_task"],
    "daily": ["myapp.tasks.daily_task"],
    "weekly": ["myapp.tasks.weekly_task"],
    "monthly": ["myapp.tasks.monthly_task"],

    # Long queue events (for heavy processing)
    "hourly_long": ["myapp.tasks.hourly_heavy"],
    "daily_long": ["myapp.tasks.daily_heavy"],
    "weekly_long": ["myapp.tasks.weekly_heavy"],
    "monthly_long": ["myapp.tasks.monthly_heavy"],

    # Cron events (croniter-compatible syntax)
    "cron": {
        "*/15 * * * *": ["myapp.tasks.every_15_min"],
        "0 9 * * 1-5": ["myapp.tasks.weekday_9am"],
        "0 0 1 * *": ["myapp.tasks.first_of_month"],
    }
}

CRITICAL: ALWAYS run

bench migrate
after ANY change to scheduler_events. Without it, changes are NOT applied.

Scheduler Event Types

EventFrequencyQueueUse Case
all
Every tick [v14: 4min, v15+: 60s]defaultFrequent polling
hourly
Once per hourdefaultSync, cleanup
daily
Once per daydefaultReports, summaries
weekly
Once per weekdefaultArchival
monthly
Once per monthdefaultBilling, statements
hourly_long
Once per hourlongHeavy sync
daily_long
Once per daylongLarge exports
weekly_long
Once per weeklongData warehousing
monthly_long
Once per monthlongAnnual reports
cron
Custom scheduleconfigurableAny custom timing

Cron Syntax

┌───────────── minute (0-59)
│ ┌───────────── hour (0-23)
│ │ ┌───────────── day of month (1-31)
│ │ │ ┌───────────── month (1-12)
│ │ │ │ ┌───────────── day of week (0-6, Sunday=0)
│ │ │ │ │
* * * * *
SymbolMeaningExample
*
Any value
* * * * *
= every minute
,
List
1,15 * * * *
= minute 1 and 15
-
Range
0 9-17 * * *
= hours 9 through 17
/
Interval
*/10 * * * *
= every 10 minutes

Common patterns:

  • Every 5 min:
    */5 * * * *
  • Weekdays at 9:00:
    0 9 * * 1-5
  • Monday at 8:00:
    0 8 * * 1
  • Business hours hourly:
    0 9-17 * * 1-5

Quick Reference: frappe.enqueue()

frappe.enqueue(
    method,                      # REQUIRED: function or "dotted.module.path"
    queue="default",             # "short", "default", "long", or custom
    timeout=None,                # Override queue timeout (seconds)
    is_async=True,               # False = run synchronously (skip worker)
    now=False,                   # True = run via frappe.call() directly
    job_id=None,                 # [v15+] Unique ID for deduplication
    enqueue_after_commit=False,  # Wait for DB commit before enqueue
    at_front=False,              # Place at front of queue
    on_success=None,             # Success callback
    on_failure=None,             # Failure callback
    **kwargs                     # Arguments passed to method
)

Queue Types

QueueDefault TimeoutUse When
short
300s (5 min)Task < 30 seconds
default
300s (5 min)Task 30s - 5 min
long
1500s (25 min)Task 5 - 25 min
long
+ custom timeout
user-definedTask > 25 min
# Short queue — quick status update
frappe.enqueue("myapp.tasks.update_status", queue="short", doc=doc.name)

# Long queue — heavy report generation
frappe.enqueue("myapp.tasks.generate_report", queue="long", timeout=3600)

frappe.enqueue_doc()

Enqueue a controller method on a specific document.

frappe.enqueue_doc(
    "Sales Invoice",              # DocType
    "SINV-00001",                 # Document name
    "send_notification",          # Controller method name
    queue="long",
    timeout=600,
    recipient="user@example.com"  # kwargs passed to method
)

The controller method MUST be decorated with

@frappe.whitelist()
:

class SalesInvoice(Document):
    @frappe.whitelist()
    def send_notification(self, recipient):
        # self is the loaded document
        pass

self.queue_action()

Alternative from within a controller:

class SalesOrder(Document):
    def on_submit(self):
        self.queue_action("send_emails", emails=email_list)

    def send_emails(self, emails):
        for email in emails:
            send_mail(email)

Job Deduplication

[v15+] Recommended Pattern

from frappe.utils.background_jobs import is_job_enqueued

job_id = f"import::{doc.name}"
if not is_job_enqueued(job_id):
    frappe.enqueue(
        "myapp.tasks.import_data",
        job_id=job_id,
        doc_name=doc.name
    )
else:
    frappe.msgprint("Import already in progress")

[v14] Legacy Pattern (NEVER use in new code)

from frappe.core.page.background_jobs.background_jobs import get_info
enqueued = [d.get("job_name") for d in get_info()]
if name not in enqueued:
    frappe.enqueue(..., job_name=name)

Error Handling Pattern

ALWAYS use try/except with commit/rollback per record in batch jobs:

def process_records(records):
    success, errors = 0, 0
    for record in records:
        try:
            process_single(record)
            frappe.db.commit()
            success += 1
        except Exception:
            frappe.db.rollback()
            frappe.log_error(
                frappe.get_traceback(),
                f"Process Error: {record}"
            )
            errors += 1
    return {"success": success, "errors": errors}

Retry Pattern

def task_with_retry(data, retry_count=0, max_retries=3):
    try:
        external_api_call(data)
    except Exception:
        if retry_count < max_retries:
            frappe.enqueue(
                "myapp.tasks.task_with_retry",
                queue="default",
                data=data,
                retry_count=retry_count + 1,
                max_retries=max_retries,
                enqueue_after_commit=True
            )
            frappe.log_error(f"Retry {retry_count+1}/{max_retries}", "Task Retry")
        else:
            frappe.log_error(frappe.get_traceback(), f"Failed after {max_retries} retries")
            raise

Callbacks

def on_success_handler(job, connection, result, *args, **kwargs):
    frappe.publish_realtime("show_alert", {"message": "Done!"})

def on_failure_handler(job, connection, type, value, traceback):
    frappe.log_error(f"Job {job.id} failed: {value}", "Job Error")

frappe.enqueue(
    "myapp.tasks.risky_task",
    on_success=on_success_handler,
    on_failure=on_failure_handler,
)

Progress Updates

def long_task(items, user):
    total = len(items)
    for i, item in enumerate(items):
        process_item(item)
        frappe.publish_realtime(
            "task_progress",
            {"progress": (i + 1) / total * 100, "current": i + 1, "total": total},
            user=user,
        )

User Context

CRITICAL: Scheduler jobs run as Administrator. ALWAYS set explicit ownership when creating documents:

def scheduled_task():
    doc = frappe.new_doc("ToDo")
    doc.owner = "user@example.com"
    doc.insert(ignore_permissions=True)

Monitoring

ToolPurpose
bench doctor
Scheduler status, worker health
RQ Worker (DocType)Worker status: busy/idle
RQ Job (DocType)Job status, queue filtering
Scheduled Job Log (DocType)Execution history, errors
logs/worker.error.log
Worker exceptions
logs/scheduler.log
Scheduler activity

Version Differences

Featurev14v15+
Tick interval (
all
event)
~240s (4 min)~60s
Config key for tick
scheduler_interval
scheduler_tick_interval
Deduplication
job_name
(deprecated)
job_id
+
is_job_enqueued()

Custom tick in

common_site_config.json
:

{ "scheduler_tick_interval": 120 }

Critical Rules

  1. ALWAYS run
    bench migrate
    after any scheduler_events change in hooks.py
  2. ALWAYS use
    job_id
    +
    is_job_enqueued()
    for deduplication [v15+]
  3. ALWAYS choose the correct queue: short/default/long based on task duration
  4. ALWAYS commit per record and rollback on error in batch jobs
  5. ALWAYS remember that scheduler jobs run as Administrator
  6. NEVER run heavy logic directly in a scheduler event — enqueue it instead
  7. NEVER use
    job_name
    for deduplication in new code (v14 legacy)

Reference Files

See Also

  • frappe-syntax-hooks
    — Full hooks.py reference
  • frappe-core-background
    — Background job architecture
  • frappe-errors-jobs
    — Job failure debugging