Skip to content

Commit

Permalink
ObserveOn/SubscribeOn Dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Jan 6, 2024
1 parent db70213 commit ea90e36
Show file tree
Hide file tree
Showing 11 changed files with 497 additions and 257 deletions.
36 changes: 17 additions & 19 deletions sandbox/WpfApp1/MainWindow.xaml.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,6 @@
using R3;
using R3.WPF;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Text;
using System.Windows;
using System.Windows.Controls;
using System.Windows.Data;
using System.Windows.Documents;
using System.Windows.Input;
using System.Windows.Media;
using System.Windows.Media.Imaging;
using System.Windows.Navigation;
using System.Windows.Shapes;
using System.Windows.Threading;

namespace WpfApp1;
/// <summary>
Expand All @@ -35,15 +23,25 @@ public MainWindow()



//var sw = Stopwatch.StartNew();
//Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5)).Subscribe(_ =>
//Observable.EveryValueChanged(this, x => x.Width).Subscribe(x => textBlock.Text = x.ToString());
// this.ObserveEveryValueChanged(x => x.Height).Subscribe(x => HeightText.Text = x.ToString());

var sw = Stopwatch.StartNew();

//System.Reactive.Linq.Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5)).Subscribe(_ =>
//{
// textBlock.Text = "Hello World:" + sw.Elapsed;
//});

Observable.TimerFrame(50, 100).Subscribe(_ =>
{
textBlock.Text = "Hello World:" + ObservableSystem.DefaultFrameProvider.GetFrameCount();
});
Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5), TimeProvider.System)
.ObserveOnCurrentDispatcher()
.Subscribe(_ =>
{
textBlock.Text = "Hello World:" + sw.Elapsed;
});

//Observable.TimerFrame(50, 100).Subscribe(_ =>
//{
// textBlock.Text = "Hello World:" + ObservableSystem.DefaultFrameProvider.GetFrameCount();
//});
}
}
4 changes: 4 additions & 0 deletions sandbox/WpfApp1/WpfApp1.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
<EnableWindowsTargeting>true</EnableWindowsTargeting>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="System.Reactive.Linq" Version="6.0.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\R3.WPF\R3.WPF.csproj" />
<ProjectReference Include="..\..\src\R3\R3.csproj" />
Expand Down
230 changes: 230 additions & 0 deletions src/R3.WPF/ObserveOnExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
using R3.Collections;
using System.Windows.Threading;

namespace R3; // using R3

public static class ObserveOnExtensions
{
public static Observable<T> ObserveOnDispatcher<T>(this Observable<T> source, Dispatcher dispatcher, DispatcherPriority dispatcherPriority = DispatcherPriority.Normal)
{
return new ObserveOnDispatcher<T>(source, dispatcher, dispatcherPriority);
}

public static Observable<T> ObserveOnCurrentDispatcher<T>(this Observable<T> source, DispatcherPriority dispatcherPriority = DispatcherPriority.Normal)
{
return ObserveOnDispatcher(source, Dispatcher.CurrentDispatcher, dispatcherPriority);
}

public static Observable<T> SubscribeOnDispatcher<T>(this Observable<T> source, Dispatcher dispatcher, DispatcherPriority dispatcherPriority = DispatcherPriority.Normal)
{
return new SubscribeOnDispatcher<T>(source, dispatcher, dispatcherPriority);
}

public static Observable<T> SubscribeOnCurrentDispatcher<T>(this Observable<T> source, DispatcherPriority dispatcherPriority = DispatcherPriority.Normal)
{
return SubscribeOnDispatcher(source, Dispatcher.CurrentDispatcher, dispatcherPriority);
}
}

internal sealed class ObserveOnDispatcher<T>(Observable<T> source, Dispatcher dispatcher, DispatcherPriority dispatcherPriority) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return source.Subscribe(new _ObserveOnDispatcher(observer, dispatcher, dispatcherPriority));
}

sealed class _ObserveOnDispatcher : Observer<T>
{
readonly Action postCallback;

readonly Observer<T> observer;
readonly Dispatcher dispatcher;
readonly DispatcherPriority dispatcherPriority;
readonly object gate = new object();
SwapListCore<Notification<T>> list;
bool running;

protected override bool AutoDisposeOnCompleted => false;

public _ObserveOnDispatcher(Observer<T> observer, Dispatcher dispatcher, DispatcherPriority dispatcherPriority)
{
this.observer = observer;
this.dispatcher = dispatcher;
this.dispatcherPriority = dispatcherPriority;
this.postCallback = DrainMessages;
}

protected override void OnNextCore(T value)
{
EnqueueValue(new(value));
}

protected override void OnErrorResumeCore(Exception error)
{
EnqueueValue(new(error));
}

protected override void OnCompletedCore(Result result)
{
EnqueueValue(new(result));
}

void EnqueueValue(Notification<T> value)
{
lock (gate)
{
if (IsDisposed) return;
list.Add(value);

if (!running)
{
running = true;
dispatcher.InvokeAsync(postCallback, dispatcherPriority);
}
}
}

protected override void DisposeCore()
{
lock (gate)
{
list.Dispose();
}
}

void DrainMessages()
{
var self = this;

ReadOnlySpan<Notification<T>> values;
bool token;
lock (self.gate)
{
values = self.list.Swap(out token);
if (values.Length == 0)
{
goto FINALIZE;
}
}

foreach (var value in values)
{
try
{
switch (value.Kind)
{
case NotificationKind.OnNext:
self.observer.OnNext(value.Value!);
break;
case NotificationKind.OnErrorResume:
self.observer.OnErrorResume(value.Error!);
break;
case NotificationKind.OnCompleted:
try
{
self.observer.OnCompleted(value.Result!.Value);
}
finally
{
self.Dispose();
}
break;
default:
break;
}
}
catch (Exception ex)
{
try
{
ObservableSystem.GetUnhandledExceptionHandler().Invoke(ex);
}
catch { }
}
}

FINALIZE:
lock (self.gate)
{
self.list.Clear(token);

if (self.IsDisposed)
{
self.running = false;
return;
}

if (self.list.HasValue)
{
// post again
dispatcher.InvokeAsync(postCallback, dispatcherPriority);
return;
}
else
{
self.running = false;
return;
}
}
}
}
}

internal sealed class SubscribeOnDispatcher<T>(Observable<T> source, Dispatcher dispatcher, DispatcherPriority dispatcherPriority) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return new _SubscribeOnDispatcher(observer, source, dispatcher, dispatcherPriority).Run();
}

sealed class _SubscribeOnDispatcher : Observer<T>
{
readonly Action postCallback;

readonly Observer<T> observer;
readonly Observable<T> source;
readonly Dispatcher dispatcher;
readonly DispatcherPriority dispatcherPriority;
SingleAssignmentDisposableCore disposable;

public _SubscribeOnDispatcher(Observer<T> observer, Observable<T> source, Dispatcher dispatcher, DispatcherPriority dispatcherPriority)
{
this.observer = observer;
this.source = source;
this.dispatcher = dispatcher;
this.dispatcherPriority = dispatcherPriority;
this.postCallback = Subscribe;
}

public IDisposable Run()
{
dispatcher.InvokeAsync(postCallback, dispatcherPriority);
return this;
}

void Subscribe()
{
disposable.Disposable = source.Subscribe(this);
}

protected override void OnNextCore(T value)
{
observer.OnNext(value);
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
observer.OnCompleted(result);
}

protected override void DisposeCore()
{
disposable.Dispose();
}
}
}
1 change: 1 addition & 0 deletions src/R3.WPF/WpfRenderingFrameProvider.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Diagnostics.CodeAnalysis;
using R3.Collections;

namespace R3.WPF;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;

namespace R3;
namespace R3.Collections;

[StructLayout(LayoutKind.Auto)]
public struct FreeListCore<T>
Expand Down
Loading

0 comments on commit ea90e36

Please sign in to comment.