From 83f3eb6ac1f0c702c2e90abfc9619ae3ae6d4cbc Mon Sep 17 00:00:00 2001 From: neuecc Date: Tue, 12 Dec 2023 21:05:24 +0900 Subject: [PATCH] CompositeDisposable --- sandbox/ConsoleApp1/Program.cs | 38 +++- src/R3/CompositeDisposable.cs | 238 ++++++++++++++++++++++ src/R3/Disposable.cs | 5 + src/R3/Factories/Return.cs | 8 - src/R3/Factories/ReturnOnCompleted.cs | 4 - src/R3/Internal/FreeListCore.cs | 74 +++++++ src/R3/Operators/_Operators.cs | 15 +- src/R3/SafeTimeProvider.cs | 53 ----- tests/R3.Tests/CompositeDisposableTest.cs | 68 +++++++ 9 files changed, 436 insertions(+), 67 deletions(-) create mode 100644 src/R3/CompositeDisposable.cs delete mode 100644 src/R3/SafeTimeProvider.cs create mode 100644 tests/R3.Tests/CompositeDisposableTest.cs diff --git a/sandbox/ConsoleApp1/Program.cs b/sandbox/ConsoleApp1/Program.cs index 4e008df0..e66e40c8 100644 --- a/sandbox/ConsoleApp1/Program.cs +++ b/sandbox/ConsoleApp1/Program.cs @@ -13,6 +13,31 @@ + + + + + +var disposables = Enumerable.Range(1, 100).Select(x => new TestDisposable()).ToArray(); +var composite = new System.Reactive.Disposables.CompositeDisposable(disposables); + +foreach (var item in disposables) +{ + composite.Remove(item); +} + + + + + + + + + + + + + SubscriptionTracker.EnableTracking = true; SubscriptionTracker.EnableStackTrace = true; @@ -119,7 +144,7 @@ - +// subject.ForEachAsync( @@ -175,3 +200,14 @@ public static IDisposable WriteLine(this CompletableEvent source) return source.Subscribe(x => Console.WriteLine(x), _ => Console.WriteLine("COMPLETED")); } } + + +class TestDisposable : IDisposable +{ + public int CalledCount = 0; + + public void Dispose() + { + CalledCount += 1; + } +} diff --git a/src/R3/CompositeDisposable.cs b/src/R3/CompositeDisposable.cs new file mode 100644 index 00000000..5ae64cf7 --- /dev/null +++ b/src/R3/CompositeDisposable.cs @@ -0,0 +1,238 @@ +using System.Buffers; +using System.Collections; +using System.Runtime.InteropServices; + +namespace R3; + +public sealed class CompositeDisposable : ICollection, IDisposable +{ + List list; // when removed, set null + readonly object gate = new object(); + bool isDisposed; + int count; + + const int ShrinkThreshold = 64; + + public bool IsDisposed => Volatile.Read(ref isDisposed); + + public CompositeDisposable() + { + this.list = new(); + } + + public CompositeDisposable(int capacity) + { + if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity)); + this.list = new(capacity); + } + + public CompositeDisposable(params IDisposable[] disposables) + { + this.list = new(disposables); + this.count = list.Count; + } + + public CompositeDisposable(IEnumerable disposables) + { + this.list = new(disposables); + this.count = list.Count; + } + + public int Count + { + get + { + lock (gate) + { + return count; + } + } + } + + public bool IsReadOnly => false; + + public void Add(IDisposable item) + { + lock (gate) + { + if (!isDisposed) + { + count += 1; + list.Add(item); + return; + } + } + + // CompositeDisposable is Disposed. + item.Dispose(); + } + + public bool Remove(IDisposable item) + { + lock (gate) + { + // CompositeDisposable is Disposed, do nothing. + if (isDisposed) return false; + + var current = list; + + var index = current.IndexOf(item); + if (index == -1) + { + // not found + return false; + } + + // don't do RemoveAt(avoid Array Copy) + current[index] = null; + + // Do shrink + if (current.Capacity > ShrinkThreshold && count < current.Capacity / 2) + { + var fresh = new List(current.Capacity / 2); + + foreach (var d in current) + { + if (d != null) + { + fresh.Add(d); + } + } + + list = fresh; + } + + count -= 1; + } + + // Dispose outside of lock + item.Dispose(); + return true; + } + + public void Clear() + { + IDisposable?[] targetDisposables; + int clearCount; + lock (gate) + { + // CompositeDisposable is Disposed, do nothing. + if (isDisposed) return; + if (count == 0) return; + + targetDisposables = ArrayPool.Shared.Rent(list.Count); + clearCount = list.Count; + + list.CopyTo(targetDisposables); + + list.Clear(); + count = 0; + } + + // Dispose outside of lock + try + { + foreach (var item in targetDisposables.AsSpan(0, clearCount)) + { + item?.Dispose(); + } + } + finally + { + ArrayPool.Shared.Return(targetDisposables, clearArray: true); + } + } + + public bool Contains(IDisposable item) + { + lock (gate) + { + if (isDisposed) return false; + return list.Contains(item); + } + } + + public void CopyTo(IDisposable[] array, int arrayIndex) + { + if (arrayIndex < 0 || arrayIndex >= array.Length) + { + throw new ArgumentOutOfRangeException(nameof(arrayIndex)); + } + + lock (gate) + { + if (isDisposed) return; + + if (arrayIndex + count > array.Length) + { + throw new ArgumentOutOfRangeException(nameof(arrayIndex)); + } + + var i = 0; + foreach (var item in CollectionsMarshal.AsSpan(list)) + { + if (item != null) + { + array[arrayIndex + i++] = item; + } + } + } + } + + public void Dispose() + { + List disposables; + + lock (gate) + { + if (isDisposed) return; + + count = 0; + isDisposed = true; + disposables = list; + list = null!; // dereference. + } + + foreach (var item in disposables) + { + item?.Dispose(); + } + disposables.Clear(); + } + + public IEnumerator GetEnumerator() + { + lock (gate) + { + // make snapshot + return EnumerateAndClear(list.ToArray()).GetEnumerator(); + } + } + + IEnumerator IEnumerable.GetEnumerator() + { + lock (gate) + { + // make snapshot + return EnumerateAndClear(list.ToArray()).GetEnumerator(); + } + } + + static IEnumerable EnumerateAndClear(IDisposable?[] disposables) + { + try + { + foreach (var item in disposables) + { + if (item != null) + { + yield return item; + } + } + } + finally + { + disposables.AsSpan().Clear(); + } + } +} diff --git a/src/R3/Disposable.cs b/src/R3/Disposable.cs index a1f06533..70dcee68 100644 --- a/src/R3/Disposable.cs +++ b/src/R3/Disposable.cs @@ -17,6 +17,11 @@ public static void AddTo(this IDisposable disposable, ref DisposableBuilder buil builder.Add(disposable); } + public static void AddTo(this IDisposable disposable, ICollection disposables) + { + disposables.Add(disposable); + } + public static IDisposable Create(Action onDisposed) { return new AnonymousDisposable(onDisposed); diff --git a/src/R3/Factories/Return.cs b/src/R3/Factories/Return.cs index daf54492..fc4be9a2 100644 --- a/src/R3/Factories/Return.cs +++ b/src/R3/Factories/Return.cs @@ -20,10 +20,6 @@ public static CompletableEvent Return(TMessage value, { return new ThreadPoolScheduleReturn(value, default, null); // optimize for SystemTimeProvidr, use ThreadPool.UnsafeQueueUserWorkItem } - else if (timeProvider is SafeTimerTimeProvider t && t.IsSystemTimeProvider) - { - return new ThreadPoolScheduleReturn(value, default, t.UnhandledExceptionHandler); // use with SafeTimeProvider.UnhandledExceptionHandler - } } return new Return(value, default, dueTime, timeProvider); // use ITimer @@ -49,10 +45,6 @@ public static CompletableEvent Return( { return new ThreadPoolScheduleReturn(value, complete, null); // optimize for SystemTimeProvidr, use ThreadPool.UnsafeQueueUserWorkItem } - else if (timeProvider is SafeTimerTimeProvider t && t.IsSystemTimeProvider) - { - return new ThreadPoolScheduleReturn(value, complete, t.UnhandledExceptionHandler); // use with SafeTimeProvider.UnhandledExceptionHandler - } } return new Return(value, complete, dueTime, timeProvider); // use ITimer diff --git a/src/R3/Factories/ReturnOnCompleted.cs b/src/R3/Factories/ReturnOnCompleted.cs index 5a30589c..196c91bc 100644 --- a/src/R3/Factories/ReturnOnCompleted.cs +++ b/src/R3/Factories/ReturnOnCompleted.cs @@ -22,10 +22,6 @@ public static CompletableEvent ReturnOnCompleted(complete, null); // optimize for SystemTimeProvidr, use ThreadPool.UnsafeQueueUserWorkItem } - else if (timeProvider is SafeTimerTimeProvider t && t.IsSystemTimeProvider) - { - return new ThreadPoolScheduleReturnOnCompleted(complete, t.UnhandledExceptionHandler); // use with SafeTimeProvider.UnhandledExceptionHandler - } } return new ReturnOnCompleted(complete, dueTime, timeProvider); // use ITimer diff --git a/src/R3/Internal/FreeListCore.cs b/src/R3/Internal/FreeListCore.cs index 34867ec6..21500493 100644 --- a/src/R3/Internal/FreeListCore.cs +++ b/src/R3/Internal/FreeListCore.cs @@ -15,9 +15,44 @@ internal struct FreeListCore public FreeListCore(object gate) { + // don't create values at initialize this.gate = gate; } + public FreeListCore(object gate, int capacity) + { + this.gate = gate; + this.values = new T[capacity]; + } + + public FreeListCore(object gate, T[] items) + { + this.gate = gate; + this.values = new T[items.Length]; + for (int i = 0; i < items.Length; i++) + { + this.values[i] = items[i]; + } + } + + public FreeListCore(object gate, IEnumerable items) + { + this.gate = gate; + if (items.TryGetNonEnumeratedCount(out var count)) + { + this.values = new T[count]; + } + else + { + this.values = new T[InitialArraySize]; + } + var i = 0; + foreach (var item in items) + { + this.values[i++] = item; + } + } + public bool IsDisposed => lastIndex == -2; public ReadOnlySpan AsSpan() @@ -83,6 +118,45 @@ public void Remove(int index) } } + public bool RemoveSlow(T value) + { + lock (gate) + { + ObjectDisposedException.ThrowIf(IsDisposed, typeof(FreeListCore)); + if (values == null) return false; + + var index = -1; + var span = values.AsSpan(0, lastIndex); + for (int i = 0; i < span.Length; i++) + { + if (span[i] == value) + { + index = i; + break; + } + } + + if (index != -1) + { + Remove(index); + return true; + } + } + return false; + } + + public void Clear(bool removeArray) + { + lock (gate) + { + values.AsSpan(0, lastIndex).Clear(); + if (removeArray) + { + values = null; + } + } + } + public void Dispose() { lock (gate) diff --git a/src/R3/Operators/_Operators.cs b/src/R3/Operators/_Operators.cs index e285b11d..baec6861 100644 --- a/src/R3/Operators/_Operators.cs +++ b/src/R3/Operators/_Operators.cs @@ -4,10 +4,23 @@ public static partial class EventExtensions { // TODO: this is working space, will remove this file after complete. - // Standard Query + // Time based // Frame based // OnErrorStop + + // Rx Merging: + //CombineLatest, Merge, Zip, WithLatestFrom, ZipLatest, Switch, MostRecent + + // Standard Query: + // Concat, Append, Prepend, Distinct, DistinctUntilChanged, Scan, Select, SelectMany + + // SkipTake: + // Skip, SkipLast, SkipUntil, SkipWhile, Take, TakeLast, TakeLastBuffer, TakeUntil, TakeWhile + // TakeUntilDestroy, TakeUntilDisable + + // return tasks: + // All, Any, Contains, SequenceEqual, ElementAt, ElementAtOrDefault, IsEmpty, MaxBy, MinBy, ToDictionary, ToLookup, ForEachAsync } } diff --git a/src/R3/SafeTimeProvider.cs b/src/R3/SafeTimeProvider.cs deleted file mode 100644 index b2f3dc1a..00000000 --- a/src/R3/SafeTimeProvider.cs +++ /dev/null @@ -1,53 +0,0 @@ -namespace R3; - - -// TODO: remove this. -public sealed class SafeTimerTimeProvider : TimeProvider -{ - readonly TimeProvider timeProvider; - readonly Action unhandledExceptionHandler; - readonly TimerCallback wrappedCallback; - - internal bool IsSystemTimeProvider => timeProvider == TimeProvider.System; - internal Action UnhandledExceptionHandler => unhandledExceptionHandler; - - public SafeTimerTimeProvider(TimeProvider timeProvider, Action unhandledExceptionHandler) - { - this.timeProvider = timeProvider; - this.unhandledExceptionHandler = unhandledExceptionHandler; - this.wrappedCallback = CallbackWithHandleUnhandledException; - } - - public override long GetTimestamp() => timeProvider.GetTimestamp(); - - public override DateTimeOffset GetUtcNow() => timeProvider.GetUtcNow(); - - public override TimeZoneInfo LocalTimeZone => timeProvider.LocalTimeZone; - - public override long TimestampFrequency => timeProvider.TimestampFrequency; - - public override ITimer CreateTimer(TimerCallback callback, object? state, TimeSpan dueTime, TimeSpan period) - { - return timeProvider.CreateTimer(wrappedCallback, Tuple.Create(callback, state), dueTime, period); - } - - void CallbackWithHandleUnhandledException(object? state) - { - try - { - var (originalCallback, originalState) = (Tuple)state!; - originalCallback.Invoke(originalState); - } - catch (Exception ex) - { - try - { - unhandledExceptionHandler(ex); - } - catch - { - // ignore - } - } - } -} diff --git a/tests/R3.Tests/CompositeDisposableTest.cs b/tests/R3.Tests/CompositeDisposableTest.cs new file mode 100644 index 00000000..f0d03d8b --- /dev/null +++ b/tests/R3.Tests/CompositeDisposableTest.cs @@ -0,0 +1,68 @@ +namespace R3.Tests; + +public class CompositeDisposableTest +{ + [Fact] + public void Add() + { + var d1 = new TestDisposable(); + var d2 = new TestDisposable(); + var d3 = new TestDisposable(); + + var composite = new CompositeDisposable(); + + composite.Add(d1); + composite.Add(d2); + composite.Add(d3); + + d1.CalledCount.Should().Be(0); + + composite.Remove(d2); + d2.CalledCount.Should().Be(1); + + composite.Clear(); + d1.CalledCount.Should().Be(1); + d3.CalledCount.Should().Be(1); + + composite.Add(d1); + composite.Add(d2); + composite.Add(d3); + + composite.Dispose(); + + d1.CalledCount.Should().Be(2); + d2.CalledCount.Should().Be(2); + d3.CalledCount.Should().Be(2); + + composite.Add(d1); + d1.CalledCount.Should().Be(3); + } + + [Fact] + public void RemoveAndShrink() + { + var disposables = Enumerable.Range(1, 100).Select(x => new TestDisposable()).ToArray(); + var composite = new CompositeDisposable(disposables); + + foreach (var item in disposables) + { + composite.Remove(item); + } + + foreach (var item in disposables) + { + item.CalledCount.Should().Be(1); + } + } + + + class TestDisposable : IDisposable + { + public int CalledCount = 0; + + public void Dispose() + { + CalledCount += 1; + } + } +}