Skip to content

Commit

Permalink
CompositeDisposable
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 12, 2023
1 parent 6812a37 commit 83f3eb6
Show file tree
Hide file tree
Showing 9 changed files with 436 additions and 67 deletions.
38 changes: 37 additions & 1 deletion sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,31 @@








var disposables = Enumerable.Range(1, 100).Select(x => new TestDisposable()).ToArray();
var composite = new System.Reactive.Disposables.CompositeDisposable(disposables);

foreach (var item in disposables)
{
composite.Remove(item);
}













SubscriptionTracker.EnableTracking = true;
SubscriptionTracker.EnableStackTrace = true;

Expand Down Expand Up @@ -119,7 +144,7 @@




// subject.ForEachAsync(



Expand Down Expand Up @@ -175,3 +200,14 @@ public static IDisposable WriteLine<T, U>(this CompletableEvent<T, U> source)
return source.Subscribe(x => Console.WriteLine(x), _ => Console.WriteLine("COMPLETED"));
}
}


class TestDisposable : IDisposable
{
public int CalledCount = 0;

public void Dispose()
{
CalledCount += 1;
}
}
238 changes: 238 additions & 0 deletions src/R3/CompositeDisposable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
using System.Buffers;
using System.Collections;
using System.Runtime.InteropServices;

namespace R3;

public sealed class CompositeDisposable : ICollection<IDisposable>, IDisposable
{
List<IDisposable?> list; // when removed, set null
readonly object gate = new object();
bool isDisposed;
int count;

const int ShrinkThreshold = 64;

public bool IsDisposed => Volatile.Read(ref isDisposed);

public CompositeDisposable()
{
this.list = new();
}

public CompositeDisposable(int capacity)
{
if (capacity < 0) throw new ArgumentOutOfRangeException(nameof(capacity));
this.list = new(capacity);
}

public CompositeDisposable(params IDisposable[] disposables)
{
this.list = new(disposables);
this.count = list.Count;
}

public CompositeDisposable(IEnumerable<IDisposable> disposables)
{
this.list = new(disposables);
this.count = list.Count;
}

public int Count
{
get
{
lock (gate)
{
return count;
}
}
}

public bool IsReadOnly => false;

public void Add(IDisposable item)
{
lock (gate)
{
if (!isDisposed)
{
count += 1;
list.Add(item);
return;
}
}

// CompositeDisposable is Disposed.
item.Dispose();
}

public bool Remove(IDisposable item)
{
lock (gate)
{
// CompositeDisposable is Disposed, do nothing.
if (isDisposed) return false;

var current = list;

var index = current.IndexOf(item);
if (index == -1)
{
// not found
return false;
}

// don't do RemoveAt(avoid Array Copy)
current[index] = null;

// Do shrink
if (current.Capacity > ShrinkThreshold && count < current.Capacity / 2)
{
var fresh = new List<IDisposable?>(current.Capacity / 2);

foreach (var d in current)
{
if (d != null)
{
fresh.Add(d);
}
}

list = fresh;
}

count -= 1;
}

// Dispose outside of lock
item.Dispose();
return true;
}

public void Clear()
{
IDisposable?[] targetDisposables;
int clearCount;
lock (gate)
{
// CompositeDisposable is Disposed, do nothing.
if (isDisposed) return;
if (count == 0) return;

targetDisposables = ArrayPool<IDisposable?>.Shared.Rent(list.Count);
clearCount = list.Count;

list.CopyTo(targetDisposables);

list.Clear();
count = 0;
}

// Dispose outside of lock
try
{
foreach (var item in targetDisposables.AsSpan(0, clearCount))
{
item?.Dispose();
}
}
finally
{
ArrayPool<IDisposable?>.Shared.Return(targetDisposables, clearArray: true);
}
}

public bool Contains(IDisposable item)
{
lock (gate)
{
if (isDisposed) return false;
return list.Contains(item);
}
}

public void CopyTo(IDisposable[] array, int arrayIndex)
{
if (arrayIndex < 0 || arrayIndex >= array.Length)
{
throw new ArgumentOutOfRangeException(nameof(arrayIndex));
}

lock (gate)
{
if (isDisposed) return;

if (arrayIndex + count > array.Length)
{
throw new ArgumentOutOfRangeException(nameof(arrayIndex));
}

var i = 0;
foreach (var item in CollectionsMarshal.AsSpan(list))
{
if (item != null)
{
array[arrayIndex + i++] = item;
}
}
}
}

public void Dispose()
{
List<IDisposable?> disposables;

lock (gate)
{
if (isDisposed) return;

count = 0;
isDisposed = true;
disposables = list;
list = null!; // dereference.
}

foreach (var item in disposables)
{
item?.Dispose();
}
disposables.Clear();
}

public IEnumerator<IDisposable> GetEnumerator()
{
lock (gate)
{
// make snapshot
return EnumerateAndClear(list.ToArray()).GetEnumerator();
}
}

IEnumerator IEnumerable.GetEnumerator()
{
lock (gate)
{
// make snapshot
return EnumerateAndClear(list.ToArray()).GetEnumerator();
}
}

static IEnumerable<IDisposable> EnumerateAndClear(IDisposable?[] disposables)
{
try
{
foreach (var item in disposables)
{
if (item != null)
{
yield return item;
}
}
}
finally
{
disposables.AsSpan().Clear();
}
}
}
5 changes: 5 additions & 0 deletions src/R3/Disposable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ public static void AddTo(this IDisposable disposable, ref DisposableBuilder buil
builder.Add(disposable);
}

public static void AddTo(this IDisposable disposable, ICollection<IDisposable> disposables)
{
disposables.Add(disposable);
}

public static IDisposable Create(Action onDisposed)
{
return new AnonymousDisposable(onDisposed);
Expand Down
8 changes: 0 additions & 8 deletions src/R3/Factories/Return.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ public static CompletableEvent<TMessage, Unit> Return<TMessage>(TMessage value,
{
return new ThreadPoolScheduleReturn<TMessage, Unit>(value, default, null); // optimize for SystemTimeProvidr, use ThreadPool.UnsafeQueueUserWorkItem
}
else if (timeProvider is SafeTimerTimeProvider t && t.IsSystemTimeProvider)
{
return new ThreadPoolScheduleReturn<TMessage, Unit>(value, default, t.UnhandledExceptionHandler); // use with SafeTimeProvider.UnhandledExceptionHandler
}
}

return new Return<TMessage, Unit>(value, default, dueTime, timeProvider); // use ITimer
Expand All @@ -49,10 +45,6 @@ public static CompletableEvent<TMessage, TComplete> Return<TMessage, TComplete>(
{
return new ThreadPoolScheduleReturn<TMessage, TComplete>(value, complete, null); // optimize for SystemTimeProvidr, use ThreadPool.UnsafeQueueUserWorkItem
}
else if (timeProvider is SafeTimerTimeProvider t && t.IsSystemTimeProvider)
{
return new ThreadPoolScheduleReturn<TMessage, TComplete>(value, complete, t.UnhandledExceptionHandler); // use with SafeTimeProvider.UnhandledExceptionHandler
}
}

return new Return<TMessage, TComplete>(value, complete, dueTime, timeProvider); // use ITimer
Expand Down
4 changes: 0 additions & 4 deletions src/R3/Factories/ReturnOnCompleted.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ public static CompletableEvent<TMessage, TComplete> ReturnOnCompleted<TMessage,
{
return new ThreadPoolScheduleReturnOnCompleted<TMessage, TComplete>(complete, null); // optimize for SystemTimeProvidr, use ThreadPool.UnsafeQueueUserWorkItem
}
else if (timeProvider is SafeTimerTimeProvider t && t.IsSystemTimeProvider)
{
return new ThreadPoolScheduleReturnOnCompleted<TMessage, TComplete>(complete, t.UnhandledExceptionHandler); // use with SafeTimeProvider.UnhandledExceptionHandler
}
}

return new ReturnOnCompleted<TMessage, TComplete>(complete, dueTime, timeProvider); // use ITimer
Expand Down
Loading

0 comments on commit 83f3eb6

Please sign in to comment.