Skip to content

Commit

Permalink
Add flag to restrict the max resource usage per task in a JobGraph.
Browse files Browse the repository at this point in the history
  • Loading branch information
sukritkalra committed Mar 17, 2024
1 parent ecb7679 commit fe1b6a6
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 12 deletions.
37 changes: 25 additions & 12 deletions data/alibaba_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def find_class(self, module, name):
return super().find_class(module, name)


FILTERED_DAGS = ("j_1442387",)
FILTERED_DAGS = ("j_1442387", "j_2583299", "j_425976")


class AlibabaLoader(BaseWorkloadLoader):
Expand Down Expand Up @@ -102,6 +102,7 @@ def __init__(

self._task_cpu_divisor = self._flags.alibaba_loader_task_cpu_divisor
self._task_max_pow2_slots = self._flags.alibaba_loader_task_max_pow2_slots
self._task_cpu_usage_max = self._flags.alibaba_loader_task_cpu_usage_max

def _construct_workload_definitions(
self,
Expand Down Expand Up @@ -442,9 +443,7 @@ def job_graph_data_generator(
# f" {max_critical_path_runtime})."
# )
else:
self._logger.warning(
f"Failed to create job graph {job_graph_name}."
)
skipped_job_graphs += 1
except ValueError as e:
self._logger.warning(
f"Failed to convert job graph {job_graph_name} "
Expand Down Expand Up @@ -528,9 +527,13 @@ def _convert_job_data_to_job_graph(
# Create the individual Job instances corresponding to each Task.
task_name_to_simulator_job_mapping = {}
for task in job_tasks:
# The name of the Job from the Task.
job_name = task.name.split("_")[0]

if self._task_max_pow2_slots == 0:
# This code will use the cpu requirements from
# the alibaba trace and adjust slots
resource_usage = int(math.ceil(task.cpu_usage / self._task_cpu_divisor))
job_resources_1 = Resources(
resource_vector={
# Note: We divide the CPU by some self._task_cpu_divisor instead
Expand All @@ -540,19 +543,24 @@ def _convert_job_data_to_job_graph(
# would end up using 1 slot, which is not very interesting and
# makes no chance for DAG_Sched to do effective packing that
# would beat EDF by a significant margin.
Resource(name="Slot_1", _id="any"): int(
math.ceil(task.cpu_usage / self._task_cpu_divisor)
),
Resource(name="Slot_1", _id="any"): resource_usage,
}
)

job_resources_2 = Resources(
resource_vector={
Resource(name="Slot_2", _id="any"): int(
math.ceil(task.cpu_usage / self._task_cpu_divisor)
),
Resource(name="Slot_2", _id="any"): resource_usage,
}
)
if resource_usage > self._task_cpu_usage_max:
self._logger.debug(
"Skipping JobGraph %s because the Job %s required %s units "
"of the resource, but the maximum allowed is %s",
job_graph_name,
job_name,
resource_usage,
self._task_cpu_usage_max,
)
return None
else:
# This code will override cpu requirements from
# the alibaba trace and assign random number of slots
Expand Down Expand Up @@ -583,9 +591,14 @@ def _convert_job_data_to_job_graph(
# random_task_duration =
# round(self._sample_normal_distribution_random(1, 50, 15)[0])

job_name = task.name.split("_")[0]
if task.actual_duration <= 0:
# Some loaded TaskGraphs have no duration, skip those.
self._logger.debug(
"Skipping JobGraph %s because the Job %s has duration %s",
job_graph_name,
job_name,
task.actual_duration,
)
return None

job_runtime_1 = EventTime(
Expand Down
6 changes: 6 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@
"trace. Num slots will be in powers of 2 and picked at random from "
"[1, alibaba_loader_task_max_pow2_slots].",
)
flags.DEFINE_integer(
"alibaba_loader_task_cpu_usage_max",
sys.maxsize,
"The maximum CPU usage of a task that can be released by the Alibaba trace. "
"This property is verified AFTER the task CPU divisor is applied.",
)
flags.DEFINE_float(
"alibaba_task_duration_multiplier",
1,
Expand Down

0 comments on commit fe1b6a6

Please sign in to comment.