Merge pull request #34 from dfinity/checkplan
Check that the rollout plan allows for 24 hours between uzr34 and pzp6e.
DFINITYManu authored Aug 13, 2024
2 parents 875d42a + e229f84 commit af4144c
Showing 6 changed files with 184 additions and 9 deletions.
6 changes: 6 additions & 0 deletions dags/rollout_ic_os_to_subnets.py
@@ -114,6 +114,12 @@ def collect_batch_subnets(
}}"""
% batch,
).expand(_ignored=proceed)
>> ic_os_sensor.WaitForPreconditions.partial(
task_id="wait_for_preconditions",
git_revision="{{ params.git_revision }}",
retries=retries,
network=network,
).expand(subnet_id=proceed)
>> ic_os_rollout.CreateProposalIdempotently.partial(
task_id="create_proposal_if_none_exists",
git_revision="{{ params.git_revision }}",
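Aside: the chained .partial(...).expand(...) calls in this hunk are Airflow dynamic task mapping: .partial() pins the arguments that are identical for every mapped instance, and .expand() fans a task out over the upstream list of subnets (here, the new wait_for_preconditions step runs once per subnet id in proceed). A minimal, self-contained sketch of the same mechanism, assuming a recent Airflow (2.4+); the task and value names below are illustrative, not the rollout DAG's:

# Minimal sketch of Airflow dynamic task mapping; names are illustrative only.
import datetime

from airflow.decorators import dag, task


@dag(start_date=datetime.datetime(2024, 1, 1), schedule=None, catchup=False)
def mapping_example():
    @task
    def list_subnets() -> list[str]:
        # Stands in for the upstream task whose output is fanned out over.
        return ["subnet-a", "subnet-b"]

    @task
    def wait_for_preconditions(subnet_id: str, git_revision: str) -> str:
        # One mapped task instance runs per subnet id.
        return f"{subnet_id}@{git_revision}"

    # .partial() fixes per-run arguments; .expand() maps over the upstream list,
    # mirroring .partial(git_revision=..., ...).expand(subnet_id=proceed) above.
    wait_for_preconditions.partial(git_revision="0123abc").expand(
        subnet_id=list_subnets()
    )


mapping_example()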
8 changes: 8 additions & 0 deletions plugins/operators/ic_os_rollout.py
@@ -16,6 +16,7 @@
RolloutPlanWithRevision,
SubnetIdWithRevision,
assign_default_revision,
check_plan,
rollout_planner,
subnet_id_and_git_revision_from_args,
)
@@ -336,6 +337,13 @@ def schedule(
f" to be rolled out at {item.start_at} to git"
f" revision {item.git_revision}."
)

try:
check_plan(plan)
except Exception as e:
print("Cannot proceed with rollout plan as planned: %s" % e)
raise AirflowException("Unsafe rollout plan")

return plan


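Aside: raising AirflowException here fails the scheduling task, so with the default trigger rules none of the downstream per-subnet tasks (and therefore no proposals) run for that DAG run. A rough sketch of the same fail-fast pattern, with build_plan and validate as hypothetical stand-ins for the repo's plan construction and check_plan:

# Rough sketch of the fail-fast validation pattern used in schedule() above.
# build_plan and validate are placeholders, not functions from this repo.
from airflow.exceptions import AirflowException


def guarded_schedule(build_plan, validate):
    plan = build_plan()
    try:
        validate(plan)
    except Exception as e:
        # Failing the task here stops the rollout before any proposal is created.
        raise AirflowException("Unsafe rollout plan") from e
    return plan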
48 changes: 48 additions & 0 deletions plugins/sensors/ic_os_rollout.py
@@ -597,6 +597,54 @@ def execute(self, context: Context, event: Any = None) -> None:
self.log.info("No other DAGs are running. Proceeding.")


class WaitForPreconditions(ICRolloutSensorBaseOperator):
"""Performs a variety of checks.
Current checks:
* https://dfinity.atlassian.net/browse/REL-2675 delays updates of the
signing subnet and its backup within less than 1 day of each other.
"""

def execute(self, context: Context, event: Any = None) -> None:
subnet_id, _ = subnet_id_and_git_revision_from_args(
self.subnet_id, self.git_revision
)

antipodes = {
"uzr34-akd3s-xrdag-3ql62-ocgoh-ld2ao-tamcv-54e7j-krwgb-2gm4z-oqe": "pzp6e"
"-ekpqk-3c5x7-2h6so-njoeq-mt45d-h3h6c-q3mxf-vpeq5-fk5o7-yae",
"pzp6e-ekpqk-3c5x7-2h6so-njoeq-mt45d-h3h6c-q3mxf-vpeq5-fk5o7-yae": "uzr34"
"-akd3s-xrdag-3ql62-ocgoh-ld2ao-tamcv-54e7j-krwgb-2gm4z-oqe",
}

if subnet_id in antipodes:
other = antipodes[subnet_id]

self.log.info(
f"Checking that {other} has not been updated in the"
f" last 1 day before {subnet_id}"
)
query = "sum(changes(ic_replica_info{" + f'ic_subnet="{other}"' + "}[1d]))"
self.log.info(f"Querying Prometheus servers: {query}")
res = prom.query_prometheus_servers(self.network.prometheus_urls, query)
if not res:
                raise RuntimeError("Prometheus returned no sum of updates: %r" % res)

update_sum = int(res[0]["value"])
if update_sum > 0:
self.log.info(
f"{other} was updated too recently. Waiting to protect"
" the integrity of signing key or its backup."
)
self.defer(
trigger=TimeDeltaTrigger(datetime.timedelta(minutes=3)),
method_name="execute",
)

self.log.info(f"It is now safe to continue with the update of {subnet_id}.")


if __name__ == "__main__":
import sys

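Aside: the precondition hinges on the PromQL expression sum(changes(ic_replica_info{ic_subnet="..."}[1d])), which sums, across the antipode subnet's series, how often ic_replica_info changed over the trailing day; any non-zero result is treated as a too-recent update. A rough standalone sketch of issuing such a query against Prometheus's standard /api/v1/query HTTP endpoint (the sensor itself goes through the repo's prom.query_prometheus_servers wrapper; the URL below is a placeholder):

# Rough sketch of running the precondition query against a Prometheus HTTP API.
# The /api/v1/query endpoint is standard Prometheus; the URL and subnet id are
# placeholders, and the real sensor uses prom.query_prometheus_servers instead.
import requests


def recent_update_count(prometheus_url: str, subnet_id: str) -> int:
    query = 'sum(changes(ic_replica_info{ic_subnet="%s"}[1d]))' % subnet_id
    resp = requests.get(
        f"{prometheus_url}/api/v1/query", params={"query": query}, timeout=30
    )
    resp.raise_for_status()
    results = resp.json()["data"]["result"]
    if not results:
        return 0
    # Instant-vector samples arrive as [timestamp, "<value>"]; the value is a string.
    return int(float(results[0]["value"][1]))


# A value above zero means the antipode subnet changed within the last day,
# so WaitForPreconditions defers and re-checks a few minutes later.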
26 changes: 18 additions & 8 deletions rollout-dashboard/server/src/frontend_api.rs
@@ -106,10 +106,10 @@ impl Batch {
if (only_decrease && new_state < subnet.state)
|| (!only_decrease && new_state != subnet.state)
{
trace!(target: "subnet_state", "{} {} {:?} transition {} => {} note: {}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, subnet.state, new_state, subnet.comment);
trace!(target: "subnet_state", "{}: {} {:?} transition {} => {} note: {}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, subnet.state, new_state, subnet.comment);
subnet.state = new_state.clone();
} else {
trace!(target: "subnet_state", "{} {} {:?} NO transition {} => {}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, subnet.state, new_state);
trace!(target: "subnet_state", "{}: {} {:?} NO transition {} => {}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, subnet.state, new_state);
}
if new_state == subnet.state {
subnet.comment = format!(
@@ -723,7 +723,7 @@ impl RolloutApi {
// * update the subnet link to the corresponding Airflow task if the
// state of the task (after update) corresponds to the expected state,
// * update rollout state to problem / error depending on the task state.
trace!(target: "subnet_state", "processing {} {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state);
trace!(target: "subnet_state", "{}: processing {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state);
let (batch, task_name) = (
// We get away with unwrap() here because we know we captured an integer.
match rollout
@@ -732,7 +732,7 @@
{
Some(batch) => batch,
None => {
trace!(target: "subnet_state", "no corresponding batch, continuing");
trace!(target: "subnet_state", "{}: no corresponding batch, continuing", task_instance.dag_run_id);
continue;
}
},
@@ -764,6 +764,8 @@ impl RolloutApi {
None => {
if task_name == "collect_batch_subnets" {
trans_exact!(SubnetRolloutState::Pending);
} else {
trace!(target: "subnet_state", "{}: ignoring task instance {} {:?} with no state", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index);
}
}
Some(state) => match state {
@@ -772,7 +774,9 @@
// https://stackoverflow.com/questions/77426996/skipping-a-task-in-airflow
// If a task is skipped, the next task (in state Running / Deferred)
// will pick up the slack for changing subnet state.
TaskInstanceState::Removed | TaskInstanceState::Skipped => (),
TaskInstanceState::Removed | TaskInstanceState::Skipped => {
trace!(target: "subnet_state", "{}: ignoring task instance {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state);
}
TaskInstanceState::UpForRetry | TaskInstanceState::Restarting => {
trans_min!(SubnetRolloutState::Error);
rollout.state = min(rollout.state, RolloutState::Problem)
@@ -797,6 +801,9 @@
"wait_until_start_time" => {
trans_min!(SubnetRolloutState::Waiting);
}
"wait_for_preconditions" => {
trans_min!(SubnetRolloutState::Waiting);
}
"create_proposal_if_none_exists" => {
trans_min!(SubnetRolloutState::Proposing);
}
@@ -816,7 +823,7 @@
trans_min!(SubnetRolloutState::Complete);
}
&_ => {
warn!(target: "subnet_state", "{} do not know how to handle task instance {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state);
warn!(target: "subnet_state", "{}: no info on to handle task instance {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state);
}
}
rollout.state = min(rollout.state, RolloutState::UpgradingSubnets)
@@ -865,6 +872,9 @@ impl RolloutApi {
}
}
};
trans_exact!(SubnetRolloutState::Waiting);
}
"wait_for_preconditions" => {
trans_exact!(SubnetRolloutState::Proposing);
}
"create_proposal_if_none_exists" => {
@@ -887,7 +897,7 @@
batch.end_time = task_instance.end_date;
}
&_ => {
warn!(target: "subnet_state", "{} do not know how to handle task instance {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state);
warn!(target: "subnet_state", "{}: no info on how to handle task instance {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state);
}
},
},
@@ -915,7 +925,7 @@ impl RolloutApi {
}
}
} else {
warn!(target: "frontend_api", "{} unknown task {}", task_instance.dag_run_id, task_instance.task_id)
warn!(target: "frontend_api", "{}: unknown task {}", task_instance.dag_run_id, task_instance.task_id)
}
}

23 changes: 23 additions & 0 deletions shared/dfinity/ic_os_rollout.py
@@ -328,3 +328,26 @@ def convert(
current_batch_index = batch_index + 1

return batches


def check_plan(plan: RolloutPlanWithRevision) -> None:
# Check that uzr34 predates pzp6e by at least 24 hours.
# https://dfinity.atlassian.net/browse/REL-2675 .
uzr34_start_time: datetime.datetime | None = None
pzp6e_start_time: datetime.datetime | None = None
for _, batch in plan.values():
for subnet in batch:
if subnet.subnet_id.startswith("uzr34"):
uzr34_start_time = subnet.start_at
if subnet.subnet_id.startswith("pzp6e"):
pzp6e_start_time = subnet.start_at
if uzr34_start_time and pzp6e_start_time:
delta = pzp6e_start_time - uzr34_start_time
if delta < datetime.timedelta(days=1):
raise ValueError(
(
"The update time %s of pzp6e is too "
"close to the update time %s of uzr34 (%s)"
)
% (pzp6e_start_time, uzr34_start_time, delta)
)
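
Aside: a quick worked example of the rule, using the same times as the tests below. uzr34 on Monday 09:00 and pzp6e on Tuesday 08:00 are only 23 hours apart, so check_plan raises; Tuesday 09:00 or later would pass.

# Worked example of the 24-hour rule with concrete (illustrative) times.
import datetime

uzr34_start = datetime.datetime(2023, 6, 12, 9, 0)  # Monday 09:00
pzp6e_start = datetime.datetime(2023, 6, 13, 8, 0)  # Tuesday 08:00

delta = pzp6e_start - uzr34_start
print(delta)                                 # 23:00:00
print(delta < datetime.timedelta(days=1))    # True -> check_plan raises ValueError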
82 changes: 81 additions & 1 deletion tests/test_plan_generator.py
@@ -3,7 +3,14 @@
from datetime import timezone as tz

import yaml
from dfinity.ic_os_rollout import RolloutPlan, rollout_planner, week_planner
from dfinity.ic_os_rollout import (
RolloutPlan,
RolloutPlanWithRevision,
assign_default_revision,
check_plan,
rollout_planner,
week_planner,
)

_MONDAY = datetime.datetime(2023, 6, 12, 0, 0, 0)

@@ -369,3 +376,76 @@ def test_rollout_planner_with_odd_parameters(self) -> None:
ValueError,
lambda: rollout_planner(rollout_plan, self.fake_get_subnet_list, _MONDAY),
)


class TestPlanChecker(unittest.TestCase):

def _formulate_plan(self, plan_draft: str) -> RolloutPlanWithRevision:
plan = yaml.safe_load(plan_draft)

def lister() -> list[str]:
return [
"uzr34-akd3s-xrdag-3ql62-ocgoh-ld2ao-tamcv-54e7j-krwgb-2gm4z-oqe",
"pzp6e-ekpqk-3c5x7-2h6so-njoeq-mt45d-h3h6c-q3mxf-vpeq5-fk5o7-yae",
]

return assign_default_revision(
rollout_planner(
plan,
lister,
_MONDAY,
),
"0123456789012345678901234567890123456789",
)

def test_uzr34_pzp6e_timedelta_in_spec(self) -> None:
plan_draft = self._formulate_plan(
"""
Monday:
9:00:
- subnet: uzr34
git_revision: 0123456789012345678901234567890123456789
Tuesday:
9:00:
- subnet: pzp6e
git_revision: 0123456789012345678901234567890123456789
"""
)
check_plan(plan_draft)

def test_uzr34_pzp6e_timedelta_too_short(self) -> None:
plan_draft = self._formulate_plan(
"""
Monday:
9:00:
- subnet: uzr34
git_revision: 0123456789012345678901234567890123456789
Tuesday:
8:00:
- subnet: pzp6e
git_revision: 0123456789012345678901234567890123456789
"""
)
self.assertRaises(ValueError, lambda: check_plan(plan_draft))

def test_uzr34_but_not_pzp6e(self) -> None:
plan_draft = self._formulate_plan(
"""
Monday:
9:00:
- subnet: uzr34
git_revision: 0123456789012345678901234567890123456789
"""
)
check_plan(plan_draft)

def test_pzp6e_but_not_uzr34(self) -> None:
plan_draft = self._formulate_plan(
"""
Monday:
9:00:
              - subnet: pzp6e
git_revision: 0123456789012345678901234567890123456789
"""
)
check_plan(plan_draft)
