diff --git a/rollout-dashboard/server/src/frontend_api.rs b/rollout-dashboard/server/src/frontend_api.rs
index 32ffa6e..2b76966 100644
--- a/rollout-dashboard/server/src/frontend_api.rs
+++ b/rollout-dashboard/server/src/frontend_api.rs
@@ -615,9 +615,18 @@ impl RolloutApi {
         }
 
         // Now update rollout and batch state based on the obtained data.
+        // What this process does is fairly straightforward:
+        // * for each and every known up-to-date Airflow task in the cache
+        //   (always processed in topological order),
         for task_instance in
             sorter.sort_instances(cache_entry.task_instances.clone().into_values())
         {
+            // * deduce the rollout plan, if available,
+            // * mark the rollout as having problems or errors depending on the task
+            //   state, or as being in one of the various running states, if any
+            //   non-subnet-related task is running / pending,
+            // * handle tasks corresponding to a batch/subnet in a special way
+            //   (commented below in its pertinent section).
             if task_instance.task_id == "schedule" {
                 match task_instance.state {
                     Some(TaskInstanceState::Skipped) | Some(TaskInstanceState::Removed) => (),
@@ -704,6 +713,14 @@ impl RolloutApi {
             } else if let Some(captured) =
                 BatchIdentificationRe.captures(task_instance.task_id.as_str())
             {
+                // Handling of subnet state:
+                // * for each Airflow task that pertains to a rollout batch,
+                // * if its state in the cache differs from (or in some cases is higher
+                //   than) the corresponding subnet state, upgrade the subnet state to the
+                //   correct state,
+                // * update the subnet link to the corresponding Airflow task if the state
+                //   of the task (after update) corresponds to the expected state,
+                // * update the rollout state to problem / error depending on the task state.
                 trace!(target: "subnet_state", "processing {} {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state);
                 let (batch, task_name) = (
                     // We get away with unwrap() here because we know we captured an integer.
@@ -798,6 +815,34 @@ impl RolloutApi {
                                 rollout.state = min(rollout.state, RolloutState::UpgradingSubnets)
                             }
                             TaskInstanceState::Success => match task_name {
+                                // Tasks corresponding to a subnet that are in state Success
+                                // require somewhat different handling than tasks in states
+                                // Running et al.  For one, when a task is successful,
+                                // the subnet state must be set to the *next* state it *would*
+                                // have, if the /next/ task had /already begun executing/.
+                                //
+                                // To give an example: if `wait_until_start_time` is Success,
+                                // the subnet state is no longer "waiting until start time",
+                                // but rather should be "creating proposal", even though
+                                // perhaps `create_proposal_if_none_exists` /has not yet run/,
+                                // because we know for certain that the
+                                // `create_proposal_if_none_exists` task is /about to run/
+                                // anyway.
+                                //
+                                // The same principle applies to all tasks -- if the current
+                                // task is successful, we set the state of the subnet to the
+                                // expected state that corresponds to the successor task.
+                                //
+                                // We could avoid encoding this type of knowledge here, by
+                                // having a table of Airflow tasks vs. expected subnet states,
+                                // and, as a special case on task Success, look up the
+                                // successor task in the table to decide which subnet state to
+                                // assign, but this would require a data structure different
+                                // from the current one (a vector of ordered task instances)
+                                // to iterate over.  This refactor may happen in the future,
+                                // and it will require extra tests to ensure that invariants
+                                // have been preserved between this code (which works well)
+                                // and the future rewrite.
                                 "wait_until_start_time" => {
                                     batch.actual_start_time = match task_instance.end_date {
                                         None => batch.actual_start_time,
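
For reference, the table-driven refactor floated in that last comment block could look roughly like the sketch below. This is a hypothetical illustration, not code from this PR: `SubnetRolloutState` and its variants, the `wait_until_proposal_is_accepted` task name, and the `state_after_success` helper are invented stand-ins; only `wait_until_start_time` and `create_proposal_if_none_exists` are task names taken from the diff.

```rust
// Sketch of the table described in the comment: an ordered table of
// Airflow task names vs. expected subnet states, where the Success
// case looks up the successor entry.  All identifiers here are
// illustrative stand-ins for whatever frontend_api.rs actually uses.

#[derive(Clone, Copy, Debug, PartialEq)]
enum SubnetRolloutState {
    WaitingUntilStartTime,
    CreatingProposal,
    WaitingUntilProposalAccepted, // invented for the example
    Complete,
}

/// Entries must be kept in the tasks' topological order, so that
/// "the state of the successor task" is simply the next entry.
const TASK_STATE_TABLE: &[(&str, SubnetRolloutState)] = &[
    ("wait_until_start_time", SubnetRolloutState::WaitingUntilStartTime),
    ("create_proposal_if_none_exists", SubnetRolloutState::CreatingProposal),
    ("wait_until_proposal_is_accepted", SubnetRolloutState::WaitingUntilProposalAccepted),
];

/// For a task in state Success, the subnet moves to the state of the
/// *next* task in the table; the last task yields a terminal state.
fn state_after_success(task_name: &str) -> Option<SubnetRolloutState> {
    let idx = TASK_STATE_TABLE
        .iter()
        .position(|(name, _)| *name == task_name)?;
    Some(match TASK_STATE_TABLE.get(idx + 1) {
        Some((_, next)) => *next,
        None => SubnetRolloutState::Complete,
    })
}

fn main() {
    // Mirrors the example in the comment: once `wait_until_start_time`
    // is Success, the subnet is already considered to be creating a
    // proposal, even if the next task has not started running yet.
    assert_eq!(
        state_after_success("wait_until_start_time"),
        Some(SubnetRolloutState::CreatingProposal)
    );
}
```

Keeping the table in topological order reduces "look up the successor task" to an index increment, the same ordering guarantee the current iteration gets from `sorter.sort_instances`.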