Skip to content

Commit

Permalink
Make critical path duration for each Workload configurable.
Browse files Browse the repository at this point in the history
  • Loading branch information
sukritkalra committed Jan 5, 2024
1 parent d279b65 commit 2c16894
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 14 deletions.
64 changes: 51 additions & 13 deletions data/alibaba_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,8 @@ def job_graph_data_generator(
path: str,
min_deadline_variance: int,
max_deadline_variance: int,
min_critical_path_runtime: int,
max_critical_path_runtime: int,
):
if not os.path.isfile(path):
raise FileNotFoundError(f"No such file: {path}")
Expand All @@ -410,7 +412,25 @@ def job_graph_data_generator(
max_deadline_variance,
)
if job_graph:
self._job_graphs[path][job_graph_name] = job_graph
cp_runtime = job_graph.critical_path_runtime
if (
min_critical_path_runtime
<= cp_runtime.to(EventTime.Unit.US).time
< max_critical_path_runtime
):
self._job_graphs[path][job_graph_name] = job_graph
else:
self._logger.debug(
f"Skipping job graph {job_graph_name} with "
f"critical path runtime "
f"{cp_runtime.to(EventTime.Unit.US).time}"
f" outside of range [{min_critical_path_runtime}, "
f"{max_critical_path_runtime})."
)
else:
self._logger.warning(
f"Failed to create job graph {job_graph_name}."
)
except ValueError as e:
self._logger.warning(
f"Failed to convert job graph {job_graph_name} "
Expand All @@ -420,20 +440,38 @@ def job_graph_data_generator(
path_to_job_graph_generator_mapping = {}
for index, (path, _) in enumerate(self._workload_paths_and_release_policies):
if path is not None:
if index >= len(self._flags.min_deadline_variances):
path_to_job_graph_generator_mapping[path] = partial(
job_graph_data_generator,
path,
int(self._flags.min_deadline_variance),
int(self._flags.max_deadline_variance),
min_deadline_variance = (
int(self._flags.min_deadline_variance)
if index >= len(self._flags.min_deadline_variances)
else int(self._flags.min_deadline_variances[index])
)
max_deadline_variance = (
int(self._flags.max_deadline_variance)
if index >= len(self._flags.max_deadline_variances)
else int(self._flags.max_deadline_variances[index])
)
min_critical_path_runtime = (
0
if index >= len(self._flags.min_critical_path_runtimes)
else int(
self._flags.alibaba_loader_max_critical_path_runtime[index]
)
else:
path_to_job_graph_generator_mapping[path] = partial(
job_graph_data_generator,
path,
int(self._flags.min_deadline_variances[index]),
int(self._flags.max_deadline_variances[index]),
)
max_critical_path_runtime = (
sys.maxsize
if index >= len(self._flags.max_critical_path_runtimes)
else int(
self._flags.alibaba_loader_max_critical_path_runtime[index]
)
)
path_to_job_graph_generator_mapping[path] = partial(
job_graph_data_generator,
path,
min_deadline_variance,
max_deadline_variance,
min_critical_path_runtime,
max_critical_path_runtime,
)
return path_to_job_graph_generator_mapping

def _sample_normal_distribution_random(self, n, mean, std, min_val=0, max_val=100):
Expand Down
15 changes: 15 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,21 @@
1,
"The multiplier used for alibaba trace tasks task.duration.",
)
flags.DEFINE_list(
"alibaba_loader_min_critical_path_runtime",
[],
"The minimum critical path duration for each TaskGraph from the corresponding "
"Workload. If the list is empty, then the minimum critical path duration is "
"set to 0. TaskGraphs lower than this critical path duration will not be released.",
)
flags.DEFINE_list(
"alibaba_loader_max_critical_path_runtime",
[],
"The maximum critical path duration for each TaskGraph from the corresponding "
"Workload. If the list is empty, then the maximum critical path duration is "
"set to the maximum critical path duration of the Workload. TaskGraphs higher "
"than this critical path duration will not be released.",
)
flags.DEFINE_bool(
"alibaba_enable_heterogeneous_resource_type",
False,
Expand Down
18 changes: 17 additions & 1 deletion workload/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import sys
import uuid
from enum import Enum
from functools import cached_property
from typing import List, Mapping, Optional, Sequence, Tuple

import absl
Expand Down Expand Up @@ -906,11 +907,26 @@ def __get_completion_time(self, start=EventTime.zero()) -> EventTime:
)

@property
def completion_time(self):
def completion_time(self) -> EventTime:
if not self._completion_time and len(self) != 0:
self._completion_time = self.__get_completion_time()
return self._completion_time

@cached_property
def critical_path_runtime(self) -> EventTime:
return sum(
[
job.execution_strategies.get_slowest_strategy().runtime
for job in self.get_longest_path(
weights=lambda job: job.execution_strategies.get_slowest_strategy()
.runtime.to(EventTime.Unit.US)
.time
if job.probability > sys.float_info.epsilon
else 0
)
]
)

@property
def release_policy(self) -> Optional["ReleasePolicy"]:
return self._release_policy
Expand Down

0 comments on commit 2c16894

Please sign in to comment.