Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid get block info when positioned read for first retry #18417

Merged
merged 12 commits into from
Jan 12, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -302,11 +302,15 @@ private int positionedReadInternal(long pos, byte[] b, int off, int len) throws
try {
// Positioned read may be called multiple times for the same block. Caching the in-stream
// allows us to avoid the block store rpc to open a new stream for each call.
BlockInfo blockInfo = lastException == null
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a threshold of outdate time

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updateStream check outdate time

? mStatus.getBlockInfo(blockId) : mBlockStore.getInfo(blockId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dbw9580 do you think it's possible that we can use mStatus.getBlockInfo(blockId) even if lastException != null?

if (mCachedPositionedReadStream == null) {
mCachedPositionedReadStream = mBlockStore.getInStream(blockId, mOptions, mFailedWorkers);
mCachedPositionedReadStream = mBlockStore.getInStream(
blockInfo, mOptions, mFailedWorkers);
} else if (mCachedPositionedReadStream.getId() != blockId) {
closeBlockInStream(mCachedPositionedReadStream);
mCachedPositionedReadStream = mBlockStore.getInStream(blockId, mOptions, mFailedWorkers);
mCachedPositionedReadStream = mBlockStore.getInStream(
blockInfo, mOptions, mFailedWorkers);
}
long offset = pos % mBlockSize;
int bytesRead = mCachedPositionedReadStream.positionedRead(offset, b, off,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -766,8 +766,15 @@ public void positionedReadRetry() throws Exception {
when(mBlockStore
.getInStream(eq(0L), any(InStreamOptions.class), any()))
.thenReturn(brokenStream).thenReturn(workingStream);
when(mBlockStore
.getInStream(eq(new BlockInfo().setBlockId(0)), any(InStreamOptions.class), any()))
.thenReturn(brokenStream).thenReturn(workingStream);
when(brokenStream.positionedRead(anyLong(), any(byte[].class), anyInt(), anyInt()))
.thenThrow(new UnavailableException("test exception"));
when(mBlockStore.getInfo(anyLong())).thenAnswer(invocation -> {
long blockId = invocation.getArgument(0);
return mStatus.getBlockInfo(blockId);
});

byte[] b = new byte[(int) BLOCK_LENGTH * 2];
mTestStream.positionedRead(BLOCK_LENGTH / 2, b, 0, b.length);
Expand Down