Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix read exception when the range size ...; Port [#18613] to branch-2.10 #18614

Merged
merged 1 commit into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/common/src/main/java/alluxio/retry/CountingRetry.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,9 @@ public boolean attempt() {
public void reset() {
mAttemptCount = 0;
}

@Override
public RetryPolicy copy() {
return new CountingRetry(mMaxRetries);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,9 @@ protected long getSleepTime() {
return Math.min(Math.abs(sleepMs), mMaxSleepMs);
}
}

@Override
public ExponentialBackoffRetry copy() {
return new ExponentialBackoffRetry(mBaseSleepTimeMs, mMaxSleepMs, mMaxRetries);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,16 @@ public ExponentialTimeBoundedRetry build() {
mTimeCtx, mMaxDuration, mInitialSleep, mMaxSleep, mSkipInitialSleep);
}
}

@Override
public RetryPolicy copy() {
Builder builder = ExponentialTimeBoundedRetry.builder()
.withMaxDuration(mMaxDuration)
.withInitialSleep(mNextSleep)
.withMaxSleep(mMaxSleep);
if (mSkipInitialSleep) {
builder.withSkipInitialSleep();
}
return builder.build();
}
}
6 changes: 6 additions & 0 deletions core/common/src/main/java/alluxio/retry/RetryPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,10 @@ public interface RetryPolicy {
* @return whether another retry should be performed
*/
boolean attempt();

/**
* Copy a new RetryPolicy based on the current RetryPolicy configuration.
* @return a copy of RetryPolicy
*/
RetryPolicy copy();
}
2 changes: 1 addition & 1 deletion core/common/src/main/java/alluxio/retry/SleepingRetry.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
*/
@NotThreadSafe
public abstract class SleepingRetry implements RetryPolicy {
private final int mMaxRetries;
protected final int mMaxRetries;
private int mAttemptCount = 0;

protected SleepingRetry(int maxRetries) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
public abstract class TimeBoundedRetry implements RetryPolicy {
private final Clock mClock;
private final Sleeper mSleeper;
private final Duration mMaxDuration;
protected final Duration mMaxDuration;
private final Instant mStartTime;
private final Instant mEndTime;

Expand Down
7 changes: 6 additions & 1 deletion core/common/src/main/java/alluxio/retry/TimeoutRetry.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class TimeoutRetry implements RetryPolicy {
* @param retryTimeoutMs maximum period of time to retry for, in milliseconds
* @param sleepMs time in milliseconds to sleep before retrying
*/
public TimeoutRetry(long retryTimeoutMs, int sleepMs) {
public TimeoutRetry(long retryTimeoutMs, long sleepMs) {
Preconditions.checkArgument(retryTimeoutMs > 0, "Retry timeout must be a positive number");
Preconditions.checkArgument(sleepMs >= 0, "sleepMs cannot be negative");
mRetryTimeoutMs = retryTimeoutMs;
Expand Down Expand Up @@ -63,4 +63,9 @@ public boolean attempt() {
}
return false;
}

@Override
public RetryPolicy copy() {
return new TimeoutRetry(mRetryTimeoutMs, mSleepMs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,15 @@ protected InputStream createStream(long startPos, long endPos)
req.setRange(startPos, endPos < mContentLength ? endPos - 1 : mContentLength - 1);
CosServiceException lastException = null;
String errorMessage = String.format("Failed to open key: %s bucket: %s", mKey, mBucketName);
while (mRetryPolicy.attempt()) {
RetryPolicy retryPolicy = mRetryPolicy.copy();
while (retryPolicy.attempt()) {
try {
COSObject object = mCosClient.getObject(req);
return new BufferedInputStream(object.getObjectContent());
} catch (CosServiceException e) {
errorMessage = String
.format("Failed to open key: %s bucket: %s attempts: %d error: %s", mKey, mBucketName,
mRetryPolicy.getAttemptCount(), e.getMessage());
retryPolicy.getAttemptCount(), e.getMessage());
if (e.getStatusCode() != HttpStatus.SC_NOT_FOUND) {
throw new IOException(errorMessage, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,14 @@ protected InputStream createStream(long startPos, long endPos)
throws IOException {
IOException lastException = null;
String errorMessage = String.format("Failed to open key: %s", mKey);
while (mRetryPolicy.attempt()) {
RetryPolicy retryPolicy = mRetryPolicy.copy();
while (retryPolicy.attempt()) {
try {
return mKodoclent.getObject(mKey, startPos, endPos, mContentLength);
} catch (NotFoundException e) {
errorMessage = String
.format("Failed to open key: %s attempts: %s error: %s", mKey,
mRetryPolicy.getAttemptCount(), e.getMessage());
retryPolicy.getAttemptCount(), e.getMessage());
// Key does not exist
lastException = e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,15 @@ protected InputStream createStream(long startPos, long endPos) throws IOExceptio
req.setRangeStart(startPos);
req.setRangeEnd(endPos < mContentLength ? endPos - 1 : mContentLength - 1);
ObsException lastException = null;
while (mRetryPolicy.attempt()) {
RetryPolicy retryPolicy = mRetryPolicy.copy();
while (retryPolicy.attempt()) {
try {
ObsObject obj = mObsClient.getObject(req);
return new BufferedInputStream(obj.getObjectContent());
} catch (ObsException e) {
System.out.println(e.getResponseCode());
LOG.warn("Attempt {} to open key {} in bucket {} failed with exception : {}",
mRetryPolicy.getAttemptCount(), mKey, mBucketName, e.toString());
retryPolicy.getAttemptCount(), mKey, mBucketName, e.toString());
if (e.getResponseCode() != HttpStatus.SC_NOT_FOUND) {
throw new IOException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,15 @@ protected InputStream createStream(long startPos, long endPos)
req.setRange(startPos, endPos < mContentLength ? endPos - 1 : mContentLength - 1);
OSSException lastException = null;
String errorMessage = String.format("Failed to open key: %s bucket: %s", mKey, mBucketName);
while (mRetryPolicy.attempt()) {
RetryPolicy retryPolicy = mRetryPolicy.copy();
while (retryPolicy.attempt()) {
try {
OSSObject ossObject = mOssClient.getObject(req);
return new BufferedInputStream(ossObject.getObjectContent());
} catch (OSSException e) {
errorMessage = String
.format("Failed to open key: %s bucket: %s attempts: %d error: %s", mKey, mBucketName,
mRetryPolicy.getAttemptCount(), e.getMessage());
retryPolicy.getAttemptCount(), e.getMessage());
if (!e.getErrorCode().equals("NoSuchKey")) {
throw new IOException(errorMessage, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,16 @@ public SwiftInputStream(Account account, String container, String object, long p
protected InputStream createStream(long startPos, long endPos)
throws IOException {
NotFoundException lastException = null;
while (mRetryPolicy.attempt()) {
RetryPolicy retryPolicy = mRetryPolicy.copy();
while (retryPolicy.attempt()) {
try {
StoredObject storedObject = mAccount.getContainer(mContainerName).getObject(mObjectPath);
DownloadInstructions downloadInstructions = new DownloadInstructions();
downloadInstructions.setRange(new MidPartLongRange(startPos, endPos - 1));
return storedObject.downloadObjectAsInputStream(downloadInstructions);
} catch (NotFoundException e) {
LOG.warn("Attempt {} to get object {} from container {} failed with exception : {}",
mRetryPolicy.getAttemptCount(), mObjectPath, mContainerName, e.toString());
retryPolicy.getAttemptCount(), mObjectPath, mContainerName, e.toString());
// Object does not exist
lastException = e;
}
Expand Down
Loading