diff --git a/sandbox/ConsoleApp1/LiveList.cs b/sandbox/ConsoleApp1/LiveList.cs new file mode 100644 index 00000000..03c04e6f --- /dev/null +++ b/sandbox/ConsoleApp1/LiveList.cs @@ -0,0 +1,99 @@ +using R3; +using System.Collections; +using System.Runtime.InteropServices; + +public sealed class LiveList : IReadOnlyList, IDisposable +{ + readonly List list = new List(); + readonly IDisposable sourceSubscription; + + public LiveList(Event source) + { + sourceSubscription = source.Subscribe(new ListSubscriber(list)); + } + + public T this[int index] + { + get + { + lock (list) + { + return list[index]; + } + } + } + + public int Count + { + get + { + lock (list) + { + return list.Count; + } + } + } + + public void Dispose() + { + sourceSubscription.Dispose(); + } + + public void ForEach(Action action) + { + lock (list) + { + var span = CollectionsMarshal.AsSpan(list); + foreach (ref var item in span) + { + action(item); + } + } + } + + public void ForEach(Action action, TState state) + { + lock (list) + { + var span = CollectionsMarshal.AsSpan(list); + foreach (ref var item in span) + { + action(item, state); + } + } + } + + public IEnumerator GetEnumerator() + { + lock (list) + { + // snapshot + return CollectionsMarshal.AsSpan(list).ToArray().AsEnumerable().GetEnumerator(); + } + } + + IEnumerator IEnumerable.GetEnumerator() + { + lock (list) + { + // snapshot + return CollectionsMarshal.AsSpan(list).ToArray().AsEnumerable().GetEnumerator(); + } + } + + sealed class ListSubscriber(List list) : Subscriber + { + protected override void OnNextCore(T message) + { + lock (list) + { + list.Add(message); + } + } + + protected override void OnErrorResumeCore(Exception error) + { + EventSystem.GetUnhandledExceptionHandler().Invoke(error); + } + } +} diff --git a/sandbox/ConsoleApp1/Program.cs b/sandbox/ConsoleApp1/Program.cs index ae0f274d..afa95fb3 100644 --- a/sandbox/ConsoleApp1/Program.cs +++ b/sandbox/ConsoleApp1/Program.cs @@ -4,7 +4,7 @@ using System.Reactive; using System.Reactive.Concurrency; using System.Reactive.Linq; -using System.Runtime.InteropServices; +using System.Reactive.Subjects; using System.Threading.Channels; using ZLogger; //using System.Reactive.Disposables; @@ -81,7 +81,7 @@ Console.WriteLine($"Average: {Enumerable.Empty().Average()}"); - +// s.ToListObservable(); // Observable.Throw( // s.Where( @@ -173,5 +173,3 @@ public static IDisposable WriteLine(this CompletableEvent source) return source.Subscribe(x => Console.WriteLine(x), _ => Console.WriteLine("COMPLETED")); } } - - diff --git a/src/R3/Internal/RingBuffer.cs b/src/R3/Internal/RingBuffer.cs new file mode 100644 index 00000000..a3118e8b --- /dev/null +++ b/src/R3/Internal/RingBuffer.cs @@ -0,0 +1,405 @@ +using System.Collections; +using System.Diagnostics.CodeAnalysis; + +namespace R3.Internal; + +public sealed class RingBuffer : IList, IReadOnlyList +{ + T[] buffer; + int head; + int count; + int mask; + + public RingBuffer() + { + this.buffer = new T[8]; + this.head = 0; + this.count = 0; + this.mask = buffer.Length - 1; + } + + public RingBuffer(int capacity) + { + this.buffer = new T[CalculateCapacity(capacity)]; + this.head = 0; + this.count = 0; + this.mask = buffer.Length - 1; + } + + public RingBuffer(IEnumerable collection) + { + var array = collection.TryGetNonEnumeratedCount(out var count) + ? new T[CalculateCapacity(count)] + : new T[8]; + var i = 0; + foreach (var item in collection) + { + if (i == array.Length) + { + Array.Resize(ref array, i * 2); + } + array[i++] = item; + } + + this.buffer = array; + this.head = 0; + this.count = i; + this.mask = buffer.Length - 1; + } + + static int CalculateCapacity(int size) + { + size--; + size |= size >> 1; + size |= size >> 2; + size |= size >> 4; + size |= size >> 8; + size |= size >> 16; + size += 1; + + if (size < 8) + { + size = 8; + } + return size; + } + + public T this[int index] + { + get + { + var i = (head + index) & mask; + return buffer[i]; + } + set + { + var i = (head + index) & mask; + buffer[i] = value; + } + } + + public int Count => count; + + public bool IsReadOnly => false; + + public void AddLast(T item) + { + if (count == buffer.Length) EnsureCapacity(); + + var index = (head + count) & mask; + buffer[index] = item; + count++; + } + + public void AddFirst(T item) + { + if (count == buffer.Length) EnsureCapacity(); + + head = (head - 1) & mask; + buffer[head] = item; + count++; + } + + public T RemoveLast() + { + if (count == 0) ThrowForEmpty(); + + var index = (head + count - 1) & mask; + var v = buffer[index]; + buffer[index] = default!; + count--; + return v; + } + + public T RemoveFirst() + { + if (count == 0) ThrowForEmpty(); + + var index = head & mask; + var v = buffer[index]; + buffer[index] = default!; + head = head + 1; + count--; + return v; + } + + void EnsureCapacity() + { + var newBuffer = new T[buffer.Length * 2]; + + var i = head & mask; + buffer.AsSpan(i).CopyTo(newBuffer); + + if (i != 0) + { + buffer.AsSpan(0, i).CopyTo(newBuffer.AsSpan(buffer.Length - i)); + } + + head = 0; + buffer = newBuffer; + mask = newBuffer.Length - 1; + } + + void ICollection.Add(T item) + { + AddLast(item); + } + + public void Clear() + { + Array.Clear(buffer, 0, buffer.Length); + head = 0; + count = 0; + } + + public RingBufferSpan GetSpan() + { + if (count == 0) + { + return new RingBufferSpan(Array.Empty(), Array.Empty(), 0); + } + + var start = head & mask; + var end = (head + count) & mask; + + if (end > start) + { + var first = buffer.AsSpan(start, count); + var second = Array.Empty().AsSpan(); + return new RingBufferSpan(first, second, count); + } + else + { + var first = buffer.AsSpan(start, buffer.Length - start); + var second = buffer.AsSpan(0, end); + return new RingBufferSpan(first, second, count); + } + } + + public IEnumerator GetEnumerator() + { + if (count == 0) yield break; + + var start = head & mask; + var end = (head + count) & mask; + + if (end > start) + { + // start...end + for (int i = start; i < end; i++) + { + yield return buffer[i]; + } + } + else + { + // start... + for (int i = start; i < buffer.Length; i++) + { + yield return buffer[i]; + } + // 0...end + for (int i = 0; i < end; i++) + { + yield return buffer[i]; + } + } + } + + public IEnumerable Reverse() + { + if (count == 0) yield break; + + var start = head & mask; + var end = (head + count) & mask; + + if (end > start) + { + // end...start + for (int i = end - 1; i >= start; i--) + { + yield return buffer[i]; + } + } + else + { + // end...0 + for (int i = end - 1; i >= 0; i--) + { + yield return buffer[i]; + } + + // ...start + for (int i = buffer.Length - 1; i >= start; i--) + { + yield return buffer[i]; + } + } + } + + public bool Contains(T item) + { + return IndexOf(item) != -1; + } + + public void CopyTo(T[] array, int arrayIndex) + { + var span = GetSpan(); + var dest = array.AsSpan(arrayIndex); + span.First.CopyTo(dest); + span.Second.CopyTo(dest.Slice(span.First.Length)); + } + + public int IndexOf(T item) + { + var i = 0; + foreach (var v in GetSpan()) + { + if (EqualityComparer.Default.Equals(item, v)) + { + return i; + } + i++; + } + return -1; + } + + public T[] ToArray() + { + return GetSpan().ToArray(); + } + + public int BinarySearch(T item) + { + return BinarySearch(item, Comparer.Default); + } + + public int BinarySearch(T item, IComparer comparer) + { + var lo = 0; + var hi = count - 1; + + while (lo <= hi) + { + var mid = (int)(((uint)hi + (uint)lo) >> 1); + var found = comparer.Compare(this[mid], item); + + if (found == 0) return mid; + if (found < 0) + { + lo = mid + 1; + } + else + { + hi = mid - 1; + } + } + + return ~lo; + } + + void IList.Insert(int index, T item) + { + throw new NotSupportedException(); + } + + bool ICollection.Remove(T item) + { + throw new NotSupportedException(); + } + + void IList.RemoveAt(int index) + { + throw new NotSupportedException(); + } + + IEnumerator IEnumerable.GetEnumerator() + { + return ((IEnumerable)this).GetEnumerator(); + } + + [DoesNotReturn] + static void ThrowForEmpty() + { + throw new InvalidOperationException("RingBuffer is empty."); + } +} + +public ref struct RingBufferSpan +{ + public readonly ReadOnlySpan First; + public readonly ReadOnlySpan Second; + public readonly int Count; + + internal RingBufferSpan(ReadOnlySpan first, ReadOnlySpan second, int count) + { + First = first; + Second = second; + Count = count; + } + + public Enumerator GetEnumerator() + { + return new Enumerator(this); + } + + public T[] ToArray() + { + var array = new T[Count]; + var span = array.AsSpan(); + First.CopyTo(span); + span = span.Slice(First.Length); + Second.CopyTo(span); + return array; + } + + public ref struct Enumerator + { + ReadOnlySpan.Enumerator firstEnumerator; + ReadOnlySpan.Enumerator secondEnumerator; + bool useFirst; + + public Enumerator(RingBufferSpan span) + { + this.firstEnumerator = span.First.GetEnumerator(); + this.secondEnumerator = span.Second.GetEnumerator(); + this.useFirst = true; + } + + public bool MoveNext() + { + if (useFirst) + { + if (firstEnumerator.MoveNext()) + { + return true; + } + else + { + useFirst = false; + } + } + + if (secondEnumerator.MoveNext()) + { + return true; + } + return false; + } + + public ref readonly T Current + { + get + { + if (useFirst) + { + return ref firstEnumerator.Current; + } + else + { + return ref secondEnumerator.Current; + } + } + } + } +} diff --git a/src/R3/LiveList.cs b/src/R3/LiveList.cs new file mode 100644 index 00000000..02a2fe5a --- /dev/null +++ b/src/R3/LiveList.cs @@ -0,0 +1,277 @@ +using System.Collections; +using System.Diagnostics.CodeAnalysis; + +namespace R3; + +public static partial class EventExtensions +{ + public static LiveList ToLiveList(this Event source) + { + return new LiveList(source); + } + + public static LiveList ToLiveList(this Event source, int bufferSize) + { + return new LiveList(source, bufferSize); + } + + public static LiveList ToLiveList(this CompletableEvent source) + { + return new LiveList(source); + } + + public static LiveList ToLiveList(this CompletableEvent source, int bufferSize) + { + return new LiveList(source, bufferSize); + } +} + +public sealed class LiveList : IReadOnlyList, IDisposable +{ + readonly RingBuffer list = new RingBuffer(); + readonly IDisposable sourceSubscription; + readonly int bufferSize; + + public LiveList(Event source) + : this(source, -1) + { + } + + public LiveList(Event source, int bufferSize) + { + if (bufferSize == 0) bufferSize = 1; + this.bufferSize = bufferSize; // bufferSize must set before Subscribe(sometimes Subscribe run immediately) + this.sourceSubscription = source.Subscribe(new ListSubscriber(this)); + } + + public T this[int index] + { + get + { + lock (list) + { + return list[index]; + } + } + } + + public int Count + { + get + { + lock (list) + { + return list.Count; + } + } + } + + public void Dispose() + { + sourceSubscription.Dispose(); + } + + public void ForEach(Action action) + { + lock (list) + { + var span = list.GetSpan(); + foreach (ref readonly var item in span) + { + action(item); + } + } + } + + public void ForEach(Action action, TState state) + { + lock (list) + { + var span = list.GetSpan(); + foreach (ref readonly var item in span) + { + action(item, state); + } + } + } + + public T[] ToArray() + { + lock (list) + { + return list.ToArray(); + } + } + + IEnumerator IEnumerable.GetEnumerator() + { + lock (list) + { + // snapshot + return ToArray().AsEnumerable().GetEnumerator(); + } + } + + IEnumerator IEnumerable.GetEnumerator() + { + lock (list) + { + // snapshot + return ToArray().AsEnumerable().GetEnumerator(); + } + } + + sealed class ListSubscriber(LiveList parent) : Subscriber + { + protected override void OnNextCore(T message) + { + lock (parent.list) + { + if (parent.list.Count == parent.bufferSize) + { + parent.list.RemoveFirst(); + } + parent.list.AddLast(message); + } + } + + protected override void OnErrorResumeCore(Exception error) + { + EventSystem.GetUnhandledExceptionHandler().Invoke(error); + } + } +} + +public sealed class LiveList : IReadOnlyList, IDisposable +{ + readonly RingBuffer list = new RingBuffer(); + readonly IDisposable sourceSubscription; + readonly int bufferSize; + + bool isCompleted; + TComplete? completedValue; + + [MemberNotNullWhen(true, nameof(CompletedValue))] + public bool IsCompleted => isCompleted; + + public TComplete? CompletedValue => completedValue; + + public LiveList(CompletableEvent source) + : this(source, -1) + { + } + + public LiveList(CompletableEvent source, int bufferSize) + { + if (bufferSize == 0) bufferSize = 1; + this.bufferSize = bufferSize; // bufferSize must set before Subscribe(sometimes Subscribe run immediately) + this.sourceSubscription = source.Subscribe(new ListSubscriber(this)); + } + + public T this[int index] + { + get + { + lock (list) + { + return list[index]; + } + } + } + + public int Count + { + get + { + lock (list) + { + return list.Count; + } + } + } + + public void Dispose() + { + sourceSubscription.Dispose(); + } + + public void ForEach(Action action) + { + lock (list) + { + var span = list.GetSpan(); + foreach (ref readonly var item in span) + { + action(item); + } + } + } + + public void ForEach(Action action, TState state) + { + lock (list) + { + var span = list.GetSpan(); + foreach (ref readonly var item in span) + { + action(item, state); + } + } + } + + public T[] ToArray() + { + lock (list) + { + return list.ToArray(); + } + } + + IEnumerator IEnumerable.GetEnumerator() + { + lock (list) + { + // snapshot + return ToArray().AsEnumerable().GetEnumerator(); + } + } + + IEnumerator IEnumerable.GetEnumerator() + { + lock (list) + { + // snapshot + return ToArray().AsEnumerable().GetEnumerator(); + } + } + + sealed class ListSubscriber(LiveList parent) : Subscriber + { + protected override void OnNextCore(T message) + { + lock (parent.list) + { + if (parent.list.Count == parent.bufferSize) + { + parent.list.RemoveFirst(); + + } + parent.list.AddLast(message); + } + } + + protected override void OnErrorResumeCore(Exception error) + { + EventSystem.GetUnhandledExceptionHandler().Invoke(error); + } + + protected override void OnCompletedCore(TComplete complete) + { + lock (parent.list) + { + parent.completedValue = complete; + parent.isCompleted = true; + } + } + } +} diff --git a/tests/R3.Tests/FactoryTests/EmptyTest.cs b/tests/R3.Tests/FactoryTests/EmptyTest.cs index 6da95aa5..f02592e9 100644 --- a/tests/R3.Tests/FactoryTests/EmptyTest.cs +++ b/tests/R3.Tests/FactoryTests/EmptyTest.cs @@ -7,7 +7,7 @@ public class EmptyTest [Fact] public void Empty() { - using var list = EventFactory.Empty().LiveRecord(); + using var list = EventFactory.Empty().ToLiveList(); list.AssertIsCompleted(); } @@ -15,7 +15,7 @@ public void Empty() public void EmptyWithTime() { var fakeTime = new FakeTimeProvider(); - using var list = EventFactory.Empty(TimeSpan.FromSeconds(5), fakeTime).LiveRecord(); + using var list = EventFactory.Empty(TimeSpan.FromSeconds(5), fakeTime).ToLiveList(); fakeTime.Advance(TimeSpan.FromSeconds(4)); list.AssertIsNotCompleted(); diff --git a/tests/R3.Tests/FactoryTests/NeverTest.cs b/tests/R3.Tests/FactoryTests/NeverTest.cs index 0aee71b6..e03598b9 100644 --- a/tests/R3.Tests/FactoryTests/NeverTest.cs +++ b/tests/R3.Tests/FactoryTests/NeverTest.cs @@ -5,15 +5,15 @@ public class NeverTest [Fact] public void Never() { - using var list = EventFactory.Never().LiveRecord(); - list.AssertIsNotCompleted(); + using var list = EventFactory.Never().ToLiveList(); + list.AssertEqual([]); } // NeverComplete test [Fact] public void NeverComplete() { - using var list = EventFactory.NeverComplete().LiveRecord(); + using var list = EventFactory.NeverComplete().ToLiveList(); list.AssertIsNotCompleted(); } } diff --git a/tests/R3.Tests/FactoryTests/RangeTest.cs b/tests/R3.Tests/FactoryTests/RangeTest.cs index 55de139a..1c3a8158 100644 --- a/tests/R3.Tests/FactoryTests/RangeTest.cs +++ b/tests/R3.Tests/FactoryTests/RangeTest.cs @@ -7,19 +7,19 @@ public class RangeTest [Fact] public void Range() { - using var list1 = EventFactory.Range(5, 8).LiveRecord(); + using var list1 = EventFactory.Range(5, 8).ToLiveList(); list1.AssertEqual([5, 6, 7, 8, 9, 10, 11, 12]); list1.AssertIsCompleted(); - using var list2 = EventFactory.Range(20, 3).LiveRecord(); + using var list2 = EventFactory.Range(20, 3).ToLiveList(); list2.AssertEqual([20, 21, 22]); list2.AssertIsCompleted(); - using var list3 = EventFactory.Range(-3, 5).LiveRecord(); + using var list3 = EventFactory.Range(-3, 5).ToLiveList(); list3.AssertEqual([-3, -2, -1, 0, 1]); list3.AssertIsCompleted(); - using var list4 = EventFactory.Range(10, 0).LiveRecord(); + using var list4 = EventFactory.Range(10, 0).ToLiveList(); list4.AssertEqual([]); list4.AssertIsCompleted(); @@ -34,7 +34,7 @@ public void Stop() using var list = EventFactory.Range(0, int.MaxValue, cts.Token) .Take(5) .DoOnCompleted(() => cts.Cancel()) - .LiveRecord(); + .ToLiveList(); list.AssertEqual([0, 1, 2, 3, 4]); list.AssertIsCompleted(); diff --git a/tests/R3.Tests/FactoryTests/RepeatTest.cs b/tests/R3.Tests/FactoryTests/RepeatTest.cs index b6e3408b..eb9fee05 100644 --- a/tests/R3.Tests/FactoryTests/RepeatTest.cs +++ b/tests/R3.Tests/FactoryTests/RepeatTest.cs @@ -6,11 +6,11 @@ public class RepeatTest [Fact] public void Repeat() { - using var list = EventFactory.Repeat("foo", 3).LiveRecord(); + using var list = EventFactory.Repeat("foo", 3).ToLiveList(); list.AssertEqual(["foo", "foo", "foo"]); list.AssertIsCompleted(); - using var list2 = EventFactory.Repeat("foo", 0).LiveRecord(); + using var list2 = EventFactory.Repeat("foo", 0).ToLiveList(); list2.AssertEqual([]); list2.AssertIsCompleted(); @@ -25,7 +25,7 @@ public void Stop() using var list = EventFactory.Repeat("foo", int.MaxValue, cts.Token) .Take(5) .DoOnCompleted(() => cts.Cancel()) - .LiveRecord(); + .ToLiveList(); list.AssertEqual(["foo", "foo", "foo", "foo", "foo"]); list.AssertIsCompleted(); diff --git a/tests/R3.Tests/FactoryTests/ReturnOnCompletedTest.cs b/tests/R3.Tests/FactoryTests/ReturnOnCompletedTest.cs index 6ee13977..8bb9173d 100644 --- a/tests/R3.Tests/FactoryTests/ReturnOnCompletedTest.cs +++ b/tests/R3.Tests/FactoryTests/ReturnOnCompletedTest.cs @@ -8,7 +8,7 @@ public class ReturnOnCompletedTest public void ReturnOnCompleted() { { - using var list = EventFactory.ReturnOnCompleted("foo").LiveRecord(); + using var list = EventFactory.ReturnOnCompleted("foo").ToLiveList(); list.AssertEqual([]); list.AssertIsCompleted(); list.AssertCompletedValue("foo"); @@ -16,7 +16,7 @@ public void ReturnOnCompleted() { var fakeTime = new FakeTimeProvider(); - using var list = EventFactory.ReturnOnCompleted("foo", TimeSpan.FromSeconds(5), fakeTime).LiveRecord(); + using var list = EventFactory.ReturnOnCompleted("foo", TimeSpan.FromSeconds(5), fakeTime).ToLiveList(); fakeTime.Advance(TimeSpan.FromSeconds(4)); list.AssertEqual([]); diff --git a/tests/R3.Tests/FactoryTests/ReturnTest.cs b/tests/R3.Tests/FactoryTests/ReturnTest.cs index 59c941f1..dbbc2101 100644 --- a/tests/R3.Tests/FactoryTests/ReturnTest.cs +++ b/tests/R3.Tests/FactoryTests/ReturnTest.cs @@ -8,21 +8,21 @@ public class ReturnTest public void Return() { { - using var list = EventFactory.Return(10).LiveRecord(); + using var list = EventFactory.Return(10).ToLiveList(); list.AssertEqual([10]); list.AssertIsCompleted(); } { var fakeTime = new FakeTimeProvider(); - using var list = EventFactory.Return(10, TimeSpan.Zero, fakeTime).LiveRecord(); + using var list = EventFactory.Return(10, TimeSpan.Zero, fakeTime).ToLiveList(); list.AssertEqual([10]); list.AssertIsCompleted(); } { var fakeTime = new FakeTimeProvider(); - using var list = EventFactory.Return(10, TimeSpan.FromSeconds(5), fakeTime).LiveRecord(); + using var list = EventFactory.Return(10, TimeSpan.FromSeconds(5), fakeTime).ToLiveList(); list.AssertEqual([]); fakeTime.Advance(TimeSpan.FromSeconds(4)); @@ -38,7 +38,7 @@ public void Return() [Fact] public void ReturnThreadPoolScheduleOptimized() { - using var list = EventFactory.Return(10).LiveRecord(); + using var list = EventFactory.Return(10).ToLiveList(); Thread.Sleep(1); @@ -51,7 +51,7 @@ public void ReturnThreadPoolScheduleOptimized() public void ReturnOnCompleted() { { - using var list = EventFactory.Return(0, "foo").LiveRecord(); + using var list = EventFactory.Return(0, "foo").ToLiveList(); list.AssertEqual([0]); list.AssertIsCompleted(); list.AssertCompletedValue("foo"); @@ -59,7 +59,7 @@ public void ReturnOnCompleted() { var fakeTime = new FakeTimeProvider(); - using var list = EventFactory.Return(10, "foo", TimeSpan.FromSeconds(5), fakeTime).LiveRecord(); + using var list = EventFactory.Return(10, "foo", TimeSpan.FromSeconds(5), fakeTime).ToLiveList(); list.AssertEqual([]); fakeTime.Advance(TimeSpan.FromSeconds(4)); diff --git a/tests/R3.Tests/FactoryTests/ThrowTest.cs b/tests/R3.Tests/FactoryTests/ThrowTest.cs index ee64470e..ddbc9267 100644 --- a/tests/R3.Tests/FactoryTests/ThrowTest.cs +++ b/tests/R3.Tests/FactoryTests/ThrowTest.cs @@ -8,7 +8,7 @@ public void Throw() { { var e = new Exception(); - using var list = EventFactory.Throw(e).LiveRecord(); + using var list = EventFactory.Throw(e).ToLiveList(); list.AssertEqual([]); list.CompletedValue.IsFailure.Should().BeTrue(); list.CompletedValue.Exception.Should().Be(e); @@ -17,7 +17,7 @@ public void Throw() var fakeTime = new FakeTimeProvider(); var e = new Exception(); - using var list = EventFactory.Throw(e, TimeSpan.FromSeconds(5), fakeTime).LiveRecord(); + using var list = EventFactory.Throw(e, TimeSpan.FromSeconds(5), fakeTime).ToLiveList(); fakeTime.Advance(TimeSpan.FromSeconds(4)); list.AssertEqual([]); diff --git a/tests/R3.Tests/LiveListTest.cs b/tests/R3.Tests/LiveListTest.cs new file mode 100644 index 00000000..16363cc2 --- /dev/null +++ b/tests/R3.Tests/LiveListTest.cs @@ -0,0 +1,52 @@ +namespace R3.Tests; + +public class LiveListTest +{ + [Fact] + public void FromEvent() + { + var publisher = new Publisher(); + var list = publisher.ToLiveList(); + + list.AssertEqual([]); + + publisher.PublishOnNext(10); + list.AssertEqual([10]); + + publisher.PublishOnNext(20); + list.AssertEqual([10, 20]); + + publisher.PublishOnNext(30); + list.AssertEqual([10, 20, 30]); + + list.Dispose(); + + publisher.PublishOnNext(40); + list.AssertEqual([10, 20, 30]); + } + + [Fact] + public void BufferSize() + { + var publisher = new Publisher(); + var list = publisher.ToLiveList(bufferSize: 5); + + publisher.PublishOnNext(10); + publisher.PublishOnNext(20); + publisher.PublishOnNext(30); + publisher.PublishOnNext(40); + publisher.PublishOnNext(50); + + list.AssertEqual([10, 20, 30, 40, 50]); + + publisher.PublishOnNext(60); + + list.AssertEqual([20, 30, 40, 50, 60]); + + list[0].Should().Be(20); + list[1].Should().Be(30); + list[2].Should().Be(40); + list[3].Should().Be(50); + list[4].Should().Be(60); + } +} diff --git a/tests/R3.Tests/OperatorTests/WhereTest.cs b/tests/R3.Tests/OperatorTests/WhereTest.cs index 9e30293d..e69c24e6 100644 --- a/tests/R3.Tests/OperatorTests/WhereTest.cs +++ b/tests/R3.Tests/OperatorTests/WhereTest.cs @@ -7,7 +7,7 @@ public void Where() { var p = new Publisher(); - using var list = p.Where(x => x % 2 != 0).LiveRecord(); + using var list = p.Where(x => x % 2 != 0).ToLiveList(); p.PublishOnNext(2); list.AssertEqual([]); @@ -28,7 +28,7 @@ public void WhereWhere() { var p = new Publisher(); - using var list = p.Where(x => x % 2 != 0).Where(x => x % 3 != 0).LiveRecord(); + using var list = p.Where(x => x % 2 != 0).Where(x => x % 3 != 0).ToLiveList(); p.PublishOnNext(2); list.AssertEqual([]); @@ -55,7 +55,7 @@ public void WhereIndexed() { var p = new Publisher(); - using var list = p.Where((x, i) => i % 2 != 0).LiveRecord(); + using var list = p.Where((x, i) => i % 2 != 0).ToLiveList(); p.PublishOnNext(2); list.AssertEqual([]); @@ -82,7 +82,7 @@ public void WhereCompletable() { var p = new CompletablePublisher(); - using var list = p.Where(x => x % 2 != 0).LiveRecord(); + using var list = p.Where(x => x % 2 != 0).ToLiveList(); p.PublishOnNext(2); list.AssertEqual([]); @@ -110,7 +110,7 @@ public void WhereCompletableIndexed() { var p = new CompletablePublisher(); - using var list = p.Where((x, i) => i % 2 != 0).LiveRecord(); + using var list = p.Where((x, i) => i % 2 != 0).ToLiveList(); p.PublishOnNext(2); list.AssertEqual([]); diff --git a/tests/R3.Tests/_TestHelper.cs b/tests/R3.Tests/_TestHelper.cs index 0b517b8f..95acc450 100644 --- a/tests/R3.Tests/_TestHelper.cs +++ b/tests/R3.Tests/_TestHelper.cs @@ -5,53 +5,28 @@ namespace R3.Tests; public static class _TestHelper { - public static RecordList LiveRecord(this Event source) + public static void AssertEqual(this LiveList list, params T[] expected) { - var l = new RecordList(); - l.SourceSubscription.Disposable = source.Subscribe(x => l.Add(x)); - return l; + list.Should().Equal(expected); } - public static RecordList LiveRecord(this CompletableEvent source) + public static void AssertEqual(this LiveList list, params T[] expected) { - var l = new RecordList(); - l.SourceSubscription.Disposable = source.Subscribe(x => l.Add(x), x => { l.IsCompleted = true; l.CompletedValue = x; }); - return l; - } -} - -public sealed class RecordList : List, IDisposable -{ - public SingleAssignmentDisposableCore SourceSubscription; - public bool IsCompleted { get; set; } - public TC? CompletedValue { get; set; } - - public void Dispose() - { - SourceSubscription.Dispose(); - } - - // test helper - - [MemberNotNull(nameof(CompletedValue))] - public void AssertIsCompleted() - { - Debug.Assert(CompletedValue != null); - IsCompleted.Should().BeTrue(); + list.Should().Equal(expected); } - public void AssertIsNotCompleted() + public static void AssertIsCompleted(this LiveList list) { - IsCompleted.Should().BeFalse(); + list.IsCompleted.Should().BeTrue(); } - public void AssertCompletedValue(TC value) + public static void AssertIsNotCompleted(this LiveList list) { - CompletedValue.Should().Be(value); + list.IsCompleted.Should().BeFalse(); } - public void AssertEqual(params T[] expected) + public static void AssertCompletedValue(this LiveList list, TC value) { - this.Should().Equal(expected); + list.CompletedValue.Should().Be(value); } }