Skip to content

Commit

Permalink
Add comments that encode knowledge of how the task -> state mapping o…
Browse files Browse the repository at this point in the history
…perations work.
  • Loading branch information
DFINITYManu committed Aug 7, 2024
1 parent b00d87b commit 6a1b9be
Showing 1 changed file with 45 additions and 0 deletions.
45 changes: 45 additions & 0 deletions rollout-dashboard/server/src/frontend_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,9 +615,18 @@ impl RolloutApi {
}

// Now update rollout and batch state based on the obtained data.
// What this process does is fairly straightforward:
// * for each and every known up-to-date Airflow task in the cache
// (always processed in topological order),
for task_instance in
sorter.sort_instances(cache_entry.task_instances.clone().into_values())
{
// * deduce the rollout plan, if available,
// * mark the rollout as having problems or errors depending on what
// the task state is, or as one of the various running states, if
// any non-subnet-related task is running / pending.
// * handle tasks corresponding to a batch/subnet in a special way
// (commented below in its pertinent section).
if task_instance.task_id == "schedule" {
match task_instance.state {
Some(TaskInstanceState::Skipped) | Some(TaskInstanceState::Removed) => (),
Expand Down Expand Up @@ -704,6 +713,14 @@ impl RolloutApi {
} else if let Some(captured) =
BatchIdentificationRe.captures(task_instance.task_id.as_str())
{
// Handling of subnet state:
// * for each Airflow task that pertains to a rollout batch,
// * if its state in cache differs (or in some cases is higher) from the
// corresponding subnet state, upgrade the subnet state to be the correct
// state,
// * update the subnet link to the corresponding Airflow task if the
// state of the task (after update) corresponds to the expected state,
// * update rollout state to problem / error depending on the task state.
trace!(target: "subnet_state", "processing {} {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state);
let (batch, task_name) = (
// We get away with unwrap() here because we know we captured an integer.
Expand Down Expand Up @@ -798,6 +815,34 @@ impl RolloutApi {
rollout.state = min(rollout.state, RolloutState::UpgradingSubnets)
}
TaskInstanceState::Success => match task_name {
// Tasks corresponding to a subnet that are in state Success
// require somewhat different handling than tasks in states
// Running et al. For once, when a task is successful,
// the subnet state must be set to the *next* state it *would*
// have, if the /next/ task had /already begun executing/.
//
// To give an example: if `wait_until_start_time`` is Success,
// the subnet state is no longer "waiting until start time",
// but rather should be "creating proposal", even though
// perhaps `create_proposal_if_none_exists`` /has not yet run/
// because we know certainly that the
// `create_proposal_if_none_exists` task is /about to run/
// anyway.
//
// The same principle applies for all tasks -- if the current
// task is successful, we set the state of the subnet to the
// expected state that corresponds to the successor task.
//
// We could avoid encoding this type of knowledge here, by
// having a table of Airflow tasks vs. expected subnet states,
// and as a special case, on the task Success case, look up the
// successor task on the table to decide what subnet state to
// assign, but this would require a data structure different
// from the current (a vector of ordered task instances) to
// iterate over. This refactor may happen in the future, and
// it will require extra tests to ensure that invariants have
// been preserved between this code (which works well) and
// the future rewrite.
"wait_until_start_time" => {
batch.actual_start_time = match task_instance.end_date {
None => batch.actual_start_time,
Expand Down

0 comments on commit 6a1b9be

Please sign in to comment.