Skip to content

Commit

Permalink
[simulator] Unschedule subtree rooted at task if task is unable to ru…
Browse files Browse the repository at this point in the history
…n at timestep
  • Loading branch information
Dhruv Garg committed Dec 4, 2024
1 parent bd16310 commit b4aceeb
Showing 1 changed file with 76 additions and 32 deletions.
108 changes: 76 additions & 32 deletions simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1396,6 +1396,67 @@ def __handle_task_placement(self, event: Event, workload: Workload) -> None:
), "Inconsistency in future placements."
task_graph = workload.get_task_graph(task.task_graph)
assert task_graph is not None, "Inconsistency in Task placement and Workload."

# Subroutine to handle avoid automatic re-placement of tasks in the next timestep
# if they were unable to start either due to (i) parent task not finished or
# (ii) worker not ready. The sub-tree rooted at the task is unscheduled and will
# be placed again in the next run of the scheduler.
def unschedule_subtree_rooted_at_task(task):
# Find all dependent tasks rooted from given task to unschedule
def subtree_tasks_to_unschedule(task):
tasks_to_unschedule = [task]
for child_task in task_graph.get_children(task):
tasks_to_unschedule.extend(subtree_tasks_to_unschedule(child_task))
return tasks_to_unschedule

tasks_to_unschedule = subtree_tasks_to_unschedule(task)
self._logger.info("[%s] Going to unschedule tasks rooted from %s. "
"List of tasks that will be unscheduled are: %s",
event.time.time,
task,
tasks_to_unschedule)
for unschedule_task in tasks_to_unschedule:
if unschedule_task.id in self._future_placement_events:
future_placement_event = self._future_placement_events[
unschedule_task.id
]
if future_placement_event.time > event.time:
# Delete future event from event_queue and from future_placement_events
self._event_queue.remove_event(future_placement_event)
del self._future_placement_events[unschedule_task.id]
msg = (
f"[{event.time.time}] Retrieved future placement event {future_placement_event} "
f"for task {unschedule_task} and removed it."
)
self._logger.info(msg)
elif future_placement_event.time == event.time:
# Cannot delete from event_queue, as this event is likely being processed
del self._future_placement_events[unschedule_task.id]
msg = (
f"[{event.time.time}] Removed future placement event {future_placement_event} "
f"for task {unschedule_task} at the same time."
)
self._logger.info(msg)
else:
msg = (
f"[{event.time.time}] Future placement event {future_placement_event} for task "
f"{unschedule_task} is in the past."
)
self._logger.warning(msg)

# Unschedule the task
unschedule_task.unschedule(event.time)
self._csv_logger.debug(
f"{event.time.time},TASK_UNSCHEDULED,{unschedule_task.name},{unschedule_task.timestamp},"
f"{unschedule_task.id},{unschedule_task.task_graph}"
)

self._logger.info(
"[%s] Finished unscheduling of task %s.",
event.time.time,
unschedule_task,
)

if not task.is_ready_to_run(task_graph):
if task.state == TaskState.CANCELLED or task_graph.is_cancelled():
# The Task was cancelled. Consume the event.
Expand All @@ -1420,34 +1481,20 @@ def __handle_task_placement(self, event: Event, workload: Workload) -> None:
return
else:
# If the Task is not ready to run and wasn't cancelled,
# find the next possible time to try executing the task.
parent_completion_time = max(
parent.remaining_time for parent in task_graph.get_parents(task)
)
next_placement_time = event.time + max(
parent_completion_time,
self._min_placement_push_duration,
)
next_placement_event = Event(
event_type=event.event_type,
time=next_placement_time,
task=event.task,
placement=event.placement,
)
event.placement._placement_time = next_placement_time
self._future_placement_events[task.id] = next_placement_event
self._event_queue.add_event(next_placement_event)
# unschedule the task and its subtree.
self._logger.info(
"[%s] The Task %s was not ready to run, and has been pushed for "
"later placement at %s.",
"[%s] The Task %s was not ready to run. The task along with its "
"sub-tree will be unscheduled.",
event.time.to(EventTime.Unit.US).time,
task,
next_placement_time,
)
self._csv_logger.debug(
f"{event.time.time},TASK_NOT_READY,{task.name},{task.timestamp},"
f"{task.id},{event.placement.worker_pool_id}"
)

# Unschedule the task and its subtree rooted at this task.
unschedule_subtree_rooted_at_task(task)
return
# Initialize the task at the given placement time, and place it on
# the WorkerPool.
Expand Down Expand Up @@ -1484,26 +1531,23 @@ def __handle_task_placement(self, event: Event, workload: Workload) -> None:
task.id
] = event.placement
else:
next_placement_time = event.time + self._min_placement_push_duration
next_placement_event = Event(
event_type=event.event_type,
time=next_placement_time,
task=event.task,
placement=event.placement,
)
self._event_queue.add_event(next_placement_event)
self._future_placement_events[task.id] = next_placement_event
# If the placement was not successful, send the sub-tree of the taskgraph
# rooted at this task back to its previous state. It allows the scheduler
# to re-schedule in its next run.
self._logger.warning(
"[%s] Task %s cannot be placed on worker %s, pushing placement to %s.",
"[%s] Task %s couldn't be placed on worker %s. The task along with its "
"sub-tree will be unscheduled.",
event.time.time,
task,
worker_pool,
next_placement_time,
event.placement.worker_pool_id,
)
self._csv_logger.debug(
f"{event.time.time},WORKER_NOT_READY,{task.name},{task.timestamp},"
f"{task.id},{event.placement.worker_pool_id}"
)

# Unschedule the task and its subtree rooted at this task.
unschedule_subtree_rooted_at_task(task)

def __handle_task_migration(self, event: Event) -> None:
"""Handles the TASK_MIGRATION event. This event must be followed by a
Expand Down

0 comments on commit b4aceeb

Please sign in to comment.