Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Youre missing async sequence builder like kotlin's #19

Open
ursusursus opened this issue Jul 10, 2022 · 4 comments
Open

Youre missing async sequence builder like kotlin's #19

ursusursus opened this issue Jul 10, 2022 · 4 comments
Assignees
Labels
enhancement New feature or request

Comments

@ursusursus
Copy link

ursusursus commented Jul 10, 2022

I'm a kotlin developer familiar with kotlin coroutines and Flow, which is direct mapping to async/await + AsyncSequence.

The most powerful thing that kotlin flow has is the builder, which allows you to emit values arbitrarily and use all the async stuff since the closure is async, like so (I'll write it in async/await syntax so you understand better)

flow { emitter in
   await emitter.emit(1)
   await Task.sleep(1_000)
   await emitter.emit(10)
   await Task.sleep(1_000)
   for 0...10 {
      await emitter.emit(100)
   }
}

when the closure returns, the flow terminates, if you throw inside the closure, then error is propagated as usual

It allow you to then create arbitrary custom operators, or simply way to pipe async function into AsyncSequence like

asyncSequence {
   await someFunction()
}
.flatMapLatest { ... }
.collect { ... }

this is very needed

@ursusursus ursusursus added the enhancement New feature or request label Jul 10, 2022
@hoc081098
Copy link

hoc081098 commented Jul 11, 2022

Swift has built-in function

AsyncThrowingStream { continuation in        

    Task {
       continuation.yield(data) }  
       await ...
       continuation.finish(throwing: nil)
       continuation.finish(throwing: error)
 }

@ursusursus
Copy link
Author

Hm, but is that not leaky? The task will continue after callsite task gets canceled, no?

@ursusursus
Copy link
Author

How does this look?

extension AsyncThrowingStream {
    public struct Emitter {
        let continuation: Continuation

        public func emit(_ value: Element) {
            continuation.yield(value)
        }
    }

    public init(body: @escaping (Emitter) async throws -> ()) where Failure == Error {
        self.init { continuation in
            let task = Task {
                do {
                    let emitter = Emitter(continuation: continuation)
                    try await body(emitter)
                    continuation.finish()
                } catch {
                    continuation.finish(throwing: error)
                }
            }
            continuation.onTermination = { @Sendable termination in
                switch termination {
                case .cancelled:
                    task.cancel()
                default:
                    break
                }
            }
        }
    }
}
let source = AsyncThrowingStream<String, Error> { emitter in
    try await Task.sleep(nanoseconds: 1_000_000_000)
    emitter.emit("A")

    try await Task.sleep(nanoseconds: 1_000_000_000)
    emitter.emit("B")
}

@natario1
Copy link

I'd like something similar too. The implementation above looks good to me.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

4 participants