Skip to content

Commit

Permalink
IgnoreOnErrorResume(not tested)
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Jan 4, 2024
1 parent 8236b68 commit 871e25c
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 4 deletions.
14 changes: 10 additions & 4 deletions src/R3/Operators/IgnoreElements.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,27 @@ public static partial class ObservableExtensions
{
public static Observable<T> IgnoreElements<T>(this Observable<T> source)
{
return new IgnoreElements<T>(source);
return new IgnoreElements<T>(source, null);
}

public static Observable<T> IgnoreElements<T>(this Observable<T> source, Action<T> doOnNext)
{
return new IgnoreElements<T>(source, doOnNext);
}
}

internal sealed class IgnoreElements<T>(Observable<T> source) : Observable<T>
internal sealed class IgnoreElements<T>(Observable<T> source, Action<T>? doOnNext) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return source.Subscribe(new _IgnoreElements(observer));
return source.Subscribe(new _IgnoreElements(observer, doOnNext));
}

sealed class _IgnoreElements(Observer<T> observer) : Observer<T>
sealed class _IgnoreElements(Observer<T> observer, Action<T>? doOnNext) : Observer<T>
{
protected override void OnNextCore(T value)
{
doOnNext?.Invoke(value);
}

protected override void OnErrorResumeCore(Exception error)
Expand Down
40 changes: 40 additions & 0 deletions src/R3/Operators/IgnoreOnErrorResume.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
namespace R3;

public static partial class ObservableExtensions
{
public static Observable<T> IgnoreOnErrorResume<T>(this Observable<T> source)
{
return new IgnoreOnErrorResume<T>(source, null);
}

public static Observable<T> IgnoreOnErrorResume<T>(this Observable<T> source, Action<Exception>? doOnErrorResume)
{
return new IgnoreOnErrorResume<T>(source, doOnErrorResume);
}
}

internal sealed class IgnoreOnErrorResume<T>(Observable<T> source, Action<Exception>? doOnErrorResume) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return source.Subscribe(new _IgnoreOnErrorResume(observer, doOnErrorResume));
}

sealed class _IgnoreOnErrorResume(Observer<T> observer, Action<Exception>? doOnErrorResume) : Observer<T>
{
protected override void OnNextCore(T value)
{
observer.OnNext(value);
}

protected override void OnErrorResumeCore(Exception error)
{
doOnErrorResume?.Invoke(error);
}

protected override void OnCompletedCore(Result result)
{
observer.OnCompleted(result);
}
}
}
2 changes: 2 additions & 0 deletions src/R3/Operators/_Operators.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ public static partial class ObservableExtensions
// TODO:not-tested:CombineLatest, Zip, ZipLatest, WithLatestFrom
// Switch, Pairwise

// IgnoreElements

// Standard Query:
// Distinct, DistinctBy, DistinctUntilChanged, Scan, DefaultIfEmpty

Expand Down

0 comments on commit 871e25c

Please sign in to comment.