Feature/262 fix deduplicator (#263)
* wip how to test

* wip test

* Fix for 2.11
kevinwallimann authored Mar 17, 2022
1 parent ae99672 commit cc31683
Showing 2 changed files with 58 additions and 1 deletion.
@@ -45,7 +45,9 @@ private[hyperdrive] object KafkaUtil {
       val recordSizes = records
         .groupBy(r => new TopicPartition(r.topic, r.partition))
         .mapValues(records => records.size)
-      val unfinishedPartitions = topicPartitions.filter(p => recordSizes.getOrElse(p, 0) < numberOfRecords(p) && offsetLowerBounds(p) != 0)
+      val beginningOffsets = consumer.beginningOffsets(topicPartitions.asJava).asScala
+      val unfinishedPartitions = topicPartitions.filter(p =>
+        recordSizes.getOrElse(p, 0) < numberOfRecords(p) && offsetLowerBounds(p) > beginningOffsets(p))
       if (unfinishedPartitions.isEmpty) {
         break()
       }
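The gist of the fix: getAtLeastNLatestRecordsFromPartition moves each partition's offset lower bound backwards until enough records have been collected, and the old guard offsetLowerBounds(p) != 0 treated offset 0 as the only possible floor. On a topic where retention or compaction has deleted the oldest records, the partition's beginning offset is greater than 0, so the lower bound could never reach 0 and the partition was never considered finished, leaving the consumer polling with no new data to fetch. Comparing against consumer.beginningOffsets ends the loop as soon as the lower bound reaches the first offset that still exists. Below is a minimal standalone sketch of the two predicates, reusing the numbers from the new test further down (illustration only, not the project's code):

object UnfinishedPredicateSketch extends App {
  val recordsFetched = 21L  // records actually available and returned for the partition
  val recordsWanted  = 50L  // numberOfRecords(p)
  val lowerBound     = 280L // offsetLowerBounds(p): already at the oldest surviving record
  val beginning      = 280L // beginningOffsets(p): offsets 0..279 deleted by retention

  // Old guard: lowerBound != 0 is always true here, so the partition
  // stays "unfinished" and the consumer keeps polling fruitlessly.
  val oldUnfinished = recordsFetched < recordsWanted && lowerBound != 0

  // Fixed guard: the lower bound cannot move below the beginning offset,
  // so the partition counts as finished and the loop breaks.
  val newUnfinished = recordsFetched < recordsWanted && lowerBound > beginning

  println(s"old: $oldUnfinished, fixed: $newUnfinished") // old: true, fixed: false
}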
@@ -15,15 +15,26 @@

package za.co.absa.hyperdrive.ingestor.implementation.utils

import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.kafka010.KafkaSourceOffsetProxy
import org.mockito.ArgumentMatchers.{any, eq => eqTo}
import org.mockito.Mockito.{times, verify, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.mockito.MockitoSugar.mock
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
import za.co.absa.commons.io.TempDirectory
import za.co.absa.hyperdrive.ingestor.implementation.transformer.deduplicate.kafka.PrunedConsumerRecord
import za.co.absa.hyperdrive.shared.utils.SparkTestBase

import java.time.Duration
import java.util

class TestKafkaUtil extends FlatSpec with Matchers with BeforeAndAfter with SparkTestBase {
private var baseDir: TempDirectory = _
private val kafkaSufficientTimeout = Duration.ofSeconds(5L)

before {
baseDir = TempDirectory("test-dir").deleteOnExit()
@@ -149,4 +160,48 @@ class TestKafkaUtil extends FlatSpec with Matchers with BeforeAndAfter with SparkTestBase {
// then
result.getMessage should include ("batchId 1")
}

"getAtLeastNLatestRecordsFromPartition" should "poll only once if no more message is available" in {
val topic = "topic"
val topicPartition0 = new TopicPartition(topic, 0)
val topicPartition1 = new TopicPartition(topic, 1)
val startOffset0 = 280L
val startOffset1 = 480L
val endOffset0 = 300L
val endOffset1 = 500L

val pruningFn = (r: ConsumerRecord[String, String]) => PrunedConsumerRecord(
r.topic(),
r.partition(),
r.offset(),
Seq(r.value())
)
val mockKafkaConsumer = mock[KafkaConsumer[String, String]]
val numberOfRecords = Map(topicPartition0 -> 50L, topicPartition1 -> 50L)
import scala.collection.JavaConverters._
val endOffsets = Map(topicPartition0 -> endOffset0, topicPartition1 -> endOffset1).asJava
val beginningOffsets = Map(topicPartition0 -> startOffset0, topicPartition1 -> startOffset1).asJava
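// Only offsets 280..300 and 480..500 exist (21 records per partition), fewer than
// the 50 requested, so additional polls could never return more records.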
val records0 = (startOffset0 to endOffset0)
  .map(i => new ConsumerRecord(topic, 0, i, "key", s"value_$i"))
  .asJava
val records1 = (startOffset1 to endOffset1)
  .map(i => new ConsumerRecord(topic, 1, i, "key", s"value_$i"))
  .asJava
val records = new ConsumerRecords(Map(topicPartition0 -> records0, topicPartition1 -> records1).asJava)

when(mockKafkaConsumer.endOffsets(any())).thenAnswer(new Answer[java.util.Map[TopicPartition, Long]] {
override def answer(invocation: InvocationOnMock): util.Map[TopicPartition, Long] = endOffsets
})
when(mockKafkaConsumer.beginningOffsets(any())).thenAnswer(new Answer[java.util.Map[TopicPartition, Long]] {
override def answer(invocation: InvocationOnMock): util.Map[TopicPartition, Long] = beginningOffsets
})
when(mockKafkaConsumer.poll(any[Duration])).thenReturn(records)
when(mockKafkaConsumer.position(eqTo(topicPartition0))).thenReturn(endOffset0)
when(mockKafkaConsumer.position(eqTo(topicPartition1))).thenReturn(endOffset1)

implicit val kafkaConsumerTimeout: Duration = kafkaSufficientTimeout
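// when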
val consumerRecords = KafkaUtil.getAtLeastNLatestRecordsFromPartition(mockKafkaConsumer, numberOfRecords, pruningFn)
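// then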
consumerRecords.size shouldBe records0.size + records1.size
verify(mockKafkaConsumer, times(1)).poll(any[Duration])
}
}
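The explicit anonymous Answer classes in the stubs above are presumably the "Fix for 2.11" mentioned in the commit message: org.mockito.stubbing.Answer is a Java single-abstract-method interface, and Scala only performs SAM conversion for Java interfaces automatically from 2.12 on, so a lambda passed to thenAnswer does not compile on 2.11 without -Xexperimental. A small sketch of the two spellings (assumed example, not part of the commit):

import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer

object AnswerSketch {
  // Compiles on Scala 2.11 and later: explicit anonymous class, as used in the test.
  val fortyTwo: Answer[Int] = new Answer[Int] {
    override def answer(invocation: InvocationOnMock): Int = 42
  }
  // SAM-converted equivalent, Scala 2.12+ only (2.11 needs -Xexperimental):
  // val fortyTwoSam: Answer[Int] = (_: InvocationOnMock) => 42
}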
