Apache Pekko Actor Testing Patterns and Best Practices

Apache Pekko Actor Testing Patterns and Best Practices

Apache Pekko is the community-maintained fork of Akka (post-BSL license change). Its testing API mirrors Akka's closely, with the same ActorTestKit, BehaviorTestKit, and TestProbe tools — making migration from Akka to Pekko straightforward. This guide focuses on Pekko Typed actor testing patterns.

Setup

// build.sbt
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-actor-typed"         % "1.1.2",
  "org.apache.pekko" %% "pekko-actor-testkit-typed" % "1.1.2" % Test,
  "org.scalatest"    %% "scalatest"                 % "3.2.17" % Test
)

ActorTestKit Overview

ActorTestKit creates a real Pekko actor system for async testing. Use it for most integration-level actor tests:

import org.apache.pekko.actor.testkit.typed.scaladsl.ActorTestKit
import org.scalatest.BeforeAndAfterAll
import org.scalatest.wordspec.AnyWordSpec

class UserRegistrySpec extends AnyWordSpec with BeforeAndAfterAll {
  val testKit = ActorTestKit()

  override def afterAll(): Unit = testKit.shutdownTestKit()

  "UserRegistry" should {
    "register a user and return their ID" in {
      val registry = testKit.spawn(UserRegistry())
      val probe    = testKit.createTestProbe[UserRegistry.Response]()

      registry ! UserRegistry.Register("alice@example.com", probe.ref)

      val response = probe.receiveMessage()
      assert(response.isInstanceOf[UserRegistry.Registered])
    }
  }
}

ActorTestKit Configuration

// Custom configuration
val config = ConfigFactory.parseString("""
  pekko.loglevel = "WARNING"
  pekko.actor.provider = "local"
""")

val testKit = ActorTestKit(ActorTestKitBase.testNameFromCallStack(), config)

BehaviorTestKit — Synchronous Testing

BehaviorTestKit runs behaviors synchronously in a single thread — ideal for testing message handling logic without concurrency:

import org.apache.pekko.actor.testkit.typed.scaladsl.BehaviorTestKit
import org.apache.pekko.actor.testkit.typed.scaladsl.TestInbox

class CounterSpec extends AnyWordSpec {
  "Counter" should {
    "increment count on Increment message" in {
      val testKit = BehaviorTestKit(Counter())
      val inbox   = TestInbox[Counter.State]()

      testKit.run(Counter.Increment)
      testKit.run(Counter.Increment)
      testKit.run(Counter.GetState(inbox.ref))

      inbox.expectMessage(Counter.State(count = 2))
    }

    "reset to zero on Reset" in {
      val testKit = BehaviorTestKit(Counter())
      val inbox   = TestInbox[Counter.State]()

      testKit.run(Counter.Increment)
      testKit.run(Counter.Increment)
      testKit.run(Counter.Reset)
      testKit.run(Counter.GetState(inbox.ref))

      inbox.expectMessage(Counter.State(count = 0))
    }
  }
}

BehaviorTestKit is synchronous — all messages process before the next line runs. Use it for pure behavior logic tests. For concurrency and timing tests, use ActorTestKit.

Testing Child Actor Spawning

"ParentActor" should {
  "spawn a child worker on CreateWorker" in {
    val testKit = BehaviorTestKit(ParentActor())

    testKit.run(ParentActor.CreateWorker("worker-1"))

    // Verify the spawn effect
    val spawnedEffect = testKit.expectEffectType[Effects.Spawned[Worker.Command]]
    assertEquals(spawnedEffect.childName, "worker-1")
  }

  "send message to child after spawning" in {
    val testKit = BehaviorTestKit(ParentActor())
    testKit.run(ParentActor.CreateWorker("w-1"))

    // Get reference to child's inbox
    val childInbox = testKit.childInbox[Worker.Command]("w-1")

    testKit.run(ParentActor.StartWorker("w-1"))
    childInbox.expectMessage(Worker.Start)
  }
}

TestProbe Patterns

TestProbe is a fake actor that records all received messages:

"OrderProcessor" should {
  "coordinate between inventory and payment" in {
    val inventory = testKit.createTestProbe[Inventory.Command]()
    val payment   = testKit.createTestProbe[Payment.Command]()
    val replyTo   = testKit.createTestProbe[OrderProcessor.Response]()

    val processor = testKit.spawn(
      OrderProcessor(inventory.ref, payment.ref)
    )

    processor ! OrderProcessor.PlaceOrder(
      itemId = "item-1",
      quantity = 2,
      replyTo = replyTo.ref
    )

    // Verify inventory check
    val stockCheck = inventory.receiveMessage()
    assert(stockCheck.isInstanceOf[Inventory.CheckStock])
    inventory.lastSender ! Inventory.StockAvailable("item-1", 10)

    // Verify payment charge
    val charge = payment.receiveMessage()
    assert(charge.isInstanceOf[Payment.Charge])
    payment.lastSender ! Payment.Charged("tx-456")

    // Verify order confirmation
    replyTo.expectMessage(OrderProcessor.OrderConfirmed("tx-456"))
  }
}

Probe Expectations

val probe = testKit.createTestProbe[MyMessage]()

// Exact message
probe.expectMessage(MyMessage("hello"))

// With timeout
probe.expectMessage(5.seconds, MyMessage("hello"))

// Type match
probe.expectMessageType[MyMessage]

// No message expected
probe.expectNoMessage(200.milliseconds)

// Receive N messages
val messages = probe.receiveMessages(3)

// Receive within duration
val message = probe.receiveMessage(2.seconds)

Supervision Testing

Test what happens when actors crash:

"WorkerSupervisor" should {
  "restart worker on ArithmeticException" in {
    val supervisor = testKit.spawn(WorkerSupervisor())
    val probe      = testKit.createTestProbe[Any]()

    supervisor ! WorkerSupervisor.CreateWorker(probe.ref)
    val worker = probe.receiveMessage().asInstanceOf[ActorRef[Worker.Command]]

    // Watch for termination
    val deathWatch = testKit.createDeathProbe()
    deathWatch.watch(worker)

    // Trigger exception
    worker ! Worker.DivideByZero

    // Worker restarts, not terminates
    deathWatch.expectNoTermination(500.milliseconds)

    // Worker recovers
    val replyProbe = testKit.createTestProbe[Int]()
    worker ! Worker.Calculate(10, 2, replyProbe.ref)
    replyProbe.expectMessage(5)
  }

  "stop worker on unrecoverable error" in {
    val supervisor = testKit.spawn(WorkerSupervisor())
    val probe      = testKit.createTestProbe[Any]()

    supervisor ! WorkerSupervisor.CreateWorker(probe.ref)
    val worker = probe.receiveMessage().asInstanceOf[ActorRef[Worker.Command]]

    val deathProbe = testKit.createDeathProbe()
    deathProbe.watch(worker)

    worker ! Worker.FatalError

    deathProbe.expectTerminated(worker, 2.seconds)
  }
}

Testing Timers and Scheduled Messages

Pekko actors use timers.startTimerWithFixedDelay for scheduling. Control time in tests with ManualTime:

import org.apache.pekko.actor.testkit.typed.scaladsl.ManualTime

"HeartbeatActor" should {
  "send heartbeat every second" in {
    val manualTime = ManualTime()
    val probe      = testKit.createTestProbe[Heartbeat]()

    testKit.spawn(
      HeartbeatActor(interval = 1.second, target = probe.ref)
    )

    probe.expectNoMessage(100.milliseconds)

    manualTime.timePasses(1.second)
    probe.expectMessage(Heartbeat)

    manualTime.timePasses(1.second)
    probe.expectMessage(Heartbeat)
  }
}

ManualTime prevents real-time sleeping in timer tests — essential for tests involving minutes or hours of simulated time.

State Machine Testing

For complex FSM-style behaviors:

sealed trait TrafficLightState
case object Red extends TrafficLightState
case object Yellow extends TrafficLightState
case object Green extends TrafficLightState

"TrafficLight" should {
  "follow correct state sequence" in {
    val probe = testKit.createTestProbe[TrafficLightState]()
    val light = testKit.spawn(TrafficLight(probe.ref))

    // Initial state
    probe.expectMessage(Red)

    light ! TrafficLight.Next
    probe.expectMessage(Green)

    light ! TrafficLight.Next
    probe.expectMessage(Yellow)

    light ! TrafficLight.Next
    probe.expectMessage(Red)
  }
}

Cluster Testing (Multi-Node)

For Pekko Cluster tests, use the multi-node testing toolkit:

libraryDependencies += "org.apache.pekko" %% "pekko-multi-node-testkit" % "1.1.2" % Test
class ClusterSpec extends MultiNodeSpec(ClusterSpecConfig)
  with MultiNodeSpecCallbacks {

  "A cluster" must {
    "form when two nodes join" in {
      runOn(ClusterSpecConfig.node1) {
        Cluster(system).join(node(ClusterSpecConfig.node1).address)
      }
      runOn(ClusterSpecConfig.node2) {
        Cluster(system).join(node(ClusterSpecConfig.node1).address)
      }

      awaitClusterMembersUp(2)
      enterBarrier("cluster-formed")
    }
  }
}

Multi-node tests spawn real JVM processes — use them sparingly for cluster formation and sharding tests.

Migrating from Akka to Pekko

The API is nearly identical. Key namespace changes:

Akka Pekko
akka.actor org.apache.pekko.actor
akka.testkit org.apache.pekko.testkit
akka.actor.testkit.typed org.apache.pekko.actor.testkit.typed
ActorTestKit() ActorTestKit() (same class, different package)

A global find-and-replace of import akka.import org.apache.pekko. handles most of the migration.

Summary

Apache Pekko's testing toolkit matches Akka's API — if you know Akka testing, you know Pekko testing. Use BehaviorTestKit for synchronous behavior-unit tests, ActorTestKit for async integration tests, and TestProbe to observe inter-actor communication.

ManualTime eliminates real-time waits in timer tests. DeathProbe verifies supervision outcomes. Multi-node tests validate cluster behavior. Together, these tools give you complete coverage of distributed actor systems without requiring a real cluster for the vast majority of test scenarios.

Complement actor unit tests with continuous production monitoring through HelpMeTest to catch issues that only surface under real load and network conditions.

Read more