diff --git a/src/R3/Operators/_Operators.cs b/src/R3/Operators/_Operators.cs index 853b286f..9306cc9c 100644 --- a/src/R3/Operators/_Operators.cs +++ b/src/R3/Operators/_Operators.cs @@ -37,29 +37,56 @@ public static ConnectableObservable Multicast(this Observable source, I return new Multicast(source, subject); } - public static Observable Publish(this Observable source) + public static ConnectableObservable Publish(this Observable source) { return source.Multicast(new Subject()); } - public static Observable Publish(this Observable source, T initialValue) + public static ConnectableObservable Publish(this Observable source, T initialValue) { - return source.Multicast(new ReactiveProperty(initialValue)); + return source.Multicast(new ReactiveProperty(initialValue, equalityComparer: null)); } - // TODO: ReplaySubject - //public static Observable Replay(this Observable source) - //{ - // return source.Multicast(new ReplaySubject()); - //} + public static ConnectableObservable Replay(this Observable source) + { + return source.Multicast(new ReplaySubject()); + } - // TODO: require RefCount - //public static Observable Share(this Observable source) - //{ - // return source.Publish().RefCount(); - //} -} + public static ConnectableObservable Replay(this Observable source, int bufferSize) + { + return source.Multicast(new ReplaySubject(bufferSize)); + } + + public static ConnectableObservable Replay(this Observable source, TimeSpan window) + { + return source.Multicast(new ReplaySubject(window)); + } + + public static ConnectableObservable Replay(this Observable source, TimeSpan window, TimeProvider timeProvider) + { + return source.Multicast(new ReplaySubject(window, timeProvider)); + } + + public static ConnectableObservable Replay(this Observable source, int bufferSize, TimeSpan window) + { + return source.Multicast(new ReplaySubject(bufferSize, window)); + } + public static ConnectableObservable Replay(this Observable source, int bufferSize, TimeSpan window, TimeProvider timeProvider) + { + return source.Multicast(new ReplaySubject(bufferSize, window, timeProvider)); + } + + public static Observable RefCount(this ConnectableObservable source) + { + return new RefCount(source); + } + + public static Observable Share(this Observable source) + { + return source.Publish().RefCount(); + } +} public abstract class ConnectableObservable : Observable { @@ -106,3 +133,53 @@ public void Dispose() } } } + +internal sealed class RefCount(ConnectableObservable source) : Observable +{ + readonly object gate = new object(); + int refCount = 0; + IDisposable? connection; + + protected override IDisposable SubscribeCore(Observer observer) + { + lock (gate) + { + var subcription = source.Subscribe(new _RefCount(this, observer)); + if (++refCount == 1) + { + connection = source.Connect(); + } + return subcription; + } + } + + sealed class _RefCount(RefCount parent, Observer observer) : Observer + { + protected override void OnNextCore(T value) + { + observer.OnNext(value); + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + observer.OnCompleted(result); + } + + protected override void DisposeCore() + { + lock (parent.gate) + { + if (--parent.refCount == 0) + { + parent.connection?.Dispose(); + parent.connection = null; + } + } + } + } +} diff --git a/src/R3/ReplaySubject.cs b/src/R3/ReplaySubject.cs index 065ed650..9efb885f 100644 --- a/src/R3/ReplaySubject.cs +++ b/src/R3/ReplaySubject.cs @@ -31,6 +31,11 @@ public ReplaySubject(TimeSpan window, TimeProvider timeProvider) { } + public ReplaySubject(int bufferSize, TimeSpan window) + : this(bufferSize, window, ObservableSystem.DefaultTimeProvider) + { + } + // full constructor public ReplaySubject(int bufferSize, TimeSpan window, TimeProvider timeProvider) {