Skip to content

Commit

Permalink
Change Chunk(TimeSpan) timer start when first value comes
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Feb 29, 2024
1 parent c6cdd83 commit 4a90bb4
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 13 deletions.
42 changes: 31 additions & 11 deletions src/R3/Operators/Chunk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,21 +94,28 @@ sealed class _Chunk : Observer<T>

readonly Observer<T[]> observer;
readonly List<T> list; // lock gate
readonly ITimer timer;
readonly TimeProvider timeProvider;
readonly TimeSpan timeSpan;
ITimer? timer;

public _Chunk(Observer<T[]> observer, TimeSpan timeSpan, TimeProvider timeProvider)
{
this.observer = observer;
this.timeSpan = timeSpan;
this.timeProvider = timeProvider;
this.list = new List<T>();
this.timer = timeProvider.CreateStoppedTimer(timerCallback, this);
this.timer.Change(timeSpan, timeSpan);
}

protected override void OnNextCore(T value)
{
lock (list)
{
list.Add(value);
if (timer == null)
{
this.timer = timeProvider.CreateStoppedTimer(timerCallback, this);
this.timer.InvokeOnce(timeSpan);
}
}
}

Expand All @@ -132,7 +139,10 @@ protected override void OnCompletedCore(Result result)

protected override void DisposeCore()
{
timer.Dispose();
lock (list)
{
timer?.Dispose();
}
}

static void TimerCallback(object? state)
Expand All @@ -149,6 +159,7 @@ static void TimerCallback(object? state)
self.observer.OnNext(self.list.ToArray());
self.list.Clear();
}
self.timer = null;
}
}
}
Expand All @@ -167,10 +178,11 @@ sealed class _Chunk : Observer<T>
static readonly TimerCallback timerCallback = TimerCallback;

readonly Observer<T[]> observer;
readonly ITimer timer;
readonly int count;
readonly TimeSpan timeSpan;
readonly TimeProvider timeProvider;
readonly object gate = new object();
ITimer? timer;
T[] buffer;
int index;
int timerId;
Expand All @@ -180,9 +192,8 @@ public _Chunk(Observer<T[]> observer, TimeSpan timeSpan, int count, TimeProvider
this.observer = observer;
this.count = count;
this.timeSpan = timeSpan;
this.timeProvider = timeProvider;
this.buffer = new T[count];
this.timer = timeProvider.CreateStoppedTimer(timerCallback, this);
this.timer.Change(timeSpan, timeSpan);
}

protected override void OnNextCore(T value)
Expand All @@ -192,7 +203,8 @@ protected override void OnNextCore(T value)
buffer[index++] = value;
if (index == count)
{
timer.Stop(); // stop timer for restart
timer?.Stop(); // stop timer for restart
timer = null;

try
{
Expand All @@ -202,9 +214,16 @@ protected override void OnNextCore(T value)
}
finally
{
// Restart and increment timerId
// increment timerId for restart
timerId = unchecked(timerId += 1);
timer.Change(timeSpan, timeSpan);
}
}
else
{
if (timer == null)
{
timer = timeProvider.CreateStoppedTimer(timerCallback, this);
timer.InvokeOnce(timeSpan);
}
}
}
Expand All @@ -229,7 +248,7 @@ protected override void OnCompletedCore(Result result)

protected override void DisposeCore()
{
timer.Dispose();
timer?.Dispose();
}

static void TimerCallback(object? state)
Expand All @@ -252,6 +271,7 @@ static void TimerCallback(object? state)
span.Clear();
self.index = 0;
}
self.timer = null;
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions tests/R3.Tests/OperatorTests/ChunkTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,13 @@ public void ChunkTimeAndCount()

timeProvider.Advance(TimeSpan.FromSeconds(3));

list.AssertEqual([1, 10], [100], [1000, 10000], [50, 2], [3], []);
list.AssertEqual([1, 10], [100], [1000, 10000], [50, 2], [3]);

publisher.OnNext(4);

publisher.OnCompleted();

list.AssertEqual([1, 10], [100], [1000, 10000], [50, 2], [3], [], [4]);
list.AssertEqual([1, 10], [100], [1000, 10000], [50, 2], [3], [4]);

list.AssertIsCompleted();
}
Expand Down

0 comments on commit 4a90bb4

Please sign in to comment.