diff --git a/rpc/protos/erdos_scheduler.proto b/rpc/protos/erdos_scheduler.proto index ac69620b..4645cf19 100644 --- a/rpc/protos/erdos_scheduler.proto +++ b/rpc/protos/erdos_scheduler.proto @@ -17,6 +17,8 @@ service SchedulerService { /// This is meant for the framework to signal its intent to shut down. rpc DeregisterFramework(DeregisterFrameworkRequest) returns (DeregisterFrameworkResponse) {} + /// Registers a new Worker with the backend scheduler. + /// This is invoked whenever a new Worker joins the framework. rpc RegisterWorker(RegisterWorkerRequest) returns (RegisterWorkerResponse) {} } diff --git a/rpc/service.py b/rpc/service.py index d328803b..a5e60671 100644 --- a/rpc/service.py +++ b/rpc/service.py @@ -1,6 +1,7 @@ import os import sys from concurrent import futures +from urllib.parse import urlparse sys.path.append( os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir)) @@ -12,6 +13,8 @@ from absl import app, flags from utils import setup_logging +from workers import Worker, WorkerPool +from workload import Resource, Resources FLAGS = flags.FLAGS @@ -32,6 +35,10 @@ def __init__(self) -> None: self._initialized = False self._initialization_time = -1 self._master_uri = None + + # The simulator types maintained by the Servicer. + self._worker_pool = None + super().__init__() def RegisterFramework(self, request, context): @@ -62,6 +69,12 @@ def RegisterFramework(self, request, context): self._master_uri, self._initialization_time, ) + + # Setup the simulator types. + parsed_uri = urlparse(self._master_uri) + self._worker_pool = WorkerPool(name=f"WorkerPool_{parsed_uri.netloc}") + + # Return the response. 
def RegisterWorker(self, request, context):
    """Handle a worker joining the framework registered with this servicer.

    Args:
        request: The incoming RegisterWorker message; its `name`, `id` and
            `cores` fields are read here.
        context: The RPC context passed to the handler (not used).

    Returns:
        A `RegisterWorkerResponse` with `success=False` when no framework
        has been registered yet, and with `success=True` once the worker
        has been added to the worker pool.
    """
    # Guard: a worker cannot be accepted before a framework has registered.
    if not self._initialized:
        self._logger.warning(
            "Trying to register a worker with name %s and id %s, "
            "but no framework is registered yet.",
            request.name,
            request.id,
        )
        rejection = erdos_scheduler_pb2.RegisterWorkerResponse(
            success=False,
            message="Framework not registered yet.",
        )
        return rejection

    # Describe the worker's capacity as a single CPU-slot resource sized by
    # the number of cores it reported.
    # TODO (Sukrit): Right now, we drop the memory requirements, we should use
    # them to do multi-dimensional packing using STRL.
    capacity = Resources(
        resource_vector={Resource(name="Slot_CPU"): request.cores},
    )
    self._logger.debug(
        "Successfully constructed the resources for the worker %s: %s.",
        request.name,
        capacity,
    )

    # The Worker is named after the request's id, and is handed to the pool
    # as a one-element batch.
    joined_worker = Worker(name=request.id, resources=capacity)
    self._worker_pool.add_workers([joined_worker])

    self._logger.info(
        "Registering worker with name %s, and resources %s.",
        joined_worker.name,
        capacity,
    )

    acknowledgement = erdos_scheduler_pb2.RegisterWorkerResponse(
        success=True,
        message=f"Worker {request.name} registered successfully!",
    )
    return acknowledgement