diff --git a/simulator.py b/simulator.py index d80335fe..f9b0dc55 100644 --- a/simulator.py +++ b/simulator.py @@ -1396,6 +1396,67 @@ def __handle_task_placement(self, event: Event, workload: Workload) -> None: ), "Inconsistency in future placements." task_graph = workload.get_task_graph(task.task_graph) assert task_graph is not None, "Inconsistency in Task placement and Workload." + + # Subroutine to handle avoid automatic re-placement of tasks in the next timestep + # if they were unable to start either due to (i) parent task not finished or + # (ii) worker not ready. The sub-tree rooted at the task is unscheduled and will + # be placed again in the next run of the scheduler. + def unschedule_subtree_rooted_at_task(task): + # Find all dependent tasks rooted from given task to unschedule + def subtree_tasks_to_unschedule(task): + tasks_to_unschedule = [task] + for child_task in task_graph.get_children(task): + tasks_to_unschedule.extend(subtree_tasks_to_unschedule(child_task)) + return tasks_to_unschedule + + tasks_to_unschedule = subtree_tasks_to_unschedule(task) + self._logger.info("[%s] Going to unschedule tasks rooted from %s. " + "List of tasks that will be unscheduled are: %s", + event.time.time, + task, + tasks_to_unschedule) + for unschedule_task in tasks_to_unschedule: + if unschedule_task.id in self._future_placement_events: + future_placement_event = self._future_placement_events[ + unschedule_task.id + ] + if future_placement_event.time > event.time: + # Delete future event from event_queue and from future_placement_events + self._event_queue.remove_event(future_placement_event) + del self._future_placement_events[unschedule_task.id] + msg = ( + f"[{event.time.time}] Retrieved future placement event {future_placement_event} " + f"for task {unschedule_task} and removed it." + ) + self._logger.info(msg) + elif future_placement_event.time == event.time: + # Cannot delete from event_queue, as this event is likely being processed + del self._future_placement_events[unschedule_task.id] + msg = ( + f"[{event.time.time}] Removed future placement event {future_placement_event} " + f"for task {unschedule_task} at the same time." + ) + self._logger.info(msg) + else: + msg = ( + f"[{event.time.time}] Future placement event {future_placement_event} for task " + f"{unschedule_task} is in the past." + ) + self._logger.warning(msg) + + # Unschedule the task + unschedule_task.unschedule(event.time) + self._csv_logger.debug( + f"{event.time.time},TASK_UNSCHEDULED,{unschedule_task.name},{unschedule_task.timestamp}," + f"{unschedule_task.id},{unschedule_task.task_graph}" + ) + + self._logger.info( + "[%s] Finished unscheduling of task %s.", + event.time.time, + unschedule_task, + ) + if not task.is_ready_to_run(task_graph): if task.state == TaskState.CANCELLED or task_graph.is_cancelled(): # The Task was cancelled. Consume the event. @@ -1420,34 +1481,20 @@ def __handle_task_placement(self, event: Event, workload: Workload) -> None: return else: # If the Task is not ready to run and wasn't cancelled, - # find the next possible time to try executing the task. - parent_completion_time = max( - parent.remaining_time for parent in task_graph.get_parents(task) - ) - next_placement_time = event.time + max( - parent_completion_time, - self._min_placement_push_duration, - ) - next_placement_event = Event( - event_type=event.event_type, - time=next_placement_time, - task=event.task, - placement=event.placement, - ) - event.placement._placement_time = next_placement_time - self._future_placement_events[task.id] = next_placement_event - self._event_queue.add_event(next_placement_event) + # unschedule the task and its subtree. self._logger.info( - "[%s] The Task %s was not ready to run, and has been pushed for " - "later placement at %s.", + "[%s] The Task %s was not ready to run. The task along with its " + "sub-tree will be unscheduled.", event.time.to(EventTime.Unit.US).time, task, - next_placement_time, ) self._csv_logger.debug( f"{event.time.time},TASK_NOT_READY,{task.name},{task.timestamp}," f"{task.id},{event.placement.worker_pool_id}" ) + + # Unschedule the task and its subtree rooted at this task. + unschedule_subtree_rooted_at_task(task) return # Initialize the task at the given placement time, and place it on # the WorkerPool. @@ -1484,26 +1531,23 @@ def __handle_task_placement(self, event: Event, workload: Workload) -> None: task.id ] = event.placement else: - next_placement_time = event.time + self._min_placement_push_duration - next_placement_event = Event( - event_type=event.event_type, - time=next_placement_time, - task=event.task, - placement=event.placement, - ) - self._event_queue.add_event(next_placement_event) - self._future_placement_events[task.id] = next_placement_event + # If the placement was not successful, send the sub-tree of the taskgraph + # rooted at this task back to its previous state. It allows the scheduler + # to re-schedule in its next run. self._logger.warning( - "[%s] Task %s cannot be placed on worker %s, pushing placement to %s.", + "[%s] Task %s couldn't be placed on worker %s. The task along with its " + "sub-tree will be unscheduled.", event.time.time, task, - worker_pool, - next_placement_time, + event.placement.worker_pool_id, ) self._csv_logger.debug( f"{event.time.time},WORKER_NOT_READY,{task.name},{task.timestamp}," f"{task.id},{event.placement.worker_pool_id}" ) + + # Unschedule the task and its subtree rooted at this task. + unschedule_subtree_rooted_at_task(task) def __handle_task_migration(self, event: Event) -> None: """Handles the TASK_MIGRATION event. This event must be followed by a