diff --git a/rollout-dashboard/server/Cargo.lock b/rollout-dashboard/server/Cargo.lock
index 25f2454..9df9f98 100644
--- a/rollout-dashboard/server/Cargo.lock
+++ b/rollout-dashboard/server/Cargo.lock
@@ -1063,6 +1063,12 @@ dependencies = [
  "psl-types",
 ]
 
+[[package]]
+name = "querystring"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9318ead08c799aad12a55a3e78b82e0b6167271ffd1f627b758891282f739187"
+
 [[package]]
 name = "quote"
 version = "1.0.36"
@@ -1183,6 +1189,7 @@ dependencies = [
  "indexmap",
  "lazy_static",
  "log",
+ "querystring",
  "regex",
  "reqwest",
  "serde",
diff --git a/rollout-dashboard/server/Cargo.toml b/rollout-dashboard/server/Cargo.toml
index 67ea9c6..ca2047e 100644
--- a/rollout-dashboard/server/Cargo.toml
+++ b/rollout-dashboard/server/Cargo.toml
@@ -23,6 +23,7 @@ futures = "0.3.30"
 indexmap = { version = "2.3.0", features = ["serde"] }
 lazy_static = "1.5.0"
 log = "0.4.22"
+querystring = "1.1.0"
 regex = "1.10.5"
 reqwest = { version = "0.12.5", features = ["json", "cookies"] }
 serde = { version = "1.0.203", features = ["derive", "std"] }
diff --git a/rollout-dashboard/server/src/airflow_client.rs b/rollout-dashboard/server/src/airflow_client.rs
index e660d8a..579f38f 100644
--- a/rollout-dashboard/server/src/airflow_client.rs
+++ b/rollout-dashboard/server/src/airflow_client.rs
@@ -450,6 +450,103 @@ impl Pageable for TasksResponse {
     }
 }
 
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct EventLogsResponseItem {
+    pub event_log_id: u64,
+    pub when: DateTime<Utc>,
+    pub dag_id: Option<String>,
+    pub task_id: Option<String>,
+    pub run_id: Option<String>,
+    pub map_index: Option<i64>,
+    pub try_number: Option<usize>,
+    pub event: String,
+    pub execution_date: Option<DateTime<Utc>>,
+    pub owner: Option<String>,
+    pub extra: Option<String>,
+}
+
+#[derive(Debug, Deserialize, Default)]
+
+pub struct EventLogsResponse {
+    pub event_logs: Vec<EventLogsResponseItem>,
+    #[serde(skip_serializing, skip_deserializing)]
+    position_cache: HashMap<u64, usize>,
+    total_entries: usize,
+}
+
+#[allow(dead_code)]
+#[derive(Default)]
+pub struct EventLogsResponseFilters<'a> {
+    pub dag_id: Option<&'a String>,
+    pub task_id: Option<&'a String>,
+    pub run_id: Option<&'a String>,
+    pub map_index: Option<i64>,
+    pub try_number: Option<usize>,
+    pub event: Option<&'a String>,
+    pub owner: Option<&'a String>,
+    pub before: Option<DateTime<Utc>>,
+    pub after: Option<DateTime<Utc>>,
+}
+
+impl<'a> EventLogsResponseFilters<'a> {
+    fn as_queryparams(&self) -> Vec<(&str, String)> {
+        let dfmt = "%Y-%m-%dT%H:%M:%S%.fZ";
+        let pairs = [
+            self.dag_id.as_ref().map(|v| ("dag_id", (*v).clone())),
+            self.task_id.as_ref().map(|v| ("task_id", (*v).clone())),
+            self.run_id.as_ref().map(|v| ("run_id", (*v).clone())),
+            self.map_index
+                .as_ref()
+                .map(|v| ("map_index", format!("{}", v))),
+            self.try_number
+                .as_ref()
+                .map(|v| ("try_number", format!("{}", v))),
+            self.event.as_ref().map(|v| ("event", (*v).clone())),
+            self.owner.as_ref().map(|v| ("owner", (*v).clone())),
+            self.before
+                .as_ref()
+                .map(|v| ("before", format!("{}", v.format(dfmt)))),
+            self.after
+                .as_ref()
+                .map(|v| ("after", format!("{}", v.format(dfmt)))),
+        ];
+        let res: Vec<_> = pairs
+            .iter()
+            .flatten()
+            .map(|(k, v)| (*k, (*v).clone()))
+            .collect();
+        res
+    }
+}
+
+impl Pageable for EventLogsResponse {
+    fn len(&self) -> usize {
+        self.event_logs.len()
+    }
+    fn merge(&mut self, other: Self) {
+        for v in other.event_logs.clone().into_iter() {
+            let id = v.event_log_id;
+            match self.position_cache.get(&id) {
+                Some(pos) => {
+                    //debug!(target: "processing", "Replacing {} at position {}", id, pos);
position {}", id, pos); + self.event_logs[*pos] = v; + } + None => { + //debug!(target: "processing", "Consuming {}", id); + self.position_cache.insert(id, self.event_logs.len()); + self.event_logs.push(v); + } + } + } + self.total_entries = other.total_entries; + } + fn truncate(&mut self, max_entries: usize) { + if self.event_logs.len() > max_entries { + self.event_logs.truncate(max_entries) + } + } +} + #[derive(Debug)] pub enum AirflowError { StatusCode(reqwest::StatusCode), @@ -509,7 +606,7 @@ where } } - trace!(target: "paged_get", + trace!(target: "airflow_client::paged_get", "retrieving {} instances of {} at offset {}, with {} already retrieved", batch_limit, suburl, @@ -521,7 +618,7 @@ where Ok(json_value) => match ::deserialize(json_value.clone()) { Ok(deserialized) => deserialized, Err(e) => { - warn!("Error deserializing ({})\n{:?}", e, json_value); // FIXME; make proper type + warn!(target: "airflow_client::paged_get" ,"Error deserializing ({})\n{:?}", e, json_value); // FIXME; make proper type return Err(AirflowError::Other(format!( "Could not deserialize structure: {}", e @@ -531,9 +628,9 @@ where Err(e) => return Err(e), }; let batch_len = batch.len(); - trace!(target: "paged_get", "Got {} before discarding", batch_len); + trace!(target: "airflow_client::paged_get", "Got {} before discarding", batch_len); results.merge(batch); - trace!(target: "paged_get", + trace!(target: "airflow_client::paged_get", "Now we have {} objects after retrieving with offset {:?}", results.len(), current_offset @@ -560,6 +657,45 @@ where Ok(results) } +async fn _post<'a, T: Deserialize<'a> + Pageable + Default, W, G, Fut>( + url: String, + content: W, + mut poster: G, +) -> Result +where + G: FnMut(String, W) -> Fut, + W: serde::Serialize + Sync + Send, + Fut: Future>, +{ + let mut results = T::default(); + trace!(target: "airflow_client::paged_post", + "posting to and retrieving {}", + url, + ); + + let batch = match poster(url, content).await { + Ok(json_value) => match ::deserialize(json_value.clone()) { + Ok(deserialized) => deserialized, + Err(e) => { + warn!(target: "airflow_client::paged_post", "Error deserializing ({})\n{:?}", e, json_value); // FIXME; make proper type + return Err(AirflowError::Other(format!( + "Could not deserialize structure: {}", + e + ))); + } + }, + Err(e) => return Err(e), + }; + let batch_len = batch.len(); + trace!(target: "airflow_client::paged_post", "Got {}", batch_len); + results.merge(batch); + trace!(target: "airflow_client::paged_post", + "Now we have {} objects after retrieving", + results.len(), + ); + Ok(results) +} + #[derive(Debug)] pub enum AirflowClientCreationError { Reqwest(reqwest::Error), @@ -629,6 +765,18 @@ impl AirflowClient { self._get_or_login_and_get(suburl, true).await } + async fn _post_logged_in( + &self, + suburl: String, + content: &T, + ) -> Result + where + T: Serialize + Sync + Send, + { + let suburl = "api/v1/".to_string() + &suburl; + self._post_or_login_and_post(suburl, content, true).await + } + #[async_recursion] async fn _get_or_login_and_get( &self, @@ -649,7 +797,7 @@ impl AirflowClient { { Ok(resp) => { let status = resp.status(); - debug!(target: "http_client", "GET {} HTTP {}", url, status); + debug!(target: "airflow_client::http_client", "GET {} HTTP {}", url, status); match status { reqwest::StatusCode::OK => match resp.json().await { Ok(json) => Ok(json), @@ -679,7 +827,67 @@ impl AirflowClient { } Err(err) => Err(AirflowError::ReqwestError(err)), }; - trace!(target: "http_client", "Result: {:#?}", res); + 
trace!(target: "airflow_client::http_client", "Result: {:#?}", res); + res + } + + // FIXME: deduplicate with get_or_login_and_get + #[async_recursion] + async fn _post_or_login_and_post( + &self, + suburl: String, + content: &T, + attempt_login: bool, + ) -> Result + where + T: Serialize + Sync + Send, + { + // Next one cannot fail because self.url has already succeeded. + let url = self.url.join(suburl.as_str()).unwrap(); + + let c = self.client.clone(); + + let res = match c + .post(url.clone()) + .header(ACCEPT, "application/json") + .header(CONTENT_TYPE, "application/json") + .json(content) + .send() + .await + { + Ok(resp) => { + let status = resp.status(); + debug!(target: "airflow_client::http_client", "POST {} HTTP {}", url, status); + match status { + reqwest::StatusCode::OK => match resp.json().await { + Ok(json) => Ok(json), + Err(err) => Err(AirflowError::Other(format!( + "Could not decode JSON from Airflow: {}", + err + ))), + }, + reqwest::StatusCode::FORBIDDEN => { + if attempt_login { + match self._login().await { + Ok(..) => (), + Err(err) => return Err(err), + }; + self._post_or_login_and_post(suburl, content, false).await + } else { + Err(AirflowError::Other( + "Forbidden from Airflow -- could not log in".into(), + )) + } + } + reqwest::StatusCode::NOT_FOUND => { + Err(AirflowError::StatusCode(reqwest::StatusCode::NOT_FOUND)) + } + other => Err(AirflowError::StatusCode(other)), + } + } + Err(err) => Err(AirflowError::ReqwestError(err)), + }; + trace!(target: "airflow_client::http_client", "Result: {:#?}", res); res } @@ -806,6 +1014,47 @@ impl AirflowClient { .await } + /// Return listed TaskInstances for a number of DAG IDs and DAG runs. + /// Mapped tasks are not returned here. + pub async fn task_instances_batch( + &self, + dag_ids: Option>, + dag_run_ids: Option>, + task_instances: Option>, + ) -> Result { + if let Some(dag_ids) = &dag_ids { + if dag_ids.is_empty() { + return Ok(TaskInstancesResponse::default()); + } + } + if let Some(dag_run_ids) = &dag_run_ids { + if dag_run_ids.is_empty() { + return Ok(TaskInstancesResponse::default()); + } + } + if let Some(task_instances) = &task_instances { + if task_instances.is_empty() { + return Ok(TaskInstancesResponse::default()); + } + } + let url = "dags/~/dagRuns/~/taskInstances/list"; + #[derive(Serialize)] + struct TaskInstancesRequest { + #[serde(skip_serializing_if = "Option::is_none")] + dag_ids: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + dag_run_ids: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + task_ids: Option>, + } + let tr = TaskInstancesRequest { + dag_ids, + dag_run_ids, + task_ids: task_instances, + }; + _post(url.to_string(), &tr, |x, c| self._post_logged_in(x, c)).await + } + /// Return TaskInstances for a DAG run. /// Mapped tasks are not returned here. pub async fn tasks(&self, dag_id: &str) -> Result { @@ -877,7 +1126,7 @@ impl AirflowClient { match ::deserialize(json_value.clone()) { Ok(deserialized) => Ok(deserialized), Err(e) => { - warn!("Error deserializing ({})\n{:?}", e, json_value); + warn!(target: "airflow_client::xcom_entry", "Error deserializing ({})\n{:?}", e, json_value); Err(AirflowError::Other(format!( "Could not deserialize structure: {}", e @@ -885,6 +1134,38 @@ impl AirflowClient { } } } + + /// Return event logs matching the filters specified. + /// Events are returned in chronological order (old to new). 
+    pub async fn event_logs(
+        &self,
+        limit: usize,
+        offset: usize,
+        filters: &EventLogsResponseFilters<'_>,
+        order_by: Option<String>, // FIXME: use structural typing here.
+    ) -> Result<EventLogsResponse, AirflowError> {
+        let qpairs = filters.as_queryparams();
+        let qparams: querystring::QueryParams =
+            qpairs.iter().map(|(k, v)| (*k, v.as_str())).collect();
+        let suburl = "eventLogs".to_string()
+            + (if !qparams.is_empty() {
+                "?".to_string() + querystring::stringify(qparams).as_str()
+            } else {
+                "".to_string()
+            })
+            .as_str();
+        _paged_get(
+            suburl,
+            match order_by {
+                Some(x) => Some(x),
+                None => Some("event_log_id".into()),
+            },
+            Some(PagingParameters { limit, offset }),
+            MAX_BATCH_SIZE,
+            |x| self._get_logged_in(x),
+        )
+        .await
+    }
 }
 
 #[cfg(test)]
diff --git a/rollout-dashboard/server/src/frontend_api.rs b/rollout-dashboard/server/src/frontend_api.rs
index ac18d12..6aafd29 100644
--- a/rollout-dashboard/server/src/frontend_api.rs
+++ b/rollout-dashboard/server/src/frontend_api.rs
@@ -1,6 +1,7 @@
 use crate::airflow_client::{
-    AirflowClient, AirflowError, DagRunState, TaskInstanceRequestFilters, TaskInstanceState,
-    TaskInstancesResponseItem, TasksResponse, TasksResponseItem,
+    AirflowClient, AirflowError, DagRunState, DagRunsResponseItem, EventLogsResponseFilters,
+    TaskInstanceRequestFilters, TaskInstanceState, TaskInstancesResponseItem, TasksResponse,
+    TasksResponseItem,
 };
 use crate::python;
 use chrono::{DateTime, Utc};
@@ -13,9 +14,9 @@ use rollout_dashboard::types::{
     Batch, Rollout, RolloutState, Rollouts, Subnet, SubnetRolloutState,
 };
 use serde::Serialize;
-use std::cmp::min;
+use std::cmp::{max, min};
 use std::collections::hash_map::Entry::{Occupied, Vacant};
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::fmt::{self, Display};
 use std::future::Future;
 use std::num::ParseIntError;
@@ -35,6 +36,18 @@ lazy_static! {
 
 const TASK_INSTANCE_LIST_LIMIT: usize = 500;
 
+fn max_option_date<T>(d1: Option<DateTime<T>>, d2: Option<DateTime<T>>) -> Option<DateTime<T>>
+where
+    T: chrono::TimeZone,
+{
+    match (d1, d2) {
+        (Some(d1), Some(d2)) => Some(max(d1, d2)),
+        (Some(d1), None) => Some(d1),
+        (None, Some(d2)) => Some(d2),
+        (None, None) => None,
+    }
+}
+
 #[derive(Debug)]
 pub struct CyclicDependencyError {
     message: String,
@@ -46,6 +59,7 @@ impl Display for CyclicDependencyError {
     }
 }
 
+#[derive(Clone)]
 struct TaskInstanceTopologicalSorter {
     sorted_tasks: Vec<Arc<TasksResponseItem>>,
 }
@@ -259,6 +273,214 @@ enum ScheduleCache {
     Valid(usize, String),
 }
 
+#[derive(Debug, Clone)]
+/// DAG run update type.
+enum DagRunUpdateType {
+    /// This DAG run needs all its task instances refreshed.
+    AllTaskInstances,
+    /// At least these tasks in this DAG run need to be
+    /// refreshed, as well as incremental queries of tasks
+    /// finished, updated, or started since the last query.
+    SomeTaskInstances(HashSet<String>),
+}
+
+/// Describes what kind of update each DAG run needs.
+/// A DAG run listed here has one of the two update types
+/// defined in `DagRunUpdateType`.
+struct DagRunUpdatesRequired {
+    dag_runs: HashMap<String, DagRunUpdateType>,
+}
+
+impl DagRunUpdatesRequired {
+    fn new() -> Self {
+        Self {
+            dag_runs: HashMap::new(),
+        }
+    }
+
+    /// Returns the `DagRunUpdateType` for a given DAG run.
+    /// If the DAG run is unknown, an empty
+    /// `SomeTaskInstances` set is returned.
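+    ///
+    /// A sketch of the fallback behavior (`updates` and `unknown_run_id`
+    /// are hypothetical):
+    ///
+    /// ```ignore
+    /// // For a run ID absent from the map, the fallback is an empty set:
+    /// if let DagRunUpdateType::SomeTaskInstances(tasks) =
+    ///     updates.update_type(&unknown_run_id)
+    /// {
+    ///     assert!(tasks.is_empty());
+    /// }
+    /// ```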
+    fn update_type(&self, dag_run_id: &String) -> DagRunUpdateType {
+        match self.dag_runs.get(dag_run_id) {
+            None => DagRunUpdateType::SomeTaskInstances(HashSet::new()),
+            Some(t) => t.clone(),
+        }
+    }
+}
+
+#[derive(Default)]
+/// Inspects the Airflow event log every time its
+/// `incrementally_detect_dag_updates` function is called.
+struct AirflowIncrementalLogInspector {
+    last_event_log_update: Option<DateTime<Utc>>,
+}
+
+impl AirflowIncrementalLogInspector {
+    /// Inspect changes to the log, and return a `DagRunUpdatesRequired`
+    /// struct based on the contents of the log since its last inspection.
+    async fn incrementally_detect_dag_updates(
+        &self,
+        airflow_api: &AirflowClient,
+        dag_id: &str,
+    ) -> Result<(Self, DagRunUpdatesRequired), AirflowError> {
+        let mut task_instances_to_update_per_dag = DagRunUpdatesRequired::new();
+
+        let mut last_event_log_update = self.last_event_log_update;
+
+        if last_event_log_update.is_some() {
+            // Construct a plan of what tasks will be queried, by using the
+            // Airflow event log as a deciding factor.
+            let event_logs = airflow_api
+                .event_logs(
+                    1000,
+                    0,
+                    &EventLogsResponseFilters {
+                        after: last_event_log_update,
+                        dag_id: Some(&dag_id.to_string()),
+                        ..Default::default()
+                    },
+                    None,
+                )
+                .await?;
+
+            // Process log events.
+            for event in event_logs.event_logs.iter() {
+                // Remember the date of the latest event.
+                last_event_log_update = Some(event.when);
+                // Ignore events with no DAG ID or the wrong DAG ID.
+                match &event.dag_id {
+                    Some(d) => match *d == dag_id {
+                        true => d,
+                        false => continue,
+                    },
+                    None => continue,
+                };
+                // Ignore events that have no run ID.
+                let event_run_id = match &event.run_id {
+                    Some(r) => r,
+                    None => continue,
+                };
+                // Ignore events that just change the DAG run note. We already
+                // retrieve the full DAG run (though not necessarily its tasks
+                // or instances), so this event is not interesting.
+                if event.event == "ui.set_dag_run_note" {
+                    continue;
+                }
+                // Also ignore UI confirmation events to mark tasks as failed/success.
+                if event.event == "confirm" {
+                    continue;
+                }
+                // Also ignore UI clearing events of tasks not yet confirmed.
+                if event.event == "clear" {
+                    match &event.extra {
+                        None => continue,
+                        Some(extra) => {
+                            let r = Regex::new(r".*.confirmed.: .true.*").unwrap();
+                            if r.captures(extra.as_str()).is_none() {
+                                // No confirmation. We continue.
+                                continue;
+                            }
+                            // We found it. We won't continue.
+                        }
+                    }
+                }
+
+                // Under the following circumstances, the whole rollout has to be refreshed, because
+                // administrative action was taken to clear / fail / succeed tasks that may not in
+                // fact appear listed in the log as such.
+                let force_refresh_all_tasks = (event.event == "success" && event.extra.is_some())
+                    || (event.event == "failed" && event.extra.is_some())
+                    || (event.event == "clear" && event.extra.is_some());
+
+                trace!(target: "frontend_api::log_inspector", "Processing event:\n{:#?}\n", event);
+
+                match task_instances_to_update_per_dag
+                    .dag_runs
+                    .entry(event_run_id.to_string())
+                {
+                    // No entry. Let's initialize it (all tasks if the event has no task ID or forces a refresh, else the single task).
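+                    // (Plan-merge rules implemented by the two arms below:
+                    // an event naming a task, and not forcing a refresh, adds
+                    // just that task; an event with no task ID, or one that
+                    // forces a refresh, escalates the run to AllTaskInstances,
+                    // which is never downgraded afterwards.)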
+                    Vacant(ventry) => {
+                        ventry.insert(match (&event.task_id, force_refresh_all_tasks) {
+                            (Some(t), false) => {
+                                trace!(target: "frontend_api::log_inspector", "{}: initializing plan with a request to update task {}", event_run_id, t);
+                                let mut init = HashSet::new();
+                                init.insert(t.clone());
+                                DagRunUpdateType::SomeTaskInstances(init)
+                            },
+                            _ => {
+                                trace!(target: "frontend_api::log_inspector", "{}: initializing plan with a request to update all tasks", event_run_id);
+                                DagRunUpdateType::AllTaskInstances
+                            },
+                        });
+                    }
+                    // There's an entry. Escalate to all tasks if this event has no task ID or forces a refresh.
+                    Occupied(mut entry) => match (&event.task_id, force_refresh_all_tasks) {
+                        (Some(t), false) => {
+                            if let DagRunUpdateType::SomeTaskInstances(thevec) = entry.get_mut() {
+                                let ts = t.to_string();
+                                if !thevec.contains(&ts) {
+                                    trace!(target: "frontend_api::log_inspector", "{}: adding task {} to plan", event_run_id, ts);
+                                    thevec.insert(ts);
+                                }
+                            }
+                        }
+                        _ => {
+                            if let DagRunUpdateType::SomeTaskInstances(_) = entry.get() {
+                                trace!(target: "frontend_api::log_inspector", "{}: switching plan to request to update all tasks", event_run_id);
+                                entry.insert(DagRunUpdateType::AllTaskInstances);
+                            }
+                        }
+                    },
+                }
+            }
+
+            // Now that we have a plan, we know what data to fetch from Airflow, minimizing the load on the server.
+            for (k, v) in task_instances_to_update_per_dag.dag_runs.iter() {
+                debug!(target: "frontend_api::log_inspector", "{}: tasks that will be updated: {}", k, match v {
+                    DagRunUpdateType::AllTaskInstances => "all tasks".to_string(),
+                    DagRunUpdateType::SomeTaskInstances(set_of_tasks) => set_of_tasks.iter().cloned().collect::<Vec<String>>().join(", "),
+                });
+            }
+            if !event_logs.event_logs.is_empty()
+                && !task_instances_to_update_per_dag.dag_runs.is_empty()
+            {
+                debug!(
+                    target: "frontend_api::log_inspector", "Setting incremental refresh date to {:?}",
+                    last_event_log_update
+                )
+            };
+        } else {
+            let event_logs = airflow_api
+                .event_logs(
+                    1,
+                    0,
+                    &EventLogsResponseFilters {
+                        after: last_event_log_update,
+                        dag_id: Some(&dag_id.to_string()),
+                        ..Default::default()
+                    },
+                    Some("-event_log_id".to_string()),
+                )
+                .await?;
+            for event in event_logs.event_logs.iter() {
+                last_event_log_update = Some(event.when);
+            }
+            if !event_logs.event_logs.is_empty() {
+                debug!(target: "frontend_api::log_inspector", "Setting initial refresh date to {:?}", last_event_log_update);
+            }
+        }
+
+        Ok((
+            Self {
+                last_event_log_update,
+            },
+            task_instances_to_update_per_dag,
+        ))
+    }
+}
+
+#[derive(Clone)]
 struct RolloutDataCache {
     task_instances: HashMap<String, HashMap<Option<usize>, TaskInstancesResponseItem>>,
     dispatch_time: DateTime<Utc>,
@@ -280,6 +502,7 @@ struct RolloutApiCache {
     /// Map from DAG run ID to task instance ID (with / without index)
     /// to task instance.
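+    /// (Conceptually `dag_run_id -> RolloutDataCache`; each cache entry in
+    /// turn keys task instances by task ID and optional map index.)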
by_dag_run: HashMap, + log_inspector: AirflowIncrementalLogInspector, } fn format_some(opt: Option, prefix: &str, fallback: &str) -> String @@ -307,10 +530,10 @@ fn annotate_subnet_state( if (only_decrease && new_state < subnet.state) || (!only_decrease && new_state != subnet.state) { - trace!(target: "subnet_state", "{}: {} {:?} transition {} => {} note: {}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, subnet.state, new_state, subnet.comment); + trace!(target: "frontend_api::annotate_subnet_state", "{}: {} {:?} transition {} => {} note: {}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, subnet.state, new_state, subnet.comment); subnet.state = new_state.clone(); } else { - trace!(target: "subnet_state", "{}: {} {:?} NO transition {} => {}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, subnet.state, new_state); + trace!(target: "frontend_api::annotate_subnet_state", "{}: {} {:?} NO transition {} => {}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, subnet.state, new_state); } if new_state == subnet.state { subnet.comment = format!( @@ -343,612 +566,691 @@ fn annotate_subnet_state( state } -#[derive(Clone)] -pub struct RolloutApi { - airflow_api: Arc, - cache: Arc>, +struct RolloutUpdater<'a> { + dag_run: &'a DagRunsResponseItem, + cache_entry: RolloutDataCache, + update_type: DagRunUpdateType, } -impl RolloutApi { - pub fn new(client: AirflowClient) -> Self { - Self { - airflow_api: Arc::new(client), - cache: Arc::new(Mutex::new(RolloutApiCache { - by_dag_run: HashMap::new(), - })), - } - } +impl<'a> RolloutUpdater<'a> { + async fn update( + &mut self, + airflow_api: &AirflowClient, + sorter: TaskInstanceTopologicalSorter, + last_event_log_update: Option>, + ) -> Result<(bool, Rollout, RolloutDataCache), RolloutDataGatherError> { + let mut meaningful_updates_to_this_rollout = false; - pub async fn get_cache(&self) -> Vec { - let cache = self.cache.lock().await; - let mut result: Vec<_> = cache - .by_dag_run - .iter() - .map(|(k, v)| { - let linearized_tasks = v - .task_instances - .iter() - .flat_map(|(_, tasks)| tasks.iter().map(|(_, task)| task.clone())) - .collect(); - RolloutDataCacheResponse { - rollout_id: k.clone(), - linearized_task_instances: linearized_tasks, - dispatch_time: v.dispatch_time, - last_update_time: v.last_update_time, - schedule: v.schedule.clone(), - } - }) - .collect(); - drop(cache); - result.sort_by_key(|v| v.dispatch_time); - result.reverse(); - result - } + // If the note of the rollout has changed, + // note that this has been updated. + let cache_entry = &mut self.cache_entry; + let dag_run = &self.dag_run; + let dag_run_update_type = &self.update_type; - /// Retrieve all rollout data, using a cache to avoid - /// re-fetching task instances not updated since last time. - /// - /// Returns a tuple of the the rollout data and a flag - /// indicating if the rollout data was updated since - /// the last time. The flag should be used by calling - /// code to decide whether to send data to clients or not. - /// - /// The rollout structure itself is updated on every call - /// for every DAG run. However, not every change in the DAG - /// run is considered to be a meaningful change (causing a - /// true return in the update flag). Currently, only a change - /// in the rollout note, the state of any of its tasks, or - /// the rollout dispatch time are considered meaningful changes. 
- pub async fn get_rollout_data( - &self, - max_rollouts: usize, - ) -> Result<(Rollouts, bool), RolloutDataGatherError> { - let mut cache = self.cache.lock().await; - - let dag_id = "rollout_ic_os_to_mainnet_subnets"; - let dag_runs = self - .airflow_api - .dag_runs(dag_id, max_rollouts, 0, None, None) - .await?; - let tasks = self.airflow_api.tasks(dag_id).await?; - // Note: perhaps if the number of tasks, or the names of tasks - // have changed, we would want to reset the task instance cache - // and re-request everything again. - let sorter = TaskInstanceTopologicalSorter::new(tasks)?; + if cache_entry.note != dag_run.note { + meaningful_updates_to_this_rollout = true; + cache_entry.note.clone_from(&dag_run.note); + } + // Same for the dispatch time. + if cache_entry.dispatch_time != dag_run.logical_date { + meaningful_updates_to_this_rollout = true; + cache_entry.dispatch_time = dag_run.logical_date; + } - let mut res: Rollouts = vec![]; - // Track if any rollout has had any meaningful changes. - // Also see function documentation about meaningful changes. - let mut meaningful_updates_to_any_rollout = false; + type TaskInstanceResponse = Result, AirflowError>; - for dag_run in dag_runs.dag_runs.iter() { - let cache_entry = cache - .by_dag_run - .entry(dag_run.dag_run_id.clone()) - .or_insert(RolloutDataCache { - task_instances: HashMap::new(), - dispatch_time: dag_run.logical_date, - note: dag_run.note.clone(), - schedule: ScheduleCache::Empty, - last_update_time: None, - }); + let last_update_time = cache_entry.last_update_time; + let dag_id = dag_run.dag_id.as_str(); + let dag_run_id = dag_run.dag_run_id.as_str(); - type TaskInstanceResponse = Result, AirflowError>; - - let last_update_time = cache_entry.last_update_time; - let now = Utc::now(); - let requests: Vec + Send>>> = vec![ - Box::pin(async move { - match self - .airflow_api - .task_instances( - dag_id, - dag_run.dag_run_id.as_str(), - TASK_INSTANCE_LIST_LIMIT, - 0, - TaskInstanceRequestFilters::default() - .executed_on_or_after(last_update_time), - ) - .await - { - Ok(r) => Ok(r.task_instances), - Err(e) => Err(e), - } - }), - Box::pin(async move { - match last_update_time { - None => Ok(vec![]), - Some(_) => { - match self - .airflow_api - .task_instances( - dag_id, - dag_run.dag_run_id.as_str(), - TASK_INSTANCE_LIST_LIMIT, - 0, - TaskInstanceRequestFilters::default() - .updated_on_or_after(last_update_time), + let requests: Vec + Send>>> = + match dag_run_update_type { + DagRunUpdateType::AllTaskInstances => { + debug!(target:"frontend_api::get_rollout_data", "{}: collecting data about all task instances", dag_run.dag_run_id); + vec![Box::pin(async move { + match airflow_api + .task_instances( + &dag_id, + dag_run.dag_run_id.as_str(), + TASK_INSTANCE_LIST_LIMIT, + 0, + TaskInstanceRequestFilters::default(), + ) + .await + { + Ok(r) => Ok(r.task_instances), + Err(e) => Err(e), + } + })] + } + DagRunUpdateType::SomeTaskInstances(updated_task_instances) => { + let updated_task_instances = + updated_task_instances.iter().cloned().collect::>(); + debug!(target:"frontend_api::get_rollout_data", "{}: collecting data about task instances updated since {:?} and a specific set of tasks too: {:?}", dag_run_id, last_update_time, updated_task_instances); + vec![ + Box::pin(async move { + match airflow_api + .task_instances_batch( + Some(vec![dag_id.to_string()]), + Some(vec![dag_run_id.to_string()]), + Some(updated_task_instances), ) .await { Ok(r) => Ok(r.task_instances), Err(e) => Err(e), } - } - } - }), - Box::pin(async move { - 
match last_update_time { - None => Ok(vec![]), - Some(_) => { - match self - .airflow_api + }), + Box::pin(async move { + match airflow_api .task_instances( dag_id, - dag_run.dag_run_id.as_str(), + dag_run_id, TASK_INSTANCE_LIST_LIMIT, 0, TaskInstanceRequestFilters::default() - .ended_on_or_after(last_update_time), + .executed_on_or_after(last_update_time), ) .await { Ok(r) => Ok(r.task_instances), Err(e) => Err(e), } - } - } - }), - ]; - - let results = join_all(requests).await; - let mut all_task_instances: Vec = vec![]; - for r in results.into_iter() { - all_task_instances.append(&mut r?) - } - - debug!( - target: "frontend_api", "{}: Undeduplicated tasks: {}", - dag_run.dag_run_id, all_task_instances.len() - ); - - let rollout_had_changed_tasks = match all_task_instances.is_empty() { - true => false, - false => { - // At least one task has updated or finished. - // See function documentation about meaningful changes. - meaningful_updates_to_any_rollout = true; - // Now remember this rollout was updated. - true + }), + Box::pin(async move { + match last_update_time { + None => Ok(vec![]), + Some(_) => { + match airflow_api + .task_instances( + dag_id, + dag_run_id, + TASK_INSTANCE_LIST_LIMIT, + 0, + TaskInstanceRequestFilters::default() + .updated_on_or_after(last_update_time), + ) + .await + { + Ok(r) => Ok(r.task_instances), + Err(e) => Err(e), + } + } + } + }), + Box::pin(async move { + match last_update_time { + None => Ok(vec![]), + Some(_) => { + match airflow_api + .task_instances( + dag_id, + dag_run_id, + TASK_INSTANCE_LIST_LIMIT, + 0, + TaskInstanceRequestFilters::default() + .ended_on_or_after(last_update_time), + ) + .await + { + Ok(r) => Ok(r.task_instances), + Err(e) => Err(e), + } + } + } + }), + ] } }; - // If the note of the rollout has changed, - // note that this has been updated. - if cache_entry.note != dag_run.note { - meaningful_updates_to_any_rollout = true; - cache_entry.note.clone_from(&dag_run.note); - } - // Same for the dispatch time. - if cache_entry.dispatch_time != dag_run.logical_date { - meaningful_updates_to_any_rollout = true; - cache_entry.dispatch_time = dag_run.logical_date; - } + let mut retrieved_task_instances: Vec = vec![]; + for r in join_all(requests).await.into_iter() { + retrieved_task_instances.append(&mut r?) + } - let mut rollout = Rollout::new( - dag_run.dag_run_id.to_string(), - { - let mut display_url = self - .airflow_api - .as_ref() - .url - .join(format!("/dags/{}/grid", dag_run.dag_id).as_str()) - .unwrap(); - display_url - .query_pairs_mut() - .append_pair("dag_run_id", dag_run.dag_run_id.as_str()); - display_url.to_string() - }, - // Use recently-updated cache values here. - // See function documentation about meaningful changes. - cache_entry.note.clone(), - cache_entry.dispatch_time, - dag_run.last_scheduling_decision, - dag_run.conf.clone(), - ); + debug!( + target: "frontend_api::get_rollout_data", "{}: retrieved {} tasks", + dag_run_id, retrieved_task_instances.len() + ); - // Let's update the cache to incorporate the most up-to-date task instances. - for task_instance in all_task_instances.into_iter() { - let task_instance_id = task_instance.task_id.clone(); + if !retrieved_task_instances.is_empty() { + // At least one task has updated or finished. + // See function documentation about meaningful changes. 
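+            // (Under the incremental plan, an empty result should mean that
+            // nothing actually changed, so clients are not notified needlessly.)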
+ meaningful_updates_to_this_rollout = true; + }; - let by_name = cache_entry - .task_instances - .entry(task_instance_id) - .or_insert(HashMap::new()); + let mut rollout = Rollout::new( + dag_run.dag_run_id.to_string(), + { + let mut display_url = airflow_api + .url + .join(format!("/dags/{}/grid", dag_run.dag_id).as_str()) + .unwrap(); + display_url + .query_pairs_mut() + .append_pair("dag_run_id", dag_run.dag_run_id.as_str()); + display_url.to_string() + }, + // Use recently-updated cache values here. + // See function documentation about meaningful changes. + cache_entry.note.clone(), + cache_entry.dispatch_time, + dag_run.last_scheduling_decision, + dag_run.conf.clone(), + ); - match by_name.entry(task_instance.map_index) { - Vacant(entry) => { - entry.insert(task_instance); - } - Occupied(mut entry) => { - if task_instance.latest_date() > entry.get().latest_date() { - entry.insert(task_instance.clone()); - } - } - }; - } + // Let's update the cache to incorporate the most up-to-date task instances. + let mut new_last_update_time = max_option_date(last_update_time, last_event_log_update); + for task_instance in retrieved_task_instances.into_iter() { + let task_instance_id = task_instance.task_id.clone(); + new_last_update_time = + max_option_date(new_last_update_time, Some(task_instance.latest_date())); - for (task_instance_id, tasks) in cache_entry.task_instances.iter_mut() { - // Delete data on all unmapped tasks if a mapped task sibling is present. - if tasks.len() > 1 { - if let Occupied(_) = tasks.entry(None) { - debug!( - target: "frontend_api", "Formerly unmapped task {} is now mapped", - task_instance_id - ); - tasks.remove(&None); + let by_name = cache_entry + .task_instances + .entry(task_instance_id) + .or_insert(HashMap::new()); + + match by_name.entry(task_instance.map_index) { + Vacant(entry) => { + entry.insert(task_instance); + } + Occupied(mut entry) => { + if task_instance.latest_date() > entry.get().latest_date() { + entry.insert(task_instance.clone()); } } - } + }; + } - let linearized_tasks: Vec = cache_entry - .task_instances - .iter() - .flat_map(|(_, tasks)| tasks.iter().map(|(_, task)| task.clone())) - .collect(); + for (task_instance_id, tasks) in cache_entry.task_instances.iter_mut() { + // Delete data on all unmapped tasks if a mapped task sibling is present. + if tasks.len() > 1 { + if let Occupied(_) = tasks.entry(None) { + debug!( + target: "frontend_api::get_rollout_data", "formerly unmapped task {} is now mapped", + task_instance_id + ); + tasks.remove(&None); + } + } + } - debug!( - target: "frontend_api", "{}: Total disambiguated tasks including locally cached ones: {}", - dag_run.dag_run_id, linearized_tasks.len(), - ); + let linearized_tasks: Vec = cache_entry + .task_instances + .iter() + .flat_map(|(_, tasks)| tasks.iter().map(|(_, task)| task.clone())) + .collect(); - // Now update rollout and batch state based on the obtained data. - // What this process does is fairly straightforward: - // * for each and every known up-to-date Airflow task in the cache - // (always processed in topological order), - for task_instance in sorter.sort_instances(linearized_tasks.into_iter()) { - // * deduce the rollout plan, if available, - // * mark the rollout as having problems or errors depending on what - // the task state is, or as one of the various running states, if - // any non-subnet-related task is running / pending. - // * handle tasks corresponding to a batch/subnet in a special way - // (commented below in its pertinent section). 
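+            // (The topological order matters here: e.g. the `schedule` task is
+            // visited before the `batch_N` tasks that depend on it, so
+            // `rollout.batches` is already populated by the time per-batch
+            // task instances are examined.)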
- debug!( - target: "frontend_api", "Processing task {}.{:?} in state {:?}", - task_instance.task_id, task_instance.map_index, task_instance.state, - ); - if task_instance.task_id == "schedule" { - match task_instance.state { - Some(TaskInstanceState::Skipped) | Some(TaskInstanceState::Removed) => (), - Some(TaskInstanceState::UpForRetry) - | Some(TaskInstanceState::Restarting) => { - rollout.state = RolloutState::Problem; - } - Some(TaskInstanceState::Failed) - | Some(TaskInstanceState::UpstreamFailed) => { - rollout.state = RolloutState::Failed; - } - Some(TaskInstanceState::UpForReschedule) - | Some(TaskInstanceState::Running) - | Some(TaskInstanceState::Deferred) - | Some(TaskInstanceState::Queued) - | Some(TaskInstanceState::Scheduled) - | None => rollout.state = min(rollout.state, RolloutState::Preparing), - Some(TaskInstanceState::Success) => { - if let ScheduleCache::Valid(try_number, _) = cache_entry.schedule { - if try_number != task_instance.try_number { - info!(target: "frontend_api", "{}: resetting schedule cache", dag_run.dag_run_id); - // Another task run of the same task has executed. We must clear the cache entry. - cache_entry.schedule = ScheduleCache::Empty; - } - } - let schedule_string = match &cache_entry.schedule { - ScheduleCache::Valid(_, s) => s, - ScheduleCache::Empty => { - let value = self - .airflow_api - .xcom_entry( - dag_id, - dag_run.dag_run_id.as_str(), - task_instance.task_id.as_str(), - task_instance.map_index, - "return_value", - ) - .await; - let schedule = match value { - Ok(schedule) => { - cache_entry.schedule = ScheduleCache::Valid( - task_instance.try_number, - schedule.value.clone(), - ); - info!(target: "frontend_api", "{}: saving schedule cache", dag_run.dag_run_id); - schedule.value - } - Err(AirflowError::StatusCode( - reqwest::StatusCode::NOT_FOUND, - )) => { - // There is no schedule to be found. - // Or there was no schedule to be found last time - // it was queried. 
- warn!(target: "frontend_api", "{}: no schedule despite schedule task finished", dag_run.dag_run_id); - cache_entry.schedule = ScheduleCache::Empty; - continue; - } - Err(e) => { - return Err(RolloutDataGatherError::AirflowError(e)); - } - }; - &schedule.clone() - } - }; - let schedule = - RolloutPlan::from_python_string(schedule_string.clone())?; - rollout.batches = schedule.batches; - } - } - } else if task_instance.task_id == "wait_for_other_rollouts" - || task_instance.task_id == "wait_for_revision_to_be_elected" - || task_instance.task_id == "revisions" - { - match task_instance.state { - Some(TaskInstanceState::Skipped) | Some(TaskInstanceState::Removed) => (), - Some(TaskInstanceState::UpForRetry) - | Some(TaskInstanceState::Restarting) => { - rollout.state = RolloutState::Problem; - } - Some(TaskInstanceState::Failed) - | Some(TaskInstanceState::UpstreamFailed) => { - rollout.state = RolloutState::Failed; - } - Some(TaskInstanceState::UpForReschedule) - | Some(TaskInstanceState::Running) - | Some(TaskInstanceState::Deferred) - | Some(TaskInstanceState::Queued) - | Some(TaskInstanceState::Scheduled) - | None => rollout.state = min(rollout.state, RolloutState::Waiting), - Some(TaskInstanceState::Success) => {} - } - } else if let Some(captured) = - BatchIdentificationRe.captures(task_instance.task_id.as_str()) - { - // Handling of subnet state: - // * for each Airflow task that pertains to a rollout batch, - // * if its state in cache differs (or in some cases is higher) from the - // corresponding subnet state, upgrade the subnet state to be the correct - // state, - // * update the subnet link to the corresponding Airflow task if the - // state of the task (after update) corresponds to the expected state, - // * update rollout state to problem / error depending on the task state. - trace!(target: "subnet_state", "{}: processing {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state); - let (batch, task_name) = ( - // We get away with unwrap() here because we know we captured an integer. - match rollout - .batches - .get_mut(&usize::from_str(&captured[1]).unwrap()) - { - Some(batch) => batch, - None => { - trace!(target: "subnet_state", "{}: no corresponding batch, continuing", task_instance.dag_run_id); - continue; - } - }, - &captured[2], - ); + debug!( + target: "frontend_api::get_rollout_data", "{}: total disambiguated tasks including locally cached ones: {}", + dag_run.dag_run_id, linearized_tasks.len(), + ); - macro_rules! trans_min { - ($input:expr) => { - annotate_subnet_state( - batch, - $input, - &task_instance, - &self.airflow_api.as_ref().url, - true, - ) - }; + // Now update rollout and batch state based on the obtained data. + // What this process does is fairly straightforward: + // * for each and every known up-to-date Airflow task in the cache + // (always processed in topological order), + for task_instance in sorter.sort_instances(linearized_tasks.into_iter()) { + // * deduce the rollout plan, if available, + // * mark the rollout as having problems or errors depending on what + // the task state is, or as one of the various running states, if + // any non-subnet-related task is running / pending. + // * handle tasks corresponding to a batch/subnet in a special way + // (commented below in its pertinent section). 
+ trace!( + target: "frontend_api::get_rollout_data", "Processing task {}.{:?} in state {:?}", + task_instance.task_id, task_instance.map_index, task_instance.state, + ); + if task_instance.task_id == "schedule" { + match task_instance.state { + Some(TaskInstanceState::Skipped) | Some(TaskInstanceState::Removed) => (), + Some(TaskInstanceState::UpForRetry) | Some(TaskInstanceState::Restarting) => { + rollout.state = RolloutState::Problem; } - macro_rules! trans_exact { - ($input:expr) => { - annotate_subnet_state( - batch, - $input, - &task_instance, - &self.airflow_api.as_ref().url, - false, - ) - }; + Some(TaskInstanceState::Failed) | Some(TaskInstanceState::UpstreamFailed) => { + rollout.state = RolloutState::Failed; } - - // FIXME: perhaps we want to destructure both the task name - // and the task state here. - match &task_instance.state { - None => { - if task_name == "collect_batch_subnets" { - trans_exact!(SubnetRolloutState::Pending); - } else { - trace!(target: "subnet_state", "{}: ignoring task instance {} {:?} with no state", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index); + Some(TaskInstanceState::UpForReschedule) + | Some(TaskInstanceState::Running) + | Some(TaskInstanceState::Deferred) + | Some(TaskInstanceState::Queued) + | Some(TaskInstanceState::Scheduled) + | None => rollout.state = min(rollout.state, RolloutState::Preparing), + Some(TaskInstanceState::Success) => { + if let ScheduleCache::Valid(try_number, _) = &cache_entry.schedule { + if *try_number != task_instance.try_number { + info!(target: "frontend_api::get_rollout_data", "{}: resetting schedule cache", dag_run.dag_run_id); + // Another task run of the same task has executed. We must clear the cache entry. + cache_entry.schedule = ScheduleCache::Empty; } } - Some(state) => match state { - // https://stackoverflow.com/questions/53654302/tasks-are-moved-to-removed-state-in-airflow-when-they-are-queued-and-not-restore - // If a task is removed, we cannot decide rollout state based on it. - // https://stackoverflow.com/questions/77426996/skipping-a-task-in-airflow - // If a task is skipped, the next task (in state Running / Deferred) - // will pick up the slack for changing subnet state. 
- TaskInstanceState::Removed | TaskInstanceState::Skipped => { - trace!(target: "subnet_state", "{}: ignoring task instance {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state); - } - TaskInstanceState::UpForRetry | TaskInstanceState::Restarting => { - trans_min!(SubnetRolloutState::Error); - rollout.state = min(rollout.state, RolloutState::Problem) - } - TaskInstanceState::Failed => { - trans_min!(SubnetRolloutState::Error); - rollout.state = min(rollout.state, RolloutState::Failed) - } - TaskInstanceState::UpstreamFailed => { - trans_min!(SubnetRolloutState::PredecessorFailed); - rollout.state = min(rollout.state, RolloutState::Failed) - } - TaskInstanceState::UpForReschedule - | TaskInstanceState::Running - | TaskInstanceState::Deferred - | TaskInstanceState::Queued - | TaskInstanceState::Scheduled => { - match task_name { - "collect_batch_subnets" => { - trans_min!(SubnetRolloutState::Pending); - } - "wait_until_start_time" => { - trans_min!(SubnetRolloutState::Waiting); - } - "wait_for_preconditions" => { - trans_min!(SubnetRolloutState::Waiting); - } - "create_proposal_if_none_exists" => { - trans_min!(SubnetRolloutState::Proposing); + let schedule_string = match &cache_entry.schedule { + ScheduleCache::Valid(_, s) => s, + ScheduleCache::Empty => { + let value = airflow_api + .xcom_entry( + &dag_id, + dag_run.dag_run_id.as_str(), + task_instance.task_id.as_str(), + task_instance.map_index, + "return_value", + ) + .await; + let schedule = match value { + Ok(schedule) => { + cache_entry.schedule = ScheduleCache::Valid( + task_instance.try_number, + schedule.value.clone(), + ); + info!(target: "frontend_api::get_rollout_data", "{}: saving schedule cache", dag_run.dag_run_id); + schedule.value } - "request_proposal_vote" => { - // We ignore this one for the purposes of rollout state setup. + Err(AirflowError::StatusCode( + reqwest::StatusCode::NOT_FOUND, + )) => { + // There is no schedule to be found. + // Or there was no schedule to be found last time + // it was queried. + warn!(target: "frontend_api::get_rollout_data", "{}: no schedule despite schedule task finished", dag_run.dag_run_id); + cache_entry.schedule = ScheduleCache::Empty; + continue; } - "wait_until_proposal_is_accepted" => { - trans_min!(SubnetRolloutState::WaitingForElection); + Err(e) => { + return Err(RolloutDataGatherError::AirflowError(e)); } - "wait_for_replica_revision" => { - trans_min!(SubnetRolloutState::WaitingForAdoption); - } - "wait_until_no_alerts" => { - trans_min!(SubnetRolloutState::WaitingForAlertsGone); - } - "join" => { - trans_min!(SubnetRolloutState::Complete); - } - &_ => { - warn!(target: "subnet_state", "{}: no info on to handle task instance {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state); - } - } - rollout.state = min(rollout.state, RolloutState::UpgradingSubnets) + }; + &schedule.clone() } - TaskInstanceState::Success => match task_name { - // Tasks corresponding to a subnet that are in state Success - // require somewhat different handling than tasks in states - // Running et al. For once, when a task is successful, - // the subnet state must be set to the *next* state it *would* - // have, if the /next/ task had /already begun executing/. 
- // - // To give an example: if `wait_until_start_time`` is Success, - // the subnet state is no longer "waiting until start time", - // but rather should be "creating proposal", even though - // perhaps `create_proposal_if_none_exists`` /has not yet run/ - // because we know certainly that the - // `create_proposal_if_none_exists` task is /about to run/ - // anyway. - // - // The same principle applies for all tasks -- if the current - // task is successful, we set the state of the subnet to the - // expected state that corresponds to the successor task. - // - // We could avoid encoding this type of knowledge here, by - // having a table of Airflow tasks vs. expected subnet states, - // and as a special case, on the task Success case, look up the - // successor task on the table to decide what subnet state to - // assign, but this would require a data structure different - // from the current (a vector of ordered task instances) to - // iterate over. This refactor may happen in the future, and - // it will require extra tests to ensure that invariants have - // been preserved between this code (which works well) and - // the future rewrite. + }; + let schedule = RolloutPlan::from_python_string(schedule_string.clone())?; + rollout.batches = schedule.batches; + } + } + } else if task_instance.task_id == "wait_for_other_rollouts" + || task_instance.task_id == "wait_for_revision_to_be_elected" + || task_instance.task_id == "revisions" + { + match task_instance.state { + Some(TaskInstanceState::Skipped) | Some(TaskInstanceState::Removed) => (), + Some(TaskInstanceState::UpForRetry) | Some(TaskInstanceState::Restarting) => { + rollout.state = RolloutState::Problem; + } + Some(TaskInstanceState::Failed) | Some(TaskInstanceState::UpstreamFailed) => { + rollout.state = RolloutState::Failed; + } + Some(TaskInstanceState::UpForReschedule) + | Some(TaskInstanceState::Running) + | Some(TaskInstanceState::Deferred) + | Some(TaskInstanceState::Queued) + | Some(TaskInstanceState::Scheduled) + | None => rollout.state = min(rollout.state, RolloutState::Waiting), + Some(TaskInstanceState::Success) => {} + } + } else if let Some(captured) = + BatchIdentificationRe.captures(task_instance.task_id.as_str()) + { + // Handling of subnet state: + // * for each Airflow task that pertains to a rollout batch, + // * if its state in cache differs (or in some cases is higher) from the + // corresponding subnet state, upgrade the subnet state to be the correct + // state, + // * update the subnet link to the corresponding Airflow task if the + // state of the task (after update) corresponds to the expected state, + // * update rollout state to problem / error depending on the task state. + trace!(target: "frontend_api::get_rollout_data::subnet_state", "{}: processing {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state); + let (batch, task_name) = ( + // We get away with unwrap() here because we know we captured an integer. + match rollout + .batches + .get_mut(&usize::from_str(&captured[1]).unwrap()) + { + Some(batch) => batch, + None => { + trace!(target: "frontend_api::get_rollout_data::subnet_state", "{}: no corresponding batch, continuing", task_instance.dag_run_id); + continue; + } + }, + &captured[2], + ); + + macro_rules! trans_min { + ($input:expr) => { + annotate_subnet_state(batch, $input, &task_instance, &airflow_api.url, true) + }; + } + macro_rules! 
trans_exact { + ($input:expr) => { + annotate_subnet_state( + batch, + $input, + &task_instance, + &airflow_api.url, + false, + ) + }; + } + + // FIXME: perhaps we want to destructure both the task name + // and the task state here. + match &task_instance.state { + None => { + if task_name == "collect_batch_subnets" { + trans_exact!(SubnetRolloutState::Pending); + } else { + trace!(target: "frontend_api::get_rollout_data::subnet_state", "{}: ignoring task instance {} {:?} with no state", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index); + } + } + Some(state) => match state { + // https://stackoverflow.com/questions/53654302/tasks-are-moved-to-removed-state-in-airflow-when-they-are-queued-and-not-restore + // If a task is removed, we cannot decide rollout state based on it. + // https://stackoverflow.com/questions/77426996/skipping-a-task-in-airflow + // If a task is skipped, the next task (in state Running / Deferred) + // will pick up the slack for changing subnet state. + TaskInstanceState::Removed | TaskInstanceState::Skipped => { + trace!(target: "frontend_api::get_rollout_data::subnet_state", "{}: ignoring task instance {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state); + } + TaskInstanceState::UpForRetry | TaskInstanceState::Restarting => { + trans_min!(SubnetRolloutState::Error); + rollout.state = min(rollout.state, RolloutState::Problem) + } + TaskInstanceState::Failed => { + trans_min!(SubnetRolloutState::Error); + rollout.state = min(rollout.state, RolloutState::Failed) + } + TaskInstanceState::UpstreamFailed => { + trans_min!(SubnetRolloutState::PredecessorFailed); + rollout.state = min(rollout.state, RolloutState::Failed) + } + TaskInstanceState::UpForReschedule + | TaskInstanceState::Running + | TaskInstanceState::Deferred + | TaskInstanceState::Queued + | TaskInstanceState::Scheduled => { + match task_name { "collect_batch_subnets" => { - trans_min!(SubnetRolloutState::Waiting); + trans_min!(SubnetRolloutState::Pending); } "wait_until_start_time" => { - batch.actual_start_time = match task_instance.end_date { - None => batch.actual_start_time, - Some(end_date) => { - if batch.actual_start_time.is_none() { - Some(end_date) - } else { - let stime = batch.actual_start_time.unwrap(); - Some(min(stime, end_date)) - } - } - }; - trans_exact!(SubnetRolloutState::Waiting); + trans_min!(SubnetRolloutState::Waiting); } "wait_for_preconditions" => { - trans_exact!(SubnetRolloutState::Proposing); + trans_min!(SubnetRolloutState::Waiting); } "create_proposal_if_none_exists" => { - trans_exact!(SubnetRolloutState::WaitingForElection); + trans_min!(SubnetRolloutState::Proposing); } "request_proposal_vote" => { // We ignore this one for the purposes of rollout state setup. 
} "wait_until_proposal_is_accepted" => { - trans_exact!(SubnetRolloutState::WaitingForAdoption); + trans_min!(SubnetRolloutState::WaitingForElection); } "wait_for_replica_revision" => { - trans_exact!(SubnetRolloutState::WaitingForAlertsGone); + trans_min!(SubnetRolloutState::WaitingForAdoption); } "wait_until_no_alerts" => { - trans_exact!(SubnetRolloutState::Complete); + trans_min!(SubnetRolloutState::WaitingForAlertsGone); } "join" => { - trans_exact!(SubnetRolloutState::Complete); - batch.end_time = task_instance.end_date; + trans_min!(SubnetRolloutState::Complete); } &_ => { - warn!(target: "subnet_state", "{}: no info on how to handle task instance {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state); + warn!(target: "frontend_api::get_rollout_data::subnet_state", "{}: no info on to handle task instance {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state); } - }, + } + rollout.state = min(rollout.state, RolloutState::UpgradingSubnets) + } + TaskInstanceState::Success => match task_name { + // Tasks corresponding to a subnet that are in state Success + // require somewhat different handling than tasks in states + // Running et al. For once, when a task is successful, + // the subnet state must be set to the *next* state it *would* + // have, if the /next/ task had /already begun executing/. + // + // To give an example: if `wait_until_start_time`` is Success, + // the subnet state is no longer "waiting until start time", + // but rather should be "creating proposal", even though + // perhaps `create_proposal_if_none_exists`` /has not yet run/ + // because we know certainly that the + // `create_proposal_if_none_exists` task is /about to run/ + // anyway. + // + // The same principle applies for all tasks -- if the current + // task is successful, we set the state of the subnet to the + // expected state that corresponds to the successor task. + // + // We could avoid encoding this type of knowledge here, by + // having a table of Airflow tasks vs. expected subnet states, + // and as a special case, on the task Success case, look up the + // successor task on the table to decide what subnet state to + // assign, but this would require a data structure different + // from the current (a vector of ordered task instances) to + // iterate over. This refactor may happen in the future, and + // it will require extra tests to ensure that invariants have + // been preserved between this code (which works well) and + // the future rewrite. + "collect_batch_subnets" => { + trans_min!(SubnetRolloutState::Waiting); + } + "wait_until_start_time" => { + batch.actual_start_time = match task_instance.end_date { + None => batch.actual_start_time, + Some(end_date) => { + if batch.actual_start_time.is_none() { + Some(end_date) + } else { + let stime = batch.actual_start_time.unwrap(); + Some(min(stime, end_date)) + } + } + }; + trans_exact!(SubnetRolloutState::Waiting); + } + "wait_for_preconditions" => { + trans_exact!(SubnetRolloutState::Proposing); + } + "create_proposal_if_none_exists" => { + trans_exact!(SubnetRolloutState::WaitingForElection); + } + "request_proposal_vote" => { + // We ignore this one for the purposes of rollout state setup. 
+ } + "wait_until_proposal_is_accepted" => { + trans_exact!(SubnetRolloutState::WaitingForAdoption); + } + "wait_for_replica_revision" => { + trans_exact!(SubnetRolloutState::WaitingForAlertsGone); + } + "wait_until_no_alerts" => { + trans_exact!(SubnetRolloutState::Complete); + } + "join" => { + trans_exact!(SubnetRolloutState::Complete); + batch.end_time = task_instance.end_date; + } + &_ => { + warn!(target: "frontend_api::get_rollout_data::subnet_state", "{}: no info on how to handle task instance {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state); + } }, + }, + } + } else if task_instance.task_id == "upgrade_unassigned_nodes" { + match task_instance.state { + Some(TaskInstanceState::Skipped) | Some(TaskInstanceState::Removed) => (), + Some(TaskInstanceState::UpForRetry) | Some(TaskInstanceState::Restarting) => { + rollout.state = RolloutState::Problem } - } else if task_instance.task_id == "upgrade_unassigned_nodes" { - match task_instance.state { - Some(TaskInstanceState::Skipped) | Some(TaskInstanceState::Removed) => (), - Some(TaskInstanceState::UpForRetry) - | Some(TaskInstanceState::Restarting) => { - rollout.state = RolloutState::Problem - } - Some(TaskInstanceState::Failed) - | Some(TaskInstanceState::UpstreamFailed) => { - rollout.state = RolloutState::Failed - } - Some(TaskInstanceState::UpForReschedule) - | Some(TaskInstanceState::Running) - | Some(TaskInstanceState::Deferred) - | Some(TaskInstanceState::Queued) - | Some(TaskInstanceState::Scheduled) - | Some(TaskInstanceState::Success) - | None => { - rollout.state = - min(rollout.state, RolloutState::UpgradingUnassignedNodes) - } + Some(TaskInstanceState::Failed) | Some(TaskInstanceState::UpstreamFailed) => { + rollout.state = RolloutState::Failed + } + Some(TaskInstanceState::UpForReschedule) + | Some(TaskInstanceState::Running) + | Some(TaskInstanceState::Deferred) + | Some(TaskInstanceState::Queued) + | Some(TaskInstanceState::Scheduled) + | Some(TaskInstanceState::Success) + | None => { + rollout.state = min(rollout.state, RolloutState::UpgradingUnassignedNodes) } - } else { - warn!(target: "frontend_api", "{}: unknown task {}", task_instance.dag_run_id, task_instance.task_id) } + } else { + warn!(target: "frontend_api::get_rollout_data::subnet_state", "{}: unknown task {}", task_instance.dag_run_id, task_instance.task_id) } + } - if let Some(state) = Some(&dag_run.state) { - match state { - DagRunState::Success => rollout.state = RolloutState::Complete, - DagRunState::Failed => rollout.state = RolloutState::Failed, - _ => (), - } + if let Some(state) = Some(&dag_run.state) { + match state { + DagRunState::Success => rollout.state = RolloutState::Complete, + DagRunState::Failed => rollout.state = RolloutState::Failed, + _ => (), } + } - if rollout_had_changed_tasks { - // We bump the cache entry's last update time, to only retrieve - // tasks from this point in time on during subsequent retrievals. - // We only do this at the end, in case any code above returns - // early, to force a full state recalculation if there was a - // failure or an early return. - cache_entry.last_update_time = Some(now); - } + // We bump the cache entry's last update time, to only retrieve + // tasks from this point in time on during subsequent retrievals. + // We only do this at the end, in case any code above returns + // early, to force a full state recalculation if there was a + // failure or an early return. 
+ cache_entry.last_update_time = new_last_update_time; + + Ok(( + meaningful_updates_to_this_rollout, + rollout, + cache_entry.clone(), + )) + } +} +#[derive(Clone)] +pub struct RolloutApi { + airflow_api: Arc, + cache: Arc>, +} + +impl RolloutApi { + pub fn new(client: AirflowClient) -> Self { + Self { + airflow_api: Arc::new(client), + cache: Arc::new(Mutex::new(RolloutApiCache { + by_dag_run: HashMap::new(), + log_inspector: AirflowIncrementalLogInspector::default(), + })), + } + } + + pub async fn get_cache(&self) -> Vec { + let cache = self.cache.lock().await; + let mut result: Vec<_> = cache + .by_dag_run + .iter() + .map(|(k, v)| { + let linearized_tasks = v + .task_instances + .iter() + .flat_map(|(_, tasks)| tasks.iter().map(|(_, task)| task.clone())) + .collect(); + RolloutDataCacheResponse { + rollout_id: k.clone(), + linearized_task_instances: linearized_tasks, + dispatch_time: v.dispatch_time, + last_update_time: v.last_update_time, + schedule: v.schedule.clone(), + } + }) + .collect(); + drop(cache); + result.sort_by_key(|v| v.dispatch_time); + result.reverse(); + result + } + /// Retrieve all rollout data, using a cache to avoid + /// re-fetching task instances not updated since last time. + /// + /// Returns a tuple of the the rollout data and a flag + /// indicating if the rollout data was updated since + /// the last time. The flag should be used by calling + /// code to decide whether to send data to clients or not. + /// + /// The rollout structure itself is updated on every call + /// for every DAG run. However, not every change in the DAG + /// run is considered to be a meaningful change (causing a + /// true return in the update flag). Currently, only a change + /// in the rollout note, the state of any of its tasks, or + /// the rollout dispatch time are considered meaningful changes. + pub async fn get_rollout_data( + &self, + max_rollouts: usize, + ) -> Result<(Rollouts, bool), RolloutDataGatherError> { + let mut cache = self.cache.lock().await; + let dag_id = "rollout_ic_os_to_mainnet_subnets"; + + // Query the log to see what has changed that isn't expressed in the + // task updated, started or finished fields. + let (updated_log_inspector, dag_run_update_types) = cache + .log_inspector + .incrementally_detect_dag_updates(&self.airflow_api, dag_id) + .await?; + + // Retrieve the latest X DAG runs. + let dag_runs = self + .airflow_api + .dag_runs(dag_id, max_rollouts, 0, None, None) + .await?; + + // Get the tasks of the DAG, and assemble them into a topological + // sorter, so we can process the task instances in the right order. + // Note: perhaps if the number of tasks, or the names of tasks + // have changed, we would want to reset the task instance cache + // and re-request everything again. 
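+        // Each updater below works on a clone of its cache entry; the clone
+        // is written back into the cache only after its update succeeds, and
+        // the refreshed log-inspector state is committed last, so an early
+        // error leaves the previously cached state intact.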
+ let sorter = TaskInstanceTopologicalSorter::new(self.airflow_api.tasks(dag_id).await?)?; + + let mut updaters = dag_runs + .dag_runs + .iter() + .map(|dag_run| RolloutUpdater { + dag_run, + update_type: dag_run_update_types.update_type(&dag_run.dag_run_id), + cache_entry: cache + .by_dag_run + .get(&dag_run.dag_run_id) + .map(ToOwned::to_owned) + .unwrap_or(RolloutDataCache { + task_instances: HashMap::new(), + dispatch_time: dag_run.logical_date, + note: dag_run.note.clone(), + schedule: ScheduleCache::Empty, + last_update_time: None, + }), + }) + .collect::>(); + + let updateds = updaters + .iter_mut() + .map(|dag_and_cache| { + let sorter = sorter.clone(); + dag_and_cache.update( + &self.airflow_api, + sorter, + cache.log_inspector.last_event_log_update, + ) + }) + .collect::>(); + + // Track if any rollout has had any meaningful changes. + // Also see function documentation about meaningful changes. + let mut meaningful_updates_to_any_rollout = false; + let mut res: Rollouts = vec![]; + for fut in updateds.into_iter() { + let (updated, rollout, cache_entry) = fut.await?; + meaningful_updates_to_any_rollout = updated || meaningful_updates_to_any_rollout; + cache.by_dag_run.insert(rollout.name.clone(), cache_entry); res.push(rollout); } + // Save the state of the log inspector after everything was successful. + cache.log_inspector = updated_log_inspector; + Ok((res, meaningful_updates_to_any_rollout)) } } diff --git a/rollout-dashboard/server/src/main.rs b/rollout-dashboard/server/src/main.rs index af456d7..1458ddf 100644 --- a/rollout-dashboard/server/src/main.rs +++ b/rollout-dashboard/server/src/main.rs @@ -116,10 +116,10 @@ impl Server { match d { Ok((new_rollouts, updated)) => { let loop_delta_time = Utc::now() - loop_start_time; - info!(target: "update_loop", "After {}, obtained {} rollouts from Airflow (updated: {})", loop_delta_time, new_rollouts.len(), updated); + info!(target: "server::update_loop", "After {}, obtained {} rollouts from Airflow (updated: {})", loop_delta_time, new_rollouts.len(), updated); changed = updated; if errored { - info!(target: "update_loop", "Successfully processed rollout data again after temporary error"); + info!(target: "server::update_loop", "Successfully processed rollout data again after temporary error"); // Clear error flag. errored = false; // Ensure our data structure is overwritten by whatever data we obtained after the last loop. @@ -129,7 +129,7 @@ impl Server { } Err(res) => { error!( - target: "update_loop", "After processing fetch_rollout_data: {}", + target: "server::update_loop", "After processing fetch_rollout_data: {}", res.1 ); errored = true; @@ -171,13 +171,13 @@ impl Server { } fn produce_rollouts_sse_stream(&self) -> Sse>> { - debug!(target: "sse", "New client connected."); + debug!(target: "server::sse", "New client connected."); struct DisconnectionGuard {} impl Drop for DisconnectionGuard { fn drop(&mut self) { - debug!(target: "sse", "Client disconnected."); + debug!(target: "server::sse", "Client disconnected."); } } @@ -205,7 +205,7 @@ impl Server { loop { if stream_rx.changed().await.is_err() { - debug!(target: "sse", "No more transmissions. Stopping client SSE streaming."); + debug!(target: "server::sse", "No more transmissions. Stopping client SSE streaming."); break; } let current_rollout_status = &stream_rx.borrow_and_update().clone();