From 24a89db5ced8c8edd36209f644504562a5161b16 Mon Sep 17 00:00:00 2001 From: Sukrit Kalra Date: Fri, 5 Jan 2024 17:14:20 -0800 Subject: [PATCH] [BUG] Fix bugs in AlibabaLoader with critical path runtimes. --- data/alibaba_loader.py | 30 +++++++++++++++++++----------- main.py | 4 ++-- simulator.py | 4 ++-- workload/jobs.py | 3 ++- 4 files changed, 25 insertions(+), 16 deletions(-) diff --git a/data/alibaba_loader.py b/data/alibaba_loader.py index 7ba254de..c8e1ae8c 100644 --- a/data/alibaba_loader.py +++ b/data/alibaba_loader.py @@ -403,6 +403,7 @@ def job_graph_data_generator( data: Mapping[str, List[str]] = AlibabaTaskUnpickler( pickled_file ).load() + skipped_job_graphs = 0 for job_graph_name, job_tasks in data.items(): try: job_graph = self._convert_job_data_to_job_graph( @@ -420,13 +421,14 @@ def job_graph_data_generator( ): self._job_graphs[path][job_graph_name] = job_graph else: - self._logger.debug( - f"Skipping job graph {job_graph_name} with " - f"critical path runtime " - f"{cp_runtime.to(EventTime.Unit.US).time}" - f" outside of range [{min_critical_path_runtime}, " - f"{max_critical_path_runtime})." - ) + skipped_job_graphs += 1 + # self._logger.debug( + # f"[0] Skipping job graph {job_graph_name} with " + # f"critical path runtime " + # f"{cp_runtime.to(EventTime.Unit.US).time}" + # f" outside of range [{min_critical_path_runtime}," + # f" {max_critical_path_runtime})." + # ) else: self._logger.warning( f"Failed to create job graph {job_graph_name}." @@ -436,6 +438,10 @@ def job_graph_data_generator( f"Failed to convert job graph {job_graph_name} " f"with error {e.__class__}: {e}." ) + self._logger.debug( + f"[0] Skipped {skipped_job_graphs} job graphs from path {path}, " + f"loaded {len(self._job_graphs[path])} job graphs." + ) path_to_job_graph_generator_mapping = {} for index, (path, _) in enumerate(self._workload_paths_and_release_policies): @@ -452,16 +458,18 @@ def job_graph_data_generator( ) min_critical_path_runtime = ( 0 - if index >= len(self._flags.min_critical_path_runtimes) + if index + >= len(self._flags.alibaba_loader_min_critical_path_runtimes) else int( - self._flags.alibaba_loader_max_critical_path_runtime[index] + self._flags.alibaba_loader_min_critical_path_runtimes[index] ) ) max_critical_path_runtime = ( sys.maxsize - if index >= len(self._flags.max_critical_path_runtimes) + if index + >= len(self._flags.alibaba_loader_max_critical_path_runtimes) else int( - self._flags.alibaba_loader_max_critical_path_runtime[index] + self._flags.alibaba_loader_max_critical_path_runtimes[index] ) ) path_to_job_graph_generator_mapping[path] = partial( diff --git a/main.py b/main.py index 7ef88354..37bacc4b 100644 --- a/main.py +++ b/main.py @@ -179,14 +179,14 @@ "The multiplier used for alibaba trace tasks task.duration.", ) flags.DEFINE_list( - "alibaba_loader_min_critical_path_runtime", + "alibaba_loader_min_critical_path_runtimes", [], "The minimum critical path duration for each TaskGraph from the corresponding " "Workload. If the list is empty, then the minimum critical path duration is " "set to 0. TaskGraphs lower than this critical path duration will not be released.", ) flags.DEFINE_list( - "alibaba_loader_max_critical_path_runtime", + "alibaba_loader_max_critical_path_runtimes", [], "The maximum critical path duration for each TaskGraph from the corresponding " "Workload. If the list is empty, then the maximum critical path duration is " diff --git a/simulator.py b/simulator.py index ee3c6cfe..d2a3d033 100644 --- a/simulator.py +++ b/simulator.py @@ -444,11 +444,11 @@ def dry_run(self) -> None: for task_graph in task_graphs: self._logger.info( "[%s] The TaskGraph %s will be released with deadline " - "%s and completion time %s.", + "%s and critical path runtime %s.", task_graph.release_time.to(EventTime.Unit.US).time, task_graph.name, task_graph.deadline, - task_graph.job_graph.completion_time, + task_graph.critical_path_runtime, ) self._csv_logger.info( "%s,TASK_GRAPH_RELEASE,%s,%s,%s,%s", diff --git a/workload/jobs.py b/workload/jobs.py index cb707641..7dcd0fb6 100644 --- a/workload/jobs.py +++ b/workload/jobs.py @@ -924,7 +924,8 @@ def critical_path_runtime(self) -> EventTime: if job.probability > sys.float_info.epsilon else 0 ) - ] + ], + start=EventTime.zero(), ) @property