Turbine Testing Guide: Testing Kotlin Flows with awaitItem, awaitError & StateFlow

Turbine Testing Guide: Testing Kotlin Flows with awaitItem, awaitError & StateFlow

Kotlin Flow is the standard for reactive programming in Android—but testing asynchronous streams is notoriously awkward. Turbine is Cash App's library that makes Flow testing readable and deterministic. Instead of manual coroutine collection and timing hacks, you get clean, sequential assertions.

Why Testing Flows Is Hard Without Turbine

The naive approach to testing a Flow:

// Without Turbine — fragile and verbose
@Test
fun shouldEmitUserProfile() = runTest {
    val items = mutableListOf<User>()
    val job = launch {
        repository.getUser("user-1").collect { items.add(it) }
    }
    advanceUntilIdle()
    job.cancel()

    assertEquals(1, items.size)
    assertEquals("Alice", items[0].name)
}

Problems: race conditions, manual job cancellation, no clean timeout handling. Turbine solves all of this.

Setting Up Turbine

// build.gradle.kts
dependencies {
    testImplementation("app.cash.turbine:turbine:1.1.0")
    testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.7.3")
}

Core Turbine API

awaitItem() — Assert a Single Emitted Value

@Test
fun shouldEmitProductOnLoad() = runTest {
    val product = Product("prod-1", "Wireless Headphones", 79.99)
    coEvery { repository.getProduct("prod-1") } returns flowOf(product)

    val viewModel = ProductDetailViewModel(repository)

    viewModel.productState.test {
        assertEquals(ProductState.Loading, awaitItem())       // Initial loading state
        assertEquals(ProductState.Success(product), awaitItem()) // Loaded product
        awaitComplete()
    }
}

awaitError() — Assert a Flow Failure

@Test
fun shouldEmitErrorWhenNetworkFails() = runTest {
    val networkError = IOException("Connection refused")
    coEvery { repository.getUser("user-1") } returns flow {
        throw networkError
    }

    val viewModel = UserViewModel(repository)
    viewModel.loadUser("user-1")

    viewModel.uiState.test {
        assertEquals(UiState.Loading, awaitItem())

        val errorState = awaitItem()
        assertTrue(errorState is UiState.Error)
        assertEquals("Connection refused", (errorState as UiState.Error).message)
    }
}

awaitComplete() — Assert the Flow Completes

@Test
fun shouldCompleteAfterEmittingAllItems() = runTest {
    val items = listOf("a", "b", "c")

    flowOf(*items.toTypedArray()).test {
        assertEquals("a", awaitItem())
        assertEquals("b", awaitItem())
        assertEquals("c", awaitItem())
        awaitComplete()  // Verifies the flow ended normally
    }
}

cancelAndIgnoreRemainingEvents() — Stop Collecting Early

@Test
fun shouldEmitLoadingBeforeData() = runTest {
    // We only care about the first emission, not the rest
    viewModel.uiState.test {
        assertEquals(UiState.Loading, awaitItem())
        cancelAndIgnoreRemainingEvents()
    }
}

StateFlow Testing

StateFlow always has a current value and never completes—Turbine handles both characteristics:

class SearchViewModel(private val repository: SearchRepository) : ViewModel() {
    private val _searchState = MutableStateFlow<SearchState>(SearchState.Idle)
    val searchState: StateFlow<SearchState> = _searchState.asStateFlow()

    fun search(query: String) {
        viewModelScope.launch {
            _searchState.value = SearchState.Loading
            try {
                val results = repository.search(query)
                _searchState.value = SearchState.Results(results)
            } catch (e: Exception) {
                _searchState.value = SearchState.Error(e.message ?: "Unknown error")
            }
        }
    }
}

@Test
fun shouldTransitionThroughStatesOnSearch() = runTest {
    val mockRepository = mockk<SearchRepository>()
    val results = listOf(SearchResult("1", "Kotlin testing guide"))

    coEvery { mockRepository.search("kotlin") } returns results

    val viewModel = SearchViewModel(mockRepository)

    viewModel.searchState.test {
        // StateFlow emits current value immediately
        assertEquals(SearchState.Idle, awaitItem())

        viewModel.search("kotlin")

        assertEquals(SearchState.Loading, awaitItem())
        assertEquals(SearchState.Results(results), awaitItem())

        // StateFlow never completes — cancel when done asserting
        cancelAndIgnoreRemainingEvents()
    }
}

Skipping the Initial StateFlow Value

Often you don't care about the initial state and want to start asserting from the first meaningful emission:

@Test
fun shouldShowResultsAfterSearch() = runTest {
    val viewModel = SearchViewModel(mockRepository)
    val results = listOf(SearchResult("1", "Result"))
    coEvery { mockRepository.search(any()) } returns results

    viewModel.searchState.test {
        awaitItem() // Skip initial Idle state

        viewModel.search("query")
        awaitItem() // Skip Loading

        val resultsState = awaitItem() as SearchState.Results
        assertEquals(1, resultsState.items.size)

        cancelAndIgnoreRemainingEvents()
    }
}

Testing Complex Emission Sequences

Testing Pagination

@Test
fun shouldEmitPagesSequentially() = runTest {
    val page1 = listOf(Product("1", "Item 1", 10.0, true))
    val page2 = listOf(Product("2", "Item 2", 20.0, true))
    val page3 = listOf(Product("3", "Item 3", 30.0, true))

    val viewModel = ProductListViewModel(mockRepository)
    coEvery { mockRepository.getPage(1) } returns page1
    coEvery { mockRepository.getPage(2) } returns page2
    coEvery { mockRepository.getPage(3) } returns page3

    viewModel.products.test {
        assertEquals(emptyList<Product>(), awaitItem()) // Initial empty state

        viewModel.loadNextPage()
        assertEquals(page1, awaitItem())

        viewModel.loadNextPage()
        assertEquals(page1 + page2, awaitItem()) // Accumulated pages

        viewModel.loadNextPage()
        assertEquals(page1 + page2 + page3, awaitItem())

        cancelAndIgnoreRemainingEvents()
    }
}
@Test
fun shouldDebounceSearchQueries() = runTest {
    val viewModel = SearchViewModel(mockRepository)
    coEvery { mockRepository.search(any()) } returns emptyList()

    viewModel.searchState.test {
        awaitItem() // Skip initial idle

        // Rapid queries - only last should fire after debounce
        viewModel.updateQuery("k")
        viewModel.updateQuery("ko")
        viewModel.updateQuery("kot")
        viewModel.updateQuery("kotl")
        viewModel.updateQuery("kotli")
        viewModel.updateQuery("kotlin")

        // Advance past debounce delay (typically 300ms)
        advanceTimeBy(400)

        // Should only see one search, not 6
        assertEquals(SearchState.Loading, awaitItem())
        assertEquals(SearchState.Results(emptyList()), awaitItem())

        // Verify search was called only once with final query
        coVerify(exactly = 1) { mockRepository.search("kotlin") }

        cancelAndIgnoreRemainingEvents()
    }
}

Testing Flow Transformation Pipelines

// Production code: a pipeline of Flow operators
fun getFormattedPrices(productIds: List<String>): Flow<String> =
    productIds.asFlow()
        .flatMapMerge { id -> repository.getProduct(id).asFlow() }
        .map { product -> "$${product.price}" }
        .distinctUntilChanged()

@Test
fun shouldFormatPricesCorrectly() = runTest {
    val products = mapOf(
        "prod-1" to Product("prod-1", "A", 9.99, true),
        "prod-2" to Product("prod-2", "B", 19.99, true)
    )
    products.forEach { (id, product) ->
        every { repository.getProduct(id) } returns product
    }

    getFormattedPrices(listOf("prod-1", "prod-2")).test {
        val emitted = mutableListOf<String>()
        // Collect both items (order may vary with flatMapMerge)
        emitted.add(awaitItem())
        emitted.add(awaitItem())
        awaitComplete()

        assertThat(emitted).containsExactlyInAnyOrder("$9.99", "$19.99")
    }
}

Testing SharedFlow

SharedFlow is used for one-time events (navigation, snackbars):

class OrderViewModel(private val repository: OrderRepository) : ViewModel() {
    private val _events = MutableSharedFlow<OrderEvent>()
    val events: SharedFlow<OrderEvent> = _events.asSharedFlow()

    fun placeOrder(cart: Cart) {
        viewModelScope.launch {
            val result = repository.placeOrder(cart)
            _events.emit(
                if (result.isSuccess) OrderEvent.OrderPlaced(result.getOrThrow().id)
                else OrderEvent.Error("Order failed")
            )
        }
    }
}

@Test
fun shouldEmitOrderPlacedEventOnSuccess() = runTest {
    val mockRepository = mockk<OrderRepository>()
    val orderId = "order-123"

    coEvery { mockRepository.placeOrder(any()) } returns Result.success(Order(orderId))

    val viewModel = OrderViewModel(mockRepository)

    viewModel.events.test {
        viewModel.placeOrder(Cart(items = listOf()))

        val event = awaitItem()
        assertTrue(event is OrderEvent.OrderPlaced)
        assertEquals(orderId, (event as OrderEvent.OrderPlaced).orderId)

        cancelAndIgnoreRemainingEvents()
    }
}

Testing Flows with Errors and Recovery

@Test
fun shouldRetryOnTransientError() = runTest {
    val mockSource = mockk<DataSource>()
    var callCount = 0

    coEvery { mockSource.getData() } answers {
        callCount++
        if (callCount < 3) throw IOException("Temporary error")
        "success-data"
    }

    val retryFlow = flow {
        emit(mockSource.getData())
    }.retry(2) { it is IOException }

    retryFlow.test {
        assertEquals("success-data", awaitItem())
        awaitComplete()
    }

    assertEquals(3, callCount)
}

@Test
fun shouldFallbackToCacheOnNetworkError() = runTest {
    val cachedData = listOf(Product("cached-1", "Cached Product", 5.0, true))

    coEvery { networkRepository.getProducts() } throws IOException("No network")
    coEvery { cacheRepository.getProducts() } returns cachedData

    val viewModel = ProductListViewModel(networkRepository, cacheRepository)

    viewModel.products.test {
        assertEquals(ProductsState.Loading, awaitItem())
        val state = awaitItem()
        assertTrue(state is ProductsState.CachedResults)
        assertEquals(cachedData, (state as ProductsState.CachedResults).products)
        cancelAndIgnoreRemainingEvents()
    }
}

Testing Multiple Flows

Turbine supports testing multiple flows in one test using testIn:

@Test
fun shouldSynchronizeUserAndCartFlows() = runTest {
    val userFlow = MutableStateFlow<User?>(null)
    val cartFlow = MutableStateFlow(Cart(emptyList()))

    val userTurbine = userFlow.testIn(backgroundScope)
    val cartTurbine = cartFlow.testIn(backgroundScope)

    // Verify initial states
    assertNull(userTurbine.awaitItem())
    assertTrue(cartTurbine.awaitItem().items.isEmpty())

    // Update user
    userFlow.value = User("user-1", "Alice")
    assertEquals("Alice", userTurbine.awaitItem()?.name)

    // Update cart
    cartFlow.value = Cart(listOf(CartItem("prod-1", 1, 29.99)))
    assertEquals(1, cartTurbine.awaitItem().items.size)

    userTurbine.cancel()
    cartTurbine.cancel()
}

Turbine with HelpMeTest

For apps where Kotlin Flow drives real-time UI updates (live data feeds, order tracking), HelpMeTest validates the user experience:

*** Test Cases ***
Order Status Updates In Real Time
    As    LoggedInUser
    Go To    https://app.example.com/orders/latest
    Wait Until Element Is Visible    [data-testid=order-status]
    Element Should Contain    [data-testid=order-status]    Processing
    Wait Until Element Contains    [data-testid=order-status]    Shipped    timeout=30s

While Turbine tests the Flow machinery, HelpMeTest validates that the StateFlow updates actually propagate to the UI and display correctly.

Common Mistakes

1. Not canceling infinite Flows StateFlow and SharedFlow never complete. Always end tests with cancelAndIgnoreRemainingEvents() or the test will hang.

2. Asserting on emission order for flatMapMerge flatMapMerge collects flows concurrently—emission order isn't guaranteed. Use containsExactlyInAnyOrder instead of position-based assertions.

3. Missing advanceUntilIdle() for coroutine-launched emissions When emit() is called from a launched coroutine, use advanceUntilIdle() before awaitItem() to ensure the coroutine has run.

4. Testing viewModelScope flows without UnconfinedTestDispatcher ViewModel flows run on Dispatchers.Main. In tests, set Dispatchers.setMain(UnconfinedTestDispatcher()) in @Before to ensure coroutines run eagerly.

Summary

Turbine makes Kotlin Flow testing as readable as synchronous code:

  • awaitItem() for asserting emitted values in sequence
  • awaitError() for asserting flow failures with the right exception
  • awaitComplete() for verifying normal flow termination
  • cancelAndIgnoreRemainingEvents() for stopping infinite flows (StateFlow, SharedFlow)
  • testIn(backgroundScope) for testing multiple flows simultaneously
  • Debounce testing using advanceTimeBy() from coroutines-test

Combine Turbine with MockK and runTest for a complete, deterministic Kotlin coroutine testing stack.

Read more