From 31793f05cc306c40c0b1a089d4406eb28e21b52b Mon Sep 17 00:00:00 2001 From: neuecc Date: Wed, 13 Dec 2023 18:14:59 +0900 Subject: [PATCH] CompletableEvent is dead, all Event --- sandbox/ConsoleApp1/LiveList.cs | 99 ------- sandbox/ConsoleApp1/Program.cs | 9 +- src/R3/Event.cs | 96 +------ src/R3/EventSubscribeExtensions.cs | 147 ++++++++++ src/R3/Factories/Empty.cs | 8 +- src/R3/Factories/Never.cs | 33 +-- src/R3/Factories/Range.cs | 8 +- src/R3/Factories/Repeat.cs | 8 +- src/R3/Factories/Return.cs | 18 +- src/R3/Factories/ReturnOnCompleted.cs | 12 +- src/R3/Factories/Throw.cs | 12 +- src/R3/Factories/Timer.cs | 4 +- src/R3/Factories/ToCompletableEvent.cs | 4 +- src/R3/Factories/ToEvent.cs | 4 +- src/R3/Factories/_EventFactory.cs | 60 ++-- src/R3/Internal/Stubs.cs | 7 + src/R3/Internal/TaskSubscriberBase.cs | 57 ---- src/R3/LiveList.cs | 155 +--------- src/R3/Operators/AggregateAsync.cs | 4 +- src/R3/Operators/AggregateOperators.cs | 48 ++-- src/R3/Operators/CombineLatest.cs | 112 +------- src/R3/Operators/Delay.cs | 270 +++++++++--------- src/R3/Operators/DelayFrame.cs | 250 ++++++++-------- src/R3/Operators/DoOnCompleted.cs | 4 +- src/R3/Operators/DoOnDisposed.cs | 84 +----- src/R3/Operators/FirstLastSingle.cs | 81 ++---- src/R3/Operators/OnErrorAsComplete.cs | 30 +- src/R3/Operators/Select.cs | 39 +-- src/R3/Operators/Take.cs | 44 +-- src/R3/Operators/ToObservable.cs | 59 +++- src/R3/Operators/Where.cs | 74 +---- src/R3/Operators/_Operators.cs | 23 +- src/R3/Publisher.cs | 99 +------ src/R3/PublisherExtensions.cs | 2 +- src/R3/ReactiveProperty.cs | 11 +- src/R3/SubscribeExtensions.cs | 98 ------- src/R3/SubscriberExtensions.cs | 110 +++---- tests/R3.Tests/FactoryTests/NeverTest.cs | 4 +- tests/R3.Tests/LiveListTest.cs | 4 +- tests/R3.Tests/OperatorTests/AggregateTest.cs | 6 +- .../OperatorTests/FirstLastSingleTest.cs | 70 ++--- tests/R3.Tests/OperatorTests/ToListTest.cs | 10 +- tests/R3.Tests/OperatorTests/WhereTest.cs | 52 +--- tests/R3.Tests/_TestHelper.cs | 5 - 44 files changed, 727 insertions(+), 1607 deletions(-) delete mode 100644 sandbox/ConsoleApp1/LiveList.cs create mode 100644 src/R3/EventSubscribeExtensions.cs create mode 100644 src/R3/Internal/Stubs.cs delete mode 100644 src/R3/SubscribeExtensions.cs diff --git a/sandbox/ConsoleApp1/LiveList.cs b/sandbox/ConsoleApp1/LiveList.cs deleted file mode 100644 index 03c04e6f..00000000 --- a/sandbox/ConsoleApp1/LiveList.cs +++ /dev/null @@ -1,99 +0,0 @@ -using R3; -using System.Collections; -using System.Runtime.InteropServices; - -public sealed class LiveList : IReadOnlyList, IDisposable -{ - readonly List list = new List(); - readonly IDisposable sourceSubscription; - - public LiveList(Event source) - { - sourceSubscription = source.Subscribe(new ListSubscriber(list)); - } - - public T this[int index] - { - get - { - lock (list) - { - return list[index]; - } - } - } - - public int Count - { - get - { - lock (list) - { - return list.Count; - } - } - } - - public void Dispose() - { - sourceSubscription.Dispose(); - } - - public void ForEach(Action action) - { - lock (list) - { - var span = CollectionsMarshal.AsSpan(list); - foreach (ref var item in span) - { - action(item); - } - } - } - - public void ForEach(Action action, TState state) - { - lock (list) - { - var span = CollectionsMarshal.AsSpan(list); - foreach (ref var item in span) - { - action(item, state); - } - } - } - - public IEnumerator GetEnumerator() - { - lock (list) - { - // snapshot - return CollectionsMarshal.AsSpan(list).ToArray().AsEnumerable().GetEnumerator(); - } - } - - IEnumerator IEnumerable.GetEnumerator() - { - lock (list) - { - // snapshot - return CollectionsMarshal.AsSpan(list).ToArray().AsEnumerable().GetEnumerator(); - } - } - - sealed class ListSubscriber(List list) : Subscriber - { - protected override void OnNextCore(T message) - { - lock (list) - { - list.Add(message); - } - } - - protected override void OnErrorResumeCore(Exception error) - { - EventSystem.GetUnhandledExceptionHandler().Invoke(error); - } - } -} diff --git a/sandbox/ConsoleApp1/Program.cs b/sandbox/ConsoleApp1/Program.cs index c2d31425..bb1ed462 100644 --- a/sandbox/ConsoleApp1/Program.cs +++ b/sandbox/ConsoleApp1/Program.cs @@ -51,7 +51,7 @@ -var publisher = new Publisher(); +var publisher = new Publisher(); var d = publisher .Where(x => true) @@ -193,12 +193,7 @@ public static class Extensions { - public static IDisposable WriteLine(this Event source) - { - return source.Subscribe(x => Console.WriteLine(x)); - } - - public static IDisposable WriteLine(this CompletableEvent source) + public static IDisposable WriteLine(this Event source) { return source.Subscribe(x => Console.WriteLine(x), _ => Console.WriteLine("COMPLETED")); } diff --git a/src/R3/Event.cs b/src/R3/Event.cs index b344eb64..6f946da7 100644 --- a/src/R3/Event.cs +++ b/src/R3/Event.cs @@ -1,102 +1,10 @@ -#pragma warning disable CS0618 +#pragma warning disable CS0618 using System.Diagnostics; namespace R3; -// similar as IObservable -// IDisposable Subscribe(Subscriber subscriber) -public abstract class Event -{ - [StackTraceHidden, DebuggerStepThrough] - public IDisposable Subscribe(Subscriber subscriber) - { - try - { - var subscription = SubscribeCore(subscriber); - - if (SubscriptionTracker.TryTrackActiveSubscription(subscription, 2, out var trackableDisposable)) - { - subscription = trackableDisposable; - } - - subscriber.SourceSubscription.Disposable = subscription; - return subscriber; // return subscriber to make subscription chain. - } - catch - { - subscriber.Dispose(); // when SubscribeCore failed, auto detach caller subscriber - throw; - } - } - - [StackTraceHidden, DebuggerStepThrough] - protected abstract IDisposable SubscribeCore(Subscriber subscriber); -} - -// similar as IObserver but no stop on OnError. -public abstract class Subscriber : IDisposable -{ -#if DEBUG - [Obsolete("Only allow in Event.")] -#endif - internal SingleAssignmentDisposableCore SourceSubscription; - - int calledDispose; - - public bool IsDisposed => Volatile.Read(ref calledDispose) != 0; - - [StackTraceHidden, DebuggerStepThrough] - public void OnNext(TMessage message) - { - if (IsDisposed) return; - try - { - OnNextCore(message); - } - catch (Exception ex) - { - OnErrorResume(ex); - } - } - - protected abstract void OnNextCore(TMessage message); - - [StackTraceHidden, DebuggerStepThrough] - public void OnErrorResume(Exception error) - { - if (IsDisposed) return; - - try - { - OnErrorResumeCore(error); - } - catch (Exception ex) - { - EventSystem.GetUnhandledExceptionHandler().Invoke(ex); - } - } - - protected abstract void OnErrorResumeCore(Exception error); - - [StackTraceHidden, DebuggerStepThrough] - public void Dispose() - { - if (Interlocked.Exchange(ref calledDispose, 1) != 0) - { - return; - } - - DisposeCore(); // Dispose self - SourceSubscription.Dispose(); // Dispose attached parent - } - - [StackTraceHidden, DebuggerStepThrough] - protected virtual void DisposeCore() { } -} - -// similar as IObservable -public abstract class CompletableEvent +public abstract class Event { [StackTraceHidden, DebuggerStepThrough] public IDisposable Subscribe(Subscriber subscriber) diff --git a/src/R3/EventSubscribeExtensions.cs b/src/R3/EventSubscribeExtensions.cs new file mode 100644 index 00000000..4ae5ed27 --- /dev/null +++ b/src/R3/EventSubscribeExtensions.cs @@ -0,0 +1,147 @@ +using System.Diagnostics; + +namespace R3; + +public static class EventSubscribeExtensions +{ + // TODO: with State + + [DebuggerStepThrough] + public static IDisposable Subscribe(this Event source) + { + return source.Subscribe(NopSubscriber.Instance); + } + + [DebuggerStepThrough] + public static IDisposable Subscribe(this Event> source) + { + return source.Subscribe(NopRSubscriber.Instance); + } + + [DebuggerStepThrough] + public static IDisposable Subscribe(this Event source, Action onNext) + { + return source.Subscribe(new AnonymousSubscriber(onNext, EventSystem.GetUnhandledExceptionHandler(), Stubs.Nop)); + } + + [DebuggerStepThrough] + public static IDisposable Subscribe(this Event> source, Action onNext) + { + return source.Subscribe(new AnonymousRSubscriber(onNext, EventSystem.GetUnhandledExceptionHandler())); + } + + [DebuggerStepThrough] + public static IDisposable Subscribe(this Event source, Action onNext, Action onComplete) + { + return source.Subscribe(new AnonymousSubscriber(onNext, EventSystem.GetUnhandledExceptionHandler(), onComplete)); + } + + [DebuggerStepThrough] + public static IDisposable Subscribe(this Event source, Action onNext, Action onErrorResume, Action onComplete) + { + return source.Subscribe(new AnonymousSubscriber(onNext, onErrorResume, onComplete)); + } +} + +[DebuggerStepThrough] +internal sealed class NopSubscriber : Subscriber +{ + public static readonly NopSubscriber Instance = new(); + + private NopSubscriber() + { + } + + [DebuggerStepThrough] + protected override void OnNextCore(TMessage message) + { + } + + [DebuggerStepThrough] + protected override void OnErrorResumeCore(Exception error) + { + EventSystem.GetUnhandledExceptionHandler().Invoke(error); + } + + [DebuggerStepThrough] + protected override void OnCompletedCore(TComplete complete) + { + } +} + +[DebuggerStepThrough] +internal sealed class NopRSubscriber : Subscriber> +{ + public static readonly NopRSubscriber Instance = new(); + + private NopRSubscriber() + { + } + + [DebuggerStepThrough] + protected override void OnNextCore(TMessage message) + { + } + + [DebuggerStepThrough] + protected override void OnErrorResumeCore(Exception error) + { + EventSystem.GetUnhandledExceptionHandler().Invoke(error); + } + + [DebuggerStepThrough] + protected override void OnCompletedCore(Result complete) + { + if (complete.IsFailure) + { + EventSystem.GetUnhandledExceptionHandler().Invoke(complete.Exception); + } + } +} + +[DebuggerStepThrough] +internal sealed class AnonymousSubscriber(Action onNext, Action onErrorResume, Action onComplete) : Subscriber +{ + [DebuggerStepThrough] + protected override void OnNextCore(TMessage message) + { + onNext(message); + } + + [DebuggerStepThrough] + protected override void OnErrorResumeCore(Exception error) + { + onErrorResume(error); + } + + [DebuggerStepThrough] + protected override void OnCompletedCore(TComplete complete) + { + onComplete(complete); + } +} + +[DebuggerStepThrough] +internal sealed class AnonymousRSubscriber(Action onNext, Action onErrorResume) : Subscriber> +{ + [DebuggerStepThrough] + protected override void OnNextCore(TMessage message) + { + onNext(message); + } + + [DebuggerStepThrough] + protected override void OnErrorResumeCore(Exception error) + { + onErrorResume(error); + } + + [DebuggerStepThrough] + protected override void OnCompletedCore(Result complete) + { + if (complete.IsFailure) + { + EventSystem.GetUnhandledExceptionHandler().Invoke(complete.Exception); + } + } +} diff --git a/src/R3/Factories/Empty.cs b/src/R3/Factories/Empty.cs index 441a196e..878eff8c 100644 --- a/src/R3/Factories/Empty.cs +++ b/src/R3/Factories/Empty.cs @@ -2,17 +2,17 @@ { public static partial class Event { - public static CompletableEvent Empty() + public static Event Empty() { return R3.Factories.Empty.Instance; } - public static CompletableEvent Empty(TimeProvider timeProvider) + public static Event Empty(TimeProvider timeProvider) { return ReturnOnCompleted(default, timeProvider); } - public static CompletableEvent Empty(TimeSpan dueTime, TimeProvider timeProvider) + public static Event Empty(TimeSpan dueTime, TimeProvider timeProvider) { return ReturnOnCompleted(default, dueTime, timeProvider); } @@ -21,7 +21,7 @@ public static CompletableEvent Empty(TimeSpan dueTime, namespace R3.Factories { - internal sealed class Empty : CompletableEvent + internal sealed class Empty : Event { // singleton public static readonly Empty Instance = new Empty(); diff --git a/src/R3/Factories/Never.cs b/src/R3/Factories/Never.cs index 70482f3b..466be29a 100644 --- a/src/R3/Factories/Never.cs +++ b/src/R3/Factories/Never.cs @@ -3,47 +3,22 @@ public static partial class Event { // Never - public static Event Never() + public static Event Never() { - return R3.Factories.Never.Instance; - } - - // NeverComplete - public static CompletableEvent NeverComplete() - { - return R3.Factories.NeverComplete.Instance; + return R3.Factories.Never.Instance; } } } namespace R3.Factories { - // Never - internal sealed class Never : Event + internal sealed class Never : Event { // singleton - public static readonly Never Instance = new Never(); + public static readonly Never Instance = new Never(); Never() { - - } - - protected override IDisposable SubscribeCore(Subscriber subscriber) - { - return Disposable.Empty; - } - } - - - // NeverComplete - internal sealed class NeverComplete : CompletableEvent - { - // singleton - public static readonly NeverComplete Instance = new NeverComplete(); - - NeverComplete() - { } diff --git a/src/R3/Factories/Range.cs b/src/R3/Factories/Range.cs index 5c83b8f4..e09481f9 100644 --- a/src/R3/Factories/Range.cs +++ b/src/R3/Factories/Range.cs @@ -4,7 +4,7 @@ public static partial class Event { // no scheduler(TimeProvider) overload - public static CompletableEvent Range(int start, int count) + public static Event Range(int start, int count) { long max = ((long)start) + count - 1; if (count < 0 || max > int.MaxValue) @@ -20,7 +20,7 @@ public static CompletableEvent Range(int start, int count) return new R3.Factories.Range(start, count); } - public static CompletableEvent Range(int start, int count, CancellationToken cancellationToken) + public static Event Range(int start, int count, CancellationToken cancellationToken) { long max = ((long)start) + count - 1; if (count < 0 || max > int.MaxValue) @@ -40,7 +40,7 @@ public static CompletableEvent Range(int start, int count, Cancellati namespace R3.Factories { - internal sealed class Range(int start, int count) : CompletableEvent + internal sealed class Range(int start, int count) : Event { protected override IDisposable SubscribeCore(Subscriber subscriber) { @@ -53,7 +53,7 @@ protected override IDisposable SubscribeCore(Subscriber subscriber) } } - internal sealed class RangeC(int start, int count, CancellationToken cancellationToken) : CompletableEvent + internal sealed class RangeC(int start, int count, CancellationToken cancellationToken) : Event { protected override IDisposable SubscribeCore(Subscriber subscriber) { diff --git a/src/R3/Factories/Repeat.cs b/src/R3/Factories/Repeat.cs index 5163f698..daa90bad 100644 --- a/src/R3/Factories/Repeat.cs +++ b/src/R3/Factories/Repeat.cs @@ -5,7 +5,7 @@ public static partial class Event // no scheduler(TimeProvider) overload // no infinitely overload - public static CompletableEvent Repeat(TMessage value, int count) + public static Event Repeat(TMessage value, int count) { if (count < 0) { @@ -20,7 +20,7 @@ public static CompletableEvent Repeat(TMessage value, return new Repeat(value, count); } - public static CompletableEvent Repeat(TMessage value, int count, CancellationToken cancellationToken) + public static Event Repeat(TMessage value, int count, CancellationToken cancellationToken) { if (count < 0) { @@ -39,7 +39,7 @@ public static CompletableEvent Repeat(TMessage value, namespace R3.Factories { - internal sealed class Repeat(TMessage value, int count) : CompletableEvent + internal sealed class Repeat(TMessage value, int count) : Event { protected override IDisposable SubscribeCore(Subscriber subscriber) { @@ -52,7 +52,7 @@ protected override IDisposable SubscribeCore(Subscriber subscrib } } - internal sealed class RepeatC(TMessage value, int count, CancellationToken cancellationToken) : CompletableEvent + internal sealed class RepeatC(TMessage value, int count, CancellationToken cancellationToken) : Event { protected override IDisposable SubscribeCore(Subscriber subscriber) { diff --git a/src/R3/Factories/Return.cs b/src/R3/Factories/Return.cs index 92c3202f..5521d9d0 100644 --- a/src/R3/Factories/Return.cs +++ b/src/R3/Factories/Return.cs @@ -2,17 +2,17 @@ { public static partial class Event { - public static CompletableEvent Return(TMessage value) + public static Event Return(TMessage value) { return new ImmediateScheduleReturn(value, default); // immediate } - public static CompletableEvent Return(TMessage value, TimeProvider timeProvider) + public static Event Return(TMessage value, TimeProvider timeProvider) { return Return(value, TimeSpan.Zero, timeProvider); } - public static CompletableEvent Return(TMessage value, TimeSpan dueTime, TimeProvider timeProvider) + public static Event Return(TMessage value, TimeSpan dueTime, TimeProvider timeProvider) { if (dueTime == TimeSpan.Zero) { @@ -27,17 +27,17 @@ public static CompletableEvent Return(TMessage value, // OnCompleted - public static CompletableEvent Return(TMessage value, TComplete complete) + public static Event Return(TMessage value, TComplete complete) { return new ImmediateScheduleReturn(value, complete); // immediate } - public static CompletableEvent Return(TMessage value, TComplete complete, TimeProvider timeProvider) + public static Event Return(TMessage value, TComplete complete, TimeProvider timeProvider) { return Return(value, complete, TimeSpan.Zero, timeProvider); } - public static CompletableEvent Return(TMessage value, TComplete complete, TimeSpan dueTime, TimeProvider timeProvider) + public static Event Return(TMessage value, TComplete complete, TimeSpan dueTime, TimeProvider timeProvider) { if (dueTime == TimeSpan.Zero) { @@ -54,7 +54,7 @@ public static CompletableEvent Return( namespace R3.Factories { - internal class Return(TMessage value, TComplete complete, TimeSpan dueTime, TimeProvider timeProvider) : CompletableEvent + internal class Return(TMessage value, TComplete complete, TimeSpan dueTime, TimeProvider timeProvider) : Event { protected override IDisposable SubscribeCore(Subscriber subscriber) { @@ -96,7 +96,7 @@ public void Dispose() } } - internal class ImmediateScheduleReturn(TMessage value, TComplete complete) : CompletableEvent + internal class ImmediateScheduleReturn(TMessage value, TComplete complete) : Event { protected override IDisposable SubscribeCore(Subscriber subscriber) { @@ -106,7 +106,7 @@ protected override IDisposable SubscribeCore(Subscriber sub } } - internal class ThreadPoolScheduleReturn(TMessage value, TComplete complete, Action? unhandledExceptionHandler) : CompletableEvent + internal class ThreadPoolScheduleReturn(TMessage value, TComplete complete, Action? unhandledExceptionHandler) : Event { protected override IDisposable SubscribeCore(Subscriber subscriber) { diff --git a/src/R3/Factories/ReturnOnCompleted.cs b/src/R3/Factories/ReturnOnCompleted.cs index 36292e85..610fe26a 100644 --- a/src/R3/Factories/ReturnOnCompleted.cs +++ b/src/R3/Factories/ReturnOnCompleted.cs @@ -4,17 +4,17 @@ public static partial class Event { // similar as Empty, only return OnCompleted - public static CompletableEvent ReturnOnCompleted(TComplete complete) + public static Event ReturnOnCompleted(TComplete complete) { return new ImmediateScheduleReturnOnCompleted(complete); // immediate } - public static CompletableEvent ReturnOnCompleted(TComplete complete, TimeProvider timeProvider) + public static Event ReturnOnCompleted(TComplete complete, TimeProvider timeProvider) { return ReturnOnCompleted(complete, TimeSpan.Zero, timeProvider); } - public static CompletableEvent ReturnOnCompleted(TComplete complete, TimeSpan dueTime, TimeProvider timeProvider) + public static Event ReturnOnCompleted(TComplete complete, TimeSpan dueTime, TimeProvider timeProvider) { if (dueTime == TimeSpan.Zero) { @@ -31,7 +31,7 @@ public static CompletableEvent ReturnOnCompleted(TComplete complete, TimeSpan dueTime, TimeProvider timeProvider) : CompletableEvent + internal class ReturnOnCompleted(TComplete complete, TimeSpan dueTime, TimeProvider timeProvider) : Event { protected override IDisposable SubscribeCore(Subscriber subscriber) { @@ -71,7 +71,7 @@ public void Dispose() } } - internal class ImmediateScheduleReturnOnCompleted(TComplete complete) : CompletableEvent + internal class ImmediateScheduleReturnOnCompleted(TComplete complete) : Event { protected override IDisposable SubscribeCore(Subscriber subscriber) { @@ -80,7 +80,7 @@ protected override IDisposable SubscribeCore(Subscriber sub } } - internal class ThreadPoolScheduleReturnOnCompleted(TComplete complete, Action? unhandledExceptionHandler) : CompletableEvent + internal class ThreadPoolScheduleReturnOnCompleted(TComplete complete, Action? unhandledExceptionHandler) : Event { protected override IDisposable SubscribeCore(Subscriber subscriber) { diff --git a/src/R3/Factories/Throw.cs b/src/R3/Factories/Throw.cs index 5b99f312..701d7717 100644 --- a/src/R3/Factories/Throw.cs +++ b/src/R3/Factories/Throw.cs @@ -2,32 +2,32 @@ { public static partial class Event { - public static CompletableEvent> Throw(Exception exception) + public static Event> Throw(Exception exception) { return Throw(exception); } - public static CompletableEvent> Throw(Exception exception) + public static Event> Throw(Exception exception) { return ReturnOnCompleted>(Result.Failure(exception)); } - public static CompletableEvent> Throw(Exception exception, TimeProvider timeProvider) + public static Event> Throw(Exception exception, TimeProvider timeProvider) { return Throw(exception, timeProvider); } - public static CompletableEvent> Throw(Exception exception, TimeProvider timeProvider) + public static Event> Throw(Exception exception, TimeProvider timeProvider) { return ReturnOnCompleted>(Result.Failure(exception), timeProvider); } - public static CompletableEvent> Throw(Exception exception, TimeSpan dueTime, TimeProvider timeProvider) + public static Event> Throw(Exception exception, TimeSpan dueTime, TimeProvider timeProvider) { return Throw(exception, dueTime, timeProvider); } - public static CompletableEvent> Throw(Exception exception, TimeSpan dueTime, TimeProvider timeProvider) + public static Event> Throw(Exception exception, TimeSpan dueTime, TimeProvider timeProvider) { return ReturnOnCompleted>(Result.Failure(exception), dueTime, timeProvider); } diff --git a/src/R3/Factories/Timer.cs b/src/R3/Factories/Timer.cs index 71a87686..b431db59 100644 --- a/src/R3/Factories/Timer.cs +++ b/src/R3/Factories/Timer.cs @@ -2,7 +2,7 @@ { public static partial class Event { - public static CompletableEvent Timer(TimeSpan dueTime, TimeProvider timeProvider) + public static Event Timer(TimeSpan dueTime, TimeProvider timeProvider) { return new R3.Factories.Timer(dueTime, timeProvider); } @@ -11,7 +11,7 @@ public static CompletableEvent Timer(TimeSpan dueTime, TimeProvider namespace R3.Factories { - internal sealed class Timer : CompletableEvent + internal sealed class Timer : Event { readonly TimeSpan dueTime; readonly TimeProvider timeProvider; diff --git a/src/R3/Factories/ToCompletableEvent.cs b/src/R3/Factories/ToCompletableEvent.cs index 9f787f5c..d538c255 100644 --- a/src/R3/Factories/ToCompletableEvent.cs +++ b/src/R3/Factories/ToCompletableEvent.cs @@ -2,7 +2,7 @@ { public static partial class Event { - public static CompletableEvent> ToCompletableEvent(this Task task) + public static Event> ToCompletableEvent(this Task task) { return new R3.Factories.ToCompletableEvent(task); } @@ -11,7 +11,7 @@ public static CompletableEvent> ToCompletableEvent(Task task) : CompletableEvent> + internal sealed class ToCompletableEvent(Task task) : Event> { protected override IDisposable SubscribeCore(Subscriber> subscriber) { diff --git a/src/R3/Factories/ToEvent.cs b/src/R3/Factories/ToEvent.cs index c4bff8a9..f6092921 100644 --- a/src/R3/Factories/ToEvent.cs +++ b/src/R3/Factories/ToEvent.cs @@ -2,7 +2,7 @@ { public static partial class Event { - public static CompletableEvent ToEvent(this IEnumerable source) + public static Event ToEvent(this IEnumerable source) { return new ToEvent(source); } @@ -11,7 +11,7 @@ public static CompletableEvent ToEvent(this IEnumerabl namespace R3.Factories { - internal class ToEvent(IEnumerable source) : CompletableEvent + internal class ToEvent(IEnumerable source) : Event { protected override IDisposable SubscribeCore(Subscriber subscriber) { diff --git a/src/R3/Factories/_EventFactory.cs b/src/R3/Factories/_EventFactory.cs index c7924cf6..4bf1ae28 100644 --- a/src/R3/Factories/_EventFactory.cs +++ b/src/R3/Factories/_EventFactory.cs @@ -41,36 +41,36 @@ public static partial class Event namespace R3.Factories { - internal sealed class EveryUpdate(FrameProvider frameProvider, CancellationToken cancellationToken) : Event - { - protected override IDisposable SubscribeCore(Subscriber subscriber) - { - var runner = new EveryUpdateRunnerWorkItem(subscriber, cancellationToken); - frameProvider.Register(runner); - return runner; - } - - class EveryUpdateRunnerWorkItem(Subscriber subscriber, CancellationToken cancellationToken) : IFrameRunnerWorkItem, IDisposable - { - bool isDisposed; - - public bool MoveNext(long frameCount) - { - if (isDisposed || cancellationToken.IsCancellationRequested) - { - return false; - } - - subscriber.OnNext(default); - return true; - } - - public void Dispose() - { - isDisposed = true; - } - } - } + //internal sealed class EveryUpdate(FrameProvider frameProvider, CancellationToken cancellationToken) : Event + //{ + // protected override IDisposable SubscribeCore(Subscriber subscriber) + // { + // var runner = new EveryUpdateRunnerWorkItem(subscriber, cancellationToken); + // frameProvider.Register(runner); + // return runner; + // } + + // class EveryUpdateRunnerWorkItem(Subscriber subscriber, CancellationToken cancellationToken) : IFrameRunnerWorkItem, IDisposable + // { + // bool isDisposed; + + // public bool MoveNext(long frameCount) + // { + // if (isDisposed || cancellationToken.IsCancellationRequested) + // { + // return false; + // } + + // subscriber.OnNext(default); + // return true; + // } + + // public void Dispose() + // { + // isDisposed = true; + // } + // } + //} //internal sealed class EveryUpdate(FrameProvider frameProvider, CancellationToken cancellationToken) : Event //{ diff --git a/src/R3/Internal/Stubs.cs b/src/R3/Internal/Stubs.cs new file mode 100644 index 00000000..3563f949 --- /dev/null +++ b/src/R3/Internal/Stubs.cs @@ -0,0 +1,7 @@ +namespace R3.Internal; + +internal static class Stubs +{ + internal static readonly Func ReturnSelf = static x => x; + internal static readonly Action Nop = static _ => { }; +} diff --git a/src/R3/Internal/TaskSubscriberBase.cs b/src/R3/Internal/TaskSubscriberBase.cs index e9e73adc..5e1531f3 100644 --- a/src/R3/Internal/TaskSubscriberBase.cs +++ b/src/R3/Internal/TaskSubscriberBase.cs @@ -4,63 +4,6 @@ namespace R3.Internal; // for return Task(tcs.TrySet***) // include proper Cancel registration -internal abstract class TaskSubscriberBase : Subscriber -{ - TaskCompletionSource tcs; - CancellationToken cancellationToken; - CancellationTokenRegistration tokenRegistration; - - public Task Task => tcs.Task; - - public TaskSubscriberBase(CancellationToken cancellationToken) - { - this.tcs = new TaskCompletionSource(); - this.cancellationToken = cancellationToken; - - if (cancellationToken.CanBeCanceled) - { - // register before call Subscribe - this.tokenRegistration = cancellationToken.UnsafeRegister(static state => - { - var s = (TaskSubscriberBase)state!; - - s.Dispose(); // subscriber is subscription, dispose - s.tcs.TrySetCanceled(s.cancellationToken); - }, this); - } - } - - // if override, should call base.DisposeCore(), be careful. - protected override void DisposeCore() - { - tokenRegistration.Dispose(); - } - - protected void TrySetResult(TTask result) - { - try - { - tcs.TrySetResult(result); - } - finally - { - Dispose(); - } - } - - protected void TrySetException(Exception exception) - { - try - { - tcs.TrySetException(exception); - } - finally - { - Dispose(); - } - } -} - internal abstract class TaskSubscriberBase : Subscriber { TaskCompletionSource tcs; // use this field. diff --git a/src/R3/LiveList.cs b/src/R3/LiveList.cs index 9e0282c5..0f0fbb97 100644 --- a/src/R3/LiveList.cs +++ b/src/R3/LiveList.cs @@ -6,164 +6,17 @@ namespace R3; public static partial class EventExtensions { - public static LiveList ToLiveList(this Event source) - { - return new LiveList(source); - } - - public static LiveList ToLiveList(this Event source, int bufferSize) - { - return new LiveList(source, bufferSize); - } - - public static LiveList ToLiveList(this CompletableEvent source) + public static LiveList ToLiveList(this Event source) { return new LiveList(source); } - public static LiveList ToLiveList(this CompletableEvent source, int bufferSize) + public static LiveList ToLiveList(this Event source, int bufferSize) { return new LiveList(source, bufferSize); } } -public sealed class LiveList : IReadOnlyList, IDisposable -{ - readonly IReadOnlyList list; // RingBuffer or List - readonly IDisposable sourceSubscription; - readonly int bufferSize; - - public LiveList(Event source) - { - if (bufferSize == 0) bufferSize = 1; - this.bufferSize = -1; - this.list = new List(); - this.sourceSubscription = source.Subscribe(new ListSubscriber(this)); - } - - public LiveList(Event source, int bufferSize) - { - if (bufferSize == 0) bufferSize = 1; - this.bufferSize = bufferSize; // bufferSize must set before Subscribe(sometimes Subscribe run immediately) - this.list = new RingBuffer(bufferSize); - this.sourceSubscription = source.Subscribe(new ListSubscriber(this)); - } - - public T this[int index] - { - get - { - lock (list) - { - return list[index]; - } - } - } - - public int Count - { - get - { - lock (list) - { - return list.Count; - } - } - } - - public void Clear() - { - lock (list) - { - list.Clear(); - } - } - - public void Dispose() - { - sourceSubscription.Dispose(); - } - - public void ForEach(Action action) - { - lock (list) - { - var span = list.GetSpan(); - foreach (ref readonly var item in span) - { - action(item); - } - } - } - - public void ForEach(Action action, TState state) - { - lock (list) - { - var span = list.GetSpan(); - foreach (ref readonly var item in span) - { - action(item, state); - } - } - } - - public T[] ToArray() - { - lock (list) - { - return list.ToArray(); - } - } - - IEnumerator IEnumerable.GetEnumerator() - { - lock (list) - { - // snapshot - return ToArray().AsEnumerable().GetEnumerator(); - } - } - - IEnumerator IEnumerable.GetEnumerator() - { - lock (list) - { - // snapshot - return ToArray().AsEnumerable().GetEnumerator(); - } - } - - sealed class ListSubscriber(LiveList parent) : Subscriber - { - protected override void OnNextCore(T message) - { - lock (parent.list) - { - if (parent.bufferSize == -1) - { - ((List)parent.list).Add(message); - } - else - { - var ring = (RingBuffer)parent.list; - - if (ring.Count == parent.bufferSize) - { - ring.RemoveFirst(); - } - ring.AddLast(message); - } - } - } - - protected override void OnErrorResumeCore(Exception error) - { - EventSystem.GetUnhandledExceptionHandler().Invoke(error); - } - } -} - public sealed class LiveList : IReadOnlyList, IDisposable { readonly IReadOnlyList list; // RingBuffer or List @@ -178,7 +31,7 @@ public sealed class LiveList : IReadOnlyList, IDisposable public TComplete? CompletedValue => completedValue; - public LiveList(CompletableEvent source) + public LiveList(Event source) { if (bufferSize == 0) bufferSize = 1; this.bufferSize = -1; @@ -186,7 +39,7 @@ public LiveList(CompletableEvent source) this.sourceSubscription = source.Subscribe(new ListSubscriber(this)); } - public LiveList(CompletableEvent source, int bufferSize) + public LiveList(Event source, int bufferSize) { if (bufferSize == 0) bufferSize = 1; this.bufferSize = bufferSize; // bufferSize must set before Subscribe(sometimes Subscribe run immediately) diff --git a/src/R3/Operators/AggregateAsync.cs b/src/R3/Operators/AggregateAsync.cs index c33c9e04..ecc92e13 100644 --- a/src/R3/Operators/AggregateAsync.cs +++ b/src/R3/Operators/AggregateAsync.cs @@ -3,7 +3,7 @@ public static partial class EventExtensions { public static Task AggregateAsync - (this CompletableEvent source, + (this Event source, TAccumulate seed, Func func, Func resultSelector, @@ -15,7 +15,7 @@ public static Task AggregateAsync AggregateAsync - (this CompletableEvent> source, + (this Event> source, TAccumulate seed, Func func, Func, TResult> resultSelector, diff --git a/src/R3/Operators/AggregateOperators.cs b/src/R3/Operators/AggregateOperators.cs index 062fe18a..8c0a4327 100644 --- a/src/R3/Operators/AggregateOperators.cs +++ b/src/R3/Operators/AggregateOperators.cs @@ -12,7 +12,7 @@ namespace R3 public static partial class EventExtensions { - public static Task ToArrayAsync(this CompletableEvent source, CancellationToken cancellationToken = default) + public static Task ToArrayAsync(this Event source, CancellationToken cancellationToken = default) { return AggregateAsync(source, new List(), static (list, message) => { @@ -21,7 +21,7 @@ public static Task ToArrayAsync(this Completabl }, (list, _) => list.ToArray(), cancellationToken); // ignore complete } - public static Task ToArrayAsync(this CompletableEvent> source, CancellationToken cancellationToken = default) + public static Task ToArrayAsync(this Event> source, CancellationToken cancellationToken = default) { return AggregateAsync(source, new List(), static (list, message) => { @@ -30,7 +30,7 @@ public static Task ToArrayAsync(this Completabl }, (list, _) => list.ToArray(), cancellationToken); // ignore complete } - public static Task> ToListAsync(this CompletableEvent source, CancellationToken cancellationToken = default) + public static Task> ToListAsync(this Event source, CancellationToken cancellationToken = default) { return AggregateAsync(source, new List(), static (list, message) => { @@ -39,7 +39,7 @@ public static Task> ToListAsync(this Complet }, (list, _) => list, cancellationToken); // ignore complete } - public static Task> ToListAsync(this CompletableEvent> source, CancellationToken cancellationToken = default) + public static Task> ToListAsync(this Event> source, CancellationToken cancellationToken = default) { return AggregateAsync(source, new List(), static (list, message) => { @@ -48,17 +48,17 @@ public static Task> ToListAsync(this Complet }, (list, _) => list, cancellationToken); // ignore complete } - public static Task> ToHashSetAsync(this CompletableEvent source, CancellationToken cancellationToken = default) + public static Task> ToHashSetAsync(this Event source, CancellationToken cancellationToken = default) { return ToHashSetAsync(source, null, cancellationToken); } - public static Task> ToHashSetAsync(this CompletableEvent> source, CancellationToken cancellationToken = default) + public static Task> ToHashSetAsync(this Event> source, CancellationToken cancellationToken = default) { return ToHashSetAsync(source, null, cancellationToken); } - public static Task> ToHashSetAsync(this CompletableEvent source, IEqualityComparer? equalityComparer, CancellationToken cancellationToken = default) + public static Task> ToHashSetAsync(this Event source, IEqualityComparer? equalityComparer, CancellationToken cancellationToken = default) { return AggregateAsync(source, new HashSet(equalityComparer), static (value, message) => { @@ -67,7 +67,7 @@ public static Task> ToHashSetAsync(this C }, (value, _) => value, cancellationToken); // ignore complete } - public static Task> ToHashSetAsync(this CompletableEvent> source, IEqualityComparer? equalityComparer, CancellationToken cancellationToken = default) + public static Task> ToHashSetAsync(this Event> source, IEqualityComparer? equalityComparer, CancellationToken cancellationToken = default) { return AggregateAsync(source, new HashSet(equalityComparer), static (value, message) => { @@ -77,30 +77,30 @@ public static Task> ToHashSetAsync(this C } // CountAsync using AggregateAsync - public static Task CountAsync(this CompletableEvent source, CancellationToken cancellationToken = default) + public static Task CountAsync(this Event source, CancellationToken cancellationToken = default) { return AggregateAsync(source, 0, static (count, _) => checked(count + 1), (count, _) => count, cancellationToken); // ignore complete } // CountAsync Result variation - public static Task CountAsync(this CompletableEvent> source, CancellationToken cancellationToken = default) + public static Task CountAsync(this Event> source, CancellationToken cancellationToken = default) { return AggregateAsync(source, 0, static (count, _) => checked(count + 1), (count, _) => count, cancellationToken); // ignore complete } // LongCountAsync using AggregateAsync - public static Task LongCountAsync(this CompletableEvent source, CancellationToken cancellationToken = default) + public static Task LongCountAsync(this Event source, CancellationToken cancellationToken = default) { return AggregateAsync(source, 0L, static (count, _) => checked(count + 1), (count, _) => count, cancellationToken); // ignore complete } // LongCountAsync using AggregateAsync Result variation - public static Task LongCountAsync(this CompletableEvent> source, CancellationToken cancellationToken = default) + public static Task LongCountAsync(this Event> source, CancellationToken cancellationToken = default) { return AggregateAsync(source, 0L, static (count, _) => checked(count + 1), (count, _) => count, cancellationToken); // ignore complete } - public static Task MinAsync(this CompletableEvent source, CancellationToken cancellationToken = default) + public static Task MinAsync(this Event source, CancellationToken cancellationToken = default) { return AggregateAsync(source, (default(TMessage)!, hasValue: false), static (min, message) => @@ -115,7 +115,7 @@ public static Task MinAsync(this CompletableEvent }, cancellationToken); } - public static Task MinAsync(this CompletableEvent> source, CancellationToken cancellationToken = default) + public static Task MinAsync(this Event> source, CancellationToken cancellationToken = default) { return AggregateAsync(source, (default(TMessage)!, hasValue: false), static (min, message) => @@ -130,7 +130,7 @@ public static Task MinAsync(this CompletableEvent }, cancellationToken); } - public static Task MaxAsync(this CompletableEvent source, CancellationToken cancellationToken = default) + public static Task MaxAsync(this Event source, CancellationToken cancellationToken = default) { return AggregateAsync(source, (default(TMessage)!, hasValue: false), static (max, message) => @@ -145,7 +145,7 @@ public static Task MaxAsync(this CompletableEvent }, cancellationToken); } - public static Task MaxAsync(this CompletableEvent> source, CancellationToken cancellationToken = default) + public static Task MaxAsync(this Event> source, CancellationToken cancellationToken = default) { return AggregateAsync(source, (default(TMessage)!, hasValue: false), static (max, message) => @@ -160,7 +160,7 @@ public static Task MaxAsync(this CompletableEvent }, cancellationToken); } - public static Task<(TMessage Min, TMessage Max)> MinMaxAsync(this CompletableEvent source, CancellationToken cancellationToken = default) + public static Task<(TMessage Min, TMessage Max)> MinMaxAsync(this Event source, CancellationToken cancellationToken = default) { return AggregateAsync(source, (min: default(TMessage)!, max: default(TMessage)!, hasValue: false), @@ -179,7 +179,7 @@ public static Task MaxAsync(this CompletableEvent } - public static Task<(TMessage Min, TMessage Max)> MinMaxAsync(this CompletableEvent> source, CancellationToken cancellationToken = default) + public static Task<(TMessage Min, TMessage Max)> MinMaxAsync(this Event> source, CancellationToken cancellationToken = default) { return AggregateAsync(source, (min: default(TMessage)!, max: default(TMessage)!, hasValue: false), @@ -197,19 +197,19 @@ public static Task MaxAsync(this CompletableEvent }, cancellationToken); } - public static Task SumAsync(this CompletableEvent source, CancellationToken cancellationToken = default) + public static Task SumAsync(this Event source, CancellationToken cancellationToken = default) where TMessage : IAdditionOperators { return AggregateAsync(source, default(TMessage)!, static (sum, message) => checked(sum + message), (sum, _) => sum, cancellationToken); // ignore complete } - public static Task SumAsync(this CompletableEvent> source, CancellationToken cancellationToken = default) + public static Task SumAsync(this Event> source, CancellationToken cancellationToken = default) where TMessage : IAdditionOperators { return AggregateAsync(source, default(TMessage)!, static (sum, message) => checked(sum + message), (sum, _) => sum, cancellationToken); // ignore complete } - public static Task AverageAsync(this CompletableEvent source, CancellationToken cancellationToken = default) + public static Task AverageAsync(this Event source, CancellationToken cancellationToken = default) where TMessage : INumberBase { return AggregateAsync(source, @@ -226,7 +226,7 @@ public static Task AverageAsync(this CompletableEve cancellationToken); } - public static Task AverageAsync(this CompletableEvent> source, CancellationToken cancellationToken = default) + public static Task AverageAsync(this Event> source, CancellationToken cancellationToken = default) where TMessage : INumberBase { return AggregateAsync(source, @@ -236,13 +236,13 @@ public static Task AverageAsync(this CompletableEve cancellationToken); } - public static Task WaitAsync(this CompletableEvent source, CancellationToken cancellationToken = default) + public static Task WaitAsync(this Event source, CancellationToken cancellationToken = default) { // get only complete value. return AggregateAsync(source, 0, static (_, _) => 0, static (_, result) => result, cancellationToken); } - public static Task WaitAsync(this CompletableEvent source, CancellationToken cancellationToken = default) + public static Task WaitAsync(this Event source, CancellationToken cancellationToken = default) { // get only complete value. return AggregateAsync(source, 0, static (_, _) => 0, static (_, result) => result, cancellationToken); diff --git a/src/R3/Operators/CombineLatest.cs b/src/R3/Operators/CombineLatest.cs index 18243534..c3493bc1 100644 --- a/src/R3/Operators/CombineLatest.cs +++ b/src/R3/Operators/CombineLatest.cs @@ -2,122 +2,32 @@ { public static partial class EventExtensions { - public static Event CombineLatest(this Event left, Event right, Func selector) + public static Event CombineLatest( + this Event left, + Event right, + Func selector) { - return new CombineLatest(left, right, selector); + return new CombineLatest(left, right, selector, static (x, y) => default); } - public static CompletableEvent CombineLatest( - this CompletableEvent left, - CompletableEvent right, + public static Event CombineLatest( + this Event left, + Event right, Func selector, Func completeSelector) { return new CombineLatest(left, right, selector, completeSelector); } - - public static CompletableEvent CombineLatest( - this CompletableEvent left, - CompletableEvent right, - Func selector) - { - return new CombineLatest(left, right, selector, static (x, y) => default); - } } } namespace R3.Operators { - internal sealed class CombineLatest(Event left, Event right, Func selector) : Event - { - protected override IDisposable SubscribeCore(Subscriber subscriber) - { - var method = new _CombineLatest(subscriber, selector); - - var d1 = left.Subscribe(new LeftSubscriber(method)); - try - { - var d2 = right.Subscribe(new RightSubscriber(method)); - return Disposable.Combine(d1, d2); - } - catch - { - d1.Dispose(); - throw; - } - } - - sealed class _CombineLatest(Subscriber subscriber, Func selector) - { - internal TLeft? message1; - internal bool hasMessage1; - - internal TRight? message2; - internal bool hasMessage2; - - public void OnErrorResume(Exception error) - { - subscriber.OnErrorResume(error); - } - - internal void Publish() - { - if (hasMessage1 && hasMessage2) - { - var result = selector(message1!, message2!); - subscriber.OnNext(result); - } - } - } - - sealed class LeftSubscriber(_CombineLatest parent) : Subscriber - { - protected override void OnNextCore(TLeft message) - { - lock (parent) // `_CombineLatest` is hide in Disposable.Combine so safe to use lock - { - parent.hasMessage1 = true; - parent.message1 = message; - parent.Publish(); - } - } - - protected override void OnErrorResumeCore(Exception error) - { - lock (parent) - { - parent.OnErrorResume(error); - } - } - } - - sealed class RightSubscriber(_CombineLatest parent) : Subscriber - { - protected override void OnNextCore(TRight message) - { - lock (parent) // `_CombineLatest` is hide in Disposable.Combine so safe to use lock - { - parent.hasMessage2 = true; - parent.message2 = message; - parent.Publish(); - } - } - - protected override void OnErrorResumeCore(Exception error) - { - lock (parent) - { - parent.OnErrorResume(error); - } - } - } - } - internal sealed class CombineLatest( - CompletableEvent left, - CompletableEvent right, + Event left, + Event right, Func selector, - Func completeSelector) : CompletableEvent + Func completeSelector) : Event { protected override IDisposable SubscribeCore(Subscriber subscriber) { diff --git a/src/R3/Operators/Delay.cs b/src/R3/Operators/Delay.cs index d8afa84c..5bbdb9f3 100644 --- a/src/R3/Operators/Delay.cs +++ b/src/R3/Operators/Delay.cs @@ -4,10 +4,10 @@ namespace R3 { public static partial class EventExtensions { - public static Event Delay(this Event source, TimeSpan dueTime, TimeProvider timeProvider) - { - return new Delay(source, dueTime, timeProvider); - } + //public static Event Delay(this Event source, TimeSpan dueTime, TimeProvider timeProvider) + //{ + // return new Delay(source, dueTime, timeProvider); + //} //public static ICompletableEvent Delay(this ICompletableEvent source, TimeSpan dueTime, TimeProvider timeProvider) //{ @@ -19,137 +19,137 @@ public static Event Delay(this Event source, TimeS namespace R3.Operators { // TODO:dueTime validation - internal sealed class Delay(Event source, TimeSpan dueTime, TimeProvider timeProvider) : Event - { - protected override IDisposable SubscribeCore(Subscriber subscriber) - { - var delay = new _Delay(subscriber, dueTime, timeProvider); - source.Subscribe(delay); - return delay; - } - - class _Delay : Subscriber, IDisposable - { - static readonly TimerCallback timerCallback = DrainMessages; - - readonly Subscriber subscriber; - readonly TimeSpan dueTime; - readonly TimeProvider timeProvider; - ITimer? timer; - readonly Queue<(long timestamp, TMessage message)> queue = new Queue<(long, TMessage)>(); // lock gate - - bool running; - - public _Delay(Subscriber subscriber, TimeSpan dueTime, TimeProvider timeProvider) - { - this.subscriber = subscriber; - this.dueTime = dueTime; - this.timeProvider = timeProvider; - this.timer = timeProvider.CreateStoppedTimer(timerCallback, this); - } - - protected override void OnNextCore(TMessage message) - { - var timestamp = timeProvider.GetTimestamp(); - lock (queue) - { - if (timer == null) - { - return; - } - - queue.Enqueue((timestamp, message)); - if (queue.Count == 1 && !running) - { - // invoke timer - running = true; - timer.InvokeOnce(dueTime); - } - } - } - - protected override void OnErrorResumeCore(Exception error) - { - // TODO: what should we do? - throw new NotImplementedException(); - } - - static void DrainMessages(object? state) - { - var self = (_Delay)state!; - var queue = self.queue; - - TMessage message; - while (true) - { - lock (queue) - { - if (self.timer == null) - { - self.running = false; - return; - } - - if (queue.TryPeek(out var msg)) - { - var elapsed = self.timeProvider.GetElapsedTime(msg.timestamp); - if (self.dueTime <= elapsed) - { - message = queue.Dequeue().message; - } - else - { - // invoke timer again - self.timer.InvokeOnce(self.dueTime - elapsed); - return; - } - } - else - { - // queue is empty, stop timer - self.running = false; - return; - } - } - - try - { - self.subscriber.OnNext(message); - continue; // loop to drain all messages - } - catch - { - lock (queue) - { - if (self.timer != null) - { - if (queue.Count != 0) - { - self.timer.RestartImmediately(); // reserve next timer - } - else - { - self.running = false; - } - } - } - - throw; // go to ITimer UnhandledException handler - } - } - } - - protected override void DisposeCore() - { - lock (queue) - { - timer?.Dispose(); - timer = null!; - queue.Clear(); - } - } - } - } + //internal sealed class Delay(Event source, TimeSpan dueTime, TimeProvider timeProvider) : Event + //{ + // protected override IDisposable SubscribeCore(Subscriber subscriber) + // { + // var delay = new _Delay(subscriber, dueTime, timeProvider); + // source.Subscribe(delay); + // return delay; + // } + + // class _Delay : Subscriber, IDisposable + // { + // static readonly TimerCallback timerCallback = DrainMessages; + + // readonly Subscriber subscriber; + // readonly TimeSpan dueTime; + // readonly TimeProvider timeProvider; + // ITimer? timer; + // readonly Queue<(long timestamp, TMessage message)> queue = new Queue<(long, TMessage)>(); // lock gate + + // bool running; + + // public _Delay(Subscriber subscriber, TimeSpan dueTime, TimeProvider timeProvider) + // { + // this.subscriber = subscriber; + // this.dueTime = dueTime; + // this.timeProvider = timeProvider; + // this.timer = timeProvider.CreateStoppedTimer(timerCallback, this); + // } + + // protected override void OnNextCore(TMessage message) + // { + // var timestamp = timeProvider.GetTimestamp(); + // lock (queue) + // { + // if (timer == null) + // { + // return; + // } + + // queue.Enqueue((timestamp, message)); + // if (queue.Count == 1 && !running) + // { + // // invoke timer + // running = true; + // timer.InvokeOnce(dueTime); + // } + // } + // } + + // protected override void OnErrorResumeCore(Exception error) + // { + // // TODO: what should we do? + // throw new NotImplementedException(); + // } + + // static void DrainMessages(object? state) + // { + // var self = (_Delay)state!; + // var queue = self.queue; + + // TMessage message; + // while (true) + // { + // lock (queue) + // { + // if (self.timer == null) + // { + // self.running = false; + // return; + // } + + // if (queue.TryPeek(out var msg)) + // { + // var elapsed = self.timeProvider.GetElapsedTime(msg.timestamp); + // if (self.dueTime <= elapsed) + // { + // message = queue.Dequeue().message; + // } + // else + // { + // // invoke timer again + // self.timer.InvokeOnce(self.dueTime - elapsed); + // return; + // } + // } + // else + // { + // // queue is empty, stop timer + // self.running = false; + // return; + // } + // } + + // try + // { + // self.subscriber.OnNext(message); + // continue; // loop to drain all messages + // } + // catch + // { + // lock (queue) + // { + // if (self.timer != null) + // { + // if (queue.Count != 0) + // { + // self.timer.RestartImmediately(); // reserve next timer + // } + // else + // { + // self.running = false; + // } + // } + // } + + // throw; // go to ITimer UnhandledException handler + // } + // } + // } + + // protected override void DisposeCore() + // { + // lock (queue) + // { + // timer?.Dispose(); + // timer = null!; + // queue.Clear(); + // } + // } + // } + //} //internal sealed class Delay(ICompletableEvent source, TimeSpan dueTime, TimeProvider timeProvider) : ICompletableEvent //{ diff --git a/src/R3/Operators/DelayFrame.cs b/src/R3/Operators/DelayFrame.cs index e28f79ba..edb5acb4 100644 --- a/src/R3/Operators/DelayFrame.cs +++ b/src/R3/Operators/DelayFrame.cs @@ -2,10 +2,10 @@ { public static partial class EventExtensions { - public static Event DelayFrame(this Event source, int delayFrameCount, FrameProvider frameProvider) - { - return new DelayFrame(source, delayFrameCount, frameProvider); - } + //public static Event DelayFrame(this Event source, int delayFrameCount, FrameProvider frameProvider) + //{ + // return new DelayFrame(source, delayFrameCount, frameProvider); + //} } } @@ -13,138 +13,138 @@ namespace R3.Operators { // TODO:dueTime validation // TODO:impl minaosi. - internal sealed class DelayFrame(Event source, int delayFrameCount, FrameProvider frameProvider) : Event - { - protected override IDisposable SubscribeCore(Subscriber subscriber) - { - var delay = new _DelayFrame(subscriber, delayFrameCount, frameProvider); - source.Subscribe(delay); // source subscription is included in _DelayFrame - return delay; - } + //internal sealed class DelayFrame(Event source, int delayFrameCount, FrameProvider frameProvider) : Event + //{ + // protected override IDisposable SubscribeCore(Subscriber subscriber) + // { + // var delay = new _DelayFrame(subscriber, delayFrameCount, frameProvider); + // source.Subscribe(delay); // source subscription is included in _DelayFrame + // return delay; + // } - class _DelayFrame : Subscriber, IFrameRunnerWorkItem - { - readonly Subscriber subscriber; - readonly int delayFrameCount; - readonly FrameProvider frameProvider; - readonly Queue<(long frameCount, TMessage message)> queue = new Queue<(long, TMessage)>(); // lock gate + // class _DelayFrame : Subscriber, IFrameRunnerWorkItem + // { + // readonly Subscriber subscriber; + // readonly int delayFrameCount; + // readonly FrameProvider frameProvider; + // readonly Queue<(long frameCount, TMessage message)> queue = new Queue<(long, TMessage)>(); // lock gate - bool running; - long nextTick; - bool stopRunner; + // bool running; + // long nextTick; + // bool stopRunner; - public _DelayFrame(Subscriber subscriber, int delayFrameCount, FrameProvider frameProvider) - { - this.subscriber = subscriber; - this.delayFrameCount = delayFrameCount; - this.frameProvider = frameProvider; - } + // public _DelayFrame(Subscriber subscriber, int delayFrameCount, FrameProvider frameProvider) + // { + // this.subscriber = subscriber; + // this.delayFrameCount = delayFrameCount; + // this.frameProvider = frameProvider; + // } - protected override void OnNextCore(TMessage message) - { - var currentCount = frameProvider.GetFrameCount(); - lock (queue) - { - if (IsDisposed) - { - return; - } + // protected override void OnNextCore(TMessage message) + // { + // var currentCount = frameProvider.GetFrameCount(); + // lock (queue) + // { + // if (IsDisposed) + // { + // return; + // } - queue.Enqueue((currentCount, message)); - if (queue.Count == 1 && !running) - { - // invoke timer - running = true; - nextTick = currentCount + delayFrameCount; - frameProvider.Register(this); // start runner - } - } - } + // queue.Enqueue((currentCount, message)); + // if (queue.Count == 1 && !running) + // { + // // invoke timer + // running = true; + // nextTick = currentCount + delayFrameCount; + // frameProvider.Register(this); // start runner + // } + // } + // } - protected override void OnErrorResumeCore(Exception error) - { - // TODO:not yet - throw new NotImplementedException(); - } + // protected override void OnErrorResumeCore(Exception error) + // { + // // TODO:not yet + // throw new NotImplementedException(); + // } - public bool MoveNext(long framecount) - { - if (stopRunner) - { - return false; - } + // public bool MoveNext(long framecount) + // { + // if (stopRunner) + // { + // return false; + // } - if (nextTick < framecount) - { - return true; - } + // if (nextTick < framecount) + // { + // return true; + // } - TMessage message; - while (true) - { - lock (queue) - { - if (IsDisposed) - { - running = false; - return false; - } + // TMessage message; + // while (true) + // { + // lock (queue) + // { + // if (IsDisposed) + // { + // running = false; + // return false; + // } - if (queue.TryPeek(out var msg)) - { - var elapsed = framecount - msg.frameCount; - if (delayFrameCount <= elapsed) - { - message = queue.Dequeue().message; - } - else - { - // invoke timer again - nextTick = framecount + (delayFrameCount - elapsed); - return true; - } - } - else - { - // queue is empty, stop timer - running = false; - return false; - } - } + // if (queue.TryPeek(out var msg)) + // { + // var elapsed = framecount - msg.frameCount; + // if (delayFrameCount <= elapsed) + // { + // message = queue.Dequeue().message; + // } + // else + // { + // // invoke timer again + // nextTick = framecount + (delayFrameCount - elapsed); + // return true; + // } + // } + // else + // { + // // queue is empty, stop timer + // running = false; + // return false; + // } + // } - try - { - subscriber.OnNext(message); - continue; // loop to drain all messages - } - catch - { - lock (queue) - { - if (queue.Count != 0) - { - nextTick = queue.Peek().frameCount + delayFrameCount; - frameProvider.Register(this); // register once more(this loop will be stopped soon) - } - else - { - running = false; - } - } + // try + // { + // subscriber.OnNext(message); + // continue; // loop to drain all messages + // } + // catch + // { + // lock (queue) + // { + // if (queue.Count != 0) + // { + // nextTick = queue.Peek().frameCount + delayFrameCount; + // frameProvider.Register(this); // register once more(this loop will be stopped soon) + // } + // else + // { + // running = false; + // } + // } - throw; // go to IFrameTimer UnhandledException handler and will Remove this work item - } - } - } + // throw; // go to IFrameTimer UnhandledException handler and will Remove this work item + // } + // } + // } - protected override void DisposeCore() - { - lock (queue) - { - stopRunner = true; - queue.Clear(); - } - } - } - } + // protected override void DisposeCore() + // { + // lock (queue) + // { + // stopRunner = true; + // queue.Clear(); + // } + // } + // } + //} } diff --git a/src/R3/Operators/DoOnCompleted.cs b/src/R3/Operators/DoOnCompleted.cs index f85c7054..a49ed9d4 100644 --- a/src/R3/Operators/DoOnCompleted.cs +++ b/src/R3/Operators/DoOnCompleted.cs @@ -4,7 +4,7 @@ public static partial class EventExtensions { // TODO: more accurate impl // TODO: with state - public static CompletableEvent DoOnCompleted(this CompletableEvent source, Action action) + public static Event DoOnCompleted(this Event source, Action action) { return new DoOnCompleted(source, action); } @@ -13,7 +13,7 @@ public static CompletableEvent DoOnCompleted(CompletableEvent source, Action action) : CompletableEvent + internal sealed class DoOnCompleted(Event source, Action action) : Event { protected override IDisposable SubscribeCore(Subscriber subscriber) { diff --git a/src/R3/Operators/DoOnDisposed.cs b/src/R3/Operators/DoOnDisposed.cs index 99428669..f8bce750 100644 --- a/src/R3/Operators/DoOnDisposed.cs +++ b/src/R3/Operators/DoOnDisposed.cs @@ -5,93 +5,21 @@ public static partial class EventExtensions // TODO: doOnSubscribed // Finally - public static Event DoOnDisposed(this Event source, Action action) + public static Event DoOnDisposed(this Event source, Action action) { - return new DoOnDisposed(source, action); + return new DoOnDisposed(source, action); } - public static Event DoOnDisposed(this Event source, Action action, TState state) + public static Event DoOnDisposed(this Event source, Action action, TState state) { - return new DoOnDisposed(source, action, state); - } - - public static CompletableEvent DoOnDisposed(this CompletableEvent source, Action action) - { - return new DoOnDisposed2(source, action); - } - - public static CompletableEvent DoOnDisposed(this CompletableEvent source, Action action, TState state) - { - return new DoOnDisposed2(source, action, state); + return new DoOnDisposed(source, action, state); } } } namespace R3.Operators { - internal sealed class DoOnDisposed(Event source, Action action) : Event - { - protected override IDisposable SubscribeCore(Subscriber subscriber) - { - var method = new _DoOnDisposed(subscriber, action); - source.Subscribe(method); - return method; - } - - class _DoOnDisposed(Subscriber subscriber, Action action) : Subscriber, IDisposable - { - Action? action = action; - - protected override void OnNextCore(TMessage message) - { - subscriber.OnNext(message); - } - - protected override void OnErrorResumeCore(Exception error) - { - subscriber.OnErrorResume(error); - } - - protected override void DisposeCore() - { - Interlocked.Exchange(ref action, null)?.Invoke(); base.DisposeCore(); - } - } - } - - internal sealed class DoOnDisposed(Event source, Action action, TState state) : Event - { - protected override IDisposable SubscribeCore(Subscriber subscriber) - { - var method = new _DoOnDisposed(subscriber, action, state); - source.Subscribe(method); - return method; - } - - class _DoOnDisposed(Subscriber subscriber, Action action, TState state) : Subscriber, IDisposable - { - Action? action = action; - TState state = state; - - protected override void OnNextCore(TMessage message) - { - subscriber.OnNext(message); - } - - protected override void OnErrorResumeCore(Exception error) - { - subscriber.OnErrorResume(error); - } - - protected override void DisposeCore() - { - Interlocked.Exchange(ref action, null)?.Invoke(state); - state = default!; - } - } - } - - internal sealed class DoOnDisposed2(CompletableEvent source, Action action) : CompletableEvent + internal sealed class DoOnDisposed(Event source, Action action) : Event { protected override IDisposable SubscribeCore(Subscriber subscriber) { @@ -126,7 +54,7 @@ protected override void DisposeCore() } } - internal sealed class DoOnDisposed2(CompletableEvent source, Action action, TState state) : CompletableEvent + internal sealed class DoOnDisposed(Event source, Action action, TState state) : Event { protected override IDisposable SubscribeCore(Subscriber subscriber) { diff --git a/src/R3/Operators/FirstLastSingle.cs b/src/R3/Operators/FirstLastSingle.cs index 747f76a6..1ea53861 100644 --- a/src/R3/Operators/FirstLastSingle.cs +++ b/src/R3/Operators/FirstLastSingle.cs @@ -6,56 +6,45 @@ public static partial class EventExtensions { // Completable - public static Task FirstAsync(this CompletableEvent source, CancellationToken cancellationToken = default) => FirstAsync(source, static _ => true, cancellationToken); - public static Task FirstOrDefaultAsync(this CompletableEvent source, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstOrDefaultAsync(source, static _ => true, defaultValue, cancellationToken); - public static Task LastAsync(this CompletableEvent source, CancellationToken cancellationToken = default) => LastAsync(source, static _ => true, cancellationToken); - public static Task LastOrDefaultAsync(this CompletableEvent source, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => LastOrDefaultAsync(source, static _ => true, defaultValue, cancellationToken); - public static Task SingleAsync(this CompletableEvent source, CancellationToken cancellationToken = default) => SingleAsync(source, static _ => true, cancellationToken); - public static Task SingleOrDefaultAsync(this CompletableEvent source, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => SingleOrDefaultAsync(source, static _ => true, defaultValue, cancellationToken); + public static Task FirstAsync(this Event source, CancellationToken cancellationToken = default) => FirstAsync(source, static _ => true, cancellationToken); + public static Task FirstOrDefaultAsync(this Event source, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstOrDefaultAsync(source, static _ => true, defaultValue, cancellationToken); + public static Task LastAsync(this Event source, CancellationToken cancellationToken = default) => LastAsync(source, static _ => true, cancellationToken); + public static Task LastOrDefaultAsync(this Event source, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => LastOrDefaultAsync(source, static _ => true, defaultValue, cancellationToken); + public static Task SingleAsync(this Event source, CancellationToken cancellationToken = default) => SingleAsync(source, static _ => true, cancellationToken); + public static Task SingleOrDefaultAsync(this Event source, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => SingleOrDefaultAsync(source, static _ => true, defaultValue, cancellationToken); // with predicate - public static Task FirstAsync(this CompletableEvent source, Func predicate, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, First, false, default, predicate, cancellationToken); - public static Task FirstOrDefaultAsync(this CompletableEvent source, Func predicate, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, First, true, defaultValue, predicate, cancellationToken); - public static Task LastAsync(this CompletableEvent source, Func predicate, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Last, false, default, predicate, cancellationToken); - public static Task LastOrDefaultAsync(this CompletableEvent source, Func predicate, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Last, true, defaultValue, predicate, cancellationToken); - public static Task SingleAsync(this CompletableEvent source, Func predicate, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Single, false, default, predicate, cancellationToken); - public static Task SingleOrDefaultAsync(this CompletableEvent source, Func predicate, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Single, true, defaultValue, predicate, cancellationToken); + public static Task FirstAsync(this Event source, Func predicate, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, First, false, default, predicate, cancellationToken); + public static Task FirstOrDefaultAsync(this Event source, Func predicate, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, First, true, defaultValue, predicate, cancellationToken); + public static Task LastAsync(this Event source, Func predicate, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Last, false, default, predicate, cancellationToken); + public static Task LastOrDefaultAsync(this Event source, Func predicate, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Last, true, defaultValue, predicate, cancellationToken); + public static Task SingleAsync(this Event source, Func predicate, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Single, false, default, predicate, cancellationToken); + public static Task SingleOrDefaultAsync(this Event source, Func predicate, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Single, true, defaultValue, predicate, cancellationToken); // Result variation - public static Task FirstAsync(this CompletableEvent> source, CancellationToken cancellationToken = default) => FirstAsync(source, static _ => true, cancellationToken); - public static Task FirstOrDefaultAsync(this CompletableEvent> source, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstOrDefaultAsync(source, static _ => true, defaultValue, cancellationToken); - public static Task LastAsync(this CompletableEvent> source, CancellationToken cancellationToken = default) => LastAsync(source, static _ => true, cancellationToken); - public static Task LastOrDefaultAsync(this CompletableEvent> source, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => LastOrDefaultAsync(source, static _ => true, defaultValue, cancellationToken); - public static Task SingleAsync(this CompletableEvent> source, CancellationToken cancellationToken = default) => SingleAsync(source, static _ => true, cancellationToken); - public static Task SingleOrDefaultAsync(this CompletableEvent> source, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => SingleOrDefaultAsync(source, static _ => true, defaultValue, cancellationToken); - public static Task FirstAsync(this CompletableEvent> source, Func predicate, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, First, false, default, predicate, cancellationToken); - public static Task FirstOrDefaultAsync(this CompletableEvent> source, Func predicate, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, First, true, defaultValue, predicate, cancellationToken); - public static Task LastAsync(this CompletableEvent> source, Func predicate, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Last, false, default, predicate, cancellationToken); - public static Task LastOrDefaultAsync(this CompletableEvent> source, Func predicate, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Last, true, defaultValue, predicate, cancellationToken); - public static Task SingleAsync(this CompletableEvent> source, Func predicate, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Single, false, default, predicate, cancellationToken); - public static Task SingleOrDefaultAsync(this CompletableEvent> source, Func predicate, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Single, true, defaultValue, predicate, cancellationToken); - - - // no complete, only use First - public static Task FirstAsync(this Event source, CancellationToken cancellationToken = default) => FirstAsync(source, static _ => true, cancellationToken); - - public static Task FirstAsync(this Event source, Func predicate, CancellationToken cancellationToken = default) - { - var subscriber = new First(predicate, cancellationToken); - source.Subscribe(subscriber); - return subscriber.Task; - } - - static Task FirstLastSingleAsync(this CompletableEvent source, FirstLastSingleOperation operation, bool useDefaultIfEmpty, TMessage? defaultValue, Func predicate, CancellationToken cancellationToken) + public static Task FirstAsync(this Event> source, CancellationToken cancellationToken = default) => FirstAsync(source, static _ => true, cancellationToken); + public static Task FirstOrDefaultAsync(this Event> source, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstOrDefaultAsync(source, static _ => true, defaultValue, cancellationToken); + public static Task LastAsync(this Event> source, CancellationToken cancellationToken = default) => LastAsync(source, static _ => true, cancellationToken); + public static Task LastOrDefaultAsync(this Event> source, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => LastOrDefaultAsync(source, static _ => true, defaultValue, cancellationToken); + public static Task SingleAsync(this Event> source, CancellationToken cancellationToken = default) => SingleAsync(source, static _ => true, cancellationToken); + public static Task SingleOrDefaultAsync(this Event> source, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => SingleOrDefaultAsync(source, static _ => true, defaultValue, cancellationToken); + public static Task FirstAsync(this Event> source, Func predicate, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, First, false, default, predicate, cancellationToken); + public static Task FirstOrDefaultAsync(this Event> source, Func predicate, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, First, true, defaultValue, predicate, cancellationToken); + public static Task LastAsync(this Event> source, Func predicate, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Last, false, default, predicate, cancellationToken); + public static Task LastOrDefaultAsync(this Event> source, Func predicate, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Last, true, defaultValue, predicate, cancellationToken); + public static Task SingleAsync(this Event> source, Func predicate, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Single, false, default, predicate, cancellationToken); + public static Task SingleOrDefaultAsync(this Event> source, Func predicate, TMessage? defaultValue = default, CancellationToken cancellationToken = default) => FirstLastSingleAsync(source, Single, true, defaultValue, predicate, cancellationToken); + + static Task FirstLastSingleAsync(this Event source, FirstLastSingleOperation operation, bool useDefaultIfEmpty, TMessage? defaultValue, Func predicate, CancellationToken cancellationToken) { var subscriber = new FirstLastSingle(operation, useDefaultIfEmpty, defaultValue, predicate, cancellationToken); source.Subscribe(subscriber); return subscriber.Task; } - static Task FirstLastSingleAsync(this CompletableEvent> source, FirstLastSingleOperation operation, bool useDefaultIfEmpty, TMessage? defaultValue, Func predicate, CancellationToken cancellationToken) + static Task FirstLastSingleAsync(this Event> source, FirstLastSingleOperation operation, bool useDefaultIfEmpty, TMessage? defaultValue, Func predicate, CancellationToken cancellationToken) { var subscriber = new FirstLastSingleR(operation, useDefaultIfEmpty, defaultValue, predicate, cancellationToken); source.Subscribe(subscriber); @@ -66,22 +55,6 @@ static Task FirstLastSingleAsync(this Completable namespace R3.Operators { - internal sealed class First(Func predicate, CancellationToken cancellationToken) - : TaskSubscriberBase(cancellationToken) - { - protected override void OnNextCore(TMessage message) - { - if (!predicate(message)) return; - - TrySetResult(message); // First - } - - protected override void OnErrorResumeCore(Exception error) - { - TrySetException(error); - } - } - internal sealed class FirstLastSingle(FirstLastSingleOperation operation, bool useDefaultIfEmpty, TMessage? defaultValue, Func predicate, CancellationToken cancellationToken) : TaskSubscriberBase(cancellationToken) { diff --git a/src/R3/Operators/OnErrorAsComplete.cs b/src/R3/Operators/OnErrorAsComplete.cs index 44df8a77..e9d2a511 100644 --- a/src/R3/Operators/OnErrorAsComplete.cs +++ b/src/R3/Operators/OnErrorAsComplete.cs @@ -2,12 +2,7 @@ { public static partial class EventExtensions { - public static CompletableEvent> OnErrorAsComplete(this Event source) - { - return new OnErrorAsComplete(source); - } - - public static CompletableEvent> OnErrorAsComplete(this CompletableEvent source) + public static Event> OnErrorAsComplete(this Event source) { return new OnErrorAsComplete(source); } @@ -16,28 +11,7 @@ public static CompletableEvent> OnErrorAsComplete(Event source) : CompletableEvent> - { - protected override IDisposable SubscribeCore(Subscriber> subscriber) - { - return source.Subscribe(new _OnErrorAsComplete(subscriber)); - } - - sealed class _OnErrorAsComplete(Subscriber> subscriber) : Subscriber - { - protected override void OnNextCore(TMessage message) - { - subscriber.OnNext(message); - } - - protected override void OnErrorResumeCore(Exception error) - { - subscriber.OnCompleted(Result.Failure(error)); - } - } - } - - internal class OnErrorAsComplete(CompletableEvent source) : CompletableEvent> + internal class OnErrorAsComplete(Event source) : Event> { protected override IDisposable SubscribeCore(Subscriber> subscriber) { diff --git a/src/R3/Operators/Select.cs b/src/R3/Operators/Select.cs index 24dcb2e6..898667b8 100644 --- a/src/R3/Operators/Select.cs +++ b/src/R3/Operators/Select.cs @@ -8,20 +8,16 @@ public static partial class EventExtensions // TODO: Select for TComplete - public static Event Select(this Event source, Func selector) - { - return new Select(source, selector); - } - public static CompletableEvent Select( - this CompletableEvent source, + public static Event Select( + this Event source, Func messageSelector) { - return new Select(source, messageSelector, static x => x); + return new Select(source, messageSelector, Stubs.ReturnSelf); } - public static CompletableEvent Select( - this CompletableEvent source, + public static Event Select( + this Event source, Func messageSelector, Func completeSelector) { @@ -32,32 +28,11 @@ public static CompletableEvent Select(Event source, Func selector) : Event - { - protected override IDisposable SubscribeCore(Subscriber subscriber) - { - return source.Subscribe(new _Select(subscriber, selector)); - } - - class _Select(Subscriber subscriber, Func selector) : Subscriber - { - protected override void OnNextCore(TMessage message) - { - subscriber.OnNext(selector(message)); - } - - protected override void OnErrorResumeCore(Exception error) - { - subscriber.OnErrorResume(error); - } - } - } - internal sealed class Select( - CompletableEvent source, + Event source, Func messageSelector, Func completeSelector - ) : CompletableEvent + ) : Event { protected override IDisposable SubscribeCore(Subscriber subscriber) { diff --git a/src/R3/Operators/Take.cs b/src/R3/Operators/Take.cs index 868a9053..a766e88f 100644 --- a/src/R3/Operators/Take.cs +++ b/src/R3/Operators/Take.cs @@ -2,22 +2,17 @@ { public static partial class EventExtensions { - public static CompletableEvent Take(this Event source, int count) - { - return new Take(source, count); - } - - public static CompletableEvent Take(this CompletableEvent source, int count) + public static Event Take(this Event source, int count) { return new Take(source, count, default, null); } - public static CompletableEvent Take(this CompletableEvent source, int count, TComplete interruptMessage) + public static Event Take(this Event source, int count, TComplete interruptMessage) { return new Take(source, count, interruptMessage, null); } - public static CompletableEvent Take(this CompletableEvent source, int count, Func interruptMessageFactory) + public static Event Take(this Event source, int count, Func interruptMessageFactory) { return new Take(source, count, default!, interruptMessageFactory); } @@ -26,38 +21,7 @@ public static CompletableEvent Take(th namespace R3.Operators { - internal sealed class Take(Event source, int count) : CompletableEvent - { - protected override IDisposable SubscribeCore(Subscriber subscriber) - { - return source.Subscribe(new _Take(subscriber, count)); - } - - sealed class _Take(Subscriber subscriber, int count) : Subscriber, IDisposable - { - int remaining = count; - - protected override void OnNextCore(TMessage message) - { - if (remaining > 0) - { - remaining--; - subscriber.OnNext(message); - } - else - { - subscriber.OnCompleted(Unit.Default); - } - } - - protected override void OnErrorResumeCore(Exception error) - { - subscriber.OnErrorResume(error); - } - } - } - - internal sealed class Take(CompletableEvent source, int count, TComplete interruptMessage, Func? interruptMessageFactory) : CompletableEvent + internal sealed class Take(Event source, int count, TComplete interruptMessage, Func? interruptMessageFactory) : Event { protected override IDisposable SubscribeCore(Subscriber subscriber) { diff --git a/src/R3/Operators/ToObservable.cs b/src/R3/Operators/ToObservable.cs index 1028a9ef..a03f6b6b 100644 --- a/src/R3/Operators/ToObservable.cs +++ b/src/R3/Operators/ToObservable.cs @@ -2,41 +2,78 @@ { public static partial class EventExtensions { - // TODO: more overload - public static IObservable ToObservable(this Event source) + // TODO: more overload? + public static IObservable ToObservable(this Event source) { return new ToObservable(source); } + + public static IObservable ToObservable(this Event> source) + { + return new ToObservableR(source); + } } } namespace R3.Operators { - internal sealed class ToObservable(Event source) : IObservable + internal sealed class ToObservable(Event source) : IObservable { public IDisposable Subscribe(IObserver observer) { - return source.Subscribe(new ObserverToSubscriber(observer)); + return source.Subscribe(new ObserverToSubscriber(observer)); + } + + sealed class ObserverToSubscriber(IObserver observer) : Subscriber + { + protected override void OnNextCore(TMessage message) + { + observer.OnNext(message); + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnError(error); + } + + protected override void OnCompletedCore(Unit complete) + { + observer.OnCompleted(); + } } } - internal sealed class ObserverToSubscriber(IObserver observer) : Subscriber + internal sealed class ToObservableR(Event> source) : IObservable { - protected override void OnNextCore(TMessage message) + public IDisposable Subscribe(IObserver observer) { - observer.OnNext(message); + return source.Subscribe(new ObserverToSubscriber(observer)); } - protected override void OnErrorResumeCore(Exception error) + sealed class ObserverToSubscriber(IObserver observer) : Subscriber> { - try + protected override void OnNextCore(TMessage message) + { + observer.OnNext(message); + } + + protected override void OnErrorResumeCore(Exception error) { observer.OnError(error); } - finally + + protected override void OnCompletedCore(Result complete) { - Dispose(); + if (complete.IsFailure) + { + observer.OnError(complete.Exception); + } + else + { + observer.OnCompleted(); + } } } } + } diff --git a/src/R3/Operators/Where.cs b/src/R3/Operators/Where.cs index bb4dd472..5cf7cefc 100644 --- a/src/R3/Operators/Where.cs +++ b/src/R3/Operators/Where.cs @@ -2,29 +2,19 @@ { public static partial class EventExtensions { - public static Event Where(this Event source, Func predicate) + public static Event Where(this Event source, Func predicate) { - if (source is Where where) + if (source is Where where) { // Optimize for Where.Where, create combined predicate. var p = where.predicate; - return new Where(where.source, x => p(x) && predicate(x)); + return new Where(where.source, x => p(x) && predicate(x)); } - return new Where(source, predicate); - } - - public static CompletableEvent Where(this CompletableEvent source, Func predicate) - { return new Where(source, predicate); } - public static Event Where(this Event source, Func predicate) - { - return new WhereIndexed(source, predicate); - } - - public static CompletableEvent Where(this CompletableEvent source, Func predicate) + public static Event Where(this Event source, Func predicate) { return new WhereIndexed(source, predicate); } @@ -33,35 +23,11 @@ public static CompletableEvent Where(t namespace R3.Operators { - internal sealed class Where(Event source, Func predicate) : Event + internal sealed class Where(Event source, Func predicate) : Event { - internal Event source = source; + internal Event source = source; internal Func predicate = predicate; - protected override IDisposable SubscribeCore(Subscriber subscriber) - { - return source.Subscribe(new _Where(subscriber, predicate)); - } - - class _Where(Subscriber subscriber, Func predicate) : Subscriber - { - protected override void OnNextCore(TMessage message) - { - if (predicate(message)) - { - subscriber.OnNext(message); - } - } - - protected override void OnErrorResumeCore(Exception error) - { - subscriber.OnErrorResume(error); - } - } - } - - internal sealed class Where(CompletableEvent source, Func predicate) : CompletableEvent - { protected override IDisposable SubscribeCore(Subscriber subscriber) { return source.Subscribe(new _Where(subscriber, predicate)); @@ -89,33 +55,7 @@ protected override void OnCompletedCore(TComplete complete) } } - internal sealed class WhereIndexed(Event source, Func predicate) : Event - { - protected override IDisposable SubscribeCore(Subscriber subscriber) - { - return source.Subscribe(new _Where(subscriber, predicate)); - } - - class _Where(Subscriber subscriber, Func predicate) : Subscriber - { - int index = 0; - - protected override void OnNextCore(TMessage message) - { - if (predicate(message, index++)) - { - subscriber.OnNext(message); - } - } - - protected override void OnErrorResumeCore(Exception error) - { - subscriber.OnErrorResume(error); - } - } - } - - internal sealed class WhereIndexed(CompletableEvent source, Func predicate) : CompletableEvent + internal sealed class WhereIndexed(Event source, Func predicate) : Event { protected override IDisposable SubscribeCore(Subscriber subscriber) { diff --git a/src/R3/Operators/_Operators.cs b/src/R3/Operators/_Operators.cs index d2676612..bd6197e2 100644 --- a/src/R3/Operators/_Operators.cs +++ b/src/R3/Operators/_Operators.cs @@ -31,26 +31,9 @@ namespace R3.Operators { // TODO: now working - internal sealed class ElementAtAsync(Event source, int index, CancellationToken cancellationToken) - : TaskSubscriberBase(cancellationToken) - { - int count = 0; - - protected override void OnNextCore(TMessage message) - { - if (count++ == index) - { - TrySetResult(message); - } - } - - protected override void OnErrorResumeCore(Exception error) - { - TrySetException(error); - } - } + - internal sealed class ElementAtAsync(CompletableEvent source, int index, bool useDefaultValue, TMessage? defaultValue, CancellationToken cancellationToken) + internal sealed class ElementAtAsync(Event source, int index, bool useDefaultValue, TMessage? defaultValue, CancellationToken cancellationToken) : TaskSubscriberBase(cancellationToken) { int count = 0; @@ -77,7 +60,7 @@ protected override void OnCompletedCore(TComplete complete) } // Index.IsFromEnd - internal sealed class ElementAtFromEndAsync(CompletableEvent source, int fromEndIndex, bool useDefaultValue, TMessage? defaultValue, CancellationToken cancellationToken) + internal sealed class ElementAtFromEndAsync(Event source, int fromEndIndex, bool useDefaultValue, TMessage? defaultValue, CancellationToken cancellationToken) : TaskSubscriberBase(cancellationToken) { int count = 0; diff --git a/src/R3/Publisher.cs b/src/R3/Publisher.cs index 7268dac9..6db4dc61 100644 --- a/src/R3/Publisher.cs +++ b/src/R3/Publisher.cs @@ -1,110 +1,21 @@ -using R3.Internal; -using System.Runtime.CompilerServices; +using System.Runtime.CompilerServices; namespace R3; -public interface IEventPublisher -{ - void PublishOnNext(TMessage message); -} - -public interface ICompletableEventPublisher +public interface IEventPublisher { void PublishOnNext(TMessage message); void PublishOnCompleted(TComplete complete); } -public sealed class Publisher : Event, IEventPublisher, IDisposable -{ - FreeListCore<_Publisher> list; - - public Publisher() - { - list = new FreeListCore<_Publisher>(this); - } - - public void PublishOnNext(TMessage message) - { - if (list.IsDisposed) ThrowDisposed(); - - foreach (var subscriber in list.AsSpan()) - { - if (subscriber != null) - { - subscriber.OnNext(message); - } - } - } - - public void PublishOnErrorResume(Exception error) - { - if (list.IsDisposed) ThrowDisposed(); - - foreach (var subscriber in list.AsSpan()) - { - if (subscriber != null) - { - subscriber.OnErrorResume(error); - } - } - } - - protected override IDisposable SubscribeCore(Subscriber subscriber) - { - var subscription = new _Publisher(this, subscriber); - subscription.removeKey = list.Add(subscription); // when disposed, may throw DisposedException in this line - return subscription; - } - - void Unsubscribe(_Publisher subscription) - { - list.Remove(subscription.removeKey); - } - - public void Dispose() - { - list.Dispose(); - } - - static void ThrowDisposed() - { - throw new ObjectDisposedException("Publisher"); - } - - sealed class _Publisher(Publisher? parent, Subscriber subscriber) : IDisposable - { - public int removeKey; - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void OnNext(TMessage message) - { - subscriber.OnNext(message); - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void OnErrorResume(Exception error) - { - subscriber.OnErrorResume(error); - } - - public void Dispose() - { - var p = Interlocked.Exchange(ref parent, null); - if (p == null) return; - - p.Unsubscribe(this); - } - } -} - -public sealed class CompletablePublisher : CompletableEvent, ICompletableEventPublisher, IDisposable +public sealed class Publisher : Event, IEventPublisher, IDisposable { int calledCompleted = 0; TComplete? completeValue; FreeListCore<_CompletablePublisher> list; readonly object completedLock = new object(); - public CompletablePublisher() + public Publisher() { list = new FreeListCore<_CompletablePublisher>(this); } @@ -192,7 +103,7 @@ static void ThrowDisposed() throw new ObjectDisposedException("CompletablePublisher"); } - sealed class _CompletablePublisher(CompletablePublisher? parent, Subscriber subscriber) : IDisposable + sealed class _CompletablePublisher(Publisher? parent, Subscriber subscriber) : IDisposable { public int removeKey; diff --git a/src/R3/PublisherExtensions.cs b/src/R3/PublisherExtensions.cs index 17d97202..d45d1c9a 100644 --- a/src/R3/PublisherExtensions.cs +++ b/src/R3/PublisherExtensions.cs @@ -2,7 +2,7 @@ public static class PublisherExtensions { - public static void PublishOnCompleted(this CompletablePublisher publisher) + public static void PublishOnCompleted(this Publisher publisher) { publisher.PublishOnCompleted(default); } diff --git a/src/R3/ReactiveProperty.cs b/src/R3/ReactiveProperty.cs index e42163d8..61ac1006 100644 --- a/src/R3/ReactiveProperty.cs +++ b/src/R3/ReactiveProperty.cs @@ -3,14 +3,12 @@ namespace R3; -// TODO: call OnCompleted on Dispose. - -public abstract class ReadOnlyReactiveProperty : Event +public abstract class ReadOnlyReactiveProperty : Event { public abstract T CurrentValue { get; } } -// allow inherit? +// allow inherit public class ReactiveProperty : ReadOnlyReactiveProperty, IDisposable { T value; @@ -62,7 +60,7 @@ public void PublishOnErrorResume(Exception error) } } - protected override IDisposable SubscribeCore(Subscriber subscriber) + protected override IDisposable SubscribeCore(Subscriber subscriber) { var value = this.value; ObjectDisposedException.ThrowIf(list.IsDisposed, this); @@ -82,6 +80,7 @@ void Unsubscribe(Subscription subscription) public void Dispose() { + // TODO: call OnCompleted on Dispose. list.Dispose(); } @@ -90,7 +89,7 @@ public void Dispose() return (value == null) ? "(null)" : value.ToString(); } - sealed class Subscription(ReactiveProperty? parent, Subscriber subscriber) : IDisposable + sealed class Subscription(ReactiveProperty? parent, Subscriber subscriber) : IDisposable { public int removeKey; diff --git a/src/R3/SubscribeExtensions.cs b/src/R3/SubscribeExtensions.cs deleted file mode 100644 index 24abb5e5..00000000 --- a/src/R3/SubscribeExtensions.cs +++ /dev/null @@ -1,98 +0,0 @@ -using System.Diagnostics; - -namespace R3; - -public static class SubscribeExtensions -{ - [DebuggerStepThrough] - public static IDisposable Subscribe(this Event source) - { - return source.Subscribe(NopSubscriber.Instance); - } - - [DebuggerStepThrough] - public static IDisposable Subscribe(this Event source, Action onNext) - { - return source.Subscribe(new AnonymousSubscriber(onNext, EventSystem.GetUnhandledExceptionHandler())); - } - - [DebuggerStepThrough] - public static IDisposable Subscribe(this Event source, Action onNext, Action onErrorResume) - { - return source.Subscribe(new AnonymousSubscriber(onNext, onErrorResume)); - } - - // CompletableEvent must handle onComplete. - - [DebuggerStepThrough] - public static IDisposable Subscribe(this CompletableEvent source, Action onNext, Action onComplete) - { - return source.Subscribe(new AnonymousSubscriber(onNext, EventSystem.GetUnhandledExceptionHandler(), onComplete)); - } - - [DebuggerStepThrough] - public static IDisposable Subscribe(this CompletableEvent source, Action onNext, Action onErrorResume, Action onComplete) - { - return source.Subscribe(new AnonymousSubscriber(onNext, onErrorResume, onComplete)); - } -} - -[DebuggerStepThrough] -internal sealed class NopSubscriber : Subscriber -{ - public static readonly NopSubscriber Instance = new(); - - private NopSubscriber() - { - } - - [DebuggerStepThrough] - protected override void OnNextCore(TMessage message) - { - - } - - [DebuggerStepThrough] - protected override void OnErrorResumeCore(Exception error) - { - EventSystem.GetUnhandledExceptionHandler().Invoke(error); - } -} - -[DebuggerStepThrough] -internal sealed class AnonymousSubscriber(Action onNext, Action onErrorResume) : Subscriber -{ - [DebuggerStepThrough] - protected override void OnNextCore(TMessage message) - { - onNext(message); - } - - [DebuggerStepThrough] - protected override void OnErrorResumeCore(Exception error) - { - onErrorResume(error); - } -} - -[DebuggerStepThrough] -internal sealed class AnonymousSubscriber(Action onNext, Action onErrorResume, Action onComplete) : Subscriber -{ - [DebuggerStepThrough] - protected override void OnNextCore(TMessage message) - { - onNext(message); - } - - [DebuggerStepThrough] - protected override void OnErrorResumeCore(Exception error) - { - onErrorResume(error); - } - - [DebuggerStepThrough] - protected override void OnCompletedCore(TComplete complete) - { - onComplete(complete); - } -} diff --git a/src/R3/SubscriberExtensions.cs b/src/R3/SubscriberExtensions.cs index 6fdfa983..25dd1e5e 100644 --- a/src/R3/SubscriberExtensions.cs +++ b/src/R3/SubscriberExtensions.cs @@ -1,92 +1,66 @@ -namespace R3; - -public static class SubscriberExtensions -{ - public static void OnCompleted(this Subscriber subscriber) - { - subscriber.OnCompleted(default); - } - - public static IObserver ToObserver(this Subscriber subscriber) - { - return new R3Observer(subscriber); - } - - public static IObserver ToObserver(this Subscriber subscriber) - { - return new R3Observer2(subscriber); - } - - public static IObserver ToObserver(this Subscriber> subscriber) - { - return new R3Observer3(subscriber); - } -} - -internal sealed class R3Observer(Subscriber subscriber) : IObserver +namespace R3 { - public void OnNext(T value) + public static class SubscriberExtensions { - subscriber.OnNext(value); - } - - public void OnError(Exception error) - { - try + public static void OnCompleted(this Subscriber subscriber) { - subscriber.OnErrorResume(error); + subscriber.OnCompleted(default); } - finally + + public static IObserver ToObserver(this Subscriber subscriber) { - subscriber.Dispose(); + return new SubscriberToObserver(subscriber); } - } - public void OnCompleted() - { - subscriber.Dispose(); + public static IObserver ToObserver(this Subscriber> subscriber) + { + return new SubscriberToObserverR(subscriber); + } } } -internal sealed class R3Observer2(Subscriber subscriber) : IObserver +namespace R3.Operators { - public void OnNext(T value) - { - subscriber.OnNext(value); - } - - public void OnError(Exception error) + internal sealed class SubscriberToObserver(Subscriber subscriber) : IObserver { - try + public void OnNext(T value) { - subscriber.OnErrorResume(error); + subscriber.OnNext(value); } - finally + + public void OnError(Exception error) { - subscriber.Dispose(); + try + { + subscriber.OnErrorResume(error); + } + finally + { + subscriber.Dispose(); + } } - } - public void OnCompleted() - { - subscriber.OnCompleted(default); + public void OnCompleted() + { + subscriber.OnCompleted(default); + } } -} -internal sealed class R3Observer3(Subscriber> subscriber) : IObserver -{ - public void OnNext(T value) + internal sealed class SubscriberToObserverR(Subscriber> subscriber) : IObserver { - subscriber.OnNext(value); - } + public void OnNext(T value) + { + subscriber.OnNext(value); + } - public void OnError(Exception error) - { - subscriber.OnCompleted(Result.Failure(error)); - } + public void OnError(Exception error) + { + subscriber.OnCompleted(Result.Failure(error)); + } - public void OnCompleted() - { - subscriber.OnCompleted(Result.Success(Unit.Default)); + public void OnCompleted() + { + subscriber.OnCompleted(Result.Success(Unit.Default)); + } } } diff --git a/tests/R3.Tests/FactoryTests/NeverTest.cs b/tests/R3.Tests/FactoryTests/NeverTest.cs index 480ae920..a348ab83 100644 --- a/tests/R3.Tests/FactoryTests/NeverTest.cs +++ b/tests/R3.Tests/FactoryTests/NeverTest.cs @@ -5,7 +5,7 @@ public class NeverTest [Fact] public void Never() { - using var list = Event.Never().ToLiveList(); + using var list = Event.Never().ToLiveList(); list.AssertEqual([]); } @@ -13,7 +13,7 @@ public void Never() [Fact] public void NeverComplete() { - using var list = Event.NeverComplete().ToLiveList(); + using var list = Event.Never().ToLiveList(); list.AssertIsNotCompleted(); } } diff --git a/tests/R3.Tests/LiveListTest.cs b/tests/R3.Tests/LiveListTest.cs index 16363cc2..fb9f5849 100644 --- a/tests/R3.Tests/LiveListTest.cs +++ b/tests/R3.Tests/LiveListTest.cs @@ -5,7 +5,7 @@ public class LiveListTest [Fact] public void FromEvent() { - var publisher = new Publisher(); + var publisher = new Publisher(); var list = publisher.ToLiveList(); list.AssertEqual([]); @@ -28,7 +28,7 @@ public void FromEvent() [Fact] public void BufferSize() { - var publisher = new Publisher(); + var publisher = new Publisher(); var list = publisher.ToLiveList(bufferSize: 5); publisher.PublishOnNext(10); diff --git a/tests/R3.Tests/OperatorTests/AggregateTest.cs b/tests/R3.Tests/OperatorTests/AggregateTest.cs index 5e0506c5..e1d5c0a2 100644 --- a/tests/R3.Tests/OperatorTests/AggregateTest.cs +++ b/tests/R3.Tests/OperatorTests/AggregateTest.cs @@ -7,7 +7,7 @@ public class AggregateTest [Fact] public async Task Aggreagte() { - var publisher = new CompletablePublisher(); + var publisher = new Publisher(); var listTask = publisher.AggregateAsync(new List(), (x, i) => { x.Add(i); return x; }, (x, _) => x); @@ -38,7 +38,7 @@ public async Task BeforeCanceled() var cts = new CancellationTokenSource(); cts.Cancel(); - var publisher = new CompletablePublisher(); + var publisher = new Publisher(); var isDisposed = false; var listTask = publisher @@ -207,7 +207,7 @@ public async Task WaitAsync() var source = new int[] { 1, 10, 1, 3, 4, 6, 7, 4 }.ToEvent(); await source.WaitAsync(); - var p = new CompletablePublisher(); + var p = new Publisher(); var task = p.WaitAsync(); p.PublishOnNext(10); diff --git a/tests/R3.Tests/OperatorTests/FirstLastSingleTest.cs b/tests/R3.Tests/OperatorTests/FirstLastSingleTest.cs index fe1af190..d7ba2ec9 100644 --- a/tests/R3.Tests/OperatorTests/FirstLastSingleTest.cs +++ b/tests/R3.Tests/OperatorTests/FirstLastSingleTest.cs @@ -9,39 +9,11 @@ public class FirstLastSingleTest // Event First // Completable First/FirstOrDefault/Last/LastOrDefault - [Fact] - public async Task First() - { - var publisher = new Publisher(); - var task = publisher.FirstAsync(); - publisher.PublishOnNext(10); - (await task).Should().Be(10); - - var task2 = publisher.FirstAsync(); - publisher.PublishOnNext(15); - publisher.PublishOnNext(25); - - (await task2).Should().Be(15); - - var cts = new CancellationTokenSource(); - var task3 = publisher.FirstAsync(cts.Token); - cts.Cancel(); - - await Assert.ThrowsAsync(async () => await task3); - - var task4 = publisher.FirstAsync(x => x % 3 == 0); - publisher.PublishOnNext(5); - task4.Status.Should().NotBe(TaskStatus.RanToCompletion); - - publisher.PublishOnNext(99); - (await task4).Should().Be(99); - } - // CompletablePublisher First Test [Fact] public async Task First2() { - var publisher = new CompletablePublisher(); + var publisher = new Publisher(); var task = publisher.FirstAsync(); publisher.PublishOnNext(10); (await task).Should().Be(10); @@ -57,7 +29,7 @@ public async Task First2() publisher.PublishOnCompleted(default); await Assert.ThrowsAsync(async () => await task3); - publisher = new CompletablePublisher(); + publisher = new Publisher(); var task4 = publisher.FirstAsync(x => x % 3 == 0); publisher.PublishOnNext(5); task4.Status.Should().NotBe(TaskStatus.RanToCompletion); @@ -69,7 +41,7 @@ public async Task First2() [Fact] public async Task FirstOrDefault() { - var publisher = new CompletablePublisher(); + var publisher = new Publisher(); var task = publisher.FirstOrDefaultAsync(); publisher.PublishOnNext(10); (await task).Should().Be(10); @@ -85,7 +57,7 @@ public async Task FirstOrDefault() publisher.PublishOnCompleted(default); (await task3).Should().Be(9999); - publisher = new CompletablePublisher(); + publisher = new Publisher(); var task4 = publisher.FirstOrDefaultAsync(x => x % 3 == 0); publisher.PublishOnNext(5); task4.Status.Should().NotBe(TaskStatus.RanToCompletion); @@ -97,13 +69,13 @@ public async Task FirstOrDefault() [Fact] public async Task LastAsync() { - var publisher = new CompletablePublisher(); + var publisher = new Publisher(); var task = publisher.LastAsync(); publisher.PublishOnNext(10); publisher.PublishOnCompleted(default); (await task).Should().Be(10); - publisher = new CompletablePublisher(); + publisher = new Publisher(); var task2 = publisher.LastAsync(); publisher.PublishOnNext(15); publisher.PublishOnNext(25); @@ -111,13 +83,13 @@ public async Task LastAsync() (await task2).Should().Be(25); - publisher = new CompletablePublisher(); + publisher = new Publisher(); var task3 = publisher.LastAsync(); publisher.PublishOnCompleted(default); await Assert.ThrowsAsync(async () => await task3); - publisher = new CompletablePublisher(); + publisher = new Publisher(); var task4 = publisher.LastAsync(x => x % 3 == 0); publisher.PublishOnNext(5); task4.Status.Should().NotBe(TaskStatus.RanToCompletion); @@ -131,13 +103,13 @@ public async Task LastAsync() [Fact] public async Task LastOrDefaultAsync() { - var publisher = new CompletablePublisher(); + var publisher = new Publisher(); var task = publisher.LastOrDefaultAsync(); publisher.PublishOnNext(10); publisher.PublishOnCompleted(default); (await task).Should().Be(10); - publisher = new CompletablePublisher(); + publisher = new Publisher(); var task2 = publisher.LastOrDefaultAsync(); publisher.PublishOnNext(15); publisher.PublishOnNext(25); @@ -145,13 +117,13 @@ public async Task LastOrDefaultAsync() (await task2).Should().Be(25); - publisher = new CompletablePublisher(); + publisher = new Publisher(); var task3 = publisher.LastOrDefaultAsync(9999); publisher.PublishOnCompleted(default); (await task3).Should().Be(9999); - publisher = new CompletablePublisher(); + publisher = new Publisher(); var task4 = publisher.LastOrDefaultAsync(x => x % 3 == 0); publisher.PublishOnNext(5); task4.Status.Should().NotBe(TaskStatus.RanToCompletion); @@ -165,25 +137,25 @@ public async Task LastOrDefaultAsync() [Fact] public async Task Single() { - var publisher = new CompletablePublisher(); + var publisher = new Publisher(); var task = publisher.SingleAsync(); publisher.PublishOnNext(10); publisher.PublishOnCompleted(); (await task).Should().Be(10); - publisher = new CompletablePublisher(); + publisher = new Publisher(); var task2 = publisher.SingleAsync(); publisher.PublishOnNext(15); publisher.PublishOnNext(25); await Assert.ThrowsAsync(async () => await task2); - publisher = new CompletablePublisher(); + publisher = new Publisher(); var task3 = publisher.SingleAsync(); publisher.PublishOnCompleted(default); await Assert.ThrowsAsync(async () => await task3); - publisher = new CompletablePublisher(); + publisher = new Publisher(); var task4 = publisher.SingleAsync(x => x % 3 == 0); publisher.PublishOnNext(5); task4.Status.Should().NotBe(TaskStatus.RanToCompletion); @@ -196,25 +168,25 @@ public async Task Single() [Fact] public async Task SingleOrDefault() { - var publisher = new CompletablePublisher(); + var publisher = new Publisher(); var task = publisher.SingleOrDefaultAsync(9999); publisher.PublishOnNext(10); publisher.PublishOnCompleted(); (await task).Should().Be(10); - publisher = new CompletablePublisher(); + publisher = new Publisher(); var task2 = publisher.SingleOrDefaultAsync(9999); publisher.PublishOnNext(15); publisher.PublishOnNext(25); await Assert.ThrowsAsync(async () => await task2); - publisher = new CompletablePublisher(); + publisher = new Publisher(); var task3 = publisher.SingleOrDefaultAsync(9999); publisher.PublishOnCompleted(default); (await task3).Should().Be(9999); - publisher = new CompletablePublisher(); + publisher = new Publisher(); var task4 = publisher.SingleOrDefaultAsync(x => x % 3 == 0); publisher.PublishOnNext(5); task4.Status.Should().NotBe(TaskStatus.RanToCompletion); @@ -227,7 +199,7 @@ public async Task SingleOrDefault() [Fact] public async Task ErrorStream() { - var publisher = new CompletablePublisher>(); + var publisher = new Publisher>(); var task = publisher.LastAsync(); publisher.PublishOnNext(10); diff --git a/tests/R3.Tests/OperatorTests/ToListTest.cs b/tests/R3.Tests/OperatorTests/ToListTest.cs index 981bacc9..4fe97194 100644 --- a/tests/R3.Tests/OperatorTests/ToListTest.cs +++ b/tests/R3.Tests/OperatorTests/ToListTest.cs @@ -5,7 +5,7 @@ public class ToListTest [Fact] public async Task ToList() { - var publisher = new CompletablePublisher(); + var publisher = new Publisher(); var listTask = publisher.ToListAsync(); @@ -25,7 +25,7 @@ public async Task ToList() [Fact] public async Task ResultCompletableFault() { - var publisher = new CompletablePublisher>(); + var publisher = new Publisher>(); var listTask = publisher.ToListAsync(); @@ -48,7 +48,7 @@ public async Task ResultCompletableCancel() var cts = new CancellationTokenSource(); var isDisposed = false; - var publisher = new CompletablePublisher>(); + var publisher = new Publisher>(); var listTask = publisher.DoOnDisposed(() => isDisposed = true).ToListAsync(cts.Token); @@ -71,7 +71,7 @@ public async Task ResultCompletableCancel() [Fact] public async Task ToArray() { - var publisher = new CompletablePublisher(); + var publisher = new Publisher(); var listTask = publisher.ToArrayAsync(); @@ -91,7 +91,7 @@ public async Task ToArray() [Fact] public async Task ToArray2() { - var publisher = new CompletablePublisher>(); + var publisher = new Publisher>(); var listTask = publisher.ToArrayAsync(); diff --git a/tests/R3.Tests/OperatorTests/WhereTest.cs b/tests/R3.Tests/OperatorTests/WhereTest.cs index e69c24e6..73ed5050 100644 --- a/tests/R3.Tests/OperatorTests/WhereTest.cs +++ b/tests/R3.Tests/OperatorTests/WhereTest.cs @@ -2,31 +2,11 @@ public class WhereTest(ITestOutputHelper output) { - [Fact] - public void Where() - { - var p = new Publisher(); - - using var list = p.Where(x => x % 2 != 0).ToLiveList(); - - p.PublishOnNext(2); - list.AssertEqual([]); - - p.PublishOnNext(1); - list.AssertEqual([1]); - - p.PublishOnNext(3); - list.AssertEqual([1, 3]); - - p.PublishOnNext(30); - list.AssertEqual([1, 3]); - } - // test WhereWhere optimize [Fact] public void WhereWhere() { - var p = new Publisher(); + var p = new Publisher(); using var list = p.Where(x => x % 2 != 0).Where(x => x % 3 != 0).ToLiveList(); @@ -49,38 +29,12 @@ public void WhereWhere() list.AssertEqual([1, 5, 7]); } - //test where indexed - [Fact] - public void WhereIndexed() - { - var p = new Publisher(); - - using var list = p.Where((x, i) => i % 2 != 0).ToLiveList(); - - p.PublishOnNext(2); - list.AssertEqual([]); - - p.PublishOnNext(1); - list.AssertEqual([1]); - - p.PublishOnNext(3); - list.AssertEqual([1]); - - p.PublishOnNext(5); - list.AssertEqual([1, 5]); - - p.PublishOnNext(6); - list.AssertEqual([1, 5]); - - p.PublishOnNext(8); - list.AssertEqual([1, 5, 8]); - } // test where completable [Fact] public void WhereCompletable() { - var p = new CompletablePublisher(); + var p = new Publisher(); using var list = p.Where(x => x % 2 != 0).ToLiveList(); @@ -108,7 +62,7 @@ public void WhereCompletable() [Fact] public void WhereCompletableIndexed() { - var p = new CompletablePublisher(); + var p = new Publisher(); using var list = p.Where((x, i) => i % 2 != 0).ToLiveList(); diff --git a/tests/R3.Tests/_TestHelper.cs b/tests/R3.Tests/_TestHelper.cs index 95acc450..a8f73a8d 100644 --- a/tests/R3.Tests/_TestHelper.cs +++ b/tests/R3.Tests/_TestHelper.cs @@ -5,11 +5,6 @@ namespace R3.Tests; public static class _TestHelper { - public static void AssertEqual(this LiveList list, params T[] expected) - { - list.Should().Equal(expected); - } - public static void AssertEqual(this LiveList list, params T[] expected) { list.Should().Equal(expected);