Rework local cache to address "not yet computed a rollout plan" issue.
The rollout dashboard keeps a local cache of all tasks it has seen, because
retrieving *all* task instances from Airflow is expensive.  We retrieve
everything during the first loop, but in subsequent runs we only retrieve
tasks that have been updated after the last check's timestamp.  To these,
we also add tasks that have started after the last check.

We also now linearize the tasks, in case several of the retrieved task
lists contain the same task instance, which may have been updated between
requests.  Linearization involves picking, for each task instance, the
object with the latest date (be it execution, start or end date).
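
A minimal sketch of the linearization step described above, not the code
from this commit.  It assumes that a task instance is identified by its
task ID plus map index, uses plain epoch seconds instead of `DateTime<Utc>`
to stay dependency-free, and the names `TaskInstance` and `linearize` are
hypothetical:

```rust
use std::collections::HashMap;

/// Simplified stand-in for a task instance. Timestamps are epoch seconds
/// here (assumption), not the `DateTime<Utc>` values the real client uses.
#[derive(Debug, Clone, PartialEq)]
struct TaskInstance {
    task_id: String,
    map_index: i64,
    execution_date: i64,
    start_date: Option<i64>,
    end_date: Option<i64>,
}

impl TaskInstance {
    /// Latest of the execution, start and end dates.
    fn latest_date(&self) -> i64 {
        let d = self.execution_date;
        d.max(self.start_date.unwrap_or(d))
            .max(self.end_date.unwrap_or(d))
    }
}

/// Merge several task lists, keeping for each (task_id, map_index) key the
/// object with the latest date. On a tie, the object seen last wins
/// (an assumption; the commit message does not say how ties are handled).
fn linearize(lists: Vec<Vec<TaskInstance>>) -> Vec<TaskInstance> {
    let mut by_key: HashMap<(String, i64), TaskInstance> = HashMap::new();
    for task in lists.into_iter().flatten() {
        let key = (task.task_id.clone(), task.map_index);
        let replace = by_key
            .get(&key)
            .map_or(true, |existing| task.latest_date() >= existing.latest_date());
        if replace {
            by_key.insert(key, task);
        }
    }
    // Sort by key so the result does not depend on hash map ordering.
    let mut out: Vec<TaskInstance> = by_key.into_values().collect();
    out.sort_by(|a, b| (&a.task_id, a.map_index).cmp(&(&b.task_id, b.map_index)));
    out
}
```

Keying by ID and comparing dates means the order in which the parallel
responses arrive does not matter, except when two copies carry the same
latest date.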

Furthermore, if the rollout plan is retrieved but comes back empty even
though the schedule task is complete, we retrieve it again.  This prevents
the odd error where the task has completed but the XCom associated with
the task (containing the plan) is not yet saved to the database (or at
least it looks that way, because we're racing to get the value right
after the task finished, but the value is not yet inserted stably into
the database).
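
One way to picture the "retrieve it again" behavior is a bounded retry
loop.  This is only a sketch under assumptions: `fetch_plan` and its
parameters are hypothetical, and the commit may equally well defer the
second attempt to the next sync loop instead of retrying immediately:

```rust
/// Hypothetical helper: call `fetch` up to `max_attempts` times, treating
/// an empty or missing plan as "not yet visible" while the schedule task
/// is already complete.
fn fetch_plan<F>(mut fetch: F, schedule_complete: bool, max_attempts: u32) -> Option<String>
where
    F: FnMut() -> Option<String>,
{
    for _ in 0..max_attempts {
        match fetch() {
            // A non-empty plan is accepted immediately.
            Some(plan) if !plan.is_empty() => return Some(plan),
            // Empty or missing plan: retrying only makes sense if the
            // schedule task has finished, because otherwise no plan is
            // expected to exist yet.
            _ if schedule_complete => continue,
            _ => return None,
        }
    }
    None
}
```

An empty result is never treated as final while the schedule task is
complete, which is what avoids the race with the XCom write.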

Finally, this PR parallelizes the multiple requests that take place when
the task retrieval is performed.  This reduces incremental update time to
roughly half of what it used to be.
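
As an illustration of issuing the independent requests concurrently, here
is a sketch using scoped threads from the standard library.  The server
code may well use async futures instead; threads are used here only to
keep the example dependency-free, and `fetch` and `fetch_all_parallel` are
hypothetical names:

```rust
use std::thread;

/// Hypothetical stand-in for one filtered task-instance request. A real
/// implementation would call the Airflow API here.
fn fetch(label: &str) -> Result<Vec<String>, String> {
    Ok(vec![format!("{}-task", label)])
}

/// Run one request per label in parallel and concatenate the results in
/// the order of `labels`. The first error encountered is returned.
fn fetch_all_parallel(labels: &[&str]) -> Result<Vec<String>, String> {
    let results: Vec<Result<Vec<String>, String>> = thread::scope(|s| {
        // Spawn all requests first so that they overlap in time.
        let handles: Vec<_> = labels
            .iter()
            .map(|label| s.spawn(move || fetch(label)))
            .collect();
        // Then wait for each of them, preserving the input order.
        handles
            .into_iter()
            .map(|h| h.join().expect("fetch thread panicked"))
            .collect()
    });
    let mut all = Vec::new();
    for r in results {
        all.extend(r?);
    }
    Ok(all)
}
```

With two requests of similar latency, the total wait approaches that of
the slower request rather than the sum of both, which is consistent with
the roughly halved update time mentioned above.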
DFINITYManu committed Sep 25, 2024
1 parent 21c4c1c commit e043da2
Showing 2 changed files with 209 additions and 93 deletions.
57 changes: 57 additions & 0 deletions rollout-dashboard/server/src/airflow_client.rs
@@ -52,6 +52,18 @@ fn add_updated_parameters(
    )
}

fn add_executed_parameters(
    url: String,
    execution_date_lte: Option<DateTime<Utc>>,
    execution_date_gte: Option<DateTime<Utc>>,
) -> String {
    add_date_parm(
        add_date_parm(url, "execution_date_lte", execution_date_lte),
        "execution_date_gte",
        execution_date_gte,
    )
}

fn add_ended_parameters(
    url: String,
    end_date_lte: Option<DateTime<Utc>>,
@@ -233,6 +245,39 @@ pub struct TaskInstancesResponseItem {
    pub note: Option<String>,
}

impl TaskInstancesResponseItem {
    #[allow(dead_code)]
    pub fn latest_date(&self) -> DateTime<Utc> {
        let mut d = self.execution_date;
        if let Some(dd) = self.start_date {
            if dd > d {
                d = dd
            }
        }
        if let Some(dd) = self.end_date {
            if dd > d {
                d = dd
            }
        }
        d
    }
    #[allow(dead_code)]
    pub fn earliest_date(&self) -> DateTime<Utc> {
        let mut d = self.execution_date;
        if let Some(dd) = self.start_date {
            if dd < d {
                d = dd
            }
        }
        if let Some(dd) = self.end_date {
            if dd < d {
                d = dd
            }
        }
        d
    }
}

#[derive(Debug, Deserialize, Default)]
pub struct TaskInstancesResponse {
@@ -275,13 +320,24 @@ impl Pageable for TaskInstancesResponse {

#[derive(Default)]
pub struct TaskInstanceRequestFilters {
    executed_at_lte: Option<DateTime<Utc>>,
    executed_at_gte: Option<DateTime<Utc>>,
    updated_at_lte: Option<DateTime<Utc>>,
    updated_at_gte: Option<DateTime<Utc>>,
    ended_at_lte: Option<DateTime<Utc>>,
    ended_at_gte: Option<DateTime<Utc>>,
}

impl TaskInstanceRequestFilters {
    #[allow(dead_code)]
    pub fn executed_on_or_before(mut self, date: Option<DateTime<Utc>>) -> Self {
        self.executed_at_lte = date;
        self
    }
    pub fn executed_on_or_after(mut self, date: Option<DateTime<Utc>>) -> Self {
        self.executed_at_gte = date;
        self
    }
    #[allow(dead_code)]
    pub fn updated_on_or_before(mut self, date: Option<DateTime<Utc>>) -> Self {
        self.updated_at_lte = date;
@@ -737,6 +793,7 @@ impl AirflowClient {
        filters: TaskInstanceRequestFilters,
    ) -> Result<TaskInstancesResponse, AirflowError> {
        let mut url = format!("dags/{}/dagRuns/{}/taskInstances", dag_id, dag_run_id);
        url = add_executed_parameters(url, filters.executed_at_lte, filters.executed_at_gte);
        url = add_updated_parameters(url, filters.updated_at_lte, filters.updated_at_gte);
        url = add_ended_parameters(url, filters.ended_at_lte, filters.ended_at_gte);
        _paged_get(
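
The diff above chains optional date filters onto the request URL, but the
`add_date_parm` helper itself is not shown.  The sketch below illustrates
how such chaining could work.  It is an assumption, not the real helper:
`add_parm` is a hypothetical name, and it takes pre-formatted timestamp
strings instead of `DateTime<Utc>` and does no URL encoding:

```rust
/// Hypothetical helper: append `key=value` to `url` if a value is given,
/// using `?` for the first parameter and `&` for later ones.
/// No URL encoding is performed (assumption: values are already safe).
fn add_parm(url: String, key: &str, value: Option<&str>) -> String {
    match value {
        Some(v) => {
            let sep = if url.contains('?') { '&' } else { '?' };
            format!("{}{}{}={}", url, sep, key, v)
        }
        // Unset filters leave the URL untouched.
        None => url,
    }
}

/// Mirrors the shape of `add_executed_parameters` in the diff, with
/// string timestamps standing in for `DateTime<Utc>`.
fn add_executed_parameters(url: String, lte: Option<&str>, gte: Option<&str>) -> String {
    add_parm(
        add_parm(url, "execution_date_lte", lte),
        "execution_date_gte",
        gte,
    )
}
```

Because every filter is optional, a default `TaskInstanceRequestFilters`
would correspond to a URL with no query string, matching the full
retrieval done on the first loop.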