From 3f2271b7d07f4215bbde52d801ca6229966a0463 Mon Sep 17 00:00:00 2001 From: Sukrit Kalra Date: Wed, 21 Feb 2024 17:03:24 -0800 Subject: [PATCH] Implement a separate STRL session to invoke min makespan scheduling. --- schedulers/graphene_scheduler.py | 110 +++++++++++++++++++++++++++-- schedulers/tetrisched_scheduler.py | 23 ++++-- 2 files changed, 120 insertions(+), 13 deletions(-) diff --git a/schedulers/graphene_scheduler.py b/schedulers/graphene_scheduler.py index 3583f4f8..7350dfb4 100644 --- a/schedulers/graphene_scheduler.py +++ b/schedulers/graphene_scheduler.py @@ -1,18 +1,16 @@ -try: - from .tetrisched_scheduler import TetriSchedScheduler -except ImportError: - raise ImportError( - "TetriSchedScheduler not found. " "Please install the TetriSched package." - ) - +import os from typing import Optional import absl # noqa: F401 +import numpy as np +import tetrisched_py as tetrisched from utils import EventTime from workers import WorkerPools from workload import Placements, Workload +from .tetrisched_scheduler import Partitions, TetriSchedScheduler + class GrapheneScheduler(TetriSchedScheduler): """Implements a STRL-based formulation for the Graphene scheduler. @@ -63,13 +61,34 @@ def __init__( _flags=_flags, ) + # The Graphene scheduler is a STRL-based scheduler, and it requires the + # TetriSched framework to solve the STRL formulation. + self._min_makespan_scheduler = tetrisched.Scheduler( + self._time_discretization.time, + tetrisched.backends.SolverBackendType.GUROBI, + self._log_dir, + ) + self._min_makespan_scheduler_configuration = tetrisched.SchedulerConfig() + self._min_makespan_scheduler_configuration.optimize = ( + self._enable_optimization_passes + ) + # Keep a hash set of the TaskGraph names that have been transformed by the # scheduler already. self._transformed_taskgraphs = set() + # Configuration parameters for the Offline phase of the Graphene scheduler. + # The plan-ahead multiplier is used to determine the plan-ahead window for each + # of the TaskGraph in the Offline phase. The multiplier is multiplied by the + # critical path length of the TaskGraph to determine the plan-ahead window. + self._plan_ahead_multiplier = 2 + def schedule( self, sim_time: EventTime, workload: Workload, worker_pools: WorkerPools ) -> Placements: + # Create a Partitions object from the WorkerPools that are available. + partitions = Partitions(worker_pools=worker_pools) + # Find the task graphs that have been added to the Workload but not yet # transformed by the Graphene scheduler. for task_graph_name, task_graph in workload.task_graphs.items(): @@ -86,6 +105,83 @@ def schedule( task_graph_name, ) + # Create the rewards for the Placement times that allow STRL to construct + # a minimum makespan schedule for this graph. + critical_path_runtime = task_graph.critical_path_runtime + dilated_critical_path_runtime = EventTime( + int(critical_path_runtime.time * self._plan_ahead_multiplier), + critical_path_runtime.unit, + ) + placement_times = self._get_time_discretizations_until( + current_time=sim_time, + end_time=sim_time + dilated_critical_path_runtime, + ) + placement_times_and_rewards = list( + zip( + placement_times, + np.interp( + list(map(lambda x: x.time, placement_times)), + ( + min(placement_times).time, + max(placement_times).time, + ), + (2, 1), + ), + ) + ) + + # Construct the STRL formulation for the minimum makespan scheduling of the + # particular TaskGraph. This corresponds to the first (offline) phase of + # the Graphene scheduling algorithm. + objective_strl = tetrisched.strl.ObjectiveExpression( + f"{task_graph_name}_min_makespan" + ) + task_graph_strl = self.construct_task_graph_strl( + current_time=sim_time, + task_graph=task_graph, + partitions=partitions, + placement_times_and_rewards=placement_times_and_rewards, + ) + if task_graph_strl is None: + raise ValueError(f"Failed to construct the STRL for {task_graph_name}.") + self._logger.debug( + "[%s] Successfully constructed the minimum makespan " + "STRL for TaskGraph %s.", + sim_time.time, + task_graph_name, + ) + objective_strl.addChild(task_graph_strl) + + # Register the STRL expression with the scheduler and solve it. + try: + self._min_makespan_scheduler.registerSTRL( + objective_strl, + partitions.partitions, + sim_time.time, + self._min_makespan_scheduler_configuration, + ) + self._min_makespan_scheduler.schedule(sim_time.time) + except RuntimeError as e: + strl_file_name = f"{task_graph_name}_error.dot" + solver_model_file_name = f"{task_graph_name}_error.lp" + self._logger.error( + "[%s] Received error with description: %s while invoking the " + "STRL-based minimum makespan scheduler for TaskGraph %s. Dumping " + "the model to %s and STRL expression to %s.", + sim_time.time, + e, + task_graph_name, + solver_model_file_name, + strl_file_name, + ) + objective_strl.exportToDot(os.path.join(self._log_dir, strl_file_name)) + self._min_makespan_scheduler.exportLastSolverModel( + os.path.join(self._log_dir, solver_model_file_name) + ) + + # TODO (Sukrit): Retrieve the order of placements from the solver, and + # construct the new TaskGraph with the placements. + # All the TaskGraphs have been transformed, call the TetriSched scheduler and # return the Placements. return super(GrapheneScheduler, self).schedule(sim_time, workload, worker_pools) diff --git a/schedulers/tetrisched_scheduler.py b/schedulers/tetrisched_scheduler.py index f9622c4e..a56bfaf4 100644 --- a/schedulers/tetrisched_scheduler.py +++ b/schedulers/tetrisched_scheduler.py @@ -708,6 +708,7 @@ def schedule( continue # Construct the STRL. + scale_factor = self._previously_placed_reward_scale_factor task_graph_strl = self.construct_task_graph_strl( current_time=sim_time, task_graph=task_graph, @@ -718,6 +719,8 @@ def schedule( task_strls=task_strls, previously_placed=task_graph_name in previously_placed_task_graphs, + use_indicator_utility=self._use_task_graph_indicator_utility, + scale_reward_previously_placed=scale_factor, ) if task_graph_strl is not None: objective_strl.addChild(task_graph_strl) @@ -1657,6 +1660,8 @@ def construct_task_graph_strl( tasks_to_be_scheduled: Optional[List[Task]] = None, task_strls: Optional[Mapping[str, tetrisched.strl.Expression]] = None, previously_placed: Optional[bool] = False, + use_indicator_utility: Optional[bool] = False, + scale_reward_previously_placed: float = 1.0, ) -> tetrisched.strl.Expression: """Constructs the STRL expression subtree for a given TaskGraph. @@ -1670,6 +1675,12 @@ def construct_task_graph_strl( considered. Defaults to `None`. previously_placed (`Optional[bool]`): Whether the TaskGraph has been previously placed. Defaults to `False`. + use_indicator_utility (`Optional[bool]`): Whether to use the indicator of + the placement of the TaskGraph as the utility. If False, the utility + of the individual Task placements is utilized instead. + scale_reward_previously_placed (`float`): The factor by which to scale the + reward of the TaskGraph if it has been previously placed. Defaults to + 1.0. """ # Maintain a cache to be used across the construction of the TaskGraph to make # it DAG-aware, if not provided. @@ -1729,21 +1740,21 @@ def construct_task_graph_strl( # utilities to be scaled, or if we are using the indicator from the topmost # TaskGraph expression to scale the utility. should_scale = ( - self._previously_placed_reward_scale_factor > 1.0 and previously_placed - ) or self._use_task_graph_indicator_utility + scale_reward_previously_placed > 1.0 and previously_placed + ) or use_indicator_utility if should_scale: self._logger.debug( "[%s] Scaling the %s of %s by %s.", current_time.to(EventTime.Unit.US).time, - "indicator" if self._use_task_graph_indicator_utility else "utility", + "indicator" if use_indicator_utility else "utility", task_graph.name, - self._previously_placed_reward_scale_factor, + scale_reward_previously_placed, ) scale_expression = tetrisched.strl.ScaleExpression( f"{task_graph.name}_scale", - self._previously_placed_reward_scale_factor if previously_placed else 1, - self._use_task_graph_indicator_utility, + scale_reward_previously_placed if previously_placed else 1, + use_indicator_utility, ) scale_expression.addChild(task_graph_strl) return scale_expression