Prefect Testing Patterns: Testing Flows, Tasks, and Caching Behavior

Prefect 2 has a developer-friendly testing story. Flows and tasks are plain Python functions wrapped in decorators. You can call them directly in tests, keep results on local storage instead of remote infrastructure, and inspect run states without spinning up Prefect Cloud.

This guide covers the full testing surface: task unit tests, flow integration tests, state manipulation for failure scenarios, and caching behavior validation.

Prefect 2 Testing Fundamentals

Prefect 2's architecture is test-friendly by design:

  • Flows and tasks wrap plain functions. Call a flow like any Python function, or use a task's .fn attribute to run its logic without the Prefect engine.
  • Local result storage. Use LocalFileSystem or in-memory storage for tests.
  • State introspection. Check the Prefect state object to assert on success, failure, and cached results.
  • No external server required. prefect_test_harness() runs tests against a temporary local SQLite database, so neither Prefect Cloud nor a self-hosted server is needed.

pip install "prefect>=2,<3" pytest pytest-asyncio

Unit Testing Tasks

Tasks are the atomic unit — test them in complete isolation:

# tasks.py
from prefect import task
import httpx

# Prefect retries on any exception: up to 3 retries, 60 seconds apart
@task(retries=3, retry_delay_seconds=60)
def fetch_weather_data(city: str, api_key: str) -> dict:
    """Fetch current weather data for a city."""
    response = httpx.get(
        "https://api.weather.example.com/current",
        params={"city": city, "key": api_key},
        timeout=10.0
    )
    response.raise_for_status()
    return response.json()

@task
def transform_weather(raw: dict) -> dict:
    """Transform raw API response to our schema."""
    return {
        "city": raw["location"]["name"],
        "temp_celsius": raw["current"]["temp_c"],
        "condition": raw["current"]["condition"]["text"],
        "humidity_pct": raw["current"]["humidity"],
    }
# tests/test_tasks.py
import pytest
from unittest.mock import patch, MagicMock
import httpx

from tasks import fetch_weather_data, transform_weather

class TestFetchWeatherData:
    
    def test_returns_parsed_response(self):
        """Task should return parsed JSON from successful API call."""
        mock_response_data = {
            "location": {"name": "London"},
            "current": {
                "temp_c": 15.5,
                "condition": {"text": "Partly cloudy"},
                "humidity": 72
            }
        }
        
        with patch("httpx.get") as mock_get:
            mock_get.return_value = MagicMock(
                status_code=200,
                json=lambda: mock_response_data
            )
            
            # Call the underlying function directly (bypasses Prefect runtime)
            result = fetch_weather_data.fn("London", "test-api-key")
            
            assert result["location"]["name"] == "London"
            assert result["current"]["temp_c"] == 15.5
    
    def test_raises_on_http_error(self):
        """Non-2xx responses should raise an exception."""
        with patch("httpx.get") as mock_get:
            mock_get.return_value = MagicMock(
                status_code=429,
                raise_for_status=MagicMock(
                    side_effect=httpx.HTTPStatusError(
                        "Rate limited",
                        request=MagicMock(),
                        response=MagicMock(status_code=429)
                    )
                )
            )
            
            with pytest.raises(httpx.HTTPStatusError):
                fetch_weather_data.fn("London", "test-api-key")
    
    def test_raises_on_timeout(self):
        """Network timeouts should propagate (Prefect will handle retries)."""
        with patch("httpx.get") as mock_get:
            mock_get.side_effect = httpx.TimeoutException("Connection timed out")
            
            with pytest.raises(httpx.TimeoutException):
                fetch_weather_data.fn("London", "test-api-key")

class TestTransformWeather:
    
    RAW_INPUT = {
        "location": {"name": "Tokyo"},
        "current": {
            "temp_c": 22.3,
            "condition": {"text": "Sunny"},
            "humidity": 65
        }
    }
    
    def test_maps_all_fields_correctly(self):
        result = transform_weather.fn(self.RAW_INPUT)
        
        assert result["city"] == "Tokyo"
        assert result["temp_celsius"] == 22.3
        assert result["condition"] == "Sunny"
        assert result["humidity_pct"] == 65
    
    def test_missing_field_raises_key_error(self):
        """Malformed input should fail fast with clear error."""
        malformed = {"location": {"name": "Tokyo"}}  # Missing "current"
        
        with pytest.raises(KeyError, match="current"):
            transform_weather.fn(malformed)

Integration Testing Flows

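Flows can be called directly. They run synchronously in the current process and return the flow's result. Prefect still records each run against the API your active profile points to (a local database by default). Wrap flow tests in prefect_test_harness() to give them an isolated, temporary backend: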

# flows.py
from prefect import flow, task, get_run_logger
from prefect.filesystems import LocalFileSystem
from tasks import fetch_weather_data, transform_weather

@flow(name="weather-pipeline", result_storage=LocalFileSystem(basepath="/tmp/prefect-test-results"))
def weather_pipeline(cities: list[str], api_key: str) -> list[dict]:
    logger = get_run_logger()
    results = []
    
    for city in cities:
        logger.info(f"Fetching weather for {city}")
        raw = fetch_weather_data(city, api_key)
        transformed = transform_weather(raw)
        results.append(transformed)
    
    return results
# tests/test_flows.py
from unittest.mock import patch, MagicMock

import httpx
from prefect.testing.utilities import prefect_test_harness

from flows import weather_pipeline
from tasks import fetch_weather_data

class TestWeatherPipeline:
    
    MOCK_RAW = {
        "location": {"name": "London"},
        "current": {"temp_c": 15.5, "condition": {"text": "Cloudy"}, "humidity": 72}
    }
    
    def test_processes_multiple_cities(self):
        """Flow should return results for all input cities."""
        with prefect_test_harness():
            with patch("tasks.httpx.get") as mock_get:
                mock_get.return_value = MagicMock(
                    status_code=200,
                    json=lambda: self.MOCK_RAW
                )
                
                # Call the flow directly; the harness supplies a temporary backend
                results = weather_pipeline(
                    cities=["London", "Paris", "Tokyo"],
                    api_key="test-key"
                )
                
                assert len(results) == 3
                assert mock_get.call_count == 3
                # All return London because the mock doesn't vary by city
                assert all(r["city"] == "London" for r in results)
    
    def test_flow_state_on_success(self):
        """Flow should finish in a Completed state."""
        with prefect_test_harness():
            with patch("tasks.httpx.get") as mock_get:
                mock_get.return_value = MagicMock(
                    status_code=200,
                    json=lambda: self.MOCK_RAW
                )
                
                state = weather_pipeline(
                    cities=["London"],
                    api_key="test-key",
                    return_state=True
                )
                
                assert state.is_completed()
                results = state.result()
                assert len(results) == 1
    
    def test_flow_state_on_task_failure(self):
        """Flow should fail once a task raises and has no retries left."""
        # Prefect retries on any exception, so disable retries for this test;
        # otherwise fetch_weather_data would make 4 attempts, 60 seconds apart.
        no_retry_fetch = fetch_weather_data.with_options(retries=0)
        
        with prefect_test_harness():
            with patch("flows.fetch_weather_data", no_retry_fetch), \
                 patch("tasks.httpx.get") as mock_get:
                mock_get.side_effect = httpx.HTTPStatusError(
                    "404 Not Found",
                    request=MagicMock(),
                    response=MagicMock(status_code=404)
                )
                
                state = weather_pipeline(
                    cities=["InvalidCity"],
                    api_key="test-key",
                    return_state=True
                )
                
                assert state.is_failed()

Testing Mock States

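A flow's final state follows from what its tasks return or raise. You can steer a flow into specific scenarios by controlling task behavior. Swap a task for a stub to skip its work entirely, or drive the HTTP mock with a side_effect to exercise Prefect's real retry handling: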

# tests/test_state_handling.py
from unittest.mock import patch, MagicMock

import httpx
from prefect.testing.utilities import prefect_test_harness

from flows import weather_pipeline
from tasks import fetch_weather_data

RAW_LONDON = {"location": {"name": "London"}, "current": {"temp_c": 15.5, "condition": {"text": "Clear"}, "humidity": 60}}

def test_flow_uses_precomputed_upstream_result():
    """Flow should use a pre-computed upstream result without calling the API."""
    with prefect_test_harness():
        # weather_pipeline looks up fetch_weather_data in the flows module at call
        # time, so replacing that name swaps in a stub returning a canned payload.
        with patch("flows.fetch_weather_data", return_value=RAW_LONDON), \
             patch("tasks.httpx.get") as mock_get:
            results = weather_pipeline(cities=["London"], api_key="test-key")
    
    assert results == [{"city": "London", "temp_celsius": 15.5, "condition": "Clear", "humidity_pct": 60}]
    mock_get.assert_not_called()

def test_flow_retries_failed_tasks():
    """Transient task failures should trigger the configured retries."""
    call_count = 0
    
    def flaky_get(*args, **kwargs):
        nonlocal call_count
        call_count += 1
        if call_count < 3:
            raise httpx.TimeoutException("Timeout")
        return MagicMock(status_code=200, json=lambda: RAW_LONDON)
    
    # Keep retries=3 but drop the 60-second delay so the test runs quickly
    fast_fetch = fetch_weather_data.with_options(retry_delay_seconds=0)
    
    with prefect_test_harness():
        with patch("flows.fetch_weather_data", fast_fetch), \
             patch("tasks.httpx.get", side_effect=flaky_get):
            # Attempts 1 and 2 time out; attempt 3 succeeds
            results = weather_pipeline(cities=["London"], api_key="test-key")
    
    assert len(results) == 1
    assert call_count == 3

Testing Caching Behavior

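In Prefect 2, cache_key_fn computes a key for each task run. If a completed run with the same key exists and is younger than cache_expiration, Prefect returns that run's result instead of executing the task again. Test both the hit and the miss: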

# tasks_with_cache.py
from prefect import task
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1)
)
def expensive_computation(data: list[int]) -> dict:
    """Expensive task that should be cached for 1 hour."""
    return {"sum": sum(data), "count": len(data), "mean": sum(data) / len(data) if data else None}
# tests/test_caching.py
from unittest.mock import patch

from prefect import flow
from prefect.testing.utilities import prefect_test_harness

from tasks_with_cache import expensive_computation

@flow
def compute_twice(first: list[int], second: list[int]) -> list[dict]:
    """Run both calls inside one flow run; Prefect 2 expects tasks to be called from a flow."""
    return [expensive_computation(first), expensive_computation(second)]

def run_and_count(first: list[int], second: list[int]):
    """Run compute_twice and count how often the task body actually executes."""
    compute_call_count = 0
    original_fn = expensive_computation.fn
    
    def counting_fn(*args, **kwargs):
        nonlocal compute_call_count
        compute_call_count += 1
        return original_fn(*args, **kwargs)
    
    # A fresh harness per call keeps cache entries from leaking between tests
    with prefect_test_harness():
        with patch.object(expensive_computation, "fn", counting_fn):
            results = compute_twice(first, second)
    return results, compute_call_count

def test_cached_task_not_recomputed_same_inputs():
    """Second call with identical inputs should return the cached result."""
    (result1, result2), compute_call_count = run_and_count([1, 2, 3, 4, 5], [1, 2, 3, 4, 5])
    
    assert result1 == result2
    assert compute_call_count == 1, (
        f"Expected 1 computation (second should be cached), got {compute_call_count}"
    )

def test_cache_miss_on_different_inputs():
    """Different inputs must always trigger fresh computation."""
    _, compute_call_count = run_and_count([1, 2, 3], [4, 5, 6])
    
    assert compute_call_count == 2
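task_input_hash hashes every argument. That suits expensive_computation. For a task like fetch_weather_data, though, it would fold api_key into the key, so rotating the key would silently invalidate every cached entry. Prefect 2 calls a cache_key_fn with the task run context and a dict of the call's parameters, so a custom key function is plain Python you can unit-test without the harness. Below is a sketch that assumes you want to key on the city alone (weather_cache_key is a hypothetical name):

```python
import hashlib
from typing import Any, Optional


def weather_cache_key(context: Any, parameters: dict[str, Any]) -> Optional[str]:
    """Hypothetical cache_key_fn that keys weather fetches on the city only.

    Prefect 2 passes (TaskRunContext, parameters); this sketch ignores the
    context and deliberately leaves api_key out of the key.
    """
    city = parameters.get("city")
    if not city:
        # Returning None skips caching for this call
        return None
    normalized = city.strip().lower()
    digest = hashlib.sha256(normalized.encode("utf-8")).hexdigest()
    # The fixed prefix stands in for the task identity task_input_hash mixes in
    return f"weather-{digest[:16]}"
```

You would attach it with `@task(cache_key_fn=weather_cache_key, cache_expiration=timedelta(minutes=10))`. Unlike task_input_hash, this key does not include the task's identity or code. Use it on a single task only, or two tasks fetching the same city would share cache entries.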

Async Flow Testing

Prefect 2 supports async flows and tasks. Calling an async flow returns a coroutine, so the test awaits it under pytest-asyncio:

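# tests/test_async_flows.py
# Assumes flows.py also defines an async flow named async_weather_pipeline
from unittest.mock import MagicMock, patch

import pytest
from prefect.testing.utilities import prefect_test_harness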

@pytest.mark.asyncio
async def test_async_flow_completes():
    from flows import async_weather_pipeline
    
    with prefect_test_harness():
        with patch("tasks.httpx.AsyncClient") as MockClient:
            mock_client = MockClient.return_value.__aenter__.return_value
            mock_client.get.return_value = MagicMock(
                status_code=200,
                json=lambda: {"location": {"name": "Berlin"}, "current": {"temp_c": 12.0, "condition": {"text": "Rain"}, "humidity": 88}}
            )
            
            results = await async_weather_pipeline(
                cities=["Berlin"],
                api_key="test-key"
            )
            
            assert len(results) == 1
            assert results[0]["city"] == "Berlin"

CI Integration

# .github/workflows/prefect-tests.yml
name: Prefect Flow Tests

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      
      - name: Install dependencies
        run: pip install "prefect>=2,<3" pytest pytest-asyncio httpx
      
      - name: Run task unit tests
        run: pytest tests/test_tasks.py -v
      
      - name: Run flow integration tests
        run: pytest tests/test_flows.py tests/test_state_handling.py -v
      
      - name: Run caching tests
        run: pytest tests/test_caching.py -v
      
      - name: Run async tests
        run: pytest tests/test_async_flows.py -v --asyncio-mode=auto

Monitoring Prefect in Production

Prefect Cloud has built-in monitoring, but you may need alerts that integrate with your existing incident management. HelpMeTest lets you set up health checks that query Prefect's API for flow run status and alert your on-call rotation when critical flows fail — without adding a separate monitoring tool to your stack.

Conclusion

Prefect's Python-native design makes it one of the more test-friendly workflow orchestrators. Use .fn to test task logic directly without the Prefect runtime. Use prefect_test_harness() for flow-level tests that need state management. Test caching behavior explicitly: cache bugs are subtle and can cause stale-data problems that only appear hours after deployment. With this test suite in place, you can refactor flows confidently and catch regressions before they reach production.
