Skip to content

Commit

Permalink
[BUG] Fix bugs in AlibabaLoader with critical path runtimes.
Browse files Browse the repository at this point in the history
  • Loading branch information
sukritkalra committed Jan 6, 2024
1 parent 2c16894 commit 24a89db
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 16 deletions.
30 changes: 19 additions & 11 deletions data/alibaba_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ def job_graph_data_generator(
data: Mapping[str, List[str]] = AlibabaTaskUnpickler(
pickled_file
).load()
skipped_job_graphs = 0
for job_graph_name, job_tasks in data.items():
try:
job_graph = self._convert_job_data_to_job_graph(
Expand All @@ -420,13 +421,14 @@ def job_graph_data_generator(
):
self._job_graphs[path][job_graph_name] = job_graph
else:
self._logger.debug(
f"Skipping job graph {job_graph_name} with "
f"critical path runtime "
f"{cp_runtime.to(EventTime.Unit.US).time}"
f" outside of range [{min_critical_path_runtime}, "
f"{max_critical_path_runtime})."
)
skipped_job_graphs += 1
# self._logger.debug(
# f"[0] Skipping job graph {job_graph_name} with "
# f"critical path runtime "
# f"{cp_runtime.to(EventTime.Unit.US).time}"
# f" outside of range [{min_critical_path_runtime},"
# f" {max_critical_path_runtime})."
# )
else:
self._logger.warning(
f"Failed to create job graph {job_graph_name}."
Expand All @@ -436,6 +438,10 @@ def job_graph_data_generator(
f"Failed to convert job graph {job_graph_name} "
f"with error {e.__class__}: {e}."
)
self._logger.debug(
f"[0] Skipped {skipped_job_graphs} job graphs from path {path}, "
f"loaded {len(self._job_graphs[path])} job graphs."
)

path_to_job_graph_generator_mapping = {}
for index, (path, _) in enumerate(self._workload_paths_and_release_policies):
Expand All @@ -452,16 +458,18 @@ def job_graph_data_generator(
)
min_critical_path_runtime = (
0
if index >= len(self._flags.min_critical_path_runtimes)
if index
>= len(self._flags.alibaba_loader_min_critical_path_runtimes)
else int(
self._flags.alibaba_loader_max_critical_path_runtime[index]
self._flags.alibaba_loader_min_critical_path_runtimes[index]
)
)
max_critical_path_runtime = (
sys.maxsize
if index >= len(self._flags.max_critical_path_runtimes)
if index
>= len(self._flags.alibaba_loader_max_critical_path_runtimes)
else int(
self._flags.alibaba_loader_max_critical_path_runtime[index]
self._flags.alibaba_loader_max_critical_path_runtimes[index]
)
)
path_to_job_graph_generator_mapping[path] = partial(
Expand Down
4 changes: 2 additions & 2 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,14 +179,14 @@
"The multiplier used for alibaba trace tasks task.duration.",
)
flags.DEFINE_list(
"alibaba_loader_min_critical_path_runtime",
"alibaba_loader_min_critical_path_runtimes",
[],
"The minimum critical path duration for each TaskGraph from the corresponding "
"Workload. If the list is empty, then the minimum critical path duration is "
"set to 0. TaskGraphs lower than this critical path duration will not be released.",
)
flags.DEFINE_list(
"alibaba_loader_max_critical_path_runtime",
"alibaba_loader_max_critical_path_runtimes",
[],
"The maximum critical path duration for each TaskGraph from the corresponding "
"Workload. If the list is empty, then the maximum critical path duration is "
Expand Down
4 changes: 2 additions & 2 deletions simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,11 +444,11 @@ def dry_run(self) -> None:
for task_graph in task_graphs:
self._logger.info(
"[%s] The TaskGraph %s will be released with deadline "
"%s and completion time %s.",
"%s and critical path runtime %s.",
task_graph.release_time.to(EventTime.Unit.US).time,
task_graph.name,
task_graph.deadline,
task_graph.job_graph.completion_time,
task_graph.critical_path_runtime,
)
self._csv_logger.info(
"%s,TASK_GRAPH_RELEASE,%s,%s,%s,%s",
Expand Down
3 changes: 2 additions & 1 deletion workload/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,8 @@ def critical_path_runtime(self) -> EventTime:
if job.probability > sys.float_info.epsilon
else 0
)
]
],
start=EventTime.zero(),
)

@property
Expand Down

0 comments on commit 24a89db

Please sign in to comment.