From 4a90bb4f576be2d6bb4869a63be0bd14b2941965 Mon Sep 17 00:00:00 2001 From: neuecc Date: Thu, 29 Feb 2024 15:01:27 +0900 Subject: [PATCH] Change Chunk(TimeSpan) timer start when first value comes --- src/R3/Operators/Chunk.cs | 42 +++++++++++++++++------ tests/R3.Tests/OperatorTests/ChunkTest.cs | 4 +-- 2 files changed, 33 insertions(+), 13 deletions(-) diff --git a/src/R3/Operators/Chunk.cs b/src/R3/Operators/Chunk.cs index 6c572035..0c456b29 100644 --- a/src/R3/Operators/Chunk.cs +++ b/src/R3/Operators/Chunk.cs @@ -94,14 +94,16 @@ sealed class _Chunk : Observer readonly Observer observer; readonly List list; // lock gate - readonly ITimer timer; + readonly TimeProvider timeProvider; + readonly TimeSpan timeSpan; + ITimer? timer; public _Chunk(Observer observer, TimeSpan timeSpan, TimeProvider timeProvider) { this.observer = observer; + this.timeSpan = timeSpan; + this.timeProvider = timeProvider; this.list = new List(); - this.timer = timeProvider.CreateStoppedTimer(timerCallback, this); - this.timer.Change(timeSpan, timeSpan); } protected override void OnNextCore(T value) @@ -109,6 +111,11 @@ protected override void OnNextCore(T value) lock (list) { list.Add(value); + if (timer == null) + { + this.timer = timeProvider.CreateStoppedTimer(timerCallback, this); + this.timer.InvokeOnce(timeSpan); + } } } @@ -132,7 +139,10 @@ protected override void OnCompletedCore(Result result) protected override void DisposeCore() { - timer.Dispose(); + lock (list) + { + timer?.Dispose(); + } } static void TimerCallback(object? state) @@ -149,6 +159,7 @@ static void TimerCallback(object? state) self.observer.OnNext(self.list.ToArray()); self.list.Clear(); } + self.timer = null; } } } @@ -167,10 +178,11 @@ sealed class _Chunk : Observer static readonly TimerCallback timerCallback = TimerCallback; readonly Observer observer; - readonly ITimer timer; readonly int count; readonly TimeSpan timeSpan; + readonly TimeProvider timeProvider; readonly object gate = new object(); + ITimer? timer; T[] buffer; int index; int timerId; @@ -180,9 +192,8 @@ public _Chunk(Observer observer, TimeSpan timeSpan, int count, TimeProvider this.observer = observer; this.count = count; this.timeSpan = timeSpan; + this.timeProvider = timeProvider; this.buffer = new T[count]; - this.timer = timeProvider.CreateStoppedTimer(timerCallback, this); - this.timer.Change(timeSpan, timeSpan); } protected override void OnNextCore(T value) @@ -192,7 +203,8 @@ protected override void OnNextCore(T value) buffer[index++] = value; if (index == count) { - timer.Stop(); // stop timer for restart + timer?.Stop(); // stop timer for restart + timer = null; try { @@ -202,9 +214,16 @@ protected override void OnNextCore(T value) } finally { - // Restart and increment timerId + // increment timerId for restart timerId = unchecked(timerId += 1); - timer.Change(timeSpan, timeSpan); + } + } + else + { + if (timer == null) + { + timer = timeProvider.CreateStoppedTimer(timerCallback, this); + timer.InvokeOnce(timeSpan); } } } @@ -229,7 +248,7 @@ protected override void OnCompletedCore(Result result) protected override void DisposeCore() { - timer.Dispose(); + timer?.Dispose(); } static void TimerCallback(object? state) @@ -252,6 +271,7 @@ static void TimerCallback(object? state) span.Clear(); self.index = 0; } + self.timer = null; } } } diff --git a/tests/R3.Tests/OperatorTests/ChunkTest.cs b/tests/R3.Tests/OperatorTests/ChunkTest.cs index a505a0d8..970372bf 100644 --- a/tests/R3.Tests/OperatorTests/ChunkTest.cs +++ b/tests/R3.Tests/OperatorTests/ChunkTest.cs @@ -113,13 +113,13 @@ public void ChunkTimeAndCount() timeProvider.Advance(TimeSpan.FromSeconds(3)); - list.AssertEqual([1, 10], [100], [1000, 10000], [50, 2], [3], []); + list.AssertEqual([1, 10], [100], [1000, 10000], [50, 2], [3]); publisher.OnNext(4); publisher.OnCompleted(); - list.AssertEqual([1, 10], [100], [1000, 10000], [50, 2], [3], [], [4]); + list.AssertEqual([1, 10], [100], [1000, 10000], [50, 2], [3], [4]); list.AssertIsCompleted(); }