Testing Temporal.io Workflows: Unit, Integration, and Replay Tests

Temporal workflows coordinate long-running, fault-tolerant business processes. Testing them requires a different approach than testing regular functions — workflows encode history, run across multiple servers, and must be deterministic across replays.

Why Temporal Testing Is Different

A Temporal workflow is a durable function that can pause, resume after failures, and replay from its event history. This creates unique testing challenges:

  • Determinism: workflows must produce the same result when replayed from history
  • Time: workflows can sleep for days — tests can't wait that long
  • Activities: external calls (APIs, databases) must be mockable
  • History: testing recovery scenarios requires simulating partial execution

Temporal SDKs ship test environments for exactly these challenges: TestWorkflowEnvironment in the Go SDK and WorkflowEnvironment in the Python SDK.

TestWorkflowEnvironment

TestWorkflowEnvironment is a simulated Temporal server that runs in-process. It handles workflow execution, activity dispatch, timers, and signals without a real server.

Go SDK

package workflow_test

// Imports cover every suite test shown in this article.
import (
    "context"
    "errors"
    "testing"
    "time"

    "github.com/stretchr/testify/mock"
    "github.com/stretchr/testify/suite"
    "go.temporal.io/sdk/temporal"
    "go.temporal.io/sdk/testsuite"
)

type OrderWorkflowSuite struct {
    suite.Suite
    testsuite.WorkflowTestSuite
    env *testsuite.TestWorkflowEnvironment
}

func (s *OrderWorkflowSuite) SetupTest() {
    s.env = s.NewTestWorkflowEnvironment()
}

func (s *OrderWorkflowSuite) TearDownTest() {
    s.env.AssertExpectations(s.T())
}

func (s *OrderWorkflowSuite) Test_OrderWorkflow_HappyPath() {
    // Mock the ProcessPayment activity
    s.env.OnActivity(ProcessPaymentActivity, mock.Anything, PaymentInput{
        OrderID: "order-123",
        Amount:  99.99,
    }).Return(PaymentResult{TransactionID: "txn-456"}, nil)

    // Mock the SendConfirmationActivity
    s.env.OnActivity(SendConfirmationActivity, mock.Anything, mock.Anything).
        Return(nil)

    // Execute the workflow
    s.env.ExecuteWorkflow(OrderWorkflow, OrderInput{
        OrderID: "order-123",
        Amount:  99.99,
    })

    s.True(s.env.IsWorkflowCompleted())
    s.NoError(s.env.GetWorkflowError())

    var result OrderResult
    s.NoError(s.env.GetWorkflowResult(&result))
    s.Equal("txn-456", result.TransactionID)
    s.Equal("completed", result.Status)
}

func TestOrderWorkflow(t *testing.T) {
    suite.Run(t, new(OrderWorkflowSuite))
}

Python SDK

from datetime import timedelta

import pytest
from temporalio import activity, workflow
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

@activity.defn
async def process_payment(order_id: str, amount: float) -> str:
    # Real implementation calls payment API
    ...

# Test double registered under the real activity's name
@activity.defn(name="process_payment")
async def process_payment_mocked(order_id: str, amount: float) -> str:
    return "txn-456"

@workflow.defn
class OrderWorkflow:
    @workflow.run
    async def run(self, order_id: str, amount: float) -> str:
        transaction_id = await workflow.execute_activity(
            process_payment,
            args=[order_id, amount],
            start_to_close_timeout=timedelta(seconds=30),
        )
        return transaction_id

@pytest.mark.asyncio
async def test_order_workflow():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[OrderWorkflow],
            # Override the real activity with the mock
            activities=[process_payment_mocked],
        ):
            result = await env.client.execute_workflow(
                OrderWorkflow.run,
                args=["order-123", 99.99],
                id="test-order-123",
                task_queue="test-queue",
            )
            assert result == "txn-456"

Testing Timers and Sleep

A workflow.Sleep() call, or the backoff between activity retries, can last days or weeks in production. TestWorkflowEnvironment skips time automatically whenever the workflow is blocked on a timer:

func (s *OrderWorkflowSuite) Test_RetryAfterDelay() {
    callCount := 0

    s.env.OnActivity(ProcessPaymentActivity, mock.Anything, mock.Anything).
        Return(func(ctx context.Context, input PaymentInput) (PaymentResult, error) {
            callCount++
            if callCount == 1 {
                return PaymentResult{}, errors.New("gateway timeout")
            }
            return PaymentResult{TransactionID: "txn-456"}, nil
        })

    s.env.ExecuteWorkflow(OrderWorkflowWithRetry, OrderInput{
        OrderID: "order-123",
    })

    // Time is automatically skipped past the retry delay
    s.True(s.env.IsWorkflowCompleted())
    s.Equal(2, callCount)
}

Registering a Timer Callback

func (s *OrderWorkflowSuite) Test_WorkflowTimesOut() {
    // Register a callback that fires after 24 hours
    s.env.RegisterDelayedCallback(func() {
        // Simulate no response — workflow should handle this
    }, 24*time.Hour)

    s.env.OnActivity(CheckOrderStatusActivity, mock.Anything, mock.Anything).
        Return(OrderStatus{State: "pending"}, nil).Times(1)

    s.env.ExecuteWorkflow(OrderPollingWorkflow, "order-123")

    s.True(s.env.IsWorkflowCompleted())
    var err *temporal.ApplicationError
    s.True(errors.As(s.env.GetWorkflowError(), &err))
    s.Equal("ORDER_TIMEOUT", err.Type())
}

Testing Signals

Workflows can receive signals while running. Test that your workflow responds correctly:

func (s *OrderWorkflowSuite) Test_WorkflowHandlesCancel() {
    s.env.OnActivity(ProcessPaymentActivity, mock.Anything, mock.Anything).
        Return(PaymentResult{}, nil).Maybe()

    s.env.RegisterDelayedCallback(func() {
        // Send a cancel signal after 1 second
        s.env.SignalWorkflow("cancel", CancelRequest{Reason: "user cancelled"})
    }, time.Second)

    s.env.ExecuteWorkflow(OrderWorkflow, OrderInput{OrderID: "order-123"})

    s.True(s.env.IsWorkflowCompleted())
    var result OrderResult
    s.NoError(s.env.GetWorkflowResult(&result))
    s.Equal("cancelled", result.Status)
}

Integration Tests Against a Real Temporal Server

For integration testing, run the same workflows against a real server: a local development server started with the Temporal CLI, or a Docker-based environment.

// integration_test.go
//go:build integration

package workflow_test

import (
    "context"
    "testing"

    "go.temporal.io/sdk/client"
    "go.temporal.io/sdk/worker"
)

func TestOrderWorkflow_Integration(t *testing.T) {
    c, err := client.Dial(client.Options{
        HostPort: "localhost:7233",
    })
    if err != nil {
        t.Fatalf("failed to connect to Temporal: %v", err)
    }
    defer c.Close()

    w := worker.New(c, "integration-test-queue", worker.Options{})
    w.RegisterWorkflow(OrderWorkflow)
    w.RegisterActivity(ProcessPaymentActivity)
    w.RegisterActivity(SendConfirmationActivity)

    if err := w.Start(); err != nil {
        t.Fatalf("failed to start worker: %v", err)
    }
    defer w.Stop()

    ctx := context.Background()
    run, err := c.ExecuteWorkflow(ctx, client.StartWorkflowOptions{
        ID:        "integration-test-order-123",
        TaskQueue: "integration-test-queue",
    }, OrderWorkflow, OrderInput{OrderID: "order-123", Amount: 99.99})

    if err != nil {
        t.Fatalf("failed to start workflow: %v", err)
    }

    var result OrderResult
    if err := run.Get(ctx, &result); err != nil {
        t.Fatalf("workflow failed: %v", err)
    }

    if result.Status != "completed" {
        t.Errorf("expected completed, got %s", result.Status)
    }
}

Start Temporal for integration tests:

# Using Temporal CLI
temporal server start-dev

# Using Docker
docker run -p 7233:7233 temporalio/temporal:latest

# Run integration tests
go test -tags=integration -run TestOrderWorkflow_Integration ./...

Replay Tests

Replay tests verify that code changes don't break determinism for workflows currently running in production. They replay a workflow's history file against your new code:

func Test_ReplayOrderWorkflow(t *testing.T) {
    // Download history from production workflow
    // temporal workflow show --workflow-id prod-order-456 --output json > testdata/order_history.json

    replayer := worker.NewWorkflowReplayer()
    replayer.RegisterWorkflow(OrderWorkflow)

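    // The first argument is a Temporal log.Logger, not a zap logger;
    // passing nil falls back to the SDK's default logger.
    err := replayer.ReplayWorkflowHistoryFromJSONFile(
        nil,
        "testdata/order_history.json",
    )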

    if err != nil {
        t.Fatalf("replay failed (breaking change detected): %v", err)
    }
}

Replay tests should be part of your CI for any workflow code change. A failing replay test means your change would break in-flight workflows in production.

Testing Activity Failures and Retries

func (s *OrderWorkflowSuite) Test_ActivityRetryOnTransientError() {
    callCount := 0

    s.env.OnActivity(ProcessPaymentActivity, mock.Anything, mock.Anything).
        Return(func(ctx context.Context, input PaymentInput) (PaymentResult, error) {
            callCount++
            if callCount <= 2 {
                // Return retryable error
                return PaymentResult{}, temporal.NewApplicationError(
                    "gateway timeout",
                    "GATEWAY_TIMEOUT",
                )
            }
            return PaymentResult{TransactionID: "txn-456"}, nil
        })

    s.env.ExecuteWorkflow(OrderWorkflow, OrderInput{OrderID: "order-123"})

    s.True(s.env.IsWorkflowCompleted())
    s.NoError(s.env.GetWorkflowError())
    s.Equal(3, callCount) // failed twice, succeeded on third
}

Testing Queries

Workflows can expose query handlers to return current state:

func (s *OrderWorkflowSuite) Test_WorkflowQueryState() {
    // Block the workflow on a sleep
    s.env.RegisterDelayedCallback(func() {
        // Query the workflow state while it's sleeping
        encodedState, err := s.env.QueryWorkflow("getStatus")
        s.NoError(err)

        var status string
        s.NoError(encodedState.Get(&status))
        s.Equal("processing", status)
    }, time.Second)

    s.env.OnActivity(ProcessPaymentActivity, mock.Anything, mock.Anything).
        Return(PaymentResult{TransactionID: "txn-456"}, nil)

    s.env.ExecuteWorkflow(OrderWorkflow, OrderInput{OrderID: "order-123"})
    s.True(s.env.IsWorkflowCompleted())
}

End-to-End Workflow Testing

Temporal tests verify workflow logic and activity behavior. End-to-end testing verifies that your system — the API that triggers workflows, the worker that processes them, and the downstream effects — works correctly together. HelpMeTest can test the full chain:

Scenario: order placed end-to-end
  Given a customer places an order via the API
  When the Temporal workflow processes it
  Then the order status changes to "confirmed" within 30 seconds
  And the customer receives a confirmation email
  And the inventory is decremented

This catches integration failures between your API, Temporal server, workers, and external systems that unit tests won't catch.

Key Takeaways

  • Use TestWorkflowEnvironment for unit tests — it runs in-process, skips time automatically, and mocks activities cleanly
  • Write replay tests for every workflow change that runs in production — a failed replay means production workflows will break
  • Test signals and queries with RegisterDelayedCallback to simulate external events mid-execution
  • Run integration tests against a real Temporal server for deployment verification
  • Use build tags (//go:build integration) to separate fast unit tests from slower integration tests
