Skip to content

Commit

Permalink
[RPC] Initial implementation of Driver deregistration.
Browse files Browse the repository at this point in the history
  • Loading branch information
sukritkalra committed Feb 13, 2024
1 parent 014901c commit b351193
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 5 deletions.
3 changes: 1 addition & 2 deletions rpc/protos/erdos_scheduler.proto
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,9 @@ message RegisterDriverRequest {
message RegisterDriverResponse {
bool success = 1;
string message = 2;
uint64 timestamp = 3;
// The following fields are used to communicate the worker that the driver
// has been scheduled on.
string worker_id = 4;
string worker_id = 3;
}

message DeregisterDriverRequest {
Expand Down
56 changes: 53 additions & 3 deletions rpc/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
ExecutionStrategies,
ExecutionStrategy,
Job,
Placement,
Resource,
Resources,
Task,
Expand Down Expand Up @@ -133,8 +134,9 @@ def RegisterDriver(self, request, context):
resources=driver_resources,
batch_size=1,
# NOTE (Sukrit): Drivers are long running, and have no
# fixed runtime. We set it to invalid to indicate this.
runtime=EventTime.invalid(),
# fixed runtime. Setting it to zero helps us unload the
# driver from the Worker whenever we need it.
runtime=EventTime.zero(),
)
]
),
Expand All @@ -155,7 +157,22 @@ def RegisterDriver(self, request, context):
if worker.can_accomodate_strategy(execution_strategy):
# This Worker can accomodate the Driver, we assign it here.
placement_found = True
worker.place_task(driver, execution_strategy)
self._worker_pool.place_task(driver, execution_strategy, worker.id)

# Update the Task's state and placement information.
driver.schedule(
time=EventTime(request.timestamp, EventTime.Unit.S),
placement=Placement(
type=Placement.PlacementType.PLACE_TASK,
computation=driver,
worker_pool_id=self._worker_pool.id,
worker_id=worker.id,
strategy=execution_strategy,
),
)
driver.start()

# Tell the framework to start the driver.
return erdos_scheduler_pb2.RegisterDriverResponse(
success=True,
message=f"Driver {request.id} registered successfully!",
Expand All @@ -171,6 +188,39 @@ def RegisterDriver(self, request, context):
worker_id="",
)

def DeregisterDriver(self, request, context):
if not self._initialized:
self._logger.warning(
"Trying to deregister a driver with id %s, "
"but no framework is registered yet.",
request.id,
)
return erdos_scheduler_pb2.DeregisterDriverResponse(
success=False, message="Framework not registered yet."
)

if request.id not in self._drivers:
self._logger.warning(
"Trying to deregister a driver with id %s, "
"but no driver with that id is registered.",
request.id,
)
return erdos_scheduler_pb2.DeregisterDriverResponse(
success=False,
message=f"Driver with id {request.id} not registered yet.",
)

# Deregister the driver.
driver = self._drivers[request.id]
completion_time = EventTime(request.timestamp, EventTime.Unit.S)
self._worker_pool.remove_task(completion_time, driver)
driver.finish(completion_time)
del self._drivers[request.id]
return erdos_scheduler_pb2.DeregisterDriverResponse(
success=True,
message=f"Driver with id {request.id} deregistered successfully!",
)

def RegisterTaskGraph(self, request, context):
"""Registers a new TaskGraph with the backend scheduler.
This is the entry point for a new application of Spark to register
Expand Down

0 comments on commit b351193

Please sign in to comment.