Skip to content

Commit

Permalink
EAT
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 13, 2023
1 parent 31f78c6 commit e9802cf
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 31 deletions.
3 changes: 3 additions & 0 deletions sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@
//d.Dispose();


var iiii = Enumerable.Range(1, 10).ElementAt(^12);
Console.WriteLine(iiii);


// System.Reactive.Linq.Observable.Empty<int>(

Expand Down
55 changes: 52 additions & 3 deletions src/R3/Internal/TaskSubscriberBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ namespace R3.Internal;

internal abstract class TaskSubscriberBase<TMessage, TTask> : Subscriber<TMessage>
{
protected TaskCompletionSource<TTask> tcs; // use this field.

TaskCompletionSource<TTask> tcs;
CancellationToken cancellationToken;
CancellationTokenRegistration tokenRegistration;

Expand All @@ -20,6 +19,7 @@ public TaskSubscriberBase(CancellationToken cancellationToken)

if (cancellationToken.CanBeCanceled)
{
// register before call Subscribe
this.tokenRegistration = cancellationToken.UnsafeRegister(static state =>
{
var s = (TaskSubscriberBase<TMessage, TTask>)state!;
Expand All @@ -35,11 +35,35 @@ protected override void DisposeCore()
{
tokenRegistration.Dispose();
}

protected void TrySetResult(TTask result)
{
try
{
tcs.TrySetResult(result);
}
finally
{
Dispose();
}
}

protected void TrySetException(Exception exception)
{
try
{
tcs.TrySetException(exception);
}
finally
{
Dispose();
}
}
}

internal abstract class TaskSubscriberBase<TMessage, TComplete, TTask> : Subscriber<TMessage, TComplete>
{
protected TaskCompletionSource<TTask> tcs; // use this field.
TaskCompletionSource<TTask> tcs; // use this field.

CancellationToken cancellationToken;
CancellationTokenRegistration tokenRegistration;
Expand All @@ -53,6 +77,7 @@ public TaskSubscriberBase(CancellationToken cancellationToken)

if (cancellationToken.CanBeCanceled)
{
// register before call Subscribe
this.tokenRegistration = cancellationToken.UnsafeRegister(static state =>
{
var s = (TaskSubscriberBase<TMessage, TComplete, TTask>)state!;
Expand All @@ -68,4 +93,28 @@ protected override void DisposeCore()
{
tokenRegistration.Dispose();
}

protected void TrySetResult(TTask result)
{
try
{
tcs.TrySetResult(result);
}
finally
{
Dispose();
}
}

protected void TrySetException(Exception exception)
{
try
{
tcs.TrySetException(exception);
}
finally
{
Dispose();
}
}
}
14 changes: 7 additions & 7 deletions src/R3/Operators/AggregateAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,19 @@ protected override void OnNextCore(TMessage message)

protected override void OnErrorResumeCore(Exception error)
{
tcs.TrySetException(error);
TrySetException(error);
}

protected override void OnCompletedCore(TComplete complete)
{
try
{
var result = resultSelector(value, complete); // trap this resultSelector exception
tcs.TrySetResult(result);
TrySetResult(result);
}
catch (Exception ex)
{
tcs.TrySetException(ex);
TrySetException(ex);
}
}
}
Expand All @@ -78,7 +78,7 @@ protected override void OnNextCore(TMessage message)

protected override void OnErrorResumeCore(Exception error)
{
tcs.TrySetException(error);
TrySetException(error);
}

protected override void OnCompletedCore(Result<TComplete> complete)
Expand All @@ -88,16 +88,16 @@ protected override void OnCompletedCore(Result<TComplete> complete)
var result = resultSelector(value, complete); // trap this resultSelector exception
if (complete.IsSuccess)
{
tcs.TrySetResult(result);
TrySetResult(result);
}
else
{
tcs.TrySetException(complete.Exception);
TrySetException(complete.Exception);
}
}
catch (Exception ex)
{
tcs.TrySetException(ex);
TrySetException(ex);
}
}
}
Expand Down
31 changes: 15 additions & 16 deletions src/R3/Operators/FirstLastSingle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,12 @@ protected override void OnNextCore(TMessage message)
{
if (!predicate(message)) return;

tcs.TrySetResult(message); // First
Dispose();
TrySetResult(message); // First
}

protected override void OnErrorResumeCore(Exception error)
{
tcs.TrySetException(error);
TrySetException(error);
}
}

Expand All @@ -95,14 +94,14 @@ protected override void OnNextCore(TMessage message)

if (operation == FirstLastSingleOperation.Single && hasValue)
{
tcs.TrySetException(new InvalidOperationException("Sequence contains more than one element."));
TrySetException(new InvalidOperationException("Sequence contains more than one element."));
return;
}

hasValue = true;
if (operation == FirstLastSingleOperation.First)
{
tcs.TrySetResult(message); // First / FirstOrDefault
Dispose();
TrySetResult(message); // First / FirstOrDefault
}
else
{
Expand All @@ -112,18 +111,18 @@ protected override void OnNextCore(TMessage message)

protected override void OnErrorResumeCore(Exception error)
{
tcs.TrySetException(error);
TrySetException(error);
}

protected override void OnCompletedCore(TComplete complete)
{
if (hasValue || useDefaultIfEmpty)
{
tcs.TrySetResult(latestValue!); // FirstOrDefault / Last / LastOrDefault / Single / SingleOrDefault
TrySetResult(latestValue!); // FirstOrDefault / Last / LastOrDefault / Single / SingleOrDefault
return;
}

tcs.TrySetException(new InvalidOperationException("Sequence contains no elements."));
TrySetException(new InvalidOperationException("Sequence contains no elements."));
}
}

Expand All @@ -139,14 +138,14 @@ protected override void OnNextCore(TMessage message)

if (operation == FirstLastSingleOperation.Single && hasValue)
{
tcs.TrySetException(new InvalidOperationException("Sequence contains more than one element."));
TrySetException(new InvalidOperationException("Sequence contains more than one element."));
return;
}

hasValue = true;
if (operation == FirstLastSingleOperation.First)
{
tcs.TrySetResult(message); // First / FirstOrDefault
Dispose();
TrySetResult(message); // First / FirstOrDefault
}
else
{
Expand All @@ -156,24 +155,24 @@ protected override void OnNextCore(TMessage message)

protected override void OnErrorResumeCore(Exception error)
{
tcs.TrySetException(error);
TrySetException(error);
}

protected override void OnCompletedCore(Result<TComplete> complete)
{
if (complete.IsFailure)
{
tcs.TrySetException(complete.Exception);
TrySetException(complete.Exception);
return;
}

if (hasValue || useDefaultIfEmpty)
{
tcs.TrySetResult(latestValue!); // FirstOrDefault / Last / LastOrDefault / Single / SingleOrDefault
TrySetResult(latestValue!); // FirstOrDefault / Last / LastOrDefault / Single / SingleOrDefault
return;
}

tcs.TrySetException(new InvalidOperationException("Sequence contains no elements."));
TrySetException(new InvalidOperationException("Sequence contains no elements."));
}
}

Expand Down
105 changes: 100 additions & 5 deletions src/R3/Operators/_Operators.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
namespace R3

using System.Diagnostics.CodeAnalysis;

namespace R3
{
public static partial class EventExtensions
{
Expand Down Expand Up @@ -26,9 +29,101 @@ public static partial class EventExtensions

namespace R3.Operators
{
//internal sealed class ElementAtAsync<TMessage>(Event<TMessage> CancellationToken cancellationToken)

//{
//}
// TODO: now working

internal sealed class ElementAtAsync<TMessage>(Event<TMessage> source, int index, CancellationToken cancellationToken)
: TaskSubscriberBase<TMessage, TMessage>(cancellationToken)
{
int count = 0;

protected override void OnNextCore(TMessage message)
{
if (count++ == index)
{
TrySetResult(message);
}
}

protected override void OnErrorResumeCore(Exception error)
{
TrySetException(error);
}
}

internal sealed class ElementAtAsync<TMessage, TComplete>(CompletableEvent<TMessage, TComplete> source, int index, bool useDefaultValue, TMessage? defaultValue, CancellationToken cancellationToken)

Check warning on line 53 in src/R3/Operators/_Operators.cs

View workflow job for this annotation

GitHub Actions / build-dotnet

Parameter 'source' is unread.

Check warning on line 53 in src/R3/Operators/_Operators.cs

View workflow job for this annotation

GitHub Actions / build-dotnet

Parameter 'useDefaultValue' is unread.
: TaskSubscriberBase<TMessage, TComplete, TMessage>(cancellationToken)
{
int count = 0;
bool hasValue;

protected override void OnNextCore(TMessage message)
{
hasValue = true;
if (count++ == index)
{
TrySetResult(message);
}
}

protected override void OnErrorResumeCore(Exception error)
{
TrySetException(error);
}

protected override void OnCompletedCore(TComplete complete)
{
throw new NotImplementedException();
}
}

// Index.IsFromEnd
internal sealed class ElementAtFromEndAsync<TMessage, TComplete>(CompletableEvent<TMessage, TComplete> source, int fromEndIndex, bool useDefaultValue, TMessage? defaultValue, CancellationToken cancellationToken)
: TaskSubscriberBase<TMessage, TComplete, TMessage>(cancellationToken)
{
int count = 0;
bool hasValue;

Queue<TMessage> queue = new Queue<TMessage>(fromEndIndex);

protected override void OnNextCore(TMessage message)
{
hasValue = true;
if (queue.Count == fromEndIndex)
{
queue.Dequeue();
}

queue.Enqueue(message);
}

protected override void OnErrorResumeCore(Exception error)
{
TrySetException(error);
}

protected override void OnCompletedCore(TComplete complete)
{
if (queue.Count == fromEndIndex)
{
var result = queue.Dequeue();
TrySetResult(result);
return;
}

if (useDefaultValue)
{
TrySetResult(defaultValue!);
return;
}

if (!hasValue)
{
TrySetException(new InvalidOperationException("Sequence contains no elements"));
}
else
{
TrySetException(new ArgumentOutOfRangeException("index"));
}
}
}
}

0 comments on commit e9802cf

Please sign in to comment.