From ee733666ffc7ce53c8cb3483425810bd8ee9bdc4 Mon Sep 17 00:00:00 2001 From: Dhruv Garg Date: Thu, 21 Mar 2024 22:50:17 -0400 Subject: [PATCH 1/5] Enforcing correct task completion time using priority queue --- rpc/service.py | 113 +++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 104 insertions(+), 9 deletions(-) diff --git a/rpc/service.py b/rpc/service.py index 8934d80c..4687cddd 100644 --- a/rpc/service.py +++ b/rpc/service.py @@ -1,4 +1,5 @@ import asyncio +import heapq import os import sys import time @@ -67,6 +68,29 @@ ) +# Define an item containing completion timestamp and task +class TimedItem: + def __init__(self, timestamp, task): + self.timestamp = timestamp + self.task = task + + +# Define a priority queue based on heapq module +class PriorityQueue: + def __init__(self): + self._queue = [] + + def put(self, item): + heapq.heappush(self._queue, (item.timestamp, item)) + + def get(self): + _, item = heapq.heappop(self._queue) + return item + + def empty(self): + return len(self._queue) == 0 + + # Implement the service. class SchedulerServiceServicer(erdos_scheduler_pb2_grpc.SchedulerServiceServicer): def __init__(self) -> None: @@ -101,6 +125,12 @@ def __init__(self) -> None: # NOTE (Sukrit): This must always be sorted by the Placement time. self._placements: Mapping[str, Sequence[Placement]] = defaultdict(list) + # Additional task information maintained by the servicer + self._tasks_marked_for_completion = PriorityQueue() + + # Start the asyncio loop for clearing out pending tasks for completion + asyncio.create_task(self.PopTasksBasedOnTime()) + super().__init__() async def schedule(self) -> None: @@ -404,7 +434,7 @@ async def RegisterTaskGraph(self, request, context): # Construct all the Tasks for the TaskGraph. task_ids_to_task: Mapping[int, Task] = {} default_resource = Resources( - resource_vector={Resource(name="Slot_CPU", _id="any"): 30} + resource_vector={Resource(name="Slot_CPU", _id="any"): 20} ) default_runtime = EventTime(20, EventTime.Unit.US) @@ -684,17 +714,41 @@ async def NotifyTaskCompletion(self, request, context): message=f"Task with ID {request.task_id} " f"not found in TaskGraph {request.application_id}.", ) + + # Instead of completing & removing the task immediately, check + # if it is actually complete or will complete in the future - # Mark the Task as completed. - matched_task.update_remaining_time(EventTime.zero()) - matched_task.finish(EventTime(request.timestamp, EventTime.Unit.S)) + # Get the actual task completion timestamp + actual_task_completion_time = ( + matched_task.start_time.time + matched_task.remaining_time.time + ) - # Run the scheduler since the Workload has changed. - await self.run_scheduler() + self._logger.info( + "Received task for completion at time: %s , task.start_time: %s ," + "task.remaining_time (=runtime): %s , actual completion time: %s ", + time.time(), + matched_task.start_time.time, + matched_task.remaining_time.time, + actual_task_completion_time, + ) + + # TODO DG: remaining_time assumes execution of the slowest strategy + # Should be updated to reflect correct remaining_time based on chosen strategy? + + # Add all tasks to _tasks_marked_for_completion queue. + # If task has actually completed, it will be dequeued immediately + # Else it will be dequeued at its actual task completion time + self._tasks_marked_for_completion.put( + TimedItem(actual_task_completion_time, matched_task) + ) + + # NOTE: task.finish() and run_scheduler() invocations are postponed + # until it is time for the task to be actually marked as complete. return erdos_scheduler_pb2.NotifyTaskCompletionResponse( success=True, - message=f"Task with ID {request.task_id} completed successfully!", + message=f"Task with ID {request.task_id} completed successfully! " + f"Will be removed based on actual_task_completion_time", ) async def GetPlacements(self, request, context): @@ -751,6 +805,42 @@ async def GetPlacements(self, request, context): f"placements at time {request.timestamp}.", ) + # Function to pop tasks from queue based on actual completion time + async def PopTasksBasedOnTime(self): + while True: + if not self._tasks_marked_for_completion.empty(): + # Get the top item from the priority queue + top_item = self._tasks_marked_for_completion._queue[0][1] + + # Check if top item's timestamp is reached or passed by current time + current_time = time.time() + if top_item.timestamp <= current_time: + # Pop the top item + popped_item = self._tasks_marked_for_completion.get() + self._logger.info( + "Removing tasks from pending completion queue: %s at time: %s", + popped_item.task, + current_time, + ) + + # Mark the Task as completed. + # Also release the task from the scheduler service + popped_item.task.update_remaining_time(EventTime.zero()) + popped_item.task.finish( + EventTime(round(current_time), EventTime.Unit.S) + ) + + # Run the scheduler since the Workload has changed. + await self.run_scheduler() + + else: + # If the top item's timestamp hasn't been reached yet, + # sleep for a short duration + await asyncio.sleep(0.1) # TODO: Can adjust value, curr=0.1s + else: + # If the queue is empty, sleep for a short duration + await asyncio.sleep(0.1) # TODO: Can adjust value, curr=0.1s + async def serve(): """Serves the ERDOS Scheduling RPC Server.""" @@ -768,9 +858,14 @@ async def serve(): def main(argv): + # Create an asyncio event loop loop = asyncio.get_event_loop() - loop.run_until_complete(serve()) - loop.close() + + # Run the event loop until serve() completes + try: + loop.run_until_complete(serve()) + finally: + loop.close() if __name__ == "__main__": From 89f71bbd18956551728261545c8a726088050608 Mon Sep 17 00:00:00 2001 From: Dhruv Garg Date: Thu, 21 Mar 2024 22:50:17 -0400 Subject: [PATCH 2/5] Enforcing correct task completion time using priority queue --- rpc/service.py | 113 +++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 104 insertions(+), 9 deletions(-) diff --git a/rpc/service.py b/rpc/service.py index 8934d80c..4687cddd 100644 --- a/rpc/service.py +++ b/rpc/service.py @@ -1,4 +1,5 @@ import asyncio +import heapq import os import sys import time @@ -67,6 +68,29 @@ ) +# Define an item containing completion timestamp and task +class TimedItem: + def __init__(self, timestamp, task): + self.timestamp = timestamp + self.task = task + + +# Define a priority queue based on heapq module +class PriorityQueue: + def __init__(self): + self._queue = [] + + def put(self, item): + heapq.heappush(self._queue, (item.timestamp, item)) + + def get(self): + _, item = heapq.heappop(self._queue) + return item + + def empty(self): + return len(self._queue) == 0 + + # Implement the service. class SchedulerServiceServicer(erdos_scheduler_pb2_grpc.SchedulerServiceServicer): def __init__(self) -> None: @@ -101,6 +125,12 @@ def __init__(self) -> None: # NOTE (Sukrit): This must always be sorted by the Placement time. self._placements: Mapping[str, Sequence[Placement]] = defaultdict(list) + # Additional task information maintained by the servicer + self._tasks_marked_for_completion = PriorityQueue() + + # Start the asyncio loop for clearing out pending tasks for completion + asyncio.create_task(self.PopTasksBasedOnTime()) + super().__init__() async def schedule(self) -> None: @@ -404,7 +434,7 @@ async def RegisterTaskGraph(self, request, context): # Construct all the Tasks for the TaskGraph. task_ids_to_task: Mapping[int, Task] = {} default_resource = Resources( - resource_vector={Resource(name="Slot_CPU", _id="any"): 30} + resource_vector={Resource(name="Slot_CPU", _id="any"): 20} ) default_runtime = EventTime(20, EventTime.Unit.US) @@ -684,17 +714,41 @@ async def NotifyTaskCompletion(self, request, context): message=f"Task with ID {request.task_id} " f"not found in TaskGraph {request.application_id}.", ) + + # Instead of completing & removing the task immediately, check + # if it is actually complete or will complete in the future - # Mark the Task as completed. - matched_task.update_remaining_time(EventTime.zero()) - matched_task.finish(EventTime(request.timestamp, EventTime.Unit.S)) + # Get the actual task completion timestamp + actual_task_completion_time = ( + matched_task.start_time.time + matched_task.remaining_time.time + ) - # Run the scheduler since the Workload has changed. - await self.run_scheduler() + self._logger.info( + "Received task for completion at time: %s , task.start_time: %s ," + "task.remaining_time (=runtime): %s , actual completion time: %s ", + time.time(), + matched_task.start_time.time, + matched_task.remaining_time.time, + actual_task_completion_time, + ) + + # TODO DG: remaining_time assumes execution of the slowest strategy + # Should be updated to reflect correct remaining_time based on chosen strategy? + + # Add all tasks to _tasks_marked_for_completion queue. + # If task has actually completed, it will be dequeued immediately + # Else it will be dequeued at its actual task completion time + self._tasks_marked_for_completion.put( + TimedItem(actual_task_completion_time, matched_task) + ) + + # NOTE: task.finish() and run_scheduler() invocations are postponed + # until it is time for the task to be actually marked as complete. return erdos_scheduler_pb2.NotifyTaskCompletionResponse( success=True, - message=f"Task with ID {request.task_id} completed successfully!", + message=f"Task with ID {request.task_id} completed successfully! " + f"Will be removed based on actual_task_completion_time", ) async def GetPlacements(self, request, context): @@ -751,6 +805,42 @@ async def GetPlacements(self, request, context): f"placements at time {request.timestamp}.", ) + # Function to pop tasks from queue based on actual completion time + async def PopTasksBasedOnTime(self): + while True: + if not self._tasks_marked_for_completion.empty(): + # Get the top item from the priority queue + top_item = self._tasks_marked_for_completion._queue[0][1] + + # Check if top item's timestamp is reached or passed by current time + current_time = time.time() + if top_item.timestamp <= current_time: + # Pop the top item + popped_item = self._tasks_marked_for_completion.get() + self._logger.info( + "Removing tasks from pending completion queue: %s at time: %s", + popped_item.task, + current_time, + ) + + # Mark the Task as completed. + # Also release the task from the scheduler service + popped_item.task.update_remaining_time(EventTime.zero()) + popped_item.task.finish( + EventTime(round(current_time), EventTime.Unit.S) + ) + + # Run the scheduler since the Workload has changed. + await self.run_scheduler() + + else: + # If the top item's timestamp hasn't been reached yet, + # sleep for a short duration + await asyncio.sleep(0.1) # TODO: Can adjust value, curr=0.1s + else: + # If the queue is empty, sleep for a short duration + await asyncio.sleep(0.1) # TODO: Can adjust value, curr=0.1s + async def serve(): """Serves the ERDOS Scheduling RPC Server.""" @@ -768,9 +858,14 @@ async def serve(): def main(argv): + # Create an asyncio event loop loop = asyncio.get_event_loop() - loop.run_until_complete(serve()) - loop.close() + + # Run the event loop until serve() completes + try: + loop.run_until_complete(serve()) + finally: + loop.close() if __name__ == "__main__": From 5d6ed709eed2acd933dfddfc81d3a319f36558c7 Mon Sep 17 00:00:00 2001 From: Dhruv Garg Date: Sat, 23 Mar 2024 16:02:20 -0400 Subject: [PATCH 3/5] Addressing sukrits comments on the PR --- rpc/service.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/rpc/service.py b/rpc/service.py index 4687cddd..248d62d7 100644 --- a/rpc/service.py +++ b/rpc/service.py @@ -723,10 +723,11 @@ async def NotifyTaskCompletion(self, request, context): matched_task.start_time.time + matched_task.remaining_time.time ) + current_time = time.time() self._logger.info( "Received task for completion at time: %s , task.start_time: %s ," "task.remaining_time (=runtime): %s , actual completion time: %s ", - time.time(), + round(current_time), matched_task.start_time.time, matched_task.remaining_time.time, actual_task_completion_time, @@ -747,8 +748,9 @@ async def NotifyTaskCompletion(self, request, context): return erdos_scheduler_pb2.NotifyTaskCompletionResponse( success=True, - message=f"Task with ID {request.task_id} completed successfully! " - f"Will be removed based on actual_task_completion_time", + message=f"Task with ID {request.task_id} marked for completion at " + f"{round(current_time)}! It will be removed on actual " + f"task completion time at {actual_task_completion_time}", ) async def GetPlacements(self, request, context): From d393ac703dd61bd7174ae9bad600d4464eb58daa Mon Sep 17 00:00:00 2001 From: Dhruv Garg Date: Sat, 23 Mar 2024 16:23:21 -0400 Subject: [PATCH 4/5] Fixing minor formatting issue --- rpc/service.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/rpc/service.py b/rpc/service.py index 248d62d7..87cca150 100644 --- a/rpc/service.py +++ b/rpc/service.py @@ -725,13 +725,13 @@ async def NotifyTaskCompletion(self, request, context): current_time = time.time() self._logger.info( - "Received task for completion at time: %s , task.start_time: %s ," - "task.remaining_time (=runtime): %s , actual completion time: %s ", - round(current_time), - matched_task.start_time.time, - matched_task.remaining_time.time, - actual_task_completion_time, - ) + "Received task for completion at time: %s , task.start_time: %s ," + "task.remaining_time (=runtime): %s , actual completion time: %s ", + round(current_time), + matched_task.start_time.time, + matched_task.remaining_time.time, + actual_task_completion_time, + ) # TODO DG: remaining_time assumes execution of the slowest strategy # Should be updated to reflect correct remaining_time based on chosen strategy? From c8b56082fa1993cc44e8ec44b43aff8556af9fbb Mon Sep 17 00:00:00 2001 From: Dhruv Garg Date: Sat, 23 Mar 2024 16:48:31 -0400 Subject: [PATCH 5/5] Fix formatting issue using pip black --- rpc/service.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rpc/service.py b/rpc/service.py index 87cca150..2aaa2dc9 100644 --- a/rpc/service.py +++ b/rpc/service.py @@ -714,7 +714,7 @@ async def NotifyTaskCompletion(self, request, context): message=f"Task with ID {request.task_id} " f"not found in TaskGraph {request.application_id}.", ) - + # Instead of completing & removing the task immediately, check # if it is actually complete or will complete in the future @@ -741,7 +741,7 @@ async def NotifyTaskCompletion(self, request, context): # Else it will be dequeued at its actual task completion time self._tasks_marked_for_completion.put( TimedItem(actual_task_completion_time, matched_task) - ) + ) # NOTE: task.finish() and run_scheduler() invocations are postponed # until it is time for the task to be actually marked as complete. @@ -830,7 +830,7 @@ async def PopTasksBasedOnTime(self): popped_item.task.update_remaining_time(EventTime.zero()) popped_item.task.finish( EventTime(round(current_time), EventTime.Unit.S) - ) + ) # Run the scheduler since the Workload has changed. await self.run_scheduler()