Skip to content

Commit

Permalink
Clarify code and flow.
Browse files Browse the repository at this point in the history
  • Loading branch information
DFINITYManu committed Aug 6, 2024
1 parent 50cbc03 commit 4930ae0
Showing 1 changed file with 57 additions and 43 deletions.
100 changes: 57 additions & 43 deletions rollout-dashboard/server/src/frontend_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ pub struct Rollout {
pub note: Option<String>,
pub state: RolloutState,
pub dispatch_time: DateTime<Utc>,
/// Last scheduling decision.
/// Due to the way the central rollout cache is updated, clients may not see
/// an up-to-date value that corresponds to Airflow's last update time for
/// the DAG run. See documentation in get_rollout_data.
pub last_scheduling_decision: Option<DateTime<Utc>>,
pub batches: IndexMap<usize, Batch>,
pub conf: HashMap<String, serde_json::Value>,
Expand Down Expand Up @@ -451,6 +455,13 @@ impl RolloutApi {
/// indicating if the rollout data was updated since
/// the last time. The flag should be used by calling
/// code to decide whether to send data to clients or not.
///
/// The rollout structure itself is updated on every call
/// for every DAG run. However, not every change in the DAG
/// run is considered to be a meaningful change (one that causes
/// the update flag to be returned as true). Currently, only a
/// change in the rollout note, the state of any of its tasks, or
/// the rollout dispatch time is considered a meaningful change.
pub async fn get_rollout_data(
&self,
max_rollouts: usize,
Expand All @@ -471,7 +482,9 @@ impl RolloutApi {
let sorter = TaskInstanceTopologicalSorter::new(tasks)?;

let mut res: Vec<Rollout> = vec![];
let mut any_rollout_updated = false;
// Track if any rollout has had any meaningful changes.
// Also see function documentation about meaningful changes.
let mut meaningful_updates_to_any_rollout = false;

for dag_run in dag_runs.dag_runs.iter() {
let cache_entry = cache
Expand Down Expand Up @@ -515,19 +528,53 @@ impl RolloutApi {
};

debug!(
target: "frontend_api", "Updated tasks {} Ended tasks {}",
updated_task_instances.len(), ended_task_instances.len(),
target: "frontend_api", "{}: Updated tasks {} Ended tasks {}",
dag_run.dag_run_id, updated_task_instances.len(), ended_task_instances.len(),
);

if !updated_task_instances.is_empty() || !ended_task_instances.is_empty() {
any_rollout_updated = true;
// If the note of the rollout has changed,
// record that a meaningful update has occurred.
if cache_entry.note != dag_run.note {
meaningful_updates_to_any_rollout = true;
cache_entry.note = dag_run.note.clone();
}
// Same for the dispatch time.
if cache_entry.dispatch_time != dag_run.logical_date {
meaningful_updates_to_any_rollout = true;
cache_entry.dispatch_time = dag_run.logical_date.clone();
}

let mut rollout = Rollout::new(
dag_run.dag_run_id.to_string(),
{
let mut display_url = self
.airflow_api
.as_ref()
.url
.join(format!("/dags/{}/grid", dag_run.dag_id).as_str())
.unwrap();
display_url
.query_pairs_mut()
.append_pair("dag_run_id", dag_run.dag_run_id.as_str());
display_url.to_string()
},
// Use recently-updated cache values here.
// See function documentation about meaningful changes.
cache_entry.note.clone(),
cache_entry.dispatch_time,
dag_run.last_scheduling_decision,
dag_run.conf.clone(),
);

// Let's update the cache to incorporate the most up-to-date task instances.
for task_instance in updated_task_instances
.into_iter()
.chain(ended_task_instances.into_iter())
{
// At least one task has updated or finished.
// See function documentation about meaningful changes.
meaningful_updates_to_any_rollout = true;

let task_instance_id = task_instance.task_id.clone();
if task_instance_id == "schedule" {
cache_entry.schedule = ScheduleCache::Invalid;
Expand Down Expand Up @@ -565,43 +612,10 @@ impl RolloutApi {
}
}

// If the note of the rollout has changed,
// note that this has been updated.
if cache_entry.note != dag_run.note {
any_rollout_updated = true;
cache_entry.note = dag_run.note.clone();
}
// Same for the dispatch time.
if cache_entry.dispatch_time != dag_run.logical_date {
any_rollout_updated = true;
cache_entry.dispatch_time = dag_run.logical_date.clone();
}

let mut rollout = Rollout::new(
dag_run.dag_run_id.to_string(),
{
let mut display_url = self
.airflow_api
.as_ref()
.url
.join(format!("/dags/{}/grid", dag_run.dag_id).as_str())
.unwrap();
display_url
.query_pairs_mut()
.append_pair("dag_run_id", dag_run.dag_run_id.as_str());
display_url.to_string()
},
cache_entry.note.clone(),
cache_entry.dispatch_time,
dag_run.last_scheduling_decision,
dag_run.conf.clone(),
);

let sorted_task_instances =
sorter.sort_instances(cache_entry.task_instances.clone().into_values());

// Now update rollout and batch state based on the obtained data.
for task_instance in sorted_task_instances {
for task_instance in
sorter.sort_instances(cache_entry.task_instances.clone().into_values())
{
if task_instance.task_id == "schedule" {
match task_instance.state {
Some(TaskInstanceState::Skipped) | Some(TaskInstanceState::Removed) => (),
Expand Down Expand Up @@ -856,9 +870,9 @@ impl RolloutApi {
res.push(rollout);
}

if any_rollout_updated {
if meaningful_updates_to_any_rollout {
cache.last_update_time = Some(now);
}
Ok((res, any_rollout_updated))
Ok((res, meaningful_updates_to_any_rollout))
}
}

0 comments on commit 4930ae0

Please sign in to comment.