Skip to content

Commit

Permalink
Merge pull request #25 from dfinity/tooltip
Browse files Browse the repository at this point in the history
Make subnet state tooltip HTML-native.
  • Loading branch information
DFINITYManu authored Aug 6, 2024
2 parents 03dff04 + ebc2875 commit 0d11479
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 79 deletions.
26 changes: 0 additions & 26 deletions rollout-dashboard/frontend/src/app.css
Original file line number Diff line number Diff line change
Expand Up @@ -34,32 +34,6 @@ nav {
row-gap: 2rem;
}

/* Anchor element for a hover tooltip. It establishes the positioning
   context (position: relative) that the absolutely-positioned
   .tooltiptext child is placed against. */
.tooltip {
position: relative;
display: inline-block;
cursor: pointer;
}

/* Tooltip bubble, hidden by default. Both display and visibility are
   toggled here (and flipped back by the :hover rule); either property
   alone would hide it — the pair is redundant but harmless. */
.tooltip .tooltiptext {
display: none;
visibility: hidden;
min-width: 250px;
background-color: black;
color: #fff;
text-align: center;
border-radius: 6px;
padding: 5px 0;

/* Position the tooltip */
position: absolute;
z-index: 1;
}

/* Reveal the tooltip bubble while the parent .tooltip element is
   hovered (reverses both hiding properties set on .tooltiptext). */
.tooltip:hover .tooltiptext {
display: inline-block;
visibility: visible;
}

time {
border-bottom: 1px dotted #000;
cursor: help;
Expand Down
13 changes: 4 additions & 9 deletions rollout-dashboard/frontend/src/lib/Batch.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import Time from "svelte-time";
import { copy } from "svelte-copy";
import { toast } from "@zerodevx/svelte-toast";
import { type Batch, batchStateName, batchStateIcon } from "./types";
import { type Batch, batchStateComment, batchStateIcon } from "./types";
export let batch_num: String;
export let batch: Batch;
Expand Down Expand Up @@ -54,15 +54,10 @@
href={subnet.display_url || ""}
target="_blank"
data-sveltekit-preload-data="off"
title={batchStateComment(subnet)}
>
<div class="subnet_state_icon tooltip">
{batchStateIcon(subnet.state)}<span
class="subnet_state tooltiptext"
>{batchStateName(
subnet.state,
)}{#if subnet.comment}<br
/>{subnet.comment}{/if}</span
>
<div class="subnet_state_icon">
{batchStateIcon(subnet.state)}
</div></a
>
<div
Expand Down
2 changes: 1 addition & 1 deletion rollout-dashboard/frontend/src/lib/Rollout.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
{#if rollout.conf.simulate}<i class="simulated">(simulated)</i>{/if}
</div>
<div class="state_icon tooltip">
{rolloutStateIcon(rollout.state)}<span class="state tooltiptext"
{rolloutStateIcon(rollout.state)}<span class="state"
>{rolloutStateName(rollout.state)}</span
>
</div>
Expand Down
7 changes: 7 additions & 0 deletions rollout-dashboard/frontend/src/lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ export function batchStateIcon(state: keyof typeof subnet_rollout_states): Strin
/** Human-readable display name for the given subnet rollout state. */
export function batchStateName(state: keyof typeof subnet_rollout_states): String {
    const { name } = subnet_rollout_states[state];
    return name;
}
/**
 * Tooltip text for a subnet's rollout state.
 *
 * Returns the state's display name, followed by ": <comment>" when the
 * subnet carries a comment (used as the native `title` attribute in
 * Batch.svelte, replacing the old CSS tooltip markup).
 */
export function batchStateComment(subnet: Subnet): string {
    // Fixed: added the statement-terminating semicolons missing from the
    // original, matching the style of the surrounding functions.
    let s = subnet_rollout_states[subnet.state].name;
    if (subnet.comment) {
        s = s + ": " + subnet.comment;
    }
    return s;
}
export type Subnet = {
subnet_id: string;
git_revision: string;
Expand Down
104 changes: 72 additions & 32 deletions rollout-dashboard/server/src/frontend_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ pub struct Rollout {
pub note: Option<String>,
pub state: RolloutState,
pub dispatch_time: DateTime<Utc>,
/// Last scheduling decision.
/// Due to the way the central rollout cache is updated, clients may not see
/// an up-to-date value that corresponds to Airflow's last update time for
/// the DAG run. See documentation in get_rollout_data.
pub last_scheduling_decision: Option<DateTime<Utc>>,
pub batches: IndexMap<usize, Batch>,
pub conf: HashMap<String, serde_json::Value>,
Expand Down Expand Up @@ -413,6 +417,8 @@ enum ScheduleCache {
}
struct RolloutDataCache {
task_instances: HashMap<String, TaskInstancesResponseItem>,
dispatch_time: DateTime<Utc>,
note: Option<String>,
schedule: ScheduleCache,
}

Expand Down Expand Up @@ -444,10 +450,22 @@ impl RolloutApi {
impl RolloutApi {
/// Retrieve all rollout data, using a cache to avoid
/// re-fetching task instances not updated since last time.
///
/// Returns a tuple of the rollout data
/// indicating if the rollout data was updated since
/// the last time. The flag should be used by calling
/// code to decide whether to send data to clients or not.
///
/// The rollout structure itself is updated on every call
/// for every DAG run. However, not every change in the DAG
/// run is considered to be a meaningful change (causing a
/// true return in the update flag). Currently, only a change
/// in the rollout note, the state of any of its tasks, or
/// the rollout dispatch time are considered meaningful changes.
pub async fn get_rollout_data(
&self,
max_rollouts: usize,
) -> Result<Vec<Rollout>, RolloutDataGatherError> {
) -> Result<(Vec<Rollout>, bool), RolloutDataGatherError> {
let mut cache = self.cache.lock().await;
let now = Utc::now();
let last_update_time = cache.last_update_time;
Expand All @@ -464,16 +482,23 @@ impl RolloutApi {
let sorter = TaskInstanceTopologicalSorter::new(tasks)?;

let mut res: Vec<Rollout> = vec![];
let mut any_rollout_updated = false;
// Track if any rollout has had any meaningful changes.
// Also see function documentation about meaningful changes.
let mut meaningful_updates_to_any_rollout = false;

for dag_run in dag_runs.dag_runs.iter() {
let cache_entry = cache
.by_dag_run
.entry(dag_run.dag_run_id.clone())
.or_insert(RolloutDataCache {
task_instances: HashMap::new(),
dispatch_time: dag_run.logical_date,
note: dag_run.note.clone(),
schedule: ScheduleCache::Empty,
});

// All new task instances that have not been seen before. This includes
// tasks of rollouts newly created since last time this loop checked for rollouts.
let updated_task_instances = self
.airflow_api
.task_instances(
Expand Down Expand Up @@ -503,19 +528,53 @@ impl RolloutApi {
};

debug!(
target: "frontend_api", "Updated tasks {} Ended tasks {}",
updated_task_instances.len(), ended_task_instances.len(),
target: "frontend_api", "{}: Updated tasks {} Ended tasks {}",
dag_run.dag_run_id, updated_task_instances.len(), ended_task_instances.len(),
);

if !updated_task_instances.is_empty() || !ended_task_instances.is_empty() {
any_rollout_updated = true;
// If the note of the rollout has changed,
// note that this has been updated.
if cache_entry.note != dag_run.note {
meaningful_updates_to_any_rollout = true;
cache_entry.note.clone_from(&dag_run.note);
}
// Same for the dispatch time.
if cache_entry.dispatch_time != dag_run.logical_date {
meaningful_updates_to_any_rollout = true;
cache_entry.dispatch_time = dag_run.logical_date;
}

let mut rollout = Rollout::new(
dag_run.dag_run_id.to_string(),
{
let mut display_url = self
.airflow_api
.as_ref()
.url
.join(format!("/dags/{}/grid", dag_run.dag_id).as_str())
.unwrap();
display_url
.query_pairs_mut()
.append_pair("dag_run_id", dag_run.dag_run_id.as_str());
display_url.to_string()
},
// Use recently-updated cache values here.
// See function documentation about meaningful changes.
cache_entry.note.clone(),
cache_entry.dispatch_time,
dag_run.last_scheduling_decision,
dag_run.conf.clone(),
);

// Let's update the cache to incorporate the most up-to-date task instances.
for task_instance in updated_task_instances
.into_iter()
.chain(ended_task_instances.into_iter())
{
// At least one task has updated or finished.
// See function documentation about meaningful changes.
meaningful_updates_to_any_rollout = true;

let task_instance_id = task_instance.task_id.clone();
if task_instance_id == "schedule" {
cache_entry.schedule = ScheduleCache::Invalid;
Expand Down Expand Up @@ -553,31 +612,10 @@ impl RolloutApi {
}
}

let sorted_task_instances =
sorter.sort_instances(cache_entry.task_instances.clone().into_values());

let mut rollout = Rollout::new(
dag_run.dag_run_id.to_string(),
{
let mut display_url = self
.airflow_api
.as_ref()
.url
.join(format!("/dags/{}/grid", dag_run.dag_id).as_str())
.unwrap();
display_url
.query_pairs_mut()
.append_pair("dag_run_id", dag_run.dag_run_id.as_str());
display_url.to_string()
},
dag_run.note.clone(),
dag_run.logical_date,
dag_run.last_scheduling_decision,
dag_run.conf.clone(),
);

// Now update rollout and batch state based on the obtained data.
for task_instance in sorted_task_instances {
for task_instance in
sorter.sort_instances(cache_entry.task_instances.clone().into_values())
{
if task_instance.task_id == "schedule" {
match task_instance.state {
Some(TaskInstanceState::Skipped) | Some(TaskInstanceState::Removed) => (),
Expand Down Expand Up @@ -832,9 +870,11 @@ impl RolloutApi {
res.push(rollout);
}

if any_rollout_updated {
if meaningful_updates_to_any_rollout {
// Preserve the value for next loop so that we have a baseline
// of date/time to query data incrementally.
cache.last_update_time = Some(now);
}
Ok(res)
Ok((res, meaningful_updates_to_any_rollout))
}
}
29 changes: 18 additions & 11 deletions rollout-dashboard/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ impl Server {
async fn fetch_rollout_data(
&self,
max_rollouts: usize,
) -> Result<VecDeque<Rollout>, (StatusCode, String)> {
) -> Result<(VecDeque<Rollout>, bool), (StatusCode, String)> {
match self.rollout_api.get_rollout_data(max_rollouts).await {
Ok(rollouts) => Ok(rollouts.into()),
Ok((rollouts, updated)) => Ok((rollouts.into(), updated)),
Err(e) => {
let res = match e {
RolloutDataGatherError::AirflowError(AirflowError::StatusCode(c)) => {
Expand Down Expand Up @@ -108,16 +108,21 @@ impl Server {
loop {
let loop_start_time: DateTime<Utc> = Utc::now();

let mut changed = true;
let data = select! {
d = self.fetch_rollout_data(max_rollouts) => {
match d {
Ok(new_rollouts) => {
Ok((new_rollouts, updated)) => {
let loop_delta_time = Utc::now() - loop_start_time;
info!(target: "update_loop", "After {}, obtained {} rollouts from Airflow (updated: {})", loop_delta_time, new_rollouts.len(), updated);
changed = updated;
if errored {
info!(target: "update_loop", "Successfully processed rollout data again after temporary error");
errored = false
};
let loop_delta_time = Utc::now() - loop_start_time;
info!(target: "update_loop", "After {}, obtained {} rollouts from Airflow", loop_delta_time, new_rollouts.len());
// Clear error flag.
errored = false;
// Ensure our data structure is overwritten by whatever data we obtained after the last loop.
changed = true;
}
Ok(new_rollouts)
}
Err(res) => {
Expand All @@ -133,11 +138,13 @@ impl Server {
_ignored = &mut cancel => break,
};

let _ = self.stream_tx.send(data.clone());
if changed {
let _ = self.stream_tx.send(data.clone());

let mut container = self.last_rollout_data.lock().await;
*container = data;
drop(container);
let mut container = self.last_rollout_data.lock().await;
*container = data;
drop(container);
}

select! {
_ignored1 = sleep(Duration::from_secs(refresh_interval)) => (),
Expand Down

0 comments on commit 0d11479

Please sign in to comment.