Skip to content

Commit

Permalink
[ERDOS] Updating proto and service to register application from frame…
Browse files Browse the repository at this point in the history
…work
  • Loading branch information
dhruvsgarg committed Feb 13, 2024
1 parent a768b8b commit b3ca31e
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 1 deletion.
20 changes: 19 additions & 1 deletion rpc/protos/erdos_scheduler.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ service SchedulerService {
// itself with the backend scheduler, and is intended as an EHLO.
rpc RegisterFramework(RegisterFrameworkRequest) returns (RegisterFrameworkResponse) {}

// Registers a new application with the backend scheduler.
// This is the entry point for a new taskgraph (application) to register
// itself with the backend scheduler, and is intended as an EHLO.
rpc RegisterTaskGraph(RegisterTaskGraphRequest) returns (RegisterTaskGraphResponse) {}

/// Deregisters the framework with the backend scheduler.
/// This is meant for the framework to signal its intent to shut down.
rpc DeregisterFramework(DeregisterFrameworkRequest) returns (DeregisterFrameworkResponse) {}
Expand Down Expand Up @@ -49,6 +54,19 @@ message DeregisterFrameworkResponse {
string message = 2;
}

/// Application related RPC structures.
/// The following structures represent the data that an application (TaskGraph)
/// must pass to the scheduling backend.
message RegisterTaskGraphRequest {
string id = 1;
string name = 2;
}

message RegisterTaskGraphResponse {
bool success = 1;
string message = 2;
}

/// Worker related RPC structures.
/// The following structures represent the data that a Worker's addition to the
/// framework must pass to the scheduling backend. This may involve things like
Expand All @@ -64,4 +82,4 @@ message RegisterWorkerRequest {
message RegisterWorkerResponse {
bool success = 1;
string message = 2;
}
}
41 changes: 41 additions & 0 deletions rpc/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import sys
from concurrent import futures
from urllib.parse import urlparse
import time

sys.path.append(
os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir))
Expand Down Expand Up @@ -39,6 +40,9 @@ def __init__(self) -> None:
# The simulator types maintained by the Servicer.
self._worker_pool = None

# Application (TaskGraph) information maintained by the Servicer.
self._all_task_graphs = {}

super().__init__()

def RegisterFramework(self, request, context):
Expand Down Expand Up @@ -79,6 +83,43 @@ def RegisterFramework(self, request, context):
success=True,
message=f"{framework_name} at {self._master_uri} registered successfully!",
)

def RegisterTaskGraph(self, request, context):
"""Registers a new TaskGraph with the backend scheduler.
This is the entry point for a new application of Spark to register
itself with the backend scheduler, and is intended as an EHLO.
"""
app_id = request.id
app_name = request.name
received_ts = time.time()
if app_id in self._all_task_graphs:
self._logger.warning(
"Registration failed for app_id %s and name %s. Was already registered!",
app_id,
app_name,
)
return erdos_scheduler_pb2.RegisterTaskGraphResponse(
success=False,
message=f"Application ID {app_id} with name {app_name} already registered!"
)

# Setup a new TaskGraph (application).
self._logger.info(
"Registering application ID %s with name %s at received_ts %s",
app_id,
app_name,
received_ts,
)

# Setup application information for servicer.
new_application = {"app_id": app_id, "app_name": app_name, "received_ts": received_ts}
self._all_task_graphs[app_id] = new_application

# Return the response.
return erdos_scheduler_pb2.RegisterTaskGraphResponse(
success=True,
message=f"Application ID {app_id} with name {app_name} registered successfully!",
)

def DeregisterFramework(self, request, context):
"""Deregisters the framework with the backend scheduler.
Expand Down

0 comments on commit b3ca31e

Please sign in to comment.