Commit 647a2e1

Enable AlibabaLoader to load new Task definitions.

sukritkalra committed Dec 27, 2023
1 parent 41cccbd

Showing 2 changed files with 50 additions and 41 deletions.

85 changes: 50 additions & 35 deletions data/alibaba_loader.py
@@ -1,12 +1,12 @@
 import json
 import math
 import os
 import pathlib
 import pickle
 import random
 import sys
 from collections import defaultdict
-from typing import List, Mapping, Optional, Sequence
+from dataclasses import dataclass
+from typing import List, Mapping, Optional
 
 import absl
@@ -25,6 +25,30 @@
 from .base_workload_loader import BaseWorkloadLoader
 
 
+# Define a Task dataclass for storage of Task information.
+@dataclass
+class Task:
+    name: str
+    job: str
+    instances: int
+    status: str
+    start_time: float
+    end_time: float
+    expected_duration: float
+    actual_duration: float
+    cpu_requested: float
+    cpu_usage: float
+    mem_requested: float
+    mem_usage: float
+
+
+class AlibabaTaskUnpickler(pickle.Unpickler):
+    def find_class(self, module, name):
+        if name == "Task":
+            return Task
+        return super().find_class(module, name)
+
+
 class AlibabaLoader(BaseWorkloadLoader):
     """Loads the Alibaba trace from the provided file.
@@ -177,18 +201,20 @@ def _initialize_job_data_generator(self):
         def job_data_generator():
             for file_path in file_paths:
                 with open(file_path, "rb") as pickled_file:
-                    data: Mapping[str, List[str]] = pickle.load(pickled_file)
+                    data: Mapping[str, List[str]] = AlibabaTaskUnpickler(
+                        pickled_file
+                    ).load()
                     for job_graph_name, job_tasks in data.items():
                         try:
-                            self._job_graphs[
-                                job_graph_name
-                            ] = self._convert_job_data_to_job_graph(
+                            job_graph = self._convert_job_data_to_job_graph(
                                 job_graph_name, job_tasks
                             )
-                        except Exception as e:
+                            if job_graph:
+                                self._job_graphs[job_graph_name] = job_graph
+                        except ValueError as e:
                             self._logger.warning(
                                 f"Failed to convert job graph {job_graph_name} "
-                                f"with error {e}."
+                                f"with error {e.__class__}: {e}."
                             )
                 yield
@@ -204,7 +230,7 @@ def _sample_normal_distribution_random(self, n, mean, std, min_val=0, max_val=10
 
     def _convert_job_data_to_job_graph(
         self, job_graph_name: str, job_tasks: List[str]
-    ) -> JobGraph:
+    ) -> Optional[JobGraph]:
         """
         Convert the raw job data to a Job object.
@@ -227,15 +253,15 @@
                     # makes no chance for DAG_Sched to do effective packing that
                     # would beat EDF by a significant margin.
                     Resource(name="Slot_1", _id="any"): int(
-                        math.ceil(task.cpu / self._task_cpu_divisor)
+                        math.ceil(task.cpu_usage / self._task_cpu_divisor)
                     ),
                 }
             )
 
             job_resources_2 = Resources(
                 resource_vector={
                     Resource(name="Slot_2", _id="any"): int(
-                        math.ceil(task.cpu / self._task_cpu_divisor)
+                        math.ceil(task.cpu_usage / self._task_cpu_divisor)
                     ),
                 }
             )
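A worked example of the slot arithmetic above (illustrative values; the trace units and the divisor are configuration-dependent): if task.cpu_usage is 250 and self._task_cpu_divisor is 100, the task requests int(math.ceil(250 / 100)) = 3 slots, and the ceiling ensures that any nonzero usage requests at least one slot.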
@@ -269,15 +295,19 @@
             # random_task_duration =
             # round(self._sample_normal_distribution_random(1, 50, 15)[0])
 
+            if task.actual_duration <= 0:
+                # Some loaded TaskGraphs have no duration, skip those.
+                return None
+
             job_name = task.name.split("_")[0]
             job_runtime_1 = EventTime(
-                int(math.ceil(task.duration)),
+                int(math.ceil(task.actual_duration)),
                 EventTime.Unit.US,
             )
             # This is used when self._heterogeneous is True
             # to support another execution strategy where it runs faster.
             job_runtime_2 = EventTime(
-                int(math.ceil(task.duration * 0.8)),
+                int(math.ceil(task.actual_duration * 0.8)),
                 EventTime.Unit.US,
             )
@@ -336,35 +366,20 @@ def _convert_job_data_to_job_graph(
             ),
         )
 
-    def get_next_jobs(self, start_time_offset: int = 0) -> Sequence[JobGraph]:
-        print(f"{start_time_offset=}")
-        if self._batch_size <= 0:
-            return [
-                self._convert_job_data_to_job_graph(
-                    job_graph_name, job_tasks, start_time_offset
-                )
-                for job_graph_name, job_tasks in self._job_data_generator
-            ]
-        else:
-            batch: List[JobGraph] = []
-            try:
-                for _ in range(self._batch_size):
-                    job_graph_name, job_tasks = next(self._job_data_generator)
-                    job_graph = self._convert_job_data_to_job_graph(
-                        job_graph_name, job_tasks, start_time_offset
-                    )
-                    batch.append(job_graph)
-            except StopIteration:
-                pass
-            return batch
-
     def get_next_workload(self, current_time: EventTime) -> Optional[Workload]:
         # Load the next batch of jobs into our mapping.
         try:
             next(self._job_data_generator)
         except StopIteration:
             pass
 
+        if len(self._job_graphs) == 0:
+            # We have no jobs to choose from, throw an error.
+            raise ValueError(
+                "No jobs to choose from. The loaded JobGraphs are empty. "
+                "Check the provided workload file at: {}".format(self._path)
+            )
+
         # Get the release times that fit within the range of the current_time and the
         # current_time + workload_interval.
         released_taskgraph_times = []
6 changes: 0 additions & 6 deletions main.py
@@ -1,7 +1,6 @@
 import os
 import random
 import sys
-from collections import namedtuple
 
 from absl import app, flags

@@ -37,11 +36,6 @@
 
 FLAGS = flags.FLAGS
 
-Task = namedtuple(
-    "Task",
-    field_names="name,job,instances,status,start_time,end_time,duration,cpu,mem",
-)
-
 # Define the flags.
 flags.DEFINE_enum(
     "execution_mode",