From 1461a3cab39676f33bc0a7841057dc8c3bd0e0a8 Mon Sep 17 00:00:00 2001 From: Elton Leander Pinto Date: Wed, 27 Nov 2024 16:58:27 -0500 Subject: [PATCH] fix workload release bug --- rpc/service.py | 2 -- simulator.py | 11 +++++++++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/rpc/service.py b/rpc/service.py index cd25d664..68bf8fec 100644 --- a/rpc/service.py +++ b/rpc/service.py @@ -141,8 +141,6 @@ def __init__(self) -> None: # Enable orchestrated mode FLAGS.orchestrated = True - # Set minimum placement push duration to 1s - FLAGS.min_placement_push_duration = 1_000_000 # Set scheduler runtime to zero FLAGS.scheduler_runtime = 0 diff --git a/simulator.py b/simulator.py index 98b97a9e..3a937ddf 100644 --- a/simulator.py +++ b/simulator.py @@ -525,6 +525,8 @@ def tick(self, until: EventTime) -> None: """Tick the simulator until the specified time""" def f(): + self._logger.debug(f"EQ: {self._event_queue}") + time_until_next_event = self.__time_until_next_event() if ( @@ -1618,6 +1620,14 @@ def __handle_update_workload(self, event: Event) -> None: # Release the Tasks that have become available. releasable_tasks = self._workload.get_releasable_tasks() + + # Ignore non-source tasks, they get auto-released when the parent finishes + def is_source_task(task): + task_graph = self._workload.get_task_graph(task.task_graph) + return task_graph.is_source_task(task) + + releasable_tasks = [task for task in releasable_tasks if is_source_task(task)] + self._logger.info( "[%s] The WorkloadLoader %s has %s TaskGraphs that released %s tasks.", self._simulator_time.to(EventTime.Unit.US).time, @@ -1666,6 +1676,7 @@ def __handle_update_workload(self, event: Event) -> None: max_release_time = self._simulator_time for task in releasable_tasks: + event = Event( event_type=EventType.TASK_RELEASE, time=task.release_time, task=task )