Skip to content

Commit

Permalink
Add LiveList
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 12, 2023
1 parent f98f4e5 commit 7e2f875
Show file tree
Hide file tree
Showing 14 changed files with 873 additions and 67 deletions.
99 changes: 99 additions & 0 deletions sandbox/ConsoleApp1/LiveList.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
using R3;
using System.Collections;
using System.Runtime.InteropServices;

public sealed class LiveList<T> : IReadOnlyList<T>, IDisposable
{
readonly List<T> list = new List<T>();
readonly IDisposable sourceSubscription;

public LiveList(Event<T> source)
{
sourceSubscription = source.Subscribe(new ListSubscriber(list));
}

public T this[int index]
{
get
{
lock (list)
{
return list[index];
}
}
}

public int Count
{
get
{
lock (list)
{
return list.Count;
}
}
}

public void Dispose()
{
sourceSubscription.Dispose();
}

public void ForEach(Action<T> action)
{
lock (list)
{
var span = CollectionsMarshal.AsSpan(list);
foreach (ref var item in span)
{
action(item);
}
}
}

public void ForEach<TState>(Action<T, TState> action, TState state)
{
lock (list)
{
var span = CollectionsMarshal.AsSpan(list);
foreach (ref var item in span)
{
action(item, state);
}
}
}

public IEnumerator<T> GetEnumerator()
{
lock (list)
{
// snapshot
return CollectionsMarshal.AsSpan(list).ToArray().AsEnumerable().GetEnumerator();
}
}

IEnumerator IEnumerable.GetEnumerator()
{
lock (list)
{
// snapshot
return CollectionsMarshal.AsSpan(list).ToArray().AsEnumerable().GetEnumerator();
}
}

sealed class ListSubscriber(List<T> list) : Subscriber<T>
{
protected override void OnNextCore(T message)
{
lock (list)
{
list.Add(message);
}
}

protected override void OnErrorResumeCore(Exception error)
{
EventSystem.GetUnhandledExceptionHandler().Invoke(error);
}
}
}
6 changes: 2 additions & 4 deletions sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Runtime.InteropServices;
using System.Reactive.Subjects;
using System.Threading.Channels;
using ZLogger;
//using System.Reactive.Disposables;
Expand Down Expand Up @@ -81,7 +81,7 @@

Console.WriteLine($"Average: {Enumerable.Empty<int>().Average()}");


// s.ToListObservable();

// Observable.Throw(
// s.Where(
Expand Down Expand Up @@ -173,5 +173,3 @@ public static IDisposable WriteLine<T, U>(this CompletableEvent<T, U> source)
return source.Subscribe(x => Console.WriteLine(x), _ => Console.WriteLine("COMPLETED"));
}
}


Loading

0 comments on commit 7e2f875

Please sign in to comment.