Testing Netflix Conductor Workflows: Unit and Integration Patterns

Netflix Conductor is an open-source workflow orchestration engine. Workflows are defined as JSON, and tasks are implemented by workers that typically run as microservices. Testing Conductor workflows means validating the workflow definition, testing task workers independently, and verifying end-to-end execution.

Conductor Architecture for Testing

There are three layers to test:

  • Workflow definition (JSON) — the orchestration logic: task order, branching, retries
  • Task workers (your code) — Java/Python services that poll and execute tasks
  • Conductor server — the orchestration engine (test against embedded or real server)

Each layer calls for a different testing strategy.

Unit Testing Task Workers (Java)

Task workers are the main code you write. Test them like regular services:

// The task worker
@Component
public class PaymentWorker implements Worker {

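    private final PaymentService paymentService;
    private final NotificationService notificationService;

    // Constructor injection: the final fields need it to compile, and
    // @InjectMocks uses it to pass the mocks in during tests
    public PaymentWorker(PaymentService paymentService,
                         NotificationService notificationService) {
        this.paymentService = paymentService;
        this.notificationService = notificationService;
    }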

    @Override
    public String getTaskDefName() {
        return "PROCESS_PAYMENT";
    }

    @Override
    public TaskResult execute(Task task) {
        String orderId = (String) task.getInputData().get("orderId");
        // JSON numbers can arrive as Integer, Long, or Double, so convert via Number
        double amount = ((Number) task.getInputData().get("amount")).doubleValue();

        try {
            String transactionId = paymentService.charge(orderId, amount);
            TaskResult result = new TaskResult(task);
            result.setStatus(TaskResult.Status.COMPLETED);
            result.addOutputData("transactionId", transactionId);
            return result;
        } catch (PaymentDeclinedException e) {
            TaskResult result = new TaskResult(task);
            result.setStatus(TaskResult.Status.FAILED);
            result.setReasonForIncompletion("Payment declined: " + e.getMessage());
            return result;
        }
    }
}

// The test
@ExtendWith(MockitoExtension.class)
class PaymentWorkerTest {

    @Mock
    private PaymentService paymentService;

    @Mock
    private NotificationService notificationService;

    @InjectMocks
    private PaymentWorker worker;

    @Test
    void execute_successfulCharge_returnsCompleted() throws Exception {
        // Given
        Task task = new Task();
        task.setTaskId("task-123");
        task.addInput("orderId", "order-456");
        task.addInput("amount", 99.99);

        when(paymentService.charge("order-456", 99.99))
            .thenReturn("txn-789");

        // When
        TaskResult result = worker.execute(task);

        // Then
        assertThat(result.getStatus()).isEqualTo(TaskResult.Status.COMPLETED);
        assertThat(result.getOutputData()).containsEntry("transactionId", "txn-789");
        verify(paymentService).charge("order-456", 99.99);
    }

    @Test
    void execute_paymentDeclined_returnsFailed() throws Exception {
        Task task = new Task();
        task.addInput("orderId", "order-456");
        task.addInput("amount", 99.99);

        when(paymentService.charge(anyString(), anyDouble()))
            .thenThrow(new PaymentDeclinedException("Insufficient funds"));

        TaskResult result = worker.execute(task);

        assertThat(result.getStatus()).isEqualTo(TaskResult.Status.FAILED);
        assertThat(result.getReasonForIncompletion())
            .contains("Payment declined")
            .contains("Insufficient funds");
    }

    @Test
    void taskDefName_isCorrect() {
        assertThat(worker.getTaskDefName()).isEqualTo("PROCESS_PAYMENT");
    }
}

Unit Testing Task Workers (Python)

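# payment_worker.py
from conductor.client.worker.worker_task import worker_task

# Assumed location of the payment client and its exception; adjust to your project
from workers.payments import PaymentDeclinedException, payment_service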

@worker_task(task_definition_name="PROCESS_PAYMENT")
def process_payment(order_id: str, amount: float) -> dict:
    """Process payment for an order."""
    try:
        transaction_id = payment_service.charge(order_id, amount)
        return {"transactionId": transaction_id, "status": "charged"}
    except PaymentDeclinedException as e:
        # Chain the original exception so the decline reason stays in the traceback
        raise Exception(f"Payment declined: {e}") from e

# test_payment_worker.py
import pytest
from unittest.mock import patch
from workers.payment_worker import PaymentDeclinedException, process_payment

def test_process_payment_success():
    with patch("workers.payment_worker.payment_service") as mock_service:
        mock_service.charge.return_value = "txn-789"

        result = process_payment(order_id="order-456", amount=99.99)

    assert result["transactionId"] == "txn-789"
    assert result["status"] == "charged"
    mock_service.charge.assert_called_once_with("order-456", 99.99)

def test_process_payment_declined():
    with patch("workers.payment_worker.payment_service") as mock_service:
        mock_service.charge.side_effect = PaymentDeclinedException("Insufficient funds")

        with pytest.raises(Exception, match="Payment declined: Insufficient funds"):
            process_payment(order_id="order-456", amount=99.99)
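
As an alternative to patching a module-level name, the worker's core logic can take the payment client as an argument, which lets a test pass in a small fake. The sketch below is only an illustration of that idea: `FakePaymentService`, `charge_order`, and the local `PaymentDeclinedException` are hypothetical stand-ins, not part of Conductor or of the worker shown above.

```python
class PaymentDeclinedException(Exception):
    """Stand-in for the real exception type (hypothetical)."""


class FakePaymentService:
    """In-memory fake that records calls and declines amounts above a limit."""

    def __init__(self, limit):
        self.limit = limit
        self.calls = []

    def charge(self, order_id, amount):
        self.calls.append((order_id, amount))
        if amount > self.limit:
            raise PaymentDeclinedException("Insufficient funds")
        return f"txn-{len(self.calls)}"


def charge_order(service, order_id, amount):
    """Core worker logic with the payment client passed in explicitly."""
    try:
        transaction_id = service.charge(order_id, amount)
    except PaymentDeclinedException as e:
        raise Exception(f"Payment declined: {e}") from e
    return {"transactionId": transaction_id, "status": "charged"}
```

A decorated worker function could then delegate to `charge_order` with the real client, while unit tests call it with the fake and assert on `calls`.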

Validating Workflow Definitions

Workflow definitions are JSON. Validate them before deployment:

import json

def load_workflow_definition(path: str) -> dict:
    with open(path) as f:
        return json.load(f)

def test_workflow_definition_structure():
    workflow = load_workflow_definition("workflows/order_processing.json")

    assert "name" in workflow
    assert "version" in workflow
    assert "tasks" in workflow
    assert isinstance(workflow["tasks"], list)
    assert len(workflow["tasks"]) > 0

def test_task_references_are_valid():
    """Verify that tasks inside SWITCH branches have unique reference names."""
    workflow = load_workflow_definition("workflows/order_processing.json")
    task_ids = {t["taskReferenceName"] for t in workflow["tasks"]}

    for task in workflow["tasks"]:
        if task["type"] == "SWITCH":
            # decisionCases maps each case value to a list of inline task definitions
            for case_tasks in task.get("decisionCases", {}).values():
                for nested in case_tasks:
                    ref = nested["taskReferenceName"]
                    assert ref not in task_ids, \
                        f"SWITCH branch reuses task reference name: {ref}"
                    task_ids.add(ref)

def test_workflow_has_timeout():
    workflow = load_workflow_definition("workflows/order_processing.json")
    # Critical workflows must have a timeout
    assert workflow.get("timeoutSeconds", 0) > 0, \
        "order_processing must have a timeout"
    assert workflow.get("timeoutPolicy") in ("ALERT_ONLY", "TIME_OUT_WF")
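
A common typo in workflow JSON is an expression such as `${paymnt_ref.output.transactionId}` that names a task reference which does not exist. The following is a minimal sketch of a check for that, assuming expressions follow the usual `${name.…}` form and that nested tasks live under `decisionCases`, `defaultCase`, `forkTasks`, and `loopOver`. The helper names `iter_tasks` and `find_unknown_references` are made up for this example.

```python
import json
import re

# Captures the leading name in expressions such as ${validate_ref.output.valid}
_EXPR = re.compile(r"\$\{([A-Za-z0-9_\-]+)\.")


def iter_tasks(tasks):
    """Yield every task in a definition, descending into nested branches."""
    for task in tasks:
        yield task
        for branch in (task.get("decisionCases") or {}).values():  # SWITCH cases
            yield from iter_tasks(branch)
        yield from iter_tasks(task.get("defaultCase") or [])       # SWITCH default
        for branch in task.get("forkTasks") or []:                 # FORK_JOIN branches
            yield from iter_tasks(branch)
        yield from iter_tasks(task.get("loopOver") or [])          # DO_WHILE body


def find_unknown_references(workflow):
    """Return (task_ref, unknown_name) pairs for expressions naming no known task."""
    all_tasks = list(iter_tasks(workflow["tasks"]))
    # "workflow" is always valid, e.g. ${workflow.input.orderId}
    known = {t["taskReferenceName"] for t in all_tasks} | {"workflow"}
    problems = []
    for task in all_tasks:
        text = json.dumps(task.get("inputParameters") or {})
        for name in _EXPR.findall(text):
            if name not in known:
                problems.append((task["taskReferenceName"], name))
    return problems
```

In a pytest test this could be used as `assert find_unknown_references(workflow) == []`, so that a failure message lists each offending task together with the name it could not resolve.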

Integration Testing with Embedded Conductor

Use the Conductor test utilities for integration testing:

<!-- pom.xml -->
<dependency>
    <groupId>com.netflix.conductor</groupId>
    <artifactId>conductor-test-util</artifactId>
    <version>${conductor.version}</version>
    <scope>test</scope>
</dependency>

@SpringBootTest
@ExtendWith(SpringExtension.class)
class OrderWorkflowIntegrationTest {

    @Autowired
    private WorkflowClient workflowClient;

    @Autowired
    private MetadataClient metadataClient;

    @Autowired
    private TaskClient taskClient;

    @BeforeAll
    static void registerWorkflow() {
        // Register workflow definition
    }

    @Test
    @Timeout(30) // seconds
    void orderWorkflow_happyPath_completesSuccessfully() throws Exception {
        // Start the workflow
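        // WorkflowClient.startWorkflow takes a StartWorkflowRequest
        StartWorkflowRequest request = new StartWorkflowRequest()
            .withName("ORDER_PROCESSING")
            .withVersion(1)
            .withInput(Map.of(
                "orderId", "test-order-123",
                "amount", 99.99,
                "customerId", "cust-456"
            ));
        String workflowId = workflowClient.startWorkflow(request);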

        // Poll and complete tasks manually (simulating workers)
        awaitTask(workflowId, "VALIDATE_ORDER", task -> {
            task.setStatus(TaskResult.Status.COMPLETED);
            task.addOutputData("valid", true);
        });

        awaitTask(workflowId, "PROCESS_PAYMENT", task -> {
            task.setStatus(TaskResult.Status.COMPLETED);
            task.addOutputData("transactionId", "txn-mock-789");
        });

        awaitTask(workflowId, "SEND_CONFIRMATION", task -> {
            task.setStatus(TaskResult.Status.COMPLETED);
        });

        // Verify workflow completed
        Workflow workflow = workflowClient.getWorkflow(workflowId, true);
        assertThat(workflow.getStatus()).isEqualTo(WorkflowStatus.COMPLETED);
        assertThat(workflow.getOutput())
            .containsKey("transactionId");
    }

    private void awaitTask(String workflowId, String taskName,
                           Consumer<TaskResult> completer) throws Exception {
        for (int i = 0; i < 20; i++) {
            // pollTask returns a single Task; an empty poll yields null or a Task without an ID
            Task task = taskClient.pollTask(taskName, "test-worker", null);
            if (task != null && task.getTaskId() != null) {
                // Guard against picking up a task that belongs to another test's workflow
                assertThat(task.getWorkflowInstanceId()).isEqualTo(workflowId);
                TaskResult result = new TaskResult(task);
                completer.accept(result);
                taskClient.updateTask(result);
                return;
            }
            Thread.sleep(500);
        }
        throw new TimeoutException("Timed out waiting for task: " + taskName);
    }
}
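
The polling helper above follows a general pattern: call a poll function repeatedly until it returns something or a deadline passes. Here is a minimal Python sketch of the same pattern, independent of any Conductor client. The name `poll_until` and its parameters are illustrative, and the clock and sleep functions are injectable so the helper itself can be tested without real waiting.

```python
import time


def poll_until(poll, timeout_s=10.0, interval_s=0.5,
               clock=time.monotonic, sleep=time.sleep):
    """Call poll() until it returns a non-None value or the deadline passes."""
    deadline = clock() + timeout_s
    while True:
        item = poll()
        if item is not None:
            return item
        if clock() >= deadline:
            raise TimeoutError(f"nothing returned within {timeout_s}s")
        sleep(interval_s)
```

Using a deadline rather than a fixed attempt count keeps the total wait predictable even when individual polls are slow.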

Testing Workflow Failure and Retry

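// Assumes the PROCESS_PAYMENT task definition allows at least one retry and uses a
// retryDelaySeconds shorter than the 10 seconds that awaitTask is willing to poll
@Test
@Timeout(60)
void orderWorkflow_paymentFails_retriesAndEventuallySucceeds() throws Exception {
    StartWorkflowRequest request = new StartWorkflowRequest()
        .withName("ORDER_PROCESSING")
        .withVersion(1)
        .withInput(Map.of("orderId", "retry-order-123", "amount", 49.99));
    String workflowId = workflowClient.startWorkflow(request);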

    awaitTask(workflowId, "VALIDATE_ORDER", task -> {
        task.setStatus(TaskResult.Status.COMPLETED);
        task.addOutputData("valid", true);
    });

    // First payment attempt — fail
    awaitTask(workflowId, "PROCESS_PAYMENT", task -> {
        task.setStatus(TaskResult.Status.FAILED);
        task.setReasonForIncompletion("Gateway timeout");
    });

    // Second payment attempt (retry) — succeed
    awaitTask(workflowId, "PROCESS_PAYMENT", task -> {
        task.setStatus(TaskResult.Status.COMPLETED);
        task.addOutputData("transactionId", "txn-retry-789");
    });

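    // Finish the remaining step so the workflow can reach COMPLETED
    // (assumes the same definition as the happy-path test)
    awaitTask(workflowId, "SEND_CONFIRMATION", task -> {
        task.setStatus(TaskResult.Status.COMPLETED);
    });

    Workflow workflow = workflowClient.getWorkflow(workflowId, true);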
    assertThat(workflow.getStatus()).isEqualTo(WorkflowStatus.COMPLETED);
}
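
Whether a retry test like this fits inside its timeout depends on the task definition's `retryCount` and `retryDelaySeconds`. The sketch below estimates the worst-case wait so a test suite can assert that the retry settings used in the test environment are short enough. It only covers `retryLogic` set to `FIXED` and requires the fields to be set explicitly; `retry_wait_budget` and `poll_margin_s` are names invented for this example.

```python
def retry_wait_budget(task_def, poll_margin_s=5):
    """Worst-case seconds spent waiting for all retries under FIXED retry logic."""
    if task_def["retryLogic"] != "FIXED":
        raise ValueError("this sketch only handles retryLogic=FIXED")
    # Each retry is delayed by the same fixed number of seconds
    total_delay = task_def["retryCount"] * task_def["retryDelaySeconds"]
    return total_delay + poll_margin_s


# Hypothetical test-environment task definition
payment_task_def = {
    "name": "PROCESS_PAYMENT",
    "retryCount": 2,
    "retryLogic": "FIXED",
    "retryDelaySeconds": 1,
}
```

With these example values, a test could assert `retry_wait_budget(payment_task_def) <= 60` to confirm that the retries fit within the `@Timeout(60)` used above.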

Testing SWITCH (Decision) Tasks

@Test
void orderWorkflow_highValueOrder_routesToApprovalPath() throws Exception {
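    StartWorkflowRequest request = new StartWorkflowRequest()
        .withName("ORDER_PROCESSING")
        .withVersion(1)
        .withInput(Map.of("orderId", "hv-order-123", "amount", 5000.00)); // > threshold
    String workflowId = workflowClient.startWorkflow(request);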

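    // Validation runs first, as in the happy-path test (assumes the same definition)
    awaitTask(workflowId, "VALIDATE_ORDER", task -> {
        task.setStatus(TaskResult.Status.COMPLETED);
        task.addOutputData("valid", true);
    });

    // Check that MANUAL_APPROVAL is the next task (not PROCESS_PAYMENT)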
    awaitTask(workflowId, "MANUAL_APPROVAL", task -> {
        task.setStatus(TaskResult.Status.COMPLETED);
        task.addOutputData("approved", true);
    });

    // Payment follows approval
    awaitTask(workflowId, "PROCESS_PAYMENT", task -> {
        task.setStatus(TaskResult.Status.COMPLETED);
        task.addOutputData("transactionId", "txn-hv-789");
    });

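    // Finish the remaining step so the workflow can reach COMPLETED
    // (assumes the same definition as the happy-path test)
    awaitTask(workflowId, "SEND_CONFIRMATION", task -> {
        task.setStatus(TaskResult.Status.COMPLETED);
    });

    Workflow workflow = workflowClient.getWorkflow(workflowId, true);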
    assertThat(workflow.getStatus()).isEqualTo(WorkflowStatus.COMPLETED);
}
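
Routing can also be checked statically, before any server is involved, by inspecting the SWITCH task in the workflow JSON. The sketch below assumes the SWITCH expression has already been evaluated to a case value such as `"HIGH_VALUE"`; the sample task, its case names, and the helper `switch_branch_refs` are hypothetical and only illustrate the lookup.

```python
def switch_branch_refs(switch_task, case_value):
    """Return the reference names of the branch taken for case_value."""
    cases = switch_task.get("decisionCases") or {}
    # Unmatched values fall through to defaultCase
    branch = cases.get(case_value, switch_task.get("defaultCase") or [])
    return [t["taskReferenceName"] for t in branch]


# Hypothetical SWITCH task as it might appear in the workflow definition
route_by_value = {
    "name": "route_by_value",
    "taskReferenceName": "route_by_value_ref",
    "type": "SWITCH",
    "decisionCases": {
        "HIGH_VALUE": [
            {"name": "MANUAL_APPROVAL",
             "taskReferenceName": "manual_approval_ref",
             "type": "SIMPLE"},
        ],
    },
    "defaultCase": [],
}
```

A definition-level test can then assert that the high-value case contains the approval task and that the default case does not, which complements the runtime check above.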

Testing with Docker Compose

# docker-compose.test.yml
services:
  conductor-server:
    image: conductor:latest
    ports:
      - "8080:8080"
    environment:
      - SPRING_DATASOURCE_URL=jdbc:mysql://mysql:3306/conductor
    depends_on:
      - mysql
      - elasticsearch

  mysql:
    image: mysql:8.0
    environment:
      MYSQL_DATABASE: conductor
      MYSQL_ROOT_PASSWORD: root

  elasticsearch:
    image: elasticsearch:8.9.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false

# Run integration tests
docker compose -f docker-compose.test.yml up -d
mvn test -Dtest='*IntegrationTest' -Dconductor.server.url=http://localhost:8080
docker compose -f docker-compose.test.yml down
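
`docker compose up -d` returns as soon as the containers start, which can be well before the Conductor server accepts requests. One way to avoid flaky first tests is to wait for a health endpoint before running the suite. This is a minimal sketch using only the standard library; the helper names are made up, and the health URL (for example `http://localhost:8080/health`) is an assumption that should be checked against your server version.

```python
import time
import urllib.error
import urllib.request


def http_ok(url, timeout_s=2.0):
    """Return True if a GET to url answers with HTTP 200."""
    try:
        with urllib.request.urlopen(url, timeout=timeout_s) as resp:
            return resp.status == 200
    except (urllib.error.URLError, OSError):
        return False


def wait_until_ready(url, attempts=60, interval_s=2.0,
                     probe=http_ok, sleep=time.sleep):
    """Probe url until it succeeds; return the number of attempts used."""
    for attempt in range(1, attempts + 1):
        if probe(url):
            return attempt
        sleep(interval_s)
    raise TimeoutError(f"{url} not ready after {attempts} attempts")
```

Calling `wait_until_ready(...)` between the `up -d` and `mvn test` steps, for instance from a small script, keeps the wait bounded and produces a clear error if the server never comes up.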

End-to-End Workflow Monitoring

Integration tests verify that workflows execute in the test environment. HelpMeTest tests that workflows produce the correct business outcomes in production:

Scenario: order workflow completes
  Given an order is submitted via the API
  When the Conductor workflow finishes
  Then the order dashboard shows status "Fulfilled"
  And the customer receives a confirmation email
  And the order appears in the warehouse queue

This catches issues in the full stack — API, Conductor, task workers, and downstream services — that integration tests don't cover.

Key Takeaways

  • Test task workers as plain classes with mocked dependencies — don't involve Conductor at all for unit tests
  • Validate workflow JSON definitions in CI to catch typos and invalid task references before deployment
  • Integration tests should poll task queues manually (simulating real workers) to control task completion precisely
  • Test SWITCH decision routing explicitly — check which task becomes available after the decision, not just that the workflow completes
  • Use @Timeout on integration tests to fail fast if Conductor hangs, rather than blocking the CI pipeline indefinitely
