diff --git a/sandbox/ConsoleApp1/Program.cs b/sandbox/ConsoleApp1/Program.cs index 9b75e321..3a930eaa 100644 --- a/sandbox/ConsoleApp1/Program.cs +++ b/sandbox/ConsoleApp1/Program.cs @@ -84,6 +84,7 @@ var s = new System.Reactive.Subjects.Subject(); +Console.WriteLine($"Average: {Enumerable.Empty().Average()}"); diff --git a/src/R3/Event.cs b/src/R3/Event.cs index c74ae015..e22e812a 100644 --- a/src/R3/Event.cs +++ b/src/R3/Event.cs @@ -169,6 +169,10 @@ public void OnCompleted(TComplete complete) { OnCompletedCore(complete); } + catch (Exception ex) + { + throw; + } finally { Dispose(); @@ -188,7 +192,7 @@ public void Dispose() DisposeCore(); // Dispose self SourceSubscription.Dispose(); // Dispose attached parent } - + [StackTraceHidden, DebuggerStepThrough] protected virtual void DisposeCore() { } } diff --git a/src/R3/Factories/_EventFactory.cs b/src/R3/Factories/_EventFactory.cs index 2ef87e47..0bd82e12 100644 --- a/src/R3/Factories/_EventFactory.cs +++ b/src/R3/Factories/_EventFactory.cs @@ -10,16 +10,11 @@ public static partial class EventFactory // TODO: Defer, DeferAsync, FromAsync, FromAsyncPattern, FromEvent, FromEventPattern, Start, Using, Create // Timer, Interval, TimerFrame, IntervalFrame, ToObservable(ToEvent) - // TODO: Convert - // ToArray - // ToAsync - // ToDictionary - // ToHashSet - // ToEnumerable + + + // ToAsyncEnumerable? // ToEvent // ToEventPattern - // ToList - // ToLookup diff --git a/src/R3/Operators/AggregateAsync.cs b/src/R3/Operators/AggregateAsync.cs new file mode 100644 index 00000000..98dc8e15 --- /dev/null +++ b/src/R3/Operators/AggregateAsync.cs @@ -0,0 +1,153 @@ +namespace R3 +{ + public static partial class EventExtensions + { + public static Task AggregateAsync + (this CompletableEvent source, + TAccumulate seed, + Func func, + Func resultSelector, + CancellationToken cancellationToken = default) + { + var tcs = new TaskCompletionSource(); + + var subscriber = new AggregateAsync(tcs, seed, func, resultSelector, cancellationToken); + + // before Subscribe, register and set CancellationTokenRegistration + subscriber.tokenRegistration = cancellationToken.UnsafeRegister(static state => + { + var s = ((AggregateAsync)state!); + + s.Dispose(); // subscriber is subscription, dispose + s.tcs.TrySetCanceled(s.cancellationToken); + }, subscriber); + + source.Subscribe(subscriber); // return subscriber self so ignore subscription + + // when canceled, throws TaskCanceledException in here and subscription.Dispose() is called. + return tcs.Task; + } + + public static Task AggregateAsync + (this CompletableEvent> source, + TAccumulate seed, + Func func, + Func, TResult> resultSelector, + CancellationToken cancellationToken = default) + { + var tcs = new TaskCompletionSource(); + + var subscriber = new AggregateAsyncR(tcs, seed, func, resultSelector, cancellationToken); + + // before Subscribe, register and set CancellationTokenRegistration + subscriber.tokenRegistration = cancellationToken.UnsafeRegister(static state => + { + var s = ((AggregateAsyncR)state!); + + s.Dispose(); // subscriber is subscription, dispose + s.tcs.TrySetCanceled(s.cancellationToken); + }, subscriber); + + source.Subscribe(subscriber); // return subscriber self so ignore subscription + + // when canceled, throws TaskCanceledException in here and subscription.Dispose() is called. + return tcs.Task; + } + } +} + +namespace R3.Operators +{ + internal sealed class AggregateAsync( + TaskCompletionSource tcs, + TAccumulate seed, + Func func, + Func resultSelector, + CancellationToken cancellationToken) : Subscriber + { + // hold state for CancellationToken.Register + internal TaskCompletionSource tcs = tcs; + internal CancellationToken cancellationToken = cancellationToken; + internal CancellationTokenRegistration tokenRegistration; + + TAccumulate value = seed; + + protected override void OnNextCore(TMessage message) + { + value = func(value, message); // OnNext error is route to OnErrorResumeCore + } + + protected override void OnErrorResumeCore(Exception error) + { + tcs.TrySetException(error); + } + + protected override void OnCompletedCore(TComplete complete) + { + try + { + var result = resultSelector(value, complete); + tcs.TrySetResult(result); + } + catch (Exception ex) + { + tcs.TrySetException(ex); + } + } + + protected override void DisposeCore() + { + tokenRegistration.Dispose(); + } + } + + internal sealed class AggregateAsyncR( + TaskCompletionSource tcs, + TAccumulate seed, + Func func, + Func, TResult> resultSelector, + CancellationToken cancellationToken) : Subscriber> + { + // hold state for CancellationToken.Register + internal TaskCompletionSource tcs = tcs; + internal CancellationToken cancellationToken = cancellationToken; + internal CancellationTokenRegistration tokenRegistration; + + TAccumulate value = seed; + + protected override void OnNextCore(TMessage message) + { + value = func(value, message); + } + + protected override void OnErrorResumeCore(Exception error) + { + tcs.TrySetException(error); + } + + protected override void OnCompletedCore(Result complete) + { + try + { + var result = resultSelector(value, complete); + if (complete.IsSuccess) + { + tcs.TrySetResult(result); + } + else + { + tcs.TrySetException(complete.Exception); + } + } + catch (Exception ex) + { + tcs.TrySetException(ex); + } + } + + protected override void DisposeCore() + { + tokenRegistration.Dispose(); + } + } +} diff --git a/src/R3/Operators/AggregateOperators.cs b/src/R3/Operators/AggregateOperators.cs new file mode 100644 index 00000000..5b48feb1 --- /dev/null +++ b/src/R3/Operators/AggregateOperators.cs @@ -0,0 +1,239 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Numerics; +using System.Text; +using System.Threading.Tasks; + +namespace R3 +{ + // TODO: ToDictionary + // TODO: ToLookup + + public static partial class EventExtensions + { + public static Task ToArrayAsync(this CompletableEvent source, CancellationToken cancellationToken = default) + { + return AggregateAsync(source, new List(), static (list, message) => + { + list.Add(message); + return list; + }, (list, _) => list.ToArray(), cancellationToken); // ignore complete + } + + public static Task ToArrayAsync(this CompletableEvent> source, CancellationToken cancellationToken = default) + { + return AggregateAsync(source, new List(), static (list, message) => + { + list.Add(message); + return list; + }, (list, _) => list.ToArray(), cancellationToken); // ignore complete + } + + public static Task> ToListAsync(this CompletableEvent source, CancellationToken cancellationToken = default) + { + return AggregateAsync(source, new List(), static (list, message) => + { + list.Add(message); + return list; + }, (list, _) => list, cancellationToken); // ignore complete + } + + public static Task> ToListAsync(this CompletableEvent> source, CancellationToken cancellationToken = default) + { + return AggregateAsync(source, new List(), static (list, message) => + { + list.Add(message); + return list; + }, (list, _) => list, cancellationToken); // ignore complete + } + + public static Task> ToHashSetAsync(this CompletableEvent source, CancellationToken cancellationToken = default) + { + return ToHashSetAsync(source, null, cancellationToken); + } + + public static Task> ToHashSetAsync(this CompletableEvent> source, CancellationToken cancellationToken = default) + { + return ToHashSetAsync(source, null, cancellationToken); + } + + public static Task> ToHashSetAsync(this CompletableEvent source, IEqualityComparer? equalityComparer, CancellationToken cancellationToken = default) + { + return AggregateAsync(source, new HashSet(equalityComparer), static (value, message) => + { + value.Add(message); + return value; + }, (value, _) => value, cancellationToken); // ignore complete + } + + public static Task> ToHashSetAsync(this CompletableEvent> source, IEqualityComparer? equalityComparer, CancellationToken cancellationToken = default) + { + return AggregateAsync(source, new HashSet(equalityComparer), static (value, message) => + { + value.Add(message); + return value; + }, (value, _) => value, cancellationToken); // ignore complete + } + + // CountAsync using AggregateAsync + public static Task CountAsync(this CompletableEvent source, CancellationToken cancellationToken = default) + { + return AggregateAsync(source, 0, static (count, _) => checked(count + 1), (count, _) => count, cancellationToken); // ignore complete + } + + // CountAsync Result variation + public static Task CountAsync(this CompletableEvent> source, CancellationToken cancellationToken = default) + { + return AggregateAsync(source, 0, static (count, _) => checked(count + 1), (count, _) => count, cancellationToken); // ignore complete + } + + // LongCountAsync using AggregateAsync + public static Task LongCountAsync(this CompletableEvent source, CancellationToken cancellationToken = default) + { + return AggregateAsync(source, 0L, static (count, _) => checked(count + 1), (count, _) => count, cancellationToken); // ignore complete + } + + // LongCountAsync using AggregateAsync Result variation + public static Task LongCountAsync(this CompletableEvent> source, CancellationToken cancellationToken = default) + { + return AggregateAsync(source, 0L, static (count, _) => checked(count + 1), (count, _) => count, cancellationToken); // ignore complete + } + + public static Task MinAsync(this CompletableEvent source, CancellationToken cancellationToken = default) + { + return AggregateAsync(source, (default(TMessage)!, hasValue: false), + static (min, message) => + { + if (!min.hasValue) return (message, true); // first + return Comparer.Default.Compare(min.Item1, message) < 0 ? (min.Item1, true) : (message, true); + }, + static (min, _) => + { + if (!min.hasValue) throw new InvalidOperationException("Sequence contains no elements"); + return min.Item1; + }, cancellationToken); + } + + public static Task MinAsync(this CompletableEvent> source, CancellationToken cancellationToken = default) + { + return AggregateAsync(source, (default(TMessage)!, hasValue: false), + static (min, message) => + { + if (!min.hasValue) return (message, true); // first + return Comparer.Default.Compare(min.Item1, message) < 0 ? (min.Item1, true) : (message, true); + }, + static (min, _) => + { + if (!min.hasValue) throw new InvalidOperationException("Sequence contains no elements"); + return min.Item1; + }, cancellationToken); + } + + public static Task MaxAsync(this CompletableEvent source, CancellationToken cancellationToken = default) + { + return AggregateAsync(source, (default(TMessage)!, hasValue: false), + static (max, message) => + { + if (!max.hasValue) return (message, true); // first + return Comparer.Default.Compare(max.Item1, message) > 0 ? (max.Item1, true) : (message, true); + }, + static (max, _) => + { + if (!max.hasValue) throw new InvalidOperationException("Sequence contains no elements"); + return max.Item1; + }, cancellationToken); + } + + public static Task MaxAsync(this CompletableEvent> source, CancellationToken cancellationToken = default) + { + return AggregateAsync(source, (default(TMessage)!, hasValue: false), + static (max, message) => + { + if (!max.hasValue) return (message, true); // first + return Comparer.Default.Compare(max.Item1, message) > 0 ? (max.Item1, true) : (message, true); + }, + static (max, _) => + { + if (!max.hasValue) throw new InvalidOperationException("Sequence contains no elements"); + return max.Item1; + }, cancellationToken); + } + + public static Task<(TMessage Min, TMessage Max)> MinMaxAsync(this CompletableEvent source, CancellationToken cancellationToken = default) + { + return AggregateAsync(source, + (min: default(TMessage)!, max: default(TMessage)!, hasValue: false), + static (minmax, message) => + { + if (!minmax.hasValue) return (message, message, true); // first + var min = Comparer.Default.Compare(minmax.min, message) < 0 ? minmax.min : message; + var max = Comparer.Default.Compare(minmax.max, message) > 0 ? minmax.max : message; + return (min, max, true); + }, + static (minmax, _) => + { + if (!minmax.hasValue) throw new InvalidOperationException("Sequence contains no elements"); + return (minmax.min, minmax.max); + }, cancellationToken); + } + + + public static Task<(TMessage Min, TMessage Max)> MinMaxAsync(this CompletableEvent> source, CancellationToken cancellationToken = default) + { + return AggregateAsync(source, + (min: default(TMessage)!, max: default(TMessage)!, hasValue: false), + static (minmax, message) => + { + if (!minmax.hasValue) return (message, message, true); // first + var min = Comparer.Default.Compare(minmax.min, message) < 0 ? minmax.min : message; + var max = Comparer.Default.Compare(minmax.max, message) > 0 ? minmax.max : message; + return (min, max, true); + }, + static (minmax, _) => + { + if (!minmax.hasValue) throw new InvalidOperationException("Sequence contains no elements"); + return (minmax.min, minmax.max); + }, cancellationToken); + } + + public static Task SumAsync(this CompletableEvent source, CancellationToken cancellationToken = default) + where TMessage : IAdditionOperators + { + return AggregateAsync(source, default(TMessage)!, static (sum, message) => checked(sum + message), (sum, _) => sum, cancellationToken); // ignore complete + } + + public static Task SumAsync(this CompletableEvent> source, CancellationToken cancellationToken = default) + where TMessage : IAdditionOperators + { + return AggregateAsync(source, default(TMessage)!, static (sum, message) => checked(sum + message), (sum, _) => sum, cancellationToken); // ignore complete + } + + public static Task AverageAsync(this CompletableEvent source, CancellationToken cancellationToken = default) + where TMessage : INumberBase + { + return AggregateAsync(source, + (sum: default(TMessage)!, count: 0, hasValue: false), + static (avg, message) => + { + return (checked(avg.sum + message), checked(avg.count + 1), true); // sum, count, hasValue + }, + static (avg, _) => + { + if (!avg.hasValue) throw new InvalidOperationException("Sequence contains no elements"); + return double.CreateChecked(avg.sum) / double.CreateChecked(avg.count); + }, + cancellationToken); + } + + public static Task AverageAsync(this CompletableEvent> source, CancellationToken cancellationToken = default) + where TMessage : INumberBase + { + return AggregateAsync(source, + (default(TMessage)!, 0), + static ((TMessage Sum, long Count) avg, TMessage message) => (avg.Sum + message, checked(avg.Count + 1)), + (avg, _) => double.CreateChecked(avg.Item1) / double.CreateChecked(avg.Item2), // ignore complete + cancellationToken); + } + } +} diff --git a/src/R3/Operators/CountAsync.cs b/src/R3/Operators/CountAsync.cs deleted file mode 100644 index f6bc1391..00000000 --- a/src/R3/Operators/CountAsync.cs +++ /dev/null @@ -1,92 +0,0 @@ - -namespace R3 -{ - public static partial class EventExtensions - { - public static async Task CountAsync(this CompletableEvent source, CancellationToken cancellationToken = default) - { - var tcs = new TaskCompletionSource(); - - using var subscription = source.Subscribe(new CountAsync(tcs)); - using var registration = cancellationToken.UnsafeRegister(static state => - { - ((TaskCompletionSource)state!).TrySetCanceled(); - }, tcs); - - // when canceled, throws TaskCanceledException in here and subscription.Dispose() is called. - return await tcs.Task.ConfigureAwait(false); - } - - public static async Task CountAsync(this CompletableEvent> source, CancellationToken cancellationToken = default) - { - var tcs = new TaskCompletionSource(); - - using var subscription = source.Subscribe(new CountUAsync(tcs)); - using var registration = cancellationToken.UnsafeRegister(static state => - { - ((TaskCompletionSource)state!).TrySetCanceled(); - }, tcs); - - // when canceled, throws TaskCanceledException in here and subscription.Dispose() is called. - return await tcs.Task.ConfigureAwait(false); - } - } -} - -namespace R3.Operators -{ - internal sealed class CountAsync(TaskCompletionSource tcs) : Subscriber - { - int count; - - protected override void OnNextCore(TMessage message) - { - checked - { - count++; - } - } - - protected override void OnErrorResumeCore(Exception error) - { - tcs.TrySetException(error); - this.Dispose(); // stop subscription. - } - - protected override void OnCompletedCore(TComplete complete) - { - tcs.TrySetResult(count); - } - } - - internal sealed class CountUAsync(TaskCompletionSource tcs) : Subscriber> - { - int count; - - protected override void OnNextCore(TMessage message) - { - checked - { - count++; - } - } - - protected override void OnErrorResumeCore(Exception error) - { - tcs.TrySetException(error); - this.Dispose(); // stop subscription. - } - - protected override void OnCompletedCore(Result complete) - { - if (complete.IsSuccess) - { - tcs.TrySetResult(count); - } - else - { - tcs.TrySetException(complete.Exception); - } - } - } -} diff --git a/src/R3/Operators/Select.cs b/src/R3/Operators/Select.cs index aecd2357..24dcb2e6 100644 --- a/src/R3/Operators/Select.cs +++ b/src/R3/Operators/Select.cs @@ -13,6 +13,13 @@ public static Event Select(this Event sour return new Select(source, selector); } + public static CompletableEvent Select( + this CompletableEvent source, + Func messageSelector) + { + return new Select(source, messageSelector, static x => x); + } + public static CompletableEvent Select( this CompletableEvent source, Func messageSelector, @@ -45,7 +52,7 @@ protected override void OnErrorResumeCore(Exception error) } } } - + internal sealed class Select( CompletableEvent source, Func messageSelector, diff --git a/src/R3/Operators/ToListAsync.cs b/src/R3/Operators/ToListAsync.cs deleted file mode 100644 index ca5f04bc..00000000 --- a/src/R3/Operators/ToListAsync.cs +++ /dev/null @@ -1,94 +0,0 @@ -namespace R3 -{ - public static partial class EventExtensions - { - public static async Task ToArrayAsync(this CompletableEvent source, CancellationToken cancellationToken = default) - { - return (await source.ToListAsync(cancellationToken).ConfigureAwait(false)).ToArray(); - } - - public static async Task ToArrayAsync(this CompletableEvent> source, CancellationToken cancellationToken = default) - { - return (await source.ToListAsync(cancellationToken).ConfigureAwait(false)).ToArray(); - } - - public static async Task> ToListAsync(this CompletableEvent source, CancellationToken cancellationToken = default) - { - var tcs = new TaskCompletionSource>(); - - using var subscription = source.Subscribe(new ToListAsync(tcs)); - using var registration = cancellationToken.UnsafeRegister(static state => - { - ((TaskCompletionSource>)state!).TrySetCanceled(); - }, tcs); - - // when canceled, throws TaskCanceledException in here and subscription.Dispose() is called. - return await tcs.Task.ConfigureAwait(false); - } - - public static async Task> ToListAsync(this CompletableEvent> source, CancellationToken cancellationToken = default) - { - var tcs = new TaskCompletionSource>(); - - using var subscription = source.Subscribe(new ToListAsyncR(tcs)); - using var registration = cancellationToken.UnsafeRegister(static state => - { - ((TaskCompletionSource>)state!).TrySetCanceled(); - }, tcs); - - // when canceled, throws TaskCanceledException in here and subscription.Dispose() is called. - return await tcs.Task.ConfigureAwait(false); - } - } -} - - -namespace R3.Operators -{ - internal sealed class ToListAsync(TaskCompletionSource> tcs) : Subscriber - { - List list = new List(); - - protected override void OnNextCore(TMessage message) - { - list.Add(message); - } - - protected override void OnErrorResumeCore(Exception error) - { - EventSystem.GetUnhandledExceptionHandler().Invoke(error); - } - - protected override void OnCompletedCore(TComplete complete) - { - tcs.TrySetResult(list); // complete result is ignored. - } - } - - internal sealed class ToListAsyncR(TaskCompletionSource> tcs) : Subscriber> - { - List list = new List(); - - protected override void OnNextCore(TMessage message) - { - list.Add(message); - } - - protected override void OnErrorResumeCore(Exception error) - { - EventSystem.GetUnhandledExceptionHandler().Invoke(error); - } - - protected override void OnCompletedCore(Result complete) - { - if (complete.IsSuccess) - { - tcs.TrySetResult(list); // complete result is ignored. - } - else - { - tcs.TrySetException(complete.Exception); - } - } - } -} diff --git a/tests/R3.Tests/OperatorTests/AggregateTest.cs b/tests/R3.Tests/OperatorTests/AggregateTest.cs new file mode 100644 index 00000000..c4038d3f --- /dev/null +++ b/tests/R3.Tests/OperatorTests/AggregateTest.cs @@ -0,0 +1,203 @@ +using System.Reactive.Linq; + +namespace R3.Tests.OperatorTests; + +public class AggregateTest +{ + [Fact] + public async Task Aggreagte() + { + var publisher = new CompletablePublisher(); + + var listTask = publisher.AggregateAsync(new List(), (x, i) => { x.Add(i); return x; }, (x, _) => x); + + publisher.PublishOnNext(1); + publisher.PublishOnNext(2); + publisher.PublishOnNext(3); + publisher.PublishOnNext(4); + publisher.PublishOnNext(5); + + listTask.Status.Should().Be(TaskStatus.WaitingForActivation); + + publisher.PublishOnCompleted(Unit.Default); + + (await listTask).Should().Equal(1, 2, 3, 4, 5); + } + + [Fact] + public async Task ImmediateCompleted() + { + var range = EventFactory.Range(1, 5); + var listTask = range.AggregateAsync(new List(), (x, i) => { x.Add(i); return x; }, (x, _) => x); + (await listTask).Should().Equal(1, 2, 3, 4, 5); + } + + [Fact] + public async Task BeforeCanceled() + { + var cts = new CancellationTokenSource(); + cts.Cancel(); + + var publisher = new CompletablePublisher(); + var isDisposed = false; + + var listTask = publisher + .DoOnDisposed(() => isDisposed = true) + .AggregateAsync(new List(), (x, i) => { x.Add(i); return x; }, (x, _) => x, cts.Token); + + + isDisposed.Should().BeTrue(); + + await Assert.ThrowsAsync(async () => await listTask); + } + + // and Aggregate used operators + + [Fact] + public async Task ToHashSet() + { + var source = new int[] { 1, 10, 1, 3, 4, 6, 7, 4 }.ToEvent(); + var set = await source.ToHashSetAsync(); + + set.Should().BeEquivalentTo([1, 10, 3, 4, 6, 7]); + } + + [Fact] + public async Task Count() + { + var source = new int[] { 1, 10, 1, 3, 4, 6, 7, 4 }.ToEvent(); + var count = await source.CountAsync(); + + count.Should().Be(8); + + var count2 = await EventFactory.Empty().CountAsync(); + count2.Should().Be(0); + } + + [Fact] + public async Task LongCount() + { + var source = new int[] { 1, 10, 1, 3, 4, 6, 7, 4 }.ToEvent(); + var count = await source.LongCountAsync(); + + count.Should().Be(8); + + var count2 = await EventFactory.Empty().LongCountAsync(); + count2.Should().Be(0); + + var error = EventFactory.Throw(new Exception("foo")); + + await Assert.ThrowsAsync(async () => await error.LongCountAsync()); + } + + [Fact] + public async Task Min() + { + var source = new int[] { 1, 10, 1, 3, 4, 6, 7, 4 }.ToEvent(); + var min = await source.MinAsync(); + + min.Should().Be(1); + + (await EventFactory.Return(999).MinAsync()).Should().Be(999); + + var task = EventFactory.Empty().MinAsync(); + + await Assert.ThrowsAsync(async () => await task); + + var error = EventFactory.Range(1, 10).Select(x => + { + if (x == 3) throw new Exception("foo"); + return x; + }).OnErrorAsComplete(); + await Assert.ThrowsAsync(async () => await error.MinAsync()); + } + + [Fact] + public async Task Max() + { + var source = new int[] { 1, 10, 1, 3, 4, 6, 7, 4 }.ToEvent(); + var min = await source.MaxAsync(); + + min.Should().Be(10); + + (await EventFactory.Return(999).MaxAsync()).Should().Be(999); + + var task = EventFactory.Empty().MaxAsync(); + + await Assert.ThrowsAsync(async () => await task); + + var error = EventFactory.Range(1, 10).Select(x => + { + if (x == 3) throw new Exception("foo"); + return x; + }).OnErrorAsComplete(); + await Assert.ThrowsAsync(async () => await error.MaxAsync()); + } + + [Fact] + public async Task MinMax() + { + var source = new int[] { 1, 10, 1, 3, 4, 6, 7, 4 }.ToEvent(); + var minmax = await source.MinMaxAsync(); + + minmax.Min.Should().Be(1); + minmax.Max.Should().Be(10); + + var mm2 = await EventFactory.Return(999).MinMaxAsync(); + mm2.Min.Should().Be(999); + mm2.Max.Should().Be(999); + + var task = EventFactory.Empty().MaxAsync(); + + await Assert.ThrowsAsync(async () => await task); + + var error = EventFactory.Range(1, 10).Select(x => + { + if (x == 3) throw new Exception("foo"); + return x; + }).OnErrorAsComplete(); + await Assert.ThrowsAsync(async () => await error.MinMaxAsync()); + } + + [Fact] + public async Task Sum() + { + var source = new int[] { 1, 10, 1, 3, 4, 6, 7, 4 }.ToEvent(); + var sum = await source.SumAsync(); + + sum.Should().Be(36); + + (await EventFactory.Return(999).SumAsync()).Should().Be(999); + + var task = EventFactory.Empty().SumAsync(); + (await task).Should().Be(0); + + var error = EventFactory.Range(1, 10).Select(x => + { + if (x == 3) throw new Exception("foo"); + return x; + }).OnErrorAsComplete(); + await Assert.ThrowsAsync(async () => await error.MinAsync()); + } + + [Fact] + public async Task Avg() + { + var source = new int[] { 1, 10, 1, 3, 4, 6, 7, 4 }.ToEvent(); + var avg = await source.AverageAsync(); + + avg.Should().Be(new int[] { 1, 10, 1, 3, 4, 6, 7, 4 }.Average()); + + (await EventFactory.Return(999).AverageAsync()).Should().Be(999); + + var task = EventFactory.Empty().AverageAsync(); + await Assert.ThrowsAsync(async () => await task); + + var error = EventFactory.Range(1, 10).Select(x => + { + if (x == 3) throw new Exception("foo"); + return x; + }).OnErrorAsComplete(); + await Assert.ThrowsAsync(async () => await error.AverageAsync()); + } +}