Skip to content

Commit

Permalink
Hack the analyze pipeline to work with TPC-H output
Browse files Browse the repository at this point in the history
  • Loading branch information
1ntEgr8 committed Dec 4, 2024
1 parent 3831f44 commit 2a74838
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 37 deletions.
14 changes: 6 additions & 8 deletions analyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ def analyze_resource_utilization(
# Plotting defaults.
# hatches = ['//', '--', '**']
# alphas = np.arange(0.2, 1.2, 0.2)
resource_color = {"GPU": "red", "CPU": "green"}
resource_color = {"Slot": "green"}

# Worker Pool statistics
worker_pool_stats = csv_reader.get_worker_pool_utilizations(scheduler_csv_file)
Expand Down Expand Up @@ -1246,16 +1246,16 @@ def log_aggregate_stats(
/ sum(stat.resource_utilizations[resource])
for stat in worker_pool_stats
]
for resource in ("GPU", "CPU")
for resource in ("Slot",)
}

scheduler_invocations = csv_reader.get_scheduler_invocations(csv_file)
placed_tasks = [
scheduler_invocation.placed_tasks
scheduler_invocation.num_placed_tasks
for scheduler_invocation in scheduler_invocations
]
unplaced_tasks = [
scheduler_invocation.unplaced_tasks
scheduler_invocation.num_unplaced_tasks
for scheduler_invocation in scheduler_invocations
]

Expand All @@ -1268,8 +1268,7 @@ def log_aggregate_stats(
placement_delay,
deadline_delay,
stat_function(e2e_response_time),
stat_function(resource_uses["GPU"]),
stat_function(resource_uses["CPU"]),
stat_function(resource_uses["Slot"]),
stat_function(placed_tasks),
stat_function(unplaced_tasks),
log_name,
Expand All @@ -1288,8 +1287,7 @@ def log_aggregate_stats(
"Placement",
"Deadline",
"JCT",
"GPU",
"CPU",
"Slot",
"Placed",
"Unplaced",
"Log",
Expand Down
5 changes: 5 additions & 0 deletions data/csv_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ def parse_events(self, readings: Mapping[str, Sequence[str]]):
)
elif reading[1] == "UPDATE_WORKLOAD":
simulator.total_tasks += int(reading[2])
elif reading[1] == "LOG_STATS":
assert (
simulator is not None
), "No SIMULATOR_START found for a corresponding SIMULATOR_END."
simulator.update_stats(reading)
elif reading[1] == "SIMULATOR_END":
assert (
simulator is not None
Expand Down
26 changes: 19 additions & 7 deletions data/csv_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,18 @@ def __init__(self, csv_path: str, start_time: int, total_tasks: int = 0):
self.scheduler_invocations: list[Scheduler] = []
self.task_graphs: dict[str, TaskGraph] = {}

def update_stats(self, csv_reading: str):
assert (
csv_reading[1] == "LOG_STATS"
), f"The event {csv_reading[1]} was not of type LOG_STATS."
self.finished_tasks = int(csv_reading[2])
self.dropped_tasks = int(csv_reading[3])
self.missed_deadlines = int(csv_reading[4])
self.finished_task_graphs = int(csv_reading[5])
self.dropped_taskgraphs = int(csv_reading[6])
self.missed_taskgraphs = int(csv_reading[7])
self.goodput_taskgraphs = self.finished_task_graphs - self.missed_taskgraphs

def update_finish(self, csv_reading: str):
"""Updates the values of the Simulator based on the SIMULATOR_END event from
CSV.
Expand All @@ -396,10 +408,10 @@ def update_finish(self, csv_reading: str):
csv_reading[1] == "SIMULATOR_END"
), f"The event {csv_reading[1]} was not of type SIMULATOR_END."
self.end_time = int(csv_reading[0])
self.finished_tasks = int(csv_reading[2])
self.dropped_tasks = int(csv_reading[3])
self.missed_deadlines = int(csv_reading[4])
self.finished_task_graphs = int(csv_reading[5])
self.dropped_taskgraphs = int(csv_reading[6])
self.missed_taskgraphs = int(csv_reading[7])
self.goodput_taskgraphs = self.finished_task_graphs - self.missed_taskgraphs
# self.finished_tasks = int(csv_reading[2])
# self.dropped_tasks = int(csv_reading[3])
# self.missed_deadlines = int(csv_reading[4])
# self.finished_task_graphs = int(csv_reading[5])
# self.dropped_taskgraphs = int(csv_reading[6])
# self.missed_taskgraphs = int(csv_reading[7])
# self.goodput_taskgraphs = self.finished_task_graphs - self.missed_taskgraphs
43 changes: 21 additions & 22 deletions simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1694,35 +1694,34 @@ def is_source_task(task):

# Add task graph entry in self._current_task_graph_placements to
# track its task placements
#
# In addition to newly added task graphs, self._workload also
# contains all previously released task graphs.
#
# So, we guard the addition of the entry on two conditions:
# (1) The task graph doesn't have an entry (we don't want to
# nuke an existing one)
# (2) The task graph is not complete (we only keep the entry
# alive while the task graph is running to avoid a memory
# leak)
for task_graph_name, task_graph in self._workload.task_graphs.items():
# In addition to newly added task graphs, self._workload also
# contains all previously released task graphs.
#
# So, we guard the addition of the entry on two conditions:
# (1) The task graph doesn't have an entry (we don't want to
# nuke an existing one)
# (2) The task graph is not complete (we only keep the entry
# alive while the task graph is running to avoid a memory
# leak)
if (
task_graph_name not in self._current_task_graph_placements
and not task_graph.is_complete()
):
self._current_task_graph_placements[task_graph_name] = {}

# # Add the TaskGraphRelease events into the system.
# for task_graph_name, task_graph in self._workload.task_graphs.items():
# event = Event(
# event_type=EventType.TASK_GRAPH_RELEASE,
# time=task_graph.release_time,
# task_graph=task_graph_name,
# )
# self._event_queue.add_event(event)
# self._logger.info(
# "[%s] Added %s to the event queue.",
# self._simulator_time.to(EventTime.Unit.US).time,
# event,
# )
event = Event(
event_type=EventType.TASK_GRAPH_RELEASE,
time=task_graph.release_time,
task_graph=task_graph_name,
)
self._event_queue.add_event(event)
self._logger.info(
"[%s] Added %s to the event queue.",
self._simulator_time.to(EventTime.Unit.US).time,
event,
)

max_release_time = self._simulator_time
for task in releasable_tasks:
Expand Down

0 comments on commit 2a74838

Please sign in to comment.