Celery Testing with pytest: Unit and Integration Tests for Async Tasks

Celery Testing with pytest: Unit and Integration Tests for Async Tasks

Celery is Python's most popular distributed task queue. Testing Celery tasks well means covering task logic in isolation, testing how tasks interact with the broker and result backend, and verifying retry behaviour. This guide shows how to do all three with pytest.

Setup

pip install celery redis pytest pytest-celery
# or with poetry
poetry add --group dev pytest-celery

Basic Task Structure

# tasks/email_tasks.py
from celery import shared_task
from services.email import send_email
import logging

logger = logging.getLogger(__name__)

@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def send_welcome_email(self, user_id: str, email: str) -> dict:
    try:
        result = send_email(
            to=email,
            template="welcome",
            data={"user_id": user_id},
        )
        return {"sent": True, "message_id": result["message_id"]}
    except Exception as exc:
        logger.warning(f"Email failed for {email}, retrying: {exc}")
        raise self.retry(exc=exc)

Unit Testing with CELERY_TASK_ALWAYS_EAGER

The CELERY_TASK_ALWAYS_EAGER setting makes tasks execute synchronously in the same process. This is the fastest way to test task logic:

# conftest.py
import pytest
from myapp.celery import celery_app

@pytest.fixture(scope="session")
def celery_config():
    return {
        "task_always_eager": True,
        "task_eager_propagates": True,  # let exceptions surface
        "result_backend": "cache+memory://",
        "broker_url": "memory://",
    }

@pytest.fixture
def celery_app_fixture(celery_config):
    celery_app.config_from_object(celery_config)
    return celery_app
# tasks/test_email_tasks.py
import pytest
from unittest.mock import patch, MagicMock
from tasks.email_tasks import send_welcome_email

@pytest.fixture(autouse=True)
def eager_celery(settings):
    # Django-specific; for plain Celery use the conftest fixture above
    settings.CELERY_TASK_ALWAYS_EAGER = True
    settings.CELERY_TASK_EAGER_PROPAGATES = True

class TestSendWelcomeEmail:
    def test_sends_email_with_correct_parameters(self):
        with patch("tasks.email_tasks.send_email") as mock_send:
            mock_send.return_value = {"message_id": "msg-123"}

            result = send_welcome_email.delay("user-1", "alice@example.com")

            mock_send.assert_called_once_with(
                to="alice@example.com",
                template="welcome",
                data={"user_id": "user-1"},
            )
            assert result.get()["sent"] is True

    def test_returns_message_id(self):
        with patch("tasks.email_tasks.send_email") as mock_send:
            mock_send.return_value = {"message_id": "msg-456"}

            result = send_welcome_email.delay("user-2", "bob@example.com")
            data = result.get()

            assert data["message_id"] == "msg-456"

    def test_retries_on_email_failure(self):
        with patch("tasks.email_tasks.send_email") as mock_send:
            mock_send.side_effect = ConnectionError("SMTP unavailable")

            with pytest.raises(ConnectionError):
                send_welcome_email.delay("user-3", "carol@example.com").get()

    def test_task_id_is_set(self):
        with patch("tasks.email_tasks.send_email") as mock_send:
            mock_send.return_value = {"message_id": "msg-789"}

            async_result = send_welcome_email.delay("user-4", "dan@example.com")
            assert async_result.id is not None

Testing Tasks Directly (Without Celery Infrastructure)

For pure unit tests, call the task function directly using .run() or apply():

def test_task_logic_without_celery():
    with patch("tasks.email_tasks.send_email") as mock_send:
        mock_send.return_value = {"message_id": "msg-111"}

        # Call the underlying function directly
        result = send_welcome_email.run("user-5", "eve@example.com")

        assert result["sent"] is True
        mock_send.assert_called_once()

Testing Retry Behaviour

Test that retries are triggered correctly and stop at max_retries:

def test_retries_up_to_max_retries():
    with patch("tasks.email_tasks.send_email") as mock_send:
        mock_send.side_effect = Exception("Transient error")

        # apply() gives more control than delay() for retry testing
        from celery.exceptions import MaxRetriesExceededError

        with pytest.raises(Exception):  # MaxRetriesExceeded or original exc
            send_welcome_email.apply(
                args=["user-6", "fail@example.com"],
                retries=3,  # simulate already at max
            ).get()

def test_retry_countdown():
    """Verify that retry uses the configured delay."""
    task = send_welcome_email

    with patch.object(task, "retry") as mock_retry:
        mock_retry.side_effect = Exception("Retry triggered")

        with patch("tasks.email_tasks.send_email") as mock_send:
            mock_send.side_effect = ConnectionError("SMTP down")

            with pytest.raises(Exception):
                task.run("user-7", "g@example.com")

        # Verify retry was called, not just that it raised
        mock_retry.assert_called_once()
        _, kwargs = mock_retry.call_args
        assert isinstance(kwargs.get("exc"), ConnectionError)

Testing the Result Backend

Verify that task results are stored and retrievable:

# conftest.py — use a real Redis or a test cache
@pytest.fixture(scope="session")
def celery_config():
    return {
        "task_always_eager": False,  # test real async flow
        "broker_url": "redis://localhost:6379/1",
        "result_backend": "redis://localhost:6379/1",
        "result_expires": 3600,
    }
@pytest.mark.integration
def test_result_stored_in_backend():
    """Requires a running Redis instance."""
    with patch("tasks.email_tasks.send_email") as mock_send:
        mock_send.return_value = {"message_id": "msg-live"}

        async_result = send_welcome_email.apply_async(
            args=["user-8", "h@example.com"]
        )
        # Block until complete (integration test — real broker)
        result = async_result.get(timeout=10)

        assert result["sent"] is True
        # Result should still be in backend
        assert async_result.state == "SUCCESS"

Testing Chained Tasks

BullMQ chains and Celery chains are both common patterns. Test them end-to-end:

# tasks/pipeline_tasks.py
from celery import chain, shared_task

@shared_task
def validate_user(user_id: str) -> dict:
    # validation logic
    return {"user_id": user_id, "valid": True}

@shared_task
def provision_account(user_data: dict) -> dict:
    if not user_data["valid"]:
        raise ValueError("Invalid user")
    return {"account_id": f"acc-{user_data['user_id']}", "provisioned": True}
def test_task_chain():
    with patch("tasks.pipeline_tasks.validate_user.run") as mock_validate, \
         patch("tasks.pipeline_tasks.provision_account.run") as mock_provision:

        mock_validate.return_value = {"user_id": "u9", "valid": True}
        mock_provision.return_value = {"account_id": "acc-u9", "provisioned": True}

        pipeline = chain(
            validate_user.s("u9"),
            provision_account.s(),
        )
        result = pipeline.apply().get()

        assert result["provisioned"] is True
        assert result["account_id"] == "acc-u9"

Testing Periodic Tasks (Celery Beat)

# tasks/scheduled_tasks.py
from celery import shared_task
from datetime import datetime

@shared_task
def daily_report() -> dict:
    # generate report logic
    return {"generated_at": datetime.utcnow().isoformat(), "rows": 42}
def test_daily_report_structure():
    """Test task logic independently of scheduling."""
    result = daily_report.run()

    assert "generated_at" in result
    assert isinstance(result["rows"], int)

Common Pitfalls

Forgetting task_eager_propagates: Without it, exceptions in eager mode are swallowed and result.get() returns None for failed tasks. Always set it to True in test configuration.

Testing timing-dependent logic: Avoid time.sleep() in task logic that you intend to test. Use timestamps from injected clocks or freeze time with freezegun.

Database sessions in tasks: If your tasks open DB sessions, ensure they're closed properly. Test teardown with active Celery workers can leak connections.

What Automated Tests Miss

pytest-celery tests cover task logic but won't catch:

  • Worker memory growth over hundreds of task executions
  • Broker connection pool exhaustion under production load
  • Redis keyspace expiration causing result lookups to fail
  • Cross-service failures when Celery tasks call slow external APIs

HelpMeTest runs end-to-end tests against your full stack on a schedule — including the background jobs that pytest never exercises in a real broker. The Free tier gives you 10 tests and 24/7 monitoring to start.

Summary

Testing Celery tasks:

  • task_always_eager = True — runs tasks synchronously, ideal for unit tests
  • task_eager_propagates = True — required to surface exceptions in eager mode
  • .run() for pure logic — bypass Celery entirely and test the function directly
  • Mock external calls — not the task broker; test what your task actually does
  • Integration tests — use a real Redis broker for retry, chain, and result backend tests

Read more