From 1ea3dedbb419ab0c5a6aed9bad88412f5383ae81 Mon Sep 17 00:00:00 2001 From: Manuel Amador Date: Tue, 6 Aug 2024 16:45:49 +0200 Subject: [PATCH 1/6] Increase efficiency. Previously, we were sending the full rollout to the frontend. This is entirely unnecessary unless tasks of the rollout have changed in status. We now track and use this information to send no data to the frontend (except for the every-5-second keepalives) when there are no changes to the data harvested by the backend. --- rollout-dashboard/server/src/frontend_api.rs | 8 ++++-- rollout-dashboard/server/src/main.rs | 30 +++++++++++++------- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/rollout-dashboard/server/src/frontend_api.rs b/rollout-dashboard/server/src/frontend_api.rs index 543692c..58d7542 100644 --- a/rollout-dashboard/server/src/frontend_api.rs +++ b/rollout-dashboard/server/src/frontend_api.rs @@ -444,10 +444,14 @@ impl RolloutApi { impl RolloutApi { /// Retrieve all rollout data, using a cache to avoid /// re-fetching task instances not updated since last time. + /// + /// Returns a tuple of the rollout data and a flag + /// indicating if the rollout data was updated since + /// the last time. 
pub async fn get_rollout_data( &self, max_rollouts: usize, - ) -> Result, RolloutDataGatherError> { + ) -> Result<(Vec, bool), RolloutDataGatherError> { let mut cache = self.cache.lock().await; let now = Utc::now(); let last_update_time = cache.last_update_time; @@ -835,6 +839,6 @@ impl RolloutApi { if any_rollout_updated { cache.last_update_time = Some(now); } - Ok(res) + Ok((res, any_rollout_updated)) } } diff --git a/rollout-dashboard/server/src/main.rs b/rollout-dashboard/server/src/main.rs index be66f3b..0ec9253 100644 --- a/rollout-dashboard/server/src/main.rs +++ b/rollout-dashboard/server/src/main.rs @@ -59,9 +59,9 @@ impl Server { async fn fetch_rollout_data( &self, max_rollouts: usize, - ) -> Result, (StatusCode, String)> { + ) -> Result<(VecDeque, bool), (StatusCode, String)> { match self.rollout_api.get_rollout_data(max_rollouts).await { - Ok(rollouts) => Ok(rollouts.into()), + Ok((rollouts, updated)) => Ok((rollouts.into(), updated)), Err(e) => { let res = match e { RolloutDataGatherError::AirflowError(AirflowError::StatusCode(c)) => { @@ -108,16 +108,21 @@ impl Server { loop { let loop_start_time: DateTime = Utc::now(); + let mut changed = true; let data = select! { d = self.fetch_rollout_data(max_rollouts) => { match d { - Ok(new_rollouts) => { + Ok((new_rollouts, updated)) => { + let loop_delta_time = Utc::now() - loop_start_time; + info!(target: "update_loop", "After {}, obtained {} rollouts from Airflow (updated: {})", loop_delta_time, new_rollouts.len(), updated); + changed = updated; if errored { info!(target: "update_loop", "Successfully processed rollout data again after temporary error"); - errored = false - }; - let loop_delta_time = Utc::now() - loop_start_time; - info!(target: "update_loop", "After {}, obtained {} rollouts from Airflow", loop_delta_time, new_rollouts.len()); + // Clear error flag. + errored = false; + // Ensure our data structure is overwritten by whatever data we obtained after the last loop. 
+ changed = true; + } Ok(new_rollouts) } Err(res) => { @@ -133,11 +138,14 @@ impl Server { _ignored = &mut cancel => break, }; - let _ = self.stream_tx.send(data.clone()); + if changed { + println!("changed"); + let _ = self.stream_tx.send(data.clone()); - let mut container = self.last_rollout_data.lock().await; - *container = data; - drop(container); + let mut container = self.last_rollout_data.lock().await; + *container = data; + drop(container); + } select! { _ignored1 = sleep(Duration::from_secs(refresh_interval)) => (), From 50cbc0384ea8ea42c896799497c6d02b4c3239bb Mon Sep 17 00:00:00 2001 From: Manuel Amador Date: Tue, 6 Aug 2024 17:00:44 +0200 Subject: [PATCH 2/6] Ensure that changes in rollout notes or dispatch times are also interpreted as changed data. --- rollout-dashboard/server/src/frontend_api.rs | 30 ++++++++++++++++---- rollout-dashboard/server/src/main.rs | 1 - 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/rollout-dashboard/server/src/frontend_api.rs b/rollout-dashboard/server/src/frontend_api.rs index 58d7542..6caf89a 100644 --- a/rollout-dashboard/server/src/frontend_api.rs +++ b/rollout-dashboard/server/src/frontend_api.rs @@ -413,6 +413,8 @@ enum ScheduleCache { } struct RolloutDataCache { task_instances: HashMap, + dispatch_time: DateTime, + note: Option, schedule: ScheduleCache, } @@ -447,7 +449,8 @@ impl RolloutApi { /// /// Returns a tuple of the rollout data and a flag /// indicating if the rollout data was updated since - /// the last time. + /// the last time. The flag should be used by calling + /// code to decide whether to send data to clients or not. pub async fn get_rollout_data( &self, max_rollouts: usize, @@ -476,8 +479,13 @@ impl RolloutApi { .entry(dag_run.dag_run_id.clone()) .or_insert(RolloutDataCache { task_instances: HashMap::new(), + dispatch_time: dag_run.logical_date, + note: dag_run.note.clone(), schedule: ScheduleCache::Empty, }); + + // All new task instances that have not been seen before. 
This includes + // tasks of rollouts newly created since last time this loop checked for rollouts. let updated_task_instances = self .airflow_api .task_instances( @@ -557,8 +565,17 @@ impl RolloutApi { } } - let sorted_task_instances = - sorter.sort_instances(cache_entry.task_instances.clone().into_values()); + // If the note of the rollout has changed, + // note that this has been updated. + if cache_entry.note != dag_run.note { + any_rollout_updated = true; + cache_entry.note = dag_run.note.clone(); + } + // Same for the dispatch time. + if cache_entry.dispatch_time != dag_run.logical_date { + any_rollout_updated = true; + cache_entry.dispatch_time = dag_run.logical_date.clone(); + } let mut rollout = Rollout::new( dag_run.dag_run_id.to_string(), @@ -574,12 +591,15 @@ impl RolloutApi { .append_pair("dag_run_id", dag_run.dag_run_id.as_str()); display_url.to_string() }, - dag_run.note.clone(), - dag_run.logical_date, + cache_entry.note.clone(), + cache_entry.dispatch_time, dag_run.last_scheduling_decision, dag_run.conf.clone(), ); + let sorted_task_instances = + sorter.sort_instances(cache_entry.task_instances.clone().into_values()); + // Now update rollout and batch state based on the obtained data. for task_instance in sorted_task_instances { if task_instance.task_id == "schedule" { diff --git a/rollout-dashboard/server/src/main.rs b/rollout-dashboard/server/src/main.rs index 0ec9253..af4efa9 100644 --- a/rollout-dashboard/server/src/main.rs +++ b/rollout-dashboard/server/src/main.rs @@ -139,7 +139,6 @@ impl Server { }; if changed { - println!("changed"); let _ = self.stream_tx.send(data.clone()); let mut container = self.last_rollout_data.lock().await; From 4930ae0c292ffddf1507e46c1d21704a2326ac6f Mon Sep 17 00:00:00 2001 From: Manuel Amador Date: Tue, 6 Aug 2024 17:14:42 +0200 Subject: [PATCH 3/6] Clarify code and flow. 
--- rollout-dashboard/server/src/frontend_api.rs | 100 +++++++++++-------- 1 file changed, 57 insertions(+), 43 deletions(-) diff --git a/rollout-dashboard/server/src/frontend_api.rs b/rollout-dashboard/server/src/frontend_api.rs index 6caf89a..0a48230 100644 --- a/rollout-dashboard/server/src/frontend_api.rs +++ b/rollout-dashboard/server/src/frontend_api.rs @@ -161,6 +161,10 @@ pub struct Rollout { pub note: Option, pub state: RolloutState, pub dispatch_time: DateTime, + /// Last scheduling decision. + /// Due to the way the central rollout cache is updated, clients may not see + /// an up-to-date value that corresponds to Airflow's last update time for + /// the DAG run. See documentation in get_rollout_data. pub last_scheduling_decision: Option>, pub batches: IndexMap, pub conf: HashMap, @@ -451,6 +455,13 @@ impl RolloutApi { /// indicating if the rollout data was updated since /// the last time. The flag should be used by calling /// code to decide whether to send data to clients or not. + /// + /// The rollout structure itself is updated on every call + /// for every DAG run. However, not every change in the DAG + /// run is considered to be a meaningful change (causing a + /// true return in the update flag). Currently, only changes + /// in the rollout note, the state of any of its tasks, or + /// the rollout dispatch time are considered meaningful changes. pub async fn get_rollout_data( &self, max_rollouts: usize, @@ -471,7 +482,9 @@ impl RolloutApi { let sorter = TaskInstanceTopologicalSorter::new(tasks)?; let mut res: Vec = vec![]; - let mut any_rollout_updated = false; + // Track if any rollout has had any meaningful changes. + // Also see function documentation about meaningful changes. 
+ let mut meaningful_updates_to_any_rollout = false; for dag_run in dag_runs.dag_runs.iter() { let cache_entry = cache @@ -515,19 +528,53 @@ impl RolloutApi { }; debug!( - target: "frontend_api", "Updated tasks {} Ended tasks {}", - updated_task_instances.len(), ended_task_instances.len(), + target: "frontend_api", "{}: Updated tasks {} Ended tasks {}", + dag_run.dag_run_id, updated_task_instances.len(), ended_task_instances.len(), ); - if !updated_task_instances.is_empty() || !ended_task_instances.is_empty() { - any_rollout_updated = true; + // If the note of the rollout has changed, + // note that this has been updated. + if cache_entry.note != dag_run.note { + meaningful_updates_to_any_rollout = true; + cache_entry.note = dag_run.note.clone(); + } + // Same for the dispatch time. + if cache_entry.dispatch_time != dag_run.logical_date { + meaningful_updates_to_any_rollout = true; + cache_entry.dispatch_time = dag_run.logical_date.clone(); } + let mut rollout = Rollout::new( + dag_run.dag_run_id.to_string(), + { + let mut display_url = self + .airflow_api + .as_ref() + .url + .join(format!("/dags/{}/grid", dag_run.dag_id).as_str()) + .unwrap(); + display_url + .query_pairs_mut() + .append_pair("dag_run_id", dag_run.dag_run_id.as_str()); + display_url.to_string() + }, + // Use recently-updated cache values here. + // See function documentation about meaningful changes. + cache_entry.note.clone(), + cache_entry.dispatch_time, + dag_run.last_scheduling_decision, + dag_run.conf.clone(), + ); + // Let's update the cache to incorporate the most up-to-date task instances. for task_instance in updated_task_instances .into_iter() .chain(ended_task_instances.into_iter()) { + // At least one task has updated or finished. + // See function documentation about meaningful changes. 
+ meaningful_updates_to_any_rollout = true; + let task_instance_id = task_instance.task_id.clone(); if task_instance_id == "schedule" { cache_entry.schedule = ScheduleCache::Invalid; @@ -565,43 +612,10 @@ impl RolloutApi { } } - // If the note of the rollout has changed, - // note that this has been updated. - if cache_entry.note != dag_run.note { - any_rollout_updated = true; - cache_entry.note = dag_run.note.clone(); - } - // Same for the dispatch time. - if cache_entry.dispatch_time != dag_run.logical_date { - any_rollout_updated = true; - cache_entry.dispatch_time = dag_run.logical_date.clone(); - } - - let mut rollout = Rollout::new( - dag_run.dag_run_id.to_string(), - { - let mut display_url = self - .airflow_api - .as_ref() - .url - .join(format!("/dags/{}/grid", dag_run.dag_id).as_str()) - .unwrap(); - display_url - .query_pairs_mut() - .append_pair("dag_run_id", dag_run.dag_run_id.as_str()); - display_url.to_string() - }, - cache_entry.note.clone(), - cache_entry.dispatch_time, - dag_run.last_scheduling_decision, - dag_run.conf.clone(), - ); - - let sorted_task_instances = - sorter.sort_instances(cache_entry.task_instances.clone().into_values()); - // Now update rollout and batch state based on the obtained data. - for task_instance in sorted_task_instances { + for task_instance in + sorter.sort_instances(cache_entry.task_instances.clone().into_values()) + { if task_instance.task_id == "schedule" { match task_instance.state { Some(TaskInstanceState::Skipped) | Some(TaskInstanceState::Removed) => (), @@ -856,9 +870,9 @@ impl RolloutApi { res.push(rollout); } - if any_rollout_updated { + if meaningful_updates_to_any_rollout { cache.last_update_time = Some(now); } - Ok((res, any_rollout_updated)) + Ok((res, meaningful_updates_to_any_rollout)) } } From bc6ef6c3724815e1223ae4b64b5485c484f87930 Mon Sep 17 00:00:00 2001 From: Manuel Amador Date: Tue, 6 Aug 2024 17:16:05 +0200 Subject: [PATCH 4/6] Note about incremental change. 
--- rollout-dashboard/server/src/frontend_api.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rollout-dashboard/server/src/frontend_api.rs b/rollout-dashboard/server/src/frontend_api.rs index 0a48230..150470c 100644 --- a/rollout-dashboard/server/src/frontend_api.rs +++ b/rollout-dashboard/server/src/frontend_api.rs @@ -871,6 +871,8 @@ impl RolloutApi { } if meaningful_updates_to_any_rollout { + // Preserve the value for next loop so that we have a baseline + // of date/time to query data incrementally. cache.last_update_time = Some(now); } Ok((res, meaningful_updates_to_any_rollout)) From ed8e226a81dd0e8d1807bf4136bcfe09696651d5 Mon Sep 17 00:00:00 2001 From: Manuel Amador Date: Tue, 6 Aug 2024 17:18:19 +0200 Subject: [PATCH 5/6] Cargo clippy efficiency improvement suggestions applied. --- rollout-dashboard/server/src/frontend_api.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rollout-dashboard/server/src/frontend_api.rs b/rollout-dashboard/server/src/frontend_api.rs index 150470c..86a849b 100644 --- a/rollout-dashboard/server/src/frontend_api.rs +++ b/rollout-dashboard/server/src/frontend_api.rs @@ -536,12 +536,12 @@ impl RolloutApi { // note that this has been updated. if cache_entry.note != dag_run.note { meaningful_updates_to_any_rollout = true; - cache_entry.note = dag_run.note.clone(); + cache_entry.note.clone_from(&dag_run.note); } // Same for the dispatch time. if cache_entry.dispatch_time != dag_run.logical_date { meaningful_updates_to_any_rollout = true; - cache_entry.dispatch_time = dag_run.logical_date.clone(); + cache_entry.dispatch_time = dag_run.logical_date; } let mut rollout = Rollout::new( From ebc287590151e6182c7dba311ad7ea19cbe1b17f Mon Sep 17 00:00:00 2001 From: Manuel Amador Date: Tue, 6 Aug 2024 17:29:23 +0200 Subject: [PATCH 6/6] Make subnet state tooltip HTML-native. Goodbye horrible black background tooltips that cause margin problems on the right side of the page. 
--- rollout-dashboard/frontend/src/app.css | 26 ------------------- .../frontend/src/lib/Batch.svelte | 13 +++------- .../frontend/src/lib/Rollout.svelte | 2 +- rollout-dashboard/frontend/src/lib/types.ts | 7 +++++ 4 files changed, 12 insertions(+), 36 deletions(-) diff --git a/rollout-dashboard/frontend/src/app.css b/rollout-dashboard/frontend/src/app.css index 38f73e1..c8d1f84 100644 --- a/rollout-dashboard/frontend/src/app.css +++ b/rollout-dashboard/frontend/src/app.css @@ -34,32 +34,6 @@ nav { row-gap: 2rem; } -.tooltip { - position: relative; - display: inline-block; - cursor: pointer; -} - -.tooltip .tooltiptext { - display: none; - visibility: hidden; - min-width: 250px; - background-color: black; - color: #fff; - text-align: center; - border-radius: 6px; - padding: 5px 0; - - /* Position the tooltip */ - position: absolute; - z-index: 1; -} - -.tooltip:hover .tooltiptext { - display: inline-block; - visibility: visible; -} - time { border-bottom: 1px dotted #000; cursor: help; diff --git a/rollout-dashboard/frontend/src/lib/Batch.svelte b/rollout-dashboard/frontend/src/lib/Batch.svelte index 8d5e9c3..4982bbd 100644 --- a/rollout-dashboard/frontend/src/lib/Batch.svelte +++ b/rollout-dashboard/frontend/src/lib/Batch.svelte @@ -2,7 +2,7 @@ import Time from "svelte-time"; import { copy } from "svelte-copy"; import { toast } from "@zerodevx/svelte-toast"; - import { type Batch, batchStateName, batchStateIcon } from "./types"; + import { type Batch, batchStateComment, batchStateIcon } from "./types"; export let batch_num: String; export let batch: Batch; @@ -54,15 +54,10 @@ href={subnet.display_url || ""} target="_blank" data-sveltekit-preload-data="off" + title={batchStateComment(subnet)} > -
- {batchStateIcon(subnet.state)}{batchStateName( - subnet.state, - )}{#if subnet.comment}
{subnet.comment}{/if}
+
+ {batchStateIcon(subnet.state)}
(simulated){/if}
- {rolloutStateIcon(rollout.state)}{rolloutStateName(rollout.state)}
diff --git a/rollout-dashboard/frontend/src/lib/types.ts b/rollout-dashboard/frontend/src/lib/types.ts index a1ccc37..db4a8c8 100644 --- a/rollout-dashboard/frontend/src/lib/types.ts +++ b/rollout-dashboard/frontend/src/lib/types.ts @@ -26,6 +26,13 @@ export function batchStateIcon(state: keyof typeof subnet_rollout_states): Strin export function batchStateName(state: keyof typeof subnet_rollout_states): String { return subnet_rollout_states[state].name; } +export function batchStateComment(subnet: Subnet): string { + let s = subnet_rollout_states[subnet.state].name; + if (subnet.comment) { + s = s + ": " + subnet.comment + } + return s +} export type Subnet = { subnet_id: string; git_revision: string;