From 12bfd55bfcd9bc18a1bc5123eb89a59ecb533dfb Mon Sep 17 00:00:00 2001 From: neuecc Date: Mon, 25 Dec 2023 02:29:13 +0900 Subject: [PATCH] EveryValueChanged, TimerFrameProvider --- src/R3/Factories/Create.cs | 1 - src/R3/Factories/EveryUpdate.cs | 2 +- src/R3/Factories/EveryValueChanged.cs | 77 +++++++++++++++ src/R3/FrameProvider.cs | 11 +++ .../CancellableFrameRunnerWorkItemBase.cs | 6 ++ src/R3/TimerFrameProvider.cs | 95 +++++++++++++++++++ .../FactoryTests/EveryValueChangedTest.cs | 45 +++++++++ 7 files changed, 235 insertions(+), 2 deletions(-) create mode 100644 src/R3/Factories/EveryValueChanged.cs create mode 100644 src/R3/TimerFrameProvider.cs create mode 100644 tests/R3.Tests/FactoryTests/EveryValueChangedTest.cs diff --git a/src/R3/Factories/Create.cs b/src/R3/Factories/Create.cs index 7e023d97..aecc3186 100644 --- a/src/R3/Factories/Create.cs +++ b/src/R3/Factories/Create.cs @@ -28,4 +28,3 @@ protected override IDisposable SubscribeCore(Observer observer) return subscribe(observer, state); } } - diff --git a/src/R3/Factories/EveryUpdate.cs b/src/R3/Factories/EveryUpdate.cs index 655a6fb2..7b9dc6d2 100644 --- a/src/R3/Factories/EveryUpdate.cs +++ b/src/R3/Factories/EveryUpdate.cs @@ -35,7 +35,7 @@ protected override IDisposable SubscribeCore(Observer observer) return runner; } - class EveryUpdateRunnerWorkItem(Observer observer, CancellationToken cancellationToken) + sealed class EveryUpdateRunnerWorkItem(Observer observer, CancellationToken cancellationToken) : CancellableFrameRunnerWorkItemBase(observer, cancellationToken) { protected override bool MoveNextCore(long _) diff --git a/src/R3/Factories/EveryValueChanged.cs b/src/R3/Factories/EveryValueChanged.cs new file mode 100644 index 00000000..6b6ba51f --- /dev/null +++ b/src/R3/Factories/EveryValueChanged.cs @@ -0,0 +1,77 @@ +namespace R3; + +public static partial class Observable +{ + public static Observable EveryValueChanged(TSource source, Func propertySelector, CancellationToken cancellationToken = default) + where TSource : class + { + return EveryValueChanged(source, propertySelector, ObservableSystem.DefaultFrameProvider, EqualityComparer.Default, cancellationToken); + } + + public static Observable EveryValueChanged(TSource source, Func propertySelector, FrameProvider frameProvider, CancellationToken cancellationToken = default) + where TSource : class + { + return EveryValueChanged(source, propertySelector, frameProvider, EqualityComparer.Default, cancellationToken); + } + + public static Observable EveryValueChanged(TSource source, Func propertySelector, EqualityComparer equalityComparer, CancellationToken cancellationToken = default) + where TSource : class + { + return EveryValueChanged(source, propertySelector, ObservableSystem.DefaultFrameProvider, equalityComparer, cancellationToken); + } + + public static Observable EveryValueChanged(TSource source, Func propertySelector, FrameProvider frameProvider, EqualityComparer equalityComparer, CancellationToken cancellationToken = default) + where TSource : class + { + return new EveryValueChanged(source, propertySelector, frameProvider, equalityComparer, cancellationToken); + } +} + +internal sealed class EveryValueChanged(TSource source, Func propertySelector, FrameProvider frameProvider, EqualityComparer equalityComparer, CancellationToken cancellationToken) : Observable + where TSource : class +{ + protected override IDisposable SubscribeCore(Observer observer) + { + // raise latest value on subscribe + var value = propertySelector(source); + observer.OnNext(value); + if (observer.IsDisposed) + { + return Disposable.Empty; + } + + var runner = new EveryValueChangedRunnerWorkItem(observer, source, value, propertySelector, equalityComparer, cancellationToken); + frameProvider.Register(runner); + return runner; + } + + sealed class EveryValueChangedRunnerWorkItem(Observer observer, TSource source, TProperty previousValue, Func propertySelector, EqualityComparer equalityComparer, CancellationToken cancellationToken) + : CancellableFrameRunnerWorkItemBase(observer, cancellationToken) + { + protected override bool MoveNextCore(long _) + { + TProperty currentValue; + try + { + currentValue = propertySelector(source); + } + catch (Exception ex) + { + PublishOnCompleted(ex); // when error, stop. + return false; + } + + if (equalityComparer.Equals(previousValue, currentValue)) + { + // don't emit but continue. + return true; + } + + previousValue = currentValue; + PublishOnNext(currentValue); // emit latest + return true; + } + } +} + + diff --git a/src/R3/FrameProvider.cs b/src/R3/FrameProvider.cs index a01aa90c..4dc57981 100644 --- a/src/R3/FrameProvider.cs +++ b/src/R3/FrameProvider.cs @@ -46,6 +46,17 @@ public void Advance(int advanceCount) } } + public int GetRegisteredCount() + { + var span = list.AsSpan(); + var count = 0; + foreach (ref readonly var item in span) + { + if (item != null) count++; + } + return count; + } + void RunLoop() { var span = list.AsSpan(); diff --git a/src/R3/Internal/CancellableFrameRunnerWorkItemBase.cs b/src/R3/Internal/CancellableFrameRunnerWorkItemBase.cs index 4b6b907f..94fd6f59 100644 --- a/src/R3/Internal/CancellableFrameRunnerWorkItemBase.cs +++ b/src/R3/Internal/CancellableFrameRunnerWorkItemBase.cs @@ -29,6 +29,12 @@ public bool MoveNext(long frameCount) return false; } + if (observer.IsDisposed) + { + Dispose(); + return false; + } + return MoveNextCore(frameCount); } diff --git a/src/R3/TimerFrameProvider.cs b/src/R3/TimerFrameProvider.cs new file mode 100644 index 00000000..a7dc35ad --- /dev/null +++ b/src/R3/TimerFrameProvider.cs @@ -0,0 +1,95 @@ +namespace R3; + +public sealed class TimerFrameProvider : FrameProvider, IDisposable +{ + static readonly TimerCallback timerCallback = Run; + + readonly object gate = new object(); + long frameCount; + bool disposed; + FreeListCore list; + ITimer timer; + + public TimerFrameProvider(TimeSpan period) + : this(period, period, TimeProvider.System) + { + } + + public TimerFrameProvider(TimeSpan dueTime, TimeSpan period) + : this(dueTime, period, TimeProvider.System) + { + } + + public TimerFrameProvider(TimeSpan dueTime, TimeSpan period, TimeProvider timeProvider) + { + this.list = new FreeListCore(gate); + this.timer = timeProvider.CreateStoppedTimer(timerCallback, this); + + // start timer + this.timer.Change(dueTime, period); + } + + public override long GetFrameCount() + { + ObjectDisposedException.ThrowIf(disposed, typeof(TimerFrameProvider)); + return frameCount; + } + + public override void Register(IFrameRunnerWorkItem callback) + { + ObjectDisposedException.ThrowIf(disposed, typeof(TimerFrameProvider)); + list.Add(callback); + } + + public void Dispose() + { + if (!disposed) + { + disposed = true; + lock (gate) + { + timer.Dispose(); + list.Dispose(); + } + } + } + + static void Run(object? state) + { + var self = (TimerFrameProvider)state!; + if (self.disposed) + { + return; + } + + lock (self.gate) + { + var span = self.list.AsSpan(); + for (int i = 0; i < span.Length; i++) + { + ref readonly var item = ref span[i]; + if (item != null) + { + try + { + if (!item.MoveNext(self.frameCount)) + { + self.list.Remove(i); + } + } + catch (Exception ex) + { + self.list.Remove(i); + try + { + ObservableSystem.GetUnhandledExceptionHandler().Invoke(ex); + } + catch { } + } + } + } + + self.frameCount++; + } + } +} diff --git a/tests/R3.Tests/FactoryTests/EveryValueChangedTest.cs b/tests/R3.Tests/FactoryTests/EveryValueChangedTest.cs new file mode 100644 index 00000000..82d72634 --- /dev/null +++ b/tests/R3.Tests/FactoryTests/EveryValueChangedTest.cs @@ -0,0 +1,45 @@ + +namespace R3.Tests.FactoryTests; + +public class EveryValueChangedTest +{ + [Fact] + public void EveryValueChanged() + { + var frameProvider = new ManualFrameProvider(); + + var t = new Target(); + t.MyProperty = 99; + + var list = Observable.EveryValueChanged(t, x => x.MyProperty, frameProvider).ToLiveList(); + + list.AssertEqual([99]); + + t.MyProperty = 100; + frameProvider.Advance(); + + list.AssertEqual([99, 100]); + + t.MyProperty = 100; + frameProvider.Advance(); + + list.AssertEqual([99, 100]); + + t.MyProperty = 1000; + frameProvider.Advance(); + + list.AssertEqual([99, 100, 1000]); + + frameProvider.GetRegisteredCount().Should().Be(1); + + list.Dispose(); + frameProvider.Advance(); + + frameProvider.GetRegisteredCount().Should().Be(0); + } +} + +file class Target +{ + public int MyProperty { get; set; } +}