Skip to content

Commit

Permalink
ReturnUnit, NextFrame, ReturnFrame
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 23, 2023
1 parent 874337a commit 9a685d1
Show file tree
Hide file tree
Showing 8 changed files with 385 additions and 24 deletions.
4 changes: 2 additions & 2 deletions src/R3/Factories/EveryUpdate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ protected override IDisposable SubscribeCore(Observer<Unit> observer)
class EveryUpdateRunnerWorkItem(Observer<Unit> observer, CancellationToken cancellationToken)
: CancellableFrameRunnerWorkItemBase<Unit>(observer, cancellationToken)
{
protected override bool MoveNextCore()
protected override bool MoveNextCore(long _)
{
observer.OnNext(default);
PublishOnNext(default);
return true;
}
}
Expand Down
114 changes: 111 additions & 3 deletions src/R3/Factories/Return.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,31 @@ public static Observable<T> Return<T>(T value, TimeSpan dueTime, TimeProvider ti

return new Return<T>(value, dueTime, timeProvider); // use ITimer
}

// Optimized case

public static Observable<Unit> ReturnUnit()
{
return R3.ReturnUnit.Instance; // singleton
}

public static Observable<Unit> Return(Unit value)
{
return R3.ReturnUnit.Instance;
}

public static Observable<bool> Return(bool value)
{
return value ? ReturnBoolean.True : ReturnBoolean.False; // singleton
}

public static Observable<int> Return(int value)
{
return ReturnInt32.GetObservable(value); // -1~9 singleton
}
}

internal class Return<T>(T value, TimeSpan dueTime, TimeProvider timeProvider) : Observable<T>
internal sealed class Return<T>(T value, TimeSpan dueTime, TimeProvider timeProvider) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
Expand Down Expand Up @@ -60,7 +82,7 @@ public void Dispose()
}
}

internal class ImmediateScheduleReturn<T>(T value) : Observable<T>
internal sealed class ImmediateScheduleReturn<T>(T value) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
Expand All @@ -70,7 +92,7 @@ protected override IDisposable SubscribeCore(Observer<T> observer)
}
}

internal class ThreadPoolScheduleReturn<T>(T value) : Observable<T>
internal sealed class ThreadPoolScheduleReturn<T>(T value) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
Expand All @@ -97,3 +119,89 @@ public void Dispose()
}
}
}

// Optimized case

internal sealed class ReturnUnit : Observable<Unit>
{
internal static readonly Observable<Unit> Instance = new ReturnUnit();

ReturnUnit()
{
}

protected override IDisposable SubscribeCore(Observer<Unit> observer)
{
observer.OnNext(default);
observer.OnCompleted();
return Disposable.Empty;
}
}

internal sealed class ReturnBoolean : Observable<bool>
{
internal static readonly Observable<bool> True = new ReturnBoolean(true);
internal static readonly Observable<bool> False = new ReturnBoolean(false);

bool value;

ReturnBoolean(bool value)
{
this.value = value;
}

protected override IDisposable SubscribeCore(Observer<bool> observer)
{
observer.OnNext(value);
observer.OnCompleted();
return Disposable.Empty;
}
}

internal sealed class ReturnInt32 : Observable<int>
{
internal static readonly Observable<int> _m1 = new ReturnInt32(-1);
internal static readonly Observable<int> _0 = new ReturnInt32(0);
internal static readonly Observable<int> _1 = new ReturnInt32(1);
internal static readonly Observable<int> _2 = new ReturnInt32(2);
internal static readonly Observable<int> _3 = new ReturnInt32(3);
internal static readonly Observable<int> _4 = new ReturnInt32(4);
internal static readonly Observable<int> _5 = new ReturnInt32(5);
internal static readonly Observable<int> _6 = new ReturnInt32(6);
internal static readonly Observable<int> _7 = new ReturnInt32(7);
internal static readonly Observable<int> _8 = new ReturnInt32(8);
internal static readonly Observable<int> _9 = new ReturnInt32(9);

public static Observable<int> GetObservable(int value)
{
switch (value)
{
case -1: return _m1;
case 0: return _0;
case 1: return _1;
case 2: return _2;
case 3: return _3;
case 4: return _4;
case 5: return _5;
case 6: return _6;
case 7: return _7;
case 8: return _8;
case 9: return _9;
default: return new ReturnInt32(value);
}
}

int value;

ReturnInt32(int value)
{
this.value = value;
}

protected override IDisposable SubscribeCore(Observer<int> observer)
{
observer.OnNext(value);
observer.OnCompleted();
return Disposable.Empty;
}
}
120 changes: 120 additions & 0 deletions src/R3/Factories/ReturnFrame.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
namespace R3;

public static partial class Observable
{
public static Observable<Unit> ReturnUnitFrame(CancellationToken cancellationToken = default)
{
return ReturnFrame(Unit.Default, ObservableSystem.DefaultFrameProvider, cancellationToken);
}

public static Observable<Unit> ReturnUnitFrame(FrameProvider frameProvider, CancellationToken cancellationToken = default)
{
return ReturnFrame(Unit.Default, frameProvider, cancellationToken);
}

public static Observable<T> ReturnFrame<T>(T value, CancellationToken cancellationToken = default)
{
return ReturnFrame(value, ObservableSystem.DefaultFrameProvider, cancellationToken);
}

public static Observable<T> ReturnFrame<T>(T value, FrameProvider frameProvider, CancellationToken cancellationToken = default)
{
return new ReturnFrame<T>(value, frameProvider, cancellationToken);
}

public static Observable<T> ReturnFrame<T>(T value, int dueTimeFrame, CancellationToken cancellationToken = default)
{
return ReturnFrame(value, dueTimeFrame, ObservableSystem.DefaultFrameProvider, cancellationToken);
}

public static Observable<T> ReturnFrame<T>(T value, int dueTimeFrame, FrameProvider frameProvider, CancellationToken cancellationToken = default)
{
return new ReturnFrameTime<T>(value, dueTimeFrame, frameProvider, cancellationToken);
}

public static Observable<Unit> NextFrame(CancellationToken cancellationToken = default)
{
return NextFrame(ObservableSystem.DefaultFrameProvider, cancellationToken);
}

public static Observable<Unit> NextFrame(FrameProvider frameProvider, CancellationToken cancellationToken = default)
{
return new NextFrame(frameProvider, cancellationToken);
}
}

internal sealed class ReturnFrame<T>(T value, FrameProvider frameProvider, CancellationToken cancellationToken) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
var runner = new ReturnFrameRunnerWorkItem(value, observer, cancellationToken);
frameProvider.Register(runner);
return runner;
}

sealed class ReturnFrameRunnerWorkItem(T value, Observer<T> observer, CancellationToken cancellationToken)
: CancellableFrameRunnerWorkItemBase<T>(observer, cancellationToken)
{
protected override bool MoveNextCore(long frameCount)
{
PublishOnNext(value);
PublishOnCompleted();
return false;
}
}
}

internal sealed class ReturnFrameTime<T>(T value, int dueTimeFrame, FrameProvider frameProvider, CancellationToken cancellationToken) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
var runner = new ReturnFrameTimeRunnerWorkItem(value, dueTimeFrame.NormalizeFrame(), observer, cancellationToken);
frameProvider.Register(runner);
return runner;
}

sealed class ReturnFrameTimeRunnerWorkItem(T value, int dueTimeFrame, Observer<T> observer, CancellationToken cancellationToken)
: CancellableFrameRunnerWorkItemBase<T>(observer, cancellationToken)
{
int currentFrame;

protected override bool MoveNextCore(long frameCount)
{
if (++currentFrame == dueTimeFrame)
{
PublishOnNext(value);
PublishOnCompleted();
return false;
}

return true;
}
}
}

internal sealed class NextFrame(FrameProvider frameProvider, CancellationToken cancellationToken) : Observable<Unit>
{
protected override IDisposable SubscribeCore(Observer<Unit> observer)
{
var runner = new NextFrameRunnerWorkItem(observer, frameProvider.GetFrameCount(), cancellationToken);
frameProvider.Register(runner);
return runner;
}

sealed class NextFrameRunnerWorkItem(Observer<Unit> observer, long startFrameCount, CancellationToken cancellationToken)
: CancellableFrameRunnerWorkItemBase<Unit>(observer, cancellationToken)
{
protected override bool MoveNextCore(long frameCount)
{
// same frame, skip
if (startFrameCount == frameCount)
{
return true;
}

PublishOnNext(default);
PublishOnCompleted();
return false;
}
}
}
17 changes: 8 additions & 9 deletions src/R3/Factories/TimerFrame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,39 +47,38 @@ protected override IDisposable SubscribeCore(Observer<Unit> observer)
return runner;
}

class SingleTimerFrameRunnerWorkItem(int dueTimeFrame, Observer<Unit> observer, CancellationToken cancellationToken)
sealed class SingleTimerFrameRunnerWorkItem(int dueTimeFrame, Observer<Unit> observer, CancellationToken cancellationToken)
: CancellableFrameRunnerWorkItemBase<Unit>(observer, cancellationToken)
{
int currentFrame;

protected override bool MoveNextCore()
protected override bool MoveNextCore(long _)
{
if (++currentFrame == dueTimeFrame)
{
observer.OnNext(default);
observer.OnCompleted();
Dispose();
PublishOnNext(default);
PublishOnCompleted();
return false;
}

return true;
}
}

class MultiTimerFrameRunnerWorkItem(int dueTimeFrame, int periodFrame, Observer<Unit> observer, CancellationToken cancellationToken)
sealed class MultiTimerFrameRunnerWorkItem(int dueTimeFrame, int periodFrame, Observer<Unit> observer, CancellationToken cancellationToken)
: CancellableFrameRunnerWorkItemBase<Unit>(observer, cancellationToken)
{
int currentFrame;
bool isPeriodPhase;

protected override bool MoveNextCore()
protected override bool MoveNextCore(long _)
{
// initial phase
if (!isPeriodPhase)
{
if (++currentFrame == dueTimeFrame)
{
observer.OnNext(default);
PublishOnNext(default);
isPeriodPhase = true;
currentFrame = 0;
}
Expand All @@ -89,7 +88,7 @@ protected override bool MoveNextCore()
// period phase
if (++currentFrame == periodFrame)
{
observer.OnNext(default);
PublishOnNext(default);
currentFrame = 0;
}
return true;
Expand Down
8 changes: 1 addition & 7 deletions src/R3/Factories/_EventFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,5 @@ public static partial class Observable
// AsUnitObservable
// AsUniResult
// AsNeverComplete

// TODO: use SystemDefault





}

30 changes: 27 additions & 3 deletions src/R3/Internal/CancellableFrameRunnerWorkItemBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// when Canceled, publish OnCompleted.
internal abstract class CancellableFrameRunnerWorkItemBase<T> : IFrameRunnerWorkItem, IDisposable
{
protected readonly Observer<T> observer;
readonly Observer<T> observer;
CancellationTokenRegistration cancellationTokenRegistration;
bool isDisposed;

Expand All @@ -29,10 +29,10 @@ public bool MoveNext(long frameCount)
return false;
}

return MoveNextCore();
return MoveNextCore(frameCount);
}

protected abstract bool MoveNextCore();
protected abstract bool MoveNextCore(long frameCount);

public void Dispose()
{
Expand All @@ -45,4 +45,28 @@ public void Dispose()
}

protected virtual void DisposeCore() { }

// Observer

protected void PublishOnNext(T value)
{
observer.OnNext(value);
}

protected void PublishOnErrorResume(Exception error)
{
observer.OnErrorResume(error);
}

protected void PublishOnCompleted(Exception error)
{
observer.OnCompleted(error);
Dispose();
}

protected void PublishOnCompleted()
{
observer.OnCompleted();
Dispose();
}
}
Loading

0 comments on commit 9a685d1

Please sign in to comment.