diff --git a/README.md b/README.md index cda4cdb2..10ea5e18 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,35 @@ # R3 Third Generation of Reactive Extensions. + +```csharp +public abstract class Event +{ + public IDisposable Subscribe(Subscriber subscriber); +} + +public abstract class Subscriber : IDisposable +{ + public void OnNext(TMessage message); + public void OnErrorResume(Exception error); +} + +// Completable +public abstract class CompletableEvent +{ + public IDisposable Subscribe(Subscriber subscriber) +} + +public abstract class Subscriber : IDisposable +{ + public void OnNext(TMessage message); + public void OnErrorResume(Exception error); + public void OnCompleted(TComplete complete); +} +``` + +```csharp +// similar as IObserver +CompletableEvent> +``` + diff --git a/sandbox/ConsoleApp1/Program.cs b/sandbox/ConsoleApp1/Program.cs index 5e1f2bd6..9b75e321 100644 --- a/sandbox/ConsoleApp1/Program.cs +++ b/sandbox/ConsoleApp1/Program.cs @@ -24,19 +24,41 @@ EventSystem.Logger = factory.CreateLogger(); var logger = factory.CreateLogger(); + +EventSystem.SetUnhandledExceptionHandler(e => +{ + logger.ZLogError($"{e}"); +}); + + + var publisher = new Publisher(); var d = publisher .Where(x => true) - .Select(x => x) + .Select(x => + { + //if (x == 2) throw new Exception("e"); + return x; + }) + .Take(5) + .OnErrorAsComplete() .Subscribe(x => { + if (x == 2) throw new Exception("e"); logger.ZLogInformation($"OnNext: {x}"); + }, e => + { + logger.ZLogInformation($"failure resume"); + }, + x => + { + logger.ZLogInformation($"end:{x}"); }); SubscriptionTracker.ForEachActiveTask(x => { - logger.ZLogInformation($"{x.TrackingId,3}: {Environment.NewLine}{x.StackTrace.Replace("R2.", "").Replace("C:\\MyGit\\R2\\sandbox\\ConsoleApp1\\", "").Replace("C:\\MyGit\\R2\\src\\R2\\", "")}"); + // logger.ZLogInformation($"{x.TrackingId,3}: {Environment.NewLine}{x.StackTrace.Replace("R2.", "").Replace("C:\\MyGit\\R2\\sandbox\\ConsoleApp1\\", "").Replace("C:\\MyGit\\R2\\src\\R2\\", "")}"); // logger.ZLogInformation($"{x.TrackingId,3}: {x.FormattedType}"); @@ -46,14 +68,15 @@ publisher.PublishOnNext(2); publisher.PublishOnNext(3); -d.Dispose(); - +publisher.PublishOnErrorResume(new Exception("ERROR")); +publisher.PublishOnNext(4); +publisher.PublishOnNext(5); +publisher.PublishOnNext(6); +publisher.PublishOnNext(7); -Observable.Range(1, 10, Scheduler.CurrentThread) - .Take(3) - .Subscribe(); +d.Dispose(); @@ -63,6 +86,7 @@ + // Observable.Throw( // s.Where( @@ -154,3 +178,4 @@ public static IDisposable WriteLine(this CompletableEvent source) } } + diff --git a/src/R3/CancellationDisposable.cs b/src/R3/CancellationDisposable.cs new file mode 100644 index 00000000..25328b24 --- /dev/null +++ b/src/R3/CancellationDisposable.cs @@ -0,0 +1,15 @@ +namespace R3; + +public sealed class CancellationDisposable(CancellationTokenSource cancellationTokenSource) : IDisposable +{ + public CancellationDisposable() + : this(new CancellationTokenSource()) + { + } + + public CancellationToken Token => cancellationTokenSource.Token; + + public bool IsDisposed => cancellationTokenSource.IsCancellationRequested; + + public void Dispose() => cancellationTokenSource.Cancel(); +} diff --git a/src/R3/Event.cs b/src/R3/Event.cs index 316bf1c3..c74ae015 100644 --- a/src/R3/Event.cs +++ b/src/R3/Event.cs @@ -56,21 +56,20 @@ public void OnNext(TMessage message) } catch (Exception ex) { - OnError(ex); + OnErrorResume(ex); } } - public abstract void OnNextCore(TMessage message); + protected abstract void OnNextCore(TMessage message); [StackTraceHidden, DebuggerStepThrough] - public void OnError(Exception error) + public void OnErrorResume(Exception error) { if (IsDisposed) return; - OnErrorCore(error); + OnErrorResumeCore(error); } - [StackTraceHidden, DebuggerStepThrough] - public virtual void OnErrorCore(Exception error) { } + protected abstract void OnErrorResumeCore(Exception error); [StackTraceHidden, DebuggerStepThrough] public void Dispose() @@ -141,22 +140,21 @@ public void OnNext(TMessage message) } catch (Exception ex) { - OnError(ex); + OnErrorResume(ex); } } - public abstract void OnNextCore(TMessage message); + protected abstract void OnNextCore(TMessage message); [StackTraceHidden, DebuggerStepThrough] - public void OnError(Exception error) + public void OnErrorResume(Exception error) { if (IsDisposed || IsCalledCompleted) return; - OnErrorCore(error); + OnErrorResumeCore(error); } - [StackTraceHidden, DebuggerStepThrough] - public virtual void OnErrorCore(Exception error) { } + protected abstract void OnErrorResumeCore(Exception error); [StackTraceHidden, DebuggerStepThrough] public void OnCompleted(TComplete complete) diff --git a/src/R3/EventSystem.cs b/src/R3/EventSystem.cs index 1fb369c9..353d3f90 100644 --- a/src/R3/EventSystem.cs +++ b/src/R3/EventSystem.cs @@ -9,10 +9,17 @@ public class EventSystem { public static ILogger Logger { get; set; } = NullLogger.Instance; - public static Action UnhandledException { get; set; } = Throw; + static Action unhandledException = Throw; - EventSystem() + // Prevent +=, use Set and Get method. + public static void SetUnhandledExceptionHandler(Action unhandledExceptionHandler) { + unhandledException = unhandledExceptionHandler; + } + + public static Action GetUnhandledExceptionHandler() + { + return unhandledException; } static void Throw(Exception exception) diff --git a/src/R3/Factories/ToCompletableEvent.cs b/src/R3/Factories/ToCompletableEvent.cs new file mode 100644 index 00000000..7571f917 --- /dev/null +++ b/src/R3/Factories/ToCompletableEvent.cs @@ -0,0 +1,40 @@ +namespace R3 +{ + public static partial class EventFactory + { + public static CompletableEvent> ToCompletableEvent(this Task task) + { + return new R3.Factories.ToCompletableEvent(task); + } + } +} + +namespace R3.Factories +{ + internal sealed class ToCompletableEvent(Task task) : CompletableEvent> + { + protected override IDisposable SubscribeCore(Subscriber> subscriber) + { + var subscription = new CancellationDisposable(); + SubscribeTask(subscriber, subscription.Token); + return subscription; + } + + async void SubscribeTask(Subscriber> subscriber, CancellationToken cancellationToken) + { + TMessage? result; + try + { + result = await task.WaitAsync(cancellationToken); + } + catch (Exception ex) + { + subscriber.OnCompleted(Result.Failure(ex)); + return; + } + + subscriber.OnNext(result); + subscriber.OnCompleted(Result.Success(default)); + } + } +} diff --git a/src/R3/Factories/_EventFactory.cs b/src/R3/Factories/_EventFactory.cs index 6afbd37d..71fbf01e 100644 --- a/src/R3/Factories/_EventFactory.cs +++ b/src/R3/Factories/_EventFactory.cs @@ -1,4 +1,7 @@ -namespace R3 + +using System.Diagnostics.CodeAnalysis; + +namespace R3 { public static partial class EventFactory { @@ -27,11 +30,8 @@ public static partial class EventFactory // AsUnitComplete // AsNeverComplete - // Repeat - public static CompletableEvent Repeat(TMessage value) - { - return new ImmediateScheduleReturn(value, default); // immediate - } + + } } diff --git a/src/R3/Operators/CombineLatest.cs b/src/R3/Operators/CombineLatest.cs index 9dfb95aa..f3d4fa67 100644 --- a/src/R3/Operators/CombineLatest.cs +++ b/src/R3/Operators/CombineLatest.cs @@ -81,6 +81,11 @@ public void OnNext(TRight message) } } + public void OnError(Exception error) + { + subscriber.OnErrorResume(error); + } + void Publish(TLeft m1, TRight m2) { var result = selector(m1, m2); @@ -90,18 +95,28 @@ void Publish(TLeft m1, TRight m2) sealed class LeftSubscriber(_CombineLatest parent) : Subscriber { - public override void OnNextCore(TLeft message) + protected override void OnNextCore(TLeft message) { parent.OnNext(message); } + + protected override void OnErrorResumeCore(Exception error) + { + parent.OnError(error); + } } sealed class RightSubscriber(_CombineLatest parent) : Subscriber { - public override void OnNextCore(TRight message) + protected override void OnNextCore(TRight message) { parent.OnNext(message); } + + protected override void OnErrorResumeCore(Exception error) + { + parent.OnError(error); + } } } } diff --git a/src/R3/Operators/CountAsync.cs b/src/R3/Operators/CountAsync.cs index fd4a7fe0..d07a7405 100644 --- a/src/R3/Operators/CountAsync.cs +++ b/src/R3/Operators/CountAsync.cs @@ -1,4 +1,5 @@ -namespace R3 + +namespace R3 { public static partial class EventExtensions { @@ -36,7 +37,7 @@ internal sealed class CountAsync(TaskCompletionSource { int count; - public override void OnNextCore(TMessage message) + protected override void OnNextCore(TMessage message) { checked { @@ -44,6 +45,12 @@ public override void OnNextCore(TMessage message) } } + protected override void OnErrorResumeCore(Exception error) + { + tcs.TrySetException(error); + this.Dispose(); // stop subscription. + } + protected override void OnCompletedCore(TComplete complete) { tcs.TrySetResult(count); @@ -54,13 +61,19 @@ internal sealed class CountUAsync(TaskCompletionSource { int count; - public override void OnNextCore(TMessage message) + protected override void OnNextCore(TMessage message) { checked { count++; } } + + protected override void OnErrorResumeCore(Exception error) + { + tcs.TrySetException(error); + this.Dispose(); // stop subscription. + } protected override void OnCompletedCore(Result complete) { diff --git a/src/R3/Operators/Delay.cs b/src/R3/Operators/Delay.cs index 22359c5c..d8afa84c 100644 --- a/src/R3/Operators/Delay.cs +++ b/src/R3/Operators/Delay.cs @@ -48,7 +48,7 @@ public _Delay(Subscriber subscriber, TimeSpan dueTime, TimeProvider ti this.timer = timeProvider.CreateStoppedTimer(timerCallback, this); } - public override void OnNextCore(TMessage message) + protected override void OnNextCore(TMessage message) { var timestamp = timeProvider.GetTimestamp(); lock (queue) @@ -68,6 +68,12 @@ public override void OnNextCore(TMessage message) } } + protected override void OnErrorResumeCore(Exception error) + { + // TODO: what should we do? + throw new NotImplementedException(); + } + static void DrainMessages(object? state) { var self = (_Delay)state!; diff --git a/src/R3/Operators/DelayFrame.cs b/src/R3/Operators/DelayFrame.cs index a314a517..e28f79ba 100644 --- a/src/R3/Operators/DelayFrame.cs +++ b/src/R3/Operators/DelayFrame.cs @@ -40,7 +40,7 @@ public _DelayFrame(Subscriber subscriber, int delayFrameCount, FramePr this.frameProvider = frameProvider; } - public override void OnNextCore(TMessage message) + protected override void OnNextCore(TMessage message) { var currentCount = frameProvider.GetFrameCount(); lock (queue) @@ -61,6 +61,12 @@ public override void OnNextCore(TMessage message) } } + protected override void OnErrorResumeCore(Exception error) + { + // TODO:not yet + throw new NotImplementedException(); + } + public bool MoveNext(long framecount) { if (stopRunner) diff --git a/src/R3/Operators/DoOnCompleted.cs b/src/R3/Operators/DoOnCompleted.cs index 0d75dda8..f85c7054 100644 --- a/src/R3/Operators/DoOnCompleted.cs +++ b/src/R3/Operators/DoOnCompleted.cs @@ -26,11 +26,16 @@ class _DoOnCompleted(Subscriber subscriber, Action action) { Action? action = action; - public override void OnNextCore(TMessage message) + protected override void OnNextCore(TMessage message) { subscriber.OnNext(message); } + protected override void OnErrorResumeCore(Exception error) + { + subscriber.OnErrorResume(error); + } + protected override void OnCompletedCore(TComplete complete) { Interlocked.Exchange(ref action, null)?.Invoke(); diff --git a/src/R3/Operators/DoOnDisposed.cs b/src/R3/Operators/DoOnDisposed.cs index b87f8fa6..99428669 100644 --- a/src/R3/Operators/DoOnDisposed.cs +++ b/src/R3/Operators/DoOnDisposed.cs @@ -42,11 +42,16 @@ class _DoOnDisposed(Subscriber subscriber, Action action) : Subscriber { Action? action = action; - public override void OnNextCore(TMessage message) + protected override void OnNextCore(TMessage message) { subscriber.OnNext(message); } + protected override void OnErrorResumeCore(Exception error) + { + subscriber.OnErrorResume(error); + } + protected override void DisposeCore() { Interlocked.Exchange(ref action, null)?.Invoke(); base.DisposeCore(); @@ -68,11 +73,16 @@ class _DoOnDisposed(Subscriber subscriber, Action action, TSta Action? action = action; TState state = state; - public override void OnNextCore(TMessage message) + protected override void OnNextCore(TMessage message) { subscriber.OnNext(message); } + protected override void OnErrorResumeCore(Exception error) + { + subscriber.OnErrorResume(error); + } + protected override void DisposeCore() { Interlocked.Exchange(ref action, null)?.Invoke(state); @@ -94,11 +104,16 @@ class _DoOnDisposed(Subscriber subscriber, Action action) : { Action? action = action; - public override void OnNextCore(TMessage message) + protected override void OnNextCore(TMessage message) { subscriber.OnNext(message); } + protected override void OnErrorResumeCore(Exception error) + { + subscriber.OnErrorResume(error); + } + protected override void OnCompletedCore(TComplete complete) { subscriber.OnCompleted(complete); @@ -125,11 +140,16 @@ class _DoOnDisposed(Subscriber subscriber, Action a Action? action = action; TState state = state; - public override void OnNextCore(TMessage message) + protected override void OnNextCore(TMessage message) { subscriber.OnNext(message); } + protected override void OnErrorResumeCore(Exception error) + { + subscriber.OnErrorResume(error); + } + protected override void OnCompletedCore(TComplete complete) { subscriber.OnCompleted(complete); diff --git a/src/R3/Operators/OnErrorAsComplete.cs b/src/R3/Operators/OnErrorAsComplete.cs new file mode 100644 index 00000000..44df8a77 --- /dev/null +++ b/src/R3/Operators/OnErrorAsComplete.cs @@ -0,0 +1,65 @@ +namespace R3 +{ + public static partial class EventExtensions + { + public static CompletableEvent> OnErrorAsComplete(this Event source) + { + return new OnErrorAsComplete(source); + } + + public static CompletableEvent> OnErrorAsComplete(this CompletableEvent source) + { + return new OnErrorAsComplete(source); + } + } +} + +namespace R3.Operators +{ + internal class OnErrorAsComplete(Event source) : CompletableEvent> + { + protected override IDisposable SubscribeCore(Subscriber> subscriber) + { + return source.Subscribe(new _OnErrorAsComplete(subscriber)); + } + + sealed class _OnErrorAsComplete(Subscriber> subscriber) : Subscriber + { + protected override void OnNextCore(TMessage message) + { + subscriber.OnNext(message); + } + + protected override void OnErrorResumeCore(Exception error) + { + subscriber.OnCompleted(Result.Failure(error)); + } + } + } + + internal class OnErrorAsComplete(CompletableEvent source) : CompletableEvent> + { + protected override IDisposable SubscribeCore(Subscriber> subscriber) + { + return source.Subscribe(new _OnErrorAsComplete(subscriber)); + } + + sealed class _OnErrorAsComplete(Subscriber> subscriber) : Subscriber + { + protected override void OnNextCore(TMessage message) + { + subscriber.OnNext(message); + } + + protected override void OnErrorResumeCore(Exception error) + { + subscriber.OnCompleted(Result.Failure(error)); + } + + protected override void OnCompletedCore(TComplete complete) + { + subscriber.OnCompleted(Result.Success(complete)); + } + } + } +} diff --git a/src/R3/Operators/OnErrorBubbling.cs b/src/R3/Operators/OnErrorBubbling.cs deleted file mode 100644 index f11b336d..00000000 --- a/src/R3/Operators/OnErrorBubbling.cs +++ /dev/null @@ -1,30 +0,0 @@ -namespace R3.Operators -{ - // TODO:... - internal class OnErrorBubbling(Event source, Func onError) : Event - { - protected override IDisposable SubscribeCore(Subscriber subscriber) - { - return source.Subscribe(new _OnErrorBubbling(subscriber, onError)); - } - - class _OnErrorBubbling(Subscriber subscriber, Func onError) : Subscriber - { - public override void OnNextCore(TMessage message) - { - try - { - subscriber.OnNext(message); - } - catch (Exception ex) - { - // true: stop propagation, false: re-throw - if (!onError(ex)) - { - throw; - } - } - } - } - } -} diff --git a/src/R3/Operators/OnErrorResumeNext.cs b/src/R3/Operators/OnErrorResumeNext.cs deleted file mode 100644 index f5f8c104..00000000 --- a/src/R3/Operators/OnErrorResumeNext.cs +++ /dev/null @@ -1,54 +0,0 @@ -namespace R3.Operators -{ - internal class OnErrorResumeNext(Event source, Action? errorHandler) : Event - { - protected override IDisposable SubscribeCore(Subscriber subscriber) - { - return source.Subscribe(new _OnErrorResumeNext(subscriber, errorHandler)); - } - - sealed class _OnErrorResumeNext(Subscriber subscriber, Action? errorHandler) : Subscriber - { - public override void OnNextCore(TMessage message) - { - try - { - subscriber.OnNext(message); - } - catch (Exception ex) - { - errorHandler?.Invoke(ex); - } - } - } - } - - internal class OnErrorResumeNext2(Event> source, Action? errorHandler) : Event - { - protected override IDisposable SubscribeCore(Subscriber subscriber) - { - return source.Subscribe(new _OnErrorResumeNext(subscriber, errorHandler)); - } - - sealed class _OnErrorResumeNext(Subscriber subscriber, Action? errorHandler) : Subscriber> - { - public override void OnNextCore(Result message) - { - if (message.IsFailure) - { - errorHandler?.Invoke(message.Exception); - return; - } - - try - { - subscriber.OnNext(message.Value); - } - catch (Exception ex) - { - errorHandler?.Invoke(ex); - } - } - } - } -} diff --git a/src/R3/Operators/Select.cs b/src/R3/Operators/Select.cs index 07c351b6..aecd2357 100644 --- a/src/R3/Operators/Select.cs +++ b/src/R3/Operators/Select.cs @@ -34,10 +34,15 @@ protected override IDisposable SubscribeCore(Subscriber subscriber) class _Select(Subscriber subscriber, Func selector) : Subscriber { - public override void OnNextCore(TMessage message) + protected override void OnNextCore(TMessage message) { subscriber.OnNext(selector(message)); } + + protected override void OnErrorResumeCore(Exception error) + { + subscriber.OnErrorResume(error); + } } } @@ -54,11 +59,16 @@ protected override IDisposable SubscribeCore(Subscriber subscriber, Func messageSelector, Func completeSelector) : Subscriber { - public override void OnNextCore(TMessage message) + protected override void OnNextCore(TMessage message) { subscriber.OnNext(messageSelector(message)); } + protected override void OnErrorResumeCore(Exception error) + { + subscriber.OnErrorResume(error); + } + protected override void OnCompletedCore(TComplete complete) { subscriber.OnCompleted(completeSelector(complete)); diff --git a/src/R3/Operators/Take.cs b/src/R3/Operators/Take.cs index a09908e1..868a9053 100644 --- a/src/R3/Operators/Take.cs +++ b/src/R3/Operators/Take.cs @@ -37,7 +37,7 @@ sealed class _Take(Subscriber subscriber, int count) : Subscribe { int remaining = count; - public override void OnNextCore(TMessage message) + protected override void OnNextCore(TMessage message) { if (remaining > 0) { @@ -49,6 +49,11 @@ public override void OnNextCore(TMessage message) subscriber.OnCompleted(Unit.Default); } } + + protected override void OnErrorResumeCore(Exception error) + { + subscriber.OnErrorResume(error); + } } } @@ -63,7 +68,7 @@ sealed class _Take(Subscriber subscriber, int count, TCompl { int remaining = count; - public override void OnNextCore(TMessage message) + protected override void OnNextCore(TMessage message) { if (remaining > 0) { @@ -83,6 +88,11 @@ public override void OnNextCore(TMessage message) } } + protected override void OnErrorResumeCore(Exception error) + { + subscriber.OnErrorResume(error); + } + protected override void OnCompletedCore(TComplete complete) { subscriber.OnCompleted(complete); diff --git a/src/R3/Operators/Where.cs b/src/R3/Operators/Where.cs index affe6a8d..bb4dd472 100644 --- a/src/R3/Operators/Where.cs +++ b/src/R3/Operators/Where.cs @@ -45,13 +45,18 @@ protected override IDisposable SubscribeCore(Subscriber subscriber) class _Where(Subscriber subscriber, Func predicate) : Subscriber { - public override void OnNextCore(TMessage message) + protected override void OnNextCore(TMessage message) { if (predicate(message)) { subscriber.OnNext(message); } } + + protected override void OnErrorResumeCore(Exception error) + { + subscriber.OnErrorResume(error); + } } } @@ -64,7 +69,7 @@ protected override IDisposable SubscribeCore(Subscriber sub class _Where(Subscriber subscriber, Func predicate) : Subscriber { - public override void OnNextCore(TMessage message) + protected override void OnNextCore(TMessage message) { if (predicate(message)) { @@ -72,6 +77,11 @@ public override void OnNextCore(TMessage message) } } + protected override void OnErrorResumeCore(Exception error) + { + subscriber.OnErrorResume(error); + } + protected override void OnCompletedCore(TComplete complete) { subscriber.OnCompleted(complete); @@ -90,13 +100,18 @@ class _Where(Subscriber subscriber, Func predicat { int index = 0; - public override void OnNextCore(TMessage message) + protected override void OnNextCore(TMessage message) { if (predicate(message, index++)) { subscriber.OnNext(message); } } + + protected override void OnErrorResumeCore(Exception error) + { + subscriber.OnErrorResume(error); + } } } @@ -111,7 +126,7 @@ class _Where(Subscriber subscriber, Func subscriber) { var subscription = new _Publisher(this, subscriber); @@ -61,6 +72,12 @@ public void OnNext(TMessage message) subscriber.OnNext(message); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void OnErrorResume(Exception error) + { + subscriber.OnErrorResume(error); + } + public void Dispose() { var p = Interlocked.Exchange(ref parent, null); @@ -94,6 +111,17 @@ public void PublishOnNext(TMessage message) } } + public void PublishOnErrorResume(Exception error) + { + foreach (var subscriber in list.AsSpan()) + { + if (subscriber != null) + { + subscriber.OnErrorResume(error); + } + } + } + public void PublishOnCompleted(TComplete complete) { var locationValue = Interlocked.CompareExchange(ref calledCompleted, 1, 0); @@ -135,6 +163,12 @@ public void OnNext(TMessage message) subscriber.OnNext(message); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void OnErrorResume(Exception error) + { + subscriber.OnErrorResume(error); + } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public void OnCompleted(TComplete complete) { diff --git a/src/R3/Result.cs b/src/R3/Result.cs index 6604926d..c121874f 100644 --- a/src/R3/Result.cs +++ b/src/R3/Result.cs @@ -54,4 +54,16 @@ public void Deconstruct(out T? value, out Exception? exception) exception = Exception; value = Value; } + + public override string ToString() + { + if (IsSuccess) + { + return $"Success{{{Value}}}"; + } + else + { + return $"Failure{{{Exception.Message}}}"; + } + } } diff --git a/src/R3/SubscribeExtensions.cs b/src/R3/SubscribeExtensions.cs index 11cf154f..24abb5e5 100644 --- a/src/R3/SubscribeExtensions.cs +++ b/src/R3/SubscribeExtensions.cs @@ -13,31 +13,27 @@ public static IDisposable Subscribe(this Event source) [DebuggerStepThrough] public static IDisposable Subscribe(this Event source, Action onNext) { - return source.Subscribe(new AnonymousSubscriber(onNext, EventSystem.UnhandledException)); + return source.Subscribe(new AnonymousSubscriber(onNext, EventSystem.GetUnhandledExceptionHandler())); } [DebuggerStepThrough] - public static IDisposable Subscribe(this Event source, Action onNext, Action onError) + public static IDisposable Subscribe(this Event source, Action onNext, Action onErrorResume) { - return source.Subscribe(new AnonymousSubscriber(onNext, onError)); + return source.Subscribe(new AnonymousSubscriber(onNext, onErrorResume)); } - [DebuggerStepThrough] - public static IDisposable Subscribe(this CompletableEvent source, Action onNext) - { - return Subscribe(source, onNext, static _ => { }); - } + // CompletableEvent must handle onComplete. [DebuggerStepThrough] public static IDisposable Subscribe(this CompletableEvent source, Action onNext, Action onComplete) { - return source.Subscribe(new AnonymousSubscriber(onNext, EventSystem.UnhandledException, onComplete)); + return source.Subscribe(new AnonymousSubscriber(onNext, EventSystem.GetUnhandledExceptionHandler(), onComplete)); } [DebuggerStepThrough] - public static IDisposable Subscribe(this CompletableEvent source, Action onNext, Action onError, Action onComplete) + public static IDisposable Subscribe(this CompletableEvent source, Action onNext, Action onErrorResume, Action onComplete) { - return source.Subscribe(new AnonymousSubscriber(onNext, onError, onComplete)); + return source.Subscribe(new AnonymousSubscriber(onNext, onErrorResume, onComplete)); } } @@ -51,47 +47,47 @@ private NopSubscriber() } [DebuggerStepThrough] - public override void OnNextCore(TMessage message) + protected override void OnNextCore(TMessage message) { } [DebuggerStepThrough] - public override void OnErrorCore(Exception error) + protected override void OnErrorResumeCore(Exception error) { - EventSystem.UnhandledException(error); + EventSystem.GetUnhandledExceptionHandler().Invoke(error); } } [DebuggerStepThrough] -internal sealed class AnonymousSubscriber(Action onNext, Action onError) : Subscriber +internal sealed class AnonymousSubscriber(Action onNext, Action onErrorResume) : Subscriber { [DebuggerStepThrough] - public override void OnNextCore(TMessage message) + protected override void OnNextCore(TMessage message) { onNext(message); } [DebuggerStepThrough] - public override void OnErrorCore(Exception error) + protected override void OnErrorResumeCore(Exception error) { - onError(error); + onErrorResume(error); } } [DebuggerStepThrough] -internal sealed class AnonymousSubscriber(Action onNext, Action onError, Action onComplete) : Subscriber +internal sealed class AnonymousSubscriber(Action onNext, Action onErrorResume, Action onComplete) : Subscriber { [DebuggerStepThrough] - public override void OnNextCore(TMessage message) + protected override void OnNextCore(TMessage message) { onNext(message); } [DebuggerStepThrough] - public override void OnErrorCore(Exception error) + protected override void OnErrorResumeCore(Exception error) { - onError(error); + onErrorResume(error); } [DebuggerStepThrough]