Testing Combine Publishers in Swift with XCTestExpectation and CombineExpectations

Combine's asynchronous, declarative model makes testing tricky: pipelines emit values over time, operators transform them, and a failure terminates the stream. This guide covers two approaches: the built-in XCTestExpectation pattern for simple cases, and the CombineExpectations library for ergonomic, readable publisher assertions. You'll write tests for transformations, error handling, and scheduler-dependent operators.

The Challenge of Testing Async Streams

A Combine publisher doesn't necessarily emit synchronously. When you call .sink, the subscription is set up, but values may arrive later: on a background thread, after a delay, or in response to an external event. Naive test code that asserts right after subscribing runs its assertions before those values arrive:

// WRONG: the assertion runs before an asynchronous publisher has emitted
func testWrongWay() {
    let publisher = Just(42)
    var received: Int?
    publisher.sink { received = $0 }.store(in: &cancellables)
    XCTAssertEqual(received, 42) // passes only because Just emits synchronously
}

For Just, the above works by accident (it's synchronous). For real publishers — URLSession.dataTaskPublisher, debounced input, or subjects — you need proper async handling.
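To see the problem with a publisher that really is asynchronous, the sketch below delays Just by 200 ms on a background queue and reads the captured variable immediately after subscribing. The function name valueSeenImmediately is made up for this illustration, and the delay value is arbitrary.

```swift
import Combine
import Foundation

/// Subscribes to a delayed publisher and returns whatever has arrived
/// by the time `sink` returns, without waiting for anything.
func valueSeenImmediately() -> Int? {
    var received: Int?
    let cancellable = Just(42)
        .delay(for: .milliseconds(200), scheduler: DispatchQueue.global())
        .sink { received = $0 }

    // Read right after subscribing, exactly as the naive test does.
    let snapshot = received

    // Cancel so the delayed value is never delivered after we return.
    cancellable.cancel()
    return snapshot
}
```

Calling valueSeenImmediately() returns nil: the value is still 200 ms away when the read happens, which is why an equality assertion placed at that point would fail.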

XCTestExpectation: The Built-in Approach

XCTestExpectation lets you block the test until the expectation is fulfilled or the timeout expires:

import XCTest
import Combine

final class SearchViewModelTests: XCTestCase {
    var cancellables = Set<AnyCancellable>()

    func testSearchEmitsResults() {
        let expectation = XCTestExpectation(description: "Search returns results")
        let viewModel = SearchViewModel()
        var results: [String] = []

        viewModel.resultsPublisher
            .sink(
                receiveCompletion: { _ in },
                receiveValue: { values in
                    results = values
                    expectation.fulfill()
                }
            )
            .store(in: &cancellables)

        viewModel.search(query: "swift")
        wait(for: [expectation], timeout: 2.0)

        XCTAssertFalse(results.isEmpty)
        XCTAssertTrue(results.contains("Swift Testing"))
    }
}

For multiple emissions, use expectedFulfillmentCount:

func testThreeValuesEmitted() {
    let expectation = XCTestExpectation(description: "Three values")
    expectation.expectedFulfillmentCount = 3

    [1, 2, 3].publisher
        .sink { _ in expectation.fulfill() }
        .store(in: &cancellables)

    wait(for: [expectation], timeout: 1.0)
}
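expectedFulfillmentCount checks how many values arrived, not which ones. One option is to gather the stream into a single array with collect() and compare it in one assertion. The sketch below shows this outside XCTest for brevity; collectedDoubles is a hypothetical helper name. Because a sequence publisher delivers synchronously on subscription, no expectation is needed here. With an asynchronous upstream you would fulfill a single expectation inside the sink instead.

```swift
import Combine

/// Doubles every input and gathers the results into one array.
func collectedDoubles(of input: [Int]) -> [Int] {
    var result: [Int] = []
    let cancellable = input.publisher
        .map { $0 * 2 }
        .collect()                 // emits a single array once upstream finishes
        .sink { result = $0 }
    cancellable.cancel()           // already finished; cancel is a harmless no-op
    return result
}
```

In a test, XCTAssertEqual(collectedDoubles(of: [1, 2, 3]), [2, 4, 6]) then verifies both the count and the order of the emissions.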

To assert completion:

func testPublisherCompletes() {
    let valueExpectation = XCTestExpectation(description: "Value received")
    let completionExpectation = XCTestExpectation(description: "Completed")

    Just("hello")
        .sink(
            receiveCompletion: { completion in
                if case .finished = completion {
                    completionExpectation.fulfill()
                }
            },
            receiveValue: { _ in
                valueExpectation.fulfill()
            }
        )
        .store(in: &cancellables)

    wait(for: [valueExpectation, completionExpectation], timeout: 1.0)
}

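Testing Error Handling

A failure arrives through receiveCompletion as .failure(error), so assert on the error there and treat any received value as a test failure: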

enum SearchError: Error { case networkUnavailable }

func testPublisherFailsWithNetworkError() {
    let expectation = XCTestExpectation(description: "Error received")

    Fail<String, SearchError>(error: .networkUnavailable)
        .sink(
            receiveCompletion: { completion in
                if case .failure(let error) = completion {
                    XCTAssertEqual(error, .networkUnavailable)
                    expectation.fulfill()
                }
            },
            receiveValue: { _ in XCTFail("Should not receive value") }
        )
        .store(in: &cancellables)

    wait(for: [expectation], timeout: 1.0)
}
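The expectation, sink, and wait steps repeat in every test above, so they can be folded into one helper. This is a minimal sketch, not an established API: awaitCompletion and its return shape are assumptions made for this example. It uses XCTWaiter, which reports the wait result as a value rather than failing the test, and it assumes the publisher delivers on the calling thread or finishes before the timeout (the captured variables are not synchronized).

```swift
import Combine
import XCTest

/// Hypothetical helper: subscribes, waits for completion (or the timeout),
/// and returns everything that was received.
/// `completion` is nil when the publisher did not complete in time.
func awaitCompletion<P: Publisher>(
    of publisher: P,
    timeout: TimeInterval = 1.0
) -> (values: [P.Output], completion: Subscribers.Completion<P.Failure>?) {
    let done = XCTestExpectation(description: "Publisher completed")
    var values: [P.Output] = []
    var completion: Subscribers.Completion<P.Failure>?

    let cancellable = publisher.sink(
        receiveCompletion: {
            completion = $0
            done.fulfill()
        },
        receiveValue: { values.append($0) }
    )

    // The result is ignored here; a timeout shows up as `completion == nil`.
    _ = XCTWaiter.wait(for: [done], timeout: timeout)
    cancellable.cancel()
    return (values, completion)
}
```

A test can then assert on the returned tuple, for example checking that values is empty and that completion matches .failure for the Fail publisher used above.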

CombineExpectations: Ergonomic Publisher Testing

XCTestExpectation becomes verbose for multi-step pipelines. CombineExpectations provides a cleaner API designed specifically for Combine.

Install via SPM:

.package(url: "https://github.com/groue/CombineExpectations.git", from: "0.10.0")

Collecting All Values

import CombineExpectations

func testTransformationPipeline() throws {
    let publisher = [1, 2, 3, 4, 5].publisher
        .filter { $0 % 2 == 0 }
        .map { $0 * 10 }

    let recorder = publisher.record()
    let elements = try wait(for: recorder.elements, timeout: 1.0)

    XCTAssertEqual(elements, [20, 40])
}

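.record() subscribes immediately and captures all emitted values and the completion. recorder.elements is a PublisherExpectation that you pass to wait(for:timeout:); it waits for the publisher to complete and then returns every recorded element.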
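Conceptually, a recorder is a subscriber that starts listening right away and stores whatever it sees. The class below is an illustrative stand-in written for this article, not CombineExpectations' actual implementation: SimpleRecorder is a made-up name, it has no waiting or timeout logic, and it is not thread-safe.

```swift
import Combine

/// Toy recorder: stores every element and the completion of a publisher.
final class SimpleRecorder<Output, Failure: Error> {
    private(set) var elements: [Output] = []
    private(set) var completion: Subscribers.Completion<Failure>?
    private var cancellable: AnyCancellable?

    init<P: Publisher>(_ publisher: P) where P.Output == Output, P.Failure == Failure {
        // Subscribe in init so nothing emitted from this point on is lost.
        cancellable = publisher.sink(
            receiveCompletion: { [weak self] in self?.completion = $0 },
            receiveValue: { [weak self] in self?.elements.append($0) }
        )
    }
}
```

Recording the filter-and-map pipeline from the test above with SimpleRecorder leaves elements equal to [20, 40] and completion equal to .finished. The real library adds the part that matters for asynchronous publishers: expectations that block until enough has been recorded.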

Checking the First Value

func testFirstSearchResult() throws {
    let viewModel = SearchViewModel(service: MockSearchService())

    // Record before triggering the search so no emission is missed
    let recorder = viewModel.resultsPublisher.record()
    viewModel.search(query: "combine")

    let first = try wait(for: recorder.next(), timeout: 2.0)

    XCTAssertFalse(first.isEmpty)
}

Checking Completion

func testPublisherFinishes() throws {
    let recorder = Just("done").record()
    let completion = try wait(for: recorder.completion, timeout: 1.0)

    guard case .finished = completion else {
        XCTFail("Expected finished, got \(completion)")
        return
    }
}

func testPublisherFails() throws {
    let recorder = Fail<String, URLError>(error: URLError(.notConnectedToInternet))
        .record()
    let completion = try wait(for: recorder.completion, timeout: 1.0)

    guard case .failure(let error) = completion else {
        XCTFail("Expected failure, got \(completion)")
        return
    }
    XCTAssertEqual(error.code, .notConnectedToInternet)
}

Checking Prefix

func testFirstTwoEmissions() throws {
    let subject = PassthroughSubject<Int, Never>()
    let recorder = subject.record()

    subject.send(10)
    subject.send(20)
    subject.send(30)

    let prefix = try wait(for: recorder.prefix(2), timeout: 1.0)
    XCTAssertEqual(prefix, [10, 20])
}

Testing Schedulers with TestScheduler

Operators like .debounce, .delay, and .throttle are time-dependent. Testing them with real time makes tests slow and flaky. Use a controlled scheduler instead.

Combine itself doesn't ship a test scheduler, but Point-Free's CombineSchedulers library provides one:

import CombineSchedulers

func testDebouncedSearch() {
    let scheduler = DispatchQueue.test
    var received: [String] = []

    let subject = PassthroughSubject<String, Never>()
    subject
        .debounce(for: .milliseconds(300), scheduler: scheduler)
        .sink { received.append($0) }
        .store(in: &cancellables)

    subject.send("s")
    subject.send("sw")
    subject.send("swi")
    subject.send("swif")
    subject.send("swift")

    // No time has passed — nothing emitted yet
    XCTAssertEqual(received, [])

    // Advance past the debounce window
    scheduler.advance(by: .milliseconds(300))
    XCTAssertEqual(received, ["swift"])
}

The test runs instantly because DispatchQueue.test returns a test scheduler whose virtual clock moves only when you call advance.
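The idea of virtual time is small enough to sketch in plain Swift. The class below is a toy written for this article, not how CombineSchedulers is implemented and not a Combine Scheduler: VirtualClock and its methods are hypothetical names, and time is modelled as integer milliseconds.

```swift
/// Toy virtual clock: work is queued with a due time and runs only
/// when the test advances the clock past that time.
final class VirtualClock {
    private(set) var now = 0                                   // virtual milliseconds
    private var pending: [(due: Int, work: () -> Void)] = []

    /// Registers `work` to run once `delay` virtual milliseconds have elapsed.
    func schedule(after delay: Int, _ work: @escaping () -> Void) {
        pending.append((due: now + delay, work: work))
    }

    /// Moves virtual time forward, running everything that becomes due, earliest first.
    func advance(by milliseconds: Int) {
        let target = now + milliseconds
        while let index = nextDueIndex(upTo: target) {
            let item = pending.remove(at: index)
            now = item.due
            item.work()          // may schedule more work; the loop picks it up
        }
        now = target
    }

    /// Index of the earliest pending item due at or before `target`, if any.
    private func nextDueIndex(upTo target: Int) -> Int? {
        pending.indices
            .filter { pending[$0].due <= target }
            .min { pending[$0].due < pending[$1].due }
    }
}
```

Nothing here sleeps or touches a real queue, so a 300 ms wait costs no wall-clock time. A test schedules work 300 ms out, advances by 299 and sees nothing, then advances by 1 and sees the work run.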

Testing a ViewModel with Combine

A realistic pattern — testing a view model that uses Combine internally:

final class UserProfileViewModel: ObservableObject {
    @Published var username: String = ""
    @Published var isValid: Bool = false

    init() {
        // assign(to:) ties the subscription to the lifetime of $isValid,
        // so no AnyCancellable needs to be stored
        $username
            .map { $0.count >= 3 }
            .assign(to: &$isValid)
    }
}

final class UserProfileViewModelTests: XCTestCase {
    func testValidationUpdatesWithUsername() throws {
        let vm = UserProfileViewModel()
        let recorder = vm.$isValid.record()

        vm.username = "ab"     // too short
        vm.username = "abc"    // exactly 3 — valid
        vm.username = "abcde"  // valid

        let values = try wait(for: recorder.prefix(4), timeout: 1.0)
        // Initial false, then false (ab), then true (abc), then true (abcde)
        XCTAssertEqual(values, [false, false, true, true])
    }
}
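The repeated false and true in that expected array are there because an @Published property publishes on every assignment, even when the new value equals the old one. The sketch below reproduces this without any test library and shows how removeDuplicates() collapses the repeats. ProfileModel and recordedValidity are hypothetical names that mirror the view model above.

```swift
import Combine

/// Same shape as the view model above, redeclared so the sketch is self-contained.
final class ProfileModel: ObservableObject {
    @Published var username = ""
    @Published var isValid = false

    init() {
        $username
            .map { $0.count >= 3 }
            .assign(to: &$isValid)
    }
}

/// Collects what $isValid emits while the username changes three times.
func recordedValidity(removingDuplicates: Bool) -> [Bool] {
    let model = ProfileModel()
    var values: [Bool] = []

    let upstream: AnyPublisher<Bool, Never> = removingDuplicates
        ? model.$isValid.removeDuplicates().eraseToAnyPublisher()
        : model.$isValid.eraseToAnyPublisher()

    // Subscribing delivers the current value first, then one value per assignment.
    let cancellable = upstream.sink { values.append($0) }

    model.username = "ab"
    model.username = "abc"
    model.username = "abcde"

    cancellable.cancel()
    return values
}
```

Without removeDuplicates() the result is [false, false, true, true]; with it, [false, true]. Whether the view model should deduplicate is a design decision, but the test needs to match whichever behavior is chosen.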

Structuring Combine Tests

A few patterns that keep Combine tests maintainable:

Use setUp/tearDown for cancellable management:

var cancellables = Set<AnyCancellable>()

override func setUp() {
    super.setUp()
    cancellables = Set<AnyCancellable>()
}

override func tearDown() {
    cancellables.removeAll()
    super.tearDown()
}

Mock publishers with PassthroughSubject:

struct MockAPIService {
    let responseSubject = PassthroughSubject<Data, URLError>()

    func fetch(url: URL) -> AnyPublisher<Data, URLError> {
        responseSubject.eraseToAnyPublisher()
    }
}

// In test:
let mockService = MockAPIService()
let vm = DataViewModel(service: mockService)
mockService.responseSubject.send(mockData)
mockService.responseSubject.send(completion: .finished)
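Since DataViewModel and mockData are not shown here, the following is a self-contained sketch of the same pattern with stand-in types. StubService and TextLoader are hypothetical names invented for this example; the point is only that the test decides when the "network" responds and with what.

```swift
import Combine
import Foundation

/// Stand-in service whose responses are pushed manually by the test.
struct StubService {
    let responseSubject = PassthroughSubject<Data, URLError>()

    func fetch() -> AnyPublisher<Data, URLError> {
        responseSubject.eraseToAnyPublisher()
    }
}

/// Stand-in consumer: decodes responses as UTF-8 text and remembers failures.
final class TextLoader {
    private(set) var text: String?
    private(set) var errorCode: URLError.Code?
    private var cancellable: AnyCancellable?

    init(service: StubService) {
        cancellable = service.fetch().sink(
            receiveCompletion: { [weak self] completion in
                if case .failure(let error) = completion {
                    self?.errorCode = error.code
                }
            },
            receiveValue: { [weak self] data in
                self?.text = String(decoding: data, as: UTF8.self)
            }
        )
    }
}
```

A test creates the loader, sends Data("hello".utf8) through responseSubject, and asserts that text is "hello"; sending a .failure completion afterwards exercises the error path. Because PassthroughSubject delivers synchronously on send, no expectation is needed.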

Use XCTAssertThrowsError when waiting on a CombineExpectations expectation should fail, for example when a publisher completes without emitting:

func testPublisherDoesNotEmit() throws {
    let empty = Empty<Int, Never>(completeImmediately: true)
    let recorder = empty.record()
    XCTAssertThrowsError(try wait(for: recorder.next(), timeout: 0.5))
}

Summary

Tool                              Best for
XCTestExpectation                 Simple async value/completion assertions
CombineExpectations               Readable multi-step publisher testing
CombineSchedulers TestScheduler   Time-sensitive operators (debounce, delay, throttle)
PassthroughSubject                Mocking publisher inputs in tests

Test Combine pipelines at the boundary where your business logic interacts with them — usually in ViewModels. Keep operator chains short and testable; if a chain is hard to test, it's probably doing too much.
