Skip to content

Commit

Permalink
fix: use chrono durations and make jitter random 0..5 seconds instead…
Browse files Browse the repository at this point in the history
… of 0..5000 ms (#350)
  • Loading branch information
Christopher Kolstad authored Nov 27, 2023
1 parent 33a511d commit 3618299
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 24 deletions.
56 changes: 33 additions & 23 deletions server/src/http/background_send_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use std::cmp::max;
use tracing::{error, info, trace, warn};

use super::unleash_client::UnleashClient;
use std::time::Duration;

use crate::{
error::EdgeError,
metrics::client_metrics::{size_of_batch, MetricsCache},
};
use chrono::Duration;
use lazy_static::lazy_static;
use prometheus::{register_int_gauge, register_int_gauge_vec, IntGauge, IntGaugeVec, Opts};
use rand::Rng;
Expand All @@ -30,10 +30,10 @@ lazy_static! {
pub async fn send_metrics_task(
metrics_cache: Arc<MetricsCache>,
unleash_client: Arc<UnleashClient>,
send_interval: u64,
send_interval: i64,
) {
let mut failures = 0;
let mut interval = Duration::from_secs(send_interval);
let mut interval = Duration::seconds(send_interval);
loop {
let batches = metrics_cache.get_appropriately_sized_batches();
trace!("Posting {} batches", batches.len());
Expand All @@ -55,20 +55,20 @@ pub async fn send_metrics_task(
}
StatusCode::NOT_FOUND => {
failures = 10;
interval = new_interval(send_interval, failures, 5);
error!("Upstream said we are trying to post to an endpoint that doesn't exist. backing off to {} seconds", interval.as_secs());
interval = new_interval(interval, failures, 5);
error!("Upstream said we are trying to post to an endpoint that doesn't exist. backing off to {} seconds", interval.num_seconds());
}
StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED => {
failures = 10;
interval = new_interval(send_interval, failures, 5);
error!("Upstream said we were not allowed to post metrics, backing off to {} seconds", interval.as_secs());
interval = new_interval(interval, failures, 5);
error!("Upstream said we were not allowed to post metrics, backing off to {} seconds", interval.num_seconds());
}
StatusCode::TOO_MANY_REQUESTS => {
failures = max(10, failures + 1);
interval = new_interval(send_interval, failures, 5);
interval = new_interval(interval, failures, 5);
info!(
"Upstream said it was too busy, backing off to {} seconds",
interval.as_secs()
interval.num_seconds()
);
metrics_cache.reinsert_batch(batch);
}
Expand All @@ -77,8 +77,8 @@ pub async fn send_metrics_task(
| StatusCode::SERVICE_UNAVAILABLE
| StatusCode::GATEWAY_TIMEOUT => {
failures = max(10, failures + 1);
interval = new_interval(send_interval, failures, 5);
info!("Upstream said it is struggling. It returned Http status {}. Backing off to {} seconds", status_code, interval.as_secs());
interval = new_interval(interval, failures, 5);
info!("Upstream said it is struggling. It returned Http status {}. Backing off to {} seconds", status_code, interval.num_seconds());
metrics_cache.reinsert_batch(batch);
}
_ => {
Expand All @@ -94,27 +94,37 @@ pub async fn send_metrics_task(
}
} else {
failures = max(0, failures - 1);
interval = new_interval(send_interval, failures, 5);
interval = new_interval(interval, failures, 5);
}
}
}
trace!(
"Done posting traces. Sleeping for {} seconds and then going again",
interval.as_secs()
interval.num_seconds()
);
tokio::time::sleep(interval).await;
tokio::time::sleep(std::time::Duration::from_secs(interval.num_seconds() as u64)).await;
}
}

fn new_interval(send_interval: u64, failures: u64, max_jitter_seconds: u64) -> Duration {
let initial = Duration::from_secs(send_interval);
let added_interval_from_failure = Duration::from_secs(send_interval * failures);
let jitter = random_jitter_milliseconds(max_jitter_seconds);
initial + added_interval_from_failure + jitter
/// Computes the next delay between metrics posts.
///
/// The result is `send_interval`, plus one additional `send_interval` per
/// counted failure, plus a random whole-second jitter produced by
/// `random_jitter_seconds(max_jitter_seconds)`.
fn new_interval(send_interval: Duration, failures: i32, max_jitter_seconds: u8) -> Duration {
    // Linear back-off: every failure adds one more base interval.
    let failure_backoff = send_interval * failures;
    // Random jitter so the sum is not the same fixed value on every call.
    let random_offset = random_jitter_seconds(max_jitter_seconds);
    send_interval + failure_backoff + random_offset
}

fn random_jitter_milliseconds(max_jitter_seconds: u64) -> Duration {
let mut rng = rand::thread_rng();
let jitter = rng.gen_range(0..(max_jitter_seconds * 1000));
Duration::from_millis(jitter)
/// Picks a random whole-second jitter in the half-open range `0..max_jitter_seconds`.
///
/// The upper bound is exclusive, so a `max_jitter_seconds` of 5 yields 0 to 4 seconds.
/// A bound of 0 returns a zero duration instead of sampling, because
/// `gen_range` panics when handed an empty range.
fn random_jitter_seconds(max_jitter_seconds: u8) -> Duration {
    if max_jitter_seconds == 0 {
        // `0..0` is empty; sampling from it would panic.
        return Duration::zero();
    }
    let jitter = rand::thread_rng().gen_range(0..max_jitter_seconds);
    // u8 -> i64 is lossless, so use `From` rather than an `as` cast.
    Duration::seconds(i64::from(jitter))
}

#[cfg(test)]
mod tests {
    use crate::http::background_send_metrics::new_interval;
    use chrono::Duration;

    /// With a 300 second base interval and 10 failures the back-off is
    /// 300 + 300 * 10 = 3300 seconds, plus a jitter of 0 to 4 seconds.
    /// Both bounds are checked so that a wrapped (negative or tiny) result
    /// fails the test as well as an oversized one.
    #[test]
    pub fn new_interval_does_not_overflow() {
        let interval = new_interval(Duration::seconds(300), 10, 5);
        let seconds = interval.num_seconds();
        assert!(seconds >= 3300, "interval too short: {seconds} seconds");
        assert!(seconds < 3305, "interval too long: {seconds} seconds");
    }
}
2 changes: 1 addition & 1 deletion server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ async fn main() -> Result<(), anyhow::Error> {
_ = refresher.start_refresh_features_background_task() => {
tracing::info!("Feature refresher unexpectedly shut down");
}
_ = unleash_edge::http::background_send_metrics::send_metrics_task(metrics_cache_clone.clone(), refresher.unleash_client.clone(), edge.metrics_interval_seconds) => {
_ = unleash_edge::http::background_send_metrics::send_metrics_task(metrics_cache_clone.clone(), refresher.unleash_client.clone(), edge.metrics_interval_seconds.try_into().unwrap()) => {
tracing::info!("Metrics poster unexpectedly shut down");
}
_ = persist_data(persistence.clone(), lazy_token_cache.clone(), lazy_feature_cache.clone(), refresher.tokens_to_refresh.clone()) => {
Expand Down

0 comments on commit 3618299

Please sign in to comment.