Skip to content

Commit

Permalink
Catch
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 26, 2023
1 parent ba5c87a commit 42513bd
Show file tree
Hide file tree
Showing 4 changed files with 261 additions and 2 deletions.
2 changes: 1 addition & 1 deletion sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
// Enumerable.Empty<int>().ElementAtOrDefault(

var range = System.Reactive.Linq.Observable.Range(1, 10);

// range.Catch(
// range.Append(


Expand Down
164 changes: 164 additions & 0 deletions src/R3/Operators/Catch.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
namespace R3;

public static partial class ObservableExtensions
{
public static Observable<T> Catch<T>(this Observable<T> source, Observable<T> second)
{
return new Catch<T>(source, second);
}

public static Observable<T> Catch<T, TException>(this Observable<T> source, Func<TException, Observable<T>> errorHandler)
{
return new Catch<T, TException>(source, errorHandler);
}
}

internal sealed class Catch<T>(Observable<T> source, Observable<T> second) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return new _Catch(observer, second).Run(source);
}

sealed class _Catch(Observer<T> observer, Observable<T> second) : IDisposable
{
readonly Observer<T> observer = observer;
readonly Observable<T> second = second;
SingleAssignmentDisposableCore firstSubscription;
SingleAssignmentDisposableCore secondSubscription;

public IDisposable Run(Observable<T> source)
{
return source.Subscribe(new FirstObserver(this));
}

public void Dispose()
{
firstSubscription.Dispose();
secondSubscription.Dispose();
}

internal sealed class FirstObserver(_Catch parent) : Observer<T>
{
protected override void OnNextCore(T value)
{
parent.observer.OnNext(value);
}

protected override void OnErrorResumeCore(Exception error)
{
parent.observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
parent.secondSubscription.Disposable = parent.second.Subscribe(new SecondObserver(parent));
}
else
{
parent.observer.OnCompleted(result);
}
}
}

internal sealed class SecondObserver(_Catch parent) : Observer<T>
{
protected override void OnNextCore(T value)
{
parent.observer.OnNext(value);
}

protected override void OnErrorResumeCore(Exception error)
{
parent.observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
parent.observer.OnCompleted(result);
}

protected override void DisposeCore()
{
parent.Dispose();
}
}
}
}

internal sealed class Catch<T, TException>(Observable<T> source, Func<TException, Observable<T>> errorHandler) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return new _Catch(observer, errorHandler).Run(source);
}

sealed class _Catch(Observer<T> observer, Func<TException, Observable<T>> errorHandler) : IDisposable
{
readonly Observer<T> observer = observer;
readonly Func<TException, Observable<T>> errorHandler = errorHandler;
SingleAssignmentDisposableCore firstSubscription;
SingleAssignmentDisposableCore secondSubscription;

public IDisposable Run(Observable<T> source)
{
return source.Subscribe(new FirstObserver(this));
}

public void Dispose()
{
firstSubscription.Dispose();
secondSubscription.Dispose();
}

internal sealed class FirstObserver(_Catch parent) : Observer<T>
{
protected override void OnNextCore(T value)
{
parent.observer.OnNext(value);
}

protected override void OnErrorResumeCore(Exception error)
{
parent.observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure && result.Exception is TException error)
{
parent.secondSubscription.Disposable = parent.errorHandler(error).Subscribe(new SecondObserver(parent));
}
else
{
parent.observer.OnCompleted(result);
}
}
}

internal sealed class SecondObserver(_Catch parent) : Observer<T>
{
protected override void OnNextCore(T value)
{
parent.observer.OnNext(value);
}

protected override void OnErrorResumeCore(Exception error)
{
parent.observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
parent.observer.OnCompleted(result);
}

protected override void DisposeCore()
{
parent.Dispose();
}
}
}
}
1 change: 0 additions & 1 deletion src/R3/Operators/Concat.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ public static Observable<T> Concat<T>(this Observable<T> source, Observable<T> s
}
}


internal sealed class Concat<T>(Observable<T> source, Observable<T> second) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
Expand Down
96 changes: 96 additions & 0 deletions tests/R3.Tests/OperatorTests/CatchTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
namespace R3.Tests.OperatorTests;

public class CatchTest
{
[Fact]
public void CatchSecond()
{
{
var first = new Subject<int>();
var second = new Subject<int>();

using var list = first.Catch(second).ToLiveList();

first.OnNext(10);
first.OnNext(20);
first.OnNext(30);
second.OnNext(9999);
first.OnCompleted();
second.OnNext(9998);

list.AssertEqual([10, 20, 30]);
list.AssertIsCompleted();
}
{
var first = new Subject<int>();
var second = new Subject<int>();

using var list = first.Catch(second).ToLiveList();

first.OnNext(10);
first.OnNext(20);
first.OnNext(30);
second.OnNext(9999);
first.OnCompleted(new Exception()); // error, so switch to second
list.AssertIsNotCompleted();
second.OnNext(9998);

list.AssertEqual([10, 20, 30, 9998]);
second.OnCompleted();
list.AssertIsCompleted();
}
}

[Fact]
public void CatchHandler()
{
{
var first = new Subject<int>();
var second = new Subject<int>();

using var list = first.Catch<int, ArgumentException>(ex => second).ToLiveList();

first.OnNext(10);
first.OnNext(20);
first.OnNext(30);
second.OnNext(9999);
first.OnCompleted();
second.OnNext(9998);

list.AssertEqual([10, 20, 30]);
list.AssertIsCompleted();
}
{
var first = new Subject<int>();
var second = new Subject<int>();

using var list = first.Catch<int, ArgumentException>(ex => second).ToLiveList();

first.OnNext(10);
first.OnNext(20);
first.OnNext(30);
second.OnNext(9999);
first.OnCompleted(new Exception()); // error, but not switch
list.AssertEqual([10, 20, 30]);
list.AssertIsCompleted();
}
{
var first = new Subject<int>();
var second = new Subject<int>();

using var list = first.Catch<int, ArgumentException>(ex => second).ToLiveList();

first.OnNext(10);
first.OnNext(20);
first.OnNext(30);
second.OnNext(9999);
first.OnCompleted(new ArgumentException()); // error, switch
list.AssertIsNotCompleted();
second.OnNext(9998);

list.AssertEqual([10, 20, 30, 9998]);
second.OnCompleted();
list.AssertIsCompleted();
}
}
}

0 comments on commit 42513bd

Please sign in to comment.