Skip to content

Commit

Permalink
use result type
Browse files Browse the repository at this point in the history
  • Loading branch information
tomerd committed Nov 3, 2023
1 parent b313e25 commit 4cfd669
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 40 deletions.
6 changes: 4 additions & 2 deletions .swiftformat
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
--swiftversion 5.2
--swiftversion 5.7

# file options

Expand All @@ -13,8 +13,10 @@
--stripunusedargs closure-only
--wraparguments before-first

# Configure the placement of an extension's access control keyword.
--extensionacl on-declarations

# rules

--disable blankLinesAroundMark
--disable wrapMultilineStatementBraces

1 change: 0 additions & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ let package = Package(
],
targets: [
.target(name: "ServiceDiscovery", dependencies: []),

.testTarget(name: "ServiceDiscoveryTests", dependencies: [
"ServiceDiscovery",
.product(name: "Atomics", package: "swift-atomics"),
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ for try await instances in serviceDiscovery.subscribe(to: service) {
Underlying the async `subscribe` API is an `AsyncSequence`. To end the subscription, simply break out of the `for`-loop.

Note the AsyncSequence is of a Result type, wrapping either the instances discovered, or a discovery error if such occurred.
A client should decide how to best handle errors in this case, e.g. terminate the subscription or continue and handle the errors.

### Combinators

SwiftServiceDiscovery includes combinators for common requirements such as transforming and filtering instances. For example:
Expand Down
3 changes: 3 additions & 0 deletions Sources/ServiceDiscovery/Docs.docc/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ for try await instances in serviceDiscovery.subscribe() {

Underlying the async `subscribe` API is an `AsyncSequence`. To end the subscription, simply break out of the `for`-loop.

Note the AsyncSequence is of a Result type, wrapping either the instances discovered, or a discovery error if such occurred.
A client should decide how to best handle errors in this case, e.g. terminate the subscription or continue and handle the errors.

## Implementing a service discovery backend

> Note: Unless you need to implement a custom service discovery backend, everything in this section is likely not relevant, so please feel free to skip.
Expand Down
49 changes: 28 additions & 21 deletions Sources/ServiceDiscovery/InMemoryServiceDiscovery.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
//
//===----------------------------------------------------------------------===//

public actor InMemoryServiceDiscovery<Instance>: ServiceDiscovery {
public actor InMemoryServiceDiscovery<Instance>: ServiceDiscovery, ServiceDiscoverySubscription {
private var instances: [Instance]
private var nextSubscriptionID = 0
private var subscriptions: [Int: AsyncThrowingStream<[Instance], Error>.Continuation]
private var subscriptions: [Int: AsyncStream<Result<[Instance], Error>>.Continuation]

public init(instances: [Instance] = []) {
self.instances = instances
Expand All @@ -26,11 +26,15 @@ public actor InMemoryServiceDiscovery<Instance>: ServiceDiscovery {
self.instances
}

public func subscribe() async throws -> _DiscoverySequence {
public func subscribe() async throws -> InMemoryServiceDiscovery {
self
}

public func next() async -> _DiscoverySequence {
defer { self.nextSubscriptionID += 1 }
let subscriptionID = self.nextSubscriptionID

let (stream, continuation) = AsyncThrowingStream.makeStream(of: [Instance].self)
let (stream, continuation) = AsyncStream.makeStream(of: Result<[Instance], Error>.self)
continuation.onTermination = { _ in
Task {
await self.unsubscribe(subscriptionID: subscriptionID)
Expand All @@ -39,18 +43,22 @@ public actor InMemoryServiceDiscovery<Instance>: ServiceDiscovery {

self.subscriptions[subscriptionID] = continuation

let instances = try await self.lookup()
continuation.yield(instances)
do {
let instances = try await self.lookup()
continuation.yield(.success(instances))
} catch {
continuation.yield(.failure(error))
}

return DiscoverySequence(stream)
return _DiscoverySequence(stream)
}

/// Registers new `instances`.
public func register(instances: [Instance]) {
self.instances = instances

for continuations in self.subscriptions.values {
continuations.yield(instances)
continuations.yield(.success(instances))
}
}

Expand All @@ -60,11 +68,11 @@ public actor InMemoryServiceDiscovery<Instance>: ServiceDiscovery {

/// Internal use only
public struct _DiscoverySequence: AsyncSequence {
public typealias Element = [Instance]
public typealias Element = Result<[Instance], Error>

private var underlying: AsyncThrowingStream<[Instance], Error>
private var underlying: AsyncStream<Result<[Instance], Error>>

init(_ underlying: AsyncThrowingStream<[Instance], Error>) {
init(_ underlying: AsyncStream<Result<[Instance], Error>>) {
self.underlying = underlying
}

Expand All @@ -73,29 +81,28 @@ public actor InMemoryServiceDiscovery<Instance>: ServiceDiscovery {
}

public struct AsyncIterator: AsyncIteratorProtocol {
private var underlying: AsyncThrowingStream<[Instance], Error>.Iterator
private var underlying: AsyncStream<Result<[Instance], Error>>.Iterator

init(_ underlying: AsyncThrowingStream<[Instance], Error>.Iterator) {
init(_ underlying: AsyncStream<Result<[Instance], Error>>.Iterator) {
self.underlying = underlying
}

public mutating func next() async throws -> [Instance]? {
try await self.underlying.next()
public mutating func next() async -> Result<[Instance], Error>? {
await self.underlying.next()
}
}
}
}

#if swift(<5.9)
// Async stream API backfill
public extension AsyncThrowingStream {
static func makeStream(
extension AsyncStream {
public static func makeStream(
of elementType: Element.Type = Element.self,
throwing failureType: Failure.Type = Failure.self,
bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
) -> (stream: AsyncThrowingStream<Element, Failure>, continuation: AsyncThrowingStream<Element, Failure>.Continuation) where Failure == Error {
var continuation: AsyncThrowingStream<Element, Failure>.Continuation!
let stream = AsyncThrowingStream<Element, Failure>(bufferingPolicy: limit) { continuation = $0 }
) -> (stream: AsyncStream<Element>, continuation: AsyncStream<Element>.Continuation) {
var continuation: AsyncStream<Element>.Continuation!
let stream = AsyncStream<Element>(bufferingPolicy: limit) { continuation = $0 }
return (stream: stream, continuation: continuation!)
}
}
Expand Down
21 changes: 16 additions & 5 deletions Sources/ServiceDiscovery/ServiceDiscovery.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,33 @@

/// Provides service instances lookup.
///
/// ### Threading
///
/// `ServiceDiscovery` implementations **MUST be thread-safe**.
/// `ServiceDiscovery` implementations.
public protocol ServiceDiscovery: Sendable {
/// Service discovery instance type
associatedtype Instance: Sendable
/// AsyncSequence of Service discovery instances
associatedtype DiscoverySequence: AsyncSequence where DiscoverySequence.Element == [Instance]
associatedtype Subscription: ServiceDiscoverySubscription where Subscription.Instance == Instance

/// Performs async lookup for the given service's instances.
///
/// - Returns: A listing of service instances.
/// - throws when failing to lookup instances
func lookup() async throws -> [Instance]

/// Subscribes to receive a service's instances whenever they change.
///
/// - Returns a ``DiscoverySequence``, which is an `AsyncSequence` and each of its items is a snapshot listing of service instances.
func subscribe() async throws -> DiscoverySequence
/// - throws when failing to establish subscription
func subscribe() async throws -> Subscription
}

/// The ServiceDiscoverySubscription returns an AsyncSequence of Result type, with either the instances discovered or an error if a discovery error has occurred
/// The client should decide how to best handle errors in this case, e.g. terminate the subscription or continue and handle the errors, for example by recording or propagating them
public protocol ServiceDiscoverySubscription: Sendable {
/// Service discovery instance type
associatedtype Instance: Sendable
/// Service discovery AsyncSequence
associatedtype DiscoverySequence: Sendable, AsyncSequence where DiscoverySequence.Element == Result<[Instance], Error>

func next() async -> DiscoverySequence
}
124 changes: 113 additions & 11 deletions Tests/ServiceDiscoveryTests/InMemoryServiceDiscoveryTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,12 @@ class InMemoryServiceDiscoveryTests: XCTestCase {
await serviceDiscovery.register(instances: [Self.mockInstances2[0]])

let task = Task {
let subscription = try await serviceDiscovery.subscribe()
// for await instances in await subscription.next() {
// FIXME: using iterator instead of for..in due to 5.7 compiler bug
var iterator = try await serviceDiscovery.subscribe().makeAsyncIterator()
while let instances = try await iterator.next() {
var iterator = await subscription.next().makeAsyncIterator()
while let result = await iterator.next() {
let instances = try result.get()
// for try await instances in try await serviceDiscovery.subscribe() {
switch counter.wrappingIncrementThenLoad(ordering: .sequentiallyConsistent) {
case 1:
Expand Down Expand Up @@ -104,6 +107,62 @@ class InMemoryServiceDiscoveryTests: XCTestCase {
XCTAssertEqual(counter.load(ordering: .sequentiallyConsistent), 3, "Expected to be called 3 times")
}

func testSubscribeWithErrors() async throws {
let serviceDiscovery = ThrowingServiceDiscovery()

let counter = ManagedAtomic<Int>(0)

#if os(macOS)
let expectation = XCTestExpectation(description: #function)
#else
let semaphore = DispatchSemaphore(value: 0)
#endif

let task = Task {
let subscription = try await serviceDiscovery.subscribe()
// for await instances in await subscription.next() {
// FIXME: using iterator instead of for..in due to 5.7 compiler bug
var iterator = await subscription.next().makeAsyncIterator()
while let result = await iterator.next() {
switch counter.wrappingIncrementThenLoad(ordering: .sequentiallyConsistent) {
case 1:
XCTAssertNoThrow(try result.get())
await serviceDiscovery.yield(error: DiscoveryError(description: "error 1"))
case 2:
XCTAssertThrowsError(try result.get()) { error in
XCTAssertEqual(error as? DiscoveryError, DiscoveryError(description: "error 1"))
}
await serviceDiscovery.yield(instances: [(), (), ()])
case 3:
XCTAssertNoThrow(try result.get())
XCTAssertEqual(try result.get().count, 3)
await serviceDiscovery.yield(error: DiscoveryError(description: "error 2"))
case 4:
XCTAssertThrowsError(try result.get()) { error in
XCTAssertEqual(error as? DiscoveryError, DiscoveryError(description: "error 2"))
}
#if os(macOS)
expectation.fulfill()
#else
semaphore.signal()
#endif
default:
XCTFail("Expected to be called 3 times")
}
}
}

#if os(macOS)
await fulfillment(of: [expectation], timeout: 5.0)
#else
XCTAssertEqual(.success, semaphore.wait(timeout: .now() + 1.0))
#endif

XCTAssertEqual(counter.load(ordering: .sequentiallyConsistent), 4, "Expected to be called 5 times")

task.cancel()
}

func testCancellation() async throws {
let serviceDiscovery = InMemoryServiceDiscovery(instances: Self.mockInstances1)

Expand All @@ -123,10 +182,12 @@ class InMemoryServiceDiscoveryTests: XCTestCase {
let counter2 = ManagedAtomic<Int>(0)

let task1 = Task {
let subscription = try await serviceDiscovery.subscribe()
// FIXME: using iterator instead of for..in due to 5.7 compiler bug
var iterator = try await serviceDiscovery.subscribe().makeAsyncIterator()
while let instances = try await iterator.next() {
// for try await instances in try await serviceDiscovery.subscribe() {
// for await instances in await subscription.next() {
var iterator = await subscription.next().makeAsyncIterator()
while let result = await iterator.next() {
let instances = try result.get()
switch counter1.wrappingIncrementThenLoad(ordering: .sequentiallyConsistent) {
case 1:
XCTAssertEqual(instances.count, 1)
Expand All @@ -145,10 +206,12 @@ class InMemoryServiceDiscoveryTests: XCTestCase {
}

let task2 = Task {
let subscription = try await serviceDiscovery.subscribe()
// FIXME: using iterator instead of for..in due to 5.7 compiler bug
var iterator = try await serviceDiscovery.subscribe().makeAsyncIterator()
while let instances = try await iterator.next() {
// for try await instances in try await serviceDiscovery.subscribe() {
// for await instances in await subscription.next() {
var iterator = await subscription.next().makeAsyncIterator()
while let result = await iterator.next() {
let instances = try result.get()
// FIXME: casting to HostPort due to a 5.9 compiler bug
switch counter2.wrappingIncrementThenLoad(ordering: .sequentiallyConsistent) {
case 1:
Expand Down Expand Up @@ -198,10 +261,15 @@ class InMemoryServiceDiscoveryTests: XCTestCase {

// one more time

await serviceDiscovery.register(instances: Self.mockInstances1)

let task3 = Task {
let subscription = try await serviceDiscovery.subscribe()
// FIXME: using iterator instead of for..in due to 5.7 compiler bug
var iterator = try await serviceDiscovery.subscribe().makeAsyncIterator()
while let instances = try await iterator.next() {
// for await instances in await subscription.next() {
var iterator = await subscription.next().makeAsyncIterator()
while let result = await iterator.next() {
let instances = try result.get()
XCTAssertEqual(instances.count, Self.mockInstances1.count)
XCTAssertEqual(instances, Self.mockInstances1)
if counter1.load(ordering: .sequentiallyConsistent) == 1 {
Expand All @@ -214,7 +282,6 @@ class InMemoryServiceDiscoveryTests: XCTestCase {
}
}

await serviceDiscovery.register(instances: Self.mockInstances1)
#if os(macOS)
await fulfillment(of: [expectation3], timeout: 1.0)
#else
Expand All @@ -223,3 +290,38 @@ class InMemoryServiceDiscoveryTests: XCTestCase {
task3.cancel()
}
}

private actor ThrowingServiceDiscovery: ServiceDiscovery, ServiceDiscoverySubscription {
var continuation: AsyncStream<Result<[Void], Error>>.Continuation?

func lookup() async throws -> [Void] {
[]
}

func subscribe() async throws -> ThrowingServiceDiscovery {
self
}

func next() async -> InMemoryServiceDiscovery<Void>.DiscoverySequence {
let (stream, continuation) = AsyncStream.makeStream(of: Result<[Void], Error>.self)
self.continuation = continuation
continuation.yield(.success([])) // get us going
return InMemoryServiceDiscovery.DiscoverySequence(stream)
}

func yield(error: Error) {
if let continuation = self.continuation {
continuation.yield(.failure(error))
}
}

func yield(instances: [Void]) {
if let continuation = self.continuation {
continuation.yield(.success(instances))
}
}
}

private struct DiscoveryError: Error, Equatable {
let description: String
}

0 comments on commit 4cfd669

Please sign in to comment.