From 31f78c64387d1e00861caa84a72ef68ecb0e6c83 Mon Sep 17 00:00:00 2001 From: neuecc Date: Tue, 12 Dec 2023 21:59:16 +0900 Subject: [PATCH] Result overload for First/Last/Single --- src/R3/FrameProvider.cs | 4 +- src/R3/Operators/AggregateAsync.cs | 4 +- src/R3/Operators/FirstLastSingle.cs | 75 +++++++++++++++++++ src/R3/Operators/_Operators.cs | 5 +- src/R3/Result.cs | 1 + .../OperatorTests/FirstLastSingleTest.cs | 18 ++++- 6 files changed, 100 insertions(+), 7 deletions(-) diff --git a/src/R3/FrameProvider.cs b/src/R3/FrameProvider.cs index 81710f79..d99e833e 100644 --- a/src/R3/FrameProvider.cs +++ b/src/R3/FrameProvider.cs @@ -1,6 +1,4 @@ -using R3.Internal; - -namespace R3; +namespace R3; public abstract class FrameProvider { diff --git a/src/R3/Operators/AggregateAsync.cs b/src/R3/Operators/AggregateAsync.cs index 3ec7711b..1907dc12 100644 --- a/src/R3/Operators/AggregateAsync.cs +++ b/src/R3/Operators/AggregateAsync.cs @@ -53,7 +53,7 @@ protected override void OnCompletedCore(TComplete complete) { try { - var result = resultSelector(value, complete); + var result = resultSelector(value, complete); // trap this resultSelector exception tcs.TrySetResult(result); } catch (Exception ex) @@ -85,7 +85,7 @@ protected override void OnCompletedCore(Result complete) { try { - var result = resultSelector(value, complete); + var result = resultSelector(value, complete); // trap this resultSelector exception if (complete.IsSuccess) { tcs.TrySetResult(result); diff --git a/src/R3/Operators/FirstLastSingle.cs b/src/R3/Operators/FirstLastSingle.cs index 15d12752..4392692c 100644 --- a/src/R3/Operators/FirstLastSingle.cs +++ b/src/R3/Operators/FirstLastSingle.cs @@ -4,6 +4,8 @@ public static partial class EventExtensions { + // Completable + public static Task FirstAsync(this CompletableEvent source, CancellationToken cancellationToken = default) => FirstAsync(source, static _ => true, cancellationToken); public static Task FirstOrDefaultAsync(this CompletableEvent source, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstOrDefaultAsync(source, static _ => true, defaultValue, cancellationToken); public static Task LastAsync(this CompletableEvent source, CancellationToken cancellationToken = default) => LastAsync(source, static _ => true, cancellationToken); @@ -20,6 +22,22 @@ public static partial class EventExtensions public static Task SingleAsync(this CompletableEvent source, Func predicate, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Single, false, default, predicate, cancellationToken); public static Task SingleOrDefaultAsync(this CompletableEvent source, Func predicate, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Single, true, defaultValue, predicate, cancellationToken); + // Result variation + + public static Task FirstAsync(this CompletableEvent> source, CancellationToken cancellationToken = default) => FirstAsync(source, static _ => true, cancellationToken); + public static Task FirstOrDefaultAsync(this CompletableEvent> source, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstOrDefaultAsync(source, static _ => true, defaultValue, cancellationToken); + public static Task LastAsync(this CompletableEvent> source, CancellationToken cancellationToken = default) => LastAsync(source, static _ => true, cancellationToken); + public static Task LastOrDefaultAsync(this CompletableEvent> source, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => LastOrDefaultAsync(source, static _ => true, defaultValue, cancellationToken); + public static Task SingleAsync(this CompletableEvent> source, CancellationToken cancellationToken = default) => SingleAsync(source, static _ => true, cancellationToken); + public static Task SingleOrDefaultAsync(this CompletableEvent> source, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => SingleOrDefaultAsync(source, static _ => true, defaultValue, cancellationToken); + public static Task FirstAsync(this CompletableEvent> source, Func predicate, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, First, false, default, predicate, cancellationToken); + public static Task FirstOrDefaultAsync(this CompletableEvent> source, Func predicate, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, First, true, defaultValue, predicate, cancellationToken); + public static Task LastAsync(this CompletableEvent> source, Func predicate, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Last, false, default, predicate, cancellationToken); + public static Task LastOrDefaultAsync(this CompletableEvent> source, Func predicate, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Last, true, defaultValue, predicate, cancellationToken); + public static Task SingleAsync(this CompletableEvent> source, Func predicate, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Single, false, default, predicate, cancellationToken); + public static Task SingleOrDefaultAsync(this CompletableEvent> source, Func predicate, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Single, true, defaultValue, predicate, cancellationToken); + + // no complete, only use First public static Task FirstAsync(this Event source, CancellationToken cancellationToken = default) => FirstAsync(source, static _ => true, cancellationToken); @@ -36,6 +54,13 @@ static Task FirstLastSingleAsync(this Completable source.Subscribe(subscriber); return subscriber.Task; } + + static Task FirstLastSingleAsync(this CompletableEvent> source, FirstLastSingleOperation operation, bool useDefaultIfEmpty, TMessage? defaultValue, Func predicate, CancellationToken cancellationToken) + { + var subscriber = new FirstLastSingleR(operation, useDefaultIfEmpty, defaultValue, predicate, cancellationToken); + source.Subscribe(subscriber); + return subscriber.Task; + } } } @@ -102,6 +127,56 @@ protected override void OnCompletedCore(TComplete complete) } } + internal sealed class FirstLastSingleR(FirstLastSingleOperation operation, bool useDefaultIfEmpty, TMessage? defaultValue, Func predicate, CancellationToken cancellationToken) + : TaskSubscriberBase, TMessage>(cancellationToken) + { + bool hasValue; + TMessage? latestValue = defaultValue; + + protected override void OnNextCore(TMessage message) + { + if (!predicate(message)) return; + + if (operation == FirstLastSingleOperation.Single && hasValue) + { + tcs.TrySetException(new InvalidOperationException("Sequence contains more than one element.")); + } + + hasValue = true; + if (operation == FirstLastSingleOperation.First) + { + tcs.TrySetResult(message); // First / FirstOrDefault + Dispose(); + } + else + { + latestValue = message; + } + } + + protected override void OnErrorResumeCore(Exception error) + { + tcs.TrySetException(error); + } + + protected override void OnCompletedCore(Result complete) + { + if (complete.IsFailure) + { + tcs.TrySetException(complete.Exception); + return; + } + + if (hasValue || useDefaultIfEmpty) + { + tcs.TrySetResult(latestValue!); // FirstOrDefault / Last / LastOrDefault / Single / SingleOrDefault + return; + } + + tcs.TrySetException(new InvalidOperationException("Sequence contains no elements.")); + } + } + internal enum FirstLastSingleOperation { First, diff --git a/src/R3/Operators/_Operators.cs b/src/R3/Operators/_Operators.cs index baec6861..1858f51a 100644 --- a/src/R3/Operators/_Operators.cs +++ b/src/R3/Operators/_Operators.cs @@ -26,6 +26,9 @@ public static partial class EventExtensions namespace R3.Operators { - + //internal sealed class ElementAtAsync(Event CancellationToken cancellationToken) + + //{ + //} } diff --git a/src/R3/Result.cs b/src/R3/Result.cs index 7a9816e2..ed1c6905 100644 --- a/src/R3/Result.cs +++ b/src/R3/Result.cs @@ -6,6 +6,7 @@ namespace R3; public static class Result { public static Result Success(T value) => new(value, null); + public static Result Failure(Exception exception) => new(default, exception); public static Result Failure(Exception exception) => new(default, exception); } diff --git a/tests/R3.Tests/OperatorTests/FirstLastSingleTest.cs b/tests/R3.Tests/OperatorTests/FirstLastSingleTest.cs index 5bde40d0..fe1af190 100644 --- a/tests/R3.Tests/OperatorTests/FirstLastSingleTest.cs +++ b/tests/R3.Tests/OperatorTests/FirstLastSingleTest.cs @@ -1,4 +1,6 @@ -namespace R3.Tests.OperatorTests; +using System.Threading.Tasks; + +namespace R3.Tests.OperatorTests; public class FirstLastSingleTest { @@ -221,4 +223,18 @@ public async Task SingleOrDefault() publisher.PublishOnCompleted(); (await task4).Should().Be(99); } + + [Fact] + public async Task ErrorStream() + { + var publisher = new CompletablePublisher>(); + var task = publisher.LastAsync(); + + publisher.PublishOnNext(10); + publisher.PublishOnNext(20); + publisher.PublishOnNext(30); + publisher.PublishOnCompleted(Result.Failure(new Exception("foo"))); + + await Assert.ThrowsAsync(async () => await task); + } }