diff --git a/sandbox/ConsoleApp1/Program.cs b/sandbox/ConsoleApp1/Program.cs index 78643968..46e3b652 100644 --- a/sandbox/ConsoleApp1/Program.cs +++ b/sandbox/ConsoleApp1/Program.cs @@ -26,52 +26,21 @@ EventSystem.DefaultFrameProvider = new ThreadSleepFrameProvider(60); -//var t = new Thread(() => -//{ -// while (true) -// { -// Console.WriteLine("loop"); Thread.Sleep(60); -// } -//}); -//t.IsBackground = true; -//t.Start(); +// Enumerable.Empty().ElementAtOrDefault( -//var s = new NewThreadScheduler(_ => new Thread(() => { while (true) { Console.WriteLine("loop"); Thread.Sleep(60); } })); +var i = Enumerable.Range(4, 10).ElementAtOrDefault(^0); +Console.WriteLine(i); -//s.Schedule(() => Console.WriteLine("do once")); -//using var f = new ThreadSleepFrameProvider(60); -var source = Event.EveryUpdate(ct.Token); - - -source.DoOnDisposed(() => { Console.WriteLine("DISPOSED"); }).WriteLine(); - -SubscriptionTracker.ForEachActiveTask(x => -{ - Console.WriteLine(x); -}); - - - -Console.WriteLine("BeforeId:" + Thread.CurrentThread.ManagedThreadId); - -await source.WaitAsync(); -Console.WriteLine("Press Key to done."); - - -await Task.Yield(); - -Console.ReadLine(); - - -SubscriptionTracker.ForEachActiveTask(x => +IEnumerable Range(int count) { - Console.WriteLine(x); -}); - -Console.WriteLine("----------------"); -Console.WriteLine("AfterId:" + Thread.CurrentThread.ManagedThreadId); + for (int i = 0; i < count; i++) + { + Console.WriteLine(i); + yield return i; + } +} public static class Extensions @@ -92,3 +61,4 @@ public void Dispose() CalledCount += 1; } } + diff --git a/src/R3/Factories/Range.cs b/src/R3/Factories/Range.cs index 74ac043e..37cdd4c1 100644 --- a/src/R3/Factories/Range.cs +++ b/src/R3/Factories/Range.cs @@ -58,11 +58,12 @@ protected override IDisposable SubscribeCore(Subscriber subscriber) { if (cancellationToken.IsCancellationRequested) { + subscriber.OnCompleted(); return Disposable.Empty; } subscriber.OnNext(start + i); } - subscriber.OnCompleted(default); + subscriber.OnCompleted(); return Disposable.Empty; } } diff --git a/src/R3/Factories/Repeat.cs b/src/R3/Factories/Repeat.cs index 7968a386..ff7b0104 100644 --- a/src/R3/Factories/Repeat.cs +++ b/src/R3/Factories/Repeat.cs @@ -57,6 +57,7 @@ protected override IDisposable SubscribeCore(Subscriber subscriber) { if (cancellationToken.IsCancellationRequested) { + subscriber.OnCompleted(); return Disposable.Empty; } subscriber.OnNext(value); diff --git a/src/R3/Operators/ElementAtAsync.cs b/src/R3/Operators/ElementAtAsync.cs new file mode 100644 index 00000000..8c23f7e1 --- /dev/null +++ b/src/R3/Operators/ElementAtAsync.cs @@ -0,0 +1,134 @@ +namespace R3; + +public static partial class EventExtensions +{ + public static Task ElementAtAsync(this Event source, int index, CancellationToken cancellationToken = default) + { + if (index < 0) throw new ArgumentOutOfRangeException("index"); + + var subscriber = new ElementAtAsync(index, false, default, cancellationToken); + source.Subscribe(subscriber); + return subscriber.Task; + } + + public static Task ElementAtAsync(this Event source, Index index, CancellationToken cancellationToken = default) + { + if (index.IsFromEnd) + { + if (index.Value <= 0) throw new ArgumentOutOfRangeException("index"); + var subscriber = new ElementAtFromEndAsync(index.Value, false, default, cancellationToken); + source.Subscribe(subscriber); + return subscriber.Task; + } + else + { + return ElementAtAsync(source, index.Value, cancellationToken); + } + } + + public static Task ElementAtOrDefaultAsync(this Event source, int index, T? defaultValue = default, CancellationToken cancellationToken = default) + { + if (index < 0) throw new ArgumentOutOfRangeException("index"); + var subscriber = new ElementAtAsync(index, true, defaultValue, cancellationToken); + source.Subscribe(subscriber); + return subscriber.Task; + } + + public static Task ElementAtOrDefaultAsync(this Event source, Index index, T? defaultValue = default, CancellationToken cancellationToken = default) + { + if (index.IsFromEnd) + { + var subscriber = new ElementAtFromEndAsync(index.Value, true, defaultValue, cancellationToken); + source.Subscribe(subscriber); + return subscriber.Task; + } + else + { + return ElementAtOrDefaultAsync(source, index.Value, defaultValue, cancellationToken); + } + } +} + +internal sealed class ElementAtAsync(int index, bool useDefaultValue, T? defaultValue, CancellationToken cancellationToken) + : TaskSubscriberBase(cancellationToken) +{ + int count = 0; + + protected override void OnNextCore(T value) + { + if (count++ == index) + { + TrySetResult(value); + } + } + + protected override void OnErrorResumeCore(Exception error) + { + TrySetException(error); + } + + protected override void OnCompletedCore(Result result) + { + if (result.IsFailure) + { + TrySetException(result.Exception); + } + else + { + if (useDefaultValue) + { + TrySetResult(defaultValue!); + } + else + { + TrySetException(new ArgumentOutOfRangeException("index")); + } + } + } +} + +// Index.IsFromEnd +internal sealed class ElementAtFromEndAsync(int fromEndIndex, bool useDefaultValue, T? defaultValue, CancellationToken cancellationToken) + : TaskSubscriberBase(cancellationToken) +{ + Queue queue = new Queue(fromEndIndex); + + protected override void OnNextCore(T value) + { + if (queue.Count == fromEndIndex && queue.Count != 0) + { + queue.Dequeue(); + } + + queue.Enqueue(value); + } + + protected override void OnErrorResumeCore(Exception error) + { + TrySetException(error); + } + + protected override void OnCompletedCore(Result result) + { + if (result.IsFailure) + { + TrySetException(result.Exception); + return; + } + + if (queue.Count == fromEndIndex) + { + var value = queue.Dequeue(); + TrySetResult(value); + return; + } + + if (useDefaultValue) + { + TrySetResult(defaultValue!); + return; + } + + TrySetException(new ArgumentOutOfRangeException("index")); + } +} diff --git a/src/R3/Operators/_Operators.cs b/src/R3/Operators/_Operators.cs index 98d3fadd..e7fbb49c 100644 --- a/src/R3/Operators/_Operators.cs +++ b/src/R3/Operators/_Operators.cs @@ -22,90 +22,3 @@ public static partial class EventExtensions // return tasks: // All, Any, Contains, SequenceEqual, ElementAt, ElementAtOrDefault, IsEmpty, MaxBy, MinBy, ToDictionary, ToLookup, ForEachAsync } - -// TODO: now working - - - -internal sealed class ElementAtAsync(int index, bool useDefaultValue, T? defaultValue, CancellationToken cancellationToken) - : TaskSubscriberBase(cancellationToken) -{ - int count = 0; - bool hasValue; - - protected override void OnNextCore(T value) - { - hasValue = true; - if (count++ == index) - { - TrySetResult(value); - } - } - - protected override void OnErrorResumeCore(Exception error) - { - TrySetException(error); - } - - protected override void OnCompletedCore(Result result) - { - if (result.IsFailure) - { - // TODO:... - } - else - { - // TODO:... - } - } -} - -// Index.IsFromEnd -internal sealed class ElementAtFromEndAsync(int fromEndIndex, bool useDefaultValue, T? defaultValue, CancellationToken cancellationToken) - : TaskSubscriberBase(cancellationToken) -{ - bool hasValue; - - Queue queue = new Queue(fromEndIndex); - - protected override void OnNextCore(T value) - { - hasValue = true; - if (queue.Count == fromEndIndex) - { - queue.Dequeue(); - } - - queue.Enqueue(value); - } - - protected override void OnErrorResumeCore(Exception error) - { - TrySetException(error); - } - - protected override void OnCompletedCore(Result result) - { - if (queue.Count == fromEndIndex) - { - var value = queue.Dequeue(); - TrySetResult(value); - return; - } - - if (useDefaultValue) - { - TrySetResult(defaultValue!); - return; - } - - if (!hasValue) - { - TrySetException(new InvalidOperationException("Sequence contains no elements")); - } - else - { - TrySetException(new ArgumentOutOfRangeException("index")); - } - } -} diff --git a/tests/R3.Tests/OperatorTests/ElementAtTest.cs b/tests/R3.Tests/OperatorTests/ElementAtTest.cs new file mode 100644 index 00000000..92ddf7b6 --- /dev/null +++ b/tests/R3.Tests/OperatorTests/ElementAtTest.cs @@ -0,0 +1,90 @@ +namespace R3.Tests.OperatorTests; + +public class ElementAtTest +{ + [Fact] + public async Task ElementAt() + { + var xs = Event.Range(0, 10); + (await xs.ElementAtAsync(0)).Should().Be(0); + (await xs.ElementAtAsync(5)).Should().Be(5); + (await xs.ElementAtAsync(9)).Should().Be(9); + + await Assert.ThrowsAsync(async () => await xs.ElementAtAsync(-1)); + await Assert.ThrowsAsync(async () => await xs.ElementAtAsync(10)); + + await Assert.ThrowsAsync(async () => await Event.Throw(new Exception()).ElementAtAsync(0)); + + // empty case + await Assert.ThrowsAsync(async () => await Event.Empty().ElementAtAsync(0)); + } + + [Fact] + public async Task ElementAtOrDefault() + { + var xs = Event.Range(0, 10); + (await xs.ElementAtOrDefaultAsync(0)).Should().Be(0); + (await xs.ElementAtOrDefaultAsync(5)).Should().Be(5); + (await xs.ElementAtOrDefaultAsync(9)).Should().Be(9); + + await Assert.ThrowsAsync(async () => await xs.ElementAtOrDefaultAsync(-1)); + (await xs.ElementAtOrDefaultAsync(10, 9999)).Should().Be(9999); + + await Assert.ThrowsAsync(async () => await Event.Throw(new Exception()).ElementAtOrDefaultAsync(0)); + + // empty case + (await Event.Empty().ElementAtOrDefaultAsync(0, 99999)).Should().Be(99999); + } + + [Fact] + public async Task ElementAtIndex() + { + var xs = Event.Range(0, 10); + (await xs.ElementAtAsync(new Index(0))).Should().Be(0); + (await xs.ElementAtAsync(new Index(5))).Should().Be(5); + (await xs.ElementAtAsync(new Index(9))).Should().Be(9); + + await Assert.ThrowsAsync(async () => await xs.ElementAtAsync(new Index(10))); + + await Assert.ThrowsAsync(async () => await Event.Throw(new Exception()).ElementAtAsync(new Index(0))); + + // empty case + await Assert.ThrowsAsync(async () => await Event.Empty().ElementAtAsync(new Index(0))); + + // more + (await xs.ElementAtAsync(^1)).Should().Be(9); + (await xs.ElementAtAsync(^4)).Should().Be(6); + (await xs.ElementAtAsync(^10)).Should().Be(0); + + await Assert.ThrowsAsync(async () => await xs.ElementAtAsync(^0)); + await Assert.ThrowsAsync(async () => await xs.ElementAtAsync(^11)); + await Assert.ThrowsAsync(async () => await Event.Throw(new Exception()).ElementAtAsync(^1)); + await Assert.ThrowsAsync(async () => await Event.Empty().ElementAtAsync(^1)); + } + + [Fact] + public async Task ElementAtOrDefaultIndex() + { + var xs = Event.Range(0, 10); + (await xs.ElementAtOrDefaultAsync(new Index(0))).Should().Be(0); + (await xs.ElementAtOrDefaultAsync(new Index(5))).Should().Be(5); + (await xs.ElementAtOrDefaultAsync(new Index(9))).Should().Be(9); + + (await xs.ElementAtOrDefaultAsync(new Index(10))).Should().Be(0); + + await Assert.ThrowsAsync(async () => await Event.Throw(new Exception()).ElementAtOrDefaultAsync(new Index(0))); + + // empty case + (await Event.Empty().ElementAtOrDefaultAsync(new Index(0))).Should().Be(0); + + // more + (await xs.ElementAtOrDefaultAsync(^1)).Should().Be(9); + (await xs.ElementAtOrDefaultAsync(^4)).Should().Be(6); + (await xs.ElementAtOrDefaultAsync(^10)).Should().Be(0); + + (await xs.ElementAtOrDefaultAsync(^0, 9999)).Should().Be(9999); + (await xs.ElementAtOrDefaultAsync(^11, 9999)).Should().Be(9999); + await Assert.ThrowsAsync(async () => await Event.Throw(new Exception()).ElementAtOrDefaultAsync(^1)); + (await Event.Empty().ElementAtOrDefaultAsync(^1, 9999)).Should().Be(9999); + } +}