Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support meta request level override of part size and mpu threshold #393

Merged
merged 10 commits into from
Dec 12, 2023
54 changes: 46 additions & 8 deletions include/aws/s3/s3_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -389,21 +389,37 @@ struct aws_s3_client_config {
*/
struct aws_signing_config_aws *signing_config;

/* Size of parts the files will be downloaded or uploaded in. */
/**
* Optional.
* Size of parts the object will be downloaded or uploaded in, in bytes.
* This only affects AWS_S3_META_REQUEST_TYPE_GET_OBJECT and AWS_S3_META_REQUEST_TYPE_PUT_OBJECT.
* If not set, this defaults to 8 MiB.
* The client will adjust the part size for AWS_S3_META_REQUEST_TYPE_PUT_OBJECT if needed for service limits (max
* number of parts per upload is 10,000, minimum upload part size is 5 MiB).
*
* You can also set this per meta-request, via `aws_s3_meta_request_options.part_size`.
*/
uint64_t part_size;

/* If the part size needs to be adjusted for service limits, this is the maximum size it will be adjusted to. On 32
* bit machine, it will be forced to SIZE_MAX, which is around 4GiB. The server limit is 5GiB, but object size limit
* is 5TiB for now. We should be good enough for all the cases. */
uint64_t max_part_size;

/* The size threshold in bytes for when to use multipart uploads for a AWS_S3_META_REQUEST_TYPE_PUT_OBJECT meta
* request. Uploads over this size will automatically use a multipart upload strategy,while uploads smaller or
* equal to this threshold will use a single request to upload the whole object. If not set, `part_size` will be
* used as threshold. */
/**
* Optional.
* The size threshold in bytes for when to use multipart uploads.
* Uploads larger than this will use the multipart upload strategy.
* Uploads smaller or equal to this will use a single HTTP request.
* This only affects AWS_S3_META_REQUEST_TYPE_PUT_OBJECT.
* If set, this should be at least `part_size`.
 * If not set, the maximum of `part_size` and 5 MiB will be used.
*
* You can also set this per meta-request, via `aws_s3_meta_request_options.multipart_upload_threshold`.
*/
uint64_t multipart_upload_threshold;

/* Throughput target in Gbps that we are trying to reach. */
/* Throughput target in gigabits per second (Gbps) that we are trying to reach. */
double throughput_target_gbps;

/* How much memory can we use. */
Expand Down Expand Up @@ -554,8 +570,6 @@ struct aws_s3_checksum_config {
* 3) If the data will be be produced in asynchronous chunks, set `send_async_stream`.
*/
struct aws_s3_meta_request_options {
/* TODO: The meta request options cannot control the request to be split or not. Should consider to add one */

/* The type of meta request we will be trying to accelerate. */
enum aws_s3_meta_request_type type;

Expand Down Expand Up @@ -612,6 +626,30 @@ struct aws_s3_meta_request_options {
*/
const struct aws_s3_checksum_config *checksum_config;

/**
* Optional.
* Size of parts the object will be downloaded or uploaded in, in bytes.
* This only affects AWS_S3_META_REQUEST_TYPE_GET_OBJECT and AWS_S3_META_REQUEST_TYPE_PUT_OBJECT.
* If not set, the value from `aws_s3_client_config.part_size` is used, which defaults to 8MiB.
*
* The client will adjust the part size for AWS_S3_META_REQUEST_TYPE_PUT_OBJECT if needed for service limits (max
* number of parts per upload is 10,000, minimum upload part size is 5 MiB).
*/
uint64_t part_size;

/**
* Optional.
* The size threshold in bytes for when to use multipart uploads.
* Uploads larger than this will use the multipart upload strategy.
* Uploads smaller or equal to this will use a single HTTP request.
* This only affects AWS_S3_META_REQUEST_TYPE_PUT_OBJECT.
* If set, this should be at least `part_size`.
 * If not set, the `part_size` (as adjusted by the client) will be used as the threshold.
* If both `part_size` and `multipart_upload_threshold` are not set,
* the values from `aws_s3_client_config` are used.
*/
uint64_t multipart_upload_threshold;

/* User data for all callbacks. */
void *user_data;

Expand Down
70 changes: 48 additions & 22 deletions source/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -345,11 +345,13 @@ struct aws_s3_client *aws_s3_client_new(
mem_limit = client_config->memory_limit_in_bytes;
}

size_t part_size;
size_t part_size = s_default_part_size;
if (client_config->part_size != 0) {
part_size = (size_t)client_config->part_size;
} else {
part_size = s_default_part_size;
if (client_config->part_size > SIZE_MAX) {
part_size = SIZE_MAX;
} else {
part_size = (size_t)client_config->part_size;
}
}

client->buffer_pool = aws_s3_buffer_pool_new(allocator, part_size, mem_limit);
Expand Down Expand Up @@ -424,6 +426,9 @@ struct aws_s3_client *aws_s3_client_new(

if (client_config->multipart_upload_threshold != 0) {
*((uint64_t *)&client->multipart_upload_threshold) = client_config->multipart_upload_threshold;
} else {
*((uint64_t *)&client->multipart_upload_threshold) =
part_size > g_s3_min_upload_part_size ? part_size : g_s3_min_upload_part_size;
}

if (client_config->max_part_size < client_config->part_size) {
Expand Down Expand Up @@ -1151,6 +1156,14 @@ static struct aws_s3_meta_request *s_s3_client_meta_request_factory_default(
aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
return NULL;
}
size_t part_size = client->part_size;
if (options->part_size != 0) {
if (options->part_size > SIZE_MAX) {
part_size = SIZE_MAX;
} else {
part_size = (size_t)options->part_size;
}
}

/* Call the appropriate meta-request new function. */
switch (options->type) {
Expand All @@ -1169,7 +1182,7 @@ static struct aws_s3_meta_request *s_s3_client_meta_request_factory_default(
options);
}

return aws_s3_meta_request_auto_ranged_get_new(client->allocator, client, client->part_size, options);
return aws_s3_meta_request_auto_ranged_get_new(client->allocator, client, part_size, options);
}
case AWS_S3_META_REQUEST_TYPE_PUT_OBJECT: {
if (body_source_count == 0) {
Expand All @@ -1182,19 +1195,17 @@ static struct aws_s3_meta_request *s_s3_client_meta_request_factory_default(
}

if (options->resume_token == NULL) {

size_t client_part_size = client->part_size;
uint64_t client_max_part_size = client->max_part_size;

if (client_part_size < g_s3_min_upload_part_size) {
if (part_size < g_s3_min_upload_part_size) {
AWS_LOGF_WARN(
AWS_LS_S3_META_REQUEST,
"Client config part size of %" PRIu64 " is less than the minimum upload part size of %" PRIu64
"Config part size of %" PRIu64 " is less than the minimum upload part size of %" PRIu64
". Using to the minimum part-size for upload.",
(uint64_t)client_part_size,
(uint64_t)part_size,
(uint64_t)g_s3_min_upload_part_size);

client_part_size = g_s3_min_upload_part_size;
part_size = g_s3_min_upload_part_size;
}

if (client_max_part_size < (uint64_t)g_s3_min_upload_part_size) {
Expand All @@ -1208,8 +1219,32 @@ static struct aws_s3_meta_request *s_s3_client_meta_request_factory_default(

client_max_part_size = (uint64_t)g_s3_min_upload_part_size;
}
uint64_t multipart_upload_threshold =
client->multipart_upload_threshold == 0 ? client_part_size : client->multipart_upload_threshold;

uint32_t num_parts = 0;
if (content_length_found) {
size_t out_part_size = 0;
if (aws_s3_calculate_optimal_mpu_part_size_and_num_parts(
content_length, part_size, client_max_part_size, &out_part_size, &num_parts)) {
return NULL;
}
part_size = out_part_size;
}
if (part_size != options->part_size && part_size != client->part_size) {
AWS_LOGF_DEBUG(
AWS_LS_S3_META_REQUEST,
"The multipart upload part size has been adjusted to %" PRIu64 "",
(uint64_t)part_size);
}

/* Default to client level setting */
uint64_t multipart_upload_threshold = client->multipart_upload_threshold;
if (options->multipart_upload_threshold != 0) {
/* If the threshold is set for the meta request, use it */
multipart_upload_threshold = options->multipart_upload_threshold;
} else if (options->part_size != 0) {
/* If the threshold is not set, but the part size is set for the meta request, use it */
multipart_upload_threshold = part_size;
}

if (content_length_found && content_length <= multipart_upload_threshold) {
return aws_s3_meta_request_default_new(
Expand All @@ -1233,15 +1268,6 @@ static struct aws_s3_meta_request *s_s3_client_meta_request_factory_default(
}
}

size_t part_size = client_part_size;
uint32_t num_parts = 0;
if (content_length_found) {
if (aws_s3_calculate_optimal_mpu_part_size_and_num_parts(
content_length, client_part_size, client_max_part_size, &part_size, &num_parts)) {
return NULL;
}
}

return aws_s3_meta_request_auto_ranged_put_new(
client->allocator, client, part_size, content_length_found, content_length, num_parts, options);
} else { /* else using resume token */
Expand Down
6 changes: 6 additions & 0 deletions source/s3_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,12 @@ int aws_s3_calculate_optimal_mpu_part_size_and_num_parts(
AWS_FATAL_ASSERT(out_part_size);
AWS_FATAL_ASSERT(out_num_parts);

if (content_length == 0) {
*out_part_size = 0;
*out_num_parts = 0;
return AWS_OP_SUCCESS;
}

uint64_t part_size_uint64 = content_length / (uint64_t)g_s3_max_num_upload_parts;

if ((content_length % g_s3_max_num_upload_parts) > 0) {
Expand Down
2 changes: 2 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ add_test_case(test_s3_buffer_pool_too_small)
add_net_test_case(test_s3_put_object_buffer_pool_trim)

add_net_test_case(client_update_upload_part_timeout)
add_net_test_case(client_meta_request_override_part_size)
add_net_test_case(client_meta_request_override_multipart_upload_threshold)

set(TEST_BINARY_NAME ${PROJECT_NAME}-tests)
generate_test_driver(${TEST_BINARY_NAME})
Expand Down
141 changes: 141 additions & 0 deletions tests/s3_client_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -246,3 +246,144 @@ TEST_CASE(client_update_upload_part_timeout) {
aws_s3_tester_clean_up(&tester);
return AWS_OP_SUCCESS;
}

/* Test meta request can override the part size as expected */
TEST_CASE(client_meta_request_override_part_size) {
(void)ctx;
struct aws_s3_tester tester;
AWS_ZERO_STRUCT(tester);
ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester));
struct aws_s3_client *client = NULL;
struct aws_s3_tester_client_options client_options = {
.part_size = MB_TO_BYTES(8),
.tls_usage = AWS_S3_TLS_DISABLED,
};
ASSERT_SUCCESS(aws_s3_tester_client_new(&tester, &client_options, &client));

struct aws_string *host_name =
aws_s3_tester_build_endpoint_string(allocator, &g_test_bucket_name, &g_test_s3_region);
struct aws_byte_cursor host_cur = aws_byte_cursor_from_string(host_name);
struct aws_byte_cursor test_object_path = aws_byte_cursor_from_c_str("/mytest");

size_t override_part_size = MB_TO_BYTES(10);
size_t content_length =
MB_TO_BYTES(20); /* Let the content length larger than the override part size to make sure we do MPU */

/* MPU put object */
struct aws_input_stream_tester_options stream_options = {
.autogen_length = content_length,
};
struct aws_input_stream *input_stream = aws_input_stream_new_tester(allocator, &stream_options);

struct aws_http_message *put_messages = aws_s3_test_put_object_request_new(
allocator, &host_cur, g_test_body_content_type, test_object_path, input_stream, 0 /*flags*/);

struct aws_s3_meta_request_options meta_request_options = {
.message = put_messages,
.type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT,
.part_size = override_part_size,
};
struct aws_s3_meta_request *put_meta_request = client->vtable->meta_request_factory(client, &meta_request_options);
ASSERT_UINT_EQUALS(put_meta_request->part_size, override_part_size);

/* auto ranged Get Object */
struct aws_http_message *get_message = aws_s3_test_get_object_request_new(
allocator, aws_byte_cursor_from_string(host_name), g_pre_existing_object_1MB);

struct aws_s3_meta_request_options get_meta_request_options = {
.message = get_message,
.type = AWS_S3_META_REQUEST_TYPE_GET_OBJECT,
.part_size = override_part_size,
};

struct aws_s3_meta_request *get_meta_request =
client->vtable->meta_request_factory(client, &get_meta_request_options);
ASSERT_UINT_EQUALS(get_meta_request->part_size, override_part_size);

aws_http_message_release(put_messages);
aws_s3_meta_request_release(put_meta_request);
aws_http_message_release(get_message);
aws_s3_meta_request_release(get_meta_request);
aws_string_destroy(host_name);
aws_s3_client_release(client);
aws_input_stream_release(input_stream);
aws_s3_tester_clean_up(&tester);

return AWS_OP_SUCCESS;
}

/* Test meta request can override the multipart upload threshold as expected */
/* Verify that the per-meta-request multipart_upload_threshold option (or an overridden
 * part_size) takes precedence over the client-level threshold when deciding between a
 * single-request upload and a multipart upload. */
TEST_CASE(client_meta_request_override_multipart_upload_threshold) {
    (void)ctx;
    struct aws_s3_tester tester;
    /* Zero-initialize before init, consistent with the other client tests; the struct is
     * otherwise passed to aws_s3_tester_init() with indeterminate contents. */
    AWS_ZERO_STRUCT(tester);
    ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester));

    /* Client-level threshold is 15 MiB; both sub-tests below override it to 20 MiB. */
    struct aws_s3_client_config client_config = {
        .part_size = MB_TO_BYTES(8),
        .multipart_upload_threshold = MB_TO_BYTES(15),
    };

    ASSERT_SUCCESS(aws_s3_tester_bind_client(
        &tester, &client_config, AWS_S3_TESTER_BIND_CLIENT_REGION | AWS_S3_TESTER_BIND_CLIENT_SIGNING));

    struct aws_s3_client *client = aws_s3_client_new(allocator, &client_config);

    ASSERT_TRUE(client != NULL);

    struct aws_string *host_name =
        aws_s3_tester_build_endpoint_string(allocator, &g_test_bucket_name, &g_test_s3_region);
    struct aws_byte_cursor host_cur = aws_byte_cursor_from_string(host_name);
    struct aws_byte_cursor test_object_path = aws_byte_cursor_from_c_str("/mytest");

    size_t override_multipart_upload_threshold = MB_TO_BYTES(20);
    /* Content length equals the override threshold (and exceeds the client-level 15 MiB
     * threshold), so the upload stays single-request only if the override is honored. */
    size_t content_length = MB_TO_BYTES(20);

    struct aws_input_stream_tester_options stream_options = {
        .autogen_length = content_length,
    };
    struct aws_input_stream *input_stream = aws_input_stream_new_tester(allocator, &stream_options);

    struct aws_http_message *put_messages = aws_s3_test_put_object_request_new(
        allocator, &host_cur, g_test_body_content_type, test_object_path, input_stream, 0 /*flags*/);

    {
        /* Content length is smaller than or equal to the override multipart_upload_threshold,
         * so a single-request (default) meta request is created. */
        struct aws_s3_meta_request_options meta_request_options = {
            .message = put_messages,
            .type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT,
            .multipart_upload_threshold = override_multipart_upload_threshold,
        };
        struct aws_s3_meta_request *put_meta_request =
            client->vtable->meta_request_factory(client, &meta_request_options);

        /* Part size will be 0, as we don't use MPU */
        ASSERT_UINT_EQUALS(put_meta_request->part_size, 0);
        aws_s3_meta_request_release(put_meta_request);
    }

    {
        /* meta request override the part size, so the override part size will be used as the multipart upload threshold
         */
        struct aws_s3_meta_request_options meta_request_options = {
            .message = put_messages,
            .type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT,
            .part_size = override_multipart_upload_threshold,
        };
        struct aws_s3_meta_request *put_meta_request =
            client->vtable->meta_request_factory(client, &meta_request_options);

        /* Part size will be 0, as we don't use MPU */
        ASSERT_UINT_EQUALS(put_meta_request->part_size, 0);
        aws_s3_meta_request_release(put_meta_request);
    }

    aws_http_message_release(put_messages);
    aws_string_destroy(host_name);
    aws_s3_client_release(client);
    aws_input_stream_release(input_stream);
    aws_s3_tester_clean_up(&tester);

    return AWS_OP_SUCCESS;
}