Ktor Client Testing Guide: MockEngine, HttpClient Mocking, Streaming Responses & Timeout Tests

Ktor is JetBrains' Kotlin Multiplatform networking framework, and its HTTP client is used in KMP projects, pure JVM backends, and Android applications. Unlike Retrofit, which wraps OkHttp, the Ktor client has its own engine abstraction, which makes testing particularly clean: swap the real engine for MockEngine and control every response.

Why Ktor Testing Is Unique

Ktor's MockEngine replaces the network engine underneath HttpClient, so a request still passes through the full client pipeline (plugins included) but never opens a real connection: no external servers, no port management. This is simpler than OkHttp's MockWebServer for unit tests, while still testing:

  • Request URL, method, and headers
  • Request body serialization
  • Response body deserialization
  • Plugin behavior (auth, logging, retry, content negotiation)
  • Streaming and chunked transfer responses
  • Timeout and cancellation handling
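
The engine swap described above is easiest when production code accepts the engine as a parameter. The following is a minimal sketch, not a prescribed structure: `ProductApi`, `rawProduct`, and the `api.example.com` host are hypothetical names introduced only for illustration.

```kotlin
import io.ktor.client.HttpClient
import io.ktor.client.engine.HttpClientEngine
import io.ktor.client.engine.mock.MockEngine
import io.ktor.client.engine.mock.respond
import io.ktor.client.request.get
import io.ktor.client.statement.bodyAsText
import io.ktor.http.HttpStatusCode
import kotlinx.coroutines.runBlocking

// Hypothetical wrapper: the engine is injected, so tests can pass MockEngine
// while production code passes a real engine.
class ProductApi(engine: HttpClientEngine) {
    private val client = HttpClient(engine)

    // Returns the raw response body for a product id (placeholder host).
    suspend fun rawProduct(id: String): String =
        client.get("https://api.example.com/products/$id").bodyAsText()
}

fun main() = runBlocking {
    // MockEngine { ... } builds an engine backed by a single handler.
    val engine = MockEngine { request ->
        respond(content = "stub for ${request.url.encodedPath}", status = HttpStatusCode.OK)
    }
    println(ProductApi(engine).rawProduct("prod-1"))
}
```

Injecting the engine rather than the whole client keeps plugin configuration in production code, so the tests exercise the same pipeline the application uses.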

Setting Up Ktor Client Testing

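// build.gradle.kts
// Assumes ktor_version is defined in gradle.properties
val ktor_version: String by project

dependencies {
    testImplementation("io.ktor:ktor-client-mock:$ktor_version")
    testImplementation("io.ktor:ktor-client-content-negotiation:$ktor_version")
    testImplementation("io.ktor:ktor-serialization-kotlinx-json:$ktor_version")
    testImplementation("io.ktor:ktor-client-auth:$ktor_version")  // Auth plugin used by the bearer-token tests
    testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.7.3")
}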
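
The tests in this guide deserialize into `Product`, `User`, and `Order`, which are never declared. Below is a minimal sketch of those models. The field names and types are inferred from the JSON payloads used in the tests, so treat them as assumptions and adapt them to your real API. It also assumes the kotlinx.serialization compiler plugin is applied to the module.

```kotlin
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json

// Assumed shapes, inferred from the JSON used in the tests.
// price has a default so payloads that omit it still decode.
@Serializable
data class Product(val id: String, val name: String, val price: Double = 0.0)

@Serializable
data class User(val id: String, val name: String)

@Serializable
data class Order(val id: String, val total: Double)

fun main() {
    val json = Json { ignoreUnknownKeys = true }
    val product = json.decodeFromString(
        Product.serializer(),
        """{"id": "prod-1", "name": "Widget", "price": 29.99}"""
    )
    println(product)
}
```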

MockEngine: The Core Testing Tool

@Test
fun shouldGetProductById() = runTest {
    val client = HttpClient(MockEngine) {
        install(ContentNegotiation) {
            json(Json { ignoreUnknownKeys = true })
        }
        engine {
            addHandler { request ->
                when {
                    request.url.encodedPath == "/products/prod-1" ->
                        respond(
                            content = """{"id": "prod-1", "name": "Widget", "price": 29.99}""",
                            status = HttpStatusCode.OK,
                            headers = headersOf(HttpHeaders.ContentType, "application/json")
                        )
                    else -> respondBadRequest()
                }
            }
        }
    }

    val product: Product = client.get("/products/prod-1").body()

    assertEquals("prod-1", product.id)
    assertEquals("Widget", product.name)
    assertEquals(29.99, product.price, 0.001)
}

HttpClient Mocking Patterns

Pattern 1: Structured Handler with State

class ProductApiTest {

    private fun createClient(handler: suspend MockRequestHandleScope.(HttpRequestData) -> HttpResponseData): HttpClient {
        return HttpClient(MockEngine) {
            install(ContentNegotiation) {
                json(Json { ignoreUnknownKeys = true })
            }
            install(HttpRequestRetry) {
                retryOnServerErrors(maxRetries = 3)
                exponentialDelay()
            }
            engine {
                addHandler(handler)
            }
        }
    }

    @Test
    fun shouldReturnProductList() = runTest {
        val productsJson = """
            [
                {"id": "1", "name": "Product A", "price": 10.0},
                {"id": "2", "name": "Product B", "price": 20.0}
            ]
        """.trimIndent()

        val client = createClient { request ->
            assertEquals("/products", request.url.encodedPath)
            assertEquals(HttpMethod.Get, request.method)
            respond(productsJson, HttpStatusCode.OK,
                headersOf(HttpHeaders.ContentType, "application/json"))
        }

        val products: List<Product> = client.get("/products").body()
        assertEquals(2, products.size)
        assertEquals("Product A", products[0].name)
    }
}

Pattern 2: Sequential Responses

@Test
fun shouldHandleSequentialRequests() = runTest {
    val responses = ArrayDeque(listOf(
        Pair(HttpStatusCode.ServiceUnavailable, ""),
        Pair(HttpStatusCode.ServiceUnavailable, ""),
        Pair(HttpStatusCode.OK, """{"id": "1", "name": "Product", "price": 9.99}""")
    ))

    val client = HttpClient(MockEngine) {
        install(ContentNegotiation) { json() }
        install(HttpRequestRetry) { retryOnServerErrors(maxRetries = 3) }
        engine {
            addHandler {
                val (status, body) = responses.removeFirst()
                respond(body, status, headersOf(HttpHeaders.ContentType, "application/json"))
            }
        }
    }

    val product: Product = client.get("/products/1").body()
    assertEquals("Product", product.name)
}

Pattern 3: Route-Based Dispatching

fun buildMockClient(
    vararg routes: Pair<String, MockRequestHandleScope.() -> HttpResponseData>
): HttpClient {
    val routeMap = routes.toMap()

    return HttpClient(MockEngine) {
        install(ContentNegotiation) { json(Json { ignoreUnknownKeys = true }) }
        engine {
            addHandler { request ->
                val path = request.url.encodedPath
                // Run the route with the handler scope as receiver so it can call respond()
                routeMap[path]?.invoke(this)
                    ?: respondError(HttpStatusCode.NotFound, "Route not found: $path")
            }
        }
    }
}

@Test
fun shouldHandleMultipleRoutes() = runTest {
    val client = buildMockClient(
        "/users/me" to {
            respond(
                """{"id": "user-1", "name": "Alice"}""",
                HttpStatusCode.OK,
                headersOf(HttpHeaders.ContentType, "application/json")
            )
        },
        "/users/me/orders" to {
            respond(
                """[{"id": "order-1", "total": 99.99}]""",
                HttpStatusCode.OK,
                headersOf(HttpHeaders.ContentType, "application/json")
            )
        }
    )

    val user: User = client.get("/users/me").body()
    val orders: List<Order> = client.get("/users/me/orders").body()

    assertEquals("Alice", user.name)
    assertEquals(1, orders.size)
    assertEquals(99.99, orders[0].total, 0.001)
}

Verifying Request Construction

@Test
fun shouldIncludeAuthorizationHeader() = runTest {
    var capturedRequest: HttpRequestData? = null

    val client = HttpClient(MockEngine) {
        install(Auth) {
            bearer {
                loadTokens { BearerTokens("access-token", "refresh-token") }
                // Without this, the token is only sent after the server answers with a 401 challenge
                sendWithoutRequest { true }
            }
        }
        engine {
            addHandler { request ->
                capturedRequest = request
                respond("""{"id": "1"}""", HttpStatusCode.OK,
                    headersOf(HttpHeaders.ContentType, "application/json"))
            }
        }
    }

    // ContentNegotiation is not installed here, so the body is not deserialized
    client.get("/protected/resource")

    assertNotNull(capturedRequest)
    assertEquals("Bearer access-token", capturedRequest!!.headers[HttpHeaders.Authorization])
}

@Test
fun shouldSendCorrectJsonBody() = runTest {
    var requestBody: String? = null

    val client = HttpClient(MockEngine) {
        install(ContentNegotiation) { json() }
        engine {
            addHandler { request ->
                requestBody = request.body.toByteArray().toString(Charsets.UTF_8)
                respond("""{"id": "new-1"}""", HttpStatusCode.Created,
                    headersOf(HttpHeaders.ContentType, "application/json"))
            }
        }
    }

    @Serializable
    data class CreateRequest(val name: String, val price: Double)

    client.post("/products") {
        contentType(ContentType.Application.Json)
        setBody(CreateRequest("New Product", 49.99))
    }

    val json = Json.parseToJsonElement(requestBody!!)
    assertEquals("New Product", json.jsonObject["name"]?.jsonPrimitive?.content)
    assertEquals(49.99, json.jsonObject["price"]?.jsonPrimitive?.double)
}

@Test
fun shouldEncodeQueryParameters() = runTest {
    var capturedUrl: Url? = null

    val client = HttpClient(MockEngine) {
        engine {
            addHandler { request ->
                capturedUrl = request.url
                respond("""[]""", HttpStatusCode.OK,
                    headersOf(HttpHeaders.ContentType, "application/json"))
            }
        }
    }

    client.get("/products") {
        parameter("q", "wireless headphones")
        parameter("category", "electronics")
        parameter("page", 2)
    }

    assertNotNull(capturedUrl)
    assertEquals("wireless headphones", capturedUrl!!.parameters["q"])
    assertEquals("electronics", capturedUrl!!.parameters["category"])
    assertEquals("2", capturedUrl!!.parameters["page"])
}
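
As an alternative to capturing the request in a local variable, MockEngine also records every request it receives in its `requestHistory` list. Below is a minimal sketch that assumes you construct the engine yourself so you keep a reference to it. The `X-Trace-Id` header and the `recordedRequest` helper are hypothetical and exist only for illustration.

```kotlin
import io.ktor.client.HttpClient
import io.ktor.client.engine.mock.MockEngine
import io.ktor.client.engine.mock.respond
import io.ktor.client.request.HttpRequestData
import io.ktor.client.request.get
import io.ktor.client.request.header
import io.ktor.client.request.parameter
import io.ktor.http.HttpMethod
import io.ktor.http.HttpStatusCode
import kotlinx.coroutines.runBlocking

// Sends one request through a MockEngine and returns what the engine recorded.
fun recordedRequest(): HttpRequestData = runBlocking {
    val engine = MockEngine { respond("[]", HttpStatusCode.OK) }
    val client = HttpClient(engine)

    client.get("/products") {
        parameter("page", 2)
        header("X-Trace-Id", "trace-42") // hypothetical header
    }

    // Exactly one request was sent, so single() is safe here.
    val recorded = engine.requestHistory.single()
    client.close()
    recorded
}

fun main() {
    val request = recordedRequest()
    check(request.method == HttpMethod.Get)
    check(request.url.encodedPath == "/products")
    check(request.url.parameters["page"] == "2")
    check(request.headers["X-Trace-Id"] == "trace-42")
}
```

Reading from the history keeps assertions out of the handler, which can make a failing test easier to read.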

Testing Streaming Responses

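MockEngine can stream a response body from any ByteReadChannel you hand it. It does not reproduce wire-level framing (chunk boundaries, event flushing), so these tests exercise the client's reading logic rather than the transport: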

Server-Sent Events (SSE)

@Test
fun shouldHandleServerSentEvents() = runTest {
    val sseBody = """
        data: {"status": "processing"}
        
        data: {"status": "shipped"}
        
        data: {"status": "delivered"}
        
        
    """.trimIndent()

    val client = HttpClient(MockEngine) {
        engine {
            addHandler {
                respond(
                    content = ByteReadChannel(sseBody.toByteArray()),
                    status = HttpStatusCode.OK,
                    // headersOf has no four-string overload; pass name-to-values pairs
                    headers = headersOf(
                        HttpHeaders.ContentType to listOf("text/event-stream"),
                        HttpHeaders.CacheControl to listOf("no-cache")
                    )
                )
            }
        }
    }

    val statuses = mutableListOf<String>()

    client.prepareGet("/orders/123/status").execute { response ->
        val channel = response.bodyAsChannel()
        while (!channel.isClosedForRead) {
            val line = channel.readUTF8Line() ?: break
            if (line.startsWith("data: ")) {
                val data = line.removePrefix("data: ")
                val status = Json.parseToJsonElement(data).jsonObject["status"]?.jsonPrimitive?.content
                if (status != null) statuses.add(status)
            }
        }
    }

    assertEquals(listOf("processing", "shipped", "delivered"), statuses)
}

Chunked Response Testing

@Test
fun shouldHandleLargeChunkedResponse() = runTest {
    val chunkSize = 1024
    val totalChunks = 100
    val totalBytes = chunkSize * totalChunks

    // Build a large response body
    val largeBody = "x".repeat(totalBytes)

    val client = HttpClient(MockEngine) {
        engine {
            addHandler {
                respond(
                    content = ByteReadChannel(largeBody.toByteArray()),
                    status = HttpStatusCode.OK,
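                    // The Transfer-Encoding header is metadata only; MockEngine does not chunk the body
                    headers = headersOf(
                        HttpHeaders.ContentType to listOf("application/octet-stream"),
                        HttpHeaders.TransferEncoding to listOf("chunked")
                    )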
                )
            }
        }
    }

    val collected = StringBuilder()
    client.prepareGet("/large-file").execute { response ->
        val channel = response.bodyAsChannel()
        val buffer = ByteArray(chunkSize)
        while (!channel.isClosedForRead) {
            val bytesRead = channel.readAvailable(buffer)
            if (bytesRead > 0) {
                collected.append(String(buffer, 0, bytesRead))
            }
        }
    }

    assertEquals(totalBytes, collected.length)
}

Timeout Testing

@Test
fun shouldThrowOnRequestTimeout() = runTest {
    val client = HttpClient(MockEngine) {
        install(HttpTimeout) {
            // MockEngine opens no connection, so only the request timeout is exercised here
            requestTimeoutMillis = 100
        }
        engine {
            addHandler {
                // Simulate a delay longer than the timeout
                delay(1000)
                respond("""{}""", HttpStatusCode.OK)
            }
        }
    }

    assertThrows<HttpRequestTimeoutException> {
        client.get("/slow-endpoint")
    }
}

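@Test
fun shouldTimeOutWhenBodyStalls() = runTest {
    val client = HttpClient(MockEngine) {
        install(HttpTimeout) {
            // Assumption: MockEngine has no socket layer, so socketTimeoutMillis is not
            // enforced; requestTimeoutMillis bounds the stalled response instead.
            requestTimeoutMillis = 200
        }
        engine {
            addHandler {
                // Start writing the body, then stall. The writer is a child of the handler's
                // coroutine, so the request timeout cancels it along with the request.
                val bodyChannel = CoroutineScope(coroutineContext).writer(autoFlush = true) {
                    channel.writeStringUtf8("{")
                    delay(5000)  // Hang mid-response
                    channel.writeStringUtf8("}")
                }.channel

                respond(
                    content = bodyChannel,
                    status = HttpStatusCode.OK,
                    headers = headersOf(HttpHeaders.ContentType, "application/json")
                )
            }
        }
    }

    assertThrows<HttpRequestTimeoutException> {
        // bodyAsText() avoids needing ContentNegotiation, which is not installed here
        client.get("/hanging-response").bodyAsText()
    }
}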

@Test
fun shouldRespectCancellation() = runTest {
    val client = HttpClient(MockEngine) {
        engine {
            addHandler {
                delay(Long.MAX_VALUE)  // Never responds
                respond("""{}""", HttpStatusCode.OK)
            }
        }
    }

    val job = launch {
        client.get("/never-responds")
    }

    delay(50)
    job.cancel()
    job.join()

    assertTrue(job.isCancelled)
}

Testing Error Handling

@Test
fun shouldHandleServerError() = runTest {
    val client = HttpClient(MockEngine) {
        engine {
            addHandler {
                respondError(
                    HttpStatusCode.InternalServerError,
                    """{"error": "Something went wrong"}"""
                )
            }
        }
        expectSuccess = false  // The Ktor 2.x default, shown explicitly: non-2xx responses do not throw
    }

    val response = client.get("/broken-endpoint")
    assertEquals(HttpStatusCode.InternalServerError, response.status)

    val body: Map<String, String> = Json.decodeFromString(response.bodyAsText())
    assertEquals("Something went wrong", body["error"])
}

@Test
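fun shouldThrowClientRequestExceptionWhenExpectSuccessIsEnabled() = runTest {
    val client = HttpClient(MockEngine) {
        expectSuccess = true  // Opt in to exceptions for non-2xx responses (off by default in Ktor 2.x)
        engine {
            addHandler {
                respondError(HttpStatusCode.Forbidden, "Forbidden")
            }
        }
    }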

    assertThrows<ClientRequestException> {
        client.get("/forbidden")
    }
}

Testing Plugins

Testing Retry Plugin

@Test
fun shouldRetryOnServerErrors() = runTest {
    var requestCount = 0

    val client = HttpClient(MockEngine) {
        install(ContentNegotiation) { json() }  // Needed to deserialize the response body below
        install(HttpRequestRetry) {
            retryOnServerErrors(maxRetries = 2)
            constantDelay(millis = 10)
        }
        engine {
            addHandler {
                requestCount++
                if (requestCount < 3) {
                    respondError(HttpStatusCode.ServiceUnavailable, "Temporarily unavailable")
                } else {
                    respond("""{"success": true}""", HttpStatusCode.OK,
                        headersOf(HttpHeaders.ContentType, "application/json"))
                }
            }
        }
    }

    val response: Map<String, Boolean> = client.get("/unstable-endpoint").body()

    assertEquals(3, requestCount)
    assertTrue(response["success"] == true)
}

Testing Auth Token Refresh

@Test
fun shouldRefreshTokenAndRetryOn401() = runTest {
    var refreshCalled = false
    var requestCount = 0

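    val client = HttpClient(MockEngine) {
        install(Auth) {
            bearer {
                loadTokens { BearerTokens("expired-token", "refresh-token") }
                refreshTokens {
                    refreshCalled = true
                    BearerTokens("new-access-token", "new-refresh-token")
                }
                // Send the token up front; otherwise the first request carries no Authorization header
                sendWithoutRequest { true }
            }
        }
        engine {
            addHandler { request ->
                requestCount++
                val auth = request.headers[HttpHeaders.Authorization]
                if (auth == "Bearer expired-token") {
                    // Include a Bearer challenge so the Auth plugin can match the 401 to the bearer provider
                    respond("Unauthorized", HttpStatusCode.Unauthorized,
                        headersOf(HttpHeaders.WWWAuthenticate, "Bearer"))
                } else {
                    respond("""{"id": "1"}""", HttpStatusCode.OK,
                        headersOf(HttpHeaders.ContentType, "application/json"))
                }
            }
        }
    }

    // ContentNegotiation is not installed here, so the body is not deserialized
    client.get("/protected")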

    assertTrue(refreshCalled)
    assertEquals(2, requestCount)  // Original + retry after refresh
}

Ktor Testing with HelpMeTest

For Kotlin Multiplatform apps where Ktor serves both Android and web targets, HelpMeTest validates the full user experience:

*** Test Cases ***
Live Order Tracking Updates
    As    LoggedInUser
    Go To    https://app.example.com/track/order-123
    Wait Until Element Is Visible    [data-testid=tracking-status]
    Element Should Contain    [data-testid=tracking-status]    Processing
    # Status should update via SSE within 30 seconds
    Wait Until Element Contains    [data-testid=tracking-status]    Shipped    timeout=30s

This validates that Ktor's streaming/SSE functionality reaches the UI layer correctly—something unit tests can't verify.

Comparing Ktor MockEngine vs OkHttp MockWebServer

Feature                     | Ktor MockEngine      | OkHttp MockWebServer
Setup complexity            | Minimal (in-process) | Moderate (real HTTP server)
Captures real requests      | Yes                  | Yes
Tests HTTP wire format      | No                   | Yes
Works with non-Ktor clients | No                   | Yes
Streaming support           | Yes                  | Yes
KMP compatibility           | Yes                  | No (JVM only)
Timeout simulation          | Yes                  | Yes

Use MockEngine for Ktor unit tests, MockWebServer when you need wire-level validation.

Common Pitfalls

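1. Misunderstanding expectSuccess when asserting on errors. In Ktor 2.x, expectSuccess defaults to false, so non-2xx responses are returned rather than thrown. If the client under test enables expectSuccess = true, non-2xx responses raise a ResponseException subclass (for example, ClientRequestException for 4xx). Set expectSuccess = false when you want to inspect error responses rather than catch exceptions.

2. Not handling all requests in MockEngine. MockEngine does not match requests to handlers: it calls the registered handlers in order and, with the default reuseHandlers = true, cycles back to the first. Creating an engine with no handlers fails with IllegalStateException, and with reuseHandlers = false any request beyond the last handler fails as unhandled. Route inside a single handler and add a default fallback branch.

3. Testing without ContentNegotiation for JSON tests. Without the ContentNegotiation plugin, .body<T>() on a JSON response throws NoTransformationFoundException. Always install the plugin with the appropriate serializer.

4. Sharing HttpClient between tests. HttpClient is stateful (connection pools, cached tokens). Create a new client per test or per test class with proper setup/teardown, and close it afterwards.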
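
Pitfalls 2 and 4 can be addressed together with a small factory that builds a fresh client with a fallback branch. This is a minimal sketch rather than a prescribed helper: `newMockClient` and the `/health` route are hypothetical names chosen for illustration.

```kotlin
import io.ktor.client.HttpClient
import io.ktor.client.engine.mock.MockEngine
import io.ktor.client.engine.mock.respond
import io.ktor.client.engine.mock.respondError
import io.ktor.client.request.get
import io.ktor.client.statement.bodyAsText
import io.ktor.http.HttpStatusCode
import kotlinx.coroutines.runBlocking

// Hypothetical factory: call it once per test so no state leaks between tests.
fun newMockClient(): HttpClient = HttpClient(MockEngine) {
    engine {
        addHandler { request ->
            when (request.url.encodedPath) {
                "/health" -> respond("ok", HttpStatusCode.OK)
                // Fallback: unexpected paths get a descriptive 404 instead of a wrong stub.
                else -> respondError(
                    HttpStatusCode.NotFound,
                    "No mock for ${request.url.encodedPath}"
                )
            }
        }
    }
}

fun main() = runBlocking {
    val client = newMockClient()
    try {
        check(client.get("/health").bodyAsText() == "ok")
        check(client.get("/typo").status == HttpStatusCode.NotFound)
    } finally {
        client.close() // release the client even when an assertion fails
    }
}
```

In a test class, the same factory could be called from a setup hook, with `close()` in the matching teardown hook.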

Summary

Ktor's MockEngine provides the cleanest possible HTTP client testing experience:

  • In-process mocking without managing ports or external servers
  • Request capture for verifying URL, method, headers, and body
  • Sequential responses for testing retry and fallback logic
  • Streaming simulation for SSE and chunked transfer testing
  • Timeout testing via HttpTimeout plugin + delay() in handlers
  • Plugin testing for auth refresh, retry, and content negotiation

The combination of Ktor's first-class coroutine support and MockEngine's simplicity makes Ktor one of the most testable HTTP clients in the Kotlin ecosystem.
