Turbine Testing Guide: Testing Kotlin Flows with awaitItem, awaitError & StateFlow
Kotlin Flow is the standard for reactive programming in Android—but testing asynchronous streams is notoriously awkward. Turbine is Cash App's library that makes Flow testing readable and deterministic. Instead of manual coroutine collection and timing hacks, you get clean, sequential assertions.
Why Testing Flows Is Hard Without Turbine
The naive approach to testing a Flow:
// Without Turbine — fragile and verbose
@Test
fun shouldEmitUserProfile() = runTest {
val items = mutableListOf<User>()
val job = launch {
repository.getUser("user-1").collect { items.add(it) }
}
advanceUntilIdle()
job.cancel()
assertEquals(1, items.size)
assertEquals("Alice", items[0].name)
}Problems: race conditions, manual job cancellation, no clean timeout handling. Turbine solves all of this.
Setting Up Turbine
// build.gradle.kts
dependencies {
testImplementation("app.cash.turbine:turbine:1.1.0")
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.7.3")
}Core Turbine API
awaitItem() — Assert a Single Emitted Value
@Test
fun shouldEmitProductOnLoad() = runTest {
val product = Product("prod-1", "Wireless Headphones", 79.99)
coEvery { repository.getProduct("prod-1") } returns flowOf(product)
val viewModel = ProductDetailViewModel(repository)
viewModel.productState.test {
assertEquals(ProductState.Loading, awaitItem()) // Initial loading state
assertEquals(ProductState.Success(product), awaitItem()) // Loaded product
awaitComplete()
}
}awaitError() — Assert a Flow Failure
@Test
fun shouldEmitErrorWhenNetworkFails() = runTest {
val networkError = IOException("Connection refused")
coEvery { repository.getUser("user-1") } returns flow {
throw networkError
}
val viewModel = UserViewModel(repository)
viewModel.loadUser("user-1")
viewModel.uiState.test {
assertEquals(UiState.Loading, awaitItem())
val errorState = awaitItem()
assertTrue(errorState is UiState.Error)
assertEquals("Connection refused", (errorState as UiState.Error).message)
}
}awaitComplete() — Assert the Flow Completes
@Test
fun shouldCompleteAfterEmittingAllItems() = runTest {
val items = listOf("a", "b", "c")
flowOf(*items.toTypedArray()).test {
assertEquals("a", awaitItem())
assertEquals("b", awaitItem())
assertEquals("c", awaitItem())
awaitComplete() // Verifies the flow ended normally
}
}cancelAndIgnoreRemainingEvents() — Stop Collecting Early
@Test
fun shouldEmitLoadingBeforeData() = runTest {
// We only care about the first emission, not the rest
viewModel.uiState.test {
assertEquals(UiState.Loading, awaitItem())
cancelAndIgnoreRemainingEvents()
}
}StateFlow Testing
StateFlow always has a current value and never completes—Turbine handles both characteristics:
class SearchViewModel(private val repository: SearchRepository) : ViewModel() {
private val _searchState = MutableStateFlow<SearchState>(SearchState.Idle)
val searchState: StateFlow<SearchState> = _searchState.asStateFlow()
fun search(query: String) {
viewModelScope.launch {
_searchState.value = SearchState.Loading
try {
val results = repository.search(query)
_searchState.value = SearchState.Results(results)
} catch (e: Exception) {
_searchState.value = SearchState.Error(e.message ?: "Unknown error")
}
}
}
}
@Test
fun shouldTransitionThroughStatesOnSearch() = runTest {
val mockRepository = mockk<SearchRepository>()
val results = listOf(SearchResult("1", "Kotlin testing guide"))
coEvery { mockRepository.search("kotlin") } returns results
val viewModel = SearchViewModel(mockRepository)
viewModel.searchState.test {
// StateFlow emits current value immediately
assertEquals(SearchState.Idle, awaitItem())
viewModel.search("kotlin")
assertEquals(SearchState.Loading, awaitItem())
assertEquals(SearchState.Results(results), awaitItem())
// StateFlow never completes — cancel when done asserting
cancelAndIgnoreRemainingEvents()
}
}Skipping the Initial StateFlow Value
Often you don't care about the initial state and want to start asserting from the first meaningful emission:
@Test
fun shouldShowResultsAfterSearch() = runTest {
val viewModel = SearchViewModel(mockRepository)
val results = listOf(SearchResult("1", "Result"))
coEvery { mockRepository.search(any()) } returns results
viewModel.searchState.test {
awaitItem() // Skip initial Idle state
viewModel.search("query")
awaitItem() // Skip Loading
val resultsState = awaitItem() as SearchState.Results
assertEquals(1, resultsState.items.size)
cancelAndIgnoreRemainingEvents()
}
}Testing Complex Emission Sequences
Testing Pagination
@Test
fun shouldEmitPagesSequentially() = runTest {
val page1 = listOf(Product("1", "Item 1", 10.0, true))
val page2 = listOf(Product("2", "Item 2", 20.0, true))
val page3 = listOf(Product("3", "Item 3", 30.0, true))
val viewModel = ProductListViewModel(mockRepository)
coEvery { mockRepository.getPage(1) } returns page1
coEvery { mockRepository.getPage(2) } returns page2
coEvery { mockRepository.getPage(3) } returns page3
viewModel.products.test {
assertEquals(emptyList<Product>(), awaitItem()) // Initial empty state
viewModel.loadNextPage()
assertEquals(page1, awaitItem())
viewModel.loadNextPage()
assertEquals(page1 + page2, awaitItem()) // Accumulated pages
viewModel.loadNextPage()
assertEquals(page1 + page2 + page3, awaitItem())
cancelAndIgnoreRemainingEvents()
}
}Testing Debounced Search
@Test
fun shouldDebounceSearchQueries() = runTest {
val viewModel = SearchViewModel(mockRepository)
coEvery { mockRepository.search(any()) } returns emptyList()
viewModel.searchState.test {
awaitItem() // Skip initial idle
// Rapid queries - only last should fire after debounce
viewModel.updateQuery("k")
viewModel.updateQuery("ko")
viewModel.updateQuery("kot")
viewModel.updateQuery("kotl")
viewModel.updateQuery("kotli")
viewModel.updateQuery("kotlin")
// Advance past debounce delay (typically 300ms)
advanceTimeBy(400)
// Should only see one search, not 6
assertEquals(SearchState.Loading, awaitItem())
assertEquals(SearchState.Results(emptyList()), awaitItem())
// Verify search was called only once with final query
coVerify(exactly = 1) { mockRepository.search("kotlin") }
cancelAndIgnoreRemainingEvents()
}
}Testing Flow Transformation Pipelines
// Production code: a pipeline of Flow operators
fun getFormattedPrices(productIds: List<String>): Flow<String> =
productIds.asFlow()
.flatMapMerge { id -> repository.getProduct(id).asFlow() }
.map { product -> "$${product.price}" }
.distinctUntilChanged()
@Test
fun shouldFormatPricesCorrectly() = runTest {
val products = mapOf(
"prod-1" to Product("prod-1", "A", 9.99, true),
"prod-2" to Product("prod-2", "B", 19.99, true)
)
products.forEach { (id, product) ->
every { repository.getProduct(id) } returns product
}
getFormattedPrices(listOf("prod-1", "prod-2")).test {
val emitted = mutableListOf<String>()
// Collect both items (order may vary with flatMapMerge)
emitted.add(awaitItem())
emitted.add(awaitItem())
awaitComplete()
assertThat(emitted).containsExactlyInAnyOrder("$9.99", "$19.99")
}
}Testing SharedFlow
SharedFlow is used for one-time events (navigation, snackbars):
class OrderViewModel(private val repository: OrderRepository) : ViewModel() {
private val _events = MutableSharedFlow<OrderEvent>()
val events: SharedFlow<OrderEvent> = _events.asSharedFlow()
fun placeOrder(cart: Cart) {
viewModelScope.launch {
val result = repository.placeOrder(cart)
_events.emit(
if (result.isSuccess) OrderEvent.OrderPlaced(result.getOrThrow().id)
else OrderEvent.Error("Order failed")
)
}
}
}
@Test
fun shouldEmitOrderPlacedEventOnSuccess() = runTest {
val mockRepository = mockk<OrderRepository>()
val orderId = "order-123"
coEvery { mockRepository.placeOrder(any()) } returns Result.success(Order(orderId))
val viewModel = OrderViewModel(mockRepository)
viewModel.events.test {
viewModel.placeOrder(Cart(items = listOf()))
val event = awaitItem()
assertTrue(event is OrderEvent.OrderPlaced)
assertEquals(orderId, (event as OrderEvent.OrderPlaced).orderId)
cancelAndIgnoreRemainingEvents()
}
}Testing Flows with Errors and Recovery
@Test
fun shouldRetryOnTransientError() = runTest {
val mockSource = mockk<DataSource>()
var callCount = 0
coEvery { mockSource.getData() } answers {
callCount++
if (callCount < 3) throw IOException("Temporary error")
"success-data"
}
val retryFlow = flow {
emit(mockSource.getData())
}.retry(2) { it is IOException }
retryFlow.test {
assertEquals("success-data", awaitItem())
awaitComplete()
}
assertEquals(3, callCount)
}
@Test
fun shouldFallbackToCacheOnNetworkError() = runTest {
val cachedData = listOf(Product("cached-1", "Cached Product", 5.0, true))
coEvery { networkRepository.getProducts() } throws IOException("No network")
coEvery { cacheRepository.getProducts() } returns cachedData
val viewModel = ProductListViewModel(networkRepository, cacheRepository)
viewModel.products.test {
assertEquals(ProductsState.Loading, awaitItem())
val state = awaitItem()
assertTrue(state is ProductsState.CachedResults)
assertEquals(cachedData, (state as ProductsState.CachedResults).products)
cancelAndIgnoreRemainingEvents()
}
}Testing Multiple Flows
Turbine supports testing multiple flows in one test using testIn:
@Test
fun shouldSynchronizeUserAndCartFlows() = runTest {
val userFlow = MutableStateFlow<User?>(null)
val cartFlow = MutableStateFlow(Cart(emptyList()))
val userTurbine = userFlow.testIn(backgroundScope)
val cartTurbine = cartFlow.testIn(backgroundScope)
// Verify initial states
assertNull(userTurbine.awaitItem())
assertTrue(cartTurbine.awaitItem().items.isEmpty())
// Update user
userFlow.value = User("user-1", "Alice")
assertEquals("Alice", userTurbine.awaitItem()?.name)
// Update cart
cartFlow.value = Cart(listOf(CartItem("prod-1", 1, 29.99)))
assertEquals(1, cartTurbine.awaitItem().items.size)
userTurbine.cancel()
cartTurbine.cancel()
}Turbine with HelpMeTest
For apps where Kotlin Flow drives real-time UI updates (live data feeds, order tracking), HelpMeTest validates the user experience:
*** Test Cases ***
Order Status Updates In Real Time
As LoggedInUser
Go To https://app.example.com/orders/latest
Wait Until Element Is Visible [data-testid=order-status]
Element Should Contain [data-testid=order-status] Processing
Wait Until Element Contains [data-testid=order-status] Shipped timeout=30sWhile Turbine tests the Flow machinery, HelpMeTest validates that the StateFlow updates actually propagate to the UI and display correctly.
Common Mistakes
1. Not canceling infinite Flows StateFlow and SharedFlow never complete. Always end tests with cancelAndIgnoreRemainingEvents() or the test will hang.
2. Asserting on emission order for flatMapMerge flatMapMerge collects flows concurrently—emission order isn't guaranteed. Use containsExactlyInAnyOrder instead of position-based assertions.
3. Missing advanceUntilIdle() for coroutine-launched emissions When emit() is called from a launched coroutine, use advanceUntilIdle() before awaitItem() to ensure the coroutine has run.
4. Testing viewModelScope flows without UnconfinedTestDispatcher ViewModel flows run on Dispatchers.Main. In tests, set Dispatchers.setMain(UnconfinedTestDispatcher()) in @Before to ensure coroutines run eagerly.
Summary
Turbine makes Kotlin Flow testing as readable as synchronous code:
awaitItem()for asserting emitted values in sequenceawaitError()for asserting flow failures with the right exceptionawaitComplete()for verifying normal flow terminationcancelAndIgnoreRemainingEvents()for stopping infinite flows (StateFlow, SharedFlow)testIn(backgroundScope)for testing multiple flows simultaneously- Debounce testing using
advanceTimeBy()from coroutines-test
Combine Turbine with MockK and runTest for a complete, deterministic Kotlin coroutine testing stack.