Testing Kotlin Coroutines: runTest, turbine for Flow, and Best Practices

Testing asynchronous Kotlin code used to require Thread.sleep and other workarounds. The kotlinx-coroutines-test library provides proper tools: virtual time, controlled dispatchers, and runTest — which makes async tests run synchronously without any thread juggling.

Dependencies

// build.gradle.kts
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.1")
testImplementation("app.cash.turbine:turbine:1.1.0")  // for Flow testing

runTest: The Foundation

runTest is the entry point for coroutine tests. It:

  • Creates a TestScope with a controlled dispatcher
  • Advances virtual time instead of waiting for real time
  • Fails if any uncaught coroutine exception occurs

import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals

class TimerTest {

    @Test
    fun `countdown completes after delay`() = runTest {
        val result = countdown(seconds = 3)  // suspend fun with delay(3000)
        assertEquals("Done", result)
        // No real 3 seconds waited — virtual time advances instantly
    }
}

The delay() calls inside the tested coroutines advance virtual time, not wall-clock time. A test with delay(60_000) takes milliseconds to run.
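To make the virtual clock visible, the sketch below defines a hypothetical countdown function (the article does not show its body, so this implementation is an assumption) and reads currentTime, the TestScope's virtual clock in milliseconds, before and after the call.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals

// Hypothetical function under test: one delay per second of countdown.
suspend fun countdown(seconds: Int): String {
    repeat(seconds) { delay(1_000) }
    return "Done"
}

@OptIn(ExperimentalCoroutinesApi::class)
class VirtualTimeTest {

    @Test
    fun `delay advances the virtual clock only`() = runTest {
        assertEquals(0L, currentTime)        // virtual clock starts at zero
        assertEquals("Done", countdown(seconds = 3))
        assertEquals(3_000L, currentTime)    // three delays of 1000 ms were skipped, not waited for
    }
}
```

Note the Long literals: currentTime is a Long, so comparing it with an Int literal would fail the equality check.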

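StandardTestDispatcher

By default, runTest uses StandardTestDispatcher. It does not run new coroutines eagerly: anything you launch is queued on the test scheduler and runs only when the test body suspends or when you call runCurrent(), advanceTimeBy(), or advanceUntilIdle(). Delays in the test body itself are skipped automatically. The older TestCoroutineDispatcher has been deprecated since kotlinx-coroutines-test 1.6 in favor of StandardTestDispatcher and UnconfinedTestDispatcher.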
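A small sketch of how StandardTestDispatcher schedules work: a coroutine launched from the test body is queued, and it starts only once the scheduler is told to run pending tasks. The class and test names here are illustrative.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertFalse
import kotlin.test.assertTrue

@OptIn(ExperimentalCoroutinesApi::class)
class StandardDispatcherTest {

    @Test
    fun `launched coroutine is queued until the scheduler runs`() = runTest {
        var started = false

        launch { started = true }
        assertFalse(started)   // queued on the test scheduler, not yet executed

        runCurrent()           // run everything scheduled for the current virtual time
        assertTrue(started)
    }
}
```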

advanceTimeBy and advanceUntilIdle

@Test
fun `polling retries after delay`() = runTest {
    val poller = Poller(retryDelay = 1000L)
    var callCount = 0

    poller.start { callCount++ }

    advanceTimeBy(1001L)   // 1 s + 1 ms: tasks due exactly at the target time are not run
    assertEquals(2, callCount)  // initial + 1 retry

    advanceTimeBy(1000L)
    assertEquals(3, callCount)  // 2 retries
}
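The Poller class is not shown in this article. The following is a minimal, hypothetical version that takes a CoroutineScope (an assumption; the example above passes only retryDelay) so the test can run it in backgroundScope, which runTest cancels when the test ends.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals

// Hypothetical poller: calls the action, waits, and repeats until cancelled.
class Poller(private val scope: CoroutineScope, private val retryDelay: Long) {
    fun start(action: () -> Unit): Job = scope.launch {
        while (true) {
            action()
            delay(retryDelay)
        }
    }
}

@OptIn(ExperimentalCoroutinesApi::class)
class PollerTest {

    @Test
    fun `polling retries after each delay`() = runTest {
        var callCount = 0
        // backgroundScope is cancelled at the end of the test, so the endless loop cannot hang it.
        Poller(backgroundScope, retryDelay = 1_000L).start { callCount++ }

        runCurrent()                 // start the queued coroutine: initial call
        assertEquals(1, callCount)

        advanceTimeBy(1_001L)        // passes the 1000 ms mark: first retry
        assertEquals(2, callCount)

        advanceTimeBy(1_000L)        // passes the 2000 ms mark: second retry
        assertEquals(3, callCount)
    }
}
```

Injecting the scope is a design choice for testability: a poller that creates its own scope would need its dispatcher injected instead.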

advanceUntilIdle() runs all pending coroutines and delays until the queue is empty:

@Test
fun `all background jobs complete`() = runTest {
    val processor = BatchProcessor()
    processor.submitAll(items)

    advanceUntilIdle()   // run queued coroutines, skipping their delays, until nothing is pending

    assertEquals(items.size, processor.processedCount)
}

UnconfinedTestDispatcher

UnconfinedTestDispatcher starts new coroutines eagerly, running each one up to its first suspension point without a call to runCurrent() or advanceUntilIdle():

@Test
fun `eager execution`() = runTest(UnconfinedTestDispatcher()) {
    val viewModel = MyViewModel()
    // StateFlow updates are immediately visible
    assertEquals(UiState.Loading, viewModel.state.value)
    // No need to advance time
}

Use UnconfinedTestDispatcher when testing StateFlow emissions, where you want state changes to propagate immediately.
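One way to apply this without switching the whole test to an unconfined dispatcher is to use it only for the collector. The sketch below uses a bare MutableStateFlow in place of a ViewModel; the names are illustrative.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals

@OptIn(ExperimentalCoroutinesApi::class)
class EagerCollectorTest {

    @Test
    fun `unconfined collector observes each state update`() = runTest {
        val state = MutableStateFlow(0)
        val seen = mutableListOf<Int>()

        // The collector starts immediately and is resumed in place on every update.
        // backgroundScope cancels it automatically when the test finishes.
        backgroundScope.launch(UnconfinedTestDispatcher(testScheduler)) {
            state.collect { seen.add(it) }
        }

        state.value = 1
        state.value = 2

        assertEquals(listOf(0, 1, 2), seen)
    }
}
```

With a StandardTestDispatcher collector, the same test would likely record only the latest value, because StateFlow conflates updates that arrive before the collector runs.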

Injecting Test Dispatchers

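Code that hard-codes Dispatchers.IO or Dispatchers.Default cannot be controlled from a test. Accept the dispatcher as a constructor parameter instead, so tests can pass a test dispatcher that shares the test scheduler: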

// Production code
class UserRepository(
    private val api: UserApi,
    private val dispatcher: CoroutineDispatcher = Dispatchers.IO
) {
    suspend fun getUser(id: Long): User = withContext(dispatcher) {
        api.fetchUser(id)
    }
}

// Test
@Test
fun `getUser returns the user from the API`() = runTest {
    val testDispatcher = StandardTestDispatcher(testScheduler)
    val api = mockk<UserApi>()
    coEvery { api.fetchUser(1L) } returns User(1L, "Alice")

    val repository = UserRepository(api, testDispatcher)
    val result = repository.getUser(1L)

    assertEquals("Alice", result.name)
}
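The same idea can be sketched without a mocking library. The User and UserApi types and the SlowFakeApi class below are assumptions made for illustration; the point is that a delay running on the injected dispatcher is still skipped in virtual time.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext
import kotlin.test.Test
import kotlin.test.assertEquals

// Hypothetical domain types standing in for the ones used above.
data class User(val id: Long, val name: String)

interface UserApi {
    suspend fun fetchUser(id: Long): User
}

class UserRepository(
    private val api: UserApi,
    private val dispatcher: CoroutineDispatcher = Dispatchers.IO
) {
    suspend fun getUser(id: Long): User = withContext(dispatcher) { api.fetchUser(id) }
}

// Hand-written fake that simulates two seconds of network latency.
class SlowFakeApi : UserApi {
    override suspend fun fetchUser(id: Long): User {
        delay(2_000)
        return User(id, "Alice")
    }
}

@OptIn(ExperimentalCoroutinesApi::class)
class UserRepositoryTest {

    @Test
    fun `latency on the injected dispatcher is virtual`() = runTest {
        // Passing testScheduler ties the injected dispatcher to the test's virtual clock.
        val repository = UserRepository(SlowFakeApi(), StandardTestDispatcher(testScheduler))

        val user = repository.getUser(1L)

        assertEquals("Alice", user.name)
        assertEquals(2_000L, currentTime)   // the delay was skipped, not waited for
    }
}
```

Had the repository kept the default Dispatchers.IO, the delay would run on a real thread and the test would wait two real seconds.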

For Dispatchers.Main in Android:

@BeforeEach
fun setUp() {
    Dispatchers.setMain(StandardTestDispatcher())
}

@AfterEach
fun tearDown() {
    Dispatchers.resetMain()
}

Testing StateFlow

class CounterViewModel : ViewModel() {
    private val _count = MutableStateFlow(0)
    val count: StateFlow<Int> = _count.asStateFlow()

    fun increment() { _count.value++ }
}

@Test
fun `increment updates state`() = runTest {
    val viewModel = CounterViewModel()

    assertEquals(0, viewModel.count.value)
    viewModel.increment()
    assertEquals(1, viewModel.count.value)
    viewModel.increment()
    assertEquals(2, viewModel.count.value)
}

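Assigning to MutableStateFlow.value takes effect synchronously, so reading value right after increment() returns the new state. No dispatcher or time control is involved in this test.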

Testing Flow Emissions with Turbine

Turbine, a third-party library from Cash App, is a widely used tool for testing Kotlin Flow:

import app.cash.turbine.test
import app.cash.turbine.turbineScope

@Test
fun `search results flow emits correctly`() = runTest {
    val viewModel = SearchViewModel(repository)

    viewModel.results.test {
        viewModel.search("kotlin")

        val loading = awaitItem()
        assertEquals(UiState.Loading, loading)

        val results = awaitItem()
        assertIs<UiState.Success>(results)
        assertEquals(3, results.items.size)

        cancelAndIgnoreRemainingEvents()
    }
}

Turbine's test {} block collects Flow emissions:

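  • awaitItem(): suspends until the next emission and returns it; fails if the flow completes or errors instead
  • awaitComplete(): asserts that the next event is normal completion
  • awaitError(): asserts that the flow terminated with an error and returns the Throwable
  • cancelAndIgnoreRemainingEvents(): cancels collection and discards any unconsumed events
  • expectNoEvents(): asserts that no unconsumed events have been received so far; it does not wait for a timeout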
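The completion and error assertions listed above can be sketched with two small flows built inline. The flows and test names are illustrative and not part of any ViewModel in this article.

```kotlin
import app.cash.turbine.test
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals

class TurbineTerminalEventsTest {

    @Test
    fun `finite flow emits then completes`() = runTest {
        flowOf(1, 2).test {
            assertEquals(1, awaitItem())
            assertEquals(2, awaitItem())
            awaitComplete()   // fails if another item or an error arrives instead
        }
    }

    @Test
    fun `failing flow surfaces its error`() = runTest {
        val failing = flow<Int> {
            emit(1)
            throw IllegalStateException("boom")
        }

        failing.test {
            assertEquals(1, awaitItem())
            assertEquals("boom", awaitError().message)
        }
    }
}
```

If a test block ends while events are still unconsumed, Turbine fails the test, which is why the examples above consume every event or cancel explicitly.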

Testing Multiple Flows

@Test
fun `state and events flow in sync`() = runTest {
    turbineScope {
        val stateTurbine = viewModel.state.testIn(this)
        val eventTurbine = viewModel.events.testIn(this)

        viewModel.submitForm(validData)

        assertEquals(UiState.Loading, stateTurbine.awaitItem())
        assertEquals(UiState.Success, stateTurbine.awaitItem())
        assertEquals(NavigationEvent.GoToDashboard, eventTurbine.awaitItem())

        stateTurbine.cancelAndIgnoreRemainingEvents()
        eventTurbine.cancelAndIgnoreRemainingEvents()
    }
}
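A self-contained variant of the same pattern, using a MutableStateFlow and a MutableSharedFlow of strings in place of the ViewModel (both are stand-ins chosen for illustration). It passes backgroundScope to testIn, which is an alternative to passing this.

```kotlin
import app.cash.turbine.turbineScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals

class MultipleFlowsTest {

    @Test
    fun `state and events are observed together`() = runTest {
        val state = MutableStateFlow("Idle")
        val events = MutableSharedFlow<String>()

        turbineScope {
            val stateTurbine = state.testIn(backgroundScope)
            val eventTurbine = events.testIn(backgroundScope)

            // A StateFlow always delivers its current value to a new collector first.
            assertEquals("Idle", stateTurbine.awaitItem())

            state.value = "Loading"
            events.emit("GoToDashboard")

            assertEquals("Loading", stateTurbine.awaitItem())
            assertEquals("GoToDashboard", eventTurbine.awaitItem())

            stateTurbine.cancel()
            eventTurbine.cancel()
        }
    }
}
```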

Testing SharedFlow

SharedFlow doesn't replay by default — collect before emitting:

@Test
fun `event bus delivers message`() = runTest {
    val bus = EventBus()
    val received = mutableListOf<String>()

    val job = launch {
        bus.events.collect { received.add(it) }
    }
    runCurrent()   // let the collector subscribe first; otherwise both emissions are dropped

    bus.emit("hello")
    bus.emit("world")

    advanceUntilIdle()

    assertEquals(listOf("hello", "world"), received)
    job.cancel()
}

Or with Turbine:

@Test
fun `event bus delivers messages`() = runTest {
    val bus = EventBus()

    bus.events.test {
        bus.emit("hello")
        assertEquals("hello", awaitItem())

        bus.emit("world")
        assertEquals("world", awaitItem())

        cancelAndIgnoreRemainingEvents()
    }
}
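EventBus is not defined in this article. A minimal sketch, assuming it wraps a MutableSharedFlow with the default replay of 0, shows why the order of collecting and emitting matters.

```kotlin
import app.cash.turbine.test
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals

// Hypothetical event bus: no replay, no extra buffer.
class EventBus {
    private val _events = MutableSharedFlow<String>()
    val events: SharedFlow<String> = _events.asSharedFlow()

    suspend fun emit(message: String) = _events.emit(message)
}

class EventBusTest {

    @Test
    fun `emission before subscription is dropped`() = runTest {
        val bus = EventBus()

        bus.emit("early")   // no subscribers yet, so this value is lost

        bus.events.test {
            bus.emit("hello")
            // The first item seen is "hello"; "early" was never replayed.
            assertEquals("hello", awaitItem())
        }
    }
}
```

If late subscribers need to see earlier events, the bus would have to be created with a replay value greater than 0, which changes its semantics.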

Testing Exceptions in Coroutines

@Test
fun `network error propagates correctly`() = runTest {
    coEvery { api.fetchUser(any()) } throws IOException("timeout")

    val exception = assertFailsWith<IOException> {
        repository.getUser(1L)
    }
    assertEquals("timeout", exception.message)
}

For structured concurrency — when a child coroutine fails:

@Test
fun `supervisor job isolates failures`() = runTest {
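    val supervisor = SupervisorJob()
    // Handle the failure explicitly; otherwise runTest reports it as an uncaught exception
    val handler = CoroutineExceptionHandler { _, _ -> }
    val scope = CoroutineScope(supervisor + handler + StandardTestDispatcher(testScheduler))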

    var secondCompleted = false

    scope.launch { throw RuntimeException("first fails") }
    scope.launch {
        delay(100)
        secondCompleted = true
    }

    advanceUntilIdle()

    assertTrue(secondCompleted)  // second job completed despite first failing
    supervisor.cancel()
}

Common Patterns

ViewModel testing setup:

@BeforeEach
fun setUp() {
    Dispatchers.setMain(UnconfinedTestDispatcher())
}

@AfterEach
fun tearDown() {
    Dispatchers.resetMain()
}
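To avoid repeating this setup in every test class, the two callbacks can be wrapped in a JUnit 5 extension. The class name MainDispatcherExtension is an assumption made for this sketch; the callback interfaces come from JUnit Jupiter.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.TestDispatcher
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.resetMain
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.test.setMain
import kotlinx.coroutines.withContext
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.AfterEachCallback
import org.junit.jupiter.api.extension.BeforeEachCallback
import org.junit.jupiter.api.extension.ExtensionContext
import org.junit.jupiter.api.extension.RegisterExtension
import kotlin.test.assertEquals

// Hypothetical reusable extension: swaps Dispatchers.Main for a test dispatcher around each test.
@OptIn(ExperimentalCoroutinesApi::class)
class MainDispatcherExtension(
    private val dispatcher: TestDispatcher = UnconfinedTestDispatcher()
) : BeforeEachCallback, AfterEachCallback {

    override fun beforeEach(context: ExtensionContext) {
        Dispatchers.setMain(dispatcher)
    }

    override fun afterEach(context: ExtensionContext) {
        Dispatchers.resetMain()
    }
}

class MainDispatcherExampleTest {

    // @JvmField exposes the property as a field, which @RegisterExtension requires.
    @JvmField
    @RegisterExtension
    val mainDispatcher = MainDispatcherExtension()

    @Test
    fun `work on Main runs inside the test`() = runTest {
        val result = withContext(Dispatchers.Main) { "ok" }
        assertEquals("ok", result)
    }
}
```

When Dispatchers.Main has been replaced with a TestDispatcher, runTest picks up that dispatcher's scheduler, so the test body and Main share one virtual clock.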

Timeout assertions (inside runTest, withTimeout measures virtual time, not wall-clock time):

@Test
fun `operation completes within timeout`() = runTest {
    withTimeout(5_000L) {
        val result = slowOperation()
        assertNotNull(result)
    }
}
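A sketch of the failing case, assuming an operation that takes ten virtual seconds: the timeout fires at the five-second mark of the virtual clock, and the test still finishes almost instantly.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withTimeout
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith

@OptIn(ExperimentalCoroutinesApi::class)
class VirtualTimeoutTest {

    @Test
    fun `timeout fires in virtual time`() = runTest {
        assertFailsWith<TimeoutCancellationException> {
            withTimeout(5_000L) {
                delay(10_000L)   // stands in for a slow operation
            }
        }
        // The clock stopped where the timeout fired, not where the delay would have ended.
        assertEquals(5_000L, currentTime)
    }
}
```

To bound real execution time instead, runTest accepts a timeout parameter of type Duration that applies to the wall clock.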

Parallel execution:

@Test
fun `concurrent requests all succeed`() = runTest {
    val results = (1..10).map { id ->
        async { repository.getUser(id.toLong()) }
    }.awaitAll()

    assertEquals(10, results.size)
}
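Virtual time can also confirm that the requests really overlap. In this sketch each simulated request takes one virtual second (the delay and the doubled result are placeholders), and ten of them together still advance the clock by only one second.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals

@OptIn(ExperimentalCoroutinesApi::class)
class ConcurrentRequestsTest {

    @Test
    fun `async calls overlap in virtual time`() = runTest {
        val results = (1..10).map { id ->
            async {
                delay(1_000)   // simulated request latency
                id * 2
            }
        }.awaitAll()

        assertEquals(10, results.size)
        assertEquals(20, results.last())
        // Sequential execution would have advanced the clock to 10_000.
        assertEquals(1_000L, currentTime)
    }
}
```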

Summary

  • runTest runs coroutines with virtual time — delays don't take real wall-clock time
  • StandardTestDispatcher: queues launched coroutines until runCurrent(), advanceTimeBy(), or advanceUntilIdle() runs them
  • UnconfinedTestDispatcher: eager execution, good for StateFlow testing
  • Inject dispatchers instead of using Dispatchers.IO directly for testability
  • Turbine's flow.test {} provides a clean API for asserting Flow emissions
  • turbineScope with testIn for asserting on several flows in one test
  • On Android, always call Dispatchers.setMain() in @BeforeEach and Dispatchers.resetMain() in @AfterEach
