Skip to content

Commit

Permalink
Aggregate, Count, LongCount, Min, Max, MinMax, Sum, Average, ToHashSet
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 11, 2023
1 parent 8869a87 commit 2992cb6
Show file tree
Hide file tree
Showing 9 changed files with 612 additions and 196 deletions.
1 change: 1 addition & 0 deletions sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@

var s = new System.Reactive.Subjects.Subject<string>();

Console.WriteLine($"Average: {Enumerable.Empty<int>().Average()}");



Expand Down
6 changes: 5 additions & 1 deletion src/R3/Event.cs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ public void OnCompleted(TComplete complete)
{
OnCompletedCore(complete);
}
catch (Exception ex)
{
throw;
}
finally
{
Dispose();
Expand All @@ -188,7 +192,7 @@ public void Dispose()
DisposeCore(); // Dispose self
SourceSubscription.Dispose(); // Dispose attached parent
}

[StackTraceHidden, DebuggerStepThrough]
protected virtual void DisposeCore() { }
}
11 changes: 3 additions & 8 deletions src/R3/Factories/_EventFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,11 @@ public static partial class EventFactory
// TODO: Defer, DeferAsync, FromAsync, FromAsyncPattern, FromEvent, FromEventPattern, Start, Using, Create
// Timer, Interval, TimerFrame, IntervalFrame, ToObservable(ToEvent)

// TODO: Convert
// ToArray
// ToAsync
// ToDictionary
// ToHashSet
// ToEnumerable


// ToAsyncEnumerable?
// ToEvent
// ToEventPattern
// ToList
// ToLookup



Expand Down
153 changes: 153 additions & 0 deletions src/R3/Operators/AggregateAsync.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
namespace R3
{
public static partial class EventExtensions
{
public static Task<TResult> AggregateAsync<TMessage, TComplete, TAccumulate, TResult>
(this CompletableEvent<TMessage, TComplete> source,
TAccumulate seed,
Func<TAccumulate, TMessage, TAccumulate> func,
Func<TAccumulate, TComplete, TResult> resultSelector,
CancellationToken cancellationToken = default)
{
var tcs = new TaskCompletionSource<TResult>();

var subscriber = new AggregateAsync<TMessage, TComplete, TAccumulate, TResult>(tcs, seed, func, resultSelector, cancellationToken);

// before Subscribe, register and set CancellationTokenRegistration
subscriber.tokenRegistration = cancellationToken.UnsafeRegister(static state =>
{
var s = ((AggregateAsync<TMessage, TComplete, TAccumulate, TResult>)state!);

s.Dispose(); // subscriber is subscription, dispose
s.tcs.TrySetCanceled(s.cancellationToken);
}, subscriber);

source.Subscribe(subscriber); // return subscriber self so ignore subscription

// when canceled, throws TaskCanceledException in here and subscription.Dispose() is called.
return tcs.Task;
}

public static Task<TResult> AggregateAsync<TMessage, TComplete, TAccumulate, TResult>
(this CompletableEvent<TMessage, Result<TComplete>> source,
TAccumulate seed,
Func<TAccumulate, TMessage, TAccumulate> func,
Func<TAccumulate, Result<TComplete>, TResult> resultSelector,
CancellationToken cancellationToken = default)
{
var tcs = new TaskCompletionSource<TResult>();

var subscriber = new AggregateAsyncR<TMessage, TComplete, TAccumulate, TResult>(tcs, seed, func, resultSelector, cancellationToken);

// before Subscribe, register and set CancellationTokenRegistration
subscriber.tokenRegistration = cancellationToken.UnsafeRegister(static state =>
{
var s = ((AggregateAsyncR<TMessage, TComplete, TAccumulate, TResult>)state!);

s.Dispose(); // subscriber is subscription, dispose
s.tcs.TrySetCanceled(s.cancellationToken);
}, subscriber);

source.Subscribe(subscriber); // return subscriber self so ignore subscription

// when canceled, throws TaskCanceledException in here and subscription.Dispose() is called.
return tcs.Task;
}
}
}

namespace R3.Operators
{
internal sealed class AggregateAsync<TMessage, TComplete, TAccumulate, TResult>(
TaskCompletionSource<TResult> tcs,
TAccumulate seed,
Func<TAccumulate, TMessage, TAccumulate> func,
Func<TAccumulate, TComplete, TResult> resultSelector,
CancellationToken cancellationToken) : Subscriber<TMessage, TComplete>
{
// hold state for CancellationToken.Register
internal TaskCompletionSource<TResult> tcs = tcs;
internal CancellationToken cancellationToken = cancellationToken;
internal CancellationTokenRegistration tokenRegistration;

TAccumulate value = seed;

protected override void OnNextCore(TMessage message)
{
value = func(value, message); // OnNext error is route to OnErrorResumeCore
}

protected override void OnErrorResumeCore(Exception error)
{
tcs.TrySetException(error);
}

protected override void OnCompletedCore(TComplete complete)
{
try
{
var result = resultSelector(value, complete);
tcs.TrySetResult(result);
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
}

protected override void DisposeCore()
{
tokenRegistration.Dispose();
}
}

internal sealed class AggregateAsyncR<TMessage, TComplete, TAccumulate, TResult>(
TaskCompletionSource<TResult> tcs,
TAccumulate seed,
Func<TAccumulate, TMessage, TAccumulate> func,
Func<TAccumulate, Result<TComplete>, TResult> resultSelector,
CancellationToken cancellationToken) : Subscriber<TMessage, Result<TComplete>>
{
// hold state for CancellationToken.Register
internal TaskCompletionSource<TResult> tcs = tcs;
internal CancellationToken cancellationToken = cancellationToken;
internal CancellationTokenRegistration tokenRegistration;

TAccumulate value = seed;

protected override void OnNextCore(TMessage message)
{
value = func(value, message);
}

protected override void OnErrorResumeCore(Exception error)
{
tcs.TrySetException(error);
}

protected override void OnCompletedCore(Result<TComplete> complete)
{
try
{
var result = resultSelector(value, complete);
if (complete.IsSuccess)
{
tcs.TrySetResult(result);
}
else
{
tcs.TrySetException(complete.Exception);
}
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
}

protected override void DisposeCore()
{
tokenRegistration.Dispose();
}
}
}
Loading

0 comments on commit 2992cb6

Please sign in to comment.