diff --git a/core/client/fs/src/main/java/alluxio/client/file/AlluxioFileInStream.java b/core/client/fs/src/main/java/alluxio/client/file/AlluxioFileInStream.java index 83769689b75b..8193faf56d1e 100644 --- a/core/client/fs/src/main/java/alluxio/client/file/AlluxioFileInStream.java +++ b/core/client/fs/src/main/java/alluxio/client/file/AlluxioFileInStream.java @@ -82,6 +82,7 @@ public class AlluxioFileInStream extends FileInStream { private final BlockStoreClient mBlockStore; private final FileSystemContext mContext; private final boolean mPassiveCachingEnabled; + private final long mStatusOutdatedTime; /* Convenience values derived from mStatus, use these instead of querying mStatus. */ /** Length of the file in bytes. */ @@ -130,6 +131,8 @@ protected AlluxioFileInStream(URIStatus status, InStreamOptions options, .withMaxSleep(blockReadRetrySleepMax) .withSkipInitialSleep().build(); mStatus = status; + mStatusOutdatedTime = System.currentTimeMillis() + + conf.getMs(PropertyKey.USER_FILE_IN_STREAM_STATUS_EXPIRATION_TIME); mOptions = options; mBlockStore = BlockStoreClient.create(mContext); mLength = mStatus.getLength(); @@ -302,11 +305,15 @@ private int positionedReadInternal(long pos, byte[] b, int off, int len) throws try { // Positioned read may be called multiple times for the same block. Caching the in-stream // allows us to avoid the block store rpc to open a new stream for each call. + BlockInfo blockInfo = isStatusOutdated() || lastException != null + ? mBlockStore.getInfo(blockId) : mStatus.getBlockInfo(blockId); if (mCachedPositionedReadStream == null) { - mCachedPositionedReadStream = mBlockStore.getInStream(blockId, mOptions, mFailedWorkers); + mCachedPositionedReadStream = mBlockStore.getInStream( + blockInfo, mOptions, mFailedWorkers); } else if (mCachedPositionedReadStream.getId() != blockId) { closeBlockInStream(mCachedPositionedReadStream); - mCachedPositionedReadStream = mBlockStore.getInStream(blockId, mOptions, mFailedWorkers); + mCachedPositionedReadStream = mBlockStore.getInStream( + blockInfo, mOptions, mFailedWorkers); } long offset = pos % mBlockSize; int bytesRead = mCachedPositionedReadStream.positionedRead(offset, b, off, @@ -406,7 +413,7 @@ private void updateStream() throws IOException { } } } - if (isBlockInfoOutdated) { + if (isBlockInfoOutdated || isStatusOutdated()) { mBlockInStream = mBlockStore.getInStream(blockId, mOptions, mFailedWorkers); } else { mBlockInStream = mBlockStore.getInStream(blockInfo, mOptions, mFailedWorkers); @@ -416,6 +423,14 @@ private void updateStream() throws IOException { mBlockInStream.seek(offset); } + /** + * @return true if the status is outdated + */ + @VisibleForTesting + public boolean isStatusOutdated() { + return System.currentTimeMillis() >= mStatusOutdatedTime; + } + private void closeBlockInStream(BlockInStream stream) throws IOException { if (stream != null) { BlockInStream.BlockInStreamSource blockSource = stream.getSource(); diff --git a/core/client/fs/src/test/java/alluxio/client/file/AlluxioFileInStreamTest.java b/core/client/fs/src/test/java/alluxio/client/file/AlluxioFileInStreamTest.java index 04271a1d8d16..34080db42e0d 100644 --- a/core/client/fs/src/test/java/alluxio/client/file/AlluxioFileInStreamTest.java +++ b/core/client/fs/src/test/java/alluxio/client/file/AlluxioFileInStreamTest.java @@ -53,6 +53,7 @@ import alluxio.wire.WorkerNetAddress; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -766,8 +767,15 @@ public void positionedReadRetry() throws Exception { when(mBlockStore .getInStream(eq(0L), any(InStreamOptions.class), any())) .thenReturn(brokenStream).thenReturn(workingStream); + when(mBlockStore + .getInStream(eq(new BlockInfo().setBlockId(0)), any(InStreamOptions.class), any())) + .thenReturn(brokenStream).thenReturn(workingStream); when(brokenStream.positionedRead(anyLong(), any(byte[].class), anyInt(), anyInt())) .thenThrow(new UnavailableException("test exception")); + when(mBlockStore.getInfo(anyLong())).thenAnswer(invocation -> { + long blockId = invocation.getArgument(0); + return mStatus.getBlockInfo(blockId); + }); byte[] b = new byte[(int) BLOCK_LENGTH * 2]; mTestStream.positionedRead(BLOCK_LENGTH / 2, b, 0, b.length); @@ -951,4 +959,21 @@ private void testReadBuffer(int dataRead) throws Exception { private void validatePartialCaching(int index, int readSize) { assertEquals(readSize, mInStreams.get(index).getBytesRead()); } + + @Test + public void testStatusOutdated() throws IOException, InterruptedException { + OpenFilePOptions options = + OpenFilePOptions.newBuilder().setReadType(ReadPType.CACHE_PROMOTE).build(); + try (AlluxioFileInStream testStream = new AlluxioFileInStream(mStatus, + new InStreamOptions(mStatus, options, mConf, mContext), mContext)) { + Thread.sleep(1); + Assert.assertFalse(testStream.isStatusOutdated()); + } + mConf.set(PropertyKey.USER_FILE_IN_STREAM_STATUS_EXPIRATION_TIME, 0L); + try (AlluxioFileInStream testStream = new AlluxioFileInStream(mStatus, + new InStreamOptions(mStatus, options, mConf, mContext), mContext)) { + Thread.sleep(1); + Assert.assertTrue(testStream.isStatusOutdated()); + } + } } diff --git a/core/common/src/main/java/alluxio/conf/PropertyKey.java b/core/common/src/main/java/alluxio/conf/PropertyKey.java index c0f64cee8913..4a9db8dccc0f 100755 --- a/core/common/src/main/java/alluxio/conf/PropertyKey.java +++ b/core/common/src/main/java/alluxio/conf/PropertyKey.java @@ -6307,6 +6307,15 @@ public String toString() { .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.CLIENT) .build(); + public static final PropertyKey USER_FILE_IN_STREAM_STATUS_EXPIRATION_TIME = + durationBuilder(Name.USER_FILE_IN_STREAM_STATUS_EXPIRATION_TIME) + .setDefaultValue("5min") + .setDescription("Specifies how long the file metadata can be cached and reused during " + + "the FileInStream. Once the specified expiration time has elapsed, the file " + + "metadata will be reloaded from the Alluxio master. The cache reduces the number " + + "of metadata requests to the Master. The default is 5 minutes.") + .setScope(Scope.CLIENT) + .build(); public static final PropertyKey USER_FILE_WRITE_INIT_SLEEP_MIN = durationBuilder(Name.USER_FILE_WRITE_INIT_SLEEP_MIN) .setDefaultValue("1sec") @@ -8965,6 +8974,8 @@ public static final class Name { "alluxio.user.file.write.tier.default"; public static final String USER_FILE_INCLUDE_OPERATION_ID = "alluxio.user.file.include.operation.id"; + public static final String USER_FILE_IN_STREAM_STATUS_EXPIRATION_TIME = + "alluxio.user.file.in.stream.status.expiration.time"; public static final String USER_FILE_WRITE_INIT_SLEEP_MIN = "alluxio.user.file.write.init.sleep.min"; public static final String USER_FILE_WRITE_INIT_SLEEP_MAX =