Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support integrating TOS as the UnderFileSystem #18621

Merged
merged 9 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/common/src/main/java/alluxio/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public final class Constants {
public static final String HEADER_WASB = "wasb://";
public static final String HEADER_WASBS = "wasbs://";
public static final String HEADER_OBS = "obs://";
public static final String HEADER_TOS = "tos://";

public static final int MAX_PORT = 65535;

Expand Down
27 changes: 27 additions & 0 deletions core/common/src/main/java/alluxio/conf/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -2010,6 +2010,29 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey TOS_ACCESS_KEY = stringBuilder(Name.TOS_ACCESS_KEY)
.setDescription("The access key of TOS bucket.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.setDisplayType(DisplayType.CREDENTIALS)
.build();
// Endpoint used to reach the TOS service; the description is user-facing documentation text.
public static final PropertyKey TOS_ENDPOINT_KEY = stringBuilder(Name.TOS_ENDPOINT_KEY)
    .setDescription("The endpoint of TOS bucket.")
    .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
    .setScope(Scope.SERVER)
    .build();
public static final PropertyKey TOS_SECRET_KEY = stringBuilder(Name.TOS_SECRET_KEY)
.setDescription("The secret key of TOS bucket.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.setDisplayType(DisplayType.CREDENTIALS)
.build();
public static final PropertyKey TOS_REGION =
stringBuilder(Name.TOS_REGION)
.setDescription("The region name of TOS bucket.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please align to the previous line.

.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
//
// Mount table related properties
//
Expand Down Expand Up @@ -8062,6 +8085,10 @@ public static final class Name {
public static final String OBS_ENDPOINT = "fs.obs.endpoint";
public static final String OBS_SECRET_KEY = "fs.obs.secretKey";
public static final String OBS_BUCKET_TYPE = "fs.obs.bucketType";
public static final String TOS_SECRET_KEY = "fs.tos.accessKeySecret";
public static final String TOS_ACCESS_KEY = "fs.tos.accessKeyId";
public static final String TOS_ENDPOINT_KEY = "fs.tos.endpoint";
public static final String TOS_REGION = "fs.tos.region";

//
// Master related properties
Expand Down
1 change: 1 addition & 0 deletions underfs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
<module>wasb</module>
<module>web</module>
<module>obs</module>
<module>tos</module>
</modules>

<properties>
Expand Down
77 changes: 77 additions & 0 deletions underfs/tos/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<!--

The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
(the "License"). You may not use this work except in compliance with the License, which is
available at www.apache.org/licenses/LICENSE-2.0

This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
either express or implied, as more fully set forth in the License.

See the NOTICE file distributed with this work for information regarding copyright ownership.

-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.alluxio</groupId>
<artifactId>alluxio-underfs</artifactId>
<version>2.10.0-SNAPSHOT</version>
</parent>
<artifactId>alluxio-underfs-tos</artifactId>
<name>Alluxio Under File System - Tinder Object Storage</name>
<description>Tinder Object Storage Under File System implementation</description>

<properties>
<!-- The following paths need to be defined here as well as in the parent pom so that mvn can -->
<!-- run properly from sub-project directories -->
<build.path>${project.parent.parent.basedir}/build</build.path>
</properties>

<dependencies>
<!-- External dependencies -->
<dependency>
<groupId>com.volcengine</groupId>
<artifactId>ve-tos-java-sdk</artifactId>
<version>2.7.1</version>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

define the version in the root pom.xml, not here

</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>

<!-- Internal dependencies -->
<dependency>
<groupId>org.alluxio</groupId>
<artifactId>alluxio-core-common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<!-- Internal test dependencies -->
<dependency>
<groupId>org.alluxio</groupId>
<artifactId>alluxio-core-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
</plugin>
<plugin>
<groupId>com.coderplus.maven.plugins</groupId>
<artifactId>copy-rename-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
101 changes: 101 additions & 0 deletions underfs/tos/src/main/java/alluxio/underfs/tos/AlluxioTosException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.underfs.tos;

import alluxio.exception.runtime.AlluxioRuntimeException;
import alluxio.grpc.ErrorType;

import com.volcengine.tos.TosException;
import io.grpc.Status;

import java.net.HttpURLConnection;

/**
* Alluxio exception for cos.
*/
public class AlluxioTosException extends AlluxioRuntimeException {
private static final ErrorType ERROR_TYPE = ErrorType.External;

/**
* Converts an TosClientException to a corresponding AlluxioTosException.
*
* @param cause tos exception
* @return alluxio tos exception
*/
public static AlluxioTosException from(TosException cause) {
return from(null, cause);
}

/**
* Converts an CosClientException with errormessage to a corresponding AlluxioCosException.
*
* @param errorMessage error message
* @param cause cos exception
* @return alluxio cos exception
*/
public static AlluxioTosException from(String errorMessage, TosException cause) {
Status status = Status.UNKNOWN;
String errorDescription = "Exception:" + cause.getMessage();
if (cause instanceof TosException) {
TosException exception = (TosException) cause;
status = httpStatusToGrpcStatus(exception.getStatusCode());
errorDescription = exception.getCode() + ":" + exception.getMessage();
}
if (errorMessage == null) {
errorMessage = errorDescription;
}
return new AlluxioTosException(status, errorMessage, cause, true);
}

private AlluxioTosException(Status status, String message, Throwable cause, boolean isRetryAble) {
super(status, message, cause, ERROR_TYPE, isRetryAble);
}

private static Status httpStatusToGrpcStatus(int httpStatusCode) {
if (httpStatusCode >= 100 && httpStatusCode < 200) {
// 1xx. These headers should have been ignored.
return Status.INTERNAL;
}
switch (httpStatusCode) {
case HttpURLConnection.HTTP_BAD_REQUEST: // 400
Jackson-Wang-7 marked this conversation as resolved.
Show resolved Hide resolved
return Status.INVALID_ARGUMENT;
case HttpURLConnection.HTTP_UNAUTHORIZED: // 401
return Status.UNAUTHENTICATED;
case HttpURLConnection.HTTP_FORBIDDEN: // 403
return Status.PERMISSION_DENIED;
case HttpURLConnection.HTTP_NOT_FOUND: // 404
return Status.NOT_FOUND;
case HttpURLConnection.HTTP_BAD_METHOD: // 405
case HttpURLConnection.HTTP_NOT_IMPLEMENTED: // 501
return Status.UNIMPLEMENTED;
case HttpURLConnection.HTTP_CONFLICT: // 409
return Status.ABORTED;
case HttpURLConnection.HTTP_LENGTH_REQUIRED: // 411
case HttpURLConnection.HTTP_PRECON_FAILED: // 412
return Status.FAILED_PRECONDITION;
case 416: // Requested Range Not Satisfiable
return Status.OUT_OF_RANGE;
case HttpURLConnection.HTTP_INTERNAL_ERROR: //500
return Status.INTERNAL;
case HttpURLConnection.HTTP_MOVED_PERM: // 301
case HttpURLConnection.HTTP_NOT_MODIFIED: //304
case 307: // Moved Temporarily
case HttpURLConnection.HTTP_BAD_GATEWAY: // 502
case HttpURLConnection.HTTP_UNAVAILABLE: // 503
return Status.UNAVAILABLE;
case HttpURLConnection.HTTP_GATEWAY_TIMEOUT: // 504
return Status.DEADLINE_EXCEEDED;
default:
return Status.UNKNOWN;
}
}
}
131 changes: 131 additions & 0 deletions underfs/tos/src/main/java/alluxio/underfs/tos/TOSInputStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.underfs.tos;

import alluxio.retry.RetryPolicy;
import alluxio.underfs.MultiRangeObjectInputStream;

import com.volcengine.tos.TOSV2;
import com.volcengine.tos.TosException;
import com.volcengine.tos.model.object.GetObjectV2Input;
import com.volcengine.tos.model.object.GetObjectV2Output;
import com.volcengine.tos.model.object.HeadObjectV2Input;
import com.volcengine.tos.model.object.HeadObjectV2Output;
import com.volcengine.tos.model.object.ObjectMetaRequestOptions;
import org.apache.http.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import javax.annotation.concurrent.NotThreadSafe;

/**
* A stream for reading a file from TOS. This input stream returns 0 when calling read with an empty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* A stream for reading a file from COS. This input stream returns 0 when calling read with an empty
* A stream for reading a file from TOS. This input stream returns 0 when calling read with an empty

* buffer.
*/
@NotThreadSafe
public class TOSInputStream extends MultiRangeObjectInputStream {
  private static final Logger LOG = LoggerFactory.getLogger(TOSInputStream.class);

  /** Name of the TOS bucket that holds the object. */
  private final String mBucketName;

  /** Key, within the bucket, of the object to read. */
  private final String mKey;

  /** Client used to issue requests to TOS. */
  private final TOSV2 mTosClient;

  /**
   * Length of the object in bytes, taken from a head-object request made at construction time
   * (0 when that request yields no metadata).
   */
  private final long mContentLength;

  /**
   * Retry policy applied when the key is reported as missing, which may happen because of
   * eventual consistency. Each stream creation works on its own copy of this policy.
   */
  private final RetryPolicy mRetryPolicy;

  /**
   * Creates a new instance of {@link TOSInputStream} that starts reading at offset 0.
   *
   * @param bucketName the name of the bucket
   * @param key the key of the file
   * @param client the client for TOS
   * @param retryPolicy the retry policy used when the key is reported as not found
   * @param multiRangeChunkSize the chunk size to use on this stream
   */
  TOSInputStream(String bucketName, String key, TOSV2 client, RetryPolicy retryPolicy,
      long multiRangeChunkSize) throws IOException {
    this(bucketName, key, client, 0L, retryPolicy, multiRangeChunkSize);
  }

  /**
   * Creates a new instance of {@link TOSInputStream} that starts reading at the given position.
   * The object's length is looked up with a head-object request.
   *
   * @param bucketName the name of the bucket
   * @param key the key of the file
   * @param client the client for TOS
   * @param position the position to begin reading from
   * @param retryPolicy the retry policy used when the key is reported as not found
   * @param multiRangeChunkSize the chunk size to use on this stream
   */
  TOSInputStream(String bucketName, String key, TOSV2 client, long position,
      RetryPolicy retryPolicy, long multiRangeChunkSize) throws IOException {
    super(multiRangeChunkSize);
    mBucketName = bucketName;
    mKey = key;
    mTosClient = client;
    mPos = position;
    mRetryPolicy = retryPolicy;
    HeadObjectV2Output objectMeta =
        mTosClient.headObject(new HeadObjectV2Input().setBucket(bucketName).setKey(key));
    if (objectMeta == null) {
      mContentLength = 0;
    } else {
      mContentLength = objectMeta.getContentLength();
    }
  }

  /**
   * Opens a stream over a byte range of the object. Attempts are repeated, as allowed by a copy
   * of the retry policy, only while TOS answers with HTTP 404; any other TOS failure is rethrown
   * immediately as an {@link IOException}.
   *
   * @param startPos the first byte to read
   * @param endPos the end of the requested range; the last byte requested is {@code endPos - 1},
   *        clamped to the last byte of the object
   * @return a buffered stream over the requested range
   * @throws IOException if the object cannot be opened
   */
  @Override
  protected InputStream createStream(long startPos, long endPos) throws IOException {
    // TOS returns entire object if we read past the end, so clamp the range to the object.
    long lastByte = Math.min(endPos, mContentLength) - 1;
    ObjectMetaRequestOptions rangeOptions = new ObjectMetaRequestOptions();
    rangeOptions.setRange(startPos, lastByte);
    GetObjectV2Input request = new GetObjectV2Input().setBucket(mBucketName).setKey(mKey);
    request.setOptions(rangeOptions);

    String failure = String.format("Failed to open key: %s bucket: %s", mKey, mBucketName);
    TosException notFound = null;
    RetryPolicy attempts = mRetryPolicy.copy();
    while (attempts.attempt()) {
      try {
        return new BufferedInputStream(mTosClient.getObject(request).getContent());
      } catch (TosException e) {
        failure = String.format("Failed to open key: %s bucket: %s attempts: %d error: %s",
            mKey, mBucketName, attempts.getAttemptCount(), e.getMessage());
        if (e.getStatusCode() == HttpStatus.SC_NOT_FOUND) {
          // Key does not exist (yet); remember the failure and try again.
          notFound = e;
          continue;
        }
        throw new IOException(failure, e);
      }
    }
    // Retries exhausted while the key was still reported as missing.
    throw new IOException(failure, notFound);
  }
}
Loading
Loading