From fe1b6a6399d5a06601bce2e702e1ac5a6f25055b Mon Sep 17 00:00:00 2001
From: Sukrit Kalra
Date: Sun, 17 Mar 2024 16:40:17 -0700
Subject: [PATCH] Add flag to restrict the max resource usage per task in a
 JobGraph.

---
 data/alibaba_loader.py | 37 +++++++++++++++++++++++++------------
 main.py                |  6 ++++++
 2 files changed, 31 insertions(+), 12 deletions(-)

diff --git a/data/alibaba_loader.py b/data/alibaba_loader.py
index 6a50463b..d1bcc2b4 100644
--- a/data/alibaba_loader.py
+++ b/data/alibaba_loader.py
@@ -50,7 +50,7 @@ def find_class(self, module, name):
         return super().find_class(module, name)
 
 
-FILTERED_DAGS = ("j_1442387",)
+FILTERED_DAGS = ("j_1442387", "j_2583299", "j_425976")
 
 
 class AlibabaLoader(BaseWorkloadLoader):
@@ -102,6 +102,7 @@ def __init__(
         self._task_cpu_divisor = self._flags.alibaba_loader_task_cpu_divisor
         self._task_max_pow2_slots = self._flags.alibaba_loader_task_max_pow2_slots
+        self._task_cpu_usage_max = self._flags.alibaba_loader_task_cpu_usage_max
 
     def _construct_workload_definitions(
         self,
@@ -442,9 +443,7 @@ def job_graph_data_generator(
                     #     f" {max_critical_path_runtime})."
                     # )
                 else:
-                    self._logger.warning(
-                        f"Failed to create job graph {job_graph_name}."
-                    )
+                    skipped_job_graphs += 1
             except ValueError as e:
                 self._logger.warning(
                     f"Failed to convert job graph {job_graph_name} "
@@ -528,9 +527,13 @@ def _convert_job_data_to_job_graph(
         # Create the individual Job instances corresponding to each Task.
         task_name_to_simulator_job_mapping = {}
         for task in job_tasks:
+            # The name of the Job from the Task.
+            job_name = task.name.split("_")[0]
+
             if self._task_max_pow2_slots == 0:
                 # This code will use the cpu requirements from
                 # the alibaba trace and adjust slots
+                resource_usage = int(math.ceil(task.cpu_usage / self._task_cpu_divisor))
                 job_resources_1 = Resources(
                     resource_vector={
                         # Note: We divide the CPU by some self._task_cpu_divisor instead
@@ -540,19 +543,24 @@ def _convert_job_data_to_job_graph(
                         # would end up using 1 slot, which is not very interesting and
                         # makes no chance for DAG_Sched to do effective packing that
                         # would beat EDF by a significant margin.
-                        Resource(name="Slot_1", _id="any"): int(
-                            math.ceil(task.cpu_usage / self._task_cpu_divisor)
-                        ),
+                        Resource(name="Slot_1", _id="any"): resource_usage,
                     }
                 )
-
                 job_resources_2 = Resources(
                     resource_vector={
-                        Resource(name="Slot_2", _id="any"): int(
-                            math.ceil(task.cpu_usage / self._task_cpu_divisor)
-                        ),
+                        Resource(name="Slot_2", _id="any"): resource_usage,
                     }
                 )
+                if resource_usage > self._task_cpu_usage_max:
+                    self._logger.debug(
+                        "Skipping JobGraph %s because the Job %s required %s units "
+                        "of the resource, but the maximum allowed is %s",
+                        job_graph_name,
+                        job_name,
+                        resource_usage,
+                        self._task_cpu_usage_max,
+                    )
+                    return None
             else:
                 # This code will override cpu requirements from
                 # the alibaba trace and assign random number of slots
@@ -583,9 +591,14 @@ def _convert_job_data_to_job_graph(
             #     random_task_duration =
            #         round(self._sample_normal_distribution_random(1, 50, 15)[0])
 
-            job_name = task.name.split("_")[0]
             if task.actual_duration <= 0:
                 # Some loaded TaskGraphs have no duration, skip those.
+                self._logger.debug(
+                    "Skipping JobGraph %s because the Job %s has duration %s",
+                    job_graph_name,
+                    job_name,
+                    task.actual_duration,
+                )
                 return None
 
             job_runtime_1 = EventTime(
diff --git a/main.py b/main.py
index 20ca92c2..5db02992 100644
--- a/main.py
+++ b/main.py
@@ -161,6 +161,12 @@
     "trace. Num slots will be in powers of 2 and picked at random from "
     "[1, alibaba_loader_task_max_pow2_slots].",
 )
+flags.DEFINE_integer(
+    "alibaba_loader_task_cpu_usage_max",
+    sys.maxsize,
+    "The maximum CPU usage of a task that can be released by the Alibaba trace. "
+    "This property is verified AFTER the task CPU divisor is applied.",
+)
 flags.DEFINE_float(
     "alibaba_task_duration_multiplier",
     1,
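
Illustrative note on the new flag (the example values below are hypothetical, not
part of the patch): the cap is checked against the slot count obtained after the
trace's CPU usage is divided by alibaba_loader_task_cpu_divisor. For instance,
with --alibaba_loader_task_cpu_divisor=25, a task recorded with cpu_usage=600
maps to ceil(600 / 25) = 24 slots; running with
--alibaba_loader_task_cpu_usage_max=16 would then drop the entire JobGraph
containing that task, since 24 exceeds the cap and
_convert_job_data_to_job_graph returns None for it. The default of sys.maxsize
leaves existing behavior unchanged.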