git clone https://github.com/MacPhobos/research-mind
T=$(mktemp -d) && git clone --depth=1 https://github.com/MacPhobos/research-mind "$T" && mkdir -p ~/.claude/skills && cp -r "$T/.claude/skills/toolchains-python-async-celery" ~/.claude/skills/macphobos-research-mind-toolchains-python-async-celery && rm -rf "$T"
.claude/skills/toolchains-python-async-celery/skill.mdCelery: Distributed Task Queue
Summary
Celery is a distributed task queue system for Python that enables asynchronous execution of background jobs across multiple workers. It supports scheduling, retries, task workflows, and integrates seamlessly with Django, Flask, and FastAPI.
When to Use
- Background Processing: Offload long-running operations (email, file processing, reports)
- Scheduled Tasks: Cron-like periodic jobs (cleanup, backups, data sync)
- Distributed Computing: Process tasks across multiple workers/servers
- Async Workflows: Chain, group, and orchestrate complex task dependencies
- Real-time Processing: Handle webhooks, notifications, data pipelines
- Load Balancing: Distribute CPU-intensive work across workers
Don't Use When:
- Simple async I/O (use
instead)asyncio - Real-time request/response (use async web frameworks)
- Sub-second latency required (use in-memory queues)
- Minimal infrastructure (use simpler alternatives like RQ or Huey)
Quick Start
Installation
# Basic installation pip install celery # With Redis broker pip install celery[redis] # With RabbitMQ broker pip install celery[amqp] # Full batteries (recommended) pip install celery[redis,msgpack,auth,cassandra,elasticsearch,s3,sqs]
Basic Setup
# celery_app.py from celery import Celery # Create Celery app with Redis broker app = Celery( 'myapp', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1' ) # Configuration app.conf.update( task_serializer='json', accept_content=['json'], result_serializer='json', timezone='UTC', enable_utc=True, ) # Define a task @app.task def add(x, y): return x + y @app.task def send_email(to, subject, body): # Simulate email sending import time time.sleep(2) print(f"Email sent to {to}: {subject}") return {"status": "sent", "to": to}
Running Workers
# Start worker celery -A celery_app worker --loglevel=info # Multiple workers with concurrency celery -A celery_app worker --concurrency=4 --loglevel=info # Named worker for specific queues celery -A celery_app worker -Q emails,reports --loglevel=info
Executing Tasks
# Call task asynchronously result = add.delay(4, 6) # Wait for result print(result.get(timeout=10)) # 10 # Apply async with options result = send_email.apply_async( args=['user@example.com', 'Hello', 'Welcome!'], countdown=60 # Execute after 60 seconds ) # Check task state print(result.status) # PENDING, STARTED, SUCCESS, FAILURE
Core Concepts
Architecture Components
Broker: Message queue that stores tasks
- Redis (recommended for most use cases)
- RabbitMQ (enterprise-grade, complex)
- Amazon SQS (serverless, AWS-native)
Workers: Processes that execute tasks
- Pull tasks from broker
- Execute task code
- Store results in backend
Result Backend: Storage for task results
- Redis (fast, in-memory)
- Database (PostgreSQL, MySQL)
- S3 (large results)
- Cassandra, Elasticsearch (specialized)
Beat Scheduler: Periodic task scheduler
- Cron-like scheduling
- Interval-based tasks
- Stores schedule in database or file
Task States
PENDING → STARTED → SUCCESS → RETRY → SUCCESS → FAILURE
- PENDING: Task waiting in queue
- STARTED: Worker picked up task
- SUCCESS: Task completed successfully
- FAILURE: Task raised exception
- RETRY: Task will retry after failure
- REVOKED: Task cancelled before execution
Broker Setup
Redis Configuration
# celery_config.py broker_url = 'redis://localhost:6379/0' result_backend = 'redis://localhost:6379/1' # With authentication broker_url = 'redis://:password@localhost:6379/0' # Redis Sentinel (high availability) broker_url = 'sentinel://localhost:26379;sentinel://localhost:26380' broker_transport_options = { 'master_name': 'mymaster', 'sentinel_kwargs': {'password': 'password'}, } # Redis connection pool settings broker_pool_limit = 10 broker_connection_retry = True broker_connection_retry_on_startup = True broker_connection_max_retries = 10
RabbitMQ Configuration
# Basic RabbitMQ broker_url = 'amqp://guest:guest@localhost:5672//' # With virtual host broker_url = 'amqp://user:password@localhost:5672/myvhost' # High availability (multiple brokers) broker_url = [ 'amqp://user:password@host1:5672//', 'amqp://user:password@host2:5672//', ] # RabbitMQ-specific settings broker_heartbeat = 30 broker_pool_limit = 10
Amazon SQS Configuration
# AWS SQS (serverless) broker_url = 'sqs://' broker_transport_options = { 'region': 'us-east-1', 'queue_name_prefix': 'myapp-', 'visibility_timeout': 3600, 'polling_interval': 1, } # With custom credentials import boto3 broker_transport_options = { 'region': 'us-east-1', 'predefined_queues': { 'default': { 'url': 'https://sqs.us-east-1.amazonaws.com/123456789/myapp-default', } } }
Task Basics
Task Definition
from celery import Task, shared_task from celery_app import app # Method 1: Decorator @app.task def simple_task(x, y): return x + y # Method 2: Shared task (framework-agnostic) @shared_task def framework_task(data): return process(data) # Method 3: Task class (advanced) class CustomTask(Task): def on_success(self, retval, task_id, args, kwargs): print(f"Task {task_id} succeeded with {retval}") def on_failure(self, exc, task_id, args, kwargs, einfo): print(f"Task {task_id} failed: {exc}") def on_retry(self, exc, task_id, args, kwargs, einfo): print(f"Task {task_id} retrying: {exc}") @app.task(base=CustomTask) def monitored_task(x): return x * 2
Task Options
@app.task( name='custom.task.name', # Custom task name bind=True, # Bind task instance as first arg ignore_result=True, # Don't store result (performance) max_retries=3, # Max retry attempts default_retry_delay=60, # Retry delay in seconds rate_limit='100/h', # Rate limiting time_limit=300, # Hard time limit (kills task) soft_time_limit=240, # Soft time limit (raises exception) serializer='json', # Task serializer compression='gzip', # Compress large messages priority=5, # Task priority (0-9) queue='high_priority', # Target queue routing_key='priority.high', # Routing key acks_late=True, # Acknowledge after execution reject_on_worker_lost=True, # Reject if worker dies ) def advanced_task(self, data): try: return process(data) except Exception as exc: # Retry with exponential backoff raise self.retry(exc=exc, countdown=2 ** self.request.retries)
Task Context (bind=True)
@app.task(bind=True) def context_aware_task(self, x, y): # Access task metadata print(f"Task ID: {self.request.id}") print(f"Task Name: {self.name}") print(f"Args: {self.request.args}") print(f"Kwargs: {self.request.kwargs}") print(f"Retries: {self.request.retries}") print(f"Delivery Info: {self.request.delivery_info}") # Manual retry try: result = risky_operation(x, y) except Exception as exc: raise self.retry(exc=exc, countdown=60, max_retries=3) return result
Task Execution
Delay vs Apply Async
# delay() - Simple async execution result = add.delay(4, 6) # apply_async() - Full control result = add.apply_async( args=(4, 6), kwargs={'extra': 'data'}, # Timing options countdown=60, # Execute after N seconds eta=datetime(2025, 12, 1, 10, 0), # Execute at specific time expires=3600, # Task expires after N seconds # Routing options queue='math', # Target specific queue routing_key='math.add', # Custom routing key exchange='tasks', # Target exchange priority=9, # High priority # Execution options serializer='json', # Message serializer compression='gzip', # Compress payload retry=True, # Auto-retry on failure retry_policy={ 'max_retries': 3, 'interval_start': 0, 'interval_step': 0.2, 'interval_max': 0.2, }, # Task linking link=log_result.s(), # Success callback link_error=handle_error.s(), # Error callback ) # Check result if result.ready(): print(result.get()) # Get result (blocks) print(result.result) # Get result (non-blocking)
Task Signatures
from celery import signature # Create signature (doesn't execute) sig = add.signature((2, 2), countdown=10) sig = add.s(2, 2) # Shorthand # Partial arguments (currying) partial = add.s(2) # One arg fixed result = partial.apply_async(args=(4,)) # Add second arg # Immutable signature (args can't be replaced) immutable = add.si(2, 2) # Clone and modify new_sig = sig.clone(countdown=60) # Execute signature result = sig.delay() result = sig.apply_async() result = sig() # Synchronous execution
Result Handling
# Basic result retrieval result = add.delay(4, 6) value = result.get(timeout=10) # Blocks until complete # Non-blocking result check if result.ready(): print(result.result) # Result states print(result.status) # PENDING, STARTED, SUCCESS, FAILURE print(result.successful()) # True if SUCCESS print(result.failed()) # True if FAILURE # Result metadata print(result.traceback) # Exception traceback if failed print(result.info) # Task return value or exception # Forget result (free memory) result.forget() # Revoke task (cancel) result.revoke(terminate=True) # Kill running task add.AsyncResult(task_id).revoke() # Revoke by ID
Task Routing
Queue Configuration
# Define queues from kombu import Queue, Exchange app.conf.task_queues = ( Queue('default', Exchange('default'), routing_key='default'), Queue('high_priority', Exchange('priority'), routing_key='priority.high'), Queue('low_priority', Exchange('priority'), routing_key='priority.low'), Queue('emails', Exchange('tasks'), routing_key='tasks.email'), Queue('reports', Exchange('tasks'), routing_key='tasks.report'), ) # Default queue app.conf.task_default_queue = 'default' app.conf.task_default_exchange = 'tasks' app.conf.task_default_routing_key = 'default'
Task Routing Rules
# Route specific tasks to queues app.conf.task_routes = { 'myapp.tasks.send_email': {'queue': 'emails'}, 'myapp.tasks.generate_report': {'queue': 'reports', 'priority': 9}, 'myapp.tasks.*': {'queue': 'default'}, } # Function-based routing def route_task(name, args, kwargs, options, task=None, **kw): if 'email' in name: return {'queue': 'emails', 'routing_key': 'email.send'} elif 'report' in name: return {'queue': 'reports', 'priority': 5} return {'queue': 'default'} app.conf.task_routes = (route_task,)
Worker Queue Binding
# Worker consuming specific queues celery -A myapp worker -Q emails,reports --loglevel=info # Multiple workers for different queues celery -A myapp worker -Q high_priority -c 4 --loglevel=info celery -A myapp worker -Q default -c 2 --loglevel=info celery -A myapp worker -Q low_priority -c 1 --loglevel=info
Priority Queues
# Configure priority support app.conf.task_queue_max_priority = 10 app.conf.task_default_priority = 5 # Send task with priority high_priority_task.apply_async(args=(), priority=9) low_priority_task.apply_async(args=(), priority=1) # Priority-based routing app.conf.task_routes = { 'critical_task': {'queue': 'default', 'priority': 10}, 'background_task': {'queue': 'default', 'priority': 1}, }
Periodic Tasks
Celery Beat Setup
# celery_config.py from celery.schedules import crontab, solar # Periodic task schedule beat_schedule = { # Run every 30 seconds 'add-every-30-seconds': { 'task': 'myapp.tasks.add', 'schedule': 30.0, 'args': (16, 16) }, # Run every morning at 7:30 AM 'send-daily-report': { 'task': 'myapp.tasks.send_daily_report', 'schedule': crontab(hour=7, minute=30), }, # Run every Monday morning 'weekly-cleanup': { 'task': 'myapp.tasks.cleanup', 'schedule': crontab(hour=0, minute=0, day_of_week=1), }, # Run on specific days 'monthly-report': { 'task': 'myapp.tasks.monthly_report', 'schedule': crontab(hour=0, minute=0, day_of_month='1'), 'kwargs': {'month_offset': 1} }, # Solar schedule (sunrise/sunset) 'wake-up-at-sunrise': { 'task': 'myapp.tasks.morning_routine', 'schedule': solar('sunrise', -37.81, 144.96), # Melbourne }, } app.conf.beat_schedule = beat_schedule
Crontab Patterns
from celery.schedules import crontab # Every minute crontab() # Every 15 minutes crontab(minute='*/15') # Every hour at :30 crontab(minute=30) # Every day at midnight crontab(hour=0, minute=0) # Every weekday at 5 PM crontab(hour=17, minute=0, day_of_week='1-5') # Every Monday, Wednesday, Friday at noon crontab(hour=12, minute=0, day_of_week='mon,wed,fri') # First day of month crontab(hour=0, minute=0, day_of_month='1') # Last day of month (use day_of_month='28-31' with logic in task) crontab(hour=0, minute=0, day_of_month='28-31') # Quarterly (every 3 months) crontab(hour=0, minute=0, day_of_month='1', month_of_year='*/3')
Running Beat Scheduler
# Start beat scheduler celery -A myapp beat --loglevel=info # Beat with custom scheduler celery -A myapp beat --scheduler django_celery_beat.schedulers:DatabaseScheduler # Combine worker and beat (development only) celery -A myapp worker --beat --loglevel=info
Dynamic Schedules (django-celery-beat)
pip install django-celery-beat
# settings.py (Django) INSTALLED_APPS = [ 'django_celery_beat', ] # Migrate database python manage.py migrate django_celery_beat # Run beat with database scheduler celery -A myapp beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
# Create periodic task via Django admin or ORM from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule import json # Create interval schedule (every 10 seconds) schedule, created = IntervalSchedule.objects.get_or_create( every=10, period=IntervalSchedule.SECONDS, ) PeriodicTask.objects.create( interval=schedule, name='Import feed every 10 seconds', task='myapp.tasks.import_feed', args=json.dumps(['https://example.com/feed']), ) # Create crontab schedule schedule, created = CrontabSchedule.objects.get_or_create( minute='0', hour='*/4', # Every 4 hours day_of_week='*', day_of_month='*', month_of_year='*', ) PeriodicTask.objects.create( crontab=schedule, name='Hourly cleanup', task='myapp.tasks.cleanup', )
Workflows (Canvas)
Chains
from celery import chain # Sequential execution result = chain(add.s(2, 2), add.s(4), add.s(8))() # Equivalent to: add(add(add(2, 2), 4), 8) # Result: 16 # Shorthand syntax result = (add.s(2, 2) | add.s(4) | add.s(8))() # Chain with different tasks workflow = ( fetch_data.s(url) | process_data.s() | save_results.s() ) result = workflow.apply_async()
Groups
from celery import group # Parallel execution job = group([ add.s(2, 2), add.s(4, 4), add.s(8, 8), ]) result = job.apply_async() # Wait for all results results = result.get(timeout=10) # [4, 8, 16] # Group with callbacks job = group([ process_item.s(item) for item in items ]) | summarize_results.s()
Chords
from celery import chord # Group with callback job = chord([ fetch_url.s(url) for url in urls ])(combine_results.s()) # Example: Process multiple files, then merge workflow = chord([ process_file.s(file) for file in files ])(merge_results.s()) result = workflow.apply_async()
Map and Starmap
from celery import group # Map: Apply same task to list of args results = add.map([(2, 2), (4, 4), (8, 8)]) # Starmap: Unpack arguments results = add.starmap([(2, 2), (4, 4), (8, 8)]) # Equivalent to: results = group([add.s(2, 2), add.s(4, 4), add.s(8, 8)])()
Complex Workflows
from celery import chain, group, chord # Parallel processing with sequential steps workflow = chain( # Step 1: Fetch data fetch_data.s(source), # Step 2: Process in parallel group([ process_chunk.s(chunk_id) for chunk_id in range(10) ]), # Step 3: Aggregate results aggregate.s(), # Step 4: Save to database save_results.s() ) # Nested chords workflow = chord([ chord([ subtask.s(item) for item in chunk ])(process_chunk.s()) for chunk in chunks ])(final_callback.s()) # Real-world example: Report generation generate_report = chain( fetch_user_data.s(user_id), chord([ calculate_stats.s(), fetch_transactions.s(), fetch_activity.s(), ])(combine_sections.s()), render_pdf.s(), send_email.s(user_email) )
Error Handling
Automatic Retries
@app.task( autoretry_for=(RequestException, IOError), # Auto-retry these exceptions retry_kwargs={'max_retries': 5}, # Max 5 retries retry_backoff=True, # Exponential backoff retry_backoff_max=600, # Max 10 minutes backoff retry_jitter=True, # Add randomness to backoff ) def fetch_url(url): response = requests.get(url) response.raise_for_status() return response.json()
Manual Retries
@app.task(bind=True, max_retries=3) def process_data(self, data): try: result = external_api_call(data) return result except TemporaryError as exc: # Retry after 60 seconds raise self.retry(exc=exc, countdown=60) except PermanentError as exc: # Don't retry, log and fail logger.error(f"Permanent error: {exc}") raise except Exception as exc: # Exponential backoff raise self.retry( exc=exc, countdown=2 ** self.request.retries, max_retries=3 )
Error Callbacks
@app.task def on_error(request, exc, traceback): """Called when task fails""" logger.error(f"Task {request.id} failed: {exc}") send_alert(f"Task failure: {request.task}", str(exc)) @app.task def risky_task(data): return process(data) # Link error callback risky_task.apply_async( args=(data,), link_error=on_error.s() )
Task Failure Handling
from celery import Task class CallbackTask(Task): def on_failure(self, exc, task_id, args, kwargs, einfo): """Handle task failure""" logger.error(f"Task {task_id} failed with {exc}") # Send notification send_notification('Task Failed', str(exc)) def on_success(self, retval, task_id, args, kwargs): """Handle task success""" logger.info(f"Task {task_id} succeeded: {retval}") def on_retry(self, exc, task_id, args, kwargs, einfo): """Handle task retry""" logger.warning(f"Task {task_id} retrying: {exc}") @app.task(base=CallbackTask) def monitored_task(x): if x < 0: raise ValueError("Negative value") return x * 2
Exception Handling Patterns
@app.task(bind=True) def robust_task(self, data): # Categorize exceptions try: return process(data) except NetworkError as exc: # Transient error - retry raise self.retry(exc=exc, countdown=60, max_retries=5) except ValidationError as exc: # Permanent error - don't retry logger.error(f"Invalid data: {exc}") return {'status': 'failed', 'error': str(exc)} except DatabaseError as exc: # Critical error - retry with exponential backoff backoff = min(2 ** self.request.retries * 60, 3600) raise self.retry(exc=exc, countdown=backoff, max_retries=10) except Exception as exc: # Unknown error - retry limited times if self.request.retries < 3: raise self.retry(exc=exc, countdown=120) else: # Max retries exceeded - fail and alert logger.critical(f"Task failed after retries: {exc}") send_alert('Critical Task Failure', str(exc)) raise
Monitoring and Management
Task Events
# Enable events app.conf.worker_send_task_events = True app.conf.task_send_sent_event = True # Event listeners from celery import signals @signals.task_prerun.connect def task_prerun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **extra): print(f"Task {task.name}[{task_id}] starting") @signals.task_postrun.connect def task_postrun_handler(sender=None, task_id=None, task=None, retval=None, **extra): print(f"Task {task.name}[{task_id}] completed: {retval}") @signals.task_failure.connect def task_failure_handler(sender=None, task_id=None, exception=None, traceback=None, **extra): print(f"Task {task_id} failed: {exception}") @signals.task_retry.connect def task_retry_handler(sender=None, task_id=None, reason=None, **extra): print(f"Task {task_id} retrying: {reason}")
Flower Monitoring
# Install Flower pip install flower # Start Flower celery -A myapp flower --port=5555 # Access dashboard # http://localhost:5555
# Flower configuration flower_basic_auth = ['admin:password'] flower_persistent = True flower_db = 'flower.db' flower_max_tasks = 10000
Inspecting Workers
from celery_app import app # Get active tasks i = app.control.inspect() print(i.active()) # Get scheduled tasks print(i.scheduled()) # Get reserved tasks print(i.reserved()) # Get worker stats print(i.stats()) # Get registered tasks print(i.registered()) # Revoke task app.control.revoke(task_id, terminate=True) # Shutdown worker app.control.shutdown() # Pool restart app.control.pool_restart() # Rate limit app.control.rate_limit('myapp.tasks.slow_task', '10/m')
Command Line Inspection
# List active tasks celery -A myapp inspect active # List scheduled tasks celery -A myapp inspect scheduled # Worker stats celery -A myapp inspect stats # Registered tasks celery -A myapp inspect registered # Revoke task celery -A myapp control revoke <task_id> # Shutdown workers celery -A myapp control shutdown # Purge all tasks celery -A myapp purge
Custom Metrics
@app.task(bind=True) def tracked_task(self, data): from prometheus_client import Counter, Histogram task_counter = Counter('celery_tasks_total', 'Total tasks') task_duration = Histogram('celery_task_duration_seconds', 'Task duration') with task_duration.time(): result = process(data) task_counter.inc() return result
Framework Integration
Django Integration
# myproject/celery.py import os from celery import Celery os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings') app = Celery('myproject') app.config_from_object('django.conf:settings', namespace='CELERY') app.autodiscover_tasks() # myproject/__init__.py from .celery import app as celery_app __all__ = ('celery_app',) # settings.py CELERY_BROKER_URL = 'redis://localhost:6379/0' CELERY_RESULT_BACKEND = 'redis://localhost:6379/1' CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_ACCEPT_CONTENT = ['json'] CELERY_TIMEZONE = 'UTC' CELERY_ENABLE_UTC = True # Task in Django app # myapp/tasks.py from celery import shared_task from django.core.mail import send_mail @shared_task def send_email_task(subject, message, recipient): send_mail(subject, message, 'from@example.com', [recipient]) return f"Email sent to {recipient}" # Use in views from myapp.tasks import send_email_task def my_view(request): send_email_task.delay('Hello', 'Welcome!', 'user@example.com') return HttpResponse('Email queued')
FastAPI Integration
# celery_app.py from celery import Celery celery_app = Celery( 'fastapi_app', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1' ) @celery_app.task def process_data(data: dict): # Long-running task import time time.sleep(10) return {"processed": data, "status": "complete"} # main.py from fastapi import FastAPI, BackgroundTasks from celery_app import process_data app = FastAPI() @app.post("/process") async def process_endpoint(data: dict): # Option 1: FastAPI BackgroundTasks (simple, in-process) # background_tasks.add_task(process_data, data) # Option 2: Celery (distributed, persistent) task = process_data.delay(data) return {"task_id": task.id, "status": "queued"} @app.get("/status/{task_id}") async def check_status(task_id: str): from celery.result import AsyncResult task = AsyncResult(task_id, app=celery_app) return { "task_id": task_id, "status": task.status, "result": task.result if task.ready() else None }
When to Use Celery vs FastAPI BackgroundTasks:
- FastAPI BackgroundTasks: Simple, fire-and-forget tasks (logging, cleanup)
- Celery: Distributed processing, retries, scheduling, task results
Flask Integration
# celery_app.py from celery import Celery def make_celery(app): celery = Celery( app.import_name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'] ) celery.conf.update(app.config) class ContextTask(celery.Task): def __call__(self, *args, **kwargs): with app.app_context(): return self.run(*args, **kwargs) celery.Task = ContextTask return celery # app.py from flask import Flask from celery_app import make_celery app = Flask(__name__) app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0' app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/1' celery = make_celery(app) @celery.task def send_email(to, subject, body): with app.app_context(): # Use Flask-Mail or similar mail.send(Message(subject, recipients=[to], body=body)) @app.route('/send') def send_route(): send_email.delay('user@example.com', 'Hello', 'Welcome!') return 'Email queued'
Testing Strategies
Eager Mode (Synchronous Execution)
# conftest.py (pytest) import pytest from celery_app import app @pytest.fixture(scope='session') def celery_config(): return { 'broker_url': 'memory://', 'result_backend': 'cache+memory://', 'task_always_eager': True, # Execute tasks synchronously 'task_eager_propagates': True, # Propagate exceptions } # Test tasks def test_add_task(): result = add.delay(4, 6) assert result.get() == 10 def test_task_failure(): with pytest.raises(ValueError): failing_task.delay()
Testing with Real Broker
# conftest.py import pytest from celery_app import app @pytest.fixture(scope='session') def celery_config(): return { 'broker_url': 'redis://localhost:6379/15', # Test database 'result_backend': 'redis://localhost:6379/15', } @pytest.fixture def celery_worker(celery_app): """Start worker for tests""" with celery_app.Worker() as worker: yield worker def test_async_task(celery_worker): result = async_task.delay(data) assert result.get(timeout=10) == expected
Mocking External Dependencies
from unittest.mock import patch, MagicMock @app.task def fetch_and_process(url): response = requests.get(url) return process(response.json()) def test_fetch_and_process(): with patch('requests.get') as mock_get: mock_get.return_value.json.return_value = {'data': 'test'} result = fetch_and_process.delay('http://example.com') assert result.get() == expected_result mock_get.assert_called_once_with('http://example.com')
Testing Periodic Tasks
from celery.schedules import crontab def test_periodic_task_schedule(): from celery_app import app schedule = app.conf.beat_schedule['daily-report'] assert schedule['task'] == 'myapp.tasks.daily_report' assert schedule['schedule'] == crontab(hour=0, minute=0) def test_periodic_task_execution(): # Test task logic directly result = daily_report() assert result['status'] == 'complete'
Integration Testing
import pytest from celery_app import app @pytest.fixture(scope='module') def celery_app(): app.conf.update( broker_url='redis://localhost:6379/15', result_backend='redis://localhost:6379/15', ) return app @pytest.fixture(scope='module') def celery_worker(celery_app): with celery_app.Worker() as worker: yield worker def test_workflow(celery_worker): from celery import chain workflow = chain( fetch_data.s(url), process_data.s(), save_results.s() ) result = workflow.apply_async() output = result.get(timeout=30) assert output['status'] == 'saved'
Production Patterns
Worker Configuration
# Production worker with autoscaling celery -A myapp worker \ --autoscale=10,3 \ --max-tasks-per-child=1000 \ --time-limit=300 \ --soft-time-limit=240 \ --loglevel=info \ --logfile=/var/log/celery/worker.log \ --pidfile=/var/run/celery/worker.pid # Multiple specialized workers celery multi start \ worker1 -A myapp -Q high_priority -c 4 --max-tasks-per-child=100 \ worker2 -A myapp -Q default -c 2 --max-tasks-per-child=1000 \ worker3 -A myapp -Q low_priority -c 1 --autoscale=3,1 # Graceful shutdown celery multi stop worker1 worker2 worker3 celery multi stopwait worker1 worker2 worker3 # Wait for tasks to finish
Configuration Best Practices
# production_config.py import os # Broker settings broker_url = os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0') broker_connection_retry_on_startup = True broker_pool_limit = 50 # Result backend result_backend = os.getenv('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1') result_expires = 3600 # 1 hour # Serialization task_serializer = 'json' result_serializer = 'json' accept_content = ['json'] timezone = 'UTC' enable_utc = True # Performance worker_prefetch_multiplier = 4 # Tasks to prefetch per worker worker_max_tasks_per_child = 1000 # Restart worker after N tasks (prevent memory leaks) task_acks_late = True # Acknowledge after task completes task_reject_on_worker_lost = True # Requeue if worker dies # Reliability task_track_started = True # Track when task starts task_time_limit = 300 # 5 minutes hard limit task_soft_time_limit = 240 # 4 minutes soft limit # Logging worker_log_format = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s' worker_task_log_format = '[%(asctime)s: %(levelname)s/%(processName)s][%(task_name)s(%(task_id)s)] %(message)s'
Systemd Service
# /etc/systemd/system/celery.service [Unit] Description=Celery Service After=network.target redis.target [Service] Type=forking User=celery Group=celery WorkingDirectory=/opt/myapp Environment="PATH=/opt/myapp/venv/bin" ExecStart=/opt/myapp/venv/bin/celery multi start worker1 \ -A myapp \ --pidfile=/var/run/celery/%n.pid \ --logfile=/var/log/celery/%n%I.log \ --loglevel=INFO ExecStop=/opt/myapp/venv/bin/celery multi stopwait worker1 \ --pidfile=/var/run/celery/%n.pid ExecReload=/opt/myapp/venv/bin/celery multi restart worker1 \ -A myapp \ --pidfile=/var/run/celery/%n.pid \ --logfile=/var/log/celery/%n%I.log \ --loglevel=INFO Restart=always [Install] WantedBy=multi-user.target # /etc/systemd/system/celerybeat.service [Unit] Description=Celery Beat Service After=network.target redis.target [Service] Type=simple User=celery Group=celery WorkingDirectory=/opt/myapp Environment="PATH=/opt/myapp/venv/bin" ExecStart=/opt/myapp/venv/bin/celery -A myapp beat \ --loglevel=INFO \ --pidfile=/var/run/celery/beat.pid Restart=always [Install] WantedBy=multi-user.target
Sentry Integration
# Install pip install sentry-sdk # Configuration import sentry_sdk from sentry_sdk.integrations.celery import CeleryIntegration sentry_sdk.init( dsn="https://your-sentry-dsn", integrations=[CeleryIntegration()], traces_sample_rate=0.1, # 10% of transactions ) # Tasks are automatically tracked @app.task def my_task(x): # Exceptions automatically sent to Sentry return risky_operation(x)
Rate Limiting
# Global rate limiting app.conf.task_default_rate_limit = '100/m' # 100 tasks per minute # Per-task rate limiting @app.task(rate_limit='10/m') def rate_limited_task(x): return expensive_operation(x) # Dynamic rate limiting app.control.rate_limit('myapp.tasks.slow_task', '5/m') # Token bucket rate limiting @app.task(rate_limit='10/s') def api_call(endpoint): return requests.get(endpoint)
Health Checks
# health.py from celery_app import app def check_celery_health(): """Health check endpoint""" try: # Ping workers i = app.control.inspect() stats = i.stats() if not stats: return {'status': 'unhealthy', 'reason': 'No workers available'} # Check broker connection result = app.control.ping(timeout=1.0) if not result: return {'status': 'unhealthy', 'reason': 'Workers not responding'} return {'status': 'healthy', 'workers': len(stats)} except Exception as e: return {'status': 'unhealthy', 'error': str(e)} # FastAPI health endpoint @app.get("/health/celery") async def celery_health(): return check_celery_health()
Performance Optimization
Task Optimization
# Use ignore_result for fire-and-forget tasks @app.task(ignore_result=True) def send_notification(user_id, message): # Don't need result, save backend overhead notify(user_id, message) # Compression for large payloads @app.task(compression='gzip') def process_large_data(data): return analyze(data) # Serialization choice @app.task(serializer='msgpack') # Faster than JSON def fast_task(data): return process(data)
Worker Tuning
# Worker concurrency worker_concurrency = 4 # CPU-bound: num_cores worker_concurrency = 20 # I/O-bound: higher value # Prefetch multiplier (how many tasks to prefetch) worker_prefetch_multiplier = 4 # Balance: 4x concurrency # Task acknowledgment task_acks_late = True # Acknowledge after completion (reliability) task_acks_late = False # Acknowledge on receipt (performance) # Memory management worker_max_tasks_per_child = 1000 # Restart worker after N tasks worker_max_memory_per_child = 200000 # Restart after 200MB
Database Result Backend Optimization
# Use Redis instead of database for results result_backend = 'redis://localhost:6379/1' # If using database, optimize result_backend = 'db+postgresql://user:pass@localhost/celery' database_engine_options = { 'pool_size': 20, 'pool_recycle': 3600, } # Reduce result expiry time result_expires = 3600 # 1 hour instead of default 24 hours
Task Chunking
from celery import group # Bad: One task per item (overhead) for item in large_list: process_item.delay(item) # Good: Chunk items def chunks(lst, n): for i in range(0, len(lst), n): yield lst[i:i + n] @app.task def process_batch(items): return [process_item(item) for item in items] # Process in batches of 100 job = group(process_batch.s(chunk) for chunk in chunks(large_list, 100)) result = job.apply_async()
Connection Pooling
# Redis connection pool broker_pool_limit = 50 # Max connections to broker redis_max_connections = 50 # Database connection pool from sqlalchemy import create_engine from sqlalchemy.pool import QueuePool engine = create_engine( 'postgresql://user:pass@localhost/db', poolclass=QueuePool, pool_size=20, max_overflow=0, )
Common Use Cases
Email Sending
@app.task(bind=True, max_retries=3) def send_email_task(self, to, subject, body, attachments=None): try: msg = EmailMessage(subject, body, 'from@example.com', [to]) if attachments: for filename, content, mimetype in attachments: msg.attach(filename, content, mimetype) msg.send() return {'status': 'sent', 'to': to} except SMTPException as exc: # Retry with exponential backoff raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries)) # Bulk email with rate limiting @app.task(rate_limit='100/m') def send_bulk_email(recipients, subject, template): for recipient in recipients: send_email_task.delay(recipient, subject, render_template(template, recipient))
Report Generation
@app.task(bind=True, time_limit=600) def generate_report(self, report_type, user_id, start_date, end_date): # Update progress self.update_state(state='PROGRESS', meta={'current': 0, 'total': 100}) # Fetch data data = fetch_report_data(report_type, start_date, end_date) self.update_state(state='PROGRESS', meta={'current': 30, 'total': 100}) # Generate PDF pdf = render_pdf(data) self.update_state(state='PROGRESS', meta={'current': 70, 'total': 100}) # Upload to S3 url = upload_to_s3(pdf, f'reports/{user_id}/{report_type}.pdf') self.update_state(state='PROGRESS', meta={'current': 90, 'total': 100}) # Send notification send_email_task.delay( get_user_email(user_id), 'Report Ready', f'Your report is ready: {url}' ) return {'status': 'complete', 'url': url} # Check progress from celery.result import AsyncResult task = AsyncResult(task_id) if task.state == 'PROGRESS': print(task.info) # {'current': 30, 'total': 100}
Data Processing Pipeline
from celery import chain, group @app.task def fetch_data(source): return download(source) @app.task def clean_data(raw_data): return clean(raw_data) @app.task def transform_data(clean_data): return transform(clean_data) @app.task def load_data(transformed_data): save_to_database(transformed_data) return {'status': 'loaded', 'rows': len(transformed_data)} # ETL pipeline etl_pipeline = chain( fetch_data.s('https://api.example.com/data'), clean_data.s(), transform_data.s(), load_data.s() ) result = etl_pipeline.apply_async()
Webhook Processing
@app.task(bind=True, autoretry_for=(RequestException,), max_retries=5) def process_webhook(self, webhook_data): # Validate signature if not verify_signature(webhook_data): raise ValueError("Invalid signature") # Process event event_type = webhook_data['type'] if event_type == 'payment.success': update_order_status(webhook_data['order_id'], 'paid') send_confirmation_email.delay(webhook_data['customer_email']) elif event_type == 'payment.failed': notify_admin.delay('Payment Failed', webhook_data) return {'status': 'processed', 'event': event_type} # FastAPI webhook endpoint @app.post("/webhooks/stripe") async def stripe_webhook(request: Request): data = await request.json() process_webhook.delay(data) return {"status": "queued"}
Image Processing
from celery import group, chord @app.task def resize_image(image_path, size): from PIL import Image img = Image.open(image_path) img.thumbnail(size) output_path = f"{image_path}_{size[0]}x{size[1]}.jpg" img.save(output_path) return output_path @app.task def upload_to_cdn(image_paths): urls = [] for path in image_paths: url = cdn_upload(path) urls.append(url) return urls # Generate multiple sizes and upload def process_uploaded_image(image_path): sizes = [(800, 600), (400, 300), (200, 150), (100, 100)] workflow = chord([ resize_image.s(image_path, size) for size in sizes ])(upload_to_cdn.s()) return workflow.apply_async()
Alternatives Comparison
Celery vs RQ (Redis Queue)
RQ: Simpler Redis-only task queue
When to use RQ:
- Simple use case (no routing, basic retries)
- Redis-only infrastructure
- Python 3 only
- Smaller scale (<1000 tasks/min)
When to use Celery:
- Complex workflows (chains, chords)
- Multiple broker options
- Advanced routing and priorities
- Large scale (>1000 tasks/min)
- Periodic tasks
# RQ Example from redis import Redis from rq import Queue redis_conn = Redis() q = Queue(connection=redis_conn) job = q.enqueue(my_function, arg1, arg2) result = job.result
Celery vs Huey
Huey: Lightweight task queue with minimal dependencies
When to use Huey:
- Small to medium projects
- Minimal configuration
- Redis or in-memory only
- Simple periodic tasks
When to use Celery:
- Enterprise-scale applications
- Complex task dependencies
- Multiple broker/backend options
- Advanced monitoring needs
# Huey Example from huey import RedisHuey huey = RedisHuey('myapp') @huey.task() def add(a, b): return a + b result = add(1, 2)
Celery vs Dramatiq
Dramatiq: Modern alternative focusing on reliability
When to use Dramatiq:
- Reliability over features
- Simpler API
- Better type hints
- RabbitMQ or Redis
When to use Celery:
- Mature ecosystem
- More broker options
- Canvas workflows
- Larger community
# Dramatiq Example import dramatiq @dramatiq.actor def add(x, y): return x + y add.send(1, 2)
Celery vs Cloud Services
AWS Lambda, Google Cloud Functions, Azure Functions
When to use Cloud Functions:
- Serverless infrastructure
- Event-driven workflows
- Pay-per-execution model
- Auto-scaling
When to use Celery:
- Self-hosted infrastructure
- Complex task workflows
- Cost predictability
- Full control over execution
Best Practices
Task Design
-
Idempotency: Tasks should be safe to run multiple times
@app.task def process_order(order_id): order = Order.objects.get(id=order_id) if order.status == 'processed': return # Already processed, skip order.process() order.status = 'processed' order.save() -
Small, Focused Tasks: One responsibility per task
# Bad: Monolithic task @app.task def process_user(user_id): send_welcome_email(user_id) create_profile(user_id) setup_notifications(user_id) # Good: Separate tasks @app.task def send_welcome_email(user_id): ... @app.task def create_profile(user_id): ... workflow = group([ send_welcome_email.s(user_id), create_profile.s(user_id), setup_notifications.s(user_id) ]) -
Avoid Database Objects in Arguments: Use IDs instead
# Bad @app.task def process_user(user): # User object ... # Good @app.task def process_user(user_id): user = User.objects.get(id=user_id) ... -
Set Time Limits: Prevent runaway tasks
@app.task(time_limit=300, soft_time_limit=240) def bounded_task(): ...
Error Handling
- Categorize Exceptions: Different handling for different errors
- Use Exponential Backoff: Avoid overwhelming failing services
- Set Max Retries: Don't retry forever
- Log Failures: Always log why tasks fail
Performance
- Use
: For tasks that don't need resultsignore_result=True - Batch Operations: Process multiple items per task
- Optimize Serialization: Use msgpack for speed
- Connection Pooling: Reuse database/broker connections
- Task Chunking: Avoid creating millions of tiny tasks
Monitoring
- Enable Events: Track task lifecycle
- Use Flower: Web-based monitoring
- Health Checks: Monitor worker availability
- Sentry Integration: Track errors
Security
- Validate Input: Always validate task arguments
- Secure Broker: Use authentication and encryption
- Limit Task Execution Time: Prevent resource exhaustion
- Rate Limiting: Protect against task flooding
Troubleshooting
Tasks Not Executing
Symptoms: Tasks queued but not processing
Diagnosis:
# Check if workers are running celery -A myapp inspect active # Check worker stats celery -A myapp inspect stats # Check registered tasks celery -A myapp inspect registered
Solutions:
- Start workers:
celery -A myapp worker - Check worker is consuming correct queues
- Verify task routing configuration
- Check broker connectivity
Tasks Failing Silently
Symptoms: Tasks show SUCCESS but don't work
Diagnosis:
# Enable task tracking app.conf.task_track_started = True # Check task result and traceback result = task.delay() if result.failed(): print(result.traceback)
Solutions:
- Check logs:
celery -A myapp worker --loglevel=debug - Enable eager mode in tests to see exceptions
- Use
in teststask_eager_propagates = True
Memory Leaks
Symptoms: Worker memory grows over time
Solutions:
# Restart workers after N tasks worker_max_tasks_per_child = 1000 # Restart on memory limit worker_max_memory_per_child = 200000 # 200MB
Slow Task Execution
Symptoms: Tasks taking longer than expected
Diagnosis:
# Add timing import time @app.task(bind=True) def timed_task(self): start = time.time() result = slow_operation() duration = time.time() - start logger.info(f"Task {self.request.id} took {duration}s") return result
Solutions:
- Increase worker concurrency
- Optimize task code
- Use task chunking
- Add more workers
Broker Connection Issues
Symptoms: Tasks not reaching workers
Diagnosis:
# Test broker connection python -c "from celery_app import app; print(app.connection().connect())"
Solutions:
- Check broker is running:
orredis-cli pingrabbitmqctl status - Verify broker URL in configuration
- Check network connectivity
- Enable connection retry:
broker_connection_retry_on_startup = True
Task Results Not Persisting
Symptoms:
result.get() returns None
Solutions:
- Verify result backend configured
- Check task doesn't have
ignore_result=True - Verify result hasn't expired (
)result_expires - Test backend connection
Beat Not Scheduling Tasks
Symptoms: Periodic tasks not running
Diagnosis:
# Check beat is running ps aux | grep celery | grep beat # Check beat schedule celery -A myapp inspect scheduled
Solutions:
- Ensure beat process is running
- Verify
configurationbeat_schedule - Check beat log for errors
- Use database scheduler for dynamic schedules
Worker Crashes
Symptoms: Workers die unexpectedly
Solutions:
- Check logs for errors
- Set
to prevent memory leaksworker_max_tasks_per_child - Add task time limits
- Use systemd for automatic restart
- Monitor with Flower
Task Queue Buildup
Symptoms: Tasks accumulating in queue
Solutions:
- Add more workers
- Increase worker concurrency
- Optimize slow tasks
- Add task routing to distribute load
- Check for blocked workers
Advanced Configuration
Custom Task Classes
from celery import Task class DatabaseTask(Task): """Task that manages database connections""" _db = None @property def db(self): if self._db is None: self._db = create_db_connection() return self._db def after_return(self, status, retval, task_id, args, kwargs, einfo): """Close connection after task""" if self._db is not None: self._db.close() @app.task(base=DatabaseTask) def db_task(query): return db_task.db.execute(query)
Custom Serializers
from kombu.serialization import register def my_encoder(obj): # Custom encoding logic return json.dumps(obj) def my_decoder(data): # Custom decoding logic return json.loads(data) register('myjson', my_encoder, my_decoder, content_type='application/x-myjson', content_encoding='utf-8') app.conf.task_serializer = 'myjson'
Task Inheritance
class BaseTask(Task): def on_failure(self, exc, task_id, args, kwargs, einfo): send_alert(f"Task {self.name} failed", str(exc)) def on_retry(self, exc, task_id, args, kwargs, einfo): logger.warning(f"Task {self.name} retrying") @app.task(base=BaseTask) def monitored_task(): return perform_work()
End of Celery Skill Documentation
For more information:
- Official Documentation: https://docs.celeryq.dev/
- GitHub: https://github.com/celery/celery
- Community: https://groups.google.com/forum/#!forum/celery-users
Related Skills
When using Celery, these skills enhance your workflow:
- django: Django + Celery integration for background tasks
- fastapi-local-dev: FastAPI + Celery patterns for async API operations
- test-driven-development: Testing async tasks and task chains
- systematic-debugging: Debugging distributed task failures and race conditions
[Full documentation available in these skills if deployed in your bundle]