Skip to content

Commit

Permalink
Refactor MemoryLimiter to specify tracked 'area' when reserving memory (
Browse files Browse the repository at this point in the history
#1161)

The memory limiter currently tracks the amount of memory reserved for
prefetching. We plan to extend this as part of supporting appends in S3
Express One Zone (#1160).

This change (originally authored by @monthonk) refactors the memory
limiter API to allow specifying the "area" we'd like to reserve in, for
the purpose of metrics for now.

### Does this change impact existing behavior?

No change to existing behavior.

### Does this change need a changelog entry?

No.

---

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license and I agree to the terms of
the [Developer Certificate of Origin
(DCO)](https://developercertificate.org/).

Signed-off-by: Daniel Carl Jones <[email protected]>
Co-authored-by: Monthon Klongklaew <[email protected]>
  • Loading branch information
dannycjones and monthonk authored Nov 22, 2024
1 parent 4bb64e1 commit 47e1d56
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 31 deletions.
63 changes: 40 additions & 23 deletions mountpoint-s3/src/mem_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ use tracing::{debug, trace};

pub const MINIMUM_MEM_LIMIT: u64 = 512 * 1024 * 1024;

/// Buffer areas that can be managed by the memory limiter. This is used for updating metrics.
pub enum BufferArea {
Prefetch,
}

/// `MemoryLimiter` tracks memory used by Mountpoint and makes decisions if a new memory reservation request can be accepted.
/// Currently, there are two metrics we take into account:
/// 1) the memory reserved by prefetcher instances for the data requested or fetched from CRT client.
Expand Down Expand Up @@ -38,9 +43,9 @@ pub const MINIMUM_MEM_LIMIT: u64 = 512 * 1024 * 1024;
#[derive(Debug)]
pub struct MemoryLimiter<Client: ObjectClient> {
mem_limit: u64,
/// Reserved memory for data we had requested via the request task but may not
/// arrived yet.
prefetcher_mem_reserved: AtomicU64,
/// Reserved memory for allocations we are tracking, such as buffers we allocate for prefetching.
/// The memory may not be used yet but has been reserved.
mem_reserved: AtomicU64,
/// Additional reserved memory for other non-buffer usage like storing metadata
additional_mem_reserved: u64,
// We will also take client's reserved memory into account because even if the
Expand All @@ -63,62 +68,74 @@ impl<Client: ObjectClient> MemoryLimiter<Client> {
Self {
client,
mem_limit,
prefetcher_mem_reserved: AtomicU64::new(0),
mem_reserved: AtomicU64::new(0),
additional_mem_reserved: reserved_mem,
}
}

/// Reserve the memory for future uses. Always succeeds, even if it means going beyond
/// the configured memory limit.
pub fn reserve(&self, size: u64) {
self.prefetcher_mem_reserved.fetch_add(size, Ordering::SeqCst);
metrics::gauge!("prefetch.bytes_reserved").increment(size as f64);
pub fn reserve(&self, area: BufferArea, size: u64) {
self.mem_reserved.fetch_add(size, Ordering::SeqCst);
match area {
BufferArea::Prefetch => metrics::gauge!("prefetch.bytes_reserved").increment(size as f64),
}
}

/// Reserve the memory for future uses. If there is not enough memory returns `false`.
pub fn try_reserve(&self, size: u64) -> bool {
pub fn try_reserve(&self, area: BufferArea, size: u64) -> bool {
let start = Instant::now();
let mut prefetcher_mem_reserved = self.prefetcher_mem_reserved.load(Ordering::SeqCst);
let mut mem_reserved = self.mem_reserved.load(Ordering::SeqCst);
loop {
let new_prefetcher_mem_reserved = prefetcher_mem_reserved.saturating_add(size);
let new_mem_reserved = mem_reserved.saturating_add(size);
let client_mem_allocated = self.client_mem_allocated();
let new_total_mem_usage = new_prefetcher_mem_reserved
let new_total_mem_usage = new_mem_reserved
.saturating_add(client_mem_allocated)
.saturating_add(self.additional_mem_reserved);
if new_total_mem_usage > self.mem_limit {
trace!(new_total_mem_usage, "not enough memory to reserve");
metrics::histogram!("prefetch.mem_reserve_latency_us").record(start.elapsed().as_micros() as f64);
match area {
BufferArea::Prefetch => metrics::histogram!("prefetch.mem_reserve_latency_us")
.record(start.elapsed().as_micros() as f64),
}
return false;
}
// Check that the value we have read is still the same before updating it
match self.prefetcher_mem_reserved.compare_exchange_weak(
prefetcher_mem_reserved,
new_prefetcher_mem_reserved,
match self.mem_reserved.compare_exchange_weak(
mem_reserved,
new_mem_reserved,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => {
metrics::gauge!("prefetch.bytes_reserved").increment(size as f64);
metrics::histogram!("prefetch.mem_reserve_latency_us").record(start.elapsed().as_micros() as f64);
match area {
BufferArea::Prefetch => {
metrics::gauge!("prefetch.bytes_reserved").increment(size as f64);
metrics::histogram!("prefetch.mem_reserve_latency_us")
.record(start.elapsed().as_micros() as f64);
}
}
return true;
}
Err(current) => prefetcher_mem_reserved = current, // another thread updated the atomic before us, trying again
Err(current) => mem_reserved = current, // another thread updated the atomic before us, trying again
}
}
}

/// Release the reserved memory.
pub fn release(&self, size: u64) {
self.prefetcher_mem_reserved.fetch_sub(size, Ordering::SeqCst);
metrics::gauge!("prefetch.bytes_reserved").decrement(size as f64);
pub fn release(&self, area: BufferArea, size: u64) {
self.mem_reserved.fetch_sub(size, Ordering::SeqCst);
match area {
BufferArea::Prefetch => metrics::gauge!("prefetch.bytes_reserved").decrement(size as f64),
}
}

/// Query available memory tracked by the memory limiter.
pub fn available_mem(&self) -> u64 {
let prefetcher_mem_reserved = self.prefetcher_mem_reserved.load(Ordering::SeqCst);
let mem_reserved = self.mem_reserved.load(Ordering::SeqCst);
let client_mem_allocated = self.client_mem_allocated();
self.mem_limit
.saturating_sub(prefetcher_mem_reserved)
.saturating_sub(mem_reserved)
.saturating_sub(client_mem_allocated)
.saturating_sub(self.additional_mem_reserved)
}
Expand Down
12 changes: 6 additions & 6 deletions mountpoint-s3/src/prefetch/backpressure_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use humansize::make_format;
use mountpoint_s3_client::ObjectClient;
use tracing::trace;

use crate::mem_limiter::MemoryLimiter;
use crate::mem_limiter::{BufferArea, MemoryLimiter};

use super::PrefetchReadError;

Expand Down Expand Up @@ -81,7 +81,7 @@ pub fn new_backpressure_controller<Client: ObjectClient>(
const MIN_WINDOW_SIZE_MULTIPLIER: usize = 2;
let read_window_end_offset = config.request_range.start + config.initial_read_window_size as u64;
let (read_window_updater, read_window_incrementing_queue) = unbounded();
mem_limiter.reserve(config.initial_read_window_size as u64);
mem_limiter.reserve(BufferArea::Prefetch, config.initial_read_window_size as u64);
let controller = BackpressureController {
read_window_updater,
preferred_read_window_size: config.initial_read_window_size,
Expand Down Expand Up @@ -113,7 +113,7 @@ impl<Client: ObjectClient> BackpressureController<Client> {
// Note, that this may come from a backwards seek, so offsets observed by this method are not necessarily ascending
BackpressureFeedbackEvent::DataRead { offset, length } => {
self.next_read_offset = offset + length as u64;
self.mem_limiter.release(length as u64);
self.mem_limiter.release(BufferArea::Prefetch, length as u64);
let remaining_window = self.read_window_end_offset.saturating_sub(self.next_read_offset) as usize;

// Increment the read window only if the remaining window reaches some threshold i.e. half of it left.
Expand All @@ -136,14 +136,14 @@ impl<Client: ObjectClient> BackpressureController<Client> {
// Force incrementing read window regardless of available memory when we are already at minimum
// read window size.
if self.preferred_read_window_size <= self.min_read_window_size {
self.mem_limiter.reserve(to_increase as u64);
self.mem_limiter.reserve(BufferArea::Prefetch, to_increase as u64);
self.increment_read_window(to_increase).await;
break;
}

// Try to reserve the memory for the length we want to increase before sending the request,
// scale down the read window if it fails.
if self.mem_limiter.try_reserve(to_increase as u64) {
if self.mem_limiter.try_reserve(BufferArea::Prefetch, to_increase as u64) {
self.increment_read_window(to_increase).await;
break;
} else {
Expand Down Expand Up @@ -226,7 +226,7 @@ impl<Client: ObjectClient> Drop for BackpressureController<Client> {
// Free up memory we have reserved for the read window.
debug_assert!(self.request_end_offset >= self.next_read_offset);
let remaining_window = self.read_window_end_offset.saturating_sub(self.next_read_offset);
self.mem_limiter.release(remaining_window);
self.mem_limiter.release(BufferArea::Prefetch, remaining_window);
}
}

Expand Down
4 changes: 2 additions & 2 deletions mountpoint-s3/src/prefetch/part_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::Instant;
use mountpoint_s3_client::ObjectClient;
use tracing::trace;

use crate::mem_limiter::MemoryLimiter;
use crate::mem_limiter::{BufferArea, MemoryLimiter};
use crate::prefetch::part::Part;
use crate::prefetch::PrefetchReadError;
use crate::sync::async_channel::{unbounded, Receiver, RecvError, Sender};
Expand Down Expand Up @@ -104,7 +104,7 @@ impl<Client: ObjectClient> PartQueue<Client> {
metrics::gauge!("prefetch.bytes_in_queue").increment(part.len() as f64);
// The backpressure controller is not aware of the parts from backwards seek,
// so we have to reserve memory for them here.
self.mem_limiter.reserve(part.len() as u64);
self.mem_limiter.reserve(BufferArea::Prefetch, part.len() as u64);
self.front_queue.push(part);
Ok(())
}
Expand Down

0 comments on commit 47e1d56

Please sign in to comment.