diff --git a/dags/rollout_ic_os_to_subnets.py b/dags/rollout_ic_os_to_subnets.py index 81627dc..d6656d2 100644 --- a/dags/rollout_ic_os_to_subnets.py +++ b/dags/rollout_ic_os_to_subnets.py @@ -114,6 +114,12 @@ def collect_batch_subnets( }}""" % batch, ).expand(_ignored=proceed) + >> ic_os_sensor.WaitForPreconditions.partial( + task_id="wait_for_preconditions", + git_revision="{{ params.git_revision }}", + retries=retries, + network=network, + ).expand(subnet_id=proceed) >> ic_os_rollout.CreateProposalIdempotently.partial( task_id="create_proposal_if_none_exists", git_revision="{{ params.git_revision }}", diff --git a/plugins/operators/ic_os_rollout.py b/plugins/operators/ic_os_rollout.py index 46d44ea..a11d68f 100644 --- a/plugins/operators/ic_os_rollout.py +++ b/plugins/operators/ic_os_rollout.py @@ -16,6 +16,7 @@ RolloutPlanWithRevision, SubnetIdWithRevision, assign_default_revision, + check_plan, rollout_planner, subnet_id_and_git_revision_from_args, ) @@ -336,6 +337,13 @@ def schedule( f" to be rolled out at {item.start_at} to git" f" revision {item.git_revision}." ) + + try: + check_plan(plan) + except Exception as e: + print("Cannot proceed with rollout plan as planned: %s" % e) + raise AirflowException("Unsafe rollout plan") + return plan diff --git a/plugins/sensors/ic_os_rollout.py b/plugins/sensors/ic_os_rollout.py index 9f4899c..1c822b3 100644 --- a/plugins/sensors/ic_os_rollout.py +++ b/plugins/sensors/ic_os_rollout.py @@ -597,6 +597,54 @@ def execute(self, context: Context, event: Any = None) -> None: self.log.info("No other DAGs are running. Proceeding.") +class WaitForPreconditions(ICRolloutSensorBaseOperator): + """Performs a variety of checks. + + Current checks: + + * https://dfinity.atlassian.net/browse/REL-2675 delays updates of the + signing subnet and its backup within less than 1 day of each other. + """ + + def execute(self, context: Context, event: Any = None) -> None: + subnet_id, _ = subnet_id_and_git_revision_from_args( + self.subnet_id, self.git_revision + ) + + antipodes = { + "uzr34-akd3s-xrdag-3ql62-ocgoh-ld2ao-tamcv-54e7j-krwgb-2gm4z-oqe": "pzp6e" + "-ekpqk-3c5x7-2h6so-njoeq-mt45d-h3h6c-q3mxf-vpeq5-fk5o7-yae", + "pzp6e-ekpqk-3c5x7-2h6so-njoeq-mt45d-h3h6c-q3mxf-vpeq5-fk5o7-yae": "uzr34" + "-akd3s-xrdag-3ql62-ocgoh-ld2ao-tamcv-54e7j-krwgb-2gm4z-oqe", + } + + if subnet_id in antipodes: + other = antipodes[subnet_id] + + self.log.info( + f"Checking that {other} has not been updated in the" + f" last 1 day before {subnet_id}" + ) + query = "sum(changes(ic_replica_info{" + f'ic_subnet="{other}"' + "}[1d]))" + self.log.info(f"Querying Prometheus servers: {query}") + res = prom.query_prometheus_servers(self.network.prometheus_urls, query) + if not res: + raise RuntimeError(("Prometheus returned no sum of updates: %r" % res,)) + + update_sum = int(res[0]["value"]) + if update_sum > 0: + self.log.info( + f"{other} was updated too recently. Waiting to protect" + " the integrity of signing key or its backup." + ) + self.defer( + trigger=TimeDeltaTrigger(datetime.timedelta(minutes=3)), + method_name="execute", + ) + + self.log.info(f"It is now safe to continue with the update of {subnet_id}.") + + if __name__ == "__main__": import sys diff --git a/rollout-dashboard/server/src/frontend_api.rs b/rollout-dashboard/server/src/frontend_api.rs index b3a41c8..7540d1a 100644 --- a/rollout-dashboard/server/src/frontend_api.rs +++ b/rollout-dashboard/server/src/frontend_api.rs @@ -106,10 +106,10 @@ impl Batch { if (only_decrease && new_state < subnet.state) || (!only_decrease && new_state != subnet.state) { - trace!(target: "subnet_state", "{} {} {:?} transition {} => {} note: {}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, subnet.state, new_state, subnet.comment); + trace!(target: "subnet_state", "{}: {} {:?} transition {} => {} note: {}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, subnet.state, new_state, subnet.comment); subnet.state = new_state.clone(); } else { - trace!(target: "subnet_state", "{} {} {:?} NO transition {} => {}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, subnet.state, new_state); + trace!(target: "subnet_state", "{}: {} {:?} NO transition {} => {}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, subnet.state, new_state); } if new_state == subnet.state { subnet.comment = format!( @@ -723,7 +723,7 @@ impl RolloutApi { // * update the subnet link to the corresponding Airflow task if the // state of the task (after update) corresponds to the expected state, // * update rollout state to problem / error depending on the task state. - trace!(target: "subnet_state", "processing {} {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state); + trace!(target: "subnet_state", "{}: processing {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state); let (batch, task_name) = ( // We get away with unwrap() here because we know we captured an integer. match rollout @@ -732,7 +732,7 @@ impl RolloutApi { { Some(batch) => batch, None => { - trace!(target: "subnet_state", "no corresponding batch, continuing"); + trace!(target: "subnet_state", "{}: no corresponding batch, continuing", task_instance.dag_run_id); continue; } }, @@ -764,6 +764,8 @@ impl RolloutApi { None => { if task_name == "collect_batch_subnets" { trans_exact!(SubnetRolloutState::Pending); + } else { + trace!(target: "subnet_state", "{}: ignoring task instance {} {:?} with no state", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index); } } Some(state) => match state { @@ -772,7 +774,9 @@ impl RolloutApi { // https://stackoverflow.com/questions/77426996/skipping-a-task-in-airflow // If a task is skipped, the next task (in state Running / Deferred) // will pick up the slack for changing subnet state. - TaskInstanceState::Removed | TaskInstanceState::Skipped => (), + TaskInstanceState::Removed | TaskInstanceState::Skipped => { + trace!(target: "subnet_state", "{}: ignoring task instance {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state); + } TaskInstanceState::UpForRetry | TaskInstanceState::Restarting => { trans_min!(SubnetRolloutState::Error); rollout.state = min(rollout.state, RolloutState::Problem) @@ -797,6 +801,9 @@ impl RolloutApi { "wait_until_start_time" => { trans_min!(SubnetRolloutState::Waiting); } + "wait_for_preconditions" => { + trans_min!(SubnetRolloutState::Waiting); + } "create_proposal_if_none_exists" => { trans_min!(SubnetRolloutState::Proposing); } @@ -816,7 +823,7 @@ impl RolloutApi { trans_min!(SubnetRolloutState::Complete); } &_ => { - warn!(target: "subnet_state", "{} do not know how to handle task instance {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state); + warn!(target: "subnet_state", "{}: no info on to handle task instance {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state); } } rollout.state = min(rollout.state, RolloutState::UpgradingSubnets) @@ -865,6 +872,9 @@ impl RolloutApi { } } }; + trans_exact!(SubnetRolloutState::Waiting); + } + "wait_for_preconditions" => { trans_exact!(SubnetRolloutState::Proposing); } "create_proposal_if_none_exists" => { @@ -887,7 +897,7 @@ impl RolloutApi { batch.end_time = task_instance.end_date; } &_ => { - warn!(target: "subnet_state", "{} do not know how to handle task instance {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state); + warn!(target: "subnet_state", "{}: no info on how to handle task instance {} {:?} in state {:?}", task_instance.dag_run_id, task_instance.task_id, task_instance.map_index, task_instance.state); } }, }, @@ -915,7 +925,7 @@ impl RolloutApi { } } } else { - warn!(target: "frontend_api", "{} unknown task {}", task_instance.dag_run_id, task_instance.task_id) + warn!(target: "frontend_api", "{}: unknown task {}", task_instance.dag_run_id, task_instance.task_id) } } diff --git a/shared/dfinity/ic_os_rollout.py b/shared/dfinity/ic_os_rollout.py index ea2e355..71aac36 100644 --- a/shared/dfinity/ic_os_rollout.py +++ b/shared/dfinity/ic_os_rollout.py @@ -328,3 +328,26 @@ def convert( current_batch_index = batch_index + 1 return batches + + +def check_plan(plan: RolloutPlanWithRevision) -> None: + # Check that uzr34 predates pzp6e by at least 24 hours. + # https://dfinity.atlassian.net/browse/REL-2675 . + uzr34_start_time: datetime.datetime | None = None + pzp6e_start_time: datetime.datetime | None = None + for _, batch in plan.values(): + for subnet in batch: + if subnet.subnet_id.startswith("uzr34"): + uzr34_start_time = subnet.start_at + if subnet.subnet_id.startswith("pzp6e"): + pzp6e_start_time = subnet.start_at + if uzr34_start_time and pzp6e_start_time: + delta = pzp6e_start_time - uzr34_start_time + if delta < datetime.timedelta(days=1): + raise ValueError( + ( + "The update time %s of pzp6e is too " + "close to the update time %s of uzr34 (%s)" + ) + % (pzp6e_start_time, uzr34_start_time, delta) + ) diff --git a/tests/test_plan_generator.py b/tests/test_plan_generator.py index f899d0a..77fe1a6 100644 --- a/tests/test_plan_generator.py +++ b/tests/test_plan_generator.py @@ -3,7 +3,14 @@ from datetime import timezone as tz import yaml -from dfinity.ic_os_rollout import RolloutPlan, rollout_planner, week_planner +from dfinity.ic_os_rollout import ( + RolloutPlan, + RolloutPlanWithRevision, + assign_default_revision, + check_plan, + rollout_planner, + week_planner, +) _MONDAY = datetime.datetime(2023, 6, 12, 0, 0, 0) @@ -369,3 +376,76 @@ def test_rollout_planner_with_odd_parameters(self) -> None: ValueError, lambda: rollout_planner(rollout_plan, self.fake_get_subnet_list, _MONDAY), ) + + +class TestPlanChecker(unittest.TestCase): + + def _formulate_plan(self, plan_draft: str) -> RolloutPlanWithRevision: + plan = yaml.safe_load(plan_draft) + + def lister() -> list[str]: + return [ + "uzr34-akd3s-xrdag-3ql62-ocgoh-ld2ao-tamcv-54e7j-krwgb-2gm4z-oqe", + "pzp6e-ekpqk-3c5x7-2h6so-njoeq-mt45d-h3h6c-q3mxf-vpeq5-fk5o7-yae", + ] + + return assign_default_revision( + rollout_planner( + plan, + lister, + _MONDAY, + ), + "0123456789012345678901234567890123456789", + ) + + def test_uzr34_pzp6e_timedelta_in_spec(self) -> None: + plan_draft = self._formulate_plan( + """ +Monday: + 9:00: + - subnet: uzr34 + git_revision: 0123456789012345678901234567890123456789 +Tuesday: + 9:00: + - subnet: pzp6e + git_revision: 0123456789012345678901234567890123456789 +""" + ) + check_plan(plan_draft) + + def test_uzr34_pzp6e_timedelta_too_short(self) -> None: + plan_draft = self._formulate_plan( + """ +Monday: + 9:00: + - subnet: uzr34 + git_revision: 0123456789012345678901234567890123456789 +Tuesday: + 8:00: + - subnet: pzp6e + git_revision: 0123456789012345678901234567890123456789 +""" + ) + self.assertRaises(ValueError, lambda: check_plan(plan_draft)) + + def test_uzr34_but_not_pzp6e(self) -> None: + plan_draft = self._formulate_plan( + """ +Monday: + 9:00: + - subnet: uzr34 + git_revision: 0123456789012345678901234567890123456789 +""" + ) + check_plan(plan_draft) + + def test_pzp6e_but_not_uzr34(self) -> None: + plan_draft = self._formulate_plan( + """ +Monday: + 9:00: + - subnet: uzr34 + git_revision: 0123456789012345678901234567890123456789 +""" + ) + check_plan(plan_draft)