Skip to content

Commit

Permalink
Enable reconsideration of TaskGraphs to a configurable time limit.
Browse files Browse the repository at this point in the history
  • Loading branch information
sukritkalra committed Jan 2, 2024
1 parent 746645e commit 086e823
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 16 deletions.
143 changes: 127 additions & 16 deletions schedulers/tetrisched_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import time
from collections import defaultdict
from math import ceil
from typing import List, Mapping, Optional, Set, Tuple

import absl # noqa: F401
Expand Down Expand Up @@ -219,10 +220,21 @@ def __init__(
max_occupancy_threshold,
)
self._use_task_graph_indicator_uility = True
self._previously_placed_reward_scale_factor = 2.0
self._previously_placed_reward_scale_factor = 1.0
self._enable_optimization_passes = (
_flags.scheduler_enable_optimization_pass if _flags else False
)
# NOTE (Sukrit): We observe that solving each TaskGraph independently usually
# leads to more missed deadlines than required. To offset this, the following
# parameter sets a threshold until which the scheduler will try to reschedule
# the TaskGraphs in the next invocation of the scheduler.
# For example, a value of 0.2 means that the scheduler will try to reschedule
# the TaskGraphs until the scheduler invocations within 20% of the time between
# the release time and the deadline. So, if a TaskGraph was released at 100 and
# has a deadline of 500, it will be retried until scheduler invocations upto
# 180, and will be dropped after.
self._task_graph_reconsideration_period = 0.20
self._previously_considered_task_graphs: Set[str] = set()

# A cache for the STRLs generated for individual tasks.
# This is used to avoid generating the same STRL multiple times, and so that
Expand Down Expand Up @@ -259,12 +271,19 @@ def __init__(
f"Windowed choose not implemented for the goal: {self._goal}."
)

def _cancel_task_graph_predicate(self, task_graph: TaskGraph) -> bool:
"""Returns True if the TaskGraph should be skipped from scheduling."""
return False

def schedule(
self, sim_time: EventTime, workload: Workload, worker_pools: WorkerPools
) -> Placements:
# Reset the STRL mappings.
self._individual_task_strls = {}

# The Placements to be returned.
placements = []

# Retrieve the schedulable tasks from the Workload.
tasks_to_be_scheduled: List[Task] = workload.get_schedulable_tasks(
time=sim_time,
Expand All @@ -291,6 +310,89 @@ def schedule(
f"(runtimes, deadlines) were: {task_description_string}."
)

# Find the TaskGraphs that are past their reconsideration deadline and cancel
# those upfront.
cancelled_task_graphs: Set[str] = set()
if self._release_taskgraphs:
for task_graph_name in task_graph_names:
task_graph = workload.get_task_graph(task_graph_name)
if task_graph is None:
raise ValueError(
f"Could not find TaskGraph with name {task_graph_name}."
)

# TaskGraphs that have been previously scheduled cannot be cancelled
# upfront since they already have a feasible placement. The scheduler
# must choose to cancel or keep them later.
if task_graph.is_scheduled():
# The TaskGraph has been scheduled before. Keep it.
self._logger.debug(
f"[{sim_time.time}] Keeping TaskGraph {task_graph_name} "
f"released at {task_graph.release_time} with deadline "
f"{task_graph.deadline} since it has been scheduled before."
)
continue

# Check if the TaskGraph needs to be cancelled.
if task_graph_name not in self._previously_considered_task_graphs:
# If this is a new TaskGraph, check if it needs to be cancelled
# upfront as decided by the predicate
if self._cancel_task_graph_predicate(task_graph):
self._logger.debug(
f"[{sim_time.time}] Cancelling TaskGraph {task_graph_name} "
f"since the predicate that decides whether to cancel it is "
f"True."
)
# If the predicate says that we should cancel the TaskGraph
# without trying, we just add all of its nodes to the cancelled
# TaskGraphs.
for task in tasks_to_be_scheduled:
if task.task_graph == task_graph_name:
placements.append(
Placement.create_task_cancellation(task=task)
)
self._previously_considered_task_graphs.add(task_graph_name)
cancelled_task_graphs.add(task_graph_name)
continue
else:
# The TaskGraph has not been scheduled before, and is being
# reconsidered for scheduling. Calculate the slack between the
# release time and the deadline and decide whether the TaskGraph
# should be dropped.
slack = task_graph.deadline - task_graph.release_time
time_until_reconsideration_ends = (
task_graph.release_time
+ EventTime(
ceil(slack.time * self._task_graph_reconsideration_period),
EventTime.Unit.US,
)
)
if sim_time > time_until_reconsideration_ends:
# The TaskGraph has been reconsidered for too long. Cancel it.
self._logger.debug(
f"[{sim_time.time}] Cancelling TaskGraph {task_graph_name} "
f"because it has been reconsidered for too long. It was "
f"released at {task_graph.release_time} with deadline "
f"{task_graph.deadline}, and was reconsidered until "
f"{time_until_reconsideration_ends}."
)
# Emit TASK_CANCEL events for all the tasks in the TaskGraph.
for task in tasks_to_be_scheduled:
if task.task_graph == task_graph_name:
placements.append(
Placement.create_task_cancellation(task=task)
)
# Add the TaskGraph to the cancelled TaskGraphs.
cancelled_task_graphs.add(task_graph_name)
else:
# The TaskGraph has not been reconsidered for too long. Keep it.
self._logger.debug(
f"[{sim_time.time}] Keeping TaskGraph {task_graph_name} "
f"released at {task_graph.release_time} with deadline "
f"{task_graph.deadline}, and it will be reconsidered until "
f"{time_until_reconsideration_ends}."
)

# Find the currently running and scheduled tasks to inform
# the scheduler of previous placements.
if self.retract_schedules:
Expand Down Expand Up @@ -322,9 +424,12 @@ def schedule(

# Construct the STRL expression.
scheduler_start_time = time.time()
placements = []
if len(tasks_to_be_scheduled) > 0 and any(
task.state != TaskState.SCHEDULED for task in tasks_to_be_scheduled
# If there is a Task belonging to a TaskGraph that hasn't been previously
# considered for scheduling, then we run the scheduler.
task.state != TaskState.SCHEDULED
and task.task_graph not in self._previously_considered_task_graphs
for task in tasks_to_be_scheduled
):
# Construct the partitions from the Workers in the WorkerPool.
partitions = Partitions(worker_pools=worker_pools)
Expand Down Expand Up @@ -405,7 +510,20 @@ def schedule(
# together in a single objective expression.
task_strls: Mapping[str, tetrisched.strl.Expression] = {}
for task_graph_name in task_graph_names:
# Retrieve the TaskGraph and construct its STRL.
# Add the TaskGraph to the previously considered TaskGraphs.
self._previously_considered_task_graphs.add(task_graph_name)

# Retrieve the TaskGraph and construct its STRL, if the TaskGraph
# hasn't already been cancelled.
if task_graph_name in cancelled_task_graphs:
self._logger.debug(
f"[{sim_time.time}] Skipping STRL generation for TaskGraph "
f"{task_graph_name} because it has been cancelled in this "
f"run."
)
continue

# Construct the STRL.
task_graph = workload.get_task_graph(task_graph_name)
task_graph_strl = self.construct_task_graph_strl(
current_time=sim_time,
Expand Down Expand Up @@ -1081,19 +1199,12 @@ def _construct_task_graph_strl(
if child_expression:
child_expressions[child_expression.id] = child_expression
elif child.state != TaskState.COMPLETED:
raise RuntimeError(
f"Could not construct the STRL for all the children of "
f"{task.unique_name}."
self._logger.warn(
f"[{current_time.time}] Could not construct the STRL for all the "
f"children of {task.unique_name}. Failing the construction of STRL "
f"for the TaskGraph {task_graph.name} rooted at {task.unique_name}."
)

# If the number of children does not equal the number of children in the
# TaskGraph that haven't completed, then we have not been able to construct the
# STRL for all the children. Return None.
# if len(child_expressions) != num_enforceable_children:
# self._logger.warn(
# f"[{current_time.time}] Could not construct the STRL for all the "
# f"children of {task.unique_name}."
# )
return None

# If there are no children, cache and return the expression for this Task.
if len(child_expressions) == 0:
Expand Down
9 changes: 9 additions & 0 deletions workload/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1383,6 +1383,15 @@ def merge(self, task_graphs: Sequence["TaskGraph"]) -> "TaskGraph":
"""
raise NotImplementedError("Merging of Taskgraphs has not been implemented yet.")

def is_scheduled(self) -> bool:
"""Check if any of the tasks in the TaskGraph have been scheduled.
Returns:
`True` if any of the tasks in the TaskGraph have been scheduled,
and `False` otherwise.
"""
return any(task.state == TaskState.SCHEDULED for task in self.get_nodes())

def is_complete(self) -> bool:
"""Check if the task graph has finished execution.
Expand Down

0 comments on commit 086e823

Please sign in to comment.