From 719ac6a6adfc41a0bca114b1c43fec9747ff3509 Mon Sep 17 00:00:00 2001 From: yuyang wang <39869597+Jackson-Wang-7@users.noreply.github.com> Date: Mon, 27 May 2024 16:17:26 +0800 Subject: [PATCH] Fix read exception when the range size is smaller than Block size ### What changes are proposed in this pull request? Add the copy() for RetryPolicy, so it can be used in some situations where the RetryPolicy needs to be reused. ### Why are the changes needed? For OSSInputStream or other stream, it needs to call createStream() more than once in some cases, it should not use the same RetryPolicy instance in multiple different calls of createStream(). ### Does this PR introduce any user facing changes? None pr-link: Alluxio/alluxio#18613 change-id: cid-e0d5a80304ced99e28114bbbad886b4abb934408 --- .../src/main/java/alluxio/retry/CountingRetry.java | 5 +++++ .../java/alluxio/retry/ExponentialBackoffRetry.java | 5 +++++ .../alluxio/retry/ExponentialTimeBoundedRetry.java | 12 ++++++++++++ .../src/main/java/alluxio/retry/RetryPolicy.java | 6 ++++++ .../src/main/java/alluxio/retry/SleepingRetry.java | 2 +- .../main/java/alluxio/retry/TimeBoundedRetry.java | 2 +- .../src/main/java/alluxio/retry/TimeoutRetry.java | 7 ++++++- .../java/alluxio/underfs/cos/COSInputStream.java | 5 +++-- .../java/alluxio/underfs/kodo/KodoInputStream.java | 5 +++-- .../java/alluxio/underfs/obs/OBSInputStream.java | 5 +++-- .../java/alluxio/underfs/oss/OSSInputStream.java | 5 +++-- .../java/alluxio/underfs/swift/SwiftInputStream.java | 5 +++-- 12 files changed, 51 insertions(+), 13 deletions(-) diff --git a/core/common/src/main/java/alluxio/retry/CountingRetry.java b/core/common/src/main/java/alluxio/retry/CountingRetry.java index 6de9ded12d41..2a94277151cf 100644 --- a/core/common/src/main/java/alluxio/retry/CountingRetry.java +++ b/core/common/src/main/java/alluxio/retry/CountingRetry.java @@ -54,4 +54,9 @@ public boolean attempt() { public void reset() { mAttemptCount = 0; } + + @Override + public RetryPolicy copy() { + return new CountingRetry(mMaxRetries); + } } diff --git a/core/common/src/main/java/alluxio/retry/ExponentialBackoffRetry.java b/core/common/src/main/java/alluxio/retry/ExponentialBackoffRetry.java index f4aa4905e89f..e0417c65de83 100644 --- a/core/common/src/main/java/alluxio/retry/ExponentialBackoffRetry.java +++ b/core/common/src/main/java/alluxio/retry/ExponentialBackoffRetry.java @@ -56,4 +56,9 @@ protected long getSleepTime() { return Math.min(Math.abs(sleepMs), mMaxSleepMs); } } + + @Override + public ExponentialBackoffRetry copy() { + return new ExponentialBackoffRetry(mBaseSleepTimeMs, mMaxSleepMs, mMaxRetries); + } } diff --git a/core/common/src/main/java/alluxio/retry/ExponentialTimeBoundedRetry.java b/core/common/src/main/java/alluxio/retry/ExponentialTimeBoundedRetry.java index 11661f019869..3da6eb122e72 100644 --- a/core/common/src/main/java/alluxio/retry/ExponentialTimeBoundedRetry.java +++ b/core/common/src/main/java/alluxio/retry/ExponentialTimeBoundedRetry.java @@ -133,4 +133,16 @@ public ExponentialTimeBoundedRetry build() { mTimeCtx, mMaxDuration, mInitialSleep, mMaxSleep, mSkipInitialSleep); } } + + @Override + public RetryPolicy copy() { + Builder builder = ExponentialTimeBoundedRetry.builder() + .withMaxDuration(mMaxDuration) + .withInitialSleep(mNextSleep) + .withMaxSleep(mMaxSleep); + if (mSkipInitialSleep) { + builder.withSkipInitialSleep(); + } + return builder.build(); + } } diff --git a/core/common/src/main/java/alluxio/retry/RetryPolicy.java b/core/common/src/main/java/alluxio/retry/RetryPolicy.java index 95bf04e04e0e..9e54f37346a5 100644 --- a/core/common/src/main/java/alluxio/retry/RetryPolicy.java +++ b/core/common/src/main/java/alluxio/retry/RetryPolicy.java @@ -36,4 +36,10 @@ public interface RetryPolicy { * @return whether another retry should be performed */ boolean attempt(); + + /** + * Copy a new RetryPolicy based on the current RetryPolicy configuration. + * @return a copy of RetryPolicy + */ + RetryPolicy copy(); } diff --git a/core/common/src/main/java/alluxio/retry/SleepingRetry.java b/core/common/src/main/java/alluxio/retry/SleepingRetry.java index ec03d07dbedf..82b0b639d77c 100644 --- a/core/common/src/main/java/alluxio/retry/SleepingRetry.java +++ b/core/common/src/main/java/alluxio/retry/SleepingRetry.java @@ -21,7 +21,7 @@ */ @NotThreadSafe public abstract class SleepingRetry implements RetryPolicy { - private final int mMaxRetries; + protected final int mMaxRetries; private int mAttemptCount = 0; protected SleepingRetry(int maxRetries) { diff --git a/core/common/src/main/java/alluxio/retry/TimeBoundedRetry.java b/core/common/src/main/java/alluxio/retry/TimeBoundedRetry.java index 0216c723f635..6466f1d7588f 100644 --- a/core/common/src/main/java/alluxio/retry/TimeBoundedRetry.java +++ b/core/common/src/main/java/alluxio/retry/TimeBoundedRetry.java @@ -25,7 +25,7 @@ public abstract class TimeBoundedRetry implements RetryPolicy { private final Clock mClock; private final Sleeper mSleeper; - private final Duration mMaxDuration; + protected final Duration mMaxDuration; private final Instant mStartTime; private final Instant mEndTime; diff --git a/core/common/src/main/java/alluxio/retry/TimeoutRetry.java b/core/common/src/main/java/alluxio/retry/TimeoutRetry.java index 58890e09a3c0..af0f8e98a0ec 100644 --- a/core/common/src/main/java/alluxio/retry/TimeoutRetry.java +++ b/core/common/src/main/java/alluxio/retry/TimeoutRetry.java @@ -34,7 +34,7 @@ public class TimeoutRetry implements RetryPolicy { * @param retryTimeoutMs maximum period of time to retry for, in milliseconds * @param sleepMs time in milliseconds to sleep before retrying */ - public TimeoutRetry(long retryTimeoutMs, int sleepMs) { + public TimeoutRetry(long retryTimeoutMs, long sleepMs) { Preconditions.checkArgument(retryTimeoutMs > 0, "Retry timeout must be a positive number"); Preconditions.checkArgument(sleepMs >= 0, "sleepMs cannot be negative"); mRetryTimeoutMs = retryTimeoutMs; @@ -63,4 +63,9 @@ public boolean attempt() { } return false; } + + @Override + public RetryPolicy copy() { + return new TimeoutRetry(mRetryTimeoutMs, mSleepMs); + } } diff --git a/underfs/cos/src/main/java/alluxio/underfs/cos/COSInputStream.java b/underfs/cos/src/main/java/alluxio/underfs/cos/COSInputStream.java index 045eb18bdd68..18b01786669a 100644 --- a/underfs/cos/src/main/java/alluxio/underfs/cos/COSInputStream.java +++ b/underfs/cos/src/main/java/alluxio/underfs/cos/COSInputStream.java @@ -98,14 +98,15 @@ protected InputStream createStream(long startPos, long endPos) req.setRange(startPos, endPos < mContentLength ? endPos - 1 : mContentLength - 1); CosServiceException lastException = null; String errorMessage = String.format("Failed to open key: %s bucket: %s", mKey, mBucketName); - while (mRetryPolicy.attempt()) { + RetryPolicy retryPolicy = mRetryPolicy.copy(); + while (retryPolicy.attempt()) { try { COSObject object = mCosClient.getObject(req); return new BufferedInputStream(object.getObjectContent()); } catch (CosServiceException e) { errorMessage = String .format("Failed to open key: %s bucket: %s attempts: %d error: %s", mKey, mBucketName, - mRetryPolicy.getAttemptCount(), e.getMessage()); + retryPolicy.getAttemptCount(), e.getMessage()); if (e.getStatusCode() != HttpStatus.SC_NOT_FOUND) { throw new IOException(errorMessage, e); } diff --git a/underfs/kodo/src/main/java/alluxio/underfs/kodo/KodoInputStream.java b/underfs/kodo/src/main/java/alluxio/underfs/kodo/KodoInputStream.java index 9e7f1223b43b..102e561308a6 100644 --- a/underfs/kodo/src/main/java/alluxio/underfs/kodo/KodoInputStream.java +++ b/underfs/kodo/src/main/java/alluxio/underfs/kodo/KodoInputStream.java @@ -74,13 +74,14 @@ protected InputStream createStream(long startPos, long endPos) throws IOException { IOException lastException = null; String errorMessage = String.format("Failed to open key: %s", mKey); - while (mRetryPolicy.attempt()) { + RetryPolicy retryPolicy = mRetryPolicy.copy(); + while (retryPolicy.attempt()) { try { return mKodoclent.getObject(mKey, startPos, endPos, mContentLength); } catch (NotFoundException e) { errorMessage = String .format("Failed to open key: %s attempts: %s error: %s", mKey, - mRetryPolicy.getAttemptCount(), e.getMessage()); + retryPolicy.getAttemptCount(), e.getMessage()); // Key does not exist lastException = e; } diff --git a/underfs/obs/src/main/java/alluxio/underfs/obs/OBSInputStream.java b/underfs/obs/src/main/java/alluxio/underfs/obs/OBSInputStream.java index 54de4902158a..a2c9dc172cd5 100644 --- a/underfs/obs/src/main/java/alluxio/underfs/obs/OBSInputStream.java +++ b/underfs/obs/src/main/java/alluxio/underfs/obs/OBSInputStream.java @@ -104,14 +104,15 @@ protected InputStream createStream(long startPos, long endPos) throws IOExceptio req.setRangeStart(startPos); req.setRangeEnd(endPos < mContentLength ? endPos - 1 : mContentLength - 1); ObsException lastException = null; - while (mRetryPolicy.attempt()) { + RetryPolicy retryPolicy = mRetryPolicy.copy(); + while (retryPolicy.attempt()) { try { ObsObject obj = mObsClient.getObject(req); return new BufferedInputStream(obj.getObjectContent()); } catch (ObsException e) { System.out.println(e.getResponseCode()); LOG.warn("Attempt {} to open key {} in bucket {} failed with exception : {}", - mRetryPolicy.getAttemptCount(), mKey, mBucketName, e.toString()); + retryPolicy.getAttemptCount(), mKey, mBucketName, e.toString()); if (e.getResponseCode() != HttpStatus.SC_NOT_FOUND) { throw new IOException(e); } diff --git a/underfs/oss/src/main/java/alluxio/underfs/oss/OSSInputStream.java b/underfs/oss/src/main/java/alluxio/underfs/oss/OSSInputStream.java index 08c17f934fe4..fbde5ad95c1a 100644 --- a/underfs/oss/src/main/java/alluxio/underfs/oss/OSSInputStream.java +++ b/underfs/oss/src/main/java/alluxio/underfs/oss/OSSInputStream.java @@ -97,14 +97,15 @@ protected InputStream createStream(long startPos, long endPos) req.setRange(startPos, endPos < mContentLength ? endPos - 1 : mContentLength - 1); OSSException lastException = null; String errorMessage = String.format("Failed to open key: %s bucket: %s", mKey, mBucketName); - while (mRetryPolicy.attempt()) { + RetryPolicy retryPolicy = mRetryPolicy.copy(); + while (retryPolicy.attempt()) { try { OSSObject ossObject = mOssClient.getObject(req); return new BufferedInputStream(ossObject.getObjectContent()); } catch (OSSException e) { errorMessage = String .format("Failed to open key: %s bucket: %s attempts: %d error: %s", mKey, mBucketName, - mRetryPolicy.getAttemptCount(), e.getMessage()); + retryPolicy.getAttemptCount(), e.getMessage()); if (!e.getErrorCode().equals("NoSuchKey")) { throw new IOException(errorMessage, e); } diff --git a/underfs/swift/src/main/java/alluxio/underfs/swift/SwiftInputStream.java b/underfs/swift/src/main/java/alluxio/underfs/swift/SwiftInputStream.java index dcdfcf738c1b..79d590b0a599 100644 --- a/underfs/swift/src/main/java/alluxio/underfs/swift/SwiftInputStream.java +++ b/underfs/swift/src/main/java/alluxio/underfs/swift/SwiftInputStream.java @@ -84,7 +84,8 @@ public SwiftInputStream(Account account, String container, String object, long p protected InputStream createStream(long startPos, long endPos) throws IOException { NotFoundException lastException = null; - while (mRetryPolicy.attempt()) { + RetryPolicy retryPolicy = mRetryPolicy.copy(); + while (retryPolicy.attempt()) { try { StoredObject storedObject = mAccount.getContainer(mContainerName).getObject(mObjectPath); DownloadInstructions downloadInstructions = new DownloadInstructions(); @@ -92,7 +93,7 @@ protected InputStream createStream(long startPos, long endPos) return storedObject.downloadObjectAsInputStream(downloadInstructions); } catch (NotFoundException e) { LOG.warn("Attempt {} to get object {} from container {} failed with exception : {}", - mRetryPolicy.getAttemptCount(), mObjectPath, mContainerName, e.toString()); + retryPolicy.getAttemptCount(), mObjectPath, mContainerName, e.toString()); // Object does not exist lastException = e; }