Skip to content

Commit

Permalink
Improve Stats payload decoding, make format_params public, improv…
Browse files Browse the repository at this point in the history
…e scripts (#1465)

This commit enhances the `map_stats` function in `mapper.rs` by adding
safe decoding
for hostname, OS name, OS version, kernel version, server version, and
server semver.
The changes ensure that the function checks for sufficient bytes in the
payload
before attempting to decode each field, preventing potential errors due
to
insufficient data. Default values are used for fields that may not be
present
in older server payloads, ensuring backward compatibility.

Besdies that, method format_params is now public and performance suite
scripts
have better env variables handling.
  • Loading branch information
hubcio authored Jan 28, 2025
1 parent 4ae140b commit ea07f69
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 81 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bench/report/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy-benchmark-report"
version = "0.1.0"
version = "0.1.1"
edition = "2021"
description = "Benchmark report and chart generation library for iggy-bench binary and iggy-benchmarks-dashboard web app"
license = "Apache-2.0"
Expand Down
2 changes: 1 addition & 1 deletion bench/report/src/plotting/text/subtext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl BenchmarkGroupMetrics {
}

impl BenchmarkParams {
fn format_params(&self) -> String {
pub fn format_params(&self) -> String {
let actors_info = self.format_actors_info();
let message_batches = self.message_batches as u64;
let messages_per_batch = self.messages_per_batch as u64;
Expand Down
60 changes: 44 additions & 16 deletions scripts/performance/run-standard-performance-suite.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,35 @@ source "$(dirname "$0")/utils.sh"
trap on_exit_bench SIGINT
trap on_exit_bench EXIT

# Function to get environment variables based on benchmark type
get_env_vars() {
local bench_type="$1"
local env_vars=()

# Specific env vars based on bench type
case "$bench_type" in
*"only_cache"*)
env_vars+=("IGGY_SYSTEM_CACHE_SIZE=9GB")
;;
*"no_cache"*)
env_vars+=("IGGY_SYSTEM_CACHE_ENABLED=false")
;;
*"no_wait"*)
env_vars+=("IGGY_SYSTEM_SEGMENT_SERVER_CONFIRMATION=no_wait")
;;
esac

# Convert array to env var string
local env_string=""
for var in "${env_vars[@]}"; do
env_string+="$var "
done
echo "$env_string"
}

# Build the project
echo "Building project..."
cargo build --release
RUSTFLAGS="-C target-cpu=native" cargo build --release

# Create a directory for the performance results
(mkdir -p performance_results || true) &> /dev/null
Expand All @@ -48,7 +74,6 @@ SMALL_BATCH_NO_CACHE_POLL=$(construct_bench_command "$IGGY_BENCH_CMD" "poll" 8 1
LARGE_BATCH_NO_CACHE_SEND_AND_POLL=$(construct_bench_command "$IGGY_BENCH_CMD" "send-and-poll" 8 1000 1000 1000 tcp "no_cache") # 8GB data, 1KB messages, 1000 msgs/batch with disabled cache
LARGE_BATCH_NO_CACHE_CG_POLL=$(construct_bench_command "$IGGY_BENCH_CMD" "consumer-group-poll" 8 1000 1000 1000 tcp "no_cache") # 8GB data, 1KB messages, 1000 msgs/batch with disabled cache


# Make an array of the suites
SUITES=(
"${LARGE_BATCH_ONLY_CACHE_SEND}"
Expand All @@ -74,38 +99,41 @@ for (( i=0; i<${#SUITES[@]} ; i+=2 )) ; do
echo "Cleaning old local_data..."
rm -rf local_data || true

# Get environment variables based on benchmark type
ENV_VARS=$(get_env_vars "$SEND_BENCH")

echo "Starting iggy-server with command:"

# Start iggy-server with appropriate configuration
if [[ "$SEND_BENCH" == *"only_cache"* ]] || [[ "$POLL_BENCH" == *"only_cache"* ]]; then
echo "Starting iggy-server with command: IGGY_SYSTEM_CACHE_SIZE=\"9GB\" target/release/iggy-server"
IGGY_SYSTEM_CACHE_SIZE="9GB" target/release/iggy-server &> /dev/null &
elif [[ "$SEND_BENCH" == *"no_cache"* ]] || [[ "$POLL_BENCH" == *"no_cache"* ]]; then
echo "Starting iggy-server with command: IGGY_SYSTEM_CACHE_ENABLED=false target/release/iggy-server"
IGGY_SYSTEM_CACHE_ENABLED=false target/release/iggy-server &> /dev/null &
elif [[ "$SEND_BENCH" == *"no_wait"* ]] || [[ "$POLL_BENCH" == *"no_wait"* ]]; then
echo "Starting iggy-server with command: IGGY_SYSTEM_SEGMENT_SERVER_CONFIRMATION=no_wait target/release/iggy-server"
IGGY_SYSTEM_SEGMENT_SERVER_CONFIRMATION=no_wait target/release/iggy-server &> /dev/null &
if [[ -n "$ENV_VARS" ]]; then
echo "$ENV_VARS target/release/iggy-server"
eval "$ENV_VARS target/release/iggy-server" &> /dev/null &
else
echo "Starting iggy-server with command: target/release/iggy-server"

echo "target/release/iggy-server"
target/release/iggy-server &> /dev/null &
fi
echo
IGGY_SERVER_PID=$!
sleep 2

# Check if the server is running
exit_if_process_is_not_running "$IGGY_SERVER_PID"

# Start send bench
echo "Running ${SEND_BENCH}"
send_results=$(eval "${SEND_BENCH}" | grep -e "Results:")
echo "Running bench:"
echo "$ENV_VARS ${SEND_BENCH}"
send_results=$(eval "$ENV_VARS ${SEND_BENCH}" | grep -e "Results:")
echo
echo "Send results:"
echo "${send_results}"
echo
sleep 1

# Start poll bench
echo "Running ${POLL_BENCH}"
poll_results=$(eval "${POLL_BENCH}" | grep -e "Results:")
echo "Running bench:"
echo "$ENV_VARS ${POLL_BENCH}"
poll_results=$(eval "$ENV_VARS ${POLL_BENCH}" | grep -e "Results:")
echo
echo "Poll results:"
echo "${poll_results}"
Expand Down
30 changes: 15 additions & 15 deletions scripts/performance/utils.sh
Original file line number Diff line number Diff line change
Expand Up @@ -126,19 +126,19 @@ function construct_bench_command() {
hostname=$(hostname)

echo "$bench_command \
$COMMON_ARGS \
--output-dir performance_results \
--identifier ${hostname} \
--remark ${remark} \
--extra-info \"\" \
--gitref \"${commit_hash}\" \
--gitref-date \"${commit_date}\" \
${type} \
${producer_arg} \
${consumer_arg} \
--streams ${streams} \
--message-size ${message_size} \
--messages-per-batch ${messages_per_batch} \
--message-batches ${message_batches} \
${protocol}"
$COMMON_ARGS \
--output-dir performance_results \
--identifier ${hostname} \
--remark ${remark} \
--extra-info \"\" \
--gitref \"${commit_hash}\" \
--gitref-date \"${commit_date}\" \
${type} \
${producer_arg} \
${consumer_arg} \
--streams ${streams} \
--message-size ${message_size} \
--messages-per-batch ${messages_per_batch} \
--message-batches ${message_batches} \
${protocol}"
}
12 changes: 9 additions & 3 deletions sdk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.6.90"
version = "0.6.91"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "Apache-2.0"
Expand Down Expand Up @@ -37,12 +37,18 @@ flume = "0.11.1"
futures = "0.3.31"
futures-util = "0.3.31"
humantime = "2.1.0"
keyring = { version = "3.6.1", optional = true, features = ["sync-secret-service", "vendored"] }
keyring = { version = "3.6.1", optional = true, features = [
"sync-secret-service",
"vendored",
] }
lazy_static = "1.5.0"
passterm = { version = "2.0.1", optional = true }
quinn = { version = "0.11.6" }
regex = "1.11.1"
reqwest = { version = "0.12.12", default-features = false, features = ["json", "rustls-tls"] }
reqwest = { version = "0.12.12", default-features = false, features = [
"json",
"rustls-tls",
] }
reqwest-middleware = { version = "0.4.0", features = ["json"] }
reqwest-retry = "0.7.0"
rustls = { version = "0.23.21", features = ["ring"] }
Expand Down
160 changes: 117 additions & 43 deletions sdk/src/binary/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,66 +128,140 @@ pub fn map_stats(payload: Bytes) -> Result<Stats, IggyError> {
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);

let mut current_position = 108;

//
// Safely decode hostname
//
if current_position + 4 > payload.len() {
return Err(IggyError::InvalidNumberEncoding);
}
let hostname_length = u32::from_le_bytes(
payload[current_position..current_position + 4]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
) as usize;
let hostname =
from_utf8(&payload[current_position + 4..current_position + 4 + hostname_length])
.map_err(|_| IggyError::InvalidUtf8)?
.to_string();
current_position += 4 + hostname_length;
current_position += 4;
if current_position + hostname_length > payload.len() {
return Err(IggyError::InvalidNumberEncoding);
}
let hostname = from_utf8(&payload[current_position..current_position + hostname_length])
.map_err(|_| IggyError::InvalidUtf8)?
.to_string();
current_position += hostname_length;

//
// Safely Decode OS name
//
if current_position + 4 > payload.len() {
return Err(IggyError::InvalidNumberEncoding);
}
let os_name_length = u32::from_le_bytes(
payload[current_position..current_position + 4]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
) as usize;
let os_name = from_utf8(&payload[current_position + 4..current_position + 4 + os_name_length])
current_position += 4;
if current_position + os_name_length > payload.len() {
return Err(IggyError::InvalidNumberEncoding);
}
let os_name = from_utf8(&payload[current_position..current_position + os_name_length])
.map_err(|_| IggyError::InvalidUtf8)?
.to_string();
current_position += 4 + os_name_length;
current_position += os_name_length;

//
// Safely decode OS version
//
if current_position + 4 > payload.len() {
return Err(IggyError::InvalidNumberEncoding);
}
let os_version_length = u32::from_le_bytes(
payload[current_position..current_position + 4]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
) as usize;
let os_version =
from_utf8(&payload[current_position + 4..current_position + 4 + os_version_length])
.map_err(|_| IggyError::InvalidUtf8)?
.to_string();
current_position += 4 + os_version_length;
let kernel_version_length = u32::from_le_bytes(
payload[current_position..current_position + 4]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
) as usize;
let kernel_version =
from_utf8(&payload[current_position + 4..current_position + 4 + kernel_version_length])
.map_err(|_| IggyError::InvalidUtf8)?
.to_string();
current_position += 4 + kernel_version_length;
let iggy_version_length = u32::from_le_bytes(
payload[current_position..current_position + 4]
.try_into()
.map_err(|_| IggyError::InvalidUtf8)?,
) as usize;
let iggy_version =
from_utf8(&payload[current_position + 4..current_position + 4 + iggy_version_length])
.map_err(|_| IggyError::InvalidUtf8)?
.to_string();
current_position += 4 + iggy_version_length;
let iggy_semver = u32::from_le_bytes(
payload[current_position..current_position + 4]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let iggy_semver = if iggy_semver == 0 {
None
current_position += 4;
if current_position + os_version_length > payload.len() {
return Err(IggyError::InvalidNumberEncoding);
}
let os_version = from_utf8(&payload[current_position..current_position + os_version_length])
.map_err(|_| IggyError::InvalidUtf8)?
.to_string();
current_position += os_version_length;

//
// Safely decode kernel version (NEW) + server version (NEW) + server semver (NEW)
// We'll check if there's enough bytes before reading each new field.
//

// Default them in case payload doesn't have them (older server)
let mut kernel_version = String::new();
let mut iggy_server_version = String::new();
let mut iggy_server_semver: Option<u32> = None;

// kernel_version (if it exists)
if current_position + 4 <= payload.len() {
let kernel_version_length = u32::from_le_bytes(
payload[current_position..current_position + 4]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
) as usize;
current_position += 4;
if current_position + kernel_version_length <= payload.len() {
let kv =
from_utf8(&payload[current_position..current_position + kernel_version_length])
.map_err(|_| IggyError::InvalidUtf8)?
.to_string();
kernel_version = kv;
current_position += kernel_version_length;
} else {
// Not enough bytes for kernel version string, treat as empty or error out
// return Err(IggyError::InvalidNumberEncoding);
kernel_version = String::new(); // fallback
}
} else {
Some(iggy_semver)
};
// This means older server didn't send kernel_version, so remain empty
}

// iggy_server_version (if it exists)
if current_position + 4 <= payload.len() {
let iggy_version_length = u32::from_le_bytes(
payload[current_position..current_position + 4]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
) as usize;
current_position += 4;
if current_position + iggy_version_length <= payload.len() {
let iv = from_utf8(&payload[current_position..current_position + iggy_version_length])
.map_err(|_| IggyError::InvalidUtf8)?
.to_string();
iggy_server_version = iv;
current_position += iggy_version_length;
} else {
// Not enough bytes for iggy version string, treat as empty or error out
// return Err(IggyError::InvalidNumberEncoding);
iggy_server_version = String::new(); // fallback
}
} else {
// older server didn't send iggy_server_version, so remain empty
}

// iggy_server_semver (if it exists)
if current_position + 4 <= payload.len() {
let semver = u32::from_le_bytes(
payload[current_position..current_position + 4]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
// current_position += 4; // uncomment this when adding new fields
if semver != 0 {
iggy_server_semver = Some(semver);
}
} else {
// older server didn't send semver
}

Ok(Stats {
process_id,
Expand All @@ -212,8 +286,8 @@ pub fn map_stats(payload: Bytes) -> Result<Stats, IggyError> {
os_name,
os_version,
kernel_version,
iggy_server_version: iggy_version,
iggy_server_semver: iggy_semver,
iggy_server_version,
iggy_server_semver,
})
}

Expand Down

0 comments on commit ea07f69

Please sign in to comment.