diff --git a/rpc/service.py b/rpc/service.py index d914641a..eb648a98 100644 --- a/rpc/service.py +++ b/rpc/service.py @@ -4,6 +4,7 @@ import time from collections import defaultdict from concurrent import futures +from operator import attrgetter from typing import Mapping, Sequence from urllib.parse import urlparse @@ -16,6 +17,7 @@ import grpc from absl import app, flags +from schedulers import EDFScheduler from utils import EventTime, setup_logging from workers import Worker, WorkerPool, WorkerPools from workload import ( @@ -66,6 +68,7 @@ def __init__(self) -> None: self._scheduler_running_lock = asyncio.Lock() self._scheduler_running = False self._rerun_scheduler = False + self._scheduler = EDFScheduler() # Placement information maintained by the servicer. # The placements map the application IDs to the Placement retrieved from the @@ -97,34 +100,34 @@ async def schedule(self) -> None: # TODO (Sukrit): Change this to a better implementation. # Let's do some simple scheduling for now, that gives a fixed number of # executors to all the available applications in intervals of 10 seconds. - if len(self._workload.task_graphs) > 0: - tasks = self._workload.get_schedulable_tasks( - current_time, worker_pools=self._worker_pools - ) - self._logger.info( - "Found %s tasks that can be scheduled at %s: %s", - len(tasks), - current_time, - [task.unique_name for task in tasks], - ) - if len(tasks) > 0: - task = tasks[0] - strategy = task.available_execution_strategies.get_fastest_strategy() - placement = Placement( - type=Placement.PlacementType.PLACE_TASK, - computation=tasks[0], - placement_time=EventTime(int(time.time()) + 5, EventTime.Unit.S), - worker_pool_id=self._worker_pool.id, - worker_id=self._worker_pool.workers[0].name, - strategy=strategy, - ) - self._placements[task.task_graph].append(placement) - task.schedule( + if len(self._workload.task_graphs) >= 2: + placements = self._scheduler.schedule( + sim_time=current_time, + workload=self._workload, + worker_pools=self._worker_pools, + ) + # Filter the placements that are not of type PLACE_TASK and that have not + # been placed. + filtered_placements = filter( + lambda p: p.placement_type == Placement.PlacementType.PLACE_TASK + and p.is_placed(), + placements, + ) + for placement in sorted( + filtered_placements, key=attrgetter("placement_time") + ): + self._placements[placement.task.task_graph].append(placement) + # Schedule the task here since marking it as running requires it to be + # scheduled before. We mark it to be running when we inform the + # framework of the placement. + placement.task.schedule( time=placement.placement_time, placement=placement, ) - self._logger.info("Finished a scheduling cycle.") + self._logger.info( + "Finished the scheduling cycle initiated at %s.", current_time + ) # Check if another run of the Scheduler has been requested, and if so, create # a task for it. Otherwise, mark the scheduler as not running. @@ -360,7 +363,7 @@ async def RegisterTaskGraph(self, request, context): ExecutionStrategy( resources=Resources( resource_vector={ - Resource(name="Slot_CPU", _id="any"): 1 + Resource(name="Slot_CPU", _id="any"): 30 } ), batch_size=1, @@ -648,15 +651,12 @@ async def GetPlacements(self, request, context): cores=1, ) ) - self._logger.info( - "Currently %s placements, clipping at %s.", len(placements), clip_at - ) self._placements[request.id] = self._placements[request.id][clip_at + 1 :] self._logger.info( - "Clipped placements length: %s", len(self._placements[request.id]) - ) - self._logger.info( - "Constructed %s placements at time %s.", len(placements), request.timestamp + "Constructed %s placements at time %s for application with ID %s.", + len(placements), + request.timestamp, + request.id, ) return erdos_scheduler_pb2.GetPlacementsResponse( success=True, diff --git a/schedulers/edf_scheduler.py b/schedulers/edf_scheduler.py index e8a98314..e8be0a88 100644 --- a/schedulers/edf_scheduler.py +++ b/schedulers/edf_scheduler.py @@ -69,7 +69,7 @@ def schedule( for worker_pool in schedulable_worker_pools.worker_pools: self._logger.debug( - f"[{sim_time.to(EventTime.Unit.US).time}] The state of {worker_pool} " + f"[{sim_time.time}] The state of {worker_pool} " f"is:{os.linesep} {os.linesep.join(worker_pool.get_utilization())}" ) @@ -86,8 +86,7 @@ def schedule( f"{task.unique_name} ({task.deadline})" for task in ordered_tasks ] self._logger.debug( - f"[{sim_time.to(EventTime.Unit.US).time}] The order of the " - f"tasks is {task_descriptions}." + f"[{sim_time.time}] The order of the tasks is {task_descriptions}." ) # Run the scheduling loop. @@ -99,8 +98,8 @@ def schedule( placements = [] for task in ordered_tasks: self._logger.debug( - f"[{sim_time.to(EventTime.Unit.US).time}] EDFScheduler trying to " - f"schedule {task} with the available execution strategies: " + f"[{sim_time.time}] EDFScheduler trying to schedule {task} with the " + f"available execution strategies: " f"{task.available_execution_strategies}." ) @@ -117,9 +116,9 @@ def schedule( self._logger.debug( "[%s] Task %s has a deadline of %s, which has been missed. " "Cancelling the task.", - sim_time.to(EventTime.Unit.US).time, + sim_time.time, task, - task.deadline.to(EventTime.Unit.US).time, + task.deadline.time, ) continue @@ -141,10 +140,9 @@ def schedule( ) ) self._logger.debug( - f"[{sim_time.to(EventTime.Unit.US).time}] Placed {task} on " - f"Worker Pool ({worker_pool.id}) to be started at " - f"{sim_time} with the execution strategy: " - f"{execution_strategy}." + f"[{sim_time.time}] Placed {task} on Worker Pool " + f"({worker_pool.id}) to be started at {sim_time} with the " + f"execution strategy: {execution_strategy}." ) break if is_task_placed: @@ -153,15 +151,14 @@ def schedule( if is_task_placed: for worker_pool in schedulable_worker_pools.worker_pools: self._logger.debug( - f"[{sim_time.to(EventTime.Unit.US).time}] The state of " - f"{worker_pool} is:{os.linesep}" + f"[{sim_time.time}] The state of {worker_pool} is:{os.linesep}" f"{os.linesep.join(worker_pool.get_utilization())}" ) else: self._logger.debug( "[%s] Failed to place %s because no worker pool " "could accomodate the resource requirements.", - sim_time.to(EventTime.Unit.US).time, + sim_time.time, task, ) placements.append(Placement.create_task_placement(task=task))