[GRAPHENE] Implement Graphene Scheduler (#91)
* [GRAPHENE] Begin implementation of the Graphene scheduler.

* Implement basic structure of Graphene's two-phase scheduling.

* Implement a separate STRL session to invoke min makespan scheduling.

* Add a method to update the TaskGraph in-place.

* Test for imports of both Graphene and TetriSched schedulers together.

* Finish the implementation of the offline pass of Graphene.

* Implement the example Workload from Graphene paper.

* Fix black formatting for workload/tasks.py

* Fix flake8 for Graphene.

* Introduce notify_workload_updated to transform TaskGraphs before Tasks from them are released.
sukritkalra authored on Feb 22, 2024
1 parent: 4c744e7 · commit: d12947b
Showing 10 changed files with 513 additions and 12 deletions.
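The commit message above describes Graphene's two-phase structure: an offline pass that packs each TaskGraph in isolation (driven by a separate min-makespan STRL session) and an online pass that schedules the transformed graphs as they arrive. The standalone sketch below illustrates only that two-phase idea; every name in it (offline_order, online_schedule, the tuple-based Dag alias) is hypothetical, and the greedy longest-path heuristic is a stand-in for, not a reproduction of, the STRL/TetriSched formulation this commit actually adds.

# Standalone illustration of two-phase (offline + online) DAG scheduling.
# All names and heuristics here are hypothetical stand-ins; the commit's
# GrapheneScheduler instead builds STRL expressions solved via TetriSched.
from typing import Dict, List, Tuple

# A DAG maps a task name to (runtime, slots required, children).
Dag = Dict[str, Tuple[int, int, List[str]]]


def offline_order(dag: Dag) -> List[str]:
    """Offline pass: rank the tasks of one DAG by their longest downstream
    path, a crude proxy for packing the troublesome tasks first."""
    memo: Dict[str, int] = {}

    def downstream(task: str) -> int:
        if task not in memo:
            runtime, _, children = dag[task]
            memo[task] = runtime + max((downstream(c) for c in children), default=0)
        return memo[task]

    return sorted(dag, key=downstream, reverse=True)


def online_schedule(dag: Dag, total_slots: int) -> Dict[str, int]:
    """Online pass: walk time forward and start every ready task (all parents
    finished) in offline-order priority whenever enough slots are free."""
    order = offline_order(dag)
    parents: Dict[str, List[str]] = {task: [] for task in dag}
    for task, (_, _, children) in dag.items():
        for child in children:
            parents[child].append(task)

    start: Dict[str, int] = {}
    finish: Dict[str, int] = {}
    time = 0
    while len(start) < len(dag):
        # Slots held by tasks that have started but not yet finished.
        used = sum(dag[task][1] for task in start if finish[task] > time)
        for task in order:
            if task in start:
                continue
            runtime, slots, _ = dag[task]
            ready = all(p in finish and finish[p] <= time for p in parents[task])
            if ready and used + slots <= total_slots:
                start[task] = time
                finish[task] = time + runtime
                used += slots
        time += 1  # step in (hypothetical) microseconds, matching EventTime.Unit.US
    return start


# Example: the DAG from profiles/workload/simple_graphene_workload.yaml below,
# expressed as (runtime_us, slots, children), on the 3-slot worker pool.
example: Dag = {
    "Task0": (100, 1, ["Task6"]),
    "Task1": (20, 3, ["Task2"]),
    "Task2": (100, 1, ["Task6"]),
    "Task3": (20, 3, ["Task4"]),
    "Task4": (20, 3, ["Task5"]),
    "Task5": (100, 1, ["Task6"]),
    "Task6": (20, 1, []),
}
print(online_schedule(example, total_slots=3))

With 3 slots, this toy heuristic starts the long Task3 -> Task4 -> Task5 chain first, which is the kind of ordering decision Graphene's offline pass is intended to make; treat the printed start times purely as an illustration.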
configs/simple_graphene_workload.conf (16 additions, 0 deletions)
@@ -0,0 +1,16 @@
# Output configs
--log=./simple_graphene_workload.log
--log_level=debug
--csv=./simple_graphene_workload.csv

# Task configs
--runtime_variance=0

# Scheduler configs
--scheduler=Graphene
--scheduler_runtime=0

# Execution mode configs
--execution_mode=json
--workload_profile_path=./profiles/workload/simple_graphene_workload.yaml
--worker_profile_path=./profiles/workers/simple_graphene_workers.yaml
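A hedged usage note: if main.py parses these flags with absl/gflags-style parsing (which the FLAGS usage later in this diff suggests), the whole file can typically be passed in one shot via the standard flag-file mechanism:

    python main.py --flagfile=configs/simple_graphene_workload.conf

The exact entry point and flag-file support are assumptions inferred from the config layout, not something this commit documents.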
main.py (16 additions, 0 deletions)
@@ -19,6 +19,7 @@
    ClockworkScheduler,
    EDFScheduler,
    FIFOScheduler,
    GrapheneScheduler,
    ILPScheduler,
    LSFScheduler,
    TetriSchedCPLEXScheduler,
@@ -289,6 +290,7 @@
        "Clockwork",
        "TetriSched",
        "GraphenePrime",
        "Graphene",
    ],
    "The scheduler to use for this execution.",
)
@@ -841,6 +843,20 @@ def main(args):
            log_to_file=FLAGS.scheduler_log_to_file,
            _flags=FLAGS,
        )
    elif FLAGS.scheduler == "Graphene":
        scheduler = GrapheneScheduler(
            preemptive=FLAGS.preemption,
            runtime=EventTime(FLAGS.scheduler_runtime, EventTime.Unit.US),
            lookahead=EventTime(FLAGS.scheduler_lookahead, EventTime.Unit.US),
            retract_schedules=FLAGS.retract_schedules,
            goal=FLAGS.ilp_goal,
            time_discretization=EventTime(
                FLAGS.scheduler_time_discretization, EventTime.Unit.US
            ),
            plan_ahead=EventTime(FLAGS.scheduler_plan_ahead, EventTime.Unit.US),
            log_to_file=FLAGS.scheduler_log_to_file,
            _flags=FLAGS,
        )
    else:
        raise ValueError(
            "Unsupported scheduler implementation: {}".format(FLAGS.scheduler)
profiles/workers/simple_graphene_workers.yaml (6 additions, 0 deletions)
@@ -0,0 +1,6 @@
- name: WorkerPool_1
  workers:
    - name: Worker_1_1
      resources:
        - name: Slot
          quantity: 3
profiles/workload/simple_graphene_workload.yaml (73 additions, 0 deletions)
@@ -0,0 +1,73 @@
# Implementation of the example workload specified in the Graphene paper.
# This workload is being used to test the Offline and Online stages of the
# ILP-based reimplementation of the Graphene algorithm.
graphs:
  - name: GrapheneMotivation
    graph:
      - name: "Task0"
        work_profile: "Task0Profile"
        children: ["Task6"]
      - name: "Task1"
        work_profile: "Task1Profile"
        children: ["Task2"]
      - name: "Task2"
        work_profile: "Task2Profile"
        children: ["Task6"]
      - name: "Task3"
        work_profile: "Task3Profile"
        children: ["Task4"]
      - name: "Task4"
        work_profile: "Task4Profile"
        children: ["Task5"]
      - name: "Task5"
        work_profile: "Task5Profile"
        children: ["Task6"]
      - name: "Task6"
        work_profile: "Task6Profile"
    release_policy: fixed
    period: 0 # In us for now.
    invocations: 1
    deadline_variance: [2000, 2000]
profiles:
  - name: Task0Profile
    execution_strategies:
      - batch_size: 1
        runtime: 100
        resource_requirements:
          Slot:any: 1
  - name: Task1Profile
    execution_strategies:
      - batch_size: 1
        runtime: 20
        resource_requirements:
          Slot:any: 3
  - name: Task2Profile
    execution_strategies:
      - batch_size: 1
        runtime: 100
        resource_requirements:
          Slot:any: 1
  - name: Task3Profile
    execution_strategies:
      - batch_size: 1
        runtime: 20
        resource_requirements:
          Slot:any: 3
  - name: Task4Profile
    execution_strategies:
      - batch_size: 1
        runtime: 20
        resource_requirements:
          Slot:any: 3
  - name: Task5Profile
    execution_strategies:
      - batch_size: 1
        runtime: 100
        resource_requirements:
          Slot:any: 1
  - name: Task6Profile
    execution_strategies:
      - batch_size: 1
        runtime: 20
        resource_requirements:
          Slot:any: 1
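For reference, the dependency structure encoded above is three chains fanning into Task6:

Task0 -> Task6
Task1 -> Task2 -> Task6
Task3 -> Task4 -> Task5 -> Task6

The short 3-slot tasks (Task1, Task3, Task4) compete with the long 1-slot tasks (Task0, Task2, Task5) for the three Slots defined in simple_graphene_workers.yaml, which appears to be the packing tension the paper's motivating example exercises.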
schedulers/__init__.py (2 additions, 0 deletions)
@@ -12,7 +12,9 @@
from .tetrisched_gurobi_scheduler import TetriSchedGurobiScheduler

try:
    from .graphene_scheduler import GrapheneScheduler
    from .tetrisched_scheduler import TetriSchedScheduler
except ImportError:
    pass

from .z3_scheduler import Z3Scheduler
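GrapheneScheduler is imported inside the same try/except ImportError guard as TetriSchedScheduler, presumably because it relies on the optional TetriSched Python bindings (the commit message's note about testing imports of both schedulers together points the same way). A minimal, hypothetical sketch of how calling code can probe for availability, relying only on the fact that the name is simply absent from the package when the guarded import fails:

try:
    from schedulers import GrapheneScheduler  # raises ImportError if the name was never bound
    GRAPHENE_AVAILABLE = True
except ImportError:
    GRAPHENE_AVAILABLE = False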
schedulers/base_scheduler.py (19 additions, 0 deletions)
@@ -179,6 +179,25 @@ def start(
            placements=placements,
        )

    def notify_workload_updated(
        self,
        sim_time: EventTime,
        workload: Workload,
        worker_pools: "WorkerPools",  # noqa: F821
    ) -> None:
        """Notifies the Scheduler that the Workload has been updated.

        The Simulator invokes this method to notify the Scheduler that it can do
        any necessary bookkeeping or updates to its internal state.

        Args:
            sim_time (`EventTime`): The time at which the workload was updated.
            workload (`Workload`): The `Workload` that was updated.
            worker_pools (`WorkerPools`): The set of worker pools available to the
                Scheduler at the update of the Workload.
        """
        pass

    def schedule(
        self,
        sim_time: EventTime,
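The new hook defaults to a no-op, so existing schedulers are unaffected; per the commit message, GrapheneScheduler overrides it to transform TaskGraphs before the Simulator releases their Tasks, which is presumably where the offline pass rewrites each graph in place.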