Skip to content

Commit

Permalink
Add rate limiting and browser chart opening features (#1468)
Browse files Browse the repository at this point in the history
This commit introduces a rate limiting feature for both producers and
consumers, allowing users to specify a rate limit in bytes per second.
The rate limiter is implemented using a linger-based algorithm to ensure
smooth throughput control.

Additionally, a new command-line argument is added to open generated
charts in the browser automatically after the benchmark is completed.

The changes also include updates to the latency calculation logic,
switching from average latency to worst latency per bucket for more
accurate performance insights.
  • Loading branch information
hubcio authored Jan 30, 2025
1 parent 7a2631f commit 69551cc
Show file tree
Hide file tree
Showing 14 changed files with 178 additions and 8 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ license = "Apache-2.0"

[dependencies]
async-trait = "0.1.85"
atomic-time = "0.1.5"
charming = "0.4.0"
chrono = "0.4.31"
clap = { version = "4.5.26", features = ["derive"] }
Expand Down
9 changes: 9 additions & 0 deletions bench/src/actors/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::analytics::metrics::individual::from_records;
use crate::analytics::record::BenchmarkRecord;
use crate::rate_limiter::RateLimiter;
use iggy::client::{ConsumerGroupClient, MessageClient};
use iggy::clients::client::IggyClient;
use iggy::consumer::Consumer as IggyConsumer;
Expand Down Expand Up @@ -28,6 +29,7 @@ pub struct Consumer {
sampling_time: IggyDuration,
moving_average_window: u32,
polling_kind: PollingKind,
rate_limiter: Option<RateLimiter>,
}

impl Consumer {
Expand All @@ -43,6 +45,7 @@ impl Consumer {
sampling_time: IggyDuration,
moving_average_window: u32,
polling_kind: PollingKind,
rate_limiter: Option<RateLimiter>,
) -> Self {
Self {
client_factory,
Expand All @@ -55,6 +58,7 @@ impl Consumer {
sampling_time,
moving_average_window,
polling_kind,
rate_limiter,
}
}

Expand Down Expand Up @@ -170,6 +174,11 @@ impl Consumer {
self.polling_kind
),
};
// Apply rate limiting if configured
if let Some(limiter) = &self.rate_limiter {
limiter.throttle(batch_size_total_bytes).await;
}

let before_poll = Instant::now();
let polled_messages = client
.poll_messages(
Expand Down
8 changes: 8 additions & 0 deletions bench/src/actors/producer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::analytics::metrics::individual::from_records;
use crate::analytics::record::BenchmarkRecord;
use crate::rate_limiter::RateLimiter;
use iggy::client::MessageClient;
use iggy::clients::client::IggyClient;
use iggy::error::IggyError;
Expand Down Expand Up @@ -28,6 +29,7 @@ pub struct Producer {
warmup_time: IggyDuration,
sampling_time: IggyDuration,
moving_average_window: u32,
rate_limiter: Option<RateLimiter>,
}

impl Producer {
Expand All @@ -43,6 +45,7 @@ impl Producer {
warmup_time: IggyDuration,
sampling_time: IggyDuration,
moving_average_window: u32,
rate_limiter: Option<RateLimiter>,
) -> Self {
Producer {
client_factory,
Expand All @@ -55,6 +58,7 @@ impl Producer {
warmup_time,
sampling_time,
moving_average_window,
rate_limiter,
}
}

Expand Down Expand Up @@ -117,6 +121,10 @@ impl Producer {
let mut latencies: Vec<Duration> = Vec::with_capacity(message_batches as usize);
let mut records = Vec::with_capacity(message_batches as usize);
for i in 1..=message_batches {
// Apply rate limiting if configured
if let Some(limiter) = &self.rate_limiter {
limiter.throttle(batch_total_bytes).await;
}
let before_send = Instant::now();
client
.send_messages(&stream_id, &topic_id, &partitioning, &mut messages)
Expand Down
1 change: 1 addition & 0 deletions bench/src/analytics/time_series/calculators/latency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use tracing::warn;
pub struct LatencyTimeSeriesCalculator;

impl TimeSeriesCalculation for LatencyTimeSeriesCalculator {
// This implementation is using delta latency and average latencies per bucket
fn calculate(&self, records: &[BenchmarkRecord], bucket_size: IggyDuration) -> TimeSeries {
if records.len() < 2 {
warn!("Not enough records to calculate latency");
Expand Down
18 changes: 18 additions & 0 deletions bench/src/args/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use super::{defaults::*, transport::BenchmarkTransportCommand};
use clap::error::ErrorKind;
use clap::{CommandFactory, Parser};
use iggy::messages::poll_messages::PollingKind;
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::duration::IggyDuration;
use iggy_benchmark_report::benchmark_kind::BenchmarkKind;
use iggy_benchmark_report::params::BenchmarkParams;
Expand Down Expand Up @@ -49,6 +50,11 @@ pub struct IggyBenchArgs {
#[arg(long, default_value_t = DEFAULT_MOVING_AVERAGE_WINDOW)]
pub moving_average_window: u32,

/// Optional rate limit per individual producer/consumer in bytes per second (not aggregate).
/// Accepts human-readable formats like "50KB", "10MB", or "1GB"
#[arg(long)]
pub rate_limit: Option<IggyByteSize>,

/// Skip server start
#[arg(long, short = 'k', default_value_t = DEFAULT_SKIP_SERVER_START)]
pub skip_server_start: bool,
Expand Down Expand Up @@ -76,6 +82,10 @@ pub struct IggyBenchArgs {
/// Git reference date used for note in the benchmark results, preferably merge date of the commit
#[arg(long)]
pub gitref_date: Option<String>,

/// Open generated charts in browser after benchmark is finished
#[arg(long, default_value_t = false)]
pub open_charts: bool,
}

fn validate_server_executable_path(v: &str) -> Result<String, String> {
Expand Down Expand Up @@ -202,6 +212,10 @@ impl IggyBenchArgs {
self.moving_average_window
}

pub fn rate_limit(&self) -> Option<IggyByteSize> {
self.rate_limit
}

pub fn output_dir(&self) -> Option<String> {
self.output_dir.clone()
}
Expand Down Expand Up @@ -423,6 +437,10 @@ fn recreate_bench_command(args: &IggyBenchArgs) -> String {
parts.push(format!("--consumer-groups {}", consumer_groups));
}

if let Some(rate_limit) = args.rate_limit() {
parts.push(format!("--rate-limit {}", rate_limit));
}

parts.join(" ")
}

Expand Down
4 changes: 4 additions & 0 deletions bench/src/benchmarks/consumer_group_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{
actors::consumer::Consumer,
args::common::IggyBenchArgs,
benchmarks::{CONSUMER_GROUP_BASE_ID, CONSUMER_GROUP_NAME_PREFIX},
rate_limiter::RateLimiter,
};
use async_trait::async_trait;
use iggy::{client::ConsumerGroupClient, clients::client::IggyClient, error::IggyError};
Expand Down Expand Up @@ -97,6 +98,9 @@ impl Benchmarkable for ConsumerGroupBenchmark {
self.args.sampling_time(),
self.args.moving_average_window(),
self.args.polling_kind(),
self.args
.rate_limit()
.map(|rl| RateLimiter::new(rl.as_bytes_u64())),
);
let future = Box::pin(async move { consumer.run().await });
futures.as_mut().unwrap().push(future);
Expand Down
4 changes: 4 additions & 0 deletions bench/src/benchmarks/poll_benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::benchmark::{BenchmarkFutures, Benchmarkable};
use crate::actors::consumer::Consumer;
use crate::args::common::IggyBenchArgs;
use crate::rate_limiter::RateLimiter;
use async_trait::async_trait;
use iggy_benchmark_report::benchmark_kind::BenchmarkKind;
use integration::test_server::ClientFactory;
Expand Down Expand Up @@ -56,6 +57,9 @@ impl Benchmarkable for PollMessagesBenchmark {
args.sampling_time(),
args.moving_average_window(),
self.args.polling_kind(),
self.args
.rate_limit()
.map(|rl| RateLimiter::new(rl.as_bytes_u64())),
);

let future = Box::pin(async move { consumer.run().await });
Expand Down
7 changes: 7 additions & 0 deletions bench/src/benchmarks/send_and_poll_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::benchmark::{BenchmarkFutures, Benchmarkable};
use crate::actors::consumer::Consumer;
use crate::actors::producer::Producer;
use crate::args::common::IggyBenchArgs;
use crate::rate_limiter::RateLimiter;
use async_trait::async_trait;
use iggy_benchmark_report::benchmark_kind::BenchmarkKind;
use integration::test_server::ClientFactory;
Expand Down Expand Up @@ -55,6 +56,9 @@ impl Benchmarkable for SendAndPollMessagesBenchmark {
warmup_time,
self.args.sampling_time(),
self.args.moving_average_window(),
self.args
.rate_limit()
.map(|rl| RateLimiter::new(rl.as_bytes_u64())),
);
let future = Box::pin(async move { producer.run().await });
futures.as_mut().unwrap().push(future);
Expand All @@ -76,6 +80,9 @@ impl Benchmarkable for SendAndPollMessagesBenchmark {
self.args.sampling_time(),
self.args.moving_average_window(),
self.args.polling_kind(),
self.args
.rate_limit()
.map(|rl| RateLimiter::new(rl.as_bytes_u64())),
);
let future = Box::pin(async move { consumer.run().await });
futures.as_mut().unwrap().push(future);
Expand Down
3 changes: 3 additions & 0 deletions bench/src/benchmarks/send_benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::benchmark::{BenchmarkFutures, Benchmarkable};
use crate::actors::producer::Producer;
use crate::args::common::IggyBenchArgs;
use crate::rate_limiter::RateLimiter;
use async_trait::async_trait;
use iggy_benchmark_report::benchmark_kind::BenchmarkKind;
use integration::test_server::ClientFactory;
Expand Down Expand Up @@ -60,6 +61,8 @@ impl Benchmarkable for SendMessagesBenchmark {
warmup_time,
args.sampling_time(),
args.moving_average_window(),
args.rate_limit()
.map(|rl| RateLimiter::new(rl.as_bytes_u64())),
);
let future = Box::pin(async move { producer.run().await });
futures.as_mut().unwrap().push(future);
Expand Down
1 change: 1 addition & 0 deletions bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod analytics;
mod args;
mod benchmarks;
mod plot;
mod rate_limiter;
mod runner;
mod utils;

Expand Down
41 changes: 35 additions & 6 deletions bench/src/plot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use charming::{Chart, HtmlRenderer};
use iggy::utils::byte_size::IggyByteSize;
use iggy_benchmark_report::report::BenchmarkReport;
use std::path::Path;
use std::process::Command;
use std::time::Instant;
use tracing::info;

Expand Down Expand Up @@ -43,28 +44,56 @@ impl ChartType {
}
}

fn open_in_browser(path: &str) -> std::io::Result<()> {
#[cfg(target_os = "linux")]
{
Command::new("xdg-open").arg(path).spawn().map(|_| ())
}

#[cfg(target_os = "macos")]
{
Command::new("open").arg(path).spawn().map(|_| ())
}

#[cfg(target_os = "windows")]
{
Command::new("cmd")
.args(["/C", "start", path])
.spawn()
.map(|_| ())
}
}

pub fn plot_chart(
report: &BenchmarkReport,
output_directory: &str,
chart_type: ChartType,
should_open_in_browser: bool,
) -> std::io::Result<()> {
let data_processing_start = Instant::now();
let chart = (chart_type.create_chart())(report, true); // Use dark theme by default
let data_processing_time = data_processing_start.elapsed();

let chart_render_start = Instant::now();
let chart_path = format!("{}/{}.html", output_directory, chart_type.name());
save_chart(&chart, chart_type.name(), output_directory, 1600, 1200)?;
let chart_render_time = chart_render_start.elapsed();
let file_name = chart_type.name();
save_chart(&chart, file_name, output_directory, 1600, 1200)?;

if should_open_in_browser {
let chart_path = format!("{}/{}.html", output_directory, file_name);
open_in_browser(&chart_path)?;
}

let total_samples = chart_type.get_samples(report);
let report_path = format!("{}/report.json", output_directory);
let report_size = IggyByteSize::from(std::fs::metadata(&report_path)?.len());

let chart_render_time = chart_render_start.elapsed();

info!(
"Generated {} plot at: {} ({} samples, report.json size: {}, data processing: {:.2?}, chart render: {:.2?})",
chart_type.name(),
chart_path,
"Generated {} plot at: {}/{}.html ({} samples, report.json size: {}, data processing: {:.2?}, chart render: {:.2?})",
file_name,
output_directory,
file_name,
total_samples,
report_size,
data_processing_time,
Expand Down
Loading

0 comments on commit 69551cc

Please sign in to comment.