Skip to content

Commit

Permalink
TimerFrame and IntervalFrame
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 23, 2023
1 parent c48271c commit c03a772
Show file tree
Hide file tree
Showing 7 changed files with 270 additions and 39 deletions.
38 changes: 3 additions & 35 deletions src/R3/Factories/EveryUpdate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public static Observable<Unit> EveryUpdate(FrameProvider frameProvider, Cancella
}
}


internal sealed class EveryUpdate(FrameProvider frameProvider, CancellationToken cancellationToken) : Observable<Unit>
{
protected override IDisposable SubscribeCore(Observer<Unit> observer)
Expand All @@ -36,44 +35,13 @@ protected override IDisposable SubscribeCore(Observer<Unit> observer)
return runner;
}

class EveryUpdateRunnerWorkItem : IFrameRunnerWorkItem, IDisposable
class EveryUpdateRunnerWorkItem(Observer<Unit> observer, CancellationToken cancellationToken)
: CancellableFrameRunnerWorkItemBase<Unit>(observer, cancellationToken)
{
Observer<Unit> observer;
CancellationToken cancellationToken;
CancellationTokenRegistration cancellationTokenRegistration;
bool isDisposed;

public EveryUpdateRunnerWorkItem(Observer<Unit> observer, CancellationToken cancellationToken)
{
this.observer = observer;
this.cancellationToken = cancellationToken;

if (cancellationToken.CanBeCanceled)
{
cancellationTokenRegistration = cancellationToken.UnsafeRegister(static state =>
{
var s = (EveryUpdateRunnerWorkItem)state!;
s.observer.OnCompleted();
s.Dispose();
}, this);
}
}

public bool MoveNext(long frameCount)
protected override bool MoveNextCore()
{
if (isDisposed)
{
return false;
}

observer.OnNext(default);
return true;
}

public void Dispose()
{
isDisposed = true;
cancellationTokenRegistration.Dispose();
}
}
}
98 changes: 98 additions & 0 deletions src/R3/Factories/TimerFrame.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
namespace R3;

public static partial class Observable
{
public static Observable<Unit> IntervalFrame(int periodFrame, CancellationToken cancellationToken = default)
{
return TimerFrame(periodFrame, periodFrame, cancellationToken);
}

public static Observable<Unit> IntervalFrame(int periodFrame, FrameProvider frameProvider, CancellationToken cancellationToken = default)
{
return TimerFrame(periodFrame, periodFrame, frameProvider, cancellationToken);
}

public static Observable<Unit> TimerFrame(int dueTimeFrame, CancellationToken cancellationToken = default)
{
return new TimerFrame(dueTimeFrame, null, ObservableSystem.DefaultFrameProvider, cancellationToken);
}

public static Observable<Unit> TimerFrame(int dueTimeFrame, int periodFrame, CancellationToken cancellationToken = default)
{
return new TimerFrame(dueTimeFrame, periodFrame, ObservableSystem.DefaultFrameProvider, cancellationToken);
}

public static Observable<Unit> TimerFrame(int dueTimeFrame, FrameProvider frameProvider, CancellationToken cancellationToken = default)
{
return new TimerFrame(dueTimeFrame, null, frameProvider, cancellationToken);
}

public static Observable<Unit> TimerFrame(int dueTimeFrame, int periodFrame, FrameProvider frameProvider, CancellationToken cancellationToken = default)
{
return new TimerFrame(dueTimeFrame, periodFrame, frameProvider, cancellationToken);
}
}

internal sealed class TimerFrame(int dueTimeFrame, int? periodFrame, FrameProvider frameProvider, CancellationToken cancellationToken) : Observable<Unit>
{
protected override IDisposable SubscribeCore(Observer<Unit> observer)
{
dueTimeFrame = dueTimeFrame.Normalize();
periodFrame = periodFrame?.Normalize();

CancellableFrameRunnerWorkItemBase<Unit> runner = (periodFrame == null)
? new SingleTimerFrameRunnerWorkItem(dueTimeFrame, observer, cancellationToken)
: new MultiTimerFrameRunnerWorkItem(dueTimeFrame, periodFrame.Value, observer, cancellationToken);
frameProvider.Register(runner);
return runner;
}

class SingleTimerFrameRunnerWorkItem(int dueTimeFrame, Observer<Unit> observer, CancellationToken cancellationToken)
: CancellableFrameRunnerWorkItemBase<Unit>(observer, cancellationToken)
{
int currentFrame;

protected override bool MoveNextCore()
{
if (++currentFrame == dueTimeFrame)
{
observer.OnNext(default);
observer.OnCompleted();
Dispose();
return false;
}

return true;
}
}

class MultiTimerFrameRunnerWorkItem(int dueTimeFrame, int periodFrame, Observer<Unit> observer, CancellationToken cancellationToken)
: CancellableFrameRunnerWorkItemBase<Unit>(observer, cancellationToken)
{
int currentFrame;
bool isPeriodPhase;

protected override bool MoveNextCore()
{
// initial phase
if (!isPeriodPhase)
{
if (++currentFrame == dueTimeFrame)
{
observer.OnNext(default);
isPeriodPhase = true;
currentFrame = 0;
}
return true;
}

// period phase
if (++currentFrame == periodFrame)
{
observer.OnNext(default);
currentFrame = 0;
}
return true;
}
}
}
9 changes: 6 additions & 3 deletions src/R3/Factories/_EventFactory.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
namespace R3;
using R3.Internal;

namespace R3;

public static partial class Observable
{
// TODO: this is working space, will remove this file after complete.

// TODO: Defer, DeferAsync, Start, Using, Create
// Timer, Interval, TimerFrame, IntervalFrame, ToObservable(ToEvent)
// ToObservable(ToEvent)



Expand All @@ -22,5 +24,6 @@ public static partial class Observable



}


}
48 changes: 48 additions & 0 deletions src/R3/Internal/CancellableFrameRunnerWorkItemBase.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
namespace R3.Internal;

// when Canceled, publish OnCompleted.
internal abstract class CancellableFrameRunnerWorkItemBase<T> : IFrameRunnerWorkItem, IDisposable
{
protected readonly Observer<T> observer;
CancellationTokenRegistration cancellationTokenRegistration;
bool isDisposed;

public CancellableFrameRunnerWorkItemBase(Observer<T> observer, CancellationToken cancellationToken)
{
this.observer = observer;

if (cancellationToken.CanBeCanceled)
{
this.cancellationTokenRegistration = cancellationToken.UnsafeRegister(static state =>
{
var s = (CancellableFrameRunnerWorkItemBase<T>)state!;
s.observer.OnCompleted();
s.Dispose();
}, this);
}
}

public bool MoveNext(long frameCount)
{
if (isDisposed)
{
return false;
}

return MoveNextCore();
}

protected abstract bool MoveNextCore();

public void Dispose()
{
if (!isDisposed)
{
isDisposed = true;
cancellationTokenRegistration.Dispose();
DisposeCore();
}
}

protected virtual void DisposeCore() { }
}
10 changes: 10 additions & 0 deletions src/R3/Internal/FrameCountExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace R3.Internal;

internal static class FrameCountExtensions
{
// 0 is invalid, 1 is valid.
public static int Normalize(this int frameCount)
{
return frameCount > 0 ? frameCount : 1;
}
}
4 changes: 3 additions & 1 deletion src/R3/Internal/TaskObserverBase.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace R3.Internal;
using System.Threading;

namespace R3.Internal;

// for return Task(tcs.TrySet***)
// include proper Cancel registration
Expand Down
102 changes: 102 additions & 0 deletions tests/R3.Tests/FactoryTests/TimerFrameTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
namespace R3.Tests.FactoryTests;

public class TimerFrameTest
{

[Fact]
public void TimerSingle()
{
{
var fakeTime = new ManualFrameProvider();
var list = Observable.TimerFrame(0, fakeTime).ToLiveList();
fakeTime.Advance(1);
list.AssertIsCompleted();
list.AssertEqual([Unit.Default]);
}
{
var fakeTime = new ManualFrameProvider();
var list = Observable.TimerFrame(1, fakeTime).ToLiveList();
fakeTime.Advance(1);
list.AssertIsCompleted();
list.AssertEqual([Unit.Default]);
}
{
var fakeTime = new ManualFrameProvider();
var list = Observable.TimerFrame(2, fakeTime).ToLiveList();
fakeTime.Advance(2);
list.AssertIsCompleted();
list.AssertEqual([Unit.Default]);
}
}

[Fact]
public void TimerSingle2()
{
var fakeTime = new ManualFrameProvider();

var list = Observable.TimerFrame(5, fakeTime).ToLiveList();

fakeTime.Advance(4);
list.AssertIsNotCompleted();

fakeTime.Advance(1);
list.AssertIsCompleted();
list.AssertEqual(new[] { Unit.Default });
}

[Fact]
public void TimerMulti()
{
var cts = new CancellationTokenSource();
var fakeTime = new ManualFrameProvider();

var list = Observable.TimerFrame(5, 8, fakeTime, cts.Token).ToLiveList();

fakeTime.Advance(4);
list.AssertIsNotCompleted();

fakeTime.Advance(1);
list.AssertIsNotCompleted();
list.AssertEqual([Unit.Default]);

fakeTime.Advance(7);
list.AssertEqual([Unit.Default]);

fakeTime.Advance(1);
list.AssertEqual([Unit.Default, Unit.Default]);

fakeTime.Advance(8);
list.AssertEqual([Unit.Default, Unit.Default, Unit.Default]);

cts.Cancel();
list.AssertIsCompleted();
}

[Fact]
public void Interval()
{
var cts = new CancellationTokenSource();
var fakeTime = new ManualFrameProvider();

var list = Observable.IntervalFrame(5, fakeTime, cts.Token).ToLiveList();

fakeTime.Advance(4);
list.AssertIsNotCompleted();

fakeTime.Advance(1);
list.AssertIsNotCompleted();
list.AssertEqual([Unit.Default]);

fakeTime.Advance(4);
list.AssertEqual([Unit.Default]);

fakeTime.Advance(1);
list.AssertEqual([Unit.Default, Unit.Default]);

fakeTime.Advance(5);
list.AssertEqual([Unit.Default, Unit.Default, Unit.Default]);

cts.Cancel();
list.AssertIsCompleted();
}
}

0 comments on commit c03a772

Please sign in to comment.