diff --git a/rpc/protos/erdos_scheduler.proto b/rpc/protos/erdos_scheduler.proto index 4645cf19..1a274a9e 100644 --- a/rpc/protos/erdos_scheduler.proto +++ b/rpc/protos/erdos_scheduler.proto @@ -13,6 +13,11 @@ service SchedulerService { // itself with the backend scheduler, and is intended as an EHLO. rpc RegisterFramework(RegisterFrameworkRequest) returns (RegisterFrameworkResponse) {} + // Registers a new application with the backend scheduler. + // This is the entry point for a new taskgraph (application) to register + // itself with the backend scheduler, and is intended as an EHLO. + rpc RegisterTaskGraph(RegisterTaskGraphRequest) returns (RegisterTaskGraphResponse) {} + /// Deregisters the framework with the backend scheduler. /// This is meant for the framework to signal its intent to shut down. rpc DeregisterFramework(DeregisterFrameworkRequest) returns (DeregisterFrameworkResponse) {} @@ -49,6 +54,19 @@ message DeregisterFrameworkResponse { string message = 2; } +/// Application related RPC structures. +/// The following structures represent the data that an application (TaskGraph) +/// must pass to the scheduling backend. +message RegisterTaskGraphRequest { + string id = 1; + string name = 2; +} + +message RegisterTaskGraphResponse { + bool success = 1; + string message = 2; +} + /// Worker related RPC structures. /// The following structures represent the data that a Worker's addition to the /// framework must pass to the scheduling backend. This may involve things like @@ -64,4 +82,4 @@ message RegisterWorkerRequest { message RegisterWorkerResponse { bool success = 1; string message = 2; -} +} \ No newline at end of file diff --git a/rpc/service.py b/rpc/service.py index a5e60671..81d9d653 100644 --- a/rpc/service.py +++ b/rpc/service.py @@ -2,6 +2,7 @@ import sys from concurrent import futures from urllib.parse import urlparse +import time sys.path.append( os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir)) @@ -39,6 +40,9 @@ def __init__(self) -> None: # The simulator types maintained by the Servicer. self._worker_pool = None + # Application (TaskGraph) information maintained by the Servicer. + self._all_task_graphs = {} + super().__init__() def RegisterFramework(self, request, context): @@ -79,6 +83,43 @@ def RegisterFramework(self, request, context): success=True, message=f"{framework_name} at {self._master_uri} registered successfully!", ) + + def RegisterTaskGraph(self, request, context): + """Registers a new TaskGraph with the backend scheduler. + This is the entry point for a new application of Spark to register + itself with the backend scheduler, and is intended as an EHLO. + """ + app_id = request.id + app_name = request.name + received_ts = time.time() + if app_id in self._all_task_graphs: + self._logger.warning( + "Registration failed for app_id %s and name %s. Was already registered!", + app_id, + app_name, + ) + return erdos_scheduler_pb2.RegisterTaskGraphResponse( + success=False, + message=f"Application ID {app_id} with name {app_name} already registered!" + ) + + # Setup a new TaskGraph (application). + self._logger.info( + "Registering application ID %s with name %s at received_ts %s", + app_id, + app_name, + received_ts, + ) + + # Setup application information for servicer. + new_application = {"app_id": app_id, "app_name": app_name, "received_ts": received_ts} + self._all_task_graphs[app_id] = new_application + + # Return the response. + return erdos_scheduler_pb2.RegisterTaskGraphResponse( + success=True, + message=f"Application ID {app_id} with name {app_name} registered successfully!", + ) def DeregisterFramework(self, request, context): """Deregisters the framework with the backend scheduler.