Skip to content

Commit

Permalink
update documentation in service
Browse files Browse the repository at this point in the history
  • Loading branch information
1ntEgr8 committed Nov 27, 2024
1 parent 618348a commit 15bf380
Showing 1 changed file with 37 additions and 11 deletions.
48 changes: 37 additions & 11 deletions rpc/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import asyncio
from concurrent import futures
from urllib.parse import urlparse
from typing import Optional, Dict
from typing import Optional, Dict, Callable, Tuple
from enum import Enum
from dataclasses import dataclass

Expand Down Expand Up @@ -83,30 +83,56 @@ def get_next_workload(self, current_time: EventTime) -> Optional[Workload]:

@dataclass
class RegisteredApplication:
# TODO(elton): documentation
"""
Represents a registered application that can be used to generate task
graphs. It also manages the mapping between Spark stage IDs and canonical
task IDs.
gen: any # TODO(elton): proper type
A registered application is ready if the `task_graph` attribute is set.
Attributes:
gen (Callable[[EventTime], Tuple[TaskGraph, Dict[int,int]]]):
A function that takes a release time and outputs:
- A task graph
- A mapping from Spark stage IDs to canonical task IDs
task_graph (TaskGraph, optional):
The generated task graph for the application. Defaults to None.
Methods:
generate_task_graph(release_time: EventTime):
Sets the `task_graph` attribute by generating a task graph for a
given `release_time`.
spark_task_id(task_id: int):
Returns the Spark stage ID corresponding to a canonical task ID.
canonical_task_id(stage_id: int):
Returns the canonical task ID corresponding to a Spark stage ID.
"""

gen: Callable[[EventTime], Tuple[TaskGraph, Dict[int, int]]]
task_graph: TaskGraph = None

_forward: any = None # TODO(elton): proper type
_backward: any = None # TODO(elton): proper type
_last_gen: any = None # TODO(elton): proper type
_forward: Dict[int, int] = None # spark stage id => canonical task id
_backward: Dict[int, int] = None # canonical task id => spark stage id
_last_gen: EventTime = None

def __init__(self, gen):
self.gen = gen

def generate_task_graph(self, release_time):
def generate_task_graph(self, release_time: EventTime):
task_graph, stage_id_mapping = self.gen(release_time)
self.task_graph = task_graph
self._forward = stage_id_mapping
self._backward = {v: k for k, v in self._forward.items()}
self._last_gen = release_time

def spark_task_id(self, task_id):
def spark_task_id(self, task_id: int):
return self._backward[task_id]

def canonical_task_id(self, task_id):
return self._forward[task_id]
def canonical_task_id(self, stage_id: int):
return self._forward[stage_id]


class Servicer(erdos_scheduler_pb2_grpc.SchedulerServiceServicer):
Expand Down Expand Up @@ -443,7 +469,7 @@ async def RegisterEnvironmentReady(self, request, context):
self._simulator._event_queue.add_event(update_workload_event)
self._simulator._event_queue.add_event(scheduler_start_event)
self._logger.info(
f"[{stime}] Adding event {update_workload_event} to the simulator's event queue"
f"[{stime}] Added event {update_workload_event} to the simulator's event queue"
)
self._logger.info(
f"[{stime}] Added event {scheduler_start_event} to the simulator's event queue"
Expand Down

0 comments on commit 15bf380

Please sign in to comment.