From b94db3c4f2c96b1c53c956588d1a3b008ebe6611 Mon Sep 17 00:00:00 2001 From: tomer doron Date: Wed, 31 May 2023 23:06:11 -0700 Subject: [PATCH] wip --- Package.swift | 14 +- Package@swift-5.3.swift | 36 -- Package@swift-5.4.swift | 36 -- Package@swift-5.5.swift | 36 -- .../FilterInstanceServiceDiscovery.swift | 59 --- .../InMemoryServiceDiscovery.swift | 239 +++++----- .../MapInstanceServiceDiscovery.swift | 59 --- .../MapServiceServiceDiscovery.swift | 67 --- .../ServiceDiscovery+AsyncAwait.swift | 125 ----- .../ServiceDiscovery+Combinators.swift | 41 -- .../ServiceDiscovery+TypeErased.swift | 223 --------- .../ServiceDiscovery/ServiceDiscovery.swift | 102 +--- .../FilterInstanceServiceDiscoveryTests.swift | 380 --------------- Tests/ServiceDiscoveryTests/Helpers.swift | 71 +-- .../InMemoryServiceDiscoveryTests.swift | 365 ++++----------- .../MapInstanceServiceDiscoveryTests.swift | 379 --------------- .../MapServiceServiceDiscoveryTests.swift | 435 ------------------ .../TypeErasedServiceDiscoveryTests.swift | 332 ------------- 18 files changed, 245 insertions(+), 2754 deletions(-) delete mode 100644 Package@swift-5.3.swift delete mode 100644 Package@swift-5.4.swift delete mode 100644 Package@swift-5.5.swift delete mode 100644 Sources/ServiceDiscovery/FilterInstanceServiceDiscovery.swift delete mode 100644 Sources/ServiceDiscovery/MapInstanceServiceDiscovery.swift delete mode 100644 Sources/ServiceDiscovery/MapServiceServiceDiscovery.swift delete mode 100644 Sources/ServiceDiscovery/ServiceDiscovery+AsyncAwait.swift delete mode 100644 Sources/ServiceDiscovery/ServiceDiscovery+Combinators.swift delete mode 100644 Sources/ServiceDiscovery/ServiceDiscovery+TypeErased.swift delete mode 100644 Tests/ServiceDiscoveryTests/FilterInstanceServiceDiscoveryTests.swift delete mode 100644 Tests/ServiceDiscoveryTests/MapInstanceServiceDiscoveryTests.swift delete mode 100644 Tests/ServiceDiscoveryTests/MapServiceServiceDiscoveryTests.swift delete mode 100644 Tests/ServiceDiscoveryTests/TypeErasedServiceDiscoveryTests.swift diff --git a/Package.swift b/Package.swift index b7dfae5..872479a 100644 --- a/Package.swift +++ b/Package.swift @@ -1,4 +1,4 @@ -// swift-tools-version:5.6 +// swift-tools-version:5.8 //===----------------------------------------------------------------------===// // @@ -18,20 +18,22 @@ import PackageDescription let package = Package( name: "swift-service-discovery", + platforms: [ + .macOS(.v13) + ], products: [ .library(name: "ServiceDiscovery", targets: ["ServiceDiscovery"]), ], dependencies: [ .package(url: "https://github.com/apple/swift-atomics", from: "1.0.2"), - .package(url: "https://github.com/apple/swift-log", from: "1.2.0"), .package(url: "https://github.com/apple/swift-docc-plugin", from: "1.0.0"), ], targets: [ - .target(name: "ServiceDiscovery", dependencies: [ + .target(name: "ServiceDiscovery", dependencies: []), + + .testTarget(name: "ServiceDiscoveryTests", dependencies: [ + "ServiceDiscovery", .product(name: "Atomics", package: "swift-atomics"), - .product(name: "Logging", package: "swift-log"), ]), - - .testTarget(name: "ServiceDiscoveryTests", dependencies: ["ServiceDiscovery"]), ] ) diff --git a/Package@swift-5.3.swift b/Package@swift-5.3.swift deleted file mode 100644 index 29914f1..0000000 --- a/Package@swift-5.3.swift +++ /dev/null @@ -1,36 +0,0 @@ -// swift-tools-version:5.3 - -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftServiceDiscovery open source project -// -// Copyright (c) 2020-2022 Apple Inc. and the SwiftServiceDiscovery project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftServiceDiscovery project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import PackageDescription - -let package = Package( - name: "swift-service-discovery", - products: [ - .library(name: "ServiceDiscovery", targets: ["ServiceDiscovery"]), - ], - dependencies: [ - .package(url: "https://github.com/apple/swift-atomics", from: "1.0.2"), - .package(url: "https://github.com/apple/swift-log", from: "1.2.0"), - ], - targets: [ - .target(name: "ServiceDiscovery", dependencies: [ - .product(name: "Atomics", package: "swift-atomics"), - .product(name: "Logging", package: "swift-log"), - ]), - - .testTarget(name: "ServiceDiscoveryTests", dependencies: ["ServiceDiscovery"]), - ] -) diff --git a/Package@swift-5.4.swift b/Package@swift-5.4.swift deleted file mode 100644 index 29914f1..0000000 --- a/Package@swift-5.4.swift +++ /dev/null @@ -1,36 +0,0 @@ -// swift-tools-version:5.3 - -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftServiceDiscovery open source project -// -// Copyright (c) 2020-2022 Apple Inc. and the SwiftServiceDiscovery project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftServiceDiscovery project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import PackageDescription - -let package = Package( - name: "swift-service-discovery", - products: [ - .library(name: "ServiceDiscovery", targets: ["ServiceDiscovery"]), - ], - dependencies: [ - .package(url: "https://github.com/apple/swift-atomics", from: "1.0.2"), - .package(url: "https://github.com/apple/swift-log", from: "1.2.0"), - ], - targets: [ - .target(name: "ServiceDiscovery", dependencies: [ - .product(name: "Atomics", package: "swift-atomics"), - .product(name: "Logging", package: "swift-log"), - ]), - - .testTarget(name: "ServiceDiscoveryTests", dependencies: ["ServiceDiscovery"]), - ] -) diff --git a/Package@swift-5.5.swift b/Package@swift-5.5.swift deleted file mode 100644 index 29914f1..0000000 --- a/Package@swift-5.5.swift +++ /dev/null @@ -1,36 +0,0 @@ -// swift-tools-version:5.3 - -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftServiceDiscovery open source project -// -// Copyright (c) 2020-2022 Apple Inc. and the SwiftServiceDiscovery project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftServiceDiscovery project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import PackageDescription - -let package = Package( - name: "swift-service-discovery", - products: [ - .library(name: "ServiceDiscovery", targets: ["ServiceDiscovery"]), - ], - dependencies: [ - .package(url: "https://github.com/apple/swift-atomics", from: "1.0.2"), - .package(url: "https://github.com/apple/swift-log", from: "1.2.0"), - ], - targets: [ - .target(name: "ServiceDiscovery", dependencies: [ - .product(name: "Atomics", package: "swift-atomics"), - .product(name: "Logging", package: "swift-log"), - ]), - - .testTarget(name: "ServiceDiscoveryTests", dependencies: ["ServiceDiscovery"]), - ] -) diff --git a/Sources/ServiceDiscovery/FilterInstanceServiceDiscovery.swift b/Sources/ServiceDiscovery/FilterInstanceServiceDiscovery.swift deleted file mode 100644 index ad459a1..0000000 --- a/Sources/ServiceDiscovery/FilterInstanceServiceDiscovery.swift +++ /dev/null @@ -1,59 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftServiceDiscovery open source project -// -// Copyright (c) 2020-2021 Apple Inc. and the SwiftServiceDiscovery project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftServiceDiscovery project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Dispatch - -public final class FilterInstanceServiceDiscovery { - typealias Predicate = (BaseDiscovery.Instance) throws -> Bool - - private let originalSD: BaseDiscovery - private let predicate: Predicate - - internal init(originalSD: BaseDiscovery, predicate: @escaping Predicate) { - self.originalSD = originalSD - self.predicate = predicate - } -} - -extension FilterInstanceServiceDiscovery: ServiceDiscovery { - /// Default timeout for lookup. - public var defaultLookupTimeout: DispatchTimeInterval { - self.originalSD.defaultLookupTimeout - } - - public func lookup(_ service: BaseDiscovery.Service, deadline: DispatchTime?, callback: @escaping (Result<[BaseDiscovery.Instance], Error>) -> Void) { - self.originalSD.lookup(service, deadline: deadline) { result in callback(self.transform(result)) } - } - - public func subscribe(to service: BaseDiscovery.Service, onNext nextResultHandler: @escaping (Result<[BaseDiscovery.Instance], Error>) -> Void, onComplete completionHandler: @escaping (CompletionReason) -> Void) -> CancellationToken { - self.originalSD.subscribe( - to: service, - onNext: { result in nextResultHandler(self.transform(result)) }, - onComplete: completionHandler - ) - } - - private func transform(_ result: Result<[BaseDiscovery.Instance], Error>) -> Result<[BaseDiscovery.Instance], Error> { - switch result { - case .success(let instances): - do { - return try .success(instances.filter(self.predicate)) - } catch { - return .failure(error) - } - case .failure(let error): - return .failure(error) - } - } -} diff --git a/Sources/ServiceDiscovery/InMemoryServiceDiscovery.swift b/Sources/ServiceDiscovery/InMemoryServiceDiscovery.swift index 187cbdf..e8d31ef 100644 --- a/Sources/ServiceDiscovery/InMemoryServiceDiscovery.swift +++ b/Sources/ServiceDiscovery/InMemoryServiceDiscovery.swift @@ -2,7 +2,7 @@ // // This source file is part of the SwiftServiceDiscovery open source project // -// Copyright (c) 2019-2021 Apple Inc. and the SwiftServiceDiscovery project authors +// Copyright (c) 2019-2023 Apple Inc. and the SwiftServiceDiscovery project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -12,172 +12,165 @@ // //===----------------------------------------------------------------------===// -import Atomics -import Dispatch -import Foundation // for NSLock -/// Provides lookup for service instances that are stored in-memory. -public class InMemoryServiceDiscovery: ServiceDiscovery { +public actor InMemoryServiceDiscovery: ServiceDiscovery { private let configuration: Configuration - private let serviceInstancesLock = NSLock() - private var serviceInstances: [Service: [Instance]] + private var instances: [Service: [Instance]] + private var subscriptions: [Service: Set] - private let serviceSubscriptionsLock = NSLock() - private var serviceSubscriptions: [Service: [Subscription]] = [:] - - private let queue: DispatchQueue - - public var defaultLookupTimeout: DispatchTimeInterval { - self.configuration.defaultLookupTimeout + public init(configuration: Configuration = .default) { + self.configuration = configuration + self.instances = configuration.instances + self.subscriptions = [:] } - private let _isShutdown = ManagedAtomic(false) - - public var isShutdown: Bool { - self._isShutdown.load(ordering: .acquiring) + public func lookup(_ service: Service, deadline: ContinuousClock.Instant?) async throws -> [Instance] { + if let instances = self.instances[service] { + return instances + } else { + throw LookupError.unknownService + } } + + public func subscribe(_ service: Service) async throws -> any ServiceDiscoveryInstanceSequence { + // TODO: confirm this kind of termination handler is thread safe in an actor + let (subscription, sequence) = Subscription.makeSubscription(terminationHandler: { subscription in + Task { + await self.unsubscribe(service: service, subscription: subscription) + } + // reduce CoW + /*if var subscriptions = self.subscriptions.removeValue(forKey: service) { + subscriptions.remove(subscription) + if !subscriptions.isEmpty { + self.subscriptions[service] = subscriptions + } + }*/ + }) + + // reduce CoW + var subscriptions = self.subscriptions.removeValue(forKey: service) ?? [] + subscriptions.insert(subscription) + self.subscriptions[service] = subscriptions + + do { + let instances = try await self.lookup(service, deadline: nil) + subscription.yield(instances) + } catch { + // FIXME: nicer try/catch syntax? + if let lookupError = error as? LookupError, lookupError == LookupError.unknownService { + subscription.yield([]) + } else { + subscription.yield(error) + } + } - public init(configuration: Configuration, queue: DispatchQueue = .init(label: "InMemoryServiceDiscovery", attributes: .concurrent)) { - self.configuration = configuration - self.serviceInstances = configuration.serviceInstances - self.queue = queue + return sequence } - public func lookup(_ service: Service, deadline: DispatchTime? = nil, callback: @escaping (Result<[Instance], Error>) -> Void) { - guard !self.isShutdown else { - callback(.failure(ServiceDiscoveryError.unavailable)) + private func unsubscribe(service: Service, subscription: Subscription) { + guard var subscriptions = self.subscriptions.removeValue(forKey: service) else { return } - - let isDone = ManagedAtomic(false) - - let lookupWorkItem = DispatchWorkItem { - var result: Result<[Instance], Error>! // !-safe because if-else block always set `result` - - self.serviceInstancesLock.withLock { - if let instances = self.serviceInstances[service] { - result = .success(instances) - } else { - result = .failure(LookupError.unknownService) - } - } - - if isDone.compareExchange(expected: false, desired: true, ordering: .acquiring).exchanged { - callback(result) - } + subscriptions.remove(subscription) + if !subscriptions.isEmpty { + self.subscriptions[service] = subscriptions } + } - self.queue.async(execute: lookupWorkItem) - - // Timeout handler - self.queue.asyncAfter(deadline: deadline ?? DispatchTime.now() + self.defaultLookupTimeout) { - lookupWorkItem.cancel() + /// Registers `service` and its `instances`. + public func register(service: Service, instances: [Instance]) async { + self.instances[service] = instances - if isDone.compareExchange(expected: false, desired: true, ordering: .acquiring).exchanged { - callback(.failure(LookupError.timedOut)) + if let subscriptions = self.subscriptions[service] { + for subscription in subscriptions { + subscription.yield(instances) } } } - @discardableResult - public func subscribe(to service: Service, onNext nextResultHandler: @escaping (Result<[Instance], Error>) -> Void, onComplete completionHandler: @escaping (CompletionReason) -> Void = { _ in }) -> CancellationToken { - guard !self.isShutdown else { - completionHandler(.serviceDiscoveryUnavailable) - return CancellationToken(isCancelled: true) - } - - // Call `lookup` once and send result to subscriber - self.lookup(service, callback: nextResultHandler) - - let cancellationToken = CancellationToken(completionHandler: completionHandler) - let subscription = Subscription(nextResultHandler: nextResultHandler, completionHandler: completionHandler, cancellationToken: cancellationToken) + private class Subscription: Identifiable, Hashable { + private let continuation: AsyncThrowingStream.Continuation - // Save the subscription - self.serviceSubscriptionsLock.withLock { - var subscriptions = self.serviceSubscriptions.removeValue(forKey: service) ?? [Subscription]() - subscriptions.append(subscription) - self.serviceSubscriptions[service] = subscriptions + static func makeSubscription(terminationHandler: @Sendable @escaping (Subscription) -> Void) -> (Subscription, InstanceSequence) { + let (stream, continuation) = AsyncThrowingStream.makeStream(of: Instance.self) + let subscription = Subscription(continuation) + continuation.onTermination = { _ in terminationHandler(subscription) } + return (subscription, InstanceSequence(stream)) } - return cancellationToken - } - - /// Registers a service and its `instances`. - public func register(_ service: Service, instances: [Instance]) { - guard !self.isShutdown else { return } - - var previousInstances: [Instance]? - self.serviceInstancesLock.withLock { - previousInstances = self.serviceInstances[service] - self.serviceInstances[service] = instances + private init(_ continuation: AsyncThrowingStream.Continuation) { + self.continuation = continuation } - self.serviceSubscriptionsLock.withLock { - if !self.isShutdown, instances != previousInstances, let subscriptions = self.serviceSubscriptions[service] { - // Notify subscribers whenever instances change - subscriptions - .filter { !$0.cancellationToken.isCancelled } - .forEach { $0.nextResultHandler(.success(instances)) } + func yield(_ instances: [Instance]) { + guard !Task.isCancelled else { + return + } + for instance in instances { + continuation.yield(instance) } } - } - - public func shutdown() { - guard self._isShutdown.compareExchange(expected: false, desired: true, ordering: .acquiring).exchanged else { return } - self.serviceSubscriptionsLock.withLock { - self.serviceSubscriptions.values.forEach { subscriptions in - subscriptions - .filter { !$0.cancellationToken.isCancelled } - .forEach { $0.completionHandler(.serviceDiscoveryUnavailable) } + func yield(_ error: Error) { + guard !Task.isCancelled else { + return } + continuation.yield(with: .failure(error)) } - } - private struct Subscription { - let nextResultHandler: (Result<[Instance], Error>) -> Void - let completionHandler: (CompletionReason) -> Void - let cancellationToken: CancellationToken - } -} + static func == (lhs: Subscription, rhs: Subscription) -> Bool { + lhs.id == rhs.id + } -public extension InMemoryServiceDiscovery { - struct Configuration { - /// Default configuration - public static var `default`: Configuration { - .init() + func hash(into hasher: inout Hasher) { + self.id.hash(into: &hasher) } + } - /// Lookup timeout in case `deadline` is not specified - public var defaultLookupTimeout: DispatchTimeInterval = .milliseconds(100) + private struct InstanceSequence: ServiceDiscoveryInstanceSequence { + typealias Element = Instance + typealias Underlying = AsyncThrowingStream - internal var serviceInstances: [Service: [Instance]] + private let underlying: Underlying - public init() { - self.init(serviceInstances: [:]) + init(_ underlying: AsyncThrowingStream) { + self.underlying = underlying } - /// Initializes `InMemoryServiceDiscovery` with the given service to instances mappings. - public init(serviceInstances: [Service: [Instance]]) { - self.serviceInstances = serviceInstances + func makeAsyncIterator() -> AsyncIterator { + AsyncIterator(self.underlying.makeAsyncIterator()) } - /// Registers `service` and its `instances`. - public mutating func register(service: Service, instances: [Instance]) { - self.serviceInstances[service] = instances + struct AsyncIterator: AsyncIteratorProtocol { + private var underlying: Underlying.AsyncIterator + + init(_ iterator: Underlying.AsyncIterator) { + self.underlying = iterator + } + + mutating func next() async throws -> Instance? { + try await self.underlying.next() + } } } } -// MARK: - NSLock extensions +extension InMemoryServiceDiscovery { + public struct Configuration { + public let instances: [Service: [Instance]] + + /// Default configuration + public static var `default`: Configuration { + .init( + instances: [:] + ) + } -private extension NSLock { - func withLock(_ body: () -> Void) { - self.lock() - defer { - self.unlock() + public init( + instances: [Service: [Instance]] + ) { + self.instances = instances } - body() } } diff --git a/Sources/ServiceDiscovery/MapInstanceServiceDiscovery.swift b/Sources/ServiceDiscovery/MapInstanceServiceDiscovery.swift deleted file mode 100644 index 280f476..0000000 --- a/Sources/ServiceDiscovery/MapInstanceServiceDiscovery.swift +++ /dev/null @@ -1,59 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftServiceDiscovery open source project -// -// Copyright (c) 2020-2021 Apple Inc. and the SwiftServiceDiscovery project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftServiceDiscovery project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Dispatch - -public final class MapInstanceServiceDiscovery { - typealias Transformer = (BaseDiscovery.Instance) throws -> DerivedInstance - - private let originalSD: BaseDiscovery - private let transformer: Transformer - - internal init(originalSD: BaseDiscovery, transformer: @escaping Transformer) { - self.originalSD = originalSD - self.transformer = transformer - } -} - -extension MapInstanceServiceDiscovery: ServiceDiscovery { - /// Default timeout for lookup. - public var defaultLookupTimeout: DispatchTimeInterval { - self.originalSD.defaultLookupTimeout - } - - public func lookup(_ service: BaseDiscovery.Service, deadline: DispatchTime?, callback: @escaping (Result<[DerivedInstance], Error>) -> Void) { - self.originalSD.lookup(service, deadline: deadline) { result in callback(self.transform(result)) } - } - - public func subscribe(to service: BaseDiscovery.Service, onNext nextResultHandler: @escaping (Result<[DerivedInstance], Error>) -> Void, onComplete completionHandler: @escaping (CompletionReason) -> Void) -> CancellationToken { - self.originalSD.subscribe( - to: service, - onNext: { result in nextResultHandler(self.transform(result)) }, - onComplete: completionHandler - ) - } - - private func transform(_ result: Result<[BaseDiscovery.Instance], Error>) -> Result<[DerivedInstance], Error> { - switch result { - case .success(let instances): - do { - return try .success(instances.map(self.transformer)) - } catch { - return .failure(error) - } - case .failure(let error): - return .failure(error) - } - } -} diff --git a/Sources/ServiceDiscovery/MapServiceServiceDiscovery.swift b/Sources/ServiceDiscovery/MapServiceServiceDiscovery.swift deleted file mode 100644 index dd049a7..0000000 --- a/Sources/ServiceDiscovery/MapServiceServiceDiscovery.swift +++ /dev/null @@ -1,67 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftServiceDiscovery open source project -// -// Copyright (c) 2020-2021 Apple Inc. and the SwiftServiceDiscovery project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftServiceDiscovery project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Dispatch - -public final class MapServiceServiceDiscovery { - typealias Transformer = (ComputedService) throws -> BaseDiscovery.Service - - private let originalSD: BaseDiscovery - private let transformer: Transformer - - internal init(originalSD: BaseDiscovery, transformer: @escaping Transformer) { - self.originalSD = originalSD - self.transformer = transformer - } -} - -extension MapServiceServiceDiscovery: ServiceDiscovery { - /// Default timeout for lookup. - public var defaultLookupTimeout: DispatchTimeInterval { - self.originalSD.defaultLookupTimeout - } - - public func lookup(_ service: ComputedService, deadline: DispatchTime?, callback: @escaping (Result<[BaseDiscovery.Instance], Error>) -> Void) { - let derivedService: BaseDiscovery.Service - - do { - derivedService = try self.transformer(service) - } catch { - callback(.failure(error)) - return - } - - self.originalSD.lookup(derivedService, deadline: deadline, callback: callback) - } - - public func subscribe(to service: ComputedService, onNext nextResultHandler: @escaping (Result<[BaseDiscovery.Instance], Error>) -> Void, onComplete completionHandler: @escaping (CompletionReason) -> Void) -> CancellationToken { - let derivedService: BaseDiscovery.Service - - do { - derivedService = try self.transformer(service) - } catch { - // Ok, we couldn't transform the service. We want to throw an error into `nextResultHandler` and then immediately cancel. - let cancellationToken = CancellationToken(isCancelled: true, completionHandler: completionHandler) - nextResultHandler(.failure(error)) - completionHandler(.failedToMapService) - return cancellationToken - } - - return self.originalSD.subscribe( - to: derivedService, - onNext: nextResultHandler, - onComplete: completionHandler - ) - } -} diff --git a/Sources/ServiceDiscovery/ServiceDiscovery+AsyncAwait.swift b/Sources/ServiceDiscovery/ServiceDiscovery+AsyncAwait.swift deleted file mode 100644 index fe59825..0000000 --- a/Sources/ServiceDiscovery/ServiceDiscovery+AsyncAwait.swift +++ /dev/null @@ -1,125 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftServiceDiscovery open source project -// -// Copyright (c) 2021-2022 Apple Inc. and the SwiftServiceDiscovery project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftServiceDiscovery project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Dispatch - -#if compiler(>=5.5) && canImport(_Concurrency) - -public extension ServiceDiscovery { - /// Performs async lookup for the given service's instances. - /// - /// ``defaultLookupTimeout`` will be used to compute `deadline` in case one is not specified. - /// - /// - Parameters: - /// - service: The service to lookup - /// - /// - Returns: A listing of service instances. - @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func lookup(_ service: Service, deadline: DispatchTime? = nil) async throws -> [Instance] { - try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<[Instance], Error>) in - self.lookup(service, deadline: deadline) { result in - switch result { - case .success(let instances): - continuation.resume(returning: instances) - case .failure(let error): - continuation.resume(throwing: error) - } - } - } - } - - /// Returns a ``ServiceSnapshots``, which is an `AsyncSequence` and each of its items is a snapshot listing of service instances. - /// - /// - Parameters: - /// - service: The service to subscribe to - /// - /// - Returns: A ``ServiceSnapshots`` async sequence. - @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func subscribe(to service: Service) -> ServiceSnapshots { - ServiceSnapshots(AsyncThrowingStream<[Instance], Error> { continuation in - Task { - let cancellationToken = self.subscribe( - to: service, - onNext: { result in - switch result { - case .success(let instances): - continuation.yield(instances) - case .failure(let error): - // LookupError is recoverable (e.g., service is added *after* subscription begins), so don't give up yet - guard error is LookupError else { - return continuation.finish(throwing: error) - } - } - }, - onComplete: { reason in - switch reason { - case .cancellationRequested: - continuation.finish() - case .serviceDiscoveryUnavailable: - continuation.finish(throwing: ServiceDiscoveryError.unavailable) - default: - continuation.finish(throwing: ServiceDiscoveryError.other(reason.description)) - } - } - ) - - continuation.onTermination = { @Sendable (_) -> Void in - cancellationToken.cancel() - } - } - }) - } -} - -/// An async sequence of snapshot listings of service instances. -@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) -public struct ServiceSnapshots: AsyncSequence { - public typealias Element = [Instance] - typealias AsyncSnapshotsStream = AsyncThrowingStream - - private let stream: AsyncSnapshotsStream - - public init(_ snapshots: SnapshotSequence) where SnapshotSequence.Element == Element { - self.stream = AsyncThrowingStream { continuation in - Task { - do { - for try await snapshot in snapshots { - continuation.yield(snapshot) - } - continuation.finish() - } catch { - continuation.finish(throwing: error) - } - } - } - } - - public func makeAsyncIterator() -> AsyncIterator { - AsyncIterator(self.stream.makeAsyncIterator()) - } - - public struct AsyncIterator: AsyncIteratorProtocol { - private var underlying: AsyncSnapshotsStream.Iterator - - init(_ iterator: AsyncSnapshotsStream.Iterator) { - self.underlying = iterator - } - - public mutating func next() async throws -> [Instance]? { - try await self.underlying.next() - } - } -} - -#endif diff --git a/Sources/ServiceDiscovery/ServiceDiscovery+Combinators.swift b/Sources/ServiceDiscovery/ServiceDiscovery+Combinators.swift deleted file mode 100644 index 05bba9e..0000000 --- a/Sources/ServiceDiscovery/ServiceDiscovery+Combinators.swift +++ /dev/null @@ -1,41 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftServiceDiscovery open source project -// -// Copyright (c) 2020-2022 Apple Inc. and the SwiftServiceDiscovery project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftServiceDiscovery project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -// MARK: - Map and filter - -public extension ServiceDiscovery { - /// Creates a new ``ServiceDiscovery/ServiceDiscovery`` implementation based on this one, transforming the instances according to - /// the derived function. - /// - /// It is not necessarily safe to block in this closure. This closure should not block for safety. - func mapInstance(_ transformer: @escaping (Instance) throws -> DerivedInstance) -> MapInstanceServiceDiscovery { - MapInstanceServiceDiscovery(originalSD: self, transformer: transformer) - } - - /// Creates a new ``ServiceDiscovery/ServiceDiscovery`` implementation based on this one, transforming the services according to - /// the derived function. - /// - /// It is not necessarily safe to block in this closure. This closure should not block for safety. - func mapService(serviceType: ComputedService.Type = ComputedService.self, - _ transformer: @escaping (ComputedService) throws -> Service) -> MapServiceServiceDiscovery { - MapServiceServiceDiscovery(originalSD: self, transformer: transformer) - } - - /// Creates a new ``ServiceDiscovery/ServiceDiscovery`` implementation based on this one, filtering instances with the given predicate. - /// - /// It is not necessarily safe to block in this closure. This closure should not block for safety. - func filterInstance(_ predicate: @escaping (Instance) throws -> Bool) -> FilterInstanceServiceDiscovery { - FilterInstanceServiceDiscovery(originalSD: self, predicate: predicate) - } -} diff --git a/Sources/ServiceDiscovery/ServiceDiscovery+TypeErased.swift b/Sources/ServiceDiscovery/ServiceDiscovery+TypeErased.swift deleted file mode 100644 index 14afe5c..0000000 --- a/Sources/ServiceDiscovery/ServiceDiscovery+TypeErased.swift +++ /dev/null @@ -1,223 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftServiceDiscovery open source project -// -// Copyright (c) 2020-2022 Apple Inc. and the SwiftServiceDiscovery project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftServiceDiscovery project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Dispatch - -// MARK: - Generic wrapper for `ServiceDiscovery` instance - -/// Generic wrapper for ``ServiceDiscovery/ServiceDiscovery`` instance. -public class ServiceDiscoveryBox: ServiceDiscovery { - private let _underlying: Any - - private let _defaultLookupTimeout: () -> DispatchTimeInterval - - private let _lookup: (Service, DispatchTime?, @escaping (Result<[Instance], Error>) -> Void) -> Void - - private let _subscribe: (Service, @escaping (Result<[Instance], Error>) -> Void, @escaping (CompletionReason) -> Void) -> CancellationToken - - public var defaultLookupTimeout: DispatchTimeInterval { - self._defaultLookupTimeout() - } - - public init(_ serviceDiscovery: ServiceDiscoveryImpl) - where ServiceDiscoveryImpl.Service == Service, ServiceDiscoveryImpl.Instance == Instance { - self._underlying = serviceDiscovery - self._defaultLookupTimeout = { serviceDiscovery.defaultLookupTimeout } - - self._lookup = { service, deadline, callback in - serviceDiscovery.lookup(service, deadline: deadline, callback: callback) - } - self._subscribe = { service, nextResultHandler, completionHandler in - serviceDiscovery.subscribe(to: service, onNext: nextResultHandler, onComplete: completionHandler) - } - } - - public func lookup(_ service: Service, deadline: DispatchTime? = nil, callback: @escaping (Result<[Instance], Error>) -> Void) { - self._lookup(service, deadline, callback) - } - - @discardableResult - public func subscribe( - to service: Service, - onNext nextResultHandler: @escaping (Result<[Instance], Error>) -> Void, - onComplete completionHandler: @escaping (CompletionReason) -> Void = { _ in } - ) -> CancellationToken { - self._subscribe(service, nextResultHandler, completionHandler) - } - - /// Unwraps the underlying ``ServiceDiscovery/ServiceDiscovery`` instance as `ServiceDiscoveryImpl` type. - /// - /// - Throws: `TypeErasedServiceDiscoveryError.typeMismatch` when the underlying - /// `ServiceDiscovery` instance is not of type `ServiceDiscoveryImpl`. - @discardableResult - public func unwrapAs(_ serviceDiscoveryType: ServiceDiscoveryImpl.Type) throws -> ServiceDiscoveryImpl { - guard let unwrapped = self._underlying as? ServiceDiscoveryImpl else { - throw TypeErasedServiceDiscoveryError.typeMismatch(description: "Cannot unwrap [\(type(of: self._underlying)))] as [\(ServiceDiscoveryImpl.self)]") - } - return unwrapped - } -} - -// MARK: - Type-erased wrapper for `ServiceDiscovery` instance - -/// Type-erased wrapper for ``ServiceDiscovery/ServiceDiscovery`` instance. -public class AnyServiceDiscovery: ServiceDiscovery { - private let _underlying: Any - - private let _defaultLookupTimeout: () -> DispatchTimeInterval - - private let _lookup: (AnyHashable, DispatchTime?, @escaping (Result<[AnyHashable], Error>) -> Void) -> Void - - private let _subscribe: (AnyHashable, @escaping (Result<[AnyHashable], Error>) -> Void, @escaping (CompletionReason) -> Void) -> CancellationToken - - public var defaultLookupTimeout: DispatchTimeInterval { - self._defaultLookupTimeout() - } - - public init(_ serviceDiscovery: ServiceDiscoveryImpl) { - self._underlying = serviceDiscovery - self._defaultLookupTimeout = { serviceDiscovery.defaultLookupTimeout } - - self._lookup = { anyService, deadline, callback in - guard let service = anyService.base as? ServiceDiscoveryImpl.Service else { - preconditionFailure("Expected service type to be \(ServiceDiscoveryImpl.Service.self), got \(type(of: anyService.base))") - } - serviceDiscovery.lookup(service, deadline: deadline) { result in - callback(result.map { $0.map(AnyHashable.init) }) - } - } - self._subscribe = { anyService, nextResultHandler, completionHandler in - guard let service = anyService.base as? ServiceDiscoveryImpl.Service else { - preconditionFailure("Expected service type to be \(ServiceDiscoveryImpl.Service.self), got \(type(of: anyService.base))") - } - return serviceDiscovery.subscribe( - to: service, - onNext: { result in nextResultHandler(result.map { $0.map(AnyHashable.init) }) }, - onComplete: completionHandler - ) - } - } - - /// See ``ServiceDiscovery/lookup(_:deadline:callback:)``. - /// - /// - Warning: If `service` type does not match the underlying `ServiceDiscovery`'s, it would result in a failure. - public func lookup(_ service: AnyHashable, deadline: DispatchTime? = nil, callback: @escaping (Result<[AnyHashable], Error>) -> Void) { - self._lookup(service, deadline, callback) - } - - /// See ``ServiceDiscovery/lookup(_:deadline:callback:)``. - /// - /// - Warning: If `Service` or `Instance` type does not match the underlying ``ServiceDiscovery/ServiceDiscovery``'s associated types, it would result in a failure. - public func lookupAndUnwrap( - _ service: Service, - deadline: DispatchTime? = nil, - callback: @escaping (Result<[Instance], Error>) -> Void - ) where Service: Hashable, Instance: Hashable { - self._lookup(AnyHashable(service), deadline) { result in - callback(self.transform(result)) - } - } - - /// See ``ServiceDiscovery/subscribe(to:onNext:onComplete:)``. - /// - /// - Warning: If `service` type does not match the underlying ``ServiceDiscovery/ServiceDiscovery``'s, it would result in a failure. - @discardableResult - public func subscribe( - to service: AnyHashable, - onNext nextResultHandler: @escaping (Result<[AnyHashable], Error>) -> Void, - onComplete completionHandler: @escaping (CompletionReason) -> Void = { _ in } - ) -> CancellationToken { - self._subscribe(service, nextResultHandler, completionHandler) - } - - /// See ``ServiceDiscovery/subscribe(to:onNext:onComplete:)``. - /// - /// - Warning: If `Service` or `Instance` type does not match the underlying ``ServiceDiscovery/ServiceDiscovery``'s associated types, it would result in a failure. - @discardableResult - public func subscribeAndUnwrap( - to service: Service, - onNext nextResultHandler: @escaping (Result<[Instance], Error>) -> Void, - onComplete completionHandler: @escaping (CompletionReason) -> Void = { _ in } - ) -> CancellationToken where Service: Hashable, Instance: Hashable { - self._subscribe(AnyHashable(service), { result in nextResultHandler(self.transform(result)) }, completionHandler) - } - - private func transform(_ result: Result<[AnyHashable], Error>) -> Result<[Instance], Error> where Instance: Hashable { - result.flatMap { anyInstances in - var instances = [Instance]() - for anyInstance in anyInstances { - do { - let instance: Instance = try self.transform(anyInstance) - instances.append(instance) - } catch { - return .failure(error) - } - } - return .success(instances) - } - } - - private func transform(_ anyInstance: AnyHashable) throws -> Instance where Instance: Hashable { - guard let instance = anyInstance.base as? Instance else { - throw TypeErasedServiceDiscoveryError.typeMismatch(description: "Expected instance type to be \(Instance.self), got \(type(of: anyInstance.base))") - } - return instance - } - - /// Unwraps the underlying ``ServiceDiscovery/ServiceDiscovery`` instance as `ServiceDiscoveryImpl` type. - /// - /// - Throws: `TypeErasedServiceDiscoveryError.typeMismatch` when the underlying - /// `ServiceDiscovery` instance is not of type `ServiceDiscoveryImpl`. - public func unwrapAs(_ serviceDiscoveryType: ServiceDiscoveryImpl.Type) throws -> ServiceDiscoveryImpl { - guard let unwrapped = self._underlying as? ServiceDiscoveryImpl else { - throw TypeErasedServiceDiscoveryError.typeMismatch(description: "Cannot unwrap [\(type(of: self._underlying))] as [\(ServiceDiscoveryImpl.self)]") - } - return unwrapped - } -} - -#if compiler(>=5.5) && canImport(_Concurrency) -public extension AnyServiceDiscovery { - /// See ``ServiceDiscovery/lookup(_:deadline:)``. - /// - /// - Warning: If `Service` or `Instance` type does not match the underlying ``ServiceDiscovery/ServiceDiscovery``'s associated types, it would result in a failure. - @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func lookupAndUnwrap(_ service: Service, deadline: DispatchTime? = nil) async throws -> [Instance] where Service: Hashable, Instance: Hashable { - try await self.lookup(service, deadline: deadline).map(self.transform) - } - - /// See ``ServiceDiscovery/subscribe(to:)``. - /// - /// - Warning: If `Service` or `Instance` type does not match the underlying ``ServiceDiscovery/ServiceDiscovery``'s associated types, it would result in a failure. - @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func subscribeAndUnwrap(to service: Service) -> ServiceSnapshots where Service: Hashable, Instance: Hashable { - ServiceSnapshots(AsyncThrowingStream { continuation in - Task { - do { - for try await snapshot in self.subscribe(to: service) { - continuation.yield(try snapshot.map(self.transform)) - } - continuation.finish() - } catch { - continuation.finish(throwing: error) - } - } - }) - } -} -#endif - -public enum TypeErasedServiceDiscoveryError: Error { - case typeMismatch(description: String) -} diff --git a/Sources/ServiceDiscovery/ServiceDiscovery.swift b/Sources/ServiceDiscovery/ServiceDiscovery.swift index 01280c4..b17efa2 100644 --- a/Sources/ServiceDiscovery/ServiceDiscovery.swift +++ b/Sources/ServiceDiscovery/ServiceDiscovery.swift @@ -12,9 +12,6 @@ // //===----------------------------------------------------------------------===// -import Atomics -import Dispatch - // MARK: - Service discovery protocol /// Provides service instances lookup. @@ -22,110 +19,35 @@ import Dispatch /// ### Threading /// /// `ServiceDiscovery` implementations **MUST be thread-safe**. -public protocol ServiceDiscovery: AnyObject { +public protocol ServiceDiscovery { /// Service identity type - associatedtype Service: Hashable + associatedtype Service /// Service instance type - associatedtype Instance: Hashable - - /// Default timeout for lookup. - var defaultLookupTimeout: DispatchTimeInterval { get } + associatedtype Instance - /// Performs a lookup for the given service's instances. The result will be sent to `callback`. + /// Performs async lookup for the given service's instances. /// /// ``defaultLookupTimeout`` will be used to compute `deadline` in case one is not specified. /// - /// ### Threading - /// - /// `callback` may be invoked on arbitrary threads, as determined by implementation. - /// /// - Parameters: /// - service: The service to lookup - /// - deadline: Lookup is considered to have timed out if it does not complete by this time - /// - callback: The closure to receive lookup result - func lookup(_ service: Service, deadline: DispatchTime?, callback: @escaping (Result<[Instance], Error>) -> Void) + /// + /// - Returns: A listing of service instances. + func lookup(_ service: Service, deadline: ContinuousClock.Instant?) async throws -> [Instance] /// Subscribes to receive a service's instances whenever they change. /// - /// The service's current list of instances will be sent to `nextResultHandler` when this method is first called. Subsequently, - /// `nextResultHandler` will only be invoked when the `service`'s instances change. - /// - /// ### Threading - /// - /// `nextResultHandler` and `completionHandler` may be invoked on arbitrary threads, as determined by implementation. + /// Returns a ``ServiceDiscoveryInstanceSequence``, which is an `AsyncSequence` and each of its items is a snapshot listing of service instances. /// /// - Parameters: /// - service: The service to subscribe to - /// - nextResultHandler: The closure to receive update result - /// - completionHandler: The closure to invoke when the subscription completes (e.g., when the `ServiceDiscovery` instance exits, etc.), - /// including cancellation requested through `CancellationToken`. /// - /// - Returns: A ``CancellationToken`` instance that can be used to cancel the subscription in the future. - func subscribe(to service: Service, onNext nextResultHandler: @escaping (Result<[Instance], Error>) -> Void, onComplete completionHandler: @escaping (CompletionReason) -> Void) -> CancellationToken -} - -// MARK: - Subscription - -/// Enables cancellation of service discovery subscription. -public class CancellationToken { - private let _isCancelled: ManagedAtomic - private let _completionHandler: (CompletionReason) -> Void - - /// Returns `true` if the subscription has been cancelled. - public var isCancelled: Bool { - self._isCancelled.load(ordering: .acquiring) - } - - /// Creates a new token. - public init(isCancelled: Bool = false, completionHandler: @escaping (CompletionReason) -> Void = { _ in }) { - self._isCancelled = ManagedAtomic(isCancelled) - self._completionHandler = completionHandler - } - - /// Cancels the subscription. - public func cancel() { - guard self._isCancelled.compareExchange(expected: false, desired: true, ordering: .acquiring).exchanged else { return } - self._completionHandler(.cancellationRequested) - } + /// - Returns: A ``ServiceDiscoveryInstanceSequence`` async sequence. + func subscribe(_ service: Service) async throws -> any ServiceDiscoveryInstanceSequence } -#if compiler(>=5.5) && canImport(_Concurrency) -extension CancellationToken: @unchecked Sendable {} -#endif - -/// Reason that leads to service discovery subscription completion. -public struct CompletionReason: Equatable, CustomStringConvertible { - internal enum ReasonType: Int, Equatable, CustomStringConvertible { - case cancellationRequested - case serviceDiscoveryUnavailable - case failedToMapService - - var description: String { - switch self { - case .cancellationRequested: - return "cancellationRequested" - case .serviceDiscoveryUnavailable: - return "serviceDiscoveryUnavailable" - case .failedToMapService: - return "failedToMapService" - } - } - } - - internal let type: ReasonType - - public var description: String { - "CompletionReason.\(String(describing: self.type))" - } - - /// Cancellation requested through `CancellationToken`. - public static let cancellationRequested = CompletionReason(type: .cancellationRequested) - - /// Service discovery is unavailable. - public static let serviceDiscoveryUnavailable = CompletionReason(type: .serviceDiscoveryUnavailable) - - /// A service mapping function threw an error - public static let failedToMapService = CompletionReason(type: .failedToMapService) +public protocol ServiceDiscoveryInstanceSequence: AsyncSequence where Self.Element == Instance { + associatedtype Instance } // MARK: - Service discovery errors diff --git a/Tests/ServiceDiscoveryTests/FilterInstanceServiceDiscoveryTests.swift b/Tests/ServiceDiscoveryTests/FilterInstanceServiceDiscoveryTests.swift deleted file mode 100644 index a690e31..0000000 --- a/Tests/ServiceDiscoveryTests/FilterInstanceServiceDiscoveryTests.swift +++ /dev/null @@ -1,380 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftServiceDiscovery open source project -// -// Copyright (c) 2020-2023 Apple Inc. and the SwiftServiceDiscovery project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftServiceDiscovery project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Atomics -import Dispatch -@testable import ServiceDiscovery -import XCTest - -class FilterInstanceServiceDiscoveryTests: XCTestCase { - typealias Service = String - typealias Instance = HostPort - - static let fooService = "fooService" - static let fooBaseInstances = [ - HostPort(host: "localhost", port: 7001), - HostPort(host: "localhost", port: 7003), - ] - static let fooDerivedInstances = [ - HostPort(host: "localhost", port: 7001), - ] - - static let barService = "bar-service" - static let barBaseInstances = [ - HostPort(host: "localhost", port: 9001), - HostPort(host: "localhost", port: 9002), - HostPort(host: "localhost", port: 80), - ] - static let barDerivedInstances = [ - HostPort(host: "localhost", port: 9001), - HostPort(host: "localhost", port: 9002), - ] - - func test_lookup() throws { - var configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooBaseInstances]) - configuration.register(service: Self.barService, instances: Self.barBaseInstances) - - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration).filterInstance { [7001, 9001, 9002].contains($0.port) } - - let fooResult = try ensureResult(serviceDiscovery: serviceDiscovery, service: Self.fooService) - guard case .success(let _fooInstances) = fooResult else { - return XCTFail("Failed to lookup instances for service[\(Self.fooService)]") - } - XCTAssertEqual(_fooInstances.count, 1, "Expected service[\(Self.fooService)] to have 1 instance, got \(_fooInstances.count)") - XCTAssertEqual(_fooInstances, Self.fooDerivedInstances, "Expected service[\(Self.fooService)] to have instances \(Self.fooDerivedInstances), got \(_fooInstances)") - - let barResult = try ensureResult(serviceDiscovery: serviceDiscovery, service: Self.barService) - guard case .success(let _barInstances) = barResult else { - return XCTFail("Failed to lookup instances for service[\(Self.barService)]") - } - XCTAssertEqual(_barInstances.count, 2, "Expected service[\(Self.barService)] to have 2 instances, got \(_barInstances.count)") - XCTAssertEqual(_barInstances, Self.barDerivedInstances, "Expected service[\(Self.barService)] to have instances \(Self.barDerivedInstances), got \(_barInstances)") - } - - func test_lookup_errorIfServiceUnknown() throws { - let unknownService = "unknown-service" - - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: ["foo-service": []]) - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration).filterInstance { $0.port == 7001 } - - let result = try ensureResult(serviceDiscovery: serviceDiscovery, service: unknownService) - guard case .failure(let error) = result else { - return XCTFail("Lookup instances for service[\(unknownService)] should return an error") - } - guard let lookupError = error as? LookupError, case .unknownService = lookupError else { - return XCTFail("Expected LookupError.unknownService, got \(error)") - } - } - - func test_subscribe() throws { - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooBaseInstances]) - let baseServiceDiscovery = InMemoryServiceDiscovery(configuration: configuration) - let serviceDiscovery = baseServiceDiscovery.filterInstance { [7001, 9001, 9002].contains($0.port) } - - let semaphore = DispatchSemaphore(value: 0) - let resultCounter = ManagedAtomic(0) - - let onCompleteInvoked = ManagedAtomic(false) - let onComplete: (CompletionReason) -> Void = { reason in - XCTAssertEqual(reason, .serviceDiscoveryUnavailable, "Expected CompletionReason to be .serviceDiscoveryUnavailable, got \(reason)") - onCompleteInvoked.store(true, ordering: .relaxed) - } - - // Two results are expected: - // Result #1: LookupError.unknownService because bar-service is not registered - // Result #2: Later we register bar-service and that should notify the subscriber - _ = serviceDiscovery.subscribe( - to: Self.barService, - onNext: { result in - resultCounter.wrappingIncrement(ordering: .acquiring) - - guard resultCounter.load(ordering: .relaxed) <= 2 else { - return XCTFail("Expected to receive result 2 times only") - } - - switch result { - case .failure(let error): - guard resultCounter.load(ordering: .relaxed) == 1, let lookupError = error as? LookupError, case .unknownService = lookupError else { - return XCTFail("Expected the first result to be LookupError.unknownService since \(Self.barService) is not registered, got \(error)") - } - case .success(let instances): - guard resultCounter.load(ordering: .relaxed) == 2 else { - return XCTFail("Expected to receive instances list on the second result only, but at result #\(resultCounter.load(ordering: .relaxed)) got \(instances)") - } - XCTAssertEqual(instances, Self.barDerivedInstances, "Expected instances of \(Self.barService) to be \(Self.barDerivedInstances), got \(instances)") - semaphore.signal() - } - }, - onComplete: onComplete - ) - - // Allow time for first result of `subscribe` - usleep(100_000) - baseServiceDiscovery.register(Self.barService, instances: Self.barBaseInstances) - - _ = semaphore.wait(timeout: DispatchTime.now() + .milliseconds(200)) - - XCTAssertEqual(resultCounter.load(ordering: .relaxed), 2, "Expected to receive result 2 times, got \(resultCounter.load(ordering: .relaxed))") - - // Verify `onComplete` gets invoked on `shutdown` - baseServiceDiscovery.shutdown() - XCTAssertTrue(onCompleteInvoked.load(ordering: .relaxed), "Expected onComplete to be invoked") - } - - func test_subscribe_cancel() throws { - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooBaseInstances]) - let baseServiceDiscovery = InMemoryServiceDiscovery(configuration: configuration) - let serviceDiscovery = baseServiceDiscovery.filterInstance { [7001, 9001, 9002].contains($0.port) } - - let semaphore = DispatchSemaphore(value: 0) - let resultCounter1 = ManagedAtomic(0) - let resultCounter2 = ManagedAtomic(0) - - // Two results are expected: - // Result #1: LookupError.unknownService because bar-service is not registered - // Result #2: Later we register bar-service and that should notify the subscribers - _ = serviceDiscovery.subscribe( - to: Self.barService, - onNext: { result in - resultCounter1.wrappingIncrement(ordering: .relaxed) - - guard resultCounter1.load(ordering: .relaxed) <= 2 else { - return XCTFail("Expected to receive result 2 times only") - } - - switch result { - case .failure(let error): - guard resultCounter1.load(ordering: .relaxed) == 1, let lookupError = error as? LookupError, case .unknownService = lookupError else { - return XCTFail("Expected the first result to be LookupError.unknownService since \(Self.barService) is not registered, got \(error)") - } - case .success(let instances): - guard resultCounter1.load(ordering: .relaxed) == 2 else { - return XCTFail("Expected to receive instances list on the second result only, but at result #\(resultCounter1.load(ordering: .relaxed)) got \(instances)") - } - XCTAssertEqual(instances, Self.barDerivedInstances, "Expected instances of \(Self.barService) to be \(Self.barDerivedInstances), got \(instances)") - semaphore.signal() - } - }, - onComplete: { _ in } - ) - - let onCompleteInvoked = ManagedAtomic(false) - let onComplete: (CompletionReason) -> Void = { reason in - XCTAssertEqual(reason, .cancellationRequested, "Expected CompletionReason to be .cancellationRequested, got \(reason)") - onCompleteInvoked.store(true, ordering: .relaxed) - } - - // This subscriber receives Result #1 only because we cancel subscription before Result #2 is triggered - let cancellationToken = serviceDiscovery.subscribe( - to: Self.barService, - onNext: { result in - resultCounter2.wrappingIncrement(ordering: .relaxed) - - guard resultCounter2.load(ordering: .relaxed) <= 1 else { - return XCTFail("Expected to receive result 1 time only") - } - - switch result { - case .failure(let error): - guard resultCounter2.load(ordering: .relaxed) == 1, let lookupError = error as? LookupError, case .unknownService = lookupError else { - return XCTFail("Expected the first result to be LookupError.unknownService since \(Self.barService) is not registered, got \(error)") - } - case .success: - return XCTFail("Does not expect to receive instances list") - } - }, - onComplete: onComplete - ) - - // Allow time for first result of `subscribe` - usleep(100_000) - - cancellationToken.cancel() - // Only subscriber 1 will receive this change - baseServiceDiscovery.register(Self.barService, instances: Self.barBaseInstances) - - _ = semaphore.wait(timeout: DispatchTime.now() + .milliseconds(200)) - - XCTAssertEqual(resultCounter1.load(ordering: .relaxed), 2, "Expected subscriber #1 to receive result 2 times, got \(resultCounter1.load(ordering: .relaxed))") - XCTAssertEqual(resultCounter2.load(ordering: .relaxed), 1, "Expected subscriber #2 to receive result 1 time, got \(resultCounter2.load(ordering: .relaxed))") - // Verify `onComplete` gets invoked on `cancel` - XCTAssertTrue(onCompleteInvoked.load(ordering: .relaxed), "Expected onComplete to be invoked") - } - - func test_concurrency() throws { - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooBaseInstances]) - let baseServiceDisovery = InMemoryServiceDiscovery(configuration: configuration) - let serviceDiscovery = baseServiceDisovery.filterInstance { $0.port == 7001 } - - let registerSemaphore = DispatchSemaphore(value: 0) - let registerCounter = ManagedAtomic(0) - - let lookupSemaphore = DispatchSemaphore(value: 0) - let lookupCounter = ManagedAtomic(0) - - let times = 100 - for _ in 1 ... times { - DispatchQueue.global().async { - baseServiceDisovery.register(Self.fooService, instances: Self.fooBaseInstances) - registerCounter.wrappingIncrement(ordering: .relaxed) - - if registerCounter.load(ordering: .relaxed) == times { - registerSemaphore.signal() - } - } - - DispatchQueue.global().async { - serviceDiscovery.lookup(Self.fooService, deadline: nil) { result in - lookupCounter.wrappingIncrement(ordering: .relaxed) - - guard case .success(let instances) = result, instances == Self.fooDerivedInstances else { - return XCTFail("Failed to lookup instances for service[\(Self.fooService)]: \(result)") - } - - if lookupCounter.load(ordering: .relaxed) == times { - lookupSemaphore.signal() - } - } - } - } - - _ = registerSemaphore.wait(timeout: DispatchTime.now() + .seconds(1)) - _ = lookupSemaphore.wait(timeout: DispatchTime.now() + .seconds(1)) - - XCTAssertEqual(registerCounter.load(ordering: .relaxed), times, "Expected register to succeed \(times) times") - XCTAssertEqual(lookupCounter.load(ordering: .relaxed), times, "Expected lookup callback to be called \(times) times") - } - - func testThrownErrorsPropagateIntoFailures() throws { - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooBaseInstances]) - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration).filterInstance { _ in throw TestError.error } - - let result = try ensureResult(serviceDiscovery: serviceDiscovery, service: Self.fooService) - guard case .failure(let err) = result else { - XCTFail("Expected failure, got \(result)") - return - } - XCTAssertEqual(err as? TestError, .error, "Expected \(TestError.error), but got \(err)") - } - - func testPropagateDefaultTimeout() throws { - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: ["foo-service": []]) - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration).filterInstance { $0.port == 7001 } - XCTAssertTrue(compareTimeInterval(configuration.defaultLookupTimeout, serviceDiscovery.defaultLookupTimeout), "\(configuration.defaultLookupTimeout) does not match \(serviceDiscovery.defaultLookupTimeout)") - } - - // MARK: - async/await API tests - - @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func test_async_lookup() throws { - #if !(compiler(>=5.5) && canImport(_Concurrency)) - try XCTSkipIf(true) - #else - var configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooBaseInstances]) - configuration.register(service: Self.barService, instances: Self.barBaseInstances) - - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration).filterInstance { [7001, 9001, 9002].contains($0.port) } - - runAsyncAndWaitFor { - let _fooInstances = try await serviceDiscovery.lookup(Self.fooService) - XCTAssertEqual(_fooInstances.count, 1, "Expected service[\(Self.fooService)] to have 1 instance, got \(_fooInstances.count)") - XCTAssertEqual(_fooInstances, Self.fooDerivedInstances, "Expected service[\(Self.fooService)] to have instances \(Self.fooDerivedInstances), got \(_fooInstances)") - - let _barInstances = try await serviceDiscovery.lookup(Self.barService) - XCTAssertEqual(_barInstances.count, 2, "Expected service[\(Self.barService)] to have 2 instances, got \(_barInstances.count)") - XCTAssertEqual(_barInstances, Self.barDerivedInstances, "Expected service[\(Self.barService)] to have instances \(Self.barDerivedInstances), got \(_barInstances)") - } - #endif - } - - @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func test_async_lookup_errorIfServiceUnknown() throws { - #if !(compiler(>=5.5) && canImport(_Concurrency)) - try XCTSkipIf(true) - #else - let unknownService = "unknown-service" - - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: ["foo-service": []]) - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration).filterInstance { $0.port == 7001 } - - runAsyncAndWaitFor { - do { - _ = try await serviceDiscovery.lookup(unknownService) - return XCTFail("Lookup instances for service[\(unknownService)] should return an error") - } catch { - guard let lookupError = error as? LookupError, lookupError == .unknownService else { - return XCTFail("Expected LookupError.unknownService, got \(error)") - } - } - } - #endif - } - - @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func test_async_subscribe() throws { - #if !(compiler(>=5.5) && canImport(_Concurrency)) - try XCTSkipIf(true) - #else - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooBaseInstances]) - let baseServiceDiscovery = InMemoryServiceDiscovery(configuration: configuration) - let serviceDiscovery = baseServiceDiscovery.filterInstance { [7001, 9001, 9002].contains($0.port) } - - let semaphore = DispatchSemaphore(value: 0) - let counter = ManagedAtomic(0) - - Task.detached { - // Allow time for subscription to start - usleep(100_000) - // Update #1 - baseServiceDiscovery.register(Self.barService, instances: []) - usleep(50000) - // Update #2 - baseServiceDiscovery.register(Self.barService, instances: Self.barBaseInstances) - } - - let task = Task.detached { () -> Void in - do { - for try await instances in serviceDiscovery.subscribe(to: Self.barService) { - switch counter.wrappingIncrementThenLoad(ordering: .relaxed) { - case 1: - XCTAssertEqual(instances, [], "Expected instances of \(Self.barService) to be empty, got \(instances)") - case 2: - XCTAssertEqual(instances, Self.barDerivedInstances, "Expected instances of \(Self.barService) to be \(Self.barDerivedInstances), got \(instances)") - // This causes the stream to terminate - baseServiceDiscovery.shutdown() - default: - XCTFail("Expected to receive instances 2 times") - } - } - } catch { - switch counter.load(ordering: .relaxed) { - case 2: // shutdown is called after receiving two results - guard let serviceDiscoveryError = error as? ServiceDiscoveryError, serviceDiscoveryError == .unavailable else { - return XCTFail("Expected ServiceDiscoveryError.unavailable, got \(error)") - } - // Test is complete at this point - semaphore.signal() - default: - XCTFail("Unexpected error \(error)") - } - } - } - - _ = semaphore.wait(timeout: DispatchTime.now() + .seconds(1)) - task.cancel() - - XCTAssertEqual(counter.load(ordering: .relaxed), 2, "Expected to receive instances 2 times, got \(counter.load(ordering: .relaxed)) times") - #endif - } -} diff --git a/Tests/ServiceDiscoveryTests/Helpers.swift b/Tests/ServiceDiscoveryTests/Helpers.swift index 05feabd..c29a787 100644 --- a/Tests/ServiceDiscoveryTests/Helpers.swift +++ b/Tests/ServiceDiscoveryTests/Helpers.swift @@ -2,7 +2,7 @@ // // This source file is part of the SwiftServiceDiscovery open source project // -// Copyright (c) 2021 Apple Inc. and the SwiftServiceDiscovery project authors +// Copyright (c) 2021-2023 Apple Inc. and the SwiftServiceDiscovery project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -12,64 +12,21 @@ // //===----------------------------------------------------------------------===// -import Dispatch import ServiceDiscovery import XCTest -enum TestError: Error { - case error -} - -func compareTimeInterval(_ lhs: DispatchTimeInterval, _ rhs: DispatchTimeInterval) -> Bool { - switch (lhs, rhs) { - case (.seconds(let lhs), .seconds(let rhs)): - return lhs == rhs - case (.milliseconds(let lhs), .milliseconds(let rhs)): - return lhs == rhs - case (.microseconds(let lhs), .microseconds(let rhs)): - return lhs == rhs - case (.nanoseconds(let lhs), .nanoseconds(let rhs)): - return lhs == rhs - case (.never, .never): - return true - case (.seconds, _), (.milliseconds, _), (.microseconds, _), (.nanoseconds, _), (.never, _): - return false - #if os(macOS) || os(iOS) || os(watchOS) || os(tvOS) - @unknown default: - return false - #endif - } -} - -func ensureResult(serviceDiscovery: SD, service: SD.Service) throws -> Result<[SD.Instance], Error> { - let semaphore = DispatchSemaphore(value: 0) - var result: Result<[SD.Instance], Error>? - - serviceDiscovery.lookup(service, deadline: nil) { - result = $0 - semaphore.signal() - } - - _ = semaphore.wait(timeout: DispatchTime.now() + .seconds(1)) - - guard let _result = result else { - throw LookupError.timedOut - } - - return _result -} - -#if compiler(>=5.5) && canImport(_Concurrency) -extension XCTestCase { - // TODO: remove once XCTest supports async functions - @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func runAsyncAndWaitFor(_ closure: @escaping @Sendable () async throws -> Void, _ timeout: TimeInterval = 1.0) { - let finished = expectation(description: "finished") - Task.detached { - try await closure() - finished.fulfill() - } - wait(for: [finished], timeout: timeout) +// FIXME: is there a builtin alternative? +func XCTAssertThrowsErrorAsync( + _ expression: @autoclosure () async throws -> T, + _ errorHandler: (Error) -> Void, + _ message: String = "This method should fail", + file: StaticString = #filePath, + line: UInt = #line +) async { + do { + let _ = try await expression() + XCTFail(message, file: file, line: line) + } catch { + errorHandler(error) } } -#endif diff --git a/Tests/ServiceDiscoveryTests/InMemoryServiceDiscoveryTests.swift b/Tests/ServiceDiscoveryTests/InMemoryServiceDiscoveryTests.swift index ddf831c..da5ee76 100644 --- a/Tests/ServiceDiscoveryTests/InMemoryServiceDiscoveryTests.swift +++ b/Tests/ServiceDiscoveryTests/InMemoryServiceDiscoveryTests.swift @@ -13,7 +13,6 @@ //===----------------------------------------------------------------------===// import Atomics -import Dispatch @testable import ServiceDiscovery import XCTest @@ -32,317 +31,143 @@ class InMemoryServiceDiscoveryTests: XCTestCase { HostPort(host: "localhost", port: 9002), ] - func test_lookup() throws { - var configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooInstances]) - configuration.register(service: Self.barService, instances: Self.barInstances) + func testLookup() async throws { + let configuration = InMemoryServiceDiscovery.Configuration( + instances: [ + Self.fooService: Self.fooInstances, + Self.barService: Self.barInstances + ] + ) let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration) - let fooResult = try ensureResult(serviceDiscovery: serviceDiscovery, service: Self.fooService) - guard case .success(let _fooInstances) = fooResult else { - return XCTFail("Failed to lookup instances for service[\(Self.fooService)]") - } - XCTAssertEqual(_fooInstances.count, 1, "Expected service[\(Self.fooService)] to have 1 instance, got \(_fooInstances.count)") - XCTAssertEqual(_fooInstances, Self.fooInstances, "Expected service[\(Self.fooService)] to have instances \(Self.fooInstances), got \(_fooInstances)") + let fooInstances = try await serviceDiscovery.lookup(Self.fooService, deadline: .none) + XCTAssertEqual(fooInstances, Self.fooInstances, "Expected service[\(Self.fooService)] to have instances \(Self.fooInstances), got \(fooInstances)") - let barResult = try ensureResult(serviceDiscovery: serviceDiscovery, service: Self.barService) - guard case .success(let _barInstances) = barResult else { - return XCTFail("Failed to lookup instances for service[\(Self.barService)]") - } - XCTAssertEqual(_barInstances.count, 2, "Expected service[\(Self.barService)] to have 2 instances, got \(_barInstances.count)") - XCTAssertEqual(_barInstances, Self.barInstances, "Expected service[\(Self.barService)] to have instances \(Self.barInstances), got \(_barInstances)") + let barInstances = try await serviceDiscovery.lookup(Self.barService, deadline: .none) + XCTAssertEqual(barInstances, Self.barInstances, "Expected service[\(Self.barService)] to have instances \(Self.barInstances), got \(barInstances)") } - func test_lookup_errorIfServiceUnknown() throws { + func testLookupErrorIfServiceUnknown() async throws { let unknownService = "unknown-service" - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: ["foo-service": []]) + let configuration = InMemoryServiceDiscovery.Configuration( + instances: ["foo-service": []] + ) let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration) - let result = try ensureResult(serviceDiscovery: serviceDiscovery, service: unknownService) - guard case .failure(let error) = result else { - return XCTFail("Lookup instances for service[\(unknownService)] should return an error") - } - guard let lookupError = error as? LookupError, case .unknownService = lookupError else { - return XCTFail("Expected LookupError.unknownService, got \(error)") + await XCTAssertThrowsErrorAsync(try await serviceDiscovery.lookup(unknownService, deadline: .none)) { error in + guard let lookupError = error as? LookupError, case .unknownService = lookupError else { + return XCTFail("Expected LookupError.unknownService, got \(error)") + } } } - func test_subscribe() throws { - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooInstances]) - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration) - - let semaphore = DispatchSemaphore(value: 0) - let resultCounter = ManagedAtomic(0) - - let onCompleteInvoked = ManagedAtomic(false) - let onComplete: (CompletionReason) -> Void = { reason in - XCTAssertEqual(reason, .serviceDiscoveryUnavailable, "Expected CompletionReason to be .serviceDiscoveryUnavailable, got \(reason)") - onCompleteInvoked.store(true, ordering: .relaxed) - } - - // Two results are expected: - // Result #1: LookupError.unknownService because bar-service is not registered - // Result #2: Later we register bar-service and that should notify the subscriber - serviceDiscovery.subscribe( - to: Self.barService, - onNext: { result in - resultCounter.wrappingIncrement(ordering: .relaxed) - - guard resultCounter.load(ordering: .relaxed) <= 2 else { - return XCTFail("Expected to receive result 2 times only") - } - - switch result { - case .failure(let error): - guard resultCounter.load(ordering: .relaxed) == 1, let lookupError = error as? LookupError, case .unknownService = lookupError else { - return XCTFail("Expected the first result to be LookupError.unknownService since \(Self.barService) is not registered, got \(error)") - } - case .success(let instances): - guard resultCounter.load(ordering: .relaxed) == 2 else { - return XCTFail("Expected to receive instances list on the second result only, but at result #\(resultCounter.load(ordering: .relaxed)) got \(instances)") - } - XCTAssertEqual(instances, Self.barInstances, "Expected instances of \(Self.barService) to be \(Self.barInstances), got \(instances)") - semaphore.signal() - } - }, - onComplete: onComplete + func testSubscribe() async throws { + let configuration = InMemoryServiceDiscovery.Configuration( + instances: [Self.fooService: Self.fooInstances] ) - - // Allow time for first result of `subscribe` - usleep(100_000) - serviceDiscovery.register(Self.barService, instances: Self.barInstances) - - _ = semaphore.wait(timeout: DispatchTime.now() + .milliseconds(200)) - - XCTAssertEqual(resultCounter.load(ordering: .relaxed), 2, "Expected to receive result 2 times, got \(resultCounter.load(ordering: .relaxed))") - - // Verify `onComplete` gets invoked on `shutdown` - serviceDiscovery.shutdown() - XCTAssertTrue(onCompleteInvoked.load(ordering: .relaxed), "Expected onComplete to be invoked") - } - - func test_subscribe_cancel() throws { - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooInstances]) let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration) - let semaphore = DispatchSemaphore(value: 0) - let resultCounter1 = ManagedAtomic(0) - let resultCounter2 = ManagedAtomic(0) + let barInstances: [HostPort] = [ + .init(host: "localhost", port: 8081), + .init(host: "localhost", port: 8082), + .init(host: "localhost", port: 8083) + ] - // Two results are expected: - // Result #1: LookupError.unknownService because bar-service is not registered - // Result #2: Later we register bar-service and that should notify the subscribers - serviceDiscovery.subscribe( - to: Self.barService, - onNext: { result in - resultCounter1.wrappingIncrement(ordering: .relaxed) - - guard resultCounter1.load(ordering: .relaxed) <= 2 else { - return XCTFail("Expected to receive result 2 times only") - } + let counter = ManagedAtomic(0) - switch result { - case .failure(let error): - guard resultCounter1.load(ordering: .relaxed) == 1, let lookupError = error as? LookupError, case .unknownService = lookupError else { - return XCTFail("Expected the first result to be LookupError.unknownService since \(Self.barService) is not registered, got \(error)") - } - case .success(let instances): - guard resultCounter1.load(ordering: .relaxed) == 2 else { - return XCTFail("Expected to receive instances list on the second result only, but at result #\(resultCounter1.load(ordering: .relaxed)) got \(instances)") - } - XCTAssertEqual(instances, Self.barInstances, "Expected instances of \(Self.barService) to be \(Self.barInstances), got \(instances)") - semaphore.signal() + let expectation = XCTestExpectation(description: #function) + + await serviceDiscovery.register(service: Self.barService, instances: [barInstances[0]]) + + let task = Task { + for try await instance in try await serviceDiscovery.subscribe(Self.barService) { + switch counter.wrappingIncrementThenLoad(ordering: .relaxed) { + case 1: + // FIXME: casting to HostPort due to a 5.9 compiler bug + XCTAssertEqual(instance as? HostPort, barInstances[0]) + await serviceDiscovery.register(service: Self.barService, instances: Array(barInstances[1..<3])) + case 2: + // FIXME: casting to HostPort due to a 5.9 compiler bug + XCTAssertEqual(instance as? HostPort, barInstances[1]) + case 3: + // FIXME: casting to HostPort due to a 5.9 compiler bug + XCTAssertEqual(instance as? HostPort, barInstances[2]) + expectation.fulfill() + default: + XCTFail("Expected to receive \(barInstances.count) instances") } } - ) - - let onCompleteInvoked = ManagedAtomic(false) - let onComplete: (CompletionReason) -> Void = { reason in - XCTAssertEqual(reason, .cancellationRequested, "Expected CompletionReason to be .cancellationRequested, got \(reason)") - onCompleteInvoked.store(true, ordering: .relaxed) } - // This subscriber receives Result #1 only because we cancel subscription before Result #2 is triggered - let cancellationToken = serviceDiscovery.subscribe( - to: Self.barService, - onNext: { result in - resultCounter2.wrappingIncrement(ordering: .relaxed) - - guard resultCounter2.load(ordering: .relaxed) <= 1 else { - return XCTFail("Expected to receive result 1 time only") - } - - switch result { - case .failure(let error): - guard resultCounter2.load(ordering: .relaxed) == 1, let lookupError = error as? LookupError, case .unknownService = lookupError else { - return XCTFail("Expected the first result to be LookupError.unknownService since \(Self.barService) is not registered, got \(error)") - } - case .success: - return XCTFail("Does not expect to receive instances list") - } - }, - onComplete: onComplete - ) - - // Allow time for first result of `subscribe` - usleep(100_000) - - cancellationToken.cancel() - // Only subscriber 1 will receive this change - serviceDiscovery.register(Self.barService, instances: Self.barInstances) - - _ = semaphore.wait(timeout: DispatchTime.now() + .milliseconds(200)) - - XCTAssertEqual(resultCounter1.load(ordering: .relaxed), 2, "Expected subscriber #1 to receive result 2 times, got \(resultCounter1.load(ordering: .relaxed))") - XCTAssertEqual(resultCounter2.load(ordering: .relaxed), 1, "Expected subscriber #2 to receive result 1 time, got \(resultCounter2.load(ordering: .relaxed))") - // Verify `onComplete` gets invoked on `cancel` - XCTAssertTrue(onCompleteInvoked.load(ordering: .relaxed), "Expected onComplete to be invoked") - } - - func test_concurrency() throws { - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooInstances]) - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration) - - let registerSemaphore = DispatchSemaphore(value: 0) - let registerCounter = ManagedAtomic(0) - - let lookupSemaphore = DispatchSemaphore(value: 0) - let lookupCounter = ManagedAtomic(0) - - let times = 100 - for _ in 1 ... times { - DispatchQueue.global().async { - serviceDiscovery.register(Self.fooService, instances: Self.fooInstances) - registerCounter.wrappingIncrement(ordering: .relaxed) - - if registerCounter.load(ordering: .relaxed) == times { - registerSemaphore.signal() - } - } + await fulfillment(of: [expectation], timeout: 1.0) + task.cancel() - DispatchQueue.global().async { - serviceDiscovery.lookup(Self.fooService) { result in - lookupCounter.wrappingIncrement(ordering: .relaxed) + XCTAssertEqual(counter.load(ordering: .relaxed), barInstances.count, "Expected to receive \(barInstances.count) instances") - guard case .success(let instances) = result, instances == Self.fooInstances else { - return XCTFail("Failed to lookup instances for service[\(Self.fooService)]: \(result)") - } + await serviceDiscovery.register(service: Self.barService, instances: Self.barInstances) - if lookupCounter.load(ordering: .relaxed) == times { - lookupSemaphore.signal() - } - } - } - } - - _ = registerSemaphore.wait(timeout: DispatchTime.now() + .seconds(1)) - _ = lookupSemaphore.wait(timeout: DispatchTime.now() + .seconds(1)) - - XCTAssertEqual(registerCounter.load(ordering: .relaxed), times, "Expected register to succeed \(times) times") - XCTAssertEqual(lookupCounter.load(ordering: .relaxed), times, "Expected lookup callback to be called \(times) times") + XCTAssertEqual(counter.load(ordering: .relaxed), barInstances.count, "Expected to receive \(barInstances.count) instances") } - // MARK: - async/await API tests - - @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func test_async_lookup() throws { - #if !(compiler(>=5.5) && canImport(_Concurrency)) - try XCTSkipIf(true) - #else - var configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooInstances]) - configuration.register(service: Self.barService, instances: Self.barInstances) - + func testCancellation() async throws { + let configuration = InMemoryServiceDiscovery.Configuration( + instances: [Self.fooService: Self.fooInstances] + ) let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration) - runAsyncAndWaitFor { - let _fooInstances = try await serviceDiscovery.lookup(Self.fooService) - XCTAssertEqual(_fooInstances.count, 1, "Expected service[\(Self.fooService)] to have 1 instance, got \(_fooInstances.count)") - XCTAssertEqual(_fooInstances, Self.fooInstances, "Expected service[\(Self.fooService)] to have instances \(Self.fooInstances), got \(_fooInstances)") + let barInstances: [HostPort] = [ + .init(host: "localhost", port: 8081), + .init(host: "localhost", port: 8082), + .init(host: "localhost", port: 8083) + ] - let _barInstances = try await serviceDiscovery.lookup(Self.barService) - XCTAssertEqual(_barInstances.count, 2, "Expected service[\(Self.barService)] to have 2 instances, got \(_barInstances.count)") - XCTAssertEqual(_barInstances, Self.barInstances, "Expected service[\(Self.barService)] to have instances \(Self.barInstances), got \(_barInstances)") - } - #endif - } - @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func test_async_lookup_errorIfServiceUnknown() throws { - #if !(compiler(>=5.5) && canImport(_Concurrency)) - try XCTSkipIf(true) - #else - let unknownService = "unknown-service" + let expectation1 = XCTestExpectation(description: #function) + let expectation2 = XCTestExpectation(description: #function) - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: ["foo-service": []]) - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration) + await serviceDiscovery.register(service: Self.barService, instances: [barInstances[0]]) - runAsyncAndWaitFor { - do { - _ = try await serviceDiscovery.lookup(unknownService) - return XCTFail("Lookup instances for service[\(unknownService)] should return an error") - } catch { - guard let lookupError = error as? LookupError, lookupError == .unknownService else { - return XCTFail("Expected LookupError.unknownService, got \(error)") + let counter1 = ManagedAtomic(0) + let task1 = Task { + for try await instance in try await serviceDiscovery.subscribe(Self.barService) { + switch counter1.wrappingIncrementThenLoad(ordering: .relaxed) { + case 1: + XCTAssertEqual(instance as? HostPort, barInstances[0]) + expectation1.fulfill() + default: + XCTFail("Expected to receive 1 instances") } } } - #endif - } - - @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func test_async_subscribe() throws { - #if !(compiler(>=5.5) && canImport(_Concurrency)) - try XCTSkipIf(true) - #else - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooInstances]) - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration) - let semaphore = DispatchSemaphore(value: 0) - let counter = ManagedAtomic(0) - - Task.detached { - // Allow time for subscription to start - usleep(100_000) - // Update #1 - serviceDiscovery.register(Self.barService, instances: []) - usleep(50000) - // Update #2 - serviceDiscovery.register(Self.barService, instances: Self.barInstances) - } - - let task = Task.detached { () -> Void in - do { - for try await instances in serviceDiscovery.subscribe(to: Self.barService) { - switch counter.wrappingIncrementThenLoad(ordering: .relaxed) { - case 1: - XCTAssertEqual(instances, [], "Expected instances of \(Self.barService) to be empty, got \(instances)") - case 2: - XCTAssertEqual(instances, Self.barInstances, "Expected instances of \(Self.barService) to be \(Self.barInstances), got \(instances)") - // This causes the stream to terminate - serviceDiscovery.shutdown() - default: - XCTFail("Expected to receive instances 2 times") - } - } - } catch { - switch counter.load(ordering: .relaxed) { - case 2: // shutdown is called after receiving two results - guard let serviceDiscoveryError = error as? ServiceDiscoveryError, serviceDiscoveryError == .unavailable else { - return XCTFail("Expected ServiceDiscoveryError.unavailable, got \(error)") - } - // Test is complete at this point - semaphore.signal() + let counter2 = ManagedAtomic(0) + let task2 = Task { + for try await instance in try await serviceDiscovery.subscribe(Self.barService) { + switch counter2.wrappingIncrementThenLoad(ordering: .relaxed) { + case 1: + XCTAssertEqual(instance as? HostPort, barInstances[0]) + case 2: + XCTAssertEqual(instance as? HostPort, barInstances[1]) + expectation2.fulfill() default: - XCTFail("Unexpected error \(error)") + XCTFail("Expected to receive 2 instances") } } } - _ = semaphore.wait(timeout: DispatchTime.now() + .seconds(1)) - task.cancel() + await fulfillment(of: [expectation1], timeout: 1.0) + task1.cancel() + XCTAssertEqual(counter1.load(ordering: .relaxed), 1, "Expected to receive \(barInstances.count) instances") + XCTAssertEqual(counter2.load(ordering: .relaxed), 1, "Expected to receive \(barInstances.count) instances") - XCTAssertEqual(counter.load(ordering: .relaxed), 2, "Expected to receive instances 2 times, got \(counter.load(ordering: .relaxed)) times") - #endif + await serviceDiscovery.register(service: Self.barService, instances: [barInstances[1]]) + await fulfillment(of: [expectation2], timeout: 1.0) + task2.cancel() + + XCTAssertEqual(counter1.load(ordering: .relaxed), 1, "Expected to receive \(barInstances.count) instances") + XCTAssertEqual(counter2.load(ordering: .relaxed), 2, "Expected to receive \(barInstances.count) instances") } } + diff --git a/Tests/ServiceDiscoveryTests/MapInstanceServiceDiscoveryTests.swift b/Tests/ServiceDiscoveryTests/MapInstanceServiceDiscoveryTests.swift deleted file mode 100644 index 8c287bd..0000000 --- a/Tests/ServiceDiscoveryTests/MapInstanceServiceDiscoveryTests.swift +++ /dev/null @@ -1,379 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftServiceDiscovery open source project -// -// Copyright (c) 2019-2023 Apple Inc. and the SwiftServiceDiscovery project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftServiceDiscovery project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Atomics -import Dispatch -@testable import ServiceDiscovery -import XCTest - -class MapInstanceServiceDiscoveryTests: XCTestCase { - typealias Service = String - typealias BaseInstance = Int - typealias DerivedInstance = HostPort - - static let fooService = "fooService" - static let fooBaseInstances = [ - 7001, - ] - static let fooDerivedInstances = [ - HostPort(host: "localhost", port: 7001), - ] - - static let barService = "bar-service" - static let barBaseInstances = [ - 9001, - 9002, - ] - static let barDerivedInstances = [ - HostPort(host: "localhost", port: 9001), - HostPort(host: "localhost", port: 9002), - ] - - func test_lookup() throws { - var configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooBaseInstances]) - configuration.register(service: Self.barService, instances: Self.barBaseInstances) - - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration).mapInstance { port in HostPort(host: "localhost", port: port) } - - let fooResult = try ensureResult(serviceDiscovery: serviceDiscovery, service: Self.fooService) - guard case .success(let _fooInstances) = fooResult else { - return XCTFail("Failed to lookup instances for service[\(Self.fooService)]") - } - XCTAssertEqual(_fooInstances.count, 1, "Expected service[\(Self.fooService)] to have 1 instance, got \(_fooInstances.count)") - XCTAssertEqual(_fooInstances, Self.fooDerivedInstances, "Expected service[\(Self.fooService)] to have instances \(Self.fooDerivedInstances), got \(_fooInstances)") - - let barResult = try ensureResult(serviceDiscovery: serviceDiscovery, service: Self.barService) - guard case .success(let _barInstances) = barResult else { - return XCTFail("Failed to lookup instances for service[\(Self.barService)]") - } - XCTAssertEqual(_barInstances.count, 2, "Expected service[\(Self.barService)] to have 2 instances, got \(_barInstances.count)") - XCTAssertEqual(_barInstances, Self.barDerivedInstances, "Expected service[\(Self.barService)] to have instances \(Self.barDerivedInstances), got \(_barInstances)") - } - - func test_lookup_errorIfServiceUnknown() throws { - let unknownService = "unknown-service" - - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: ["foo-service": []]) - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration).mapInstance { port in HostPort(host: "localhost", port: port) } - - let result = try ensureResult(serviceDiscovery: serviceDiscovery, service: unknownService) - guard case .failure(let error) = result else { - return XCTFail("Lookup instances for service[\(unknownService)] should return an error") - } - guard let lookupError = error as? LookupError, case .unknownService = lookupError else { - return XCTFail("Expected LookupError.unknownService, got \(error)") - } - } - - func test_subscribe() throws { - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooBaseInstances]) - let baseServiceDiscovery = InMemoryServiceDiscovery(configuration: configuration) - let serviceDiscovery = baseServiceDiscovery.mapInstance { port in HostPort(host: "localhost", port: port) } - - let semaphore = DispatchSemaphore(value: 0) - let resultCounter = ManagedAtomic(0) - - let onCompleteInvoked = ManagedAtomic(false) - let onComplete: (CompletionReason) -> Void = { reason in - XCTAssertEqual(reason, .serviceDiscoveryUnavailable, "Expected CompletionReason to be .serviceDiscoveryUnavailable, got \(reason)") - onCompleteInvoked.store(true, ordering: .relaxed) - } - - // Two results are expected: - // Result #1: LookupError.unknownService because bar-service is not registered - // Result #2: Later we register bar-service and that should notify the subscriber - _ = serviceDiscovery.subscribe( - to: Self.barService, - onNext: { result in - resultCounter.wrappingIncrement(ordering: .relaxed) - - guard resultCounter.load(ordering: .relaxed) <= 2 else { - return XCTFail("Expected to receive result 2 times only") - } - - switch result { - case .failure(let error): - guard resultCounter.load(ordering: .relaxed) == 1, let lookupError = error as? LookupError, case .unknownService = lookupError else { - return XCTFail("Expected the first result to be LookupError.unknownService since \(Self.barService) is not registered, got \(error)") - } - case .success(let instances): - guard resultCounter.load(ordering: .relaxed) == 2 else { - return XCTFail("Expected to receive instances list on the second result only, but at result #\(resultCounter.load(ordering: .relaxed)) got \(instances)") - } - XCTAssertEqual(instances, Self.barDerivedInstances, "Expected instances of \(Self.barService) to be \(Self.barDerivedInstances), got \(instances)") - semaphore.signal() - } - }, - onComplete: onComplete - ) - - // Allow time for first result of `subscribe` - usleep(100_000) - baseServiceDiscovery.register(Self.barService, instances: Self.barBaseInstances) - - _ = semaphore.wait(timeout: DispatchTime.now() + .milliseconds(200)) - - XCTAssertEqual(resultCounter.load(ordering: .relaxed), 2, "Expected to receive result 2 times, got \(resultCounter.load(ordering: .relaxed))") - - // Verify `onComplete` gets invoked on `shutdown` - baseServiceDiscovery.shutdown() - XCTAssertTrue(onCompleteInvoked.load(ordering: .relaxed), "Expected onComplete to be invoked") - } - - func test_subscribe_cancel() throws { - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooBaseInstances]) - let baseServiceDiscovery = InMemoryServiceDiscovery(configuration: configuration) - let serviceDiscovery = baseServiceDiscovery.mapInstance { port in HostPort(host: "localhost", port: port) } - - let semaphore = DispatchSemaphore(value: 0) - let resultCounter1 = ManagedAtomic(0) - let resultCounter2 = ManagedAtomic(0) - - // Two results are expected: - // Result #1: LookupError.unknownService because bar-service is not registered - // Result #2: Later we register bar-service and that should notify the subscribers - _ = serviceDiscovery.subscribe( - to: Self.barService, - onNext: { result in - resultCounter1.wrappingIncrement(ordering: .relaxed) - - guard resultCounter1.load(ordering: .relaxed) <= 2 else { - return XCTFail("Expected to receive result 2 times only") - } - - switch result { - case .failure(let error): - guard resultCounter1.load(ordering: .relaxed) == 1, let lookupError = error as? LookupError, case .unknownService = lookupError else { - return XCTFail("Expected the first result to be LookupError.unknownService since \(Self.barService) is not registered, got \(error)") - } - case .success(let instances): - guard resultCounter1.load(ordering: .relaxed) == 2 else { - return XCTFail("Expected to receive instances list on the second result only, but at result #\(resultCounter1.load(ordering: .relaxed)) got \(instances)") - } - XCTAssertEqual(instances, Self.barDerivedInstances, "Expected instances of \(Self.barService) to be \(Self.barDerivedInstances), got \(instances)") - semaphore.signal() - } - }, - onComplete: { _ in } - ) - - let onCompleteInvoked = ManagedAtomic(false) - let onComplete: (CompletionReason) -> Void = { reason in - XCTAssertEqual(reason, .cancellationRequested, "Expected CompletionReason to be .cancellationRequested, got \(reason)") - onCompleteInvoked.store(true, ordering: .relaxed) - } - - // This subscriber receives Result #1 only because we cancel subscription before Result #2 is triggered - let cancellationToken = serviceDiscovery.subscribe( - to: Self.barService, - onNext: { result in - resultCounter2.wrappingIncrement(ordering: .relaxed) - - guard resultCounter2.load(ordering: .relaxed) <= 1 else { - return XCTFail("Expected to receive result 1 time only") - } - - switch result { - case .failure(let error): - guard resultCounter2.load(ordering: .relaxed) == 1, let lookupError = error as? LookupError, case .unknownService = lookupError else { - return XCTFail("Expected the first result to be LookupError.unknownService since \(Self.barService) is not registered, got \(error)") - } - case .success: - return XCTFail("Does not expect to receive instances list") - } - }, - onComplete: onComplete - ) - - // Allow time for first result of `subscribe` - usleep(100_000) - - cancellationToken.cancel() - // Only subscriber 1 will receive this change - baseServiceDiscovery.register(Self.barService, instances: Self.barBaseInstances) - - _ = semaphore.wait(timeout: DispatchTime.now() + .milliseconds(200)) - - XCTAssertEqual(resultCounter1.load(ordering: .relaxed), 2, "Expected subscriber #1 to receive result 2 times, got \(resultCounter1.load(ordering: .relaxed))") - XCTAssertEqual(resultCounter2.load(ordering: .relaxed), 1, "Expected subscriber #2 to receive result 1 time, got \(resultCounter2.load(ordering: .relaxed))") - // Verify `onComplete` gets invoked on `cancel` - XCTAssertTrue(onCompleteInvoked.load(ordering: .relaxed), "Expected onComplete to be invoked") - } - - func test_concurrency() throws { - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooBaseInstances]) - let baseServiceDisovery = InMemoryServiceDiscovery(configuration: configuration) - let serviceDiscovery = baseServiceDisovery.mapInstance { port in HostPort(host: "localhost", port: port) } - - let registerSemaphore = DispatchSemaphore(value: 0) - let registerCounter = ManagedAtomic(0) - - let lookupSemaphore = DispatchSemaphore(value: 0) - let lookupCounter = ManagedAtomic(0) - - let times = 100 - for _ in 1 ... times { - DispatchQueue.global().async { - baseServiceDisovery.register(Self.fooService, instances: Self.fooBaseInstances) - registerCounter.wrappingIncrement(ordering: .relaxed) - - if registerCounter.load(ordering: .relaxed) == times { - registerSemaphore.signal() - } - } - - DispatchQueue.global().async { - serviceDiscovery.lookup(Self.fooService, deadline: nil) { result in - lookupCounter.wrappingIncrement(ordering: .relaxed) - - guard case .success(let instances) = result, instances == Self.fooDerivedInstances else { - return XCTFail("Failed to lookup instances for service[\(Self.fooService)]: \(result)") - } - - if lookupCounter.load(ordering: .relaxed) == times { - lookupSemaphore.signal() - } - } - } - } - - _ = registerSemaphore.wait(timeout: DispatchTime.now() + .seconds(1)) - _ = lookupSemaphore.wait(timeout: DispatchTime.now() + .seconds(1)) - - XCTAssertEqual(registerCounter.load(ordering: .relaxed), times, "Expected register to succeed \(times) times") - XCTAssertEqual(lookupCounter.load(ordering: .relaxed), times, "Expected lookup callback to be called \(times) times") - } - - func testThrownErrorsPropagateIntoFailures() throws { - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooBaseInstances]) - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration).mapInstance { _ -> Int in throw TestError.error } - - let result = try ensureResult(serviceDiscovery: serviceDiscovery, service: Self.fooService) - guard case .failure(let err) = result else { - XCTFail("Expected failure, got \(result)") - return - } - XCTAssertEqual(err as? TestError, .error, "Expected \(TestError.error), but got \(err)") - } - - func testPropagateDefaultTimeout() throws { - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: ["foo-service": []]) - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration).mapInstance { port in HostPort(host: "localhost", port: port) } - XCTAssertTrue(compareTimeInterval(configuration.defaultLookupTimeout, serviceDiscovery.defaultLookupTimeout), "\(configuration.defaultLookupTimeout) does not match \(serviceDiscovery.defaultLookupTimeout)") - } - - // MARK: - async/await API tests - - @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func test_async_lookup() throws { - #if !(compiler(>=5.5) && canImport(_Concurrency)) - try XCTSkipIf(true) - #else - var configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooBaseInstances]) - configuration.register(service: Self.barService, instances: Self.barBaseInstances) - - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration).mapInstance { port in HostPort(host: "localhost", port: port) } - - runAsyncAndWaitFor { - let _fooInstances = try await serviceDiscovery.lookup(Self.fooService) - XCTAssertEqual(_fooInstances.count, 1, "Expected service[\(Self.fooService)] to have 1 instance, got \(_fooInstances.count)") - XCTAssertEqual(_fooInstances, Self.fooDerivedInstances, "Expected service[\(Self.fooService)] to have instances \(Self.fooDerivedInstances), got \(_fooInstances)") - - let _barInstances = try await serviceDiscovery.lookup(Self.barService) - XCTAssertEqual(_barInstances.count, 2, "Expected service[\(Self.barService)] to have 2 instances, got \(_barInstances.count)") - XCTAssertEqual(_barInstances, Self.barDerivedInstances, "Expected service[\(Self.barService)] to have instances \(Self.barDerivedInstances), got \(_barInstances)") - } - #endif - } - - @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func test_async_lookup_errorIfServiceUnknown() throws { - #if !(compiler(>=5.5) && canImport(_Concurrency)) - try XCTSkipIf(true) - #else - let unknownService = "unknown-service" - - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: ["foo-service": []]) - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration).mapInstance { port in HostPort(host: "localhost", port: port) } - - runAsyncAndWaitFor { - do { - _ = try await serviceDiscovery.lookup(unknownService) - return XCTFail("Lookup instances for service[\(unknownService)] should return an error") - } catch { - guard let lookupError = error as? LookupError, lookupError == .unknownService else { - return XCTFail("Expected LookupError.unknownService, got \(error)") - } - } - } - #endif - } - - @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func test_async_subscribe() throws { - #if !(compiler(>=5.5) && canImport(_Concurrency)) - try XCTSkipIf(true) - #else - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooBaseInstances]) - let baseServiceDiscovery = InMemoryServiceDiscovery(configuration: configuration) - let serviceDiscovery = baseServiceDiscovery.mapInstance { port in HostPort(host: "localhost", port: port) } - - let semaphore = DispatchSemaphore(value: 0) - let counter = ManagedAtomic(0) - - Task.detached { - // Allow time for subscription to start - usleep(100_000) - // Update #1 - baseServiceDiscovery.register(Self.barService, instances: []) - usleep(50000) - // Update #2 - baseServiceDiscovery.register(Self.barService, instances: Self.barBaseInstances) - } - - let task = Task.detached { () -> Void in - do { - for try await instances in serviceDiscovery.subscribe(to: Self.barService) { - switch counter.wrappingIncrementThenLoad(ordering: .relaxed) { - case 1: - XCTAssertEqual(instances, [], "Expected instances of \(Self.barService) to be empty, got \(instances)") - case 2: - XCTAssertEqual(instances, Self.barDerivedInstances, "Expected instances of \(Self.barService) to be \(Self.barDerivedInstances), got \(instances)") - // This causes the stream to terminate - baseServiceDiscovery.shutdown() - default: - XCTFail("Expected to receive instances 2 times") - } - } - } catch { - switch counter.load(ordering: .relaxed) { - case 2: // shutdown is called after receiving two results - guard let serviceDiscoveryError = error as? ServiceDiscoveryError, serviceDiscoveryError == .unavailable else { - return XCTFail("Expected ServiceDiscoveryError.unavailable, got \(error)") - } - // Test is complete at this point - semaphore.signal() - default: - XCTFail("Unexpected error \(error)") - } - } - } - - _ = semaphore.wait(timeout: DispatchTime.now() + .seconds(1)) - task.cancel() - - XCTAssertEqual(counter.load(ordering: .relaxed), 2, "Expected to receive instances 2 times, got \(counter.load(ordering: .relaxed)) times") - #endif - } -} diff --git a/Tests/ServiceDiscoveryTests/MapServiceServiceDiscoveryTests.swift b/Tests/ServiceDiscoveryTests/MapServiceServiceDiscoveryTests.swift deleted file mode 100644 index dbba7b5..0000000 --- a/Tests/ServiceDiscoveryTests/MapServiceServiceDiscoveryTests.swift +++ /dev/null @@ -1,435 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftServiceDiscovery open source project -// -// Copyright (c) 2019-2023 Apple Inc. and the SwiftServiceDiscovery project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftServiceDiscovery project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Atomics -import Dispatch -@testable import ServiceDiscovery -import XCTest - -class MapServiceServiceDiscoveryTests: XCTestCase { - typealias ComputedService = Int - typealias Service = String - typealias Instance = HostPort - - static let services = ["fooService", "bar-service"] - - static let computedFooService = 0 - static let fooService = "fooService" - static let fooInstances = [ - HostPort(host: "localhost", port: 7001), - ] - - static let computedBarService = 1 - static let barService = "bar-service" - static let barInstances = [ - HostPort(host: "localhost", port: 9001), - HostPort(host: "localhost", port: 9002), - ] - - func test_lookup() throws { - var configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooInstances]) - configuration.register(service: Self.barService, instances: Self.barInstances) - - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration).mapService { (service: Int) in Self.services[service] } - - let fooResult = try ensureResult(serviceDiscovery: serviceDiscovery, service: Self.computedFooService) - guard case .success(let _fooInstances) = fooResult else { - return XCTFail("Failed to lookup instances for service[\(Self.computedFooService)]") - } - XCTAssertEqual(_fooInstances.count, 1, "Expected service[\(Self.computedFooService)] to have 1 instance, got \(_fooInstances.count)") - XCTAssertEqual(_fooInstances, Self.fooInstances, "Expected service[\(Self.computedFooService)] to have instances \(Self.fooInstances), got \(_fooInstances)") - - let barResult = try ensureResult(serviceDiscovery: serviceDiscovery, service: Self.computedBarService) - guard case .success(let _barInstances) = barResult else { - return XCTFail("Failed to lookup instances for service[\(Self.computedBarService)]") - } - XCTAssertEqual(_barInstances.count, 2, "Expected service[\(Self.computedBarService)] to have 2 instances, got \(_barInstances.count)") - XCTAssertEqual(_barInstances, Self.barInstances, "Expected service[\(Self.computedBarService)] to have instances \(Self.barInstances), got \(_barInstances)") - } - - func test_lookup_errorIfServiceUnknown() throws { - let unknownService = "unknown-service" - let unknownComputedService = 3 - - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: ["foo-service": []]) - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration).mapService(serviceType: Int.self) { _ in unknownService } - - let result = try ensureResult(serviceDiscovery: serviceDiscovery, service: unknownComputedService) - guard case .failure(let error) = result else { - return XCTFail("Lookup instances for service[\(unknownComputedService)] should return an error") - } - guard let lookupError = error as? LookupError, case .unknownService = lookupError else { - return XCTFail("Expected LookupError.unknownService, got \(error)") - } - } - - func test_subscribe() throws { - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooInstances]) - let baseServiceDiscovery = InMemoryServiceDiscovery(configuration: configuration) - let serviceDiscovery = baseServiceDiscovery.mapService(serviceType: Int.self) { service in Self.services[service] } - - let semaphore = DispatchSemaphore(value: 0) - let resultCounter = ManagedAtomic(0) - - let onCompleteInvoked = ManagedAtomic(false) - let onComplete: (CompletionReason) -> Void = { reason in - XCTAssertEqual(reason, .serviceDiscoveryUnavailable, "Expected CompletionReason to be .serviceDiscoveryUnavailable, got \(reason)") - onCompleteInvoked.store(true, ordering: .relaxed) - } - - // Two results are expected: - // Result #1: LookupError.unknownService because bar-service is not registered - // Result #2: Later we register bar-service and that should notify the subscriber - _ = serviceDiscovery.subscribe( - to: Self.computedBarService, - onNext: { result in - resultCounter.wrappingIncrement(ordering: .relaxed) - - guard resultCounter.load(ordering: .relaxed) <= 2 else { - return XCTFail("Expected to receive result 2 times only") - } - - switch result { - case .failure(let error): - guard resultCounter.load(ordering: .relaxed) == 1, let lookupError = error as? LookupError, case .unknownService = lookupError else { - return XCTFail("Expected the first result to be LookupError.unknownService since \(Self.computedBarService) is not registered, got \(error)") - } - case .success(let instances): - guard resultCounter.load(ordering: .relaxed) == 2 else { - return XCTFail("Expected to receive instances list on the second result only, but at result #\(resultCounter.load(ordering: .relaxed)) got \(instances)") - } - XCTAssertEqual(instances, Self.barInstances, "Expected instances of \(Self.computedBarService) to be \(Self.barInstances), got \(instances)") - semaphore.signal() - } - }, - onComplete: onComplete - ) - - // Allow time for first result of `subscribe` - usleep(100_000) - baseServiceDiscovery.register(Self.barService, instances: Self.barInstances) - - _ = semaphore.wait(timeout: DispatchTime.now() + .milliseconds(200)) - - XCTAssertEqual(resultCounter.load(ordering: .relaxed), 2, "Expected to receive result 2 times, got \(resultCounter.load(ordering: .relaxed))") - - // Verify `onComplete` gets invoked on `shutdown` - baseServiceDiscovery.shutdown() - XCTAssertTrue(onCompleteInvoked.load(ordering: .relaxed), "Expected onComplete to be invoked") - } - - func test_subscribe_cancel() throws { - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooInstances]) - let baseServiceDiscovery = InMemoryServiceDiscovery(configuration: configuration) - let serviceDiscovery = baseServiceDiscovery.mapService(serviceType: Int.self) { service in Self.services[service] } - - let semaphore = DispatchSemaphore(value: 0) - let resultCounter1 = ManagedAtomic(0) - let resultCounter2 = ManagedAtomic(0) - - // Two results are expected: - // Result #1: LookupError.unknownService because bar-service is not registered - // Result #2: Later we register bar-service and that should notify the subscribers - _ = serviceDiscovery.subscribe( - to: Self.computedBarService, - onNext: { result in - resultCounter1.wrappingIncrement(ordering: .relaxed) - - guard resultCounter1.load(ordering: .relaxed) <= 2 else { - return XCTFail("Expected to receive result 2 times only") - } - - switch result { - case .failure(let error): - guard resultCounter1.load(ordering: .relaxed) == 1, let lookupError = error as? LookupError, case .unknownService = lookupError else { - return XCTFail("Expected the first result to be LookupError.unknownService since \(Self.computedBarService) is not registered, got \(error)") - } - case .success(let instances): - guard resultCounter1.load(ordering: .relaxed) == 2 else { - return XCTFail("Expected to receive instances list on the second result only, but at result #\(resultCounter1.load(ordering: .relaxed)) got \(instances)") - } - XCTAssertEqual(instances, Self.barInstances, "Expected instances of \(Self.computedBarService) to be \(Self.barInstances), got \(instances)") - semaphore.signal() - } - }, - onComplete: { _ in } - ) - - let onCompleteInvoked = ManagedAtomic(false) - let onComplete: (CompletionReason) -> Void = { reason in - XCTAssertEqual(reason, .cancellationRequested, "Expected CompletionReason to be .cancellationRequested, got \(reason)") - onCompleteInvoked.store(true, ordering: .relaxed) - } - - // This subscriber receives Result #1 only because we cancel subscription before Result #2 is triggered - let cancellationToken = serviceDiscovery.subscribe( - to: Self.computedBarService, - onNext: { result in - resultCounter2.wrappingIncrement(ordering: .relaxed) - - guard resultCounter2.load(ordering: .relaxed) <= 1 else { - return XCTFail("Expected to receive result 1 time only") - } - - switch result { - case .failure(let error): - guard resultCounter2.load(ordering: .relaxed) == 1, let lookupError = error as? LookupError, case .unknownService = lookupError else { - return XCTFail("Expected the first result to be LookupError.unknownService since \(Self.computedBarService) is not registered, got \(error)") - } - case .success: - return XCTFail("Does not expect to receive instances list") - } - }, - onComplete: onComplete - ) - - // Allow time for first result of `subscribe` - usleep(100_000) - - cancellationToken.cancel() - // Only subscriber 1 will receive this change - baseServiceDiscovery.register(Self.barService, instances: Self.barInstances) - - _ = semaphore.wait(timeout: DispatchTime.now() + .milliseconds(200)) - - XCTAssertEqual(resultCounter1.load(ordering: .relaxed), 2, "Expected subscriber #1 to receive result 2 times, got \(resultCounter1.load(ordering: .relaxed))") - XCTAssertEqual(resultCounter2.load(ordering: .relaxed), 1, "Expected subscriber #2 to receive result 1 time, got \(resultCounter2.load(ordering: .relaxed))") - // Verify `onComplete` gets invoked on `cancel` - XCTAssertTrue(onCompleteInvoked.load(ordering: .relaxed), "Expected onComplete to be invoked") - } - - func test_concurrency() throws { - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooInstances]) - let baseServiceDisovery = InMemoryServiceDiscovery(configuration: configuration) - let serviceDiscovery = baseServiceDisovery.mapService(serviceType: Int.self) { service in Self.services[service] } - - let registerSemaphore = DispatchSemaphore(value: 0) - let registerCounter = ManagedAtomic(0) - - let lookupSemaphore = DispatchSemaphore(value: 0) - let lookupCounter = ManagedAtomic(0) - - let times = 100 - for _ in 1 ... times { - DispatchQueue.global().async { - baseServiceDisovery.register(Self.fooService, instances: Self.fooInstances) - registerCounter.wrappingIncrement(ordering: .relaxed) - - if registerCounter.load(ordering: .relaxed) == times { - registerSemaphore.signal() - } - } - - DispatchQueue.global().async { - serviceDiscovery.lookup(Self.computedFooService, deadline: nil) { result in - lookupCounter.wrappingIncrement(ordering: .relaxed) - - guard case .success(let instances) = result, instances == Self.fooInstances else { - return XCTFail("Failed to lookup instances for service[\(Self.fooService)]: \(result)") - } - - if lookupCounter.load(ordering: .relaxed) == times { - lookupSemaphore.signal() - } - } - } - } - - _ = registerSemaphore.wait(timeout: DispatchTime.now() + .seconds(1)) - _ = lookupSemaphore.wait(timeout: DispatchTime.now() + .seconds(1)) - - XCTAssertEqual(registerCounter.load(ordering: .relaxed), times, "Expected register to succeed \(times) times") - XCTAssertEqual(lookupCounter.load(ordering: .relaxed), times, "Expected lookup callback to be called \(times) times") - } - - func testThrownErrorsPropagateIntoFailures() throws { - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooInstances]) - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration).mapService { (_: Int) -> String in throw TestError.error } - - let result = try ensureResult(serviceDiscovery: serviceDiscovery, service: Self.computedFooService) - guard case .failure(let err) = result else { - XCTFail("Expected failure, got \(result)") - return - } - XCTAssertEqual(err as? TestError, .error, "Expected \(TestError.error), but got \(err)") - } - - func testThrownErrorsPropagateIntoCancelledSubscriptions() throws { - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooInstances]) - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration).mapService { (_: Int) -> String in throw TestError.error } - - let resultGroup = DispatchGroup() - resultGroup.enter() - resultGroup.enter() - - let token = serviceDiscovery.subscribe( - to: Self.computedFooService, - onNext: { result in - defer { - resultGroup.leave() - } - guard case .failure(let err) = result else { - XCTFail("Expected error, got \(result)") - return - } - XCTAssertEqual(err as? TestError, .error) - }, - onComplete: { reason in - defer { - resultGroup.leave() - } - XCTAssertEqual(reason, .failedToMapService) - } - ) - XCTAssertTrue(token.isCancelled) - } - - func testPropagateDefaultTimeout() throws { - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: ["foo-service": []]) - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration).mapService(serviceType: Int.self) { service in Self.services[service] } - XCTAssertTrue(compareTimeInterval(configuration.defaultLookupTimeout, serviceDiscovery.defaultLookupTimeout), "\(configuration.defaultLookupTimeout) does not match \(serviceDiscovery.defaultLookupTimeout)") - } - - // MARK: - async/await API tests - - @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func test_async_lookup() throws { - #if !(compiler(>=5.5) && canImport(_Concurrency)) - try XCTSkipIf(true) - #else - var configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooInstances]) - configuration.register(service: Self.barService, instances: Self.barInstances) - - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration).mapService { (service: Int) in Self.services[service] } - - runAsyncAndWaitFor { - let _fooInstances = try await serviceDiscovery.lookup(Self.computedFooService) - XCTAssertEqual(_fooInstances.count, 1, "Expected service[\(Self.computedFooService)] to have 1 instance, got \(_fooInstances.count)") - XCTAssertEqual(_fooInstances, Self.fooInstances, "Expected service[\(Self.computedFooService)] to have instances \(Self.fooInstances), got \(_fooInstances)") - - let _barInstances = try await serviceDiscovery.lookup(Self.computedBarService) - XCTAssertEqual(_barInstances.count, 2, "Expected service[\(Self.computedBarService)] to have 2 instances, got \(_barInstances.count)") - XCTAssertEqual(_barInstances, Self.barInstances, "Expected service[\(Self.computedBarService)] to have instances \(Self.barInstances), got \(_barInstances)") - } - #endif - } - - @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func test_async_lookup_errorIfServiceUnknown() throws { - #if !(compiler(>=5.5) && canImport(_Concurrency)) - try XCTSkipIf(true) - #else - let unknownService = "unknown-service" - let unknownComputedService = 3 - - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: ["foo-service": []]) - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration).mapService(serviceType: Int.self) { _ in unknownService } - - runAsyncAndWaitFor { - do { - _ = try await serviceDiscovery.lookup(unknownComputedService) - return XCTFail("Lookup instances for service[\(unknownComputedService)] should return an error") - } catch { - guard let lookupError = error as? LookupError, lookupError == .unknownService else { - return XCTFail("Expected LookupError.unknownService, got \(error)") - } - } - } - #endif - } - - @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func test_async_subscribe() throws { - #if !(compiler(>=5.5) && canImport(_Concurrency)) - try XCTSkipIf(true) - #else - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooInstances]) - let baseServiceDiscovery = InMemoryServiceDiscovery(configuration: configuration) - let serviceDiscovery = baseServiceDiscovery.mapService(serviceType: Int.self) { service in Self.services[service] } - - let semaphore = DispatchSemaphore(value: 0) - let counter = ManagedAtomic(0) - - Task.detached { - // Allow time for subscription to start - usleep(100_000) - // Update #1 - baseServiceDiscovery.register(Self.barService, instances: []) - usleep(50000) - // Update #2 - baseServiceDiscovery.register(Self.barService, instances: Self.barInstances) - } - - let task = Task.detached { () -> Void in - do { - for try await instances in serviceDiscovery.subscribe(to: Self.computedBarService) { - switch counter.wrappingIncrementThenLoad(ordering: .relaxed) { - case 1: - XCTAssertEqual(instances, [], "Expected instances of \(Self.computedBarService) to be empty, got \(instances)") - case 2: - XCTAssertEqual(instances, Self.barInstances, "Expected instances of \(Self.computedBarService) to be \(Self.barInstances), got \(instances)") - // This causes the stream to terminate - baseServiceDiscovery.shutdown() - default: - XCTFail("Expected to receive instances 2 times") - } - } - } catch { - switch counter.load(ordering: .relaxed) { - case 2: // shutdown is called after receiving two results - guard let serviceDiscoveryError = error as? ServiceDiscoveryError, serviceDiscoveryError == .unavailable else { - return XCTFail("Expected ServiceDiscoveryError.unavailable, got \(error)") - } - // Test is complete at this point - semaphore.signal() - default: - XCTFail("Unexpected error \(error)") - } - } - } - - _ = semaphore.wait(timeout: DispatchTime.now() + .seconds(1)) - task.cancel() - - XCTAssertEqual(counter.load(ordering: .relaxed), 2, "Expected to receive instances 2 times, got \(counter.load(ordering: .relaxed)) times") - #endif - } - - @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func testThrownErrorsPropagateIntoAsyncSubscriptions() throws { - #if !(compiler(>=5.5) && canImport(_Concurrency)) - try XCTSkipIf(true) - #else - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooInstances]) - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration).mapService { (_: Int) -> String in throw TestError.error } - - let semaphore = DispatchSemaphore(value: 0) - - let task = Task.detached { () -> Void in - do { - for try await instances in serviceDiscovery.subscribe(to: Self.computedFooService) { - XCTFail("Expected error, got \(instances)") - } - } catch { - guard let err = error as? TestError, err == .error else { - return XCTFail("Expected TestError.error, got \(error)") - } - } - } - - _ = semaphore.wait(timeout: DispatchTime.now() + .seconds(1)) - task.cancel() - #endif - } -} diff --git a/Tests/ServiceDiscoveryTests/TypeErasedServiceDiscoveryTests.swift b/Tests/ServiceDiscoveryTests/TypeErasedServiceDiscoveryTests.swift deleted file mode 100644 index 885e5f4..0000000 --- a/Tests/ServiceDiscoveryTests/TypeErasedServiceDiscoveryTests.swift +++ /dev/null @@ -1,332 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftServiceDiscovery open source project -// -// Copyright (c) 2020-2023 Apple Inc. and the SwiftServiceDiscovery project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftServiceDiscovery project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Atomics -import Dispatch -@testable import ServiceDiscovery -import XCTest - -class TypeErasedServiceDiscoveryTests: XCTestCase { - typealias Service = String - typealias Instance = HostPort - - static let fooService = "fooService" - static let fooInstances = [ - HostPort(host: "localhost", port: 7001), - ] - - static let barService = "bar-service" - static let barInstances = [ - HostPort(host: "localhost", port: 9001), - HostPort(host: "localhost", port: 9002), - ] - - func test_ServiceDiscoveryBox_lookup() throws { - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooInstances]) - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration) - let boxedServiceDiscovery = ServiceDiscoveryBox(serviceDiscovery) - - let semaphore = DispatchSemaphore(value: 0) - - boxedServiceDiscovery.lookup(Self.fooService) { fooResult in - guard case .success(let _fooInstances) = fooResult else { - return XCTFail("Failed to lookup instances for service[\(Self.fooService)]") - } - XCTAssertEqual(_fooInstances.count, 1, "Expected service[\(Self.fooService)] to have 1 instance, got \(_fooInstances.count)") - XCTAssertEqual(_fooInstances, Self.fooInstances, "Expected service[\(Self.fooService)] to have instances \(Self.fooInstances), got \(_fooInstances)") - - semaphore.signal() - } - - if semaphore.wait(timeout: DispatchTime.now() + .seconds(1)) == .timedOut { - return XCTFail("Failed to lookup instances for service[\(Self.fooService)]: timed out") - } - } - - func test_ServiceDiscoveryBox_subscribe() throws { - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooInstances]) - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration) - let boxedServiceDiscovery = ServiceDiscoveryBox(serviceDiscovery) - - let semaphore = DispatchSemaphore(value: 0) - let resultCounter = ManagedAtomic(0) - - // Two results are expected: - // Result #1: LookupError.unknownService because bar-service is not registered - // Result #2: Later we register bar-service and that should notify the subscriber - boxedServiceDiscovery.subscribe( - to: Self.barService, - onNext: { result in - resultCounter.wrappingIncrement(ordering: .relaxed) - - guard resultCounter.load(ordering: .relaxed) <= 2 else { - return XCTFail("Expected to receive result 2 times only") - } - - switch result { - case .failure(let error): - guard resultCounter.load(ordering: .relaxed) == 1, let lookupError = error as? LookupError, case .unknownService = lookupError else { - return XCTFail("Expected the first result to be LookupError.unknownService since \(Self.barService) is not registered, got \(error)") - } - case .success(let instances): - guard resultCounter.load(ordering: .relaxed) == 2 else { - return XCTFail("Expected to receive instances list on the second result only, but at result #\(resultCounter.load(ordering: .relaxed)) got \(instances)") - } - XCTAssertEqual(instances, Self.barInstances, "Expected instances of \(Self.barService) to be \(Self.barInstances), got \(instances)") - semaphore.signal() - } - } - ) - - // Allow time for first result of `subscribe` - usleep(100_000) - serviceDiscovery.register(Self.barService, instances: Self.barInstances) - - _ = semaphore.wait(timeout: DispatchTime.now() + .milliseconds(200)) - - XCTAssertEqual(resultCounter.load(ordering: .relaxed), 2, "Expected to receive result 2 times, got \(resultCounter.load(ordering: .relaxed))") - } - - func test_ServiceDiscoveryBox_unwrap() throws { - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooInstances]) - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration) - let boxedServiceDiscovery = ServiceDiscoveryBox(serviceDiscovery) - - XCTAssertNoThrow(try boxedServiceDiscovery.unwrapAs(InMemoryServiceDiscovery.self)) - } - - func test_AnyServiceDiscovery_lookup() throws { - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooInstances]) - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration) - let anyServiceDiscovery = AnyServiceDiscovery(serviceDiscovery) - - let semaphore = DispatchSemaphore(value: 0) - - anyServiceDiscovery.lookupAndUnwrap(Self.fooService) { (fooResult: Result<[Instance], Error>) in - guard case .success(let _fooInstances) = fooResult else { - return XCTFail("Failed to lookup instances for service[\(Self.fooService)]") - } - XCTAssertEqual(_fooInstances.count, 1, "Expected service[\(Self.fooService)] to have 1 instance, got \(_fooInstances.count)") - XCTAssertEqual(_fooInstances, Self.fooInstances, "Expected service[\(Self.fooService)] to have instances \(Self.fooInstances), got \(_fooInstances)") - - semaphore.signal() - } - - if semaphore.wait(timeout: DispatchTime.now() + .seconds(1)) == .timedOut { - return XCTFail("Failed to lookup instances for service[\(Self.fooService)]: timed out") - } - } - - func test_AnyServiceDiscovery_subscribe() throws { - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooInstances]) - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration) - let anyServiceDiscovery = AnyServiceDiscovery(serviceDiscovery) - - let semaphore = DispatchSemaphore(value: 0) - let resultCounter = ManagedAtomic(0) - - // Two results are expected: - // Result #1: LookupError.unknownService because bar-service is not registered - // Result #2: Later we register bar-service and that should notify the subscriber - anyServiceDiscovery.subscribeAndUnwrap( - to: Self.barService, - onNext: { (result: Result<[Instance], Error>) -> Void in - resultCounter.wrappingIncrement(ordering: .relaxed) - - guard resultCounter.load(ordering: .relaxed) <= 2 else { - return XCTFail("Expected to receive result 2 times only") - } - - switch result { - case .failure(let error): - guard resultCounter.load(ordering: .relaxed) == 1, let lookupError = error as? LookupError, case .unknownService = lookupError else { - return XCTFail("Expected the first result to be LookupError.unknownService since \(Self.barService) is not registered, got \(error)") - } - case .success(let instances): - guard resultCounter.load(ordering: .relaxed) == 2 else { - return XCTFail("Expected to receive instances list on the second result only, but at result #\(resultCounter.load(ordering: .relaxed)) got \(instances)") - } - XCTAssertEqual(instances, Self.barInstances, "Expected instances of \(Self.barService) to be \(Self.barInstances), got \(instances)") - semaphore.signal() - } - } - ) - - // Allow time for first result of `subscribe` - usleep(100_000) - serviceDiscovery.register(Self.barService, instances: Self.barInstances) - - _ = semaphore.wait(timeout: DispatchTime.now() + .milliseconds(200)) - - XCTAssertEqual(resultCounter.load(ordering: .relaxed), 2, "Expected to receive result 2 times, got \(resultCounter.load(ordering: .relaxed))") - } - - func test_AnyServiceDiscovery_unwrap() throws { - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooInstances]) - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration) - let anyServiceDiscovery = AnyServiceDiscovery(serviceDiscovery) - - XCTAssertNoThrow(try anyServiceDiscovery.unwrapAs(InMemoryServiceDiscovery.self)) - } - - // MARK: - async/await API tests - - @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func test_ServiceDiscoveryBox_async_lookup() throws { - #if !(compiler(>=5.5) && canImport(_Concurrency)) - try XCTSkipIf(true) - #else - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooInstances]) - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration) - let boxedServiceDiscovery = ServiceDiscoveryBox(serviceDiscovery) - - runAsyncAndWaitFor { - let _fooInstances = try await boxedServiceDiscovery.lookup(Self.fooService) - XCTAssertEqual(_fooInstances.count, 1, "Expected service[\(Self.fooService)] to have 1 instance, got \(_fooInstances.count)") - XCTAssertEqual(_fooInstances, Self.fooInstances, "Expected service[\(Self.fooService)] to have instances \(Self.fooInstances), got \(_fooInstances)") - } - #endif - } - - @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func test_ServiceDiscoveryBox_async_subscribe() throws { - #if !(compiler(>=5.5) && canImport(_Concurrency)) - try XCTSkipIf(true) - #else - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooInstances]) - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration) - let boxedServiceDiscovery = ServiceDiscoveryBox(serviceDiscovery) - - let semaphore = DispatchSemaphore(value: 0) - let counter = ManagedAtomic(0) - - Task.detached { - // Allow time for subscription to start - usleep(100_000) - // Update #1 - serviceDiscovery.register(Self.barService, instances: []) - usleep(50000) - // Update #2 - serviceDiscovery.register(Self.barService, instances: Self.barInstances) - } - - let task = Task.detached { () -> Void in - do { - for try await instances in boxedServiceDiscovery.subscribe(to: Self.barService) { - switch counter.wrappingIncrementThenLoad(ordering: .relaxed) { - case 1: - XCTAssertEqual(instances, [], "Expected instances of \(Self.barService) to be empty, got \(instances)") - case 2: - XCTAssertEqual(instances, Self.barInstances, "Expected instances of \(Self.barService) to be \(Self.barInstances), got \(instances)") - // This causes the stream to terminate - serviceDiscovery.shutdown() - default: - XCTFail("Expected to receive instances 2 times") - } - } - } catch { - switch counter.load(ordering: .relaxed) { - case 2: // shutdown is called after receiving two results - guard let serviceDiscoveryError = error as? ServiceDiscoveryError, serviceDiscoveryError == .unavailable else { - return XCTFail("Expected ServiceDiscoveryError.unavailable, got \(error)") - } - // Test is complete at this point - semaphore.signal() - default: - XCTFail("Unexpected error \(error)") - } - } - } - - _ = semaphore.wait(timeout: DispatchTime.now() + .seconds(1)) - task.cancel() - - XCTAssertEqual(counter.load(ordering: .relaxed), 2, "Expected to receive instances 2 times, got \(counter.load(ordering: .relaxed)) times") - #endif - } - - @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func test_AnyServiceDiscovery_async_lookup() throws { - #if !(compiler(>=5.5) && canImport(_Concurrency)) - try XCTSkipIf(true) - #else - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooInstances]) - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration) - let anyServiceDiscovery = AnyServiceDiscovery(serviceDiscovery) - - runAsyncAndWaitFor { - let _fooInstances: [Instance] = try await anyServiceDiscovery.lookupAndUnwrap(Self.fooService) - XCTAssertEqual(_fooInstances.count, 1, "Expected service[\(Self.fooService)] to have 1 instance, got \(_fooInstances.count)") - XCTAssertEqual(_fooInstances, Self.fooInstances, "Expected service[\(Self.fooService)] to have instances \(Self.fooInstances), got \(_fooInstances)") - } - #endif - } - - @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func test_AnyServiceDiscovery_async_subscribe() throws { - #if !(compiler(>=5.5) && canImport(_Concurrency)) - try XCTSkipIf(true) - #else - let configuration = InMemoryServiceDiscovery.Configuration(serviceInstances: [Self.fooService: Self.fooInstances]) - let serviceDiscovery = InMemoryServiceDiscovery(configuration: configuration) - let anyServiceDiscovery = AnyServiceDiscovery(serviceDiscovery) - - let semaphore = DispatchSemaphore(value: 0) - let counter = ManagedAtomic(0) - - Task.detached { - // Allow time for subscription to start - usleep(100_000) - // Update #1 - serviceDiscovery.register(Self.barService, instances: []) - usleep(50000) - // Update #2 - serviceDiscovery.register(Self.barService, instances: Self.barInstances) - } - - let task = Task.detached { () -> Void in - do { - for try await instances: [Instance] in anyServiceDiscovery.subscribeAndUnwrap(to: Self.barService) { - switch counter.wrappingIncrementThenLoad(ordering: .relaxed) { - case 1: - XCTAssertEqual(instances, [], "Expected instances of \(Self.barService) to be empty, got \(instances)") - case 2: - XCTAssertEqual(instances, Self.barInstances, "Expected instances of \(Self.barService) to be \(Self.barInstances), got \(instances)") - // This causes the stream to terminate - serviceDiscovery.shutdown() - default: - XCTFail("Expected to receive instances 2 times") - } - } - } catch { - switch counter.load(ordering: .relaxed) { - case 2: // shutdown is called after receiving two results - guard let serviceDiscoveryError = error as? ServiceDiscoveryError, serviceDiscoveryError == .unavailable else { - return XCTFail("Expected ServiceDiscoveryError.unavailable, got \(error)") - } - // Test is complete at this point - semaphore.signal() - default: - XCTFail("Unexpected error \(error)") - } - } - } - - _ = semaphore.wait(timeout: DispatchTime.now() + .seconds(1)) - task.cancel() - - XCTAssertEqual(counter.load(ordering: .relaxed), 2, "Expected to receive instances 2 times, got \(counter.load(ordering: .relaxed)) times") - #endif - } -}