Django + Celery Async Task Patterns

Production-grade patterns for background task processing in Django using Celery with Redis or RabbitMQ.

When to Activate

  • Adding background jobs or async processing to a Django app
  • Implementing periodic/scheduled tasks
  • Offloading slow operations (email, PDF generation, API calls) from request cycle
  • Setting up Celery Beat for cron-like scheduling
  • Debugging task failures, retries, or queue backlogs
  • Writing tests for Celery tasks

Project Setup

Installation

pip install celery[redis] django-celery-results django-celery-beat
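
If RabbitMQ is the broker instead of Redis, the amqp transport already ships with the base celery package (via kombu), so the redis extra can be dropped:

pip install celery django-celery-results django-celery-beat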

celery.py — App Entrypoint

# config/celery.py
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.development')

app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()  # Discovers tasks.py in every app listed in INSTALLED_APPS

@app.task(bind=True, ignore_result=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

# config/__init__.py
from .celery import app as celery_app

__all__ = ('celery_app',)

Django Settings

# config/settings/base.py

# Broker (Redis recommended for production)
CELERY_BROKER_URL = env('CELERY_BROKER_URL', default='redis://localhost:6379/0')
CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND', default='django-db')

# Serialization
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

# Task behavior
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60        # Hard limit: 30 min
CELERY_TASK_SOFT_TIME_LIMIT = 25 * 60   # Soft limit: sends SoftTimeLimitExceeded
CELERY_WORKER_PREFETCH_MULTIPLIER = 1   # Prevent worker hoarding long tasks
CELERY_TASK_ACKS_LATE = True            # Re-queue on worker crash

# Result persistence
CELERY_RESULT_EXPIRES = 60 * 60 * 24   # Keep results 24 hours

# Beat scheduler (for periodic tasks)
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

# Installed apps
INSTALLED_APPS += [
    'django_celery_results',
    'django_celery_beat',
]
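
Queue Routing

The worker commands and apply_async calls later in this guide reference named queues (default, high_priority, low_priority). Those names are not built in; a minimal routing sketch that declares them (the task-to-queue mapping here is an assumption for illustration):

# config/settings/base.py (continued)
CELERY_TASK_DEFAULT_QUEUE = 'default'   # unrouted tasks land here
CELERY_TASK_ROUTES = {
    'integrations.sync_to_crm': {'queue': 'high_priority'},
    'reports.generate_pdf': {'queue': 'low_priority'},
}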

Running Workers

# Start worker (development)
celery -A config worker --loglevel=info

# Start beat scheduler (periodic tasks)
celery -A config beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler

# Combined worker + beat (dev only, never production)
celery -A config worker --beat --loglevel=info

# Production: multiple workers with concurrency
celery -A config worker --loglevel=warning --concurrency=4 -Q default,high_priority

Task Design Patterns

Basic Task

# apps/notifications/tasks.py
from celery import shared_task
import logging

logger = logging.getLogger(__name__)

@shared_task(name='notifications.send_welcome_email')
def send_welcome_email(user_id: int) -> None:
    """Send welcome email to newly registered user."""
    from apps.users.models import User
    from apps.notifications.services import EmailService

    try:
        user = User.objects.get(pk=user_id)
    except User.DoesNotExist:
        logger.warning('send_welcome_email: user %s not found', user_id)
        return  # Do not raise: the user no longer exists, so retrying can never succeed

    EmailService.send_welcome(user)
    logger.info('Welcome email sent to user %s', user_id)

Retryable Task

@shared_task(
    bind=True,
    name='integrations.sync_to_crm',
    max_retries=5,
    default_retry_delay=60,       # seconds before first retry
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,           # exponential backoff
    retry_backoff_max=600,        # cap at 10 minutes
    retry_jitter=True,            # randomise to avoid thundering herd
)
def sync_contact_to_crm(self, contact_id: int) -> dict:
    """Sync contact to external CRM with retry on transient failures."""
    from apps.crm.services import CRMClient

    try:
        result = CRMClient().sync(contact_id)
        return result
    except CRMClient.RateLimitError as exc:
        # Specific retry delay from response header
        raise self.retry(exc=exc, countdown=int(exc.retry_after))

Idempotent Task Pattern

Design tasks so they can safely run multiple times with the same inputs:

@shared_task(name='orders.mark_shipped')
def mark_order_shipped(order_id: int, tracking_number: str) -> None:
    """Mark order as shipped — safe to run multiple times."""
    from apps.orders.models import Order

    updated = Order.objects.filter(
        pk=order_id,
        status=Order.Status.PROCESSING,    # Guard: only update if not already shipped
    ).update(
        status=Order.Status.SHIPPED,
        tracking_number=tracking_number,
    )

    if not updated:
        logger.info('mark_order_shipped: order %s already shipped or not found', order_id)

Task with Soft Time Limit

from celery.exceptions import SoftTimeLimitExceeded

@shared_task(
    bind=True,
    name='reports.generate_pdf',
    soft_time_limit=120,
    time_limit=150,
)
def generate_pdf_report(self, report_id: int) -> str:
    """Generate PDF report with graceful timeout handling."""
    from apps.reports.services import PDFGenerator

    try:
        path = PDFGenerator.build(report_id)
        return path
    except SoftTimeLimitExceeded:
        # Clean up partial files before hard kill
        PDFGenerator.cleanup(report_id)
        raise

Calling Tasks

from datetime import timedelta
from django.utils import timezone

# Fire and forget (async)
send_welcome_email.delay(user.pk)

# Schedule in the future
send_reminder.apply_async(args=[user.pk], countdown=3600)  # 1 hour from now
send_reminder.apply_async(args=[user.pk], eta=timezone.now() + timedelta(days=1))

# Apply with queue routing
sync_contact_to_crm.apply_async(args=[contact.pk], queue='high_priority')

# Run synchronously (tests / debugging only)
result = generate_pdf_report.apply(args=[report.pk])
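
A common race worth guarding against: a task enqueued inside a database transaction can start before the transaction commits, so the worker may not see the row it needs. Deferring the enqueue with transaction.on_commit avoids this; a minimal sketch (register_user is a hypothetical service function):

from django.db import transaction
from apps.users.models import User

def register_user(data: dict) -> User:
    with transaction.atomic():
        user = User.objects.create_user(**data)
        # Enqueue only after the surrounding transaction commits,
        # so the worker is guaranteed to see the new row
        transaction.on_commit(lambda: send_welcome_email.delay(user.pk))
    return user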

Beat Scheduling (Periodic Tasks)

Code-Defined Schedule

# config/settings/base.py
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'cleanup-expired-sessions': {
        'task': 'users.cleanup_expired_sessions',
        'schedule': crontab(hour=2, minute=0),   # 2am daily
    },
    'sync-inventory': {
        'task': 'products.sync_inventory',
        'schedule': 60.0,                         # every 60 seconds
    },
    'weekly-digest': {
        'task': 'notifications.send_weekly_digest',
        'schedule': crontab(day_of_week='monday', hour=8, minute=0),
    },
}

Database-Defined Schedule (via django-celery-beat)

# Manage periodic tasks from Django admin or code
from django_celery_beat.models import PeriodicTask, CrontabSchedule
import json

schedule, _ = CrontabSchedule.objects.get_or_create(
    hour='*/6', minute='0',
    timezone='UTC',
)

PeriodicTask.objects.update_or_create(
    name='Sync inventory every 6 hours',
    defaults={
        'crontab': schedule,
        'task': 'products.sync_inventory',
        'args': json.dumps([]),
        'enabled': True,
    }
)

Canvas: Chaining and Grouping Tasks

from celery import chain, group, chord

# Chain: run tasks sequentially, passing results
pipeline = chain(
    fetch_data.s(source_id),
    transform_data.s(),          # receives fetch_data result as first arg
    load_to_warehouse.s(),
)
pipeline.delay()

# Group: run tasks in parallel
parallel = group(
    send_welcome_email.s(user_id)
    for user_id in new_user_ids
)
parallel.delay()

# Chord: parallel tasks + callback when all complete
result = chord(
    group(process_chunk.s(chunk) for chunk in data_chunks),
    aggregate_results.s(),       # called with list of chunk results
)
result.delay()
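
Note: chord requires a working result backend (the django-db backend configured above qualifies), and the header tasks must not set ignore_result=True, otherwise the callback never fires.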

Error Handling and Dead Letter Queue

# apps/core/tasks.py
from celery.signals import task_failure

@task_failure.connect
def on_task_failure(sender, task_id, exception, args, kwargs, traceback, einfo, **kw):
    """Log all task failures to Sentry / alerting."""
    import sentry_sdk
    with sentry_sdk.new_scope() as scope:
        scope.set_context('celery', {
            'task': sender.name,
            'task_id': task_id,
            'args': args,
            'kwargs': kwargs,
        })
        sentry_sdk.capture_exception(exception)

# Persist permanently failed tasks to a dead-letter table after max retries
@shared_task(
    bind=True,
    max_retries=3,
    name='payments.charge_card',
)
def charge_card(self, order_id: int) -> None:
    from apps.payments.models import FailedCharge

    try:
        _do_charge(order_id)  # the actual payment-gateway call
    except Exception as exc:
        if self.request.retries >= self.max_retries:
            # Persist to dead-letter table for manual review
            FailedCharge.objects.create(
                order_id=order_id,
                error=str(exc),
                task_id=self.request.id,
            )
            return  # Don't raise — task is permanently failed
        raise self.retry(exc=exc)

Testing Celery Tasks

Unit Testing (No Broker)

# tests/test_tasks.py
import pytest
from unittest.mock import patch, MagicMock
from apps.notifications.tasks import send_welcome_email

class TestSendWelcomeEmail:

    @pytest.mark.django_db
    def test_sends_email_to_existing_user(self, user):
        with patch('apps.notifications.services.EmailService') as mock_email:
            send_welcome_email(user.pk)
            mock_email.send_welcome.assert_called_once_with(user)

    @pytest.mark.django_db
    def test_skips_missing_user_gracefully(self):
        """Should not raise when user is deleted between enqueue and execute."""
        send_welcome_email(99999)  # Non-existent user — must not raise

Integration Testing with CELERY_TASK_ALWAYS_EAGER

# config/settings/test.py
CELERY_TASK_ALWAYS_EAGER = True      # Run tasks synchronously in tests
CELERY_TASK_EAGER_PROPAGATES = True  # Re-raise exceptions from tasks

# tests/test_integration.py
@pytest.mark.django_db
def test_registration_triggers_welcome_email(client):
    with patch('apps.notifications.services.EmailService') as mock_email:
        response = client.post('/api/users/', {
            'email': 'new@example.com',
            'password': 'strongpass123',
        })

    assert response.status_code == 201
    mock_email.send_welcome.assert_called_once()

Testing Retries

@pytest.mark.django_db
def test_task_retries_on_connection_error():
    with patch('apps.crm.services.CRMClient.sync') as mock_sync:
        mock_sync.side_effect = ConnectionError('timeout')

        with pytest.raises(ConnectionError):
            sync_contact_to_crm.apply(args=[1], throw=True)

        # Eager mode runs retries synchronously (countdowns are ignored),
        # so expect the initial attempt plus max_retries retries
        assert mock_sync.call_count == 6  # 1 initial + 5 retries

Monitoring

# Inspect active workers and queues
celery -A config inspect active
celery -A config inspect stats
celery -A config inspect reserved

# Check queue lengths (Redis): each queue is a list keyed by its name
redis-cli llen celery           # 'celery' is the default queue name

# Flower: web-based real-time monitor
pip install flower
celery -A config flower --port=5555
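
The same data is available programmatically, which is handy for a health-check endpoint; a minimal sketch using the app instance from config/celery.py (the module path apps/core/health.py is hypothetical):

# apps/core/health.py
from config.celery import app

def queue_stats() -> dict:
    """Ping workers and list their active/reserved tasks."""
    inspect = app.control.inspect(timeout=2.0)
    return {
        'ping': inspect.ping(),          # {worker_name: {'ok': 'pong'}}
        'active': inspect.active(),      # tasks currently executing
        'reserved': inspect.reserved(),  # tasks prefetched but not yet started
    }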

Anti-Patterns

# BAD: Passing model instances — they may be stale by execution time
send_welcome_email.delay(user)        # Never pass ORM objects

# GOOD: Pass primary keys and re-fetch inside the task
send_welcome_email.delay(user.pk)

# BAD: Calling tasks synchronously in production views
result = generate_report.apply()      # Blocks the request thread

# BAD: Non-idempotent task without guards
@shared_task
def charge_and_fulfill(order_id):
    order = Order.objects.get(pk=order_id)
    order.charge()     # May charge twice if the task retries!
    order.fulfill()

# GOOD: Idempotent with a status guard; note that select_for_update
# requires an explicit transaction or Django raises TransactionManagementError
from django.db import transaction

@shared_task
def charge_and_fulfill(order_id):
    with transaction.atomic():
        order = Order.objects.select_for_update().get(pk=order_id)
        if order.status != Order.Status.PENDING:
            return  # Already processed
        order.charge()
        order.fulfill()

Production Checklist

Check                                   Setting
Worker restarts on crash                supervisord or systemd unit (sketch below)
CELERY_TASK_ACKS_LATE = True            Re-queue tasks on worker crash
CELERY_WORKER_PREFETCH_MULTIPLIER = 1   Fair distribution of long tasks
Separate queues per priority            -Q default,high_priority,low_priority
CELERY_TASK_SOFT_TIME_LIMIT set         Graceful timeout before hard kill
Sentry integration                      Capture all task_failure signals
Flower or other monitor                 Visibility into queue depths
Beat runs on a single node only         Prevents duplicate scheduled task execution
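
For the worker-restart row above, a minimal systemd unit sketch; the user, paths, and virtualenv location are assumptions for illustration:

# /etc/systemd/system/celery-worker.service
[Unit]
Description=Celery worker
After=network.target redis.service

[Service]
Type=simple
User=app
WorkingDirectory=/srv/myproject
ExecStart=/srv/myproject/.venv/bin/celery -A config worker --loglevel=warning --concurrency=4 -Q default,high_priority
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target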

Related Skills

  • django-patterns — ORM, service layer, and project structure
  • django-tdd — Testing Django models, views, and services
  • python-testing — pytest configuration and fixtures
Overall Score

87/100 · Grade A (Excellent)

Safety 88 · Quality 89 · Clarity 84 · Completeness 85

Summary

A comprehensive guide to implementing asynchronous task processing in Django using Celery with Redis or RabbitMQ. Covers configuration, task design patterns (basic, retryable, idempotent), Beat scheduling, Canvas workflows, error handling, testing strategies, monitoring, and production best practices with anti-patterns and a deployment checklist.

Detected Capabilities

  • task orchestration (chain, group, chord)
  • configuration management
  • error handling and signal handling
  • testing patterns (mocking, eager mode)
  • monitoring and introspection
  • database integration (django-celery-beat)

Trigger Keywords

Phrases that MCP clients use to match this skill to user intent.

  • background job processing
  • celery task design
  • async django
  • scheduled tasks
  • retry logic
  • task monitoring

Use Cases

  • Adding background jobs to handle long-running operations (email, PDF generation, API calls) without blocking request cycle
  • Implementing periodic scheduled tasks using Celery Beat for cron-like scheduling
  • Designing retry logic with exponential backoff for transient failures in external integrations
  • Setting up task chaining, grouping, and chords for complex multi-step workflows
  • Testing Celery tasks with eager mode and mocking external services
  • Monitoring task queues and worker health in production using Flower or inspection commands

Quality Notes

  • Excellent use of real-world code examples with detailed docstrings and type hints
  • Clear separation of anti-patterns from recommended approaches with side-by-side comparisons
  • Production checklist provides actionable deployment guardrails and monitoring setup
  • Comprehensive coverage of error handling, retries, and task idempotency design principles
  • Good explanation of why certain patterns (e.g., passing PKs instead of ORM objects) matter in async context
  • Clear warnings on dangerous practices (synchronous execution in views, non-idempotent tasks, worker hoarding)
  • Settings examples show both development and production-grade configurations with sensible defaults
  • Testing section covers multiple levels: unit tests with mocking, integration tests with eager mode, and retry testing

Model: claude-haiku-4-5-20251001 · Analyzed: May 15, 2026

Search for a command to run...