Skip to content

Commit

Permalink
[RPC] Implement DeregisterFramework RPC call.
Browse files Browse the repository at this point in the history
  • Loading branch information
sukritkalra committed Feb 12, 2024
1 parent 0444da9 commit 1761d67
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 1 deletion.
15 changes: 15 additions & 0 deletions rpc/protos/erdos_scheduler.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ service SchedulerService {
// This is the entry point for a new instance of Spark / Flink to register
// itself with the backend scheduler, and is intended as an EHLO.
rpc RegisterFramework(RegisterFrameworkRequest) returns (RegisterFrameworkResponse) {}

/// Deregisters the framework with the backend scheduler.
/// This is meant for the framework to signal its intent to shut down.
rpc DeregisterFramework(DeregisterFrameworkRequest) returns (DeregisterFrameworkResponse) {}
}

message RegisterFrameworkRequest {
Expand All @@ -24,3 +28,14 @@ message RegisterFrameworkResponse {
bool success = 1;
string message = 2;
}

message DeregisterFrameworkRequest {
string name = 1;
string uri = 2;
uint64 timestamp = 3;
}

message DeregisterFrameworkResponse {
bool success = 1;
string message = 2;
}
48 changes: 47 additions & 1 deletion rpc/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ def RegisterFramework(self, request, context):
itself with the backend scheduler, and is intended as an EHLO.
"""
if self._initialized:
self._logger.warning(
"Framework already registered at %s with the address %s",
self._initialization_time,
self._master_uri,
)
return erdos_scheduler_pb2.RegisterFrameworkResponse(
success=False,
message=f"Framework already registered at "
Expand All @@ -50,12 +55,53 @@ def RegisterFramework(self, request, context):
framework_name = request.name
self._master_uri = request.uri
self._initialization_time = request.timestamp
print("Registering framework %s with URI %s", framework_name, self._master_uri)
self._initialized = True
self._logger.info(
"Registering framework %s with URI %s at %s",
framework_name,
self._master_uri,
self._initialization_time,
)
return erdos_scheduler_pb2.RegisterFrameworkResponse(
success=True,
message=f"{framework_name} at {self._master_uri} registered successfully!",
)

def DeregisterFramework(self, request, context):
"""Deregisters the framework with the backend scheduler.
This is the exit point for a running instance of Spark / Flink to deregister"""
if not self._initialized:
self._logger.warning(
"Trying to deregister the framework at %s, "
"but no framework is registered yet.",
request.uri,
)
return erdos_scheduler_pb2.DeregisterFrameworkResponse(
success=False, message="Framework not registered yet."
)

if not self._master_uri == request.uri:
self._logger.warning(
"Trying to deregister the framework at %s, "
"but the registered framework is at %s.",
request.uri,
self._master_uri,
)
return erdos_scheduler_pb2.DeregisterFrameworkResponse(
success=False,
message=f"Framework not registered at {request.uri} yet.",
)

# Deregister the framework.
self._initialization_time = None
self._master_uri = None
self._initialized = False
self._logger.info("Deregistering framework at %s", request.uri)
return erdos_scheduler_pb2.DeregisterFrameworkResponse(
success=True,
message=f"Framework at {request.uri} deregistered successfully!",
)


def serve(args):
"""Serves the ERDOS Scheduling RPC Server."""
Expand Down

0 comments on commit 1761d67

Please sign in to comment.