Buffer-pool allows "forced" buffers, which don't count against memory limit (#429)

**Issue:**
`aws_s3_meta_request_write()` must write to a buffer immediately if the data is less than part-size. Currently, it uses a buffer hooked up to the default allocator ([code here](https://github.com/awslabs/aws-c-s3/blob/cc06c419448b40417caa7b587f61bb4d8b4c08c1/source/s3_meta_request.c#L2260-L2262)). We'd like to get these buffers from the [buffer-pool](#368) to reduce memory fragmentation and resident set size (RSS).

The problem is: the buffer-pool maintains strict memory limits and won't allow further allocations once that limit is hit. But `aws_s3_meta_request_write()` MUST get a buffer immediately, or else the system can deadlock (see the description in PR #418).

**Description of changes:**
Add `aws_s3_buffer_pool_acquire_forced_buffer()`. These buffers can be created even if they exceed the memory limit.

**Future work:**
Modify `aws_s3_meta_request_write()` to use this new function.

**Additional thoughts:**
- Anyone using `aws_s3_meta_request_write()` should limit the total number of uploads like: `max-uploads = memory-limit / part-size`. That was the case even before this PR.
graebm authored May 9, 2024
1 parent cc06c41 commit 3647b4b
Showing 5 changed files with 269 additions and 23 deletions.
6 changes: 6 additions & 0 deletions docs/memory_aware_request_execution.md
@@ -51,6 +51,12 @@ only, and supports acquiring several chunks (up to 4) at once.
Blocks are kept around while there are ongoing requests and are released async,
when there is low pressure on memory.

One complication is "forced" buffers. A forced buffer is one that
comes from primary or secondary storage as usual, but it is allowed to exceed
the memory limit. Forced buffers are only used when waiting for a normal ticket
reservation could cause deadlock. (At time of writing, they're only used for
async-writes)

### Scheduling
Running out of memory is a terminal condition within CRT and in general it's not
practical to try to set an overall memory limit on all allocations, since it
35 changes: 27 additions & 8 deletions include/aws/s3/private/s3_buffer_pool.h
@@ -41,21 +41,26 @@ struct aws_s3_buffer_pool_usage_stats {
/* Max size of buffer to be allocated from primary. */
size_t primary_cutoff;

/* Overall memory allocated for blocks. */
size_t primary_allocated;
/* Number of blocks allocated in primary. */
size_t primary_num_blocks;
/* Memory used in primary storage.
* Does not account for wasted space if memory doesn't map perfectly into chunks.
* This is always <= primary_allocated */
size_t primary_used;
/* How much memory is reserved, but not yet used, in primary storage.
* Does not account for wasted space if memory doesn't map perfectly into chunks. */
size_t primary_reserved;

/* Secondary memory used. Accurate, maps directly to base allocator. */
size_t secondary_used;
/* Secondary memory reserved, but not yet used. Accurate, maps directly to base allocator. */
size_t secondary_reserved;

/* Bytes used in "forced" buffers (created even if they exceed memory limits).
* This is always <= primary_used + secondary_used */
size_t forced_used;
};

/*
@@ -86,6 +91,9 @@ AWS_S3_API void aws_s3_buffer_pool_destroy(struct aws_s3_buffer_pool *buffer_poo
* On failure NULL is returned, error is raised and reservation hold is placed
* on the buffer. Any further reservations while hold is active will fail.
* Remove reservation hold to unblock reservations.
*
* If you MUST acquire a buffer now (waiting to reserve a ticket would risk deadlock),
* use aws_s3_buffer_pool_acquire_forced_buffer() instead.
*/
AWS_S3_API struct aws_s3_buffer_pool_ticket *aws_s3_buffer_pool_reserve(
struct aws_s3_buffer_pool *buffer_pool,
@@ -111,6 +119,17 @@ AWS_S3_API struct aws_byte_buf aws_s3_buffer_pool_acquire_buffer(
struct aws_s3_buffer_pool *buffer_pool,
struct aws_s3_buffer_pool_ticket *ticket);

/*
* Force immediate acquisition of a buffer from the pool.
* This should only be used if waiting to reserve a ticket would risk deadlock.
* This cannot fail, not even if the pool has a reservation hold,
* not even if the memory limit has been exceeded.
*/
AWS_S3_API struct aws_byte_buf aws_s3_buffer_pool_acquire_forced_buffer(
struct aws_s3_buffer_pool *buffer_pool,
size_t size,
struct aws_s3_buffer_pool_ticket **out_new_ticket);

/*
* Releases the ticket.
* Any buffers associated with the ticket are invalidated.
106 changes: 91 additions & 15 deletions source/s3_buffer_pool.c
@@ -32,12 +32,18 @@
* of 32mb (4 chunks) or 16 buffers of 8mb (1 chunk). If requested buffer size
* is 12mb, 2 chunks are used for acquire and 4mb will be wasted.
* Secondary storage delegates directly to system allocator.
*
* One complication is "forced" buffers. A forced buffer is one that
* comes from primary or secondary storage as usual, but it is allowed to exceed
* the memory limit. This is only used when we want to use memory from
* the pool, but waiting for a normal ticket reservation could cause deadlock.
*/

struct aws_s3_buffer_pool_ticket {
size_t size;
uint8_t *ptr;
size_t chunks_used;
bool forced;
};

/* Default size for blocks array. Note: this is just for meta info, blocks
@@ -63,6 +69,11 @@ static const size_t s_chunks_per_block = 16;
*/
static const size_t s_max_chunk_size_for_buffer_reuse = MB_TO_BYTES(64);

/* Forced buffers only count against the memory limit up to a certain percent.
* For example: if mem_limit is 10GiB, and forced_use is 11GiB, and THIS number is 90(%),
* we still consider 1GiB available for normal buffer usage. */
static const size_t s_max_impact_of_forced_buffers_on_memory_limit_as_percentage = 80;

struct aws_s3_buffer_pool {
struct aws_allocator *base_allocator;
struct aws_mutex mutex;
@@ -72,6 +83,8 @@ struct aws_s3_buffer_pool {
/* size at which allocations should go to secondary */
size_t primary_size_cutoff;

/* NOTE: See aws_s3_buffer_pool_usage_stats for descriptions of most fields */

size_t mem_limit;

bool has_reservation_hold;
@@ -83,6 +96,8 @@
size_t secondary_reserved;
size_t secondary_used;

size_t forced_used;

struct aws_array_list blocks;
};

@@ -249,7 +264,7 @@ struct aws_s3_buffer_pool_ticket *aws_s3_buffer_pool_reserve(struct aws_s3_buffe
buffer_pool->secondary_reserved;

/*
* If we are allocating from secondary and there is unused space in
* primary, trim the primary in hopes we can free up enough memory.
* TODO: something smarter, like partial trim?
*/
@@ -261,6 +276,13 @@ struct aws_s3_buffer_pool_ticket *aws_s3_buffer_pool_reserve(struct aws_s3_buffe
buffer_pool->secondary_reserved;
}

/* Don't let forced buffers account for 100% of the memory limit */
const size_t max_impact_of_forced_on_limit =
(size_t)(buffer_pool->mem_limit * (s_max_impact_of_forced_buffers_on_memory_limit_as_percentage / 100.0));
if (buffer_pool->forced_used > max_impact_of_forced_on_limit) {
overall_taken -= buffer_pool->forced_used - max_impact_of_forced_on_limit;
}

if ((size + overall_taken) <= buffer_pool->mem_limit) {
ticket = aws_mem_calloc(buffer_pool->base_allocator, 1, sizeof(struct aws_s3_buffer_pool_ticket));
ticket->size = size;
@@ -288,23 +310,26 @@ struct aws_s3_buffer_pool_ticket *aws_s3_buffer_pool_reserve(struct aws_s3_buffe

bool aws_s3_buffer_pool_has_reservation_hold(struct aws_s3_buffer_pool *buffer_pool) {
AWS_PRECONDITION(buffer_pool);
return buffer_pool->has_reservation_hold;
}

void aws_s3_buffer_pool_remove_reservation_hold(struct aws_s3_buffer_pool *buffer_pool) {
AWS_PRECONDITION(buffer_pool);
AWS_LOGF_TRACE(AWS_LS_S3_CLIENT, "Releasing buffer reservation hold.");
buffer_pool->has_reservation_hold = false;
}

static uint8_t *s_primary_acquire_synced(
struct aws_s3_buffer_pool *buffer_pool,
struct aws_s3_buffer_pool_ticket *ticket) {

uint8_t *alloc_ptr = NULL;

size_t chunks_needed = ticket->size / buffer_pool->chunk_size;
if (ticket->size % buffer_pool->chunk_size != 0) {
++chunks_needed; /* round up */
}
ticket->chunks_used = chunks_needed;

/* Look for space in existing blocks */
for (size_t i = 0; i < aws_array_list_length(&buffer_pool->blocks); ++i) {
Expand All @@ -331,38 +356,84 @@ static uint8_t *s_primary_acquire_synced(struct aws_s3_buffer_pool *buffer_pool,
buffer_pool->primary_allocated += buffer_pool->block_size;

on_allocated:
buffer_pool->primary_used += ticket->size;

/* forced buffers acquire immediately, without reserving first */
if (ticket->forced == false) {
buffer_pool->primary_reserved -= ticket->size;
}

return alloc_ptr;
}

static struct aws_byte_buf s_acquire_buffer_synced(
struct aws_s3_buffer_pool *buffer_pool,
struct aws_s3_buffer_pool_ticket *ticket);

struct aws_byte_buf aws_s3_buffer_pool_acquire_buffer(
struct aws_s3_buffer_pool *buffer_pool,
struct aws_s3_buffer_pool_ticket *ticket) {

AWS_PRECONDITION(buffer_pool);
AWS_PRECONDITION(ticket);

if (ticket->ptr != NULL) {
return aws_byte_buf_from_empty_array(ticket->ptr, ticket->size);
}


aws_mutex_lock(&buffer_pool->mutex);
struct aws_byte_buf buf = s_acquire_buffer_synced(buffer_pool, ticket);
aws_mutex_unlock(&buffer_pool->mutex);

return buf;
}

static struct aws_byte_buf s_acquire_buffer_synced(
struct aws_s3_buffer_pool *buffer_pool,
struct aws_s3_buffer_pool_ticket *ticket) {

AWS_PRECONDITION(ticket->ptr == NULL);

if (ticket->size <= buffer_pool->primary_size_cutoff) {
ticket->ptr = s_primary_acquire_synced(buffer_pool, ticket);
} else {
ticket->ptr = aws_mem_acquire(buffer_pool->base_allocator, ticket->size);
buffer_pool->secondary_used += ticket->size;

/* forced buffers acquire immediately, without reserving first */
if (ticket->forced == false) {
buffer_pool->secondary_reserved -= ticket->size;
}
}

return aws_byte_buf_from_empty_array(ticket->ptr, ticket->size);
}

struct aws_byte_buf aws_s3_buffer_pool_acquire_forced_buffer(
struct aws_s3_buffer_pool *buffer_pool,
size_t size,
struct aws_s3_buffer_pool_ticket **out_new_ticket) {

AWS_PRECONDITION(buffer_pool);
AWS_PRECONDITION(out_new_ticket);

AWS_FATAL_ASSERT(size != 0);

aws_mutex_lock(&buffer_pool->mutex);

struct aws_s3_buffer_pool_ticket *ticket =
aws_mem_calloc(buffer_pool->base_allocator, 1, sizeof(struct aws_s3_buffer_pool_ticket));
ticket->size = size;
ticket->forced = true;

struct aws_byte_buf buf = s_acquire_buffer_synced(buffer_pool, ticket);

buffer_pool->forced_used += size;

aws_mutex_unlock(&buffer_pool->mutex);

*out_new_ticket = ticket;
return buf;
}

void aws_s3_buffer_pool_release_ticket(
@@ -416,6 +487,10 @@ void aws_s3_buffer_pool_release_ticket(
buffer_pool->secondary_used -= ticket->size;
}

if (ticket->forced) {
buffer_pool->forced_used -= ticket->size;
}

aws_mem_release(buffer_pool->base_allocator, ticket);

aws_mutex_unlock(&buffer_pool->mutex);
@@ -433,6 +508,7 @@ struct aws_s3_buffer_pool_usage_stats aws_s3_buffer_pool_get_usage(struct aws_s3
.primary_num_blocks = aws_array_list_length(&buffer_pool->blocks),
.secondary_used = buffer_pool->secondary_used,
.secondary_reserved = buffer_pool->secondary_reserved,
.forced_used = buffer_pool->forced_used,
};

aws_mutex_unlock(&buffer_pool->mutex);
3 changes: 3 additions & 0 deletions tests/CMakeLists.txt
@@ -359,6 +359,9 @@ add_test_case(test_s3_buffer_pool_trim)
add_test_case(test_s3_buffer_pool_reservation_hold)
add_test_case(test_s3_buffer_pool_too_small)
add_net_test_case(test_s3_put_object_buffer_pool_trim)
add_test_case(test_s3_buffer_pool_forced_buffer)
add_test_case(test_s3_buffer_pool_forced_buffer_after_reservation_hold)
add_test_case(test_s3_buffer_pool_forced_buffer_wont_stop_reservations)

add_net_test_case(client_update_upload_part_timeout)
add_net_test_case(client_meta_request_override_part_size)