Skip to content

Commit

Permalink
operators: add compactScan() and tryCompactScan()
Browse files Browse the repository at this point in the history
  • Loading branch information
Thibault Wittemberg committed Sep 4, 2021
1 parent 665fc63 commit 078ae29
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 0 deletions.
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ All operators, utilities and helpers respect Combine's publisher contract, inclu
* [ignoreFailure](#ignoreFailure)
* [mapToResult](#mapToResult)
* [flatMapBatches(of:)](#flatMapBatchesof)
* [compactScan()](#compactScan)

### Publishers
* [AnyPublisher.create](#AnypublisherCreate)
Expand Down Expand Up @@ -755,6 +756,30 @@ subscription = ints
.finished
```

------

### compactScan()

Transforms elements from the upstream publisher by providing the current element to a closure along with the last value returned by the closure. If the closure returns a nil value, then the accumulator won't change until the next non-nil upstream publisher value.

```swift
let cancellable = (0...5)
.publisher
.compactScan(0) {
guard $1.isMultiple(of: 2) else { return nil }
return $0 + $1
}
.sink { print ("\($0)") }
```

#### Output

```none
0 2 6
```

The `tryCompactScan()` version behaves the same but with a throwing closure.

## Publishers

This section outlines some of the custom Combine publishers CombineExt provides
Expand Down
93 changes: 93 additions & 0 deletions Sources/Operators/CompactScan.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
//
// CompactScan.swift
// CombineExt
//
// Created by Thibault Wittemberg on 04/09/2021.
// Copyright © 2021 Combine Community. All rights reserved.
//

#if canImport(Combine)
import Combine

@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public extension Publisher {

/// Transforms elements from the upstream publisher by providing the current
/// element to a closure along with the last value returned by the closure.
///
/// The ``nextPartialResult`` closure might return nil values. In that case the accumulator won't change until the next non-nil upstream publisher value.
///
/// Use ``Publisher/compactScan(_:_:)`` to accumulate all previously-published values into a single
/// value, which you then combine with each newly-published value.
///
/// The following example logs a running total of all values received
/// from the sequence publisher.
///
/// let range = (0...5)
/// let cancellable = range.publisher
/// .compactScan(0) {
/// guard $1.isMultiple(of: 2) else { return nil }
/// return $0 + $1
/// }
/// .sink { print ("\($0)", terminator: " ") }
/// // Prints: "0 2 6 ".
///
/// - Parameters:
/// - initialResult: The previous result returned by the `nextPartialResult` closure.
/// - nextPartialResult: A closure that takes as its arguments the previous value returned by the closure and the next element emitted from the upstream publisher.
/// - Returns: A publisher that transforms elements by applying a closure that receives its previous return value and the next element from the upstream publisher.
func compactScan<T>(_ initialResult: T, _ nextPartialResult: @escaping (T, Output) -> T?) -> AnyPublisher<T, Failure> {
self.scan((initialResult, initialResult)) { accumulator, value -> (T, T?) in
let lastNonNilAccumulator = accumulator.0
let newAccumulator = nextPartialResult(lastNonNilAccumulator, value)
return (newAccumulator ?? lastNonNilAccumulator, newAccumulator)
}
.compactMap { $0.1 }
.eraseToAnyPublisher()
}

/// Transforms elements from the upstream publisher by providing the current element to an error-throwing closure along with the last value returned by the closure.
///
/// The ``nextPartialResult`` closure might return nil values. In that case the accumulator won't change until the next non-nil upstream publisher value.
///
/// Use ``Publisher/tryCompactScan(_:_:)`` to accumulate all previously-published values into a single value, which you then combine with each newly-published value.
/// If your accumulator closure throws an error, the publisher terminates with the error.
///
/// In the example below, ``Publisher/tryCompactScan(_:_:)`` calls a division function on elements of a collection publisher. The resulting publisher publishes each result until the function encounters a `DivisionByZeroError`, which terminates the publisher.
///m
/// struct DivisionByZeroError: Error {}
///
/// /// A function that throws a DivisionByZeroError if `current` provided by the TryScan publisher is zero.
/// func myThrowingFunction(_ lastValue: Int, _ currentValue: Int) throws -> Int? {
/// guard currentValue.isMultiple(of: 2) else { return nil }
/// guard currentValue != 0 else { throw DivisionByZeroError() }
/// return lastValue / currentValue
/// }
///
/// let numbers = [1, 2, 3, 4, 5, 0, 6, 7, 8, 9]
/// let cancellable = numbers.publisher
/// .tryCompactScan(10) { try myThrowingFunction($0, $1) }
/// .sink(
/// receiveCompletion: { print ("\($0)") },
/// receiveValue: { print ("\($0)", terminator: " ") }
/// )
///
/// // Prints: "6 2 failure(DivisionByZeroError())".
///
/// If the closure throws an error, the publisher fails with the error.
///
/// - Parameters:
/// - initialResult: The previous result returned by the `nextPartialResult` closure.
/// - nextPartialResult: An error-throwing closure that takes as its arguments the previous value returned by the closure and the next element emitted from the upstream publisher.
/// - Returns: A publisher that transforms elements by applying a closure that receives its previous return value and the next element from the upstream publisher.
func tryCompactScan<T>(_ initialResult: T, _ nextPartialResult: @escaping (T, Output) throws -> T?) -> AnyPublisher<T, Error> {
self.tryScan((initialResult, initialResult)) { accumulator, value -> (T, T?) in
let lastNonNilAccumulator = accumulator.0
let newAccumulator = try nextPartialResult(lastNonNilAccumulator, value)
return (newAccumulator ?? lastNonNilAccumulator, newAccumulator)
}
.compactMap { $0.1 }
.eraseToAnyPublisher()
}
}
#endif
95 changes: 95 additions & 0 deletions Tests/CompactScanTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
//
// CompactScanTests.swift
// CombineExtTests
//
// Created by Thibault Wittemberg on 04/09/2021.
// Copyright © 2021 Combine Community. All rights reserved.
//

import Combine
import CombineExt
import XCTest

final class CompactScanTests: XCTestCase {
func testCompactScan_drops_nil_values() {
let expectedValues = [0, 2, 6]
var receivedValues = [Int]()

// Given: a stream of integers from 0 to 5
let sut = (0...5).publisher

// When: using a compactScan operator using a closure that returns nil when the value from the upstream publisher is odd
let cancellable = sut
.compactScan(0) {
guard $1.isMultiple(of: 2) else { return nil }
return $0 + $1
}
.assertNoFailure()
.sink { receivedValues.append($0) }

// Then: the nil results have been discarded
XCTAssertEqual(receivedValues, expectedValues)

cancellable.cancel()
}

func testTryCompactScan_drops_nil_values() {
let expectedValues = [0, 2, 6]
var receivedValues = [Int]()

// Given: a stream of integers from 0 to 5
let sut = (0...5).publisher

// When: using a tryCompactScan operator using a closure that returns nil when the value from the upstream publisher is odd
let cancellable = sut
.tryCompactScan(0) {
guard $1.isMultiple(of: 2) else { return nil }
return $0 + $1
}
.assertNoFailure()
.sink { receivedValues.append($0) }

// Then: the nil results have been discarded
XCTAssertEqual(receivedValues, expectedValues)

cancellable.cancel()
}

func testTryCompactScan_drops_nil_values_and_throws_error() {
struct DivisionByZeroError: Error, Equatable {}

let expectedValues = [6, 2]
var receivedValues = [Int]()

let expectedError = DivisionByZeroError()
var receivedCompletion: Subscribers.Completion<Error>?

// Given: a sequence a integers containing a 0
let sut = [1, 2, 3, 4, 5, 0, 6, 7, 8, 9].publisher

// When: using a tryCompactScan operator using a closure that returns nil when the value from the upstream publisher is odd
// and throws when the value is 0
let cancellable = sut
.tryCompactScan(10) {
guard $1.isMultiple(of: 2) else { return nil }
guard $1 != 0 else { throw expectedError }
return ($0 + $1) / $1
}
.sink {
receivedCompletion = $0
} receiveValue: {
receivedValues.append($0)
}

cancellable.cancel()

// Then: the nil results have been discarded
XCTAssertEqual(receivedValues, expectedValues)

// Then: the thrown error provoqued a failure
switch receivedCompletion {
case let .failure(receivedError): XCTAssertEqual(receivedError as? DivisionByZeroError, expectedError)
default: XCTFail()
}
}
}

0 comments on commit 078ae29

Please sign in to comment.