Skip to content

Commit

Permalink
fix: requeue failed offsets
Browse files Browse the repository at this point in the history
  • Loading branch information
massada authored and filipeesch committed Aug 13, 2021
1 parent 0544e79 commit 546fedf
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 16 deletions.
103 changes: 103 additions & 0 deletions src/KafkaFlow.UnitTests/OffsetCommitterTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
namespace KafkaFlow.UnitTests
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Confluent.Kafka;
using KafkaFlow.Consumers;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;

[TestClass]
public class OffsetCommitterTests
{
private const int TestTimeout = 1000;

private Mock<IConsumer> consumerMock;

private Mock<ILogHandler> logHandlerMock;

private TopicPartition topicPartition;

private OffsetCommitter offsetCommitter;

[TestInitialize]
public void Setup()
{
this.consumerMock = new Mock<IConsumer>();
this.logHandlerMock = new Mock<ILogHandler>();
this.topicPartition = new TopicPartition("topic-A", new Partition(1));

this.consumerMock.Setup(c => c.Configuration.AutoCommitInterval)
.Returns(TimeSpan.FromMilliseconds(10));

this.offsetCommitter = new OffsetCommitter(this.consumerMock.Object, this.logHandlerMock.Object);
}

[TestCleanup]
public void Cleanup()
{
this.offsetCommitter.Dispose();
}

[TestMethod]
public void StoreOffset_ShouldCommit()
{
// Arrange
var offset = new TopicPartitionOffset(this.topicPartition, new Offset(1));
var expectedOffsets = new[] { offset };

var ready = new ManualResetEvent(false);

this.consumerMock
.Setup(c => c.Commit(It.Is<IEnumerable<TopicPartitionOffset>>(l => l.SequenceEqual(expectedOffsets))))
.Callback((IEnumerable<TopicPartitionOffset> _) =>
{
ready.Set();
});

// Act
this.offsetCommitter.StoreOffset(offset);
ready.WaitOne(TestTimeout);

// Assert
this.consumerMock.Verify(
c => c.Commit(It.Is<IEnumerable<TopicPartitionOffset>>(l => l.SequenceEqual(expectedOffsets))),
Times.Once);
}

[TestMethod]
public void StoreOffset_WithFailure_ShouldRequeueFailedOffsetAndCommit()
{
// Arrange
var offset = new TopicPartitionOffset(this.topicPartition, new Offset(2));
var expectedOffsets = new[] { offset };

var ready = new ManualResetEvent(false);
var hasThrown = false;

this.consumerMock
.Setup(c => c.Commit(It.Is<IEnumerable<TopicPartitionOffset>>(l => l.SequenceEqual(expectedOffsets))))
.Callback((IEnumerable<TopicPartitionOffset> _) =>
{
if (!hasThrown)
{
hasThrown = true;
throw new InvalidOperationException();
}

ready.Set();
});

// Act
this.offsetCommitter.StoreOffset(offset);
ready.WaitOne(TestTimeout);

// Assert
this.consumerMock.Verify(
c => c.Commit(It.Is<IEnumerable<TopicPartitionOffset>>(l => l.SequenceEqual(expectedOffsets))),
Times.Exactly(2));
}
}
}
42 changes: 26 additions & 16 deletions src/KafkaFlow/Consumers/OffsetCommitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ namespace KafkaFlow.Consumers
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Confluent.Kafka;
Expand All @@ -11,10 +12,10 @@ internal class OffsetCommitter : IOffsetCommitter
private readonly IConsumer consumer;
private readonly ILogHandler logHandler;

private ConcurrentDictionary<(string, int), TopicPartitionOffset> offsetsToCommit = new();

private readonly Timer commitTimer;

private ConcurrentDictionary<(string, int), TopicPartitionOffset> offsetsToCommit = new();

public OffsetCommitter(
IConsumer consumer,
ILogHandler logHandler)
Expand All @@ -29,6 +30,20 @@ public OffsetCommitter(
consumer.Configuration.AutoCommitInterval);
}

public void Dispose()
{
this.commitTimer.Dispose();
this.CommitHandler();
}

public void StoreOffset(TopicPartitionOffset tpo)
{
this.offsetsToCommit.AddOrUpdate(
(tpo.Topic, tpo.Partition.Value),
tpo,
(_, _) => tpo);
}

private void CommitHandler()
{
if (!this.offsetsToCommit.Any())
Expand All @@ -45,25 +60,20 @@ private void CommitHandler()
}
catch (Exception e)
{
this.logHandler.Error(
this.logHandler.Warning(
"Error Commiting Offsets",
e,
null);
}
}
new { ErrorMessage = e.Message });

public void Dispose()
{
this.commitTimer.Dispose();
this.CommitHandler();
this.RequeueFailedOffsets(offsets.Values);
}
}

public void StoreOffset(TopicPartitionOffset tpo)
private void RequeueFailedOffsets(IEnumerable<TopicPartitionOffset> offsets)
{
this.offsetsToCommit.AddOrUpdate(
(tpo.Topic, tpo.Partition.Value),
tpo,
(_, _) => tpo);
foreach (var tpo in offsets)
{
this.offsetsToCommit.TryAdd((tpo.Topic, tpo.Partition.Value), tpo);
}
}
}
}

0 comments on commit 546fedf

Please sign in to comment.