Skip to content

Commit

Permalink
ReplaySubject
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Jan 1, 2024
1 parent 82e6f93 commit e199029
Show file tree
Hide file tree
Showing 5 changed files with 315 additions and 32 deletions.
27 changes: 8 additions & 19 deletions sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,23 @@
var logger = factory.CreateLogger<Program>();


var rp = new ReactiveProperty<int>(9999);
// dvar rp = new ReactiveProperty<int>(9999);



var ct = new CancellationTokenSource(1000);
ObservableSystem.DefaultFrameProvider = new ThreadSleepFrameProvider(60);
var rep = new System.Reactive.Subjects.ReplaySubject<int>();
//var rep = new System.Reactive.Subjects.BehaviorSubject<int>(10);

rep.OnNext(10);
rep.OnNext(100);
rep.OnNext(1000);
rep.OnCompleted();

// Enumerable.Empty<int>().ElementAtOrDefault(
rep.Subscribe(x => Console.WriteLine(x), () => Console.WriteLine("completed"));

var publisher = new System.Reactive.Subjects.Subject<int>();

var connectable = publisher.Multicast(new System.Reactive.Subjects.Subject<int>());


connectable.Subscribe(x => Console.WriteLine(x));

var d= connectable.Connect();


publisher.OnNext(100);

d.Dispose();


//var d2 = connectable.Connect();

publisher.OnNext(200);

public static class Extensions
{
Expand Down
2 changes: 2 additions & 0 deletions src/R3/ReactiveProperty.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public void OnNext(T value)
{
if (completeState.IsCompleted) return;

this.value = value; // different from Subject<T>; set value before raise OnNext

foreach (var subscription in list.AsSpan())
{
subscription?.observer.OnNext(value);
Expand Down
187 changes: 174 additions & 13 deletions src/R3/ReplaySubject.cs
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
namespace R3;

public sealed class ReplaySubject<T>
public sealed class ReplaySubject<T> : Observable<T>, ISubject<T>, IDisposable
{
public ReplaySubject()
: this(int.MaxValue, TimeSpan.MaxValue, ObservableSystem.DefaultTimeProvider)
{
}
readonly int bufferSize;
readonly TimeSpan window;
readonly TimeProvider? timeProvider;
readonly RingBuffer<(long timestamp, T value)> replayBuffer; // lock object

public ReplaySubject(TimeProvider timeProvider)
: this(int.MaxValue, TimeSpan.MaxValue, timeProvider)
{
}
// Subject
FreeListCore<Subscription> list;
CompleteState completeState;

public ReplaySubject(int bufferSize)
: this(bufferSize, TimeSpan.MaxValue, ObservableSystem.DefaultTimeProvider)
public ReplaySubject()
: this(int.MaxValue, TimeSpan.MaxValue, null!) // allow null internally
{
}

public ReplaySubject(int bufferSize, TimeProvider timeProvider)
: this(bufferSize, TimeSpan.MaxValue, timeProvider)
public ReplaySubject(int bufferSize)
: this(bufferSize, TimeSpan.MaxValue, null!)
{
}

Expand All @@ -35,6 +34,168 @@ public ReplaySubject(TimeSpan window, TimeProvider timeProvider)
// full constructor
public ReplaySubject(int bufferSize, TimeSpan window, TimeProvider timeProvider)
{
this.bufferSize = bufferSize;
this.window = window;
this.timeProvider = timeProvider;
this.replayBuffer = new RingBuffer<(long, T)>(bufferSize < 8 ? bufferSize : 8);
this.list = new FreeListCore<Subscription>(replayBuffer);
}

public bool IsDisposed => completeState.IsDisposed;

public void OnNext(T value)
{
if (completeState.IsCompleted) return;

lock (replayBuffer)
{
Trim();
replayBuffer.AddLast((timeProvider?.GetTimestamp() ?? 0, value));
}

foreach (var subscription in list.AsSpan())
{
subscription?.observer.OnNext(value);
}
}

public void OnErrorResume(Exception error)
{
if (completeState.IsCompleted) return;

foreach (var subscription in list.AsSpan())
{
subscription?.observer.OnErrorResume(error);
}
}

public void OnCompleted(Result result)
{
var status = completeState.TrySetResult(result);
if (status != CompleteState.ResultStatus.Done)
{
return; // already completed
}

foreach (var subscription in list.AsSpan())
{
subscription?.observer.OnCompleted(result);
}
}

protected override IDisposable SubscribeCore(Observer<T> observer)
{
// raise latest value on subscribe(before check completed add observer to list)
lock (replayBuffer)
{
Trim(); // Trim before get span
var dualSpan = replayBuffer.GetSpan();
foreach (ref readonly var item in dualSpan.First)
{
observer.OnNext(item.value);
}
foreach (ref readonly var item in dualSpan.Second)
{
observer.OnNext(item.value);
}
}

var result = completeState.TryGetResult();
if (result != null)
{
observer.OnCompleted(result.Value);
return Disposable.Empty;
}

var subscription = new Subscription(this, observer); // create subscription and add observer to list.

// need to check called completed during adding
result = completeState.TryGetResult();
if (result != null)
{
subscription.observer.OnCompleted(result.Value);
subscription.Dispose();
return Disposable.Empty;
}

return subscription;
}

public void Dispose()
{
Dispose(true);
}

public void Dispose(bool callOnCompleted)
{
if (completeState.TrySetDisposed(out var alreadyCompleted))
{
if (callOnCompleted && !alreadyCompleted)
{
// not yet disposed so can call list iteration
foreach (var subscription in list.AsSpan())
{
subscription?.observer.OnCompleted();
}
}

list.Dispose();
lock (replayBuffer)
{
replayBuffer.Clear();
}
}
}

void Trim()
{
// Trim by Count
while (replayBuffer.Count > bufferSize)
{
replayBuffer.RemoveFirst();
}

// Trim by Time
if (timeProvider != null)
{
var now = timeProvider.GetTimestamp();
while (replayBuffer.Count > 0)
{
var value = replayBuffer[0]; // peek first
var elapsed = timeProvider.GetElapsedTime(value.timestamp, now);
if (elapsed >= window)
{
replayBuffer.RemoveFirst();
}
else
{
break;
}
}
}
}

sealed class Subscription : IDisposable
{
public readonly Observer<T> observer;
readonly int removeKey;
ReplaySubject<T>? parent;

public Subscription(ReplaySubject<T> parent, Observer<T> observer)
{
this.parent = parent;
this.observer = observer;
parent.list.Add(this, out removeKey); // for the thread-safety, add and set removeKey in same lock.
}

public void Dispose()
{
var p = Interlocked.Exchange(ref parent, null);
if (p == null) return;

// removeKey is index, will reuse if remove completed so only allows to call from here and must not call twice.
p.list.Remove(removeKey);
}
}
}

Expand Down
33 changes: 33 additions & 0 deletions tests/R3.Tests/ReactivePropertyTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
namespace R3.Tests;

public class ReactivePropertyTest
{
[Fact]
public void Test()
{
var rp = new ReactiveProperty<int>(100);
rp.Value.Should().Be(100);

var list = rp.ToLiveList();
list.AssertEqual([100]);

rp.Value = 9999;

var list2 = rp.ToLiveList();
list.AssertEqual([100, 9999]);
list2.AssertEqual([9999]);

rp.Value = 9999;
list.AssertEqual([100, 9999]);
list2.AssertEqual([9999]);

rp.Value = 100;
list.AssertEqual([100, 9999, 100]);
list2.AssertEqual([9999, 100]);

rp.Dispose();

list.AssertIsCompleted();
list2.AssertIsCompleted();
}
}
98 changes: 98 additions & 0 deletions tests/R3.Tests/ReplaySubjectTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
namespace R3.Tests;

public class ReplaySubjectTest
{
[Fact]
public void ReplayAll()
{
var subject = new ReplaySubject<int>();
foreach (var i in Enumerable.Range(0, 100))
{
subject.OnNext(i);
}

var list = subject.ToLiveList();
list.AssertEqual(Enumerable.Range(0, 100).ToArray());

list.Clear();
subject.OnNext(9);
list.AssertEqual([9]);

subject.OnCompleted();
list.AssertIsCompleted();

var list2 = subject.ToLiveList();

list2.AssertEqual(Enumerable.Range(0, 100).Append(9).ToArray());
list2.AssertIsCompleted();
}

[Fact]
public void ReplayCount()
{
var subject = new ReplaySubject<int>(bufferSize: 50);
foreach (var i in Enumerable.Range(0, 50))
{
subject.OnNext(i);
}

{
using var list = subject.ToLiveList();
list.AssertEqual(Enumerable.Range(0, 50).ToArray());
}
{
subject.OnNext(100);
subject.OnNext(101);
subject.OnNext(102);

using var list = subject.ToLiveList();
list.AssertEqual(Enumerable.Range(0, 50).Skip(3).Concat([100, 101, 102]).ToArray());
}

subject.OnCompleted();

{
using var list = subject.ToLiveList();
list.AssertEqual(Enumerable.Range(0, 50).Skip(3).Concat([100, 101, 102]).ToArray());
list.AssertIsCompleted();
}
}

[Fact]
public void ReplayTime()
{
var fakeTime = new FakeTimeProvider();

var subject = new ReplaySubject<int>(TimeSpan.FromSeconds(3), fakeTime);

subject.OnNext(10);
fakeTime.Advance(TimeSpan.FromSeconds(1));
subject.ToLiveList().AssertEqual([10]);

subject.OnNext(20);
fakeTime.Advance(TimeSpan.FromSeconds(1));
subject.ToLiveList().AssertEqual([10, 20]);

subject.OnNext(30);
fakeTime.Advance(TimeSpan.FromSeconds(1));

var list = subject.ToLiveList();
subject.ToLiveList().AssertEqual([20, 30]);

subject.OnNext(40);
subject.OnNext(50);
subject.OnNext(60);

fakeTime.Advance(TimeSpan.FromSeconds(2));
subject.OnNext(70);

subject.ToLiveList().AssertEqual([40, 50, 60, 70]);
fakeTime.Advance(TimeSpan.FromSeconds(1));

subject.ToLiveList().AssertEqual([70]);

subject.OnCompleted();
subject.ToLiveList().AssertEqual([70]);
subject.ToLiveList().AssertIsCompleted();
}
}

0 comments on commit e199029

Please sign in to comment.