Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Jan 1, 2024
1 parent c0ffd65 commit 961978d
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 154 deletions.
6 changes: 6 additions & 0 deletions src/R3/ConnectableObservable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace R3;

public abstract class ConnectableObservable<T> : Observable<T>
{
public abstract IDisposable Connect();
}
97 changes: 97 additions & 0 deletions src/R3/Operators/Multicast.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
namespace R3;

public static partial class ObservableExtensions
{
// Multicast, Publish, Replay, Share

public static ConnectableObservable<T> Multicast<T>(this Observable<T> source, ISubject<T> subject)
{
return new Multicast<T>(source, subject);
}

public static ConnectableObservable<T> Publish<T>(this Observable<T> source)
{
return source.Multicast(new Subject<T>());
}

public static ConnectableObservable<T> Publish<T>(this Observable<T> source, T initialValue)
{
return source.Multicast(new ReactiveProperty<T>(initialValue, equalityComparer: null));
}

public static ConnectableObservable<T> Replay<T>(this Observable<T> source)
{
return source.Multicast(new ReplaySubject<T>());
}

public static ConnectableObservable<T> Replay<T>(this Observable<T> source, int bufferSize)
{
return source.Multicast(new ReplaySubject<T>(bufferSize));
}

public static ConnectableObservable<T> Replay<T>(this Observable<T> source, TimeSpan window)
{
return source.Multicast(new ReplaySubject<T>(window));
}

public static ConnectableObservable<T> Replay<T>(this Observable<T> source, TimeSpan window, TimeProvider timeProvider)
{
return source.Multicast(new ReplaySubject<T>(window, timeProvider));
}

public static ConnectableObservable<T> Replay<T>(this Observable<T> source, int bufferSize, TimeSpan window)
{
return source.Multicast(new ReplaySubject<T>(bufferSize, window));
}

public static ConnectableObservable<T> Replay<T>(this Observable<T> source, int bufferSize, TimeSpan window, TimeProvider timeProvider)
{
return source.Multicast(new ReplaySubject<T>(bufferSize, window, timeProvider));
}

public static Observable<T> Share<T>(this Observable<T> source)
{
return source.Publish().RefCount();
}
}

internal sealed class Multicast<T>(Observable<T> source, ISubject<T> subject) : ConnectableObservable<T>
{
readonly object gate = new object();
Connection? connection;

public override IDisposable Connect()
{
lock (gate)
{
if (connection == null)
{
var subscription = source.Subscribe(subject.AsObserver());
connection = new Connection(this, subscription);
}

return connection;
}
}

protected override IDisposable SubscribeCore(Observer<T> observer)
{
return subject.Subscribe(observer.Wrap());
}

sealed class Connection(Multicast<T> parent, IDisposable? subscription) : IDisposable
{
public void Dispose()
{
lock (parent.gate)
{
if (subscription != null)
{
subscription.Dispose();
subscription = null;
parent.connection = null;
}
}
}
}
}
59 changes: 59 additions & 0 deletions src/R3/Operators/RefCount.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
namespace R3;

public static partial class ObservableExtensions
{
public static Observable<T> RefCount<T>(this ConnectableObservable<T> source)
{
return new RefCount<T>(source);
}
}

internal sealed class RefCount<T>(ConnectableObservable<T> source) : Observable<T>
{
readonly object gate = new object();
int refCount = 0;
IDisposable? connection;

protected override IDisposable SubscribeCore(Observer<T> observer)
{
lock (gate)
{
var subcription = source.Subscribe(new _RefCount(this, observer));
if (++refCount == 1)
{
connection = source.Connect();
}
return subcription;
}
}

sealed class _RefCount(RefCount<T> parent, Observer<T> observer) : Observer<T>
{
protected override void OnNextCore(T value)
{
observer.OnNext(value);
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
observer.OnCompleted(result);
}

protected override void DisposeCore()
{
lock (parent.gate)
{
if (--parent.refCount == 0)
{
parent.connection?.Dispose();
parent.connection = null;
}
}
}
}
}
154 changes: 0 additions & 154 deletions src/R3/Operators/_Operators.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,158 +28,4 @@ public static partial class ObservableExtensions

// return tasks:
// All, Any, Contains, SequenceEqual, IsEmpty, MaxBy, MinBy, ToDictionary, ToLookup,

// Multicast
// Multicast, Publish, Replay, RefCount, Share(Publish().RefCount())

public static ConnectableObservable<T> Multicast<T>(this Observable<T> source, ISubject<T> subject)
{
return new Multicast<T>(source, subject);
}

public static ConnectableObservable<T> Publish<T>(this Observable<T> source)
{
return source.Multicast(new Subject<T>());
}

public static ConnectableObservable<T> Publish<T>(this Observable<T> source, T initialValue)
{
return source.Multicast(new ReactiveProperty<T>(initialValue, equalityComparer: null));
}

public static ConnectableObservable<T> Replay<T>(this Observable<T> source)
{
return source.Multicast(new ReplaySubject<T>());
}

public static ConnectableObservable<T> Replay<T>(this Observable<T> source, int bufferSize)
{
return source.Multicast(new ReplaySubject<T>(bufferSize));
}

public static ConnectableObservable<T> Replay<T>(this Observable<T> source, TimeSpan window)
{
return source.Multicast(new ReplaySubject<T>(window));
}

public static ConnectableObservable<T> Replay<T>(this Observable<T> source, TimeSpan window, TimeProvider timeProvider)
{
return source.Multicast(new ReplaySubject<T>(window, timeProvider));
}

public static ConnectableObservable<T> Replay<T>(this Observable<T> source, int bufferSize, TimeSpan window)
{
return source.Multicast(new ReplaySubject<T>(bufferSize, window));
}

public static ConnectableObservable<T> Replay<T>(this Observable<T> source, int bufferSize, TimeSpan window, TimeProvider timeProvider)
{
return source.Multicast(new ReplaySubject<T>(bufferSize, window, timeProvider));
}

public static Observable<T> RefCount<T>(this ConnectableObservable<T> source)
{
return new RefCount<T>(source);
}

public static Observable<T> Share<T>(this Observable<T> source)
{
return source.Publish().RefCount();
}
}

public abstract class ConnectableObservable<T> : Observable<T>
{
public abstract IDisposable Connect();
}

internal sealed class Multicast<T>(Observable<T> source, ISubject<T> subject) : ConnectableObservable<T>
{
readonly object gate = new object();
Connection? connection;

public override IDisposable Connect()
{
lock (gate)
{
if (connection == null)
{
var subscription = source.Subscribe(subject.AsObserver());
connection = new Connection(this, subscription);
}

return connection;
}
}

protected override IDisposable SubscribeCore(Observer<T> observer)
{
return subject.Subscribe(observer);
}

sealed class Connection(Multicast<T> parent, IDisposable? subscription) : IDisposable
{
public void Dispose()
{
lock (parent.gate)
{
if (subscription != null)
{
subscription.Dispose();
subscription = null;
parent.connection = null;
}
}
}
}
}

internal sealed class RefCount<T>(ConnectableObservable<T> source) : Observable<T>
{
readonly object gate = new object();
int refCount = 0;
IDisposable? connection;

protected override IDisposable SubscribeCore(Observer<T> observer)
{
lock (gate)
{
var subcription = source.Subscribe(new _RefCount(this, observer));
if (++refCount == 1)
{
connection = source.Connect();
}
return subcription;
}
}

sealed class _RefCount(RefCount<T> parent, Observer<T> observer) : Observer<T>
{
protected override void OnNextCore(T value)
{
observer.OnNext(value);
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
observer.OnCompleted(result);
}

protected override void DisposeCore()
{
lock (parent.gate)
{
if (--parent.refCount == 0)
{
parent.connection?.Dispose();
parent.connection = null;
}
}
}
}
}
40 changes: 40 additions & 0 deletions tests/R3.Tests/OperatorTests/MulticastTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
namespace R3.Tests.OperatorTests;

public class MulticastTest
{
[Fact]
public void Publish()
{
var subject = new Subject<int>();

var connectable = subject.Publish();

var list1 = connectable.ToLiveList();
var list2 = connectable.ToLiveList();

subject.OnNext(100);

var connection = connectable.Connect();

subject.OnNext(110);
subject.OnNext(120);
subject.OnNext(130);

list1.AssertEqual([110, 120, 130]);
list2.AssertEqual([110, 120, 130]);

connection.Dispose();

subject.OnCompleted();

list1.AssertIsNotCompleted();
list2.AssertIsNotCompleted();

var reconnection = connectable.Connect();

list1.AssertIsCompleted();
list2.AssertIsCompleted();

reconnection.Dispose();
}
}
32 changes: 32 additions & 0 deletions tests/R3.Tests/OperatorTests/RefCountTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
namespace R3.Tests.OperatorTests;

public class RefCountTest
{
[Fact]
public void RefCount()
{
var subject = new Subject<int>();
var refCounted = subject.Publish().RefCount(); // same as Share
var list1 = refCounted.ToLiveList();
var list2 = refCounted.ToLiveList();

subject.OnNext(10);
list1.AssertEqual([10]);
list2.AssertEqual([10]);

subject.OnNext(20);
list1.AssertEqual([10, 20]);
list2.AssertEqual([10, 20]);

list1.Dispose();
list2.Dispose();

var list3 = refCounted.ToLiveList();

subject.OnNext(30);
subject.OnCompleted();

list3.AssertEqual([30]);
list3.AssertIsCompleted();
}
}

0 comments on commit 961978d

Please sign in to comment.