From f1853b76f1ac0c7c2e8fb6628b11ca71f5ace385 Mon Sep 17 00:00:00 2001 From: tomer doron Date: Thu, 2 Nov 2023 17:25:05 -0700 Subject: [PATCH] use result type --- .swiftformat | 6 +- Package.swift | 1 - README.md | 3 + Sources/ServiceDiscovery/Docs.docc/index.md | 3 + .../InMemoryServiceDiscovery.swift | 42 +++--- .../ServiceDiscovery/ServiceDiscovery.swift | 21 ++- .../InMemoryServiceDiscoveryTests.swift | 124 ++++++++++++++++-- 7 files changed, 164 insertions(+), 36 deletions(-) diff --git a/.swiftformat b/.swiftformat index 68c5e7a..80aaa0a 100644 --- a/.swiftformat +++ b/.swiftformat @@ -1,4 +1,4 @@ ---swiftversion 5.2 +--swiftversion 5.7 # file options @@ -13,8 +13,10 @@ --stripunusedargs closure-only --wraparguments before-first +# Configure the placement of an extension's access control keyword. +--extensionacl on-declarations + # rules --disable blankLinesAroundMark --disable wrapMultilineStatementBraces - diff --git a/Package.swift b/Package.swift index d67da8b..b2a9ec3 100644 --- a/Package.swift +++ b/Package.swift @@ -30,7 +30,6 @@ let package = Package( ], targets: [ .target(name: "ServiceDiscovery", dependencies: []), - .testTarget(name: "ServiceDiscoveryTests", dependencies: [ "ServiceDiscovery", .product(name: "Atomics", package: "swift-atomics"), diff --git a/README.md b/README.md index 8777ff5..19e420c 100644 --- a/README.md +++ b/README.md @@ -93,6 +93,9 @@ for try await instances in serviceDiscovery.subscribe(to: service) { Underlying the async `subscribe` API is an `AsyncSequence`. To end the subscription, simply break out of the `for`-loop. +Note the AsyncSequence is of a Result type, wrapping either the instances discovered, or a discovery error if such occurred. +A client should decide how to best handle errors in this case, e.g. terminate the subscription or continue and handle the errors. + ### Combinators SwiftServiceDiscovery includes combinators for common requirements such as transforming and filtering instances. For example: diff --git a/Sources/ServiceDiscovery/Docs.docc/index.md b/Sources/ServiceDiscovery/Docs.docc/index.md index c7d98eb..2e0b447 100644 --- a/Sources/ServiceDiscovery/Docs.docc/index.md +++ b/Sources/ServiceDiscovery/Docs.docc/index.md @@ -52,6 +52,9 @@ for try await instances in serviceDiscovery.subscribe() { Underlying the async `subscribe` API is an `AsyncSequence`. To end the subscription, simply break out of the `for`-loop. +Note the AsyncSequence is of a Result type, wrapping either the instances discovered, or a discovery error if such occurred. +A client should decide how to best handle errors in this case, e.g. terminate the subscription or continue and handle the errors. + ## Implementing a service discovery backend > Note: Unless you need to implement a custom service discovery backend, everything in this section is likely not relevant, so please feel free to skip. diff --git a/Sources/ServiceDiscovery/InMemoryServiceDiscovery.swift b/Sources/ServiceDiscovery/InMemoryServiceDiscovery.swift index 9b26338..bdec178 100644 --- a/Sources/ServiceDiscovery/InMemoryServiceDiscovery.swift +++ b/Sources/ServiceDiscovery/InMemoryServiceDiscovery.swift @@ -12,10 +12,10 @@ // //===----------------------------------------------------------------------===// -public actor InMemoryServiceDiscovery: ServiceDiscovery { +public actor InMemoryServiceDiscovery: ServiceDiscovery, ServiceDiscoverySubscription { private var instances: [Instance] private var nextSubscriptionID = 0 - private var subscriptions: [Int: AsyncThrowingStream<[Instance], Error>.Continuation] + private var subscriptions: [Int: AsyncStream>.Continuation] public init(instances: [Instance] = []) { self.instances = instances @@ -26,11 +26,15 @@ public actor InMemoryServiceDiscovery: ServiceDiscovery { self.instances } - public func subscribe() async throws -> _DiscoverySequence { + public func subscribe() async throws -> InMemoryServiceDiscovery { + self + } + + public func next() async -> _DiscoverySequence { defer { self.nextSubscriptionID += 1 } let subscriptionID = self.nextSubscriptionID - let (stream, continuation) = AsyncThrowingStream.makeStream(of: [Instance].self) + let (stream, continuation) = AsyncStream.makeStream(of: Result<[Instance], Error>.self) continuation.onTermination = { _ in Task { await self.unsubscribe(subscriptionID: subscriptionID) @@ -39,10 +43,14 @@ public actor InMemoryServiceDiscovery: ServiceDiscovery { self.subscriptions[subscriptionID] = continuation - let instances = try await self.lookup() - continuation.yield(instances) + do { + let instances = try await self.lookup() + continuation.yield(.success(instances)) + } catch { + continuation.yield(.failure(error)) + } - return DiscoverySequence(stream) + return _DiscoverySequence(stream) } /// Registers new `instances`. @@ -50,7 +58,7 @@ public actor InMemoryServiceDiscovery: ServiceDiscovery { self.instances = instances for continuations in self.subscriptions.values { - continuations.yield(instances) + continuations.yield(.success(instances)) } } @@ -60,11 +68,11 @@ public actor InMemoryServiceDiscovery: ServiceDiscovery { /// Internal use only public struct _DiscoverySequence: AsyncSequence { - public typealias Element = [Instance] + public typealias Element = Result<[Instance], Error> - private var underlying: AsyncThrowingStream<[Instance], Error> + private var underlying: AsyncStream> - init(_ underlying: AsyncThrowingStream<[Instance], Error>) { + init(_ underlying: AsyncStream>) { self.underlying = underlying } @@ -73,14 +81,14 @@ public actor InMemoryServiceDiscovery: ServiceDiscovery { } public struct AsyncIterator: AsyncIteratorProtocol { - private var underlying: AsyncThrowingStream<[Instance], Error>.Iterator + private var underlying: AsyncStream>.Iterator - init(_ underlying: AsyncThrowingStream<[Instance], Error>.Iterator) { + init(_ underlying: AsyncStream>.Iterator) { self.underlying = underlying } - public mutating func next() async throws -> [Instance]? { - try await self.underlying.next() + public mutating func next() async -> Result<[Instance], Error>? { + await self.underlying.next() } } } @@ -88,8 +96,8 @@ public actor InMemoryServiceDiscovery: ServiceDiscovery { #if swift(<5.9) // Async stream API backfill -public extension AsyncThrowingStream { - static func makeStream( +extension AsyncThrowingStream { + public static func makeStream( of elementType: Element.Type = Element.self, throwing failureType: Failure.Type = Failure.self, bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded diff --git a/Sources/ServiceDiscovery/ServiceDiscovery.swift b/Sources/ServiceDiscovery/ServiceDiscovery.swift index 102b6a4..2d39e62 100644 --- a/Sources/ServiceDiscovery/ServiceDiscovery.swift +++ b/Sources/ServiceDiscovery/ServiceDiscovery.swift @@ -16,22 +16,33 @@ /// Provides service instances lookup. /// -/// ### Threading -/// -/// `ServiceDiscovery` implementations **MUST be thread-safe**. +/// `ServiceDiscovery` implementations. public protocol ServiceDiscovery: Sendable { /// Service discovery instance type associatedtype Instance: Sendable /// AsyncSequence of Service discovery instances - associatedtype DiscoverySequence: AsyncSequence where DiscoverySequence.Element == [Instance] + associatedtype Subscription: ServiceDiscoverySubscription where Subscription.Instance == Instance /// Performs async lookup for the given service's instances. /// /// - Returns: A listing of service instances. + /// - throws when failing to lookup instances func lookup() async throws -> [Instance] /// Subscribes to receive a service's instances whenever they change. /// /// - Returns a ``DiscoverySequence``, which is an `AsyncSequence` and each of its items is a snapshot listing of service instances. - func subscribe() async throws -> DiscoverySequence + /// - throws when failing to establish subscription + func subscribe() async throws -> Subscription +} + +/// The ServiceDiscoverySubscription returns an AsyncSequence of Result type, with either the instances discovered or an error if a discovery error has occurred +/// The client should decide how to best handle errors in this case, e.g. terminate the subscription or continue and handle the errors, for example by recording or propagating them +public protocol ServiceDiscoverySubscription: Sendable { + /// Service discovery instance type + associatedtype Instance: Sendable + /// Service discovery AsyncSequence + associatedtype DiscoverySequence: Sendable, AsyncSequence where DiscoverySequence.Element == Result<[Instance], Error> + + func next() async -> DiscoverySequence } diff --git a/Tests/ServiceDiscoveryTests/InMemoryServiceDiscoveryTests.swift b/Tests/ServiceDiscoveryTests/InMemoryServiceDiscoveryTests.swift index 5f87dfe..51a824b 100644 --- a/Tests/ServiceDiscoveryTests/InMemoryServiceDiscoveryTests.swift +++ b/Tests/ServiceDiscoveryTests/InMemoryServiceDiscoveryTests.swift @@ -63,9 +63,12 @@ class InMemoryServiceDiscoveryTests: XCTestCase { await serviceDiscovery.register(instances: [Self.mockInstances2[0]]) let task = Task { + let subscription = try await serviceDiscovery.subscribe() + // for await instances in await subscription.next() { // FIXME: using iterator instead of for..in due to 5.7 compiler bug - var iterator = try await serviceDiscovery.subscribe().makeAsyncIterator() - while let instances = try await iterator.next() { + var iterator = await subscription.next().makeAsyncIterator() + while let result = await iterator.next() { + let instances = try result.get() // for try await instances in try await serviceDiscovery.subscribe() { switch counter.wrappingIncrementThenLoad(ordering: .sequentiallyConsistent) { case 1: @@ -104,6 +107,62 @@ class InMemoryServiceDiscoveryTests: XCTestCase { XCTAssertEqual(counter.load(ordering: .sequentiallyConsistent), 3, "Expected to be called 3 times") } + func testSubscribeWithErrors() async throws { + let serviceDiscovery = ThrowingServiceDiscovery() + + let counter = ManagedAtomic(0) + + #if os(macOS) + let expectation = XCTestExpectation(description: #function) + #else + let semaphore = DispatchSemaphore(value: 0) + #endif + + let task = Task { + let subscription = try await serviceDiscovery.subscribe() + // for await instances in await subscription.next() { + // FIXME: using iterator instead of for..in due to 5.7 compiler bug + var iterator = await subscription.next().makeAsyncIterator() + while let result = await iterator.next() { + switch counter.wrappingIncrementThenLoad(ordering: .sequentiallyConsistent) { + case 1: + XCTAssertNoThrow(try result.get()) + await serviceDiscovery.yield(error: DiscoveryError(description: "error 1")) + case 2: + XCTAssertThrowsError(try result.get()) { error in + XCTAssertEqual(error as? DiscoveryError, DiscoveryError(description: "error 1")) + } + await serviceDiscovery.yield(instances: [(), (), ()]) + case 3: + XCTAssertNoThrow(try result.get()) + XCTAssertEqual(try result.get().count, 3) + await serviceDiscovery.yield(error: DiscoveryError(description: "error 2")) + case 4: + XCTAssertThrowsError(try result.get()) { error in + XCTAssertEqual(error as? DiscoveryError, DiscoveryError(description: "error 2")) + } + #if os(macOS) + expectation.fulfill() + #else + semaphore.signal() + #endif + default: + XCTFail("Expected to be called 3 times") + } + } + } + + #if os(macOS) + await fulfillment(of: [expectation], timeout: 5.0) + #else + XCTAssertEqual(.success, semaphore.wait(timeout: .now() + 1.0)) + #endif + + XCTAssertEqual(counter.load(ordering: .sequentiallyConsistent), 4, "Expected to be called 5 times") + + task.cancel() + } + func testCancellation() async throws { let serviceDiscovery = InMemoryServiceDiscovery(instances: Self.mockInstances1) @@ -123,10 +182,12 @@ class InMemoryServiceDiscoveryTests: XCTestCase { let counter2 = ManagedAtomic(0) let task1 = Task { + let subscription = try await serviceDiscovery.subscribe() // FIXME: using iterator instead of for..in due to 5.7 compiler bug - var iterator = try await serviceDiscovery.subscribe().makeAsyncIterator() - while let instances = try await iterator.next() { - // for try await instances in try await serviceDiscovery.subscribe() { + // for await instances in await subscription.next() { + var iterator = await subscription.next().makeAsyncIterator() + while let result = await iterator.next() { + let instances = try result.get() switch counter1.wrappingIncrementThenLoad(ordering: .sequentiallyConsistent) { case 1: XCTAssertEqual(instances.count, 1) @@ -145,10 +206,12 @@ class InMemoryServiceDiscoveryTests: XCTestCase { } let task2 = Task { + let subscription = try await serviceDiscovery.subscribe() // FIXME: using iterator instead of for..in due to 5.7 compiler bug - var iterator = try await serviceDiscovery.subscribe().makeAsyncIterator() - while let instances = try await iterator.next() { - // for try await instances in try await serviceDiscovery.subscribe() { + // for await instances in await subscription.next() { + var iterator = await subscription.next().makeAsyncIterator() + while let result = await iterator.next() { + let instances = try result.get() // FIXME: casting to HostPort due to a 5.9 compiler bug switch counter2.wrappingIncrementThenLoad(ordering: .sequentiallyConsistent) { case 1: @@ -198,10 +261,15 @@ class InMemoryServiceDiscoveryTests: XCTestCase { // one more time + await serviceDiscovery.register(instances: Self.mockInstances1) + let task3 = Task { + let subscription = try await serviceDiscovery.subscribe() // FIXME: using iterator instead of for..in due to 5.7 compiler bug - var iterator = try await serviceDiscovery.subscribe().makeAsyncIterator() - while let instances = try await iterator.next() { + // for await instances in await subscription.next() { + var iterator = await subscription.next().makeAsyncIterator() + while let result = await iterator.next() { + let instances = try result.get() XCTAssertEqual(instances.count, Self.mockInstances1.count) XCTAssertEqual(instances, Self.mockInstances1) if counter1.load(ordering: .sequentiallyConsistent) == 1 { @@ -214,7 +282,6 @@ class InMemoryServiceDiscoveryTests: XCTestCase { } } - await serviceDiscovery.register(instances: Self.mockInstances1) #if os(macOS) await fulfillment(of: [expectation3], timeout: 1.0) #else @@ -223,3 +290,38 @@ class InMemoryServiceDiscoveryTests: XCTestCase { task3.cancel() } } + +private actor ThrowingServiceDiscovery: ServiceDiscovery, ServiceDiscoverySubscription { + var continuation: AsyncStream>.Continuation? + + func lookup() async throws -> [Void] { + [] + } + + func subscribe() async throws -> ThrowingServiceDiscovery { + self + } + + func next() async -> InMemoryServiceDiscovery.DiscoverySequence { + let (stream, continuation) = AsyncStream.makeStream(of: Result<[Void], Error>.self) + self.continuation = continuation + continuation.yield(.success([])) // get us going + return InMemoryServiceDiscovery.DiscoverySequence(stream) + } + + func yield(error: Error) { + if let continuation = self.continuation { + continuation.yield(.failure(error)) + } + } + + func yield(instances: [Void]) { + if let continuation = self.continuation { + continuation.yield(.success(instances)) + } + } +} + +private struct DiscoveryError: Error, Equatable { + let description: String +}