From 9a8515e6ec72dee4bbe5bff075742b76b92169b2 Mon Sep 17 00:00:00 2001 From: Sukrit Kalra Date: Wed, 31 Jan 2024 09:22:56 -0800 Subject: [PATCH] Black formatting with new version. --- data/alibaba_loader.py | 30 ++++---- data/csv_reader.py | 18 ++--- data/workload_loader.py | 84 ++++++++++++++--------- experiments/analysis_utils.py | 7 +- schedulers/ilp_scheduler.py | 46 +++++++------ schedulers/tetrisched_cplex_scheduler.py | 6 +- schedulers/tetrisched_gurobi_scheduler.py | 10 +-- schedulers/tetrisched_scheduler.py | 12 ++-- tests/utils.py | 30 ++++---- workload/jobs.py | 40 ++++++----- 10 files changed, 160 insertions(+), 123 deletions(-) diff --git a/data/alibaba_loader.py b/data/alibaba_loader.py index e7966fbb..3c595835 100644 --- a/data/alibaba_loader.py +++ b/data/alibaba_loader.py @@ -81,9 +81,9 @@ def __init__( self._workload_paths_and_release_policies = ( self._construct_workload_definitions() ) - self._job_graph_generators: Mapping[ - str, Callable - ] = self._initialize_job_graph_generators() + self._job_graph_generators: Mapping[str, Callable] = ( + self._initialize_job_graph_generators() + ) self._release_times_and_profiles = self._construct_release_times() self._job_graphs: Mapping[str, Mapping[str, JobGraph]] = {} @@ -633,17 +633,23 @@ def _convert_job_data_to_job_graph( ) return JobGraph( - name=job_graph_name - if profile_label is None - else f"{job_graph_name}_{profile_label}", + name=( + job_graph_name + if profile_label is None + else f"{job_graph_name}_{profile_label}" + ), jobs=jobs_to_children, deadline_variance=( - self._flags.min_deadline_variance - if min_deadline_variance is None - else min_deadline_variance, - self._flags.max_deadline_variance - if max_deadline_variance is None - else max_deadline_variance, + ( + self._flags.min_deadline_variance + if min_deadline_variance is None + else min_deadline_variance + ), + ( + self._flags.max_deadline_variance + if max_deadline_variance is None + else max_deadline_variance + ), ), ) diff --git a/data/csv_reader.py b/data/csv_reader.py index f59084ea..d4d0d1f4 100644 --- a/data/csv_reader.py +++ b/data/csv_reader.py @@ -134,9 +134,9 @@ def parse_events(self, readings: Mapping[str, Sequence[str]]): window_to_execute=None, # Checking if len(reading) > 6 # is for backward compatibility - slowest_execution_time=int(reading[6]) - if len(reading) > 6 - else None, + slowest_execution_time=( + int(reading[6]) if len(reading) > 6 else None + ), ) tasks[reading[4]].cancelled = True tasks[reading[4]].cancelled_at = int(reading[0]) @@ -163,9 +163,9 @@ def parse_events(self, readings: Mapping[str, Sequence[str]]): num_tasks=int(reading[5]), # Checking if len(reading) > 6 # is for backward compatibility - critical_path_time=int(reading[6]) - if len(reading) > 6 - else None, + critical_path_time=( + int(reading[6]) if len(reading) > 6 else None + ), ) elif reading[1] == "TASK_GRAPH_FINISHED": task_graphs[reading[2]].completion_at = int(reading[0]) @@ -504,9 +504,9 @@ def check_if_time_intersects(between_time, start_time, end_time): resource_counter = defaultdict(int) for resource in worker_pool.resources: resource_counter[resource.name] += 1 - resource_ids_to_canonical_names[ - resource.id - ] = f"{resource.name}_{resource_counter[resource.name]}" + resource_ids_to_canonical_names[resource.id] = ( + f"{resource.name}_{resource_counter[resource.name]}" + ) # Output all the tasks and the requested deadlines. if trace_fmt in ["task", "taskgraph", "application", "resource"]: diff --git a/data/workload_loader.py b/data/workload_loader.py index e7a93a18..06e9b324 100644 --- a/data/workload_loader.py +++ b/data/workload_loader.py @@ -140,19 +140,21 @@ def __init__(self, path: str, _flags: Optional["absl.flags"] = None) -> None: # Create the JobGraph. if self._replication_factor > 1: for i in range(1, self._replication_factor + 1): - job_graph_mapping[ - f"{job_name}_{i}" - ] = WorkloadLoader.load_job_graph( - JobGraph( - name=f"{job_name}_{i}", - release_policy=release_policy, - deadline_variance=deadline_variance, - ), - job["graph"], - deepcopy(work_profiles) - if not self._unique_work_profiles - else work_profiles, - self._slo, + job_graph_mapping[f"{job_name}_{i}"] = ( + WorkloadLoader.load_job_graph( + JobGraph( + name=f"{job_name}_{i}", + release_policy=release_policy, + deadline_variance=deadline_variance, + ), + job["graph"], + ( + deepcopy(work_profiles) + if not self._unique_work_profiles + else work_profiles + ), + self._slo, + ) ) pass else: @@ -163,9 +165,11 @@ def __init__(self, path: str, _flags: Optional["absl.flags"] = None) -> None: deadline_variance=deadline_variance, ), job["graph"], - deepcopy(work_profiles) - if not self._unique_work_profiles - else work_profiles, + ( + deepcopy(work_profiles) + if not self._unique_work_profiles + else work_profiles + ), self._slo, ) @@ -258,9 +262,11 @@ def __create_release_policy( ) return JobGraph.ReleasePolicy.periodic( period=EventTime( - job["period"] - if override_arrival_period is None - else override_arrival_period, + ( + job["period"] + if override_arrival_period is None + else override_arrival_period + ), EventTime.Unit.US, ), start=start_time, @@ -278,14 +284,18 @@ def __create_release_policy( ) return JobGraph.ReleasePolicy.fixed( period=EventTime( - job["period"] - if override_arrival_period is None - else override_arrival_period, + ( + job["period"] + if override_arrival_period is None + else override_arrival_period + ), EventTime.Unit.US, ), - num_invocations=job["invocations"] - if override_num_invocations is None - else override_num_invocations, + num_invocations=( + job["invocations"] + if override_num_invocations is None + else override_num_invocations + ), start=start_time, ) elif job["release_policy"] == "poisson": @@ -297,9 +307,11 @@ def __create_release_policy( "`rate` or `invocations` was not defined for the JobGraph." ) return JobGraph.ReleasePolicy.poisson( - rate=job["rate"] - if override_poisson_arrival_rate is None - else override_poisson_arrival_rate, + rate=( + job["rate"] + if override_poisson_arrival_rate is None + else override_poisson_arrival_rate + ), num_invocations=job["invocations"], start=start_time, ) @@ -312,13 +324,17 @@ def __create_release_policy( "`rate` or `coefficient` was not defined for the JobGraph." ) return JobGraph.ReleasePolicy.gamma( - rate=job["rate"] - if override_poisson_arrival_rate is None - else override_poisson_arrival_rate, + rate=( + job["rate"] + if override_poisson_arrival_rate is None + else override_poisson_arrival_rate + ), num_invocations=job["invocations"], - coefficient=job["coefficient"] - if override_gamma_coefficient is None - else override_gamma_coefficient, + coefficient=( + job["coefficient"] + if override_gamma_coefficient is None + else override_gamma_coefficient + ), start=start_time, ) elif job["release_policy"] == "closed_loop": diff --git a/experiments/analysis_utils.py b/experiments/analysis_utils.py index dea895e1..f63165d5 100644 --- a/experiments/analysis_utils.py +++ b/experiments/analysis_utils.py @@ -119,10 +119,9 @@ def extract_variables_from_filename_v2(filename): break if variables["scheduler"] == "TetriSched": - variables[ - "scheduler" - ] = f'TetriSched_time_dis_{variables["scheduler_time_discretization"]}' + ( - "_DAG_aware" if variables["DAG_aware"] else "" + variables["scheduler"] = ( + f'TetriSched_time_dis_{variables["scheduler_time_discretization"]}' + + ("_DAG_aware" if variables["DAG_aware"] else "") ) return variables diff --git a/schedulers/ilp_scheduler.py b/schedulers/ilp_scheduler.py index a54cbd29..0efe5482 100644 --- a/schedulers/ilp_scheduler.py +++ b/schedulers/ilp_scheduler.py @@ -456,9 +456,9 @@ def get_placements( return [ Placement.create_task_placement( task=task, - placement_time=start_time - if placement_worker_id and placement_strategy - else None, + placement_time=( + start_time if placement_worker_id and placement_strategy else None + ), worker_pool_id=placement_worker_pool_id, worker_id=placement_worker_id, execution_strategy=placement_strategy, @@ -833,9 +833,11 @@ def _create_batch_task_variables( unscheduled_tasks = deque( sorted( unscheduled_tasks, - key=lambda t: t.deadline - if t.task_graph not in self._allowed_to_miss_deadlines - else float("inf"), + key=lambda t: ( + t.deadline + if t.task_graph not in self._allowed_to_miss_deadlines + else float("inf") + ), ) ) tasks_to_batch_tasks: Mapping[Task, List[BatchTask]] = defaultdict(list) @@ -896,12 +898,14 @@ def _create_batch_task_variables( # Deadlines are not to be enforced if the tasks in this batch are all # allowed to miss their deadlines. Otherwise, we conservatively aim to # enforce deadlines. - enforce_deadlines=False - if all( - task.task_graph in self._allowed_to_miss_deadlines - for task in batch_task.tasks - ) - else self.enforce_deadlines, + enforce_deadlines=( + False + if all( + task.task_graph in self._allowed_to_miss_deadlines + for task in batch_task.tasks + ) + else self.enforce_deadlines + ), retract_schedules=self.retract_schedules, ) @@ -1088,14 +1092,16 @@ def _add_task_dependency_constraints( "variables (along with the number of parents in each variable): %s", sim_time.to(EventTime.Unit.US).time, task_name, - ", ".join( - [ - f"{variable.task.unique_name} ({num_parents})" - for variable, num_parents in parent_variables.items() - ] - ) - if len(parent_variables) > 0 - else "None", + ( + ", ".join( + [ + f"{variable.task.unique_name} ({num_parents})" + for variable, num_parents in parent_variables.items() + ] + ) + if len(parent_variables) > 0 + else "None" + ), ) # Ensure that the task is only placed if all of its parents are placed, diff --git a/schedulers/tetrisched_cplex_scheduler.py b/schedulers/tetrisched_cplex_scheduler.py index d204f341..8452c35f 100644 --- a/schedulers/tetrisched_cplex_scheduler.py +++ b/schedulers/tetrisched_cplex_scheduler.py @@ -1286,9 +1286,9 @@ def _add_objective( # the scheduler. task_not_placed_penalty = [] if self._batching: - tasks_to_batch_tasks: Mapping[ - Task, Set[TaskOptimizerVariables] - ] = defaultdict(set) + tasks_to_batch_tasks: Mapping[Task, Set[TaskOptimizerVariables]] = ( + defaultdict(set) + ) for batch_task_variable in tasks_to_variables.values(): for task in batch_task_variable.task.tasks: tasks_to_batch_tasks[task].add(batch_task_variable) diff --git a/schedulers/tetrisched_gurobi_scheduler.py b/schedulers/tetrisched_gurobi_scheduler.py index 1060f6f4..7c7949b0 100644 --- a/schedulers/tetrisched_gurobi_scheduler.py +++ b/schedulers/tetrisched_gurobi_scheduler.py @@ -719,11 +719,11 @@ def schedule( ) for task_variables in tasks_to_variables.values(): - placements_for_task: Sequence[ - Placement - ] = task_variables.get_placements( - worker_index_to_worker=workers, - worker_id_to_worker_pool=worker_to_worker_pool, + placements_for_task: Sequence[Placement] = ( + task_variables.get_placements( + worker_index_to_worker=workers, + worker_id_to_worker_pool=worker_to_worker_pool, + ) ) for placement in placements_for_task: if placement.is_placed(): diff --git a/schedulers/tetrisched_scheduler.py b/schedulers/tetrisched_scheduler.py index f4a6f461..22e24d52 100644 --- a/schedulers/tetrisched_scheduler.py +++ b/schedulers/tetrisched_scheduler.py @@ -35,9 +35,9 @@ class Partitions(object): def __init__(self, worker_pools: WorkerPools) -> None: self._available_partitions = tetrisched.Partitions() - self._resource_name_to_partitions_map: Mapping[ - str, tetrisched.Partitions - ] = defaultdict(tetrisched.Partitions) + self._resource_name_to_partitions_map: Mapping[str, tetrisched.Partitions] = ( + defaultdict(tetrisched.Partitions) + ) # BUG (Sukrit): The worker_index_to_partition_map is being used to keep the # Partition objects live on the Python side so we can query the associatedWorker # and the associatedWorkerPool. Otherwise, pybind11 loses track of the objects @@ -90,9 +90,9 @@ def _construct_partitions(self, worker_pools: WorkerPools) -> None: # Maintain the relevant mappings to transform it to a Placement. partition.associatedWorker = worker partition.associatedWorkerPool = worker_pool - self._worker_index_to_partition_map[ - self._worker_index_counter - ] = partition + self._worker_index_to_partition_map[self._worker_index_counter] = ( + partition + ) self._worker_index_counter += 1 def get_partition_for_worker_id( diff --git a/tests/utils.py b/tests/utils.py index 75456fa5..8c94ddb7 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -47,20 +47,22 @@ def create_default_task( name=task_name, task_graph=task_graph_name, job=job, - profile=WorkProfile( - name=f"{task_name}_work_profile", - execution_strategies=ExecutionStrategies( - strategies=[ - ExecutionStrategy( - resources=resource_requirements, - batch_size=1, - runtime=EventTime(runtime, EventTime.Unit.US), - ) - ] - ), - ) - if profile is None - else profile, + profile=( + WorkProfile( + name=f"{task_name}_work_profile", + execution_strategies=ExecutionStrategies( + strategies=[ + ExecutionStrategy( + resources=resource_requirements, + batch_size=1, + runtime=EventTime(runtime, EventTime.Unit.US), + ) + ] + ), + ) + if profile is None + else profile + ), deadline=EventTime(deadline, EventTime.Unit.US), timestamp=timestamp, release_time=EventTime(release_time, EventTime.Unit.US), diff --git a/workload/jobs.py b/workload/jobs.py index 7dcd0fb6..de469191 100644 --- a/workload/jobs.py +++ b/workload/jobs.py @@ -871,9 +871,11 @@ def _generate_task_graph( ( task.job.runtime for task in task_graph.get_longest_path( - weights=lambda task: task.job.runtime.time - if task.probability > sys.float_info.epsilon - else 0 + weights=lambda task: ( + task.job.runtime.time + if task.probability > sys.float_info.epsilon + else 0 + ) ) ), start=EventTime.zero(), @@ -892,15 +894,19 @@ def _generate_task_graph( def __get_completion_time(self, start=EventTime.zero()) -> EventTime: return sum( ( - job.slo - if job.slo != EventTime.invalid() - else job.execution_strategies.get_slowest_strategy().runtime + ( + job.slo + if job.slo != EventTime.invalid() + else job.execution_strategies.get_slowest_strategy().runtime + ) for job in self.get_longest_path( - weights=lambda job: job.execution_strategies.get_slowest_strategy() - .runtime.to(EventTime.Unit.US) - .time - if job.probability > sys.float_info.epsilon - else 0 + weights=lambda job: ( + job.execution_strategies.get_slowest_strategy() + .runtime.to(EventTime.Unit.US) + .time + if job.probability > sys.float_info.epsilon + else 0 + ) ) ), start=start, @@ -918,11 +924,13 @@ def critical_path_runtime(self) -> EventTime: [ job.execution_strategies.get_slowest_strategy().runtime for job in self.get_longest_path( - weights=lambda job: job.execution_strategies.get_slowest_strategy() - .runtime.to(EventTime.Unit.US) - .time - if job.probability > sys.float_info.epsilon - else 0 + weights=lambda job: ( + job.execution_strategies.get_slowest_strategy() + .runtime.to(EventTime.Unit.US) + .time + if job.probability > sys.float_info.epsilon + else 0 + ) ) ], start=EventTime.zero(),