diff --git a/README.md b/README.md index c56a67f0..c3c9649c 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,9 @@ Third Generation of Reactive Extensions. + +* LINQ is not for EveryThing + ```csharp public abstract class Observable { diff --git a/sandbox/ConsoleApp1/Program.cs b/sandbox/ConsoleApp1/Program.cs index fc3b4e5c..ea7f91db 100644 --- a/sandbox/ConsoleApp1/Program.cs +++ b/sandbox/ConsoleApp1/Program.cs @@ -50,15 +50,9 @@ -IEnumerable Range(int count) -{ - for (int i = 0; i < count; i++) - { - Console.WriteLine(i); - yield return i; - } -} +var rp = new ReactiveProperty(999); +rp.Value += 10; public static class Extensions { diff --git a/src/R3/Disposable.cs b/src/R3/Disposable.cs index 70dcee68..53f43556 100644 --- a/src/R3/Disposable.cs +++ b/src/R3/Disposable.cs @@ -22,6 +22,23 @@ public static void AddTo(this IDisposable disposable, ICollection d disposables.Add(disposable); } + public static CancellationTokenRegistration RegisterTo(this IDisposable disposable, CancellationToken cancellationToken) + { + if (!cancellationToken.CanBeCanceled) throw new ArgumentException("Require CancellationToken CanBeCanceled"); + + if (cancellationToken.IsCancellationRequested) + { + disposable.Dispose(); + return default; + } + + return cancellationToken.Register(state => + { + var d = ((IDisposable)state!); + d.Dispose(); + }, disposable); + } + public static IDisposable Create(Action onDisposed) { return new AnonymousDisposable(onDisposed); @@ -443,6 +460,11 @@ public IDisposable Build() return result; } + public CancellationTokenRegistration RegisterTo(CancellationToken cancellationToken) + { + return Build().RegisterTo(cancellationToken); + } + public void Dispose() { if (count != -1) diff --git a/src/R3/Factories/EveryUpdate.cs b/src/R3/Factories/EveryUpdate.cs index 05762d7c..6d1ae555 100644 --- a/src/R3/Factories/EveryUpdate.cs +++ b/src/R3/Factories/EveryUpdate.cs @@ -4,36 +4,34 @@ public static partial class Observable { public static Observable EveryUpdate() { - return new EveryUpdate(ObservableSystem.DefaultFrameProvider, CancellationToken.None, cancelImmediately: false); + return new EveryUpdate(ObservableSystem.DefaultFrameProvider, CancellationToken.None); } public static Observable EveryUpdate(CancellationToken cancellationToken) { - return new EveryUpdate(ObservableSystem.DefaultFrameProvider, cancellationToken, cancelImmediately: false); + if (cancellationToken.IsCancellationRequested) return Empty(); + return new EveryUpdate(ObservableSystem.DefaultFrameProvider, cancellationToken); } public static Observable EveryUpdate(FrameProvider frameProvider) { - return new EveryUpdate(frameProvider, CancellationToken.None, cancelImmediately: false); + return new EveryUpdate(frameProvider, CancellationToken.None); } public static Observable EveryUpdate(FrameProvider frameProvider, CancellationToken cancellationToken) { - return new EveryUpdate(frameProvider, cancellationToken, cancelImmediately: false); - } + if (cancellationToken.IsCancellationRequested) return Empty(); - public static Observable EveryUpdate(FrameProvider frameProvider, CancellationToken cancellationToken, bool cancelImmediately) - { - return new EveryUpdate(frameProvider, cancellationToken, cancelImmediately: cancelImmediately); + return new EveryUpdate(frameProvider, cancellationToken); } } -internal sealed class EveryUpdate(FrameProvider frameProvider, CancellationToken cancellationToken, bool cancelImmediately) : Observable +internal sealed class EveryUpdate(FrameProvider frameProvider, CancellationToken cancellationToken) : Observable { protected override IDisposable SubscribeCore(Observer observer) { - var runner = new EveryUpdateRunnerWorkItem(observer, cancellationToken, cancelImmediately); + var runner = new EveryUpdateRunnerWorkItem(observer, cancellationToken); frameProvider.Register(runner); return runner; } @@ -45,12 +43,12 @@ class EveryUpdateRunnerWorkItem : IFrameRunnerWorkItem, IDisposable CancellationTokenRegistration cancellationTokenRegistration; bool isDisposed; - public EveryUpdateRunnerWorkItem(Observer observer, CancellationToken cancellationToken, bool cancelImmediately) + public EveryUpdateRunnerWorkItem(Observer observer, CancellationToken cancellationToken) { this.observer = observer; this.cancellationToken = cancellationToken; - if (cancelImmediately && cancellationToken.CanBeCanceled) + if (cancellationToken.CanBeCanceled) { cancellationTokenRegistration = cancellationToken.UnsafeRegister(static state => { @@ -68,13 +66,6 @@ public bool MoveNext(long frameCount) return false; } - if (cancellationToken.IsCancellationRequested) - { - observer.OnCompleted(); - Dispose(); - return false; - } - observer.OnNext(default); return true; } diff --git a/src/R3/Operators/Select.cs b/src/R3/Operators/Select.cs index 7a462336..8db3f9a9 100644 --- a/src/R3/Operators/Select.cs +++ b/src/R3/Operators/Select.cs @@ -2,11 +2,15 @@ public static partial class ObservableExtensions { - // TODO: Optimize Where.Select // TODO: Element index overload public static Observable Select(this Observable source, Func selector) { + if (source is Where where) + { + return new WhereSelect(source, selector, where.predicate); + } + return new Select(source, selector); } @@ -23,7 +27,7 @@ protected override IDisposable SubscribeCore(Observer observer) return source.Subscribe(new _Select(observer, selector)); } - class _Select(Observer observer, Func selector) : Observer + sealed class _Select(Observer observer, Func selector) : Observer { protected override void OnNextCore(T value) { @@ -49,7 +53,7 @@ protected override IDisposable SubscribeCore(Observer observer) return source.Subscribe(new _Select(observer, selector, state)); } - class _Select(Observer observer, Func selector, TState state) : Observer + sealed class _Select(Observer observer, Func selector, TState state) : Observer { protected override void OnNextCore(T value) { @@ -67,3 +71,32 @@ protected override void OnCompletedCore(Result result) } } } + +internal sealed class WhereSelect(Observable source, Func selector, Func predicate) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + return source.Subscribe(new _Select(observer, selector, predicate)); + } + + sealed class _Select(Observer observer, Func selector, Func predicate) : Observer + { + protected override void OnNextCore(T value) + { + if (predicate(value)) + { + observer.OnNext(selector(value)); + } + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + observer.OnCompleted(result); + } + } +} diff --git a/src/R3/Operators/Where.cs b/src/R3/Operators/Where.cs index 9b3b2e24..bafaa4dc 100644 --- a/src/R3/Operators/Where.cs +++ b/src/R3/Operators/Where.cs @@ -25,7 +25,7 @@ public static Observable Where(this Observable source, Func(Observable source, Func predicate) : Observable { internal Observable source = source; - internal Func predicate = predicate; + internal Func predicate = predicate; // use in WhereWhere, WhereSelect(Select.cs) protected override IDisposable SubscribeCore(Observer observer) { diff --git a/src/R3/ReactiveProperty.cs b/src/R3/ReactiveProperty.cs index e70ce177..b66dc0df 100644 --- a/src/R3/ReactiveProperty.cs +++ b/src/R3/ReactiveProperty.cs @@ -9,7 +9,7 @@ public abstract class ReadOnlyReactiveProperty : Observable } // allow inherit -public class ReactiveProperty : ReadOnlyReactiveProperty, IDisposable +public class ReactiveProperty : ReadOnlyReactiveProperty, ISubject, IDisposable { T value; IEqualityComparer? equalityComparer; @@ -32,11 +32,7 @@ public T Value } } - this.value = value; - foreach (var observer in list.AsSpan()) - { - observer?.OnNext(value); - } + OnNext(value); } } @@ -52,7 +48,16 @@ public ReactiveProperty(T value, EqualityComparer? equalityComparer) this.list = new FreeListCore(this); } - public void PublishOnErrorResume(Exception error) + public void OnNext(T value) + { + this.value = value; + foreach (var observer in list.AsSpan()) + { + observer?.OnNext(value); + } + } + + public void OnErrorResume(Exception error) { foreach (var observer in list.AsSpan()) { @@ -60,6 +65,15 @@ public void PublishOnErrorResume(Exception error) } } + // TODO: OnCompleted lock? same as Subject + public void OnCompleted(Result result) + { + foreach (var observer in list.AsSpan()) + { + observer?.OnCompleted(result); + } + } + protected override IDisposable SubscribeCore(Observer observer) { var value = this.value; @@ -81,9 +95,14 @@ void Unsubscribe(Subscription subscription) public void Dispose() { // TODO: call OnCompleted on Dispose. + DisposeCore(); list.Dispose(); } + protected virtual void DisposeCore() + { + } + public override string? ToString() { return (value == null) ? "(null)" : value.ToString(); @@ -105,6 +124,12 @@ public void OnErrorResume(Exception error) observer.OnErrorResume(error); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void OnCompleted(Result result) + { + observer.OnCompleted(result); + } + public void Dispose() { var p = Interlocked.Exchange(ref parent, null); diff --git a/src/R3/ReactivePropertyExtensions.cs b/src/R3/ReactivePropertyExtensions.cs new file mode 100644 index 00000000..f775f354 --- /dev/null +++ b/src/R3/ReactivePropertyExtensions.cs @@ -0,0 +1,54 @@ +namespace R3; + +public static class ReactivePropertyExtensions +{ + public static ReadOnlyReactiveProperty ToReadOnlyReactiveProperty(this Observable source, T initialValue = default!) + { + return source.ToReadOnlyReactiveProperty(EqualityComparer.Default, initialValue); + } + + public static ReadOnlyReactiveProperty ToReadOnlyReactiveProperty(this Observable source, EqualityComparer? equalityComparer, T initialValue = default!) + { + if (source is ReadOnlyReactiveProperty rrp) + { + return rrp; + } + + // for use hack, allow to cast ReactiveProperty + return new ConnectedReactiveProperty(source, initialValue, equalityComparer); + } +} + +internal sealed class ConnectedReactiveProperty : ReactiveProperty +{ + readonly IDisposable sourceSubscription; + + public ConnectedReactiveProperty(Observable source, T initialValue, EqualityComparer? equalityComparer) + : base(initialValue, equalityComparer) + { + this.sourceSubscription = source.Subscribe(new Observer(this)); + } + + protected override void DisposeCore() + { + sourceSubscription.Dispose(); + } + + class Observer(ConnectedReactiveProperty parent) : Observer + { + protected override void OnNextCore(T value) + { + parent.Value = value; + } + + protected override void OnErrorResumeCore(Exception error) + { + parent.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + parent.OnCompleted(result); + } + } +} diff --git a/tests/R3.Tests/FactoryTests/EveryUpdateTest.cs b/tests/R3.Tests/FactoryTests/EveryUpdateTest.cs index 38f22e30..1801e2b4 100644 --- a/tests/R3.Tests/FactoryTests/EveryUpdateTest.cs +++ b/tests/R3.Tests/FactoryTests/EveryUpdateTest.cs @@ -2,36 +2,13 @@ public class EveryUpdateTest { - [Fact] - public void EveryUpdate() - { - var cts = new CancellationTokenSource(); - var frameProvider = new ManualFrameProvider(); - - var list = Observable.EveryUpdate(frameProvider, cts.Token).Select(_ => frameProvider.GetFrameCount()).ToLiveList(); - - list.AssertEqual([]); - - frameProvider.Advance(); - list.AssertEqual([0]); - - frameProvider.Advance(3); - list.AssertEqual([0, 1, 2, 3]); - - cts.Cancel(); - list.AssertIsNotCompleted(); - - frameProvider.Advance(); - list.AssertIsCompleted(); - } - [Fact] public void EveryUpdateCancelImmediate() { var cts = new CancellationTokenSource(); var frameProvider = new ManualFrameProvider(); - var list = Observable.EveryUpdate(frameProvider, cts.Token, cancelImmediately: true).Select(_ => frameProvider.GetFrameCount()).ToLiveList(); + var list = Observable.EveryUpdate(frameProvider, cts.Token).Select(_ => frameProvider.GetFrameCount()).ToLiveList(); list.AssertEqual([]); diff --git a/tests/R3.Tests/OperatorTests/TakeLastTest.cs b/tests/R3.Tests/OperatorTests/TakeLastTest.cs index 513001dd..8cfbdaf3 100644 --- a/tests/R3.Tests/OperatorTests/TakeLastTest.cs +++ b/tests/R3.Tests/OperatorTests/TakeLastTest.cs @@ -43,7 +43,7 @@ public void TakeFrame2() var frameProvider = new ManualFrameProvider(); var cts = new CancellationTokenSource(); - var list = Observable.EveryUpdate(frameProvider, cts.Token, cancelImmediately: true) + var list = Observable.EveryUpdate(frameProvider, cts.Token) .Select(x => frameProvider.GetFrameCount()) .TakeLastFrame(3, frameProvider) .ToLiveList(); @@ -57,8 +57,6 @@ public void TakeFrame2() cts.Cancel(); // stop and OnCompleted list.AssertEqual([3, 4, 5]); - - list.AssertIsCompleted(); } } diff --git a/tests/R3.Tests/OperatorTests/TakeTest.cs b/tests/R3.Tests/OperatorTests/TakeTest.cs index c73373b1..4e8bcf52 100644 --- a/tests/R3.Tests/OperatorTests/TakeTest.cs +++ b/tests/R3.Tests/OperatorTests/TakeTest.cs @@ -59,7 +59,7 @@ public void TakeFrame2() var frameProvider = new ManualFrameProvider(); var list = Observable.EveryUpdate(frameProvider) - .Select(x => (int)frameProvider.GetFrameCount()) + .Select(x => frameProvider.GetFrameCount()) .TakeFrame(5, frameProvider) .ToLiveList();