Implement a separate STRL session to invoke min makespan scheduling.
sukritkalra committed Feb 22, 2024
1 parent 6486d92 commit 3f2271b
Showing 2 changed files with 120 additions and 13 deletions.
110 changes: 103 additions & 7 deletions schedulers/graphene_scheduler.py
@@ -1,18 +1,16 @@
try:
from .tetrisched_scheduler import TetriSchedScheduler
except ImportError:
raise ImportError(
"TetriSchedScheduler not found. " "Please install the TetriSched package."
)

import os
from typing import Optional

import absl # noqa: F401
import numpy as np
import tetrisched_py as tetrisched

from utils import EventTime
from workers import WorkerPools
from workload import Placements, Workload

from .tetrisched_scheduler import Partitions, TetriSchedScheduler


class GrapheneScheduler(TetriSchedScheduler):
"""Implements a STRL-based formulation for the Graphene scheduler.
@@ -63,13 +61,34 @@ def __init__(
_flags=_flags,
)

# The Graphene scheduler is a STRL-based scheduler and requires a separate
# TetriSched scheduler instance to solve the min-makespan STRL formulation.
self._min_makespan_scheduler = tetrisched.Scheduler(
self._time_discretization.time,
tetrisched.backends.SolverBackendType.GUROBI,
self._log_dir,
)
self._min_makespan_scheduler_configuration = tetrisched.SchedulerConfig()
self._min_makespan_scheduler_configuration.optimize = (
self._enable_optimization_passes
)

# Keep a hash set of the TaskGraph names that have already been transformed
# by the scheduler.
self._transformed_taskgraphs = set()

# Configuration parameters for the Offline phase of the Graphene scheduler.
# The plan-ahead multiplier determines the plan-ahead window for each
# TaskGraph in the Offline phase: the window is the TaskGraph's critical
# path length multiplied by this multiplier.
self._plan_ahead_multiplier = 2

def schedule(
self, sim_time: EventTime, workload: Workload, worker_pools: WorkerPools
) -> Placements:
# Create a Partitions object from the WorkerPools that are available.
partitions = Partitions(worker_pools=worker_pools)

# Find the task graphs that have been added to the Workload but not yet
# transformed by the Graphene scheduler.
for task_graph_name, task_graph in workload.task_graphs.items():
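For intuition about the plan-ahead window used in the offline phase below: placements are only considered up to the TaskGraph's critical-path runtime dilated by `self._plan_ahead_multiplier`. A minimal sketch of that arithmetic, with `EventTime` simplified to integer microseconds and the helper name `plan_ahead_window_us` invented for illustration:

    # Illustrative sketch only; EventTime is reduced to integer microseconds.
    PLAN_AHEAD_MULTIPLIER = 2  # mirrors self._plan_ahead_multiplier

    def plan_ahead_window_us(sim_time_us: int, critical_path_us: int) -> int:
        # Dilate the critical path to leave slack beyond an ideal schedule,
        # then offset the window from the current simulator time.
        return sim_time_us + int(critical_path_us * PLAN_AHEAD_MULTIPLIER)

    # A TaskGraph with a 500us critical path, scheduled at t=100us, may be
    # placed anywhere in [100us, 1100us].
    assert plan_ahead_window_us(sim_time_us=100, critical_path_us=500) == 1100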
@@ -86,6 +105,83 @@ def schedule(
task_graph_name,
)

# Create the rewards for the Placement times that allow STRL to construct
# a minimum makespan schedule for this graph.
critical_path_runtime = task_graph.critical_path_runtime
dilated_critical_path_runtime = EventTime(
int(critical_path_runtime.time * self._plan_ahead_multiplier),
critical_path_runtime.unit,
)
placement_times = self._get_time_discretizations_until(
current_time=sim_time,
end_time=sim_time + dilated_critical_path_runtime,
)
placement_times_and_rewards = list(
zip(
placement_times,
np.interp(
list(map(lambda x: x.time, placement_times)),
(
min(placement_times).time,
max(placement_times).time,
),
(2, 1),
),
)
)

# Construct the STRL formulation for the minimum makespan scheduling of the
# particular TaskGraph. This corresponds to the first (offline) phase of
# the Graphene scheduling algorithm.
objective_strl = tetrisched.strl.ObjectiveExpression(
f"{task_graph_name}_min_makespan"
)
task_graph_strl = self.construct_task_graph_strl(
current_time=sim_time,
task_graph=task_graph,
partitions=partitions,
placement_times_and_rewards=placement_times_and_rewards,
)
if task_graph_strl is None:
raise ValueError(f"Failed to construct the STRL for {task_graph_name}.")
self._logger.debug(
"[%s] Successfully constructed the minimum makespan "
"STRL for TaskGraph %s.",
sim_time.time,
task_graph_name,
)
objective_strl.addChild(task_graph_strl)

# Register the STRL expression with the scheduler and solve it.
try:
self._min_makespan_scheduler.registerSTRL(
objective_strl,
partitions.partitions,
sim_time.time,
self._min_makespan_scheduler_configuration,
)
self._min_makespan_scheduler.schedule(sim_time.time)
except RuntimeError as e:
strl_file_name = f"{task_graph_name}_error.dot"
solver_model_file_name = f"{task_graph_name}_error.lp"
self._logger.error(
"[%s] Received error with description: %s while invoking the "
"STRL-based minimum makespan scheduler for TaskGraph %s. Dumping "
"the model to %s and STRL expression to %s.",
sim_time.time,
e,
task_graph_name,
solver_model_file_name,
strl_file_name,
)
objective_strl.exportToDot(os.path.join(self._log_dir, strl_file_name))
self._min_makespan_scheduler.exportLastSolverModel(
os.path.join(self._log_dir, solver_model_file_name)
)

# TODO (Sukrit): Retrieve the order of placements from the solver, and
# construct the new TaskGraph with the placements.

# All the TaskGraphs have been transformed; call the TetriSched scheduler
# and return the Placements.
return super(GrapheneScheduler, self).schedule(sim_time, workload, worker_pools)
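The `np.interp` call in the offline phase above maps each candidate placement time linearly from a reward of 2 at the start of the plan-ahead window down to 1 at its end, so the solver gains more utility by placing work earlier; this descending ramp is what pushes the formulation toward a minimum makespan. A standalone sketch of the ramp, with made-up slot values:

    import numpy as np

    # Candidate placement slots across a 50-unit plan-ahead window.
    slots = [0, 10, 20, 30, 40, 50]

    # Map the earliest slot to reward 2 and the latest to reward 1, as the
    # np.interp call in GrapheneScheduler.schedule does above.
    rewards = np.interp(slots, (min(slots), max(slots)), (2, 1)).tolist()

    print(list(zip(slots, rewards)))
    # [(0, 2.0), (10, 1.8), (20, 1.6), (30, 1.4), (40, 1.2), (50, 1.0)]
    # (values may differ in the last float digit due to rounding)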
23 changes: 17 additions & 6 deletions schedulers/tetrisched_scheduler.py
@@ -708,6 +708,7 @@ def schedule(
continue

# Construct the STRL.
scale_factor = self._previously_placed_reward_scale_factor
task_graph_strl = self.construct_task_graph_strl(
current_time=sim_time,
task_graph=task_graph,
@@ -718,6 +719,8 @@
task_strls=task_strls,
previously_placed=task_graph_name
in previously_placed_task_graphs,
use_indicator_utility=self._use_task_graph_indicator_utility,
scale_reward_previously_placed=scale_factor,
)
if task_graph_strl is not None:
objective_strl.addChild(task_graph_strl)
@@ -1657,6 +1660,8 @@ def construct_task_graph_strl(
tasks_to_be_scheduled: Optional[List[Task]] = None,
task_strls: Optional[Mapping[str, tetrisched.strl.Expression]] = None,
previously_placed: Optional[bool] = False,
use_indicator_utility: Optional[bool] = False,
scale_reward_previously_placed: float = 1.0,
) -> tetrisched.strl.Expression:
"""Constructs the STRL expression subtree for a given TaskGraph.
@@ -1670,6 +1675,12 @@
considered. Defaults to `None`.
previously_placed (`Optional[bool]`): Whether the TaskGraph has been
previously placed. Defaults to `False`.
use_indicator_utility (`Optional[bool]`): Whether to use the indicator of
the TaskGraph's placement as the utility. If False, the utilities of the
individual Task placements are used instead.
scale_reward_previously_placed (`float`): The factor by which to scale the
reward of the TaskGraph if it has been previously placed. Defaults to
1.0.
"""
# Maintain a cache to be used across the construction of the TaskGraph to make
# it DAG-aware, if not provided.
@@ -1729,21 +1740,21 @@
# utilities to be scaled, or if we are using the indicator from the topmost
# TaskGraph expression to scale the utility.
should_scale = (
self._previously_placed_reward_scale_factor > 1.0 and previously_placed
) or self._use_task_graph_indicator_utility
scale_reward_previously_placed > 1.0 and previously_placed
) or use_indicator_utility

if should_scale:
self._logger.debug(
"[%s] Scaling the %s of %s by %s.",
current_time.to(EventTime.Unit.US).time,
"indicator" if self._use_task_graph_indicator_utility else "utility",
"indicator" if use_indicator_utility else "utility",
task_graph.name,
self._previously_placed_reward_scale_factor,
scale_reward_previously_placed,
)
scale_expression = tetrisched.strl.ScaleExpression(
f"{task_graph.name}_scale",
self._previously_placed_reward_scale_factor if previously_placed else 1,
self._use_task_graph_indicator_utility,
scale_reward_previously_placed if previously_placed else 1,
use_indicator_utility,
)
scale_expression.addChild(task_graph_strl)
return scale_expression
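
The net effect of the changes to `construct_task_graph_strl` is that the scaling behaviour is now controlled by the explicit `use_indicator_utility` and `scale_reward_previously_placed` arguments rather than by the scheduler's instance attributes, which lets callers such as the Graphene offline phase build a TaskGraph's STRL without the online path's scaling. A standalone sketch of the decision logic, where the `ScaleExpression` class is a minimal stand-in for `tetrisched.strl.ScaleExpression`:

    class ScaleExpression:
        # Minimal stand-in for tetrisched.strl.ScaleExpression.
        def __init__(self, name: str, factor: float, use_indicator: bool):
            self.name, self.factor, self.use_indicator = name, factor, use_indicator
            self.children = []

        def addChild(self, child) -> None:
            self.children.append(child)

    def maybe_scale(strl, name, previously_placed: bool,
                    use_indicator_utility: bool = False,
                    scale_reward_previously_placed: float = 1.0):
        # Wrap the STRL in a ScaleExpression only when a reward boost for a
        # previously placed TaskGraph applies or the indicator utility is used.
        should_scale = (
            scale_reward_previously_placed > 1.0 and previously_placed
        ) or use_indicator_utility
        if not should_scale:
            return strl
        scale_expression = ScaleExpression(
            f"{name}_scale",
            scale_reward_previously_placed if previously_placed else 1,
            use_indicator_utility,
        )
        scale_expression.addChild(strl)
        return scale_expression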
