Combine Schedulers

A few schedulers that make working with Combine more testable and more versatile.

Schedulers

The Combine framework provides the Scheduler protocol, which is a powerful abstraction for describing how and when units of work are executed. It unifies many disparate ways of executing work, such as DispatchQueue, RunLoop and OperationQueue.

However, the moment you use any of these schedulers in your reactive code you instantly make the publisher asynchronous and therefore much harder to test, forcing you to use expectations and waits for time to pass as your publisher executes.

This library provides new schedulers that allow you to turn any asynchronous publisher into a synchronous one for ease of testing and debugging.
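The difference is easy to observe with plain Combine. The following sketch uses only Apple's frameworks, not this library, and its variable names are illustrative. The first pipeline hops through DispatchQueue.main, so nothing has been delivered by the time the next statement runs. The second uses Combine's built-in ImmediateScheduler and delivers synchronously.

```swift
import Combine
import Foundation

// Hopping to the main queue defers delivery, even when we are already on main.
var hopped: [Int] = []
let hoppedCancellable = Just(1)
    .receive(on: DispatchQueue.main)
    .sink { hopped.append($0) }
// `hopped` is still empty here; the value arrives on a later main-queue turn,
// which is why a test would need an expectation to observe it.

// Combine's own ImmediateScheduler performs the work synchronously instead.
var immediate: [Int] = []
let immediateCancellable = Just(1)
    .receive(on: ImmediateScheduler.shared)
    .sink { immediate.append($0) }
// `immediate` already contains 1 here.
```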

AnyScheduler

The AnyScheduler provides a type-erasing wrapper for the Scheduler protocol, which can be useful for being generic over many types of schedulers without needing to actually introduce a generic to your code. The Combine framework ships with many type-erasing wrappers, such as AnySubscriber, AnyPublisher and AnyCancellable, yet for some reason does not ship with AnyScheduler.

This type is useful when you want to customize the scheduler used in some code from the outside, but you don’t want to introduce a generic to make it customizable. For example, suppose you have an ObservableObject view model that performs an API request when a method is called:

// MARK: - CurrencyViewModel

final class CurrencyViewModel: ObservableObject {

    // MARK: - Properties

    /// Current currency value
    @Published var currency: Currency?

    /// APIClient instance
    let apiClient: APIClient

    // MARK: - Initializers

    /// Default initializer
    /// - Parameter apiClient: APIClient instance
    init(apiClient: APIClient) {
        self.apiClient = apiClient
    }

    // MARK: - Useful

    /// Process reload button event
    func reloadButtonTapped() {
        self.apiClient
            .fetchCurrency()
            .receive(on: DispatchQueue.main)
            .assign(to: &self.$currency)
    }
}

Notice that we are using DispatchQueue.main in the reloadButtonTapped method because the fetchCurrency endpoint most likely delivers its output on a background thread (as is the case with URLSession).

This code seems innocent enough, but the presence of .receive(on: DispatchQueue.main) makes this code harder to test since you have to use XCTest expectations to explicitly wait a small amount of time for the queue to execute. This can lead to flakiness in tests and make test suites take longer to execute than necessary.

One way to fix this testing problem is to use an immediate scheduler instead of DispatchQueue.main, which will cause fetchCurrency to deliver its output as soon as possible with no thread hops. In order to allow for this we would need to inject a scheduler into our view model so that we can control it from the outside:

// MARK: - CurrencyViewModel

final class CurrencyViewModel<S: Scheduler>: ObservableObject {

    // MARK: - Properties

    /// Current currency value
    @Published var currency: Currency?

    /// APIClient instance
    let apiClient: APIClient

    /// Current scheduler instance
    let scheduler: S

    // MARK: - Initializers

    /// Default initializer
    /// - Parameters:
    ///   - apiClient: APIClient instance
    ///   - scheduler: Current scheduler instance
    init(apiClient: APIClient, scheduler: S) {
        self.apiClient = apiClient
        self.scheduler = scheduler
    }

    // MARK: - Useful
    
    /// Process reload button event
    func reloadButtonTapped() {
        self.apiClient
            .fetchCurrency()
            .receive(on: self.scheduler)
            .assign(to: &self.$currency)
    }
}

Now we can initialize this view model in production by using DispatchQueue.main and we can initialize it in tests using DispatchQueue.immediate. Sounds like a win!

However, introducing this generic to our view model is quite heavyweight as it is loudly announcing to the outside world that this type uses a scheduler, and worse it will end up infecting any code that touches this view model that also wants to be testable. For example, any view that uses this view model will need to introduce a generic if it wants to also be able to control the scheduler, which would be useful if we wanted to write snapshot tests.

Instead of introducing a generic to allow for substituting in different schedulers we can use AnyScheduler. It allows us to be somewhat generic in the scheduler, but without actually introducing a generic.

Rather than holding a generic scheduler in our view model, we can say that we only want a scheduler whose associated types match those of DispatchQueue:

// MARK: - CurrencyViewModel

final class CurrencyViewModel: ObservableObject {

    // MARK: - Properties

    /// Current currency value
    @Published var currency: Currency?

    /// APIClient instance
    let apiClient: APIClient

    /// Current scheduler instance
    let scheduler: AnySchedulerOf<DispatchQueue>

    // MARK: - Initializers

    /// Default initializer
    /// - Parameters:
    ///   - apiClient: APIClient instance
    ///   - scheduler: Current scheduler instance
    init(apiClient: APIClient, scheduler: AnySchedulerOf<DispatchQueue>) {
        self.apiClient = apiClient
        self.scheduler = scheduler
    }

    // MARK: - Useful

    /// Process reload button event
    func reloadButtonTapped() {
        self.apiClient
            .fetchCurrency()
            .receive(on: self.scheduler)
            .assign(to: &self.$currency)
    }
}

Then, in production, we can create a view model that uses a live DispatchQueue; we just
have to erase its type first:

let viewModel = CurrencyViewModel(
    apiClient: apiClient,
    scheduler: DispatchQueue.main.eraseToAnyScheduler()
)

For common schedulers, like DispatchQueue, OperationQueue, and RunLoop, there is even a
static helper on AnyScheduler that further simplifies this:

let viewModel = CurrencyViewModel(
    apiClient: apiClient,
    scheduler: .main
)

And in tests we can use an immediate scheduler:

let viewModel = CurrencyViewModel(
    apiClient: apiClient,
    scheduler: .immediate
)

So, in general, AnyScheduler is great for allowing one to control what scheduler is used
in classes, functions, etc. without needing to introduce a generic, which can help simplify
the code and reduce implementation details from leaking out.
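For intuition, here is a minimal sketch of how a scheduler can be type-erased by capturing its requirements in closures. ErasedScheduler and ErasedSchedulerOf are hypothetical names used only for illustration; the library's AnyScheduler may be implemented differently.

```swift
import Combine
import Foundation

/// Hypothetical, simplified type eraser (not the library's AnyScheduler).
/// It keeps the associated types but forgets the concrete scheduler.
struct ErasedScheduler<SchedulerTimeType, SchedulerOptions>: Scheduler
where SchedulerTimeType: Strideable,
      SchedulerTimeType.Stride: SchedulerTimeIntervalConvertible {

    typealias Stride = SchedulerTimeType.Stride

    private let _now: () -> SchedulerTimeType
    private let _minimumTolerance: () -> Stride
    private let _schedule: (SchedulerOptions?, @escaping () -> Void) -> Void
    private let _scheduleAfter:
        (SchedulerTimeType, Stride, SchedulerOptions?, @escaping () -> Void) -> Void
    private let _scheduleRepeating:
        (SchedulerTimeType, Stride, Stride, SchedulerOptions?, @escaping () -> Void) -> Cancellable

    /// Wraps any scheduler whose associated types match this eraser's.
    init<S: Scheduler>(_ scheduler: S)
    where S.SchedulerTimeType == SchedulerTimeType, S.SchedulerOptions == SchedulerOptions {
        _now = { scheduler.now }
        _minimumTolerance = { scheduler.minimumTolerance }
        _schedule = { options, action in
            scheduler.schedule(options: options, action)
        }
        _scheduleAfter = { date, tolerance, options, action in
            scheduler.schedule(after: date, tolerance: tolerance, options: options, action)
        }
        _scheduleRepeating = { date, interval, tolerance, options, action in
            scheduler.schedule(
                after: date, interval: interval, tolerance: tolerance, options: options, action
            )
        }
    }

    var now: SchedulerTimeType { _now() }
    var minimumTolerance: Stride { _minimumTolerance() }

    func schedule(options: SchedulerOptions?, _ action: @escaping () -> Void) {
        _schedule(options, action)
    }

    func schedule(after date: SchedulerTimeType, tolerance: Stride,
                  options: SchedulerOptions?, _ action: @escaping () -> Void) {
        _scheduleAfter(date, tolerance, options, action)
    }

    func schedule(after date: SchedulerTimeType, interval: Stride, tolerance: Stride,
                  options: SchedulerOptions?, _ action: @escaping () -> Void) -> Cancellable {
        _scheduleRepeating(date, interval, tolerance, options, action)
    }
}

/// Hypothetical convenience alias, analogous in spirit to AnySchedulerOf.
typealias ErasedSchedulerOf<S: Scheduler> =
    ErasedScheduler<S.SchedulerTimeType, S.SchedulerOptions>

// A property of this one type can hold any scheduler with DispatchQueue's types.
let production: ErasedSchedulerOf<DispatchQueue> = ErasedScheduler(DispatchQueue.main)

// Wrapping Combine's ImmediateScheduler shows that calls are forwarded unchanged.
let erasedImmediate = ErasedScheduler(ImmediateScheduler.shared)
var didRun = false
erasedImmediate.schedule(options: nil) { didRun = true }
```

Because the wrapper is generic only over the two associated types, code that stores it does not need a generic parameter for the concrete scheduler.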

TestScheduler

A scheduler whose current time and execution can be controlled in a deterministic manner.

This scheduler is useful for testing how the flow of time affects publishers that use
asynchronous operators, such as debounce, throttle, delay, timeout, receive(on:),
subscribe(on:) and more.

For example, consider the following race operator that runs two futures in parallel, but
only emits the first one that completes:

func race<Output, Failure: Error>(
    _ first: Future<Output, Failure>,
    _ second: Future<Output, Failure>
) -> AnyPublisher<Output, Failure> {
    first
        .merge(with: second)
        .prefix(1)
        .eraseToAnyPublisher()
}

Although this publisher is quite simple we may still want to write some tests for it.

To do this we can create a test scheduler and create two futures, one that emits after a
second and one that emits after two seconds:

let scheduler = DispatchQueue.test
let first = Future<Int, Never> { callback in
    scheduler.schedule(after: scheduler.now.advanced(by: 1)) {
        callback(.success(1))
    }
}
let second = Future<Int, Never> { callback in
    scheduler.schedule(after: scheduler.now.advanced(by: 2)) {
        callback(.success(2))
    }
}

And then we can race these futures and collect their emissions into an array:

var output: [Int] = []
let cancellable = race(first, second).sink { output.append($0) }

And then we can deterministically move time forward in the scheduler to see how the publisher
emits. We can start by moving time forward by one second:

scheduler.advance(by: 1)
XCTAssertEqual(output, [1])

This proves that we get the first emission from the publisher since one second of time has
passed. If we further advance by one more second we can prove that we do not get any more
emissions:

scheduler.advance(by: 1)
XCTAssertEqual(output, [1])

This is a very simple example of how to control the flow of time with the test scheduler,
but this technique can be used to test any publisher that involves Combine’s asynchronous
operations.
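To build intuition for what "controlling time" means, here is a deliberately small model of a virtual-time queue in plain Swift. VirtualClock is a hypothetical type written for this explanation and measures time in whole seconds; it is not the library's TestScheduler and does not conform to Combine's Scheduler protocol.

```swift
/// Hypothetical virtual-time queue; a simplified model, not the library's TestScheduler.
final class VirtualClock {
    /// Current virtual time in seconds. It only changes when `advance(by:)` is called.
    private(set) var now = 0
    /// Tie-breaker so that work due at the same time runs in scheduling order.
    private var nextID = 0
    private var pending: [(due: Int, id: Int, work: () -> Void)] = []

    /// Registers work to run once virtual time reaches `now + delay`.
    func schedule(after delay: Int, _ work: @escaping () -> Void) {
        pending.append((due: now + delay, id: nextID, work: work))
        nextID += 1
    }

    /// Moves virtual time forward and runs everything that became due, in order.
    func advance(by duration: Int) {
        let target = now + duration
        while true {
            // Re-sort on every pass because running work may schedule more work.
            pending.sort { ($0.due, $0.id) < ($1.due, $1.id) }
            guard let next = pending.first, next.due <= target else { break }
            pending.removeFirst()
            now = next.due
            next.work()
        }
        now = target
    }
}

// Mirrors the race example: one unit of work after 1 second, another after 2.
let clock = VirtualClock()
var fired: [Int] = []
clock.schedule(after: 1) { fired.append(1) }
clock.schedule(after: 2) { fired.append(2) }

clock.advance(by: 1) // fired is now [1]
clock.advance(by: 1) // fired is now [1, 2]
```

Nothing runs until time is advanced explicitly, which is what makes assertions between steps deterministic.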

ImmediateScheduler

The Combine framework comes with an ImmediateScheduler type of its own, but it defines all new types for the associated types of SchedulerTimeType and SchedulerOptions. This means you cannot easily swap between a live DispatchQueue and an “immediate” DispatchQueue that executes work synchronously. The only way to do that would be to introduce generics to any code making use of that scheduler, which can become unwieldy.

So, instead, this library’s ImmediateScheduler uses the same associated types as an existing scheduler, which means you can use DispatchQueue.immediate to have a scheduler that looks like a dispatch queue but executes its work immediately. Similarly you can construct RunLoop.immediate and OperationQueue.immediate.

This scheduler is useful for writing tests against publishers that use asynchrony operators, such as receive(on:), subscribe(on:) and others, because it forces the publisher to emit immediately rather than needing to wait for thread hops or delays using XCTestExpectation.

This scheduler is different from TestScheduler in that you cannot explicitly control how time flows through your publisher, but rather you are instantly collapsing time into a single point.
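The idea of reusing another scheduler's associated types can be sketched as follows. SynchronousScheduler is a hypothetical name chosen for illustration, and the sketch ignores repeating intervals; the library's ImmediateScheduler may differ in its details.

```swift
import Combine
import Foundation

/// Hypothetical sketch: runs every action synchronously while borrowing another
/// scheduler's associated types. Not the library's actual implementation.
struct SynchronousScheduler<SchedulerTimeType, SchedulerOptions>: Scheduler
where SchedulerTimeType: Strideable,
      SchedulerTimeType.Stride: SchedulerTimeIntervalConvertible {

    /// A fixed "current time"; it never advances because nothing ever waits.
    let now: SchedulerTimeType
    let minimumTolerance: SchedulerTimeType.Stride = .zero

    func schedule(options: SchedulerOptions?, _ action: @escaping () -> Void) {
        action()
    }

    func schedule(after date: SchedulerTimeType, tolerance: SchedulerTimeType.Stride,
                  options: SchedulerOptions?, _ action: @escaping () -> Void) {
        action() // Ignores `date`: any delay collapses to "right now".
    }

    func schedule(after date: SchedulerTimeType, interval: SchedulerTimeType.Stride,
                  tolerance: SchedulerTimeType.Stride, options: SchedulerOptions?,
                  _ action: @escaping () -> Void) -> Cancellable {
        action() // Runs once; repetition is not modeled in this sketch.
        return AnyCancellable {}
    }
}

// Same associated types as DispatchQueue, so it fits wherever those are expected.
let immediateQueue = SynchronousScheduler<
    DispatchQueue.SchedulerTimeType, DispatchQueue.SchedulerOptions
>(now: .init(DispatchTime(uptimeNanoseconds: 1)))

// A ten-second delay is delivered without waiting.
var delayed: [Int] = []
let delayedCancellable = Just(1)
    .delay(for: .seconds(10), scheduler: immediateQueue)
    .sink { delayed.append($0) }
```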

As a basic example, suppose you have a view model that loads some data after waiting for 10 seconds from when a button is tapped:

// MARK: - HomeViewModel

final class HomeViewModel: ObservableObject {

    // MARK: - Properties

    /// Current albums list
    @Published var albums: [Album]?

    /// APIClient instance
    let apiClient: APIClient

    // MARK: - Initializers

    /// Default initializer
    /// - Parameter apiClient: APIClient instance
    init(apiClient: APIClient) {
        self.apiClient = apiClient
    }

    /// Process reload button event
    func reloadButtonTapped() {
        Just(())
            .delay(for: .seconds(10), scheduler: DispatchQueue.main)
            .flatMap { self.apiClient.fetchAlbums() }
            .assign(to: &self.$albums)
    }
}

In order to test this code you would literally need to wait 10 seconds for the publisher to
emit:

func testViewModel() {
    let viewModel = HomeViewModel(apiClient: .mock)
    viewModel.reloadButtonTapped()
    _ = XCTWaiter.wait(for: [XCTestExpectation()], timeout: 10)
    XCTAssertEqual(viewModel.albums, [Album(id: 42)])
}

Alternatively, we can explicitly pass a scheduler into the view model initializer so that it
can be controlled from the outside:

// MARK: - HomeViewModel

final class HomeViewModel: ObservableObject {

    // MARK: - Properties

    /// Current albums list
    @Published var albums: [Album]?

    /// APIClient instance
    let apiClient: APIClient

    /// Current scheduler instance
    let scheduler: AnySchedulerOf<DispatchQueue>

    // MARK: - Initializers

    /// Default initializer
    /// - Parameters:
    ///   - apiClient: APIClient instance
    ///   - scheduler: Current scheduler instance
    init(apiClient: APIClient, scheduler: AnySchedulerOf<DispatchQueue>) {
        self.apiClient = apiClient
        self.scheduler = scheduler
    }

    /// Process reload button event
    func reloadButtonTapped() {
        Just(())
            .delay(for: .seconds(10), scheduler: self.scheduler)
            .flatMap { self.apiClient.fetchAlbums() }
            .assign(to: &self.$albums)
    }
}

And then in tests use an immediate scheduler:

func testViewModel() {

    let viewModel = HomeViewModel(
        apiClient: .mock,
        scheduler: .immediate
    )

    viewModel.reloadButtonTapped()

    // No more waiting...

    XCTAssertEqual(viewModel.albums, [Album(id: 42)])
}

Animated schedulers

CombineSchedulers comes with helpers that aid in asynchronous animations in SwiftUI.

If a SwiftUI state mutation should be animated, you can invoke the animation and transaction methods to transform an existing scheduler into one that schedules its actions with an animation or in a transaction. These APIs mirror SwiftUI’s withAnimation and withTransaction functions, which are invoked by the animated scheduler.

For example, to animate an API response in your view model, you can specify that the scheduler that receives this state should be animated:

self.apiClient
    .fetchUser()
    .receive(on: scheduler.animation())
    .assign(to: &self.$user)

FailingScheduler

A scheduler that causes the current XCTest test case to fail if it is used.

This scheduler can provide an additional layer of certainty that a tested code path does not
require the use of a scheduler.

As a view model becomes more complex, only some of its logic may require a scheduler. When
writing unit tests for any logic that does not require a scheduler, one should provide a
failing scheduler instead. This documents, directly in the test, that the feature does not
use a scheduler. If it did, or ever does in the future, the test will fail.

For example, the following view model has a couple of responsibilities:

// MARK: - CurrencyViewModel

final class CurrencyViewModel: ObservableObject {

    // MARK: - Properties

    /// Current currency value
    @Published var currency: Currency?

    /// APIClient instance
    let apiClient: APIClient

    /// Current scheduler instance
    let scheduler: AnySchedulerOf<DispatchQueue>

    // MARK: - Initializers

    /// Default initializer
    /// - Parameters:
    ///   - apiClient: APIClient instance
    ///   - scheduler: Current scheduler instance
    init(apiClient: APIClient, scheduler: AnySchedulerOf<DispatchQueue>) {
        self.apiClient = apiClient
        self.scheduler = scheduler
    }

    // MARK: - Useful

    /// Process reload button event
    func reloadButtonTapped() {
        self.apiClient
            .fetchCurrency()
            .receive(on: self.scheduler)
            .assign(to: &self.$currency)
    }

    /// Process favorite button event
    func favoriteButtonTapped() {
        self.currency?.isFavorite.toggle()
    }
}

  • It lets the user tap a button to refresh some currency data
  • It lets the user toggle if the currency is one of their favorites

The API client delivers the currency on a background queue, so the view model must receive it
on its main queue before mutating its state.

Tapping the favorite button, however, involves no scheduling. This means that a test can be
written with a failing scheduler:

func testFavoriteButton() {

    let viewModel = CurrencyViewModel(
        apiClient: .mock,
        scheduler: .failing
    )
    viewModel.currency = .mock

    viewModel.favoriteButtonTapped()
    XCTAssert(viewModel.currency?.isFavorite == true)

    viewModel.favoriteButtonTapped()
    XCTAssert(viewModel.currency?.isFavorite == false)
}

With .failing, this test pretty strongly declares that favoriting a currency does not need
a scheduler to do the job, which means it is reasonable to assume that the feature is simple
and does not involve any asynchrony.

In the future, should favoriting a currency fire off an API request that involves a scheduler,
this test will begin to fail, which is a good thing! This will force us to address the
complexity that was introduced. Had we used any other scheduler, it would quietly receive this
additional work and the test would continue to pass.

UIScheduler

A scheduler that executes its work on the main queue as soon as possible. This scheduler is inspired by the equivalent scheduler in the ReactiveSwift project.

If UIScheduler.shared.schedule is invoked from the main thread then the unit of work will be performed immediately. This is in contrast to DispatchQueue.main.schedule, which will incur a thread hop before executing since it uses DispatchQueue.main.async under the hood.

This scheduler can be useful for situations where you need work executed as quickly as possible on the main thread, and for which a thread hop would be problematic, such as when performing animations.
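The behavior described above can be approximated with a small helper. runOnMainAsSoonAsPossible is a hypothetical function written for illustration, and it detects the main thread with Thread.isMainThread; the library's UIScheduler may perform this check differently.

```swift
import Foundation

/// Hypothetical helper that approximates "main thread, as soon as possible".
/// It is a sketch of the idea, not the library's UIScheduler.
func runOnMainAsSoonAsPossible(_ work: @escaping () -> Void) {
    if Thread.isMainThread {
        work()                                  // Already on main: run without a hop.
    } else {
        DispatchQueue.main.async(execute: work) // Elsewhere: hop to the main queue.
    }
}

// Called from the main thread, the work runs before the call returns.
var events: [String] = []
runOnMainAsSoonAsPossible { events.append("work") }
events.append("after call")
```

With DispatchQueue.main.async in place of the helper, "after call" would be recorded first, because the work would wait for a later turn of the main queue.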

Publishers.Timer

A publisher that emits a scheduler’s current time on a repeating interval.

This publisher is an alternative to Foundation’s Timer.publisher, with its primary
difference being that it allows you to use any scheduler for the timer, not just RunLoop.
This is useful because the RunLoop scheduler is not testable in the sense that if you want
to write tests against a publisher that makes use of Timer.publisher you must explicitly
wait for time to pass in order to get emissions. This is likely to lead to fragile tests and
greatly bloat the time your tests take to execute.

It can be used much like Foundation’s timer, except you specify a scheduler rather than a
run loop:

Publishers.Timer(every: .seconds(1), scheduler: DispatchQueue.main)
    .autoconnect()
    .sink { print("Timer", $0) }

But more importantly, you can use it with TestScheduler so that any Combine code you write
involving timers becomes more testable. This shows how we can easily simulate the idea of
moving time forward 1,000 seconds in a timer:

let scheduler = DispatchQueue.test
var output: [Int] = []

Publishers.Timer(every: 1, scheduler: scheduler)
    .autoconnect()
    .sink { _ in output.append(output.count) }
    .store(in: &self.cancellables)

XCTAssertEqual(output, [])

scheduler.advance(by: 1)
XCTAssertEqual(output, [0])

scheduler.advance(by: 1)
XCTAssertEqual(output, [0, 1])

scheduler.advance(by: 1_000)
XCTAssertEqual(output, Array(0...1_001))

Compatibility

This library is compatible with iOS 13.2 and higher. Please note that there are bugs in the Combine framework in iOS 13.1 and lower that cause crashes when comparing DispatchQueue.SchedulerTimeType values, which is an operation that the TestScheduler depends on.

Installation

You can add CombineSchedulers to an Xcode project by adding it as a package dependency.

  1. From the File menu, select Swift Packages › Add Package Dependency…
  2. Enter https://github.com/Incetro/combine-schedulers into the package repository URL text field
  3. Depending on how your project is structured:
    • If you have a single application target that needs access to the library, then add CombineSchedulers directly to your application.
    • If you want to use this library from multiple targets you must create a shared framework that depends on CombineSchedulers, and then depend on that framework from your other targets.

License

This library is released under the MIT license. See LICENSE for details.

GitHub

https://github.com/Incetro/combine-schedulers