diff --git a/sandbox/ConsoleApp1/Program.cs b/sandbox/ConsoleApp1/Program.cs index 46e3b652..becd8b5d 100644 --- a/sandbox/ConsoleApp1/Program.cs +++ b/sandbox/ConsoleApp1/Program.cs @@ -28,8 +28,18 @@ // Enumerable.Empty().ElementAtOrDefault( -var i = Enumerable.Range(4, 10).ElementAtOrDefault(^0); -Console.WriteLine(i); +var range = Observable.Range(1, 10); + +// range.TakeLast( + + +//var publisher = new Publisher(); +//publisher.PublishOnNext(1); + + +//var xs = await publisher.Take(TimeSpan.FromSeconds(5)); + + diff --git a/src/R3/Operators/Take.cs b/src/R3/Operators/Take.cs index 7db94d80..36c4da0b 100644 --- a/src/R3/Operators/Take.cs +++ b/src/R3/Operators/Take.cs @@ -4,8 +4,37 @@ public static partial class EventExtensions { public static Event Take(this Event source, int count) { + if (count == 0) + { + return Event.Empty(); + } + return new Take(source, count); } + + // TimeBased + + public static Event Take(this Event source, TimeSpan duration) + { + return Take(source, duration, EventSystem.DefaultTimeProvider); + } + + public static Event Take(this Event source, TimeSpan duration, TimeProvider timeProvider) + { + return new TakeTime(source, duration, timeProvider); + } + + // TakeFrame + + public static Event TakeFrame(this Event source, int frameCount) + { + return TakeFrame(source, frameCount, EventSystem.DefaultFrameProvider); + } + + public static Event TakeFrame(this Event source, int frameCount, FrameProvider frameProvider) + { + return new TakeFrame(source, frameCount, frameProvider); + } } internal sealed class Take(Event source, int count) : Event @@ -43,3 +72,124 @@ protected override void OnCompletedCore(Result result) } } } + +internal sealed class TakeTime(Event source, TimeSpan duration, TimeProvider timeProvider) : Event +{ + protected override IDisposable SubscribeCore(Subscriber subscriber) + { + return source.Subscribe(new _TakeTime(subscriber, duration, timeProvider)); + } + + sealed class _TakeTime : Subscriber, IDisposable + { + static readonly TimerCallback timerCallback = TimerStopped; + + readonly Subscriber subscriber; + readonly ITimer timer; + readonly object gate = new object(); + + public _TakeTime(Subscriber subscriber, TimeSpan duration, TimeProvider timeProvider) + { + this.subscriber = subscriber; + this.timer = timeProvider.CreateStoppedTimer(timerCallback, this); + timer.InvokeOnce(duration); + } + + protected override void OnNextCore(T value) + { + lock (gate) + { + subscriber.OnNext(value); + } + } + + protected override void OnErrorResumeCore(Exception error) + { + lock (gate) + { + subscriber.OnErrorResume(error); + } + } + + protected override void OnCompletedCore(Result result) + { + lock (gate) + { + subscriber.OnCompleted(result); + } + } + + static void TimerStopped(object? state) + { + var self = (_TakeTime)state!; + self.OnCompleted(); + } + + protected override void DisposeCore() + { + timer.Dispose(); + } + } +} + +internal sealed class TakeFrame(Event source, int frameCount, FrameProvider frameProvider) : Event +{ + protected override IDisposable SubscribeCore(Subscriber subscriber) + { + return source.Subscribe(new _TakeFrame(subscriber, frameCount, frameProvider)); + } + + sealed class _TakeFrame : Subscriber, IDisposable, IFrameRunnerWorkItem + { + readonly Subscriber subscriber; + long remaining; + readonly object gate = new object(); + + public _TakeFrame(Subscriber subscriber, int frameCount, FrameProvider frameProvider) + { + this.subscriber = subscriber; + this.remaining = frameProvider.GetFrameCount() + frameCount; + frameProvider.Register(this); + } + + protected override void OnNextCore(T value) + { + lock (gate) + { + subscriber.OnNext(value); + } + } + + protected override void OnErrorResumeCore(Exception error) + { + lock (gate) + { + subscriber.OnErrorResume(error); + } + } + + protected override void OnCompletedCore(Result result) + { + lock (gate) + { + subscriber.OnCompleted(result); + } + } + + bool IFrameRunnerWorkItem.MoveNext(long _) + { + if (this.IsDisposed) return false; + + if (remaining > 0) + { + remaining--; + return true; + } + else + { + OnCompleted(Result.Success); + return false; + } + } + } +} diff --git a/src/R3/Operators/TakeLast.cs b/src/R3/Operators/TakeLast.cs new file mode 100644 index 00000000..0188fc6b --- /dev/null +++ b/src/R3/Operators/TakeLast.cs @@ -0,0 +1,226 @@ +namespace R3; + +public static partial class EventExtensions +{ + public static Event TakeLast(this Event source, int count) + { + return new TakeLast(source, count); + } + + // TimeBased + + public static Event TakeLast(this Event source, TimeSpan duration) + { + return TakeLast(source, duration, EventSystem.DefaultTimeProvider); + } + + public static Event TakeLast(this Event source, TimeSpan duration, TimeProvider timeProvider) + { + return new TakeLastTime(source, duration, timeProvider); + } + + // TakeLastFrame + + public static Event TakeLastFrame(this Event source, int frameCount) + { + return TakeLastFrame(source, frameCount, EventSystem.DefaultFrameProvider); + } + + public static Event TakeLastFrame(this Event source, int frameCount, FrameProvider frameProvider) + { + return new TakeLastFrame(source, frameCount, frameProvider); + } +} + +internal sealed class TakeLast(Event source, int count) : Event +{ + protected override IDisposable SubscribeCore(Subscriber subscriber) + { + return source.Subscribe(new _TakeLast(subscriber, count)); + } + + sealed class _TakeLast(Subscriber subscriber, int count) : Subscriber, IDisposable + { + Queue queue = new Queue(count); + + protected override void OnNextCore(T value) + { + if (queue.Count == count && queue.Count != 0) + { + queue.Dequeue(); + } + queue.Enqueue(value); + } + + protected override void OnErrorResumeCore(Exception error) + { + subscriber.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + if (result.IsFailure) + { + subscriber.OnCompleted(result); + return; + } + + foreach (var item in queue) + { + subscriber.OnNext(item); + } + subscriber.OnCompleted(); + } + + protected override void DisposeCore() + { + queue.Clear(); + } + } +} + +internal sealed class TakeLastTime(Event source, TimeSpan duration, TimeProvider timeProvider) : Event +{ + protected override IDisposable SubscribeCore(Subscriber subscriber) + { + return source.Subscribe(new _TakeLastTime(subscriber, duration, timeProvider)); + } + + sealed class _TakeLastTime : Subscriber, IDisposable + { + readonly Subscriber subscriber; + readonly object gate = new object(); + readonly Queue<(long timestamp, T value)> queue = new(); + readonly TimeSpan duration; + readonly TimeProvider timeProvider; + + public _TakeLastTime(Subscriber subscriber, TimeSpan duration, TimeProvider timeProvider) + { + this.subscriber = subscriber; + this.timeProvider = timeProvider; + this.duration = duration; + } + + protected override void OnNextCore(T value) + { + lock (gate) + { + var current = timeProvider.GetTimestamp(); + queue.Enqueue((current, value)); + Trim(current); + } + } + + protected override void OnErrorResumeCore(Exception error) + { + lock (gate) + { + subscriber.OnErrorResume(error); + } + } + + protected override void OnCompletedCore(Result result) + { + lock (gate) + { + if (result.IsFailure) + { + subscriber.OnCompleted(result); + return; + } + + Trim(timeProvider.GetTimestamp()); + foreach (var item in queue) + { + subscriber.OnNext(item.value); + } + subscriber.OnCompleted(); + } + } + + protected override void DisposeCore() + { + lock (gate) + { + queue.Clear(); + } + } + + void Trim(long currentTimestamp) + { + while (queue.Count > 0 && timeProvider.GetElapsedTime(queue.Peek().timestamp, currentTimestamp) > duration) + { + queue.Dequeue(); + } + } + } +} + +internal sealed class TakeLastFrame(Event source, int frameCount, FrameProvider frameProvider) : Event +{ + protected override IDisposable SubscribeCore(Subscriber subscriber) + { + return source.Subscribe(new _TakeLastFrame(subscriber, frameCount, frameProvider)); + } + + sealed class _TakeLastFrame : Subscriber, IDisposable + { + readonly Subscriber subscriber; + readonly object gate = new object(); + readonly Queue<(long frameCount, T value)> queue = new(); + readonly int frameCount; + readonly FrameProvider frameProvider; + + public _TakeLastFrame(Subscriber subscriber, int frameCount, FrameProvider frameProvider) + { + this.subscriber = subscriber; + this.frameCount = frameCount; + this.frameProvider = frameProvider; + } + + protected override void OnNextCore(T value) + { + lock (gate) + { + var current = frameProvider.GetFrameCount(); + queue.Enqueue((current, value)); + Trim(current); + } + } + + protected override void OnErrorResumeCore(Exception error) + { + lock (gate) + { + subscriber.OnErrorResume(error); + } + } + + protected override void OnCompletedCore(Result result) + { + lock (gate) + { + if (result.IsFailure) + { + subscriber.OnCompleted(result); + return; + } + + Trim(frameProvider.GetFrameCount()); + foreach (var item in queue) + { + subscriber.OnNext(item.value); + } + subscriber.OnCompleted(); + } + } + + void Trim(long currentFrameCount) + { + while (queue.Count > 0 && currentFrameCount - queue.Peek().frameCount > frameCount) + { + queue.Dequeue(); + } + } + } +} diff --git a/tests/R3.Tests/OperatorTests/TakeLastTest.cs b/tests/R3.Tests/OperatorTests/TakeLastTest.cs new file mode 100644 index 00000000..75d78c8e --- /dev/null +++ b/tests/R3.Tests/OperatorTests/TakeLastTest.cs @@ -0,0 +1,64 @@ +using System.Reactive.Linq; + +namespace R3.Tests.OperatorTests; + +public class TakeLastTest +{ + [Fact] + public async Task Take() + { + var xs = await Event.Range(1, 10).TakeLast(3).ToArrayAsync(); + xs.Should().Equal([8, 9, 10]); + } + + [Fact] + public void TakeTime() + { + var timeProvider = new FakeTimeProvider(); + + var publisher = new Publisher(); + var list = publisher.TakeLast(TimeSpan.FromSeconds(3), timeProvider).ToLiveList(); + + publisher.PublishOnNext(1); + publisher.PublishOnNext(10); + list.AssertEqual([]); + + timeProvider.Advance(TimeSpan.FromSeconds(2)); + publisher.PublishOnNext(100); + publisher.PublishOnNext(1000); + + timeProvider.Advance(TimeSpan.FromSeconds(2)); + publisher.PublishOnNext(2); + publisher.PublishOnNext(20); + + publisher.PublishOnCompleted(); + + list.AssertEqual([100, 1000, 2, 20]); + list.AssertIsCompleted(); + } + + [Fact] + public void TakeFrame2() + { + var frameProvider = new ManualFrameProvider(); + var cts = new CancellationTokenSource(); + + var list = Event.EveryUpdate(frameProvider, cts.Token, cancelImmediately: true) + .Select(x => frameProvider.GetFrameCount()) + .TakeLastFrame(3, frameProvider) + .ToLiveList(); + + frameProvider.Advance(3); // 0, 1, 2 + list.AssertEqual([]); + + frameProvider.Advance(2); // 3, 4 + frameProvider.Advance(1); // 5 + + cts.Cancel(); // stop and OnCompleted + + list.AssertEqual([3, 4, 5]); + + + list.AssertIsCompleted(); + } +} diff --git a/tests/R3.Tests/OperatorTests/TakeTest.cs b/tests/R3.Tests/OperatorTests/TakeTest.cs new file mode 100644 index 00000000..52aaa67d --- /dev/null +++ b/tests/R3.Tests/OperatorTests/TakeTest.cs @@ -0,0 +1,75 @@ +using System.Reactive.Linq; + +namespace R3.Tests.OperatorTests; + +public class TakeTest +{ + [Fact] + public async Task Take() + { + var xs = await Event.Range(1, 10).Take(3).ToArrayAsync(); + + xs.Should().Equal([1, 2, 3]); + + var timeProvider = new FakeTimeProvider(); + + var publisher = new Publisher(); + var list = publisher.Take(TimeSpan.FromSeconds(5), timeProvider).ToLiveList(); + + publisher.PublishOnNext(1); + publisher.PublishOnNext(10); + publisher.PublishOnNext(100); + list.AssertEqual([1, 10, 100]); + + timeProvider.Advance(TimeSpan.FromSeconds(3)); + + publisher.PublishOnNext(1000); + publisher.PublishOnNext(10000); + list.AssertEqual([1, 10, 100, 1000, 10000]); + + timeProvider.Advance(TimeSpan.FromSeconds(2)); + list.AssertIsCompleted(); + } + + [Fact] + public void TakeFrame() + { + var frameProvider = new ManualFrameProvider(); + + var publisher = new Publisher(); + var list = publisher.TakeFrame(5, frameProvider).ToLiveList(); + publisher.PublishOnNext(1); + publisher.PublishOnNext(10); + publisher.PublishOnNext(100); + list.AssertEqual([1, 10, 100]); + + frameProvider.Advance(3); + publisher.PublishOnNext(1000); + publisher.PublishOnNext(10000); + list.AssertEqual([1, 10, 100, 1000, 10000]); + frameProvider.Advance(2); + list.AssertIsNotCompleted(); + frameProvider.Advance(1); + list.AssertIsCompleted(); + } + + [Fact] + public void TakeFrame2() + { + var frameProvider = new ManualFrameProvider(); + + var list = Event.EveryUpdate(frameProvider) + .Select(x => (int)frameProvider.GetFrameCount()) + .TakeFrame(5, frameProvider) + .ToLiveList(); + + frameProvider.Advance(3); + list.AssertEqual([0, 1, 2]); + + frameProvider.Advance(2); + list.AssertEqual([0, 1, 2, 3, 4]); + frameProvider.Advance(1); + + list.AssertIsCompleted(); + } +}