diff --git a/tests/test_service.py b/tests/test_service.py index 741209dc..8fb54911 100644 --- a/tests/test_service.py +++ b/tests/test_service.py @@ -10,7 +10,7 @@ @pytest.fixture(scope="module", autouse=True) def service(): - process = subprocess.Popen(["python", "-m", "rpc.service"]) + process = subprocess.Popen(["python", "-m", "rpc.service", "--enforce_deadlines"]) channel = grpc.insecure_channel("localhost:50051") try: grpc.channel_ready_future(channel).result(timeout=5) @@ -46,7 +46,6 @@ def test_service(): r"Registered worker \(id=1234, name=test_worker\)", response.message ) - # Try to fetch placements for an unregistered task graph # Get placements for the task, should be empty request = erdos_scheduler_pb2.GetPlacementsRequest( @@ -97,7 +96,7 @@ def test_service(): ) and response.num_executors == 10 ) - + # Introduce a 2s delay in getting the env ready time.sleep(2) @@ -129,44 +128,38 @@ def test_service(): ) actual_task_ids.add(placement.task_id) assert actual_task_ids == {0, 1} - + # Wait for 3 seconds and trigger notify task completion for tasks 0 and 1 time.sleep(3) - + request = erdos_scheduler_pb2.NotifyTaskCompletionRequest( - application_id="task-graph-0", - task_id=0, - timestamp=1234567890 + application_id="task-graph-0", task_id=0, timestamp=1234567890 ) response = stub.NotifyTaskCompletion(request) assert response.success - + request = erdos_scheduler_pb2.NotifyTaskCompletionRequest( - application_id="task-graph-0", - task_id=1, - timestamp=1234567890 + application_id="task-graph-0", task_id=1, timestamp=1234567890 ) response = stub.NotifyTaskCompletion(request) assert response.success - + # Wait for 20s to allow the service to execute task completion for fastest task time.sleep(20) - + # Attempt to incorrectly notify task completion for task 3, which hasnt started yet request = erdos_scheduler_pb2.NotifyTaskCompletionRequest( - application_id="task-graph-0", - task_id=3, - timestamp=1234567890 + application_id="task-graph-0", task_id=3, timestamp=1234567890 ) response = stub.NotifyTaskCompletion(request) assert not response.success - + # Wait 2s to allow the service to process the incorrect task completion time.sleep(2) - + # Wait for 25s to allow the service to finish execution of task 0 time.sleep(25) - + # This will unlock task 2, which should now be returned as a placement request = erdos_scheduler_pb2.GetPlacementsRequest( timestamp=1234567890, @@ -181,7 +174,7 @@ def test_service(): ) actual_task_ids.add(placement.task_id) assert actual_task_ids == {2} - + # Register the second (correct) TaskGraph, wont be able to run due to inadequate resources request = erdos_scheduler_pb2.RegisterTaskGraphRequest( id="task-graph-1", @@ -204,10 +197,10 @@ def test_service(): ) and response.num_executors == 10 ) - + # Introduce a 2s delay in getting the env ready time.sleep(2) - + # Mark the environment as ready request = erdos_scheduler_pb2.RegisterEnvironmentReadyRequest( id="task-graph-1", @@ -219,10 +212,10 @@ def test_service(): r"Successfully marked environment as ready for task graph 'Q4\[task-graph-1\]@1'", response.message, ) - + # Wait for 10s to get the placements for the second task graph time.sleep(10) - + # Get placements for the task, none should be placed since worker has inadequate resources request = erdos_scheduler_pb2.GetPlacementsRequest( timestamp=1234567890, @@ -237,33 +230,34 @@ def test_service(): ) actual_task_ids.add(placement.task_id) assert len(actual_task_ids) == 0 - + # Wait for 100 more seconds and request placements again time.sleep(100) - + # Notify task completion for task 2 in task graph 0 to trigger scheduler run again request = erdos_scheduler_pb2.NotifyTaskCompletionRequest( - application_id="task-graph-0", - task_id=2, - timestamp=1234567890 + application_id="task-graph-0", task_id=2, timestamp=1234567890 ) response = stub.NotifyTaskCompletion(request) assert response.success - + # Wait for 2 seconds to allow scheduler to process task completion and run scheduler time.sleep(2) - + # Get placements for the task, entire taskgraph should be cancelled request = erdos_scheduler_pb2.GetPlacementsRequest( timestamp=1234567890, id="task-graph-1", ) response = stub.GetPlacements(request) + print(response) assert response.success actual_task_ids = set() for placement in response.placements: assert ( - placement.worker_id == "None" and placement.application_id == "task-graph-1" and placement.cancelled == True + placement.worker_id == "None" + and placement.application_id == "task-graph-1" + and placement.cancelled == True ) actual_task_ids.add(placement.task_id) assert actual_task_ids == {0, 1}