Skip to content

Commit

Permalink
more changes
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 15, 2023
1 parent 2e50606 commit a1519c0
Show file tree
Hide file tree
Showing 11 changed files with 163 additions and 66 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

Third Generation of Reactive Extensions.


* LINQ is not for EveryThing

```csharp
public abstract class Observable<T>
{
Expand Down
10 changes: 2 additions & 8 deletions sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,9 @@



IEnumerable<int> Range(int count)
{
for (int i = 0; i < count; i++)
{
Console.WriteLine(i);
yield return i;
}
}

var rp = new ReactiveProperty<int>(999);
rp.Value += 10;

public static class Extensions
{
Expand Down
22 changes: 22 additions & 0 deletions src/R3/Disposable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,23 @@ public static void AddTo(this IDisposable disposable, ICollection<IDisposable> d
disposables.Add(disposable);
}

public static CancellationTokenRegistration RegisterTo(this IDisposable disposable, CancellationToken cancellationToken)
{
if (!cancellationToken.CanBeCanceled) throw new ArgumentException("Require CancellationToken CanBeCanceled");

if (cancellationToken.IsCancellationRequested)
{
disposable.Dispose();
return default;
}

return cancellationToken.Register(state =>
{
var d = ((IDisposable)state!);
d.Dispose();
}, disposable);
}

public static IDisposable Create(Action onDisposed)
{
return new AnonymousDisposable(onDisposed);
Expand Down Expand Up @@ -443,6 +460,11 @@ public IDisposable Build()
return result;
}

public CancellationTokenRegistration RegisterTo(CancellationToken cancellationToken)
{
return Build().RegisterTo(cancellationToken);
}

public void Dispose()
{
if (count != -1)
Expand Down
29 changes: 10 additions & 19 deletions src/R3/Factories/EveryUpdate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,34 @@ public static partial class Observable
{
public static Observable<Unit> EveryUpdate()
{
return new EveryUpdate(ObservableSystem.DefaultFrameProvider, CancellationToken.None, cancelImmediately: false);
return new EveryUpdate(ObservableSystem.DefaultFrameProvider, CancellationToken.None);
}

public static Observable<Unit> EveryUpdate(CancellationToken cancellationToken)
{
return new EveryUpdate(ObservableSystem.DefaultFrameProvider, cancellationToken, cancelImmediately: false);
if (cancellationToken.IsCancellationRequested) return Empty<Unit>();
return new EveryUpdate(ObservableSystem.DefaultFrameProvider, cancellationToken);
}

public static Observable<Unit> EveryUpdate(FrameProvider frameProvider)
{
return new EveryUpdate(frameProvider, CancellationToken.None, cancelImmediately: false);
return new EveryUpdate(frameProvider, CancellationToken.None);
}

public static Observable<Unit> EveryUpdate(FrameProvider frameProvider, CancellationToken cancellationToken)
{
return new EveryUpdate(frameProvider, cancellationToken, cancelImmediately: false);
}
if (cancellationToken.IsCancellationRequested) return Empty<Unit>();

public static Observable<Unit> EveryUpdate(FrameProvider frameProvider, CancellationToken cancellationToken, bool cancelImmediately)
{
return new EveryUpdate(frameProvider, cancellationToken, cancelImmediately: cancelImmediately);
return new EveryUpdate(frameProvider, cancellationToken);
}
}


internal sealed class EveryUpdate(FrameProvider frameProvider, CancellationToken cancellationToken, bool cancelImmediately) : Observable<Unit>
internal sealed class EveryUpdate(FrameProvider frameProvider, CancellationToken cancellationToken) : Observable<Unit>
{
protected override IDisposable SubscribeCore(Observer<Unit> observer)
{
var runner = new EveryUpdateRunnerWorkItem(observer, cancellationToken, cancelImmediately);
var runner = new EveryUpdateRunnerWorkItem(observer, cancellationToken);
frameProvider.Register(runner);
return runner;
}
Expand All @@ -45,12 +43,12 @@ class EveryUpdateRunnerWorkItem : IFrameRunnerWorkItem, IDisposable
CancellationTokenRegistration cancellationTokenRegistration;
bool isDisposed;

public EveryUpdateRunnerWorkItem(Observer<Unit> observer, CancellationToken cancellationToken, bool cancelImmediately)
public EveryUpdateRunnerWorkItem(Observer<Unit> observer, CancellationToken cancellationToken)
{
this.observer = observer;
this.cancellationToken = cancellationToken;

if (cancelImmediately && cancellationToken.CanBeCanceled)
if (cancellationToken.CanBeCanceled)
{
cancellationTokenRegistration = cancellationToken.UnsafeRegister(static state =>
{
Expand All @@ -68,13 +66,6 @@ public bool MoveNext(long frameCount)
return false;
}

if (cancellationToken.IsCancellationRequested)
{
observer.OnCompleted();
Dispose();
return false;
}

observer.OnNext(default);
return true;
}
Expand Down
39 changes: 36 additions & 3 deletions src/R3/Operators/Select.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@

public static partial class ObservableExtensions
{
// TODO: Optimize Where.Select
// TODO: Element index overload

public static Observable<TResult> Select<T, TResult>(this Observable<T> source, Func<T, TResult> selector)
{
if (source is Where<T> where)
{
return new WhereSelect<T, TResult>(source, selector, where.predicate);
}

return new Select<T, TResult>(source, selector);
}

Expand All @@ -23,7 +27,7 @@ protected override IDisposable SubscribeCore(Observer<TResult> observer)
return source.Subscribe(new _Select(observer, selector));
}

class _Select(Observer<TResult> observer, Func<T, TResult> selector) : Observer<T>
sealed class _Select(Observer<TResult> observer, Func<T, TResult> selector) : Observer<T>
{
protected override void OnNextCore(T value)
{
Expand All @@ -49,7 +53,7 @@ protected override IDisposable SubscribeCore(Observer<TResult> observer)
return source.Subscribe(new _Select(observer, selector, state));
}

class _Select(Observer<TResult> observer, Func<T, TState, TResult> selector, TState state) : Observer<T>
sealed class _Select(Observer<TResult> observer, Func<T, TState, TResult> selector, TState state) : Observer<T>
{
protected override void OnNextCore(T value)
{
Expand All @@ -67,3 +71,32 @@ protected override void OnCompletedCore(Result result)
}
}
}

internal sealed class WhereSelect<T, TResult>(Observable<T> source, Func<T, TResult> selector, Func<T, bool> predicate) : Observable<TResult>
{
protected override IDisposable SubscribeCore(Observer<TResult> observer)
{
return source.Subscribe(new _Select(observer, selector, predicate));
}

sealed class _Select(Observer<TResult> observer, Func<T, TResult> selector, Func<T, bool> predicate) : Observer<T>
{
protected override void OnNextCore(T value)
{
if (predicate(value))
{
observer.OnNext(selector(value));
}
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
observer.OnCompleted(result);
}
}
}
2 changes: 1 addition & 1 deletion src/R3/Operators/Where.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public static Observable<T> Where<T>(this Observable<T> source, Func<T, int, boo
internal sealed class Where<T>(Observable<T> source, Func<T, bool> predicate) : Observable<T>
{
internal Observable<T> source = source;
internal Func<T, bool> predicate = predicate;
internal Func<T, bool> predicate = predicate; // use in WhereWhere, WhereSelect(Select.cs)

protected override IDisposable SubscribeCore(Observer<T> observer)
{
Expand Down
39 changes: 32 additions & 7 deletions src/R3/ReactiveProperty.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public abstract class ReadOnlyReactiveProperty<T> : Observable<T>
}

// allow inherit
public class ReactiveProperty<T> : ReadOnlyReactiveProperty<T>, IDisposable
public class ReactiveProperty<T> : ReadOnlyReactiveProperty<T>, ISubject<T>, IDisposable
{
T value;
IEqualityComparer<T>? equalityComparer;
Expand All @@ -32,11 +32,7 @@ public T Value
}
}

this.value = value;
foreach (var observer in list.AsSpan())
{
observer?.OnNext(value);
}
OnNext(value);
}
}

Expand All @@ -52,14 +48,32 @@ public ReactiveProperty(T value, EqualityComparer<T>? equalityComparer)
this.list = new FreeListCore<Subscription>(this);
}

public void PublishOnErrorResume(Exception error)
public void OnNext(T value)
{
this.value = value;
foreach (var observer in list.AsSpan())
{
observer?.OnNext(value);
}
}

public void OnErrorResume(Exception error)
{
foreach (var observer in list.AsSpan())
{
observer?.OnErrorResume(error);
}
}

// TODO: OnCompleted lock? same as Subject
public void OnCompleted(Result result)
{
foreach (var observer in list.AsSpan())
{
observer?.OnCompleted(result);
}
}

protected override IDisposable SubscribeCore(Observer<T> observer)
{
var value = this.value;
Expand All @@ -81,9 +95,14 @@ void Unsubscribe(Subscription subscription)
public void Dispose()
{
// TODO: call OnCompleted on Dispose.
DisposeCore();
list.Dispose();
}

protected virtual void DisposeCore()
{
}

public override string? ToString()
{
return (value == null) ? "(null)" : value.ToString();
Expand All @@ -105,6 +124,12 @@ public void OnErrorResume(Exception error)
observer.OnErrorResume(error);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void OnCompleted(Result result)
{
observer.OnCompleted(result);
}

public void Dispose()
{
var p = Interlocked.Exchange(ref parent, null);
Expand Down
54 changes: 54 additions & 0 deletions src/R3/ReactivePropertyExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
namespace R3;

public static class ReactivePropertyExtensions
{
public static ReadOnlyReactiveProperty<T> ToReadOnlyReactiveProperty<T>(this Observable<T> source, T initialValue = default!)
{
return source.ToReadOnlyReactiveProperty(EqualityComparer<T>.Default, initialValue);
}

public static ReadOnlyReactiveProperty<T> ToReadOnlyReactiveProperty<T>(this Observable<T> source, EqualityComparer<T>? equalityComparer, T initialValue = default!)
{
if (source is ReadOnlyReactiveProperty<T> rrp)
{
return rrp;
}

// for use hack, allow to cast ReactiveProperty<T>
return new ConnectedReactiveProperty<T>(source, initialValue, equalityComparer);
}
}

internal sealed class ConnectedReactiveProperty<T> : ReactiveProperty<T>
{
readonly IDisposable sourceSubscription;

public ConnectedReactiveProperty(Observable<T> source, T initialValue, EqualityComparer<T>? equalityComparer)
: base(initialValue, equalityComparer)
{
this.sourceSubscription = source.Subscribe(new Observer(this));
}

protected override void DisposeCore()
{
sourceSubscription.Dispose();
}

class Observer(ConnectedReactiveProperty<T> parent) : Observer<T>
{
protected override void OnNextCore(T value)
{
parent.Value = value;
}

protected override void OnErrorResumeCore(Exception error)
{
parent.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
parent.OnCompleted(result);
}
}
}
25 changes: 1 addition & 24 deletions tests/R3.Tests/FactoryTests/EveryUpdateTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,13 @@

public class EveryUpdateTest
{
[Fact]
public void EveryUpdate()
{
var cts = new CancellationTokenSource();
var frameProvider = new ManualFrameProvider();

var list = Observable.EveryUpdate(frameProvider, cts.Token).Select(_ => frameProvider.GetFrameCount()).ToLiveList();

list.AssertEqual([]);

frameProvider.Advance();
list.AssertEqual([0]);

frameProvider.Advance(3);
list.AssertEqual([0, 1, 2, 3]);

cts.Cancel();
list.AssertIsNotCompleted();

frameProvider.Advance();
list.AssertIsCompleted();
}

[Fact]
public void EveryUpdateCancelImmediate()
{
var cts = new CancellationTokenSource();
var frameProvider = new ManualFrameProvider();

var list = Observable.EveryUpdate(frameProvider, cts.Token, cancelImmediately: true).Select(_ => frameProvider.GetFrameCount()).ToLiveList();
var list = Observable.EveryUpdate(frameProvider, cts.Token).Select(_ => frameProvider.GetFrameCount()).ToLiveList();

list.AssertEqual([]);

Expand Down
Loading

0 comments on commit a1519c0

Please sign in to comment.