From 21c4c1c8d81a4469b8b3693e5d787de2e99b8b88 Mon Sep 17 00:00:00 2001
From: Manuel Amador
Date: Thu, 12 Sep 2024 14:21:56 +0200
Subject: [PATCH 1/3] Server API doc link fixed.

---
 rollout-dashboard/doc/api.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/rollout-dashboard/doc/api.md b/rollout-dashboard/doc/api.md
index 54277fc..be4ead9 100644
--- a/rollout-dashboard/doc/api.md
+++ b/rollout-dashboard/doc/api.md
@@ -5,7 +5,7 @@
 To learn about the API, how to use it, and how to interpret the data served by
 API calls, please consult the programming documentation that accompanies
 the `rollout_dashboard` crate, available under folder
-[`server/`](server/) by running the `cargo rustdoc` program within that
+[`../server/`](../server/) by running the `cargo rustdoc` program within that
 folder, and then launching the Web page it generates for you.
 
 Please do not proceed with creating a client

From e043da2018e1a58250aa78d657b9119e9013b31f Mon Sep 17 00:00:00 2001
From: Manuel Amador
Date: Wed, 25 Sep 2024 16:02:12 +0200
Subject: [PATCH 2/3] Rework local cache to address "not yet computed a rollout plan" issue.

The rollout dashboard keeps a local cache of all tasks it has seen,
because retrieving *all* task instances from Airflow is expensive. We
only do that during the first loop; in subsequent runs we retrieve the
tasks that have been updated or modified after the last check's
timestamp. To these we also add the tasks that have started after the
last check.

In addition, we now linearize the tasks, because the retrieved task
lists may contain the same task more than once, possibly updated
between requests. Linearization means picking, for each task instance,
the object with the latest date (be it execution, start or end date).

Furthermore, if the rollout plan was retrieved but marked empty even
though the schedule task is complete, we re-retrieve it. This prevents
the odd error where the task has completed but the XCom associated
with the task (containing the plan) is not yet saved to the database
(or at least it looks that way, because we race to get the value right
after the task finishes, before the value has been stably inserted
into the database).

Finally, this PR parallelizes the multiple requests that take place
when task retrieval is performed. This reduces incremental update time
to roughly half of what it used to be.
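As an illustration of the linearization rule described above, the following
standalone sketch uses simplified stand-in types (not the real
TaskInstancesResponseItem or cache structures): for every
(task_id, map_index) pair it keeps the record whose newest known date wins,
mirroring the latest_date() helper this patch adds.

    // Illustrative sketch only: simplified stand-ins for the real types.
    use chrono::{DateTime, Utc};
    use std::collections::hash_map::Entry;
    use std::collections::HashMap;

    #[derive(Clone, Debug)]
    struct TaskRecord {
        task_id: String,
        map_index: Option<usize>,
        execution_date: DateTime<Utc>,
        start_date: Option<DateTime<Utc>>,
        end_date: Option<DateTime<Utc>>,
    }

    impl TaskRecord {
        // Same idea as latest_date() in this patch: the newest of the
        // execution, start and end dates.
        fn latest_date(&self) -> DateTime<Utc> {
            [Some(self.execution_date), self.start_date, self.end_date]
                .into_iter()
                .flatten()
                .max()
                .expect("execution_date is always present")
        }
    }

    // Merge possibly overlapping task lists, keeping only the freshest copy
    // of each (task_id, map_index) pair.
    fn linearize(lists: Vec<Vec<TaskRecord>>) -> Vec<TaskRecord> {
        let mut newest: HashMap<(String, Option<usize>), TaskRecord> = HashMap::new();
        for record in lists.into_iter().flatten() {
            match newest.entry((record.task_id.clone(), record.map_index)) {
                Entry::Vacant(slot) => {
                    slot.insert(record);
                }
                Entry::Occupied(mut slot) => {
                    if record.latest_date() > slot.get().latest_date() {
                        slot.insert(record);
                    }
                }
            }
        }
        newest.into_values().collect()
    }

Under this rule the order in which overlapping query results arrive does not
matter: a stale duplicate can never overwrite a fresher copy of the same task
instance.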
---
 .../server/src/airflow_client.rs             |  57 ++++
 rollout-dashboard/server/src/frontend_api.rs | 245 +++++++++++-------
 2 files changed, 209 insertions(+), 93 deletions(-)

diff --git a/rollout-dashboard/server/src/airflow_client.rs b/rollout-dashboard/server/src/airflow_client.rs
index 9e6f260..9d6c798 100644
--- a/rollout-dashboard/server/src/airflow_client.rs
+++ b/rollout-dashboard/server/src/airflow_client.rs
@@ -52,6 +52,18 @@ fn add_updated_parameters(
     )
 }
 
+fn add_executed_parameters(
+    url: String,
+    execution_date_lte: Option<DateTime<Utc>>,
+    execution_date_gte: Option<DateTime<Utc>>,
+) -> String {
+    add_date_parm(
+        add_date_parm(url, "execution_date_lte", execution_date_lte),
+        "execution_date_gte",
+        execution_date_gte,
+    )
+}
+
 fn add_ended_parameters(
     url: String,
     end_date_lte: Option<DateTime<Utc>>,
@@ -233,6 +245,39 @@ pub struct TaskInstancesResponseItem {
     pub note: Option<String>,
 }
 
+impl TaskInstancesResponseItem {
+    #[allow(dead_code)]
+    pub fn latest_date(&self) -> DateTime<Utc> {
+        let mut d = self.execution_date;
+        if let Some(dd) = self.start_date {
+            if dd > d {
+                d = dd
+            }
+        }
+        if let Some(dd) = self.end_date {
+            if dd > d {
+                d = dd
+            }
+        }
+        d
+    }
+    #[allow(dead_code)]
+    pub fn earliest_date(&self) -> DateTime<Utc> {
+        let mut d = self.execution_date;
+        if let Some(dd) = self.start_date {
+            if dd < d {
+                d = dd
+            }
+        }
+        if let Some(dd) = self.end_date {
+            if dd < d {
+                d = dd
+            }
+        }
+        d
+    }
+}
+
 #[derive(Debug, Deserialize, Default)]
 pub struct TaskInstancesResponse {
@@ -275,6 +320,8 @@ impl Pageable for TaskInstancesResponse {
 
 #[derive(Default)]
 pub struct TaskInstanceRequestFilters {
+    executed_at_lte: Option<DateTime<Utc>>,
+    executed_at_gte: Option<DateTime<Utc>>,
     updated_at_lte: Option<DateTime<Utc>>,
     updated_at_gte: Option<DateTime<Utc>>,
     ended_at_lte: Option<DateTime<Utc>>,
@@ -282,6 +329,15 @@ pub struct TaskInstanceRequestFilters {
     ended_at_gte: Option<DateTime<Utc>>,
 }
 
 impl TaskInstanceRequestFilters {
+    #[allow(dead_code)]
+    pub fn executed_on_or_before(mut self, date: Option<DateTime<Utc>>) -> Self {
+        self.executed_at_lte = date;
+        self
+    }
+    pub fn executed_on_or_after(mut self, date: Option<DateTime<Utc>>) -> Self {
+        self.executed_at_gte = date;
+        self
+    }
     #[allow(dead_code)]
     pub fn updated_on_or_before(mut self, date: Option<DateTime<Utc>>) -> Self {
         self.updated_at_lte = date;
@@ -737,6 +793,7 @@ impl AirflowClient {
         filters: TaskInstanceRequestFilters,
     ) -> Result<TaskInstancesResponse, AirflowError> {
         let mut url = format!("dags/{}/dagRuns/{}/taskInstances", dag_id, dag_run_id);
+        url = add_executed_parameters(url, filters.executed_at_lte, filters.executed_at_gte);
         url = add_updated_parameters(url, filters.updated_at_lte, filters.updated_at_gte);
         url = add_ended_parameters(url, filters.ended_at_lte, filters.ended_at_gte);
         _paged_get(
diff --git a/rollout-dashboard/server/src/frontend_api.rs b/rollout-dashboard/server/src/frontend_api.rs
index 68ac4a5..eadc4cf 100644
--- a/rollout-dashboard/server/src/frontend_api.rs
+++ b/rollout-dashboard/server/src/frontend_api.rs
@@ -1,14 +1,25 @@
+use crate::airflow_client::{
+    AirflowClient, AirflowError, DagRunState, TaskInstanceRequestFilters, TaskInstanceState,
+    TaskInstancesResponseItem, TasksResponse, TasksResponseItem,
+};
 use crate::python;
 use chrono::{DateTime, Utc};
+use futures::future::join_all;
 use indexmap::IndexMap;
 use lazy_static::lazy_static;
 use log::{debug, trace, warn};
 use regex::Regex;
+use rollout_dashboard::types::{
+    Batch, Rollout, RolloutState, Rollouts, Subnet, SubnetRolloutState,
+};
 use serde::Serialize;
 use std::cmp::min;
+use std::collections::hash_map::Entry::{Occupied, Vacant};
 use std::collections::HashMap;
 use std::fmt::{self, Display};
+use std::future::Future;
 use std::num::ParseIntError;
+use std::pin::Pin;
 use std::rc::Rc;
 use std::str::FromStr;
 use std::sync::Arc;
@@ -16,20 +27,14 @@ use std::{vec, vec::Vec};
 use tokio::sync::Mutex;
 use topological_sort::TopologicalSort;
 
-use crate::airflow_client::{
-    AirflowClient, AirflowError, DagRunState, TaskInstanceRequestFilters, TaskInstanceState,
-    TaskInstancesResponseItem, TasksResponse, TasksResponseItem,
-};
-use rollout_dashboard::types::{
-    Batch, Rollout, RolloutState, Rollouts, Subnet, SubnetRolloutState,
-};
-
 lazy_static! {
     // unwrap() is legitimate here because we know these cannot fail to compile.
     static ref SubnetGitRevisionRe: Regex = Regex::new("dfinity.ic_types.SubnetRolloutInstance.*@version=0[(]start_at=.*,subnet_id=([0-9-a-z-]+),git_revision=([0-9a-f]+)[)]").unwrap();
     static ref BatchIdentificationRe: Regex = Regex::new("batch_([0-9]+)[.](.+)").unwrap();
 }
 
+const TASK_INSTANCE_LIST_LIMIT: usize = 500;
+
 #[derive(Debug)]
 pub struct CyclicDependencyError {
     message: String,
@@ -254,14 +259,14 @@ enum ScheduleCache {
     Valid(String),
 }
 
 struct RolloutDataCache {
-    task_instances: HashMap<String, TaskInstancesResponseItem>,
+    task_instances: HashMap<String, HashMap<Option<usize>, TaskInstancesResponseItem>>,
     dispatch_time: DateTime<Utc>,
     note: Option<String>,
     schedule: ScheduleCache,
+    last_update_time: Option<DateTime<Utc>>,
 }
 
 struct RolloutApiCache {
-    last_update_time: Option<DateTime<Utc>>,
     /// Map from DAG run ID to task instance ID (with / without index)
     /// to task instance.
     by_dag_run: HashMap<String, RolloutDataCache>,
@@ -339,7 +344,6 @@ impl RolloutApi {
         Self {
             airflow_api: Arc::new(client),
             cache: Arc::new(Mutex::new(RolloutApiCache {
-                last_update_time: None,
                 by_dag_run: HashMap::new(),
             })),
         }
@@ -364,8 +368,6 @@ impl RolloutApi {
         max_rollouts: usize,
     ) -> Result<(Rollouts, bool), RolloutDataGatherError> {
         let mut cache = self.cache.lock().await;
-        let now = Utc::now();
-        let last_update_time = cache.last_update_time;
 
         let dag_id = "rollout_ic_os_to_mainnet_subnets";
         let dag_runs = self
@@ -392,43 +394,99 @@ impl RolloutApi {
                     dispatch_time: dag_run.logical_date,
                     note: dag_run.note.clone(),
                     schedule: ScheduleCache::Empty,
+                    last_update_time: None,
                 });
 
-            // All new task instances that have not been seen before. This includes
-            // tasks of rollouts newly created since last time this loop checked for rollouts.
-            let updated_task_instances = self
-                .airflow_api
-                .task_instances(
-                    dag_id,
-                    dag_run.dag_run_id.as_str(),
-                    500,
-                    0,
-                    TaskInstanceRequestFilters::default().updated_on_or_after(last_update_time),
-                )
-                .await?
-                .task_instances;
-
-            // Tasks that are ended are not marked as updated in Airflow.
-            let ended_task_instances = if last_update_time.is_some() {
-                self.airflow_api
-                    .task_instances(
-                        dag_id,
-                        dag_run.dag_run_id.as_str(),
-                        500,
-                        0,
-                        TaskInstanceRequestFilters::default().ended_on_or_after(last_update_time),
-                    )
-                    .await?
- .task_instances - } else { - vec![] - }; + type TaskInstanceResponse = Result, AirflowError>; + + let last_update_time = cache_entry.last_update_time; + let now = Utc::now(); + let requests: Vec + Send>>> = vec![ + Box::pin(async move { + match self + .airflow_api + .task_instances( + dag_id, + dag_run.dag_run_id.as_str(), + TASK_INSTANCE_LIST_LIMIT, + 0, + TaskInstanceRequestFilters::default() + .executed_on_or_after(last_update_time), + ) + .await + { + Ok(r) => Ok(r.task_instances), + Err(e) => Err(e), + } + }), + Box::pin(async move { + match last_update_time { + None => Ok(vec![]), + Some(_) => { + match self + .airflow_api + .task_instances( + dag_id, + dag_run.dag_run_id.as_str(), + TASK_INSTANCE_LIST_LIMIT, + 0, + TaskInstanceRequestFilters::default() + .updated_on_or_after(last_update_time), + ) + .await + { + Ok(r) => Ok(r.task_instances), + Err(e) => Err(e), + } + } + } + }), + Box::pin(async move { + match last_update_time { + None => Ok(vec![]), + Some(_) => { + match self + .airflow_api + .task_instances( + dag_id, + dag_run.dag_run_id.as_str(), + TASK_INSTANCE_LIST_LIMIT, + 0, + TaskInstanceRequestFilters::default() + .ended_on_or_after(last_update_time), + ) + .await + { + Ok(r) => Ok(r.task_instances), + Err(e) => Err(e), + } + } + } + }), + ]; + + let results = join_all(requests).await; + let mut all_task_instances: Vec = vec![]; + for r in results.into_iter() { + all_task_instances.append(&mut r?) + } debug!( - target: "frontend_api", "{}: Updated tasks {} Ended tasks {}", - dag_run.dag_run_id, updated_task_instances.len(), ended_task_instances.len(), + target: "frontend_api", "{}: Undeduplicated tasks: {}", + dag_run.dag_run_id, all_task_instances.len() ); + let rollout_had_changed_tasks = match all_task_instances.is_empty() { + true => false, + false => { + // At least one task has updated or finished. + // See function documentation about meaningful changes. + meaningful_updates_to_any_rollout = true; + // Now remember this rollout was updated. + true + } + }; + // If the note of the rollout has changed, // note that this has been updated. if cache_entry.note != dag_run.note { @@ -464,58 +522,58 @@ impl RolloutApi { ); // Let's update the cache to incorporate the most up-to-date task instances. - for task_instance in updated_task_instances - .into_iter() - .chain(ended_task_instances.into_iter()) - { - // At least one task has updated or finished. - // See function documentation about meaningful changes. - meaningful_updates_to_any_rollout = true; - + for task_instance in all_task_instances.into_iter() { let task_instance_id = task_instance.task_id.clone(); if task_instance_id == "schedule" { cache_entry.schedule = ScheduleCache::Invalid; } - match task_instance.map_index { - None => { - cache_entry - .task_instances - .insert(format!("{} None", task_instance_id), task_instance); + + let by_name = cache_entry + .task_instances + .entry(task_instance_id) + .or_insert(HashMap::new()); + + match by_name.entry(task_instance.map_index) { + Vacant(entry) => { + entry.insert(task_instance); } - Some(idx) => { - if cache_entry - .task_instances - .contains_key(&format!("{} None", task_instance_id)) - { - debug!( - target: "frontend_api", "Formerly unmapped task {} is now mapped to index {}", - task_instance_id, idx - ); + Occupied(mut entry) => { + if task_instance.latest_date() > entry.get().latest_date() { + entry.insert(task_instance.clone()); } - // Once a task has been mapped, clearing the task will not cause it - // to become unmapped anymore. 
-                        // presented to me through observation.
-                        //
-                        // The number of map indexes for a task cannot be reduced once a
-                        // flow has started executing.
-                        cache_entry
-                            .task_instances
-                            .insert(format!("{} {}", task_instance_id, idx), task_instance);
-                        // Thus, we must remove any cached entry that has map index None.
-                        cache_entry
-                            .task_instances
-                            .remove(&format!("{} None", task_instance_id));
+                    }
+                };
+            }
+
+            for (task_instance_id, tasks) in cache_entry.task_instances.iter_mut() {
+                // Delete data on all unmapped tasks if a mapped task sibling is present.
+                if tasks.len() > 1 {
+                    if let Occupied(_) = tasks.entry(None) {
+                        debug!(
+                            target: "frontend_api", "Formerly unmapped task {} is now mapped",
+                            task_instance_id
+                        );
+                        tasks.remove(&None);
+                    }
+                }
                 }
             }
 
+            let linearized_tasks: Vec<TaskInstancesResponseItem> = cache_entry
+                .task_instances
+                .iter()
+                .flat_map(|(_, tasks)| tasks.iter().map(|(_, task)| task.clone()))
+                .collect();
+
+            debug!(
+                target: "frontend_api", "{}: Total disambiguated tasks including locally cached ones: {}",
+                dag_run.dag_run_id, linearized_tasks.len(),
+            );
+
             // Now update rollout and batch state based on the obtained data.
             // What this process does is fairly straightforward:
             // * for each and every known up-to-date Airflow task in the cache
             //   (always processed in topological order),
-            for task_instance in
-                sorter.sort_instances(cache_entry.task_instances.clone().into_values())
-            {
+            for task_instance in sorter.sort_instances(linearized_tasks.into_iter()) {
                 // * deduce the rollout plan, if available,
                 // * mark the rollout as having problems or errors depending on what
                 //   the task state is, or as one of the various running states, if
@@ -542,7 +600,7 @@ impl RolloutApi {
                     Some(TaskInstanceState::Success) => {
                         let schedule_string = match &cache_entry.schedule {
                             ScheduleCache::Valid(s) => s,
-                            ScheduleCache::Invalid => {
+                            ScheduleCache::Invalid | ScheduleCache::Empty => {
                                 let value = self
                                     .airflow_api
                                     .xcom_entry(
@@ -563,6 +621,8 @@ impl RolloutApi {
                                         reqwest::StatusCode::NOT_FOUND,
                                     )) => {
                                         // There is no schedule to be found.
+                                        // Or there was no schedule to be found last time
+                                        // it was queried.
                                         cache_entry.schedule = ScheduleCache::Empty;
                                         continue;
                                     }
@@ -572,11 +632,6 @@ impl RolloutApi {
                                 };
                                 &schedule.clone()
                             }
-                            ScheduleCache::Empty => {
-                                // There was no schedule to be found last time
-                                // it was queried.
-                                continue;
-                            }
                         };
                         let schedule = RolloutPlan::from_python_string(schedule_string.clone())?;
 
@@ -834,14 +889,18 @@ impl RolloutApi {
                 }
             }
 
+            if rollout_had_changed_tasks {
+                // We ump the cache entry's last update time, to only retrieve
+                // tasks from this point in time on during subsequent retrievals.
+                // We only do this at the end, in case any code above returns
+                // early, to force a full state recalculation if there was a
+                // failure or an early return.
+                cache_entry.last_update_time = Some(now);
+            }
+
             res.push(rollout);
         }
 
-        if meaningful_updates_to_any_rollout {
-            // Preserve the value for next loop so that we have a baseline
-            // of date/time to query data incrementally.
-            cache.last_update_time = Some(now);
-        }
         Ok((res, meaningful_updates_to_any_rollout))
     }
 }

From 4916ee586aa86b93c92757545d8e3e40dce6dc95 Mon Sep 17 00:00:00 2001
From: Manuel Amador
Date: Wed, 25 Sep 2024 16:34:27 +0200
Subject: [PATCH 3/3] Comment fix.
---
 rollout-dashboard/server/src/frontend_api.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/rollout-dashboard/server/src/frontend_api.rs b/rollout-dashboard/server/src/frontend_api.rs
index eadc4cf..1127eb7 100644
--- a/rollout-dashboard/server/src/frontend_api.rs
+++ b/rollout-dashboard/server/src/frontend_api.rs
@@ -890,7 +890,7 @@ impl RolloutApi {
             }
 
             if rollout_had_changed_tasks {
-                // We ump the cache entry's last update time, to only retrieve
+                // We bump the cache entry's last update time, to only retrieve
                 // tasks from this point in time on during subsequent retrievals.
                 // We only do this at the end, in case any code above returns
                 // early, to force a full state recalculation if there was a
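For reference, the parallel retrieval introduced in PATCH 2/3 boils down to
the fan-out pattern sketched below. The sketch is illustrative only: it
assumes the futures crate, and the string-returning fetcher is a stand-in for
the three filtered Airflow task-instance queries (executed, updated and ended
since the last check).

    // Minimal sketch of the parallel retrieval pattern (assumed names; the
    // real code issues three filtered Airflow task-instance queries).
    use futures::future::join_all;
    use std::future::Future;
    use std::pin::Pin;

    type FetchResult = Result<Vec<String>, String>;

    // Stand-in for one filtered task-instance query.
    async fn fetch(label: &'static str) -> FetchResult {
        Ok(vec![format!("{label}-task")])
    }

    async fn fetch_all() -> FetchResult {
        // Boxing makes the differently-typed async blocks storable in one Vec.
        let requests: Vec<Pin<Box<dyn Future<Output = FetchResult> + Send>>> = vec![
            Box::pin(fetch("executed")),
            Box::pin(fetch("updated")),
            Box::pin(fetch("ended")),
        ];
        let mut merged = Vec::new();
        // join_all drives every request concurrently and preserves order.
        for result in join_all(requests).await {
            merged.append(&mut result?); // the first error aborts the merge
        }
        Ok(merged)
    }

Boxing each request as Pin<Box<dyn Future<Output = ...> + Send>> lets async
blocks of different concrete types share one Vec, which is what join_all
needs in order to poll them concurrently; the first error short-circuits the
merge, just as the `r?` in the patch does.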