From ea90e364f323a056d0375aa177dcd08bfd890714 Mon Sep 17 00:00:00 2001 From: neuecc Date: Sat, 6 Jan 2024 23:01:35 +0900 Subject: [PATCH] ObserveOn/SubscribeOn Dispatcher --- sandbox/WpfApp1/MainWindow.xaml.cs | 36 ++- sandbox/WpfApp1/WpfApp1.csproj | 4 + src/R3.WPF/ObserveOnExtensions.cs | 230 +++++++++++++++++ src/R3.WPF/WpfRenderingFrameProvider.cs | 1 + src/R3/{ => Collections}/FreeListCore.cs | 2 +- src/R3/Collections/LiveList.cs | 236 ++++++++++++++++++ .../{Internal => Collections}/SwapListCore.cs | 4 +- src/R3/LiveList.cs | 232 ----------------- src/R3/ObservableSystem.cs | 4 +- src/R3/R3.csproj | 1 + tests/R3.Tests/_TestHelper.cs | 4 +- 11 files changed, 497 insertions(+), 257 deletions(-) create mode 100644 src/R3.WPF/ObserveOnExtensions.cs rename src/R3/{ => Collections}/FreeListCore.cs (99%) create mode 100644 src/R3/Collections/LiveList.cs rename src/R3/{Internal => Collections}/SwapListCore.cs (97%) delete mode 100644 src/R3/LiveList.cs diff --git a/sandbox/WpfApp1/MainWindow.xaml.cs b/sandbox/WpfApp1/MainWindow.xaml.cs index 31e0ec7b..297890e2 100644 --- a/sandbox/WpfApp1/MainWindow.xaml.cs +++ b/sandbox/WpfApp1/MainWindow.xaml.cs @@ -1,18 +1,6 @@ using R3; -using R3.WPF; using System.Diagnostics; -using System.Runtime.CompilerServices; -using System.Text; using System.Windows; -using System.Windows.Controls; -using System.Windows.Data; -using System.Windows.Documents; -using System.Windows.Input; -using System.Windows.Media; -using System.Windows.Media.Imaging; -using System.Windows.Navigation; -using System.Windows.Shapes; -using System.Windows.Threading; namespace WpfApp1; /// @@ -35,15 +23,25 @@ public MainWindow() - //var sw = Stopwatch.StartNew(); - //Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5)).Subscribe(_ => + //Observable.EveryValueChanged(this, x => x.Width).Subscribe(x => textBlock.Text = x.ToString()); + // this.ObserveEveryValueChanged(x => x.Height).Subscribe(x => HeightText.Text = x.ToString()); + + var sw = Stopwatch.StartNew(); + + //System.Reactive.Linq.Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5)).Subscribe(_ => //{ // textBlock.Text = "Hello World:" + sw.Elapsed; //}); - - Observable.TimerFrame(50, 100).Subscribe(_ => - { - textBlock.Text = "Hello World:" + ObservableSystem.DefaultFrameProvider.GetFrameCount(); - }); + Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5), TimeProvider.System) + .ObserveOnCurrentDispatcher() + .Subscribe(_ => + { + textBlock.Text = "Hello World:" + sw.Elapsed; + }); + + //Observable.TimerFrame(50, 100).Subscribe(_ => + //{ + // textBlock.Text = "Hello World:" + ObservableSystem.DefaultFrameProvider.GetFrameCount(); + //}); } } diff --git a/sandbox/WpfApp1/WpfApp1.csproj b/sandbox/WpfApp1/WpfApp1.csproj index 1f1f76e0..72951cbc 100644 --- a/sandbox/WpfApp1/WpfApp1.csproj +++ b/sandbox/WpfApp1/WpfApp1.csproj @@ -9,6 +9,10 @@ true + + + + diff --git a/src/R3.WPF/ObserveOnExtensions.cs b/src/R3.WPF/ObserveOnExtensions.cs new file mode 100644 index 00000000..10cc8c47 --- /dev/null +++ b/src/R3.WPF/ObserveOnExtensions.cs @@ -0,0 +1,230 @@ +using R3.Collections; +using System.Windows.Threading; + +namespace R3; // using R3 + +public static class ObserveOnExtensions +{ + public static Observable ObserveOnDispatcher(this Observable source, Dispatcher dispatcher, DispatcherPriority dispatcherPriority = DispatcherPriority.Normal) + { + return new ObserveOnDispatcher(source, dispatcher, dispatcherPriority); + } + + public static Observable ObserveOnCurrentDispatcher(this Observable source, DispatcherPriority dispatcherPriority = DispatcherPriority.Normal) + { + return ObserveOnDispatcher(source, Dispatcher.CurrentDispatcher, dispatcherPriority); + } + + public static Observable SubscribeOnDispatcher(this Observable source, Dispatcher dispatcher, DispatcherPriority dispatcherPriority = DispatcherPriority.Normal) + { + return new SubscribeOnDispatcher(source, dispatcher, dispatcherPriority); + } + + public static Observable SubscribeOnCurrentDispatcher(this Observable source, DispatcherPriority dispatcherPriority = DispatcherPriority.Normal) + { + return SubscribeOnDispatcher(source, Dispatcher.CurrentDispatcher, dispatcherPriority); + } +} + +internal sealed class ObserveOnDispatcher(Observable source, Dispatcher dispatcher, DispatcherPriority dispatcherPriority) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + return source.Subscribe(new _ObserveOnDispatcher(observer, dispatcher, dispatcherPriority)); + } + + sealed class _ObserveOnDispatcher : Observer + { + readonly Action postCallback; + + readonly Observer observer; + readonly Dispatcher dispatcher; + readonly DispatcherPriority dispatcherPriority; + readonly object gate = new object(); + SwapListCore> list; + bool running; + + protected override bool AutoDisposeOnCompleted => false; + + public _ObserveOnDispatcher(Observer observer, Dispatcher dispatcher, DispatcherPriority dispatcherPriority) + { + this.observer = observer; + this.dispatcher = dispatcher; + this.dispatcherPriority = dispatcherPriority; + this.postCallback = DrainMessages; + } + + protected override void OnNextCore(T value) + { + EnqueueValue(new(value)); + } + + protected override void OnErrorResumeCore(Exception error) + { + EnqueueValue(new(error)); + } + + protected override void OnCompletedCore(Result result) + { + EnqueueValue(new(result)); + } + + void EnqueueValue(Notification value) + { + lock (gate) + { + if (IsDisposed) return; + list.Add(value); + + if (!running) + { + running = true; + dispatcher.InvokeAsync(postCallback, dispatcherPriority); + } + } + } + + protected override void DisposeCore() + { + lock (gate) + { + list.Dispose(); + } + } + + void DrainMessages() + { + var self = this; + + ReadOnlySpan> values; + bool token; + lock (self.gate) + { + values = self.list.Swap(out token); + if (values.Length == 0) + { + goto FINALIZE; + } + } + + foreach (var value in values) + { + try + { + switch (value.Kind) + { + case NotificationKind.OnNext: + self.observer.OnNext(value.Value!); + break; + case NotificationKind.OnErrorResume: + self.observer.OnErrorResume(value.Error!); + break; + case NotificationKind.OnCompleted: + try + { + self.observer.OnCompleted(value.Result!.Value); + } + finally + { + self.Dispose(); + } + break; + default: + break; + } + } + catch (Exception ex) + { + try + { + ObservableSystem.GetUnhandledExceptionHandler().Invoke(ex); + } + catch { } + } + } + + FINALIZE: + lock (self.gate) + { + self.list.Clear(token); + + if (self.IsDisposed) + { + self.running = false; + return; + } + + if (self.list.HasValue) + { + // post again + dispatcher.InvokeAsync(postCallback, dispatcherPriority); + return; + } + else + { + self.running = false; + return; + } + } + } + } +} + +internal sealed class SubscribeOnDispatcher(Observable source, Dispatcher dispatcher, DispatcherPriority dispatcherPriority) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + return new _SubscribeOnDispatcher(observer, source, dispatcher, dispatcherPriority).Run(); + } + + sealed class _SubscribeOnDispatcher : Observer + { + readonly Action postCallback; + + readonly Observer observer; + readonly Observable source; + readonly Dispatcher dispatcher; + readonly DispatcherPriority dispatcherPriority; + SingleAssignmentDisposableCore disposable; + + public _SubscribeOnDispatcher(Observer observer, Observable source, Dispatcher dispatcher, DispatcherPriority dispatcherPriority) + { + this.observer = observer; + this.source = source; + this.dispatcher = dispatcher; + this.dispatcherPriority = dispatcherPriority; + this.postCallback = Subscribe; + } + + public IDisposable Run() + { + dispatcher.InvokeAsync(postCallback, dispatcherPriority); + return this; + } + + void Subscribe() + { + disposable.Disposable = source.Subscribe(this); + } + + protected override void OnNextCore(T value) + { + observer.OnNext(value); + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + observer.OnCompleted(result); + } + + protected override void DisposeCore() + { + disposable.Dispose(); + } + } +} diff --git a/src/R3.WPF/WpfRenderingFrameProvider.cs b/src/R3.WPF/WpfRenderingFrameProvider.cs index ae45cccb..0e90be6a 100644 --- a/src/R3.WPF/WpfRenderingFrameProvider.cs +++ b/src/R3.WPF/WpfRenderingFrameProvider.cs @@ -1,4 +1,5 @@ using System.Diagnostics.CodeAnalysis; +using R3.Collections; namespace R3.WPF; diff --git a/src/R3/FreeListCore.cs b/src/R3/Collections/FreeListCore.cs similarity index 99% rename from src/R3/FreeListCore.cs rename to src/R3/Collections/FreeListCore.cs index 021e7089..c0b6f878 100644 --- a/src/R3/FreeListCore.cs +++ b/src/R3/Collections/FreeListCore.cs @@ -1,7 +1,7 @@ using System.Runtime.CompilerServices; using System.Runtime.InteropServices; -namespace R3; +namespace R3.Collections; [StructLayout(LayoutKind.Auto)] public struct FreeListCore diff --git a/src/R3/Collections/LiveList.cs b/src/R3/Collections/LiveList.cs new file mode 100644 index 00000000..38e2aa56 --- /dev/null +++ b/src/R3/Collections/LiveList.cs @@ -0,0 +1,236 @@ +using System.Collections; +using System.Runtime.InteropServices; + +namespace R3 +{ + public static partial class ObservableExtensions + { + public static LiveList ToLiveList(this Observable source) + { + return new LiveList(source); + } + + public static LiveList ToLiveList(this Observable source, int bufferSize) + { + return new LiveList(source, bufferSize); + } + } +} + +namespace R3.Collections +{ + public sealed class LiveList : IReadOnlyList, IDisposable + { + readonly IReadOnlyList list; // RingBuffer or List + readonly IDisposable sourceSubscription; + readonly int bufferSize; + + bool isCompleted; + Result completedValue; + + public bool IsCompleted => isCompleted; + + public Result Result + { + get + { + lock (list) + { + if (!isCompleted) throw new InvalidOperationException("LiveList is not completed, you should check IsCompleted."); + return completedValue; + } + } + } + + public LiveList(Observable source) + { + if (bufferSize == 0) bufferSize = 1; + this.bufferSize = -1; + this.list = new List(); + this.sourceSubscription = source.Subscribe(new ListObserver(this)); + } + + public LiveList(Observable source, int bufferSize) + { + if (bufferSize == 0) bufferSize = 1; + this.bufferSize = bufferSize; // bufferSize must set before Subscribe(sometimes Subscribe run immediately) + this.list = new RingBuffer(bufferSize); + this.sourceSubscription = source.Subscribe(new ListObserver(this)); + } + + public T this[int index] + { + get + { + lock (list) + { + return list[index]; + } + } + } + + public int Count + { + get + { + lock (list) + { + return list.Count; + } + } + } + + public void Clear() + { + lock (list) + { + list.Clear(); + } + } + + public void Dispose() + { + sourceSubscription.Dispose(); + } + + public void ForEach(Action action) + { + lock (list) + { + var span = list.GetSpan(); + foreach (ref readonly var item in span) + { + action(item); + } + } + } + + public void ForEach(TState state, Action action) + { + lock (list) + { + var span = list.GetSpan(); + foreach (ref readonly var item in span) + { + action(item, state); + } + } + } + + public T[] ToArray() + { + lock (list) + { + return list.ToArray(); + } + } + + IEnumerator IEnumerable.GetEnumerator() + { + lock (list) + { + // snapshot + return ToArray().AsEnumerable().GetEnumerator(); + } + } + + IEnumerator IEnumerable.GetEnumerator() + { + lock (list) + { + // snapshot + return ToArray().AsEnumerable().GetEnumerator(); + } + } + + sealed class ListObserver(LiveList parent) : Observer + { + protected override void OnNextCore(T message) + { + lock (parent.list) + { + if (parent.bufferSize == -1) + { + ((List)parent.list).Add(message); + } + else + { + var ring = (RingBuffer)parent.list; + + if (ring.Count == parent.bufferSize) + { + ring.RemoveFirst(); + } + ring.AddLast(message); + } + } + } + + protected override void OnErrorResumeCore(Exception error) + { + ObservableSystem.GetUnhandledExceptionHandler().Invoke(error); + } + + protected override void OnCompletedCore(Result complete) + { + lock (parent.list) + { + parent.completedValue = complete; + parent.isCompleted = true; + } + } + } + } + + file static class RingBufferOrListExtensions + { + public static RingBufferSpan GetSpan(this IReadOnlyList list) + { + if (list is RingBuffer r) + { + return r.GetSpan(); + } + else if (list is List l) + { + var span1 = CollectionsMarshal.AsSpan(l); + return new RingBufferSpan(span1, default, span1.Length); + } + else + { + throw new NotSupportedException(); + } + } + + public static void Clear(this IReadOnlyList list) + { + if (list is RingBuffer r) + { + r.Clear(); + } + else if (list is List l) + { + l.Clear(); + } + else + { + throw new NotSupportedException(); + } + } + + public static T[] ToArray(this IReadOnlyList list) + { + if (list is RingBuffer r) + { + return r.ToArray(); + } + else if (list is List l) + { + return CollectionsMarshal.AsSpan(l).ToArray(); + } + else + { + throw new NotSupportedException(); + } + } + } +} diff --git a/src/R3/Internal/SwapListCore.cs b/src/R3/Collections/SwapListCore.cs similarity index 97% rename from src/R3/Internal/SwapListCore.cs rename to src/R3/Collections/SwapListCore.cs index ec34c34c..c2aaaa3f 100644 --- a/src/R3/Internal/SwapListCore.cs +++ b/src/R3/Collections/SwapListCore.cs @@ -1,6 +1,6 @@ -namespace R3.Internal; +namespace R3.Collections; -internal struct SwapListCore +public struct SwapListCore { const int InitialArraySize = 4; diff --git a/src/R3/LiveList.cs b/src/R3/LiveList.cs deleted file mode 100644 index 286cffa2..00000000 --- a/src/R3/LiveList.cs +++ /dev/null @@ -1,232 +0,0 @@ -using System.Collections; -using System.Runtime.InteropServices; - -namespace R3; - -public static partial class ObservableExtensions -{ - public static LiveList ToLiveList(this Observable source) - { - return new LiveList(source); - } - - public static LiveList ToLiveList(this Observable source, int bufferSize) - { - return new LiveList(source, bufferSize); - } -} - -public sealed class LiveList : IReadOnlyList, IDisposable -{ - readonly IReadOnlyList list; // RingBuffer or List - readonly IDisposable sourceSubscription; - readonly int bufferSize; - - bool isCompleted; - Result completedValue; - - public bool IsCompleted => isCompleted; - - public Result Result - { - get - { - lock (list) - { - if (!isCompleted) throw new InvalidOperationException("LiveList is not completed, you should check IsCompleted."); - return completedValue; - } - } - } - - public LiveList(Observable source) - { - if (bufferSize == 0) bufferSize = 1; - this.bufferSize = -1; - this.list = new List(); - this.sourceSubscription = source.Subscribe(new ListObserver(this)); - } - - public LiveList(Observable source, int bufferSize) - { - if (bufferSize == 0) bufferSize = 1; - this.bufferSize = bufferSize; // bufferSize must set before Subscribe(sometimes Subscribe run immediately) - this.list = new RingBuffer(bufferSize); - this.sourceSubscription = source.Subscribe(new ListObserver(this)); - } - - public T this[int index] - { - get - { - lock (list) - { - return list[index]; - } - } - } - - public int Count - { - get - { - lock (list) - { - return list.Count; - } - } - } - - public void Clear() - { - lock (list) - { - list.Clear(); - } - } - - public void Dispose() - { - sourceSubscription.Dispose(); - } - - public void ForEach(Action action) - { - lock (list) - { - var span = list.GetSpan(); - foreach (ref readonly var item in span) - { - action(item); - } - } - } - - public void ForEach(TState state, Action action) - { - lock (list) - { - var span = list.GetSpan(); - foreach (ref readonly var item in span) - { - action(item, state); - } - } - } - - public T[] ToArray() - { - lock (list) - { - return list.ToArray(); - } - } - - IEnumerator IEnumerable.GetEnumerator() - { - lock (list) - { - // snapshot - return ToArray().AsEnumerable().GetEnumerator(); - } - } - - IEnumerator IEnumerable.GetEnumerator() - { - lock (list) - { - // snapshot - return ToArray().AsEnumerable().GetEnumerator(); - } - } - - sealed class ListObserver(LiveList parent) : Observer - { - protected override void OnNextCore(T message) - { - lock (parent.list) - { - if (parent.bufferSize == -1) - { - ((List)parent.list).Add(message); - } - else - { - var ring = (RingBuffer)parent.list; - - if (ring.Count == parent.bufferSize) - { - ring.RemoveFirst(); - } - ring.AddLast(message); - } - } - } - - protected override void OnErrorResumeCore(Exception error) - { - ObservableSystem.GetUnhandledExceptionHandler().Invoke(error); - } - - protected override void OnCompletedCore(Result complete) - { - lock (parent.list) - { - parent.completedValue = complete; - parent.isCompleted = true; - } - } - } -} - -file static class RingBufferOrListExtensions -{ - public static RingBufferSpan GetSpan(this IReadOnlyList list) - { - if (list is RingBuffer r) - { - return r.GetSpan(); - } - else if (list is List l) - { - var span1 = CollectionsMarshal.AsSpan(l); - return new RingBufferSpan(span1, default, span1.Length); - } - else - { - throw new NotSupportedException(); - } - } - - public static void Clear(this IReadOnlyList list) - { - if (list is RingBuffer r) - { - r.Clear(); - } - else if (list is List l) - { - l.Clear(); - } - else - { - throw new NotSupportedException(); - } - } - - public static T[] ToArray(this IReadOnlyList list) - { - if (list is RingBuffer r) - { - return r.ToArray(); - } - else if (list is List l) - { - return CollectionsMarshal.AsSpan(l).ToArray(); - } - else - { - throw new NotSupportedException(); - } - } -} diff --git a/src/R3/ObservableSystem.cs b/src/R3/ObservableSystem.cs index 4e6c6120..c730ac0e 100644 --- a/src/R3/ObservableSystem.cs +++ b/src/R3/ObservableSystem.cs @@ -41,12 +41,12 @@ internal sealed class NotSupportedFrameProvider : FrameProvider { public override long GetFrameCount() { - throw new NotSupportedException("EventSystem.DefaultFrameProvider is not set. Please set ObservableSystem.DefaultFrameProvider to a valid FrameProvider(ThreadSleepFrameProvider, etc...)."); + throw new NotSupportedException("ObservableSystem.DefaultFrameProvider is not set. Please set ObservableSystem.DefaultFrameProvider to a valid FrameProvider(ThreadSleepFrameProvider, etc...)."); } public override void Register(IFrameRunnerWorkItem callback) { - throw new NotSupportedException("EventSystem.DefaultFrameProvider is not set. Please set ObservableSystem.DefaultFrameProvider to a valid FrameProvider(ThreadSleepFrameProvider, etc...)."); + throw new NotSupportedException("ObservableSystem.DefaultFrameProvider is not set. Please set ObservableSystem.DefaultFrameProvider to a valid FrameProvider(ThreadSleepFrameProvider, etc...)."); } } diff --git a/src/R3/R3.csproj b/src/R3/R3.csproj index 86f9636a..c1e31b8d 100644 --- a/src/R3/R3.csproj +++ b/src/R3/R3.csproj @@ -39,6 +39,7 @@ + diff --git a/tests/R3.Tests/_TestHelper.cs b/tests/R3.Tests/_TestHelper.cs index ca88a85c..ab0b6376 100644 --- a/tests/R3.Tests/_TestHelper.cs +++ b/tests/R3.Tests/_TestHelper.cs @@ -1,4 +1,6 @@ -namespace R3.Tests; +using R3.Collections; + +namespace R3.Tests; public static class _TestHelper {