Skip to content

Commit

Permalink
fix: make feederTask fully cancellable
Browse files Browse the repository at this point in the history
  • Loading branch information
slang25 authored and filipeesch committed Apr 19, 2021
1 parent 060aa00 commit 5796584
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 28 deletions.
34 changes: 21 additions & 13 deletions src/KafkaFlow.UnitTests/Consumer/WorkerPoolFeederTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,16 @@ public async Task StopAsync_WithoutStarting_Return()

[TestMethod]
[Timeout(1000)]
public async Task StopAsync_BlockedOnConsume_MustStop()
public async Task StopAsync_WaitingOnConsumeWithCancellation_MustStop()
{
// Arrange
var consumeResult = new ConsumeResult<byte[], byte[]>();

this.consumerMock
.Setup(x => x.ConsumeAsync(It.IsAny<CancellationToken>()))
.Returns(
new ValueTask<ConsumeResult<byte[], byte[]>>(
Task.Delay(Timeout.Infinite).ContinueWith(_ => consumeResult)));
.Returns(async (CancellationToken ct) =>
{
await Task.Delay(Timeout.Infinite, ct).ConfigureAwait(false);
return default; // Never reached
});

// Act
this.target.Start();
Expand All @@ -62,7 +62,7 @@ public async Task StopAsync_BlockedOnConsume_MustStop()

[TestMethod]
[Timeout(1000)]
public async Task StopAsync_BlockedOnQueuing_MustStop()
public async Task StopAsync_WaitingOnQueuingWithCancellation_MustStop()
{
// Arrange
var consumeResult = new ConsumeResult<byte[], byte[]>();
Expand All @@ -73,7 +73,7 @@ public async Task StopAsync_BlockedOnQueuing_MustStop()

this.workerPoolMock
.Setup(x => x.EnqueueAsync(consumeResult, It.IsAny<CancellationToken>()))
.Returns(Task.Delay(Timeout.Infinite));
.Returns((ConsumeResult<byte[],byte[]> _, CancellationToken ct) => Task.Delay(Timeout.Infinite, ct));

// Act
this.target.Start();
Expand All @@ -100,7 +100,7 @@ public async Task ConsumeAsyncThrows_LogAndCallConsumeAsyncAgain()

this.workerPoolMock
.Setup(x => x.EnqueueAsync(consumeResult, It.IsAny<CancellationToken>()))
.Returns(Task.Delay(Timeout.Infinite));
.Returns((ConsumeResult<byte[], byte[]> _, CancellationToken ct) => Task.Delay(Timeout.Infinite, ct));

this.logHandlerMock
.Setup(x => x.Error(It.IsAny<string>(), exception, It.IsAny<object>()));
Expand Down Expand Up @@ -128,11 +128,19 @@ public async Task EnqueueAsyncThrows_LogAndCallConsumeAsyncAgain()
.Setup(x => x.ConsumeAsync(It.IsAny<CancellationToken>()))
.ReturnsAsync(consumeResult);

var hasThrown = false;
this.workerPoolMock
.SetupSequence(x => x.EnqueueAsync(consumeResult, It.IsAny<CancellationToken>()))
.Throws(exception)
.Returns(Task.Delay(Timeout.Infinite));

.Setup(x => x.EnqueueAsync(consumeResult, It.IsAny<CancellationToken>()))
.Returns((ConsumeResult<byte[], byte[]> _, CancellationToken ct) =>
{
if (!hasThrown)
{
hasThrown = true;
throw exception;
}
return Task.Delay(Timeout.Infinite, ct);
});

this.logHandlerMock
.Setup(x => x.Error(It.IsAny<string>(), exception, It.IsAny<object>()));

Expand Down
7 changes: 2 additions & 5 deletions src/KafkaFlow/Consumers/ConsumerWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public Task StartAsync()
{
this.stopCancellationTokenSource = new CancellationTokenSource();

this.backgroundTask = Task.Factory.StartNew(
this.backgroundTask = Task.Run(
async () =>
{
while (!this.stopCancellationTokenSource.IsCancellationRequested)
Expand Down Expand Up @@ -102,10 +102,7 @@ await this.middlewareExecutor
// Ignores the exception
}
}
},
CancellationToken.None,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
});

return Task.CompletedTask;
}
Expand Down
18 changes: 8 additions & 10 deletions src/KafkaFlow/Consumers/WorkerPoolFeeder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ internal class WorkerPoolFeeder : IWorkerPoolFeeder
private readonly ILogHandler logHandler;

private CancellationTokenSource stopTokenSource;
private Task<Task> feederTask;
private Task feederTask;

public WorkerPoolFeeder(
IConsumer consumer,
Expand All @@ -26,20 +26,21 @@ public WorkerPoolFeeder(
public void Start()
{
this.stopTokenSource = new CancellationTokenSource();

this.feederTask = Task.Factory.StartNew(
var token = stopTokenSource.Token;

this.feederTask = Task.Run(
async () =>
{
while (!this.stopTokenSource.IsCancellationRequested)
while (!token.IsCancellationRequested)
{
try
{
var message = await this.consumer
.ConsumeAsync(this.stopTokenSource.Token)
.ConsumeAsync(token)
.ConfigureAwait(false);

await this.workerPool
.EnqueueAsync(message, this.stopTokenSource.Token)
.EnqueueAsync(message, token)
.ConfigureAwait(false);
}
catch (OperationCanceledException)
Expand All @@ -54,10 +55,7 @@ await this.workerPool
null);
}
}
},
CancellationToken.None,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}, token);
}

public Task StopAsync()
Expand Down

0 comments on commit 5796584

Please sign in to comment.