From 74c36450d0b9529270a05ab656467e2ea93e5f97 Mon Sep 17 00:00:00 2001 From: neuecc Date: Wed, 27 Dec 2023 00:31:29 +0900 Subject: [PATCH] SwapListCore --- src/R3/Internal/SwapListCore.cs | 128 ++++++++++++++++++++ src/R3/Operators/ObserveOn.cs | 205 ++++++++++++++++++++------------ 2 files changed, 259 insertions(+), 74 deletions(-) create mode 100644 src/R3/Internal/SwapListCore.cs diff --git a/src/R3/Internal/SwapListCore.cs b/src/R3/Internal/SwapListCore.cs new file mode 100644 index 00000000..ec34c34c --- /dev/null +++ b/src/R3/Internal/SwapListCore.cs @@ -0,0 +1,128 @@ +namespace R3.Internal; + +internal struct SwapListCore +{ + const int InitialArraySize = 4; + + T[]? arrayA; + int lengthA; + + T[]? arrayB; + int lengthB; + + bool useA; + + public bool HasValue + { + get + { + return lengthA > 0 || lengthB > 0; + } + } + + public void Add(T value) + { + if (useA) + { + if (arrayA == null) + { + arrayA = new T[InitialArraySize]; + } + else if (lengthA == arrayA.Length) + { + var tmp = new T[arrayA.Length * 2]; + Array.Copy(arrayA, tmp, arrayA.Length); + arrayA = tmp; + } + arrayA[lengthA++] = value; + } + else + { + if (arrayB == null) + { + arrayB = new T[InitialArraySize]; + } + else if (lengthB == arrayB.Length) + { + var tmp = new T[arrayB.Length * 2]; + Array.Copy(arrayB, tmp, arrayB.Length); + arrayB = tmp; + } + arrayB[lengthB++] = value; + } + } + + public ReadOnlySpan Swap(out bool token) + { + if (useA) + { + useA = false; + if (arrayA == null) + { + token = true; + return ReadOnlySpan.Empty; + } + else + { + token = true; + return arrayA.AsSpan(0, lengthA); + } + } + else + { + useA = true; + if (arrayB == null) + { + token = false; + return ReadOnlySpan.Empty; + } + else + { + token = false; + return arrayB.AsSpan(0, lengthB); + } + } + } + + public void Clear(bool token) + { + if (token) // token means useA + { + if (arrayA != null) + { + Array.Clear(arrayA, 0, lengthA); + lengthA = 0; + } + } + else + { + if (arrayB != null) + { + Array.Clear(arrayB, 0, lengthB); + lengthB = 0; + } + } + + if (lengthB == 0) + { + useA = true; + } + } + + public void Dispose() + { + if (arrayA != null) + { + Array.Clear(arrayA, 0, lengthA); + arrayA = null; + lengthA = 0; + } + + if (arrayB != null) + { + Array.Clear(arrayB, 0, lengthB); + arrayB = null; + lengthB = 0; + } + } +} diff --git a/src/R3/Operators/ObserveOn.cs b/src/R3/Operators/ObserveOn.cs index b134fb7a..3d90b3f2 100644 --- a/src/R3/Operators/ObserveOn.cs +++ b/src/R3/Operators/ObserveOn.cs @@ -1,10 +1,15 @@ using System.Collections.Concurrent; -using System.Runtime.InteropServices; namespace R3; public static partial class ObservableExtensions { + /// ObserveOn SynchronizationContext.Current + public static Observable ObserveOnCurrent(this Observable source) + { + return ObserveOn(source, SynchronizationContext.Current); + } + public static Observable ObserveOn(this Observable source, SynchronizationContext? synchronizationContext) { if (synchronizationContext == null) @@ -31,7 +36,6 @@ public static Observable ObserveOn(this Observable source, FrameProvide } } -// TODO: use local-queue(careful re-entrant) implementation internal sealed class ObserveOnSynchronizationContext(Observable source, SynchronizationContext synchronizationContext) : Observable { protected override IDisposable SubscribeCore(Observer observer) @@ -41,10 +45,13 @@ protected override IDisposable SubscribeCore(Observer observer) sealed class _ObserveOn : Observer { + static readonly SendOrPostCallback postCallback = DrainMessages; + readonly Observer observer; readonly SynchronizationContext synchronizationContext; - SendOrPostCallback onNext; - SendOrPostCallback onErrorResume; + readonly object gate = new object(); + SwapListCore> list; + bool running; protected override bool AutoDisposeOnCompleted => false; @@ -52,46 +59,121 @@ public _ObserveOn(Observer observer, SynchronizationContext synchronizationCo { this.observer = observer; this.synchronizationContext = synchronizationContext; - // make closure(capture observer) - onNext = PostOnNext; - onErrorResume = PostOnErrorResume; } protected override void OnNextCore(T value) { - synchronizationContext.Post(onNext, value); + EnqueueValue(new(value)); } protected override void OnErrorResumeCore(Exception error) { - synchronizationContext.Post(onErrorResume, error); + EnqueueValue(new(error)); } protected override void OnCompletedCore(Result result) { - // OnCompletedCore is call once, observer capture here. - synchronizationContext.Post(_ => + EnqueueValue(new(result)); + } + + void EnqueueValue(Notification value) + { + lock (gate) { - try - { - observer.OnCompleted(result); - } - finally + if (IsDisposed) return; + list.Add(value); + + if (!running) { - Dispose(); + running = true; + synchronizationContext.Post(postCallback, this); } - }, null); + } } - void PostOnNext(object? state) + protected override void DisposeCore() { - observer.OnNext((T)state!); + lock (gate) + { + list.Dispose(); + } } - void PostOnErrorResume(object? state) + static void DrainMessages(object? state) { - observer.OnErrorResume((Exception)state!); + var self = (_ObserveOn)state!; + + ReadOnlySpan> values; + bool token; + lock (self.gate) + { + values = self.list.Swap(out token); + if (values.Length == 0) + { + goto FINALIZE; + } + } + + foreach (var value in values) + { + try + { + switch (value.Kind) + { + case NotificationKind.OnNext: + self.observer.OnNext(value.Value!); + break; + case NotificationKind.OnErrorResume: + self.observer.OnErrorResume(value.Error!); + break; + case NotificationKind.OnCompleted: + try + { + self.observer.OnCompleted(value.Result!.Value); + } + finally + { + self.Dispose(); + } + break; + default: + break; + } + } + catch (Exception ex) + { + try + { + ObservableSystem.GetUnhandledExceptionHandler().Invoke(ex); + } + catch { } + } + } + + FINALIZE: + lock (self.gate) + { + self.list.Clear(token); + + if (self.IsDisposed) + { + self.running = false; + return; + } + + if (self.list.HasValue) + { + // post again + self.synchronizationContext.Post(postCallback, self); + return; + } + else + { + self.running = false; + return; + } + } } } } @@ -103,10 +185,8 @@ protected override IDisposable SubscribeCore(Observer observer) return source.Subscribe(new _ObserveOn(observer)); } - sealed class _ObserveOn(Observer observer) : Observer + sealed class _ObserveOn(Observer observer) : Observer, IThreadPoolWorkItem { - static readonly Action<_ObserveOn> drainMessages = DrainMessages; - Observer observer = observer; ConcurrentQueue> q = new(); bool running = false; @@ -138,44 +218,44 @@ void TryStartWorker() if (!running) { running = true; - ThreadPool.UnsafeQueueUserWorkItem(drainMessages, this, preferLocal: false); + ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); } } } - static void DrainMessages(_ObserveOn state) + void IThreadPoolWorkItem.Execute() { AGAIN: - while (state.q.TryDequeue(out var item)) + while (q.TryDequeue(out var item)) { switch (item.Kind) { case NotificationKind.OnNext: - state.observer.OnNext(item.Value!); + observer.OnNext(item.Value!); break; case NotificationKind.OnErrorResume: - state.observer.OnErrorResume(item.Error!); + observer.OnErrorResume(item.Error!); break; case NotificationKind.OnCompleted: try { - state.observer.OnCompleted(item.Result!.Value); + observer.OnCompleted(item.Result!.Value); } finally { - state.Dispose(); + Dispose(); } break; } } - lock (state.q) + lock (q) { - if (state.q.Count != 0) + if (q.Count != 0) { goto AGAIN; } - state.running = false; + running = false; return; } } @@ -343,10 +423,8 @@ internal sealed class _ObserveOn : Observer, IFrameRunnerWorkItem readonly Observer observer; readonly FrameProvider frameProvider; readonly object gate = new object(); - List>? listA; - List>? listB; + SwapListCore> list; bool running; - bool useListA; protected override bool AutoDisposeOnCompleted => false; @@ -355,7 +433,7 @@ public _ObserveOn(Observer observer, FrameProvider frameProvider) this.observer = observer; this.frameProvider = frameProvider; this.running = false; - this.useListA = true; + this.list = new SwapListCore>(); } protected override void OnNextCore(T value) @@ -378,16 +456,7 @@ void EnqueueValue(Notification value) lock (gate) { if (IsDisposed) return; - if (useListA) - { - if (listA == null) listA = new(); - listA.Add(value); - } - else - { - if (listB == null) listB = new(); - listB.Add(value); - } + list.Add(value); if (!running) { @@ -399,27 +468,18 @@ void EnqueueValue(Notification value) public bool MoveNext(long frameCount) { - List>? list = null; + ReadOnlySpan> values; + bool token; lock (gate) { - if (useListA) + values = list.Swap(out token); + if (values.Length == 0) { - list = listA; - useListA = false; // switch to listB + goto FINALIZE; } - else - { - list = listB; - useListA = true; // swith to listA - } - } - - if (list == null) - { - goto FINALIZE; } - foreach (var value in CollectionsMarshal.AsSpan(list)) + foreach (var value in values) { try { @@ -458,10 +518,7 @@ public bool MoveNext(long frameCount) FINALIZE: lock (gate) { - if (list != null) - { - list.Clear(); - } + list.Clear(token); if (IsDisposed) { @@ -469,14 +526,15 @@ public bool MoveNext(long frameCount) return false; } - if ((listA?.Count ?? 0) == 0 && (listB?.Count ?? 0) == 0) + if (list.HasValue) + { + return true; + } + else { running = false; - useListA = true; return false; } - - return true; } } @@ -484,8 +542,7 @@ protected override void DisposeCore() { lock (gate) { - listA = null; // not call Clear, because list is used in MoveNext - listB = null; + list.Dispose(); } } }