Skip to content

Commit

Permalink
RefCount
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Jan 1, 2024
1 parent e199029 commit c0ffd65
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 14 deletions.
105 changes: 91 additions & 14 deletions src/R3/Operators/_Operators.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,29 +37,56 @@ public static ConnectableObservable<T> Multicast<T>(this Observable<T> source, I
return new Multicast<T>(source, subject);
}

public static Observable<T> Publish<T>(this Observable<T> source)
public static ConnectableObservable<T> Publish<T>(this Observable<T> source)
{
return source.Multicast(new Subject<T>());
}

public static Observable<T> Publish<T>(this Observable<T> source, T initialValue)
public static ConnectableObservable<T> Publish<T>(this Observable<T> source, T initialValue)
{
return source.Multicast(new ReactiveProperty<T>(initialValue));
return source.Multicast(new ReactiveProperty<T>(initialValue, equalityComparer: null));
}

// TODO: ReplaySubject
//public static Observable<T> Replay<T>(this Observable<T> source)
//{
// return source.Multicast(new ReplaySubject<T>());
//}
public static ConnectableObservable<T> Replay<T>(this Observable<T> source)
{
return source.Multicast(new ReplaySubject<T>());
}

// TODO: require RefCount
//public static Observable<T> Share<T>(this Observable<T> source)
//{
// return source.Publish().RefCount();
//}
}
public static ConnectableObservable<T> Replay<T>(this Observable<T> source, int bufferSize)
{
return source.Multicast(new ReplaySubject<T>(bufferSize));
}

public static ConnectableObservable<T> Replay<T>(this Observable<T> source, TimeSpan window)
{
return source.Multicast(new ReplaySubject<T>(window));
}

public static ConnectableObservable<T> Replay<T>(this Observable<T> source, TimeSpan window, TimeProvider timeProvider)
{
return source.Multicast(new ReplaySubject<T>(window, timeProvider));
}

public static ConnectableObservable<T> Replay<T>(this Observable<T> source, int bufferSize, TimeSpan window)
{
return source.Multicast(new ReplaySubject<T>(bufferSize, window));
}

public static ConnectableObservable<T> Replay<T>(this Observable<T> source, int bufferSize, TimeSpan window, TimeProvider timeProvider)
{
return source.Multicast(new ReplaySubject<T>(bufferSize, window, timeProvider));
}

public static Observable<T> RefCount<T>(this ConnectableObservable<T> source)
{
return new RefCount<T>(source);
}

public static Observable<T> Share<T>(this Observable<T> source)
{
return source.Publish().RefCount();
}
}

public abstract class ConnectableObservable<T> : Observable<T>
{
Expand Down Expand Up @@ -106,3 +133,53 @@ public void Dispose()
}
}
}

internal sealed class RefCount<T>(ConnectableObservable<T> source) : Observable<T>
{
readonly object gate = new object();
int refCount = 0;
IDisposable? connection;

protected override IDisposable SubscribeCore(Observer<T> observer)
{
lock (gate)
{
var subcription = source.Subscribe(new _RefCount(this, observer));
if (++refCount == 1)
{
connection = source.Connect();
}
return subcription;
}
}

sealed class _RefCount(RefCount<T> parent, Observer<T> observer) : Observer<T>
{
protected override void OnNextCore(T value)
{
observer.OnNext(value);
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
observer.OnCompleted(result);
}

protected override void DisposeCore()
{
lock (parent.gate)
{
if (--parent.refCount == 0)
{
parent.connection?.Dispose();
parent.connection = null;
}
}
}
}
}
5 changes: 5 additions & 0 deletions src/R3/ReplaySubject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ public ReplaySubject(TimeSpan window, TimeProvider timeProvider)
{
}

public ReplaySubject(int bufferSize, TimeSpan window)
: this(bufferSize, window, ObservableSystem.DefaultTimeProvider)
{
}

// full constructor
public ReplaySubject(int bufferSize, TimeSpan window, TimeProvider timeProvider)
{
Expand Down

0 comments on commit c0ffd65

Please sign in to comment.