From 4dff60df678c12c71c66a2e7a5c2a5b3f99921b8 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Sat, 11 May 2024 09:50:52 +0800 Subject: [PATCH] MINOR: fix LogValidatorTest#checkNonCompressed (#15904) Reviewers: Jun Rao --- .../src/main/scala/kafka/log/UnifiedLog.scala | 1 + .../unit/kafka/log/LogValidatorTest.scala | 19 +++++++++++++++---- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index eb6fc517a3dc1..eec5acd77442a 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -1177,6 +1177,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, validBytesCount += batchSize val batchCompression = CompressionType.forId(batch.compressionType.id) + // sourceCompression is only used on the leader path, which only contains one batch if version is v2 or messages are compressed if (batchCompression != CompressionType.NONE) sourceCompression = batchCompression } diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 57f7255f6ad8c..ee1b896e7afae 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -367,7 +367,7 @@ class LogValidatorTest { new SimpleRecord(timestampSeq(2), "beautiful".getBytes) ) - val records = MemoryRecords.withRecords(magic, 0L, CompressionType.GZIP, TimestampType.CREATE_TIME, producerId, + val records = MemoryRecords.withRecords(magic, 0L, CompressionType.NONE, TimestampType.CREATE_TIME, producerId, producerEpoch, baseSequence, partitionLeaderEpoch, isTransactional, recordList: _*) val offsetCounter = PrimitiveRef.ofLong(0) @@ -414,7 +414,15 @@ class LogValidatorTest { assertEquals(now + 1, validatingResults.maxTimestampMs, s"Max timestamp should be ${now + 1}") - assertEquals(2, 
validatingResults.shallowOffsetOfMaxTimestamp) + // V2: Only one batch is in the records, so the shallow OffsetOfMaxTimestamp is the last offset of the single batch + // V1: 3 batches are in the records, so the shallow OffsetOfMaxTimestamp is the last offset of batch-1 + if (magic >= RecordBatch.MAGIC_VALUE_V2) { + assertEquals(1, records.batches().asScala.size) + assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp) + } else { + assertEquals(3, records.batches().asScala.size) + assertEquals(1, validatingResults.shallowOffsetOfMaxTimestamp) + } assertFalse(validatingResults.messageSizeMaybeChanged, "Message size should not have been changed") @@ -488,8 +496,11 @@ class LogValidatorTest { } assertEquals(now + 1, validatingResults.maxTimestampMs, s"Max timestamp should be ${now + 1}") - assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp, - "Shallow offset of max timestamp should be 2") + + // Both V2 and V1 have a single batch in the records when compression is enabled, and hence their shallow OffsetOfMaxTimestamp + // is the last offset of the single batch + assertEquals(1, records.batches().asScala.size) + assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp) assertTrue(validatingResults.messageSizeMaybeChanged, "Message size should have been changed")