Skip to content

Commit

Permalink
Spark service: Correctly log stats on shutdown
Browse files Browse the repository at this point in the history
When the last application is deregistered from the Spark service,
execute all remaining events in the simulator. This allows the
final LOG_STATS event to be processed so we can calculate the SLO
attainment.

Unlike normal runs of the simulator, a SIMULATOR_END event is not
inserted, because some tasks might not have finished in the simulator and
it is unclear when they will finish. The simulator is patched to allow an
empty event queue in Simulator.simulate().
  • Loading branch information
rohanbafna committed Dec 15, 2024
1 parent 51d6f46 commit 3491d59
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 5 deletions.
17 changes: 13 additions & 4 deletions rpc/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ def __init__(self, server) -> None:
self._registered_app_drivers = (
{}
) # Spark driver id differs from taskgraph name (application id)
self._shutdown = False
self._received_shutdown = False
self._shutting_down = False
self._lock = threading.Lock()

super().__init__()
Expand Down Expand Up @@ -361,7 +362,15 @@ async def DeregisterDriver(self, request, context):
msg = f"[{stime}] Successfully de-registered driver with id {request.id} for task graph {task_graph_name}"
self._logger.info(msg)

if len(self._registered_app_drivers) == 0 and self._shutdown:
if len(self._registered_app_drivers) == 0 and self._received_shutdown:
self._logger.info(f"[{stime}] The last driver has been deregistered; finishing simulation")
# Signals _tick_simulator() to stop. Shouldn't be
# necessary in principle because after the with block
# ends, there shouldn't be any more events left to run,
# but doesn't hurt.
self._shutting_down = True
with self._lock:
self._simulator.simulate()
await self._server.stop(0)

return erdos_scheduler_pb2.DeregisterDriverResponse(
Expand Down Expand Up @@ -777,11 +786,11 @@ async def NotifyTaskCompletion(self, request, context):
)

async def Shutdown(self, request, context):
self._shutdown = True
self._received_shutdown = True
return erdos_scheduler_pb2.Empty()

async def _tick_simulator(self):
while True:
while not self._shutting_down:
with self._lock:
if self._simulator is not None:
stime = self.__stime()
Expand Down
2 changes: 1 addition & 1 deletion simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ def f():
step_size = time_until_next_event
else:
step_size = time_until_next_event
return step_size
return None if time_until_next_event.is_invalid() else step_size

self.__simulate_f(should_step=f)

Expand Down

0 comments on commit 3491d59

Please sign in to comment.