From b7d78c7891f756b8c7713322a0761d12085da638 Mon Sep 17 00:00:00 2001 From: neuecc Date: Thu, 4 Jan 2024 01:37:15 +0900 Subject: [PATCH] DelaySubscription, DelaySubscriptionFrame --- src/R3/Operators/DelaySubscription.cs | 80 ++++++++++++++++++ src/R3/Operators/DelaySubscriptionFrame.cs | 81 +++++++++++++++++++ src/R3/Operators/_Operators.cs | 5 +- .../OperatorTests/DelaySubscriptionTest.cs | 70 ++++++++++++++++ 4 files changed, 232 insertions(+), 4 deletions(-) create mode 100644 src/R3/Operators/DelaySubscription.cs create mode 100644 src/R3/Operators/DelaySubscriptionFrame.cs create mode 100644 tests/R3.Tests/OperatorTests/DelaySubscriptionTest.cs diff --git a/src/R3/Operators/DelaySubscription.cs b/src/R3/Operators/DelaySubscription.cs new file mode 100644 index 00000000..efed0874 --- /dev/null +++ b/src/R3/Operators/DelaySubscription.cs @@ -0,0 +1,80 @@ +namespace R3; + +public static partial class ObservableExtensions +{ + public static Observable DelaySubscription(this Observable source, TimeSpan dueTime) + { + return new DelaySubscription(source, dueTime, ObservableSystem.DefaultTimeProvider); + } + + public static Observable DelaySubscription(this Observable source, TimeSpan dueTime, TimeProvider timeProvider) + { + return new DelaySubscription(source, dueTime, timeProvider); + } +} + +internal sealed class DelaySubscription(Observable source, TimeSpan dueTime, TimeProvider timeProvider) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + return new _DelaySubscription(observer, source, dueTime.Normalize(), timeProvider).Run(); + } + + sealed class _DelaySubscription : Observer + { + static readonly TimerCallback timerCallback = Subscribe; + + readonly Observer observer; + readonly Observable source; + readonly TimeSpan dueTime; + readonly ITimer timer; + + public _DelaySubscription(Observer observer, Observable source, TimeSpan dueTime, TimeProvider timeProvider) + { + this.observer = observer; + this.source = source; + this.dueTime = dueTime; + this.timer = timeProvider.CreateStoppedTimer(timerCallback, this); + } + + public IDisposable Run() + { + timer.InvokeOnce(dueTime); + return this; + } + + protected override void OnNextCore(T value) + { + observer.OnNext(value); + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + observer.OnCompleted(result); + } + + protected override void DisposeCore() + { + timer.Dispose(); + } + + static void Subscribe(object? state) + { + var self = (_DelaySubscription)state!; + try + { + self.source.Subscribe(self); // subscribe self. + } + catch (Exception ex) + { + ObservableSystem.GetUnhandledExceptionHandler().Invoke(ex); + self.Dispose(); + } + } + } +} diff --git a/src/R3/Operators/DelaySubscriptionFrame.cs b/src/R3/Operators/DelaySubscriptionFrame.cs new file mode 100644 index 00000000..ce7bb80b --- /dev/null +++ b/src/R3/Operators/DelaySubscriptionFrame.cs @@ -0,0 +1,81 @@ +namespace R3; + +public static partial class ObservableExtensions +{ + public static Observable DelaySubscriptionFrame(this Observable source, int frameCount) + { + return new DelaySubscriptionFrame(source, frameCount, ObservableSystem.DefaultFrameProvider); + } + + public static Observable DelaySubscriptionFrame(this Observable source, int frameCount, FrameProvider frameProvider) + { + return new DelaySubscriptionFrame(source, frameCount, frameProvider); + } +} + +internal sealed class DelaySubscriptionFrame(Observable source, int frameCount, FrameProvider frameProvider) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + return new _DelaySubscription(observer, source, frameCount.NormalizeFrame(), frameProvider).Run(); + } + + sealed class _DelaySubscription : Observer, IFrameRunnerWorkItem + { + readonly Observer observer; + readonly Observable source; + readonly int frameCount; + readonly FrameProvider frameProvider; + int currentFrame; + + public _DelaySubscription(Observer observer, Observable source, int frameCount, FrameProvider frameProvider) + { + this.observer = observer; + this.source = source; + this.frameCount = frameCount; + this.frameProvider = frameProvider; + } + + public IDisposable Run() + { + frameProvider.Register(this); + return this; + } + + protected override void OnNextCore(T value) + { + observer.OnNext(value); + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + observer.OnCompleted(result); + } + + bool IFrameRunnerWorkItem.MoveNext(long _) + { + if (IsDisposed) return false; + + if (++currentFrame == frameCount) + { + try + { + source.Subscribe(this); // subscribe self. + } + catch (Exception ex) + { + ObservableSystem.GetUnhandledExceptionHandler().Invoke(ex); + Dispose(); + } + return false; + } + + return true; + } + } +} diff --git a/src/R3/Operators/_Operators.cs b/src/R3/Operators/_Operators.cs index 9df42074..d6f4511b 100644 --- a/src/R3/Operators/_Operators.cs +++ b/src/R3/Operators/_Operators.cs @@ -4,12 +4,10 @@ public static partial class ObservableExtensions { } - // TODO: this is working space, will remove this file after complete. // Time based -// Delay, DelaySubscription -// + frame variation +// Delay, DelayFrame // Rx Merging: // CombineLatest, Zip, WithLatestFrom, ZipLatest, Switch, Pairwise @@ -20,4 +18,3 @@ public static partial class ObservableExtensions // return tasks: // All, Any, Contains, SequenceEqual, IsEmpty, MaxBy, MinBy, ToDictionary, ToLookup, - diff --git a/tests/R3.Tests/OperatorTests/DelaySubscriptionTest.cs b/tests/R3.Tests/OperatorTests/DelaySubscriptionTest.cs new file mode 100644 index 00000000..0331ff07 --- /dev/null +++ b/tests/R3.Tests/OperatorTests/DelaySubscriptionTest.cs @@ -0,0 +1,70 @@ +namespace R3.Tests.OperatorTests; + +public class DelaySubscriptionTest +{ + [Fact] + public void DelaySubscription() + { + var provider = new FakeTimeProvider(); + + var publisher = new Subject(); + var subscribed = false; + var list = publisher + .Do(onSubscribe: () => subscribed = true) + .DelaySubscription(TimeSpan.FromSeconds(3), provider) + .ToLiveList(); + + subscribed.Should().BeFalse(); + publisher.OnNext(1); + list.AssertEqual([]); + + provider.Advance(TimeSpan.FromSeconds(2)); + + subscribed.Should().BeFalse(); + publisher.OnNext(2); + list.AssertEqual([]); + + provider.Advance(TimeSpan.FromSeconds(1)); + + subscribed.Should().BeTrue(); + publisher.OnNext(3); + list.AssertEqual([3]); + + publisher.OnCompleted(); + + list.AssertIsCompleted(); + } + + [Fact] + public void DelaySubscriptionFrame() + { + var provider = new ManualFrameProvider(); + + var publisher = new Subject(); + var subscribed = false; + var list = publisher + .Do(onSubscribe: () => subscribed = true) + .DelaySubscriptionFrame(3, provider) + .ToLiveList(); + + subscribed.Should().BeFalse(); + publisher.OnNext(1); + list.AssertEqual([]); + + provider.Advance(2); + + subscribed.Should().BeFalse(); + publisher.OnNext(2); + list.AssertEqual([]); + + provider.Advance(1); + + subscribed.Should().BeTrue(); + publisher.OnNext(3); + list.AssertEqual([3]); + + publisher.OnCompleted(); + + list.AssertIsCompleted(); + } +}