From 9e20a3f6c1ca6e2757b65416f4f8634080407b36 Mon Sep 17 00:00:00 2001 From: Thibault Wittemberg Date: Sat, 4 Sep 2021 18:38:15 +0200 Subject: [PATCH] operators: add compactScan() and tryCompactScan() --- CombineExt.xcodeproj/project.pbxproj | 8 +++ README.md | 25 +++++++ Sources/Operators/CompactScan.swift | 92 ++++++++++++++++++++++++++ Tests/CompactScanTests.swift | 98 ++++++++++++++++++++++++++++ 4 files changed, 223 insertions(+) create mode 100644 Sources/Operators/CompactScan.swift create mode 100644 Tests/CompactScanTests.swift diff --git a/CombineExt.xcodeproj/project.pbxproj b/CombineExt.xcodeproj/project.pbxproj index fe9cb70..dbc1c9b 100644 --- a/CombineExt.xcodeproj/project.pbxproj +++ b/CombineExt.xcodeproj/project.pbxproj @@ -24,6 +24,8 @@ /* Begin PBXBuildFile section */ 1970A8AA25246FBD00799AB6 /* FilterMany.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1970A8A925246FBD00799AB6 /* FilterMany.swift */; }; 1970A8B42524730500799AB6 /* FilterManyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1970A8B32524730400799AB6 /* FilterManyTests.swift */; }; + 1A2FF9A826E3D9770098C2D1 /* CompactScan.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1A2FF9A726E3D9770098C2D1 /* CompactScan.swift */; }; + 1A2FF9AB26E3D9FC0098C2D1 /* CompactScanTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1A2FF9A926E3D9800098C2D1 /* CompactScanTests.swift */; }; BF330EF624F1FFFE001281FC /* CombineSchedulers in Frameworks */ = {isa = PBXBuildFile; productRef = BF330EF524F1FFFE001281FC /* CombineSchedulers */; }; BF330EF924F20032001281FC /* Timer.swift in Sources */ = {isa = PBXBuildFile; fileRef = BF330EF824F20032001281FC /* Timer.swift */; }; BF330EFB24F20080001281FC /* Lock.swift in Sources */ = {isa = PBXBuildFile; fileRef = BF330EFA24F20080001281FC /* Lock.swift */; }; @@ -108,6 +110,8 @@ /* Begin PBXFileReference section */ 1970A8A925246FBD00799AB6 /* FilterMany.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FilterMany.swift; sourceTree = ""; }; 1970A8B32524730400799AB6 /* FilterManyTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FilterManyTests.swift; sourceTree = ""; }; + 1A2FF9A726E3D9770098C2D1 /* CompactScan.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = CompactScan.swift; sourceTree = ""; }; + 1A2FF9A926E3D9800098C2D1 /* CompactScanTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = CompactScanTests.swift; sourceTree = ""; }; BF330EF824F20032001281FC /* Timer.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Timer.swift; sourceTree = ""; }; BF330EFA24F20080001281FC /* Lock.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Lock.swift; sourceTree = ""; }; BF3D3B5C253B83F300D830ED /* IgnoreFailure.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = IgnoreFailure.swift; sourceTree = ""; }; @@ -242,6 +246,7 @@ OBJ_18 /* AssignOwnership.swift */, OBJ_19 /* AssignToMany.swift */, OBJ_20 /* CombineLatestMany.swift */, + 1A2FF9A726E3D9770098C2D1 /* CompactScan.swift */, OBJ_21 /* Create.swift */, OBJ_22 /* Dematerialize.swift */, OBJ_23 /* FlatMapLatest.swift */, @@ -290,6 +295,7 @@ OBJ_42 /* AssignOwnershipTests.swift */, OBJ_43 /* AssignToManyTests.swift */, OBJ_44 /* CombineLatestManyTests.swift */, + 1A2FF9A926E3D9800098C2D1 /* CompactScanTests.swift */, OBJ_45 /* CreateTests.swift */, OBJ_46 /* CurrentValueRelayTests.swift */, OBJ_47 /* DematerializeTests.swift */, @@ -550,6 +556,7 @@ OBJ_123 /* AssignOwnershipTests.swift in Sources */, OBJ_124 /* AssignToManyTests.swift in Sources */, 1970A8B42524730500799AB6 /* FilterManyTests.swift in Sources */, + 1A2FF9AB26E3D9FC0098C2D1 /* CompactScanTests.swift in Sources */, D836234A24EA9888002353AC /* MergeManyTests.swift in Sources */, OBJ_125 /* CombineLatestManyTests.swift in Sources */, BF3D3B67253B88E500D830ED /* IgnoreFailureTests.swift in Sources */, @@ -581,6 +588,7 @@ buildActionMask = 0; files = ( OBJ_79 /* DemandBuffer.swift in Sources */, + 1A2FF9A826E3D9770098C2D1 /* CompactScan.swift in Sources */, OBJ_80 /* Sink.swift in Sources */, OBJ_81 /* Optional.swift in Sources */, C387777C24E6BBE900FAD2D8 /* Nwise.swift in Sources */, diff --git a/README.md b/README.md index 3f6b169..277f360 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,7 @@ All operators, utilities and helpers respect Combine's publisher contract, inclu * [ignoreFailure](#ignoreFailure) * [mapToResult](#mapToResult) * [flatMapBatches(of:)](#flatMapBatchesof) +* [compactScan()](#compactScan) ### Publishers * [AnyPublisher.create](#AnypublisherCreate) @@ -755,6 +756,30 @@ subscription = ints .finished ``` +------ + +### compactScan() + +Transforms elements from the upstream publisher by providing the current element to a closure along with the last value returned by the closure. If the closure returns a nil value, then the accumulator won't change until the next non-nil upstream publisher value. + +```swift +let cancellable = (0...5) + .publisher + .compactScan(0) { + guard $1.isMultiple(of: 2) else { return nil } + return $0 + $1 + } + .sink { print ("\($0)") } +``` + +#### Output + +```none +0 2 6 +``` + +The `tryCompactScan()` version behaves the same but with a throwing closure. + ## Publishers This section outlines some of the custom Combine publishers CombineExt provides diff --git a/Sources/Operators/CompactScan.swift b/Sources/Operators/CompactScan.swift new file mode 100644 index 0000000..ddb962b --- /dev/null +++ b/Sources/Operators/CompactScan.swift @@ -0,0 +1,92 @@ +// +// CompactScan.swift +// CombineExt +// +// Created by Thibault Wittemberg on 04/09/2021. +// Copyright © 2021 Combine Community. All rights reserved. +// + +#if canImport(Combine) +import Combine + +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +public extension Publisher { + /// Transforms elements from the upstream publisher by providing the current + /// element to a closure along with the last value returned by the closure. + /// + /// The ``nextPartialResult`` closure might return nil values. In that case the accumulator won't change until the next non-nil upstream publisher value. + /// + /// Use ``Publisher/compactScan(_:_:)`` to accumulate all previously-published values into a single + /// value, which you then combine with each newly-published value. + /// + /// The following example logs a running total of all values received + /// from the sequence publisher. + /// + /// let range = (0...5) + /// let cancellable = range.publisher + /// .compactScan(0) { + /// guard $1.isMultiple(of: 2) else { return nil } + /// return $0 + $1 + /// } + /// .sink { print ("\($0)", terminator: " ") } + /// // Prints: "0 2 6 ". + /// + /// - Parameters: + /// - initialResult: The previous result returned by the `nextPartialResult` closure. + /// - nextPartialResult: A closure that takes as its arguments the previous value returned by the closure and the next element emitted from the upstream publisher. + /// - Returns: A publisher that transforms elements by applying a closure that receives its previous return value and the next element from the upstream publisher. + func compactScan(_ initialResult: T, _ nextPartialResult: @escaping (T, Output) -> T?) -> AnyPublisher { + self.scan((initialResult, initialResult)) { accumulator, value -> (T, T?) in + let lastNonNilAccumulator = accumulator.0 + let newAccumulator = nextPartialResult(lastNonNilAccumulator, value) + return (newAccumulator ?? lastNonNilAccumulator, newAccumulator) + } + .compactMap { $0.1 } + .eraseToAnyPublisher() + } + + /// Transforms elements from the upstream publisher by providing the current element to an error-throwing closure along with the last value returned by the closure. + /// + /// The ``nextPartialResult`` closure might return nil values. In that case the accumulator won't change until the next non-nil upstream publisher value. + /// + /// Use ``Publisher/tryCompactScan(_:_:)`` to accumulate all previously-published values into a single value, which you then combine with each newly-published value. + /// If your accumulator closure throws an error, the publisher terminates with the error. + /// + /// In the example below, ``Publisher/tryCompactScan(_:_:)`` calls a division function on elements of a collection publisher. The resulting publisher publishes each result until the function encounters a `DivisionByZeroError`, which terminates the publisher. + /// + /// struct DivisionByZeroError: Error {} + /// + /// /// A function that throws a DivisionByZeroError if `current` provided by the TryScan publisher is zero. + /// func myThrowingFunction(_ lastValue: Int, _ currentValue: Int) throws -> Int? { + /// guard currentValue.isMultiple(of: 2) else { return nil } + /// guard currentValue != 0 else { throw DivisionByZeroError() } + /// return lastValue / currentValue + /// } + /// + /// let numbers = [1, 2, 3, 4, 5, 0, 6, 7, 8, 9] + /// let cancellable = numbers.publisher + /// .tryCompactScan(10) { try myThrowingFunction($0, $1) } + /// .sink( + /// receiveCompletion: { print ("\($0)") }, + /// receiveValue: { print ("\($0)", terminator: " ") } + /// ) + /// + /// // Prints: "6 2 failure(DivisionByZeroError())". + /// + /// If the closure throws an error, the publisher fails with the error. + /// + /// - Parameters: + /// - initialResult: The previous result returned by the `nextPartialResult` closure. + /// - nextPartialResult: An error-throwing closure that takes as its arguments the previous value returned by the closure and the next element emitted from the upstream publisher. + /// - Returns: A publisher that transforms elements by applying a closure that receives its previous return value and the next element from the upstream publisher. + func tryCompactScan(_ initialResult: T, _ nextPartialResult: @escaping (T, Output) throws -> T?) -> AnyPublisher { + self.tryScan((initialResult, initialResult)) { accumulator, value -> (T, T?) in + let lastNonNilAccumulator = accumulator.0 + let newAccumulator = try nextPartialResult(lastNonNilAccumulator, value) + return (newAccumulator ?? lastNonNilAccumulator, newAccumulator) + } + .compactMap { $0.1 } + .eraseToAnyPublisher() + } +} +#endif diff --git a/Tests/CompactScanTests.swift b/Tests/CompactScanTests.swift new file mode 100644 index 0000000..b17e47c --- /dev/null +++ b/Tests/CompactScanTests.swift @@ -0,0 +1,98 @@ +// +// CompactScanTests.swift +// CombineExtTests +// +// Created by Thibault Wittemberg on 04/09/2021. +// Copyright © 2021 Combine Community. All rights reserved. +// + +#if !os(watchOS) +import XCTest +import Combine +import CombineExt + +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +final class CompactScanTests: XCTestCase { + func testCompactScan_drops_nil_values() { + let expectedValues = [0, 2, 6] + var receivedValues = [Int]() + + // Given: a stream of integers from 0 to 5 + let sut = (0...5).publisher + + // When: using a compactScan operator using a closure that returns nil when the value from the upstream publisher is odd + let cancellable = sut + .compactScan(0) { + guard $1.isMultiple(of: 2) else { return nil } + return $0 + $1 + } + .assertNoFailure() + .sink { receivedValues.append($0) } + + // Then: the nil results have been discarded + XCTAssertEqual(receivedValues, expectedValues) + + cancellable.cancel() + } + + func testTryCompactScan_drops_nil_values() { + let expectedValues = [0, 2, 6] + var receivedValues = [Int]() + + // Given: a stream of integers from 0 to 5 + let sut = (0...5).publisher + + // When: using a tryCompactScan operator using a closure that returns nil when the value from the upstream publisher is odd + let cancellable = sut + .tryCompactScan(0) { + guard $1.isMultiple(of: 2) else { return nil } + return $0 + $1 + } + .assertNoFailure() + .sink { receivedValues.append($0) } + + // Then: the nil results have been discarded + XCTAssertEqual(receivedValues, expectedValues) + + cancellable.cancel() + } + + func testTryCompactScan_drops_nil_values_and_throws_error() { + struct DivisionByZeroError: Error, Equatable {} + + let expectedValues = [6, 2] + var receivedValues = [Int]() + + let expectedError = DivisionByZeroError() + var receivedCompletion: Subscribers.Completion? + + // Given: a sequence a integers containing a 0 + let sut = [1, 2, 3, 4, 5, 0, 6, 7, 8, 9].publisher + + // When: using a tryCompactScan operator using a closure that returns nil when the value from the upstream publisher is odd + // and throws when the value is 0 + let cancellable = sut + .tryCompactScan(10) { + guard $1.isMultiple(of: 2) else { return nil } + guard $1 != 0 else { throw expectedError } + return ($0 + $1) / $1 + } + .sink { + receivedCompletion = $0 + } receiveValue: { + receivedValues.append($0) + } + + cancellable.cancel() + + // Then: the nil results have been discarded + XCTAssertEqual(receivedValues, expectedValues) + + // Then: the thrown error provoqued a failure + switch receivedCompletion { + case let .failure(receivedError): XCTAssertEqual(receivedError as? DivisionByZeroError, expectedError) + default: XCTFail() + } + } +} +#endif