From 18ac80861d504b5014088cf1ea793ecd5ee13883 Mon Sep 17 00:00:00 2001 From: Sukrit Kalra Date: Tue, 16 Jan 2024 12:18:03 -0800 Subject: [PATCH] Implement support for unscheduling a Task. --- simulator.py | 3 +++ workload/tasks.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/simulator.py b/simulator.py index d2a3d033..568c7715 100644 --- a/simulator.py +++ b/simulator.py @@ -711,6 +711,9 @@ def __create_events_from_task_placement_skip( ) self._event_queue.remove_event(future_placement_event) del self._future_placement_events[placement.task.id] + + # Unschedule the Task. + placement.task.unschedule(time) else: self._logger.warning( "[%s] Failed to place %s, skipping it for future reconsideration.", diff --git a/workload/tasks.py b/workload/tasks.py index 9ef3ed7d..412dbe1e 100644 --- a/workload/tasks.py +++ b/workload/tasks.py @@ -143,6 +143,7 @@ def __init__( self._remaining_time = None self._last_step_time = -1 # Time when this task was stepped through. self._state = TaskState.VIRTUAL + self._pre_scheduling_state = TaskState.VIRTUAL # ID of the worker pool on which the task is running. self._worker_pool_id = None @@ -184,6 +185,7 @@ def release(self, time: Optional[EventTime] = None): f"{self.unique_name} from state {self._state}." ) self._state = TaskState.RELEASED + self._pre_scheduling_state = TaskState.RELEASED def schedule( self, @@ -225,6 +227,32 @@ def schedule( self._worker_pool_id = placement.worker_pool_id self.update_remaining_time(placement.execution_strategy.runtime) + def unschedule( + self, + time: EventTime, + ) -> None: + """Unschedules the execution of the task at the given simulator time. + + Args: + time (`EventTime`): The simulation time at which the task was + unscheduled. + + Raises: + `ValueError` if Task is not in `SCHEDULED` state yet. + """ + if self.state != TaskState.SCHEDULED: + raise ValueError( + f"Task must be in SCHEDULED state, currently in {self.state}." + ) + self._logger.debug( + f"[{time.to(EventTime.Unit.US).time}] Transitioning {self} to " + f"{self._pre_scheduling_state} from {TaskState.SCHEDULED}." + ) + self._state = self._pre_scheduling_state + self._scheduling_time = None + self._scheduler_placement = None + self._worker_pool_id = None + def start( self, time: Optional[EventTime] = None,