Skip to content

Commit

Permalink
[PLOT] Implement plotting for good resource utilization.
Browse files Browse the repository at this point in the history
  • Loading branch information
sukritkalra committed Mar 14, 2024
1 parent 18bdd27 commit fcaeb3e
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 30 deletions.
81 changes: 81 additions & 0 deletions analyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,14 @@
"The filename of the missed deadline plot.",
)

# Opt-in flag for the good-resource-utilization timeline plot; consumed in
# main() to decide whether plot_goodresource_utilization() is invoked.
flags.DEFINE_bool(
    "plot_goodresource_utilization",
    False,
    "Plot a timeline of the good resource utilization. "
    "A good resource utilization is defined as the utilization of the resource "
    "for the use of a TaskGraph that meets its deadline.",
)

flags.DEFINE_bool(
"end_to_end_response_time",
False,
Expand Down Expand Up @@ -1066,6 +1074,72 @@ def plot_goodput(
plt.savefig(output, bbox_inches="tight")


def plot_goodresource_utilization(csv_reader, csv_files, scheduler_labels, output_dir):
    """Plots a stacked-bar timeline of the good resource utilization.

    A good resource utilization is defined as the utilization of the resource
    for the purposes of a TaskGraph that completed and met its deadline. For
    every resource, each timestep is rendered as a stacked bar: the quantity
    used by deadline-meeting TaskGraphs (green) and the remaining capacity,
    counted as wasted (red).

    Args:
        csv_reader: The CSV reader used to parse the simulator's output files.
        csv_files: The CSV files to generate the plots for.
        scheduler_labels: A label for each CSV file's scheduler, in order.
        output_dir: The directory where the plot images are saved.

    Raises:
        ValueError: If a Task references a TaskGraph not present in the file.
    """
    # Retrieve the Tasks that were released into the system.
    for csv_file, scheduler_label in zip(csv_files, scheduler_labels):
        tasks = csv_reader.get_tasks(csv_file)
        task_graphs = csv_reader.get_task_graph(csv_file)

        # Accumulate, per resource name and per timestep, the quantity used
        # by Tasks whose TaskGraph completed within its deadline.
        good_resource_utilization = {}
        for task in tasks:
            if task.task_graph not in task_graphs:
                raise ValueError(f"Graph {task.task_graph} not found in {csv_file}.")
            task_graph = task_graphs[task.task_graph]
            if not task_graph.was_completed or task_graph.missed_deadline:
                continue
            for placement in task.placements:
                for resource in placement.resources_used:
                    if resource.name not in good_resource_utilization:
                        good_resource_utilization[resource.name] = defaultdict(int)

                    for t in range(placement.placement_time, placement.completion_time):
                        good_resource_utilization[resource.name][t] += resource.quantity

        # The WorkerPools and the simulation end time are the same for every
        # resource in this file, so compute them once outside the loop.
        worker_pools = csv_reader.get_worker_pools(csv_file)
        simulator_end_time = csv_reader.get_simulator_end_time(csv_file)
        for resource in good_resource_utilization.keys():
            # Total quantity of this resource available across all WorkerPools.
            max_resource_available = 0
            for worker_pool in worker_pools:
                for wp_resource in worker_pool.resources:
                    if resource == wp_resource.name:
                        max_resource_available += wp_resource.quantity

            # Build the per-timestep series of good (used) and wasted
            # quantities. Use .get() so missing timesteps read as 0 without
            # mutating the defaultdict.
            usage_map = []
            wasted_map = []
            for t in range(0, simulator_end_time):
                utilization = good_resource_utilization[resource].get(t, 0)
                usage_map.append(utilization)
                wasted_map.append(max_resource_available - utilization)

            plt.figure(figsize=(8, 8))
            plt.bar(
                np.arange(0, len(usage_map)),
                usage_map,
                color="green",
                width=1.0,
            )
            # Stack the wasted capacity on top of the good utilization.
            plt.bar(
                np.arange(0, len(usage_map)),
                wasted_map,
                bottom=usage_map,
                color="red",
                width=1.0,
            )
            plt.xlabel("Time")
            plt.ylabel("Resource Utilization")
            plt.savefig(
                os.path.join(
                    output_dir,
                    f"{resource}_{scheduler_label}_goodresource_utilization.png",
                ),
                bbox_inches="tight",
            )
            # Release the figure; otherwise each resource/file leaks an open
            # matplotlib figure for the lifetime of the process.
            plt.close()


def log_aggregate_stats(
csv_reader, csv_files, conf_files, scheduler_labels, task_name_regex, stat="p50"
):
Expand Down Expand Up @@ -1385,6 +1459,13 @@ def main(argv):
figure_size=figure_size,
stats=statistics,
)
    # Generate the good-resource-utilization timeline plots when requested
    # explicitly or when all plots are enabled.
    if FLAGS.plot_goodresource_utilization or FLAGS.all:
        plot_goodresource_utilization(
            csv_reader,
            FLAGS.csv_files,
            scheduler_labels,
            FLAGS.output_dir,
        )

if FLAGS.aggregate_stats:
if len(statistics) != 1:
Expand Down
2 changes: 1 addition & 1 deletion data/csv_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ def __str__(self) -> str:
+ f"critical_path_time={self.critical_path_time}, slack={self.slack},"
+ f"cancelled={self.cancelled}, cancelled_at={self.cancelled_at},"
+ f"completion_at={self.completion_at},"
+ f"deadline_miss_detected_at={self.deadline_miss_detected_at}"
+ f"deadline_miss_detected_at={self.deadline_miss_detected_at})"
)


Expand Down
29 changes: 0 additions & 29 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,21 +95,6 @@
"the same length as the list of workload profile paths. This is used to annotate "
"the TaskGraphs with the corresponding workload profile path label.",
)
flags.register_multi_flags_validator(
["workload_profile_path", "workload_profile_paths"],
lambda flags: not (
len(flags["workload_profile_paths"]) > 0
and flags["workload_profile_path"] is not None
),
message="Only one of workload_profile_path and workload_profile_paths must be set.",
)
flags.register_multi_flags_validator(
["workload_profile_path", "workload_profile_paths"],
lambda flags: len(flags["workload_profile_paths"]) > 0
or flags["workload_profile_path"] is not None,
message="At least one of workload_profile_path and workload_profile_paths must be "
"set.",
)
flags.DEFINE_string(
"worker_profile_path",
"./profiles/workers/worker_profile.json",
Expand Down Expand Up @@ -507,13 +492,6 @@
"If provided, the list must be of the same length as the list of workload "
"profile paths. For a single workload profile path, use `override_release_policy`.",
)
flags.register_validator(
"override_release_policies",
lambda override_release_policies: all(
policy in JobGraph.RELEASE_POLICIES for policy in override_release_policies
),
"All release policies must be one of {}".format(JobGraph.RELEASE_POLICIES),
)
flags.DEFINE_integer(
"override_num_invocation",
0,
Expand All @@ -526,13 +504,6 @@
"If provided, the list must be of the same length as the list of workload "
"profile paths. For a single workload profile path, use `override_num_invocation`.",
)
flags.register_validator(
"override_num_invocations",
lambda override_num_invocations: all(
num_invocations.isdigit() for num_invocations in override_num_invocations
),
"All number of invocations must be an integer.",
)
flags.DEFINE_float(
"override_poisson_arrival_rate",
0.0,
Expand Down

0 comments on commit fcaeb3e

Please sign in to comment.