Testing Celery Tasks: Unit Tests, Integration Tests, and Task Chaining
Celery is the standard distributed task queue for Python. Testing Celery tasks requires deciding whether to run them synchronously (unit tests) or asynchronously against a real broker (integration tests).
Task Structure for Testability
Separate task logic from the Celery decorator:
# tasks/email_tasks.py
from celery import shared_task
from .handlers import send_welcome_email_handler
@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def send_welcome_email(self, user_id: int):
    """Thin Celery wrapper around ``send_welcome_email_handler``.

    Returns the handler's result unchanged. If the handler raises any
    exception, the task asks Celery to retry it; the decorator configures
    ``max_retries=3`` and ``default_retry_delay=60``.
    """
    try:
        outcome = send_welcome_email_handler(user_id)
    except Exception as failure:
        # Hand the original error to Celery as the reason for the retry.
        raise self.retry(exc=failure)
    return outcome
# handlers/email_handlers.py — pure business logic
def send_welcome_email_handler(user_id: int) -> dict:
from .models import User
from .services import email_service
user = User.objects.get(pk=user_id)
email_service.send_welcome(user.email, user.name)
user.welcome_email_sent = True
user.save()
return {'email_sent': True, 'user_id': user_id}Unit Testing with ALWAYS_EAGER
Set CELERY_TASK_ALWAYS_EAGER = True to execute tasks synchronously:
# conftest.py
import pytest
from django.test import override_settings
@pytest.fixture
def eager_celery():
    """Override the eager-mode Celery settings for the duration of a test.

    ``CELERY_TASK_ALWAYS_EAGER`` and ``CELERY_TASK_EAGER_PROPAGATES`` are both
    set to ``True`` while the test runs and restored when it finishes.
    """
    eager_overrides = {
        'CELERY_TASK_ALWAYS_EAGER': True,
        'CELERY_TASK_EAGER_PROPAGATES': True,
    }
    with override_settings(**eager_overrides):
        yield

# tests/test_email_tasks.py
import pytest
from unittest.mock import patch, MagicMock
from tasks.email_tasks import send_welcome_email
def test_send_welcome_email_handler_success():
    """Test handler logic directly — fastest approach."""
    with patch('handlers.email_handlers.User') as MockUser, \
            patch('handlers.email_handlers.email_service') as mock_email:
        # ``name`` is a reserved Mock constructor argument: it only labels the
        # mock in its repr. Assign the attribute after creation so that
        # ``user.name`` really is the string 'Alice'.
        user = MagicMock(email='alice@example.com')
        user.name = 'Alice'
        MockUser.objects.get.return_value = user

        from handlers.email_handlers import send_welcome_email_handler
        result = send_welcome_email_handler(user_id=1)

    assert result == {'email_sent': True, 'user_id': 1}
    MockUser.objects.get.assert_called_once_with(pk=1)
    mock_email.send_welcome.assert_called_once_with('alice@example.com', 'Alice')
    assert user.welcome_email_sent is True
    user.save.assert_called_once()
@pytest.mark.django_db
def test_send_welcome_email_task_eager(eager_celery):
"""Test task via .delay() in eager mode."""
with patch('tasks.email_tasks.send_welcome_email_handler') as mock_handler:
mock_handler.return_value = {'email_sent': True, 'user_id': 1}
result = send_welcome_email.delay(user_id=1)
assert result.successful()
assert result.get() == {'email_sent': True, 'user_id': 1}
mock_handler.assert_called_once_with(1)pytest-celery for Integration Tests
pip install pytest-celery
# conftest.py
from pytest_celery import (
CELERY_RABBITMQ_BROKER,
CeleryTestSetup,
CeleryWorkerContainer,
RabbitMQContainer,
celery_worker,
celery_broker,
)
import pytest
@pytest.fixture
def celery_app():
    """Provide the project's Celery application object to tests."""
    # Imported lazily, inside the fixture, exactly as the project exposes it.
    from myapp.celery import app as project_app

    return project_app

# tests/test_integration.py
import pytest
from tasks.email_tasks import send_welcome_email
@pytest.mark.integration
def test_task_executes_on_worker(celery_worker):
with patch('tasks.email_tasks.send_welcome_email_handler') as mock_handler:
mock_handler.return_value = {'email_sent': True, 'user_id': 42}
result = send_welcome_email.apply_async((42,))
output = result.get(timeout=10)
assert output == {'email_sent': True, 'user_id': 42}Testing Task Retries
def test_task_retries_on_exception():
    """A handler that always fails surfaces its error once retries run out."""
    # Patch the name the task module actually calls. ``tasks.email_tasks``
    # bound the handler with ``from ... import``, so patching it in
    # ``handlers.email_handlers`` would leave the task's reference untouched.
    with patch('tasks.email_tasks.send_welcome_email_handler') as mock_handler:
        mock_handler.side_effect = ConnectionError('SMTP unavailable')
        task = send_welcome_email.s(user_id=1)
        # In eager mode with propagate, retry raises the original exception
        with pytest.raises(ConnectionError):
            task.apply(throw=True)
        # Handler should have been called
        assert mock_handler.call_count >= 1
def test_task_succeeds_after_retry():
"""Simulate transient failure then success."""
call_count = 0
def flaky_handler(user_id):
nonlocal call_count
call_count += 1
if call_count < 3:
raise ConnectionError('transient')
return {'email_sent': True, 'user_id': user_id}
with patch('handlers.email_handlers.send_welcome_email_handler', side_effect=flaky_handler):
with pytest.raises(ConnectionError):
# With eager mode, max_retries exhausted raises
send_welcome_email.apply(args=(1,), throw=True)
assert call_count == 4 # initial + 3 retriesTesting Chains and Chords
# tasks/pipeline.py
from celery import chain, chord, group
from .image_tasks import resize_image, generate_thumbnail
from .notification_tasks import notify_upload_complete
def process_uploaded_image(image_id: int):
    """Start the resize → thumbnail → notify pipeline for one image.

    Builds a chain of three task signatures, invokes it immediately, and
    returns whatever invoking the chain returns.
    """
    steps = (
        resize_image.s(image_id),
        generate_thumbnail.s(),
        notify_upload_complete.s(image_id),
    )
    pipeline = chain(*steps)
    return pipeline()
# tests/test_pipeline.py
def test_image_processing_chain(eager_celery):
with patch('tasks.image_tasks.resize_image_handler') as mock_resize, \
patch('tasks.image_tasks.generate_thumbnail_handler') as mock_thumb, \
patch('tasks.notification_tasks.notify_handler') as mock_notify:
mock_resize.return_value = {'resized_path': '/images/resized/1.jpg'}
mock_thumb.return_value = {'thumb_path': '/images/thumbs/1.jpg'}
mock_notify.return_value = {'notified': True}
result = process_uploaded_image(image_id=1)
assert result.successful()
mock_resize.assert_called_once_with(1)
mock_thumb.assert_called_once()
mock_notify.assert_called_once()Testing Periodic Tasks (Beat)
# tasks/scheduled_tasks.py
from celery.schedules import crontab
from celery import shared_task
@shared_task
def cleanup_expired_sessions():
    """Clear expired sessions via ``SessionStore.clear_expired``.

    Returns ``{'cleaned': True}`` after the call completes.
    """
    # Looked up at call time, so a patched ``SessionStore`` is picked up.
    from django.contrib.sessions.backends import db as session_backend

    session_backend.SessionStore.clear_expired()
    return {'cleaned': True}
# tests/test_scheduled.py
def test_cleanup_task_removes_expired_sessions():
    """Calling the task function directly clears sessions and reports success."""
    store_patch = patch('django.contrib.sessions.backends.db.SessionStore')
    with store_patch as MockStore:
        from tasks.scheduled_tasks import cleanup_expired_sessions

        # Invoked as a plain function call, not through .delay().
        outcome = cleanup_expired_sessions()

        assert outcome == {'cleaned': True}
        MockStore.clear_expired.assert_called_once()
def test_cleanup_task_in_beat_schedule():
from myapp.celery import app
schedule = app.conf.beat_schedule
assert 'cleanup-sessions' in schedule
task_config = schedule['cleanup-sessions']
assert task_config['task'] == 'tasks.scheduled_tasks.cleanup_expired_sessions'
# Runs daily at midnight
assert task_config['schedule'] == crontab(hour=0, minute=0)pytest Fixtures
# conftest.py
import pytest
@pytest.fixture(autouse=True)
def celery_eager(settings):
    """Switch every test to synchronous (eager) Celery execution.

    Sets ``CELERY_TASK_ALWAYS_EAGER`` and ``CELERY_TASK_EAGER_PROPAGATES`` to
    ``True`` on the ``settings`` fixture. Because the fixture is ``autouse``,
    this applies to all tests without being requested.
    """
    # NOTE(review): this changes the Django settings object; confirm that the
    # Celery app reads these values at call time in your project setup.
    for flag in ('CELERY_TASK_ALWAYS_EAGER', 'CELERY_TASK_EAGER_PROPAGATES'):
        setattr(settings, flag, True)
    yield
@pytest.fixture
def celery_async(settings):
    """Turn eager mode back off for tests that need asynchronous execution.

    Request this fixture explicitly in integration tests; it sets
    ``CELERY_TASK_ALWAYS_EAGER`` to ``False`` on the ``settings`` fixture.
    """
    settings.CELERY_TASK_ALWAYS_EAGER = False
    yield

Summary
Three testing strategies for Celery:
- Test handler functions directly — fastest, no Celery involved at all
- ALWAYS_EAGER mode — test .delay() calls synchronously, no broker needed
- Real broker — full async testing with pytest-celery, use for integration tests
Cover handler logic with unit tests (approach 1). Verify task registration and chaining with eager tests (approach 2). Reserve real broker tests for CI and critical workflows only.