diff --git a/rpc/service.py b/rpc/service.py index c35f5e49..ca069f46 100644 --- a/rpc/service.py +++ b/rpc/service.py @@ -71,8 +71,8 @@ class DataLoader(Enum): class WorkloadLoader(BaseWorkloadLoader): - def __init__(self) -> None: - self._workload = Workload.empty() + def __init__(self, _flags) -> None: + self._workload = Workload.empty(_flags) def add_task_graph(self, task_graph: TaskGraph): self._workload.add_task_graph(task_graph) @@ -249,7 +249,7 @@ async def RegisterFramework(self, request, context): name=f"WorkerPool_{parsed_uri.netloc}", _logger=self._logger, ) - self._workload_loader = WorkloadLoader() + self._workload_loader = WorkloadLoader(FLAGS) # Enable orchestrated mode FLAGS.orchestrated = True @@ -298,6 +298,14 @@ async def DeregisterFramework(self, request, context): async def RegisterDriver(self, request, context): stime = self.__stime() + if not self.__worker_registered(): + msg = f"[{stime}] Failed to register driver (id={request.id}) because no worker has been registered yet." + self._logger.error(msg) + return erdos_scheduler_pb2.RegisterDriverResponse( + success=False, + message=msg, + ) + if request.id in self._registered_app_drivers: msg = f"[{stime}] Driver with id '{request.id}' is already registered" self._logger.error(msg) @@ -337,6 +345,14 @@ async def DeregisterDriver(self, request, context): task_graph_name = self._registered_app_drivers[request.id] del self._registered_app_drivers[request.id] + # Log stats + log_stats_event = Event( + event_type=EventType.LOG_STATS, + time=stime, + ) + with self._lock: + self._simulator._event_queue.add_event(log_stats_event) + msg = f"[{stime}] Successfully de-registered driver with id {request.id} for task graph {task_graph_name}" self._logger.info(msg) return erdos_scheduler_pb2.DeregisterDriverResponse( diff --git a/simulator.py b/simulator.py index 7a8ef63b..98b97a9e 100644 --- a/simulator.py +++ b/simulator.py @@ -1138,6 +1138,7 @@ def __handle_task_cancellation(self, event: Event) -> None: f"{event.task.timestamp},{event.task.id},{event.task.task_graph}," f"{event.task.slowest_execution_strategy.runtime.time}" ) + self.log_stats(event.time) # If the task already had a placement, we remove the placement from our queue. if event.task.id in self._future_placement_events: @@ -1255,8 +1256,12 @@ def __handle_task_finished(self, event: Event) -> None: f"{task_graph.deadline.to(EventTime.Unit.US).time}," f"{tardiness.to(EventTime.Unit.US).time}" ) + if task_graph.deadline < event.time: self._missed_task_graph_deadlines += 1 + + self.log_stats(event.time) + self._logger.info( "[%s] Finished the TaskGraph %s with a deadline %s at the " "completion of the task %s with a tardiness of %s.", @@ -1764,8 +1769,7 @@ def __handle_event(self, event: Event) -> bool: self.__handle_scheduler_finish(event) elif event.event_type == EventType.SIMULATOR_END: # End of the simulator loop. - assert event.time == self._simulator_time - self.log_stats() + self.log_stats(event.time) self._csv_logger.debug( f"{event.time.time},SIMULATOR_END", ) @@ -1787,7 +1791,9 @@ def __step(self, step_size: EventTime = EventTime(1, EventTime.Unit.US)) -> None the clock (in us). """ if step_size < EventTime.zero(): - raise ValueError(f"Simulator cannot step backwards {step_size}") + raise ValueError( + f"[{self._simulator_time}] Simulator cannot step backwards {step_size}" + ) # Step the simulator for the required steps and construct TASK_FINISHED events # for any tasks that were able to complete their execution. @@ -2156,9 +2162,9 @@ def __log_utilization(self, sim_time: EventTime): f"{worker_pool_resources.get_available_quantity(resource)}" ) - def log_stats(self): + def log_stats(self, sim_time: EventTime): self._csv_logger.debug( - f"{self._simulator_time.time},LOG_STATS,{self._finished_tasks}," + f"{sim_time.time},LOG_STATS,{self._finished_tasks}," f"{self._cancelled_tasks},{self._missed_task_deadlines}," f"{self._finished_task_graphs}," f"{len(self._workload.get_cancelled_task_graphs())},"