Testing Combine Publishers in Swift with XCTestExpectation and CombineExpectations
Combine's asynchronous, declarative model makes testing tricky — pipelines emit values over time, operators transform them, and errors can cancel the stream. This guide covers two approaches: the built-in XCTestExpectation pattern for simple cases, and the CombineExpectations library for ergonomic, readable publisher assertions. You'll write tests for transformations, error handling, and scheduler-dependent operators.
The Challenge of Testing Async Streams
A Combine publisher doesn't emit synchronously. When you call .sink, the subscription is set up but values may arrive later — on a background thread, after a delay, or in response to an external event. Naive test code without expectations will pass before any values arrive:
// THIS IS WRONG — test finishes before publisher emits
func testWrongWay() {
let publisher = Just(42)
var received: Int?
publisher.sink { received = $0 }.store(in: &cancellables)
XCTAssertEqual(received, 42) // might be nil
}For Just, the above works by accident (it's synchronous). For real publishers — URLSession.dataTaskPublisher, debounced input, or subjects — you need proper async handling.
XCTestExpectation: The Built-in Approach
XCTestExpectation lets you pause the test until a condition is fulfilled:
import XCTest
import Combine
final class SearchViewModelTests: XCTestCase {
var cancellables = Set<AnyCancellable>()
func testSearchEmitsResults() {
let expectation = XCTestExpectation(description: "Search returns results")
let viewModel = SearchViewModel()
var results: [String] = []
viewModel.resultsPublisher
.sink(
receiveCompletion: { _ in },
receiveValue: { values in
results = values
expectation.fulfill()
}
)
.store(in: &cancellables)
viewModel.search(query: "swift")
wait(for: [expectation], timeout: 2.0)
XCTAssertFalse(results.isEmpty)
XCTAssertTrue(results.contains("Swift Testing"))
}
}For multiple emissions, use expectedFulfillmentCount:
func testThreeValuesEmitted() {
let expectation = XCTestExpectation(description: "Three values")
expectation.expectedFulfillmentCount = 3
[1, 2, 3].publisher
.sink { _ in expectation.fulfill() }
.store(in: &cancellables)
wait(for: [expectation], timeout: 1.0)
}To assert completion:
func testPublisherCompletes() {
let valueExpectation = XCTestExpectation(description: "Value received")
let completionExpectation = XCTestExpectation(description: "Completed")
Just("hello")
.sink(
receiveCompletion: { completion in
if case .finished = completion {
completionExpectation.fulfill()
}
},
receiveValue: { _ in
valueExpectation.fulfill()
}
)
.store(in: &cancellables)
wait(for: [valueExpectation, completionExpectation], timeout: 1.0)
}Testing Error Handling
enum SearchError: Error { case networkUnavailable }
func testPublisherFailsWithNetworkError() {
let expectation = XCTestExpectation(description: "Error received")
Fail<String, SearchError>(error: .networkUnavailable)
.sink(
receiveCompletion: { completion in
if case .failure(let error) = completion {
XCTAssertEqual(error, .networkUnavailable)
expectation.fulfill()
}
},
receiveValue: { _ in XCTFail("Should not receive value") }
)
.store(in: &cancellables)
wait(for: [expectation], timeout: 1.0)
}CombineExpectations: Ergonomic Publisher Testing
XCTestExpectation becomes verbose for multi-step pipelines. CombineExpectations provides a cleaner API designed specifically for Combine.
Install via SPM:
.package(url: "https://github.com/groue/CombineExpectations.git", from: "0.10.0")Collecting All Values
import CombineExpectations
func testTransformationPipeline() throws {
let publisher = [1, 2, 3, 4, 5].publisher
.filter { $0 % 2 == 0 }
.map { $0 * 10 }
let recorder = publisher.record()
let elements = try wait(for: recorder.elements, timeout: 1.0)
XCTAssertEqual(elements, [20, 40])
}.record() captures all emitted values and the completion. recorder.elements is a Publisher.Expectation you wait for.
Checking the First Value
func testFirstSearchResult() throws {
let viewModel = SearchViewModel(service: MockSearchService())
viewModel.search(query: "combine")
let recorder = viewModel.resultsPublisher.record()
let first = try wait(for: recorder.next(), timeout: 2.0)
XCTAssertFalse(first.isEmpty)
}Checking Completion
func testPublisherFinishes() throws {
let recorder = Just("done").record()
let completion = try wait(for: recorder.completion, timeout: 1.0)
guard case .finished = completion else {
XCTFail("Expected finished, got \(completion)")
return
}
}
func testPublisherFails() throws {
let recorder = Fail<String, URLError>(error: URLError(.notConnectedToInternet))
.record()
let completion = try wait(for: recorder.completion, timeout: 1.0)
guard case .failure(let error as URLError) = completion else {
XCTFail("Expected URLError")
return
}
XCTAssertEqual(error.code, .notConnectedToInternet)
}Checking Prefix
func testFirstTwoEmissions() throws {
let subject = PassthroughSubject<Int, Never>()
let recorder = subject.record()
subject.send(10)
subject.send(20)
subject.send(30)
let prefix = try wait(for: recorder.prefix(2), timeout: 1.0)
XCTAssertEqual(prefix, [10, 20])
}Testing Schedulers with TestScheduler
Operators like .debounce, .delay, and .throttle are time-dependent. Testing them with real time makes tests slow and flaky. Use a controlled scheduler instead.
Swift's Combine doesn't include a test scheduler, but CombineSchedulers from Point-Free provides one:
import CombineSchedulers
func testDebouncedSearch() {
let scheduler = DispatchQueue.test
var received: [String] = []
let subject = PassthroughSubject<String, Never>()
subject
.debounce(for: .milliseconds(300), scheduler: scheduler)
.sink { received.append($0) }
.store(in: &cancellables)
subject.send("s")
subject.send("sw")
subject.send("swi")
subject.send("swif")
subject.send("swift")
// No time has passed — nothing emitted yet
XCTAssertEqual(received, [])
// Advance past the debounce window
scheduler.advance(by: .milliseconds(300))
XCTAssertEqual(received, ["swift"])
}The test runs instantly because DispatchQueue.test is a virtual clock you control.
Testing a ViewModel with Combine
A realistic pattern — testing a view model that uses Combine internally:
class UserProfileViewModel: ObservableObject {
@Published var username: String = ""
@Published var isValid: Bool = false
private var cancellables = Set<AnyCancellable>()
init() {
$username
.map { $0.count >= 3 }
.assign(to: &$isValid)
}
}
final class UserProfileViewModelTests: XCTestCase {
func testValidationUpdatesWithUsername() throws {
let vm = UserProfileViewModel()
let recorder = vm.$isValid.record()
vm.username = "ab" // too short
vm.username = "abc" // exactly 3 — valid
vm.username = "abcde" // valid
let values = try wait(for: recorder.prefix(4), timeout: 1.0)
// Initial false, then false (ab), then true (abc), then true (abcde)
XCTAssertEqual(values, [false, false, true, true])
}
}Structuring Combine Tests
A few patterns that keep Combine tests maintainable:
Use setUp/tearDown for cancellable management:
var cancellables = Set<AnyCancellable>()
override func setUp() {
super.setUp()
cancellables = Set<AnyCancellable>()
}
override func tearDown() {
cancellables.removeAll()
super.tearDown()
}Mock publishers with PassthroughSubject:
struct MockAPIService {
let responseSubject = PassthroughSubject<Data, URLError>()
func fetch(url: URL) -> AnyPublisher<Data, URLError> {
responseSubject.eraseToAnyPublisher()
}
}
// In test:
let mockService = MockAPIService()
let vm = DataViewModel(service: mockService)
mockService.responseSubject.send(mockData)
mockService.responseSubject.send(completion: .finished)Prefer XCTAssertThrowsError for synchronous CombineExpectations failures:
func testPublisherDoesNotEmit() throws {
let empty = Empty<Int, Never>(completeImmediately: true)
let recorder = empty.record()
XCTAssertThrowsError(try wait(for: recorder.next(), timeout: 0.5))
}Summary
| Tool | Best for |
|---|---|
XCTestExpectation |
Simple async value/completion assertions |
CombineExpectations |
Readable multi-step publisher testing |
CombineSchedulers TestScheduler |
Time-sensitive operators (debounce, delay, throttle) |
PassthroughSubject |
Mocking publisher inputs in tests |
Test Combine pipelines at the boundary where your business logic interacts with them — usually in ViewModels. Keep operator chains short and testable; if a chain is hard to test, it's probably doing too much.