From 2a34f17aadc3f4d81b6cfcbe809266ec6c62b3d8 Mon Sep 17 00:00:00 2001 From: neuecc Date: Thu, 4 Jan 2024 00:14:07 +0900 Subject: [PATCH] Debounce, ThrottleFirst, Sample + Frame --- sandbox/ConsoleApp1/Program.cs | 13 +- src/R3/Operators/Debounce.cs | 94 +++++++ src/R3/Operators/DebounceFrame.cs | 94 +++++++ src/R3/Operators/Sample.cs | 79 ++++++ src/R3/Operators/SampleFrame.cs | 79 ++++++ src/R3/Operators/ThrottleFirst.cs | 82 ++++++ src/R3/Operators/ThrottleFirstFrame.cs | 83 ++++++ src/R3/Operators/_Operators.cs | 30 +-- .../DebounceThrottleFirstSampleTest.cs | 239 ++++++++++++++++++ 9 files changed, 770 insertions(+), 23 deletions(-) create mode 100644 src/R3/Operators/Debounce.cs create mode 100644 src/R3/Operators/DebounceFrame.cs create mode 100644 src/R3/Operators/Sample.cs create mode 100644 src/R3/Operators/SampleFrame.cs create mode 100644 src/R3/Operators/ThrottleFirst.cs create mode 100644 src/R3/Operators/ThrottleFirstFrame.cs create mode 100644 tests/R3.Tests/OperatorTests/DebounceThrottleFirstSampleTest.cs diff --git a/sandbox/ConsoleApp1/Program.cs b/sandbox/ConsoleApp1/Program.cs index 220bfdc8..ebab44ca 100644 --- a/sandbox/ConsoleApp1/Program.cs +++ b/sandbox/ConsoleApp1/Program.cs @@ -21,22 +21,21 @@ var sw = Stopwatch.StartNew(); - var subject = new System.Reactive.Subjects.Subject(); -subject.Buffer(TimeSpan.FromSeconds(5), 3).Subscribe(x=> Console.WriteLine("[" + string.Join(", ", x) + "]" + sw.Elapsed)); - - -await Task.Delay(TimeSpan.FromSeconds(4)); +var subject = new System.Reactive.Subjects.Subject(); +subject.Sample(TimeSpan.FromSeconds(3)).Subscribe(x => Console.WriteLine(x)); subject.OnNext(1); -subject.OnNext(2); -// subject.OnNext(3); Console.ReadLine(); +subject.OnNext(2); + subject.OnCompleted(); + + public static class Extensions { public static IDisposable WriteLine(this Observable source) diff --git a/src/R3/Operators/Debounce.cs b/src/R3/Operators/Debounce.cs new file mode 100644 index 00000000..73bea041 --- /dev/null +++ b/src/R3/Operators/Debounce.cs @@ -0,0 +1,94 @@ +namespace R3; + +public static partial class ObservableExtensions +{ + public static Observable Debounce(this Observable source, TimeSpan timeSpan) + { + return new Debounce(source, timeSpan, ObservableSystem.DefaultTimeProvider); + } + + public static Observable Debounce(this Observable source, TimeSpan timeSpan, TimeProvider timeProvider) + { + return new Debounce(source, timeSpan, timeProvider); + } +} + + +internal sealed class Debounce(Observable source, TimeSpan timeSpan, TimeProvider timeProvider) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + return source.Subscribe(new _Debounce(observer, timeSpan.Normalize(), timeProvider)); + } + + sealed class _Debounce : Observer + { + static readonly TimerCallback timerCallback = RaiseOnNext; + + readonly Observer observer; + readonly TimeSpan timeSpan; + readonly ITimer timer; + readonly object gate = new object(); + T? latestValue; + bool hasvalue; + int timerId; + + public _Debounce(Observer observer, TimeSpan timeSpan, TimeProvider timeProvider) + { + this.observer = observer; + this.timeSpan = timeSpan; + this.timer = timeProvider.CreateStoppedTimer(timerCallback, this); + } + + protected override void OnNextCore(T value) + { + lock (gate) + { + latestValue = value; + hasvalue = true; + Volatile.Write(ref timerId, unchecked(timerId + 1)); + timer.InvokeOnce(timeSpan); // restart timer + } + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + lock (gate) + { + if (hasvalue) + { + observer.OnNext(latestValue!); + hasvalue = false; + latestValue = default; + } + } + observer.OnCompleted(result); + } + + protected override void DisposeCore() + { + timer.Dispose(); + } + + static void RaiseOnNext(object? state) + { + var self = (_Debounce)state!; + + var timerId = Volatile.Read(ref self.timerId); + lock (self.gate) + { + if (timerId != self.timerId) return; + if (!self.hasvalue) return; + + self.observer.OnNext(self.latestValue!); + self.hasvalue = false; + self.latestValue = default; + } + } + } +} diff --git a/src/R3/Operators/DebounceFrame.cs b/src/R3/Operators/DebounceFrame.cs new file mode 100644 index 00000000..72eb9df0 --- /dev/null +++ b/src/R3/Operators/DebounceFrame.cs @@ -0,0 +1,94 @@ +namespace R3; + +public static partial class ObservableExtensions +{ + public static Observable DebounceFrame(this Observable source, int frameCount) + { + return new DebounceFrame(source, frameCount, ObservableSystem.DefaultFrameProvider); + } + + public static Observable DebounceFrame(this Observable source, int frameCount, FrameProvider frameProvider) + { + return new DebounceFrame(source, frameCount, frameProvider); + } +} + +// DebounceFrame +internal sealed class DebounceFrame(Observable source, int frameCount, FrameProvider frameProvider) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + return source.Subscribe(new _DebounceFrame(observer, frameCount.NormalizeFrame(), frameProvider)); + } + + sealed class _DebounceFrame : Observer, IFrameRunnerWorkItem + { + readonly Observer observer; + readonly int frameCount; + readonly object gate = new object(); + T? latestValue; + bool hasvalue; + int currentFrame; + + public _DebounceFrame(Observer observer, int frameCount, FrameProvider frameProvider) + { + this.observer = observer; + this.frameCount = frameCount; + frameProvider.Register(this); + } + + protected override void OnNextCore(T value) + { + lock (gate) + { + latestValue = value; + hasvalue = true; + currentFrame = 0; + } + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + lock (gate) + { + if (hasvalue) + { + observer.OnNext(latestValue!); + hasvalue = false; + latestValue = default; + } + } + observer.OnCompleted(result); + } + + bool IFrameRunnerWorkItem.MoveNext(long _) + { + if (this.IsDisposed) return false; + + lock (gate) + { + if (hasvalue) + { + if (++currentFrame == frameCount) + { + observer.OnNext(latestValue!); + hasvalue = false; + latestValue = default; + currentFrame = 0; + } + } + else + { + currentFrame = 0; + } + } + + return true; + } + } +} diff --git a/src/R3/Operators/Sample.cs b/src/R3/Operators/Sample.cs new file mode 100644 index 00000000..9f9cbb44 --- /dev/null +++ b/src/R3/Operators/Sample.cs @@ -0,0 +1,79 @@ +namespace R3; + +public static partial class ObservableExtensions +{ + public static Observable Sample(this Observable source, TimeSpan timeSpan) + { + return new Sample(source, timeSpan, ObservableSystem.DefaultTimeProvider); + } + + public static Observable Sample(this Observable source, TimeSpan timeSpan, TimeProvider timeProvider) + { + return new Sample(source, timeSpan, timeProvider); + } +} + +// Sample(ThrottleLast) +internal sealed class Sample(Observable source, TimeSpan interval, TimeProvider timeProvider) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + return source.Subscribe(new _Sample(observer, interval.Normalize(), timeProvider)); + } + + sealed class _Sample : Observer + { + static readonly TimerCallback timerCallback = RaiseOnNext; + + readonly Observer observer; + readonly ITimer timer; + readonly object gate = new object(); + T? lastValue; + bool hasValue; + + public _Sample(Observer observer, TimeSpan interval, TimeProvider timeProvider) + { + this.observer = observer; + this.timer = timeProvider.CreateStoppedTimer(timerCallback, this); + this.timer.Change(interval, interval); + } + + protected override void OnNextCore(T value) + { + lock (gate) + { + hasValue = true; + lastValue = value; + } + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + observer.OnCompleted(result); + } + + protected override void DisposeCore() + { + timer.Dispose(); + } + + static void RaiseOnNext(object? state) + { + var self = (_Sample)state!; + lock (self.gate) + { + if (self.hasValue) + { + self.observer.OnNext(self.lastValue!); + self.hasValue = false; + self.lastValue = default; + } + } + } + } +} diff --git a/src/R3/Operators/SampleFrame.cs b/src/R3/Operators/SampleFrame.cs new file mode 100644 index 00000000..b4cda6dd --- /dev/null +++ b/src/R3/Operators/SampleFrame.cs @@ -0,0 +1,79 @@ +namespace R3; + +public static partial class ObservableExtensions +{ + public static Observable SampleFrame(this Observable source, int frameCount) + { + return new SampleFrame(source, frameCount, ObservableSystem.DefaultFrameProvider); + } + + public static Observable SampleFrame(this Observable source, int frameCount, FrameProvider frameProvider) + { + return new SampleFrame(source, frameCount, frameProvider); + } +} + +internal sealed class SampleFrame(Observable source, int frameCount, FrameProvider frameProvider) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + return source.Subscribe(new _SampleFrame(observer, frameCount.NormalizeFrame(), frameProvider)); + } + + sealed class _SampleFrame : Observer, IFrameRunnerWorkItem + { + readonly Observer observer; + readonly int frameCount; + readonly object gate = new object(); + T? lastValue; + bool hasValue; + int currentFrame; + + public _SampleFrame(Observer observer, int frameCount, FrameProvider frameProvider) + { + this.observer = observer; + this.frameCount = frameCount; + frameProvider.Register(this); + } + + protected override void OnNextCore(T value) + { + lock (gate) + { + hasValue = true; + lastValue = value; + } + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + observer.OnCompleted(result); + } + + bool IFrameRunnerWorkItem.MoveNext(long _) + { + if (this.IsDisposed) return false; + + lock (gate) + { + if (++currentFrame == frameCount) + { + if (hasValue) + { + observer.OnNext(lastValue!); + hasValue = false; + lastValue = default; + currentFrame = 0; + } + } + } + + return true; + } + } +} diff --git a/src/R3/Operators/ThrottleFirst.cs b/src/R3/Operators/ThrottleFirst.cs new file mode 100644 index 00000000..8919f0f8 --- /dev/null +++ b/src/R3/Operators/ThrottleFirst.cs @@ -0,0 +1,82 @@ +namespace R3; + +public static partial class ObservableExtensions +{ + public static Observable ThrottleFirst(this Observable source, TimeSpan timeSpan) + { + return new ThrottleFirst(source, timeSpan, ObservableSystem.DefaultTimeProvider); + } + + public static Observable ThrottleFirst(this Observable source, TimeSpan timeSpan, TimeProvider timeProvider) + { + return new ThrottleFirst(source, timeSpan, timeProvider); + } +} + +// ThrottleFirst +internal sealed class ThrottleFirst(Observable source, TimeSpan interval, TimeProvider timeProvider) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + return source.Subscribe(new _ThrottleFirst(observer, interval.Normalize(), timeProvider)); + } + + sealed class _ThrottleFirst : Observer + { + static readonly TimerCallback timerCallback = RaiseOnNext; + + readonly Observer observer; + readonly ITimer timer; + readonly object gate = new object(); + T? firstValue; + bool hasValue; + + public _ThrottleFirst(Observer observer, TimeSpan interval, TimeProvider timeProvider) + { + this.observer = observer; + this.timer = timeProvider.CreateStoppedTimer(timerCallback, this); + this.timer.Change(interval, interval); + } + + protected override void OnNextCore(T value) + { + lock (gate) + { + if (!hasValue) + { + hasValue = true; + firstValue = value; + } + } + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + observer.OnCompleted(result); + } + + protected override void DisposeCore() + { + timer.Dispose(); + } + + static void RaiseOnNext(object? state) + { + var self = (_ThrottleFirst)state!; + lock (self.gate) + { + if (self.hasValue) + { + self.observer.OnNext(self.firstValue!); + self.hasValue = false; + self.firstValue = default; + } + } + } + } +} diff --git a/src/R3/Operators/ThrottleFirstFrame.cs b/src/R3/Operators/ThrottleFirstFrame.cs new file mode 100644 index 00000000..647daca9 --- /dev/null +++ b/src/R3/Operators/ThrottleFirstFrame.cs @@ -0,0 +1,83 @@ +namespace R3; + +public static partial class ObservableExtensions +{ + public static Observable ThrottleFirstFrame(this Observable source, int frameCount) + { + return new ThrottleFirstFrame(source, frameCount, ObservableSystem.DefaultFrameProvider); + } + + public static Observable ThrottleFirstFrame(this Observable source, int frameCount, FrameProvider frameProvider) + { + return new ThrottleFirstFrame(source, frameCount, frameProvider); + } +} + +// ThrottleFirstFrame +internal sealed class ThrottleFirstFrame(Observable source, int frameCount, FrameProvider frameProvider) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + return source.Subscribe(new _ThrottleFirstFrame(observer, frameCount.NormalizeFrame(), frameProvider)); + } + + sealed class _ThrottleFirstFrame : Observer, IFrameRunnerWorkItem + { + readonly Observer observer; + readonly int frameCount; + readonly object gate = new object(); + T? firstValue; + bool hasValue; + int currentFrame; + + public _ThrottleFirstFrame(Observer observer, int frameCount, FrameProvider frameProvider) + { + this.observer = observer; + this.frameCount = frameCount; + frameProvider.Register(this); + } + + protected override void OnNextCore(T value) + { + lock (gate) + { + if (!hasValue) + { + hasValue = true; + firstValue = value; + } + } + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + observer.OnCompleted(result); + } + + bool IFrameRunnerWorkItem.MoveNext(long _) + { + if (this.IsDisposed) return false; + + lock (gate) + { + if (++currentFrame == frameCount) + { + if (hasValue) + { + observer.OnNext(firstValue!); + hasValue = false; + firstValue = default; + currentFrame = 0; + } + } + } + + return true; + } + } +} diff --git a/src/R3/Operators/_Operators.cs b/src/R3/Operators/_Operators.cs index 0f5aca48..78217d98 100644 --- a/src/R3/Operators/_Operators.cs +++ b/src/R3/Operators/_Operators.cs @@ -1,25 +1,23 @@ -using System; -using System.Reflection; - -namespace R3; +namespace R3; public static partial class ObservableExtensions { - // TODO: this is working space, will remove this file after complete. +} - // AsUnitObservable - // Time based - // Delay, DelaySubscription, Timeout, Debounce, Throttle, ThrottleFirst, Sample, - // + frame variation +// TODO: this is working space, will remove this file after complete. - // Rx Merging: - // CombineLatest, Zip, WithLatestFrom, ZipLatest, Switch, Pairwise +// Time based +// Delay, DelaySubscription, Timeout +// + frame variation - // Standard Query: - // Distinct, DistinctBy, DistinctUntilChanged, Scan, DefaultIfEmpty +// Rx Merging: +// CombineLatest, Zip, WithLatestFrom, ZipLatest, Switch, Pairwise + +// Standard Query: +// Distinct, DistinctBy, DistinctUntilChanged, Scan, DefaultIfEmpty + +// return tasks: +// All, Any, Contains, SequenceEqual, IsEmpty, MaxBy, MinBy, ToDictionary, ToLookup, - // return tasks: - // All, Any, Contains, SequenceEqual, IsEmpty, MaxBy, MinBy, ToDictionary, ToLookup, -} diff --git a/tests/R3.Tests/OperatorTests/DebounceThrottleFirstSampleTest.cs b/tests/R3.Tests/OperatorTests/DebounceThrottleFirstSampleTest.cs new file mode 100644 index 00000000..3521bc07 --- /dev/null +++ b/tests/R3.Tests/OperatorTests/DebounceThrottleFirstSampleTest.cs @@ -0,0 +1,239 @@ +namespace R3.Tests.OperatorTests; + +public class DebounceThrottleFirstSampleTest +{ + // Debounce + [Fact] + public void Debounce() + { + var timeProvider = new FakeTimeProvider(); + + var publisher = new Subject(); + var list = publisher.Debounce(TimeSpan.FromSeconds(3), timeProvider).ToLiveList(); + + publisher.OnNext(1); + publisher.OnNext(10); + publisher.OnNext(100); + list.AssertEqual([]); + + timeProvider.Advance(TimeSpan.FromSeconds(2)); + + publisher.OnNext(1000); + publisher.OnNext(10000); + list.AssertEqual([]); + + timeProvider.Advance(TimeSpan.FromSeconds(1)); + + list.AssertEqual([]); + + timeProvider.Advance(TimeSpan.FromSeconds(2)); + + list.AssertEqual([10000]); + + publisher.OnNext(2); + + timeProvider.Advance(TimeSpan.FromSeconds(2)); + list.AssertEqual([10000]); + + timeProvider.Advance(TimeSpan.FromSeconds(1)); + + list.AssertEqual([10000, 2]); + + publisher.OnNext(3); + + timeProvider.Advance(TimeSpan.FromSeconds(1)); + list.AssertEqual([10000, 2]); + + publisher.OnCompleted(); + + list.AssertEqual([10000, 2, 3]); + } + + // ThrottleFirst + [Fact] + public void ThrottleFirst() + { + var timeProvider = new FakeTimeProvider(); + + var publisher = new Subject(); + var list = publisher.ThrottleFirst(TimeSpan.FromSeconds(3), timeProvider).ToLiveList(); + + publisher.OnNext(1); + publisher.OnNext(10); + publisher.OnNext(100); + list.AssertEqual([]); + + timeProvider.Advance(TimeSpan.FromSeconds(2)); + + publisher.OnNext(1000); + publisher.OnNext(10000); + list.AssertEqual([]); + + timeProvider.Advance(TimeSpan.FromSeconds(1)); + list.AssertEqual([1]); + publisher.OnNext(2); + publisher.OnNext(20); + publisher.OnNext(200); + + timeProvider.Advance(TimeSpan.FromSeconds(3)); + list.AssertEqual([1, 2]); + publisher.OnNext(3); + + publisher.OnCompleted(); + + list.AssertEqual([1, 2]); + list.AssertIsCompleted(); + } + + // Sample + [Fact] + public void Sample() + { + var timeProvider = new FakeTimeProvider(); + + var publisher = new Subject(); + var list = publisher.Sample(TimeSpan.FromSeconds(3), timeProvider).ToLiveList(); + + publisher.OnNext(1); + publisher.OnNext(10); + publisher.OnNext(100); + list.AssertEqual([]); + + timeProvider.Advance(TimeSpan.FromSeconds(2)); + + publisher.OnNext(1000); + publisher.OnNext(10000); + list.AssertEqual([]); + + timeProvider.Advance(TimeSpan.FromSeconds(1)); + list.AssertEqual([10000]); + publisher.OnNext(2); + publisher.OnNext(20); + publisher.OnNext(200); + + timeProvider.Advance(TimeSpan.FromSeconds(3)); + list.AssertEqual([10000, 200]); + publisher.OnNext(3); + + publisher.OnCompleted(); + + list.AssertEqual([10000, 200]); + list.AssertIsCompleted(); + } + + [Fact] + public void DebounceFrame() + { + var frameProvider = new ManualFrameProvider(); + + var publisher = new Subject(); + var list = publisher.DebounceFrame(3, frameProvider).ToLiveList(); + + publisher.OnNext(1); + publisher.OnNext(10); + publisher.OnNext(100); + list.AssertEqual([]); + + frameProvider.Advance(2); + + publisher.OnNext(1000); + publisher.OnNext(10000); + list.AssertEqual([]); + + frameProvider.Advance(1); + + list.AssertEqual([]); + + frameProvider.Advance(2); + + list.AssertEqual([10000]); + + publisher.OnNext(2); + + frameProvider.Advance(2); + list.AssertEqual([10000]); + + frameProvider.Advance(1); + + list.AssertEqual([10000, 2]); + + publisher.OnNext(3); + + frameProvider.Advance(1); + list.AssertEqual([10000, 2]); + + publisher.OnCompleted(); + + list.AssertEqual([10000, 2, 3]); + } + + [Fact] + public void ThrottleFirstFrame() + { + var frameProvider = new ManualFrameProvider(); + + var publisher = new Subject(); + var list = publisher.ThrottleFirstFrame(3, frameProvider).ToLiveList(); + + publisher.OnNext(1); + publisher.OnNext(10); + publisher.OnNext(100); + list.AssertEqual([]); + + frameProvider.Advance(2); + + publisher.OnNext(1000); + publisher.OnNext(10000); + list.AssertEqual([]); + + frameProvider.Advance(1); + list.AssertEqual([1]); + publisher.OnNext(2); + publisher.OnNext(20); + publisher.OnNext(200); + + frameProvider.Advance(3); + list.AssertEqual([1, 2]); + publisher.OnNext(3); + + publisher.OnCompleted(); + + list.AssertEqual([1, 2]); + list.AssertIsCompleted(); + } + + [Fact] + public void SampleFrame() + { + var frameProvider = new ManualFrameProvider(); + + var publisher = new Subject(); + var list = publisher.SampleFrame(3, frameProvider).ToLiveList(); + + publisher.OnNext(1); + publisher.OnNext(10); + publisher.OnNext(100); + list.AssertEqual([]); + + frameProvider.Advance(2); + + publisher.OnNext(1000); + publisher.OnNext(10000); + list.AssertEqual([]); + + frameProvider.Advance(1); + list.AssertEqual([10000]); + publisher.OnNext(2); + publisher.OnNext(20); + publisher.OnNext(200); + + frameProvider.Advance(3); + list.AssertEqual([10000, 200]); + publisher.OnNext(3); + + publisher.OnCompleted(); + + list.AssertEqual([10000, 200]); + list.AssertIsCompleted(); + } +}