diff --git a/tests/test_service.py b/tests/test_service.py index 8fb54911..1b92030f 100644 --- a/tests/test_service.py +++ b/tests/test_service.py @@ -175,7 +175,7 @@ def test_service(): actual_task_ids.add(placement.task_id) assert actual_task_ids == {2} - # Register the second (correct) TaskGraph, wont be able to run due to inadequate resources + # Attempt to register the second TaskGraph; it won't be able to run due to inadequate resources request = erdos_scheduler_pb2.RegisterTaskGraphRequest( id="task-graph-1", name="TPCH Query 4 50 200", @@ -189,10 +189,33 @@ def test_service(): ], ) response = stub.RegisterTaskGraph(request) + assert ( + not response.success + and re.search( + r"The worker Pool cannot accomodate the task graph 'task-graph-1'", + response.message, + ) + and response.num_executors == 0 + ) + + # Register the third TaskGraph; it will run but will get cancelled due to a deadline miss + request = erdos_scheduler_pb2.RegisterTaskGraphRequest( + id="task-graph-2", + name="TPCH Query 4 50 50", + timestamp=1234567890, + dependencies=[ + {"key": {"id": 0, "name": "stage 0"}, "children_ids": [2]}, + {"key": {"id": 1, "name": "stage 1"}, "children_ids": [2]}, + {"key": {"id": 2, "name": "stage 2"}, "children_ids": [3]}, + {"key": {"id": 3, "name": "stage 3"}, "children_ids": [4]}, + {"key": {"id": 4, "name": "stage 4"}, "children_ids": []}, + ], + ) + response = stub.RegisterTaskGraph(request) assert ( response.success and re.search( - r"Registered task graph 'task-graph-1' successfully", + r"Registered task graph 'task-graph-2' successfully", response.message, ) and response.num_executors == 10 @@ -203,33 +226,33 @@ def test_service(): # Mark the environment as ready request = erdos_scheduler_pb2.RegisterEnvironmentReadyRequest( - id="task-graph-1", + id="task-graph-2", num_executors=10, timestamp=1234567890, ) response = stub.RegisterEnvironmentReady(request) assert response.success and re.search( - r"Successfully marked environment as ready for task graph 
'Q4\[task-graph-1\]@1'", + r"Successfully marked environment as ready for task graph 'Q4\[task-graph-2\]@1'", response.message, ) # Wait for 10s to get the placements for the second task graph time.sleep(10) - # Get placements for the task, none should be placed since worker has inadequate resources + # Get placements for the third task graph (task-graph-2); one of its two root vertices should be placed since resources are available request = erdos_scheduler_pb2.GetPlacementsRequest( timestamp=1234567890, - id="task-graph-1", + id="task-graph-2", ) response = stub.GetPlacements(request) assert response.success actual_task_ids = set() for placement in response.placements: assert ( - placement.worker_id == "1234" and placement.application_id == "task-graph-1" + placement.worker_id == "1234" and placement.application_id == "task-graph-2" ) actual_task_ids.add(placement.task_id) - assert len(actual_task_ids) == 0 + assert actual_task_ids == {1} # Wait for 100 more seconds and request placements again time.sleep(100) @@ -244,21 +267,33 @@ def test_service(): # Wait for 2 seconds to allow scheduler to process task completion and run scheduler time.sleep(2) - # Get placements for the task, entire taskgraph should be cancelled + # Get placements for the task graph; the entire task graph should be cancelled since its deadline was missed. + # The other root vertex (0) will be cancelled first, then the subsequent vertices. + # NOTE: The simulator will return all current placements for a task graph (including + # those already sent by the service) until the task is marked as finished. Spark will ignore them. + # In this task-graph-2 scenario, placements has two entries: Task 0 in cancelled state and + # Task 1 in running state. The service will return both of them. 
request = erdos_scheduler_pb2.GetPlacementsRequest( timestamp=1234567890, - id="task-graph-1", + id="task-graph-2", ) response = stub.GetPlacements(request) print(response) assert response.success actual_task_ids = set() for placement in response.placements: - assert ( - placement.worker_id == "None" - and placement.application_id == "task-graph-1" - and placement.cancelled == True - ) + if placement.task_id == 0: + assert ( + placement.worker_id == "None" + and placement.application_id == "task-graph-2" + and placement.cancelled == True + ) + if placement.task_id == 1: + assert ( + placement.worker_id == "1234" + and placement.application_id == "task-graph-2" + and placement.cancelled == False + ) actual_task_ids.add(placement.task_id) assert actual_task_ids == {0, 1}