Skip to content

Commit

Permalink
operators: add compactScan() and tryCompactScan()
Browse files Browse the repository at this point in the history
  • Loading branch information
Thibault Wittemberg committed Sep 4, 2021
1 parent 665fc63 commit 9e20a3f
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 0 deletions.
8 changes: 8 additions & 0 deletions CombineExt.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
/* Begin PBXBuildFile section */
1970A8AA25246FBD00799AB6 /* FilterMany.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1970A8A925246FBD00799AB6 /* FilterMany.swift */; };
1970A8B42524730500799AB6 /* FilterManyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1970A8B32524730400799AB6 /* FilterManyTests.swift */; };
1A2FF9A826E3D9770098C2D1 /* CompactScan.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1A2FF9A726E3D9770098C2D1 /* CompactScan.swift */; };
1A2FF9AB26E3D9FC0098C2D1 /* CompactScanTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1A2FF9A926E3D9800098C2D1 /* CompactScanTests.swift */; };
BF330EF624F1FFFE001281FC /* CombineSchedulers in Frameworks */ = {isa = PBXBuildFile; productRef = BF330EF524F1FFFE001281FC /* CombineSchedulers */; };
BF330EF924F20032001281FC /* Timer.swift in Sources */ = {isa = PBXBuildFile; fileRef = BF330EF824F20032001281FC /* Timer.swift */; };
BF330EFB24F20080001281FC /* Lock.swift in Sources */ = {isa = PBXBuildFile; fileRef = BF330EFA24F20080001281FC /* Lock.swift */; };
Expand Down Expand Up @@ -108,6 +110,8 @@
/* Begin PBXFileReference section */
1970A8A925246FBD00799AB6 /* FilterMany.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FilterMany.swift; sourceTree = "<group>"; };
1970A8B32524730400799AB6 /* FilterManyTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FilterManyTests.swift; sourceTree = "<group>"; };
1A2FF9A726E3D9770098C2D1 /* CompactScan.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = CompactScan.swift; sourceTree = "<group>"; };
1A2FF9A926E3D9800098C2D1 /* CompactScanTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = CompactScanTests.swift; sourceTree = "<group>"; };
BF330EF824F20032001281FC /* Timer.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Timer.swift; sourceTree = "<group>"; };
BF330EFA24F20080001281FC /* Lock.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Lock.swift; sourceTree = "<group>"; };
BF3D3B5C253B83F300D830ED /* IgnoreFailure.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = IgnoreFailure.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -242,6 +246,7 @@
OBJ_18 /* AssignOwnership.swift */,
OBJ_19 /* AssignToMany.swift */,
OBJ_20 /* CombineLatestMany.swift */,
1A2FF9A726E3D9770098C2D1 /* CompactScan.swift */,
OBJ_21 /* Create.swift */,
OBJ_22 /* Dematerialize.swift */,
OBJ_23 /* FlatMapLatest.swift */,
Expand Down Expand Up @@ -290,6 +295,7 @@
OBJ_42 /* AssignOwnershipTests.swift */,
OBJ_43 /* AssignToManyTests.swift */,
OBJ_44 /* CombineLatestManyTests.swift */,
1A2FF9A926E3D9800098C2D1 /* CompactScanTests.swift */,
OBJ_45 /* CreateTests.swift */,
OBJ_46 /* CurrentValueRelayTests.swift */,
OBJ_47 /* DematerializeTests.swift */,
Expand Down Expand Up @@ -550,6 +556,7 @@
OBJ_123 /* AssignOwnershipTests.swift in Sources */,
OBJ_124 /* AssignToManyTests.swift in Sources */,
1970A8B42524730500799AB6 /* FilterManyTests.swift in Sources */,
1A2FF9AB26E3D9FC0098C2D1 /* CompactScanTests.swift in Sources */,
D836234A24EA9888002353AC /* MergeManyTests.swift in Sources */,
OBJ_125 /* CombineLatestManyTests.swift in Sources */,
BF3D3B67253B88E500D830ED /* IgnoreFailureTests.swift in Sources */,
Expand Down Expand Up @@ -581,6 +588,7 @@
buildActionMask = 0;
files = (
OBJ_79 /* DemandBuffer.swift in Sources */,
1A2FF9A826E3D9770098C2D1 /* CompactScan.swift in Sources */,
OBJ_80 /* Sink.swift in Sources */,
OBJ_81 /* Optional.swift in Sources */,
C387777C24E6BBE900FAD2D8 /* Nwise.swift in Sources */,
Expand Down
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ All operators, utilities and helpers respect Combine's publisher contract, inclu
* [ignoreFailure](#ignoreFailure)
* [mapToResult](#mapToResult)
* [flatMapBatches(of:)](#flatMapBatchesof)
* [compactScan()](#compactScan)

### Publishers
* [AnyPublisher.create](#AnypublisherCreate)
Expand Down Expand Up @@ -755,6 +756,30 @@ subscription = ints
.finished
```

------

### compactScan()

Transforms elements from the upstream publisher by providing the current element to a closure along with the last value returned by the closure. If the closure returns a nil value, then the accumulator won't change until the next non-nil upstream publisher value.

```swift
let cancellable = (0...5)
.publisher
.compactScan(0) {
guard $1.isMultiple(of: 2) else { return nil }
return $0 + $1
}
.sink { print ("\($0)") }
```

#### Output

```none
0 2 6
```

The `tryCompactScan()` version behaves the same but with a throwing closure.

## Publishers

This section outlines some of the custom Combine publishers CombineExt provides
Expand Down
92 changes: 92 additions & 0 deletions Sources/Operators/CompactScan.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
//
// CompactScan.swift
// CombineExt
//
// Created by Thibault Wittemberg on 04/09/2021.
// Copyright © 2021 Combine Community. All rights reserved.
//

#if canImport(Combine)
import Combine

@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public extension Publisher {
/// Transforms elements from the upstream publisher by providing the current
/// element to a closure along with the last value returned by the closure.
///
/// The ``nextPartialResult`` closure might return nil values. In that case the accumulator won't change until the next non-nil upstream publisher value.
///
/// Use ``Publisher/compactScan(_:_:)`` to accumulate all previously-published values into a single
/// value, which you then combine with each newly-published value.
///
/// The following example logs a running total of all values received
/// from the sequence publisher.
///
/// let range = (0...5)
/// let cancellable = range.publisher
/// .compactScan(0) {
/// guard $1.isMultiple(of: 2) else { return nil }
/// return $0 + $1
/// }
/// .sink { print ("\($0)", terminator: " ") }
/// // Prints: "0 2 6 ".
///
/// - Parameters:
/// - initialResult: The previous result returned by the `nextPartialResult` closure.
/// - nextPartialResult: A closure that takes as its arguments the previous value returned by the closure and the next element emitted from the upstream publisher.
/// - Returns: A publisher that transforms elements by applying a closure that receives its previous return value and the next element from the upstream publisher.
func compactScan<T>(_ initialResult: T, _ nextPartialResult: @escaping (T, Output) -> T?) -> AnyPublisher<T, Failure> {
self.scan((initialResult, initialResult)) { accumulator, value -> (T, T?) in
let lastNonNilAccumulator = accumulator.0
let newAccumulator = nextPartialResult(lastNonNilAccumulator, value)
return (newAccumulator ?? lastNonNilAccumulator, newAccumulator)
}
.compactMap { $0.1 }
.eraseToAnyPublisher()
}

/// Transforms elements from the upstream publisher by providing the current element to an error-throwing closure along with the last value returned by the closure.
///
/// The ``nextPartialResult`` closure might return nil values. In that case the accumulator won't change until the next non-nil upstream publisher value.
///
/// Use ``Publisher/tryCompactScan(_:_:)`` to accumulate all previously-published values into a single value, which you then combine with each newly-published value.
/// If your accumulator closure throws an error, the publisher terminates with the error.
///
/// In the example below, ``Publisher/tryCompactScan(_:_:)`` calls a division function on elements of a collection publisher. The resulting publisher publishes each result until the function encounters a `DivisionByZeroError`, which terminates the publisher.
///
/// struct DivisionByZeroError: Error {}
///
/// /// A function that throws a DivisionByZeroError if `current` provided by the TryScan publisher is zero.
/// func myThrowingFunction(_ lastValue: Int, _ currentValue: Int) throws -> Int? {
/// guard currentValue.isMultiple(of: 2) else { return nil }
/// guard currentValue != 0 else { throw DivisionByZeroError() }
/// return lastValue / currentValue
/// }
///
/// let numbers = [1, 2, 3, 4, 5, 0, 6, 7, 8, 9]
/// let cancellable = numbers.publisher
/// .tryCompactScan(10) { try myThrowingFunction($0, $1) }
/// .sink(
/// receiveCompletion: { print ("\($0)") },
/// receiveValue: { print ("\($0)", terminator: " ") }
/// )
///
/// // Prints: "6 2 failure(DivisionByZeroError())".
///
/// If the closure throws an error, the publisher fails with the error.
///
/// - Parameters:
/// - initialResult: The previous result returned by the `nextPartialResult` closure.
/// - nextPartialResult: An error-throwing closure that takes as its arguments the previous value returned by the closure and the next element emitted from the upstream publisher.
/// - Returns: A publisher that transforms elements by applying a closure that receives its previous return value and the next element from the upstream publisher.
func tryCompactScan<T>(_ initialResult: T, _ nextPartialResult: @escaping (T, Output) throws -> T?) -> AnyPublisher<T, Error> {
self.tryScan((initialResult, initialResult)) { accumulator, value -> (T, T?) in
let lastNonNilAccumulator = accumulator.0
let newAccumulator = try nextPartialResult(lastNonNilAccumulator, value)
return (newAccumulator ?? lastNonNilAccumulator, newAccumulator)
}
.compactMap { $0.1 }
.eraseToAnyPublisher()
}
}
#endif
98 changes: 98 additions & 0 deletions Tests/CompactScanTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
//
// CompactScanTests.swift
// CombineExtTests
//
// Created by Thibault Wittemberg on 04/09/2021.
// Copyright © 2021 Combine Community. All rights reserved.
//

#if !os(watchOS)
import XCTest
import Combine
import CombineExt

@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
final class CompactScanTests: XCTestCase {
func testCompactScan_drops_nil_values() {
let expectedValues = [0, 2, 6]
var receivedValues = [Int]()

// Given: a stream of integers from 0 to 5
let sut = (0...5).publisher

// When: using a compactScan operator using a closure that returns nil when the value from the upstream publisher is odd
let cancellable = sut
.compactScan(0) {
guard $1.isMultiple(of: 2) else { return nil }
return $0 + $1
}
.assertNoFailure()
.sink { receivedValues.append($0) }

// Then: the nil results have been discarded
XCTAssertEqual(receivedValues, expectedValues)

cancellable.cancel()
}

func testTryCompactScan_drops_nil_values() {
let expectedValues = [0, 2, 6]
var receivedValues = [Int]()

// Given: a stream of integers from 0 to 5
let sut = (0...5).publisher

// When: using a tryCompactScan operator using a closure that returns nil when the value from the upstream publisher is odd
let cancellable = sut
.tryCompactScan(0) {
guard $1.isMultiple(of: 2) else { return nil }
return $0 + $1
}
.assertNoFailure()
.sink { receivedValues.append($0) }

// Then: the nil results have been discarded
XCTAssertEqual(receivedValues, expectedValues)

cancellable.cancel()
}

func testTryCompactScan_drops_nil_values_and_throws_error() {
struct DivisionByZeroError: Error, Equatable {}

let expectedValues = [6, 2]
var receivedValues = [Int]()

let expectedError = DivisionByZeroError()
var receivedCompletion: Subscribers.Completion<Error>?

// Given: a sequence a integers containing a 0
let sut = [1, 2, 3, 4, 5, 0, 6, 7, 8, 9].publisher

// When: using a tryCompactScan operator using a closure that returns nil when the value from the upstream publisher is odd
// and throws when the value is 0
let cancellable = sut
.tryCompactScan(10) {
guard $1.isMultiple(of: 2) else { return nil }
guard $1 != 0 else { throw expectedError }
return ($0 + $1) / $1
}
.sink {
receivedCompletion = $0
} receiveValue: {
receivedValues.append($0)
}

cancellable.cancel()

// Then: the nil results have been discarded
XCTAssertEqual(receivedValues, expectedValues)

// Then: the thrown error provoqued a failure
switch receivedCompletion {
case let .failure(receivedError): XCTAssertEqual(receivedError as? DivisionByZeroError, expectedError)
default: XCTFail()
}
}
}
#endif

0 comments on commit 9e20a3f

Please sign in to comment.