Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task link fixed when tasks are in Deferred state. #27

Merged
merged 4 commits into from
Aug 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 89 additions & 29 deletions rollout-dashboard/server/src/frontend_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::python;
use chrono::{DateTime, Utc};
use indexmap::IndexMap;
use lazy_static::lazy_static;
use log::{debug, error, trace};
use log::{debug, trace, warn};
use regex::Regex;
use serde::Serialize;
use std::cmp::min;
Expand Down Expand Up @@ -30,6 +30,8 @@ lazy_static! {

#[derive(Serialize, Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Display)]
#[serde(rename_all = "snake_case")]
/// Represents the rollout state of a subnet.
/// Ordering matters here.
pub enum SubnetRolloutState {
Error,
PredecessorFailed,
Expand All @@ -40,7 +42,6 @@ pub enum SubnetRolloutState {
WaitingForAdoption,
WaitingForAlertsGone,
Complete,
Skipped,
Unknown,
}

Expand Down Expand Up @@ -105,16 +106,6 @@ impl Batch {
if (only_decrease && new_state < subnet.state)
|| (!only_decrease && new_state != subnet.state)
{
subnet.comment = format!(
"Task {}{} {}",
task_instance.task_id,
format_some(task_instance.map_index, ".", ""),
format_some(
task_instance.state.clone(),
"in state ",
"has no known state"
),
);
subnet.display_url = {
let mut url = base_url
.join(format!("/dags/{}/grid", task_instance.dag_id).as_str())
Expand All @@ -131,17 +122,30 @@ impl Batch {
url.to_string()
};
trace!(target: "subnet_state", "{} {} {:?} transition {} => {} note: {}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, subnet.state, new_state, subnet.comment);
subnet.state = new_state;
subnet.state = new_state.clone();
} else {
trace!(target: "subnet_state", "{} {} {:?} NO transition {} => {}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, subnet.state, new_state);
}
if new_state == subnet.state {
subnet.comment = format!(
"Task {}{} {}",
task_instance.task_id,
format_some(task_instance.map_index, ".", ""),
format_some(
task_instance.state.clone(),
"in state ",
"has no known state"
),
)
};
}
state
}
}

#[derive(Serialize, Debug, Clone, PartialEq, PartialOrd, Eq, Ord)]
#[serde(rename_all = "snake_case")]
/// Represents the rollout state. Ordering matters here.
pub enum RolloutState {
Failed,
Problem,
Expand Down Expand Up @@ -613,9 +617,18 @@ impl RolloutApi {
}

// Now update rollout and batch state based on the obtained data.
// What this process does is fairly straightforward:
// * for each and every known up-to-date Airflow task in the cache
// (always processed in topological order),
for task_instance in
sorter.sort_instances(cache_entry.task_instances.clone().into_values())
{
// * deduce the rollout plan, if available,
// * mark the rollout as having problems or errors depending on what
// the task state is, or as one of the various running states, if
// any non-subnet-related task is running / pending.
// * handle tasks corresponding to a batch/subnet in a special way
// (commented below in its pertinent section).
if task_instance.task_id == "schedule" {
match task_instance.state {
Some(TaskInstanceState::Skipped) | Some(TaskInstanceState::Removed) => (),
Expand Down Expand Up @@ -702,6 +715,14 @@ impl RolloutApi {
} else if let Some(captured) =
BatchIdentificationRe.captures(task_instance.task_id.as_str())
{
// Handling of subnet state:
// * for each Airflow task that pertains to a rollout batch,
// * if its state in cache differs from (or in some cases is higher than) the
// corresponding subnet state, upgrade the subnet state to be the correct
// state,
// * update the subnet link to the corresponding Airflow task if the
// state of the task (after update) corresponds to the expected state,
// * update rollout state to problem / error depending on the task state.
trace!(target: "subnet_state", "processing {} {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state);
let (batch, task_name) = (
// We get away with unwrap() here because we know we captured an integer.
Expand Down Expand Up @@ -737,31 +758,32 @@ impl RolloutApi {
};
}

// FIXME: perhaps we want to destructure both the task name
// and the task state here.
match &task_instance.state {
None => {
if task_name == "collect_batch_subnets" {
trans_exact!(SubnetRolloutState::Pending);
}
}
Some(state) => match state {
TaskInstanceState::Skipped => {
trans_exact!(SubnetRolloutState::Skipped);
}
// https://stackoverflow.com/questions/53654302/tasks-are-moved-to-removed-state-in-airflow-when-they-are-queued-and-not-restore
// If a task is removed, we cannot decide rollout state based on it.
// https://stackoverflow.com/questions/77426996/skipping-a-task-in-airflow
// If a task is skipped, the next task (in state Running / Deferred)
// will pick up the slack for changing subnet state.
TaskInstanceState::Removed | TaskInstanceState::Skipped => (),
TaskInstanceState::UpForRetry | TaskInstanceState::Restarting => {
trans_exact!(SubnetRolloutState::Error);
rollout.state = RolloutState::Problem
trans_min!(SubnetRolloutState::Error);
rollout.state = min(rollout.state, RolloutState::Problem)
}
TaskInstanceState::Failed => {
trans_exact!(SubnetRolloutState::Error);
rollout.state = RolloutState::Failed
trans_min!(SubnetRolloutState::Error);
rollout.state = min(rollout.state, RolloutState::Failed)
}
TaskInstanceState::UpstreamFailed => {
trans_min!(SubnetRolloutState::PredecessorFailed);
rollout.state = RolloutState::Failed
}
TaskInstanceState::Removed => {
trans_exact!(SubnetRolloutState::Unknown);
rollout.state = RolloutState::Failed
rollout.state = min(rollout.state, RolloutState::Failed)
}
TaskInstanceState::UpForReschedule
| TaskInstanceState::Running
Expand All @@ -772,13 +794,15 @@ impl RolloutApi {
"collect_batch_subnets" => {
trans_min!(SubnetRolloutState::Pending);
}

"wait_until_start_time" => {
trans_min!(SubnetRolloutState::Waiting);
}
"create_proposal_if_none_exists" => {
trans_min!(SubnetRolloutState::Proposing);
}
"request_proposal_vote" => {
// We ignore this one for the purposes of rollout state setup.
}
"wait_until_proposal_is_accepted" => {
trans_min!(SubnetRolloutState::WaitingForElection);
}
Expand All @@ -791,11 +815,44 @@ impl RolloutApi {
"join" => {
trans_min!(SubnetRolloutState::Complete);
}
&_ => (),
&_ => {
warn!(target: "subnet_state", "{} do not know how to handle task instance {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state);
}
}
rollout.state = min(rollout.state, RolloutState::UpgradingSubnets)
}
TaskInstanceState::Success => match task_name {
// Tasks corresponding to a subnet that are in state Success
// require somewhat different handling than tasks in states
// Running et al. For one thing, when a task is successful,
// the subnet state must be set to the *next* state it *would*
// have, if the /next/ task had /already begun executing/.
//
// To give an example: if `wait_until_start_time` is Success,
// the subnet state is no longer "waiting until start time",
// but rather should be "creating proposal", even though
// perhaps `create_proposal_if_none_exists` /has not yet run/,
// because we know certainly that the
// `create_proposal_if_none_exists` task is /about to run/
// anyway.
//
// The same principle applies for all tasks -- if the current
// task is successful, we set the state of the subnet to the
// expected state that corresponds to the successor task.
//
// We could avoid encoding this type of knowledge here, by
// having a table of Airflow tasks vs. expected subnet states,
// and as a special case, on the task Success case, look up the
// successor task on the table to decide what subnet state to
// assign, but this would require a data structure different
// from the current one (a vector of ordered task instances) to
// iterate over. This refactor may happen in the future, and
// it will require extra tests to ensure that invariants have
// been preserved between this code (which works well) and
// the future rewrite.
"collect_batch_subnets" => {
trans_min!(SubnetRolloutState::Waiting);
}
"wait_until_start_time" => {
batch.actual_start_time = match task_instance.end_date {
None => batch.actual_start_time,
Expand All @@ -813,6 +870,9 @@ impl RolloutApi {
"create_proposal_if_none_exists" => {
trans_exact!(SubnetRolloutState::WaitingForElection);
}
"request_proposal_vote" => {
// We ignore this one for the purposes of rollout state setup.
}
"wait_until_proposal_is_accepted" => {
trans_exact!(SubnetRolloutState::WaitingForAdoption);
}
Expand All @@ -827,7 +887,7 @@ impl RolloutApi {
batch.end_time = task_instance.end_date;
}
&_ => {
trace!(target: "subnet_state", "{} {} {:?} ignoring task in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state);
warn!(target: "subnet_state", "{} do not know how to handle task instance {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state);
}
},
},
Expand Down Expand Up @@ -855,7 +915,7 @@ impl RolloutApi {
}
}
} else {
error!(target: "frontend_api", "Unknown task {}", task_instance.task_id)
warn!(target: "frontend_api", "{} unknown task {}", task_instance.dag_run_id, task_instance.task_id)
}
}

Expand Down
Loading