From fcaeb3efa57981cffdaeafd482245206d557e886 Mon Sep 17 00:00:00 2001
From: Sukrit Kalra
Date: Thu, 14 Mar 2024 14:04:47 -0700
Subject: [PATCH] [PLOT] Implement plotting for good resource utilization.

---
 analyze.py        | 81 +++++++++++++++++++++++++++++++++++++++++++++++
 data/csv_types.py |  2 +-
 main.py           | 29 -----------------
 3 files changed, 82 insertions(+), 30 deletions(-)

diff --git a/analyze.py b/analyze.py
index 91dd6ded..9c5e7848 100644
--- a/analyze.py
+++ b/analyze.py
@@ -238,6 +238,14 @@
     "The filename of the missed deadline plot.",
 )
 
+flags.DEFINE_bool(
+    "plot_goodresource_utilization",
+    False,
+    "Plot a timeline of the good resource utilization. "
+    "A good resource utilization is defined as the utilization of the resource "
+    "for the use of a TaskGraph that meets its deadline.",
+)
+
 flags.DEFINE_bool(
     "end_to_end_response_time",
     False,
@@ -1066,6 +1074,72 @@ def plot_goodput(
     plt.savefig(output, bbox_inches="tight")
 
 
+def plot_goodresource_utilization(csv_reader, csv_files, scheduler_labels, output_dir):
+    """Plots the timeline of the good resource utilization.
+
+    A good resource utilization is defined as the utilization of the resource for the
+    purposes of meeting the deadline of a TaskGraph."""
+    # Retrieve the Tasks that were released into the system.
+    for csv_file, scheduler_label in zip(csv_files, scheduler_labels):
+        tasks = csv_reader.get_tasks(csv_file)
+        task_graphs = csv_reader.get_task_graph(csv_file)
+        good_resource_utilization = {}
+        for task in tasks:
+            if task.task_graph not in task_graphs:
+                raise ValueError(f"Graph {task.task_graph} not found in {csv_file}.")
+            task_graph = task_graphs[task.task_graph]
+            if not task_graph.was_completed or task_graph.missed_deadline:
+                continue
+            for placement in task.placements:
+                for resource in placement.resources_used:
+                    if resource.name not in good_resource_utilization:
+                        good_resource_utilization[resource.name] = defaultdict(int)
+
+                    for t in range(placement.placement_time, placement.completion_time):
+                        good_resource_utilization[resource.name][t] += resource.quantity
+
+        for resource in good_resource_utilization.keys():
+            worker_pools = csv_reader.get_worker_pools(csv_file)
+            max_resource_available = 0
+            for worker_pool in worker_pools:
+                for wp_resource in worker_pool.resources:
+                    if resource == wp_resource.name:
+                        max_resource_available += wp_resource.quantity
+            usage_map = []
+            wasted_map = []
+            for t in range(0, csv_reader.get_simulator_end_time(csv_file)):
+                if t in good_resource_utilization[resource]:
+                    utilization = good_resource_utilization[resource][t]
+                    usage_map.append(utilization)
+                    wasted_map.append(max_resource_available - utilization)
+                else:
+                    usage_map.append(0)
+                    wasted_map.append(max_resource_available)
+            plt.figure(figsize=(8, 8))
+            plt.bar(
+                np.arange(0, len(usage_map)),
+                usage_map,
+                color="green",
+                width=1.0,
+            )
+            plt.bar(
+                np.arange(0, len(usage_map)),
+                wasted_map,
+                bottom=usage_map,
+                color="red",
+                width=1.0,
+            )
+            plt.xlabel("Time")
+            plt.ylabel("Resource Utilization")
+            plt.savefig(
+                os.path.join(
+                    output_dir,
+                    f"{resource}_{scheduler_label}_goodresource_utilization.png",
+                ),
+                bbox_inches="tight",
+            )
+
+
 def log_aggregate_stats(
     csv_reader, csv_files, conf_files, scheduler_labels, task_name_regex, stat="p50"
 ):
@@ -1385,6 +1459,13 @@ def main(argv):
             figure_size=figure_size,
             stats=statistics,
         )
+    if FLAGS.plot_goodresource_utilization or FLAGS.all:
+        plot_goodresource_utilization(
+            csv_reader,
+            FLAGS.csv_files,
+            scheduler_labels,
+            FLAGS.output_dir,
+        )
 
     if FLAGS.aggregate_stats:
         if len(statistics) != 1:
diff --git a/data/csv_types.py b/data/csv_types.py
index 67a2b1e2..390cd0a6 100644
--- a/data/csv_types.py
+++ b/data/csv_types.py
@@ -299,7 +299,7 @@ def __str__(self) -> str:
             + f"critical_path_time={self.critical_path_time}, slack={self.slack},"
             + f"cancelled={self.cancelled}, cancelled_at={self.cancelled_at},"
             + f"completion_at={self.completion_at},"
-            + f"deadline_miss_detected_at={self.deadline_miss_detected_at}"
+            + f"deadline_miss_detected_at={self.deadline_miss_detected_at})"
         )
 
 
diff --git a/main.py b/main.py
index a3382803..20ca92c2 100644
--- a/main.py
+++ b/main.py
@@ -95,21 +95,6 @@
     "the same length as the list of workload profile paths. This is used to annotate "
     "the TaskGraphs with the corresponding workload profile path label.",
 )
-flags.register_multi_flags_validator(
-    ["workload_profile_path", "workload_profile_paths"],
-    lambda flags: not (
-        len(flags["workload_profile_paths"]) > 0
-        and flags["workload_profile_path"] is not None
-    ),
-    message="Only one of workload_profile_path and workload_profile_paths must be set.",
-)
-flags.register_multi_flags_validator(
-    ["workload_profile_path", "workload_profile_paths"],
-    lambda flags: len(flags["workload_profile_paths"]) > 0
-    or flags["workload_profile_path"] is not None,
-    message="At least one of workload_profile_path and workload_profile_paths must be "
-    "set.",
-)
 flags.DEFINE_string(
     "worker_profile_path",
     "./profiles/workers/worker_profile.json",
@@ -507,13 +492,6 @@
     "If provided, the list must be of the same length as the list of workload "
     "profile paths. For a single workload profile path, use `override_release_policy`.",
 )
-flags.register_validator(
-    "override_release_policies",
-    lambda override_release_policies: all(
-        policy in JobGraph.RELEASE_POLICIES for policy in override_release_policies
-    ),
-    "All release policies must be one of {}".format(JobGraph.RELEASE_POLICIES),
-)
 flags.DEFINE_integer(
     "override_num_invocation",
     0,
@@ -526,13 +504,6 @@
     "If provided, the list must be of the same length as the list of workload "
     "profile paths. For a single workload profile path, use `override_num_invocation`.",
 )
-flags.register_validator(
-    "override_num_invocations",
-    lambda override_num_invocations: all(
-        num_invocations.isdigit() for num_invocations in override_num_invocations
-    ),
-    "All number of invocations must be an integer.",
-)
 flags.DEFINE_float(
     "override_poisson_arrival_rate",
     0.0,