Testing Temporal.io Workflows: Unit, Integration, and Replay Tests
Temporal workflows coordinate long-running, fault-tolerant business processes. Testing them requires a different approach than testing regular functions: workflows record their progress as an event history, can resume on a different worker after a failure, and must be deterministic across replays.
Why Temporal Testing Is Different
A Temporal workflow is a durable function that can pause, resume after failures, and replay from its event history. This creates unique testing challenges:
- Determinism: workflows must produce the same result when replayed from history
- Time: workflows can sleep for days — tests can't wait that long
- Activities: external calls (APIs, databases) must be mockable
- History: testing recovery scenarios requires simulating partial execution
Temporal SDKs provide TestWorkflowEnvironment for exactly these challenges.
TestWorkflowEnvironment
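TestWorkflowEnvironment is a simulated Temporal server. It handles workflow execution, activity dispatch, timers, and signals without a real cluster. In the Go SDK it runs in-process; the Python SDK's WorkflowEnvironment.start_time_skipping() starts a lightweight local test server instead.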
Go SDK
package workflow_test
// Imports cover all suite tests in this article; Go rejects unused imports.
import (
"context"
"errors"
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/testsuite"
)
type OrderWorkflowSuite struct {
suite.Suite
testsuite.WorkflowTestSuite
env *testsuite.TestWorkflowEnvironment
}
func (s *OrderWorkflowSuite) SetupTest() {
s.env = s.NewTestWorkflowEnvironment()
}
func (s *OrderWorkflowSuite) TearDownTest() {
s.env.AssertExpectations(s.T())
}
func (s *OrderWorkflowSuite) Test_OrderWorkflow_HappyPath() {
// Mock the ProcessPayment activity
s.env.OnActivity(ProcessPaymentActivity, mock.Anything, PaymentInput{
OrderID: "order-123",
Amount: 99.99,
}).Return(PaymentResult{TransactionID: "txn-456"}, nil)
// Mock the SendConfirmationActivity
s.env.OnActivity(SendConfirmationActivity, mock.Anything, mock.Anything).
Return(nil)
// Execute the workflow
s.env.ExecuteWorkflow(OrderWorkflow, OrderInput{
OrderID: "order-123",
Amount: 99.99,
})
s.True(s.env.IsWorkflowCompleted())
s.NoError(s.env.GetWorkflowError())
var result OrderResult
s.NoError(s.env.GetWorkflowResult(&result))
s.Equal("txn-456", result.TransactionID)
s.Equal("completed", result.Status)
}
func TestOrderWorkflow(t *testing.T) {
suite.Run(t, new(OrderWorkflowSuite))
}
Python SDK
from datetime import timedelta
import pytest
from temporalio import activity, workflow
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker
@activity.defn
async def process_payment(order_id: str, amount: float) -> str:
    # Real implementation calls payment API
    ...
@workflow.defn
class OrderWorkflow:
    @workflow.run
    async def run(self, order_id: str, amount: float) -> str:
        transaction_id = await workflow.execute_activity(
            process_payment,
            args=[order_id, amount],
            start_to_close_timeout=timedelta(seconds=30),
        )
        return transaction_id
# Mock registered under the real activity's name, so the workflow calls it instead
@activity.defn(name="process_payment")
async def process_payment_mock(order_id: str, amount: float) -> str:
    return "txn-456"
@pytest.mark.asyncio
async def test_order_workflow():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[OrderWorkflow],
            # Override the activity with the mock
            activities=[process_payment_mock],
        ):
            result = await env.client.execute_workflow(
                OrderWorkflow.run,
                args=["order-123", 99.99],
                id="test-order-123",
                task_queue="test-queue",
            )
            assert result == "txn-456"
Testing Timers and Sleep
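A workflow.Sleep() call or an activity retry backoff can last days or weeks in production. TestWorkflowEnvironment skips time automatically, so a test that has to wait out a retry delay still finishes immediately: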
func (s *OrderWorkflowSuite) Test_RetryAfterDelay() {
callCount := 0
s.env.OnActivity(ProcessPaymentActivity, mock.Anything, mock.Anything).
Return(func(ctx context.Context, input PaymentInput) (PaymentResult, error) {
callCount++
if callCount == 1 {
return PaymentResult{}, errors.New("gateway timeout")
}
return PaymentResult{TransactionID: "txn-456"}, nil
})
s.env.ExecuteWorkflow(OrderWorkflowWithRetry, OrderInput{
OrderID: "order-123",
})
// Time is automatically skipped past the retry delay
s.True(s.env.IsWorkflowCompleted())
s.Equal(2, callCount)
}
Registering a Timer Callback
func (s *OrderWorkflowSuite) Test_WorkflowTimesOut() {
// Register a callback that fires after 24 hours
s.env.RegisterDelayedCallback(func() {
// Simulate no response — workflow should handle this
}, 24*time.Hour)
s.env.OnActivity(CheckOrderStatusActivity, mock.Anything, mock.Anything).
Return(OrderStatus{State: "pending"}, nil).Times(1)
s.env.ExecuteWorkflow(OrderPollingWorkflow, "order-123")
s.True(s.env.IsWorkflowCompleted())
var err *temporal.ApplicationError
s.True(errors.As(s.env.GetWorkflowError(), &err))
s.Equal("ORDER_TIMEOUT", err.Type())
}
Testing Signals
Workflows can receive signals while running. Test that your workflow responds correctly:
func (s *OrderWorkflowSuite) Test_WorkflowHandlesCancel() {
s.env.OnActivity(ProcessPaymentActivity, mock.Anything, mock.Anything).
Return(PaymentResult{}, nil).Maybe()
s.env.RegisterDelayedCallback(func() {
// Send a cancel signal after 1 second
s.env.SignalWorkflow("cancel", CancelRequest{Reason: "user cancelled"})
}, time.Second)
s.env.ExecuteWorkflow(OrderWorkflow, OrderInput{OrderID: "order-123"})
s.True(s.env.IsWorkflowCompleted())
var result OrderResult
s.NoError(s.env.GetWorkflowResult(&result))
s.Equal("cancelled", result.Status)
}
Integration Tests Against a Real Temporal Server
For integration testing, run the workflow against a real server, such as the Temporal CLI's development server or a Docker-based environment:
// integration_test.go
//go:build integration

package workflow_test
import (
"context"
"testing"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)
func TestOrderWorkflow_Integration(t *testing.T) {
c, err := client.Dial(client.Options{
HostPort: "localhost:7233",
})
if err != nil {
t.Fatalf("failed to connect to Temporal: %v", err)
}
defer c.Close()
w := worker.New(c, "integration-test-queue", worker.Options{})
w.RegisterWorkflow(OrderWorkflow)
w.RegisterActivity(ProcessPaymentActivity)
w.RegisterActivity(SendConfirmationActivity)
if err := w.Start(); err != nil {
t.Fatalf("failed to start worker: %v", err)
}
defer w.Stop()
ctx := context.Background()
run, err := c.ExecuteWorkflow(ctx, client.StartWorkflowOptions{
ID: "integration-test-order-123",
TaskQueue: "integration-test-queue",
}, OrderWorkflow, OrderInput{OrderID: "order-123", Amount: 99.99})
if err != nil {
t.Fatalf("failed to start workflow: %v", err)
}
var result OrderResult
if err := run.Get(ctx, &result); err != nil {
t.Fatalf("workflow failed: %v", err)
}
if result.Status != "completed" {
t.Errorf("expected completed, got %s", result.Status)
}
}
Start Temporal for integration tests:
# Using Temporal CLI
temporal server start-dev
# Using Docker
docker run -p 7233:7233 temporalio/temporal:latest
# Run integration tests
go test -tags=integration -run TestOrderWorkflow_Integration ./...
Replay Tests
Replay tests verify that code changes don't break determinism for workflows currently running in production. They replay a workflow's history file against your new code:
func Test_ReplayOrderWorkflow(t *testing.T) {
// Download history from production workflow
// temporal workflow show --workflow-id prod-order-456 --output json > testdata/order_history.json
replayer := worker.NewWorkflowReplayer()
replayer.RegisterWorkflow(OrderWorkflow)
err := replayer.ReplayWorkflowHistoryFromJSONFile(
nil, // optional logger; the SDK expects its own log.Logger interface, which a *zap.Logger does not satisfy
"testdata/order_history.json",
)
if err != nil {
t.Fatalf("replay failed (breaking change detected): %v", err)
}
}
Replay tests should be part of your CI for any workflow code change. A failing replay test means your change would break in-flight workflows in production.
Testing Activity Failures and Retries
func (s *OrderWorkflowSuite) Test_ActivityRetryOnTransientError() {
callCount := 0
s.env.OnActivity(ProcessPaymentActivity, mock.Anything, mock.Anything).
Return(func(ctx context.Context, input PaymentInput) (PaymentResult, error) {
callCount++
if callCount <= 2 {
// Return retryable error
return PaymentResult{}, temporal.NewApplicationError(
"gateway timeout",
"GATEWAY_TIMEOUT",
)
}
return PaymentResult{TransactionID: "txn-456"}, nil
})
s.env.ExecuteWorkflow(OrderWorkflow, OrderInput{OrderID: "order-123"})
s.True(s.env.IsWorkflowCompleted())
s.NoError(s.env.GetWorkflowError())
s.Equal(3, callCount) // failed twice, succeeded on third
}
Testing Queries
Workflows can expose query handlers to return current state:
func (s *OrderWorkflowSuite) Test_WorkflowQueryState() {
// The callback fires one second into virtual time, so the workflow must still be running then (for example, blocked on a timer)
s.env.RegisterDelayedCallback(func() {
// Query the workflow state while it's sleeping
encodedState, err := s.env.QueryWorkflow("getStatus")
s.NoError(err)
var status string
s.NoError(encodedState.Get(&status))
s.Equal("processing", status)
}, time.Second)
s.env.OnActivity(ProcessPaymentActivity, mock.Anything, mock.Anything).
Return(PaymentResult{TransactionID: "txn-456"}, nil)
s.env.ExecuteWorkflow(OrderWorkflow, OrderInput{OrderID: "order-123"})
s.True(s.env.IsWorkflowCompleted())
}
End-to-End Workflow Testing
Temporal tests verify workflow logic and activity behavior. End-to-end testing verifies that your system — the API that triggers workflows, the worker that processes them, and the downstream effects — works correctly together. HelpMeTest can test the full chain:
Scenario: order placed end-to-end
Given a customer places an order via the API
When the Temporal workflow processes it
Then the order status changes to "confirmed" within 30 seconds
And the customer receives a confirmation email
And the inventory is decremented
This catches integration failures between your API, Temporal server, workers, and external systems that unit tests won't catch.
Key Takeaways
- Use TestWorkflowEnvironment for unit tests: it runs in-process, skips time automatically, and mocks activities cleanly
- Write replay tests for every workflow change that runs in production: a failed replay means production workflows will break
- Test signals and queries with RegisterDelayedCallback to simulate external events mid-execution
- Run integration tests against a real Temporal server for deployment verification
- Use build tags (//go:build integration) to separate fast unit tests from slower integration tests