Skip to content

Commit

Permalink
Debounce, ThrottleFirst, Sample + Frame
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Jan 3, 2024
1 parent 771899c commit 2a34f17
Show file tree
Hide file tree
Showing 9 changed files with 770 additions and 23 deletions.
13 changes: 6 additions & 7 deletions sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,21 @@


var sw = Stopwatch.StartNew();
var subject = new System.Reactive.Subjects.Subject<int>();
subject.Buffer(TimeSpan.FromSeconds(5), 3).Subscribe(x=> Console.WriteLine("[" + string.Join(", ", x) + "]" + sw.Elapsed));


await Task.Delay(TimeSpan.FromSeconds(4));
var subject = new System.Reactive.Subjects.Subject<int>();
subject.Sample(TimeSpan.FromSeconds(3)).Subscribe(x => Console.WriteLine(x));

subject.OnNext(1);
subject.OnNext(2);
// subject.OnNext(3);

Console.ReadLine();

subject.OnNext(2);

subject.OnCompleted();





public static class Extensions
{
public static IDisposable WriteLine<T>(this Observable<T> source)
Expand Down
94 changes: 94 additions & 0 deletions src/R3/Operators/Debounce.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
namespace R3;

public static partial class ObservableExtensions
{
public static Observable<T> Debounce<T>(this Observable<T> source, TimeSpan timeSpan)
{
return new Debounce<T>(source, timeSpan, ObservableSystem.DefaultTimeProvider);
}

public static Observable<T> Debounce<T>(this Observable<T> source, TimeSpan timeSpan, TimeProvider timeProvider)
{
return new Debounce<T>(source, timeSpan, timeProvider);
}
}


internal sealed class Debounce<T>(Observable<T> source, TimeSpan timeSpan, TimeProvider timeProvider) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return source.Subscribe(new _Debounce(observer, timeSpan.Normalize(), timeProvider));
}

sealed class _Debounce : Observer<T>
{
static readonly TimerCallback timerCallback = RaiseOnNext;

readonly Observer<T> observer;
readonly TimeSpan timeSpan;
readonly ITimer timer;
readonly object gate = new object();
T? latestValue;
bool hasvalue;
int timerId;

public _Debounce(Observer<T> observer, TimeSpan timeSpan, TimeProvider timeProvider)
{
this.observer = observer;
this.timeSpan = timeSpan;
this.timer = timeProvider.CreateStoppedTimer(timerCallback, this);
}

protected override void OnNextCore(T value)
{
lock (gate)
{
latestValue = value;
hasvalue = true;
Volatile.Write(ref timerId, unchecked(timerId + 1));
timer.InvokeOnce(timeSpan); // restart timer
}
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
lock (gate)
{
if (hasvalue)
{
observer.OnNext(latestValue!);
hasvalue = false;
latestValue = default;
}
}
observer.OnCompleted(result);
}

protected override void DisposeCore()
{
timer.Dispose();
}

static void RaiseOnNext(object? state)
{
var self = (_Debounce)state!;

var timerId = Volatile.Read(ref self.timerId);
lock (self.gate)
{
if (timerId != self.timerId) return;
if (!self.hasvalue) return;

self.observer.OnNext(self.latestValue!);
self.hasvalue = false;
self.latestValue = default;
}
}
}
}
94 changes: 94 additions & 0 deletions src/R3/Operators/DebounceFrame.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
namespace R3;

public static partial class ObservableExtensions
{
public static Observable<T> DebounceFrame<T>(this Observable<T> source, int frameCount)
{
return new DebounceFrame<T>(source, frameCount, ObservableSystem.DefaultFrameProvider);
}

public static Observable<T> DebounceFrame<T>(this Observable<T> source, int frameCount, FrameProvider frameProvider)
{
return new DebounceFrame<T>(source, frameCount, frameProvider);
}
}

// DebounceFrame
internal sealed class DebounceFrame<T>(Observable<T> source, int frameCount, FrameProvider frameProvider) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return source.Subscribe(new _DebounceFrame(observer, frameCount.NormalizeFrame(), frameProvider));
}

sealed class _DebounceFrame : Observer<T>, IFrameRunnerWorkItem
{
readonly Observer<T> observer;
readonly int frameCount;
readonly object gate = new object();
T? latestValue;
bool hasvalue;
int currentFrame;

public _DebounceFrame(Observer<T> observer, int frameCount, FrameProvider frameProvider)
{
this.observer = observer;
this.frameCount = frameCount;
frameProvider.Register(this);
}

protected override void OnNextCore(T value)
{
lock (gate)
{
latestValue = value;
hasvalue = true;
currentFrame = 0;
}
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
lock (gate)
{
if (hasvalue)
{
observer.OnNext(latestValue!);
hasvalue = false;
latestValue = default;
}
}
observer.OnCompleted(result);
}

bool IFrameRunnerWorkItem.MoveNext(long _)
{
if (this.IsDisposed) return false;

lock (gate)
{
if (hasvalue)
{
if (++currentFrame == frameCount)
{
observer.OnNext(latestValue!);
hasvalue = false;
latestValue = default;
currentFrame = 0;
}
}
else
{
currentFrame = 0;
}
}

return true;
}
}
}
79 changes: 79 additions & 0 deletions src/R3/Operators/Sample.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
namespace R3;

public static partial class ObservableExtensions
{
public static Observable<T> Sample<T>(this Observable<T> source, TimeSpan timeSpan)
{
return new Sample<T>(source, timeSpan, ObservableSystem.DefaultTimeProvider);
}

public static Observable<T> Sample<T>(this Observable<T> source, TimeSpan timeSpan, TimeProvider timeProvider)
{
return new Sample<T>(source, timeSpan, timeProvider);
}
}

// Sample(ThrottleLast)
internal sealed class Sample<T>(Observable<T> source, TimeSpan interval, TimeProvider timeProvider) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return source.Subscribe(new _Sample(observer, interval.Normalize(), timeProvider));
}

sealed class _Sample : Observer<T>
{
static readonly TimerCallback timerCallback = RaiseOnNext;

readonly Observer<T> observer;
readonly ITimer timer;
readonly object gate = new object();
T? lastValue;
bool hasValue;

public _Sample(Observer<T> observer, TimeSpan interval, TimeProvider timeProvider)
{
this.observer = observer;
this.timer = timeProvider.CreateStoppedTimer(timerCallback, this);
this.timer.Change(interval, interval);
}

protected override void OnNextCore(T value)
{
lock (gate)
{
hasValue = true;
lastValue = value;
}
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
observer.OnCompleted(result);
}

protected override void DisposeCore()
{
timer.Dispose();
}

static void RaiseOnNext(object? state)
{
var self = (_Sample)state!;
lock (self.gate)
{
if (self.hasValue)
{
self.observer.OnNext(self.lastValue!);
self.hasValue = false;
self.lastValue = default;
}
}
}
}
}
79 changes: 79 additions & 0 deletions src/R3/Operators/SampleFrame.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
namespace R3;

public static partial class ObservableExtensions
{
public static Observable<T> SampleFrame<T>(this Observable<T> source, int frameCount)
{
return new SampleFrame<T>(source, frameCount, ObservableSystem.DefaultFrameProvider);
}

public static Observable<T> SampleFrame<T>(this Observable<T> source, int frameCount, FrameProvider frameProvider)
{
return new SampleFrame<T>(source, frameCount, frameProvider);
}
}

internal sealed class SampleFrame<T>(Observable<T> source, int frameCount, FrameProvider frameProvider) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return source.Subscribe(new _SampleFrame(observer, frameCount.NormalizeFrame(), frameProvider));
}

sealed class _SampleFrame : Observer<T>, IFrameRunnerWorkItem
{
readonly Observer<T> observer;
readonly int frameCount;
readonly object gate = new object();
T? lastValue;
bool hasValue;
int currentFrame;

public _SampleFrame(Observer<T> observer, int frameCount, FrameProvider frameProvider)
{
this.observer = observer;
this.frameCount = frameCount;
frameProvider.Register(this);
}

protected override void OnNextCore(T value)
{
lock (gate)
{
hasValue = true;
lastValue = value;
}
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
observer.OnCompleted(result);
}

bool IFrameRunnerWorkItem.MoveNext(long _)
{
if (this.IsDisposed) return false;

lock (gate)
{
if (++currentFrame == frameCount)
{
if (hasValue)
{
observer.OnNext(lastValue!);
hasValue = false;
lastValue = default;
currentFrame = 0;
}
}
}

return true;
}
}
}
Loading

0 comments on commit 2a34f17

Please sign in to comment.