Skip to content

Commit

Permalink
TakeUntil, TakeWhile
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 14, 2023
1 parent b578ad6 commit 1de8ce8
Show file tree
Hide file tree
Showing 5 changed files with 362 additions and 0 deletions.
7 changes: 7 additions & 0 deletions sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,16 @@
//publisher.PublishOnNext(1);



//var xs = await publisher.Take(TimeSpan.FromSeconds(5));

foreach (var item in Enumerable.Range(1, 10).TakeWhile(x => x <= 3))
{
Console.WriteLine(item);
}

var repeat = Observable.Repeat("foo", 10);
// repeat.TakeWhile(



Expand Down
191 changes: 191 additions & 0 deletions src/R3/Operators/TakeUntil.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
namespace R3;

public static partial class EventExtensions
{
public static Event<T> TakeUntil<T, TOther>(this Event<T> source, Event<TOther> other)
{
return new TakeUntil<T, TOther>(source, other);
}

public static Event<T> TakeUntil<T>(this Event<T> source, CancellationToken cancellationToken)
{
if (!cancellationToken.CanBeCanceled)
{
return source;
}
if (cancellationToken.IsCancellationRequested)
{
return Event.Empty<T>();
}

return new TakeUntilC<T>(source, cancellationToken);
}

public static Event<T> TakeUntil<T>(this Event<T> source, Task task)
{
return new TakeUntilT<T>(source, task);
}
}

internal sealed class TakeUntil<T, TOther>(Event<T> source, Event<TOther> other) : Event<T>
{
protected override IDisposable SubscribeCore(Subscriber<T> subscriber)
{
var takeUntil = new _TakeUntil(subscriber);
var stopperSubscription = other.Subscribe(takeUntil.stopper);
try
{
return source.Subscribe(takeUntil); // subscription contains self and stopper.
}
catch
{
stopperSubscription.Dispose();
throw;
}
}

sealed class _TakeUntil : Subscriber<T>
{
readonly Subscriber<T> subscriber;
internal readonly TakeUntilStopperSubscriber stopper; // this instance is not exposed to public so can use lock.

public _TakeUntil(Subscriber<T> subscriber)
{
this.subscriber = subscriber;
this.stopper = new TakeUntilStopperSubscriber(this);
}

protected override void OnNextCore(T value)
{
subscriber.OnNext(value);
}

protected override void OnErrorResumeCore(Exception error)
{
subscriber.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
subscriber.OnCompleted(result);
}

protected override void DisposeCore()
{
stopper.Dispose();
}
}

sealed class TakeUntilStopperSubscriber(_TakeUntil parent) : Subscriber<TOther>
{
protected override void OnNextCore(TOther value)
{
parent.OnCompleted(Result.Success);
}

protected override void OnErrorResumeCore(Exception error)
{
parent.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
parent.OnCompleted(result);
}
}
}

internal sealed class TakeUntilC<T>(Event<T> source, CancellationToken cancellationToken) : Event<T>
{
protected override IDisposable SubscribeCore(Subscriber<T> subscriber)
{
return source.Subscribe(new _TakeUntil(subscriber, cancellationToken));
}

sealed class _TakeUntil : Subscriber<T>, IDisposable
{
static readonly Action<object?> cancellationCallback = CancellationCallback;

readonly Subscriber<T> subscriber;
CancellationTokenRegistration tokenRegistration;

public _TakeUntil(Subscriber<T> subscriber, CancellationToken cancellationToken)
{
this.subscriber = subscriber;
this.tokenRegistration = cancellationToken.Register(cancellationCallback, this);
}

protected override void OnNextCore(T value)
{
subscriber.OnNext(value);
}

protected override void OnErrorResumeCore(Exception error)
{
subscriber.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
subscriber.OnCompleted(result);
}

static void CancellationCallback(object? state)
{
var self = (_TakeUntil)state!;
self.OnCompleted();
}

protected override void DisposeCore()
{
tokenRegistration.Dispose();
}
}
}

internal sealed class TakeUntilT<T>(Event<T> source, Task task) : Event<T>
{
protected override IDisposable SubscribeCore(Subscriber<T> subscriber)
{
return source.Subscribe(new _TakeUntil(subscriber, task));
}

sealed class _TakeUntil : Subscriber<T>, IDisposable
{
readonly Subscriber<T> subscriber;

public _TakeUntil(Subscriber<T> subscriber, Task task)
{
this.subscriber = subscriber;
TaskAwait(task);
}

protected override void OnNextCore(T value)
{
subscriber.OnNext(value);
}

protected override void OnErrorResumeCore(Exception error)
{
subscriber.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
subscriber.OnCompleted(result);
}

async void TaskAwait(Task task)
{
try
{
await task.ConfigureAwait(false);
OnCompleted(Result.Success);
}
catch (Exception ex)
{
OnCompleted(Result.Failure(ex));
}
}
}
}
82 changes: 82 additions & 0 deletions src/R3/Operators/TakeWhile.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
namespace R3;

public static partial class EventExtensions
{
public static Event<T> TakeWhile<T>(this Event<T> source, Func<T, bool> predicate)
{
return new TakeWhile<T>(source, predicate);
}

public static Event<T> TakeWhile<T>(this Event<T> source, Func<T, int, bool> predicate)
{
return new TakeWhileI<T>(source, predicate);
}
}

internal sealed class TakeWhile<T>(Event<T> source, Func<T, bool> predicate) : Event<T>
{
protected override IDisposable SubscribeCore(Subscriber<T> subscriber)
{
return source.Subscribe(new _TakeWhile(subscriber, predicate));
}

sealed class _TakeWhile(Subscriber<T> subscriber, Func<T, bool> predicate) : Subscriber<T>, IDisposable
{
protected override void OnNextCore(T value)
{
if (predicate(value))
{
subscriber.OnNext(value);
}
else
{
subscriber.OnCompleted();
}
}

protected override void OnErrorResumeCore(Exception error)
{
subscriber.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
subscriber.OnCompleted(result);
}
}
}

internal sealed class TakeWhileI<T>(Event<T> source, Func<T, int, bool> predicate) : Event<T>
{
protected override IDisposable SubscribeCore(Subscriber<T> subscriber)
{
return source.Subscribe(new _TakeWhile(subscriber, predicate));
}

sealed class _TakeWhile(Subscriber<T> subscriber, Func<T, int, bool> predicate) : Subscriber<T>, IDisposable
{
int count;

protected override void OnNextCore(T value)
{
if (predicate(value, count++))
{
subscriber.OnNext(value);
}
else
{
subscriber.OnCompleted();
}
}

protected override void OnErrorResumeCore(Exception error)
{
subscriber.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
subscriber.OnCompleted(result);
}
}
}
64 changes: 64 additions & 0 deletions tests/R3.Tests/OperatorTests/TakeUntilTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using System.Reactive.Linq;

namespace R3.Tests.OperatorTests;

public class TakeUntilTest
{
[Fact]
public void EventOther()
{
var publisher1 = new Publisher<int>();
var publisher2 = new Publisher<int>();
var isDisposed = false;
var list = publisher1.TakeUntil(publisher2.DoOnDisposed(() => { isDisposed = true; })).ToLiveList();

publisher1.PublishOnNext(1);
publisher1.PublishOnNext(2);
publisher1.PublishOnNext(3);
list.AssertEqual([1, 2, 3]);

publisher2.PublishOnNext(10000);
isDisposed.Should().BeTrue();

list.AssertEqual([1, 2, 3]);
list.AssertIsCompleted();
}


[Fact]
public void CancellationToken()
{
var publisher1 = new Publisher<int>();
var cts = new CancellationTokenSource();
var list = publisher1.TakeUntil(cts.Token).ToLiveList();

publisher1.PublishOnNext(1);
publisher1.PublishOnNext(2);
publisher1.PublishOnNext(3);
list.AssertEqual([1, 2, 3]);

cts.Cancel();

list.AssertEqual([1, 2, 3]);
list.AssertIsCompleted();
}

[Fact]
public async Task TaskT()
{
var publisher1 = new Publisher<int>();
var tcs = new TaskCompletionSource();
var list = publisher1.TakeUntil(tcs.Task).ToLiveList();

publisher1.PublishOnNext(1);
publisher1.PublishOnNext(2);
publisher1.PublishOnNext(3);
list.AssertEqual([1, 2, 3]);

tcs.TrySetResult();
await Task.Delay(100); // wait for completion

list.AssertEqual([1, 2, 3]);
list.AssertIsCompleted();
}
}
18 changes: 18 additions & 0 deletions tests/R3.Tests/OperatorTests/TakeWhileTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System.Reactive.Linq;

namespace R3.Tests.OperatorTests;

public class TakeWhileTest
{
[Fact]
public void TakeWhile()
{
var xs = Event.Range(1, 10).TakeWhile(x => x <= 3).ToLiveList();
xs.AssertEqual([1, 2, 3]);
xs.AssertIsCompleted();

var ys = Event.Range(100, 10).TakeWhile((x, i) => i < 3).ToLiveList();
ys.AssertEqual([100, 101, 102]);
ys.AssertIsCompleted();
}
}

0 comments on commit 1de8ce8

Please sign in to comment.