Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid get block info when positioned read for first retry #18417

Merged
merged 12 commits into from
Jan 12, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public class AlluxioFileInStream extends FileInStream {
private final BlockStoreClient mBlockStore;
private final FileSystemContext mContext;
private final boolean mPassiveCachingEnabled;
private final long mStatusOutdatedTime;

/* Convenience values derived from mStatus, use these instead of querying mStatus. */
/** Length of the file in bytes. */
Expand Down Expand Up @@ -130,6 +131,8 @@ protected AlluxioFileInStream(URIStatus status, InStreamOptions options,
.withMaxSleep(blockReadRetrySleepMax)
.withSkipInitialSleep().build();
mStatus = status;
mStatusOutdatedTime = System.currentTimeMillis()
+ conf.getMs(PropertyKey.USER_FILE_IN_STREAM_STATUS_EXPIRATION_TIME);
mOptions = options;
mBlockStore = BlockStoreClient.create(mContext);
mLength = mStatus.getLength();
Expand Down Expand Up @@ -302,11 +305,15 @@ private int positionedReadInternal(long pos, byte[] b, int off, int len) throws
try {
// Positioned read may be called multiple times for the same block. Caching the in-stream
// allows us to avoid the block store rpc to open a new stream for each call.
BlockInfo blockInfo = isStatusOutdated() || lastException != null
? mBlockStore.getInfo(blockId) : mStatus.getBlockInfo(blockId);
if (mCachedPositionedReadStream == null) {
mCachedPositionedReadStream = mBlockStore.getInStream(blockId, mOptions, mFailedWorkers);
mCachedPositionedReadStream = mBlockStore.getInStream(
blockInfo, mOptions, mFailedWorkers);
} else if (mCachedPositionedReadStream.getId() != blockId) {
closeBlockInStream(mCachedPositionedReadStream);
mCachedPositionedReadStream = mBlockStore.getInStream(blockId, mOptions, mFailedWorkers);
mCachedPositionedReadStream = mBlockStore.getInStream(
blockInfo, mOptions, mFailedWorkers);
}
long offset = pos % mBlockSize;
int bytesRead = mCachedPositionedReadStream.positionedRead(offset, b, off,
Expand Down Expand Up @@ -406,7 +413,7 @@ private void updateStream() throws IOException {
}
}
}
if (isBlockInfoOutdated) {
if (isBlockInfoOutdated || isStatusOutdated()) {
mBlockInStream = mBlockStore.getInStream(blockId, mOptions, mFailedWorkers);
} else {
mBlockInStream = mBlockStore.getInStream(blockInfo, mOptions, mFailedWorkers);
Expand All @@ -416,6 +423,10 @@ private void updateStream() throws IOException {
mBlockInStream.seek(offset);
}

/**
 * Checks whether the file status captured by this stream has passed its expiration deadline.
 *
 * <p>The deadline is {@code mStatusOutdatedTime}, an absolute wall-clock timestamp in
 * milliseconds. The check is strict: the status is still considered fresh at the exact
 * millisecond of the deadline.
 *
 * @return {@code true} if the current wall-clock time is strictly later than
 *         {@code mStatusOutdatedTime}, {@code false} otherwise
 */
private boolean isStatusOutdated() {
  final long nowMs = System.currentTimeMillis();
  return mStatusOutdatedTime < nowMs;
}

private void closeBlockInStream(BlockInStream stream) throws IOException {
if (stream != null) {
BlockInStream.BlockInStreamSource blockSource = stream.getSource();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -766,8 +766,15 @@ public void positionedReadRetry() throws Exception {
when(mBlockStore
.getInStream(eq(0L), any(InStreamOptions.class), any()))
.thenReturn(brokenStream).thenReturn(workingStream);
when(mBlockStore
.getInStream(eq(new BlockInfo().setBlockId(0)), any(InStreamOptions.class), any()))
.thenReturn(brokenStream).thenReturn(workingStream);
when(brokenStream.positionedRead(anyLong(), any(byte[].class), anyInt(), anyInt()))
.thenThrow(new UnavailableException("test exception"));
when(mBlockStore.getInfo(anyLong())).thenAnswer(invocation -> {
long blockId = invocation.getArgument(0);
return mStatus.getBlockInfo(blockId);
});

byte[] b = new byte[(int) BLOCK_LENGTH * 2];
mTestStream.positionedRead(BLOCK_LENGTH / 2, b, 0, b.length);
Expand Down
7 changes: 7 additions & 0 deletions core/common/src/main/java/alluxio/conf/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -6307,6 +6307,11 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.CLIENT)
.build();
public static final PropertyKey USER_FILE_IN_STREAM_STATUS_EXPIRATION_TIME =
durationBuilder(Name.USER_FILE_IN_STREAM_STATUS_EXPIRATION_TIME)
.setDefaultValue("5min")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whenever a property key is added, remember to add a description:

setDescription("Specifies how long the file metadata can be cached and reused during the FileInStream. Once the specified expiration time has elapsed, the file metadata will be reloaded from the Alluxio master. The cache reduces the number of metadata requests to the Master. The default is 5 minutes.")

.setScope(Scope.CLIENT)
.build();
public static final PropertyKey USER_FILE_WRITE_INIT_SLEEP_MIN =
durationBuilder(Name.USER_FILE_WRITE_INIT_SLEEP_MIN)
.setDefaultValue("1sec")
Expand Down Expand Up @@ -8965,6 +8970,8 @@ public static final class Name {
"alluxio.user.file.write.tier.default";
public static final String USER_FILE_INCLUDE_OPERATION_ID =
"alluxio.user.file.include.operation.id";
public static final String USER_FILE_IN_STREAM_STATUS_EXPIRATION_TIME =
"alluxio.user.file.in.stream.expiration.time";
maobaolong marked this conversation as resolved.
Show resolved Hide resolved
public static final String USER_FILE_WRITE_INIT_SLEEP_MIN =
"alluxio.user.file.write.init.sleep.min";
public static final String USER_FILE_WRITE_INIT_SLEEP_MAX =
Expand Down
Loading