install
source · Clone the upstream repo
git clone https://github.com/TerminalSkills/skills
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/TerminalSkills/skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/celery" ~/.claude/skills/terminalskills-skills-celery && rm -rf "$T"
manifest: skills/celery/SKILL.md

Celery

Overview

Celery is a widely used Python library for distributed task processing. Use it to offload slow operations (email sending, report generation, image processing) from web requests to background workers. It supports task retries, periodic scheduling, rate limiting, and workflow composition with chains and groups.

Instructions

Step 1: Setup

pip install "celery[redis]"

# celery_app.py: Celery application configuration
from celery import Celery

app = Celery(
    'myapp',
    broker='redis://localhost:6379/0',       # message broker
    backend='redis://localhost:6379/1',       # result storage
    include=['tasks'],                       # import tasks.py so workers register its tasks
)

app.conf.update(
    task_serializer='json',
    result_serializer='json',
    accept_content=['json'],
    timezone='UTC',
    task_acks_late=True,                     # ack after the task finishes; tasks should be idempotent
    worker_prefetch_multiplier=1,            # each worker process reserves one task at a time
)

Step 2: Define Tasks

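# tasks.py: background task definitions
# get_user, send_email, render_template, EmailServiceError, fetch_analytics,
# render_pdf_report, and notify_user are application-specific helpers
# that must be imported from your own code.
from PIL import Image  # Pillow, used by process_image

from celery_app import app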

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def send_welcome_email(self, user_id: int):
    """Send welcome email to new user.

    Args:
        user_id: Database ID of the newly registered user
    """
    try:
        user = get_user(user_id)
        send_email(
            to=user.email,
            subject='Welcome!',
            body=render_template('welcome.html', user=user),
        )
    except EmailServiceError as exc:
        # Retry with exponential backoff
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))


@app.task(rate_limit='10/m')    # max 10 per minute per worker instance, not cluster-wide
def process_image(image_path: str, output_path: str):
    """Resize and optimize uploaded image."""
    img = Image.open(image_path)
    img.thumbnail((1200, 1200))
    img.save(output_path, optimize=True, quality=85)
    return output_path


@app.task
def generate_report(org_id: int, start_date: str, end_date: str):
    """Generate analytics report (may take several minutes)."""
    data = fetch_analytics(org_id, start_date, end_date)
    pdf_path = render_pdf_report(data)
    notify_user(org_id, pdf_path)
    return pdf_path
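
The countdown expression in send_welcome_email doubles the wait on every retry. As a worked example, the plain-Python sketch below (no Celery required; backoff_schedule is a hypothetical helper written for this illustration) lists the delays that 60 * (2 ** self.request.retries) yields with max_retries=3.

```python
# Worked example: delays produced by countdown = 60 * 2 ** retries.
# self.request.retries is 0 on the first attempt and grows by one per retry.
BASE_DELAY_SECONDS = 60
MAX_RETRIES = 3


def backoff_schedule(base, max_retries):
    """Return the countdown (in seconds) used before each retry attempt."""
    return [base * (2 ** attempt) for attempt in range(max_retries)]


delays = backoff_schedule(BASE_DELAY_SECONDS, MAX_RETRIES)
print(delays)       # [60, 120, 240]
print(sum(delays))  # 420
```

With these numbers the task waits seven minutes in total before the fourth failure exhausts max_retries and the original exception is raised.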

Step 3: Call Tasks

# In your web handler (Django view, FastAPI endpoint, etc.)
from tasks import send_welcome_email, generate_report, process_image
from celery import chain, group

# Fire and forget
send_welcome_email.delay(user.id)

# Get result later
result = generate_report.delay(org.id, '2025-01-01', '2025-01-31')
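print(result.status)            # PENDING, then SUCCESS or FAILURE (STARTED needs task_track_started=True)
print(result.get(timeout=600))  # blocks until done; avoid calling this inside a request handler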

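# Chain: each task's return value becomes the first argument of the next task
# (extract_data, transform_data, and load_data are tasks you define yourself)
chain(extract_data.s(url), transform_data.s(), load_data.s())()

# Group: run tasks in parallel; image_jobs is a list of (input_path, output_path) pairs
group(process_image.s(src, dst) for src, dst in image_jobs)()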

Step 4: Run Workers

celery -A celery_app worker --loglevel=info --concurrency=4
celery -A celery_app beat --loglevel=info    # for periodic tasks

Guidelines

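  • Set task_acks_late=True only for idempotent tasks: the message is acknowledged after the task finishes, so an unacknowledged task is redelivered if the worker loses power or its broker connection. To also requeue a task whose worker process is killed mid-run, set task_reject_on_worker_lost=True.
  • Use bind=True and self.retry() to retry from inside a task with a custom countdown; autoretry_for combined with retry_backoff gives similar behavior declaratively.
  • Redis is the simplest broker to set up; RabbitMQ is generally the more robust choice for production.
  • Monitor with Flower, a separate package (pip install flower): celery -A celery_app flower starts a web dashboard on port 5555.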