diff --git a/src/R3/Factories/EveryUpdate.cs b/src/R3/Factories/EveryUpdate.cs index 54f3c1bb..655a6fb2 100644 --- a/src/R3/Factories/EveryUpdate.cs +++ b/src/R3/Factories/EveryUpdate.cs @@ -38,9 +38,9 @@ protected override IDisposable SubscribeCore(Observer observer) class EveryUpdateRunnerWorkItem(Observer observer, CancellationToken cancellationToken) : CancellableFrameRunnerWorkItemBase(observer, cancellationToken) { - protected override bool MoveNextCore() + protected override bool MoveNextCore(long _) { - observer.OnNext(default); + PublishOnNext(default); return true; } } diff --git a/src/R3/Factories/Return.cs b/src/R3/Factories/Return.cs index 025672cc..d41e2ccc 100644 --- a/src/R3/Factories/Return.cs +++ b/src/R3/Factories/Return.cs @@ -24,9 +24,31 @@ public static Observable Return(T value, TimeSpan dueTime, TimeProvider ti return new Return(value, dueTime, timeProvider); // use ITimer } + + // Optimized case + + public static Observable ReturnUnit() + { + return R3.ReturnUnit.Instance; // singleton + } + + public static Observable Return(Unit value) + { + return R3.ReturnUnit.Instance; + } + + public static Observable Return(bool value) + { + return value ? ReturnBoolean.True : ReturnBoolean.False; // singleton + } + + public static Observable Return(int value) + { + return ReturnInt32.GetObservable(value); // -1~9 singleton + } } -internal class Return(T value, TimeSpan dueTime, TimeProvider timeProvider) : Observable +internal sealed class Return(T value, TimeSpan dueTime, TimeProvider timeProvider) : Observable { protected override IDisposable SubscribeCore(Observer observer) { @@ -60,7 +82,7 @@ public void Dispose() } } -internal class ImmediateScheduleReturn(T value) : Observable +internal sealed class ImmediateScheduleReturn(T value) : Observable { protected override IDisposable SubscribeCore(Observer observer) { @@ -70,7 +92,7 @@ protected override IDisposable SubscribeCore(Observer observer) } } -internal class ThreadPoolScheduleReturn(T value) : Observable +internal sealed class ThreadPoolScheduleReturn(T value) : Observable { protected override IDisposable SubscribeCore(Observer observer) { @@ -97,3 +119,89 @@ public void Dispose() } } } + +// Optimized case + +internal sealed class ReturnUnit : Observable +{ + internal static readonly Observable Instance = new ReturnUnit(); + + ReturnUnit() + { + } + + protected override IDisposable SubscribeCore(Observer observer) + { + observer.OnNext(default); + observer.OnCompleted(); + return Disposable.Empty; + } +} + +internal sealed class ReturnBoolean : Observable +{ + internal static readonly Observable True = new ReturnBoolean(true); + internal static readonly Observable False = new ReturnBoolean(false); + + bool value; + + ReturnBoolean(bool value) + { + this.value = value; + } + + protected override IDisposable SubscribeCore(Observer observer) + { + observer.OnNext(value); + observer.OnCompleted(); + return Disposable.Empty; + } +} + +internal sealed class ReturnInt32 : Observable +{ + internal static readonly Observable _m1 = new ReturnInt32(-1); + internal static readonly Observable _0 = new ReturnInt32(0); + internal static readonly Observable _1 = new ReturnInt32(1); + internal static readonly Observable _2 = new ReturnInt32(2); + internal static readonly Observable _3 = new ReturnInt32(3); + internal static readonly Observable _4 = new ReturnInt32(4); + internal static readonly Observable _5 = new ReturnInt32(5); + internal static readonly Observable _6 = new ReturnInt32(6); + internal static readonly Observable _7 = new ReturnInt32(7); + internal static readonly Observable _8 = new ReturnInt32(8); + internal static readonly Observable _9 = new ReturnInt32(9); + + public static Observable GetObservable(int value) + { + switch (value) + { + case -1: return _m1; + case 0: return _0; + case 1: return _1; + case 2: return _2; + case 3: return _3; + case 4: return _4; + case 5: return _5; + case 6: return _6; + case 7: return _7; + case 8: return _8; + case 9: return _9; + default: return new ReturnInt32(value); + } + } + + int value; + + ReturnInt32(int value) + { + this.value = value; + } + + protected override IDisposable SubscribeCore(Observer observer) + { + observer.OnNext(value); + observer.OnCompleted(); + return Disposable.Empty; + } +} diff --git a/src/R3/Factories/ReturnFrame.cs b/src/R3/Factories/ReturnFrame.cs new file mode 100644 index 00000000..ea0dd965 --- /dev/null +++ b/src/R3/Factories/ReturnFrame.cs @@ -0,0 +1,120 @@ +namespace R3; + +public static partial class Observable +{ + public static Observable ReturnUnitFrame(CancellationToken cancellationToken = default) + { + return ReturnFrame(Unit.Default, ObservableSystem.DefaultFrameProvider, cancellationToken); + } + + public static Observable ReturnUnitFrame(FrameProvider frameProvider, CancellationToken cancellationToken = default) + { + return ReturnFrame(Unit.Default, frameProvider, cancellationToken); + } + + public static Observable ReturnFrame(T value, CancellationToken cancellationToken = default) + { + return ReturnFrame(value, ObservableSystem.DefaultFrameProvider, cancellationToken); + } + + public static Observable ReturnFrame(T value, FrameProvider frameProvider, CancellationToken cancellationToken = default) + { + return new ReturnFrame(value, frameProvider, cancellationToken); + } + + public static Observable ReturnFrame(T value, int dueTimeFrame, CancellationToken cancellationToken = default) + { + return ReturnFrame(value, dueTimeFrame, ObservableSystem.DefaultFrameProvider, cancellationToken); + } + + public static Observable ReturnFrame(T value, int dueTimeFrame, FrameProvider frameProvider, CancellationToken cancellationToken = default) + { + return new ReturnFrameTime(value, dueTimeFrame, frameProvider, cancellationToken); + } + + public static Observable NextFrame(CancellationToken cancellationToken = default) + { + return NextFrame(ObservableSystem.DefaultFrameProvider, cancellationToken); + } + + public static Observable NextFrame(FrameProvider frameProvider, CancellationToken cancellationToken = default) + { + return new NextFrame(frameProvider, cancellationToken); + } +} + +internal sealed class ReturnFrame(T value, FrameProvider frameProvider, CancellationToken cancellationToken) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + var runner = new ReturnFrameRunnerWorkItem(value, observer, cancellationToken); + frameProvider.Register(runner); + return runner; + } + + sealed class ReturnFrameRunnerWorkItem(T value, Observer observer, CancellationToken cancellationToken) + : CancellableFrameRunnerWorkItemBase(observer, cancellationToken) + { + protected override bool MoveNextCore(long frameCount) + { + PublishOnNext(value); + PublishOnCompleted(); + return false; + } + } +} + +internal sealed class ReturnFrameTime(T value, int dueTimeFrame, FrameProvider frameProvider, CancellationToken cancellationToken) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + var runner = new ReturnFrameTimeRunnerWorkItem(value, dueTimeFrame.NormalizeFrame(), observer, cancellationToken); + frameProvider.Register(runner); + return runner; + } + + sealed class ReturnFrameTimeRunnerWorkItem(T value, int dueTimeFrame, Observer observer, CancellationToken cancellationToken) + : CancellableFrameRunnerWorkItemBase(observer, cancellationToken) + { + int currentFrame; + + protected override bool MoveNextCore(long frameCount) + { + if (++currentFrame == dueTimeFrame) + { + PublishOnNext(value); + PublishOnCompleted(); + return false; + } + + return true; + } + } +} + +internal sealed class NextFrame(FrameProvider frameProvider, CancellationToken cancellationToken) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + var runner = new NextFrameRunnerWorkItem(observer, frameProvider.GetFrameCount(), cancellationToken); + frameProvider.Register(runner); + return runner; + } + + sealed class NextFrameRunnerWorkItem(Observer observer, long startFrameCount, CancellationToken cancellationToken) + : CancellableFrameRunnerWorkItemBase(observer, cancellationToken) + { + protected override bool MoveNextCore(long frameCount) + { + // same frame, skip + if (startFrameCount == frameCount) + { + return true; + } + + PublishOnNext(default); + PublishOnCompleted(); + return false; + } + } +} diff --git a/src/R3/Factories/TimerFrame.cs b/src/R3/Factories/TimerFrame.cs index 937bb139..01c5d5a1 100644 --- a/src/R3/Factories/TimerFrame.cs +++ b/src/R3/Factories/TimerFrame.cs @@ -47,18 +47,17 @@ protected override IDisposable SubscribeCore(Observer observer) return runner; } - class SingleTimerFrameRunnerWorkItem(int dueTimeFrame, Observer observer, CancellationToken cancellationToken) + sealed class SingleTimerFrameRunnerWorkItem(int dueTimeFrame, Observer observer, CancellationToken cancellationToken) : CancellableFrameRunnerWorkItemBase(observer, cancellationToken) { int currentFrame; - protected override bool MoveNextCore() + protected override bool MoveNextCore(long _) { if (++currentFrame == dueTimeFrame) { - observer.OnNext(default); - observer.OnCompleted(); - Dispose(); + PublishOnNext(default); + PublishOnCompleted(); return false; } @@ -66,20 +65,20 @@ protected override bool MoveNextCore() } } - class MultiTimerFrameRunnerWorkItem(int dueTimeFrame, int periodFrame, Observer observer, CancellationToken cancellationToken) + sealed class MultiTimerFrameRunnerWorkItem(int dueTimeFrame, int periodFrame, Observer observer, CancellationToken cancellationToken) : CancellableFrameRunnerWorkItemBase(observer, cancellationToken) { int currentFrame; bool isPeriodPhase; - protected override bool MoveNextCore() + protected override bool MoveNextCore(long _) { // initial phase if (!isPeriodPhase) { if (++currentFrame == dueTimeFrame) { - observer.OnNext(default); + PublishOnNext(default); isPeriodPhase = true; currentFrame = 0; } @@ -89,7 +88,7 @@ protected override bool MoveNextCore() // period phase if (++currentFrame == periodFrame) { - observer.OnNext(default); + PublishOnNext(default); currentFrame = 0; } return true; diff --git a/src/R3/Factories/_EventFactory.cs b/src/R3/Factories/_EventFactory.cs index c3816c1e..9abbe229 100644 --- a/src/R3/Factories/_EventFactory.cs +++ b/src/R3/Factories/_EventFactory.cs @@ -19,11 +19,5 @@ public static partial class Observable // AsUnitObservable // AsUniResult // AsNeverComplete - - // TODO: use SystemDefault - - - - - } + diff --git a/src/R3/Internal/CancellableFrameRunnerWorkItemBase.cs b/src/R3/Internal/CancellableFrameRunnerWorkItemBase.cs index a44a0019..4b6b907f 100644 --- a/src/R3/Internal/CancellableFrameRunnerWorkItemBase.cs +++ b/src/R3/Internal/CancellableFrameRunnerWorkItemBase.cs @@ -3,7 +3,7 @@ // when Canceled, publish OnCompleted. internal abstract class CancellableFrameRunnerWorkItemBase : IFrameRunnerWorkItem, IDisposable { - protected readonly Observer observer; + readonly Observer observer; CancellationTokenRegistration cancellationTokenRegistration; bool isDisposed; @@ -29,10 +29,10 @@ public bool MoveNext(long frameCount) return false; } - return MoveNextCore(); + return MoveNextCore(frameCount); } - protected abstract bool MoveNextCore(); + protected abstract bool MoveNextCore(long frameCount); public void Dispose() { @@ -45,4 +45,28 @@ public void Dispose() } protected virtual void DisposeCore() { } + + // Observer + + protected void PublishOnNext(T value) + { + observer.OnNext(value); + } + + protected void PublishOnErrorResume(Exception error) + { + observer.OnErrorResume(error); + } + + protected void PublishOnCompleted(Exception error) + { + observer.OnCompleted(error); + Dispose(); + } + + protected void PublishOnCompleted() + { + observer.OnCompleted(); + Dispose(); + } } diff --git a/tests/R3.Tests/FactoryTests/ReturnFrameTest.cs b/tests/R3.Tests/FactoryTests/ReturnFrameTest.cs new file mode 100644 index 00000000..5f04d0f9 --- /dev/null +++ b/tests/R3.Tests/FactoryTests/ReturnFrameTest.cs @@ -0,0 +1,95 @@ +using System.Runtime.InteropServices; + +namespace R3.Tests.FactoryTests; + +public class ReturnFrameTest +{ + [Fact] + public void UnitTest() + { + var frameProvider = new ManualFrameProvider(); + var cts = new CancellationTokenSource(); + + var list = Observable.ReturnUnitFrame(frameProvider, cts.Token).ToLiveList(); + list.AssertIsNotCompleted(); + + frameProvider.Advance(); + list.AssertIsCompleted(); + list.AssertEqual([Unit.Default]); + } + + [Fact] + public void ValueTest() + { + { + var frameProvider = new ManualFrameProvider(); + var cts = new CancellationTokenSource(); + + var list = Observable.ReturnFrame(10, frameProvider, cts.Token).ToLiveList(); + list.AssertIsNotCompleted(); + + frameProvider.Advance(); + list.AssertIsCompleted(); + list.AssertEqual([10]); + } + { + var frameProvider = new ManualFrameProvider(); + var cts = new CancellationTokenSource(); + + var list = Observable.ReturnFrame(10, frameProvider, cts.Token).ToLiveList(); + list.AssertIsNotCompleted(); + + cts.Cancel(); + list.AssertIsCompleted(); + } + } + + [Fact] + public void TimeTest() + { + var frameProvider = new ManualFrameProvider(); + var cts = new CancellationTokenSource(); + + var list = Observable.ReturnFrame(10, 5, frameProvider, cts.Token).ToLiveList(); + list.AssertIsNotCompleted(); + + frameProvider.Advance(4); + list.AssertIsNotCompleted(); + + frameProvider.Advance(1); + list.AssertIsCompleted(); + list.AssertEqual([10]); + } + + [Fact] + public void NextFrameTest() + { + { + var frameProvider = new ManualFrameProvider(); + var cts = new CancellationTokenSource(); + + var list = Observable.NextFrame(frameProvider, cts.Token).ToLiveList(); + list.AssertIsNotCompleted(); + + frameProvider.Advance(1); // same frame, not run. + list.AssertIsNotCompleted(); + + frameProvider.Advance(1); // diffrent frame, ok to run. + list.AssertIsCompleted(); + list.AssertEqual(Unit.Default); + } + { + var frameProvider = new ManualFrameProvider(); // use custom fake + var cts = new CancellationTokenSource(); + + var list = Observable.ReturnUnitFrame(frameProvider, cts.Token).ToLiveList(); + list.AssertIsNotCompleted(); + + // ReturnFrame run same frame. + frameProvider.Advance(1); + + list.AssertIsCompleted(); + list.AssertEqual(Unit.Default); + } + } +} diff --git a/tests/R3.Tests/FactoryTests/ReturnTest.cs b/tests/R3.Tests/FactoryTests/ReturnTest.cs index 93079cbe..7a0b11c4 100644 --- a/tests/R3.Tests/FactoryTests/ReturnTest.cs +++ b/tests/R3.Tests/FactoryTests/ReturnTest.cs @@ -70,4 +70,25 @@ public void ReturnOnCompleted() list.AssertIsCompleted(); } } + + [Fact] + public void Optimized() + { + for (int i = -10; i < 100; i++) + { + using var list = Observable.Return(i).ToLiveList(); // int optimized + list.AssertEqual([i]); + list.AssertIsCompleted(); + } + + foreach (var item in new bool[] { true, false }) + { + using var list = Observable.Return(item).ToLiveList(); // bool optimized + list.AssertEqual([item]); + list.AssertIsCompleted(); + } + + Observable.Return(Unit.Default).ToLiveList().AssertEqual([Unit.Default]); + Observable.ReturnUnit().ToLiveList().AssertEqual([Unit.Default]); + } }