Testing Celery Tasks: Unit Tests, Integration Tests, and Task Chaining

Testing Celery Tasks: Unit Tests, Integration Tests, and Task Chaining

Celery is the standard distributed task queue for Python. Testing Celery tasks requires deciding whether to run them synchronously (unit tests) or asynchronously against a real broker (integration tests).

Task Structure for Testability

Separate task logic from the Celery decorator:

# tasks/email_tasks.py
from celery import shared_task
from .handlers import send_welcome_email_handler

@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def send_welcome_email(self, user_id: int):
    """Celery task wrapper — delegates to handler."""
    try:
        return send_welcome_email_handler(user_id)
    except Exception as exc:
        raise self.retry(exc=exc)

# handlers/email_handlers.py — pure business logic
def send_welcome_email_handler(user_id: int) -> dict:
    from .models import User
    from .services import email_service
    
    user = User.objects.get(pk=user_id)
    email_service.send_welcome(user.email, user.name)
    user.welcome_email_sent = True
    user.save()
    return {'email_sent': True, 'user_id': user_id}

Unit Testing with ALWAYS_EAGER

Set CELERY_TASK_ALWAYS_EAGER = True to execute tasks synchronously:

# conftest.py
import pytest
from django.test import override_settings

@pytest.fixture
def eager_celery():
    with override_settings(CELERY_TASK_ALWAYS_EAGER=True, CELERY_TASK_EAGER_PROPAGATES=True):
        yield
# tests/test_email_tasks.py
import pytest
from unittest.mock import patch, MagicMock
from tasks.email_tasks import send_welcome_email

def test_send_welcome_email_handler_success():
    """Test handler logic directly — fastest approach."""
    with patch('handlers.email_handlers.User') as MockUser, \
         patch('handlers.email_handlers.email_service') as mock_email:
        
        user = MagicMock(email='alice@example.com', name='Alice')
        MockUser.objects.get.return_value = user
        
        from handlers.email_handlers import send_welcome_email_handler
        result = send_welcome_email_handler(user_id=1)
        
        assert result == {'email_sent': True, 'user_id': 1}
        mock_email.send_welcome.assert_called_once_with('alice@example.com', 'Alice')
        assert user.welcome_email_sent is True
        user.save.assert_called_once()

@pytest.mark.django_db
def test_send_welcome_email_task_eager(eager_celery):
    """Test task via .delay() in eager mode."""
    with patch('tasks.email_tasks.send_welcome_email_handler') as mock_handler:
        mock_handler.return_value = {'email_sent': True, 'user_id': 1}
        
        result = send_welcome_email.delay(user_id=1)
        
        assert result.successful()
        assert result.get() == {'email_sent': True, 'user_id': 1}
        mock_handler.assert_called_once_with(1)

pytest-celery for Integration Tests

pip install pytest-celery
# conftest.py
from pytest_celery import (
    CELERY_RABBITMQ_BROKER,
    CeleryTestSetup,
    CeleryWorkerContainer,
    RabbitMQContainer,
    celery_worker,
    celery_broker,
)
import pytest

@pytest.fixture
def celery_app():
    from myapp.celery import app
    return app
# tests/test_integration.py
import pytest
from tasks.email_tasks import send_welcome_email

@pytest.mark.integration
def test_task_executes_on_worker(celery_worker):
    with patch('tasks.email_tasks.send_welcome_email_handler') as mock_handler:
        mock_handler.return_value = {'email_sent': True, 'user_id': 42}
        
        result = send_welcome_email.apply_async((42,))
        output = result.get(timeout=10)
        
        assert output == {'email_sent': True, 'user_id': 42}

Testing Task Retries

def test_task_retries_on_exception():
    with patch('handlers.email_handlers.send_welcome_email_handler') as mock_handler:
        mock_handler.side_effect = ConnectionError('SMTP unavailable')
        
        task = send_welcome_email.s(user_id=1)
        
        # In eager mode with propagate, retry raises the original exception
        with pytest.raises(ConnectionError):
            task.apply(throw=True)
        
        # Handler should have been called
        assert mock_handler.call_count >= 1

def test_task_succeeds_after_retry():
    """Simulate transient failure then success."""
    call_count = 0
    
    def flaky_handler(user_id):
        nonlocal call_count
        call_count += 1
        if call_count < 3:
            raise ConnectionError('transient')
        return {'email_sent': True, 'user_id': user_id}
    
    with patch('handlers.email_handlers.send_welcome_email_handler', side_effect=flaky_handler):
        with pytest.raises(ConnectionError):
            # With eager mode, max_retries exhausted raises
            send_welcome_email.apply(args=(1,), throw=True)
    
    assert call_count == 4  # initial + 3 retries

Testing Chains and Chords

# tasks/pipeline.py
from celery import chain, chord, group
from .image_tasks import resize_image, generate_thumbnail
from .notification_tasks import notify_upload_complete

def process_uploaded_image(image_id: int):
    """Chain: resize → thumbnail → notify."""
    return chain(
        resize_image.s(image_id),
        generate_thumbnail.s(),
        notify_upload_complete.s(image_id),
    )()

# tests/test_pipeline.py
def test_image_processing_chain(eager_celery):
    with patch('tasks.image_tasks.resize_image_handler') as mock_resize, \
         patch('tasks.image_tasks.generate_thumbnail_handler') as mock_thumb, \
         patch('tasks.notification_tasks.notify_handler') as mock_notify:
        
        mock_resize.return_value = {'resized_path': '/images/resized/1.jpg'}
        mock_thumb.return_value = {'thumb_path': '/images/thumbs/1.jpg'}
        mock_notify.return_value = {'notified': True}
        
        result = process_uploaded_image(image_id=1)
        
        assert result.successful()
        mock_resize.assert_called_once_with(1)
        mock_thumb.assert_called_once()
        mock_notify.assert_called_once()

Testing Periodic Tasks (Beat)

# tasks/scheduled_tasks.py
from celery.schedules import crontab
from celery import shared_task

@shared_task
def cleanup_expired_sessions():
    from django.contrib.sessions.backends.db import SessionStore
    SessionStore.clear_expired()
    return {'cleaned': True}

# tests/test_scheduled.py
def test_cleanup_task_removes_expired_sessions():
    with patch('django.contrib.sessions.backends.db.SessionStore') as MockStore:
        from tasks.scheduled_tasks import cleanup_expired_sessions
        
        result = cleanup_expired_sessions()
        
        assert result == {'cleaned': True}
        MockStore.clear_expired.assert_called_once()

def test_cleanup_task_in_beat_schedule():
    from myapp.celery import app
    
    schedule = app.conf.beat_schedule
    assert 'cleanup-sessions' in schedule
    
    task_config = schedule['cleanup-sessions']
    assert task_config['task'] == 'tasks.scheduled_tasks.cleanup_expired_sessions'
    # Runs daily at midnight
    assert task_config['schedule'] == crontab(hour=0, minute=0)

pytest Fixtures

# conftest.py
import pytest

@pytest.fixture(autouse=True)
def celery_eager(settings):
    """Make all Celery tasks execute synchronously."""
    settings.CELERY_TASK_ALWAYS_EAGER = True
    settings.CELERY_TASK_EAGER_PROPAGATES = True
    yield

@pytest.fixture
def celery_async(settings):
    """Opt out of eager mode for integration tests."""
    settings.CELERY_TASK_ALWAYS_EAGER = False
    yield

Summary

Three testing strategies for Celery:

  1. Test handler functions directly — fastest, no Celery involved at all
  2. ALWAYS_EAGER mode — test .delay() calls synchronously, no broker needed
  3. Real broker — full async testing with pytest-celery, use for integration tests

Cover handler logic with unit tests (approach 1). Verify task registration and chaining with eager tests (approach 2). Reserve real broker tests for CI and critical workflows only.

Read more