Skip to content

Commit

Permalink
Merge pull request #146 from Cysharp/fix-async
Browse files Browse the repository at this point in the history
Fix Debounce, ThrottleFirstLast, ThrottleLast async overload does not handle correctly when func return immediately
  • Loading branch information
neuecc authored Feb 29, 2024
2 parents ab76681 + c6cdd83 commit 81dfa96
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 15 deletions.
12 changes: 7 additions & 5 deletions src/R3/Operators/Debounce.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ sealed class _Debounce(Observer<T> observer, Func<T, CancellationToken, ValueTas
readonly object gate = new object();
T? latestValue;
bool hasValue;
Task? runningTask;
bool isRunning;
int taskId;
CancellationTokenSource cancellationTokenSource = new();

Expand All @@ -123,15 +123,17 @@ protected override void OnNextCore(T value)
latestValue = value;
hasValue = true;

if (runningTask != null)
if (isRunning)
{
cancellationTokenSource.Cancel();
cancellationTokenSource = new CancellationTokenSource();
}

var newId = unchecked(taskId + 1);
Volatile.Write(ref taskId, newId);
runningTask = PublishOnNextAfterAsync(value, newId, cancellationTokenSource.Token);

isRunning = true;
PublishOnNextAfterAsync(value, newId, cancellationTokenSource.Token);
}
}

Expand Down Expand Up @@ -164,7 +166,7 @@ protected override void DisposeCore()
cancellationTokenSource.Cancel();
}

async Task PublishOnNextAfterAsync(T value, int id, CancellationToken cancellationToken)
async void PublishOnNextAfterAsync(T value, int id, CancellationToken cancellationToken)
{
try
{
Expand All @@ -188,7 +190,7 @@ async Task PublishOnNextAfterAsync(T value, int id, CancellationToken cancellati
observer.OnNext(latestValue!);
hasValue = false;
latestValue = default;
runningTask = null;
isRunning = false;

END:
{ }
Expand Down
11 changes: 6 additions & 5 deletions src/R3/Operators/ThrottleFirstLast.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,16 @@ sealed class _ThrottleFirstLast(Observer<T> observer, Func<T, CancellationToken,
readonly CancellationTokenSource cancellationTokenSource = new();
T? lastValue;
bool hasValue;
Task? taskRunner;
bool isRunning;

protected override void OnNextCore(T value)
{
lock (gate)
{
if (taskRunner == null)
if (!isRunning)
{
taskRunner = RaiseOnNextAsync(value);
isRunning = true;
RaiseOnNextAsync(value);
observer.OnNext(value);
}
else
Expand All @@ -148,7 +149,7 @@ protected override void DisposeCore()
cancellationTokenSource.Cancel();
}

async Task RaiseOnNextAsync(T value)
async void RaiseOnNextAsync(T value)
{
try
{
Expand All @@ -171,7 +172,7 @@ async Task RaiseOnNextAsync(T value)
observer.OnNext(lastValue!);
lastValue = default;
hasValue = false;
taskRunner = null;
isRunning = false;
}
}
}
Expand Down
11 changes: 6 additions & 5 deletions src/R3/Operators/ThrottleLast.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,17 @@ sealed class _ThrottleLast(Observer<T> observer, Func<T, CancellationToken, Valu
readonly object gate = new object();
readonly CancellationTokenSource cancellationTokenSource = new();
T? lastValue;
Task? taskRunner;
bool isRunning;

protected override void OnNextCore(T value)
{
lock (gate)
{
lastValue = value;
if (taskRunner == null)
if (!isRunning)
{
taskRunner = RaiseOnNextAsync(value);
isRunning = true;
RaiseOnNextAsync(value);
}
}
}
Expand All @@ -135,7 +136,7 @@ protected override void DisposeCore()
cancellationTokenSource.Cancel();
}

async Task RaiseOnNextAsync(T value)
async void RaiseOnNextAsync(T value)
{
try
{
Expand All @@ -155,7 +156,7 @@ async Task RaiseOnNextAsync(T value)
{
observer.OnNext(lastValue!);
lastValue = default;
taskRunner = null;
isRunning = false;
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions tests/R3.Tests/OperatorTests/ChunkTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ public void ChunkFrameAndCount()
[Fact]
public void ChunkAsync()
{
SynchronizationContext.SetSynchronizationContext(null);

var publisher = new Subject<int>();
var tp = new FakeTimeProvider();
var list = publisher.Chunk(async (x, ct) =>
Expand Down

0 comments on commit 81dfa96

Please sign in to comment.