Using Async Streams

An exploration of how to use AsyncStream to get information out of classes and actors, either by wrapping @Published variables or by replacing them altogether.

Resources

Related Projects

AsyncStream Types

Fold/Unfold

Motivation

I ran into a behavior with AsyncStream that did not make sense to me at first.

I had an actor with a published variable that I could “subscribe” to via an AsyncPublisher, and it behaved as expected, updating only when the value changed. When I created an AsyncStream with the synchronous-context initializer (albeit with a potential task-retention problem), it also behaved as expected.

The weirdness happened when I wrapped that publisher in an AsyncStream with an asynchronous context. It started spamming the view with what seemed to be one update per loop iteration, NOT only when there was a change.

I created this project to help figure out what I was missing about [AsyncStream.init(unfolding:onCancel:)](https://developer.apple.com/documentation/swift/asyncstream/init(unfolding:oncancel:)?).

Initial Code

import Foundation
import SwiftUI



actor TestService {
    static let shared = TestService()
    
    @MainActor @Published var counter:Int = 0
    
    @MainActor public func updateCounter(by delta:Int) async {
        counter = counter + delta
    }
    
    public func asyncStream() -> AsyncStream<Int> {
        return AsyncStream.init(unfolding: unfolding, onCancel: onCancel)
        
        //() async -> _?
        func unfolding() async -> Int? {
            for await n in $counter.values {
                //print("\(location)")
                return n
            }
            return nil
        }
        
        //optional
        @Sendable func onCancel() -> Void {
            print("confirm counter got canceled")
        }
    }
    
    // Has a task-retention problem: nothing cancels the Task when the stream ends.
    public func syncStream() -> AsyncStream<Int> {
        AsyncStream { continuation in
            Task {
                for await n in $counter.values {
                    //do hard work to transform n 
                    continuation.yield(n)
                }
            }
        }
    }
    
}

struct ContentView: View {
    var body: some View {
        VStack {
            TestActorButton()
            HStack {
                //TestActorViewA() //<-- uncomment at your own risk. 
                TestActorViewB()
                TestActorViewC()
            }
        }
        .padding()
    }
}


struct TestActorButton:View {
    var counter = TestService.shared
    
    
    var body: some View {
        Button("increment counter") {
            Task { await counter.updateCounter(by: 2) }
        }
    }
}


struct TestActorViewA:View {
    var counter = TestService.shared
    @State var counterVal:Int = 0
    
    var body: some View {
        Text("\(counterVal)")
            .task {
                //Fires constantly.
                for await value in await counter.asyncStream() {
                    print("View A Value: \(value)")
                    counterVal = value
                }
            }
    }
}

struct TestActorViewB:View {
    var counter = TestService.shared
    @State var counterVal:Int = 0
    
    var body: some View {
        Text("\(counterVal)")
            .task {
                //Behaves like one would expect. Fires once per change.
                for await value in await counter.$counter.values {
                    print("View B Value: \(value)")
                    counterVal = value
                }
            }
    }
}

struct TestActorViewC:View {
    var counter = TestService.shared
    @State var counterVal:Int = 0
    
    var body: some View {
        Text("\(counterVal)")
            .task {
                //Also only fires on update
                for await value in await counter.syncStream() {
                    print("View C Value: \(value)")
                    counterVal = value
                }
            }
    }
}

Best Answer for Simply Wrapping Publisher

The real solution to wrapping a publisher appears to be to stick with the synchronous-context initializer and have the stream cancel its own task when it terminates:

public func stream() -> AsyncStream<Int> {
    AsyncStream { continuation in
        let streamTask = Task {
            for await n in $counter.values {
                //do hard work to transform n
                continuation.yield(n)
            }
        }

        continuation.onTermination = { @Sendable _ in
            streamTask.cancel()
            print("StreamTask Canceled")
        }
    }
}
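
The same pattern can be sketched without Combine, which makes it easier to try in isolation. The helper below is hypothetical (the names `wrapped`, `finiteSource` and `collectAll` are not from this project): it forwards any AsyncSequence into an AsyncStream and cancels the forwarding task when the stream terminates. Unlike the snippet above, it also calls `finish()` once the source runs out, which is an assumption about the desired behavior rather than something the project does.

```swift
// Hypothetical helper (not from the project): wraps any AsyncSequence in an
// AsyncStream and ties the lifetime of the forwarding Task to the stream.
func wrapped<S: AsyncSequence & Sendable>(_ source: S) -> AsyncStream<S.Element>
where S.Element: Sendable {
    AsyncStream { continuation in
        let forwardingTask = Task {
            do {
                for try await value in source {
                    continuation.yield(value)
                }
            } catch {
                // Assumption: an error from the source simply ends the stream.
            }
            // End the stream when the source is exhausted (or failed).
            continuation.finish()
        }
        // Runs when the consumer stops iterating or the stream finishes.
        continuation.onTermination = { @Sendable _ in
            forwardingTask.cancel()
        }
    }
}

// Demonstration source: emits the given values, then finishes.
func finiteSource(_ values: [Int]) -> AsyncStream<Int> {
    AsyncStream { continuation in
        for value in values { continuation.yield(value) }
        continuation.finish()
    }
}

// Drains a stream into an array so the result is easy to inspect.
func collectAll(_ stream: AsyncStream<Int>) async -> [Int] {
    var output: [Int] = []
    for await value in stream { output.append(value) }
    return output
}
```

Calling `await collectAll(wrapped(finiteSource([1, 2, 3])))` should yield the three values in order and then return, because the wrapper finishes the stream when the source does.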

Use case for the “Unfolding” init style

From what I can tell, the “unfolding” style initializer for AsyncStream is simply not a fit for wrapping an AsyncPublisher. The “unfolding” function is called every time the consumer asks for the next element, so it “pulls” at the published value from within the stream. Each call starts a fresh iteration of $counter.values, which delivers the current value right away, so the stream will just keep pushing values from that infinite well.

It seems like the “unfolding” style initializer is best used when processing a finite (but potentially very large) list of items, or when generating values from scratch, something like:
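
The effect can be reproduced without Combine. This is a minimal sketch, not the project's code: `replayingStream` and `firstValues` are hypothetical names, and the constant `current` stands in for a source that, like an @Published property, hands its current value to every new subscriber immediately.

```swift
// Hypothetical stand-in for wrapping `$counter.values` with the unfolding
// initializer: the closure never waits for a change, it just returns the
// current value every time the stream asks for another element.
func replayingStream(current: Int) -> AsyncStream<Int> {
    AsyncStream<Int>(unfolding: {
        // Called once per element requested by the consumer.
        return current
    })
}

// Takes the first `count` elements, then stops iterating.
func firstValues(_ count: Int, current: Int) async -> [Int] {
    var output: [Int] = []
    for await value in replayingStream(current: current) {
        output.append(value)
        if output.count == count { break }
    }
    return output
}
```

Asking for three values with `current` set to 7 gives three sevens even though nothing changed in between. Without the `break` the loop would never end, which matches the constant firing seen in View A.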

struct NumberQueuer {
    let numbers:[Int]
    
    public func queueStream() -> AsyncStream<Int> {
        var iterator = AsyncArray(values: numbers).makeAsyncIterator()
        print("Queue called")
        return AsyncStream.init(unfolding: unfolding, onCancel: onCancel)
        
        //() async -> _?
        func unfolding() async -> Int? {
            do {
                if let item = try await iterator.next() {
                    return item
                }
            } catch let error {
                print(error.localizedDescription)
            }
            return nil
            
        }
        
        //optional
        @Sendable func onCancel() -> Void {
            print("confirm NumberQueue got canceled")
        }
    }
    
}

public struct AsyncArray<Element>: AsyncSequence, AsyncIteratorProtocol {
    
    let values:[Element]
    let delay:TimeInterval
    
    var currentIndex = -1
    
    public init(values: [Element], delay:TimeInterval = 1) {
        self.values = values
        self.delay = delay
    }
    
    public mutating func next() async throws -> Element? {
        currentIndex += 1
        guard currentIndex < values.count else {
            return nil
        }
        try await Task.sleep(nanoseconds: UInt64(delay * 1E09))
        return values[currentIndex]
    }
    
    public func makeAsyncIterator() -> AsyncArray {
        self
    }
}
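
For the "generating values from scratch" case, a smaller sketch may help. It is an illustration only: `Countdown`, `countdownStream` and `collectCountdown` are hypothetical names, and the state is kept in an actor so the unfolding closure does not have to mutate a captured variable.

```swift
// Hypothetical generator state: hands out start, start - 1, ..., 1, then nil.
actor Countdown {
    private var remaining: Int

    init(from start: Int) {
        remaining = start
    }

    func next() -> Int? {
        guard remaining > 0 else { return nil }
        defer { remaining -= 1 }
        return remaining
    }
}

// Each request for an element calls the closure once; returning nil ends the stream.
func countdownStream(from start: Int) -> AsyncStream<Int> {
    let state = Countdown(from: start)
    return AsyncStream<Int>(unfolding: {
        await state.next()
    })
}

// Drains the countdown into an array so the result is easy to inspect.
func collectCountdown(from start: Int) async -> [Int] {
    var output: [Int] = []
    for await value in countdownStream(from: start) {
        output.append(value)
    }
    return output
}
```

Here the closure has a natural end (it returns nil once the counter reaches zero), which is what makes the pull-based initializer a comfortable fit.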

One can force the unfolding style to work with an @Published variable by creating a buffer array that is checked repeatedly. The variable wouldn’t actually need to be @Published anymore. This approach has a lot of problems, but it can be made to work. See BufferArrayStream.
