Skip to content

Commit

Permalink
Result overload for First/Last/Single
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 12, 2023
1 parent 0f73153 commit 31f78c6
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 7 deletions.
4 changes: 1 addition & 3 deletions src/R3/FrameProvider.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
using R3.Internal;

namespace R3;
namespace R3;

public abstract class FrameProvider
{
Expand Down
4 changes: 2 additions & 2 deletions src/R3/Operators/AggregateAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ protected override void OnCompletedCore(TComplete complete)
{
try
{
var result = resultSelector(value, complete);
var result = resultSelector(value, complete); // trap this resultSelector exception
tcs.TrySetResult(result);
}
catch (Exception ex)
Expand Down Expand Up @@ -85,7 +85,7 @@ protected override void OnCompletedCore(Result<TComplete> complete)
{
try
{
var result = resultSelector(value, complete);
var result = resultSelector(value, complete); // trap this resultSelector exception
if (complete.IsSuccess)
{
tcs.TrySetResult(result);
Expand Down
75 changes: 75 additions & 0 deletions src/R3/Operators/FirstLastSingle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

public static partial class EventExtensions
{
// Completable

public static Task<TMessage> FirstAsync<TMessage, TComplete>(this CompletableEvent<TMessage, TComplete> source, CancellationToken cancellationToken = default) => FirstAsync(source, static _ => true, cancellationToken);
public static Task<TMessage> FirstOrDefaultAsync<TMessage, TComplete>(this CompletableEvent<TMessage, TComplete> source, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstOrDefaultAsync(source, static _ => true, defaultValue, cancellationToken);
public static Task<TMessage> LastAsync<TMessage, TComplete>(this CompletableEvent<TMessage, TComplete> source, CancellationToken cancellationToken = default) => LastAsync(source, static _ => true, cancellationToken);
Expand All @@ -20,6 +22,22 @@ public static partial class EventExtensions
public static Task<TMessage> SingleAsync<TMessage, TComplete>(this CompletableEvent<TMessage, TComplete> source, Func<TMessage, bool> predicate, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Single, false, default, predicate, cancellationToken);
public static Task<TMessage> SingleOrDefaultAsync<TMessage, TComplete>(this CompletableEvent<TMessage, TComplete> source, Func<TMessage, bool> predicate, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Single, true, defaultValue, predicate, cancellationToken);

// Result variation

public static Task<TMessage> FirstAsync<TMessage, TComplete>(this CompletableEvent<TMessage, Result<TComplete>> source, CancellationToken cancellationToken = default) => FirstAsync(source, static _ => true, cancellationToken);
public static Task<TMessage> FirstOrDefaultAsync<TMessage, TComplete>(this CompletableEvent<TMessage, Result<TComplete>> source, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstOrDefaultAsync(source, static _ => true, defaultValue, cancellationToken);
public static Task<TMessage> LastAsync<TMessage, TComplete>(this CompletableEvent<TMessage, Result<TComplete>> source, CancellationToken cancellationToken = default) => LastAsync(source, static _ => true, cancellationToken);
public static Task<TMessage> LastOrDefaultAsync<TMessage, TComplete>(this CompletableEvent<TMessage, Result<TComplete>> source, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => LastOrDefaultAsync(source, static _ => true, defaultValue, cancellationToken);
public static Task<TMessage> SingleAsync<TMessage, TComplete>(this CompletableEvent<TMessage, Result<TComplete>> source, CancellationToken cancellationToken = default) => SingleAsync(source, static _ => true, cancellationToken);
public static Task<TMessage> SingleOrDefaultAsync<TMessage, TComplete>(this CompletableEvent<TMessage, Result<TComplete>> source, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => SingleOrDefaultAsync(source, static _ => true, defaultValue, cancellationToken);
public static Task<TMessage> FirstAsync<TMessage, TComplete>(this CompletableEvent<TMessage, Result<TComplete>> source, Func<TMessage, bool> predicate, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, First, false, default, predicate, cancellationToken);
public static Task<TMessage> FirstOrDefaultAsync<TMessage, TComplete>(this CompletableEvent<TMessage, Result<TComplete>> source, Func<TMessage, bool> predicate, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, First, true, defaultValue, predicate, cancellationToken);
public static Task<TMessage> LastAsync<TMessage, TComplete>(this CompletableEvent<TMessage, Result<TComplete>> source, Func<TMessage, bool> predicate, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Last, false, default, predicate, cancellationToken);
public static Task<TMessage> LastOrDefaultAsync<TMessage, TComplete>(this CompletableEvent<TMessage, Result<TComplete>> source, Func<TMessage, bool> predicate, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Last, true, defaultValue, predicate, cancellationToken);
public static Task<TMessage> SingleAsync<TMessage, TComplete>(this CompletableEvent<TMessage, Result<TComplete>> source, Func<TMessage, bool> predicate, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Single, false, default, predicate, cancellationToken);
public static Task<TMessage> SingleOrDefaultAsync<TMessage, TComplete>(this CompletableEvent<TMessage, Result<TComplete>> source, Func<TMessage, bool> predicate, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Single, true, defaultValue, predicate, cancellationToken);


// no complete, only use First
public static Task<TMessage> FirstAsync<TMessage>(this Event<TMessage> source, CancellationToken cancellationToken = default) => FirstAsync(source, static _ => true, cancellationToken);

Expand All @@ -36,6 +54,13 @@ static Task<TMessage> FirstLastSingleAsync<TMessage, TComplete>(this Completable
source.Subscribe(subscriber);
return subscriber.Task;
}

static Task<TMessage> FirstLastSingleAsync<TMessage, TComplete>(this CompletableEvent<TMessage, Result<TComplete>> source, FirstLastSingleOperation operation, bool useDefaultIfEmpty, TMessage? defaultValue, Func<TMessage, bool> predicate, CancellationToken cancellationToken)
{
var subscriber = new FirstLastSingleR<TMessage, TComplete>(operation, useDefaultIfEmpty, defaultValue, predicate, cancellationToken);
source.Subscribe(subscriber);
return subscriber.Task;
}
}
}

Expand Down Expand Up @@ -102,6 +127,56 @@ protected override void OnCompletedCore(TComplete complete)
}
}

internal sealed class FirstLastSingleR<TMessage, TComplete>(FirstLastSingleOperation operation, bool useDefaultIfEmpty, TMessage? defaultValue, Func<TMessage, bool> predicate, CancellationToken cancellationToken)
: TaskSubscriberBase<TMessage, Result<TComplete>, TMessage>(cancellationToken)
{
bool hasValue;
TMessage? latestValue = defaultValue;

protected override void OnNextCore(TMessage message)
{
if (!predicate(message)) return;

if (operation == FirstLastSingleOperation.Single && hasValue)
{
tcs.TrySetException(new InvalidOperationException("Sequence contains more than one element."));
}

hasValue = true;
if (operation == FirstLastSingleOperation.First)
{
tcs.TrySetResult(message); // First / FirstOrDefault
Dispose();
}
else
{
latestValue = message;
}
}

protected override void OnErrorResumeCore(Exception error)
{
tcs.TrySetException(error);
}

protected override void OnCompletedCore(Result<TComplete> complete)
{
if (complete.IsFailure)
{
tcs.TrySetException(complete.Exception);
return;
}

if (hasValue || useDefaultIfEmpty)
{
tcs.TrySetResult(latestValue!); // FirstOrDefault / Last / LastOrDefault / Single / SingleOrDefault
return;
}

tcs.TrySetException(new InvalidOperationException("Sequence contains no elements."));
}
}

internal enum FirstLastSingleOperation
{
First,
Expand Down
5 changes: 4 additions & 1 deletion src/R3/Operators/_Operators.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ public static partial class EventExtensions

namespace R3.Operators
{

//internal sealed class ElementAtAsync<TMessage>(Event<TMessage> CancellationToken cancellationToken)

//{
//}

}
1 change: 1 addition & 0 deletions src/R3/Result.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace R3;
public static class Result
{
public static Result<T> Success<T>(T value) => new(value, null);
public static Result<Unit> Failure(Exception exception) => new(default, exception);
public static Result<T> Failure<T>(Exception exception) => new(default, exception);
}

Expand Down
18 changes: 17 additions & 1 deletion tests/R3.Tests/OperatorTests/FirstLastSingleTest.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace R3.Tests.OperatorTests;
using System.Threading.Tasks;

namespace R3.Tests.OperatorTests;

public class FirstLastSingleTest
{
Expand Down Expand Up @@ -221,4 +223,18 @@ public async Task SingleOrDefault()
publisher.PublishOnCompleted();
(await task4).Should().Be(99);
}

[Fact]
public async Task ErrorStream()
{
var publisher = new CompletablePublisher<int, Result<Unit>>();
var task = publisher.LastAsync();

publisher.PublishOnNext(10);
publisher.PublishOnNext(20);
publisher.PublishOnNext(30);
publisher.PublishOnCompleted(Result.Failure(new Exception("foo")));

await Assert.ThrowsAsync<Exception>(async () => await task);
}
}

0 comments on commit 31f78c6

Please sign in to comment.