[RPC] Swap the scheduler with EDFScheduler.
sukritkalra committed Feb 21, 2024
1 parent 6b6bb4a commit 4dad5f9
Showing 2 changed files with 43 additions and 46 deletions.
64 changes: 32 additions & 32 deletions rpc/service.py
@@ -4,6 +4,7 @@
 import time
 from collections import defaultdict
 from concurrent import futures
+from operator import attrgetter
 from typing import Mapping, Sequence
 from urllib.parse import urlparse
 
@@ -16,6 +17,7 @@
 import grpc
 from absl import app, flags
 
+from schedulers import EDFScheduler
 from utils import EventTime, setup_logging
 from workers import Worker, WorkerPool, WorkerPools
 from workload import (
@@ -66,6 +68,7 @@ def __init__(self) -> None:
         self._scheduler_running_lock = asyncio.Lock()
         self._scheduler_running = False
         self._rerun_scheduler = False
+        self._scheduler = EDFScheduler()
 
         # Placement information maintained by the servicer.
         # The placements map the application IDs to the Placement retrieved from the
@@ -97,34 +100,34 @@ async def schedule(self) -> None:
         # TODO (Sukrit): Change this to a better implementation.
         # Let's do some simple scheduling for now, that gives a fixed number of
         # executors to all the available applications in intervals of 10 seconds.
-        if len(self._workload.task_graphs) > 0:
-            tasks = self._workload.get_schedulable_tasks(
-                current_time, worker_pools=self._worker_pools
-            )
-            self._logger.info(
-                "Found %s tasks that can be scheduled at %s: %s",
-                len(tasks),
-                current_time,
-                [task.unique_name for task in tasks],
-            )
-            if len(tasks) > 0:
-                task = tasks[0]
-                strategy = task.available_execution_strategies.get_fastest_strategy()
-                placement = Placement(
-                    type=Placement.PlacementType.PLACE_TASK,
-                    computation=tasks[0],
-                    placement_time=EventTime(int(time.time()) + 5, EventTime.Unit.S),
-                    worker_pool_id=self._worker_pool.id,
-                    worker_id=self._worker_pool.workers[0].name,
-                    strategy=strategy,
-                )
-                self._placements[task.task_graph].append(placement)
-                task.schedule(
+        if len(self._workload.task_graphs) >= 2:
+            placements = self._scheduler.schedule(
+                sim_time=current_time,
+                workload=self._workload,
+                worker_pools=self._worker_pools,
+            )
+            # Filter out the placements that are not of type PLACE_TASK or that
+            # have not been placed.
+            filtered_placements = filter(
+                lambda p: p.placement_type == Placement.PlacementType.PLACE_TASK
+                and p.is_placed(),
+                placements,
+            )
+            for placement in sorted(
+                filtered_placements, key=attrgetter("placement_time")
+            ):
+                self._placements[placement.task.task_graph].append(placement)
+                # Schedule the task here, since marking it as running requires it
+                # to have been scheduled first. We mark it as running when we
+                # inform the framework of the placement.
+                placement.task.schedule(
                     time=placement.placement_time,
                     placement=placement,
                 )
 
-        self._logger.info("Finished a scheduling cycle.")
+        self._logger.info(
+            "Finished the scheduling cycle initiated at %s.", current_time
+        )
 
         # Check if another run of the Scheduler has been requested, and if so, create
         # a task for it. Otherwise, mark the scheduler as not running.
@@ -360,7 +363,7 @@ async def RegisterTaskGraph(self, request, context):
                 ExecutionStrategy(
                     resources=Resources(
                         resource_vector={
-                            Resource(name="Slot_CPU", _id="any"): 1
+                            Resource(name="Slot_CPU", _id="any"): 30
                         }
                     ),
                     batch_size=1,
@@ -648,15 +651,12 @@ async def GetPlacements(self, request, context):
                     cores=1,
                 )
             )
-        self._logger.info(
-            "Currently %s placements, clipping at %s.", len(placements), clip_at
-        )
         self._placements[request.id] = self._placements[request.id][clip_at + 1 :]
-        self._logger.info(
-            "Clipped placements length: %s", len(self._placements[request.id])
-        )
         self._logger.info(
-            "Constructed %s placements at time %s.", len(placements), request.timestamp
+            "Constructed %s placements at time %s for application with ID %s.",
+            len(placements),
+            request.timestamp,
+            request.id,
         )
         return erdos_scheduler_pb2.GetPlacementsResponse(
             success=True,
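
The new scheduling path in rpc/service.py reduces to a filter-then-sort over whatever the scheduler returns: keep only the placements that are real task placements and actually found a worker, then apply them in placement-time order. Below is a minimal, runnable sketch of that pattern; ToyPlacement is a hypothetical stand-in for the repo's Placement class, modeling only the fields the pattern touches.

from dataclasses import dataclass
from operator import attrgetter

@dataclass
class ToyPlacement:
    # Hypothetical stand-in for workload.Placement with just the fields
    # the filter-then-sort pattern needs.
    name: str
    placement_time: int  # the real code compares EventTime objects
    is_task: bool        # stands in for placement_type == PLACE_TASK
    placed: bool         # stands in for is_placed()

placements = [
    ToyPlacement("B", placement_time=12, is_task=True, placed=True),
    ToyPlacement("A", placement_time=5, is_task=True, placed=True),
    ToyPlacement("C", placement_time=3, is_task=False, placed=True),  # filtered out
    ToyPlacement("D", placement_time=1, is_task=True, placed=False),  # filtered out
]

# Keep only actual task placements that found a worker, then apply them
# in placement-time order, as schedule() does above.
ready = filter(lambda p: p.is_task and p.placed, placements)
for p in sorted(ready, key=attrgetter("placement_time")):
    print(p.name, p.placement_time)  # prints A 5, then B 12

Using operator.attrgetter as the sort key is equivalent to key=lambda p: p.placement_time, and is the reason for the new import at the top of the file.
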
25 changes: 11 additions & 14 deletions schedulers/edf_scheduler.py
@@ -69,7 +69,7 @@ def schedule(
 
         for worker_pool in schedulable_worker_pools.worker_pools:
             self._logger.debug(
-                f"[{sim_time.to(EventTime.Unit.US).time}] The state of {worker_pool} "
+                f"[{sim_time.time}] The state of {worker_pool} "
                 f"is:{os.linesep} {os.linesep.join(worker_pool.get_utilization())}"
             )
 
@@ -86,8 +86,7 @@
             f"{task.unique_name} ({task.deadline})" for task in ordered_tasks
         ]
         self._logger.debug(
-            f"[{sim_time.to(EventTime.Unit.US).time}] The order of the "
-            f"tasks is {task_descriptions}."
+            f"[{sim_time.time}] The order of the tasks is {task_descriptions}."
         )
 
         # Run the scheduling loop.
@@ -99,8 +98,8 @@
         placements = []
         for task in ordered_tasks:
             self._logger.debug(
-                f"[{sim_time.to(EventTime.Unit.US).time}] EDFScheduler trying to "
-                f"schedule {task} with the available execution strategies: "
+                f"[{sim_time.time}] EDFScheduler trying to schedule {task} with the "
+                f"available execution strategies: "
                 f"{task.available_execution_strategies}."
             )
 
@@ -117,9 +116,9 @@
                 self._logger.debug(
                     "[%s] Task %s has a deadline of %s, which has been missed. "
                     "Cancelling the task.",
-                    sim_time.to(EventTime.Unit.US).time,
+                    sim_time.time,
                     task,
-                    task.deadline.to(EventTime.Unit.US).time,
+                    task.deadline.time,
                 )
                 continue
 
@@ -141,10 +140,9 @@
                             )
                         )
                         self._logger.debug(
-                            f"[{sim_time.to(EventTime.Unit.US).time}] Placed {task} on "
-                            f"Worker Pool ({worker_pool.id}) to be started at "
-                            f"{sim_time} with the execution strategy: "
-                            f"{execution_strategy}."
+                            f"[{sim_time.time}] Placed {task} on Worker Pool "
+                            f"({worker_pool.id}) to be started at {sim_time} with the "
+                            f"execution strategy: {execution_strategy}."
                         )
                         break
                 if is_task_placed:
@@ -153,15 +151,14 @@
             if is_task_placed:
                 for worker_pool in schedulable_worker_pools.worker_pools:
                     self._logger.debug(
-                        f"[{sim_time.to(EventTime.Unit.US).time}] The state of "
-                        f"{worker_pool} is:{os.linesep}"
+                        f"[{sim_time.time}] The state of {worker_pool} is:{os.linesep}"
                         f"{os.linesep.join(worker_pool.get_utilization())}"
                     )
             else:
                 self._logger.debug(
                     "[%s] Failed to place %s because no worker pool "
                     "could accomodate the resource requirements.",
-                    sim_time.to(EventTime.Unit.US).time,
+                    sim_time.time,
                     task,
                 )
                 placements.append(Placement.create_task_placement(task=task))
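
For context on the class the servicer now instantiates: EDFScheduler implements classic earliest-deadline-first scheduling. It orders the schedulable tasks by deadline, cancels any task whose deadline has already passed, greedily places each remaining task on the first worker pool that can accommodate one of its execution strategies, and emits an unplaced Placement when nothing fits. A toy sketch of that loop follows; the Task dataclass and the slot-counting capacity model are simplifications, not the repo's classes.

from dataclasses import dataclass

@dataclass
class Task:
    # Hypothetical simplification of the repo's Task: an integer deadline on
    # the same clock as sim_time, and a flat slot demand in place of
    # ExecutionStrategy objects.
    name: str
    deadline: int
    slots: int

def edf_place(sim_time, tasks, free_slots):
    decisions = []
    # Earliest deadline first: consider tasks in order of absolute deadline.
    for task in sorted(tasks, key=lambda t: t.deadline):
        if task.deadline < sim_time:
            # The deadline has already been missed; cancel instead of placing.
            decisions.append((task.name, "cancelled"))
            continue
        if task.slots <= free_slots:
            free_slots -= task.slots
            decisions.append((task.name, "placed"))
        else:
            # Mirrors the unplaced Placement.create_task_placement(task=task).
            decisions.append((task.name, "unplaced"))
    return decisions

print(edf_place(10, [Task("a", 8, 1), Task("b", 20, 2), Task("c", 15, 1)], 2))
# prints [('a', 'cancelled'), ('c', 'placed'), ('b', 'unplaced')]

The deadline-first ordering is the point of this commit: unlike the removed logic in service.py, which always placed the first schedulable task on the first worker, EDF prioritizes whichever task is closest to missing its deadline.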
