Mocking Temporal Activities and Side Effects in Go and Python

Mocking Temporal Activities and Side Effects in Go and Python

Temporal activities are where your workflow touches the outside world — HTTP calls, database writes, email sends. Testing workflows in isolation means replacing activities with mocks that return predictable values without making real external calls.

Why Mock Activities

Activities in Temporal are the "impure" part of the system — they perform I/O with retry policies and timeouts. When testing workflows:

  • Real activities are slow: a payment API might take 500ms; tests should be milliseconds
  • Real activities are flaky: network errors, rate limits, and test database state make tests unreliable
  • Real activities have side effects: you don't want to charge real credit cards in tests
  • Real activities require infrastructure: running a full environment (databases, APIs, queues) adds complexity

Mocking activities solves all of these.

Go SDK Activity Mocking

Basic Mock

import (
    "github.com/stretchr/testify/mock"
    "go.temporal.io/sdk/testsuite"
)

type ChargePaymentSuite struct {
    suite.Suite
    testsuite.WorkflowTestSuite
    env *testsuite.TestWorkflowEnvironment
}

func (s *ChargePaymentSuite) SetupTest() {
    s.env = s.NewTestWorkflowEnvironment()
}

func (s *ChargePaymentSuite) Test_ChargePayment() {
    // Mock the activity by function reference
    s.env.OnActivity(ChargePaymentActivity, mock.Anything, ChargeInput{
        CustomerID: "cust-123",
        Amount:     49.99,
        Currency:   "USD",
    }).Return(ChargeResult{
        ChargeID: "ch_abc123",
        Status:   "succeeded",
    }, nil)

    s.env.ExecuteWorkflow(SubscriptionWorkflow, SubscriptionInput{
        CustomerID: "cust-123",
        PlanID:     "plan-monthly",
    })

    s.True(s.env.IsWorkflowCompleted())
    s.NoError(s.env.GetWorkflowError())
}

Mock with Dynamic Return

When you need the mock to return different values based on input, use a function:

func (s *ChargePaymentSuite) Test_RetryOnDecline() {
    callCount := 0

    s.env.OnActivity(ChargePaymentActivity, mock.Anything, mock.Anything).
        Return(func(ctx context.Context, input ChargeInput) (ChargeResult, error) {
            callCount++
            switch callCount {
            case 1:
                // First attempt: card declined
                return ChargeResult{}, temporal.NewApplicationError(
                    "card_declined",
                    "CARD_DECLINED",
                )
            case 2:
                // Second attempt (different card): succeeded
                return ChargeResult{ChargeID: "ch_xyz789", Status: "succeeded"}, nil
            default:
                s.Fail("unexpected call count: %d", callCount)
                return ChargeResult{}, nil
            }
        })

    s.env.ExecuteWorkflow(SubscriptionWorkflow, SubscriptionInput{
        CustomerID: "cust-123",
        PlanID:     "plan-monthly",
    })

    s.True(s.env.IsWorkflowCompleted())
    s.Equal(2, callCount)
}

Matching Any Argument

// Match any context but specific struct field
s.env.OnActivity(SendEmailActivity, mock.Anything, mock.MatchedBy(func(input EmailInput) bool {
    return input.To == "user@example.com" && input.Template == "welcome"
})).Return(nil)

Expecting Activity Not Called

func (s *ChargePaymentSuite) Test_FreeTrialSkipsPayment() {
    // Assert that payment activity is never called for free trial users
    s.env.OnActivity(ChargePaymentActivity, mock.Anything, mock.Anything).
        Times(0) // must not be called

    s.env.ExecuteWorkflow(SubscriptionWorkflow, SubscriptionInput{
        CustomerID: "cust-123",
        PlanID:     "plan-free-trial",
    })

    s.True(s.env.IsWorkflowCompleted())
}

Python SDK Activity Mocking

Using Mock Activities

In Python, replace real activity implementations with mocks by registering different functions with the worker:

import pytest
from unittest.mock import AsyncMock, patch
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

# Real activity (not called in tests)
@activity.defn
async def charge_payment(customer_id: str, amount: float) -> str:
    response = await payment_api.charge(customer_id, amount)
    return response.charge_id

# Mock activity for testing
@activity.defn(name="charge_payment")  # same name as real activity
async def mock_charge_payment(customer_id: str, amount: float) -> str:
    if customer_id == "fail-customer":
        raise ApplicationError("Card declined", "CARD_DECLINED")
    return "ch_mock_123"

@pytest.mark.asyncio
async def test_subscription_workflow():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[SubscriptionWorkflow],
            activities=[mock_charge_payment],  # use mock, not real
        ):
            result = await env.client.execute_workflow(
                SubscriptionWorkflow.run,
                args=["cust-123", "plan-monthly"],
                id="test-sub-123",
                task_queue="test-queue",
            )
            assert result["charge_id"] == "ch_mock_123"

Parametrized Activity Behavior

@pytest.mark.asyncio
@pytest.mark.parametrize("customer_id,expected_charge_id,should_succeed", [
    ("valid-customer", "ch_abc123", True),
    ("blocked-customer", None, False),
    ("new-customer", "ch_xyz789", True),
])
async def test_subscription_scenarios(customer_id, expected_charge_id, should_succeed):
    charged = {}

    @activity.defn(name="charge_payment")
    async def mock_charge(cust_id: str, amount: float) -> str:
        if not should_succeed:
            raise ApplicationError("payment blocked", "BLOCKED")
        charge_id = expected_charge_id
        charged[cust_id] = charge_id
        return charge_id

    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(env.client, task_queue="test-queue",
                         workflows=[SubscriptionWorkflow], activities=[mock_charge]):
            if should_succeed:
                result = await env.client.execute_workflow(
                    SubscriptionWorkflow.run,
                    args=[customer_id, "plan-monthly"],
                    id=f"test-{customer_id}",
                    task_queue="test-queue",
                )
                assert result["charge_id"] == expected_charge_id
                assert customer_id in charged
            else:
                with pytest.raises(WorkflowFailureError):
                    await env.client.execute_workflow(
                        SubscriptionWorkflow.run,
                        args=[customer_id, "plan-monthly"],
                        id=f"test-{customer_id}",
                        task_queue="test-queue",
                    )

Mocking Side Effects Within Activities

Sometimes you want to test the activity itself, not just mock it at the workflow level. Mock the underlying clients:

func TestChargePaymentActivity_Success(t *testing.T) {
    // Create a mock Stripe client
    mockStripe := &MockStripeClient{}
    mockStripe.On("CreateCharge", mock.Anything, "cust-123", 4999).
        Return(&stripe.Charge{ID: "ch_abc123", Status: "succeeded"}, nil)

    // Inject the mock into the activity
    activity := &PaymentActivity{
        stripe: mockStripe,
    }

    testEnv := testsuite.TestActivityEnvironment{}
    testEnv.RegisterActivity(activity)

    val, err := testEnv.ExecuteActivity(activity.ChargePayment, ChargeInput{
        CustomerID: "cust-123",
        Amount:     49.99,
    })

    require.NoError(t, err)

    var result ChargeResult
    require.NoError(t, val.Get(&result))
    assert.Equal(t, "ch_abc123", result.ChargeID)
    mockStripe.AssertExpectations(t)
}

TestActivityEnvironment

The SDK provides TestActivityEnvironment for testing activities in isolation:

func TestSendEmailActivity(t *testing.T) {
    testSuite := testsuite.WorkflowTestSuite{}
    env := testSuite.NewTestActivityEnvironment()

    // Register a mock for context-injected heartbeat
    env.SetTestTimeout(3 * time.Second)

    mockMailer := &MockMailer{}
    mockMailer.On("Send", "user@example.com", "Welcome!", mock.Anything).
        Return(nil)

    env.RegisterActivity(&EmailActivity{mailer: mockMailer})

    val, err := env.ExecuteActivity(SendEmailActivity, EmailInput{
        To:      "user@example.com",
        Subject: "Welcome!",
        Body:    "Thanks for signing up.",
    })

    require.NoError(t, err)
    mockMailer.AssertExpectations(t)
    _ = val
}

Controlling Mock Expectations

Ordered Calls

// Enforce call order — email must be sent before SMS
firstCall := s.env.OnActivity(SendEmailActivity, mock.Anything, mock.Anything).
    Return(nil).Once()

s.env.OnActivity(SendSMSActivity, mock.Anything, mock.Anything).
    Return(nil).Once().NotBefore(firstCall)

Times

// Expect exactly N calls
s.env.OnActivity(PollStatusActivity, mock.Anything, mock.Anything).
    Return(StatusResult{State: "processing"}, nil).Times(3)

// Expect at least 1 call
s.env.OnActivity(SendAlertActivity, mock.Anything, mock.Anything).
    Return(nil).MinTimes(1)

Maybe (optional)

// Activity may or may not be called — don't fail if it's not
s.env.OnActivity(CleanupActivity, mock.Anything, mock.Anything).
    Return(nil).Maybe()

Testing Non-Deterministic Side Effects

Temporal workflows must be deterministic — the same history must produce the same result on replay. Use workflow.SideEffect for non-deterministic operations and mock them:

// In your workflow
func OrderWorkflow(ctx workflow.Context, input OrderInput) (OrderResult, error) {
    var orderID string
    // SideEffect records the result in history — deterministic on replay
    encoded := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
        return generateUniqueID() // called once, result stored in history
    })
    encoded.Get(&orderID)
    // ...
}

In tests, SideEffect is called normally — the test environment records the result. If you need to control the output, wrap generateUniqueID as a mockable function.

Asserting Activity Arguments

Always verify the arguments activities receive, not just that they were called:

func (s *ChargePaymentSuite) Test_ActivityReceivesCorrectAmount() {
    s.env.OnActivity(ChargePaymentActivity, mock.Anything, mock.MatchedBy(func(input ChargeInput) bool {
        s.Equal("cust-123", input.CustomerID)
        s.InDelta(49.99, input.Amount, 0.001)
        s.Equal("USD", input.Currency)
        return true
    })).Return(ChargeResult{ChargeID: "ch_abc", Status: "succeeded"}, nil)

    s.env.ExecuteWorkflow(SubscriptionWorkflow, SubscriptionInput{
        CustomerID: "cust-123",
        PlanID:     "plan-monthly-49.99",
    })

    s.True(s.env.IsWorkflowCompleted())
}

End-to-End Workflow Verification

Activity mocks verify that your workflow logic is correct in isolation. End-to-end tests verify the full system — the API that submits workflows, the Temporal server that orchestrates them, the workers that run activities, and the databases they write to. HelpMeTest tests the full chain:

Scenario: subscription activation flow
  Given a user upgrades their plan via the dashboard
  When Temporal processes the subscription workflow
  Then the user's plan is updated in the database within 60 seconds
  And a welcome email is delivered
  And the Stripe charge appears on the account

This catches integration failures that mocked tests never see.

Key Takeaways

  • Mock activities with OnActivity(activityFunc, args...) — use mock.Anything for the context, be specific with business inputs
  • Use dynamic return functions (not fixed values) when you need to verify retry behavior or call count
  • TestActivityEnvironment tests activities in isolation with mocked clients — verify the activity itself, not just the workflow's interaction with it
  • Assert on activity arguments with mock.MatchedBy — checking that an activity was called is not enough
  • Run env.AssertExpectations(t) in TearDownTest to catch missing or unexpected activity calls

Read more