diff --git a/sandbox/ConsoleApp1/Program.cs b/sandbox/ConsoleApp1/Program.cs index e66e40c8..864e3456 100644 --- a/sandbox/ConsoleApp1/Program.cs +++ b/sandbox/ConsoleApp1/Program.cs @@ -99,6 +99,9 @@ //d.Dispose(); +var iiii = Enumerable.Range(1, 10).ElementAt(^12); +Console.WriteLine(iiii); + // System.Reactive.Linq.Observable.Empty( diff --git a/src/R3/Internal/TaskSubscriberBase.cs b/src/R3/Internal/TaskSubscriberBase.cs index a723556b..e9e73adc 100644 --- a/src/R3/Internal/TaskSubscriberBase.cs +++ b/src/R3/Internal/TaskSubscriberBase.cs @@ -6,8 +6,7 @@ namespace R3.Internal; internal abstract class TaskSubscriberBase : Subscriber { - protected TaskCompletionSource tcs; // use this field. - + TaskCompletionSource tcs; CancellationToken cancellationToken; CancellationTokenRegistration tokenRegistration; @@ -20,6 +19,7 @@ public TaskSubscriberBase(CancellationToken cancellationToken) if (cancellationToken.CanBeCanceled) { + // register before call Subscribe this.tokenRegistration = cancellationToken.UnsafeRegister(static state => { var s = (TaskSubscriberBase)state!; @@ -35,11 +35,35 @@ protected override void DisposeCore() { tokenRegistration.Dispose(); } + + protected void TrySetResult(TTask result) + { + try + { + tcs.TrySetResult(result); + } + finally + { + Dispose(); + } + } + + protected void TrySetException(Exception exception) + { + try + { + tcs.TrySetException(exception); + } + finally + { + Dispose(); + } + } } internal abstract class TaskSubscriberBase : Subscriber { - protected TaskCompletionSource tcs; // use this field. + TaskCompletionSource tcs; // use this field. CancellationToken cancellationToken; CancellationTokenRegistration tokenRegistration; @@ -53,6 +77,7 @@ public TaskSubscriberBase(CancellationToken cancellationToken) if (cancellationToken.CanBeCanceled) { + // register before call Subscribe this.tokenRegistration = cancellationToken.UnsafeRegister(static state => { var s = (TaskSubscriberBase)state!; @@ -68,4 +93,28 @@ protected override void DisposeCore() { tokenRegistration.Dispose(); } + + protected void TrySetResult(TTask result) + { + try + { + tcs.TrySetResult(result); + } + finally + { + Dispose(); + } + } + + protected void TrySetException(Exception exception) + { + try + { + tcs.TrySetException(exception); + } + finally + { + Dispose(); + } + } } diff --git a/src/R3/Operators/AggregateAsync.cs b/src/R3/Operators/AggregateAsync.cs index 1907dc12..c33c9e04 100644 --- a/src/R3/Operators/AggregateAsync.cs +++ b/src/R3/Operators/AggregateAsync.cs @@ -46,7 +46,7 @@ protected override void OnNextCore(TMessage message) protected override void OnErrorResumeCore(Exception error) { - tcs.TrySetException(error); + TrySetException(error); } protected override void OnCompletedCore(TComplete complete) @@ -54,11 +54,11 @@ protected override void OnCompletedCore(TComplete complete) try { var result = resultSelector(value, complete); // trap this resultSelector exception - tcs.TrySetResult(result); + TrySetResult(result); } catch (Exception ex) { - tcs.TrySetException(ex); + TrySetException(ex); } } } @@ -78,7 +78,7 @@ protected override void OnNextCore(TMessage message) protected override void OnErrorResumeCore(Exception error) { - tcs.TrySetException(error); + TrySetException(error); } protected override void OnCompletedCore(Result complete) @@ -88,16 +88,16 @@ protected override void OnCompletedCore(Result complete) var result = resultSelector(value, complete); // trap this resultSelector exception if (complete.IsSuccess) { - tcs.TrySetResult(result); + TrySetResult(result); } else { - tcs.TrySetException(complete.Exception); + TrySetException(complete.Exception); } } catch (Exception ex) { - tcs.TrySetException(ex); + TrySetException(ex); } } } diff --git a/src/R3/Operators/FirstLastSingle.cs b/src/R3/Operators/FirstLastSingle.cs index 4392692c..747f76a6 100644 --- a/src/R3/Operators/FirstLastSingle.cs +++ b/src/R3/Operators/FirstLastSingle.cs @@ -73,13 +73,12 @@ protected override void OnNextCore(TMessage message) { if (!predicate(message)) return; - tcs.TrySetResult(message); // First - Dispose(); + TrySetResult(message); // First } protected override void OnErrorResumeCore(Exception error) { - tcs.TrySetException(error); + TrySetException(error); } } @@ -95,14 +94,14 @@ protected override void OnNextCore(TMessage message) if (operation == FirstLastSingleOperation.Single && hasValue) { - tcs.TrySetException(new InvalidOperationException("Sequence contains more than one element.")); + TrySetException(new InvalidOperationException("Sequence contains more than one element.")); + return; } hasValue = true; if (operation == FirstLastSingleOperation.First) { - tcs.TrySetResult(message); // First / FirstOrDefault - Dispose(); + TrySetResult(message); // First / FirstOrDefault } else { @@ -112,18 +111,18 @@ protected override void OnNextCore(TMessage message) protected override void OnErrorResumeCore(Exception error) { - tcs.TrySetException(error); + TrySetException(error); } protected override void OnCompletedCore(TComplete complete) { if (hasValue || useDefaultIfEmpty) { - tcs.TrySetResult(latestValue!); // FirstOrDefault / Last / LastOrDefault / Single / SingleOrDefault + TrySetResult(latestValue!); // FirstOrDefault / Last / LastOrDefault / Single / SingleOrDefault return; } - tcs.TrySetException(new InvalidOperationException("Sequence contains no elements.")); + TrySetException(new InvalidOperationException("Sequence contains no elements.")); } } @@ -139,14 +138,14 @@ protected override void OnNextCore(TMessage message) if (operation == FirstLastSingleOperation.Single && hasValue) { - tcs.TrySetException(new InvalidOperationException("Sequence contains more than one element.")); + TrySetException(new InvalidOperationException("Sequence contains more than one element.")); + return; } hasValue = true; if (operation == FirstLastSingleOperation.First) { - tcs.TrySetResult(message); // First / FirstOrDefault - Dispose(); + TrySetResult(message); // First / FirstOrDefault } else { @@ -156,24 +155,24 @@ protected override void OnNextCore(TMessage message) protected override void OnErrorResumeCore(Exception error) { - tcs.TrySetException(error); + TrySetException(error); } protected override void OnCompletedCore(Result complete) { if (complete.IsFailure) { - tcs.TrySetException(complete.Exception); + TrySetException(complete.Exception); return; } if (hasValue || useDefaultIfEmpty) { - tcs.TrySetResult(latestValue!); // FirstOrDefault / Last / LastOrDefault / Single / SingleOrDefault + TrySetResult(latestValue!); // FirstOrDefault / Last / LastOrDefault / Single / SingleOrDefault return; } - tcs.TrySetException(new InvalidOperationException("Sequence contains no elements.")); + TrySetException(new InvalidOperationException("Sequence contains no elements.")); } } diff --git a/src/R3/Operators/_Operators.cs b/src/R3/Operators/_Operators.cs index 1858f51a..d2676612 100644 --- a/src/R3/Operators/_Operators.cs +++ b/src/R3/Operators/_Operators.cs @@ -1,4 +1,7 @@ -namespace R3 + +using System.Diagnostics.CodeAnalysis; + +namespace R3 { public static partial class EventExtensions { @@ -26,9 +29,101 @@ public static partial class EventExtensions namespace R3.Operators { - //internal sealed class ElementAtAsync(Event CancellationToken cancellationToken) - - //{ - //} + // TODO: now working + + internal sealed class ElementAtAsync(Event source, int index, CancellationToken cancellationToken) + : TaskSubscriberBase(cancellationToken) + { + int count = 0; + + protected override void OnNextCore(TMessage message) + { + if (count++ == index) + { + TrySetResult(message); + } + } + + protected override void OnErrorResumeCore(Exception error) + { + TrySetException(error); + } + } + internal sealed class ElementAtAsync(CompletableEvent source, int index, bool useDefaultValue, TMessage? defaultValue, CancellationToken cancellationToken) + : TaskSubscriberBase(cancellationToken) + { + int count = 0; + bool hasValue; + + protected override void OnNextCore(TMessage message) + { + hasValue = true; + if (count++ == index) + { + TrySetResult(message); + } + } + + protected override void OnErrorResumeCore(Exception error) + { + TrySetException(error); + } + + protected override void OnCompletedCore(TComplete complete) + { + throw new NotImplementedException(); + } + } + + // Index.IsFromEnd + internal sealed class ElementAtFromEndAsync(CompletableEvent source, int fromEndIndex, bool useDefaultValue, TMessage? defaultValue, CancellationToken cancellationToken) + : TaskSubscriberBase(cancellationToken) + { + int count = 0; + bool hasValue; + + Queue queue = new Queue(fromEndIndex); + + protected override void OnNextCore(TMessage message) + { + hasValue = true; + if (queue.Count == fromEndIndex) + { + queue.Dequeue(); + } + + queue.Enqueue(message); + } + + protected override void OnErrorResumeCore(Exception error) + { + TrySetException(error); + } + + protected override void OnCompletedCore(TComplete complete) + { + if (queue.Count == fromEndIndex) + { + var result = queue.Dequeue(); + TrySetResult(result); + return; + } + + if (useDefaultValue) + { + TrySetResult(defaultValue!); + return; + } + + if (!hasValue) + { + TrySetException(new InvalidOperationException("Sequence contains no elements")); + } + else + { + TrySetException(new ArgumentOutOfRangeException("index")); + } + } + } }