From 1de8ce87c15c0cba63cc169c73d61e8f5f8e8b96 Mon Sep 17 00:00:00 2001 From: neuecc Date: Fri, 15 Dec 2023 01:35:43 +0900 Subject: [PATCH] TakeUntil, TakeWhile --- sandbox/ConsoleApp1/Program.cs | 7 + src/R3/Operators/TakeUntil.cs | 191 ++++++++++++++++++ src/R3/Operators/TakeWhile.cs | 82 ++++++++ tests/R3.Tests/OperatorTests/TakeUntilTest.cs | 64 ++++++ tests/R3.Tests/OperatorTests/TakeWhileTest.cs | 18 ++ 5 files changed, 362 insertions(+) create mode 100644 src/R3/Operators/TakeUntil.cs create mode 100644 src/R3/Operators/TakeWhile.cs create mode 100644 tests/R3.Tests/OperatorTests/TakeUntilTest.cs create mode 100644 tests/R3.Tests/OperatorTests/TakeWhileTest.cs diff --git a/sandbox/ConsoleApp1/Program.cs b/sandbox/ConsoleApp1/Program.cs index becd8b5d..9d463399 100644 --- a/sandbox/ConsoleApp1/Program.cs +++ b/sandbox/ConsoleApp1/Program.cs @@ -37,9 +37,16 @@ //publisher.PublishOnNext(1); + //var xs = await publisher.Take(TimeSpan.FromSeconds(5)); +foreach (var item in Enumerable.Range(1, 10).TakeWhile(x => x <= 3)) +{ + Console.WriteLine(item); +} +var repeat = Observable.Repeat("foo", 10); +// repeat.TakeWhile( diff --git a/src/R3/Operators/TakeUntil.cs b/src/R3/Operators/TakeUntil.cs new file mode 100644 index 00000000..9cdd4bd9 --- /dev/null +++ b/src/R3/Operators/TakeUntil.cs @@ -0,0 +1,191 @@ +namespace R3; + +public static partial class EventExtensions +{ + public static Event TakeUntil(this Event source, Event other) + { + return new TakeUntil(source, other); + } + + public static Event TakeUntil(this Event source, CancellationToken cancellationToken) + { + if (!cancellationToken.CanBeCanceled) + { + return source; + } + if (cancellationToken.IsCancellationRequested) + { + return Event.Empty(); + } + + return new TakeUntilC(source, cancellationToken); + } + + public static Event TakeUntil(this Event source, Task task) + { + return new TakeUntilT(source, task); + } +} + +internal sealed class TakeUntil(Event source, Event other) : Event +{ + protected override IDisposable SubscribeCore(Subscriber subscriber) + { + var takeUntil = new _TakeUntil(subscriber); + var stopperSubscription = other.Subscribe(takeUntil.stopper); + try + { + return source.Subscribe(takeUntil); // subscription contains self and stopper. + } + catch + { + stopperSubscription.Dispose(); + throw; + } + } + + sealed class _TakeUntil : Subscriber + { + readonly Subscriber subscriber; + internal readonly TakeUntilStopperSubscriber stopper; // this instance is not exposed to public so can use lock. + + public _TakeUntil(Subscriber subscriber) + { + this.subscriber = subscriber; + this.stopper = new TakeUntilStopperSubscriber(this); + } + + protected override void OnNextCore(T value) + { + subscriber.OnNext(value); + } + + protected override void OnErrorResumeCore(Exception error) + { + subscriber.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + subscriber.OnCompleted(result); + } + + protected override void DisposeCore() + { + stopper.Dispose(); + } + } + + sealed class TakeUntilStopperSubscriber(_TakeUntil parent) : Subscriber + { + protected override void OnNextCore(TOther value) + { + parent.OnCompleted(Result.Success); + } + + protected override void OnErrorResumeCore(Exception error) + { + parent.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + parent.OnCompleted(result); + } + } +} + +internal sealed class TakeUntilC(Event source, CancellationToken cancellationToken) : Event +{ + protected override IDisposable SubscribeCore(Subscriber subscriber) + { + return source.Subscribe(new _TakeUntil(subscriber, cancellationToken)); + } + + sealed class _TakeUntil : Subscriber, IDisposable + { + static readonly Action cancellationCallback = CancellationCallback; + + readonly Subscriber subscriber; + CancellationTokenRegistration tokenRegistration; + + public _TakeUntil(Subscriber subscriber, CancellationToken cancellationToken) + { + this.subscriber = subscriber; + this.tokenRegistration = cancellationToken.Register(cancellationCallback, this); + } + + protected override void OnNextCore(T value) + { + subscriber.OnNext(value); + } + + protected override void OnErrorResumeCore(Exception error) + { + subscriber.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + subscriber.OnCompleted(result); + } + + static void CancellationCallback(object? state) + { + var self = (_TakeUntil)state!; + self.OnCompleted(); + } + + protected override void DisposeCore() + { + tokenRegistration.Dispose(); + } + } +} + +internal sealed class TakeUntilT(Event source, Task task) : Event +{ + protected override IDisposable SubscribeCore(Subscriber subscriber) + { + return source.Subscribe(new _TakeUntil(subscriber, task)); + } + + sealed class _TakeUntil : Subscriber, IDisposable + { + readonly Subscriber subscriber; + + public _TakeUntil(Subscriber subscriber, Task task) + { + this.subscriber = subscriber; + TaskAwait(task); + } + + protected override void OnNextCore(T value) + { + subscriber.OnNext(value); + } + + protected override void OnErrorResumeCore(Exception error) + { + subscriber.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + subscriber.OnCompleted(result); + } + + async void TaskAwait(Task task) + { + try + { + await task.ConfigureAwait(false); + OnCompleted(Result.Success); + } + catch (Exception ex) + { + OnCompleted(Result.Failure(ex)); + } + } + } +} diff --git a/src/R3/Operators/TakeWhile.cs b/src/R3/Operators/TakeWhile.cs new file mode 100644 index 00000000..2cac6a0b --- /dev/null +++ b/src/R3/Operators/TakeWhile.cs @@ -0,0 +1,82 @@ +namespace R3; + +public static partial class EventExtensions +{ + public static Event TakeWhile(this Event source, Func predicate) + { + return new TakeWhile(source, predicate); + } + + public static Event TakeWhile(this Event source, Func predicate) + { + return new TakeWhileI(source, predicate); + } +} + +internal sealed class TakeWhile(Event source, Func predicate) : Event +{ + protected override IDisposable SubscribeCore(Subscriber subscriber) + { + return source.Subscribe(new _TakeWhile(subscriber, predicate)); + } + + sealed class _TakeWhile(Subscriber subscriber, Func predicate) : Subscriber, IDisposable + { + protected override void OnNextCore(T value) + { + if (predicate(value)) + { + subscriber.OnNext(value); + } + else + { + subscriber.OnCompleted(); + } + } + + protected override void OnErrorResumeCore(Exception error) + { + subscriber.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + subscriber.OnCompleted(result); + } + } +} + +internal sealed class TakeWhileI(Event source, Func predicate) : Event +{ + protected override IDisposable SubscribeCore(Subscriber subscriber) + { + return source.Subscribe(new _TakeWhile(subscriber, predicate)); + } + + sealed class _TakeWhile(Subscriber subscriber, Func predicate) : Subscriber, IDisposable + { + int count; + + protected override void OnNextCore(T value) + { + if (predicate(value, count++)) + { + subscriber.OnNext(value); + } + else + { + subscriber.OnCompleted(); + } + } + + protected override void OnErrorResumeCore(Exception error) + { + subscriber.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + subscriber.OnCompleted(result); + } + } +} diff --git a/tests/R3.Tests/OperatorTests/TakeUntilTest.cs b/tests/R3.Tests/OperatorTests/TakeUntilTest.cs new file mode 100644 index 00000000..5baa5fa6 --- /dev/null +++ b/tests/R3.Tests/OperatorTests/TakeUntilTest.cs @@ -0,0 +1,64 @@ +using System.Reactive.Linq; + +namespace R3.Tests.OperatorTests; + +public class TakeUntilTest +{ + [Fact] + public void EventOther() + { + var publisher1 = new Publisher(); + var publisher2 = new Publisher(); + var isDisposed = false; + var list = publisher1.TakeUntil(publisher2.DoOnDisposed(() => { isDisposed = true; })).ToLiveList(); + + publisher1.PublishOnNext(1); + publisher1.PublishOnNext(2); + publisher1.PublishOnNext(3); + list.AssertEqual([1, 2, 3]); + + publisher2.PublishOnNext(10000); + isDisposed.Should().BeTrue(); + + list.AssertEqual([1, 2, 3]); + list.AssertIsCompleted(); + } + + + [Fact] + public void CancellationToken() + { + var publisher1 = new Publisher(); + var cts = new CancellationTokenSource(); + var list = publisher1.TakeUntil(cts.Token).ToLiveList(); + + publisher1.PublishOnNext(1); + publisher1.PublishOnNext(2); + publisher1.PublishOnNext(3); + list.AssertEqual([1, 2, 3]); + + cts.Cancel(); + + list.AssertEqual([1, 2, 3]); + list.AssertIsCompleted(); + } + + [Fact] + public async Task TaskT() + { + var publisher1 = new Publisher(); + var tcs = new TaskCompletionSource(); + var list = publisher1.TakeUntil(tcs.Task).ToLiveList(); + + publisher1.PublishOnNext(1); + publisher1.PublishOnNext(2); + publisher1.PublishOnNext(3); + list.AssertEqual([1, 2, 3]); + + tcs.TrySetResult(); + await Task.Delay(100); // wait for completion + + list.AssertEqual([1, 2, 3]); + list.AssertIsCompleted(); + } +} diff --git a/tests/R3.Tests/OperatorTests/TakeWhileTest.cs b/tests/R3.Tests/OperatorTests/TakeWhileTest.cs new file mode 100644 index 00000000..e0bc362b --- /dev/null +++ b/tests/R3.Tests/OperatorTests/TakeWhileTest.cs @@ -0,0 +1,18 @@ +using System.Reactive.Linq; + +namespace R3.Tests.OperatorTests; + +public class TakeWhileTest +{ + [Fact] + public void TakeWhile() + { + var xs = Event.Range(1, 10).TakeWhile(x => x <= 3).ToLiveList(); + xs.AssertEqual([1, 2, 3]); + xs.AssertIsCompleted(); + + var ys = Event.Range(100, 10).TakeWhile((x, i) => i < 3).ToLiveList(); + ys.AssertEqual([100, 101, 102]); + ys.AssertIsCompleted(); + } +}