Merge pull request #9 from dfinity/maxperf
Configuration, performance, and task -> rollout state mapping fixes.
DFINITYManu authored Jul 30, 2024
2 parents f0d96de + 96d9b47 commit 1bb6c81
Showing 5 changed files with 400 additions and 125 deletions.
4 changes: 4 additions & 0 deletions rollout-dashboard/README.md
@@ -79,6 +79,10 @@ set to the correct value (though sometimes the defaults are OK):
production -- only for development).
4. `RUST_LOG` set to `info`, ideally, to observe log messages of info
   level and above.
5. `MAX_ROLLOUTS` optionally set to a positive integer to limit the
   number of rollouts (default 15).
6. `REFRESH_INTERVAL` optionally set to a positive integer, the number
   of seconds to wait between queries to Airflow, as sketched below.
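
A minimal sketch of how these two variables might be parsed, assuming the server reads them with `std::env`; the function names and the `REFRESH_INTERVAL` fallback below are illustrative, not taken from the code:

```rust
use std::env;
use std::time::Duration;

// Illustrative sketch only: the actual parsing lives in the server crate.
// The MAX_ROLLOUTS default of 15 mirrors the README above.
fn max_rollouts() -> usize {
    env::var("MAX_ROLLOUTS")
        .ok()
        .and_then(|v| v.parse().ok())
        .filter(|n: &usize| *n > 0)
        .unwrap_or(15)
}

fn refresh_interval() -> Duration {
    let secs = env::var("REFRESH_INTERVAL")
        .ok()
        .and_then(|v| v.parse().ok())
        .filter(|n: &u64| *n > 0)
        .unwrap_or(30); // hypothetical fallback; the README does not state one
    Duration::from_secs(secs)
}
```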

## To-do

1 change: 1 addition & 0 deletions rollout-dashboard/frontend/src/lib/Batch.svelte
@@ -21,6 +21,7 @@
        complete: { icon: "", name: "Complete" },
        skipped: { icon: "", name: "Skipped" },
        error: { icon: "", name: "Error" },
        unknown: { icon: "", name: "Unknown (check backend logs)" },
    };
</script>

98 changes: 86 additions & 12 deletions rollout-dashboard/server/src/airflow_client.rs
@@ -17,11 +17,50 @@ use std::time::Duration;
use std::{f64, fmt};
use std::{vec, vec::Vec};
use urlencoding::decode;

/// Default maximum batch size for paged requests in Airflow.
const MAX_BATCH_SIZE: usize = 100;
/// Default timeout per request to Airflow.
const PER_REQUEST_TIMEOUT: u64 = 15;

fn add_date_parm(url: String, parm_name: &str, date: Option<DateTime<Utc>>) -> String {
    let dfmt = "%Y-%m-%dT%H:%M:%S%.fZ";
    if let Some(date) = date {
        url.clone()
            + match url.find('?') {
                Some(_) => "&",
                None => "?",
            }
            + &format!("{}={}", parm_name, date.format(dfmt))
    } else {
        url
    }
}

fn add_updated_parameters(
    url: String,
    updated_at_lte: Option<DateTime<Utc>>,
    updated_at_gte: Option<DateTime<Utc>>,
) -> String {
    add_date_parm(
        add_date_parm(url, "updated_at_lte", updated_at_lte),
        "updated_at_gte",
        updated_at_gte,
    )
}

fn add_ended_parameters(
    url: String,
    end_date_lte: Option<DateTime<Utc>>,
    end_date_gte: Option<DateTime<Utc>>,
) -> String {
    add_date_parm(
        add_date_parm(url, "end_date_lte", end_date_lte),
        "end_date_gte",
        end_date_gte,
    )
}
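
A usage sketch for the helpers above (the DAG path and timestamp are illustrative): the first date parameter is joined with `?`, any subsequent one with `&`, and `None` leaves the URL untouched.

```rust
use chrono::{TimeZone, Utc};

fn main() {
    let when = Utc.with_ymd_and_hms(2024, 7, 30, 0, 0, 0).unwrap();
    // Assumes add_updated_parameters from this file is in scope.
    let url = add_updated_parameters("dags/rollout/dagRuns".to_string(), None, Some(when));
    // With a zero fractional second this prints:
    // dags/rollout/dagRuns?updated_at_gte=2024-07-30T00:00:00Z
    println!("{}", url);
}
```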

trait Pageable {
    fn len(&self) -> usize;
    /// Append all elements from other into self (to the end
@@ -45,6 +84,7 @@ pub enum DagRunState {
#[derive(Debug, Deserialize, Clone)]
pub struct DagRunsResponseItem {
    pub conf: HashMap<String, serde_json::Value>,
    /// dag_run_id is unique, enforced by Airflow.
    pub dag_run_id: String,
    pub dag_id: String,
    pub logical_date: DateTime<Utc>,
@@ -230,6 +270,35 @@ impl Pageable for TaskInstancesResponse {
}
}

#[derive(Default)]
pub struct TaskInstanceRequestFilters {
    updated_at_lte: Option<DateTime<Utc>>,
    updated_at_gte: Option<DateTime<Utc>>,
    ended_at_lte: Option<DateTime<Utc>>,
    ended_at_gte: Option<DateTime<Utc>>,
}

impl TaskInstanceRequestFilters {
    #[allow(dead_code)]
    pub fn updated_on_or_before(mut self, date: Option<DateTime<Utc>>) -> Self {
        self.updated_at_lte = date;
        self
    }
    pub fn updated_on_or_after(mut self, date: Option<DateTime<Utc>>) -> Self {
        self.updated_at_gte = date;
        self
    }
    #[allow(dead_code)]
    pub fn ended_on_or_before(mut self, date: Option<DateTime<Utc>>) -> Self {
        self.ended_at_lte = date;
        self
    }
    pub fn ended_on_or_after(mut self, date: Option<DateTime<Utc>>) -> Self {
        self.ended_at_gte = date;
        self
    }
}
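
`TaskInstanceRequestFilters` is a consuming builder: each setter takes `self` by value and returns it, so calls chain. A usage sketch (the five-minute window is illustrative):

```rust
use chrono::{Duration, Utc};

fn example() -> TaskInstanceRequestFilters {
    // Only filters that are set become query parameters; fields left as
    // None (here the two *_lte bounds) are omitted from the request URL.
    let since = Some(Utc::now() - Duration::minutes(5));
    TaskInstanceRequestFilters::default()
        .updated_on_or_after(since)
        .ended_on_or_after(since)
}
```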

#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "snake_case")]
pub enum TriggerRule {
@@ -380,7 +449,7 @@ where
}
}

    debug!(target: "paged_get",
    trace!(target: "paged_get",
        "retrieving {} instances of {} at offset {}, with {} already retrieved",
        batch_limit,
        suburl,
@@ -511,17 +580,16 @@ impl AirflowClient {

        let c = self.client.clone();

        debug!(target: "http_client", "GET {}", url);
        let res = match c
            .get(url)
            .get(url.clone())
            .header(ACCEPT, "application/json")
            .header(CONTENT_TYPE, "application/json")
            .send()
            .await
        {
            Ok(resp) => {
                let status = resp.status();
                debug!(target: "http_client", "HTTP status {}", status);
                debug!(target: "http_client", "GET {} HTTP {}", url, status);
                match status {
                    reqwest::StatusCode::OK => match resp.json().await {
                        Ok(json) => Ok(json),
@@ -633,14 +701,19 @@
}

    /// Return DAG runs from newest to oldest.
    /// Optionally return only DAG runs updated within a certain time frame.
    pub async fn dag_runs(
        &self,
        dag_id: &str,
        limit: usize,
        offset: usize,
        updated_at_lte: Option<DateTime<Utc>>,
        updated_at_gte: Option<DateTime<Utc>>,
    ) -> Result<DagRunsResponse, AirflowError> {
        let mut url = format!("dags/{}/dagRuns", dag_id);
        url = add_updated_parameters(url, updated_at_lte, updated_at_gte);
        _paged_get(
            format!("dags/{}/dagRuns", dag_id),
            url,
            Some("-execution_date".into()),
            Some(PagingParameters { limit, offset }),
            |x| self._get_logged_in(x),
@@ -656,13 +729,14 @@ pub async fn task_instances(
        dag_run_id: &str,
        limit: usize,
        offset: usize,
        filters: TaskInstanceRequestFilters,
    ) -> Result<TaskInstancesResponse, AirflowError> {
        _paged_get(
            format!("dags/{}/dagRuns/{}/taskInstances", dag_id, dag_run_id),
            None,
            Some(PagingParameters { limit, offset }),
            |x| self._get_logged_in(x),
        )
        let mut url = format!("dags/{}/dagRuns/{}/taskInstances", dag_id, dag_run_id);
        url = add_updated_parameters(url, filters.updated_at_lte, filters.updated_at_gte);
        url = add_ended_parameters(url, filters.ended_at_lte, filters.ended_at_gte);
        _paged_get(url, None, Some(PagingParameters { limit, offset }), |x| {
            self._get_logged_in(x)
        })
        .await
    }
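
A sketch of how a caller might combine both endpoints with the new time-window parameters. The DAG id, the response's `dag_runs` field, and the five-minute window are assumptions for illustration, not taken from this diff:

```rust
use chrono::{Duration, Utc};

async fn fetch_recent(client: &AirflowClient) -> Result<(), AirflowError> {
    let since = Some(Utc::now() - Duration::minutes(5));
    // Newest-first DAG runs, restricted to ones updated since `since`.
    let runs = client
        .dag_runs("rollout_ic_os_to_mainnet_subnets", 15, 0, None, since)
        .await?;
    for run in runs.dag_runs.iter() {
        // Fetch only task instances updated or ended since `since`.
        let filters = TaskInstanceRequestFilters::default()
            .updated_on_or_after(since)
            .ended_on_or_after(since);
        let _tasks = client
            .task_instances(&run.dag_id, &run.dag_run_id, 100, 0, filters)
            .await?;
    }
    Ok(())
}
```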
