From a6bbd5395e16d1dc38d93caced5a0662470e38b2 Mon Sep 17 00:00:00 2001 From: neuecc Date: Wed, 27 Dec 2023 17:43:54 +0900 Subject: [PATCH] Amb --- sandbox/ConsoleApp1/Program.cs | 2 + src/R3/Factories/Amb.cs | 129 +++++++++++++++--- src/R3/Factories/Concat.cs | 103 ++++++++++++++ src/R3/Internal/ListDisposableCore.cs | 81 +++++++++++ src/R3/Internal/PooledThreadPoolWorkItem.cs | 1 + src/R3/Operators/Concat.cs | 83 ----------- ...tDisposableCore.cs => SerialDisposable.cs} | 29 ++-- src/R3/SingleAssignmentDisposable.cs | 67 +++++++++ tests/R3.Tests/OperatorTests/AmbTest.cs | 38 ++++++ .../OperatorTests/ConcatAppendPrependTest.cs | 30 ++++ 10 files changed, 454 insertions(+), 109 deletions(-) create mode 100644 src/R3/Factories/Concat.cs create mode 100644 src/R3/Internal/ListDisposableCore.cs delete mode 100644 src/R3/Operators/Concat.cs rename src/R3/{SingleAssignmentDisposableCore.cs => SerialDisposable.cs} (76%) create mode 100644 tests/R3.Tests/OperatorTests/AmbTest.cs diff --git a/sandbox/ConsoleApp1/Program.cs b/sandbox/ConsoleApp1/Program.cs index 4fafb14f..12b1c953 100644 --- a/sandbox/ConsoleApp1/Program.cs +++ b/sandbox/ConsoleApp1/Program.cs @@ -29,6 +29,8 @@ // Enumerable.Empty().ElementAtOrDefault( var range = System.Reactive.Linq.Observable.Range(1, 10); + + // range.Catch( // range.Append( diff --git a/src/R3/Factories/Amb.cs b/src/R3/Factories/Amb.cs index 3825f979..67615359 100644 --- a/src/R3/Factories/Amb.cs +++ b/src/R3/Factories/Amb.cs @@ -2,35 +2,128 @@ public static partial class Observable { - public static IObservable Amb(params IObservable[] sources) + public static Observable Amb(params Observable[] sources) { - throw new NotImplementedException(); + return new Amb(sources); } - public static IObservable Amb(IEnumerable> sources) + public static Observable Amb(IEnumerable> sources) { - throw new NotImplementedException(); + return new Amb(sources); } } -internal sealed class Amb(IEnumerable> sources) : Observable +public static partial class ObservableExtensions +{ + public static Observable Amb(this Observable source, Observable second) + { + return Observable.Amb(source, second); + } +} + +internal sealed class Amb(IEnumerable> sources) : Observable { protected override IDisposable SubscribeCore(Observer observer) { - //new CompositeDiposableBuilder - // Disposable.CreateBuilder(); - throw new NotImplementedException(); - //if (sources.TryGetNonEnumeratedCount(out var count)) - //{ - - - //} - //else - //{ - - //} - // throw new NotImplementedException(); + if (!sources.TryGetNonEnumeratedCount(out var count)) + { + count = 4; + } + + var amb = new _Amb(observer, count); + var index = 0; + foreach (var item in sources) + { + var d = item.Subscribe(new _AmbObserver(amb, index++)); + amb.disposables.Add(d); + } + return amb; + } + + sealed class _Amb : IDisposable + { + public Observer observer; + public ListDisposableCore disposables; + + public _AmbObserver? winner; + + public _Amb(Observer observer, int initialCount) + { + this.observer = observer; + this.disposables = new ListDisposableCore(initialCount, this); + } + + public void Dispose() + { + disposables.Dispose(); + } } + sealed class _AmbObserver(_Amb parent, int index) : Observer + { + protected override void OnNextCore(T value) + { + var field = Interlocked.CompareExchange(ref parent.winner, this, null); + if (field == null) + { + // first, dispose others. + parent.disposables.RemoveAllExceptAt(index); + parent.observer.OnNext(value); + } + else if (field == this) + { + parent.observer.OnNext(value); + } + else + { + // dispose self. + Dispose(); + } + } + + protected override void OnErrorResumeCore(Exception error) + { + var field = Interlocked.CompareExchange(ref parent.winner, this, null); + if (field == null) + { + // first, dispose others. + parent.disposables.RemoveAllExceptAt(index); + parent.observer.OnErrorResume(error); + } + else if (field == this) + { + parent.observer.OnErrorResume(error); + } + else + { + // dispose self. + Dispose(); + } + } + protected override void OnCompletedCore(Result result) + { + var field = Interlocked.CompareExchange(ref parent.winner, this, null); + if (field == null) + { + // first, dispose others. + parent.disposables.RemoveAllExceptAt(index); + parent.observer.OnCompleted(result); + } + else if (field == this) + { + parent.observer.OnCompleted(result); + } + else + { + // dispose self. + Dispose(); + } + } + + protected override void DisposeCore() + { + parent.disposables.RemoveAt(index); + } + } } diff --git a/src/R3/Factories/Concat.cs b/src/R3/Factories/Concat.cs new file mode 100644 index 00000000..2f6aa368 --- /dev/null +++ b/src/R3/Factories/Concat.cs @@ -0,0 +1,103 @@ +namespace R3; + +public static partial class Observable +{ + public static Observable Concat(params Observable[] sources) + { + return new Concat(sources); + } + + public static Observable Concat(IEnumerable> sources) + { + return new Concat(sources); + } +} + +public static partial class ObservableExtensions +{ + public static Observable Concat(this Observable source, Observable second) + { + return new Concat(new[] { source, second }); + } +} + +internal sealed class Concat(IEnumerable> sources) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + return new _Concat(observer, sources).Run(); + } + + sealed class _Concat : IDisposable + { + public Observer observer; + public IEnumerator> enumerator; + public SerialDisposableCore disposable; + + public _Concat(Observer observer, IEnumerable> sources) + { + this.observer = observer; + this.enumerator = sources.GetEnumerator(); + } + + public IDisposable Run() + { + if (!enumerator.MoveNext()) + { + observer.OnCompleted(); + enumerator.Dispose(); + return Disposable.Empty; + } + else + { + disposable.Disposable = enumerator.Current.Subscribe(new _ConcatObserver(this)); + return this; + } + } + + public void Dispose() + { + enumerator.Dispose(); + disposable.Dispose(); + } + } + + sealed class _ConcatObserver(_Concat parent) : Observer + { + protected override void OnNextCore(T value) + { + parent.observer.OnNext(value); + } + + protected override void OnErrorResumeCore(Exception error) + { + parent.observer.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + if (result.IsFailure) + { + try + { + parent.observer.OnCompleted(result); + } + finally + { + Dispose(); + } + } + else + { + if (parent.enumerator.MoveNext()) + { + parent.disposable.Disposable = parent.enumerator.Current.Subscribe(new _ConcatObserver(parent)); + } + else + { + parent.observer.OnCompleted(); + } + } + } + } +} diff --git a/src/R3/Internal/ListDisposableCore.cs b/src/R3/Internal/ListDisposableCore.cs new file mode 100644 index 00000000..f3ee3b1e --- /dev/null +++ b/src/R3/Internal/ListDisposableCore.cs @@ -0,0 +1,81 @@ +namespace R3.Internal; + +internal struct ListDisposableCore : IDisposable +{ + IDisposable?[] disposables; + int count; + object gate; + + public ListDisposableCore(int initialCount, object gate) + { + this.disposables = new IDisposable?[initialCount]; + this.gate = gate; + } + + public void Add(IDisposable disposable) + { + lock (gate) + { + if (disposables.Length == count) + { + Array.Resize(ref disposables, count * 2); + } + + disposables[count++] = disposable; + } + } + + public void RemoveAt(int index) + { + lock (gate) + { + if (index < 0 || index >= count) + { + return; + } + + ref var d = ref disposables[index]; + if (d != null) + { + d.Dispose(); + } + d = null; + } + } + + public void RemoveAllExceptAt(int index) + { + lock (gate) + { + if (index < 0 || index >= count) + { + return; + } + + for (int i = 0; i < count; i++) + { + if (i == index) continue; + + ref var d = ref disposables[i]; + if (d != null) + { + d.Dispose(); + } + d = null; + } + } + } + + public void Dispose() + { + lock (gate) + { + for (int i = 0; i < count; i++) + { + disposables[i]?.Dispose(); + disposables[i] = null; + count = 0; + } + } + } +} diff --git a/src/R3/Internal/PooledThreadPoolWorkItem.cs b/src/R3/Internal/PooledThreadPoolWorkItem.cs index 99375d4f..5353f3a6 100644 --- a/src/R3/Internal/PooledThreadPoolWorkItem.cs +++ b/src/R3/Internal/PooledThreadPoolWorkItem.cs @@ -2,6 +2,7 @@ namespace R3.Internal; +// TODO: remove this(maybe no use). internal sealed class PooledThreadPoolWorkItem : IThreadPoolWorkItem { static ConcurrentQueue> pool = new(); diff --git a/src/R3/Operators/Concat.cs b/src/R3/Operators/Concat.cs deleted file mode 100644 index 85fa43ff..00000000 --- a/src/R3/Operators/Concat.cs +++ /dev/null @@ -1,83 +0,0 @@ -namespace R3; - -public static partial class ObservableExtensions -{ - public static Observable Concat(this Observable source, Observable second) - { - return new Concat(source, second); - } -} - -internal sealed class Concat(Observable source, Observable second) : Observable -{ - protected override IDisposable SubscribeCore(Observer observer) - { - return source.Subscribe(new _Concat(observer, second)); - } - - sealed class _Concat(Observer observer, Observable second) : Observer - { - readonly Observer observer = observer; - - SingleAssignmentDisposableCore secondSubscription; - - protected override bool AutoDisposeOnCompleted => false; - - protected override void OnNextCore(T value) - { - observer.OnNext(value); - } - - protected override void OnErrorResumeCore(Exception error) - { - observer.OnErrorResume(error); - } - - protected override void OnCompletedCore(Result result) - { - if (result.IsFailure) - { - try - { - observer.OnCompleted(result); - } - finally - { - Dispose(); - } - } - else - { - secondSubscription.Disposable = second.Subscribe(new SecondObserver(this)); - } - } - - protected override void DisposeCore() - { - secondSubscription.Dispose(); - } - - internal sealed class SecondObserver(_Concat parent) : Observer - { - protected override void OnNextCore(T value) - { - parent.observer.OnNext(value); - } - - protected override void OnErrorResumeCore(Exception error) - { - parent.observer.OnErrorResume(error); - } - - protected override void OnCompletedCore(Result result) - { - parent.observer.OnCompleted(result); - } - - protected override void DisposeCore() - { - parent.Dispose(); - } - } - } -} diff --git a/src/R3/SingleAssignmentDisposableCore.cs b/src/R3/SerialDisposable.cs similarity index 76% rename from src/R3/SingleAssignmentDisposableCore.cs rename to src/R3/SerialDisposable.cs index 702d491b..c2491791 100644 --- a/src/R3/SingleAssignmentDisposableCore.cs +++ b/src/R3/SerialDisposable.cs @@ -1,7 +1,25 @@ namespace R3; +public sealed class SerialDisposable : IDisposable +{ + SerialDisposableCore core; + + public bool IsDisposed => core.IsDisposed; + + public IDisposable? Disposable + { + get => core.Disposable; + set => core.Disposable = value; + } + + public void Dispose() + { + core.Dispose(); + } +} + // struct, be carefult to use -public struct SingleAssignmentDisposableCore +public struct SerialDisposableCore { IDisposable? current; @@ -34,8 +52,8 @@ public IDisposable? Disposable return; } - // otherwise, invalid assignment - ThrowAlreadyAssignment(); + // otherwise, dispose previous disposable + field.Dispose(); } } @@ -48,11 +66,6 @@ public void Dispose() } } - static void ThrowAlreadyAssignment() - { - throw new InvalidOperationException("Disposable is already assigned."); - } - sealed class DisposedSentinel : IDisposable { public static readonly DisposedSentinel Instance = new(); diff --git a/src/R3/SingleAssignmentDisposable.cs b/src/R3/SingleAssignmentDisposable.cs index 50286d25..437aec09 100644 --- a/src/R3/SingleAssignmentDisposable.cs +++ b/src/R3/SingleAssignmentDisposable.cs @@ -17,3 +17,70 @@ public void Dispose() core.Dispose(); } } + +// struct, be carefult to use +public struct SingleAssignmentDisposableCore +{ + IDisposable? current; + + public bool IsDisposed => Volatile.Read(ref current) == DisposedSentinel.Instance; + + public IDisposable? Disposable + { + get + { + var field = Volatile.Read(ref current); + if (field == DisposedSentinel.Instance) + { + return R3.Disposable.Empty; // don't expose sentinel + } + return field; + } + set + { + var field = Interlocked.CompareExchange(ref current, value, null); + if (field == null) + { + // ok to set. + return; + } + + if (field == DisposedSentinel.Instance) + { + // We've already been disposed, so dispose the value we've just been given. + value?.Dispose(); + return; + } + + // otherwise, invalid assignment + ThrowAlreadyAssignment(); + } + } + + public void Dispose() + { + var field = Interlocked.Exchange(ref current, DisposedSentinel.Instance); + if (field != DisposedSentinel.Instance) + { + field?.Dispose(); + } + } + + static void ThrowAlreadyAssignment() + { + throw new InvalidOperationException("Disposable is already assigned."); + } + + sealed class DisposedSentinel : IDisposable + { + public static readonly DisposedSentinel Instance = new(); + + DisposedSentinel() + { + } + + public void Dispose() + { + } + } +} diff --git a/tests/R3.Tests/OperatorTests/AmbTest.cs b/tests/R3.Tests/OperatorTests/AmbTest.cs new file mode 100644 index 00000000..1c027a43 --- /dev/null +++ b/tests/R3.Tests/OperatorTests/AmbTest.cs @@ -0,0 +1,38 @@ +namespace R3.Tests.OperatorTests; + +public class AmbTest +{ + [Fact] + public void Amb3() + { + var subject1 = new Subject(); + var subject2 = new Subject(); + var subject3 = new Subject(); + + var d1Called = false; + var d2Called = false; + var d3Called = false; + + var list = Observable.Amb( + subject1.Do(onDispose: () => d1Called = true), + subject2.Do(onDispose: () => d2Called = true), + subject3.Do(onDispose: () => d3Called = true) + ).ToLiveList(); + + d1Called.Should().BeFalse(); + + subject2.OnNext(2); + list.AssertEqual([2]); + + d1Called.Should().BeTrue(); + d3Called.Should().BeTrue(); + + subject2.OnNext(20); + list.AssertEqual([2, 20]); + + subject2.OnCompleted(); + + d2Called.Should().BeTrue(); + list.AssertIsCompleted(); + } +} diff --git a/tests/R3.Tests/OperatorTests/ConcatAppendPrependTest.cs b/tests/R3.Tests/OperatorTests/ConcatAppendPrependTest.cs index 4c4797e6..99a3a43f 100644 --- a/tests/R3.Tests/OperatorTests/ConcatAppendPrependTest.cs +++ b/tests/R3.Tests/OperatorTests/ConcatAppendPrependTest.cs @@ -54,4 +54,34 @@ public void Concat() list.AssertIsCompleted(); } + + [Fact] + public void ConcatMany() + { + var subject1 = new Subject(); + var subject2 = new Subject(); + var subject3 = new Subject(); + using var list = Observable.Concat(subject1, subject2, subject3).ToLiveList(); + + subject1.OnNext(10); + subject2.OnNext(9999); + + list.AssertEqual([10]); + + subject1.OnCompleted(); + + subject2.OnNext(11111); + + list.AssertEqual([10, 11111]); + + subject2.OnCompleted(); + + subject3.OnNext(9999999); + + list.AssertEqual([10, 11111, 9999999]); + + subject3.OnCompleted(); + + list.AssertIsCompleted(); + } }