Prefect Testing Patterns: Testing Flows, Tasks, and Caching Behavior
Prefect 2 has a developer-friendly testing story. Flows and tasks are plain Python functions with decorators — you can call them directly in tests, run them with local storage instead of remote infrastructure, and control state management without spinning up Prefect Cloud.
This guide covers the full testing surface: task unit tests, flow integration tests, state manipulation for failure scenarios, and caching behavior validation.
Prefect 2 Testing Fundamentals
Prefect 2's architecture is test-friendly by design:
- Flows and tasks are functions. Call them directly without a runner.
- Local result storage. Use LocalFileSystem or in-memory storage for tests.
- State introspection. Check the Prefect state object to assert on success, failure, and cached results.
- No server required. Unit and integration tests run without Prefect Cloud or a self-hosted server.
pip install prefect pytest pytest-asyncio httpx

Unit Testing Tasks
Tasks are the atomic unit — test them in complete isolation:
# tasks.py
from prefect import task
from typing import Optional
import httpx
@task(retries=3, retry_delay_seconds=60)
def fetch_weather_data(city: str, api_key: str) -> dict:
    """Fetch current weather data for a city.

    The task is configured with ``retries=3`` and ``retry_delay_seconds=60``;
    those settings only apply when it runs through Prefect, not when the
    undecorated function is called via ``.fn``.

    Args:
        city: City name, sent as the ``city`` query parameter.
        api_key: API key, sent as the ``key`` query parameter.

    Returns:
        The decoded JSON body of the response.

    Raises:
        httpx.HTTPStatusError: If ``raise_for_status()`` rejects the response.
    """
    response = httpx.get(
        # Plain string: the URL has no placeholders, so no f-prefix is needed.
        "https://api.weather.example.com/current",
        params={"city": city, "key": api_key},
        timeout=10.0,
    )
    response.raise_for_status()
    return response.json()
@task
def transform_weather(raw: dict) -> dict:
    """Map a raw API response onto our flat weather schema.

    Missing keys are not handled here; the resulting ``KeyError`` propagates
    to the caller.
    """
    city_name = raw["location"]["name"]
    current = raw["current"]
    return {
        "city": city_name,
        "temp_celsius": current["temp_c"],
        "condition": current["condition"]["text"],
        "humidity_pct": current["humidity"],
    }

# tests/test_tasks.py
import pytest
from unittest.mock import patch, MagicMock
import httpx
from tasks import fetch_weather_data, transform_weather
class TestFetchWeatherData:
    """Unit tests for ``fetch_weather_data``, calling the undecorated function via ``.fn``."""

    def test_returns_parsed_response(self):
        """A successful API call yields the decoded JSON payload."""
        payload = {
            "location": {"name": "London"},
            "current": {
                "temp_c": 15.5,
                "condition": {"text": "Partly cloudy"},
                "humidity": 72,
            },
        }
        fake_response = MagicMock(status_code=200)
        fake_response.json.return_value = payload

        with patch("httpx.get", return_value=fake_response):
            # ``.fn`` is the plain function under the decorator, so the
            # Prefect runtime is bypassed entirely.
            result = fetch_weather_data.fn("London", "test-api-key")

        assert result["location"]["name"] == "London"
        assert result["current"]["temp_c"] == 15.5

    def test_raises_on_http_error(self):
        """A non-2xx response surfaces as ``httpx.HTTPStatusError``."""
        rate_limit_error = httpx.HTTPStatusError(
            "Rate limited",
            request=MagicMock(),
            response=MagicMock(status_code=429),
        )
        fake_response = MagicMock(status_code=429)
        fake_response.raise_for_status.side_effect = rate_limit_error

        with patch("httpx.get", return_value=fake_response):
            with pytest.raises(httpx.HTTPStatusError):
                fetch_weather_data.fn("London", "test-api-key")

    def test_raises_on_timeout(self):
        """A network timeout propagates out of the function (Prefect handles retries)."""
        timeout_error = httpx.TimeoutException("Connection timed out")

        with patch("httpx.get", side_effect=timeout_error):
            with pytest.raises(httpx.TimeoutException):
                fetch_weather_data.fn("London", "test-api-key")
class TestTransformWeather:
RAW_INPUT = {
"location": {"name": "Tokyo"},
"current": {
"temp_c": 22.3,
"condition": {"text": "Sunny"},
"humidity": 65
}
}
def test_maps_all_fields_correctly(self):
result = transform_weather.fn(self.RAW_INPUT)
assert result["city"] == "Tokyo"
assert result["temp_celsius"] == 22.3
assert result["condition"] == "Sunny"
assert result["humidity_pct"] == 65
def test_missing_field_raises_key_error(self):
"""Malformed input should fail fast with clear error."""
malformed = {"location": {"name": "Tokyo"}} # Missing "current"
with pytest.raises(KeyError, match="current"):
transform_weather.fn(malformed)Integration Testing Flows
Flows can be called directly — they run synchronously with a local result backend:
# flows.py
from prefect import flow, task, get_run_logger
from prefect.filesystems import LocalFileSystem
from tasks import fetch_weather_data, transform_weather
@flow(name="weather-pipeline", result_storage=LocalFileSystem(basepath="/tmp/prefect-test-results"))
def weather_pipeline(cities: list[str], api_key: str) -> list[dict]:
    """Fetch and transform weather for each city, in input order.

    Returns one transformed record per entry in ``cities``.
    """
    run_logger = get_run_logger()
    transformed_records = []
    for city in cities:
        run_logger.info(f"Fetching weather for {city}")
        # Fetch first, then reshape; both calls are Prefect tasks.
        transformed_records.append(
            transform_weather(fetch_weather_data(city, api_key))
        )
    return transformed_records

# tests/test_flows.py
import pytest
from unittest.mock import patch, MagicMock
from flows import weather_pipeline
class TestWeatherPipeline:
MOCK_RAW = {
"location": {"name": "London"},
"current": {"temp_c": 15.5, "condition": {"text": "Cloudy"}, "humidity": 72}
}
def test_processes_multiple_cities(self):
"""Flow should return results for all input cities."""
with patch("tasks.httpx.get") as mock_get:
mock_get.return_value = MagicMock(
status_code=200,
json=lambda: self.MOCK_RAW
)
# Call the flow directly — no Prefect Cloud needed
results = weather_pipeline(
cities=["London", "Paris", "Tokyo"],
api_key="test-key"
)
assert len(results) == 3
# All return London because mock doesn't vary by city
assert all(r["city"] == "London" for r in results)
def test_flow_state_on_success(self):
"""Flow should complete in Completed state."""
from prefect.testing.utilities import prefect_test_harness
with prefect_test_harness():
with patch("tasks.httpx.get") as mock_get:
mock_get.return_value = MagicMock(
status_code=200,
json=lambda: self.MOCK_RAW
)
state = weather_pipeline(
cities=["London"],
api_key="test-key",
return_state=True
)
assert state.is_completed()
results = state.result()
assert len(results) == 1
def test_flow_state_on_task_failure(self):
"""Flow should fail if any task raises an unrecoverable error."""
from prefect.testing.utilities import prefect_test_harness
import httpx
with prefect_test_harness():
with patch("tasks.httpx.get") as mock_get:
mock_get.side_effect = httpx.HTTPStatusError(
"404 Not Found",
request=MagicMock(),
response=MagicMock(status_code=404)
)
state = weather_pipeline(
cities=["InvalidCity"],
api_key="test-key",
return_state=True
)
assert state.is_failed()Testing Mock States
Prefect's state system lets you inject pre-computed states to test flow behavior:
# tests/test_state_handling.py
from prefect.states import Completed, Failed, Cached
from prefect.testing.utilities import prefect_test_harness
import pytest
def test_flow_handles_upstream_cached_task():
    """Flow should consume a precomputed upstream result without re-executing the fetch.

    ``weather_pipeline`` calls ``fetch_weather_data(...)`` directly rather than
    ``.submit()``, so patching ``submit`` would never take effect. The name the
    flow actually resolves, ``flows.fetch_weather_data``, is replaced instead.
    """
    # Imported locally because this module's import block does not bring in ``patch``.
    from unittest.mock import patch

    from flows import weather_pipeline

    cached_result = {
        "location": {"name": "London"},
        "current": {"temp_c": 15.5, "condition": {"text": "Clear"}, "humidity": 60},
    }

    with prefect_test_harness():
        with patch("flows.fetch_weather_data", return_value=cached_result) as mock_fetch:
            # Guard against accidental real HTTP if the stub were bypassed.
            with patch("tasks.httpx.get") as mock_get:
                results = weather_pipeline(cities=["London"], api_key="test-key")

    mock_fetch.assert_called_once_with("London", "test-key")
    # The precomputed result was used, so no HTTP request was made.
    mock_get.assert_not_called()
    assert len(results) == 1
    assert results[0]["city"] == "London"
def test_flow_retries_failed_tasks():
"""Transient task failures should trigger configured retries."""
import httpx
from flows import weather_pipeline
call_count = 0
def flaky_get(*args, **kwargs):
nonlocal call_count
call_count += 1
if call_count < 3:
raise httpx.TimeoutException("Timeout")
return MagicMock(
status_code=200,
json=lambda: {"location": {"name": "London"}, "current": {"temp_c": 15.5, "condition": {"text": "Clear"}, "humidity": 60}}
)
with prefect_test_harness():
with patch("tasks.httpx.get", side_effect=flaky_get):
# fetch_weather_data has retries=3 — should succeed on 3rd attempt
results = weather_pipeline(cities=["London"], api_key="test-key")
assert len(results) == 1
assert call_count == 3Testing Caching Behavior
Prefect's cache_key_fn and cache_expiration control when tasks re-execute. Test that caching works correctly:
# tasks_with_cache.py
from prefect import task
from prefect.tasks import task_input_hash
from datetime import timedelta
@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
)
def expensive_computation(data: list[int]) -> dict:
    """Summarise ``data`` as its sum, count and mean.

    The task is cached on its inputs with a one-hour expiration. An empty
    list raises ``ZeroDivisionError`` when the mean is computed.
    """
    total = sum(data)
    count = len(data)
    return {"sum": total, "count": count, "mean": total / count}

# tests/test_caching.py
import pytest
from unittest.mock import patch, call
from prefect.testing.utilities import prefect_test_harness
from tasks_with_cache import expensive_computation
def test_cached_task_not_recomputed_same_inputs():
    """Second call with identical inputs should return cached result."""
    from prefect import flow

    compute_call_count = 0
    original_fn = expensive_computation.fn

    def counting_fn(*args, **kwargs):
        """Delegate to the real function while counting executions."""
        nonlocal compute_call_count
        compute_call_count += 1
        return original_fn(*args, **kwargs)

    # Prefect 2 only runs tasks from inside a flow by default, so the two
    # calls are wrapped in a throwaway flow instead of being made at top level.
    @flow
    def run_same_input_twice():
        first = expensive_computation([1, 2, 3, 4, 5])
        second = expensive_computation([1, 2, 3, 4, 5])  # Same input
        return first, second

    with prefect_test_harness():
        with patch.object(expensive_computation, "fn", counting_fn):
            result1, result2 = run_same_input_twice()

    assert result1 == result2
    assert compute_call_count == 1, (
        f"Expected 1 computation (second should be cached), got {compute_call_count}"
    )
def test_cache_miss_on_different_inputs():
"""Different inputs must always trigger fresh computation."""
compute_call_count = 0
original_fn = expensive_computation.fn
def counting_fn(*args, **kwargs):
nonlocal compute_call_count
compute_call_count += 1
return original_fn(*args, **kwargs)
with prefect_test_harness():
with patch.object(expensive_computation, "fn", counting_fn):
expensive_computation([1, 2, 3])
expensive_computation([4, 5, 6]) # Different input
assert compute_call_count == 2Async Flow Testing
Prefect 2 supports async flows and tasks:
# tests/test_async_flows.py
import pytest
import asyncio
from prefect.testing.utilities import prefect_test_harness
@pytest.mark.asyncio
async def test_async_flow_completes():
from flows import async_weather_pipeline
with prefect_test_harness():
with patch("tasks.httpx.AsyncClient") as MockClient:
mock_client = MockClient.return_value.__aenter__.return_value
mock_client.get.return_value = MagicMock(
status_code=200,
json=lambda: {"location": {"name": "Berlin"}, "current": {"temp_c": 12.0, "condition": {"text": "Rain"}, "humidity": 88}}
)
results = await async_weather_pipeline(
cities=["Berlin"],
api_key="test-key"
)
assert len(results) == 1
assert results[0]["city"] == "Berlin"CI Integration
# .github/workflows/prefect-tests.yml
name: Prefect Flow Tests
on: [push, pull_request]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - name: Install dependencies
        run: pip install prefect pytest pytest-asyncio httpx
      # Each suite runs as its own step so a failure points at one layer.
      - name: Run task unit tests
        run: pytest tests/test_tasks.py -v
      - name: Run flow integration tests
        run: pytest tests/test_flows.py tests/test_state_handling.py -v
      - name: Run caching tests
        run: pytest tests/test_caching.py -v
      - name: Run async tests
        run: pytest tests/test_async_flows.py -v --asyncio-mode=auto

Monitoring Prefect in Production
Prefect Cloud has built-in monitoring, but you may need alerts that integrate with your existing incident management. HelpMeTest lets you set up health checks that query Prefect's API for flow run status and alert your on-call rotation when critical flows fail — without adding a separate monitoring tool to your stack.
Conclusion
Prefect's Python-native design makes it the most test-friendly workflow orchestrator. Use .fn to test task logic directly without the Prefect runtime. Use prefect_test_harness() for flow-level tests that need state management. Test caching behavior explicitly — cache bugs are subtle and can cause stale data problems that only appear hours after deployment. With this test suite in place, you can refactor flows confidently and catch regressions before they hit production.