Skip to content

Commit

Permalink
Add polling kind argument to consumer group poll, add default identif…
Browse files Browse the repository at this point in the history
…ier (#1472)

This commit introduces a new polling kind argument to the consumer
group poll benchmark. Additionally, the default identifier is now
set to the hostname if not provided, ensuring consistent identification
across benchmarks.
  • Loading branch information
hubcio authored Jan 30, 2025
1 parent a0dc3a1 commit 7a2631f
Show file tree
Hide file tree
Showing 20 changed files with 590 additions and 455 deletions.
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ chrono = "0.4.31"
clap = { version = "4.5.26", features = ["derive"] }
figlet-rs = "0.1.5"
futures = "0.3.31"
hostname = "0.4.0"
iggy = { path = "../sdk" }
iggy-benchmark-report = { path = "report" }
integration = { path = "../integration" }
Expand Down
19 changes: 16 additions & 3 deletions bench/src/analytics/report_builder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::metrics::group::{from_individual_metrics, from_producers_and_consumers_statistics};
use crate::utils::server_version::get_server_version;
use chrono::{DateTime, Utc};
use iggy::utils::timestamp::IggyTimestamp;
use iggy_benchmark_report::{
Expand All @@ -10,10 +11,9 @@ use iggy_benchmark_report::{
pub struct BenchmarkReportBuilder;

impl BenchmarkReportBuilder {
pub fn build(
server_version: String,
pub async fn build(
hardware: BenchmarkHardware,
params: BenchmarkParams,
mut params: BenchmarkParams,
mut individual_metrics: Vec<BenchmarkIndividualMetrics>,
moving_average_window: u32,
) -> BenchmarkReport {
Expand All @@ -24,6 +24,19 @@ impl BenchmarkReportBuilder {
.map(|dt| dt.to_rfc3339())
.unwrap_or_else(|| String::from("unknown"));

let server_version = match get_server_version(&params).await {
Ok(v) => v,
Err(_) => "unknown".to_string(),
};

if params.gitref.is_none() {
params.gitref = Some(server_version.clone());
};

if params.gitref_date.is_none() {
params.gitref_date = Some(timestamp.clone());
}

let mut group_metrics = Vec::new();

// Sort metrics by actor type and ID
Expand Down
17 changes: 13 additions & 4 deletions bench/src/args/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use super::props::{BenchmarkKindProps, BenchmarkTransportProps};
use super::{defaults::*, transport::BenchmarkTransportCommand};
use clap::error::ErrorKind;
use clap::{CommandFactory, Parser};
use iggy::messages::poll_messages::PollingKind;
use iggy::utils::duration::IggyDuration;
use iggy_benchmark_report::benchmark_kind::BenchmarkKind;
use iggy_benchmark_report::params::BenchmarkParams;
Expand Down Expand Up @@ -36,7 +37,7 @@ pub struct IggyBenchArgs {
#[arg(long, short='v', default_value_t = DEFAULT_SERVER_STDOUT_VISIBILITY)]
pub verbose: bool,

/// Warmup time
/// Warmup time in human readable format, e.g. "1s", "2m", "3h"
#[arg(long, short = 'w', default_value_t = IggyDuration::from_str(DEFAULT_WARMUP_TIME).unwrap())]
pub warmup_time: IggyDuration,

Expand All @@ -53,11 +54,11 @@ pub struct IggyBenchArgs {
pub skip_server_start: bool,

/// Output directory path for storing benchmark results
#[arg(long)]
#[arg(long, short = 'o')]
pub output_dir: Option<String>,

/// Identifier for the benchmark run (e.g., machine name)
#[arg(long)]
/// Identifier for the benchmark run (defaults to hostname if not provided)
#[arg(long, value_parser = get_identifier_or_hostname)]
pub identifier: Option<String>,

/// Additional remark for the benchmark (e.g., no-cache)
Expand Down Expand Up @@ -85,6 +86,10 @@ fn validate_server_executable_path(v: &str) -> Result<String, String> {
}
}

fn get_identifier_or_hostname(v: &str) -> Result<String, String> {
Ok(v.to_owned())
}

impl IggyBenchArgs {
pub fn transport_command(&self) -> &BenchmarkTransportCommand {
self.benchmark_kind.transport_command()
Expand Down Expand Up @@ -177,6 +182,10 @@ impl IggyBenchArgs {
.disable_parallel_consumer_streams()
}

pub fn polling_kind(&self) -> PollingKind {
self.benchmark_kind.inner().polling_kind()
}

pub fn number_of_consumer_groups(&self) -> u32 {
self.benchmark_kind.inner().number_of_consumer_groups()
}
Expand Down
5 changes: 4 additions & 1 deletion bench/src/args/defaults.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use iggy::messages::poll_messages::PollingKind;
use nonzero_lit::u32;
use std::num::NonZeroU32;

Expand Down Expand Up @@ -28,11 +29,13 @@ pub const DEFAULT_NUMBER_OF_CONSUMERS: NonZeroU32 = u32!(10);
pub const DEFAULT_NUMBER_OF_CONSUMER_GROUPS: NonZeroU32 = u32!(1);
pub const DEFAULT_NUMBER_OF_PRODUCERS: NonZeroU32 = u32!(10);

pub const DEFAULT_POLLING_KIND_CG_POLL: PollingKind = PollingKind::Next;

pub const DEFAULT_PERFORM_CLEANUP: bool = false;
pub const DEFAULT_SERVER_SYSTEM_PATH: &str = "local_data";
pub const DEFAULT_SERVER_STDOUT_VISIBILITY: bool = false;

pub const DEFAULT_WARMUP_TIME: &str = "1 s";
pub const DEFAULT_WARMUP_TIME: &str = "0 s";
pub const DEFAULT_SKIP_SERVER_START: bool = false;

pub const DEFAULT_SAMPLING_TIME: &str = "10 ms";
Expand Down
Loading

0 comments on commit 7a2631f

Please sign in to comment.