Turn the dashboard into both a library and a binary. #37

Merged (3 commits, Aug 15, 2024)
Changes from 1 commit
2 changes: 1 addition & 1 deletion rollout-dashboard/server/Cargo.lock

Generated file; diff not rendered by default.

10 changes: 9 additions & 1 deletion rollout-dashboard/server/Cargo.toml
@@ -1,8 +1,16 @@
 [package]
 name = "rollout-dashboard"
-version = "0.1.0"
+version = "0.2.0"
 edition = "2021"
 
+[lib]
+name = "rollout_dashboard"
+path = "src/lib.rs"
+
+[[bin]]
+name = "rollout-dashboard"
+path = "src/main.rs"
+
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
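
With both a [lib] and a [[bin]] target in the same package, other code, including this package's own integration tests under tests/, can now link against the dashboard as a library. A minimal sketch of such a consumer, assuming only the crate name and the type definitions visible in this PR; the test file and its assertions are hypothetical:

// tests/types_smoke.rs -- hypothetical integration test. Integration tests link
// against the library target by crate name (rollout_dashboard, with underscores),
// which is only possible now that the package exposes a [lib] target.
use rollout_dashboard::types::{RolloutState, SubnetRolloutState};

#[test]
fn states_compare_in_declaration_order() {
    // Both enums derive Ord, so variants compare in the order they are declared
    // (Failed / Error first, Complete near the end), which the dashboard relies on.
    assert!(RolloutState::Failed < RolloutState::Complete);
    assert!(SubnetRolloutState::Error < SubnetRolloutState::Complete);
}
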
245 changes: 70 additions & 175 deletions rollout-dashboard/server/src/frontend_api.rs
@@ -13,189 +13,21 @@ use std::rc::Rc;
use std::str::FromStr;
use std::sync::Arc;
use std::{vec, vec::Vec};
use strum::Display;
use tokio::sync::Mutex;
use topological_sort::TopologicalSort;

use crate::airflow_client::{
AirflowClient, AirflowError, DagRunState, TaskInstanceRequestFilters, TaskInstanceState,
TaskInstancesResponseItem, TasksResponse, TasksResponseItem,
};
use rollout_dashboard::types::{Batch, Rollout, RolloutState, Subnet, SubnetRolloutState};

lazy_static! {
// unwrap() is legitimate here because we know these cannot fail to compile.
static ref SubnetGitRevisionRe: Regex = Regex::new("dfinity.ic_types.SubnetRolloutInstance.*@version=0[(]start_at=.*,subnet_id=([0-9-a-z-]+),git_revision=([0-9a-f]+)[)]").unwrap();
static ref BatchIdentificationRe: Regex = Regex::new("batch_([0-9]+)[.](.+)").unwrap();
}

#[derive(Serialize, Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Display)]
#[serde(rename_all = "snake_case")]
/// Represents the rollout state of a subnet.
/// Ordering matters here.
pub enum SubnetRolloutState {
Error,
PredecessorFailed,
Pending,
Waiting,
Proposing,
WaitingForElection,
WaitingForAdoption,
WaitingForAlertsGone,
Complete,
Unknown,
}

#[derive(Serialize, Debug, Clone)]
pub struct Subnet {
pub subnet_id: String,
pub git_revision: String,
pub state: SubnetRolloutState,
/// Shows a comment for the subnet if available, else empty string.
pub comment: String,
/// Shows a display URL if available, else empty string.
pub display_url: String,
}

#[derive(Serialize, Debug, Clone)]
pub struct Batch {
pub planned_start_time: DateTime<Utc>,
pub actual_start_time: Option<DateTime<Utc>>,
pub end_time: Option<DateTime<Utc>>,
pub subnets: Vec<Subnet>,
}

fn format_some<N>(opt: Option<N>, prefix: &str, fallback: &str) -> String
where
N: Display,
{
match opt {
None => fallback.to_string(),
Some(v) => format!("{}{}", prefix, v),
}
}

impl Batch {
fn set_min_subnet_state(
&mut self,
state: SubnetRolloutState,
task_instance: &TaskInstancesResponseItem,
base_url: &reqwest::Url,
) -> SubnetRolloutState {
self.set_subnet_state(state, task_instance, base_url, true)
}
fn set_specific_subnet_state(
&mut self,
state: SubnetRolloutState,
task_instance: &TaskInstancesResponseItem,
base_url: &reqwest::Url,
) -> SubnetRolloutState {
self.set_subnet_state(state, task_instance, base_url, false)
}
fn set_subnet_state(
&mut self,
state: SubnetRolloutState,
task_instance: &TaskInstancesResponseItem,
base_url: &reqwest::Url,
only_decrease: bool,
) -> SubnetRolloutState {
for subnet in match task_instance.map_index {
None => self.subnets.iter_mut(),
Some(index) => self.subnets[index..=index].iter_mut(),
} {
let new_state = state.clone();
if (only_decrease && new_state < subnet.state)
|| (!only_decrease && new_state != subnet.state)
{
trace!(target: "subnet_state", "{}: {} {:?} transition {} => {} note: {}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, subnet.state, new_state, subnet.comment);
subnet.state = new_state.clone();
} else {
trace!(target: "subnet_state", "{}: {} {:?} NO transition {} => {}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, subnet.state, new_state);
}
if new_state == subnet.state {
subnet.comment = format!(
"Task {}{} {}",
task_instance.task_id,
format_some(task_instance.map_index, ".", ""),
format_some(
task_instance.state.clone(),
"in state ",
"has no known state"
),
);
subnet.display_url = {
let mut url = base_url
.join(format!("/dags/{}/grid", task_instance.dag_id).as_str())
.unwrap();
url.query_pairs_mut()
.append_pair("dag_run_id", &task_instance.dag_run_id);
url.query_pairs_mut()
.append_pair("task_id", &task_instance.task_id);
url.query_pairs_mut().append_pair("tab", "logs");
if let Some(idx) = task_instance.map_index {
url.query_pairs_mut()
.append_pair("map_index", format!("{}", idx).as_str());
};
url.to_string()
};
};
}
state
}
}

#[derive(Serialize, Debug, Clone, PartialEq, PartialOrd, Eq, Ord)]
#[serde(rename_all = "snake_case")]
/// Represents the rollout state. Ordering matters here.
pub enum RolloutState {
Failed,
Problem,
Preparing,
Waiting,
UpgradingSubnets,
UpgradingUnassignedNodes,
Complete,
}

#[derive(Debug, Serialize, Clone)]
pub struct Rollout {
/// name is unique, enforced by Airflow.
pub name: String,
/// Links to the rollout in Airflow.
pub display_url: String,
pub note: Option<String>,
pub state: RolloutState,
pub dispatch_time: DateTime<Utc>,
/// Last scheduling decision.
/// Due to the way the central rollout cache is updated, clients may not see
/// an up-to-date value that corresponds to Airflow's last update time for
/// the DAG run. See documentation in get_rollout_data.
pub last_scheduling_decision: Option<DateTime<Utc>>,
pub batches: IndexMap<usize, Batch>,
pub conf: HashMap<String, serde_json::Value>,
}

impl Rollout {
fn new(
name: String,
display_url: String,
note: Option<String>,
dispatch_time: DateTime<Utc>,
last_scheduling_decision: Option<DateTime<Utc>>,
conf: HashMap<String, serde_json::Value>,
) -> Self {
Self {
name,
display_url,
note,
state: RolloutState::Complete,
dispatch_time,
last_scheduling_decision,
batches: IndexMap::new(),
conf,
}
}
}

#[derive(Debug)]
pub struct CyclicDependencyError {
message: String,
@@ -433,6 +265,67 @@ struct RolloutApiCache {
by_dag_run: HashMap<String, RolloutDataCache>,
}

fn format_some<N>(opt: Option<N>, prefix: &str, fallback: &str) -> String
where
N: Display,
{
match opt {
None => fallback.to_string(),
Some(v) => format!("{}{}", prefix, v),
}
}

fn annotate_subnet_state(
batch: &mut Batch,
state: SubnetRolloutState,
task_instance: &TaskInstancesResponseItem,
base_url: &reqwest::Url,
only_decrease: bool,
) -> SubnetRolloutState {
for subnet in match task_instance.map_index {
None => batch.subnets.iter_mut(),
Some(index) => batch.subnets[index..=index].iter_mut(),
} {
let new_state = state.clone();
if (only_decrease && new_state < subnet.state)
|| (!only_decrease && new_state != subnet.state)
{
trace!(target: "subnet_state", "{}: {} {:?} transition {} => {} note: {}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, subnet.state, new_state, subnet.comment);
subnet.state = new_state.clone();
} else {
trace!(target: "subnet_state", "{}: {} {:?} NO transition {} => {}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, subnet.state, new_state);
}
if new_state == subnet.state {
subnet.comment = format!(
"Task {}{} {}",
task_instance.task_id,
format_some(task_instance.map_index, ".", ""),
format_some(
task_instance.state.clone(),
"in state ",
"has no known state"
),
);
subnet.display_url = {
let mut url = base_url
.join(format!("/dags/{}/grid", task_instance.dag_id).as_str())
.unwrap();
url.query_pairs_mut()
.append_pair("dag_run_id", &task_instance.dag_run_id);
url.query_pairs_mut()
.append_pair("task_id", &task_instance.task_id);
url.query_pairs_mut().append_pair("tab", "logs");
if let Some(idx) = task_instance.map_index {
url.query_pairs_mut()
.append_pair("map_index", format!("{}", idx).as_str());
};
url.to_string()
};
};
}
state
}

#[derive(Clone)]
pub struct RolloutApi {
airflow_api: Arc<AirflowClient>,
@@ -449,9 +342,7 @@ impl RolloutApi {
})),
}
}
}

impl RolloutApi {
/// Retrieve all rollout data, using a cache to avoid
/// re-fetching task instances not updated since last time.
///
@@ -741,20 +632,24 @@ impl RolloutApi {

macro_rules! trans_min {
($input:expr) => {
batch.set_min_subnet_state(
annotate_subnet_state(
batch,
$input,
&task_instance,
&self.airflow_api.as_ref().url,
);
true,
)
};
}
macro_rules! trans_exact {
($input:expr) => {
batch.set_specific_subnet_state(
annotate_subnet_state(
batch,
$input,
&task_instance,
&self.airflow_api.as_ref().url,
);
false,
)
};
}

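
The transition rule applied by annotate_subnet_state (and by the removed set_min_subnet_state / set_specific_subnet_state methods it replaces) relies on SubnetRolloutState deriving Ord, so variants compare in declaration order. A small self-contained sketch of just that comparison; the variant names are copied from this diff, while the helper and the assertions are illustrative:

// Declaration order defines the ordering: Error is the smallest variant, Unknown the largest.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum SubnetRolloutState {
    Error,
    PredecessorFailed,
    Pending,
    Waiting,
    Proposing,
    WaitingForElection,
    WaitingForAdoption,
    WaitingForAlertsGone,
    Complete,
    Unknown,
}

// Mirrors the condition in annotate_subnet_state: with only_decrease (the trans_min! case)
// a subnet may only move to an earlier, "worse" variant; without it (trans_exact!) any
// differing state is applied.
fn should_transition(current: &SubnetRolloutState, new: &SubnetRolloutState, only_decrease: bool) -> bool {
    (only_decrease && new < current) || (!only_decrease && new != current)
}

fn main() {
    use SubnetRolloutState::*;
    assert!(should_transition(&Complete, &Error, true));    // Error < Complete: downgrade applies
    assert!(!should_transition(&Complete, &Unknown, true)); // Unknown > Complete: ignored
    assert!(should_transition(&Pending, &Complete, false)); // exact mode: any change applies
}
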
1 change: 1 addition & 0 deletions rollout-dashboard/server/src/lib.rs
@@ -0,0 +1 @@
+pub mod types;
3 changes: 2 additions & 1 deletion rollout-dashboard/server/src/main.rs
@@ -31,7 +31,8 @@ mod frontend_api;
 mod python;
 
 use crate::airflow_client::{AirflowClient, AirflowError};
-use crate::frontend_api::{Rollout, RolloutApi, RolloutDataGatherError};
+use crate::frontend_api::{RolloutApi, RolloutDataGatherError};
+use rollout_dashboard::types::Rollout;
 
 const BACKEND_REFRESH_UPDATE_INTERVAL: u64 = 15;

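
Worth noting about the import change above: frontend_api stays a binary-private module (mod frontend_api;), while Rollout now lives in the library target, so both main.rs and frontend_api.rs name it through the same rollout_dashboard::types path and therefore refer to the same type. An abridged sketch of the resulting pattern, using the paths that appear in this diff (module declarations abridged):

// main.rs after this commit (abridged; unrelated items omitted):
mod airflow_client;   // binary-private modules, still reached via `crate::`
mod frontend_api;
mod python;

use crate::airflow_client::{AirflowClient, AirflowError};
use crate::frontend_api::{RolloutApi, RolloutDataGatherError};
use rollout_dashboard::types::Rollout; // shared type comes in through the library crate name

const BACKEND_REFRESH_UPDATE_INTERVAL: u64 = 15;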