Skip to content

Commit

Permalink
DelaySubscription, DelaySubscriptionFrame
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Jan 3, 2024
1 parent 3ca76bb commit b7d78c7
Show file tree
Hide file tree
Showing 4 changed files with 232 additions and 4 deletions.
80 changes: 80 additions & 0 deletions src/R3/Operators/DelaySubscription.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
namespace R3;

public static partial class ObservableExtensions
{
public static Observable<T> DelaySubscription<T>(this Observable<T> source, TimeSpan dueTime)
{
return new DelaySubscription<T>(source, dueTime, ObservableSystem.DefaultTimeProvider);
}

public static Observable<T> DelaySubscription<T>(this Observable<T> source, TimeSpan dueTime, TimeProvider timeProvider)
{
return new DelaySubscription<T>(source, dueTime, timeProvider);
}
}

internal sealed class DelaySubscription<T>(Observable<T> source, TimeSpan dueTime, TimeProvider timeProvider) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return new _DelaySubscription(observer, source, dueTime.Normalize(), timeProvider).Run();
}

sealed class _DelaySubscription : Observer<T>
{
static readonly TimerCallback timerCallback = Subscribe;

readonly Observer<T> observer;
readonly Observable<T> source;
readonly TimeSpan dueTime;
readonly ITimer timer;

public _DelaySubscription(Observer<T> observer, Observable<T> source, TimeSpan dueTime, TimeProvider timeProvider)
{
this.observer = observer;
this.source = source;
this.dueTime = dueTime;
this.timer = timeProvider.CreateStoppedTimer(timerCallback, this);
}

public IDisposable Run()
{
timer.InvokeOnce(dueTime);
return this;
}

protected override void OnNextCore(T value)
{
observer.OnNext(value);
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
observer.OnCompleted(result);
}

protected override void DisposeCore()
{
timer.Dispose();
}

static void Subscribe(object? state)
{
var self = (_DelaySubscription)state!;
try
{
self.source.Subscribe(self); // subscribe self.
}
catch (Exception ex)
{
ObservableSystem.GetUnhandledExceptionHandler().Invoke(ex);
self.Dispose();
}
}
}
}
81 changes: 81 additions & 0 deletions src/R3/Operators/DelaySubscriptionFrame.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
namespace R3;

public static partial class ObservableExtensions
{
public static Observable<T> DelaySubscriptionFrame<T>(this Observable<T> source, int frameCount)
{
return new DelaySubscriptionFrame<T>(source, frameCount, ObservableSystem.DefaultFrameProvider);
}

public static Observable<T> DelaySubscriptionFrame<T>(this Observable<T> source, int frameCount, FrameProvider frameProvider)
{
return new DelaySubscriptionFrame<T>(source, frameCount, frameProvider);
}
}

internal sealed class DelaySubscriptionFrame<T>(Observable<T> source, int frameCount, FrameProvider frameProvider) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return new _DelaySubscription(observer, source, frameCount.NormalizeFrame(), frameProvider).Run();
}

sealed class _DelaySubscription : Observer<T>, IFrameRunnerWorkItem
{
readonly Observer<T> observer;
readonly Observable<T> source;
readonly int frameCount;
readonly FrameProvider frameProvider;
int currentFrame;

public _DelaySubscription(Observer<T> observer, Observable<T> source, int frameCount, FrameProvider frameProvider)
{
this.observer = observer;
this.source = source;
this.frameCount = frameCount;
this.frameProvider = frameProvider;
}

public IDisposable Run()
{
frameProvider.Register(this);
return this;
}

protected override void OnNextCore(T value)
{
observer.OnNext(value);
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
observer.OnCompleted(result);
}

bool IFrameRunnerWorkItem.MoveNext(long _)
{
if (IsDisposed) return false;

if (++currentFrame == frameCount)
{
try
{
source.Subscribe(this); // subscribe self.
}
catch (Exception ex)
{
ObservableSystem.GetUnhandledExceptionHandler().Invoke(ex);
Dispose();
}
return false;
}

return true;
}
}
}
5 changes: 1 addition & 4 deletions src/R3/Operators/_Operators.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ public static partial class ObservableExtensions
{
}


// TODO: this is working space, will remove this file after complete.

// Time based
// Delay, DelaySubscription
// + frame variation
// Delay, DelayFrame

// Rx Merging:
// CombineLatest, Zip, WithLatestFrom, ZipLatest, Switch, Pairwise
Expand All @@ -20,4 +18,3 @@ public static partial class ObservableExtensions
// return tasks:
// All, Any, Contains, SequenceEqual, IsEmpty, MaxBy, MinBy, ToDictionary, ToLookup,


70 changes: 70 additions & 0 deletions tests/R3.Tests/OperatorTests/DelaySubscriptionTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
namespace R3.Tests.OperatorTests;

public class DelaySubscriptionTest
{
[Fact]
public void DelaySubscription()
{
var provider = new FakeTimeProvider();

var publisher = new Subject<int>();
var subscribed = false;
var list = publisher
.Do(onSubscribe: () => subscribed = true)
.DelaySubscription(TimeSpan.FromSeconds(3), provider)
.ToLiveList();

subscribed.Should().BeFalse();
publisher.OnNext(1);
list.AssertEqual([]);

provider.Advance(TimeSpan.FromSeconds(2));

subscribed.Should().BeFalse();
publisher.OnNext(2);
list.AssertEqual([]);

provider.Advance(TimeSpan.FromSeconds(1));

subscribed.Should().BeTrue();
publisher.OnNext(3);
list.AssertEqual([3]);

publisher.OnCompleted();

list.AssertIsCompleted();
}

[Fact]
public void DelaySubscriptionFrame()
{
var provider = new ManualFrameProvider();

var publisher = new Subject<int>();
var subscribed = false;
var list = publisher
.Do(onSubscribe: () => subscribed = true)
.DelaySubscriptionFrame(3, provider)
.ToLiveList();

subscribed.Should().BeFalse();
publisher.OnNext(1);
list.AssertEqual([]);

provider.Advance(2);

subscribed.Should().BeFalse();
publisher.OnNext(2);
list.AssertEqual([]);

provider.Advance(1);

subscribed.Should().BeTrue();
publisher.OnNext(3);
list.AssertEqual([3]);

publisher.OnCompleted();

list.AssertIsCompleted();
}
}

0 comments on commit b7d78c7

Please sign in to comment.