KAFKA-18683: Handle slicing of file records for updated start position (apache#18759)

The PR corrects the check introduced in apache#5332, which verifies that the position lies within the boundaries of the file. The check
    position > currentSizeInBytes - start
is incorrect, since the position is already relative to start.
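
To see why the old expression fails once a slice is involved, consider a minimal, hypothetical sketch (the SliceView class, its method names, and the 20-byte/12-byte figures below are made up for illustration and are not part of the commit): a slice covering bytes 12 to 20 of a 20-byte file has start = 12 and currentSizeInBytes = 8, so re-slicing it at relative position 4 is legal, yet the old check compares 4 against 8 - 12 = -4 and throws, while the corrected check compares 4 against 8 and passes.

    // Hypothetical, simplified model of a sliced view; illustrative only, not Kafka's actual FileRecords.
    final class SliceView {
        private final int start;               // absolute start of the slice in the underlying file
        private final int currentSizeInBytes;  // bytes visible through this slice

        SliceView(int start, int currentSizeInBytes) {
            this.start = start;
            this.currentSizeInBytes = currentSizeInBytes;
        }

        // Old check: subtracts start from the slice size even though position is already relative to start.
        boolean rejectedByOldCheck(int position) {
            return position > currentSizeInBytes - start;
        }

        // Fixed check: a relative position only needs to stay within the slice size.
        boolean rejectedByFixedCheck(int position) {
            return position > currentSizeInBytes;
        }

        public static void main(String[] args) {
            SliceView sliced = new SliceView(12, 8);            // slice of bytes [12, 20) of a 20-byte file
            System.out.println(sliced.rejectedByOldCheck(4));   // true: valid relative position 4 is wrongly rejected
            System.out.println(sliced.rejectedByFixedCheck(4)); // false: position 4 is correctly accepted
        }
    }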

Reviewers: Jun Rao <[email protected]>
apoorvmittal10 authored Jan 31, 2025
1 parent 7920fad commit 484ba83
Showing 2 changed files with 37 additions and 1 deletion.
FileRecords.java
@@ -161,7 +161,9 @@ private int availableBytes(int position, int size) {
 
         if (position < 0)
             throw new IllegalArgumentException("Invalid position: " + position + " in read from " + this);
-        if (position > currentSizeInBytes - start)
+        // position should always be relative to the start of the file hence compare with file size
+        // to verify if the position is within the file.
+        if (position > currentSizeInBytes)
             throw new IllegalArgumentException("Slice from position " + position + " exceeds end position of " + this);
         if (size < 0)
             throw new IllegalArgumentException("Invalid size: " + size + " in read from " + this);
FileRecordsTest.java
@@ -44,6 +44,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.stream.IntStream;
 
 import static java.util.Arrays.asList;
 import static org.apache.kafka.common.utils.Utils.utf8;
@@ -433,6 +434,39 @@ public void testSearchForTimestamp() throws IOException {
         }
     }
 
+    /**
+     * Test slice when already sliced file records have start position greater than available bytes
+     * in the file records.
+     */
+    @Test
+    public void testSliceForAlreadySlicedFileRecords() throws IOException {
+        byte[][] values = new byte[][] {
+            "abcd".getBytes(),
+            "efgh".getBytes(),
+            "ijkl".getBytes(),
+            "mnop".getBytes(),
+            "qrst".getBytes()
+        };
+        try (FileRecords fileRecords = createFileRecords(values)) {
+            List<RecordBatch> items = batches(fileRecords.slice(0, fileRecords.sizeInBytes()));
+
+            // Slice from fourth message until the end.
+            int position = IntStream.range(0, 3).map(i -> items.get(i).sizeInBytes()).sum();
+            FileRecords sliced = fileRecords.slice(position, fileRecords.sizeInBytes() - position);
+            assertEquals(fileRecords.sizeInBytes() - position, sliced.sizeInBytes());
+            assertEquals(items.subList(3, items.size()), batches(sliced), "Read starting from the fourth message");
+
+            // Further slice the already sliced file records, from fifth message until the end. Now the
+            // bytes available in the sliced file records are less than the start position. However, the
+            // position to slice is relative hence reset position to second message in the sliced file
+            // records i.e. reset with the size of the fourth message from the original file records.
+            position = items.get(4).sizeInBytes();
+            FileRecords finalSliced = sliced.slice(position, sliced.sizeInBytes() - position);
+            assertEquals(sliced.sizeInBytes() - position, finalSliced.sizeInBytes());
+            assertEquals(items.subList(4, items.size()), batches(finalSliced), "Read starting from the fifth message");
+        }
+    }
+
     private void testSearchForTimestamp(RecordVersion version) throws IOException {
         File temp = tempFile();
         FileRecords fileRecords = FileRecords.open(temp, false, 1024 * 1024, true);
