Skip to content

Commit

Permalink
Timer overload
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 22, 2023
1 parent 5961fa7 commit 16766c0
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 17 deletions.
4 changes: 3 additions & 1 deletion sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@

var range = System.Reactive.Linq.Observable.Range(1, 10);



// range.TakeLast(


var publisher = new R3.Subject<int>();
// var publisher = new R3.Subject<int>();
//publisher.PublishOnNext(1);

// publisher.Subscribe(new object(), (x,y) => y
Expand Down
111 changes: 101 additions & 10 deletions src/R3/Factories/Timer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,117 @@

public static partial class Observable
{
// TODO: No Provider overload?
// TODO: test
// TODO: Interval

public static Observable<Unit> Timer(TimeSpan dueTime, TimeProvider timeProvider)

public static Observable<Unit> Timer(TimeSpan dueTime, CancellationToken cancellationToken = default)
{
return Timer(dueTime, ObservableSystem.DefaultTimeProvider, cancellationToken);
}

public static Observable<Unit> Timer(DateTimeOffset dueTime, CancellationToken cancellationToken = default)
{
return Timer(dueTime, ObservableSystem.DefaultTimeProvider, cancellationToken);
}

public static Observable<Unit> Timer(TimeSpan dueTime, TimeSpan period, CancellationToken cancellationToken = default)
{
return Timer(dueTime, period, ObservableSystem.DefaultTimeProvider, cancellationToken);
}

public static Observable<Unit> Timer(DateTimeOffset dueTime, TimeSpan period, CancellationToken cancellationToken = default)
{
return Timer(dueTime, period, ObservableSystem.DefaultTimeProvider, cancellationToken);
}

public static Observable<Unit> Timer(TimeSpan dueTime, TimeProvider timeProvider, CancellationToken cancellationToken = default)
{
return new Timer(dueTime, null, timeProvider, cancellationToken);
}

public static Observable<Unit> Timer(DateTimeOffset dueTime, TimeProvider timeProvider, CancellationToken cancellationToken = default)
{
return new Timer(dueTime, null, timeProvider, cancellationToken);
}

public static Observable<Unit> Timer(TimeSpan dueTime, TimeSpan period, TimeProvider timeProvider, CancellationToken cancellationToken = default)
{
return new Timer(dueTime, period, timeProvider, cancellationToken);
}

public static Observable<Unit> Timer(DateTimeOffset dueTime, TimeSpan period, TimeProvider timeProvider, CancellationToken cancellationToken = default)
{
return new Timer(dueTime, timeProvider);
return new Timer(dueTime, period, timeProvider, cancellationToken);
}
}

internal sealed class Timer : Observable<Unit>
{
readonly TimeSpan dueTime;
readonly TimeSpan? dueTime1;
readonly DateTimeOffset? dueTime2;
readonly TimeSpan? period;
readonly TimeProvider timeProvider;
readonly CancellationToken cancellationToken;

public Timer(TimeSpan dueTime, TimeProvider timeProvider)
public Timer(TimeSpan dueTime, TimeSpan? period, TimeProvider timeProvider, CancellationToken cancellationToken)
{
this.dueTime = dueTime;
this.dueTime1 = dueTime;
this.period = period;
this.timeProvider = timeProvider;
this.cancellationToken = cancellationToken;
}

public Timer(DateTimeOffset dueTime, TimeSpan? period, TimeProvider timeProvider, CancellationToken cancellationToken)
{
this.dueTime2 = dueTime;
this.period = period;
this.timeProvider = timeProvider;
this.cancellationToken = cancellationToken;
}

protected override IDisposable SubscribeCore(Observer<Unit> observer)
{
var callback = (period == null) ? _Timer.singleTimerCallback : _Timer.periodicTimerCallback;
var method = new _Timer(observer);
method.Timer = timeProvider.CreateStoppedTimer(_Timer.timerCallback, method);
method.Timer.InvokeOnce(dueTime);
method.Timer = timeProvider.CreateStoppedTimer(callback, method);

var dueTime = (dueTime1 != null)
? dueTime1.Value
: dueTime2!.Value - timeProvider.GetUtcNow();

if (cancellationToken.CanBeCanceled)
{
method.cancellationTokenRegistration = cancellationToken.UnsafeRegister(static state =>
{
var s = (_Timer)state!;
s.CompleteDispose();
}, method);
}

if (period == null)
{
method.Timer.InvokeOnce(dueTime.Normalize());
}
else
{
method.Timer.Change(dueTime.Normalize(), period.Value);
}

return method;
}

sealed class _Timer(Observer<Unit> observer) : IDisposable
{
public static readonly TimerCallback timerCallback = NextTick;
public static readonly TimerCallback singleTimerCallback = SingleTick;
public static readonly TimerCallback periodicTimerCallback = PeriodicTick;

internal CancellationTokenRegistration cancellationTokenRegistration;
Observer<Unit> observer = observer;

public ITimer? Timer { get; set; }

static void NextTick(object? state)
static void SingleTick(object? state)
{
var self = (_Timer)state!;
try
Expand All @@ -51,8 +126,24 @@ static void NextTick(object? state)
}
}

static void PeriodicTick(object? state)
{
var self = (_Timer)state!;
lock (self)
{
self.observer.OnNext(default);
}
}

public void CompleteDispose()
{
observer.OnCompleted();
Dispose();
}

public void Dispose()
{
cancellationTokenRegistration.Dispose();
Timer?.Dispose();
Timer = null;
}
Expand Down
6 changes: 0 additions & 6 deletions src/R3/Factories/_EventFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@ public static partial class Observable


// ToAsyncEnumerable?
// ToEvent
// ToEventPattern



// AsObservable
// AsSingleUnitObservable
Expand All @@ -26,7 +22,5 @@ public static partial class Observable




}


9 changes: 9 additions & 0 deletions src/R3/Internal/TimeSpanExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace R3.Internal;

internal static class TimeSpanExtensions
{
public static TimeSpan Normalize(this TimeSpan timeSpan)
{
return timeSpan >= TimeSpan.Zero ? timeSpan : TimeSpan.Zero;
}
}

0 comments on commit 16766c0

Please sign in to comment.