AsyncExtensions aims to mimic Swift Combine operators for async sequences

AsyncExtensions

AsyncExtensions provides a collection of operators, async sequences and async streams that mimics Combine behaviour.

The purpose is to be able to chain operators, just as you would do with any reactive programming framework:

AsyncSequences
    .Merge(sequence1, sequence2, sequence3)
    .prepend(0)
    .handleEvents(onElement: { print($0) }, onFinish: { print("Finished") })
    .scan("") { accumulator, element in accumulator + "\(element)" }
    .collect { print($0) }

Async Sequences

Async Streams

Operators

More operators and extensions are to come. Pull requests are of course welcome.

Async Sequences

Just

Just is an AsyncSequence that outputs a single value and finishes.

let justSequence = AsyncSequences.Just<Int>(1)
for try await element in justSequence {
    // will be called once with element = 1
}

Empty

Empty is an AsyncSequence that immediately finishes without emitting values.

let emptySequence = AsyncSequences.Empty<Int>()
for try await element in emptySequence {
    // will never be called
}

Fail

Fail is an AsyncSequence that outputs no elements and throws an error.

let failSequence = AsyncSequences.Fail<Int, Swift.Error>(error: NSError(domain: "", code: 1))
do {
    for try await element in failSequence {
        // will never be called
    }
} catch {
    // will catch `NSError(domain: "", code: 1)` here
}

From

From is an AsyncSequence that outputs elements from a traditional Sequence.

let fromSequence = AsyncSequences.From([1, 2, 3, 4, 5])

for await element in fromSequence {
    print(element) // will print 1 2 3 4 5
}

A variation offers to set an interval of time between each element.

let fromSequence = AsyncSequences.From([1, 2, 3, 4, 5], interval: .milliSeconds(10))

for await element in fromSequence {
    print(element) // will print 1 2 3 4 5 with an interval of 10ms between elements
}

Merge

Merge is an AsyncSequence that merges several async sequences respecting their temporality while being iterated over. When all the async sequences have finished, so too does the merged async sequence. If an async sequence fails, so too does the merged async sequence.

// 0.1ms   1ms    1.5ms   2ms     3ms     4.5ms
//  4       1       5      2       3        6

let asyncSequence1 = AsyncStream(Int.self, bufferingPolicy: .unbounded) { continuation in
    Task {
        try await Task.sleep(nanoseconds: 1_000_000)
        continuation.yield(1)
        try await Task.sleep(nanoseconds: 1_000_000)
        continuation.yield(2)
        try await Task.sleep(nanoseconds: 1_000_000)
        continuation.yield(3)
        continuation.finish()
    }
}

let asyncSequence2 = AsyncStream(Int.self, bufferingPolicy: .unbounded) { continuation in
    Task {
        try await Task.sleep(nanoseconds: 100_000)
        continuation.yield(4)
        try await Task.sleep(nanoseconds: 1_400_000)
        continuation.yield(5)
        try await Task.sleep(nanoseconds: 3_000_000)
        continuation.yield(6)
        continuation.finish()
    }
}

let mergedAsyncSequence = AsyncSequences.Merge(asyncSequence1, asyncSequence2)

for try await element in mergedAsyncSequence {
    print(element) // will print -> 4 1 5 2 3 6
}

Zip2

Zip2 is an AsyncSequence that combines the latest elements from two sequences according to their temporality and emits a tuple to the client. If any async sequence ends successfully or fails with an error, so too does the zipped async Sequence.

let asyncSequence1 = AsyncSequences.From([1, 2, 3, 4, 5])
let asyncSequence2 = AsyncSequences.From(["5", "4", "3", "2", "1"])

let zippedAsyncSequence = AsyncSequences.Zip2(asyncSequence1, asyncSequence2)

for try await element in zippedAsyncSequence {
    print(element) // will print -> (1, "5") (2, "4") (3, "3") (4, "2") (5, "1")
}

Zip3

Zip3 is an AsyncSequence that combines the latest elements from two sequences according to their temporality and emits a tuple to the client. If any async sequence ends successfully or fails with an error, so too does the zipped async Sequence.

let asyncSequence1 = AsyncSequences.From([1, 2, 3, 4, 5])
let asyncSequence2 = AsyncSequences.From(["5", "4", "3", "2", "1"])
let asyncSequence3 = AsyncSequences.From([true, false, true, false, true])

let zippedAsyncSequence = AsyncSequences.Zip3(asyncSequence1, asyncSequence2, asyncSequence3)

for try await element in zippedAsyncSequence {
    print(element) // will print -> (1, "5", true) (2, "4", false) (3, "3", true) (4, "2", false) (5, "1", true)
}

Zip

Zip is an AsyncSequence that combines the latest elements from several sequences according to their temporality and emits an array to the client. If any async sequence ends successfully or fails with an error, so too does the zipped async Sequence.

let asyncSequence1 = AsyncSequences.From([1, 2, 3])
let asyncSequence2 = AsyncSequences.From([1, 2, 3])
let asyncSequence3 = AsyncSequences.From([1, 2, 3])
let asyncSequence4 = AsyncSequences.From([1, 2, 3])
let asyncSequence5 = AsyncSequences.From([1, 2, 3])

let zippedAsyncSequence = AsyncSequences.Zip(asyncSequence1, asyncSequence2, asyncSequence3, asyncSequence4, asyncSequence5)

for try await element in zippedAsyncSequence {
    print(element) // will print -> [1, 1, 1, 1, 1] [2, 2, 2, 2, 2] [3, 3, 3, 3, 3]
}

Timer

Timer is an async sequence that repeatedly emits the current date on the given interval, with the given priority.

let timer = AsyncSequences.Timer(priority: .high, every: .seconds(1))

Task {
    for try await element in timer {
        print(element)
    }
}

// will print:
// 2022-03-06 19:31:22 +0000
// 2022-03-06 19:31:23 +0000
// 2022-03-06 19:31:24 +0000
// 2022-03-06 19:31:25 +0000
// 2022-03-06 19:31:26 +0000
// and will stop once timer.cancel() is called or the parent task is cancelled.

Async Streams

Passthrough

Passthrough is an async sequence in which one can send values over time.

let passthrough = AsyncStreams.Passthrough<Int>()

Task {
    for try await element in passthrough {
        print(element) // will print 1 2
    }
}

Task {
    for try await element in passthrough {
        print(element) // will print 1 2
    }
}

.. later in the application flow

passthrough.send(1)
passthrough.send(2)

CurrentValue

CurrentValue is an async sequence in which one can send values over time. The current value is always accessible as an instance variable. The current value is replayed for any new async loop.

let currentValue = AsyncStreams.CurrentValue<Int>(1)

Task {
    for try await element in currentValue {
        print(element) // will print 1 2
    }
}

Task {
    for try await element in currentValue {
        print(element) // will print 1 2
    }
}

.. later in the application flow

currentValue.send(2)

print(currentValue.element) // will print 2

Replay

Replayis an async sequence in which one can send values over time. Values are buffered in a FIFO fashion so they can be iterated over by new loops. When the bufferSize is outreached the oldest value is dropped.

let replay = AsyncStreams.Replay<Int>(bufferSize: 3)

(1...5).forEach { replay.send($0) }

for try await element in replay {
    print(element) // will print 3, 4, 5
}

Streamed

Streamed is a property wrapper that streams a property as an AsyncSequence. It is a structured concurrency equivalent to Combine @Published.

class Weather {
    @Streamed var temperature: Double
    init(temperature: Double) {
        self.temperature = temperature
    }
}

let weather = Weather(temperature: 20)
Task {
    for try await element in weather.$temperature {
        print ("Temperature now: \(element)")
    }
}

// ... later in the application flow

weather.temperature = 25

// will print:
// Temperature now: 20.0
// Temperature now: 25.0

Operators

Collect

collect(_:) iterates over each element of the AsyncSequence and give it to the async block.

let fromSequence = AsyncSequences.From([1, 2, 3])
fromSequence
    .collect { print($0) } // will print 1 2 3

Scan

scan(_:_:) transforms elements from the upstream async sequence by providing the current element to a closure along with the last value returned by the closure. Each intermediate value will be emitted in the downstream async sequence.

let sourceSequence = AsyncSequences.From([1, 2, 3, 4, 5])
let scannedSequence = sourceSequence.scan("") { accumulator, element in accumulator + "\(element)"}

for try await element in scannedSequence {
    print(element)
}

// will print:
1
12
123
1234
12345

SwitchToLatest

switchToLatest() re-emits elements sent by the most recently received async sequence. This operator applies only in the case where the upstream async sequence’s Element is it-self an async sequence.

let sourceSequence = AsyncSequences.From([1, 2, 3])
let mappedSequence = sourceSequence.map { element in
	AsyncSequences.From(["a\(element)", "b\(element)"])
}
let switchedSequence = mappedSequence.switchToLatest()

for try await element in switchedSequence {
    print(element) // will print a3 b3
}

FlatMapLatest

flatMapLatest(_:) transforms the upstream async sequence elements into an async sequence and flattens the sequence of events from these multiple sources async sequences to appear as if they were coming from a single async sequence of events. Mapping to a new async sequence will cancel the task related to the previous one.

This operator is basically a shortcut for map() and switchToLatest().

let sourceSequence = AsyncSequences.From([1, 2, 3])
let flatMapLatestSequence = sourceSequence.flatMapLatest { element in
	AsyncSequences.From(["a\(element)", "b\(element)"])
}

for try await element in flatMapLatestSequence {
    print(element) // will print a3 b3
}

Prepend

prepend(_:) prepends an element to the upstream async sequence.

let sourceSequence = AsyncSequences.From([1, 2, 3])
let prependSequence = sourceSequence.prepend(0)

for try await element in prependSequence {
    print(element) // will print 0 1 2 3
}

HandleEvents

handleEvents(onStart:onElement:onCancel:onFinish) performs the specified closures when async sequences events occur.

let sourceSequence = AsyncSequences.From([1, 2, 3, 4, 5])
let handledSequence = sourceSequence.handleEvents {
   print("Begin iterating")
} onElement: { element in
   print("Element is \(element)")
} onCancel: {
   print("Cancelled")
} onFinish: { termination in
   print(termination)
}

for try await element in handledSequence {}

// will print:
// Begin iterating
// Element is 1
// Element is 2
// Element is 3
// Element is 4
// Element is 5
// finished

Assign

assign(to:on:) assigns each element from the async sequence to a property on an object.

class Root {
    var property: String = ""
}

let root = Root()
let fromSequence = AsyncSequences.From(["1", "2", "3"])
try await fromSequence.assign(to: \.property, on: root) // will set the property value to "1", "2", "3"

Multicast

multicast(_:) is useful when you have multiple client loops, but you want the upstream async sequence to only produce a single AsyncIterator.

let stream = AsyncStreams.Passthrough<(String, Int)>()
let multicastedAsyncSequence = ["First", "Second", "Third"]
    .asyncElements
    .map { ($0, Int.random(in: 0...100)) }
    .handleEvents(onElement: { print("AsyncSequence produces: ($0)") })
    .multicast(stream)

Task {
    try await multicastedAsyncSequence
        .collect { print ("Task 1 received: \($0)") }
}

Task {
    try await multicastedAsyncSequence
        .collect { print ("Task 2 received: \($0)") }
}

multicastedAsyncSequence.connect()

// will print:
// AsyncSequence produces: ("First", 78)
// Stream 2 received: ("First", 78)
// Stream 1 received: ("First", 78)
// AsyncSequence produces: ("Second", 98)
// Stream 2 received: ("Second", 98)
// Stream 1 received: ("Second", 98)
// AsyncSequence produces: ("Third", 61)
// Stream 2 received: ("Third", 61)
// Stream 1 received: ("Third", 61)

Share

share() shares the output of an upstream async sequence with multiple client loops. share() is effectively a shortcut for multicast(_:) using a Passthrough stream, with an implicit autoconnect().

let sharedAsyncSequence = AsyncSequences.From(["first", "second", "third"], interval: .seconds(1))
    .map { ($0, Int.random(in: 0...100)) }
    .handleEvents(onElement: { print("AsyncSequence produces: \($0)") })
    .share()

Task {
    try await sharedAsyncSequence
        .collect { print ("Task 1 received: \($0)") }
}

Task {
    try await sharedAsyncSequence
        .collect { print ("Task 2 received: \($0)") }
}

// will print:
// AsyncSequence produces: ("First", 78)
// Stream 2 received: ("First", 78)
// Stream 1 received: ("First", 78)
// AsyncSequence produces: ("Second", 98)
// Stream 2 received: ("Second", 98)
// Stream 1 received: ("Second", 98)
// AsyncSequence produces: ("Third", 61)
// Stream 2 received: ("Third", 61)
// Stream 1 received: ("Third", 61)

WithLatestFrom

withLatestFrom(_:) merges two async sequences into a single one by combining each value from self with the latest value from the other sequence, if any.

let seq1 = AsyncStreams.CurrentValue<Int>(1)
let seq2 = AsyncStreams.CurrentValue<String>("1")

let combinedSeq = seq1.withLatestFrom(seq2)

Task {
   for try await element in combinedSeq {
       print(element)
   }
}

seq1.send(2)
seq2.send("2")
seq1.send(3)

// will print:
(1, "1")
(2, "1")
(3, "2")

EraseToAnyAsyncSequence

eraseToAnyAsyncSequence() type-erases the async sequence into an AnyAsyncSequence.

GitHub

View Github