fix: consumer never stops when max poll is exceeded
filipeesch committed Dec 13, 2023
1 parent aff26ac commit bb6e499
Showing 12 changed files with 248 additions and 101 deletions.
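In essence, the fix makes the poll loop recognise librdkafka's "maximum poll interval exceeded" error and raise an internal event that the worker pool uses to cancel its workers, instead of silently looping after the consumer has been evicted from the group. The sketch below condenses that pattern from the Consumer.cs diff; it is a simplified illustration rather than the library code, and the onMaxPollIntervalExceeded callback stands in for the new MaxPollIntervalExceeded event.

using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

// Simplified sketch of the new poll-loop behaviour (not the actual KafkaFlow code).
internal static class MaxPollHandlingSketch
{
    public static async Task<ConsumeResult<byte[], byte[]>> ConsumeAsync(
        IConsumer<byte[], byte[]> consumer,
        Func<Task> onMaxPollIntervalExceeded, // stands in for the new event member
        CancellationToken cancellationToken)
    {
        while (true)
        {
            try
            {
                // Consume throws once max.poll.interval.ms has been exceeded and the
                // consumer has been kicked out of the consumer group.
                return consumer.Consume(cancellationToken);
            }
            catch (ConsumeException ex) when (ex.Error.Code == ErrorCode.Local_MaxPollExceeded)
            {
                // New in this commit: surface the condition so the worker pool can react,
                // rather than treating it like any other consume error.
                await onMaxPollIntervalExceeded();
            }
        }
    }
}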
67 changes: 42 additions & 25 deletions src/KafkaFlow/Consumers/Consumer.cs
@@ -4,6 +4,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using KafkaFlow.Configuration;

namespace KafkaFlow.Consumers;
@@ -13,18 +14,20 @@ internal class Consumer : IConsumer
private readonly IDependencyResolver _dependencyResolver;
private readonly ILogHandler _logHandler;

private readonly List<Action<IDependencyResolver, Confluent.Kafka.IConsumer<byte[], byte[]>, List<Confluent.Kafka.TopicPartition>>>
private readonly List<Action<IDependencyResolver, IConsumer<byte[], byte[]>, List<TopicPartition>>>
_partitionsAssignedHandlers = new();

private readonly List<Action<IDependencyResolver, Confluent.Kafka.IConsumer<byte[], byte[]>, List<Confluent.Kafka.TopicPartitionOffset>>>
private readonly List<Action<IDependencyResolver, IConsumer<byte[], byte[]>,
List<Confluent.Kafka.TopicPartitionOffset>>>
_partitionsRevokedHandlers = new();

private readonly List<Action<Confluent.Kafka.IConsumer<byte[], byte[]>, Confluent.Kafka.Error>> _errorsHandlers = new();
private readonly List<Action<Confluent.Kafka.IConsumer<byte[], byte[]>, string>> _statisticsHandlers = new();
private readonly ConcurrentDictionary<Confluent.Kafka.TopicPartition, long> _currentPartitionsOffsets = new();
private readonly List<Action<IConsumer<byte[], byte[]>, Error>> _errorsHandlers = new();
private readonly List<Action<IConsumer<byte[], byte[]>, string>> _statisticsHandlers = new();
private readonly ConcurrentDictionary<TopicPartition, long> _currentPartitionsOffsets = new();
private readonly ConsumerFlowManager _flowManager;
private readonly Event _maxPollIntervalExceeded;

private Confluent.Kafka.IConsumer<byte[], byte[]> _consumer;
private IConsumer<byte[], byte[]> _consumer;

public Consumer(
IConsumerConfiguration configuration,
@@ -34,9 +37,8 @@ public Consumer(
_dependencyResolver = dependencyResolver;
_logHandler = logHandler;
this.Configuration = configuration;
_flowManager = new ConsumerFlowManager(
this,
_logHandler);
_flowManager = new ConsumerFlowManager(this, _logHandler);
_maxPollIntervalExceeded = new(_logHandler);

foreach (var handler in this.Configuration.StatisticsHandlers)
{
@@ -65,14 +67,16 @@ public Consumer(

public IReadOnlyList<string> Subscription { get; private set; } = new List<string>();

public IReadOnlyList<Confluent.Kafka.TopicPartition> Assignment { get; private set; } = new List<Confluent.Kafka.TopicPartition>();
public IReadOnlyList<TopicPartition> Assignment { get; private set; } = new List<TopicPartition>();

public IConsumerFlowManager FlowManager => _flowManager;

public string MemberId => _consumer?.MemberId;

public string ClientInstanceName => _consumer?.Name;

public IEvent MaxPollIntervalExceeded => _maxPollIntervalExceeded;

public ConsumerStatus Status
{
get
@@ -93,29 +97,30 @@ public ConsumerStatus Status
}
}

public void OnPartitionsAssigned(Action<IDependencyResolver, Confluent.Kafka.IConsumer<byte[], byte[]>, List<Confluent.Kafka.TopicPartition>> handler) =>
public void OnPartitionsAssigned(Action<IDependencyResolver, IConsumer<byte[], byte[]>, List<TopicPartition>> handler) =>
_partitionsAssignedHandlers.Add(handler);

public void OnPartitionsRevoked(Action<IDependencyResolver, Confluent.Kafka.IConsumer<byte[], byte[]>, List<Confluent.Kafka.TopicPartitionOffset>> handler) =>
public void OnPartitionsRevoked(
Action<IDependencyResolver, IConsumer<byte[], byte[]>, List<Confluent.Kafka.TopicPartitionOffset>> handler) =>
_partitionsRevokedHandlers.Add(handler);

public void OnError(Action<Confluent.Kafka.IConsumer<byte[], byte[]>, Confluent.Kafka.Error> handler) =>
public void OnError(Action<IConsumer<byte[], byte[]>, Error> handler) =>
_errorsHandlers.Add(handler);

public void OnStatistics(Action<Confluent.Kafka.IConsumer<byte[], byte[]>, string> handler) =>
public void OnStatistics(Action<IConsumer<byte[], byte[]>, string> handler) =>
_statisticsHandlers.Add(handler);

public Confluent.Kafka.Offset GetPosition(Confluent.Kafka.TopicPartition topicPartition) =>
public Offset GetPosition(TopicPartition topicPartition) =>
_consumer.Position(topicPartition);

public Confluent.Kafka.WatermarkOffsets GetWatermarkOffsets(Confluent.Kafka.TopicPartition topicPartition) =>
public WatermarkOffsets GetWatermarkOffsets(TopicPartition topicPartition) =>
_consumer.GetWatermarkOffsets(topicPartition);

public Confluent.Kafka.WatermarkOffsets QueryWatermarkOffsets(Confluent.Kafka.TopicPartition topicPartition, TimeSpan timeout) =>
public WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout) =>
_consumer.QueryWatermarkOffsets(topicPartition, timeout);

public List<Confluent.Kafka.TopicPartitionOffset> OffsetsForTimes(
IEnumerable<Confluent.Kafka.TopicPartitionTimestamp> topicPartitions,
IEnumerable<TopicPartitionTimestamp> topicPartitions,
TimeSpan timeout) =>
_consumer.OffsetsForTimes(topicPartitions, timeout);

@@ -150,7 +155,7 @@ public void Commit(IReadOnlyCollection<Confluent.Kafka.TopicPartitionOffset> off
}
}

public async ValueTask<Confluent.Kafka.ConsumeResult<byte[], byte[]>> ConsumeAsync(CancellationToken cancellationToken)
public async ValueTask<ConsumeResult<byte[], byte[]>> ConsumeAsync(CancellationToken cancellationToken)
{
while (true)
{
@@ -164,7 +169,15 @@
{
throw;
}
catch (Confluent.Kafka.KafkaException ex) when (ex.Error.IsFatal)
catch (ConsumeException ex) when (ex.Error.Code == ErrorCode.Local_MaxPollExceeded)
{
_logHandler.Warning(
"Max Poll Interval Exceeded",
new { this.Configuration.ConsumerName });

await _maxPollIntervalExceeded.FireAsync();
}
catch (KafkaException ex) when (ex.Error.IsFatal)
{
_logHandler.Error(
"Kafka Consumer fatal error occurred. Recreating consumer in 5 seconds",
@@ -173,7 +186,7 @@ public void Commit(IReadOnlyCollection<Confluent.Kafka.TopicPartitionOffset> off

this.InvalidateConsumer();

await Task.Delay(5000, cancellationToken).ConfigureAwait(false);
await Task.Delay(5000, cancellationToken);
}
catch (Exception ex)
{
@@ -222,7 +235,7 @@ private void EnsureConsumer()

var kafkaConfig = this.Configuration.GetKafkaConfig();

var consumerBuilder = new Confluent.Kafka.ConsumerBuilder<byte[], byte[]>(kafkaConfig);
var consumerBuilder = new ConsumerBuilder<byte[], byte[]>(kafkaConfig);

_consumer =
consumerBuilder
@@ -231,7 +244,7 @@ private void EnsureConsumer()
.SetPartitionsRevokedHandler(
(consumer, partitions) =>
{
this.Assignment = new List<Confluent.Kafka.TopicPartition>();
this.Assignment = new List<TopicPartition>();
this.Subscription = new List<string>();
_currentPartitionsOffsets.Clear();
_flowManager.Stop();
@@ -256,14 +269,18 @@ private void EnsureConsumer()
private void ManualAssign(IEnumerable<TopicPartitions> topics)
{
var partitions = topics
.SelectMany(topic => topic.Partitions.Select(partition => new Confluent.Kafka.TopicPartition(topic.Name, new Confluent.Kafka.Partition(partition))))
.SelectMany(
topic => topic.Partitions.Select(
partition => new TopicPartition(topic.Name, new Partition(partition))))
.ToList();

_consumer.Assign(partitions);
this.FirePartitionsAssignedHandlers(_consumer, partitions);
}

private void FirePartitionsAssignedHandlers(Confluent.Kafka.IConsumer<byte[], byte[]> consumer, List<Confluent.Kafka.TopicPartition> partitions)
private void FirePartitionsAssignedHandlers(
IConsumer<byte[], byte[]> consumer,
List<TopicPartition> partitions)
{
this.Assignment = partitions;
this.Subscription = consumer.Subscription;
56 changes: 29 additions & 27 deletions src/KafkaFlow/Consumers/ConsumerWorker.cs
@@ -2,6 +2,7 @@
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using KafkaFlow.Extensions;

namespace KafkaFlow.Consumers;

@@ -19,7 +20,6 @@ internal class ConsumerWorker : IConsumerWorker
private readonly Event _workerStoppedEvent;
private readonly Event<IMessageContext> _workerProcessingEnded;

private CancellationTokenSource _stopCancellationTokenSource;
private Task _backgroundTask;

public ConsumerWorker(
Expand Down Expand Up @@ -49,7 +49,7 @@ public ConsumerWorker(

public int Id { get; }

public CancellationToken StopCancellationToken => _stopCancellationTokenSource?.Token ?? default;
public CancellationToken StopCancellationToken { get; private set; }

public IDependencyResolver WorkerDependencyResolver => _workerDependencyResolverScope.Resolver;

Expand All @@ -64,30 +64,31 @@ public ValueTask EnqueueAsync(IMessageContext context)
return _messagesBuffer.Writer.WriteAsync(context, CancellationToken.None);
}

public Task StartAsync()
public Task StartAsync(CancellationToken stopCancellationToken)
{
_stopCancellationTokenSource = new CancellationTokenSource();
this.StopCancellationToken = stopCancellationToken;

_backgroundTask = Task.Run(
async () =>
{
IMessageContext currentContext = null;

try
{
try
{
while (await _messagesBuffer.Reader.WaitToReadAsync(CancellationToken.None).ConfigureAwait(false))
{
while (_messagesBuffer.Reader.TryRead(out var message))
{
await this.ProcessMessageAsync(message, _stopCancellationTokenSource.Token).ConfigureAwait(false);
}
}
}
catch (OperationCanceledException)
await foreach (var context in _messagesBuffer.Reader.ReadAllItemsAsync(stopCancellationToken))
{
// Ignores the exception
currentContext = context;

await this
.ProcessMessageAsync(context, stopCancellationToken)
.WithCancellation(stopCancellationToken, true);
}
}
catch (OperationCanceledException)
{
currentContext?.ConsumerContext.Discard();
await this.DiscardBufferedContextsAsync();
}
catch (Exception ex)
{
_logHandler.Error("KafkaFlow consumer worker fatal error", ex, null);
@@ -104,12 +105,7 @@ public async Task StopAsync()

_messagesBuffer.Writer.TryComplete();

if (_stopCancellationTokenSource.Token.CanBeCanceled)
{
_stopCancellationTokenSource.CancelAfter(_consumer.Configuration.WorkerStopTimeout);
}

await _backgroundTask.ConfigureAwait(false);
await _backgroundTask;

await _workerStoppedEvent.FireAsync();
}
@@ -118,7 +114,14 @@ public void Dispose()
{
_backgroundTask.Dispose();
_workerDependencyResolverScope.Dispose();
_stopCancellationTokenSource.Dispose();
}

private async Task DiscardBufferedContextsAsync()
{
await foreach (var context in _messagesBuffer.Reader.ReadAllItemsAsync(CancellationToken.None))
{
context.ConsumerContext.Discard();
}
}

private async Task ProcessMessageAsync(IMessageContext context, CancellationToken cancellationToken)
@@ -138,11 +141,10 @@ private async Task ProcessMessageAsync(IMessageContext context, CancellationToke
}

await _globalEvents.FireMessageConsumeCompletedAsync(new MessageEventContext(context));
});
},
CancellationToken.None);

await _middlewareExecutor
.Execute(context, _ => Task.CompletedTask)
.ConfigureAwait(false);
await _middlewareExecutor.Execute(context, _ => Task.CompletedTask);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
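The reworked worker loop above leans on two helpers from KafkaFlow.Extensions that are not part of this excerpt: ReadAllItemsAsync, which drains the channel reader as an async stream, and WithCancellation, which bounds an awaited task with the stop token. Their real implementations live elsewhere in the changeset and may differ; the sketch below is only an assumed shape, included to make the loop easier to follow.

using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Assumed shape of the extension methods used above; not the actual KafkaFlow code.
internal static class AssumedChannelAndTaskExtensions
{
    // Yields every buffered item until the channel completes or the token is cancelled.
    public static async IAsyncEnumerable<T> ReadAllItemsAsync<T>(
        this ChannelReader<T> reader,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        while (await reader.WaitToReadAsync(cancellationToken))
        {
            while (reader.TryRead(out var item))
            {
                yield return item;
            }
        }
    }

    // Awaits the task but stops waiting when the token fires; optionally rethrows the
    // cancellation so callers can run their discard/cleanup path.
    public static async Task WithCancellation(
        this Task task,
        CancellationToken cancellationToken,
        bool throwOnCancellation)
    {
        var cancelled = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);

        using (cancellationToken.Register(() => cancelled.TrySetResult(null)))
        {
            var completed = await Task.WhenAny(task, cancelled.Task);

            if (completed == task)
            {
                await task; // propagate the result or any exception
            }
            else if (throwOnCancellation)
            {
                throw new OperationCanceledException(cancellationToken);
            }
        }
    }
}

With these assumptions in place, the important behavioural change is that cancellation no longer comes from a token source owned by each worker: StartAsync receives the token from the pool, and on cancellation the worker discards the in-flight context and every context still buffered in its channel.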
57 changes: 38 additions & 19 deletions src/KafkaFlow/Consumers/ConsumerWorkerPool.cs
@@ -5,6 +5,7 @@
using System.Threading.Tasks;
using Confluent.Kafka;
using KafkaFlow.Configuration;
using KafkaFlow.Extensions;

namespace KafkaFlow.Consumers;

@@ -22,6 +23,7 @@ internal class ConsumerWorkerPool : IConsumerWorkerPool
private TaskCompletionSource<object> _startedTaskSource = new();
private List<IConsumerWorker> _workers = new();

private CancellationTokenSource _stopCancellationTokenSource;
private IWorkerDistributionStrategy _distributionStrategy;
private IOffsetManager _offsetManager;

@@ -65,24 +67,34 @@ public async Task StartAsync(IReadOnlyCollection<TopicPartition> partitions, int

this.CurrentWorkersCount = workersCount;

_stopCancellationTokenSource = new CancellationTokenSource();

var subscription = _consumer.MaxPollIntervalExceeded.Subscribe(
() =>
{
_stopCancellationTokenSource.Cancel();
return Task.CompletedTask;
});

_stopCancellationTokenSource.Token.Register(() => subscription.Cancel());

await Task.WhenAll(
Enumerable
.Range(0, this.CurrentWorkersCount)
.Select(
workerId =>
{
var worker = new ConsumerWorker(
_consumer,
_consumerDependencyResolver,
workerId,
_middlewareExecutor,
_logHandler);

_workers.Add(worker);

return worker.StartAsync();
}))
.ConfigureAwait(false);
Enumerable
.Range(0, this.CurrentWorkersCount)
.Select(
workerId =>
{
var worker = new ConsumerWorker(
_consumer,
_consumerDependencyResolver,
workerId,
_middlewareExecutor,
_logHandler);

_workers.Add(worker);

return worker.StartAsync(_stopCancellationTokenSource.Token);
}));

_distributionStrategy = _distributionStrategyFactory(_consumerDependencyResolver);
_distributionStrategy.Initialize(_workers.AsReadOnly());
@@ -112,11 +124,18 @@ public async Task StopAsync()
_workers = new List<IConsumerWorker>();
_startedTaskSource = new();

await Task.WhenAll(currentWorkers.Select(x => x.StopAsync())).ConfigureAwait(false);
if (_stopCancellationTokenSource.Token.CanBeCanceled)
{
_stopCancellationTokenSource.CancelAfter(_consumer.Configuration.WorkerStopTimeout);
}

await _offsetManager.WaitContextsCompletionAsync();
await Task.WhenAll(currentWorkers.Select(x => x.StopAsync()));
await _offsetManager
.WaitContextsCompletionAsync()
.WithCancellation(_stopCancellationTokenSource.Token, false);

currentWorkers.ForEach(worker => worker.Dispose());
_stopCancellationTokenSource?.Dispose();

_offsetManager = null;

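Taken together, the pool now owns a single CancellationTokenSource for its workers: the MaxPollIntervalExceeded subscription cancels it as soon as the consumer is evicted, and StopAsync arms CancelAfter(WorkerStopTimeout) so a graceful stop cannot wait on in-flight work indefinitely. The snippet below isolates that general .NET coordination pattern; the names are illustrative and not taken from KafkaFlow.

using System;
using System.Threading;
using System.Threading.Tasks;

// Illustrative pattern only: one shared CancellationTokenSource ended either by an
// external "max poll exceeded" signal or by a bounded stop timeout.
internal static class StopCoordinationSketch
{
    public static async Task RunAndStopAsync(
        Func<CancellationToken, Task> runWorkers,            // stands in for the worker loops
        Func<Action, IDisposable> subscribeMaxPollExceeded,  // stands in for the event subscription
        Task stopRequested,                                   // completes when a stop is requested
        TimeSpan workerStopTimeout)
    {
        using var stopSource = new CancellationTokenSource();

        // Trigger 1: the "max poll exceeded" signal cancels the workers immediately.
        using var subscription = subscribeMaxPollExceeded(stopSource.Cancel);

        var workers = runWorkers(stopSource.Token);

        // Trigger 2: once a stop is requested, in-flight work gets only a bounded grace period.
        await stopRequested;
        stopSource.CancelAfter(workerStopTimeout);

        try
        {
            await workers;
        }
        catch (OperationCanceledException)
        {
            // The workers abandoned their remaining work because one of the triggers fired.
        }
    }
}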