Skip to content

Commit

Permalink
Black formatting with new version.
Browse files Browse the repository at this point in the history
  • Loading branch information
sukritkalra committed Jan 31, 2024
1 parent d2868da commit 9a8515e
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 123 deletions.
30 changes: 18 additions & 12 deletions data/alibaba_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ def __init__(
self._workload_paths_and_release_policies = (
self._construct_workload_definitions()
)
self._job_graph_generators: Mapping[
str, Callable
] = self._initialize_job_graph_generators()
self._job_graph_generators: Mapping[str, Callable] = (
self._initialize_job_graph_generators()
)
self._release_times_and_profiles = self._construct_release_times()

self._job_graphs: Mapping[str, Mapping[str, JobGraph]] = {}
Expand Down Expand Up @@ -633,17 +633,23 @@ def _convert_job_data_to_job_graph(
)

return JobGraph(
name=job_graph_name
if profile_label is None
else f"{job_graph_name}_{profile_label}",
name=(
job_graph_name
if profile_label is None
else f"{job_graph_name}_{profile_label}"
),
jobs=jobs_to_children,
deadline_variance=(
self._flags.min_deadline_variance
if min_deadline_variance is None
else min_deadline_variance,
self._flags.max_deadline_variance
if max_deadline_variance is None
else max_deadline_variance,
(
self._flags.min_deadline_variance
if min_deadline_variance is None
else min_deadline_variance
),
(
self._flags.max_deadline_variance
if max_deadline_variance is None
else max_deadline_variance
),
),
)

Expand Down
18 changes: 9 additions & 9 deletions data/csv_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ def parse_events(self, readings: Mapping[str, Sequence[str]]):
window_to_execute=None,
# Checking if len(reading) > 6
# is for backward compatibility
slowest_execution_time=int(reading[6])
if len(reading) > 6
else None,
slowest_execution_time=(
int(reading[6]) if len(reading) > 6 else None
),
)
tasks[reading[4]].cancelled = True
tasks[reading[4]].cancelled_at = int(reading[0])
Expand All @@ -163,9 +163,9 @@ def parse_events(self, readings: Mapping[str, Sequence[str]]):
num_tasks=int(reading[5]),
# Checking if len(reading) > 6
# is for backward compatibility
critical_path_time=int(reading[6])
if len(reading) > 6
else None,
critical_path_time=(
int(reading[6]) if len(reading) > 6 else None
),
)
elif reading[1] == "TASK_GRAPH_FINISHED":
task_graphs[reading[2]].completion_at = int(reading[0])
Expand Down Expand Up @@ -504,9 +504,9 @@ def check_if_time_intersects(between_time, start_time, end_time):
resource_counter = defaultdict(int)
for resource in worker_pool.resources:
resource_counter[resource.name] += 1
resource_ids_to_canonical_names[
resource.id
] = f"{resource.name}_{resource_counter[resource.name]}"
resource_ids_to_canonical_names[resource.id] = (
f"{resource.name}_{resource_counter[resource.name]}"
)

# Output all the tasks and the requested deadlines.
if trace_fmt in ["task", "taskgraph", "application", "resource"]:
Expand Down
84 changes: 50 additions & 34 deletions data/workload_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,19 +140,21 @@ def __init__(self, path: str, _flags: Optional["absl.flags"] = None) -> None:
# Create the JobGraph.
if self._replication_factor > 1:
for i in range(1, self._replication_factor + 1):
job_graph_mapping[
f"{job_name}_{i}"
] = WorkloadLoader.load_job_graph(
JobGraph(
name=f"{job_name}_{i}",
release_policy=release_policy,
deadline_variance=deadline_variance,
),
job["graph"],
deepcopy(work_profiles)
if not self._unique_work_profiles
else work_profiles,
self._slo,
job_graph_mapping[f"{job_name}_{i}"] = (
WorkloadLoader.load_job_graph(
JobGraph(
name=f"{job_name}_{i}",
release_policy=release_policy,
deadline_variance=deadline_variance,
),
job["graph"],
(
deepcopy(work_profiles)
if not self._unique_work_profiles
else work_profiles
),
self._slo,
)
)
pass
else:
Expand All @@ -163,9 +165,11 @@ def __init__(self, path: str, _flags: Optional["absl.flags"] = None) -> None:
deadline_variance=deadline_variance,
),
job["graph"],
deepcopy(work_profiles)
if not self._unique_work_profiles
else work_profiles,
(
deepcopy(work_profiles)
if not self._unique_work_profiles
else work_profiles
),
self._slo,
)

Expand Down Expand Up @@ -258,9 +262,11 @@ def __create_release_policy(
)
return JobGraph.ReleasePolicy.periodic(
period=EventTime(
job["period"]
if override_arrival_period is None
else override_arrival_period,
(
job["period"]
if override_arrival_period is None
else override_arrival_period
),
EventTime.Unit.US,
),
start=start_time,
Expand All @@ -278,14 +284,18 @@ def __create_release_policy(
)
return JobGraph.ReleasePolicy.fixed(
period=EventTime(
job["period"]
if override_arrival_period is None
else override_arrival_period,
(
job["period"]
if override_arrival_period is None
else override_arrival_period
),
EventTime.Unit.US,
),
num_invocations=job["invocations"]
if override_num_invocations is None
else override_num_invocations,
num_invocations=(
job["invocations"]
if override_num_invocations is None
else override_num_invocations
),
start=start_time,
)
elif job["release_policy"] == "poisson":
Expand All @@ -297,9 +307,11 @@ def __create_release_policy(
"`rate` or `invocations` was not defined for the JobGraph."
)
return JobGraph.ReleasePolicy.poisson(
rate=job["rate"]
if override_poisson_arrival_rate is None
else override_poisson_arrival_rate,
rate=(
job["rate"]
if override_poisson_arrival_rate is None
else override_poisson_arrival_rate
),
num_invocations=job["invocations"],
start=start_time,
)
Expand All @@ -312,13 +324,17 @@ def __create_release_policy(
"`rate` or `coefficient` was not defined for the JobGraph."
)
return JobGraph.ReleasePolicy.gamma(
rate=job["rate"]
if override_poisson_arrival_rate is None
else override_poisson_arrival_rate,
rate=(
job["rate"]
if override_poisson_arrival_rate is None
else override_poisson_arrival_rate
),
num_invocations=job["invocations"],
coefficient=job["coefficient"]
if override_gamma_coefficient is None
else override_gamma_coefficient,
coefficient=(
job["coefficient"]
if override_gamma_coefficient is None
else override_gamma_coefficient
),
start=start_time,
)
elif job["release_policy"] == "closed_loop":
Expand Down
7 changes: 3 additions & 4 deletions experiments/analysis_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,9 @@ def extract_variables_from_filename_v2(filename):
break

if variables["scheduler"] == "TetriSched":
variables[
"scheduler"
] = f'TetriSched_time_dis_{variables["scheduler_time_discretization"]}' + (
"_DAG_aware" if variables["DAG_aware"] else ""
variables["scheduler"] = (
f'TetriSched_time_dis_{variables["scheduler_time_discretization"]}'
+ ("_DAG_aware" if variables["DAG_aware"] else "")
)
return variables

Expand Down
46 changes: 26 additions & 20 deletions schedulers/ilp_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,9 +456,9 @@ def get_placements(
return [
Placement.create_task_placement(
task=task,
placement_time=start_time
if placement_worker_id and placement_strategy
else None,
placement_time=(
start_time if placement_worker_id and placement_strategy else None
),
worker_pool_id=placement_worker_pool_id,
worker_id=placement_worker_id,
execution_strategy=placement_strategy,
Expand Down Expand Up @@ -833,9 +833,11 @@ def _create_batch_task_variables(
unscheduled_tasks = deque(
sorted(
unscheduled_tasks,
key=lambda t: t.deadline
if t.task_graph not in self._allowed_to_miss_deadlines
else float("inf"),
key=lambda t: (
t.deadline
if t.task_graph not in self._allowed_to_miss_deadlines
else float("inf")
),
)
)
tasks_to_batch_tasks: Mapping[Task, List[BatchTask]] = defaultdict(list)
Expand Down Expand Up @@ -896,12 +898,14 @@ def _create_batch_task_variables(
# Deadlines are not to be enforced if the tasks in this batch are all
# allowed to miss their deadlines. Otherwise, we conservatively aim to
# enforce deadlines.
enforce_deadlines=False
if all(
task.task_graph in self._allowed_to_miss_deadlines
for task in batch_task.tasks
)
else self.enforce_deadlines,
enforce_deadlines=(
False
if all(
task.task_graph in self._allowed_to_miss_deadlines
for task in batch_task.tasks
)
else self.enforce_deadlines
),
retract_schedules=self.retract_schedules,
)

Expand Down Expand Up @@ -1088,14 +1092,16 @@ def _add_task_dependency_constraints(
"variables (along with the number of parents in each variable): %s",
sim_time.to(EventTime.Unit.US).time,
task_name,
", ".join(
[
f"{variable.task.unique_name} ({num_parents})"
for variable, num_parents in parent_variables.items()
]
)
if len(parent_variables) > 0
else "None",
(
", ".join(
[
f"{variable.task.unique_name} ({num_parents})"
for variable, num_parents in parent_variables.items()
]
)
if len(parent_variables) > 0
else "None"
),
)

# Ensure that the task is only placed if all of its parents are placed,
Expand Down
6 changes: 3 additions & 3 deletions schedulers/tetrisched_cplex_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1286,9 +1286,9 @@ def _add_objective(
# the scheduler.
task_not_placed_penalty = []
if self._batching:
tasks_to_batch_tasks: Mapping[
Task, Set[TaskOptimizerVariables]
] = defaultdict(set)
tasks_to_batch_tasks: Mapping[Task, Set[TaskOptimizerVariables]] = (
defaultdict(set)
)
for batch_task_variable in tasks_to_variables.values():
for task in batch_task_variable.task.tasks:
tasks_to_batch_tasks[task].add(batch_task_variable)
Expand Down
10 changes: 5 additions & 5 deletions schedulers/tetrisched_gurobi_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -719,11 +719,11 @@ def schedule(
)

for task_variables in tasks_to_variables.values():
placements_for_task: Sequence[
Placement
] = task_variables.get_placements(
worker_index_to_worker=workers,
worker_id_to_worker_pool=worker_to_worker_pool,
placements_for_task: Sequence[Placement] = (
task_variables.get_placements(
worker_index_to_worker=workers,
worker_id_to_worker_pool=worker_to_worker_pool,
)
)
for placement in placements_for_task:
if placement.is_placed():
Expand Down
12 changes: 6 additions & 6 deletions schedulers/tetrisched_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ class Partitions(object):

def __init__(self, worker_pools: WorkerPools) -> None:
self._available_partitions = tetrisched.Partitions()
self._resource_name_to_partitions_map: Mapping[
str, tetrisched.Partitions
] = defaultdict(tetrisched.Partitions)
self._resource_name_to_partitions_map: Mapping[str, tetrisched.Partitions] = (
defaultdict(tetrisched.Partitions)
)
# BUG (Sukrit): The worker_index_to_partition_map is being used to keep the
# Partition objects live on the Python side so we can query the associatedWorker
# and the associatedWorkerPool. Otherwise, pybind11 loses track of the objects
Expand Down Expand Up @@ -90,9 +90,9 @@ def _construct_partitions(self, worker_pools: WorkerPools) -> None:
# Maintain the relevant mappings to transform it to a Placement.
partition.associatedWorker = worker
partition.associatedWorkerPool = worker_pool
self._worker_index_to_partition_map[
self._worker_index_counter
] = partition
self._worker_index_to_partition_map[self._worker_index_counter] = (
partition
)
self._worker_index_counter += 1

def get_partition_for_worker_id(
Expand Down
30 changes: 16 additions & 14 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,22 @@ def create_default_task(
name=task_name,
task_graph=task_graph_name,
job=job,
profile=WorkProfile(
name=f"{task_name}_work_profile",
execution_strategies=ExecutionStrategies(
strategies=[
ExecutionStrategy(
resources=resource_requirements,
batch_size=1,
runtime=EventTime(runtime, EventTime.Unit.US),
)
]
),
)
if profile is None
else profile,
profile=(
WorkProfile(
name=f"{task_name}_work_profile",
execution_strategies=ExecutionStrategies(
strategies=[
ExecutionStrategy(
resources=resource_requirements,
batch_size=1,
runtime=EventTime(runtime, EventTime.Unit.US),
)
]
),
)
if profile is None
else profile
),
deadline=EventTime(deadline, EventTime.Unit.US),
timestamp=timestamp,
release_time=EventTime(release_time, EventTime.Unit.US),
Expand Down
Loading

0 comments on commit 9a8515e

Please sign in to comment.