Skip to content

Commit

Permalink
EveryValueChanged, TimerFrameProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 24, 2023
1 parent 31983b2 commit 12bfd55
Show file tree
Hide file tree
Showing 7 changed files with 235 additions and 2 deletions.
1 change: 0 additions & 1 deletion src/R3/Factories/Create.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,3 @@ protected override IDisposable SubscribeCore(Observer<T> observer)
return subscribe(observer, state);
}
}

2 changes: 1 addition & 1 deletion src/R3/Factories/EveryUpdate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ protected override IDisposable SubscribeCore(Observer<Unit> observer)
return runner;
}

class EveryUpdateRunnerWorkItem(Observer<Unit> observer, CancellationToken cancellationToken)
sealed class EveryUpdateRunnerWorkItem(Observer<Unit> observer, CancellationToken cancellationToken)
: CancellableFrameRunnerWorkItemBase<Unit>(observer, cancellationToken)
{
protected override bool MoveNextCore(long _)
Expand Down
77 changes: 77 additions & 0 deletions src/R3/Factories/EveryValueChanged.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
namespace R3;

public static partial class Observable
{
public static Observable<TProperty> EveryValueChanged<TSource, TProperty>(TSource source, Func<TSource, TProperty> propertySelector, CancellationToken cancellationToken = default)
where TSource : class
{
return EveryValueChanged(source, propertySelector, ObservableSystem.DefaultFrameProvider, EqualityComparer<TProperty>.Default, cancellationToken);
}

public static Observable<TProperty> EveryValueChanged<TSource, TProperty>(TSource source, Func<TSource, TProperty> propertySelector, FrameProvider frameProvider, CancellationToken cancellationToken = default)
where TSource : class
{
return EveryValueChanged(source, propertySelector, frameProvider, EqualityComparer<TProperty>.Default, cancellationToken);
}

public static Observable<TProperty> EveryValueChanged<TSource, TProperty>(TSource source, Func<TSource, TProperty> propertySelector, EqualityComparer<TProperty> equalityComparer, CancellationToken cancellationToken = default)
where TSource : class
{
return EveryValueChanged(source, propertySelector, ObservableSystem.DefaultFrameProvider, equalityComparer, cancellationToken);
}

public static Observable<TProperty> EveryValueChanged<TSource, TProperty>(TSource source, Func<TSource, TProperty> propertySelector, FrameProvider frameProvider, EqualityComparer<TProperty> equalityComparer, CancellationToken cancellationToken = default)
where TSource : class
{
return new EveryValueChanged<TSource, TProperty>(source, propertySelector, frameProvider, equalityComparer, cancellationToken);
}
}

internal sealed class EveryValueChanged<TSource, TProperty>(TSource source, Func<TSource, TProperty> propertySelector, FrameProvider frameProvider, EqualityComparer<TProperty> equalityComparer, CancellationToken cancellationToken) : Observable<TProperty>
where TSource : class
{
protected override IDisposable SubscribeCore(Observer<TProperty> observer)
{
// raise latest value on subscribe
var value = propertySelector(source);
observer.OnNext(value);
if (observer.IsDisposed)
{
return Disposable.Empty;
}

var runner = new EveryValueChangedRunnerWorkItem(observer, source, value, propertySelector, equalityComparer, cancellationToken);
frameProvider.Register(runner);
return runner;
}

sealed class EveryValueChangedRunnerWorkItem(Observer<TProperty> observer, TSource source, TProperty previousValue, Func<TSource, TProperty> propertySelector, EqualityComparer<TProperty> equalityComparer, CancellationToken cancellationToken)
: CancellableFrameRunnerWorkItemBase<TProperty>(observer, cancellationToken)
{
protected override bool MoveNextCore(long _)
{
TProperty currentValue;
try
{
currentValue = propertySelector(source);
}
catch (Exception ex)
{
PublishOnCompleted(ex); // when error, stop.
return false;
}

if (equalityComparer.Equals(previousValue, currentValue))
{
// don't emit but continue.
return true;
}

previousValue = currentValue;
PublishOnNext(currentValue); // emit latest
return true;
}
}
}


11 changes: 11 additions & 0 deletions src/R3/FrameProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ public void Advance(int advanceCount)
}
}

public int GetRegisteredCount()
{
var span = list.AsSpan();
var count = 0;
foreach (ref readonly var item in span)
{
if (item != null) count++;
}
return count;
}

void RunLoop()
{
var span = list.AsSpan();
Expand Down
6 changes: 6 additions & 0 deletions src/R3/Internal/CancellableFrameRunnerWorkItemBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ public bool MoveNext(long frameCount)
return false;
}

if (observer.IsDisposed)
{
Dispose();
return false;
}

return MoveNextCore(frameCount);
}

Expand Down
95 changes: 95 additions & 0 deletions src/R3/TimerFrameProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
namespace R3;

public sealed class TimerFrameProvider : FrameProvider, IDisposable
{
static readonly TimerCallback timerCallback = Run;

readonly object gate = new object();
long frameCount;
bool disposed;
FreeListCore<IFrameRunnerWorkItem> list;
ITimer timer;

public TimerFrameProvider(TimeSpan period)
: this(period, period, TimeProvider.System)
{
}

public TimerFrameProvider(TimeSpan dueTime, TimeSpan period)
: this(dueTime, period, TimeProvider.System)
{
}

public TimerFrameProvider(TimeSpan dueTime, TimeSpan period, TimeProvider timeProvider)
{
this.list = new FreeListCore<IFrameRunnerWorkItem>(gate);
this.timer = timeProvider.CreateStoppedTimer(timerCallback, this);

// start timer
this.timer.Change(dueTime, period);
}

public override long GetFrameCount()
{
ObjectDisposedException.ThrowIf(disposed, typeof(TimerFrameProvider));
return frameCount;
}

public override void Register(IFrameRunnerWorkItem callback)
{
ObjectDisposedException.ThrowIf(disposed, typeof(TimerFrameProvider));
list.Add(callback);
}

public void Dispose()
{
if (!disposed)
{
disposed = true;
lock (gate)
{
timer.Dispose();
list.Dispose();
}
}
}

static void Run(object? state)
{
var self = (TimerFrameProvider)state!;
if (self.disposed)
{
return;
}

lock (self.gate)
{
var span = self.list.AsSpan();
for (int i = 0; i < span.Length; i++)
{
ref readonly var item = ref span[i];
if (item != null)
{
try
{
if (!item.MoveNext(self.frameCount))
{
self.list.Remove(i);
}
}
catch (Exception ex)
{
self.list.Remove(i);
try
{
ObservableSystem.GetUnhandledExceptionHandler().Invoke(ex);
}
catch { }
}
}
}

self.frameCount++;
}
}
}
45 changes: 45 additions & 0 deletions tests/R3.Tests/FactoryTests/EveryValueChangedTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@

namespace R3.Tests.FactoryTests;

public class EveryValueChangedTest
{
[Fact]
public void EveryValueChanged()
{
var frameProvider = new ManualFrameProvider();

var t = new Target();
t.MyProperty = 99;

var list = Observable.EveryValueChanged(t, x => x.MyProperty, frameProvider).ToLiveList();

list.AssertEqual([99]);

t.MyProperty = 100;
frameProvider.Advance();

list.AssertEqual([99, 100]);

t.MyProperty = 100;
frameProvider.Advance();

list.AssertEqual([99, 100]);

t.MyProperty = 1000;
frameProvider.Advance();

list.AssertEqual([99, 100, 1000]);

frameProvider.GetRegisteredCount().Should().Be(1);

list.Dispose();
frameProvider.Advance();

frameProvider.GetRegisteredCount().Should().Be(0);
}
}

file class Target
{
public int MyProperty { get; set; }
}

0 comments on commit 12bfd55

Please sign in to comment.