diff --git a/core/common/src/main/java/alluxio/retry/CountingRetry.java b/core/common/src/main/java/alluxio/retry/CountingRetry.java index 6de9ded12d41..2a94277151cf 100644 --- a/core/common/src/main/java/alluxio/retry/CountingRetry.java +++ b/core/common/src/main/java/alluxio/retry/CountingRetry.java @@ -54,4 +54,9 @@ public boolean attempt() { public void reset() { mAttemptCount = 0; } + + @Override + public RetryPolicy copy() { + return new CountingRetry(mMaxRetries); + } } diff --git a/core/common/src/main/java/alluxio/retry/ExponentialBackoffRetry.java b/core/common/src/main/java/alluxio/retry/ExponentialBackoffRetry.java index f4aa4905e89f..e0417c65de83 100644 --- a/core/common/src/main/java/alluxio/retry/ExponentialBackoffRetry.java +++ b/core/common/src/main/java/alluxio/retry/ExponentialBackoffRetry.java @@ -56,4 +56,9 @@ protected long getSleepTime() { return Math.min(Math.abs(sleepMs), mMaxSleepMs); } } + + @Override + public ExponentialBackoffRetry copy() { + return new ExponentialBackoffRetry(mBaseSleepTimeMs, mMaxSleepMs, mMaxRetries); + } } diff --git a/core/common/src/main/java/alluxio/retry/ExponentialTimeBoundedRetry.java b/core/common/src/main/java/alluxio/retry/ExponentialTimeBoundedRetry.java index 11661f019869..3da6eb122e72 100644 --- a/core/common/src/main/java/alluxio/retry/ExponentialTimeBoundedRetry.java +++ b/core/common/src/main/java/alluxio/retry/ExponentialTimeBoundedRetry.java @@ -133,4 +133,16 @@ public ExponentialTimeBoundedRetry build() { mTimeCtx, mMaxDuration, mInitialSleep, mMaxSleep, mSkipInitialSleep); } } + + @Override + public RetryPolicy copy() { + Builder builder = ExponentialTimeBoundedRetry.builder() + .withMaxDuration(mMaxDuration) + .withInitialSleep(mNextSleep) + .withMaxSleep(mMaxSleep); + if (mSkipInitialSleep) { + builder.withSkipInitialSleep(); + } + return builder.build(); + } } diff --git a/core/common/src/main/java/alluxio/retry/RetryPolicy.java b/core/common/src/main/java/alluxio/retry/RetryPolicy.java index 95bf04e04e0e..9e54f37346a5 100644 --- a/core/common/src/main/java/alluxio/retry/RetryPolicy.java +++ b/core/common/src/main/java/alluxio/retry/RetryPolicy.java @@ -36,4 +36,10 @@ public interface RetryPolicy { * @return whether another retry should be performed */ boolean attempt(); + + /** + * Copy a new RetryPolicy based on the current RetryPolicy configuration. + * @return a copy of RetryPolicy + */ + RetryPolicy copy(); } diff --git a/core/common/src/main/java/alluxio/retry/SleepingRetry.java b/core/common/src/main/java/alluxio/retry/SleepingRetry.java index ec03d07dbedf..82b0b639d77c 100644 --- a/core/common/src/main/java/alluxio/retry/SleepingRetry.java +++ b/core/common/src/main/java/alluxio/retry/SleepingRetry.java @@ -21,7 +21,7 @@ */ @NotThreadSafe public abstract class SleepingRetry implements RetryPolicy { - private final int mMaxRetries; + protected final int mMaxRetries; private int mAttemptCount = 0; protected SleepingRetry(int maxRetries) { diff --git a/core/common/src/main/java/alluxio/retry/TimeBoundedRetry.java b/core/common/src/main/java/alluxio/retry/TimeBoundedRetry.java index 0216c723f635..6466f1d7588f 100644 --- a/core/common/src/main/java/alluxio/retry/TimeBoundedRetry.java +++ b/core/common/src/main/java/alluxio/retry/TimeBoundedRetry.java @@ -25,7 +25,7 @@ public abstract class TimeBoundedRetry implements RetryPolicy { private final Clock mClock; private final Sleeper mSleeper; - private final Duration mMaxDuration; + protected final Duration mMaxDuration; private final Instant mStartTime; private final Instant mEndTime; diff --git a/core/common/src/main/java/alluxio/retry/TimeoutRetry.java b/core/common/src/main/java/alluxio/retry/TimeoutRetry.java index 58890e09a3c0..af0f8e98a0ec 100644 --- a/core/common/src/main/java/alluxio/retry/TimeoutRetry.java +++ b/core/common/src/main/java/alluxio/retry/TimeoutRetry.java @@ -34,7 +34,7 @@ public class TimeoutRetry implements RetryPolicy { * @param retryTimeoutMs maximum period of time to retry for, in milliseconds * @param sleepMs time in milliseconds to sleep before retrying */ - public TimeoutRetry(long retryTimeoutMs, int sleepMs) { + public TimeoutRetry(long retryTimeoutMs, long sleepMs) { Preconditions.checkArgument(retryTimeoutMs > 0, "Retry timeout must be a positive number"); Preconditions.checkArgument(sleepMs >= 0, "sleepMs cannot be negative"); mRetryTimeoutMs = retryTimeoutMs; @@ -63,4 +63,9 @@ public boolean attempt() { } return false; } + + @Override + public RetryPolicy copy() { + return new TimeoutRetry(mRetryTimeoutMs, mSleepMs); + } } diff --git a/underfs/cos/src/main/java/alluxio/underfs/cos/COSInputStream.java b/underfs/cos/src/main/java/alluxio/underfs/cos/COSInputStream.java index 045eb18bdd68..18b01786669a 100644 --- a/underfs/cos/src/main/java/alluxio/underfs/cos/COSInputStream.java +++ b/underfs/cos/src/main/java/alluxio/underfs/cos/COSInputStream.java @@ -98,14 +98,15 @@ protected InputStream createStream(long startPos, long endPos) req.setRange(startPos, endPos < mContentLength ? endPos - 1 : mContentLength - 1); CosServiceException lastException = null; String errorMessage = String.format("Failed to open key: %s bucket: %s", mKey, mBucketName); - while (mRetryPolicy.attempt()) { + RetryPolicy retryPolicy = mRetryPolicy.copy(); + while (retryPolicy.attempt()) { try { COSObject object = mCosClient.getObject(req); return new BufferedInputStream(object.getObjectContent()); } catch (CosServiceException e) { errorMessage = String .format("Failed to open key: %s bucket: %s attempts: %d error: %s", mKey, mBucketName, - mRetryPolicy.getAttemptCount(), e.getMessage()); + retryPolicy.getAttemptCount(), e.getMessage()); if (e.getStatusCode() != HttpStatus.SC_NOT_FOUND) { throw new IOException(errorMessage, e); } diff --git a/underfs/kodo/src/main/java/alluxio/underfs/kodo/KodoInputStream.java b/underfs/kodo/src/main/java/alluxio/underfs/kodo/KodoInputStream.java index 9e7f1223b43b..102e561308a6 100644 --- a/underfs/kodo/src/main/java/alluxio/underfs/kodo/KodoInputStream.java +++ b/underfs/kodo/src/main/java/alluxio/underfs/kodo/KodoInputStream.java @@ -74,13 +74,14 @@ protected InputStream createStream(long startPos, long endPos) throws IOException { IOException lastException = null; String errorMessage = String.format("Failed to open key: %s", mKey); - while (mRetryPolicy.attempt()) { + RetryPolicy retryPolicy = mRetryPolicy.copy(); + while (retryPolicy.attempt()) { try { return mKodoclent.getObject(mKey, startPos, endPos, mContentLength); } catch (NotFoundException e) { errorMessage = String .format("Failed to open key: %s attempts: %s error: %s", mKey, - mRetryPolicy.getAttemptCount(), e.getMessage()); + retryPolicy.getAttemptCount(), e.getMessage()); // Key does not exist lastException = e; } diff --git a/underfs/obs/src/main/java/alluxio/underfs/obs/OBSInputStream.java b/underfs/obs/src/main/java/alluxio/underfs/obs/OBSInputStream.java index 54de4902158a..a2c9dc172cd5 100644 --- a/underfs/obs/src/main/java/alluxio/underfs/obs/OBSInputStream.java +++ b/underfs/obs/src/main/java/alluxio/underfs/obs/OBSInputStream.java @@ -104,14 +104,15 @@ protected InputStream createStream(long startPos, long endPos) throws IOExceptio req.setRangeStart(startPos); req.setRangeEnd(endPos < mContentLength ? endPos - 1 : mContentLength - 1); ObsException lastException = null; - while (mRetryPolicy.attempt()) { + RetryPolicy retryPolicy = mRetryPolicy.copy(); + while (retryPolicy.attempt()) { try { ObsObject obj = mObsClient.getObject(req); return new BufferedInputStream(obj.getObjectContent()); } catch (ObsException e) { System.out.println(e.getResponseCode()); LOG.warn("Attempt {} to open key {} in bucket {} failed with exception : {}", - mRetryPolicy.getAttemptCount(), mKey, mBucketName, e.toString()); + retryPolicy.getAttemptCount(), mKey, mBucketName, e.toString()); if (e.getResponseCode() != HttpStatus.SC_NOT_FOUND) { throw new IOException(e); } diff --git a/underfs/oss/src/main/java/alluxio/underfs/oss/OSSInputStream.java b/underfs/oss/src/main/java/alluxio/underfs/oss/OSSInputStream.java index 08c17f934fe4..fbde5ad95c1a 100644 --- a/underfs/oss/src/main/java/alluxio/underfs/oss/OSSInputStream.java +++ b/underfs/oss/src/main/java/alluxio/underfs/oss/OSSInputStream.java @@ -97,14 +97,15 @@ protected InputStream createStream(long startPos, long endPos) req.setRange(startPos, endPos < mContentLength ? endPos - 1 : mContentLength - 1); OSSException lastException = null; String errorMessage = String.format("Failed to open key: %s bucket: %s", mKey, mBucketName); - while (mRetryPolicy.attempt()) { + RetryPolicy retryPolicy = mRetryPolicy.copy(); + while (retryPolicy.attempt()) { try { OSSObject ossObject = mOssClient.getObject(req); return new BufferedInputStream(ossObject.getObjectContent()); } catch (OSSException e) { errorMessage = String .format("Failed to open key: %s bucket: %s attempts: %d error: %s", mKey, mBucketName, - mRetryPolicy.getAttemptCount(), e.getMessage()); + retryPolicy.getAttemptCount(), e.getMessage()); if (!e.getErrorCode().equals("NoSuchKey")) { throw new IOException(errorMessage, e); } diff --git a/underfs/swift/src/main/java/alluxio/underfs/swift/SwiftInputStream.java b/underfs/swift/src/main/java/alluxio/underfs/swift/SwiftInputStream.java index dcdfcf738c1b..79d590b0a599 100644 --- a/underfs/swift/src/main/java/alluxio/underfs/swift/SwiftInputStream.java +++ b/underfs/swift/src/main/java/alluxio/underfs/swift/SwiftInputStream.java @@ -84,7 +84,8 @@ public SwiftInputStream(Account account, String container, String object, long p protected InputStream createStream(long startPos, long endPos) throws IOException { NotFoundException lastException = null; - while (mRetryPolicy.attempt()) { + RetryPolicy retryPolicy = mRetryPolicy.copy(); + while (retryPolicy.attempt()) { try { StoredObject storedObject = mAccount.getContainer(mContainerName).getObject(mObjectPath); DownloadInstructions downloadInstructions = new DownloadInstructions(); @@ -92,7 +93,7 @@ protected InputStream createStream(long startPos, long endPos) return storedObject.downloadObjectAsInputStream(downloadInstructions); } catch (NotFoundException e) { LOG.warn("Attempt {} to get object {} from container {} failed with exception : {}", - mRetryPolicy.getAttemptCount(), mObjectPath, mContainerName, e.toString()); + retryPolicy.getAttemptCount(), mObjectPath, mContainerName, e.toString()); // Object does not exist lastException = e; }