diff --git a/src/R3/ConnectableObservable.cs b/src/R3/ConnectableObservable.cs new file mode 100644 index 00000000..9a9a8a53 --- /dev/null +++ b/src/R3/ConnectableObservable.cs @@ -0,0 +1,6 @@ +namespace R3; + +public abstract class ConnectableObservable : Observable +{ + public abstract IDisposable Connect(); +} diff --git a/src/R3/Operators/Multicast.cs b/src/R3/Operators/Multicast.cs new file mode 100644 index 00000000..d91cb122 --- /dev/null +++ b/src/R3/Operators/Multicast.cs @@ -0,0 +1,97 @@ +namespace R3; + +public static partial class ObservableExtensions +{ + // Multicast, Publish, Replay, Share + + public static ConnectableObservable Multicast(this Observable source, ISubject subject) + { + return new Multicast(source, subject); + } + + public static ConnectableObservable Publish(this Observable source) + { + return source.Multicast(new Subject()); + } + + public static ConnectableObservable Publish(this Observable source, T initialValue) + { + return source.Multicast(new ReactiveProperty(initialValue, equalityComparer: null)); + } + + public static ConnectableObservable Replay(this Observable source) + { + return source.Multicast(new ReplaySubject()); + } + + public static ConnectableObservable Replay(this Observable source, int bufferSize) + { + return source.Multicast(new ReplaySubject(bufferSize)); + } + + public static ConnectableObservable Replay(this Observable source, TimeSpan window) + { + return source.Multicast(new ReplaySubject(window)); + } + + public static ConnectableObservable Replay(this Observable source, TimeSpan window, TimeProvider timeProvider) + { + return source.Multicast(new ReplaySubject(window, timeProvider)); + } + + public static ConnectableObservable Replay(this Observable source, int bufferSize, TimeSpan window) + { + return source.Multicast(new ReplaySubject(bufferSize, window)); + } + + public static ConnectableObservable Replay(this Observable source, int bufferSize, TimeSpan window, TimeProvider timeProvider) + { + return source.Multicast(new ReplaySubject(bufferSize, window, timeProvider)); + } + + public static Observable Share(this Observable source) + { + return source.Publish().RefCount(); + } +} + +internal sealed class Multicast(Observable source, ISubject subject) : ConnectableObservable +{ + readonly object gate = new object(); + Connection? connection; + + public override IDisposable Connect() + { + lock (gate) + { + if (connection == null) + { + var subscription = source.Subscribe(subject.AsObserver()); + connection = new Connection(this, subscription); + } + + return connection; + } + } + + protected override IDisposable SubscribeCore(Observer observer) + { + return subject.Subscribe(observer.Wrap()); + } + + sealed class Connection(Multicast parent, IDisposable? subscription) : IDisposable + { + public void Dispose() + { + lock (parent.gate) + { + if (subscription != null) + { + subscription.Dispose(); + subscription = null; + parent.connection = null; + } + } + } + } +} diff --git a/src/R3/Operators/RefCount.cs b/src/R3/Operators/RefCount.cs new file mode 100644 index 00000000..1ef21e3a --- /dev/null +++ b/src/R3/Operators/RefCount.cs @@ -0,0 +1,59 @@ +namespace R3; + +public static partial class ObservableExtensions +{ + public static Observable RefCount(this ConnectableObservable source) + { + return new RefCount(source); + } +} + +internal sealed class RefCount(ConnectableObservable source) : Observable +{ + readonly object gate = new object(); + int refCount = 0; + IDisposable? connection; + + protected override IDisposable SubscribeCore(Observer observer) + { + lock (gate) + { + var subcription = source.Subscribe(new _RefCount(this, observer)); + if (++refCount == 1) + { + connection = source.Connect(); + } + return subcription; + } + } + + sealed class _RefCount(RefCount parent, Observer observer) : Observer + { + protected override void OnNextCore(T value) + { + observer.OnNext(value); + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + observer.OnCompleted(result); + } + + protected override void DisposeCore() + { + lock (parent.gate) + { + if (--parent.refCount == 0) + { + parent.connection?.Dispose(); + parent.connection = null; + } + } + } + } +} diff --git a/src/R3/Operators/_Operators.cs b/src/R3/Operators/_Operators.cs index 9306cc9c..930d6fc6 100644 --- a/src/R3/Operators/_Operators.cs +++ b/src/R3/Operators/_Operators.cs @@ -28,158 +28,4 @@ public static partial class ObservableExtensions // return tasks: // All, Any, Contains, SequenceEqual, IsEmpty, MaxBy, MinBy, ToDictionary, ToLookup, - - // Multicast - // Multicast, Publish, Replay, RefCount, Share(Publish().RefCount()) - - public static ConnectableObservable Multicast(this Observable source, ISubject subject) - { - return new Multicast(source, subject); - } - - public static ConnectableObservable Publish(this Observable source) - { - return source.Multicast(new Subject()); - } - - public static ConnectableObservable Publish(this Observable source, T initialValue) - { - return source.Multicast(new ReactiveProperty(initialValue, equalityComparer: null)); - } - - public static ConnectableObservable Replay(this Observable source) - { - return source.Multicast(new ReplaySubject()); - } - - public static ConnectableObservable Replay(this Observable source, int bufferSize) - { - return source.Multicast(new ReplaySubject(bufferSize)); - } - - public static ConnectableObservable Replay(this Observable source, TimeSpan window) - { - return source.Multicast(new ReplaySubject(window)); - } - - public static ConnectableObservable Replay(this Observable source, TimeSpan window, TimeProvider timeProvider) - { - return source.Multicast(new ReplaySubject(window, timeProvider)); - } - - public static ConnectableObservable Replay(this Observable source, int bufferSize, TimeSpan window) - { - return source.Multicast(new ReplaySubject(bufferSize, window)); - } - - public static ConnectableObservable Replay(this Observable source, int bufferSize, TimeSpan window, TimeProvider timeProvider) - { - return source.Multicast(new ReplaySubject(bufferSize, window, timeProvider)); - } - - public static Observable RefCount(this ConnectableObservable source) - { - return new RefCount(source); - } - - public static Observable Share(this Observable source) - { - return source.Publish().RefCount(); - } -} - -public abstract class ConnectableObservable : Observable -{ - public abstract IDisposable Connect(); -} - -internal sealed class Multicast(Observable source, ISubject subject) : ConnectableObservable -{ - readonly object gate = new object(); - Connection? connection; - - public override IDisposable Connect() - { - lock (gate) - { - if (connection == null) - { - var subscription = source.Subscribe(subject.AsObserver()); - connection = new Connection(this, subscription); - } - - return connection; - } - } - - protected override IDisposable SubscribeCore(Observer observer) - { - return subject.Subscribe(observer); - } - - sealed class Connection(Multicast parent, IDisposable? subscription) : IDisposable - { - public void Dispose() - { - lock (parent.gate) - { - if (subscription != null) - { - subscription.Dispose(); - subscription = null; - parent.connection = null; - } - } - } - } -} - -internal sealed class RefCount(ConnectableObservable source) : Observable -{ - readonly object gate = new object(); - int refCount = 0; - IDisposable? connection; - - protected override IDisposable SubscribeCore(Observer observer) - { - lock (gate) - { - var subcription = source.Subscribe(new _RefCount(this, observer)); - if (++refCount == 1) - { - connection = source.Connect(); - } - return subcription; - } - } - - sealed class _RefCount(RefCount parent, Observer observer) : Observer - { - protected override void OnNextCore(T value) - { - observer.OnNext(value); - } - - protected override void OnErrorResumeCore(Exception error) - { - observer.OnErrorResume(error); - } - - protected override void OnCompletedCore(Result result) - { - observer.OnCompleted(result); - } - - protected override void DisposeCore() - { - lock (parent.gate) - { - if (--parent.refCount == 0) - { - parent.connection?.Dispose(); - parent.connection = null; - } - } - } - } } diff --git a/tests/R3.Tests/OperatorTests/MulticastTest.cs b/tests/R3.Tests/OperatorTests/MulticastTest.cs new file mode 100644 index 00000000..8405a164 --- /dev/null +++ b/tests/R3.Tests/OperatorTests/MulticastTest.cs @@ -0,0 +1,40 @@ +namespace R3.Tests.OperatorTests; + +public class MulticastTest +{ + [Fact] + public void Publish() + { + var subject = new Subject(); + + var connectable = subject.Publish(); + + var list1 = connectable.ToLiveList(); + var list2 = connectable.ToLiveList(); + + subject.OnNext(100); + + var connection = connectable.Connect(); + + subject.OnNext(110); + subject.OnNext(120); + subject.OnNext(130); + + list1.AssertEqual([110, 120, 130]); + list2.AssertEqual([110, 120, 130]); + + connection.Dispose(); + + subject.OnCompleted(); + + list1.AssertIsNotCompleted(); + list2.AssertIsNotCompleted(); + + var reconnection = connectable.Connect(); + + list1.AssertIsCompleted(); + list2.AssertIsCompleted(); + + reconnection.Dispose(); + } +} diff --git a/tests/R3.Tests/OperatorTests/RefCountTest.cs b/tests/R3.Tests/OperatorTests/RefCountTest.cs new file mode 100644 index 00000000..04c41d9a --- /dev/null +++ b/tests/R3.Tests/OperatorTests/RefCountTest.cs @@ -0,0 +1,32 @@ +namespace R3.Tests.OperatorTests; + +public class RefCountTest +{ + [Fact] + public void RefCount() + { + var subject = new Subject(); + var refCounted = subject.Publish().RefCount(); // same as Share + var list1 = refCounted.ToLiveList(); + var list2 = refCounted.ToLiveList(); + + subject.OnNext(10); + list1.AssertEqual([10]); + list2.AssertEqual([10]); + + subject.OnNext(20); + list1.AssertEqual([10, 20]); + list2.AssertEqual([10, 20]); + + list1.Dispose(); + list2.Dispose(); + + var list3 = refCounted.ToLiveList(); + + subject.OnNext(30); + subject.OnCompleted(); + + list3.AssertEqual([30]); + list3.AssertIsCompleted(); + } +}