From e4eb1d215f0c1631548a2de536f31d65942d5b7f Mon Sep 17 00:00:00 2001 From: Manuel Amador Date: Wed, 23 Oct 2024 19:20:37 +0200 Subject: [PATCH] Fix various problems with rollout dashboard's cache and task processing. As per https://dfinity.atlassian.net/browse/DRE-303 and https://dfinity.atlassian.net/browse/DRE-304 , in certain cases where the administrator has intervened in the rollout (via clearing tasks for retry, or marking them as failed, or marking them as successful), the cache gets out of synchronization with the actual Airflow state, because these task changes do not update the task dates (which is what the cache relies on, to prevent transferring tens of megabytes every 5 seconds). This implements the use of an incremental log parser that fishes out tasks updated in the last update window. With that, we can deduce when a rollout needs full task update or just certain tasks. Alas, some tasks do not log updates even when they execute (chiefly the tasks implemented by Airflow logic and null operators), so we still must hit Airflow with an incremental "after this date" query (actually, 3) to obtain that information. We may be able to optimize this to three queries for N rollouts rather than three * N, but that is not in the cards for this PR. This has been verified manually through replication of the error conditions in a local Airflow instance. --- rollout-dashboard/server/Cargo.lock | 7 + rollout-dashboard/server/Cargo.toml | 1 + .../server/src/airflow_client.rs | 295 +++- rollout-dashboard/server/src/frontend_api.rs | 1368 ++++++++++------- rollout-dashboard/server/src/main.rs | 12 +- 5 files changed, 1137 insertions(+), 546 deletions(-) diff --git a/rollout-dashboard/server/Cargo.lock b/rollout-dashboard/server/Cargo.lock index 25f2454..9df9f98 100644 --- a/rollout-dashboard/server/Cargo.lock +++ b/rollout-dashboard/server/Cargo.lock @@ -1063,6 +1063,12 @@ dependencies = [ "psl-types", ] +[[package]] +name = "querystring" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9318ead08c799aad12a55a3e78b82e0b6167271ffd1f627b758891282f739187" + [[package]] name = "quote" version = "1.0.36" @@ -1183,6 +1189,7 @@ dependencies = [ "indexmap", "lazy_static", "log", + "querystring", "regex", "reqwest", "serde", diff --git a/rollout-dashboard/server/Cargo.toml b/rollout-dashboard/server/Cargo.toml index 67ea9c6..ca2047e 100644 --- a/rollout-dashboard/server/Cargo.toml +++ b/rollout-dashboard/server/Cargo.toml @@ -23,6 +23,7 @@ futures = "0.3.30" indexmap = { version = "2.3.0", features = ["serde"] } lazy_static = "1.5.0" log = "0.4.22" +querystring = "1.1.0" regex = "1.10.5" reqwest = { version = "0.12.5", features = ["json", "cookies"] } serde = { version = "1.0.203", features = ["derive", "std"] } diff --git a/rollout-dashboard/server/src/airflow_client.rs b/rollout-dashboard/server/src/airflow_client.rs index e660d8a..579f38f 100644 --- a/rollout-dashboard/server/src/airflow_client.rs +++ b/rollout-dashboard/server/src/airflow_client.rs @@ -450,6 +450,103 @@ impl Pageable for TasksResponse { } } +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct EventLogsResponseItem { + pub event_log_id: u64, + pub when: DateTime, + pub dag_id: Option, + pub task_id: Option, + pub run_id: Option, + pub map_index: Option, + pub try_number: Option, + pub event: String, + pub execution_date: Option>, + pub owner: Option, + pub extra: Option, +} + +#[derive(Debug, Deserialize, Default)] + +pub struct EventLogsResponse { + pub event_logs: Vec, + 
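+    /// Maps each event_log_id to its position in `event_logs`, letting
+    /// `merge` replace an already-seen event in place instead of appending a
+    /// duplicate when retrieved pages overlap. Never serialized or deserialized.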
#[serde(skip_serializing, skip_deserializing)] + position_cache: HashMap, + total_entries: usize, +} + +#[allow(dead_code)] +#[derive(Default)] +pub struct EventLogsResponseFilters<'a> { + pub dag_id: Option<&'a String>, + pub task_id: Option<&'a String>, + pub run_id: Option<&'a String>, + pub map_index: Option, + pub try_number: Option, + pub event: Option<&'a String>, + pub owner: Option<&'a String>, + pub before: Option>, + pub after: Option>, +} + +impl<'a> EventLogsResponseFilters<'a> { + fn as_queryparams(&self) -> Vec<(&str, String)> { + let dfmt = "%Y-%m-%dT%H:%M:%S%.fZ"; + let shit = [ + self.dag_id.as_ref().map(|v| ("dag_id", (*v).clone())), + self.task_id.as_ref().map(|v| ("task_id", (*v).clone())), + self.run_id.as_ref().map(|v| ("run_id", (*v).clone())), + self.map_index + .as_ref() + .map(|v| ("map_index", format!("{}", v).to_owned())), + self.try_number + .as_ref() + .map(|v| ("try_number", format!("{}", v).to_owned())), + self.event.as_ref().map(|v| ("event", (*v).clone())), + self.owner.as_ref().map(|v| ("owner", (*v).clone())), + self.before + .as_ref() + .map(|v| ("before", format!("{}", v.format(dfmt)).to_owned())), + self.after + .as_ref() + .map(|v| ("after", format!("{}", v.format(dfmt)).to_owned())), + ]; + let res: Vec<_> = shit + .iter() + .flatten() + .map(|(k, v)| (*k, (*v).clone())) + .collect(); + res + } +} + +impl Pageable for EventLogsResponse { + fn len(&self) -> usize { + self.event_logs.len() + } + fn merge(&mut self, other: Self) { + for v in other.event_logs.clone().into_iter() { + let id = v.event_log_id; + match self.position_cache.get(&id) { + Some(pos) => { + //debug!(target: "processing", "Replacing {} at position {}", id, pos); + self.event_logs[*pos] = v; + } + None => { + //debug!(target: "processing", "Consuming {}", id); + self.position_cache.insert(id, self.event_logs.len()); + self.event_logs.push(v); + } + } + } + self.total_entries = other.total_entries; + } + fn truncate(&mut self, max_entries: usize) { + if self.event_logs.len() > max_entries { + self.event_logs.truncate(max_entries) + } + } +} + #[derive(Debug)] pub enum AirflowError { StatusCode(reqwest::StatusCode), @@ -509,7 +606,7 @@ where } } - trace!(target: "paged_get", + trace!(target: "airflow_client::paged_get", "retrieving {} instances of {} at offset {}, with {} already retrieved", batch_limit, suburl, @@ -521,7 +618,7 @@ where Ok(json_value) => match ::deserialize(json_value.clone()) { Ok(deserialized) => deserialized, Err(e) => { - warn!("Error deserializing ({})\n{:?}", e, json_value); // FIXME; make proper type + warn!(target: "airflow_client::paged_get" ,"Error deserializing ({})\n{:?}", e, json_value); // FIXME; make proper type return Err(AirflowError::Other(format!( "Could not deserialize structure: {}", e @@ -531,9 +628,9 @@ where Err(e) => return Err(e), }; let batch_len = batch.len(); - trace!(target: "paged_get", "Got {} before discarding", batch_len); + trace!(target: "airflow_client::paged_get", "Got {} before discarding", batch_len); results.merge(batch); - trace!(target: "paged_get", + trace!(target: "airflow_client::paged_get", "Now we have {} objects after retrieving with offset {:?}", results.len(), current_offset @@ -560,6 +657,45 @@ where Ok(results) } +async fn _post<'a, T: Deserialize<'a> + Pageable + Default, W, G, Fut>( + url: String, + content: W, + mut poster: G, +) -> Result +where + G: FnMut(String, W) -> Fut, + W: serde::Serialize + Sync + Send, + Fut: Future>, +{ + let mut results = T::default(); + trace!(target: "airflow_client::paged_post", 
+ "posting to and retrieving {}", + url, + ); + + let batch = match poster(url, content).await { + Ok(json_value) => match ::deserialize(json_value.clone()) { + Ok(deserialized) => deserialized, + Err(e) => { + warn!(target: "airflow_client::paged_post", "Error deserializing ({})\n{:?}", e, json_value); // FIXME; make proper type + return Err(AirflowError::Other(format!( + "Could not deserialize structure: {}", + e + ))); + } + }, + Err(e) => return Err(e), + }; + let batch_len = batch.len(); + trace!(target: "airflow_client::paged_post", "Got {}", batch_len); + results.merge(batch); + trace!(target: "airflow_client::paged_post", + "Now we have {} objects after retrieving", + results.len(), + ); + Ok(results) +} + #[derive(Debug)] pub enum AirflowClientCreationError { Reqwest(reqwest::Error), @@ -629,6 +765,18 @@ impl AirflowClient { self._get_or_login_and_get(suburl, true).await } + async fn _post_logged_in( + &self, + suburl: String, + content: &T, + ) -> Result + where + T: Serialize + Sync + Send, + { + let suburl = "api/v1/".to_string() + &suburl; + self._post_or_login_and_post(suburl, content, true).await + } + #[async_recursion] async fn _get_or_login_and_get( &self, @@ -649,7 +797,7 @@ impl AirflowClient { { Ok(resp) => { let status = resp.status(); - debug!(target: "http_client", "GET {} HTTP {}", url, status); + debug!(target: "airflow_client::http_client", "GET {} HTTP {}", url, status); match status { reqwest::StatusCode::OK => match resp.json().await { Ok(json) => Ok(json), @@ -679,7 +827,67 @@ impl AirflowClient { } Err(err) => Err(AirflowError::ReqwestError(err)), }; - trace!(target: "http_client", "Result: {:#?}", res); + trace!(target: "airflow_client::http_client", "Result: {:#?}", res); + res + } + + // FIXME: deduplicate with get_or_login_and_get + #[async_recursion] + async fn _post_or_login_and_post( + &self, + suburl: String, + content: &T, + attempt_login: bool, + ) -> Result + where + T: Serialize + Sync + Send, + { + // Next one cannot fail because self.url has already succeeded. + let url = self.url.join(suburl.as_str()).unwrap(); + + let c = self.client.clone(); + + let res = match c + .post(url.clone()) + .header(ACCEPT, "application/json") + .header(CONTENT_TYPE, "application/json") + .json(content) + .send() + .await + { + Ok(resp) => { + let status = resp.status(); + debug!(target: "airflow_client::http_client", "POST {} HTTP {}", url, status); + match status { + reqwest::StatusCode::OK => match resp.json().await { + Ok(json) => Ok(json), + Err(err) => Err(AirflowError::Other(format!( + "Could not decode JSON from Airflow: {}", + err + ))), + }, + reqwest::StatusCode::FORBIDDEN => { + if attempt_login { + match self._login().await { + Ok(..) => (), + Err(err) => return Err(err), + }; + self._post_or_login_and_post(suburl, content, false).await + } else { + Err(AirflowError::Other( + "Forbidden from Airflow -- could not log in".into(), + )) + } + } + reqwest::StatusCode::NOT_FOUND => { + Err(AirflowError::StatusCode(reqwest::StatusCode::NOT_FOUND)) + } + other => Err(AirflowError::StatusCode(other)), + } + } + Err(err) => Err(AirflowError::ReqwestError(err)), + }; + trace!(target: "airflow_client::http_client", "Result: {:#?}", res); res } @@ -806,6 +1014,47 @@ impl AirflowClient { .await } + /// Return listed TaskInstances for a number of DAG IDs and DAG runs. + /// Mapped tasks are not returned here. 
+ pub async fn task_instances_batch( + &self, + dag_ids: Option>, + dag_run_ids: Option>, + task_instances: Option>, + ) -> Result { + if let Some(dag_ids) = &dag_ids { + if dag_ids.is_empty() { + return Ok(TaskInstancesResponse::default()); + } + } + if let Some(dag_run_ids) = &dag_run_ids { + if dag_run_ids.is_empty() { + return Ok(TaskInstancesResponse::default()); + } + } + if let Some(task_instances) = &task_instances { + if task_instances.is_empty() { + return Ok(TaskInstancesResponse::default()); + } + } + let url = "dags/~/dagRuns/~/taskInstances/list"; + #[derive(Serialize)] + struct TaskInstancesRequest { + #[serde(skip_serializing_if = "Option::is_none")] + dag_ids: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + dag_run_ids: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + task_ids: Option>, + } + let tr = TaskInstancesRequest { + dag_ids, + dag_run_ids, + task_ids: task_instances, + }; + _post(url.to_string(), &tr, |x, c| self._post_logged_in(x, c)).await + } + /// Return TaskInstances for a DAG run. /// Mapped tasks are not returned here. pub async fn tasks(&self, dag_id: &str) -> Result { @@ -877,7 +1126,7 @@ impl AirflowClient { match ::deserialize(json_value.clone()) { Ok(deserialized) => Ok(deserialized), Err(e) => { - warn!("Error deserializing ({})\n{:?}", e, json_value); + warn!(target: "airflow_client::xcom_entry", "Error deserializing ({})\n{:?}", e, json_value); Err(AirflowError::Other(format!( "Could not deserialize structure: {}", e @@ -885,6 +1134,38 @@ impl AirflowClient { } } } + + /// Return event logs matching the filters specified. + /// Events are returned in chronological order (old to new). + pub async fn event_logs( + &self, + limit: usize, + offset: usize, + filters: &EventLogsResponseFilters<'_>, + order_by: Option, // FIXME: use structural typing here. + ) -> Result { + let qpairs = filters.as_queryparams(); + let qparams: querystring::QueryParams = + qpairs.iter().map(|(k, v)| (*k, v.as_str())).collect(); + let suburl = "eventLogs".to_string() + + (if !qparams.is_empty() { + "?".to_string() + querystring::stringify(qparams).as_str() + } else { + "".to_string() + }) + .as_str(); + _paged_get( + suburl, + match order_by { + Some(x) => Some(x), + None => Some("event_log_id".into()), + }, + Some(PagingParameters { limit, offset }), + MAX_BATCH_SIZE, + |x| self._get_logged_in(x), + ) + .await + } } #[cfg(test)] diff --git a/rollout-dashboard/server/src/frontend_api.rs b/rollout-dashboard/server/src/frontend_api.rs index ac18d12..6aafd29 100644 --- a/rollout-dashboard/server/src/frontend_api.rs +++ b/rollout-dashboard/server/src/frontend_api.rs @@ -1,6 +1,7 @@ use crate::airflow_client::{ - AirflowClient, AirflowError, DagRunState, TaskInstanceRequestFilters, TaskInstanceState, - TaskInstancesResponseItem, TasksResponse, TasksResponseItem, + AirflowClient, AirflowError, DagRunState, DagRunsResponseItem, EventLogsResponseFilters, + TaskInstanceRequestFilters, TaskInstanceState, TaskInstancesResponseItem, TasksResponse, + TasksResponseItem, }; use crate::python; use chrono::{DateTime, Utc}; @@ -13,9 +14,9 @@ use rollout_dashboard::types::{ Batch, Rollout, RolloutState, Rollouts, Subnet, SubnetRolloutState, }; use serde::Serialize; -use std::cmp::min; +use std::cmp::{max, min}; use std::collections::hash_map::Entry::{Occupied, Vacant}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fmt::{self, Display}; use std::future::Future; use std::num::ParseIntError; @@ -35,6 +36,18 @@ lazy_static! 
{ const TASK_INSTANCE_LIST_LIMIT: usize = 500; +fn max_option_date(d1: Option>, d2: Option>) -> Option> +where + T: chrono::TimeZone, +{ + match (d1, d2) { + (Some(d1), Some(d2)) => Some(max(d1, d2)), + (Some(d1), None) => Some(d1), + (None, Some(d2)) => Some(d2), + (None, None) => None, + } +} + #[derive(Debug)] pub struct CyclicDependencyError { message: String, @@ -46,6 +59,7 @@ impl Display for CyclicDependencyError { } } +#[derive(Clone)] struct TaskInstanceTopologicalSorter { sorted_tasks: Vec>, } @@ -259,6 +273,214 @@ enum ScheduleCache { Valid(usize, String), } +#[derive(Debug, Clone)] +/// DAG run update type. +enum DagRunUpdateType { + /// This DAG run needs all its task instances refreshed. + AllTaskInstances, + /// At least these tasks in this DAG run need to be + /// refreshed, as well as incremental queries of tasks + /// finished, updated, or started since last query. + SomeTaskInstances(HashSet), +} + +/// Describe what kind of update each DAG run needs. +/// A DAG run listed in this DAG can have one of two update types, +/// defined in `DagRunUpdateType``. +struct DagRunUpdatesRequired { + dag_runs: HashMap, +} + +impl DagRunUpdatesRequired { + fn new() -> Self { + Self { + dag_runs: HashMap::new(), + } + } + + /// Returns the `DagRunUpdateType` for a given DAG run. + /// If the DAG run is unknown to this function, then + /// SomeTaskInstances(vec[]) is returned. + fn update_type(&self, dag_run_id: &String) -> DagRunUpdateType { + match self.dag_runs.get(dag_run_id) { + None => DagRunUpdateType::SomeTaskInstances(HashSet::new()), + Some(t) => t.clone(), + } + } +} + +#[derive(Default)] +/// Inspects the Airflow log every time its incrementally_detect_dag_updates +/// function is called. +struct AirflowIncrementalLogInspector { + last_event_log_update: Option>, +} + +impl AirflowIncrementalLogInspector { + /// Inspect changes to the log, and return a `DagRunUpdatesRequired` + /// struct based on the contents of the log since its last inspection. + async fn incrementally_detect_dag_updates( + &self, + airflow_api: &AirflowClient, + dag_id: &str, + ) -> Result<(Self, DagRunUpdatesRequired), AirflowError> { + let mut task_instances_to_update_per_dag = DagRunUpdatesRequired::new(); + + let mut last_event_log_update = self.last_event_log_update; + + if last_event_log_update.is_some() { + // Construct a plan of what tasks will be queried, by using the + // Airflow event log as a deciding factor. + let event_logs = airflow_api + .event_logs( + 1000, + 0, + &EventLogsResponseFilters { + after: last_event_log_update, + dag_id: Some(&dag_id.to_string()), + ..Default::default() + }, + None, + ) + .await?; + + // Process log events. + for event in event_logs.event_logs.iter() { + // Remember the date of the latest event. + last_event_log_update = Some(event.when); + // Ignore events with no dag ID or wrong dag ID. + match &event.dag_id { + Some(d) => match *d == dag_id { + true => d, + false => continue, + }, + None => continue, + }; + // Ignore events that have no run ID. + let event_run_id = match &event.run_id { + Some(r) => r, + None => continue, + }; + // Ignore events that just change the DAG run note. We already + // retrieve the full DAG (not necessarily its tasks or instances), + // so this event is not interesting. + if event.event == "ui.set_dag_run_note" { + continue; + } + // Also ignore UI confirmation events to mark tasks as failed/success. + if event.event == "confirm" { + continue; + } + // Also ignore UI clearing events of tasks not yet confirmed. 
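+                // Only a "clear" event whose `extra` payload contains
+                // `'confirmed': true` is treated as an actual clear; unconfirmed
+                // clear events are skipped below, as they do not change task state.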
+ if event.event == "clear" { + match &event.extra { + None => continue, + Some(extra) => { + let r = Regex::new(r".*.confirmed.: .true.*").unwrap(); + if r.captures(extra.as_str()).is_none() { + // No confirmation. We continue. + continue; + } + // We found it. We won't continue. + } + } + } + + // Under the following circumstances, the whole rollout has to be refreshed because + // administrative action was taken to clear / fail / succeed tasks that may not in + // fact appear listed in the log as such. + let force_refresh_all_tasks = (event.event == "success" && event.extra.is_some()) + || (event.event == "failed" && event.extra.is_some()) + || (event.event == "clear" && event.extra.is_some()); + + trace!(target: "frontend_api::log_inspector", "Processing event:\n{:#?}\n", event); + + match task_instances_to_update_per_dag + .dag_runs + .entry(event_run_id.to_string()) + { + // No entry. Let's initialize it (all tasks if event has no run_id or forced, else the single task). + Vacant(ventry) => { + ventry.insert(match (&event.task_id, force_refresh_all_tasks) { + (Some(t), false) => { + trace!(target: "frontend_api::log_inspector", "{}: initializing plan with a request to update task {}", event_run_id, t); + let mut init = HashSet::new(); + init.insert(t.clone()); + DagRunUpdateType::SomeTaskInstances(init) + }, + _ => { + trace!(target: "frontend_api::log_inspector", "{}: initializing plan with a request to update all tasks", event_run_id); + DagRunUpdateType::AllTaskInstances + }, + }); + } + // There's an entry. Update to all tasks if this event has no run_id. + Occupied(mut entry) => match (&event.task_id, force_refresh_all_tasks) { + (Some(t), false) => { + if let DagRunUpdateType::SomeTaskInstances(thevec) = entry.get_mut() { + let ts = t.to_string(); + if !thevec.contains(&ts) { + trace!(target: "frontend_api::log_inspector", "{}: adding task {} to plan", event_run_id, ts); + thevec.insert(ts); + } + } + } + _ => { + if let DagRunUpdateType::SomeTaskInstances(_) = entry.get() { + trace!(target: "frontend_api::log_inspector", "{}: switching plan to request to update all tasks", event_run_id); + entry.insert(DagRunUpdateType::AllTaskInstances); + } + } + }, + } + } + + // Now that we have a plan, we know what data to fetch from Airflow, minimizing the load on the server. 
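+            // Every DAG run in the plan is marked either for a full refresh of all
+            // its task instances or for a targeted refresh of the named tasks; runs
+            // absent from the plan only receive the incremental date-based
+            // (executed / updated / ended since last refresh) queries later on.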
+ for (k, v) in task_instances_to_update_per_dag.dag_runs.iter() { + debug!(target: "frontend_api::log_inspector", "{}: tasks that will be updated: {}", k, match v { + DagRunUpdateType::AllTaskInstances => "all tasks".to_string(), + DagRunUpdateType::SomeTaskInstances(set_of_tasks) => set_of_tasks.iter().cloned().collect::>().join(", "), + }); + } + if !event_logs.event_logs.is_empty() + && !task_instances_to_update_per_dag.dag_runs.is_empty() + { + debug!( + target: "frontend_api::log_inspector", "Setting incremental refresh date to {:?}", + last_event_log_update + ) + }; + } else { + let event_logs = airflow_api + .event_logs( + 1, + 0, + &EventLogsResponseFilters { + after: last_event_log_update, + dag_id: Some(&dag_id.to_string()), + ..Default::default() + }, + Some("-event_log_id".to_string()), + ) + .await?; + for event in event_logs.event_logs.iter() { + last_event_log_update = Some(event.when); + } + if !event_logs.event_logs.is_empty() { + debug!(target: "frontend_api::log_inspector", "Setting initial refresh date to {:?}", last_event_log_update); + } + } + + Ok(( + Self { + last_event_log_update, + }, + task_instances_to_update_per_dag, + )) + } +} + +#[derive(Clone)] struct RolloutDataCache { task_instances: HashMap, TaskInstancesResponseItem>>, dispatch_time: DateTime, @@ -280,6 +502,7 @@ struct RolloutApiCache { /// Map from DAG run ID to task instance ID (with / without index) /// to task instance. by_dag_run: HashMap, + log_inspector: AirflowIncrementalLogInspector, } fn format_some(opt: Option, prefix: &str, fallback: &str) -> String @@ -307,10 +530,10 @@ fn annotate_subnet_state( if (only_decrease && new_state < subnet.state) || (!only_decrease && new_state != subnet.state) { - trace!(target: "subnet_state", "{}: {} {:?} transition {} => {} note: {}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, subnet.state, new_state, subnet.comment); + trace!(target: "frontend_api::annotate_subnet_state", "{}: {} {:?} transition {} => {} note: {}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, subnet.state, new_state, subnet.comment); subnet.state = new_state.clone(); } else { - trace!(target: "subnet_state", "{}: {} {:?} NO transition {} => {}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, subnet.state, new_state); + trace!(target: "frontend_api::annotate_subnet_state", "{}: {} {:?} NO transition {} => {}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, subnet.state, new_state); } if new_state == subnet.state { subnet.comment = format!( @@ -343,612 +566,691 @@ fn annotate_subnet_state( state } -#[derive(Clone)] -pub struct RolloutApi { - airflow_api: Arc, - cache: Arc>, +struct RolloutUpdater<'a> { + dag_run: &'a DagRunsResponseItem, + cache_entry: RolloutDataCache, + update_type: DagRunUpdateType, } -impl RolloutApi { - pub fn new(client: AirflowClient) -> Self { - Self { - airflow_api: Arc::new(client), - cache: Arc::new(Mutex::new(RolloutApiCache { - by_dag_run: HashMap::new(), - })), - } - } +impl<'a> RolloutUpdater<'a> { + async fn update( + &mut self, + airflow_api: &AirflowClient, + sorter: TaskInstanceTopologicalSorter, + last_event_log_update: Option>, + ) -> Result<(bool, Rollout, RolloutDataCache), RolloutDataGatherError> { + let mut meaningful_updates_to_this_rollout = false; - pub async fn get_cache(&self) -> Vec { - let cache = self.cache.lock().await; - let mut result: Vec<_> = cache - .by_dag_run - .iter() - .map(|(k, v)| { - let linearized_tasks = v - 
.task_instances - .iter() - .flat_map(|(_, tasks)| tasks.iter().map(|(_, task)| task.clone())) - .collect(); - RolloutDataCacheResponse { - rollout_id: k.clone(), - linearized_task_instances: linearized_tasks, - dispatch_time: v.dispatch_time, - last_update_time: v.last_update_time, - schedule: v.schedule.clone(), - } - }) - .collect(); - drop(cache); - result.sort_by_key(|v| v.dispatch_time); - result.reverse(); - result - } + // If the note of the rollout has changed, + // note that this has been updated. + let cache_entry = &mut self.cache_entry; + let dag_run = &self.dag_run; + let dag_run_update_type = &self.update_type; - /// Retrieve all rollout data, using a cache to avoid - /// re-fetching task instances not updated since last time. - /// - /// Returns a tuple of the the rollout data and a flag - /// indicating if the rollout data was updated since - /// the last time. The flag should be used by calling - /// code to decide whether to send data to clients or not. - /// - /// The rollout structure itself is updated on every call - /// for every DAG run. However, not every change in the DAG - /// run is considered to be a meaningful change (causing a - /// true return in the update flag). Currently, only a change - /// in the rollout note, the state of any of its tasks, or - /// the rollout dispatch time are considered meaningful changes. - pub async fn get_rollout_data( - &self, - max_rollouts: usize, - ) -> Result<(Rollouts, bool), RolloutDataGatherError> { - let mut cache = self.cache.lock().await; - - let dag_id = "rollout_ic_os_to_mainnet_subnets"; - let dag_runs = self - .airflow_api - .dag_runs(dag_id, max_rollouts, 0, None, None) - .await?; - let tasks = self.airflow_api.tasks(dag_id).await?; - // Note: perhaps if the number of tasks, or the names of tasks - // have changed, we would want to reset the task instance cache - // and re-request everything again. - let sorter = TaskInstanceTopologicalSorter::new(tasks)?; + if cache_entry.note != dag_run.note { + meaningful_updates_to_this_rollout = true; + cache_entry.note.clone_from(&dag_run.note); + } + // Same for the dispatch time. + if cache_entry.dispatch_time != dag_run.logical_date { + meaningful_updates_to_this_rollout = true; + cache_entry.dispatch_time = dag_run.logical_date; + } - let mut res: Rollouts = vec![]; - // Track if any rollout has had any meaningful changes. - // Also see function documentation about meaningful changes. 
- let mut meaningful_updates_to_any_rollout = false; + type TaskInstanceResponse = Result, AirflowError>; - for dag_run in dag_runs.dag_runs.iter() { - let cache_entry = cache - .by_dag_run - .entry(dag_run.dag_run_id.clone()) - .or_insert(RolloutDataCache { - task_instances: HashMap::new(), - dispatch_time: dag_run.logical_date, - note: dag_run.note.clone(), - schedule: ScheduleCache::Empty, - last_update_time: None, - }); + let last_update_time = cache_entry.last_update_time; + let dag_id = dag_run.dag_id.as_str(); + let dag_run_id = dag_run.dag_run_id.as_str(); - type TaskInstanceResponse = Result, AirflowError>; - - let last_update_time = cache_entry.last_update_time; - let now = Utc::now(); - let requests: Vec + Send>>> = vec![ - Box::pin(async move { - match self - .airflow_api - .task_instances( - dag_id, - dag_run.dag_run_id.as_str(), - TASK_INSTANCE_LIST_LIMIT, - 0, - TaskInstanceRequestFilters::default() - .executed_on_or_after(last_update_time), - ) - .await - { - Ok(r) => Ok(r.task_instances), - Err(e) => Err(e), - } - }), - Box::pin(async move { - match last_update_time { - None => Ok(vec![]), - Some(_) => { - match self - .airflow_api - .task_instances( - dag_id, - dag_run.dag_run_id.as_str(), - TASK_INSTANCE_LIST_LIMIT, - 0, - TaskInstanceRequestFilters::default() - .updated_on_or_after(last_update_time), + let requests: Vec + Send>>> = + match dag_run_update_type { + DagRunUpdateType::AllTaskInstances => { + debug!(target:"frontend_api::get_rollout_data", "{}: collecting data about all task instances", dag_run.dag_run_id); + vec![Box::pin(async move { + match airflow_api + .task_instances( + &dag_id, + dag_run.dag_run_id.as_str(), + TASK_INSTANCE_LIST_LIMIT, + 0, + TaskInstanceRequestFilters::default(), + ) + .await + { + Ok(r) => Ok(r.task_instances), + Err(e) => Err(e), + } + })] + } + DagRunUpdateType::SomeTaskInstances(updated_task_instances) => { + let updated_task_instances = + updated_task_instances.iter().cloned().collect::>(); + debug!(target:"frontend_api::get_rollout_data", "{}: collecting data about task instances updated since {:?} and a specific set of tasks too: {:?}", dag_run_id, last_update_time, updated_task_instances); + vec![ + Box::pin(async move { + match airflow_api + .task_instances_batch( + Some(vec![dag_id.to_string()]), + Some(vec![dag_run_id.to_string()]), + Some(updated_task_instances), ) .await { Ok(r) => Ok(r.task_instances), Err(e) => Err(e), } - } - } - }), - Box::pin(async move { - match last_update_time { - None => Ok(vec![]), - Some(_) => { - match self - .airflow_api + }), + Box::pin(async move { + match airflow_api .task_instances( dag_id, - dag_run.dag_run_id.as_str(), + dag_run_id, TASK_INSTANCE_LIST_LIMIT, 0, TaskInstanceRequestFilters::default() - .ended_on_or_after(last_update_time), + .executed_on_or_after(last_update_time), ) .await { Ok(r) => Ok(r.task_instances), Err(e) => Err(e), } - } - } - }), - ]; - - let results = join_all(requests).await; - let mut all_task_instances: Vec = vec![]; - for r in results.into_iter() { - all_task_instances.append(&mut r?) - } - - debug!( - target: "frontend_api", "{}: Undeduplicated tasks: {}", - dag_run.dag_run_id, all_task_instances.len() - ); - - let rollout_had_changed_tasks = match all_task_instances.is_empty() { - true => false, - false => { - // At least one task has updated or finished. - // See function documentation about meaningful changes. - meaningful_updates_to_any_rollout = true; - // Now remember this rollout was updated. 
- true + }), + Box::pin(async move { + match last_update_time { + None => Ok(vec![]), + Some(_) => { + match airflow_api + .task_instances( + dag_id, + dag_run_id, + TASK_INSTANCE_LIST_LIMIT, + 0, + TaskInstanceRequestFilters::default() + .updated_on_or_after(last_update_time), + ) + .await + { + Ok(r) => Ok(r.task_instances), + Err(e) => Err(e), + } + } + } + }), + Box::pin(async move { + match last_update_time { + None => Ok(vec![]), + Some(_) => { + match airflow_api + .task_instances( + dag_id, + dag_run_id, + TASK_INSTANCE_LIST_LIMIT, + 0, + TaskInstanceRequestFilters::default() + .ended_on_or_after(last_update_time), + ) + .await + { + Ok(r) => Ok(r.task_instances), + Err(e) => Err(e), + } + } + } + }), + ] } }; - // If the note of the rollout has changed, - // note that this has been updated. - if cache_entry.note != dag_run.note { - meaningful_updates_to_any_rollout = true; - cache_entry.note.clone_from(&dag_run.note); - } - // Same for the dispatch time. - if cache_entry.dispatch_time != dag_run.logical_date { - meaningful_updates_to_any_rollout = true; - cache_entry.dispatch_time = dag_run.logical_date; - } + let mut retrieved_task_instances: Vec = vec![]; + for r in join_all(requests).await.into_iter() { + retrieved_task_instances.append(&mut r?) + } - let mut rollout = Rollout::new( - dag_run.dag_run_id.to_string(), - { - let mut display_url = self - .airflow_api - .as_ref() - .url - .join(format!("/dags/{}/grid", dag_run.dag_id).as_str()) - .unwrap(); - display_url - .query_pairs_mut() - .append_pair("dag_run_id", dag_run.dag_run_id.as_str()); - display_url.to_string() - }, - // Use recently-updated cache values here. - // See function documentation about meaningful changes. - cache_entry.note.clone(), - cache_entry.dispatch_time, - dag_run.last_scheduling_decision, - dag_run.conf.clone(), - ); + debug!( + target: "frontend_api::get_rollout_data", "{}: retrieved {} tasks", + dag_run_id, retrieved_task_instances.len() + ); - // Let's update the cache to incorporate the most up-to-date task instances. - for task_instance in all_task_instances.into_iter() { - let task_instance_id = task_instance.task_id.clone(); + if !retrieved_task_instances.is_empty() { + // At least one task has updated or finished. + // See function documentation about meaningful changes. + meaningful_updates_to_this_rollout = true; + }; - let by_name = cache_entry - .task_instances - .entry(task_instance_id) - .or_insert(HashMap::new()); + let mut rollout = Rollout::new( + dag_run.dag_run_id.to_string(), + { + let mut display_url = airflow_api + .url + .join(format!("/dags/{}/grid", dag_run.dag_id).as_str()) + .unwrap(); + display_url + .query_pairs_mut() + .append_pair("dag_run_id", dag_run.dag_run_id.as_str()); + display_url.to_string() + }, + // Use recently-updated cache values here. + // See function documentation about meaningful changes. + cache_entry.note.clone(), + cache_entry.dispatch_time, + dag_run.last_scheduling_decision, + dag_run.conf.clone(), + ); - match by_name.entry(task_instance.map_index) { - Vacant(entry) => { - entry.insert(task_instance); - } - Occupied(mut entry) => { - if task_instance.latest_date() > entry.get().latest_date() { - entry.insert(task_instance.clone()); - } - } - }; - } + // Let's update the cache to incorporate the most up-to-date task instances. 
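+        // The watermark starts at the later of the previous update time and the
+        // event-log inspection time, and is then pushed forward to the newest
+        // date seen on any retrieved task instance, so the next incremental
+        // query cannot miss changes that happened in between.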
+ let mut new_last_update_time = max_option_date(last_update_time, last_event_log_update); + for task_instance in retrieved_task_instances.into_iter() { + let task_instance_id = task_instance.task_id.clone(); + new_last_update_time = + max_option_date(new_last_update_time, Some(task_instance.latest_date())); - for (task_instance_id, tasks) in cache_entry.task_instances.iter_mut() { - // Delete data on all unmapped tasks if a mapped task sibling is present. - if tasks.len() > 1 { - if let Occupied(_) = tasks.entry(None) { - debug!( - target: "frontend_api", "Formerly unmapped task {} is now mapped", - task_instance_id - ); - tasks.remove(&None); + let by_name = cache_entry + .task_instances + .entry(task_instance_id) + .or_insert(HashMap::new()); + + match by_name.entry(task_instance.map_index) { + Vacant(entry) => { + entry.insert(task_instance); + } + Occupied(mut entry) => { + if task_instance.latest_date() > entry.get().latest_date() { + entry.insert(task_instance.clone()); } } - } + }; + } - let linearized_tasks: Vec = cache_entry - .task_instances - .iter() - .flat_map(|(_, tasks)| tasks.iter().map(|(_, task)| task.clone())) - .collect(); + for (task_instance_id, tasks) in cache_entry.task_instances.iter_mut() { + // Delete data on all unmapped tasks if a mapped task sibling is present. + if tasks.len() > 1 { + if let Occupied(_) = tasks.entry(None) { + debug!( + target: "frontend_api::get_rollout_data", "formerly unmapped task {} is now mapped", + task_instance_id + ); + tasks.remove(&None); + } + } + } - debug!( - target: "frontend_api", "{}: Total disambiguated tasks including locally cached ones: {}", - dag_run.dag_run_id, linearized_tasks.len(), - ); + let linearized_tasks: Vec = cache_entry + .task_instances + .iter() + .flat_map(|(_, tasks)| tasks.iter().map(|(_, task)| task.clone())) + .collect(); - // Now update rollout and batch state based on the obtained data. - // What this process does is fairly straightforward: - // * for each and every known up-to-date Airflow task in the cache - // (always processed in topological order), - for task_instance in sorter.sort_instances(linearized_tasks.into_iter()) { - // * deduce the rollout plan, if available, - // * mark the rollout as having problems or errors depending on what - // the task state is, or as one of the various running states, if - // any non-subnet-related task is running / pending. - // * handle tasks corresponding to a batch/subnet in a special way - // (commented below in its pertinent section). 
- debug!( - target: "frontend_api", "Processing task {}.{:?} in state {:?}", - task_instance.task_id, task_instance.map_index, task_instance.state, - ); - if task_instance.task_id == "schedule" { - match task_instance.state { - Some(TaskInstanceState::Skipped) | Some(TaskInstanceState::Removed) => (), - Some(TaskInstanceState::UpForRetry) - | Some(TaskInstanceState::Restarting) => { - rollout.state = RolloutState::Problem; - } - Some(TaskInstanceState::Failed) - | Some(TaskInstanceState::UpstreamFailed) => { - rollout.state = RolloutState::Failed; - } - Some(TaskInstanceState::UpForReschedule) - | Some(TaskInstanceState::Running) - | Some(TaskInstanceState::Deferred) - | Some(TaskInstanceState::Queued) - | Some(TaskInstanceState::Scheduled) - | None => rollout.state = min(rollout.state, RolloutState::Preparing), - Some(TaskInstanceState::Success) => { - if let ScheduleCache::Valid(try_number, _) = cache_entry.schedule { - if try_number != task_instance.try_number { - info!(target: "frontend_api", "{}: resetting schedule cache", dag_run.dag_run_id); - // Another task run of the same task has executed. We must clear the cache entry. - cache_entry.schedule = ScheduleCache::Empty; - } - } - let schedule_string = match &cache_entry.schedule { - ScheduleCache::Valid(_, s) => s, - ScheduleCache::Empty => { - let value = self - .airflow_api - .xcom_entry( - dag_id, - dag_run.dag_run_id.as_str(), - task_instance.task_id.as_str(), - task_instance.map_index, - "return_value", - ) - .await; - let schedule = match value { - Ok(schedule) => { - cache_entry.schedule = ScheduleCache::Valid( - task_instance.try_number, - schedule.value.clone(), - ); - info!(target: "frontend_api", "{}: saving schedule cache", dag_run.dag_run_id); - schedule.value - } - Err(AirflowError::StatusCode( - reqwest::StatusCode::NOT_FOUND, - )) => { - // There is no schedule to be found. - // Or there was no schedule to be found last time - // it was queried. 
- warn!(target: "frontend_api", "{}: no schedule despite schedule task finished", dag_run.dag_run_id); - cache_entry.schedule = ScheduleCache::Empty; - continue; - } - Err(e) => { - return Err(RolloutDataGatherError::AirflowError(e)); - } - }; - &schedule.clone() - } - }; - let schedule = - RolloutPlan::from_python_string(schedule_string.clone())?; - rollout.batches = schedule.batches; - } - } - } else if task_instance.task_id == "wait_for_other_rollouts" - || task_instance.task_id == "wait_for_revision_to_be_elected" - || task_instance.task_id == "revisions" - { - match task_instance.state { - Some(TaskInstanceState::Skipped) | Some(TaskInstanceState::Removed) => (), - Some(TaskInstanceState::UpForRetry) - | Some(TaskInstanceState::Restarting) => { - rollout.state = RolloutState::Problem; - } - Some(TaskInstanceState::Failed) - | Some(TaskInstanceState::UpstreamFailed) => { - rollout.state = RolloutState::Failed; - } - Some(TaskInstanceState::UpForReschedule) - | Some(TaskInstanceState::Running) - | Some(TaskInstanceState::Deferred) - | Some(TaskInstanceState::Queued) - | Some(TaskInstanceState::Scheduled) - | None => rollout.state = min(rollout.state, RolloutState::Waiting), - Some(TaskInstanceState::Success) => {} - } - } else if let Some(captured) = - BatchIdentificationRe.captures(task_instance.task_id.as_str()) - { - // Handling of subnet state: - // * for each Airflow task that pertains to a rollout batch, - // * if its state in cache differs (or in some cases is higher) from the - // corresponding subnet state, upgrade the subnet state to be the correct - // state, - // * update the subnet link to the corresponding Airflow task if the - // state of the task (after update) corresponds to the expected state, - // * update rollout state to problem / error depending on the task state. - trace!(target: "subnet_state", "{}: processing {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state); - let (batch, task_name) = ( - // We get away with unwrap() here because we know we captured an integer. - match rollout - .batches - .get_mut(&usize::from_str(&captured[1]).unwrap()) - { - Some(batch) => batch, - None => { - trace!(target: "subnet_state", "{}: no corresponding batch, continuing", task_instance.dag_run_id); - continue; - } - }, - &captured[2], - ); + debug!( + target: "frontend_api::get_rollout_data", "{}: total disambiguated tasks including locally cached ones: {}", + dag_run.dag_run_id, linearized_tasks.len(), + ); - macro_rules! trans_min { - ($input:expr) => { - annotate_subnet_state( - batch, - $input, - &task_instance, - &self.airflow_api.as_ref().url, - true, - ) - }; + // Now update rollout and batch state based on the obtained data. + // What this process does is fairly straightforward: + // * for each and every known up-to-date Airflow task in the cache + // (always processed in topological order), + for task_instance in sorter.sort_instances(linearized_tasks.into_iter()) { + // * deduce the rollout plan, if available, + // * mark the rollout as having problems or errors depending on what + // the task state is, or as one of the various running states, if + // any non-subnet-related task is running / pending. + // * handle tasks corresponding to a batch/subnet in a special way + // (commented below in its pertinent section). 
+ trace!( + target: "frontend_api::get_rollout_data", "Processing task {}.{:?} in state {:?}", + task_instance.task_id, task_instance.map_index, task_instance.state, + ); + if task_instance.task_id == "schedule" { + match task_instance.state { + Some(TaskInstanceState::Skipped) | Some(TaskInstanceState::Removed) => (), + Some(TaskInstanceState::UpForRetry) | Some(TaskInstanceState::Restarting) => { + rollout.state = RolloutState::Problem; } - macro_rules! trans_exact { - ($input:expr) => { - annotate_subnet_state( - batch, - $input, - &task_instance, - &self.airflow_api.as_ref().url, - false, - ) - }; + Some(TaskInstanceState::Failed) | Some(TaskInstanceState::UpstreamFailed) => { + rollout.state = RolloutState::Failed; } - - // FIXME: perhaps we want to destructure both the task name - // and the task state here. - match &task_instance.state { - None => { - if task_name == "collect_batch_subnets" { - trans_exact!(SubnetRolloutState::Pending); - } else { - trace!(target: "subnet_state", "{}: ignoring task instance {} {:?} with no state", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index); + Some(TaskInstanceState::UpForReschedule) + | Some(TaskInstanceState::Running) + | Some(TaskInstanceState::Deferred) + | Some(TaskInstanceState::Queued) + | Some(TaskInstanceState::Scheduled) + | None => rollout.state = min(rollout.state, RolloutState::Preparing), + Some(TaskInstanceState::Success) => { + if let ScheduleCache::Valid(try_number, _) = &cache_entry.schedule { + if *try_number != task_instance.try_number { + info!(target: "frontend_api::get_rollout_data", "{}: resetting schedule cache", dag_run.dag_run_id); + // Another task run of the same task has executed. We must clear the cache entry. + cache_entry.schedule = ScheduleCache::Empty; } } - Some(state) => match state { - // https://stackoverflow.com/questions/53654302/tasks-are-moved-to-removed-state-in-airflow-when-they-are-queued-and-not-restore - // If a task is removed, we cannot decide rollout state based on it. - // https://stackoverflow.com/questions/77426996/skipping-a-task-in-airflow - // If a task is skipped, the next task (in state Running / Deferred) - // will pick up the slack for changing subnet state. 
- TaskInstanceState::Removed | TaskInstanceState::Skipped => { - trace!(target: "subnet_state", "{}: ignoring task instance {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state); - } - TaskInstanceState::UpForRetry | TaskInstanceState::Restarting => { - trans_min!(SubnetRolloutState::Error); - rollout.state = min(rollout.state, RolloutState::Problem) - } - TaskInstanceState::Failed => { - trans_min!(SubnetRolloutState::Error); - rollout.state = min(rollout.state, RolloutState::Failed) - } - TaskInstanceState::UpstreamFailed => { - trans_min!(SubnetRolloutState::PredecessorFailed); - rollout.state = min(rollout.state, RolloutState::Failed) - } - TaskInstanceState::UpForReschedule - | TaskInstanceState::Running - | TaskInstanceState::Deferred - | TaskInstanceState::Queued - | TaskInstanceState::Scheduled => { - match task_name { - "collect_batch_subnets" => { - trans_min!(SubnetRolloutState::Pending); - } - "wait_until_start_time" => { - trans_min!(SubnetRolloutState::Waiting); - } - "wait_for_preconditions" => { - trans_min!(SubnetRolloutState::Waiting); - } - "create_proposal_if_none_exists" => { - trans_min!(SubnetRolloutState::Proposing); + let schedule_string = match &cache_entry.schedule { + ScheduleCache::Valid(_, s) => s, + ScheduleCache::Empty => { + let value = airflow_api + .xcom_entry( + &dag_id, + dag_run.dag_run_id.as_str(), + task_instance.task_id.as_str(), + task_instance.map_index, + "return_value", + ) + .await; + let schedule = match value { + Ok(schedule) => { + cache_entry.schedule = ScheduleCache::Valid( + task_instance.try_number, + schedule.value.clone(), + ); + info!(target: "frontend_api::get_rollout_data", "{}: saving schedule cache", dag_run.dag_run_id); + schedule.value } - "request_proposal_vote" => { - // We ignore this one for the purposes of rollout state setup. + Err(AirflowError::StatusCode( + reqwest::StatusCode::NOT_FOUND, + )) => { + // There is no schedule to be found. + // Or there was no schedule to be found last time + // it was queried. + warn!(target: "frontend_api::get_rollout_data", "{}: no schedule despite schedule task finished", dag_run.dag_run_id); + cache_entry.schedule = ScheduleCache::Empty; + continue; } - "wait_until_proposal_is_accepted" => { - trans_min!(SubnetRolloutState::WaitingForElection); + Err(e) => { + return Err(RolloutDataGatherError::AirflowError(e)); } - "wait_for_replica_revision" => { - trans_min!(SubnetRolloutState::WaitingForAdoption); - } - "wait_until_no_alerts" => { - trans_min!(SubnetRolloutState::WaitingForAlertsGone); - } - "join" => { - trans_min!(SubnetRolloutState::Complete); - } - &_ => { - warn!(target: "subnet_state", "{}: no info on to handle task instance {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state); - } - } - rollout.state = min(rollout.state, RolloutState::UpgradingSubnets) + }; + &schedule.clone() } - TaskInstanceState::Success => match task_name { - // Tasks corresponding to a subnet that are in state Success - // require somewhat different handling than tasks in states - // Running et al. For once, when a task is successful, - // the subnet state must be set to the *next* state it *would* - // have, if the /next/ task had /already begun executing/. 
- // - // To give an example: if `wait_until_start_time`` is Success, - // the subnet state is no longer "waiting until start time", - // but rather should be "creating proposal", even though - // perhaps `create_proposal_if_none_exists`` /has not yet run/ - // because we know certainly that the - // `create_proposal_if_none_exists` task is /about to run/ - // anyway. - // - // The same principle applies for all tasks -- if the current - // task is successful, we set the state of the subnet to the - // expected state that corresponds to the successor task. - // - // We could avoid encoding this type of knowledge here, by - // having a table of Airflow tasks vs. expected subnet states, - // and as a special case, on the task Success case, look up the - // successor task on the table to decide what subnet state to - // assign, but this would require a data structure different - // from the current (a vector of ordered task instances) to - // iterate over. This refactor may happen in the future, and - // it will require extra tests to ensure that invariants have - // been preserved between this code (which works well) and - // the future rewrite. + }; + let schedule = RolloutPlan::from_python_string(schedule_string.clone())?; + rollout.batches = schedule.batches; + } + } + } else if task_instance.task_id == "wait_for_other_rollouts" + || task_instance.task_id == "wait_for_revision_to_be_elected" + || task_instance.task_id == "revisions" + { + match task_instance.state { + Some(TaskInstanceState::Skipped) | Some(TaskInstanceState::Removed) => (), + Some(TaskInstanceState::UpForRetry) | Some(TaskInstanceState::Restarting) => { + rollout.state = RolloutState::Problem; + } + Some(TaskInstanceState::Failed) | Some(TaskInstanceState::UpstreamFailed) => { + rollout.state = RolloutState::Failed; + } + Some(TaskInstanceState::UpForReschedule) + | Some(TaskInstanceState::Running) + | Some(TaskInstanceState::Deferred) + | Some(TaskInstanceState::Queued) + | Some(TaskInstanceState::Scheduled) + | None => rollout.state = min(rollout.state, RolloutState::Waiting), + Some(TaskInstanceState::Success) => {} + } + } else if let Some(captured) = + BatchIdentificationRe.captures(task_instance.task_id.as_str()) + { + // Handling of subnet state: + // * for each Airflow task that pertains to a rollout batch, + // * if its state in cache differs (or in some cases is higher) from the + // corresponding subnet state, upgrade the subnet state to be the correct + // state, + // * update the subnet link to the corresponding Airflow task if the + // state of the task (after update) corresponds to the expected state, + // * update rollout state to problem / error depending on the task state. + trace!(target: "frontend_api::get_rollout_data::subnet_state", "{}: processing {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state); + let (batch, task_name) = ( + // We get away with unwrap() here because we know we captured an integer. + match rollout + .batches + .get_mut(&usize::from_str(&captured[1]).unwrap()) + { + Some(batch) => batch, + None => { + trace!(target: "frontend_api::get_rollout_data::subnet_state", "{}: no corresponding batch, continuing", task_instance.dag_run_id); + continue; + } + }, + &captured[2], + ); + + macro_rules! trans_min { + ($input:expr) => { + annotate_subnet_state(batch, $input, &task_instance, &airflow_api.url, true) + }; + } + macro_rules! 
trans_exact { + ($input:expr) => { + annotate_subnet_state( + batch, + $input, + &task_instance, + &airflow_api.url, + false, + ) + }; + } + + // FIXME: perhaps we want to destructure both the task name + // and the task state here. + match &task_instance.state { + None => { + if task_name == "collect_batch_subnets" { + trans_exact!(SubnetRolloutState::Pending); + } else { + trace!(target: "frontend_api::get_rollout_data::subnet_state", "{}: ignoring task instance {} {:?} with no state", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index); + } + } + Some(state) => match state { + // https://stackoverflow.com/questions/53654302/tasks-are-moved-to-removed-state-in-airflow-when-they-are-queued-and-not-restore + // If a task is removed, we cannot decide rollout state based on it. + // https://stackoverflow.com/questions/77426996/skipping-a-task-in-airflow + // If a task is skipped, the next task (in state Running / Deferred) + // will pick up the slack for changing subnet state. + TaskInstanceState::Removed | TaskInstanceState::Skipped => { + trace!(target: "frontend_api::get_rollout_data::subnet_state", "{}: ignoring task instance {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state); + } + TaskInstanceState::UpForRetry | TaskInstanceState::Restarting => { + trans_min!(SubnetRolloutState::Error); + rollout.state = min(rollout.state, RolloutState::Problem) + } + TaskInstanceState::Failed => { + trans_min!(SubnetRolloutState::Error); + rollout.state = min(rollout.state, RolloutState::Failed) + } + TaskInstanceState::UpstreamFailed => { + trans_min!(SubnetRolloutState::PredecessorFailed); + rollout.state = min(rollout.state, RolloutState::Failed) + } + TaskInstanceState::UpForReschedule + | TaskInstanceState::Running + | TaskInstanceState::Deferred + | TaskInstanceState::Queued + | TaskInstanceState::Scheduled => { + match task_name { "collect_batch_subnets" => { - trans_min!(SubnetRolloutState::Waiting); + trans_min!(SubnetRolloutState::Pending); } "wait_until_start_time" => { - batch.actual_start_time = match task_instance.end_date { - None => batch.actual_start_time, - Some(end_date) => { - if batch.actual_start_time.is_none() { - Some(end_date) - } else { - let stime = batch.actual_start_time.unwrap(); - Some(min(stime, end_date)) - } - } - }; - trans_exact!(SubnetRolloutState::Waiting); + trans_min!(SubnetRolloutState::Waiting); } "wait_for_preconditions" => { - trans_exact!(SubnetRolloutState::Proposing); + trans_min!(SubnetRolloutState::Waiting); } "create_proposal_if_none_exists" => { - trans_exact!(SubnetRolloutState::WaitingForElection); + trans_min!(SubnetRolloutState::Proposing); } "request_proposal_vote" => { // We ignore this one for the purposes of rollout state setup. 
} "wait_until_proposal_is_accepted" => { - trans_exact!(SubnetRolloutState::WaitingForAdoption); + trans_min!(SubnetRolloutState::WaitingForElection); } "wait_for_replica_revision" => { - trans_exact!(SubnetRolloutState::WaitingForAlertsGone); + trans_min!(SubnetRolloutState::WaitingForAdoption); } "wait_until_no_alerts" => { - trans_exact!(SubnetRolloutState::Complete); + trans_min!(SubnetRolloutState::WaitingForAlertsGone); } "join" => { - trans_exact!(SubnetRolloutState::Complete); - batch.end_time = task_instance.end_date; + trans_min!(SubnetRolloutState::Complete); } &_ => { - warn!(target: "subnet_state", "{}: no info on how to handle task instance {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state); + warn!(target: "frontend_api::get_rollout_data::subnet_state", "{}: no info on to handle task instance {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state); } - }, + } + rollout.state = min(rollout.state, RolloutState::UpgradingSubnets) + } + TaskInstanceState::Success => match task_name { + // Tasks corresponding to a subnet that are in state Success + // require somewhat different handling than tasks in states + // Running et al. For once, when a task is successful, + // the subnet state must be set to the *next* state it *would* + // have, if the /next/ task had /already begun executing/. + // + // To give an example: if `wait_until_start_time`` is Success, + // the subnet state is no longer "waiting until start time", + // but rather should be "creating proposal", even though + // perhaps `create_proposal_if_none_exists`` /has not yet run/ + // because we know certainly that the + // `create_proposal_if_none_exists` task is /about to run/ + // anyway. + // + // The same principle applies for all tasks -- if the current + // task is successful, we set the state of the subnet to the + // expected state that corresponds to the successor task. + // + // We could avoid encoding this type of knowledge here, by + // having a table of Airflow tasks vs. expected subnet states, + // and as a special case, on the task Success case, look up the + // successor task on the table to decide what subnet state to + // assign, but this would require a data structure different + // from the current (a vector of ordered task instances) to + // iterate over. This refactor may happen in the future, and + // it will require extra tests to ensure that invariants have + // been preserved between this code (which works well) and + // the future rewrite. + "collect_batch_subnets" => { + trans_min!(SubnetRolloutState::Waiting); + } + "wait_until_start_time" => { + batch.actual_start_time = match task_instance.end_date { + None => batch.actual_start_time, + Some(end_date) => { + if batch.actual_start_time.is_none() { + Some(end_date) + } else { + let stime = batch.actual_start_time.unwrap(); + Some(min(stime, end_date)) + } + } + }; + trans_exact!(SubnetRolloutState::Waiting); + } + "wait_for_preconditions" => { + trans_exact!(SubnetRolloutState::Proposing); + } + "create_proposal_if_none_exists" => { + trans_exact!(SubnetRolloutState::WaitingForElection); + } + "request_proposal_vote" => { + // We ignore this one for the purposes of rollout state setup. 
+ } + "wait_until_proposal_is_accepted" => { + trans_exact!(SubnetRolloutState::WaitingForAdoption); + } + "wait_for_replica_revision" => { + trans_exact!(SubnetRolloutState::WaitingForAlertsGone); + } + "wait_until_no_alerts" => { + trans_exact!(SubnetRolloutState::Complete); + } + "join" => { + trans_exact!(SubnetRolloutState::Complete); + batch.end_time = task_instance.end_date; + } + &_ => { + warn!(target: "frontend_api::get_rollout_data::subnet_state", "{}: no info on how to handle task instance {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state); + } }, + }, + } + } else if task_instance.task_id == "upgrade_unassigned_nodes" { + match task_instance.state { + Some(TaskInstanceState::Skipped) | Some(TaskInstanceState::Removed) => (), + Some(TaskInstanceState::UpForRetry) | Some(TaskInstanceState::Restarting) => { + rollout.state = RolloutState::Problem } - } else if task_instance.task_id == "upgrade_unassigned_nodes" { - match task_instance.state { - Some(TaskInstanceState::Skipped) | Some(TaskInstanceState::Removed) => (), - Some(TaskInstanceState::UpForRetry) - | Some(TaskInstanceState::Restarting) => { - rollout.state = RolloutState::Problem - } - Some(TaskInstanceState::Failed) - | Some(TaskInstanceState::UpstreamFailed) => { - rollout.state = RolloutState::Failed - } - Some(TaskInstanceState::UpForReschedule) - | Some(TaskInstanceState::Running) - | Some(TaskInstanceState::Deferred) - | Some(TaskInstanceState::Queued) - | Some(TaskInstanceState::Scheduled) - | Some(TaskInstanceState::Success) - | None => { - rollout.state = - min(rollout.state, RolloutState::UpgradingUnassignedNodes) - } + Some(TaskInstanceState::Failed) | Some(TaskInstanceState::UpstreamFailed) => { + rollout.state = RolloutState::Failed + } + Some(TaskInstanceState::UpForReschedule) + | Some(TaskInstanceState::Running) + | Some(TaskInstanceState::Deferred) + | Some(TaskInstanceState::Queued) + | Some(TaskInstanceState::Scheduled) + | Some(TaskInstanceState::Success) + | None => { + rollout.state = min(rollout.state, RolloutState::UpgradingUnassignedNodes) } - } else { - warn!(target: "frontend_api", "{}: unknown task {}", task_instance.dag_run_id, task_instance.task_id) } + } else { + warn!(target: "frontend_api::get_rollout_data::subnet_state", "{}: unknown task {}", task_instance.dag_run_id, task_instance.task_id) } + } - if let Some(state) = Some(&dag_run.state) { - match state { - DagRunState::Success => rollout.state = RolloutState::Complete, - DagRunState::Failed => rollout.state = RolloutState::Failed, - _ => (), - } + if let Some(state) = Some(&dag_run.state) { + match state { + DagRunState::Success => rollout.state = RolloutState::Complete, + DagRunState::Failed => rollout.state = RolloutState::Failed, + _ => (), } + } - if rollout_had_changed_tasks { - // We bump the cache entry's last update time, to only retrieve - // tasks from this point in time on during subsequent retrievals. - // We only do this at the end, in case any code above returns - // early, to force a full state recalculation if there was a - // failure or an early return. - cache_entry.last_update_time = Some(now); - } + // We bump the cache entry's last update time, to only retrieve + // tasks from this point in time on during subsequent retrievals. + // We only do this at the end, in case any code above returns + // early, to force a full state recalculation if there was a + // failure or an early return. 
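+        // The new watermark is derived from observed task and event-log dates
+        // rather than from `Utc::now()`, which should avoid skipping updates that
+        // land while a refresh is still in progress.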
+#[derive(Clone)]
+pub struct RolloutApi {
+    airflow_api: Arc<AirflowClient>,
+    cache: Arc<Mutex<RolloutApiCache>>,
+}
+
+impl RolloutApi {
+    pub fn new(client: AirflowClient) -> Self {
+        Self {
+            airflow_api: Arc::new(client),
+            cache: Arc::new(Mutex::new(RolloutApiCache {
+                by_dag_run: HashMap::new(),
+                log_inspector: AirflowIncrementalLogInspector::default(),
+            })),
+        }
+    }
+
+    pub async fn get_cache(&self) -> Vec<RolloutDataCacheResponse> {
+        let cache = self.cache.lock().await;
+        let mut result: Vec<_> = cache
+            .by_dag_run
+            .iter()
+            .map(|(k, v)| {
+                let linearized_tasks = v
+                    .task_instances
+                    .iter()
+                    .flat_map(|(_, tasks)| tasks.iter().map(|(_, task)| task.clone()))
+                    .collect();
+                RolloutDataCacheResponse {
+                    rollout_id: k.clone(),
+                    linearized_task_instances: linearized_tasks,
+                    dispatch_time: v.dispatch_time,
+                    last_update_time: v.last_update_time,
+                    schedule: v.schedule.clone(),
+                }
+            })
+            .collect();
+        drop(cache);
+        result.sort_by_key(|v| v.dispatch_time);
+        result.reverse();
+        result
+    }
+
+    /// Retrieve all rollout data, using a cache to avoid
+    /// re-fetching task instances not updated since last time.
+    ///
+    /// Returns a tuple of the rollout data and a flag indicating
+    /// whether the rollout data has been updated since the last
+    /// call. Calling code should use the flag to decide whether
+    /// to send data to clients or not.
+    ///
+    /// The rollout structure itself is updated on every call
+    /// for every DAG run. However, not every change in the DAG
+    /// run is considered to be a meaningful change (causing a
+    /// true return in the update flag). Currently, only a change
+    /// in the rollout note, the state of any of its tasks, or
+    /// the rollout dispatch time is considered a meaningful change.
+    pub async fn get_rollout_data(
+        &self,
+        max_rollouts: usize,
+    ) -> Result<(Rollouts, bool), RolloutDataGatherError> {
+        let mut cache = self.cache.lock().await;
+        let dag_id = "rollout_ic_os_to_mainnet_subnets";
+
+        // Query the event log to see what has changed that is not expressed
+        // in the tasks' updated, started or finished fields.
+        let (updated_log_inspector, dag_run_update_types) = cache
+            .log_inspector
+            .incrementally_detect_dag_updates(&self.airflow_api, dag_id)
+            .await?;
+
+        // Retrieve the latest X DAG runs.
+        let dag_runs = self
+            .airflow_api
+            .dag_runs(dag_id, max_rollouts, 0, None, None)
+            .await?;
+
+        // Get the tasks of the DAG, and assemble them into a topological
+        // sorter, so we can process the task instances in the right order.
+        // Note: perhaps if the number of tasks, or the names of tasks,
+        // have changed, we would want to reset the task instance cache
+        // and re-request everything again.
+        let sorter = TaskInstanceTopologicalSorter::new(self.airflow_api.tasks(dag_id).await?)?;
+
+        let mut updaters = dag_runs
+            .dag_runs
+            .iter()
+            .map(|dag_run| RolloutUpdater {
+                dag_run,
+                update_type: dag_run_update_types.update_type(&dag_run.dag_run_id),
+                cache_entry: cache
+                    .by_dag_run
+                    .get(&dag_run.dag_run_id)
+                    .map(ToOwned::to_owned)
+                    .unwrap_or(RolloutDataCache {
+                        task_instances: HashMap::new(),
+                        dispatch_time: dag_run.logical_date,
+                        note: dag_run.note.clone(),
+                        schedule: ScheduleCache::Empty,
+                        last_update_time: None,
+                    }),
+            })
+            .collect::<Vec<_>>();
+
+        let updateds = updaters
+            .iter_mut()
+            .map(|dag_and_cache| {
+                let sorter = sorter.clone();
+                dag_and_cache.update(
+                    &self.airflow_api,
+                    sorter,
+                    cache.log_inspector.last_event_log_update,
+                )
+            })
+            .collect::<Vec<_>>();
+
+        // Track whether any rollout has had any meaningful changes.
+        // Also see the function documentation about meaningful changes.
+        let mut meaningful_updates_to_any_rollout = false;
+        let mut res: Rollouts = vec![];
+        for fut in updateds.into_iter() {
+            let (updated, rollout, cache_entry) = fut.await?;
+            meaningful_updates_to_any_rollout = updated || meaningful_updates_to_any_rollout;
+            cache.by_dag_run.insert(rollout.name.clone(), cache_entry);
             res.push(rollout);
         }
+        // Save the state of the log inspector after everything was successful.
+        cache.log_inspector = updated_log_inspector;
+        Ok((res, meaningful_updates_to_any_rollout))
     }
 }
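`get_rollout_data` hands the "meaningful change" decision to its caller, as described in the doc comment above. The following is only a rough sketch of how a polling loop might use the returned flag to avoid waking clients needlessly; the watch channel, the stub types, and the 5-second cadence are assumptions for illustration, not the server's actual wiring.

```rust
use std::time::Duration;
use tokio::sync::watch;

type Rollouts = Vec<String>; // stand-in for the real Rollouts type

// Stand-in for RolloutApi::get_rollout_data.
async fn fetch_rollouts(_max_rollouts: usize) -> Result<(Rollouts, bool), String> {
    Ok((vec![], false))
}

async fn update_loop(tx: watch::Sender<Rollouts>) {
    loop {
        match fetch_rollouts(10).await {
            // Only push to clients when something meaningful changed.
            Ok((rollouts, true)) => {
                let _ = tx.send(rollouts);
            }
            Ok((_, false)) => {}
            Err(e) => eprintln!("fetching rollout data failed: {}", e),
        }
        tokio::time::sleep(Duration::from_secs(5)).await;
    }
}
```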
diff --git a/rollout-dashboard/server/src/main.rs b/rollout-dashboard/server/src/main.rs
index af456d7..1458ddf 100644
--- a/rollout-dashboard/server/src/main.rs
+++ b/rollout-dashboard/server/src/main.rs
@@ -116,10 +116,10 @@ impl Server {
             match d {
                 Ok((new_rollouts, updated)) => {
                     let loop_delta_time = Utc::now() - loop_start_time;
-                    info!(target: "update_loop", "After {}, obtained {} rollouts from Airflow (updated: {})", loop_delta_time, new_rollouts.len(), updated);
+                    info!(target: "server::update_loop", "After {}, obtained {} rollouts from Airflow (updated: {})", loop_delta_time, new_rollouts.len(), updated);
                     changed = updated;
                     if errored {
-                        info!(target: "update_loop", "Successfully processed rollout data again after temporary error");
+                        info!(target: "server::update_loop", "Successfully processed rollout data again after temporary error");
                         // Clear error flag.
                         errored = false;
                         // Ensure our data structure is overwritten by whatever data we obtained after the last loop.
@@ -129,7 +129,7 @@ impl Server {
                 }
                 Err(res) => {
                     error!(
-                        target: "update_loop", "After processing fetch_rollout_data: {}",
+                        target: "server::update_loop", "After processing fetch_rollout_data: {}",
                         res.1
                     );
                     errored = true;
@@ -171,13 +171,13 @@ impl Server {
     }
 
     fn produce_rollouts_sse_stream(&self) -> Sse>> {
-        debug!(target: "sse", "New client connected.");
+        debug!(target: "server::sse", "New client connected.");
 
         struct DisconnectionGuard {}
 
         impl Drop for DisconnectionGuard {
             fn drop(&mut self) {
-                debug!(target: "sse", "Client disconnected.");
+                debug!(target: "server::sse", "Client disconnected.");
             }
         }
 
@@ -205,7 +205,7 @@ impl Server {
         loop {
             if stream_rx.changed().await.is_err() {
-                debug!(target: "sse", "No more transmissions. Stopping client SSE streaming.");
+                debug!(target: "server::sse", "No more transmissions. Stopping client SSE streaming.");
                 break;
             }
             let current_rollout_status = &stream_rx.borrow_and_update().clone();
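The renamed log targets above ("update_loop" becomes "server::update_loop", "sse" becomes "server::sse", plus the longer frontend_api targets) make per-subsystem filtering possible. Assuming the binary's logging backend is env_logger, which is an assumption here (any `log`-compatible backend that filters on target behaves similarly), a configuration along these lines could enable the chatty SSE lifecycle messages while keeping everything else at info:

```rust
use log::LevelFilter;

fn init_logging() {
    env_logger::Builder::new()
        .filter_level(LevelFilter::Info) // default for all other targets
        .filter_module("server::sse", LevelFilter::Debug) // SSE connect/disconnect messages
        .filter_module("frontend_api", LevelFilter::Warn) // only warnings from rollout processing
        .init();
}
```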