Skip to content

Commit

Permalink
ContainsAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Jan 4, 2024
1 parent 57b0a77 commit 182f252
Showing 1 changed file with 44 additions and 0 deletions.
44 changes: 44 additions & 0 deletions src/R3/Operators/ContainsAsync.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
namespace R3;

public static partial class ObservableExtensions
{
public static Task<bool> ContainsAsync<T>(this Observable<T> source, T value, CancellationToken cancellationToken = default)
{
return ContainsAsync(source, value, EqualityComparer<T>.Default, cancellationToken);
}

public static Task<bool> ContainsAsync<T>(this Observable<T> source, T value, IEqualityComparer<T> equalityComparer, CancellationToken cancellationToken = default)
{
var observer = new ContainsAsync<T>(value, equalityComparer, cancellationToken);
source.Subscribe(observer);
return observer.Task;
}
}

internal sealed class ContainsAsync<T>(T value, IEqualityComparer<T> equalityComparer, CancellationToken cancellationToken)
: TaskObserverBase<T, bool>(cancellationToken)
{
protected override void OnNextCore(T value)
{
if (!equalityComparer.Equals(value))
{
TrySetResult(true);
}
}

protected override void OnErrorResumeCore(Exception error)
{
TrySetException(error);
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
TrySetException(result.Exception);
return;
}
TrySetResult(false);
}
}

0 comments on commit 182f252

Please sign in to comment.