Skip to content

Commit

Permalink
Merge pull request #56 from dfinity/igornovg/misc-v1
Browse files Browse the repository at this point in the history
chore: simplify periodic tasks, update deps
  • Loading branch information
blind-oracle authored Dec 11, 2024
2 parents c889559 + 8630b78 commit 398a55b
Show file tree
Hide file tree
Showing 12 changed files with 358 additions and 492 deletions.
441 changes: 204 additions & 237 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ axum = "0.7.9"
axum-extra = "0.9.6"
backoff = { version = "0.4.0", features = ["tokio"] }
base64 = "0.22.1"
bytes = "1.8.0"
bytes = "1.9.0"
candid = "0.10.10"
chrono = "0.4.38"
clap = { version = "4.5.20", features = ["derive", "string", "env"] }
Expand All @@ -39,7 +39,7 @@ hickory-resolver = { version = "0.24.1", features = [
"dnssec-ring",
] }
hostname = "0.4.0"
http = "1.1.0"
http = "1.2.0"
http-body = "1.0.1"
http-body-util = "0.1.2"
humantime = "2.1.0"
Expand All @@ -48,7 +48,7 @@ ic-agent = { version = "0.39.1", features = [
"ring",
"_internal_dynamic-routing",
] }
ic-bn-lib = { git = "https://github.com/dfinity/ic-bn-lib", rev = "da94b46792f6e82cd106a15d920d449d7d9edf92" }
ic-bn-lib = { git = "https://github.com/dfinity/ic-bn-lib", rev = "1e733b13ad9458a792012e7824d458570569cb6e" }
ic-http-gateway = { git = "https://github.com/dfinity/http-gateway", tag = "0.1.0-b3" }
itertools = "0.13.0"
lazy_static = "1.5.0"
Expand Down Expand Up @@ -79,15 +79,15 @@ thiserror = "2.0.3"
tikv-jemallocator = "0.6.0"
tikv-jemalloc-ctl = { version = "0.6.0", features = ["stats"] }
time = { version = "0.3.36", features = ["macros", "serde"] }
tokio = { version = "1.41.0", features = ["full", "tracing"] }
tokio = { version = "1.42.0", features = ["full", "tracing"] }
tokio-util = { version = "0.7.12", features = ["full"] }
tower = { version = "0.5.1", features = ["limit"] }
tower_governor = "0.4.3"
tower-http = { version = "0.6.1", features = ["cors", "compression-full"] }
tower-service = "0.3.3"
tracing = "0.1.40"
tracing-core = "0.1.32"
tracing-serde = "0.1.3"
tracing-serde = "0.2.0"
tracing-subscriber = { version = "0.3.18", features = [
"env-filter",
"fmt",
Expand All @@ -108,7 +108,7 @@ hex-literal = "0.4.1"
hyper = "1.5.0"
criterion = { version = "0.5.1", features = ["async_tokio"] }
httptest = "0.16.1"
tempfile = "3.13.0"
tempfile = "3.14.0"

[profile.release]
strip = "symbols"
Expand Down
2 changes: 1 addition & 1 deletion src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub const HTTP_RESPONSE_SIZE_BUCKETS: &[f64] = &[1.0 * KB, 8.0 * KB, 64.0 * KB,
pub fn setup(registry: &Registry, tasks: &mut TaskManager) -> Router {
let cache = Arc::new(runner::MetricsCache::new());
let runner = Arc::new(runner::MetricsRunner::new(cache.clone(), registry));
tasks.add("metrics_runner", runner);
tasks.add_interval("metrics_runner", runner, Duration::from_secs(5));

Router::new()
.route("/metrics", get(runner::handler))
Expand Down
37 changes: 9 additions & 28 deletions src/metrics/runner.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use std::{
sync::Arc,
time::{Duration, Instant},
};
use std::{sync::Arc, time::Instant};

use anyhow::Error;
use arc_swap::ArcSwap;
Expand All @@ -11,7 +8,6 @@ use http::header::CONTENT_TYPE;
use ic_bn_lib::tasks::Run;
use prometheus::{register_int_gauge_with_registry, Encoder, IntGauge, Registry, TextEncoder};
use tikv_jemalloc_ctl::{epoch, stats};
use tokio::select;
use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};

Expand Down Expand Up @@ -94,30 +90,15 @@ impl MetricsRunner {

#[async_trait]
impl Run for MetricsRunner {
async fn run(&self, token: CancellationToken) -> Result<(), Error> {
let mut interval = tokio::time::interval(Duration::from_secs(5));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

warn!("MetricsRunner: started");
loop {
select! {
biased;

() = token.cancelled() => {
warn!("MetricsRunner: exited");
return Ok(());
}

_ = interval.tick() => {
let start = Instant::now();
if let Err(e) = self.update().await {
warn!("Unable to update metrics: {e:#}");
} else {
debug!("Metrics updated in {}ms", start.elapsed().as_millis());
}
}
}
async fn run(&self, _: CancellationToken) -> Result<(), Error> {
let start = Instant::now();
if let Err(e) = self.update().await {
warn!("Unable to update metrics: {e:#}");
} else {
debug!("Metrics updated in {}ms", start.elapsed().as_millis());
}

Ok(())
}
}

Expand Down
89 changes: 4 additions & 85 deletions src/policy/denylist.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
use std::{fs, path::PathBuf, sync::Arc, time::Duration};
use std::{fs, path::PathBuf, sync::Arc};

use ahash::{AHashMap, AHashSet};
use anyhow::{anyhow, Context, Error};
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use candid::Principal;
use ic_bn_lib::{http::Client, tasks::Run};
use prometheus::{register_int_counter_vec_with_registry, IntCounterVec, Registry};
use ic_bn_lib::http::Client;
use serde::Deserialize;
use serde_json as json;
use tokio::select;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use tracing::warn;
use url::Url;

use super::load_principal_list;
Expand All @@ -22,27 +18,19 @@ pub struct Denylist {
http_client: Arc<dyn Client>,
inner: ArcSwapOption<AHashMap<Principal, Vec<String>>>,
allowlist: AHashSet<Principal>,
update_interval: Duration,
metrics: MetricParams,
}

impl Denylist {
pub fn new(
url: Option<Url>,
allowlist: AHashSet<Principal>,
http_client: Arc<dyn Client>,
update_interval: Duration,
registry: &Registry,
) -> Self {
let metrics = MetricParams::new(registry);

Self {
url,
http_client,
inner: ArcSwapOption::empty(),
allowlist,
update_interval,
metrics,
}
}

Expand All @@ -51,8 +39,6 @@ impl Denylist {
allowlist: Option<PathBuf>,
seed: Option<PathBuf>,
http_client: Arc<dyn Client>,
update_interval: Duration,
registry: &Registry,
) -> Result<Self, Error> {
let allowlist = if let Some(v) = allowlist {
let r = load_principal_list(&v).context("unable to read allowlist")?;
Expand All @@ -62,7 +48,7 @@ impl Denylist {
AHashSet::new()
};

let denylist = Self::new(url, allowlist, http_client, update_interval, registry);
let denylist = Self::new(url, allowlist, http_client);

if let Some(v) = seed {
let seed = fs::read(v).context("unable to read seed")?;
Expand Down Expand Up @@ -156,66 +142,6 @@ impl Denylist {
}
}

#[async_trait]
impl Run for Denylist {
async fn run(&self, token: CancellationToken) -> Result<(), Error> {
warn!(
"Denylist updater started with {}s interval",
self.update_interval.as_secs()
);

let mut interval = tokio::time::interval(self.update_interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

loop {
select! {
biased;

() = token.cancelled() => {
warn!("Denylist updater stopped");
return Ok(());
}

_ = interval.tick() => {
let res = self.update().await;

let lbl = match res {
Err(e) => {
warn!("Denylist update failed: {e:#}");
"fail"
}
Ok(v) => {
info!("Denylist updated: {} canisters", v);
"ok"
}
};

self.metrics.updates.with_label_values(&[lbl]).inc();
}
}
}
}
}

#[derive(Clone)]
pub struct MetricParams {
pub updates: IntCounterVec,
}

impl MetricParams {
pub fn new(registry: &Registry) -> Self {
Self {
updates: register_int_counter_vec_with_registry!(
format!("denylist_updates"),
format!("Counts denylist updates and results"),
&["result"],
registry
)
.unwrap(),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -259,14 +185,10 @@ mod tests {
let client =
Arc::new(TestClient(reqwest::ClientBuilder::new().build()?)) as Arc<dyn Client>;

let registry = Registry::new();

let denylist = Denylist::new(
Some(Url::parse(&server.url_str("/denylist.json")).unwrap()),
AHashSet::from([Principal::from_text("g3wsl-eqaaa-aaaan-aaaaa-cai").unwrap()]),
client,
Duration::ZERO,
&registry,
);
denylist.update().await?;

Expand Down Expand Up @@ -336,13 +258,10 @@ mod tests {

let client =
Arc::new(TestClient(reqwest::ClientBuilder::new().build()?)) as Arc<dyn Client>;
let registry = Registry::new();
let denylist = Denylist::new(
Some(Url::parse(&server.url_str("/denylist.json")).unwrap()),
AHashSet::new(),
client,
Duration::ZERO,
&registry,
);
assert!(denylist.update().await.is_err());

Expand Down
31 changes: 6 additions & 25 deletions src/routing/domain.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::BTreeMap, str::FromStr, sync::Arc, time::Duration};
use std::{collections::BTreeMap, str::FromStr, sync::Arc};

use anyhow::{anyhow, Context, Error};
use arc_swap::ArcSwapOption;
Expand All @@ -7,9 +7,7 @@ use candid::Principal;
use derive_new::new;
use fqdn::{fqdn, Fqdn, FQDN};
use ic_bn_lib::tasks::Run;
use tokio::select;
use tokio_util::sync::CancellationToken;
use tracing::warn;

#[macro_export]
macro_rules! principal {
Expand Down Expand Up @@ -91,7 +89,6 @@ struct CustomDomainStorageInner(BTreeMap<FQDN, DomainLookup>);
#[derive(new)]
pub struct CustomDomainStorage {
providers: Vec<Arc<dyn ProvidesCustomDomains>>,
poll_interval: Duration,
#[new(default)]
inner: ArcSwapOption<CustomDomainStorageInner>,
}
Expand Down Expand Up @@ -128,26 +125,10 @@ impl CustomDomainStorage {

#[async_trait]
impl Run for CustomDomainStorage {
async fn run(&self, token: CancellationToken) -> Result<(), Error> {
let mut interval = tokio::time::interval(self.poll_interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

loop {
select! {
biased;

() = token.cancelled() => {
warn!("CustomDomainStorage: exiting");
return Ok(());
},

_ = interval.tick() => {
if let Err(e) = self.refresh().await {
warn!("CustomDomainStorage: unable to refresh: {e:#}");
}
}
}
}
async fn run(&self, _: CancellationToken) -> Result<(), Error> {
self.refresh()
.await
.context("unable to refresh custom domains")
}
}

Expand Down Expand Up @@ -354,7 +335,7 @@ mod test {
});

let custom_domain_storage =
CustomDomainStorage::new(vec![Arc::new(custom_domain_provider)], Duration::ZERO);
CustomDomainStorage::new(vec![Arc::new(custom_domain_provider)]);
custom_domain_storage.refresh().await?;

let resolver = DomainResolver::new(
Expand Down
Loading

0 comments on commit 398a55b

Please sign in to comment.