From 14abb799fa926bfec6e2c5b34c6297af5362fe40 Mon Sep 17 00:00:00 2001 From: neuecc Date: Fri, 15 Dec 2023 02:00:28 +0900 Subject: [PATCH] Subscribe with state --- sandbox/ConsoleApp1/Program.cs | 4 +-- src/R3/EventSubscribeExtensions.cs | 48 ++++++++++++++++++++++++++---- src/R3/Factories/Timer.cs | 2 ++ src/R3/Factories/ToEvent.cs | 2 +- src/R3/Internal/Stubs.cs | 19 +++++++++++- src/R3/LiveList.cs | 2 +- src/R3/Operators/DoOnDisposed.cs | 2 +- src/R3/Operators/Select.cs | 36 +++++++++++++++++++--- src/R3/Operators/Where.cs | 2 ++ 9 files changed, 102 insertions(+), 15 deletions(-) diff --git a/sandbox/ConsoleApp1/Program.cs b/sandbox/ConsoleApp1/Program.cs index 9d463399..3645b9a3 100644 --- a/sandbox/ConsoleApp1/Program.cs +++ b/sandbox/ConsoleApp1/Program.cs @@ -33,10 +33,10 @@ // range.TakeLast( -//var publisher = new Publisher(); +var publisher = new Publisher(); //publisher.PublishOnNext(1); - +// publisher.Subscribe(new object(), (x,y) => y //var xs = await publisher.Take(TimeSpan.FromSeconds(5)); diff --git a/src/R3/EventSubscribeExtensions.cs b/src/R3/EventSubscribeExtensions.cs index 907b8059..62a2abb7 100644 --- a/src/R3/EventSubscribeExtensions.cs +++ b/src/R3/EventSubscribeExtensions.cs @@ -4,19 +4,16 @@ namespace R3; public static class EventSubscribeExtensions { - // TODO: with State - [DebuggerStepThrough] public static IDisposable Subscribe(this Event source) { return source.Subscribe(NopSubscriber.Instance); } - [DebuggerStepThrough] public static IDisposable Subscribe(this Event source, Action onNext) { - return source.Subscribe(new AnonymousSubscriber(onNext, EventSystem.GetUnhandledExceptionHandler(), Stubs.HandleError)); + return source.Subscribe(new AnonymousSubscriber(onNext, EventSystem.GetUnhandledExceptionHandler(), Stubs.HandleResult)); } [DebuggerStepThrough] @@ -30,6 +27,26 @@ public static IDisposable Subscribe(this Event source, Action onNext, A { return source.Subscribe(new AnonymousSubscriber(onNext, onErrorResume, onComplete)); } + + // with state + + [DebuggerStepThrough] + public static IDisposable Subscribe(this Event source, TState state, Action onNext) + { + return source.Subscribe(new AnonymousSubscriber(onNext, Stubs.HandleException, Stubs.HandleResult, state)); + } + + [DebuggerStepThrough] + public static IDisposable Subscribe(this Event source, TState state, Action onNext, Action onComplete) + { + return source.Subscribe(new AnonymousSubscriber(onNext, Stubs.HandleException, onComplete, state)); + } + + [DebuggerStepThrough] + public static IDisposable Subscribe(this Event source, TState state, Action onNext, Action onErrorResume, Action onComplete) + { + return source.Subscribe(new AnonymousSubscriber(onNext, onErrorResume, onComplete, state)); + } } [DebuggerStepThrough] @@ -87,7 +104,6 @@ protected override void OnCompletedCore(Result result) } } - [DebuggerStepThrough] internal sealed class AnonymousSubscriber(Action onNext, Action onErrorResume, Action onComplete) : Subscriber { @@ -109,3 +125,25 @@ protected override void OnCompletedCore(Result complete) onComplete(complete); } } + +[DebuggerStepThrough] +internal sealed class AnonymousSubscriber(Action onNext, Action onErrorResume, Action onComplete, TState state) : Subscriber +{ + [DebuggerStepThrough] + protected override void OnNextCore(T value) + { + onNext(value, state); + } + + [DebuggerStepThrough] + protected override void OnErrorResumeCore(Exception error) + { + onErrorResume(error, state); + } + + [DebuggerStepThrough] + protected override void OnCompletedCore(Result complete) + { + onComplete(complete, state); + } +} diff --git a/src/R3/Factories/Timer.cs b/src/R3/Factories/Timer.cs index 50483b48..3845b1f1 100644 --- a/src/R3/Factories/Timer.cs +++ b/src/R3/Factories/Timer.cs @@ -2,6 +2,8 @@ public static partial class Event { + // TODO: No Provider overload? + public static Event Timer(TimeSpan dueTime, TimeProvider timeProvider) { return new Timer(dueTime, timeProvider); diff --git a/src/R3/Factories/ToEvent.cs b/src/R3/Factories/ToEvent.cs index 3eacc15f..f4821412 100644 --- a/src/R3/Factories/ToEvent.cs +++ b/src/R3/Factories/ToEvent.cs @@ -2,7 +2,7 @@ public static partial class Event { - public static Event ToCompletableEvent(this Task task) + public static Event ToEvent(this Task task) { return new TaskToEvent(task); } diff --git a/src/R3/Internal/Stubs.cs b/src/R3/Internal/Stubs.cs index 1e64b15b..e9f37190 100644 --- a/src/R3/Internal/Stubs.cs +++ b/src/R3/Internal/Stubs.cs @@ -2,7 +2,7 @@ internal static class Stubs { - internal static readonly Action HandleError = static x => + internal static readonly Action HandleResult = static x => { if (x.IsFailure) { @@ -14,4 +14,21 @@ internal static class Stubs internal static class Stubs { internal static readonly Func ReturnSelf = static x => x; + + + // TState + + internal static readonly Action HandleException = static (x, _) => + { + EventSystem.GetUnhandledExceptionHandler().Invoke(x); + }; + + + internal static readonly Action HandleResult = static (x, _) => + { + if (x.IsFailure) + { + EventSystem.GetUnhandledExceptionHandler().Invoke(x.Exception); + } + }; } diff --git a/src/R3/LiveList.cs b/src/R3/LiveList.cs index 87882dd3..8858fb23 100644 --- a/src/R3/LiveList.cs +++ b/src/R3/LiveList.cs @@ -102,7 +102,7 @@ public void ForEach(Action action) } } - public void ForEach(Action action, TState state) + public void ForEach(TState state, Action action) { lock (list) { diff --git a/src/R3/Operators/DoOnDisposed.cs b/src/R3/Operators/DoOnDisposed.cs index 3b4f01d4..ee694a7b 100644 --- a/src/R3/Operators/DoOnDisposed.cs +++ b/src/R3/Operators/DoOnDisposed.cs @@ -10,7 +10,7 @@ public static Event DoOnDisposed(this Event source, Action action) return new DoOnDisposed(source, action); } - public static Event DoOnDisposed(this Event source, Action action, TState state) + public static Event DoOnDisposed(this Event source, TState state, Action action) { return new DoOnDisposed(source, action, state); } diff --git a/src/R3/Operators/Select.cs b/src/R3/Operators/Select.cs index fb1430d9..0beb2f30 100644 --- a/src/R3/Operators/Select.cs +++ b/src/R3/Operators/Select.cs @@ -3,15 +3,17 @@ public static partial class EventExtensions { // TODO: Optimize Where.Select - // TODO: CompletableEvent.Select // TODO: Element index overload - public static Event Select( - this Event source, - Func selector) + public static Event Select(this Event source, Func selector) { return new Select(source, selector); } + + public static Event Select(this Event source, TState state, Func selector) + { + return new Select(source, selector, state); + } } internal sealed class Select(Event source, Func selector) : Event @@ -39,3 +41,29 @@ protected override void OnCompletedCore(Result result) } } } + +internal sealed class Select(Event source, Func selector, TState state) : Event +{ + protected override IDisposable SubscribeCore(Subscriber subscriber) + { + return source.Subscribe(new _Select(subscriber, selector, state)); + } + + class _Select(Subscriber subscriber, Func selector, TState state) : Subscriber + { + protected override void OnNextCore(T value) + { + subscriber.OnNext(selector(value, state)); + } + + protected override void OnErrorResumeCore(Exception error) + { + subscriber.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + subscriber.OnCompleted(result); + } + } +} diff --git a/src/R3/Operators/Where.cs b/src/R3/Operators/Where.cs index 8a31466f..dd5dcf40 100644 --- a/src/R3/Operators/Where.cs +++ b/src/R3/Operators/Where.cs @@ -2,6 +2,8 @@ public static partial class EventExtensions { + // TODO: TState + public static Event Where(this Event source, Func predicate) { if (source is Where where)