Skip to content

Commit

Permalink
Avoid fetching block info from the master on the first attempt of a positioned read
Browse files Browse the repository at this point in the history
### What changes are proposed in this pull request?

Reduce the number of RPCs to the master to improve performance in small-file scenarios.

I guess this was introduced by ed53d54.

### Why are the changes needed?

The block info can be obtained from the file status, so we should not request it from the master every time.

### Does this PR introduce any user facing changes?

No

			pr-link: #18417
			change-id: cid-e642ce66ee370febeafdbaab24fe0255b4104ae5
  • Loading branch information
maobaolong authored Jan 12, 2024
1 parent edfc260 commit e939a39
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public class AlluxioFileInStream extends FileInStream {
private final BlockStoreClient mBlockStore;
private final FileSystemContext mContext;
private final boolean mPassiveCachingEnabled;
private final long mStatusOutdatedTime;

/* Convenience values derived from mStatus, use these instead of querying mStatus. */
/** Length of the file in bytes. */
Expand Down Expand Up @@ -130,6 +131,8 @@ protected AlluxioFileInStream(URIStatus status, InStreamOptions options,
.withMaxSleep(blockReadRetrySleepMax)
.withSkipInitialSleep().build();
mStatus = status;
mStatusOutdatedTime = System.currentTimeMillis()
+ conf.getMs(PropertyKey.USER_FILE_IN_STREAM_STATUS_EXPIRATION_TIME);
mOptions = options;
mBlockStore = BlockStoreClient.create(mContext);
mLength = mStatus.getLength();
Expand Down Expand Up @@ -302,11 +305,15 @@ private int positionedReadInternal(long pos, byte[] b, int off, int len) throws
try {
// Positioned read may be called multiple times for the same block. Caching the in-stream
// allows us to avoid the block store rpc to open a new stream for each call.
BlockInfo blockInfo = isStatusOutdated() || lastException != null
? mBlockStore.getInfo(blockId) : mStatus.getBlockInfo(blockId);
if (mCachedPositionedReadStream == null) {
mCachedPositionedReadStream = mBlockStore.getInStream(blockId, mOptions, mFailedWorkers);
mCachedPositionedReadStream = mBlockStore.getInStream(
blockInfo, mOptions, mFailedWorkers);
} else if (mCachedPositionedReadStream.getId() != blockId) {
closeBlockInStream(mCachedPositionedReadStream);
mCachedPositionedReadStream = mBlockStore.getInStream(blockId, mOptions, mFailedWorkers);
mCachedPositionedReadStream = mBlockStore.getInStream(
blockInfo, mOptions, mFailedWorkers);
}
long offset = pos % mBlockSize;
int bytesRead = mCachedPositionedReadStream.positionedRead(offset, b, off,
Expand Down Expand Up @@ -406,7 +413,7 @@ private void updateStream() throws IOException {
}
}
}
if (isBlockInfoOutdated) {
if (isBlockInfoOutdated || isStatusOutdated()) {
mBlockInStream = mBlockStore.getInStream(blockId, mOptions, mFailedWorkers);
} else {
mBlockInStream = mBlockStore.getInStream(blockInfo, mOptions, mFailedWorkers);
Expand All @@ -416,6 +423,14 @@ private void updateStream() throws IOException {
mBlockInStream.seek(offset);
}

/**
 * Checks whether the file status held by this stream has reached its expiration deadline.
 *
 * <p>The deadline is the wall-clock timestamp stored in {@code mStatusOutdatedTime}; the status
 * counts as outdated from the exact millisecond the deadline is reached.
 *
 * @return true if the current time is at or past the expiration deadline, false otherwise
 */
@VisibleForTesting
public boolean isStatusOutdated() {
  final long nowMs = System.currentTimeMillis();
  return nowMs >= mStatusOutdatedTime;
}

private void closeBlockInStream(BlockInStream stream) throws IOException {
if (stream != null) {
BlockInStream.BlockInStreamSource blockSource = stream.getSource();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import alluxio.wire.WorkerNetAddress;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -766,8 +767,15 @@ public void positionedReadRetry() throws Exception {
when(mBlockStore
.getInStream(eq(0L), any(InStreamOptions.class), any()))
.thenReturn(brokenStream).thenReturn(workingStream);
when(mBlockStore
.getInStream(eq(new BlockInfo().setBlockId(0)), any(InStreamOptions.class), any()))
.thenReturn(brokenStream).thenReturn(workingStream);
when(brokenStream.positionedRead(anyLong(), any(byte[].class), anyInt(), anyInt()))
.thenThrow(new UnavailableException("test exception"));
when(mBlockStore.getInfo(anyLong())).thenAnswer(invocation -> {
long blockId = invocation.getArgument(0);
return mStatus.getBlockInfo(blockId);
});

byte[] b = new byte[(int) BLOCK_LENGTH * 2];
mTestStream.positionedRead(BLOCK_LENGTH / 2, b, 0, b.length);
Expand Down Expand Up @@ -951,4 +959,21 @@ private void testReadBuffer(int dataRead) throws Exception {
private void validatePartialCaching(int index, int readSize) {
assertEquals(readSize, mInStreams.get(index).getBytesRead());
}

/**
 * Verifies {@code isStatusOutdated()} for two configurations: the expiration time as found in
 * {@code mConf} when the test starts, and an expiration time explicitly set to zero.
 */
@Test
public void testStatusOutdated() throws IOException, InterruptedException {
  OpenFilePOptions openOptions =
      OpenFilePOptions.newBuilder().setReadType(ReadPType.CACHE_PROMOTE).build();

  // Expiration time not overridden by this test: the status must still be fresh 1 ms later.
  InStreamOptions freshOptions = new InStreamOptions(mStatus, openOptions, mConf, mContext);
  try (AlluxioFileInStream freshStream =
      new AlluxioFileInStream(mStatus, freshOptions, mContext)) {
    Thread.sleep(1);
    Assert.assertFalse(freshStream.isStatusOutdated());
  }

  // Zero expiration time: the status must be reported as outdated.
  mConf.set(PropertyKey.USER_FILE_IN_STREAM_STATUS_EXPIRATION_TIME, 0L);
  InStreamOptions expiredOptions = new InStreamOptions(mStatus, openOptions, mConf, mContext);
  try (AlluxioFileInStream expiredStream =
      new AlluxioFileInStream(mStatus, expiredOptions, mContext)) {
    Thread.sleep(1);
    Assert.assertTrue(expiredStream.isStatusOutdated());
  }
}
}
11 changes: 11 additions & 0 deletions core/common/src/main/java/alluxio/conf/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -6307,6 +6307,15 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.CLIENT)
.build();
// Client-scoped duration property (default "5min"). AlluxioFileInStream adds this value to the
// stream's creation time to compute the deadline after which isStatusOutdated() returns true and
// block streams are opened by block id instead of from the block info in the cached file status.
public static final PropertyKey USER_FILE_IN_STREAM_STATUS_EXPIRATION_TIME =
durationBuilder(Name.USER_FILE_IN_STREAM_STATUS_EXPIRATION_TIME)
.setDefaultValue("5min")
.setDescription("Specifies how long the file metadata can be cached and reused during "
+ "the FileInStream. Once the specified expiration time has elapsed, the file "
+ "metadata will be reloaded from the Alluxio master. The cache reduces the number "
+ "of metadata requests to the Master. The default is 5 minutes.")
.setScope(Scope.CLIENT)
.build();
public static final PropertyKey USER_FILE_WRITE_INIT_SLEEP_MIN =
durationBuilder(Name.USER_FILE_WRITE_INIT_SLEEP_MIN)
.setDefaultValue("1sec")
Expand Down Expand Up @@ -8965,6 +8974,8 @@ public static final class Name {
"alluxio.user.file.write.tier.default";
public static final String USER_FILE_INCLUDE_OPERATION_ID =
"alluxio.user.file.include.operation.id";
public static final String USER_FILE_IN_STREAM_STATUS_EXPIRATION_TIME =
"alluxio.user.file.in.stream.status.expiration.time";
public static final String USER_FILE_WRITE_INIT_SLEEP_MIN =
"alluxio.user.file.write.init.sleep.min";
public static final String USER_FILE_WRITE_INIT_SLEEP_MAX =
Expand Down

0 comments on commit e939a39

Please sign in to comment.