Skip to content

Commit

Permalink
Fix read exception when the range size is smaller than Block size
Browse files Browse the repository at this point in the history
Cherry-pick of existing commit.
orig-pr: #18613
orig-commit: 8f48a25
orig-commit-author: yuyang wang <[email protected]>

			pr-link: #18614
			change-id: cid-e0d5a80304ced99e28114bbbad886b4abb934408
  • Loading branch information
alluxio-bot authored May 27, 2024
1 parent 28eba92 commit ff207bd
Show file tree
Hide file tree
Showing 12 changed files with 51 additions and 13 deletions.
5 changes: 5 additions & 0 deletions core/common/src/main/java/alluxio/retry/CountingRetry.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,9 @@ public boolean attempt() {
public void reset() {
mAttemptCount = 0;
}

@Override
public RetryPolicy copy() {
return new CountingRetry(mMaxRetries);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,9 @@ protected long getSleepTime() {
return Math.min(Math.abs(sleepMs), mMaxSleepMs);
}
}

@Override
public ExponentialBackoffRetry copy() {
return new ExponentialBackoffRetry(mBaseSleepTimeMs, mMaxSleepMs, mMaxRetries);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,16 @@ public ExponentialTimeBoundedRetry build() {
mTimeCtx, mMaxDuration, mInitialSleep, mMaxSleep, mSkipInitialSleep);
}
}

@Override
public RetryPolicy copy() {
Builder builder = ExponentialTimeBoundedRetry.builder()
.withMaxDuration(mMaxDuration)
.withInitialSleep(mNextSleep)
.withMaxSleep(mMaxSleep);
if (mSkipInitialSleep) {
builder.withSkipInitialSleep();
}
return builder.build();
}
}
6 changes: 6 additions & 0 deletions core/common/src/main/java/alluxio/retry/RetryPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,10 @@ public interface RetryPolicy {
* @return whether another retry should be performed
*/
boolean attempt();

/**
* Copy a new RetryPolicy based on the current RetryPolicy configuration.
* @return a copy of RetryPolicy
*/
RetryPolicy copy();
}
2 changes: 1 addition & 1 deletion core/common/src/main/java/alluxio/retry/SleepingRetry.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
*/
@NotThreadSafe
public abstract class SleepingRetry implements RetryPolicy {
private final int mMaxRetries;
protected final int mMaxRetries;
private int mAttemptCount = 0;

protected SleepingRetry(int maxRetries) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
public abstract class TimeBoundedRetry implements RetryPolicy {
private final Clock mClock;
private final Sleeper mSleeper;
private final Duration mMaxDuration;
protected final Duration mMaxDuration;
private final Instant mStartTime;
private final Instant mEndTime;

Expand Down
7 changes: 6 additions & 1 deletion core/common/src/main/java/alluxio/retry/TimeoutRetry.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class TimeoutRetry implements RetryPolicy {
* @param retryTimeoutMs maximum period of time to retry for, in milliseconds
* @param sleepMs time in milliseconds to sleep before retrying
*/
public TimeoutRetry(long retryTimeoutMs, int sleepMs) {
public TimeoutRetry(long retryTimeoutMs, long sleepMs) {
Preconditions.checkArgument(retryTimeoutMs > 0, "Retry timeout must be a positive number");
Preconditions.checkArgument(sleepMs >= 0, "sleepMs cannot be negative");
mRetryTimeoutMs = retryTimeoutMs;
Expand Down Expand Up @@ -63,4 +63,9 @@ public boolean attempt() {
}
return false;
}

@Override
public RetryPolicy copy() {
return new TimeoutRetry(mRetryTimeoutMs, mSleepMs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,15 @@ protected InputStream createStream(long startPos, long endPos)
req.setRange(startPos, endPos < mContentLength ? endPos - 1 : mContentLength - 1);
CosServiceException lastException = null;
String errorMessage = String.format("Failed to open key: %s bucket: %s", mKey, mBucketName);
while (mRetryPolicy.attempt()) {
RetryPolicy retryPolicy = mRetryPolicy.copy();
while (retryPolicy.attempt()) {
try {
COSObject object = mCosClient.getObject(req);
return new BufferedInputStream(object.getObjectContent());
} catch (CosServiceException e) {
errorMessage = String
.format("Failed to open key: %s bucket: %s attempts: %d error: %s", mKey, mBucketName,
mRetryPolicy.getAttemptCount(), e.getMessage());
retryPolicy.getAttemptCount(), e.getMessage());
if (e.getStatusCode() != HttpStatus.SC_NOT_FOUND) {
throw new IOException(errorMessage, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,14 @@ protected InputStream createStream(long startPos, long endPos)
throws IOException {
IOException lastException = null;
String errorMessage = String.format("Failed to open key: %s", mKey);
while (mRetryPolicy.attempt()) {
RetryPolicy retryPolicy = mRetryPolicy.copy();
while (retryPolicy.attempt()) {
try {
return mKodoclent.getObject(mKey, startPos, endPos, mContentLength);
} catch (NotFoundException e) {
errorMessage = String
.format("Failed to open key: %s attempts: %s error: %s", mKey,
mRetryPolicy.getAttemptCount(), e.getMessage());
retryPolicy.getAttemptCount(), e.getMessage());
// Key does not exist
lastException = e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,15 @@ protected InputStream createStream(long startPos, long endPos) throws IOExceptio
req.setRangeStart(startPos);
req.setRangeEnd(endPos < mContentLength ? endPos - 1 : mContentLength - 1);
ObsException lastException = null;
while (mRetryPolicy.attempt()) {
RetryPolicy retryPolicy = mRetryPolicy.copy();
while (retryPolicy.attempt()) {
try {
ObsObject obj = mObsClient.getObject(req);
return new BufferedInputStream(obj.getObjectContent());
} catch (ObsException e) {
System.out.println(e.getResponseCode());
LOG.warn("Attempt {} to open key {} in bucket {} failed with exception : {}",
mRetryPolicy.getAttemptCount(), mKey, mBucketName, e.toString());
retryPolicy.getAttemptCount(), mKey, mBucketName, e.toString());
if (e.getResponseCode() != HttpStatus.SC_NOT_FOUND) {
throw new IOException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,15 @@ protected InputStream createStream(long startPos, long endPos)
req.setRange(startPos, endPos < mContentLength ? endPos - 1 : mContentLength - 1);
OSSException lastException = null;
String errorMessage = String.format("Failed to open key: %s bucket: %s", mKey, mBucketName);
while (mRetryPolicy.attempt()) {
RetryPolicy retryPolicy = mRetryPolicy.copy();
while (retryPolicy.attempt()) {
try {
OSSObject ossObject = mOssClient.getObject(req);
return new BufferedInputStream(ossObject.getObjectContent());
} catch (OSSException e) {
errorMessage = String
.format("Failed to open key: %s bucket: %s attempts: %d error: %s", mKey, mBucketName,
mRetryPolicy.getAttemptCount(), e.getMessage());
retryPolicy.getAttemptCount(), e.getMessage());
if (!e.getErrorCode().equals("NoSuchKey")) {
throw new IOException(errorMessage, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,16 @@ public SwiftInputStream(Account account, String container, String object, long p
protected InputStream createStream(long startPos, long endPos)
throws IOException {
NotFoundException lastException = null;
while (mRetryPolicy.attempt()) {
RetryPolicy retryPolicy = mRetryPolicy.copy();
while (retryPolicy.attempt()) {
try {
StoredObject storedObject = mAccount.getContainer(mContainerName).getObject(mObjectPath);
DownloadInstructions downloadInstructions = new DownloadInstructions();
downloadInstructions.setRange(new MidPartLongRange(startPos, endPos - 1));
return storedObject.downloadObjectAsInputStream(downloadInstructions);
} catch (NotFoundException e) {
LOG.warn("Attempt {} to get object {} from container {} failed with exception : {}",
mRetryPolicy.getAttemptCount(), mObjectPath, mContainerName, e.toString());
retryPolicy.getAttemptCount(), mObjectPath, mContainerName, e.toString());
// Object does not exist
lastException = e;
}
Expand Down

0 comments on commit ff207bd

Please sign in to comment.