Skip to content

Commit

Permalink
Add a new mock client that simulates GET throughput (#723)
Browse files Browse the repository at this point in the history
* Add a new mock client that simulates GET throughput

For performance testing and microbenchmarking, we'd like to be able to
separate the S3 service and the CRT datapath from our own client and
file system. This mock client can simulate a target network throughput
by rate-limiting the `get_object` stream. The goal is to be able to use
this client in place of a regular `S3CrtClient` when we want to isolate
performance questions.

Along the way, I realized we're including the mock client in our release
builds, because it's an always-on feature of the client crate. This
change therefore also does a little dependency refactoring to remove the
mock and failure clients from the non-test dependency closure. I verified
this by confirming that the release binary is a few MB smaller, and
that `strings mount-s3 | grep mock` no longer finds the mock client's
symbols.

Signed-off-by: James Bornholt <[email protected]>

* Adopt mock client as an option in client benchmark

Signed-off-by: James Bornholt <[email protected]>

* Changelog

Signed-off-by: James Bornholt <[email protected]>

* Dependency fixes

Signed-off-by: James Bornholt <[email protected]>

---------

Signed-off-by: James Bornholt <[email protected]>
  • Loading branch information
jamesbornholt authored Feb 2, 2024
1 parent e4bdd1c commit 6dc1351
Show file tree
Hide file tree
Showing 9 changed files with 641 additions and 57 deletions.
90 changes: 87 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions mountpoint-s3-client/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
## Unreleased

### Breaking changes

* The `mock_client` and `failure_client` modules are no longer enabled by default, and are now available by enabling the `mock` feature for this crate. ([#723](https://github.com/awslabs/mountpoint-s3/pull/723))

### Other changes

* Introduced a new `ThroughputMockClient` that simulates a target network throughput from an in-memory mock S3 client. This client requires the `mock` feature flag. ([#723](https://github.com/awslabs/mountpoint-s3/pull/723))

## v0.6.2 (January 18, 2024)

### Breaking changes
Expand Down
17 changes: 14 additions & 3 deletions mountpoint-s3-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ once_cell = "1.16.0"
percent-encoding = "2.2.0"
pin-project = "1.0.12"
platform-info = "2.0.2"
rand = "0.8.5"
rand_chacha = "0.3.1"
regex = "1.7.1"
static_assertions = "1.1.0"
thiserror = "1.0.34"
Expand All @@ -34,6 +32,12 @@ tracing = { version = "0.1.35", default-features = false, features = ["std", "lo
xmltree = "0.10.3"
serde_json = "1.0.104"

# Dependencies for the mock client only
async-io = { version = "2.3.1", optional = true }
async-lock = { version = "2.6.0", optional = true }
rand = { version = "0.8.5", optional = true }
rand_chacha = { version = "0.3.1", optional = true }

[dev-dependencies]
anyhow = { version = "1.0.64", features = ["backtrace"] }
aws-config = "0.56.0"
Expand All @@ -42,7 +46,7 @@ aws-sdk-s3 = "0.29.0"
aws-sdk-sts = "0.29.0"
aws-smithy-runtime-api = "0.56.1"
bytes = "1.2.1"
clap = "4.1.9"
clap = { version = "4.1.9", features = ["derive"] }
ctor = "0.1.23"
proptest = "1.0.0"
rusty-fork = "0.3.0"
Expand All @@ -51,13 +55,20 @@ test-case = "2.2.2"
tokio = { version = "1.24.2", features = ["rt", "macros"] }
tracing-subscriber = { version = "0.3.14", features = ["fmt", "env-filter"] }

# HACK: we want our own tests to use the mock client, but don't want to enable it for consumers by
# default, so we take a dev-dependency on ourselves with that feature enabled.
# https://github.com/rust-lang/cargo/issues/2911#issuecomment-749580481
mountpoint-s3-client = { path = ".", features = ["mock"] }

[build-dependencies]
built = { version = "0.6.0", features = ["git2"] }

[lib]
doctest = false

[features]
mock = ["dep:async-io", "dep:async-lock", "dep:rand", "dep:rand_chacha"]
# Test features
s3_tests = []
fips_tests = []
s3express_tests = []
Expand Down
123 changes: 72 additions & 51 deletions mountpoint-s3-client/examples/client_benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use std::pin::pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;

use clap::{Arg, Command};
use clap::{Parser, Subcommand};
use futures::StreamExt;
use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig};
use mountpoint_s3_client::mock_client::throughput_client::ThroughputMockClient;
use mountpoint_s3_client::mock_client::{MockClientConfig, MockObject};
use mountpoint_s3_client::types::ETag;
use mountpoint_s3_client::{ObjectClient, S3CrtClient};
use mountpoint_s3_crt::common::rust_log_adapter::RustLogAdapter;
use tracing_subscriber::fmt::Subscriber;
Expand All @@ -23,63 +27,18 @@ fn init_tracing_subscriber() {
subscriber.try_init().expect("unable to install global subscriber");
}

fn main() {
init_tracing_subscriber();

let matches = Command::new("benchmark")
.about("Download a single key from S3 and ignore its contents")
.arg(Arg::new("bucket").required(true))
.arg(Arg::new("key").required(true))
.arg(
Arg::new("throughput-target-gbps")
.long("throughput-target-gbps")
.help("Desired throughput in Gbps"),
)
.arg(
Arg::new("part-size")
.long("part-size")
.help("Part size for multi-part GET and PUT"),
)
.arg(
Arg::new("iterations")
.long("iterations")
.help("Number of times to download"),
)
.arg(Arg::new("region").long("region").default_value("us-east-1"))
.get_matches();

let bucket = matches.get_one::<String>("bucket").unwrap();
let key = matches.get_one::<String>("key").unwrap();
let throughput_target_gbps = matches
.get_one::<String>("throughput-target-gbps")
.map(|s| s.parse::<f64>().expect("throughput target must be an f64"));
let part_size = matches
.get_one::<String>("part-size")
.map(|s| s.parse::<usize>().expect("part size must be a usize"));
let iterations = matches
.get_one::<String>("iterations")
.map(|s| s.parse::<usize>().expect("iterations must be a number"));
let region = matches.get_one::<String>("region").unwrap();

let mut config = S3ClientConfig::new().endpoint_config(EndpointConfig::new(region));
if let Some(throughput_target_gbps) = throughput_target_gbps {
config = config.throughput_target_gbps(throughput_target_gbps);
}
if let Some(part_size) = part_size {
config = config.part_size(part_size);
}
let client = S3CrtClient::new(config).expect("couldn't create client");

for i in 0..iterations.unwrap_or(1) {
fn run_benchmark(client: impl ObjectClient + Clone, num_iterations: usize, bucket: &str, key: &str) {
for i in 0..num_iterations {
let received_size = Arc::new(AtomicU64::new(0));
let start = Instant::now();
let client = client.clone();
let received_size_clone = Arc::clone(&received_size);
let start = Instant::now();
futures::executor::block_on(async move {
let mut request = client
.get_object(bucket, key, None, None)
.await
.expect("couldn't create get request");
let mut request = pin!(request);
loop {
match StreamExt::next(&mut request).await {
Some(Ok((_offset, body))) => {
Expand All @@ -95,7 +54,6 @@ fn main() {
});

let elapsed = start.elapsed();

let received_size = received_size.load(Ordering::SeqCst);
println!(
"{}: received {} bytes in {:.2}s: {:.2}MiB/s",
Expand All @@ -106,3 +64,66 @@ fn main() {
);
}
}

// Selects which client implementation the benchmark drives.
//
// NOTE: plain `//` comments are used here instead of `///` on purpose: clap's derive
// macros pick up doc comments as help text, and the help output should stay exactly
// as defined by the explicit `about`/`help` attributes below.
#[derive(Subcommand)]
enum Client {
    // Benchmark against S3 itself: `main` builds an `S3CrtClient` for `region` and
    // downloads `key` from `bucket`.
    #[command(about = "Download a key from S3")]
    Real {
        // Positional argument: bucket to read from.
        #[arg(help = "Bucket name")]
        bucket: String,
        // Positional argument: key of the object to download.
        #[arg(help = "Key name")]
        key: String,
        // Passed to `EndpointConfig::new` when constructing the client.
        #[arg(long, help = "AWS region", default_value = "us-east-1")]
        region: String,
    },
    // Benchmark against a `ThroughputMockClient`: `main` creates a single mock object
    // and downloads it through the mock client.
    #[command(about = "Download a key from a mock S3 server")]
    Mock {
        // Length handed to `MockObject::ramp` for the generated object
        // (presumably in bytes -- confirm against `MockObject`).
        #[arg(help = "Mock object size")]
        object_size: u64,
    },
}

// Command-line arguments for the client benchmark.
//
// The options declared here apply to both subcommands; `main` forwards them to
// whichever client it constructs.
//
// NOTE: plain `//` comments are used instead of `///` on purpose: clap's derive macros
// pick up doc comments as help text, which would change the CLI output.
#[derive(Parser)]
struct CliArgs {
    // Which client to benchmark (see `Client`).
    #[command(subcommand)]
    client: Client,
    // Forwarded to `S3ClientConfig::throughput_target_gbps` for the real client and to
    // `ThroughputMockClient::new` for the mock client.
    #[arg(long, help = "Desired throughput in Gbps", default_value = "10.0")]
    throughput_target_gbps: f64,
    // Forwarded to `S3ClientConfig::part_size` / `MockClientConfig::part_size`.
    // The default 8388608 is 8 * 1024 * 1024 (8 MiB).
    #[arg(long, help = "Part size for multi-part GET", default_value = "8388608")]
    part_size: usize,
    // Number of times `run_benchmark` repeats the download.
    #[arg(long, help = "Number of benchmark iterations", default_value = "1")]
    iterations: usize,
}

/// Entry point for the client benchmark.
///
/// Installs the tracing subscriber, parses the command line, builds the client selected
/// by the subcommand and hands it to `run_benchmark`:
///
/// * `Client::Real` builds an `S3CrtClient` for the requested region and downloads the
///   given key from the given bucket.
/// * `Client::Mock` builds a `ThroughputMockClient` holding a single generated object of
///   the requested size and downloads that object.
///
/// # Panics
///
/// Panics if the real client cannot be created, or if the requested mock object size
/// does not fit in `usize` on the current platform.
fn main() {
    init_tracing_subscriber();

    let args = CliArgs::parse();

    match args.client {
        Client::Real { bucket, key, region } => {
            let config = S3ClientConfig::new()
                .endpoint_config(EndpointConfig::new(&region))
                .throughput_target_gbps(args.throughput_target_gbps)
                .part_size(args.part_size);
            let client = S3CrtClient::new(config).expect("couldn't create client");

            run_benchmark(client, args.iterations, &bucket, &key);
        }
        Client::Mock { object_size } => {
            const BUCKET: &str = "bucket";
            const KEY: &str = "key";

            // Checked conversion: a plain `as usize` cast would silently truncate the
            // requested size on targets where `usize` is narrower than 64 bits. The
            // fully qualified path keeps this independent of the crate's edition prelude.
            let object_len: usize = std::convert::TryFrom::try_from(object_size)
                .expect("mock object size must fit in usize on this platform");

            let config = MockClientConfig {
                bucket: BUCKET.to_owned(),
                part_size: args.part_size,
                unordered_list_seed: None,
            };
            let client = Arc::new(ThroughputMockClient::new(config, args.throughput_target_gbps));

            client.add_object(KEY, MockObject::ramp(0xaa, object_len, ETag::for_tests()));

            // Use the same constant the object was stored under, so the key that is
            // written and the key that is read cannot drift apart.
            run_benchmark(client, args.iterations, BUCKET, KEY);
        }
    }
}
2 changes: 2 additions & 0 deletions mountpoint-s3-client/src/failure_client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! An [`ObjectClient`] that can inject failures into requests for testing purposes.
#![cfg(feature = "mock")]

use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Range;
Expand Down
Loading

0 comments on commit 6dc1351

Please sign in to comment.