
Commit

Merge pull request #37 from dfinity/librefactor
Turn the dashboard into both a library and a binary.
DFINITYManu authored Aug 15, 2024
2 parents 517a7b3 + ccdaa64 commit 11b44c1
Showing 7 changed files with 253 additions and 183 deletions.
16 changes: 13 additions & 3 deletions rollout-dashboard/README.md
@@ -2,11 +2,16 @@

* [Live dashboard here](https://rollout-dashboard.ch1-rel1.dfinity.network/)

This application collects information from Airflow to display in a nice
easy-to-use form. In production, it is composed of two distinct pieces:
This application:

* collects information from Airflow to display in a nice easy-to-use screen,
* provides a REST endpoint for other programs to retrieve the data it collects,
* provides a client library for those programs to use the retrieved data.

In production, it is composed of two distinct pieces:

1. A Rust-based backend that periodically collects information to
assemble in the right format, and serves it to clients.
   assemble in the right format, and serves it to clients via a REST API.
2. A collection of compiled TypeScript and CSS files that form the
Web client, which (when loaded by the browser) polls the backend
and displays the data returned by the backend. This collection of
@@ -15,6 +20,11 @@ easy-to-use form. In production, it is composed of two distinct pieces:
To upgrade the dashboard in production,
[consult the relevant document](https://dfinity-lab.gitlab.io/private/k8s/k8s/#/bases/apps/rollout-dashboard/).

If you are building a client of this application, consult the programming
documentation in the [`server/`](server/) folder by running `cargo rustdoc`
within that folder and then opening the Web page it generates.

[[TOC]]

## Setting up a development environment
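The client library and REST endpoint described in the README above are meant to be combined by downstream programs. A minimal client sketch, assuming the types in `rollout_dashboard::types` also implement `Deserialize` (only `Serialize` appears in this diff) and leaving the endpoint URL to the caller, since the exact path is not part of this commit:

```rust
use rollout_dashboard::types::Rollouts;

/// Fetches the rollout list from the dashboard backend at `endpoint`
/// (a caller-supplied, hypothetical URL) and decodes it with the shared
/// library types.
async fn fetch_rollouts(endpoint: &str) -> Result<Rollouts, Box<dyn std::error::Error>> {
    let body = reqwest::get(endpoint).await?.text().await?;
    let rollouts: Rollouts = serde_json::from_str(&body)?;
    Ok(rollouts)
}
```

`Rollouts` is the same collection the backend assembles in `get_rollout_data` further down in this commit.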
2 changes: 1 addition & 1 deletion rollout-dashboard/server/Cargo.lock

Some generated files are not rendered by default.

10 changes: 9 additions & 1 deletion rollout-dashboard/server/Cargo.toml
@@ -1,8 +1,16 @@
[package]
name = "rollout-dashboard"
version = "0.1.0"
version = "0.2.0"
edition = "2021"

[lib]
name = "rollout_dashboard"
path = "src/lib.rs"

[[bin]]
name = "rollout-dashboard"
path = "src/main.rs"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
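The `[lib]`/`[[bin]]` split above means the binary still builds from `src/main.rs`, while shared types are now reached through the library crate name, exactly as `frontend_api.rs` below does with `use rollout_dashboard::types::{...}`. A small sketch of downstream use, assuming `Rollouts` remains a plain `Vec<Rollout>` as the code below suggests:

```rust
use rollout_dashboard::types::{RolloutState, Rollouts};

/// Counts rollouts that have reached the Complete state.
fn count_complete(rollouts: &Rollouts) -> usize {
    rollouts
        .iter()
        .filter(|r| r.state == RolloutState::Complete)
        .count()
}
```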
251 changes: 74 additions & 177 deletions rollout-dashboard/server/src/frontend_api.rs
@@ -13,189 +13,23 @@ use std::rc::Rc;
use std::str::FromStr;
use std::sync::Arc;
use std::{vec, vec::Vec};
use strum::Display;
use tokio::sync::Mutex;
use topological_sort::TopologicalSort;

use crate::airflow_client::{
AirflowClient, AirflowError, DagRunState, TaskInstanceRequestFilters, TaskInstanceState,
TaskInstancesResponseItem, TasksResponse, TasksResponseItem,
};
use rollout_dashboard::types::{
Batch, Rollout, RolloutState, Rollouts, Subnet, SubnetRolloutState,
};

lazy_static! {
// unwrap() is legitimate here because we know these cannot fail to compile.
static ref SubnetGitRevisionRe: Regex = Regex::new("dfinity.ic_types.SubnetRolloutInstance.*@version=0[(]start_at=.*,subnet_id=([0-9-a-z-]+),git_revision=([0-9a-f]+)[)]").unwrap();
static ref BatchIdentificationRe: Regex = Regex::new("batch_([0-9]+)[.](.+)").unwrap();
}

#[derive(Serialize, Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Display)]
#[serde(rename_all = "snake_case")]
/// Represents the rollout state of a subnet.
/// Ordering matters here.
pub enum SubnetRolloutState {
Error,
PredecessorFailed,
Pending,
Waiting,
Proposing,
WaitingForElection,
WaitingForAdoption,
WaitingForAlertsGone,
Complete,
Unknown,
}

#[derive(Serialize, Debug, Clone)]
pub struct Subnet {
pub subnet_id: String,
pub git_revision: String,
pub state: SubnetRolloutState,
/// Shows a comment for the subnet if available, else empty string.
pub comment: String,
/// Shows a display URL if available, else empty string.
pub display_url: String,
}

#[derive(Serialize, Debug, Clone)]
pub struct Batch {
pub planned_start_time: DateTime<Utc>,
pub actual_start_time: Option<DateTime<Utc>>,
pub end_time: Option<DateTime<Utc>>,
pub subnets: Vec<Subnet>,
}

fn format_some<N>(opt: Option<N>, prefix: &str, fallback: &str) -> String
where
N: Display,
{
match opt {
None => fallback.to_string(),
Some(v) => format!("{}{}", prefix, v),
}
}

impl Batch {
fn set_min_subnet_state(
&mut self,
state: SubnetRolloutState,
task_instance: &TaskInstancesResponseItem,
base_url: &reqwest::Url,
) -> SubnetRolloutState {
self.set_subnet_state(state, task_instance, base_url, true)
}
fn set_specific_subnet_state(
&mut self,
state: SubnetRolloutState,
task_instance: &TaskInstancesResponseItem,
base_url: &reqwest::Url,
) -> SubnetRolloutState {
self.set_subnet_state(state, task_instance, base_url, false)
}
fn set_subnet_state(
&mut self,
state: SubnetRolloutState,
task_instance: &TaskInstancesResponseItem,
base_url: &reqwest::Url,
only_decrease: bool,
) -> SubnetRolloutState {
for subnet in match task_instance.map_index {
None => self.subnets.iter_mut(),
Some(index) => self.subnets[index..=index].iter_mut(),
} {
let new_state = state.clone();
if (only_decrease && new_state < subnet.state)
|| (!only_decrease && new_state != subnet.state)
{
trace!(target: "subnet_state", "{}: {} {:?} transition {} => {} note: {}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, subnet.state, new_state, subnet.comment);
subnet.state = new_state.clone();
} else {
trace!(target: "subnet_state", "{}: {} {:?} NO transition {} => {}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, subnet.state, new_state);
}
if new_state == subnet.state {
subnet.comment = format!(
"Task {}{} {}",
task_instance.task_id,
format_some(task_instance.map_index, ".", ""),
format_some(
task_instance.state.clone(),
"in state ",
"has no known state"
),
);
subnet.display_url = {
let mut url = base_url
.join(format!("/dags/{}/grid", task_instance.dag_id).as_str())
.unwrap();
url.query_pairs_mut()
.append_pair("dag_run_id", &task_instance.dag_run_id);
url.query_pairs_mut()
.append_pair("task_id", &task_instance.task_id);
url.query_pairs_mut().append_pair("tab", "logs");
if let Some(idx) = task_instance.map_index {
url.query_pairs_mut()
.append_pair("map_index", format!("{}", idx).as_str());
};
url.to_string()
};
};
}
state
}
}

#[derive(Serialize, Debug, Clone, PartialEq, PartialOrd, Eq, Ord)]
#[serde(rename_all = "snake_case")]
/// Represents the rollout state. Ordering matters here.
pub enum RolloutState {
Failed,
Problem,
Preparing,
Waiting,
UpgradingSubnets,
UpgradingUnassignedNodes,
Complete,
}

#[derive(Debug, Serialize, Clone)]
pub struct Rollout {
/// name is unique, enforced by Airflow.
pub name: String,
/// Links to the rollout in Airflow.
pub display_url: String,
pub note: Option<String>,
pub state: RolloutState,
pub dispatch_time: DateTime<Utc>,
/// Last scheduling decision.
/// Due to the way the central rollout cache is updated, clients may not see
/// an up-to-date value that corresponds to Airflow's last update time for
/// the DAG run. See documentation in get_rollout_data.
pub last_scheduling_decision: Option<DateTime<Utc>>,
pub batches: IndexMap<usize, Batch>,
pub conf: HashMap<String, serde_json::Value>,
}

impl Rollout {
fn new(
name: String,
display_url: String,
note: Option<String>,
dispatch_time: DateTime<Utc>,
last_scheduling_decision: Option<DateTime<Utc>>,
conf: HashMap<String, serde_json::Value>,
) -> Self {
Self {
name,
display_url,
note,
state: RolloutState::Complete,
dispatch_time,
last_scheduling_decision,
batches: IndexMap::new(),
conf,
}
}
}

#[derive(Debug)]
pub struct CyclicDependencyError {
message: String,
@@ -433,6 +267,67 @@ struct RolloutApiCache {
by_dag_run: HashMap<String, RolloutDataCache>,
}

fn format_some<N>(opt: Option<N>, prefix: &str, fallback: &str) -> String
where
N: Display,
{
match opt {
None => fallback.to_string(),
Some(v) => format!("{}{}", prefix, v),
}
}

/// Applies `state` to a single subnet (when the task instance has a map
/// index) or to every subnet in the batch, recording a comment and a link
/// to the task's Airflow logs. With `only_decrease`, a subnet's state is
/// only changed when the new state sorts before its current one.
fn annotate_subnet_state(
batch: &mut Batch,
state: SubnetRolloutState,
task_instance: &TaskInstancesResponseItem,
base_url: &reqwest::Url,
only_decrease: bool,
) -> SubnetRolloutState {
for subnet in match task_instance.map_index {
None => batch.subnets.iter_mut(),
Some(index) => batch.subnets[index..=index].iter_mut(),
} {
let new_state = state.clone();
if (only_decrease && new_state < subnet.state)
|| (!only_decrease && new_state != subnet.state)
{
trace!(target: "subnet_state", "{}: {} {:?} transition {} => {} note: {}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, subnet.state, new_state, subnet.comment);
subnet.state = new_state.clone();
} else {
trace!(target: "subnet_state", "{}: {} {:?} NO transition {} => {}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, subnet.state, new_state);
}
if new_state == subnet.state {
subnet.comment = format!(
"Task {}{} {}",
task_instance.task_id,
format_some(task_instance.map_index, ".", ""),
format_some(
task_instance.state.clone(),
"in state ",
"has no known state"
),
);
subnet.display_url = {
let mut url = base_url
.join(format!("/dags/{}/grid", task_instance.dag_id).as_str())
.unwrap();
url.query_pairs_mut()
.append_pair("dag_run_id", &task_instance.dag_run_id);
url.query_pairs_mut()
.append_pair("task_id", &task_instance.task_id);
url.query_pairs_mut().append_pair("tab", "logs");
if let Some(idx) = task_instance.map_index {
url.query_pairs_mut()
.append_pair("map_index", format!("{}", idx).as_str());
};
url.to_string()
};
};
}
state
}

#[derive(Clone)]
pub struct RolloutApi {
airflow_api: Arc<AirflowClient>,
@@ -449,9 +344,7 @@ impl RolloutApi {
})),
}
}
}

impl RolloutApi {
/// Retrieve all rollout data, using a cache to avoid
/// re-fetching task instances not updated since last time.
///
@@ -469,7 +362,7 @@ impl RolloutApi {
pub async fn get_rollout_data(
&self,
max_rollouts: usize,
) -> Result<(Vec<Rollout>, bool), RolloutDataGatherError> {
) -> Result<(Rollouts, bool), RolloutDataGatherError> {
let mut cache = self.cache.lock().await;
let now = Utc::now();
let last_update_time = cache.last_update_time;
@@ -485,7 +378,7 @@ impl RolloutApi {
// and re-request everything again.
let sorter = TaskInstanceTopologicalSorter::new(tasks)?;

let mut res: Vec<Rollout> = vec![];
let mut res: Rollouts = vec![];
// Track if any rollout has had any meaningful changes.
// Also see function documentation about meaningful changes.
let mut meaningful_updates_to_any_rollout = false;
@@ -741,20 +634,24 @@ impl RolloutApi {

macro_rules! trans_min {
($input:expr) => {
batch.set_min_subnet_state(
annotate_subnet_state(
batch,
$input,
&task_instance,
&self.airflow_api.as_ref().url,
);
true,
)
};
}
macro_rules! trans_exact {
($input:expr) => {
batch.set_specific_subnet_state(
annotate_subnet_state(
batch,
$input,
&task_instance,
&self.airflow_api.as_ref().url,
);
false,
)
};
}

(The remaining changed files in this commit are not rendered here.)
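The subnet state machine moved into the library is still compared via its derived ordering ("Ordering matters here" in the removed code), which `annotate_subnet_state` relies on for its `only_decrease` mode. A brief sketch, assuming the types now exported from `rollout_dashboard::types` keep the `PartialOrd`/`Ord` derives shown above:

```rust
use rollout_dashboard::types::SubnetRolloutState;

fn main() {
    // Derived ordering follows declaration order, so Error is the smallest
    // variant and Unknown the largest; `only_decrease` therefore only moves
    // a subnet toward an earlier state in the list.
    assert!(SubnetRolloutState::Error < SubnetRolloutState::Pending);
    assert!(SubnetRolloutState::WaitingForAdoption < SubnetRolloutState::Complete);
    assert!(SubnetRolloutState::Complete < SubnetRolloutState::Unknown);
}
```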
