Testing Netflix Conductor Workflows: Unit and Integration Patterns
Netflix Conductor is an open-source workflow orchestration engine. Workflows are defined as JSON, and tasks are implemented as microservices. Testing Conductor workflows requires validating the workflow definition, testing task workers independently, and verifying end-to-end execution.
Conductor Architecture for Testing
Understanding what to test:
- Workflow definition (JSON) — the orchestration logic: task order, branching, retries
- Task workers (your code) — Java/Python services that poll and execute tasks
- Conductor server — the orchestration engine (test against embedded or real server)
Each layer needs different testing strategies.
Unit Testing Task Workers (Java)
Task workers are the main code you write. Test them like regular services:
// The task worker
/**
 * Conductor worker for the {@code PROCESS_PAYMENT} task.
 *
 * <p>Reads {@code orderId} and {@code amount} from the task input, charges the order through
 * {@link PaymentService}, and reports the outcome as a {@link TaskResult}.
 */
@Component
public class PaymentWorker implements Worker {

    private final PaymentService paymentService;
    private final NotificationService notificationService;

    /**
     * Creates the worker with its collaborators. The {@code final} fields have no other
     * initialiser, so constructor injection is required for the class to compile.
     *
     * @param paymentService      service used to charge the order
     * @param notificationService notification collaborator (stored; not used by {@link #execute})
     */
    public PaymentWorker(PaymentService paymentService, NotificationService notificationService) {
        this.paymentService = paymentService;
        this.notificationService = notificationService;
    }

    /** @return the task definition name this worker polls for */
    @Override
    public String getTaskDefName() {
        return "PROCESS_PAYMENT";
    }

    /**
     * Charges the order described by the task input.
     *
     * @param task the polled task; its input is expected to carry a string {@code orderId}
     *             and a numeric {@code amount}
     * @return {@code COMPLETED} with a {@code transactionId} output on success,
     *         {@code FAILED} when the payment is declined, or
     *         {@code FAILED_WITH_TERMINAL_ERROR} when the input is missing or mistyped
     */
    @Override
    public TaskResult execute(Task task) {
        TaskResult result = new TaskResult(task);

        Object orderId = task.getInputData().get("orderId");
        Object amount = task.getInputData().get("amount");
        if (!(orderId instanceof String) || !(amount instanceof Number)) {
            // Malformed input will not become valid on a retry, so fail terminally
            // instead of letting a ClassCastException/NullPointerException escape.
            result.setStatus(TaskResult.Status.FAILED_WITH_TERMINAL_ERROR);
            result.setReasonForIncompletion(
                    "Invalid input: 'orderId' must be a string and 'amount' must be a number");
            return result;
        }

        try {
            // Input arrives as deserialized JSON, so a whole-number amount may be an
            // Integer or Long; go through Number rather than casting to Double.
            String transactionId =
                    paymentService.charge((String) orderId, ((Number) amount).doubleValue());
            result.setStatus(TaskResult.Status.COMPLETED);
            result.addOutputData("transactionId", transactionId);
        } catch (PaymentDeclinedException e) {
            result.setStatus(TaskResult.Status.FAILED);
            result.setReasonForIncompletion("Payment declined: " + e.getMessage());
        }
        return result;
    }
}

// The test
/**
 * Unit tests for {@link PaymentWorker} with its collaborators replaced by Mockito mocks;
 * no Conductor server is involved.
 */
@ExtendWith(MockitoExtension.class)
class PaymentWorkerTest {

    @Mock
    private PaymentService paymentService;

    @Mock
    private NotificationService notificationService;

    @InjectMocks
    private PaymentWorker worker;

    /** Builds a task carrying the order id and amount shared by the execute tests. */
    private static Task newPaymentTask() {
        Task paymentTask = new Task();
        paymentTask.addInput("orderId", "order-456");
        paymentTask.addInput("amount", 99.99);
        return paymentTask;
    }

    /** A successful charge yields COMPLETED and exposes the transaction id as output. */
    @Test
    void execute_successfulCharge_returnsCompleted() throws Exception {
        // Given
        Task paymentTask = newPaymentTask();
        paymentTask.setTaskId("task-123");
        when(paymentService.charge("order-456", 99.99)).thenReturn("txn-789");

        // When
        TaskResult outcome = worker.execute(paymentTask);

        // Then
        assertThat(outcome.getStatus()).isEqualTo(TaskResult.Status.COMPLETED);
        assertThat(outcome.getOutputData()).containsEntry("transactionId", "txn-789");
        verify(paymentService).charge("order-456", 99.99);
    }

    /** A declined payment yields FAILED and carries the decline reason. */
    @Test
    void execute_paymentDeclined_returnsFailed() throws Exception {
        Task paymentTask = newPaymentTask();
        when(paymentService.charge(anyString(), anyDouble()))
                .thenThrow(new PaymentDeclinedException("Insufficient funds"));

        TaskResult outcome = worker.execute(paymentTask);

        assertThat(outcome.getStatus()).isEqualTo(TaskResult.Status.FAILED);
        assertThat(outcome.getReasonForIncompletion())
                .contains("Payment declined", "Insufficient funds");
    }

    /** The worker must poll under the task definition name used by the workflow. */
    @Test
    void taskDefName_isCorrect() {
        String taskDefName = worker.getTaskDefName();
        assertThat(taskDefName).isEqualTo("PROCESS_PAYMENT");
    }
}

Unit Testing Task Workers (Python)
# payment_worker.py
from conductor.client.worker.worker_task import worker_task
@worker_task(task_definition_name="PROCESS_PAYMENT")
def process_payment(order_id: str, amount: float) -> dict:
    """Process payment for an order.

    Args:
        order_id: Identifier of the order to charge.
        amount: Amount to charge.

    Returns:
        A dict with the ``transactionId`` returned by the payment service and
        ``status`` set to ``"charged"``.

    Raises:
        Exception: If the payment service declines the charge. The original
            ``PaymentDeclinedException`` is attached as ``__cause__``.
    """
    try:
        transaction_id = payment_service.charge(order_id, amount)
    except PaymentDeclinedException as e:
        # Chain explicitly so the decline shows up as the direct cause in tracebacks.
        raise Exception(f"Payment declined: {e}") from e
    return {"transactionId": transaction_id, "status": "charged"}


# test_payment_worker.py
import pytest
from unittest.mock import patch, MagicMock
from workers.payment_worker import process_payment
def test_process_payment_success():
    """A successful charge returns the transaction id and a "charged" status."""
    with patch("workers.payment_worker.payment_service") as payment_service_mock:
        payment_service_mock.charge.return_value = "txn-789"
        outcome = process_payment(order_id="order-456", amount=99.99)

    # The mock keeps its call history after the patch is undone.
    assert outcome["transactionId"] == "txn-789"
    assert outcome["status"] == "charged"
    payment_service_mock.charge.assert_called_once_with("order-456", 99.99)
def test_process_payment_declined():
with patch("workers.payment_worker.payment_service") as mock_service:
mock_service.charge.side_effect = PaymentDeclinedException("Insufficient funds")
with pytest.raises(Exception, match="Payment declined: Insufficient funds"):
process_payment(order_id="order-456", amount=99.99)Validating Workflow Definitions
Workflow definitions are JSON. Validate them before deployment:
import json
import pytest
from conductor.client.http.api.metadata_resource_api import MetadataResourceApi
from conductor.client.configuration.configuration import Configuration
def load_workflow_definition(path: str) -> dict:
    """Load a workflow definition from a JSON file.

    Args:
        path: Filesystem path of the JSON workflow definition.

    Returns:
        The parsed JSON document.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    # Read as UTF-8 explicitly rather than relying on the platform default encoding.
    with open(path, encoding="utf-8") as f:
        return json.load(f)
def test_workflow_definition_structure():
    """The definition has name, version and a non-empty list of tasks."""
    definition = load_workflow_definition("workflows/order_processing.json")

    for required_key in ("name", "version", "tasks"):
        assert required_key in definition

    task_list = definition["tasks"]
    assert isinstance(task_list, list)
    assert len(task_list) > 0
def _iter_workflow_tasks(tasks):
    """Yield every task definition, descending into SWITCH, FORK_JOIN and DO_WHILE children."""
    for task in tasks or []:
        yield task
        # SWITCH: each case maps to a list of nested task definitions.
        for branch in (task.get("decisionCases") or {}).values():
            yield from _iter_workflow_tasks(branch)
        yield from _iter_workflow_tasks(task.get("defaultCase"))
        # FORK_JOIN: a list of branches, each a list of task definitions.
        for branch in task.get("forkTasks") or []:
            yield from _iter_workflow_tasks(branch)
        # DO_WHILE: the loop body.
        yield from _iter_workflow_tasks(task.get("loopOver"))


def test_task_references_are_valid():
    """Verify reference names across the workflow, including SWITCH/FORK branches.

    Checks that every task (top-level or nested) has a ``taskReferenceName``,
    that reference names are unique, and that every JOIN ``joinOn`` entry
    names a task that exists.
    """
    workflow = load_workflow_definition("workflows/order_processing.json")
    all_tasks = list(_iter_workflow_tasks(workflow["tasks"]))

    seen = set()
    duplicates = set()
    for task in all_tasks:
        assert "taskReferenceName" in task, \
            f"Task is missing taskReferenceName: {task.get('name')}"
        ref = task["taskReferenceName"]
        if ref in seen:
            duplicates.add(ref)
        seen.add(ref)
    assert not duplicates, \
        f"Duplicate taskReferenceName(s): {sorted(duplicates)}"

    for task in all_tasks:
        if task.get("type") == "JOIN":
            for ref in task.get("joinOn") or []:
                assert ref in seen, \
                    f"JOIN references unknown task: {ref}"
def test_workflow_has_timeout():
workflow = load_workflow_definition("workflows/order_processing.json")
# Critical workflows must have a timeout
assert workflow.get("timeoutSeconds", 0) > 0, \
"order_processing must have a timeout"
assert workflow.get("timeoutPolicy") in ("ALERT_ONLY", "TIME_OUT_WF")Integration Testing with Embedded Conductor
Use the Conductor test utilities for integration testing:
// pom.xml
<dependency>
<groupId>com.netflix.conductor</groupId>
<artifactId>conductor-test-util</artifactId>
<version>${conductor.version}</version>
<scope>test</scope>
</dependency>

@SpringBootTest
@ExtendWith(SpringExtension.class)
class OrderWorkflowIntegrationTest {
@Autowired
private WorkflowClient workflowClient;
@Autowired
private MetadataClient metadataClient;
@Autowired
private TaskClient taskClient;
/**
 * Class-level setup hook meant to register the workflow definition before any test runs.
 * The body is a placeholder and currently registers nothing.
 *
 * NOTE(review): this method is static, so the {@code @Autowired} instance fields of the
 * test class (such as {@code metadataClient}) are not reachable from here — confirm how
 * the registration is supposed to obtain its client.
 */
@BeforeAll
static void registerWorkflow() {
// Register workflow definition
}
/**
 * Happy path: starts ORDER_PROCESSING, completes VALIDATE_ORDER, PROCESS_PAYMENT and
 * SEND_CONFIRMATION by hand (standing in for real workers), then expects the workflow
 * to be COMPLETED with a {@code transactionId} in its output.
 *
 * NOTE(review): the Conductor Java client's WorkflowClient is documented with
 * startWorkflow(StartWorkflowRequest); confirm that the three-argument form used here
 * exists on the injected client.
 */
@Test
@Timeout(30) // seconds
void orderWorkflow_happyPath_completesSuccessfully() throws Exception {
    // Start the workflow
    Map<String, Object> input = Map.of(
            "orderId", "test-order-123",
            "amount", 99.99,
            "customerId", "cust-456");
    String workflowId = workflowClient.startWorkflow("ORDER_PROCESSING", 1 /* version */, input);

    // Poll and complete tasks manually (simulating workers)
    awaitTask(workflowId, "VALIDATE_ORDER", validation -> {
        validation.setStatus(TaskResult.Status.COMPLETED);
        validation.addOutputData("valid", true);
    });
    awaitTask(workflowId, "PROCESS_PAYMENT", payment -> {
        payment.setStatus(TaskResult.Status.COMPLETED);
        payment.addOutputData("transactionId", "txn-mock-789");
    });
    awaitTask(workflowId, "SEND_CONFIRMATION",
            confirmation -> confirmation.setStatus(TaskResult.Status.COMPLETED));

    // Verify workflow completed
    Workflow finished = workflowClient.getWorkflow(workflowId, true);
    assertThat(finished.getStatus()).isEqualTo(WorkflowStatus.COMPLETED);
    assertThat(finished.getOutput()).containsKey("transactionId");
}
/**
 * Polls the queue of {@code taskName} until a task belonging to {@code workflowId} is
 * handed out, lets {@code completer} fill in the result, and reports it to Conductor.
 *
 * <p>Makes up to 20 attempts, sleeping 500 ms after each empty poll (about 10 seconds).
 *
 * @param workflowId id of the workflow whose task is awaited
 * @param taskName   task type to poll for
 * @param completer  callback that sets status and output on the {@link TaskResult}
 * @throws TimeoutException if no matching task was polled within the attempts
 * @throws Exception        if polling, updating or sleeping fails
 */
private void awaitTask(String workflowId, String taskName,
        Consumer<TaskResult> completer) throws Exception {
    for (int attempt = 0; attempt < 20; attempt++) {
        // pollTask hands out a single task, not a list; an empty poll is treated as
        // either null or a task without an id.
        Task task = taskClient.pollTask(taskName, "test-worker", null);
        boolean polled = task != null
                && task.getTaskId() != null
                && !task.getTaskId().isEmpty();
        // Only complete tasks of the workflow under test; a task left over from another
        // run must not be completed on this workflow's behalf.
        if (polled && workflowId.equals(task.getWorkflowInstanceId())) {
            TaskResult result = new TaskResult(task);
            completer.accept(result);
            taskClient.updateTask(result);
            return;
        }
        Thread.sleep(500);
    }
    throw new TimeoutException("Timed out waiting for task: " + taskName);
}
}

Testing Workflow Failure and Retry
/**
 * Fails the first PROCESS_PAYMENT attempt, completes the retry, and expects the workflow
 * to finish as COMPLETED.
 *
 * NOTE(review): awaitTask gives up after roughly 10 seconds, so the retried task must be
 * scheduled within that window — confirm the retry delay configured for PROCESS_PAYMENT.
 * NOTE(review): the Conductor Java client's WorkflowClient is documented with
 * startWorkflow(StartWorkflowRequest); confirm that the three-argument form used here
 * exists on the injected client.
 */
@Test
@Timeout(60)
void orderWorkflow_paymentFails_retriesAndEventuallySucceeds() throws Exception {
    String workflowId = workflowClient.startWorkflow(
            "ORDER_PROCESSING", 1,
            Map.of("orderId", "retry-order-123", "amount", 49.99)
    );

    awaitTask(workflowId, "VALIDATE_ORDER", task -> {
        task.setStatus(TaskResult.Status.COMPLETED);
        task.addOutputData("valid", true);
    });

    // First payment attempt — fail
    awaitTask(workflowId, "PROCESS_PAYMENT", task -> {
        task.setStatus(TaskResult.Status.FAILED);
        task.setReasonForIncompletion("Gateway timeout");
    });

    // Second payment attempt (retry) — succeed
    awaitTask(workflowId, "PROCESS_PAYMENT", task -> {
        task.setStatus(TaskResult.Status.COMPLETED);
        task.addOutputData("transactionId", "txn-retry-789");
    });

    // The happy-path test shows SEND_CONFIRMATION follows payment; without completing it
    // the workflow would still be running when the status is checked.
    awaitTask(workflowId, "SEND_CONFIRMATION", task -> {
        task.setStatus(TaskResult.Status.COMPLETED);
    });

    Workflow workflow = workflowClient.getWorkflow(workflowId, true);
    assertThat(workflow.getStatus()).isEqualTo(WorkflowStatus.COMPLETED);
}

Testing SWITCH (Decision) Tasks
/**
 * Starts a high-value order and expects MANUAL_APPROVAL to be offered before
 * PROCESS_PAYMENT, then drives the workflow to COMPLETED.
 *
 * NOTE(review): assumes the approval decision comes after VALIDATE_ORDER and that
 * SEND_CONFIRMATION follows payment, as in the other tests for this workflow — confirm
 * against the workflow definition.
 * NOTE(review): the Conductor Java client's WorkflowClient is documented with
 * startWorkflow(StartWorkflowRequest); confirm that the three-argument form used here
 * exists on the injected client.
 */
@Test
@Timeout(30) // fail fast instead of hanging the build if routing never happens
void orderWorkflow_highValueOrder_routesToApprovalPath() throws Exception {
    String workflowId = workflowClient.startWorkflow(
            "ORDER_PROCESSING", 1,
            Map.of("orderId", "hv-order-123", "amount", 5000.00) // > threshold
    );

    awaitTask(workflowId, "VALIDATE_ORDER", task -> {
        task.setStatus(TaskResult.Status.COMPLETED);
        task.addOutputData("valid", true);
    });

    // Check that MANUAL_APPROVAL is the next task (not PROCESS_PAYMENT)
    awaitTask(workflowId, "MANUAL_APPROVAL", task -> {
        task.setStatus(TaskResult.Status.COMPLETED);
        task.addOutputData("approved", true);
    });

    // Payment follows approval
    awaitTask(workflowId, "PROCESS_PAYMENT", task -> {
        task.setStatus(TaskResult.Status.COMPLETED);
        task.addOutputData("transactionId", "txn-hv-789");
    });

    awaitTask(workflowId, "SEND_CONFIRMATION", task -> {
        task.setStatus(TaskResult.Status.COMPLETED);
    });

    Workflow workflow = workflowClient.getWorkflow(workflowId, true);
    assertThat(workflow.getStatus()).isEqualTo(WorkflowStatus.COMPLETED);
}

Testing with Docker Compose
# docker-compose.test.yml
# Test stack: Conductor server plus the MySQL and Elasticsearch services it depends on.
services:
  conductor-server:
    image: conductor:latest
    ports:
      - "8080:8080"
    environment:
      - SPRING_DATASOURCE_URL=jdbc:mysql://mysql:3306/conductor
    # depends_on only orders container start-up.
    depends_on:
      - mysql
      - elasticsearch

  mysql:
    image: mysql:8.0
    environment:
      MYSQL_DATABASE: conductor
      MYSQL_ROOT_PASSWORD: root

  elasticsearch:
    image: elasticsearch:8.9.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false

# Run integration tests
docker compose -f docker-compose.test.yml up -d
mvn test -Dtest=*IntegrationTest -Dconductor.server.url=http://localhost:8080
docker compose -f docker-compose.test.yml down

End-to-End Workflow Monitoring
Integration tests verify that workflows execute in the test environment. HelpMeTest tests that workflows produce the correct business outcomes in production:
Scenario: order workflow completes
Given an order is submitted via the API
When the Conductor workflow finishes
Then the order dashboard shows status "Fulfilled"
And the customer receives a confirmation email
And the order appears in the warehouse queue

This catches issues in the full stack — API, Conductor, task workers, and downstream services — that integration tests don't cover.
Key Takeaways
- Test task workers as plain classes with mocked dependencies — don't involve Conductor at all for unit tests
- Validate workflow JSON definitions in CI to catch typos and invalid task references before deployment
- Integration tests should poll task queues manually (simulating real workers) to control task completion precisely
- Test SWITCH decision routing explicitly — check which task becomes available after the decision, not just that the workflow completes
- Use @Timeout on integration tests to fail fast if Conductor hangs, rather than blocking the CI pipeline indefinitely