Testing Kotlin Coroutines: runTest, Turbine for Flow, and Best Practices
Testing asynchronous Kotlin code used to require Thread.sleep and other workarounds. The kotlinx-coroutines-test library provides proper tools: virtual time, controlled dispatchers, and runTest — which makes async tests run synchronously without any thread juggling.
Dependencies
// build.gradle.kts
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.1")
testImplementation("app.cash.turbine:turbine:1.1.0") // for Flow testing

runTest: The Foundation
runTest is the entry point for coroutine tests. It:
- Creates a TestScope with a controlled dispatcher
- Advances virtual time instead of waiting for real time
- Fails if any uncaught coroutine exception occurs
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals

class TimerTest {
    @Test
    fun `countdown completes after delay`() = runTest {
        val result = countdown(seconds = 3) // suspend fun with delay(3000)
        assertEquals("Done", result)
        // No real 3 seconds waited: virtual time advances instantly
    }
}

The delay() calls inside the tested coroutines advance virtual time, not wall-clock time. A test with delay(60_000) takes milliseconds to run.
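To make the virtual-time claim concrete, here is a minimal sketch that compares the virtual clock with the wall clock. The helper name virtualTimeDemo is hypothetical. currentTime is the TestScope property from kotlinx-coroutines-test, which is marked experimental, so the sketch opts in.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest

// Hypothetical helper: returns (virtual ms elapsed, real ms elapsed)
@OptIn(ExperimentalCoroutinesApi::class)
fun virtualTimeDemo(): Pair<Long, Long> {
    var virtualElapsed = 0L
    val wallStart = System.nanoTime()
    runTest {
        delay(60_000)                 // skipped by the test scheduler
        virtualElapsed = currentTime  // virtual clock, in milliseconds
    }
    val wallElapsedMs = (System.nanoTime() - wallStart) / 1_000_000
    return virtualElapsed to wallElapsedMs
}

fun main() {
    val (virtual, wall) = virtualTimeDemo()
    println("virtual=${virtual}ms, wall=${wall}ms")
}
```

The virtual clock should report the full 60 seconds, while the real elapsed time depends only on how long the test machinery takes to start.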
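StandardTestDispatcher
By default, runTest uses StandardTestDispatcher, which replaces the deprecated TestCoroutineDispatcher. It does not run new coroutines eagerly: they are queued on the test scheduler and run only when the test body suspends or when you call runCurrent(), advanceTimeBy(), or advanceUntilIdle().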
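The following sketch illustrates how StandardTestDispatcher schedules work under runTest: a launched coroutine is only queued until the scheduler is told to run, and advanceTimeBy(n) stops just short of tasks due exactly at the new clock value. The helper name schedulingDemo and the log strings are made up for illustration.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest

// Hypothetical helper: records the order in which things happen
@OptIn(ExperimentalCoroutinesApi::class)
fun schedulingDemo(): List<String> {
    val log = mutableListOf<String>()
    runTest {
        launch {
            log += "started"
            delay(1_000)
            log += "resumed"
        }
        log += "after launch"        // the child is queued, not yet started
        runCurrent()                 // runs tasks due at the current virtual time
        advanceTimeBy(1_000)         // clock moves to 1000; the task due at 1000 is still pending
        log += "after advanceTimeBy"
        runCurrent()                 // now the task due at 1000 runs
        log += "after runCurrent"
    }
    return log
}

fun main() {
    println(schedulingDemo())
}
```

This boundary behaviour is why tests often advance by one millisecond more than the delay they want to pass, or follow advanceTimeBy with runCurrent().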
advanceTimeBy and advanceUntilIdle
@Test
fun `polling retries after delay`() = runTest {
    val poller = Poller(retryDelay = 1000L)
    var callCount = 0
    poller.start { callCount++ }
    advanceTimeBy(1001L) // advance by 1 second + 1ms
    assertEquals(2, callCount) // initial + 1 retry
    advanceTimeBy(1000L)
    assertEquals(3, callCount) // 2 retries
}

advanceUntilIdle() runs all pending coroutines and delays until the queue is empty:
@Test
fun `all background jobs complete`() = runTest {
    val processor = BatchProcessor()
    processor.submitAll(items)
    advanceUntilIdle() // wait for all coroutines to finish
    assertEquals(items.size, processor.processedCount)
}

UnconfinedTestDispatcher
UnconfinedTestDispatcher starts new coroutines eagerly, running each one up to its first suspension point without a call to advanceUntilIdle():
@Test
fun `eager execution`() = runTest(UnconfinedTestDispatcher()) {
    val viewModel = MyViewModel()
    // StateFlow updates are immediately visible
    assertEquals(UiState.Loading, viewModel.state.value)
    // No need to advance time
}

Use UnconfinedTestDispatcher when testing StateFlow emissions, where you want state changes to propagate immediately.
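As a sketch of that StateFlow use case, the snippet below starts a collector on an UnconfinedTestDispatcher so that it is subscribed before the state changes and sees each update as it happens. The helper name collectStateDemo and the bare MutableStateFlow stand in for a real ViewModel and are assumptions for illustration. backgroundScope requires kotlinx-coroutines-test 1.7 or later.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.runTest

// Hypothetical helper: returns every value the collector observed
@OptIn(ExperimentalCoroutinesApi::class)
fun collectStateDemo(): List<Int> {
    val state = MutableStateFlow(0)
    val seen = mutableListOf<Int>()
    runTest {
        // backgroundScope is cancelled automatically when the test body ends,
        // so the never-ending collect does not keep the test alive
        backgroundScope.launch(UnconfinedTestDispatcher(testScheduler)) {
            state.collect { seen += it }
        }
        state.value = 1
        state.value = 2
    }
    return seen
}

fun main() {
    println(collectStateDemo())
}
```

Sharing testScheduler with the enclosing test keeps the collector on the same virtual clock as the rest of the test.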
Injecting Test Dispatchers
Code that uses Dispatchers.IO or Dispatchers.Main should receive its dispatcher through injection so that tests can substitute a test dispatcher:
// Production code
class UserRepository(
    private val api: UserApi,
    private val dispatcher: CoroutineDispatcher = Dispatchers.IO
) {
    suspend fun getUser(id: Long): User = withContext(dispatcher) {
        api.fetchUser(id)
    }
}

// Test
@Test
fun `getUser calls API on correct dispatcher`() = runTest {
    val testDispatcher = StandardTestDispatcher(testScheduler)
    val api = mockk<UserApi>()
    coEvery { api.fetchUser(1L) } returns User(1L, "Alice")
    val repository = UserRepository(api, testDispatcher)
    val result = repository.getUser(1L)
    assertEquals("Alice", result.name)
}

For Dispatchers.Main in Android:
@BeforeEach
fun setUp() {
    Dispatchers.setMain(StandardTestDispatcher())
}

@AfterEach
fun tearDown() {
    Dispatchers.resetMain()
}

Testing StateFlow
class CounterViewModel : ViewModel() {
    private val _count = MutableStateFlow(0)
    val count: StateFlow<Int> = _count.asStateFlow()
    fun increment() { _count.value++ }
}

@Test
fun `increment updates state`() = runTest {
    val viewModel = CounterViewModel()
    assertEquals(0, viewModel.count.value)
    viewModel.increment()
    assertEquals(1, viewModel.count.value)
    viewModel.increment()
    assertEquals(2, viewModel.count.value)
}

StateFlow is synchronous: value reflects the current state immediately.
Testing Flow Emissions with Turbine
Turbine is a widely used third-party library from Cash App for testing Kotlin Flow:
import app.cash.turbine.test
import app.cash.turbine.turbineScope

@Test
fun `search results flow emits correctly`() = runTest {
    val viewModel = SearchViewModel(repository)
    viewModel.results.test {
        viewModel.search("kotlin")
        val loading = awaitItem()
        assertEquals(UiState.Loading, loading)
        val results = awaitItem()
        assertIs<UiState.Success>(results)
        assertEquals(3, results.items.size)
        cancelAndIgnoreRemainingEvents()
    }
}

Turbine's test {} block collects Flow emissions:
- awaitItem(): waits for the next emission
- awaitComplete(): asserts the flow completed
- awaitError(): asserts the flow terminated with an error
- cancelAndIgnoreRemainingEvents(): cancels collection and ignores what's left
- expectNoEvents(): asserts that no unconsumed events have been received so far
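The terminal-event functions in the list above can be sketched on two small flows, one that completes and one that fails. The helper name turbineTerminalEventsDemo and the "boom" message are made up for illustration. The Turbine calls are the ones listed above.

```kotlin
import app.cash.turbine.test
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.test.runTest

// Hypothetical helper: returns the message of the error seen by awaitError()
fun turbineTerminalEventsDemo(): String? {
    var errorMessage: String? = null
    runTest {
        // A finite flow: consume every item, then assert completion
        flowOf(1, 2).test {
            check(awaitItem() == 1)
            check(awaitItem() == 2)
            awaitComplete()
        }
        // A failing flow: consume the item, then capture the terminal error
        flow<Int> {
            emit(1)
            throw IllegalStateException("boom")
        }.test {
            check(awaitItem() == 1)
            errorMessage = awaitError().message
        }
    }
    return errorMessage
}

fun main() {
    println(turbineTerminalEventsDemo())
}
```

Because both flows terminate on their own and every event is consumed, neither block needs cancelAndIgnoreRemainingEvents().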
Testing Multiple Flows
@Test
fun `state and events flow in sync`() = runTest {
    turbineScope {
        val stateTurbine = viewModel.state.testIn(this)
        val eventTurbine = viewModel.events.testIn(this)
        viewModel.submitForm(validData)
        assertEquals(UiState.Loading, stateTurbine.awaitItem())
        assertEquals(UiState.Success, stateTurbine.awaitItem())
        assertEquals(NavigationEvent.GoToDashboard, eventTurbine.awaitItem())
        stateTurbine.cancelAndIgnoreRemainingEvents()
        eventTurbine.cancelAndIgnoreRemainingEvents()
    }
}

Testing SharedFlow
SharedFlow doesn't replay by default — collect before emitting:
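@Test
fun `event bus delivers message`() = runTest {
    val bus = EventBus()
    val received = mutableListOf<String>()
    // Start the collector eagerly so it is subscribed before the first emit;
    // a plain launch is only queued, so events emitted before it runs would be dropped
    val job = launch(UnconfinedTestDispatcher(testScheduler)) {
        bus.events.collect { received.add(it) }
    }
    bus.emit("hello")
    bus.emit("world")
    advanceUntilIdle()
    assertEquals(listOf("hello", "world"), received)
    job.cancel()
}

Or with Turbine: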
@Test
fun `event bus delivers messages`() = runTest {
    val bus = EventBus()
    bus.events.test {
        bus.emit("hello")
        assertEquals("hello", awaitItem())
        bus.emit("world")
        assertEquals("world", awaitItem())
        cancelAndIgnoreRemainingEvents()
    }
}

Testing Exceptions in Coroutines
@Test
fun `network error propagates correctly`() = runTest {
    coEvery { api.fetchUser(any()) } throws IOException("timeout")
    val exception = assertFailsWith<IOException> {
        repository.getUser(1L)
    }
    assertEquals("timeout", exception.message)
}

For structured concurrency, when a child coroutine fails:
@Test
fun `supervisor job isolates failures`() = runTest {
    val supervisor = SupervisorJob()
    // Handle the expected failure so runTest does not report it as an uncaught exception
    val handler = CoroutineExceptionHandler { _, _ -> }
    val scope = CoroutineScope(supervisor + handler + StandardTestDispatcher(testScheduler))
    var secondCompleted = false
    scope.launch { throw RuntimeException("first fails") }
    scope.launch {
        delay(100)
        secondCompleted = true
    }
    advanceUntilIdle()
    assertTrue(secondCompleted) // second job completed despite first failing
    supervisor.cancel()
}

Common Patterns
ViewModel testing setup:
@BeforeEach
fun setUp() {
    Dispatchers.setMain(UnconfinedTestDispatcher())
}

@AfterEach
fun tearDown() {
    Dispatchers.resetMain()
}

Timeout assertions:
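@Test
fun `operation completes within timeout`() = runTest {
    // Inside runTest the 5 seconds are virtual time: this bounds delay-based
    // waiting in slowOperation(), not real elapsed time
    withTimeout(5_000L) {
        val result = slowOperation()
        assertNotNull(result)
    }
}

Parallel execution: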
@Test
fun `concurrent requests all succeed`() = runTest {
    val results = (1..10).map { id ->
        async { repository.getUser(id.toLong()) }
    }.awaitAll()
    assertEquals(10, results.size)
}

Production Monitoring
Coroutine tests verify async logic against mocked dependencies. For production confidence that your live coroutine-powered APIs handle real concurrent load, HelpMeTest monitors live endpoints 24/7 — no source code access required.
Summary
- runTest runs coroutines with virtual time, so delays don't take real wall-clock time
- StandardTestDispatcher: requires advanceUntilIdle() or advanceTimeBy()
- UnconfinedTestDispatcher: eager execution, good for StateFlow testing
- Inject dispatchers instead of using Dispatchers.IO directly for testability
- Turbine's flow.test {} provides a clean API for asserting Flow emissions
- turbineScope for testing multiple flows in synchronization
- Always Dispatchers.setMain() in @BeforeEach and resetMain() in @AfterEach for Android