From 6dc1351f09f5c54b7d6b588b05a4edf551529431 Mon Sep 17 00:00:00 2001 From: James Bornholt Date: Fri, 2 Feb 2024 10:21:43 -0600 Subject: [PATCH] Add a new mock client that simulates GET throughput (#723) * Add a new mock client that simulates GET throughput For performance testing and microbenchmarking, we'd like to be able to separate the S3 service and the CRT datapath from our own client and file system. This mock client can simulate a target network throughput by rate-limiting the `get_object` stream. The goal is to be able to use this client in place of a regular `S3CrtClient` when we want to isolate performance questions. Along the way, I realized we're including the mock client in our release builds, because it's an always-on feature of the client crate. This change therefore also does a little dependency refactoring to remove the mock and failure clients from the non-test dependency closure. I checked this works by seeing that the release binary is a few MBs smaller, and that `strings mount-s3 | grep mock` no longer includes the mock client's symbols. 
Signed-off-by: James Bornholt * Adopt mock client as an option in client benchmark Signed-off-by: James Bornholt * Changelog Signed-off-by: James Bornholt * Dependency fixes Signed-off-by: James Bornholt --------- Signed-off-by: James Bornholt --- Cargo.lock | 90 ++++++- mountpoint-s3-client/CHANGELOG.md | 10 + mountpoint-s3-client/Cargo.toml | 17 +- .../examples/client_benchmark.rs | 123 +++++---- mountpoint-s3-client/src/failure_client.rs | 2 + mountpoint-s3-client/src/mock_client.rs | 5 + .../src/mock_client/leaky_bucket.rs | 235 ++++++++++++++++++ .../src/mock_client/throughput_client.rs | 214 ++++++++++++++++ mountpoint-s3/Cargo.toml | 2 + 9 files changed, 641 insertions(+), 57 deletions(-) create mode 100644 mountpoint-s3-client/src/mock_client/leaky_bucket.rs create mode 100644 mountpoint-s3-client/src/mock_client/throughput_client.rs diff --git a/Cargo.lock b/Cargo.lock index 83eff043d..feff19db6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -137,17 +137,47 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" dependencies = [ "concurrent-queue", - "event-listener", + "event-listener 2.5.3", "futures-core", ] +[[package]] +name = "async-io" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f97ab0c5b00a7cdbe5a371b9a782ee7be1316095885c8a4ea1daf490eb0ef65" +dependencies = [ + "async-lock 3.3.0", + "cfg-if", + "concurrent-queue", + "futures-io", + "futures-lite", + "parking", + "polling", + "rustix 0.38.28", + "slab", + "tracing", + "windows-sys 0.52.0", +] + [[package]] name = "async-lock" version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" dependencies = [ - "event-listener", + "event-listener 2.5.3", +] + +[[package]] +name = "async-lock" +version = "3.3.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "d034b430882f8381900d3fe6f0aaa3ad94f2cb4ac519b429692a1bc2dda4ae7b" +dependencies = [ + "event-listener 4.0.3", + "event-listener-strategy", + "pin-project-lite", ] [[package]] @@ -1164,6 +1194,27 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" +[[package]] +name = "event-listener" +version = "4.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b215c49b2b248c855fb73579eb1f4f26c38ffdc12973e20e07b91d78d5646e" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3" +dependencies = [ + "event-listener 4.0.3", + "pin-project-lite", +] + [[package]] name = "fastrand" version = "2.0.1" @@ -1290,6 +1341,16 @@ version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa" +[[package]] +name = "futures-lite" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "445ba825b27408685aaecefd65178908c36c6e96aaf6d8599419d46e624192ba" +dependencies = [ + "futures-core", + "pin-project-lite", +] + [[package]] name = "futures-macro" version = "0.3.29" @@ -1901,7 +1962,7 @@ dependencies = [ "assert_cmd", "assert_fs", "async-channel", - "async-lock", + "async-lock 2.8.0", "async-trait", "aws-config", "aws-sdk-s3", @@ -1960,6 +2021,8 @@ name = "mountpoint-s3-client" version = "0.6.2" dependencies = [ "anyhow", + "async-io", + "async-lock 2.8.0", "async-trait", "auto_impl", "aws-config", @@ -1979,6 +2042,7 @@ dependencies = [ "libc-stdhandle", "md-5", "metrics", + "mountpoint-s3-client", 
"mountpoint-s3-crt", "once_cell", "percent-encoding", @@ -2180,6 +2244,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "parking" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" + [[package]] name = "parking_lot" version = "0.12.1" @@ -2291,6 +2361,20 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "polling" +version = "3.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "545c980a3880efd47b2e262f6a4bb6daad6555cf3367aa9c4e52895f69537a41" +dependencies = [ + "cfg-if", + "concurrent-queue", + "pin-project-lite", + "rustix 0.38.28", + "tracing", + "windows-sys 0.52.0", +] + [[package]] name = "portable-atomic" version = "0.3.20" diff --git a/mountpoint-s3-client/CHANGELOG.md b/mountpoint-s3-client/CHANGELOG.md index c65028c69..116fe78f7 100644 --- a/mountpoint-s3-client/CHANGELOG.md +++ b/mountpoint-s3-client/CHANGELOG.md @@ -1,3 +1,13 @@ +## Unreleased + +### Breaking changes + +* The `mock_client` module is no longer enabled by default, and is now available by enabling the `mock` feature for this crate. ([#723](https://github.com/awslabs/mountpoint-s3/pull/723)) + +### Other changes + +* Introduced a new `ThroughputMockClient` that simulates a target network throughput from an in-memory mock S3 client. This client requires the `mock` feature flag. 
([#723](https://github.com/awslabs/mountpoint-s3/pull/723)) + ## v0.6.2 (January 18, 2024) ### Breaking changes diff --git a/mountpoint-s3-client/Cargo.toml b/mountpoint-s3-client/Cargo.toml index 4facfcd78..7ac6966b6 100644 --- a/mountpoint-s3-client/Cargo.toml +++ b/mountpoint-s3-client/Cargo.toml @@ -24,8 +24,6 @@ once_cell = "1.16.0" percent-encoding = "2.2.0" pin-project = "1.0.12" platform-info = "2.0.2" -rand = "0.8.5" -rand_chacha = "0.3.1" regex = "1.7.1" static_assertions = "1.1.0" thiserror = "1.0.34" @@ -34,6 +32,12 @@ tracing = { version = "0.1.35", default-features = false, features = ["std", "lo xmltree = "0.10.3" serde_json = "1.0.104" +# Dependencies for the mock client only +async-io = { version = "2.3.1", optional = true } +async-lock = { version = "2.6.0", optional = true } +rand = { version = "0.8.5", optional = true } +rand_chacha = { version = "0.3.1", optional = true } + [dev-dependencies] anyhow = { version = "1.0.64", features = ["backtrace"] } aws-config = "0.56.0" @@ -42,7 +46,7 @@ aws-sdk-s3 = "0.29.0" aws-sdk-sts = "0.29.0" aws-smithy-runtime-api = "0.56.1" bytes = "1.2.1" -clap = "4.1.9" +clap = { version = "4.1.9", features = ["derive"] } ctor = "0.1.23" proptest = "1.0.0" rusty-fork = "0.3.0" @@ -51,6 +55,11 @@ test-case = "2.2.2" tokio = { version = "1.24.2", features = ["rt", "macros"] } tracing-subscriber = { version = "0.3.14", features = ["fmt", "env-filter"] } +# HACK: we want our own tests to use the mock client, but don't want to enable it for consumers by +# default, so we take a dev-dependency on ourself with that feature enabled. 
+# https://github.com/rust-lang/cargo/issues/2911#issuecomment-749580481 +mountpoint-s3-client = { path = ".", features = ["mock"] } + [build-dependencies] built = { version = "0.6.0", features = ["git2"] } @@ -58,6 +67,8 @@ built = { version = "0.6.0", features = ["git2"] } doctest = false [features] +mock = ["dep:async-io", "dep:async-lock", "dep:rand", "dep:rand_chacha"] +# Test features s3_tests = [] fips_tests = [] s3express_tests = [] diff --git a/mountpoint-s3-client/examples/client_benchmark.rs b/mountpoint-s3-client/examples/client_benchmark.rs index d88b3ce5c..9002b9a37 100644 --- a/mountpoint-s3-client/examples/client_benchmark.rs +++ b/mountpoint-s3-client/examples/client_benchmark.rs @@ -1,10 +1,14 @@ +use std::pin::pin; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Instant; -use clap::{Arg, Command}; +use clap::{Parser, Subcommand}; use futures::StreamExt; use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig}; +use mountpoint_s3_client::mock_client::throughput_client::ThroughputMockClient; +use mountpoint_s3_client::mock_client::{MockClientConfig, MockObject}; +use mountpoint_s3_client::types::ETag; use mountpoint_s3_client::{ObjectClient, S3CrtClient}; use mountpoint_s3_crt::common::rust_log_adapter::RustLogAdapter; use tracing_subscriber::fmt::Subscriber; @@ -23,63 +27,18 @@ fn init_tracing_subscriber() { subscriber.try_init().expect("unable to install global subscriber"); } -fn main() { - init_tracing_subscriber(); - - let matches = Command::new("benchmark") - .about("Download a single key from S3 and ignore its contents") - .arg(Arg::new("bucket").required(true)) - .arg(Arg::new("key").required(true)) - .arg( - Arg::new("throughput-target-gbps") - .long("throughput-target-gbps") - .help("Desired throughput in Gbps"), - ) - .arg( - Arg::new("part-size") - .long("part-size") - .help("Part size for multi-part GET and PUT"), - ) - .arg( - Arg::new("iterations") - .long("iterations") - .help("Number of 
times to download"), - ) - .arg(Arg::new("region").long("region").default_value("us-east-1")) - .get_matches(); - - let bucket = matches.get_one::("bucket").unwrap(); - let key = matches.get_one::("key").unwrap(); - let throughput_target_gbps = matches - .get_one::("throughput-target-gbps") - .map(|s| s.parse::().expect("throughput target must be an f64")); - let part_size = matches - .get_one::("part-size") - .map(|s| s.parse::().expect("part size must be a usize")); - let iterations = matches - .get_one::("iterations") - .map(|s| s.parse::().expect("iterations must be a number")); - let region = matches.get_one::("region").unwrap(); - - let mut config = S3ClientConfig::new().endpoint_config(EndpointConfig::new(region)); - if let Some(throughput_target_gbps) = throughput_target_gbps { - config = config.throughput_target_gbps(throughput_target_gbps); - } - if let Some(part_size) = part_size { - config = config.part_size(part_size); - } - let client = S3CrtClient::new(config).expect("couldn't create client"); - - for i in 0..iterations.unwrap_or(1) { +fn run_benchmark(client: impl ObjectClient + Clone, num_iterations: usize, bucket: &str, key: &str) { + for i in 0..num_iterations { let received_size = Arc::new(AtomicU64::new(0)); - let start = Instant::now(); let client = client.clone(); let received_size_clone = Arc::clone(&received_size); + let start = Instant::now(); futures::executor::block_on(async move { let mut request = client .get_object(bucket, key, None, None) .await .expect("couldn't create get request"); + let mut request = pin!(request); loop { match StreamExt::next(&mut request).await { Some(Ok((_offset, body))) => { @@ -95,7 +54,6 @@ fn main() { }); let elapsed = start.elapsed(); - let received_size = received_size.load(Ordering::SeqCst); println!( "{}: received {} bytes in {:.2}s: {:.2}MiB/s", @@ -106,3 +64,66 @@ fn main() { ); } } + +#[derive(Subcommand)] +enum Client { + #[command(about = "Download a key from S3")] + Real { + #[arg(help = "Bucket 
name")] + bucket: String, + #[arg(help = "Key name")] + key: String, + #[arg(long, help = "AWS region", default_value = "us-east-1")] + region: String, + }, + #[command(about = "Download a key from a mock S3 server")] + Mock { + #[arg(help = "Mock object size")] + object_size: u64, + }, +} + +#[derive(Parser)] +struct CliArgs { + #[command(subcommand)] + client: Client, + #[arg(long, help = "Desired throughput in Gbps", default_value = "10.0")] + throughput_target_gbps: f64, + #[arg(long, help = "Part size for multi-part GET", default_value = "8388608")] + part_size: usize, + #[arg(long, help = "Number of benchmark iterations", default_value = "1")] + iterations: usize, +} + +fn main() { + init_tracing_subscriber(); + + let args = CliArgs::parse(); + + match args.client { + Client::Real { bucket, key, region } => { + let mut config = S3ClientConfig::new().endpoint_config(EndpointConfig::new(&region)); + config = config.throughput_target_gbps(args.throughput_target_gbps); + config = config.part_size(args.part_size); + let client = S3CrtClient::new(config).expect("couldn't create client"); + + run_benchmark(client, args.iterations, &bucket, &key); + } + Client::Mock { object_size } => { + const BUCKET: &str = "bucket"; + const KEY: &str = "key"; + + let config = MockClientConfig { + bucket: BUCKET.to_owned(), + part_size: args.part_size, + unordered_list_seed: None, + }; + let client = ThroughputMockClient::new(config, args.throughput_target_gbps); + let client = Arc::new(client); + + client.add_object(KEY, MockObject::ramp(0xaa, object_size as usize, ETag::for_tests())); + + run_benchmark(client, args.iterations, BUCKET, "key"); + } + } +} diff --git a/mountpoint-s3-client/src/failure_client.rs b/mountpoint-s3-client/src/failure_client.rs index 2d6ce6db6..a7860cf0b 100644 --- a/mountpoint-s3-client/src/failure_client.rs +++ b/mountpoint-s3-client/src/failure_client.rs @@ -1,5 +1,7 @@ //! An [`ObjectClient`] that can inject failures into requests for testing purposes. 
+#![cfg(feature = "mock")] + use std::collections::HashMap; use std::fmt::Debug; use std::ops::Range; diff --git a/mountpoint-s3-client/src/mock_client.rs b/mountpoint-s3-client/src/mock_client.rs index 26eeb0aeb..5cf55d299 100644 --- a/mountpoint-s3-client/src/mock_client.rs +++ b/mountpoint-s3-client/src/mock_client.rs @@ -1,5 +1,7 @@ //! A mock implementation of an object client for use in tests. +#![cfg(feature = "mock")] + use std::borrow::Cow; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::ops::Range; @@ -27,6 +29,9 @@ use crate::object_client::{ PutObjectRequest, PutObjectResult, RestoreStatus, UploadReview, UploadReviewPart, }; +mod leaky_bucket; +pub mod throughput_client; + pub const RAMP_MODULUS: usize = 251; // Largest prime under 256 static_assertions::const_assert!((RAMP_MODULUS > 0) && (RAMP_MODULUS <= 256)); diff --git a/mountpoint-s3-client/src/mock_client/leaky_bucket.rs b/mountpoint-s3-client/src/mock_client/leaky_bucket.rs new file mode 100644 index 000000000..9da5747f5 --- /dev/null +++ b/mountpoint-s3-client/src/mock_client/leaky_bucket.rs @@ -0,0 +1,235 @@ +//! A token-based rate limiter. +//! +//! This is adapted from https://github.com/Gelbpunkt/leaky-bucket-lite (Apache 2.0) to remove the +//! Tokio dependency and instead work with the generic async-io and async-lock crates that we use. + +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use async_io::Timer; +use async_lock::{Mutex, MutexGuard}; + +#[derive(Debug)] +struct LeakyBucketInner { + /// How many tokens this bucket can hold. + max: u32, + /// Interval at which the bucket gains tokens. + refill_interval: Duration, + /// Amount of tokens gained per interval. + refill_amount: u32, + + locked: Mutex<LeakyBucketInnerLocked>, +} + +#[derive(Debug)] +struct LeakyBucketInnerLocked { + /// Current tokens in the bucket. + tokens: u32, + /// Last refill of the tokens. 
+ last_refill: Instant, +} + +impl LeakyBucketInner { + fn new(max: u32, tokens: u32, refill_interval: Duration, refill_amount: u32) -> Self { + Self { + max, + refill_interval, + refill_amount, + locked: Mutex::new(LeakyBucketInnerLocked { + tokens, + last_refill: Instant::now(), + }), + } + } + + /// Updates the tokens in the leaky bucket and returns the current amount + /// of tokens in the bucket. + #[inline] + fn update_tokens(&self, locked: &mut MutexGuard<'_, LeakyBucketInnerLocked>) -> u32 { + let time_passed = Instant::now() - locked.last_refill; + + #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)] + let refills_since = (time_passed.as_secs_f64() / self.refill_interval.as_secs_f64()).floor() as u32; + + let added_tokens = self.refill_amount.saturating_mul(refills_since); + locked.tokens = locked.tokens.saturating_add(added_tokens).min(self.max); + locked.last_refill += self.refill_interval * refills_since; + + locked.tokens + } + + async fn acquire(&self, amount: u32) { + let mut locked = self.locked.lock().await; + if let Err(target_time) = self.try_acquire_locked(amount, &mut locked) { + Timer::at(target_time).await; + + self.update_tokens(&mut locked); + locked.tokens -= amount; + } + } + + fn try_acquire_locked( + &self, + amount: u32, + locked: &mut MutexGuard<'_, LeakyBucketInnerLocked>, + ) -> Result<(), Instant> { + assert!( + amount <= self.max, + "Acquiring more tokens than the configured maximum is not possible" + ); + + let current_tokens = self.update_tokens(locked); + + if current_tokens < amount { + let tokens_needed = amount - current_tokens; + let mut refills_needed = tokens_needed / self.refill_amount; + let refills_needed_remainder = tokens_needed % self.refill_amount; + + if refills_needed_remainder > 0 { + refills_needed += 1; + } + + Err(locked.last_refill + self.refill_interval * refills_needed) + } else { + locked.tokens -= amount; + Ok(()) + } + } +} + +/// A leaky-bucket rate limiter. 
+#[derive(Clone, Debug)] +pub struct LeakyBucket { + inner: Arc<LeakyBucketInner>, +} + +impl LeakyBucket { + fn new(max: u32, tokens: u32, refill_interval: Duration, refill_amount: u32) -> Self { + let inner = Arc::new(LeakyBucketInner::new(max, tokens, refill_interval, refill_amount)); + + Self { inner } + } + + /// Construct a new leaky bucket through a builder. + #[must_use] + pub const fn builder() -> Builder { + Builder::new() + } + + /// Acquire the given `amount` of tokens. This method will panic when acquiring more tokens than + /// the configured maximum. + #[inline] + pub async fn acquire(&self, amount: u32) { + self.inner.acquire(amount).await; + } +} + +/// Builder for a leaky bucket. +#[derive(Debug)] +pub struct Builder { + max: Option<u32>, + tokens: Option<u32>, + refill_interval: Option<Duration>, + refill_amount: Option<u32>, +} + +impl Builder { + /// Create a new builder with all defaults. + #[must_use] + pub const fn new() -> Self { + Self { + max: None, + tokens: None, + refill_interval: None, + refill_amount: None, + } + } + + /// Set the max value for the builder. + #[must_use] + pub const fn max(mut self, max: u32) -> Self { + self.max = Some(max); + self + } + + /// The number of tokens that the bucket should start with. + /// + /// If set to larger than `max` at build time, will only saturate to max. + #[must_use] + pub const fn tokens(mut self, tokens: u32) -> Self { + self.tokens = Some(tokens); + self + } + + /// Set the max value for the builder. + #[must_use] + pub const fn refill_interval(mut self, refill_interval: Duration) -> Self { + self.refill_interval = Some(refill_interval); + self + } + + /// Set the refill amount to use. + #[must_use] + pub const fn refill_amount(mut self, refill_amount: u32) -> Self { + self.refill_amount = Some(refill_amount); + self + } + + /// Construct a new leaky bucket. 
+ #[must_use] + pub fn build(self) -> LeakyBucket { + const DEFAULT_MAX: u32 = 120; + const DEFAULT_TOKENS: u32 = 0; + const DEFAULT_REFILL_INTERVAL: Duration = Duration::from_secs(1); + const DEFAULT_REFILL_AMOUNT: u32 = 1; + + let max = self.max.unwrap_or(DEFAULT_MAX); + let tokens = self.tokens.unwrap_or(DEFAULT_TOKENS); + let refill_interval = self.refill_interval.unwrap_or(DEFAULT_REFILL_INTERVAL); + let refill_amount = self.refill_amount.unwrap_or(DEFAULT_REFILL_AMOUNT); + + LeakyBucket::new(max, tokens, refill_interval, refill_amount) + } +} + +impl Default for Builder { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_leaky_bucket() { + const INTERVAL: Duration = Duration::from_millis(20); + + let leaky = Builder::new() + .tokens(0) + .max(10) + .refill_amount(10) + .refill_interval(INTERVAL) + .build(); + + let mut wakeups = 0u32; + let mut duration = None; + + let test = async { + let start = Instant::now(); + leaky.acquire(10).await; + wakeups += 1; + leaky.acquire(10).await; + wakeups += 1; + leaky.acquire(10).await; + wakeups += 1; + duration = Some(Instant::now().duration_since(start)); + }; + + test.await; + + assert_eq!(3, wakeups); + assert!(duration.expect("expected measured duration") > INTERVAL * 2); + } +} diff --git a/mountpoint-s3-client/src/mock_client/throughput_client.rs b/mountpoint-s3-client/src/mock_client/throughput_client.rs new file mode 100644 index 000000000..a4cac37b5 --- /dev/null +++ b/mountpoint-s3-client/src/mock_client/throughput_client.rs @@ -0,0 +1,214 @@ +use std::ops::Range; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; + +use async_trait::async_trait; +use futures::stream::BoxStream; +use futures::{Stream, StreamExt}; +use pin_project::pin_project; + +use crate::mock_client::leaky_bucket::LeakyBucket; +use crate::mock_client::{MockClient, MockClientConfig, MockClientError, MockObject, MockPutObjectRequest}; +use 
crate::object_client::{ + DeleteObjectError, DeleteObjectResult, GetBodyPart, GetObjectAttributesError, GetObjectAttributesResult, + GetObjectError, HeadObjectError, HeadObjectResult, ListObjectsError, ListObjectsResult, ObjectAttribute, + ObjectClient, ObjectClientResult, PutObjectError, PutObjectParams, +}; +use crate::types::ETag; + +/// A [MockClient] that rate limits overall download throughput to simulate a target network +/// performance without the jitter or service latency of targeting a real service. Note that while +/// the rate limit is shared by all downloading streams, there is no fairness, so some streams can +/// be starved. +/// +/// TODO: make it bi-directional, so that upload throughput can be simulated as well. +pub struct ThroughputMockClient { + inner: MockClient, + /// A throughput rate limiter with one token per byte + rate_limiter: LeakyBucket, +} + +impl ThroughputMockClient { + /// Create a new [ThroughputMockClient] with the given configuration and download rate limit + /// in gigabits per second. 
+ pub fn new(config: MockClientConfig, rate_limit_gbps: f64) -> Self { + let bytes_per_sec = rate_limit_gbps * 1000000000.0 / 8.0; + let interval = Duration::from_micros(1); + let bytes_per_interval = bytes_per_sec * interval.as_secs_f64(); + let rate_limiter = LeakyBucket::builder() + .refill_interval(interval) + .refill_amount(bytes_per_interval as u32) + .max(config.part_size as u32) + .tokens(0) + .build(); + tracing::info!(?rate_limiter, "new client"); + + Self { + inner: MockClient::new(config), + rate_limiter, + } + } + + /// Add an object to this mock client's bucket + pub fn add_object(&self, key: &str, value: MockObject) { + self.inner.add_object(key, value); + } +} + +#[pin_project] +pub struct GetObjectResult { + #[pin] + inner: BoxStream<'static, ObjectClientResult<GetBodyPart, GetObjectError, MockClientError>>, +} + +impl Stream for GetObjectResult { + type Item = ObjectClientResult<GetBodyPart, GetObjectError, MockClientError>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let this = self.project(); + this.inner.poll_next(cx) + } +} + +#[async_trait] +impl ObjectClient for ThroughputMockClient { + type GetObjectResult = GetObjectResult; + type PutObjectRequest = MockPutObjectRequest; + type ClientError = MockClientError; + + fn part_size(&self) -> Option<usize> { + self.inner.part_size() + } + + async fn delete_object( + &self, + bucket: &str, + key: &str, + ) -> ObjectClientResult<DeleteObjectResult, DeleteObjectError, Self::ClientError> { + self.inner.delete_object(bucket, key).await + } + + async fn get_object( + &self, + bucket: &str, + key: &str, + range: Option<Range<u64>>, + if_match: Option<ETag>, + ) -> ObjectClientResult<Self::GetObjectResult, GetObjectError, Self::ClientError> { + let inner = self.inner.get_object(bucket, key, range, if_match).await?; + let rate_limiter = self.rate_limiter.clone(); + let stream = inner.then(move |p| { + let rate_limiter = rate_limiter.clone(); + async move { + let p = p?; + // Acquire enough tokens for the number of bytes we want to deliver + rate_limiter.acquire(p.1.len() as u32).await; + Ok(p) + } + }); + Ok(GetObjectResult { inner: stream.boxed() }) + } + + async fn list_objects( + &self, + bucket: 
&str, + continuation_token: Option<&str>, + delimiter: &str, + max_keys: usize, + prefix: &str, + ) -> ObjectClientResult<ListObjectsResult, ListObjectsError, Self::ClientError> { + self.inner + .list_objects(bucket, continuation_token, delimiter, max_keys, prefix) + .await + } + + async fn head_object( + &self, + bucket: &str, + key: &str, + ) -> ObjectClientResult<HeadObjectResult, HeadObjectError, Self::ClientError> { + self.inner.head_object(bucket, key).await + } + + async fn put_object( + &self, + bucket: &str, + key: &str, + params: &PutObjectParams, + ) -> ObjectClientResult<Self::PutObjectRequest, PutObjectError, Self::ClientError> { + self.inner.put_object(bucket, key, params).await + } + + async fn get_object_attributes( + &self, + bucket: &str, + key: &str, + max_parts: Option<usize>, + part_number_marker: Option<usize>, + object_attributes: &[ObjectAttribute], + ) -> ObjectClientResult<GetObjectAttributesResult, GetObjectAttributesError, Self::ClientError> { + self.inner + .get_object_attributes(bucket, key, max_parts, part_number_marker, object_attributes) + .await + } +} + +#[cfg(test)] +mod tests { + use std::time::Instant; + + use futures::executor::block_on; + use futures::StreamExt; + + use crate::mock_client::MockObject; + + use super::*; + + #[test] + fn ramp_throughput() { + const OBJECT_SIZE: usize = 128 * 1024 * 1024; + const ITERATIONS: usize = 1; + + for rate_gbps in [0.5, 1.0, 2.0] { + for _ in 0..ITERATIONS { + let config = MockClientConfig { + part_size: 8 * 1024 * 1024, + bucket: "test_bucket".to_owned(), + unordered_list_seed: None, + }; + let client = ThroughputMockClient::new(config, rate_gbps); + + client + .inner + .add_object("testfile", MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests())); + + // Stream the entire object and drop it on the floor + let start = Instant::now(); + let num_bytes = block_on(async move { + let mut num_bytes = 0; + let mut get = client.get_object("test_bucket", "testfile", None, None).await.unwrap(); + while let Some(part) = get.next().await { + let (_offset, part) = part.unwrap(); + num_bytes += part.len(); + } + num_bytes + }); + let actual = start.elapsed().as_secs_f64(); + + assert_eq!(num_bytes, OBJECT_SIZE, "didn't stream entire object"); + + 
let expected = OBJECT_SIZE as f64 / (rate_gbps / 8.0 * 1.0e9); + assert!( + actual > expected * 0.9, + "too fast: rate_gbps={rate_gbps} actual={actual}s expected={expected}s" + ); + // This one will be too flaky on slow machines to check by default + // assert!( + // actual < expected * 1.1, + // "too slow: rate_gbps={rate_gbps} actual={actual}s expected={expected}s" + // ); + } + } + } +} diff --git a/mountpoint-s3/Cargo.toml b/mountpoint-s3/Cargo.toml index 7a8fc55b3..f24ed4d2d 100644 --- a/mountpoint-s3/Cargo.toml +++ b/mountpoint-s3/Cargo.toml @@ -46,6 +46,8 @@ fs2 = "0.4.3" procfs = { version = "0.15.1", default-features = false } [dev-dependencies] +mountpoint-s3-client = { path = "../mountpoint-s3-client", features = ["mock"] } + assert_cmd = "2.0.6" assert_fs = "1.0.9" aws-config = "0.56.0"