Claude-skill-registry FastAPI Background Tasks
This skill should be used when the user asks to "create background task", "add async job", "implement task queue", "schedule periodic task", "use Celery", "use ARQ", "process async", or mentions background processing, task queues, job scheduling, workers, or async jobs. Provides multiple task queue framework patterns.
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/fastapi-background" ~/.claude/skills/majiayu000-claude-skill-registry-fastapi-background-tasks && rm -rf "$T"
manifest:
skills/data/fastapi-background/SKILL.md
source content
FastAPI Background Task Processing
This skill provides patterns for background task processing with multiple frameworks: ARQ (recommended for async), Celery, and Dramatiq.
ARQ (Async Redis Queue) - Recommended
Installation
pip install arq
Configuration
# app/workers/config.py
"""Base ARQ worker configuration."""
from arq.connections import RedisSettings

from app.config import get_settings

settings = get_settings()


class WorkerSettings:
    """Redis connection and job limits for the ARQ worker."""

    # Job settings
    queue_name = "default"
    max_jobs = 10
    job_timeout = 5 * 60  # 5 minutes
    keep_result = 60 * 60  # 1 hour

    # Redis connection, built from the application settings
    redis_settings = RedisSettings(
        host=settings.redis_host,
        port=settings.redis_port,
        password=settings.redis_password,
        database=1,  # Separate from cache
    )

    # Cron jobs
    cron_jobs = []
Task Definitions
# app/workers/tasks.py
"""ARQ task functions. Each one receives the worker context dict as ``ctx``."""
import asyncio
from typing import Any, Dict

from arq import cron


async def send_email(ctx: Dict[str, Any], to: str, subject: str, body: str):
    """Send email asynchronously.

    Args:
        ctx: Worker context; must contain ``"email_service"``.
        to: Recipient address.
        subject: Message subject.
        body: Message body.

    Returns:
        ``{"status": "sent", "to": to}`` once the service call completes.

    Raises:
        KeyError: If ``"email_service"`` was not registered in ``ctx``.
    """
    # Index instead of .get(): a missing service fails naming the missing key
    # rather than with an AttributeError on None.
    email_service = ctx["email_service"]
    await email_service.send(to=to, subject=subject, body=body)
    return {"status": "sent", "to": to}


async def process_upload(ctx: Dict[str, Any], file_id: str, user_id: str):
    """Process uploaded file (resize, convert, etc.).

    Args:
        ctx: Worker context; must contain ``"storage"``.
        file_id: Identifier of the stored upload.
        user_id: Identifier of the uploading user (not used by this body).

    Returns:
        ``{"status": "processed", "file_id": file_id}``.

    Raises:
        KeyError: If ``"storage"`` was not registered in ``ctx``.
    """
    storage = ctx["storage"]
    file_data = await storage.get(file_id)

    # Process file
    # NOTE(review): process_file is not defined or imported in this snippet;
    # it must be supplied by the application.
    processed = await process_file(file_data)

    # Save processed file
    await storage.put(f"processed/{file_id}", processed)
    return {"status": "processed", "file_id": file_id}


async def cleanup_expired(ctx: Dict[str, Any]):
    """Periodic cleanup of expired data.

    Returns:
        ``{"deleted": <count>}`` taken from the result's ``deleted_count``.

    Raises:
        KeyError: If ``"db"`` was not registered in ``ctx``.
    """
    db = ctx["db"]
    result = await db.delete_expired()
    return {"deleted": result.deleted_count}


async def _generate_daily_report(ctx: Dict[str, Any]):
    """Generate daily report via the ``report_service`` entry of ``ctx``."""
    report_service = ctx["report_service"]
    await report_service.generate_daily()


# Cron job example
# arq.cron() takes the coroutine as its first argument and returns a CronJob;
# it is not a decorator factory, so the job is built explicitly here. The name
# `daily_report` is kept so it can be listed in WorkerSettings.cron_jobs.
daily_report = cron(
    _generate_daily_report,
    name="cron:daily_report",
    hour=2,
    minute=0,  # Run at 2 AM daily
)
Worker Entry Point
# app/workers/main.py
"""ARQ worker entry point: registers tasks and the startup/shutdown hooks."""
from typing import Any, Dict

from arq import create_pool
from arq.connections import RedisSettings

# NOTE(review): get_db and close_database are assumed to live alongside
# init_database; the original snippet used them without importing them.
from app.infrastructure.database import close_database, get_db, init_database
from app.services.email import EmailService
# Aliased so the subclass below does not shadow its own base class.
from app.workers.config import WorkerSettings as BaseWorkerSettings
from app.workers.tasks import send_email, process_upload, cleanup_expired, daily_report


async def startup(ctx: Dict[str, Any]):
    """Worker startup - initialize services and publish them in ``ctx``."""
    await init_database()
    ctx["email_service"] = EmailService()
    ctx["db"] = get_db()
    # TODO(review): the task functions also read ctx "storage" and
    # "report_service"; register those services here as well.


async def shutdown(ctx: Dict[str, Any]):
    """Worker shutdown - cleanup."""
    await close_database()


class WorkerSettings(BaseWorkerSettings):
    """Concrete worker settings: base configuration plus tasks and hooks."""

    functions = [send_email, process_upload, cleanup_expired]
    cron_jobs = [daily_report]
    on_startup = startup
    on_shutdown = shutdown


# Run with: arq app.workers.main.WorkerSettings
Enqueueing Tasks from FastAPI
# app/dependencies.py
from typing import Optional

from arq import ArqRedis, create_pool
from arq.connections import RedisSettings

from app.workers.config import WorkerSettings

# One pool per process, created lazily on first use.
_task_queue: Optional[ArqRedis] = None


async def get_task_queue() -> ArqRedis:
    """Return the process-wide ARQ Redis pool, creating it on first call.

    The pool is built from ``WorkerSettings.redis_settings`` so jobs are
    enqueued into the same Redis database the worker reads from, and it is
    reused across requests instead of opening a new pool per request.
    """
    global _task_queue
    if _task_queue is None:
        # Two concurrent first requests may both reach this line; the later
        # assignment wins and both pools remain usable by their callers.
        _task_queue = await create_pool(WorkerSettings.redis_settings)
    return _task_queue


# app/routes/users.py
from fastapi import Depends, UploadFile
from arq import ArqRedis

# NOTE(review): `router`, `get_user`, `save_file`, `User` and
# `get_current_user` are not defined in this snippet; they must come from the
# application.


@router.post("/users/{user_id}/welcome")
async def send_welcome_email(
    user_id: str,
    queue: ArqRedis = Depends(get_task_queue),
):
    """Queue a welcome email for the user and return the job id."""
    user = await get_user(user_id)

    # Enqueue background task
    job = await queue.enqueue_job(
        "send_email",
        to=user.email,
        subject="Welcome!",
        body="Thanks for signing up.",
    )

    return {"job_id": job.job_id, "status": "queued"}


@router.post("/uploads")
async def upload_file(
    file: UploadFile,
    user: User = Depends(get_current_user),
    queue: ArqRedis = Depends(get_task_queue),
):
    """Store the upload, then queue its processing with a short delay."""
    # Save file
    file_id = await save_file(file)

    # Enqueue processing
    await queue.enqueue_job(
        "process_upload",
        file_id=file_id,
        user_id=str(user.id),
        _defer_by=5,  # Delay 5 seconds
    )

    return {"file_id": file_id, "status": "processing"}
Celery (Battle-Tested)
Configuration
# app/workers/celery_app.py
"""Celery application: broker/backend wiring, serialization and beat schedule."""
from celery import Celery
# crontab is used by the beat schedule below; the original snippet used it
# without importing it, which raises NameError on import.
from celery.schedules import crontab

from app.config import get_settings

settings = get_settings()

celery_app = Celery(
    "worker",
    broker=settings.celery_broker_url,
    backend=settings.celery_result_backend,
    include=["app.workers.celery_tasks"],
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,
    task_time_limit=300,
    worker_prefetch_multiplier=1,
)

# Periodic tasks (Celery Beat)
celery_app.conf.beat_schedule = {
    "cleanup-every-hour": {
        "task": "app.workers.celery_tasks.cleanup_expired",
        "schedule": 3600.0,
    },
    "daily-report": {
        "task": "app.workers.celery_tasks.generate_daily_report",
        "schedule": crontab(hour=2, minute=0),
    },
}
Celery Tasks
# app/workers/celery_tasks.py
"""Celery tasks that wrap async implementations in synchronous entry points."""
import asyncio

from app.workers.celery_app import celery_app

# NOTE(review): _send_email_async and _process_upload_async are not defined in
# this snippet; they must be provided by the application. The beat schedule
# also names cleanup_expired and generate_daily_report tasks that are not
# defined here.


def run_async(coro):
    """Helper to run async code in sync Celery tasks.

    Uses ``asyncio.run`` so each call gets a fresh event loop that is closed
    afterwards, instead of relying on ``asyncio.get_event_loop()``, which is
    deprecated when no loop is running.

    Args:
        coro: The coroutine object to run to completion.

    Returns:
        Whatever the coroutine returns.
    """
    return asyncio.run(coro)


@celery_app.task(bind=True, max_retries=3)
def send_email(self, to: str, subject: str, body: str):
    """Send an email, retrying after 60 seconds on failure (max 3 retries)."""
    try:
        run_async(_send_email_async(to, subject, body))
    except Exception as exc:
        # retry() raises; `raise` makes that control flow explicit.
        raise self.retry(exc=exc, countdown=60)
    return {"status": "sent", "to": to}


@celery_app.task
def process_upload(file_id: str, user_id: str):
    """Process an uploaded file and report completion."""
    run_async(_process_upload_async(file_id, user_id))
    return {"status": "processed", "file_id": file_id}
Dramatiq (Modern Celery Alternative)
Configuration
# app/workers/dramatiq_app.py
"""Dramatiq setup: a Redis broker with a Redis-backed results middleware."""
import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.results import Results
from dramatiq.results.backends import RedisBackend

# Result storage and the broker point at different Redis databases.
result_backend = RedisBackend(url="redis://localhost:6379/1")
redis_broker = RedisBroker(url="redis://localhost:6379/0")

# Attach result storage to the broker, then make it the global broker.
_results_middleware = Results(backend=result_backend)
redis_broker.add_middleware(_results_middleware)

dramatiq.set_broker(redis_broker)
Dramatiq Tasks
# app/workers/dramatiq_tasks.py
"""Dramatiq actors for email sending and upload processing."""
import dramatiq

# Imported for its side effect: it installs the Redis broker (with the Results
# middleware) before the actors below are declared, so they bind to it rather
# than to dramatiq's default broker.
from app.workers import dramatiq_app  # noqa: F401


# store_results=True keeps the returned dict in the result backend; without it
# the return value is discarded.
@dramatiq.actor(max_retries=3, min_backoff=1000, store_results=True)
def send_email(to: str, subject: str, body: str):
    """Send an email (sync implementation) and return a status dict."""
    # Sync implementation
    return {"status": "sent", "to": to}


@dramatiq.actor(time_limit=300000, store_results=True)  # 5 min timeout
def process_upload(file_id: str, user_id: str):
    """Process an uploaded file and return a status dict."""
    return {"status": "processed", "file_id": file_id}
FastAPI Built-in Background Tasks
For simple fire-and-forget tasks (no persistence):
from fastapi import BackgroundTasks
from fastapi.concurrency import run_in_threadpool

# NOTE(review): `router` is not defined in this snippet; it must be the
# application's existing APIRouter.


def _append_line(path: str, line: str) -> None:
    """Append ``line`` plus a newline to the file at ``path``."""
    with open(path, "a", encoding="utf-8") as f:
        f.write(f"{line}\n")


async def write_log(message: str):
    """Append ``message`` to ``log.txt`` without blocking the event loop.

    The file write is blocking I/O, so it runs in the threadpool rather than
    directly inside this coroutine.
    """
    await run_in_threadpool(_append_line, "log.txt", message)


@router.post("/log")
async def create_log(message: str, background_tasks: BackgroundTasks):
    """Schedule the log write to run after the response is sent."""
    background_tasks.add_task(write_log, message)
    return {"status": "logged"}
Additional Resources
Reference Files
For detailed patterns:
- ARQ advanced patterns, retries, priorities — `references/arq-advanced.md`
- Celery best practices, chains, groups — `references/celery-patterns.md`
- Flower, task monitoring — `references/monitoring.md`
Example Files
Working examples in `examples/`:
- Complete ARQ worker — `examples/arq_worker.py`
- Celery configuration — `examples/celery_app.py`
- Task enqueueing service — `examples/task_service.py`