diff --git a/Sources/Operators/Amb.swift b/Sources/Operators/Amb.swift index 5801e6c..71e692d 100644 --- a/Sources/Operators/Amb.swift +++ b/Sources/Operators/Amb.swift @@ -103,8 +103,10 @@ private extension Publishers.Amb { guard let decision = decision else { return } switch decision { case .first: + secondSink?.cancelUpstream() secondSink = nil case .second: + firstSink?.cancelUpstream() firstSink = nil } @@ -144,7 +146,9 @@ private extension Publishers.Amb { } func cancel() { + firstSink?.cancelUpstream() firstSink = nil + secondSink?.cancelUpstream() secondSink = nil } } diff --git a/Tests/AmbTests.swift b/Tests/AmbTests.swift index fa8177a..dd5ceec 100644 --- a/Tests/AmbTests.swift +++ b/Tests/AmbTests.swift @@ -47,6 +47,65 @@ class AmbTests: XCTestCase { XCTAssertEqual(completion, .finished) } + func testAmbCancelPreSubscription() { + let ambPublisher: AnyCancellable? + + let subject1Cancelled = expectation(description: "first publisher cancelled") + let subject1 = PassthroughSubject() + let subject1Publisher = subject1 + .handleEvents(receiveCancel: { + subject1Cancelled.fulfill() + }) + .eraseToAnyPublisher() + + let subject2Cancelled = expectation(description: "second publisher cancelled") + let subject2 = PassthroughSubject() + let subject2Publisher = subject2 + .handleEvents(receiveCancel: { + subject2Cancelled.fulfill() + }) + .eraseToAnyPublisher() + + ambPublisher = Publishers.Amb(first: subject1Publisher, second: subject2Publisher) + .sink(receiveCompletion: { _ in }, + receiveValue: { _ in }) + + // cancelling amb should cancel the inner publishers + ambPublisher?.cancel() + + waitForExpectations(timeout: 0.01) + } + + func testAmbCancelPostSubscription() { + let subject1 = PassthroughSubject() + var subject1cancelCounter = 0 + let subject1Publisher = subject1 + .handleEvents(receiveCancel: { + subject1cancelCounter += 1 + }) + .eraseToAnyPublisher() + + let subject2 = PassthroughSubject() + var subject2cancelCounter = 0 + let subject2Publisher = subject2 + .handleEvents(receiveCancel: { + subject2cancelCounter += 1 + }) + .eraseToAnyPublisher() + + Publishers.Amb(first: subject1Publisher, second: subject2Publisher) + .sink(receiveCompletion: { _ in }, + receiveValue: { _ in }) + .store(in: &subscriptions) + + // subject1 wins the race, so 2 has to be cancelled + subject1.send(1) + + // At dealloc both publishes are cancelled, so we cannot use expectations here and count the cancel events instead + XCTAssertEqual(subject1cancelCounter, 0) + XCTAssertEqual(subject2cancelCounter, 1) + } + func testAmbLimitedPreDemand() { let subject1 = PassthroughSubject() let subject2 = PassthroughSubject()