diff --git a/docs/memory_aware_request_execution.md b/docs/memory_aware_request_execution.md index 67e656c87..13d8cd2e1 100644 --- a/docs/memory_aware_request_execution.md +++ b/docs/memory_aware_request_execution.md @@ -51,6 +51,12 @@ only, and supports acquiring several chunks (up to 4) at once. Blocks are kept around while there are ongoing requests and are released async, when there is low pressure on memory. +One complication is "forced" buffers. A forced buffer is one that +comes from primary or secondary storage as usual, but it is allowed to exceed +the memory limit. Forced buffers are only used when waiting for a normal ticket +reservation could cause deadlock. (At time of writing, they're only used for +async-writes) + ### Scheduling Running out of memory is a terminal condition within CRT and in general its not practical to try to set overall memory limit on all allocations, since it diff --git a/include/aws/s3/private/s3_buffer_pool.h b/include/aws/s3/private/s3_buffer_pool.h index 43d543864..0ab584397 100644 --- a/include/aws/s3/private/s3_buffer_pool.h +++ b/include/aws/s3/private/s3_buffer_pool.h @@ -41,21 +41,26 @@ struct aws_s3_buffer_pool_usage_stats { /* Max size of buffer to be allocated from primary. */ size_t primary_cutoff; - /* How much mem is used in primary storage. includes memory used by blocks - * that are waiting on all allocs to release before being put back in circulation. */ - size_t primary_used; /* Overall memory allocated for blocks. */ size_t primary_allocated; - /* Reserved memory. Does not account for how that memory will map into - * blocks and in practice can be lower than used memory. */ - size_t primary_reserved; /* Number of blocks allocated in primary. */ size_t primary_num_blocks; + /* Memory used in primary storage. + * Does not account for wasted space if memory doesn't map perfectly into chunks. + * This is always <= primary_allocated */ + size_t primary_used; + /* How much memory is reserved, but not yet used, in primary storage. + * Does not account for wasted space if memory doesn't map perfectly into chunks. */ + size_t primary_reserved; - /* Secondary mem used. Accurate, maps directly to base allocator. */ + /* Secondary memory used. Accurate, maps directly to base allocator. */ size_t secondary_used; - /* Secondary mem reserved. Accurate, maps directly to base allocator. */ + /* Secondary memory reserved, but not yet used. Accurate, maps directly to base allocator. */ size_t secondary_reserved; + + /* Bytes used in "forced" buffers (created even if they exceed memory limits). + * This is always <= primary_used + secondary_used */ + size_t forced_used; }; /* @@ -86,6 +91,9 @@ AWS_S3_API void aws_s3_buffer_pool_destroy(struct aws_s3_buffer_pool *buffer_poo * On failure NULL is returned, error is raised and reservation hold is placed * on the buffer. Any further reservations while hold is active will fail. * Remove reservation hold to unblock reservations. + * + * If you MUST acquire a buffer now (waiting to reserve a ticket would risk deadlock), + * use aws_s3_buffer_pool_acquire_forced_buffer() instead. */ AWS_S3_API struct aws_s3_buffer_pool_ticket *aws_s3_buffer_pool_reserve( struct aws_s3_buffer_pool *buffer_pool, @@ -111,6 +119,17 @@ AWS_S3_API struct aws_byte_buf aws_s3_buffer_pool_acquire_buffer( struct aws_s3_buffer_pool *buffer_pool, struct aws_s3_buffer_pool_ticket *ticket); +/* + * Force immediate acquisition of a buffer from the pool. + * This should only be used if waiting to reserve a ticket would risk deadlock. + * This cannot fail, not even if the pool has a reservation hold, + * not even if the memory limit has been exceeded. + */ +AWS_S3_API struct aws_byte_buf aws_s3_buffer_pool_acquire_forced_buffer( + struct aws_s3_buffer_pool *buffer_pool, + size_t size, + struct aws_s3_buffer_pool_ticket **out_new_ticket); + /* * Releases the ticket. * Any buffers associated with the ticket are invalidated. diff --git a/source/s3_buffer_pool.c b/source/s3_buffer_pool.c index fe62fe041..a4396c6a3 100644 --- a/source/s3_buffer_pool.c +++ b/source/s3_buffer_pool.c @@ -32,12 +32,18 @@ * of 32mb (4 chunks) or 16 buffers of 8mb (1 chunk). If requested buffer size * is 12mb, 2 chunks are used for acquire and 4mb will be wasted. * Secondary storage delegates directly to system allocator. + * + * One complication is "forced" buffers. A forced buffer is one that + * comes from primary or secondary storage as usual, but it is allowed to exceed + * the memory limit. This is only used when we want to use memory from + * the pool, but waiting for a normal ticket reservation could cause deadlock. */ struct aws_s3_buffer_pool_ticket { size_t size; uint8_t *ptr; size_t chunks_used; + bool forced; }; /* Default size for blocks array. Note: this is just for meta info, blocks @@ -63,6 +69,11 @@ static const size_t s_chunks_per_block = 16; */ static const size_t s_max_chunk_size_for_buffer_reuse = MB_TO_BYTES(64); +/* Forced buffers only count against the memory limit up to a certain percent. + * For example: if mem_limit is 10GiB, and forced_use is 11GiB, and THIS number is 90(%), + * we still consider 1GiB available for normal buffer usage. */ +static const size_t s_max_impact_of_forced_buffers_on_memory_limit_as_percentage = 80; + struct aws_s3_buffer_pool { struct aws_allocator *base_allocator; struct aws_mutex mutex; @@ -72,6 +83,8 @@ struct aws_s3_buffer_pool { /* size at which allocations should go to secondary */ size_t primary_size_cutoff; + /* NOTE: See aws_s3_buffer_pool_usage_stats for descriptions of most fields */ + size_t mem_limit; bool has_reservation_hold; @@ -83,6 +96,8 @@ struct aws_s3_buffer_pool { size_t secondary_reserved; size_t secondary_used; + size_t forced_used; + struct aws_array_list blocks; }; @@ -249,7 +264,7 @@ struct aws_s3_buffer_pool_ticket *aws_s3_buffer_pool_reserve(struct aws_s3_buffe buffer_pool->secondary_reserved; /* - * If we are allocating from secondary and there is unused space in + * If we are allocating from secondary and there is unused space in * primary, trim the primary in hopes we can free up enough memory. * TODO: something smarter, like partial trim? */ @@ -261,6 +276,13 @@ struct aws_s3_buffer_pool_ticket *aws_s3_buffer_pool_reserve(struct aws_s3_buffe buffer_pool->secondary_reserved; } + /* Don't let forced buffers account for 100% of the memory limit */ + const size_t max_impact_of_forced_on_limit = + (size_t)(buffer_pool->mem_limit * (s_max_impact_of_forced_buffers_on_memory_limit_as_percentage / 100.0)); + if (buffer_pool->forced_used > max_impact_of_forced_on_limit) { + overall_taken -= buffer_pool->forced_used - max_impact_of_forced_on_limit; + } + if ((size + overall_taken) <= buffer_pool->mem_limit) { ticket = aws_mem_calloc(buffer_pool->base_allocator, 1, sizeof(struct aws_s3_buffer_pool_ticket)); ticket->size = size; @@ -288,23 +310,26 @@ struct aws_s3_buffer_pool_ticket *aws_s3_buffer_pool_reserve(struct aws_s3_buffe bool aws_s3_buffer_pool_has_reservation_hold(struct aws_s3_buffer_pool *buffer_pool) { AWS_PRECONDITION(buffer_pool); - AWS_LOGF_TRACE(AWS_LS_S3_CLIENT, "Releasing buffer reservation hold."); return buffer_pool->has_reservation_hold; } void aws_s3_buffer_pool_remove_reservation_hold(struct aws_s3_buffer_pool *buffer_pool) { AWS_PRECONDITION(buffer_pool); + AWS_LOGF_TRACE(AWS_LS_S3_CLIENT, "Releasing buffer reservation hold."); buffer_pool->has_reservation_hold = false; } -static uint8_t *s_primary_acquire_synced(struct aws_s3_buffer_pool *buffer_pool, size_t size, size_t *out_chunks_used) { +static uint8_t *s_primary_acquire_synced( + struct aws_s3_buffer_pool *buffer_pool, + struct aws_s3_buffer_pool_ticket *ticket) { + uint8_t *alloc_ptr = NULL; - size_t chunks_needed = size / buffer_pool->chunk_size; - if (size % buffer_pool->chunk_size != 0) { + size_t chunks_needed = ticket->size / buffer_pool->chunk_size; + if (ticket->size % buffer_pool->chunk_size != 0) { ++chunks_needed; /* round up */ } - *out_chunks_used = chunks_needed; + ticket->chunks_used = chunks_needed; /* Look for space in existing blocks */ for (size_t i = 0; i < aws_array_list_length(&buffer_pool->blocks); ++i) { @@ -331,15 +356,24 @@ static uint8_t *s_primary_acquire_synced(struct aws_s3_buffer_pool *buffer_pool, buffer_pool->primary_allocated += buffer_pool->block_size; on_allocated: - buffer_pool->primary_reserved -= size; - buffer_pool->primary_used += size; + buffer_pool->primary_used += ticket->size; + + /* forced buffers acquire immediately, without reserving first */ + if (ticket->forced == false) { + buffer_pool->primary_reserved -= ticket->size; + } return alloc_ptr; } +static struct aws_byte_buf s_acquire_buffer_synced( + struct aws_s3_buffer_pool *buffer_pool, + struct aws_s3_buffer_pool_ticket *ticket); + struct aws_byte_buf aws_s3_buffer_pool_acquire_buffer( struct aws_s3_buffer_pool *buffer_pool, struct aws_s3_buffer_pool_ticket *ticket) { + AWS_PRECONDITION(buffer_pool); AWS_PRECONDITION(ticket); @@ -347,22 +381,59 @@ struct aws_byte_buf aws_s3_buffer_pool_acquire_buffer( return aws_byte_buf_from_empty_array(ticket->ptr, ticket->size); } - uint8_t *alloc_ptr = NULL; - aws_mutex_lock(&buffer_pool->mutex); + struct aws_byte_buf buf = s_acquire_buffer_synced(buffer_pool, ticket); + aws_mutex_unlock(&buffer_pool->mutex); + + return buf; +} + +static struct aws_byte_buf s_acquire_buffer_synced( + struct aws_s3_buffer_pool *buffer_pool, + struct aws_s3_buffer_pool_ticket *ticket) { + + AWS_PRECONDITION(ticket->ptr == NULL); if (ticket->size <= buffer_pool->primary_size_cutoff) { - alloc_ptr = s_primary_acquire_synced(buffer_pool, ticket->size, &ticket->chunks_used); + ticket->ptr = s_primary_acquire_synced(buffer_pool, ticket); } else { - alloc_ptr = aws_mem_acquire(buffer_pool->base_allocator, ticket->size); - buffer_pool->secondary_reserved -= ticket->size; + ticket->ptr = aws_mem_acquire(buffer_pool->base_allocator, ticket->size); buffer_pool->secondary_used += ticket->size; + + /* forced buffers acquire immediately, without reserving first */ + if (ticket->forced == false) { + buffer_pool->secondary_reserved -= ticket->size; + } } + return aws_byte_buf_from_empty_array(ticket->ptr, ticket->size); +} + +struct aws_byte_buf aws_s3_buffer_pool_acquire_forced_buffer( + struct aws_s3_buffer_pool *buffer_pool, + size_t size, + struct aws_s3_buffer_pool_ticket **out_new_ticket) { + + AWS_PRECONDITION(buffer_pool); + AWS_PRECONDITION(out_new_ticket); + + AWS_FATAL_ASSERT(size != 0); + + aws_mutex_lock(&buffer_pool->mutex); + + struct aws_s3_buffer_pool_ticket *ticket = + aws_mem_calloc(buffer_pool->base_allocator, 1, sizeof(struct aws_s3_buffer_pool_ticket)); + ticket->size = size; + ticket->forced = true; + + struct aws_byte_buf buf = s_acquire_buffer_synced(buffer_pool, ticket); + + buffer_pool->forced_used += size; + aws_mutex_unlock(&buffer_pool->mutex); - ticket->ptr = alloc_ptr; - return aws_byte_buf_from_empty_array(ticket->ptr, ticket->size); + *out_new_ticket = ticket; + return buf; } void aws_s3_buffer_pool_release_ticket( @@ -416,6 +487,10 @@ void aws_s3_buffer_pool_release_ticket( buffer_pool->secondary_used -= ticket->size; } + if (ticket->forced) { + buffer_pool->forced_used -= ticket->size; + } + aws_mem_release(buffer_pool->base_allocator, ticket); aws_mutex_unlock(&buffer_pool->mutex); @@ -433,6 +508,7 @@ struct aws_s3_buffer_pool_usage_stats aws_s3_buffer_pool_get_usage(struct aws_s3 .primary_num_blocks = aws_array_list_length(&buffer_pool->blocks), .secondary_used = buffer_pool->secondary_used, .secondary_reserved = buffer_pool->secondary_reserved, + .forced_used = buffer_pool->forced_used, }; aws_mutex_unlock(&buffer_pool->mutex); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index daea6913c..bd4e2521a 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -359,6 +359,9 @@ add_test_case(test_s3_buffer_pool_trim) add_test_case(test_s3_buffer_pool_reservation_hold) add_test_case(test_s3_buffer_pool_too_small) add_net_test_case(test_s3_put_object_buffer_pool_trim) +add_test_case(test_s3_buffer_pool_forced_buffer) +add_test_case(test_s3_buffer_pool_forced_buffer_after_reservation_hold) +add_test_case(test_s3_buffer_pool_forced_buffer_wont_stop_reservations) add_net_test_case(client_update_upload_part_timeout) add_net_test_case(client_meta_request_override_part_size) diff --git a/tests/s3_buffer_pool_tests.c b/tests/s3_buffer_pool_tests.c index 40c6cb288..bdf05378d 100644 --- a/tests/s3_buffer_pool_tests.c +++ b/tests/s3_buffer_pool_tests.c @@ -217,3 +217,145 @@ static int s_test_s3_buffer_pool_too_small(struct aws_allocator *allocator, void return 0; }; AWS_TEST_CASE(test_s3_buffer_pool_too_small, s_test_s3_buffer_pool_too_small) + +/* Sanity check that forced-buffer allocation works at all */ +static int s_test_s3_buffer_pool_forced_buffer(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + const size_t chunk_size = MB_TO_BYTES(8); + struct aws_s3_buffer_pool *buffer_pool = aws_s3_buffer_pool_new(allocator, chunk_size, GB_TO_BYTES(1)); + + { /* Acquire forced buffer from primary storage */ + size_t acquire_size = chunk_size; + struct aws_s3_buffer_pool_ticket *forced_ticket = NULL; + struct aws_byte_buf forced_buf = + aws_s3_buffer_pool_acquire_forced_buffer(buffer_pool, acquire_size, &forced_ticket); + ASSERT_NOT_NULL(forced_ticket); + ASSERT_UINT_EQUALS(acquire_size, forced_buf.capacity); + ASSERT_UINT_EQUALS(0, forced_buf.len); + + struct aws_s3_buffer_pool_usage_stats stats = aws_s3_buffer_pool_get_usage(buffer_pool); + ASSERT_UINT_EQUALS(acquire_size, stats.forced_used); + ASSERT_UINT_EQUALS(acquire_size, stats.primary_used); + ASSERT_UINT_EQUALS(0, stats.primary_reserved); + aws_s3_buffer_pool_release_ticket(buffer_pool, forced_ticket); + } + + { /* Acquire forced buffer from secondary storage */ + size_t acquire_size = aws_s3_buffer_pool_get_usage(buffer_pool).primary_cutoff + 1; + struct aws_s3_buffer_pool_ticket *forced_ticket = NULL; + struct aws_byte_buf forced_buf = + aws_s3_buffer_pool_acquire_forced_buffer(buffer_pool, acquire_size, &forced_ticket); + ASSERT_NOT_NULL(forced_ticket); + ASSERT_UINT_EQUALS(acquire_size, forced_buf.capacity); + ASSERT_UINT_EQUALS(0, forced_buf.len); + + struct aws_s3_buffer_pool_usage_stats stats = aws_s3_buffer_pool_get_usage(buffer_pool); + ASSERT_UINT_EQUALS(acquire_size, stats.forced_used); + ASSERT_UINT_EQUALS(acquire_size, stats.secondary_used); + ASSERT_UINT_EQUALS(0, stats.secondary_reserved); + aws_s3_buffer_pool_release_ticket(buffer_pool, forced_ticket); + } + + /* Assert stats go back down after tickets released */ + struct aws_s3_buffer_pool_usage_stats stats = aws_s3_buffer_pool_get_usage(buffer_pool); + ASSERT_UINT_EQUALS(0, stats.forced_used); + ASSERT_UINT_EQUALS(0, stats.primary_used); + ASSERT_UINT_EQUALS(0, stats.secondary_used); + + aws_s3_buffer_pool_destroy(buffer_pool); + return 0; +} +AWS_TEST_CASE(test_s3_buffer_pool_forced_buffer, s_test_s3_buffer_pool_forced_buffer) + +/* Test that we can still acquire forced buffers, even after pool has a reservation-hold */ +static int s_test_s3_buffer_pool_forced_buffer_after_reservation_hold(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + const size_t chunk_size = MB_TO_BYTES(8); + struct aws_s3_buffer_pool *buffer_pool = aws_s3_buffer_pool_new(allocator, chunk_size, GB_TO_BYTES(1)); + + /* Reserve normal tickets until pool has reservation-hold */ + struct aws_array_list normal_tickets; + aws_array_list_init_dynamic(&normal_tickets, allocator, 1, sizeof(struct aws_s3_buffer_pool_ticket *)); + while (aws_s3_buffer_pool_has_reservation_hold(buffer_pool) == false) { + struct aws_s3_buffer_pool_ticket *normal_ticket = aws_s3_buffer_pool_reserve(buffer_pool, chunk_size); + if (normal_ticket != NULL) { + aws_array_list_push_back(&normal_tickets, &normal_ticket); + } + } + + /* Assert we can still get a forced-buffer */ + struct aws_s3_buffer_pool_ticket *forced_ticket_1 = NULL; + struct aws_byte_buf forced_buf_1 = + aws_s3_buffer_pool_acquire_forced_buffer(buffer_pool, chunk_size, &forced_ticket_1); + ASSERT_NOT_NULL(forced_ticket_1); + ASSERT_UINT_EQUALS(chunk_size, forced_buf_1.capacity); + + /* Assert we can still acquire buffers for all those normal reservations */ + for (size_t i = 0; i < aws_array_list_length(&normal_tickets); ++i) { + struct aws_s3_buffer_pool_ticket *normal_ticket; + aws_array_list_get_at(&normal_tickets, &normal_ticket, i); + struct aws_byte_buf normal_buf = aws_s3_buffer_pool_acquire_buffer(buffer_pool, normal_ticket); + ASSERT_UINT_EQUALS(chunk_size, normal_buf.capacity); + } + + /* Assert we can still get a forced-buffer */ + struct aws_s3_buffer_pool_ticket *forced_ticket_2 = NULL; + struct aws_byte_buf forced_buf_2 = + aws_s3_buffer_pool_acquire_forced_buffer(buffer_pool, chunk_size, &forced_ticket_2); + ASSERT_NOT_NULL(forced_ticket_2); + ASSERT_UINT_EQUALS(chunk_size, forced_buf_2.capacity); + + /* Cleanup */ + for (size_t i = 0; i < aws_array_list_length(&normal_tickets); ++i) { + struct aws_s3_buffer_pool_ticket *normal_ticket; + aws_array_list_get_at(&normal_tickets, &normal_ticket, i); + aws_s3_buffer_pool_release_ticket(buffer_pool, normal_ticket); + } + aws_array_list_clean_up(&normal_tickets); + + aws_s3_buffer_pool_release_ticket(buffer_pool, forced_ticket_1); + aws_s3_buffer_pool_release_ticket(buffer_pool, forced_ticket_2); + aws_s3_buffer_pool_destroy(buffer_pool); + return 0; +} +AWS_TEST_CASE( + test_s3_buffer_pool_forced_buffer_after_reservation_hold, + s_test_s3_buffer_pool_forced_buffer_after_reservation_hold) + +/* Test that some normal tickets can still be reserved, even if forced-buffer usage is huge. + * This is important because, if either system can stop the other from working, we risk deadlock. */ +static int s_test_s3_buffer_pool_forced_buffer_wont_stop_reservations(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + const size_t chunk_size = MB_TO_BYTES(8); + const size_t mem_limit = GB_TO_BYTES(1); + struct aws_s3_buffer_pool *buffer_pool = aws_s3_buffer_pool_new(allocator, chunk_size, mem_limit); + + /* Skip test if this machine can't do enormous allocations */ + void *try_large_alloc = malloc(mem_limit); + if (try_large_alloc == NULL) { + aws_s3_buffer_pool_destroy(buffer_pool); + return AWS_OP_SKIP; + } + free(try_large_alloc); + + /* Allocate enormous forced buffer */ + struct aws_s3_buffer_pool_ticket *forced_ticket = NULL; + struct aws_byte_buf forced_buf = aws_s3_buffer_pool_acquire_forced_buffer(buffer_pool, mem_limit, &forced_ticket); + ASSERT_NOT_NULL(forced_ticket); + ASSERT_UINT_EQUALS(mem_limit, forced_buf.capacity); + + /* Assert we can still reserve a normal ticket & allocate a normal buffer */ + struct aws_s3_buffer_pool_ticket *normal_ticket = aws_s3_buffer_pool_reserve(buffer_pool, chunk_size); + ASSERT_NOT_NULL(normal_ticket); + struct aws_byte_buf normal_buffer = aws_s3_buffer_pool_acquire_buffer(buffer_pool, normal_ticket); + ASSERT_UINT_EQUALS(chunk_size, normal_buffer.capacity); + aws_s3_buffer_pool_release_ticket(buffer_pool, normal_ticket); + + /* Cleanup */ + aws_s3_buffer_pool_release_ticket(buffer_pool, forced_ticket); + aws_s3_buffer_pool_destroy(buffer_pool); + return 0; +} +AWS_TEST_CASE( + test_s3_buffer_pool_forced_buffer_wont_stop_reservations, + s_test_s3_buffer_pool_forced_buffer_wont_stop_reservations)