Skip to content

Commit

Permalink
fix: requeue failed offsets
Browse files Browse the repository at this point in the history
  • Loading branch information
massada authored and filipeesch committed Aug 13, 2021
1 parent 5e80846 commit 3e01f84
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 3 deletions.
103 changes: 103 additions & 0 deletions src/KafkaFlow.UnitTests/OffsetCommitterTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
namespace KafkaFlow.UnitTests
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Confluent.Kafka;
using KafkaFlow.Consumers;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;

[TestClass]
public class OffsetCommitterTests
{
private const int TestTimeout = 1000;

private Mock<IConsumer> consumerMock;

private Mock<ILogHandler> logHandlerMock;

private TopicPartition topicPartition;

private OffsetCommitter offsetCommitter;

[TestInitialize]
public void Setup()
{
this.consumerMock = new Mock<IConsumer>();
this.logHandlerMock = new Mock<ILogHandler>();
this.topicPartition = new TopicPartition("topic-A", new Partition(1));

this.consumerMock.Setup(c => c.Configuration.AutoCommitInterval)
.Returns(TimeSpan.FromMilliseconds(10));

this.offsetCommitter = new OffsetCommitter(this.consumerMock.Object, this.logHandlerMock.Object);
}

[TestCleanup]
public void Cleanup()
{
this.offsetCommitter.Dispose();
}

[TestMethod]
public void StoreOffset_ShouldCommit()
{
// Arrange
var offset = new TopicPartitionOffset(this.topicPartition, new Offset(1));
var expectedOffsets = new[] { offset };

var ready = new ManualResetEvent(false);

this.consumerMock
.Setup(c => c.Commit(It.Is<IEnumerable<TopicPartitionOffset>>(l => l.SequenceEqual(expectedOffsets))))
.Callback((IEnumerable<TopicPartitionOffset> _) =>
{
ready.Set();
});

// Act
this.offsetCommitter.StoreOffset(offset);
ready.WaitOne(TestTimeout);

// Assert
this.consumerMock.Verify(
c => c.Commit(It.Is<IEnumerable<TopicPartitionOffset>>(l => l.SequenceEqual(expectedOffsets))),
Times.Once);
}

[TestMethod]
public void StoreOffset_WithFailure_ShouldRequeueFailedOffsetAndCommit()
{
// Arrange
var offset = new TopicPartitionOffset(this.topicPartition, new Offset(2));
var expectedOffsets = new[] { offset };

var ready = new ManualResetEvent(false);
var hasThrown = false;

this.consumerMock
.Setup(c => c.Commit(It.Is<IEnumerable<TopicPartitionOffset>>(l => l.SequenceEqual(expectedOffsets))))
.Callback((IEnumerable<TopicPartitionOffset> _) =>
{
if (!hasThrown)
{
hasThrown = true;
throw new InvalidOperationException();
}

ready.Set();
});

// Act
this.offsetCommitter.StoreOffset(offset);
ready.WaitOne(TestTimeout);

// Assert
this.consumerMock.Verify(
c => c.Commit(It.Is<IEnumerable<TopicPartitionOffset>>(l => l.SequenceEqual(expectedOffsets))),
Times.Exactly(2));
}
}
}
16 changes: 13 additions & 3 deletions src/KafkaFlow/Consumers/OffsetCommitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ namespace KafkaFlow.Consumers
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Confluent.Kafka;
Expand Down Expand Up @@ -59,10 +60,19 @@ private void CommitHandler()
}
catch (Exception e)
{
this.logHandler.Error(
this.logHandler.Warning(
"Error Commiting Offsets",
e,
null);
new { ErrorMessage = e.Message });

this.RequeueFailedOffsets(offsets.Values);
}
}

private void RequeueFailedOffsets(IEnumerable<TopicPartitionOffset> offsets)
{
foreach (var tpo in offsets)
{
this.offsetsToCommit.TryAdd((tpo.Topic, tpo.Partition.Value), tpo);
}
}
}
Expand Down

0 comments on commit 3e01f84

Please sign in to comment.