diff --git a/sandbox/ConsoleApp1/Program.cs b/sandbox/ConsoleApp1/Program.cs index 102356a4..e107fe88 100644 --- a/sandbox/ConsoleApp1/Program.cs +++ b/sandbox/ConsoleApp1/Program.cs @@ -61,9 +61,9 @@ public static class Extensions { - public static IDisposable WriteLine(this Event source) + public static IDisposable WriteLine(this Event source) { - return source.Subscribe(x => Console.WriteLine(x), _ => Console.WriteLine("COMPLETED")); + return source.Subscribe(x => Console.WriteLine(x), x => Console.WriteLine(x)); } } diff --git a/src/R3/Event.cs b/src/R3/Event.cs index 6f946da7..8db860d6 100644 --- a/src/R3/Event.cs +++ b/src/R3/Event.cs @@ -4,10 +4,10 @@ namespace R3; -public abstract class Event +public abstract class Event { [StackTraceHidden, DebuggerStepThrough] - public IDisposable Subscribe(Subscriber subscriber) + public IDisposable Subscribe(Subscriber subscriber) { try { @@ -28,14 +28,13 @@ public IDisposable Subscribe(Subscriber subscriber) } } - protected abstract IDisposable SubscribeCore(Subscriber subscriber); + protected abstract IDisposable SubscribeCore(Subscriber subscriber); } -// similar as IObserver -public abstract class Subscriber : IDisposable +public abstract class Subscriber : IDisposable { #if DEBUG - [Obsolete("Only allow in CompletableEvent.")] + [Obsolete("Only allow in Event.")] #endif internal SingleAssignmentDisposableCore SourceSubscription; @@ -46,13 +45,13 @@ public abstract class Subscriber : IDisposable bool IsCalledCompleted => Volatile.Read(ref calledOnCompleted) != 0; [StackTraceHidden, DebuggerStepThrough] - public void OnNext(TMessage message) + public void OnNext(T value) { if (IsDisposed || IsCalledCompleted) return; try { - OnNextCore(message); + OnNextCore(value); } catch (Exception ex) { @@ -60,7 +59,7 @@ public void OnNext(TMessage message) } } - protected abstract void OnNextCore(TMessage message); + protected abstract void OnNextCore(T value); [StackTraceHidden, DebuggerStepThrough] public void OnErrorResume(Exception error) @@ -80,7 +79,7 @@ public void OnErrorResume(Exception error) protected abstract void OnErrorResumeCore(Exception error); [StackTraceHidden, DebuggerStepThrough] - public void OnCompleted(TComplete complete) + public void OnCompleted(Result result) { if (Interlocked.Exchange(ref calledOnCompleted, 1) != 0) { @@ -90,7 +89,7 @@ public void OnCompleted(TComplete complete) try { - OnCompletedCore(complete); + OnCompletedCore(result); } catch (Exception ex) { @@ -102,7 +101,7 @@ public void OnCompleted(TComplete complete) } } - protected abstract void OnCompletedCore(TComplete complete); + protected abstract void OnCompletedCore(Result result); [StackTraceHidden, DebuggerStepThrough] public void Dispose() diff --git a/src/R3/EventSubscribeExtensions.cs b/src/R3/EventSubscribeExtensions.cs index 4ae5ed27..ba1172e9 100644 --- a/src/R3/EventSubscribeExtensions.cs +++ b/src/R3/EventSubscribeExtensions.cs @@ -7,53 +7,42 @@ public static class EventSubscribeExtensions // TODO: with State [DebuggerStepThrough] - public static IDisposable Subscribe(this Event source) + public static IDisposable Subscribe(this Event source) { - return source.Subscribe(NopSubscriber.Instance); + return source.Subscribe(NopSubscriber.Instance); } - [DebuggerStepThrough] - public static IDisposable Subscribe(this Event> source) - { - return source.Subscribe(NopRSubscriber.Instance); - } [DebuggerStepThrough] - public static IDisposable Subscribe(this Event source, Action onNext) + public static IDisposable Subscribe(this Event source, Action onNext) { - return source.Subscribe(new AnonymousSubscriber(onNext, EventSystem.GetUnhandledExceptionHandler(), Stubs.Nop)); + return source.Subscribe(new AnonymousSubscriber(onNext, EventSystem.GetUnhandledExceptionHandler(), Stubs.HandleError)); } [DebuggerStepThrough] - public static IDisposable Subscribe(this Event> source, Action onNext) + public static IDisposable Subscribe(this Event source, Action onNext, Action onComplete) { - return source.Subscribe(new AnonymousRSubscriber(onNext, EventSystem.GetUnhandledExceptionHandler())); + return source.Subscribe(new AnonymousSubscriber(onNext, EventSystem.GetUnhandledExceptionHandler(), onComplete)); } [DebuggerStepThrough] - public static IDisposable Subscribe(this Event source, Action onNext, Action onComplete) + public static IDisposable Subscribe(this Event source, Action onNext, Action onErrorResume, Action onComplete) { - return source.Subscribe(new AnonymousSubscriber(onNext, EventSystem.GetUnhandledExceptionHandler(), onComplete)); - } - - [DebuggerStepThrough] - public static IDisposable Subscribe(this Event source, Action onNext, Action onErrorResume, Action onComplete) - { - return source.Subscribe(new AnonymousSubscriber(onNext, onErrorResume, onComplete)); + return source.Subscribe(new AnonymousSubscriber(onNext, onErrorResume, onComplete)); } } [DebuggerStepThrough] -internal sealed class NopSubscriber : Subscriber +internal sealed class NopSubscriber : Subscriber { - public static readonly NopSubscriber Instance = new(); + public static readonly NopSubscriber Instance = new(); private NopSubscriber() { } [DebuggerStepThrough] - protected override void OnNextCore(TMessage message) + protected override void OnNextCore(T value) { } @@ -64,46 +53,20 @@ protected override void OnErrorResumeCore(Exception error) } [DebuggerStepThrough] - protected override void OnCompletedCore(TComplete complete) - { - } -} - -[DebuggerStepThrough] -internal sealed class NopRSubscriber : Subscriber> -{ - public static readonly NopRSubscriber Instance = new(); - - private NopRSubscriber() - { - } - - [DebuggerStepThrough] - protected override void OnNextCore(TMessage message) + protected override void OnCompletedCore(Result result) { - } - - [DebuggerStepThrough] - protected override void OnErrorResumeCore(Exception error) - { - EventSystem.GetUnhandledExceptionHandler().Invoke(error); - } - - [DebuggerStepThrough] - protected override void OnCompletedCore(Result complete) - { - if (complete.IsFailure) + if (result.IsFailure) { - EventSystem.GetUnhandledExceptionHandler().Invoke(complete.Exception); + EventSystem.GetUnhandledExceptionHandler().Invoke(result.Exception); } } } [DebuggerStepThrough] -internal sealed class AnonymousSubscriber(Action onNext, Action onErrorResume, Action onComplete) : Subscriber +internal sealed class AnonymousRSubscriber(Action onNext, Action onErrorResume) : Subscriber { [DebuggerStepThrough] - protected override void OnNextCore(TMessage message) + protected override void OnNextCore(T value) { onNext(message); } @@ -115,17 +78,21 @@ protected override void OnErrorResumeCore(Exception error) } [DebuggerStepThrough] - protected override void OnCompletedCore(TComplete complete) + protected override void OnCompletedCore(Result result) { - onComplete(complete); + if (result.IsFailure) + { + EventSystem.GetUnhandledExceptionHandler().Invoke(result.Exception); + } } } + [DebuggerStepThrough] -internal sealed class AnonymousRSubscriber(Action onNext, Action onErrorResume) : Subscriber> +internal sealed class AnonymousSubscriber(Action onNext, Action onErrorResume, Action onComplete) : Subscriber { [DebuggerStepThrough] - protected override void OnNextCore(TMessage message) + protected override void OnNextCore(T value) { onNext(message); } @@ -137,11 +104,8 @@ protected override void OnErrorResumeCore(Exception error) } [DebuggerStepThrough] - protected override void OnCompletedCore(Result complete) + protected override void OnCompletedCore(Result complete) { - if (complete.IsFailure) - { - EventSystem.GetUnhandledExceptionHandler().Invoke(complete.Exception); - } + onComplete(complete); } } diff --git a/src/R3/Factories/Empty.cs b/src/R3/Factories/Empty.cs index b579ac5c..2abdffa5 100644 --- a/src/R3/Factories/Empty.cs +++ b/src/R3/Factories/Empty.cs @@ -2,30 +2,30 @@ public static partial class Event { - public static Event Empty() + public static Event Empty() { - return R3.Empty.Instance; + return R3.Empty.Instance; } - public static Event Empty(TimeProvider timeProvider) + public static Event Empty(TimeProvider timeProvider) { - return ReturnOnCompleted(default, timeProvider); + return ReturnOnCompleted(Result.Success, timeProvider); } - public static Event Empty(TimeSpan dueTime, TimeProvider timeProvider) + public static Event Empty(TimeSpan dueTime, TimeProvider timeProvider) { - return ReturnOnCompleted(default, dueTime, timeProvider); + return ReturnOnCompleted(Result.Success, dueTime, timeProvider); } } -internal sealed class Empty : Event +internal sealed class Empty : Event { // singleton - public static readonly Empty Instance = new Empty(); + public static readonly Empty Instance = new Empty(); - protected override IDisposable SubscribeCore(Subscriber subscriber) + protected override IDisposable SubscribeCore(Subscriber subscriber) { - subscriber.OnCompleted(default); + subscriber.OnCompleted(); return Disposable.Empty; } diff --git a/src/R3/Factories/Never.cs b/src/R3/Factories/Never.cs index f33e1798..a2272164 100644 --- a/src/R3/Factories/Never.cs +++ b/src/R3/Factories/Never.cs @@ -3,23 +3,23 @@ public static partial class Event { // Never - public static Event Never() + public static Event Never() { - return R3.Never.Instance; + return R3.Never.Instance; } } -internal sealed class Never : Event +internal sealed class Never : Event { // singleton - public static readonly Never Instance = new Never(); + public static readonly Never Instance = new Never(); Never() { } - protected override IDisposable SubscribeCore(Subscriber subscriber) + protected override IDisposable SubscribeCore(Subscriber subscriber) { return Disposable.Empty; } diff --git a/src/R3/Factories/Range.cs b/src/R3/Factories/Range.cs index fe9096f5..74ac043e 100644 --- a/src/R3/Factories/Range.cs +++ b/src/R3/Factories/Range.cs @@ -4,7 +4,7 @@ public static partial class Event { // no scheduler(TimeProvider) overload - public static Event Range(int start, int count) + public static Event Range(int start, int count) { long max = ((long)start) + count - 1; if (count < 0 || max > int.MaxValue) @@ -20,7 +20,7 @@ public static Event Range(int start, int count) return new Range(start, count); } - public static Event Range(int start, int count, CancellationToken cancellationToken) + public static Event Range(int start, int count, CancellationToken cancellationToken) { long max = ((long)start) + count - 1; if (count < 0 || max > int.MaxValue) @@ -37,9 +37,9 @@ public static Event Range(int start, int count, CancellationToken can } } -internal sealed class Range(int start, int count) : Event +internal sealed class Range(int start, int count) : Event { - protected override IDisposable SubscribeCore(Subscriber subscriber) + protected override IDisposable SubscribeCore(Subscriber subscriber) { for (int i = 0; i < count; i++) { @@ -50,9 +50,9 @@ protected override IDisposable SubscribeCore(Subscriber subscriber) } } -internal sealed class RangeC(int start, int count, CancellationToken cancellationToken) : Event +internal sealed class RangeC(int start, int count, CancellationToken cancellationToken) : Event { - protected override IDisposable SubscribeCore(Subscriber subscriber) + protected override IDisposable SubscribeCore(Subscriber subscriber) { for (int i = 0; i < count; i++) { diff --git a/src/R3/Factories/Repeat.cs b/src/R3/Factories/Repeat.cs index cd5335b2..7968a386 100644 --- a/src/R3/Factories/Repeat.cs +++ b/src/R3/Factories/Repeat.cs @@ -5,7 +5,7 @@ public static partial class Event // no scheduler(TimeProvider) overload // no infinitely overload - public static Event Repeat(TMessage value, int count) + public static Event Repeat(T value, int count) { if (count < 0) { @@ -14,13 +14,13 @@ public static Event Repeat(TMessage value, int count) if (count == 0) { - return Empty(); + return Empty(); } - return new Repeat(value, count); + return new Repeat(value, count); } - public static Event Repeat(TMessage value, int count, CancellationToken cancellationToken) + public static Event Repeat(T value, int count, CancellationToken cancellationToken) { if (count < 0) { @@ -29,16 +29,16 @@ public static Event Repeat(TMessage value, int count, if (count == 0) { - return Empty(); + return Empty(); } - return new RepeatC(value, count, cancellationToken); + return new RepeatC(value, count, cancellationToken); } } -internal sealed class Repeat(TMessage value, int count) : Event +internal sealed class Repeat(T value, int count) : Event { - protected override IDisposable SubscribeCore(Subscriber subscriber) + protected override IDisposable SubscribeCore(Subscriber subscriber) { for (int i = 0; i < count; i++) { @@ -49,9 +49,9 @@ protected override IDisposable SubscribeCore(Subscriber subscrib } } -internal sealed class RepeatC(TMessage value, int count, CancellationToken cancellationToken) : Event +internal sealed class RepeatC(T value, int count, CancellationToken cancellationToken) : Event { - protected override IDisposable SubscribeCore(Subscriber subscriber) + protected override IDisposable SubscribeCore(Subscriber subscriber) { for (int i = 0; i < count; i++) { diff --git a/src/R3/Factories/Return.cs b/src/R3/Factories/Return.cs index d7a5b61f..9a589228 100644 --- a/src/R3/Factories/Return.cs +++ b/src/R3/Factories/Return.cs @@ -2,86 +2,59 @@ public static partial class Event { - public static Event Return(TMessage value) + public static Event Return(T value) { - return new ImmediateScheduleReturn(value, default); // immediate + return new ImmediateScheduleReturn(value); // immediate } - public static Event Return(TMessage value, TimeProvider timeProvider) + public static Event Return(T value, TimeProvider timeProvider) { return Return(value, TimeSpan.Zero, timeProvider); } - public static Event Return(TMessage value, TimeSpan dueTime, TimeProvider timeProvider) + public static Event Return(T value, TimeSpan dueTime, TimeProvider timeProvider) { if (dueTime == TimeSpan.Zero) { if (timeProvider == TimeProvider.System) { - return new ThreadPoolScheduleReturn(value, default, null); // optimize for SystemTimeProvidr, use ThreadPool.UnsafeQueueUserWorkItem + return new ThreadPoolScheduleReturn(value); // optimize for SystemTimeProvidr, use ThreadPool.UnsafeQueueUserWorkItem } } - return new Return(value, default, dueTime, timeProvider); // use ITimer - } - - // OnCompleted - - public static Event Return(TMessage value, TComplete complete) - { - return new ImmediateScheduleReturn(value, complete); // immediate - } - - public static Event Return(TMessage value, TComplete complete, TimeProvider timeProvider) - { - return Return(value, complete, TimeSpan.Zero, timeProvider); - } - - public static Event Return(TMessage value, TComplete complete, TimeSpan dueTime, TimeProvider timeProvider) - { - if (dueTime == TimeSpan.Zero) - { - if (timeProvider == TimeProvider.System) - { - return new ThreadPoolScheduleReturn(value, complete, null); // optimize for SystemTimeProvidr, use ThreadPool.UnsafeQueueUserWorkItem - } - } - - return new Return(value, complete, dueTime, timeProvider); // use ITimer + return new Return(value, dueTime, timeProvider); // use ITimer } } -internal class Return(TMessage value, TComplete complete, TimeSpan dueTime, TimeProvider timeProvider) : Event +internal class Return(T value, TimeSpan dueTime, TimeProvider timeProvider) : Event { - protected override IDisposable SubscribeCore(Subscriber subscriber) + protected override IDisposable SubscribeCore(Subscriber subscriber) { - var method = new _Return(value, complete, subscriber); + var method = new _Return(value, subscriber); method.Timer = timeProvider.CreateStoppedTimer(_Return.timerCallback, method); method.Timer.InvokeOnce(dueTime); return method; } - sealed class _Return(TMessage value, TComplete complete, Subscriber subscriber) : IDisposable + sealed class _Return(T value, Subscriber subscriber) : IDisposable { public static readonly TimerCallback timerCallback = NextTick; - readonly TMessage value = value; - readonly TComplete complete = complete; - readonly Subscriber subscriber = subscriber; + readonly T value = value; + readonly Subscriber subscriber = subscriber; public ITimer? Timer { get; set; } static void NextTick(object? state) { var self = (_Return)state!; - try + if (self.subscriber.OnNext(self.value)) { - self.subscriber.OnNext(self.value); - self.subscriber.OnCompleted(self.complete); + self.subscriber.OnCompleted(); } - finally + else { - self.Dispose(); + self.subscriber.OnCompleted(Result.Failure); } } @@ -93,26 +66,26 @@ public void Dispose() } } -internal class ImmediateScheduleReturn(TMessage value, TComplete complete) : Event +internal class ImmediateScheduleReturn(T value) : Event { - protected override IDisposable SubscribeCore(Subscriber subscriber) + protected override IDisposable SubscribeCore(Subscriber subscriber) { subscriber.OnNext(value); - subscriber.OnCompleted(complete); + subscriber.OnCompleted(); return Disposable.Empty; } } -internal class ThreadPoolScheduleReturn(TMessage value, TComplete complete, Action? unhandledExceptionHandler) : Event +internal class ThreadPoolScheduleReturn(T value) : Event { - protected override IDisposable SubscribeCore(Subscriber subscriber) + protected override IDisposable SubscribeCore(Subscriber subscriber) { - var method = new _Return(value, complete, unhandledExceptionHandler, subscriber); + var method = new _Return(value, subscriber); ThreadPool.UnsafeQueueUserWorkItem(method, preferLocal: false); return method; } - sealed class _Return(TMessage value, TComplete complete, Action? unhandledExceptionHandler, Subscriber subscriber) : IDisposable, IThreadPoolWorkItem + sealed class _Return(T value, Subscriber subscriber) : IDisposable, IThreadPoolWorkItem { bool stop; @@ -120,22 +93,8 @@ public void Execute() { if (stop) return; - try - { - subscriber.OnNext(value); - subscriber.OnCompleted(complete); - } - catch (Exception ex) - { - if (unhandledExceptionHandler == null) - { - throw; - } - else - { - unhandledExceptionHandler?.Invoke(ex); - } - } + subscriber.OnNext(value); + subscriber.OnCompleted(); } public void Dispose() diff --git a/src/R3/Factories/ReturnOnCompleted.cs b/src/R3/Factories/ReturnOnCompleted.cs index 683fe246..cc949c2f 100644 --- a/src/R3/Factories/ReturnOnCompleted.cs +++ b/src/R3/Factories/ReturnOnCompleted.cs @@ -2,35 +2,42 @@ public static partial class Event { - // similar as Empty, only return OnCompleted - - public static Event ReturnOnCompleted(TComplete complete) + public static Event ReturnOnCompleted(Result result) { - return new ImmediateScheduleReturnOnCompleted(complete); // immediate + return new ImmediateScheduleReturnOnCompleted(result); // immediate } - public static Event ReturnOnCompleted(TComplete complete, TimeProvider timeProvider) + public static Event ReturnOnCompleted(Result result, TimeProvider timeProvider) { - return ReturnOnCompleted(complete, TimeSpan.Zero, timeProvider); + return ReturnOnCompleted(result, TimeSpan.Zero, timeProvider); } - public static Event ReturnOnCompleted(TComplete complete, TimeSpan dueTime, TimeProvider timeProvider) + public static Event ReturnOnCompleted(Result result, TimeSpan dueTime, TimeProvider timeProvider) { if (dueTime == TimeSpan.Zero) { if (timeProvider == TimeProvider.System) { - return new ThreadPoolScheduleReturnOnCompleted(complete, null); // optimize for SystemTimeProvidr, use ThreadPool.UnsafeQueueUserWorkItem + return new ThreadPoolScheduleReturnOnCompleted(result); // optimize for SystemTimeProvidr, use ThreadPool.UnsafeQueueUserWorkItem } } - return new ReturnOnCompleted(complete, dueTime, timeProvider); // use ITimer + return new ReturnOnCompleted(result, dueTime, timeProvider); // use ITimer } } -internal class ReturnOnCompleted(TComplete complete, TimeSpan dueTime, TimeProvider timeProvider) : Event +internal class ImmediateScheduleReturnOnCompleted(Result result) : Event { - protected override IDisposable SubscribeCore(Subscriber subscriber) + protected override IDisposable SubscribeCore(Subscriber subscriber) + { + subscriber.OnCompleted(result); + return Disposable.Empty; + } +} + +internal class ReturnOnCompleted(Result complete, TimeSpan dueTime, TimeProvider timeProvider) : Event +{ + protected override IDisposable SubscribeCore(Subscriber subscriber) { var method = new _ReturnOnCompleted(complete, subscriber); method.Timer = timeProvider.CreateStoppedTimer(_ReturnOnCompleted.timerCallback, method); @@ -38,12 +45,12 @@ protected override IDisposable SubscribeCore(Subscriber sub return method; } - sealed class _ReturnOnCompleted(TComplete complete, Subscriber subscriber) : IDisposable + sealed class _ReturnOnCompleted(Result result, Subscriber subscriber) : IDisposable { public static readonly TimerCallback timerCallback = NextTick; - readonly TComplete complete = complete; - readonly Subscriber subscriber = subscriber; + readonly Result result = result; + readonly Subscriber subscriber = subscriber; public ITimer? Timer { get; set; } @@ -52,7 +59,7 @@ static void NextTick(object? state) var self = (_ReturnOnCompleted)state!; try { - self.subscriber.OnCompleted(self.complete); + self.subscriber.OnCompleted(self.result); } finally { @@ -68,25 +75,16 @@ public void Dispose() } } -internal class ImmediateScheduleReturnOnCompleted(TComplete complete) : Event -{ - protected override IDisposable SubscribeCore(Subscriber subscriber) - { - subscriber.OnCompleted(complete); - return Disposable.Empty; - } -} - -internal class ThreadPoolScheduleReturnOnCompleted(TComplete complete, Action? unhandledExceptionHandler) : Event +internal class ThreadPoolScheduleReturnOnCompleted(Result result) : Event { - protected override IDisposable SubscribeCore(Subscriber subscriber) + protected override IDisposable SubscribeCore(Subscriber subscriber) { - var method = new _ReturnOnCompleted(complete, unhandledExceptionHandler, subscriber); + var method = new _ReturnOnCompleted(result, subscriber); ThreadPool.UnsafeQueueUserWorkItem(method, preferLocal: false); return method; } - sealed class _ReturnOnCompleted(TComplete complete, Action? unhandledExceptionHandler, Subscriber subscriber) : IDisposable, IThreadPoolWorkItem + sealed class _ReturnOnCompleted(Result result, Subscriber subscriber) : IDisposable, IThreadPoolWorkItem { bool stop; @@ -94,21 +92,7 @@ public void Execute() { if (stop) return; - try - { - subscriber.OnCompleted(complete); - } - catch (Exception ex) - { - if (unhandledExceptionHandler == null) - { - throw; - } - else - { - unhandledExceptionHandler?.Invoke(ex); - } - } + subscriber.OnCompleted(result); } public void Dispose() diff --git a/src/R3/Factories/Throw.cs b/src/R3/Factories/Throw.cs index ce8558f6..5ae96ae9 100644 --- a/src/R3/Factories/Throw.cs +++ b/src/R3/Factories/Throw.cs @@ -2,33 +2,20 @@ public static partial class Event { - public static Event> Throw(Exception exception) + public static Event Throw(Exception exception) { - return Throw(exception); + return ReturnOnCompleted(Result.Failure(exception)); } - public static Event> Throw(Exception exception) + public static Event Throw(Exception exception, TimeProvider timeProvider) { - return ReturnOnCompleted>(Result.Failure(exception)); + return ReturnOnCompleted(Result.Failure(exception), timeProvider); } - public static Event> Throw(Exception exception, TimeProvider timeProvider) + public static Event Throw(Exception exception, TimeSpan dueTime, TimeProvider timeProvider) { - return Throw(exception, timeProvider); + return ReturnOnCompleted(Result.Failure(exception), dueTime, timeProvider); } +} - public static Event> Throw(Exception exception, TimeProvider timeProvider) - { - return ReturnOnCompleted>(Result.Failure(exception), timeProvider); - } - public static Event> Throw(Exception exception, TimeSpan dueTime, TimeProvider timeProvider) - { - return Throw(exception, dueTime, timeProvider); - } - - public static Event> Throw(Exception exception, TimeSpan dueTime, TimeProvider timeProvider) - { - return ReturnOnCompleted>(Result.Failure(exception), dueTime, timeProvider); - } -} diff --git a/src/R3/Factories/Timer.cs b/src/R3/Factories/Timer.cs index f9209e05..50483b48 100644 --- a/src/R3/Factories/Timer.cs +++ b/src/R3/Factories/Timer.cs @@ -2,13 +2,13 @@ public static partial class Event { - public static Event Timer(TimeSpan dueTime, TimeProvider timeProvider) + public static Event Timer(TimeSpan dueTime, TimeProvider timeProvider) { return new Timer(dueTime, timeProvider); } } -internal sealed class Timer : Event +internal sealed class Timer : Event { readonly TimeSpan dueTime; readonly TimeProvider timeProvider; @@ -19,7 +19,7 @@ public Timer(TimeSpan dueTime, TimeProvider timeProvider) this.timeProvider = timeProvider; } - protected override IDisposable SubscribeCore(Subscriber subscriber) + protected override IDisposable SubscribeCore(Subscriber subscriber) { var method = new _Timer(subscriber); method.Timer = timeProvider.CreateStoppedTimer(_Timer.timerCallback, method); @@ -27,11 +27,11 @@ protected override IDisposable SubscribeCore(Subscriber subscriber) return method; } - sealed class _Timer(Subscriber subscriber) : IDisposable + sealed class _Timer(Subscriber subscriber) : IDisposable { public static readonly TimerCallback timerCallback = NextTick; - Subscriber subscriber = subscriber; + Subscriber subscriber = subscriber; public ITimer? Timer { get; set; } diff --git a/src/R3/Factories/ToCompletableEvent.cs b/src/R3/Factories/ToCompletableEvent.cs index 4cfc9ee7..aa9521c0 100644 --- a/src/R3/Factories/ToCompletableEvent.cs +++ b/src/R3/Factories/ToCompletableEvent.cs @@ -2,22 +2,22 @@ public static partial class Event { - public static Event> ToCompletableEvent(this Task task) + public static Event ToCompletableEvent(this Task task) { - return new ToCompletableEvent(task); + return new ToCompletableEvent(task); } } -internal sealed class ToCompletableEvent(Task task) : Event> +internal sealed class ToCompletableEvent(Task task) : Event { - protected override IDisposable SubscribeCore(Subscriber> subscriber) + protected override IDisposable SubscribeCore(Subscriber subscriber) { var subscription = new CancellationDisposable(); SubscribeTask(subscriber, subscription.Token); return subscription; } - async void SubscribeTask(Subscriber> subscriber, CancellationToken cancellationToken) + async void SubscribeTask(Subscriber subscriber, CancellationToken cancellationToken) { TMessage? result; try @@ -26,7 +26,7 @@ async void SubscribeTask(Subscriber> subscriber, Cancella } catch (Exception ex) { - subscriber.OnCompleted(Result.Failure(ex)); + subscriber.OnCompleted(Result.Failure(ex)); return; } diff --git a/src/R3/Factories/ToEvent.cs b/src/R3/Factories/ToEvent.cs index ec0a4fe4..99e77ce9 100644 --- a/src/R3/Factories/ToEvent.cs +++ b/src/R3/Factories/ToEvent.cs @@ -2,19 +2,19 @@ public static partial class Event { - public static Event ToEvent(this IEnumerable source) + public static Event ToEvent(this IEnumerable source) { - return new ToEvent(source); + return new ToEvent(source); } } -internal class ToEvent(IEnumerable source) : Event +internal class ToEvent(IEnumerable source) : Event { - protected override IDisposable SubscribeCore(Subscriber subscriber) + protected override IDisposable SubscribeCore(Subscriber subscriber) { foreach (var message in source) { - subscriber.OnNext(message); + subscriber.OnNext(value); } subscriber.OnCompleted(default); return Disposable.Empty; diff --git a/src/R3/Factories/_EventFactory.cs b/src/R3/Factories/_EventFactory.cs index 32eaa6f7..ee9420b8 100644 --- a/src/R3/Factories/_EventFactory.cs +++ b/src/R3/Factories/_EventFactory.cs @@ -23,27 +23,27 @@ public static partial class Event // AsSingleUnitObservable // AsUnitObservable - // AsUnitComplete + // AsUniResult // AsNeverComplete // TODO: use SystemDefault - public static Event EveryUpdate() + public static Event EveryUpdate() { return new EveryUpdate(EventSystem.DefaultFrameProvider, CancellationToken.None); } - public static Event EveryUpdate(CancellationToken cancellationToken) + public static Event EveryUpdate(CancellationToken cancellationToken) { return new EveryUpdate(EventSystem.DefaultFrameProvider, cancellationToken); } - public static Event EveryUpdate(FrameProvider frameProvider) + public static Event EveryUpdate(FrameProvider frameProvider) { return new EveryUpdate(frameProvider, CancellationToken.None); } - public static Event EveryUpdate(FrameProvider frameProvider, CancellationToken cancellationToken) + public static Event EveryUpdate(FrameProvider frameProvider, CancellationToken cancellationToken) { return new EveryUpdate(frameProvider, cancellationToken); } @@ -51,16 +51,16 @@ public static Event EveryUpdate(FrameProvider frameProvider, Cancell -internal sealed class EveryUpdate(FrameProvider frameProvider, CancellationToken cancellationToken) : Event +internal sealed class EveryUpdate(FrameProvider frameProvider, CancellationToken cancellationToken) : Event { - protected override IDisposable SubscribeCore(Subscriber subscriber) + protected override IDisposable SubscribeCore(Subscriber subscriber) { var runner = new EveryUpdateRunnerWorkItem(subscriber, cancellationToken); frameProvider.Register(runner); return runner; } - class EveryUpdateRunnerWorkItem(Subscriber subscriber, CancellationToken cancellationToken) : IFrameRunnerWorkItem, IDisposable + class EveryUpdateRunnerWorkItem(Subscriber subscriber, CancellationToken cancellationToken) : IFrameRunnerWorkItem, IDisposable { bool isDisposed; diff --git a/src/R3/Internal/Stubs.cs b/src/R3/Internal/Stubs.cs index 3563f949..1e64b15b 100644 --- a/src/R3/Internal/Stubs.cs +++ b/src/R3/Internal/Stubs.cs @@ -1,7 +1,17 @@ namespace R3.Internal; +internal static class Stubs +{ + internal static readonly Action HandleError = static x => + { + if (x.IsFailure) + { + EventSystem.GetUnhandledExceptionHandler().Invoke(x.Exception); + } + }; +} + internal static class Stubs { internal static readonly Func ReturnSelf = static x => x; - internal static readonly Action Nop = static _ => { }; } diff --git a/src/R3/Internal/TaskSubscriberBase.cs b/src/R3/Internal/TaskSubscriberBase.cs index 5e1531f3..353a2b65 100644 --- a/src/R3/Internal/TaskSubscriberBase.cs +++ b/src/R3/Internal/TaskSubscriberBase.cs @@ -1,10 +1,9 @@ - -namespace R3.Internal; +namespace R3.Internal; // for return Task(tcs.TrySet***) // include proper Cancel registration -internal abstract class TaskSubscriberBase : Subscriber +internal abstract class TaskSubscriberBase : Subscriber { TaskCompletionSource tcs; // use this field. @@ -23,7 +22,7 @@ public TaskSubscriberBase(CancellationToken cancellationToken) // register before call Subscribe this.tokenRegistration = cancellationToken.UnsafeRegister(static state => { - var s = (TaskSubscriberBase)state!; + var s = (TaskSubscriberBase)state!; s.Dispose(); // subscriber is subscription, dispose s.tcs.TrySetCanceled(s.cancellationToken); diff --git a/src/R3/LiveList.cs b/src/R3/LiveList.cs index 0f0fbb97..403fa092 100644 --- a/src/R3/LiveList.cs +++ b/src/R3/LiveList.cs @@ -6,32 +6,32 @@ namespace R3; public static partial class EventExtensions { - public static LiveList ToLiveList(this Event source) + public static LiveList ToLiveList(this Event source) { - return new LiveList(source); + return new LiveList(source); } - public static LiveList ToLiveList(this Event source, int bufferSize) + public static LiveList ToLiveList(this Event source, int bufferSize) { - return new LiveList(source, bufferSize); + return new LiveList(source, bufferSize); } } -public sealed class LiveList : IReadOnlyList, IDisposable +public sealed class LiveList : IReadOnlyList, IDisposable { readonly IReadOnlyList list; // RingBuffer or List readonly IDisposable sourceSubscription; readonly int bufferSize; bool isCompleted; - TComplete? completedValue; + Result? completedValue; [MemberNotNullWhen(true, nameof(CompletedValue))] public bool IsCompleted => isCompleted; - public TComplete? CompletedValue => completedValue; + public Result? CompletedValue => completedValue; - public LiveList(Event source) + public LiveList(Event source) { if (bufferSize == 0) bufferSize = 1; this.bufferSize = -1; @@ -39,7 +39,7 @@ public LiveList(Event source) this.sourceSubscription = source.Subscribe(new ListSubscriber(this)); } - public LiveList(Event source, int bufferSize) + public LiveList(Event source, int bufferSize) { if (bufferSize == 0) bufferSize = 1; this.bufferSize = bufferSize; // bufferSize must set before Subscribe(sometimes Subscribe run immediately) @@ -132,7 +132,7 @@ IEnumerator IEnumerable.GetEnumerator() } } - sealed class ListSubscriber(LiveList parent) : Subscriber + sealed class ListSubscriber(LiveList parent) : Subscriber { protected override void OnNextCore(T message) { @@ -160,7 +160,7 @@ protected override void OnErrorResumeCore(Exception error) EventSystem.GetUnhandledExceptionHandler().Invoke(error); } - protected override void OnCompletedCore(TComplete complete) + protected override void OnCompletedCore(Result complete) { lock (parent.list) { diff --git a/src/R3/Operators/AggregateAsync.cs b/src/R3/Operators/AggregateAsync.cs index cbfc2038..88d138f8 100644 --- a/src/R3/Operators/AggregateAsync.cs +++ b/src/R3/Operators/AggregateAsync.cs @@ -2,43 +2,31 @@ public static partial class EventExtensions { - public static Task AggregateAsync - (this Event source, + public static Task AggregateAsync + (this Event source, TAccumulate seed, - Func func, - Func resultSelector, + Func func, + Func resultSelector, CancellationToken cancellationToken = default) { - var subscriber = new AggregateAsync(seed, func, resultSelector, cancellationToken); - source.Subscribe(subscriber); - return subscriber.Task; - } - - public static Task AggregateAsync - (this Event> source, - TAccumulate seed, - Func func, - Func, TResult> resultSelector, - CancellationToken cancellationToken = default) - { - var subscriber = new AggregateAsyncR(seed, func, resultSelector, cancellationToken); + var subscriber = new AggregateAsync(seed, func, resultSelector, cancellationToken); source.Subscribe(subscriber); return subscriber.Task; } } -internal sealed class AggregateAsync( +internal sealed class AggregateAsync( TAccumulate seed, - Func func, - Func resultSelector, + Func func, + Func resultSelector, CancellationToken cancellationToken) - : TaskSubscriberBase(cancellationToken) + : TaskSubscriberBase(cancellationToken) { TAccumulate value = seed; - protected override void OnNextCore(TMessage message) + protected override void OnNextCore(T value) { - value = func(value, message); // OnNext error is route to OnErrorResumeCore + this.value = func(this.value, value); // OnNext error is route to OnErrorResumeCore } protected override void OnErrorResumeCore(Exception error) @@ -46,51 +34,18 @@ protected override void OnErrorResumeCore(Exception error) TrySetException(error); } - protected override void OnCompletedCore(TComplete complete) + protected override void OnCompletedCore(Result result) { - try + if (result.IsFailure) { - var result = resultSelector(value, complete); // trap this resultSelector exception - TrySetResult(result); + TrySetException(result.Exception); + return; } - catch (Exception ex) - { - TrySetException(ex); - } - } -} -internal sealed class AggregateAsyncR( - TAccumulate seed, - Func func, - Func, TResult> resultSelector, - CancellationToken cancellationToken) : TaskSubscriberBase, TResult>(cancellationToken) -{ - TAccumulate value = seed; - - protected override void OnNextCore(TMessage message) - { - value = func(value, message); - } - - protected override void OnErrorResumeCore(Exception error) - { - TrySetException(error); - } - - protected override void OnCompletedCore(Result complete) - { try { - var result = resultSelector(value, complete); // trap this resultSelector exception - if (complete.IsSuccess) - { - TrySetResult(result); - } - else - { - TrySetException(complete.Exception); - } + var v = resultSelector(value); // trap this resultSelector exception + TrySetResult(v); } catch (Exception ex) { diff --git a/src/R3/Operators/AggregateOperators.cs b/src/R3/Operators/AggregateOperators.cs index debb6eea..b50a49b9 100644 --- a/src/R3/Operators/AggregateOperators.cs +++ b/src/R3/Operators/AggregateOperators.cs @@ -7,166 +7,96 @@ namespace R3; public static partial class EventExtensions { - public static Task ToArrayAsync(this Event source, CancellationToken cancellationToken = default) + public static Task ToArrayAsync(this Event source, CancellationToken cancellationToken = default) { - return AggregateAsync(source, new List(), static (list, message) => + return AggregateAsync(source, new List(), static (list, message) => { list.Add(message); return list; - }, (list, _) => list.ToArray(), cancellationToken); // ignore complete + }, (list) => list.ToArray(), cancellationToken); // ignore complete } - public static Task ToArrayAsync(this Event> source, CancellationToken cancellationToken = default) + public static Task> ToListAsync(this Event source, CancellationToken cancellationToken = default) { - return AggregateAsync(source, new List(), static (list, message) => + return AggregateAsync(source, new List(), static (list, message) => { list.Add(message); return list; - }, (list, _) => list.ToArray(), cancellationToken); // ignore complete + }, (list) => list, cancellationToken); // ignore complete } - public static Task> ToListAsync(this Event source, CancellationToken cancellationToken = default) - { - return AggregateAsync(source, new List(), static (list, message) => - { - list.Add(message); - return list; - }, (list, _) => list, cancellationToken); // ignore complete - } - - public static Task> ToListAsync(this Event> source, CancellationToken cancellationToken = default) - { - return AggregateAsync(source, new List(), static (list, message) => - { - list.Add(message); - return list; - }, (list, _) => list, cancellationToken); // ignore complete - } - - public static Task> ToHashSetAsync(this Event source, CancellationToken cancellationToken = default) + public static Task> ToHashSetAsync(this Event source, CancellationToken cancellationToken = default) { return ToHashSetAsync(source, null, cancellationToken); } - public static Task> ToHashSetAsync(this Event> source, CancellationToken cancellationToken = default) - { - return ToHashSetAsync(source, null, cancellationToken); - } - public static Task> ToHashSetAsync(this Event source, IEqualityComparer? equalityComparer, CancellationToken cancellationToken = default) + public static Task> ToHashSetAsync(this Event source, IEqualityComparer? equalityComparer, CancellationToken cancellationToken = default) { - return AggregateAsync(source, new HashSet(equalityComparer), static (value, message) => + return AggregateAsync(source, new HashSet(equalityComparer), static (value, message) => { value.Add(message); return value; - }, (value, _) => value, cancellationToken); // ignore complete + }, (value) => value, cancellationToken); // ignore complete } - public static Task> ToHashSetAsync(this Event> source, IEqualityComparer? equalityComparer, CancellationToken cancellationToken = default) - { - return AggregateAsync(source, new HashSet(equalityComparer), static (value, message) => - { - value.Add(message); - return value; - }, (value, _) => value, cancellationToken); // ignore complete - } // CountAsync using AggregateAsync - public static Task CountAsync(this Event source, CancellationToken cancellationToken = default) + public static Task CountAsync(this Event source, CancellationToken cancellationToken = default) { - return AggregateAsync(source, 0, static (count, _) => checked(count + 1), (count, _) => count, cancellationToken); // ignore complete - } - - // CountAsync Result variation - public static Task CountAsync(this Event> source, CancellationToken cancellationToken = default) - { - return AggregateAsync(source, 0, static (count, _) => checked(count + 1), (count, _) => count, cancellationToken); // ignore complete + return AggregateAsync(source, 0, static (count, _) => checked(count + 1), Stubs.ReturnSelf, cancellationToken); // ignore complete } // LongCountAsync using AggregateAsync - public static Task LongCountAsync(this Event source, CancellationToken cancellationToken = default) - { - return AggregateAsync(source, 0L, static (count, _) => checked(count + 1), (count, _) => count, cancellationToken); // ignore complete - } - - // LongCountAsync using AggregateAsync Result variation - public static Task LongCountAsync(this Event> source, CancellationToken cancellationToken = default) + public static Task LongCountAsync(this Event source, CancellationToken cancellationToken = default) { - return AggregateAsync(source, 0L, static (count, _) => checked(count + 1), (count, _) => count, cancellationToken); // ignore complete + return AggregateAsync(source, 0L, static (count, _) => checked(count + 1), Stubs.ReturnSelf, cancellationToken); // ignore complete } - public static Task MinAsync(this Event source, CancellationToken cancellationToken = default) + public static Task MinAsync(this Event source, CancellationToken cancellationToken = default) { - return AggregateAsync(source, (default(TMessage)!, hasValue: false), + return AggregateAsync(source, (default(T)!, hasValue: false), static (min, message) => { if (!min.hasValue) return (message, true); // first - return Comparer.Default.Compare(min.Item1, message) < 0 ? (min.Item1, true) : (message, true); + return Comparer.Default.Compare(min.Item1, message) < 0 ? (min.Item1, true) : (message, true); }, - static (min, _) => + static (min) => { if (!min.hasValue) throw new InvalidOperationException("Sequence contains no elements"); return min.Item1; }, cancellationToken); } - public static Task MinAsync(this Event> source, CancellationToken cancellationToken = default) - { - return AggregateAsync(source, (default(TMessage)!, hasValue: false), - static (min, message) => - { - if (!min.hasValue) return (message, true); // first - return Comparer.Default.Compare(min.Item1, message) < 0 ? (min.Item1, true) : (message, true); - }, - static (min, _) => - { - if (!min.hasValue) throw new InvalidOperationException("Sequence contains no elements"); - return min.Item1; - }, cancellationToken); - } - public static Task MaxAsync(this Event source, CancellationToken cancellationToken = default) + public static Task MaxAsync(this Event source, CancellationToken cancellationToken = default) { - return AggregateAsync(source, (default(TMessage)!, hasValue: false), + return AggregateAsync(source, (default(T)!, hasValue: false), static (max, message) => { if (!max.hasValue) return (message, true); // first - return Comparer.Default.Compare(max.Item1, message) > 0 ? (max.Item1, true) : (message, true); + return Comparer.Default.Compare(max.Item1, message) > 0 ? (max.Item1, true) : (message, true); }, - static (max, _) => + static (max) => { if (!max.hasValue) throw new InvalidOperationException("Sequence contains no elements"); return max.Item1; }, cancellationToken); } - public static Task MaxAsync(this Event> source, CancellationToken cancellationToken = default) - { - return AggregateAsync(source, (default(TMessage)!, hasValue: false), - static (max, message) => - { - if (!max.hasValue) return (message, true); // first - return Comparer.Default.Compare(max.Item1, message) > 0 ? (max.Item1, true) : (message, true); - }, - static (max, _) => - { - if (!max.hasValue) throw new InvalidOperationException("Sequence contains no elements"); - return max.Item1; - }, cancellationToken); - } - public static Task<(TMessage Min, TMessage Max)> MinMaxAsync(this Event source, CancellationToken cancellationToken = default) + public static Task<(T Min, T Max)> MinMaxAsync(this Event source, CancellationToken cancellationToken = default) { return AggregateAsync(source, - (min: default(TMessage)!, max: default(TMessage)!, hasValue: false), + (min: default(T)!, max: default(T)!, hasValue: false), static (minmax, message) => { if (!minmax.hasValue) return (message, message, true); // first - var min = Comparer.Default.Compare(minmax.min, message) < 0 ? minmax.min : message; - var max = Comparer.Default.Compare(minmax.max, message) > 0 ? minmax.max : message; + var min = Comparer.Default.Compare(minmax.min, message) < 0 ? minmax.min : message; + var max = Comparer.Default.Compare(minmax.max, message) > 0 ? minmax.max : message; return (min, max, true); }, - static (minmax, _) => + static (minmax) => { if (!minmax.hasValue) throw new InvalidOperationException("Sequence contains no elements"); return (minmax.min, minmax.max); @@ -174,46 +104,23 @@ public static Task MaxAsync(this Event MinMaxAsync(this Event> source, CancellationToken cancellationToken = default) - { - return AggregateAsync(source, - (min: default(TMessage)!, max: default(TMessage)!, hasValue: false), - static (minmax, message) => - { - if (!minmax.hasValue) return (message, message, true); // first - var min = Comparer.Default.Compare(minmax.min, message) < 0 ? minmax.min : message; - var max = Comparer.Default.Compare(minmax.max, message) > 0 ? minmax.max : message; - return (min, max, true); - }, - static (minmax, _) => - { - if (!minmax.hasValue) throw new InvalidOperationException("Sequence contains no elements"); - return (minmax.min, minmax.max); - }, cancellationToken); - } - - public static Task SumAsync(this Event source, CancellationToken cancellationToken = default) - where TMessage : IAdditionOperators + public static Task SumAsync(this Event source, CancellationToken cancellationToken = default) + where T : IAdditionOperators { - return AggregateAsync(source, default(TMessage)!, static (sum, message) => checked(sum + message), (sum, _) => sum, cancellationToken); // ignore complete + return AggregateAsync(source, default(T)!, static (sum, message) => checked(sum + message), Stubs.ReturnSelf, cancellationToken); // ignore complete } - public static Task SumAsync(this Event> source, CancellationToken cancellationToken = default) - where TMessage : IAdditionOperators - { - return AggregateAsync(source, default(TMessage)!, static (sum, message) => checked(sum + message), (sum, _) => sum, cancellationToken); // ignore complete - } - public static Task AverageAsync(this Event source, CancellationToken cancellationToken = default) - where TMessage : INumberBase + public static Task AverageAsync(this Event source, CancellationToken cancellationToken = default) + where T : INumberBase { return AggregateAsync(source, - (sum: default(TMessage)!, count: 0, hasValue: false), + (sum: default(T)!, count: 0, hasValue: false), static (avg, message) => { return (checked(avg.sum + message), checked(avg.count + 1), true); // sum, count, hasValue }, - static (avg, _) => + static (avg) => { if (!avg.hasValue) throw new InvalidOperationException("Sequence contains no elements"); return double.CreateChecked(avg.sum) / double.CreateChecked(avg.count); @@ -221,25 +128,9 @@ public static Task AverageAsync(this Event AverageAsync(this Event> source, CancellationToken cancellationToken = default) - where TMessage : INumberBase - { - return AggregateAsync(source, - (default(TMessage)!, 0), - static ((TMessage Sum, long Count) avg, TMessage message) => (avg.Sum + message, checked(avg.Count + 1)), - (avg, _) => double.CreateChecked(avg.Item1) / double.CreateChecked(avg.Item2), // ignore complete - cancellationToken); - } - - public static Task WaitAsync(this Event source, CancellationToken cancellationToken = default) - { - // get only complete value. - return AggregateAsync(source, 0, static (_, _) => 0, static (_, result) => result, cancellationToken); - } - public static Task WaitAsync(this Event source, CancellationToken cancellationToken = default) + public static Task WaitAsync(this Event source, CancellationToken cancellationToken = default) { - // get only complete value. - return AggregateAsync(source, 0, static (_, _) => 0, static (_, result) => result, cancellationToken); + return AggregateAsync(source, 0, static (_, _) => 0, Stubs.ReturnSelf, cancellationToken); } } diff --git a/src/R3/Operators/CombineLatest.cs b/src/R3/Operators/CombineLatest.cs index 7ffdc6d6..29d8e4bd 100644 --- a/src/R3/Operators/CombineLatest.cs +++ b/src/R3/Operators/CombineLatest.cs @@ -1,143 +1,142 @@ -namespace R3; - -public static partial class EventExtensions -{ - public static Event CombineLatest( - this Event left, - Event right, - Func selector) - { - return new CombineLatest(left, right, selector, static (x, y) => default); - } - - public static Event CombineLatest( - this Event left, - Event right, - Func selector, - Func completeSelector) - { - return new CombineLatest(left, right, selector, completeSelector); - } -} - -internal sealed class CombineLatest( - Event left, - Event right, - Func selector, - Func completeSelector) : Event -{ - protected override IDisposable SubscribeCore(Subscriber subscriber) - { - var method = new _CombineLatest(subscriber, selector, completeSelector); - - var d1 = left.Subscribe(new LeftSubscriber(method)); - try - { - var d2 = right.Subscribe(new RightSubscriber(method)); - return Disposable.Combine(d1, d2); - } - catch - { - d1.Dispose(); - throw; - } - } - - sealed class _CombineLatest(Subscriber subscriber, Func selector, Func completeSelector) - { - internal TLeft? message1; - internal bool hasMessage1; - internal TLeftComplete? complete1; - internal bool hasComplete1; - - internal TRight? message2; - internal bool hasMessage2; - internal TRightComplete? complete2; - internal bool hasComplete2; - - public void OnErrorResume(Exception error) - { - subscriber.OnErrorResume(error); - } - - internal void Publish() - { - if (hasMessage1 && hasMessage2) - { - var result = selector(message1!, message2!); - subscriber.OnNext(result); - } - } - - internal void Complete() - { - if (hasComplete1 && hasComplete2) - { - var result = completeSelector(complete1!, complete2!); - subscriber.OnCompleted(result); - } - } - } - - sealed class LeftSubscriber(_CombineLatest parent) : Subscriber - { - protected override void OnNextCore(TLeft message) - { - lock (parent) // `_CombineLatest` is hide in Disposable.Combine so safe to use lock - { - parent.hasMessage1 = true; - parent.message1 = message; - parent.Publish(); - } - } - - protected override void OnErrorResumeCore(Exception error) - { - lock (parent) - { - parent.OnErrorResume(error); - } - } - - protected override void OnCompletedCore(TLeftComplete complete) - { - lock (parent) - { - parent.hasComplete1 = true; - parent.complete1 = complete; - parent.Complete(); - } - } - } - - sealed class RightSubscriber(_CombineLatest parent) : Subscriber - { - protected override void OnNextCore(TRight message) - { - lock (parent) // `_CombineLatest` is hide in Disposable.Combine so safe to use lock - { - parent.hasMessage2 = true; - parent.message2 = message; - parent.Publish(); - } - } - - protected override void OnErrorResumeCore(Exception error) - { - lock (parent) - { - parent.OnErrorResume(error); - } - } - - protected override void OnCompletedCore(TRightComplete complete) - { - lock (parent) - { - parent.hasComplete2 = true; - parent.complete2 = complete; - parent.Complete(); - } - } - } -} +//namespace R3; + +//public static partial class EventExtensions +//{ +// public static Event CombineLatest( +// this Event left, +// Event right, +// Func selector) +// { +// return new CombineLatest(left, right, selector); +// } +//} + +//internal sealed class CombineLatest(Event left, Event right, Func selector) : Event +//{ +// protected override IDisposable SubscribeCore(Subscriber subscriber) +// { +// var method = new _CombineLatest(subscriber, selector); +// var left = new LeftSubscriber(method); +// var right = new RightSubscriber(method); + +// var leftD = left.Subscribe(new LeftSubscriber(method)); +// try +// { +// var rightD = right.Subscribe(new RightSubscriber(method)); +// return Disposable.Combine(leftD, rightD); +// } +// catch +// { +// leftD.Dispose(); +// throw; +// } +// } + +// internal sealed class _CombineLatest(Subscriber subscriber, Func selector) +// { +// internal TLeft? message1; +// internal bool hasMessage1; +// internal bool hasComplete1; + +// internal TRight? message2; +// internal bool hasMessage2; +// internal bool hasComplete2; + +// public void OnErrorResume(Exception error) +// { +// subscriber.OnErrorResume(error); +// } + +// internal void Publish() +// { +// if (hasMessage1 && hasMessage2) +// { +// var result = selector(message1!, message2!); +// subscriber.OnNext(result); +// } +// } + +// internal void Complete() +// { +// if (hasComplete1 && hasComplete2) +// { +// var result = completeSelector(complete1!, complete2!); +// subscriber.OnCompleted(result); +// } +// } + +// internal void CompleteError() +// { +// if (hasComplete1 && hasComplete2) +// { +// var result = completeSelector(complete1!, complete2!); +// subscriber.OnCompleted(result); +// } +// } +// } + +// internal sealed class LeftSubscriber(_CombineLatest parent) : Subscriber +// { +// protected override void OnNextCore(TLeft message) +// { +// lock (parent) // `_CombineLatest` is hide in Disposable.Combine so safe to use lock +// { +// parent.hasMessage1 = true; +// parent.message1 = message; +// parent.Publish(); +// } +// } + +// protected override void OnErrorResumeCore(Exception error) +// { +// lock (parent) +// { +// parent.OnErrorResume(error); +// } +// } + +// protected override void OnCompletedCore(Result complete) +// { +// lock (parent) +// { +// parent.hasComplete1 = true; +// parent.complete1 = complete; +// parent.Complete(); +// } +// } +// } + +// internal sealed class RightSubscriber(_CombineLatest parent) : Subscriber +// { +// protected override void OnNextCore(TRight message) +// { +// lock (parent) // `_CombineLatest` is hide in Disposable.Combine so safe to use lock +// { +// parent.hasMessage2 = true; +// parent.message2 = message; +// parent.Publish(); +// } +// } + +// protected override void OnErrorResumeCore(Exception error) +// { +// lock (parent) +// { +// parent.OnErrorResume(error); +// } +// } + +// protected override void OnCompletedCore(Result complete) +// { +// lock (parent) +// { +// parent.hasComplete2 = true; +// parent.complete2 = complete; +// parent.Complete(); +// } +// } +// } +//} + + + diff --git a/src/R3/Operators/Delay.cs b/src/R3/Operators/Delay.cs index 4aa08ae6..e562b337 100644 --- a/src/R3/Operators/Delay.cs +++ b/src/R3/Operators/Delay.cs @@ -4,40 +4,40 @@ namespace R3; public static partial class EventExtensions { - //public static Event Delay(this Event source, TimeSpan dueTime, TimeProvider timeProvider) + //public static Event Delay(this Event source, TimeSpan dueTime, TimeProvider timeProvider) //{ - // return new Delay(source, dueTime, timeProvider); + // return new Delay(source, dueTime, timeProvider); //} - //public static ICompletableEvent Delay(this ICompletableEvent source, TimeSpan dueTime, TimeProvider timeProvider) + //public static ICompletableEvent Delay(this ICompletableEvent source, TimeSpan dueTime, TimeProvider timeProvider) //{ - // return new Delay(source, dueTime, timeProvider); + // return new Delay(source, dueTime, timeProvider); //} } // TODO:dueTime validation -//internal sealed class Delay(Event source, TimeSpan dueTime, TimeProvider timeProvider) : Event +//internal sealed class Delay(Event source, TimeSpan dueTime, TimeProvider timeProvider) : Event //{ -// protected override IDisposable SubscribeCore(Subscriber subscriber) +// protected override IDisposable SubscribeCore(Subscriber subscriber) // { // var delay = new _Delay(subscriber, dueTime, timeProvider); // source.Subscribe(delay); // return delay; // } -// class _Delay : Subscriber, IDisposable +// class _Delay : Subscriber, IDisposable // { // static readonly TimerCallback timerCallback = DrainMessages; -// readonly Subscriber subscriber; +// readonly Subscriber subscriber; // readonly TimeSpan dueTime; // readonly TimeProvider timeProvider; // ITimer? timer; -// readonly Queue<(long timestamp, TMessage message)> queue = new Queue<(long, TMessage)>(); // lock gate +// readonly Queue<(long timestamp, T value)> queue = new Queue<(long, TMessage)>(); // lock gate // bool running; -// public _Delay(Subscriber subscriber, TimeSpan dueTime, TimeProvider timeProvider) +// public _Delay(Subscriber subscriber, TimeSpan dueTime, TimeProvider timeProvider) // { // this.subscriber = subscriber; // this.dueTime = dueTime; @@ -45,7 +45,7 @@ public static partial class EventExtensions // this.timer = timeProvider.CreateStoppedTimer(timerCallback, this); // } -// protected override void OnNextCore(TMessage message) +// protected override void OnNextCore(T value) // { // var timestamp = timeProvider.GetTimestamp(); // lock (queue) @@ -76,7 +76,7 @@ public static partial class EventExtensions // var self = (_Delay)state!; // var queue = self.queue; -// TMessage message; +// T value; // while (true) // { // lock (queue) @@ -111,7 +111,7 @@ public static partial class EventExtensions // try // { -// self.subscriber.OnNext(message); +// self.subscriber.OnNext(value); // continue; // loop to drain all messages // } // catch @@ -148,9 +148,9 @@ public static partial class EventExtensions // } //} -//internal sealed class Delay(ICompletableEvent source, TimeSpan dueTime, TimeProvider timeProvider) : ICompletableEvent +//internal sealed class Delay(ICompletableEvent source, TimeSpan dueTime, TimeProvider timeProvider) : ICompletableEvent //{ -// public IDisposable Subscribe(ISubscriber subscriber) +// public IDisposable Subscribe(ISubscriber subscriber) // { // var delay = new _Delay(subscriber, dueTime, timeProvider); // var sourceSubscription = source.Subscribe(delay); @@ -158,28 +158,28 @@ public static partial class EventExtensions // return Disposable.Combine(delaySubscription, sourceSubscription); // call delay's dispose first // } -// class _Delay : ISubscriber +// class _Delay : ISubscriber // { // static readonly TimerCallback timerCallback = DrainMessages; // static readonly Action disposeCallback = OnDisposed; // public CallbackDisposable Subscription { get; private set; } -// readonly ISubscriber subscriber; +// readonly ISubscriber subscriber; // readonly TimeSpan dueTime; // readonly TimeProvider timeProvider; // readonly ITimer timer; -// readonly Queue<(long timestamp, TMessage message)> queue = new Queue<(long, TMessage)>(); // lock gate +// readonly Queue<(long timestamp, T value)> queue = new Queue<(long, TMessage)>(); // lock gate // bool running; // // for Completed event -// TComplete? completeMessage; +// Result? completeMessage; // DateTimeOffset completeAt; // bool isCompleted; -// public _Delay(ISubscriber subscriber, TimeSpan dueTime, TimeProvider timeProvider) +// public _Delay(ISubscriber subscriber, TimeSpan dueTime, TimeProvider timeProvider) // { // this.dueTime = dueTime; // this.subscriber = subscriber; @@ -188,7 +188,7 @@ public static partial class EventExtensions // this.Subscription = new CallbackDisposable(disposeCallback, this); // } -// public void OnNext(TMessage message) +// public void OnNext(T value) // { // var current = timeProvider.GetTimestamp(); // lock (queue) @@ -208,7 +208,7 @@ public static partial class EventExtensions // } // } -// public void OnCompleted(TComplete complete) +// public void OnCompleted(Result complete) // { // var completeAt = timeProvider.GetUtcNow() + dueTime; // lock (queue) @@ -236,7 +236,7 @@ public static partial class EventExtensions // var self = (_Delay)state!; // var queue = self.queue; -// TMessage message; +// T value; // bool invokeCompleted = false; // while (true) // { @@ -281,7 +281,7 @@ public static partial class EventExtensions // { // try // { -// self.subscriber.OnNext(message); +// self.subscriber.OnNext(value); // continue; // succeed, loop to drain all messages // } // catch diff --git a/src/R3/Operators/DelayFrame.cs b/src/R3/Operators/DelayFrame.cs index 8398f0fb..557a8dec 100644 --- a/src/R3/Operators/DelayFrame.cs +++ b/src/R3/Operators/DelayFrame.cs @@ -2,42 +2,42 @@ public static partial class EventExtensions { - //public static Event DelayFrame(this Event source, int delayFrameCount, FrameProvider frameProvider) + //public static Event DelayFrame(this Event source, int delayFrameCount, FrameProvider frameProvider) //{ - // return new DelayFrame(source, delayFrameCount, frameProvider); + // return new DelayFrame(source, delayFrameCount, frameProvider); //} } // TODO:dueTime validation // TODO:impl minaosi. -//internal sealed class DelayFrame(Event source, int delayFrameCount, FrameProvider frameProvider) : Event +//internal sealed class DelayFrame(Event source, int delayFrameCount, FrameProvider frameProvider) : Event //{ -// protected override IDisposable SubscribeCore(Subscriber subscriber) +// protected override IDisposable SubscribeCore(Subscriber subscriber) // { // var delay = new _DelayFrame(subscriber, delayFrameCount, frameProvider); // source.Subscribe(delay); // source subscription is included in _DelayFrame // return delay; // } -// class _DelayFrame : Subscriber, IFrameRunnerWorkItem +// class _DelayFrame : Subscriber, IFrameRunnerWorkItem // { -// readonly Subscriber subscriber; +// readonly Subscriber subscriber; // readonly int delayFrameCount; // readonly FrameProvider frameProvider; -// readonly Queue<(long frameCount, TMessage message)> queue = new Queue<(long, TMessage)>(); // lock gate +// readonly Queue<(long frameCount, T value)> queue = new Queue<(long, TMessage)>(); // lock gate // bool running; // long nextTick; // bool stopRunner; -// public _DelayFrame(Subscriber subscriber, int delayFrameCount, FrameProvider frameProvider) +// public _DelayFrame(Subscriber subscriber, int delayFrameCount, FrameProvider frameProvider) // { // this.subscriber = subscriber; // this.delayFrameCount = delayFrameCount; // this.frameProvider = frameProvider; // } -// protected override void OnNextCore(TMessage message) +// protected override void OnNextCore(T value) // { // var currentCount = frameProvider.GetFrameCount(); // lock (queue) @@ -76,7 +76,7 @@ public static partial class EventExtensions // return true; // } -// TMessage message; +// T value; // while (true) // { // lock (queue) @@ -111,7 +111,7 @@ public static partial class EventExtensions // try // { -// subscriber.OnNext(message); +// subscriber.OnNext(value); // continue; // loop to drain all messages // } // catch diff --git a/src/R3/Operators/DoOnCompleted.cs b/src/R3/Operators/DoOnCompleted.cs index 39056a23..66c3e060 100644 --- a/src/R3/Operators/DoOnCompleted.cs +++ b/src/R3/Operators/DoOnCompleted.cs @@ -4,28 +4,28 @@ public static partial class EventExtensions { // TODO: more accurate impl // TODO: with state - public static Event DoOnCompleted(this Event source, Action action) + public static Event DoOnCompleted(this Event source, Action action) { - return new DoOnCompleted(source, action); + return new DoOnCompleted(source, action); } } -internal sealed class DoOnCompleted(Event source, Action action) : Event +internal sealed class DoOnCompleted(Event source, Action action) : Event { - protected override IDisposable SubscribeCore(Subscriber subscriber) + protected override IDisposable SubscribeCore(Subscriber subscriber) { var method = new _DoOnCompleted(subscriber, action); source.Subscribe(method); return method; } - class _DoOnCompleted(Subscriber subscriber, Action action) : Subscriber, IDisposable + class _DoOnCompleted(Subscriber subscriber, Action action) : Subscriber, IDisposable { - Action? action = action; + Action? action = action; - protected override void OnNextCore(TMessage message) + protected override void OnNextCore(T value) { - subscriber.OnNext(message); + subscriber.OnNext(value); } protected override void OnErrorResumeCore(Exception error) @@ -33,10 +33,10 @@ protected override void OnErrorResumeCore(Exception error) subscriber.OnErrorResume(error); } - protected override void OnCompletedCore(TComplete complete) + protected override void OnCompletedCore(Result result) { - Interlocked.Exchange(ref action, null)?.Invoke(); - subscriber.OnCompleted(complete); + Interlocked.Exchange(ref action, null)?.Invoke(result); + subscriber.OnCompleted(result); } -} + } } diff --git a/src/R3/Operators/DoOnDisposed.cs b/src/R3/Operators/DoOnDisposed.cs index cc41a347..9c22a1f1 100644 --- a/src/R3/Operators/DoOnDisposed.cs +++ b/src/R3/Operators/DoOnDisposed.cs @@ -5,33 +5,35 @@ public static partial class EventExtensions // TODO: doOnSubscribed // Finally - public static Event DoOnDisposed(this Event source, Action action) + public static Event DoOnDisposed(this Event source, Action action) { - return new DoOnDisposed(source, action); + return new DoOnDisposed(source, action); } - public static Event DoOnDisposed(this Event source, Action action, TState state) + public static Event DoOnDisposed(this Event source, Action action, TState state) { - return new DoOnDisposed(source, action, state); + return new DoOnDisposed(source, action, state); } } -internal sealed class DoOnDisposed(Event source, Action action) : Event + + +internal sealed class DoOnDisposed(Event source, Action action) : Event { - protected override IDisposable SubscribeCore(Subscriber subscriber) + protected override IDisposable SubscribeCore(Subscriber subscriber) { var method = new _DoOnDisposed(subscriber, action); source.Subscribe(method); return method; } - class _DoOnDisposed(Subscriber subscriber, Action action) : Subscriber, IDisposable + class _DoOnDisposed(Subscriber subscriber, Action action) : Subscriber, IDisposable { Action? action = action; - protected override void OnNextCore(TMessage message) + protected override void OnNextCore(T value) { - subscriber.OnNext(message); + subscriber.OnNext(value); } protected override void OnErrorResumeCore(Exception error) @@ -39,9 +41,9 @@ protected override void OnErrorResumeCore(Exception error) subscriber.OnErrorResume(error); } - protected override void OnCompletedCore(TComplete complete) + protected override void OnCompletedCore(Result result) { - subscriber.OnCompleted(complete); + subscriber.OnCompleted(result); } protected override void DisposeCore() @@ -51,23 +53,23 @@ protected override void DisposeCore() } } -internal sealed class DoOnDisposed(Event source, Action action, TState state) : Event +internal sealed class DoOnDisposed(Event source, Action action, TState state) : Event { - protected override IDisposable SubscribeCore(Subscriber subscriber) + protected override IDisposable SubscribeCore(Subscriber subscriber) { var method = new _DoOnDisposed(subscriber, action, state); source.Subscribe(method); return method; } - class _DoOnDisposed(Subscriber subscriber, Action action, TState state) : Subscriber, IDisposable + class _DoOnDisposed(Subscriber subscriber, Action action, TState state) : Subscriber, IDisposable { Action? action = action; TState state = state; - protected override void OnNextCore(TMessage message) + protected override void OnNextCore(T value) { - subscriber.OnNext(message); + subscriber.OnNext(value); } protected override void OnErrorResumeCore(Exception error) @@ -75,9 +77,9 @@ protected override void OnErrorResumeCore(Exception error) subscriber.OnErrorResume(error); } - protected override void OnCompletedCore(TComplete complete) + protected override void OnCompletedCore(Result result) { - subscriber.OnCompleted(complete); + subscriber.OnCompleted(result); } protected override void DisposeCore() diff --git a/src/R3/Operators/FirstLastSingle.cs b/src/R3/Operators/FirstLastSingle.cs index 70155dc6..cd2c9477 100644 --- a/src/R3/Operators/FirstLastSingle.cs +++ b/src/R3/Operators/FirstLastSingle.cs @@ -4,107 +4,38 @@ public static partial class EventExtensions { - // Completable - - public static Task FirstAsync(this Event source, CancellationToken cancellationToken = default) => FirstAsync(source, static _ => true, cancellationToken); - public static Task FirstOrDefaultAsync(this Event source, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstOrDefaultAsync(source, static _ => true, defaultValue, cancellationToken); - public static Task LastAsync(this Event source, CancellationToken cancellationToken = default) => LastAsync(source, static _ => true, cancellationToken); - public static Task LastOrDefaultAsync(this Event source, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => LastOrDefaultAsync(source, static _ => true, defaultValue, cancellationToken); - public static Task SingleAsync(this Event source, CancellationToken cancellationToken = default) => SingleAsync(source, static _ => true, cancellationToken); - public static Task SingleOrDefaultAsync(this Event source, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => SingleOrDefaultAsync(source, static _ => true, defaultValue, cancellationToken); + public static Task FirstAsync(this Event source, CancellationToken cancellationToken = default) => FirstAsync(source, static _ => true, cancellationToken); + public static Task FirstOrDefaultAsync(this Event source, T? defaultValue = default, CancellationToken cancellationToken = default) => FirstOrDefaultAsync(source, static _ => true, defaultValue, cancellationToken); + public static Task LastAsync(this Event source, CancellationToken cancellationToken = default) => LastAsync(source, static _ => true, cancellationToken); + public static Task LastOrDefaultAsync(this Event source, T? defaultValue = default, CancellationToken cancellationToken = default) => LastOrDefaultAsync(source, static _ => true, defaultValue, cancellationToken); + public static Task SingleAsync(this Event source, CancellationToken cancellationToken = default) => SingleAsync(source, static _ => true, cancellationToken); + public static Task SingleOrDefaultAsync(this Event source, T? defaultValue = default, CancellationToken cancellationToken = default) => SingleOrDefaultAsync(source, static _ => true, defaultValue, cancellationToken); // with predicate - - public static Task FirstAsync(this Event source, Func predicate, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, First, false, default, predicate, cancellationToken); - public static Task FirstOrDefaultAsync(this Event source, Func predicate, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, First, true, defaultValue, predicate, cancellationToken); - public static Task LastAsync(this Event source, Func predicate, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Last, false, default, predicate, cancellationToken); - public static Task LastOrDefaultAsync(this Event source, Func predicate, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Last, true, defaultValue, predicate, cancellationToken); - public static Task SingleAsync(this Event source, Func predicate, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Single, false, default, predicate, cancellationToken); - public static Task SingleOrDefaultAsync(this Event source, Func predicate, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Single, true, defaultValue, predicate, cancellationToken); - - // Result variation - - public static Task FirstAsync(this Event> source, CancellationToken cancellationToken = default) => FirstAsync(source, static _ => true, cancellationToken); - public static Task FirstOrDefaultAsync(this Event> source, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstOrDefaultAsync(source, static _ => true, defaultValue, cancellationToken); - public static Task LastAsync(this Event> source, CancellationToken cancellationToken = default) => LastAsync(source, static _ => true, cancellationToken); - public static Task LastOrDefaultAsync(this Event> source, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => LastOrDefaultAsync(source, static _ => true, defaultValue, cancellationToken); - public static Task SingleAsync(this Event> source, CancellationToken cancellationToken = default) => SingleAsync(source, static _ => true, cancellationToken); - public static Task SingleOrDefaultAsync(this Event> source, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => SingleOrDefaultAsync(source, static _ => true, defaultValue, cancellationToken); - public static Task FirstAsync(this Event> source, Func predicate, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, First, false, default, predicate, cancellationToken); - public static Task FirstOrDefaultAsync(this Event> source, Func predicate, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, First, true, defaultValue, predicate, cancellationToken); - public static Task LastAsync(this Event> source, Func predicate, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Last, false, default, predicate, cancellationToken); - public static Task LastOrDefaultAsync(this Event> source, Func predicate, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Last, true, defaultValue, predicate, cancellationToken); - public static Task SingleAsync(this Event> source, Func predicate, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Single, false, default, predicate, cancellationToken); - public static Task SingleOrDefaultAsync(this Event> source, Func predicate, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Single, true, defaultValue, predicate, cancellationToken); - - static Task FirstLastSingleAsync(this Event source, FirstLastSingleOperation operation, bool useDefaultIfEmpty, TMessage? defaultValue, Func predicate, CancellationToken cancellationToken) - { - var subscriber = new FirstLastSingle(operation, useDefaultIfEmpty, defaultValue, predicate, cancellationToken); - source.Subscribe(subscriber); - return subscriber.Task; - } - - static Task FirstLastSingleAsync(this Event> source, FirstLastSingleOperation operation, bool useDefaultIfEmpty, TMessage? defaultValue, Func predicate, CancellationToken cancellationToken) + public static Task FirstAsync(this Event source, Func predicate, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, First, false, default, predicate, cancellationToken); + public static Task FirstOrDefaultAsync(this Event source, Func predicate, T? defaultValue = default, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, First, true, defaultValue, predicate, cancellationToken); + public static Task LastAsync(this Event source, Func predicate, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Last, false, default, predicate, cancellationToken); + public static Task LastOrDefaultAsync(this Event source, Func predicate, T? defaultValue = default, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Last, true, defaultValue, predicate, cancellationToken); + public static Task SingleAsync(this Event source, Func predicate, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Single, false, default, predicate, cancellationToken); + public static Task SingleOrDefaultAsync(this Event source, Func predicate, T? defaultValue = default, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Single, true, defaultValue, predicate, cancellationToken); + + static Task FirstLastSingleAsync(this Event source, FirstLastSingleOperation operation, bool useDefaultIfEmpty, T? defaultValue, Func predicate, CancellationToken cancellationToken) { - var subscriber = new FirstLastSingleR(operation, useDefaultIfEmpty, defaultValue, predicate, cancellationToken); + var subscriber = new FirstLastSingle(operation, useDefaultIfEmpty, defaultValue, predicate, cancellationToken); source.Subscribe(subscriber); return subscriber.Task; } } -internal sealed class FirstLastSingle(FirstLastSingleOperation operation, bool useDefaultIfEmpty, TMessage? defaultValue, Func predicate, CancellationToken cancellationToken) - : TaskSubscriberBase(cancellationToken) -{ - bool hasValue; - TMessage? latestValue = defaultValue; - - protected override void OnNextCore(TMessage message) - { - if (!predicate(message)) return; - - if (operation == FirstLastSingleOperation.Single && hasValue) - { - TrySetException(new InvalidOperationException("Sequence contains more than one element.")); - return; - } - - hasValue = true; - if (operation == FirstLastSingleOperation.First) - { - TrySetResult(message); // First / FirstOrDefault - } - else - { - latestValue = message; - } - } - - protected override void OnErrorResumeCore(Exception error) - { - TrySetException(error); - } - - protected override void OnCompletedCore(TComplete complete) - { - if (hasValue || useDefaultIfEmpty) - { - TrySetResult(latestValue!); // FirstOrDefault / Last / LastOrDefault / Single / SingleOrDefault - return; - } - - TrySetException(new InvalidOperationException("Sequence contains no elements.")); - } -} - -internal sealed class FirstLastSingleR(FirstLastSingleOperation operation, bool useDefaultIfEmpty, TMessage? defaultValue, Func predicate, CancellationToken cancellationToken) - : TaskSubscriberBase, TMessage>(cancellationToken) +internal sealed class FirstLastSingle(FirstLastSingleOperation operation, bool useDefaultIfEmpty, T? defaultValue, Func predicate, CancellationToken cancellationToken) + : TaskSubscriberBase(cancellationToken) { bool hasValue; - TMessage? latestValue = defaultValue; + T? latestValue = defaultValue; - protected override void OnNextCore(TMessage message) + protected override void OnNextCore(T value) { - if (!predicate(message)) return; + if (!predicate(value)) return; if (operation == FirstLastSingleOperation.Single && hasValue) { @@ -115,11 +46,11 @@ protected override void OnNextCore(TMessage message) hasValue = true; if (operation == FirstLastSingleOperation.First) { - TrySetResult(message); // First / FirstOrDefault + TrySetResult(value); // First / FirstOrDefault } else { - latestValue = message; + latestValue = value; } } @@ -128,17 +59,17 @@ protected override void OnErrorResumeCore(Exception error) TrySetException(error); } - protected override void OnCompletedCore(Result complete) + protected override void OnCompletedCore(Result result) { - if (complete.IsFailure) + if (result.IsFailure) { - TrySetException(complete.Exception); + TrySetException(result.Exception); return; } if (hasValue || useDefaultIfEmpty) { - TrySetResult(latestValue!); // FirstOrDefault / Last / LastOrDefault / Single / SingleOrDefault + TrySetResult(latestValue!); return; } diff --git a/src/R3/Operators/OnErrorAsComplete.cs b/src/R3/Operators/OnErrorAsComplete.cs index 19c3d187..357ad0cb 100644 --- a/src/R3/Operators/OnErrorAsComplete.cs +++ b/src/R3/Operators/OnErrorAsComplete.cs @@ -2,32 +2,32 @@ public static partial class EventExtensions { - public static Event> OnErrorAsComplete(this Event source) + public static Event> OnErrorAsComplete(this Event source) { - return new OnErrorAsComplete(source); + return new OnErrorAsComplete(source); } } -internal class OnErrorAsComplete(Event source) : Event> +internal class OnErrorAsComplete(Event source) : Event> { - protected override IDisposable SubscribeCore(Subscriber> subscriber) + protected override IDisposable SubscribeCore(Subscriber> subscriber) { return source.Subscribe(new _OnErrorAsComplete(subscriber)); } - sealed class _OnErrorAsComplete(Subscriber> subscriber) : Subscriber + sealed class _OnErrorAsComplete(Subscriber> subscriber) : Subscriber { - protected override void OnNextCore(TMessage message) + protected override void OnNextCore(T value) { - subscriber.OnNext(message); + subscriber.OnNext(value); } protected override void OnErrorResumeCore(Exception error) { - subscriber.OnCompleted(Result.Failure(error)); + subscriber.OnCompleted(Result.Failure(error)); } - protected override void OnCompletedCore(TComplete complete) + protected override void OnCompletedCore(Result complete) { subscriber.OnCompleted(Result.Success(complete)); } diff --git a/src/R3/Operators/Select.cs b/src/R3/Operators/Select.cs index 4b034e4f..501de3f3 100644 --- a/src/R3/Operators/Select.cs +++ b/src/R3/Operators/Select.cs @@ -6,39 +6,39 @@ public static partial class EventExtensions // TODO: CompletableEvent.Select // TODO: Element index overload - // TODO: Select for TComplete + // TODO: Select for Result - public static Event Select( - this Event source, + public static Event Select( + this Event source, Func messageSelector) { - return new Select(source, messageSelector, Stubs.ReturnSelf); + return new Select(source, messageSelector, Stubs.ReturnSelf); } - public static Event Select( - this Event source, + public static Event Select( + this Event source, Func messageSelector, - Func completeSelector) + Func completeSelector) { - return new Select(source, messageSelector, completeSelector); + return new Select(source, messageSelector, completeSelector); } } -internal sealed class Select( - Event source, +internal sealed class Select( + Event source, Func messageSelector, - Func completeSelector - ) : Event + Func completeSelector + ) : Event { - protected override IDisposable SubscribeCore(Subscriber subscriber) + protected override IDisposable SubscribeCore(Subscriber subscriber) { return source.Subscribe(new _Select(subscriber, messageSelector, completeSelector)); } - class _Select(Subscriber subscriber, Func messageSelector, Func completeSelector) : Subscriber + class _Select(Subscriber subscriber, Func messageSelector, Func completeSelector) : Subscriber { - protected override void OnNextCore(TMessage message) + protected override void OnNextCore(T value) { subscriber.OnNext(messageSelector(message)); } @@ -48,7 +48,7 @@ protected override void OnErrorResumeCore(Exception error) subscriber.OnErrorResume(error); } - protected override void OnCompletedCore(TComplete complete) + protected override void OnCompletedCore(Result complete) { subscriber.OnCompleted(completeSelector(complete)); } diff --git a/src/R3/Operators/Take.cs b/src/R3/Operators/Take.cs index 52886d17..3d8ec430 100644 --- a/src/R3/Operators/Take.cs +++ b/src/R3/Operators/Take.cs @@ -2,39 +2,39 @@ public static partial class EventExtensions { - public static Event Take(this Event source, int count) + public static Event Take(this Event source, int count) { - return new Take(source, count, default, null); + return new Take(source, count, default, null); } - public static Event Take(this Event source, int count, TComplete interruptMessage) + public static Event Take(this Event source, int count interruptMessage) { - return new Take(source, count, interruptMessage, null); + return new Take(source, count, interruptMessage, null); } - public static Event Take(this Event source, int count, Func interruptMessageFactory) + public static Event Take(this Event source, int count, Func interruptMessageFactory) { - return new Take(source, count, default!, interruptMessageFactory); + return new Take(source, count, default!, interruptMessageFactory); } } -internal sealed class Take(Event source, int count, TComplete interruptMessage, Func? interruptMessageFactory) : Event +internal sealed class Take(Event source, int count interruptMessage, Func? interruptMessageFactory) : Event { - protected override IDisposable SubscribeCore(Subscriber subscriber) + protected override IDisposable SubscribeCore(Subscriber subscriber) { return source.Subscribe(new _Take(subscriber, count, interruptMessage, interruptMessageFactory)); } - sealed class _Take(Subscriber subscriber, int count, TComplete interruptMessage, Func? interruptMessageFactory) : Subscriber, IDisposable + sealed class _Take(Subscriber subscriber, int count interruptMessage, Func? interruptMessageFactory) : Subscriber, IDisposable { int remaining = count; - protected override void OnNextCore(TMessage message) + protected override void OnNextCore(T value) { if (remaining > 0) { remaining--; - subscriber.OnNext(message); + subscriber.OnNext(value); } else { @@ -54,7 +54,7 @@ protected override void OnErrorResumeCore(Exception error) subscriber.OnErrorResume(error); } - protected override void OnCompletedCore(TComplete complete) + protected override void OnCompletedCore(Result complete) { subscriber.OnCompleted(complete); } diff --git a/src/R3/Operators/ToObservable.cs b/src/R3/Operators/ToObservable.cs index aa6664cc..2c0f2640 100644 --- a/src/R3/Operators/ToObservable.cs +++ b/src/R3/Operators/ToObservable.cs @@ -3,29 +3,29 @@ public static partial class EventExtensions { // TODO: more overload? - public static IObservable ToObservable(this Event source) + public static IObservable ToObservable(this Event source) { - return new ToObservable(source); + return new ToObservable(source); } - public static IObservable ToObservable(this Event> source) + public static IObservable ToObservable(this Event source) { - return new ToObservableR(source); + return new ToObservableR(source); } } -internal sealed class ToObservable(Event source) : IObservable +internal sealed class ToObservable(Event source) : IObservable { - public IDisposable Subscribe(IObserver observer) + public IDisposable Subscribe(IObserver observer) { return source.Subscribe(new ObserverToSubscriber(observer)); } - sealed class ObserverToSubscriber(IObserver observer) : Subscriber + sealed class ObserverToSubscriber(IObserver observer) : Subscriber { - protected override void OnNextCore(TMessage message) + protected override void OnNextCore(T value) { - observer.OnNext(message); + observer.OnNext(value); } protected override void OnErrorResumeCore(Exception error) @@ -40,18 +40,18 @@ protected override void OnCompletedCore(Unit complete) } } -internal sealed class ToObservableR(Event> source) : IObservable +internal sealed class ToObservableR(Event source) : IObservable { - public IDisposable Subscribe(IObserver observer) + public IDisposable Subscribe(IObserver observer) { return source.Subscribe(new ObserverToSubscriber(observer)); } - sealed class ObserverToSubscriber(IObserver observer) : Subscriber> + sealed class ObserverToSubscriber(IObserver observer) : Subscriber { - protected override void OnNextCore(TMessage message) + protected override void OnNextCore(T value) { - observer.OnNext(message); + observer.OnNext(value); } protected override void OnErrorResumeCore(Exception error) diff --git a/src/R3/Operators/Where.cs b/src/R3/Operators/Where.cs index 9b470df8..a6f8665c 100644 --- a/src/R3/Operators/Where.cs +++ b/src/R3/Operators/Where.cs @@ -2,41 +2,41 @@ public static partial class EventExtensions { - public static Event Where(this Event source, Func predicate) + public static Event Where(this Event source, Func predicate) { - if (source is Where where) + if (source is Where where) { // Optimize for Where.Where, create combined predicate. var p = where.predicate; - return new Where(where.source, x => p(x) && predicate(x)); + return new Where(where.source, x => p(x) && predicate(x)); } - return new Where(source, predicate); + return new Where(source, predicate); } - public static Event Where(this Event source, Func predicate) + public static Event Where(this Event source, Func predicate) { - return new WhereIndexed(source, predicate); + return new WhereIndexed(source, predicate); } } -internal sealed class Where(Event source, Func predicate) : Event +internal sealed class Where(Event source, Func predicate) : Event { - internal Event source = source; + internal Event source = source; internal Func predicate = predicate; - protected override IDisposable SubscribeCore(Subscriber subscriber) + protected override IDisposable SubscribeCore(Subscriber subscriber) { return source.Subscribe(new _Where(subscriber, predicate)); } - class _Where(Subscriber subscriber, Func predicate) : Subscriber + class _Where(Subscriber subscriber, Func predicate) : Subscriber { - protected override void OnNextCore(TMessage message) + protected override void OnNextCore(T value) { if (predicate(message)) { - subscriber.OnNext(message); + subscriber.OnNext(value); } } @@ -45,29 +45,29 @@ protected override void OnErrorResumeCore(Exception error) subscriber.OnErrorResume(error); } - protected override void OnCompletedCore(TComplete complete) + protected override void OnCompletedCore(Result complete) { subscriber.OnCompleted(complete); } } } -internal sealed class WhereIndexed(Event source, Func predicate) : Event +internal sealed class WhereIndexed(Event source, Func predicate) : Event { - protected override IDisposable SubscribeCore(Subscriber subscriber) + protected override IDisposable SubscribeCore(Subscriber subscriber) { return source.Subscribe(new _Where(subscriber, predicate)); } - class _Where(Subscriber subscriber, Func predicate) : Subscriber + class _Where(Subscriber subscriber, Func predicate) : Subscriber { int index = 0; - protected override void OnNextCore(TMessage message) + protected override void OnNextCore(T value) { if (predicate(message, index++)) { - subscriber.OnNext(message); + subscriber.OnNext(value); } } @@ -76,7 +76,7 @@ protected override void OnErrorResumeCore(Exception error) subscriber.OnErrorResume(error); } - protected override void OnCompletedCore(TComplete complete) + protected override void OnCompletedCore(Result complete) { subscriber.OnCompleted(complete); } diff --git a/src/R3/Operators/_Operators.cs b/src/R3/Operators/_Operators.cs index 581dba86..98d3fadd 100644 --- a/src/R3/Operators/_Operators.cs +++ b/src/R3/Operators/_Operators.cs @@ -27,18 +27,18 @@ public static partial class EventExtensions -internal sealed class ElementAtAsync(Event source, int index, bool useDefaultValue, TMessage? defaultValue, CancellationToken cancellationToken) - : TaskSubscriberBase(cancellationToken) +internal sealed class ElementAtAsync(int index, bool useDefaultValue, T? defaultValue, CancellationToken cancellationToken) + : TaskSubscriberBase(cancellationToken) { int count = 0; bool hasValue; - protected override void OnNextCore(TMessage message) + protected override void OnNextCore(T value) { hasValue = true; if (count++ == index) { - TrySetResult(message); + TrySetResult(value); } } @@ -47,22 +47,28 @@ protected override void OnErrorResumeCore(Exception error) TrySetException(error); } - protected override void OnCompletedCore(TComplete complete) + protected override void OnCompletedCore(Result result) { - throw new NotImplementedException(); + if (result.IsFailure) + { + // TODO:... + } + else + { + // TODO:... + } } } // Index.IsFromEnd -internal sealed class ElementAtFromEndAsync(Event source, int fromEndIndex, bool useDefaultValue, TMessage? defaultValue, CancellationToken cancellationToken) - : TaskSubscriberBase(cancellationToken) +internal sealed class ElementAtFromEndAsync(int fromEndIndex, bool useDefaultValue, T? defaultValue, CancellationToken cancellationToken) + : TaskSubscriberBase(cancellationToken) { - int count = 0; bool hasValue; - Queue queue = new Queue(fromEndIndex); + Queue queue = new Queue(fromEndIndex); - protected override void OnNextCore(TMessage message) + protected override void OnNextCore(T value) { hasValue = true; if (queue.Count == fromEndIndex) @@ -70,7 +76,7 @@ protected override void OnNextCore(TMessage message) queue.Dequeue(); } - queue.Enqueue(message); + queue.Enqueue(value); } protected override void OnErrorResumeCore(Exception error) @@ -78,12 +84,12 @@ protected override void OnErrorResumeCore(Exception error) TrySetException(error); } - protected override void OnCompletedCore(TComplete complete) + protected override void OnCompletedCore(Result result) { if (queue.Count == fromEndIndex) { - var result = queue.Dequeue(); - TrySetResult(result); + var value = queue.Dequeue(); + TrySetResult(value); return; } diff --git a/src/R3/Publisher.cs b/src/R3/Publisher.cs index 6db4dc61..f2b0a16d 100644 --- a/src/R3/Publisher.cs +++ b/src/R3/Publisher.cs @@ -2,16 +2,16 @@ namespace R3; -public interface IEventPublisher +public interface IEventPublisher { - void PublishOnNext(TMessage message); - void PublishOnCompleted(TComplete complete); + void PublishOnNext(T value); + void PublishOnCompleted(Result complete); } -public sealed class Publisher : Event, IEventPublisher, IDisposable +public sealed class Publisher : Event, IEventPublisher, IDisposable { int calledCompleted = 0; - TComplete? completeValue; + Result? completeValue; FreeListCore<_CompletablePublisher> list; readonly object completedLock = new object(); @@ -20,7 +20,7 @@ public Publisher() list = new FreeListCore<_CompletablePublisher>(this); } - public void PublishOnNext(TMessage message) + public void PublishOnNext(T value) { if (list.IsDisposed) ThrowDisposed(); if (Volatile.Read(ref calledCompleted) != 0) return; @@ -29,7 +29,7 @@ public void PublishOnNext(TMessage message) { if (subscriber != null) { - subscriber.OnNext(message); + subscriber.OnNext(value); } } } @@ -48,7 +48,7 @@ public void PublishOnErrorResume(Exception error) } } - public void PublishOnCompleted(TComplete complete) + public void PublishOnCompleted(Result complete) { if (list.IsDisposed) ThrowDisposed(); if (Volatile.Read(ref calledCompleted) != 0) return; @@ -69,7 +69,7 @@ public void PublishOnCompleted(TComplete complete) } } - protected override IDisposable SubscribeCore(Subscriber subscriber) + protected override IDisposable SubscribeCore(Subscriber subscriber) { if (list.IsDisposed) ThrowDisposed(); @@ -103,14 +103,14 @@ static void ThrowDisposed() throw new ObjectDisposedException("CompletablePublisher"); } - sealed class _CompletablePublisher(Publisher? parent, Subscriber subscriber) : IDisposable + sealed class _CompletablePublisher(Publisher? parent, Subscriber subscriber) : IDisposable { public int removeKey; [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void OnNext(TMessage message) + public void OnNext(T value) { - subscriber.OnNext(message); + subscriber.OnNext(value); } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -120,7 +120,7 @@ public void OnErrorResume(Exception error) } [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void OnCompleted(TComplete complete) + public void OnCompleted(Result complete) { subscriber.OnCompleted(complete); } diff --git a/src/R3/PublisherExtensions.cs b/src/R3/PublisherExtensions.cs index d45d1c9a..d7d13228 100644 --- a/src/R3/PublisherExtensions.cs +++ b/src/R3/PublisherExtensions.cs @@ -2,7 +2,7 @@ public static class PublisherExtensions { - public static void PublishOnCompleted(this Publisher publisher) + public static void PublishOnCompleted(this Publisher publisher) { publisher.PublishOnCompleted(default); } diff --git a/src/R3/ReactiveProperty.cs b/src/R3/ReactiveProperty.cs index 61ac1006..14de0c7d 100644 --- a/src/R3/ReactiveProperty.cs +++ b/src/R3/ReactiveProperty.cs @@ -3,7 +3,7 @@ namespace R3; -public abstract class ReadOnlyReactiveProperty : Event +public abstract class ReadOnlyReactiveProperty : Event { public abstract T CurrentValue { get; } } @@ -60,7 +60,7 @@ public void PublishOnErrorResume(Exception error) } } - protected override IDisposable SubscribeCore(Subscriber subscriber) + protected override IDisposable SubscribeCore(Subscriber subscriber) { var value = this.value; ObjectDisposedException.ThrowIf(list.IsDisposed, this); @@ -89,14 +89,14 @@ public void Dispose() return (value == null) ? "(null)" : value.ToString(); } - sealed class Subscription(ReactiveProperty? parent, Subscriber subscriber) : IDisposable + sealed class Subscription(ReactiveProperty? parent, Subscriber subscriber) : IDisposable { public int removeKey; [MethodImpl(MethodImplOptions.AggressiveInlining)] public void OnNext(T message) { - subscriber.OnNext(message); + subscriber.OnNext(value); } [MethodImpl(MethodImplOptions.AggressiveInlining)] diff --git a/src/R3/Result.cs b/src/R3/Result.cs index ed1c6905..4f29951e 100644 --- a/src/R3/Result.cs +++ b/src/R3/Result.cs @@ -3,52 +3,23 @@ namespace R3; -public static class Result +// default is Succeeded +public readonly struct Result { - public static Result Success(T value) => new(value, null); - public static Result Failure(Exception exception) => new(default, exception); - public static Result Failure(Exception exception) => new(default, exception); -} + public static Result Success => default; + public static Result Failure(Exception exception) => new(exception); -public readonly struct Result(T? value, Exception? exception) -{ - public readonly T? Value = value; - public readonly Exception? Exception = exception; + public Exception? Exception { get; } [MemberNotNullWhen(false, nameof(Exception))] - [MemberNotNullWhen(true, nameof(Value))] public bool IsSuccess => Exception == null; [MemberNotNullWhen(true, nameof(Exception))] - [MemberNotNullWhen(false, nameof(Value))] public bool IsFailure => Exception != null; - public bool TryGetValue([NotNullWhen(true)] out T? value) - { - if (IsSuccess) - { - value = Value; - return true; - } - else - { - value = default; - return false; - } - } - - public bool TryGetException([NotNullWhen(true)] out Exception? exception) + public Result(Exception? exception) { - if (IsFailure) - { - exception = Exception; - return true; - } - else - { - exception = default; - return false; - } + this.Exception = exception; } public void TryThrow() @@ -59,17 +30,11 @@ public void TryThrow() } } - public void Deconstruct(out T? value, out Exception? exception) - { - exception = Exception; - value = Value; - } - public override string ToString() { if (IsSuccess) { - return $"Success{{{Value}}}"; + return $"Success"; } else { diff --git a/src/R3/SubscriberExtensions.cs b/src/R3/SubscriberExtensions.cs index 95b542eb..b3930af5 100644 --- a/src/R3/SubscriberExtensions.cs +++ b/src/R3/SubscriberExtensions.cs @@ -2,48 +2,23 @@ public static class SubscriberExtensions { - public static void OnCompleted(this Subscriber subscriber) + public static void OnCompleted(this Subscriber subscriber) { - subscriber.OnCompleted(default); + subscriber.OnCompleted(Result.Success); } - public static IObserver ToObserver(this Subscriber subscriber) + public static void OnCompleted(this Subscriber subscriber, Exception exception) { - return new SubscriberToObserver(subscriber); + subscriber.OnCompleted(Result.Failure(exception)); } - public static IObserver ToObserver(this Subscriber> subscriber) + public static IObserver ToObserver(this Subscriber subscriber) { - return new SubscriberToObserverR(subscriber); - } -} - -internal sealed class SubscriberToObserver(Subscriber subscriber) : IObserver -{ - public void OnNext(T value) - { - subscriber.OnNext(value); - } - - public void OnError(Exception error) - { - try - { - subscriber.OnErrorResume(error); - } - finally - { - subscriber.Dispose(); - } - } - - public void OnCompleted() - { - subscriber.OnCompleted(default); + return new SubscriberToObserver(subscriber); } } -internal sealed class SubscriberToObserverR(Subscriber> subscriber) : IObserver +internal sealed class SubscriberToObserver(Subscriber subscriber) : IObserver { public void OnNext(T value) { @@ -52,11 +27,11 @@ public void OnNext(T value) public void OnError(Exception error) { - subscriber.OnCompleted(Result.Failure(error)); + subscriber.OnCompleted(error); } public void OnCompleted() { - subscriber.OnCompleted(Result.Success(Unit.Default)); + subscriber.OnCompleted(); } } diff --git a/src/R3/Unit.cs b/src/R3/Unit.cs index b70b0d1a..21f9eac2 100644 --- a/src/R3/Unit.cs +++ b/src/R3/Unit.cs @@ -4,12 +4,12 @@ { public static readonly Unit Default = default; - public static bool operator ==(Unit first, Unit second) + public static bool operator ==(Unit first second) { return true; } - public static bool operator !=(Unit first, Unit second) + public static bool operator !=(Unit first second) { return false; } @@ -32,4 +32,4 @@ public override string ToString() { return "()"; } -} \ No newline at end of file +} diff --git a/tests/R3.Tests/FactoryTests/EmptyTest.cs b/tests/R3.Tests/FactoryTests/EmptyTest.cs index 36f41e96..5b6476ed 100644 --- a/tests/R3.Tests/FactoryTests/EmptyTest.cs +++ b/tests/R3.Tests/FactoryTests/EmptyTest.cs @@ -18,7 +18,7 @@ public void EmptyWithTime() using var list = Event.Empty(TimeSpan.FromSeconds(5), fakeTime).ToLiveList(); fakeTime.Advance(TimeSpan.FromSeconds(4)); - list.AssertIsNotCompleted(); + list.AssertIsNoResultd(); fakeTime.Advance(TimeSpan.FromSeconds(1)); list.AssertIsCompleted(); diff --git a/tests/R3.Tests/FactoryTests/NeverTest.cs b/tests/R3.Tests/FactoryTests/NeverTest.cs index a348ab83..e14091fc 100644 --- a/tests/R3.Tests/FactoryTests/NeverTest.cs +++ b/tests/R3.Tests/FactoryTests/NeverTest.cs @@ -5,7 +5,7 @@ public class NeverTest [Fact] public void Never() { - using var list = Event.Never().ToLiveList(); + using var list = Event.Never().ToLiveList(); list.AssertEqual([]); } @@ -14,6 +14,6 @@ public void Never() public void NeverComplete() { using var list = Event.Never().ToLiveList(); - list.AssertIsNotCompleted(); + list.AssertIsNoResultd(); } } diff --git a/tests/R3.Tests/FactoryTests/ReturnOnCompletedTest.cs b/tests/R3.Tests/FactoryTests/ReturnOnCompletedTest.cs index 8210d57b..4a281bd6 100644 --- a/tests/R3.Tests/FactoryTests/ReturnOnCompletedTest.cs +++ b/tests/R3.Tests/FactoryTests/ReturnOnCompletedTest.cs @@ -11,7 +11,7 @@ public void ReturnOnCompleted() using var list = Event.ReturnOnCompleted("foo").ToLiveList(); list.AssertEqual([]); list.AssertIsCompleted(); - list.AssertCompletedValue("foo"); + list.AsserResultdValue("foo"); } { var fakeTime = new FakeTimeProvider(); @@ -20,12 +20,12 @@ public void ReturnOnCompleted() fakeTime.Advance(TimeSpan.FromSeconds(4)); list.AssertEqual([]); - list.AssertIsNotCompleted(); + list.AssertIsNoResultd(); fakeTime.Advance(TimeSpan.FromSeconds(1)); list.AssertEqual([]); list.AssertIsCompleted(); - list.AssertCompletedValue("foo"); + list.AsserResultdValue("foo"); } } } diff --git a/tests/R3.Tests/FactoryTests/ReturnTest.cs b/tests/R3.Tests/FactoryTests/ReturnTest.cs index 71cd52a5..9f1e1bee 100644 --- a/tests/R3.Tests/FactoryTests/ReturnTest.cs +++ b/tests/R3.Tests/FactoryTests/ReturnTest.cs @@ -27,7 +27,7 @@ public void Return() fakeTime.Advance(TimeSpan.FromSeconds(4)); list.AssertEqual([]); - list.AssertIsNotCompleted(); + list.AssertIsNoResultd(); fakeTime.Advance(TimeSpan.FromSeconds(1)); list.AssertEqual([10]); @@ -54,7 +54,7 @@ public void ReturnOnCompleted() using var list = Event.Return(0, "foo").ToLiveList(); list.AssertEqual([0]); list.AssertIsCompleted(); - list.AssertCompletedValue("foo"); + list.AsserResultdValue("foo"); } { var fakeTime = new FakeTimeProvider(); @@ -64,12 +64,12 @@ public void ReturnOnCompleted() fakeTime.Advance(TimeSpan.FromSeconds(4)); list.AssertEqual([]); - list.AssertIsNotCompleted(); + list.AssertIsNoResultd(); fakeTime.Advance(TimeSpan.FromSeconds(1)); list.AssertEqual([10]); list.AssertIsCompleted(); - list.AssertCompletedValue("foo"); + list.AsserResultdValue("foo"); } } } diff --git a/tests/R3.Tests/FactoryTests/ThrowTest.cs b/tests/R3.Tests/FactoryTests/ThrowTest.cs index 5800870f..1c481a3c 100644 --- a/tests/R3.Tests/FactoryTests/ThrowTest.cs +++ b/tests/R3.Tests/FactoryTests/ThrowTest.cs @@ -21,7 +21,7 @@ public void Throw() fakeTime.Advance(TimeSpan.FromSeconds(4)); list.AssertEqual([]); - list.AssertIsNotCompleted(); + list.AssertIsNoResultd(); fakeTime.Advance(TimeSpan.FromSeconds(1)); list.AssertEqual([]); diff --git a/tests/R3.Tests/LiveListTest.cs b/tests/R3.Tests/LiveListTest.cs index fb9f5849..16363cc2 100644 --- a/tests/R3.Tests/LiveListTest.cs +++ b/tests/R3.Tests/LiveListTest.cs @@ -5,7 +5,7 @@ public class LiveListTest [Fact] public void FromEvent() { - var publisher = new Publisher(); + var publisher = new Publisher(); var list = publisher.ToLiveList(); list.AssertEqual([]); @@ -28,7 +28,7 @@ public void FromEvent() [Fact] public void BufferSize() { - var publisher = new Publisher(); + var publisher = new Publisher(); var list = publisher.ToLiveList(bufferSize: 5); publisher.PublishOnNext(10); diff --git a/tests/R3.Tests/OperatorTests/AggregateTest.cs b/tests/R3.Tests/OperatorTests/AggregateTest.cs index e1d5c0a2..c6c6a710 100644 --- a/tests/R3.Tests/OperatorTests/AggregateTest.cs +++ b/tests/R3.Tests/OperatorTests/AggregateTest.cs @@ -7,7 +7,7 @@ public class AggregateTest [Fact] public async Task Aggreagte() { - var publisher = new Publisher(); + var publisher = new Publisher(); var listTask = publisher.AggregateAsync(new List(), (x, i) => { x.Add(i); return x; }, (x, _) => x); @@ -38,7 +38,7 @@ public async Task BeforeCanceled() var cts = new CancellationTokenSource(); cts.Cancel(); - var publisher = new Publisher(); + var publisher = new Publisher(); var isDisposed = false; var listTask = publisher diff --git a/tests/R3.Tests/OperatorTests/FirstLastSingleTest.cs b/tests/R3.Tests/OperatorTests/FirstLastSingleTest.cs index d7ba2ec9..8d771e49 100644 --- a/tests/R3.Tests/OperatorTests/FirstLastSingleTest.cs +++ b/tests/R3.Tests/OperatorTests/FirstLastSingleTest.cs @@ -13,7 +13,7 @@ public class FirstLastSingleTest [Fact] public async Task First2() { - var publisher = new Publisher(); + var publisher = new Publisher(); var task = publisher.FirstAsync(); publisher.PublishOnNext(10); (await task).Should().Be(10); @@ -29,7 +29,7 @@ public async Task First2() publisher.PublishOnCompleted(default); await Assert.ThrowsAsync(async () => await task3); - publisher = new Publisher(); + publisher = new Publisher(); var task4 = publisher.FirstAsync(x => x % 3 == 0); publisher.PublishOnNext(5); task4.Status.Should().NotBe(TaskStatus.RanToCompletion); @@ -41,7 +41,7 @@ public async Task First2() [Fact] public async Task FirstOrDefault() { - var publisher = new Publisher(); + var publisher = new Publisher(); var task = publisher.FirstOrDefaultAsync(); publisher.PublishOnNext(10); (await task).Should().Be(10); @@ -57,7 +57,7 @@ public async Task FirstOrDefault() publisher.PublishOnCompleted(default); (await task3).Should().Be(9999); - publisher = new Publisher(); + publisher = new Publisher(); var task4 = publisher.FirstOrDefaultAsync(x => x % 3 == 0); publisher.PublishOnNext(5); task4.Status.Should().NotBe(TaskStatus.RanToCompletion); @@ -69,13 +69,13 @@ public async Task FirstOrDefault() [Fact] public async Task LastAsync() { - var publisher = new Publisher(); + var publisher = new Publisher(); var task = publisher.LastAsync(); publisher.PublishOnNext(10); publisher.PublishOnCompleted(default); (await task).Should().Be(10); - publisher = new Publisher(); + publisher = new Publisher(); var task2 = publisher.LastAsync(); publisher.PublishOnNext(15); publisher.PublishOnNext(25); @@ -83,13 +83,13 @@ public async Task LastAsync() (await task2).Should().Be(25); - publisher = new Publisher(); + publisher = new Publisher(); var task3 = publisher.LastAsync(); publisher.PublishOnCompleted(default); await Assert.ThrowsAsync(async () => await task3); - publisher = new Publisher(); + publisher = new Publisher(); var task4 = publisher.LastAsync(x => x % 3 == 0); publisher.PublishOnNext(5); task4.Status.Should().NotBe(TaskStatus.RanToCompletion); @@ -103,13 +103,13 @@ public async Task LastAsync() [Fact] public async Task LastOrDefaultAsync() { - var publisher = new Publisher(); + var publisher = new Publisher(); var task = publisher.LastOrDefaultAsync(); publisher.PublishOnNext(10); publisher.PublishOnCompleted(default); (await task).Should().Be(10); - publisher = new Publisher(); + publisher = new Publisher(); var task2 = publisher.LastOrDefaultAsync(); publisher.PublishOnNext(15); publisher.PublishOnNext(25); @@ -117,13 +117,13 @@ public async Task LastOrDefaultAsync() (await task2).Should().Be(25); - publisher = new Publisher(); + publisher = new Publisher(); var task3 = publisher.LastOrDefaultAsync(9999); publisher.PublishOnCompleted(default); (await task3).Should().Be(9999); - publisher = new Publisher(); + publisher = new Publisher(); var task4 = publisher.LastOrDefaultAsync(x => x % 3 == 0); publisher.PublishOnNext(5); task4.Status.Should().NotBe(TaskStatus.RanToCompletion); @@ -137,25 +137,25 @@ public async Task LastOrDefaultAsync() [Fact] public async Task Single() { - var publisher = new Publisher(); + var publisher = new Publisher(); var task = publisher.SingleAsync(); publisher.PublishOnNext(10); publisher.PublishOnCompleted(); (await task).Should().Be(10); - publisher = new Publisher(); + publisher = new Publisher(); var task2 = publisher.SingleAsync(); publisher.PublishOnNext(15); publisher.PublishOnNext(25); await Assert.ThrowsAsync(async () => await task2); - publisher = new Publisher(); + publisher = new Publisher(); var task3 = publisher.SingleAsync(); publisher.PublishOnCompleted(default); await Assert.ThrowsAsync(async () => await task3); - publisher = new Publisher(); + publisher = new Publisher(); var task4 = publisher.SingleAsync(x => x % 3 == 0); publisher.PublishOnNext(5); task4.Status.Should().NotBe(TaskStatus.RanToCompletion); @@ -168,25 +168,25 @@ public async Task Single() [Fact] public async Task SingleOrDefault() { - var publisher = new Publisher(); + var publisher = new Publisher(); var task = publisher.SingleOrDefaultAsync(9999); publisher.PublishOnNext(10); publisher.PublishOnCompleted(); (await task).Should().Be(10); - publisher = new Publisher(); + publisher = new Publisher(); var task2 = publisher.SingleOrDefaultAsync(9999); publisher.PublishOnNext(15); publisher.PublishOnNext(25); await Assert.ThrowsAsync(async () => await task2); - publisher = new Publisher(); + publisher = new Publisher(); var task3 = publisher.SingleOrDefaultAsync(9999); publisher.PublishOnCompleted(default); (await task3).Should().Be(9999); - publisher = new Publisher(); + publisher = new Publisher(); var task4 = publisher.SingleOrDefaultAsync(x => x % 3 == 0); publisher.PublishOnNext(5); task4.Status.Should().NotBe(TaskStatus.RanToCompletion); @@ -199,7 +199,7 @@ public async Task SingleOrDefault() [Fact] public async Task ErrorStream() { - var publisher = new Publisher>(); + var publisher = new Publisher(); var task = publisher.LastAsync(); publisher.PublishOnNext(10); diff --git a/tests/R3.Tests/OperatorTests/ToListTest.cs b/tests/R3.Tests/OperatorTests/ToListTest.cs index 4fe97194..4b42b5a8 100644 --- a/tests/R3.Tests/OperatorTests/ToListTest.cs +++ b/tests/R3.Tests/OperatorTests/ToListTest.cs @@ -5,7 +5,7 @@ public class ToListTest [Fact] public async Task ToList() { - var publisher = new Publisher(); + var publisher = new Publisher(); var listTask = publisher.ToListAsync(); @@ -25,7 +25,7 @@ public async Task ToList() [Fact] public async Task ResultCompletableFault() { - var publisher = new Publisher>(); + var publisher = new Publisher(); var listTask = publisher.ToListAsync(); @@ -37,7 +37,7 @@ public async Task ResultCompletableFault() listTask.Status.Should().Be(TaskStatus.WaitingForActivation); - publisher.PublishOnCompleted(Result.Failure(new Exception("foo"))); + publisher.PublishOnCompleted(Result.Failure(new Exception("foo"))); await Assert.ThrowsAsync(async () => await listTask); } @@ -48,7 +48,7 @@ public async Task ResultCompletableCancel() var cts = new CancellationTokenSource(); var isDisposed = false; - var publisher = new Publisher>(); + var publisher = new Publisher(); var listTask = publisher.DoOnDisposed(() => isDisposed = true).ToListAsync(cts.Token); @@ -71,7 +71,7 @@ public async Task ResultCompletableCancel() [Fact] public async Task ToArray() { - var publisher = new Publisher(); + var publisher = new Publisher(); var listTask = publisher.ToArrayAsync(); @@ -91,7 +91,7 @@ public async Task ToArray() [Fact] public async Task ToArray2() { - var publisher = new Publisher>(); + var publisher = new Publisher(); var listTask = publisher.ToArrayAsync(); @@ -103,7 +103,7 @@ public async Task ToArray2() listTask.Status.Should().Be(TaskStatus.WaitingForActivation); - publisher.PublishOnCompleted(Result.Failure(new Exception("foo"))); + publisher.PublishOnCompleted(Result.Failure(new Exception("foo"))); await Assert.ThrowsAsync(async () => await listTask); } diff --git a/tests/R3.Tests/OperatorTests/WhereTest.cs b/tests/R3.Tests/OperatorTests/WhereTest.cs index 73ed5050..4d537b21 100644 --- a/tests/R3.Tests/OperatorTests/WhereTest.cs +++ b/tests/R3.Tests/OperatorTests/WhereTest.cs @@ -6,7 +6,7 @@ public class WhereTest(ITestOutputHelper output) [Fact] public void WhereWhere() { - var p = new Publisher(); + var p = new Publisher(); using var list = p.Where(x => x % 2 != 0).Where(x => x % 3 != 0).ToLiveList(); @@ -34,7 +34,7 @@ public void WhereWhere() [Fact] public void WhereCompletable() { - var p = new Publisher(); + var p = new Publisher(); using var list = p.Where(x => x % 2 != 0).ToLiveList(); @@ -50,7 +50,7 @@ public void WhereCompletable() p.PublishOnNext(30); list.AssertEqual([1, 3]); - list.AssertIsNotCompleted(); + list.AssertIsNoResultd(); p.PublishOnCompleted(default); @@ -62,7 +62,7 @@ public void WhereCompletable() [Fact] public void WhereCompletableIndexed() { - var p = new Publisher(); + var p = new Publisher(); using var list = p.Where((x, i) => i % 2 != 0).ToLiveList(); @@ -84,7 +84,7 @@ public void WhereCompletableIndexed() p.PublishOnNext(8); list.AssertEqual([1, 5, 8]); - list.AssertIsNotCompleted(); + list.AssertIsNoResultd(); p.PublishOnCompleted(default); diff --git a/tests/R3.Tests/_TestHelper.cs b/tests/R3.Tests/_TestHelper.cs index a8f73a8d..c554ebc6 100644 --- a/tests/R3.Tests/_TestHelper.cs +++ b/tests/R3.Tests/_TestHelper.cs @@ -15,12 +15,12 @@ public static void AssertIsCompleted(this LiveList list) list.IsCompleted.Should().BeTrue(); } - public static void AssertIsNotCompleted(this LiveList list) + public static void AssertIsNoResultd(this LiveList list) { list.IsCompleted.Should().BeFalse(); } - public static void AssertCompletedValue(this LiveList list, TC value) + public static void AsserResultdValue(this LiveList list, TC value) { list.CompletedValue.Should().Be(value); }