Skip to content

Commit

Permalink
add enforce deadlines flag
Browse files Browse the repository at this point in the history
  • Loading branch information
1ntEgr8 committed Nov 25, 2024
1 parent 400ec44 commit f69c0a9
Showing 1 changed file with 27 additions and 33 deletions.
60 changes: 27 additions & 33 deletions tests/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

@pytest.fixture(scope="module", autouse=True)
def service():
process = subprocess.Popen(["python", "-m", "rpc.service"])
process = subprocess.Popen(["python", "-m", "rpc.service", "--enforce_deadlines"])
channel = grpc.insecure_channel("localhost:50051")
try:
grpc.channel_ready_future(channel).result(timeout=5)
Expand Down Expand Up @@ -46,7 +46,6 @@ def test_service():
r"Registered worker \(id=1234, name=test_worker\)", response.message
)


# Try to fetch placements for an unregistered task graph
# Get placements for the task, should be empty
request = erdos_scheduler_pb2.GetPlacementsRequest(
Expand Down Expand Up @@ -97,7 +96,7 @@ def test_service():
)
and response.num_executors == 10
)

# Introduce a 2s delay in getting the env ready
time.sleep(2)

Expand Down Expand Up @@ -129,44 +128,38 @@ def test_service():
)
actual_task_ids.add(placement.task_id)
assert actual_task_ids == {0, 1}

# Wait for 3 seconds and trigger notify task completion for tasks 0 and 1
time.sleep(3)

request = erdos_scheduler_pb2.NotifyTaskCompletionRequest(
application_id="task-graph-0",
task_id=0,
timestamp=1234567890
application_id="task-graph-0", task_id=0, timestamp=1234567890
)
response = stub.NotifyTaskCompletion(request)
assert response.success

request = erdos_scheduler_pb2.NotifyTaskCompletionRequest(
application_id="task-graph-0",
task_id=1,
timestamp=1234567890
application_id="task-graph-0", task_id=1, timestamp=1234567890
)
response = stub.NotifyTaskCompletion(request)
assert response.success

# Wait for 20s to allow the service to execute task completion for fastest task
time.sleep(20)

# Attempt to incorrectly notify task completion for task 3, which hasnt started yet
request = erdos_scheduler_pb2.NotifyTaskCompletionRequest(
application_id="task-graph-0",
task_id=3,
timestamp=1234567890
application_id="task-graph-0", task_id=3, timestamp=1234567890
)
response = stub.NotifyTaskCompletion(request)
assert not response.success

# Wait 2s to allow the service to process the incorrect task completion
time.sleep(2)

# Wait for 25s to allow the service to finish execution of task 0
time.sleep(25)

# This will unlock task 2, which should now be returned as a placement
request = erdos_scheduler_pb2.GetPlacementsRequest(
timestamp=1234567890,
Expand All @@ -181,7 +174,7 @@ def test_service():
)
actual_task_ids.add(placement.task_id)
assert actual_task_ids == {2}

# Register the second (correct) TaskGraph, wont be able to run due to inadequate resources
request = erdos_scheduler_pb2.RegisterTaskGraphRequest(
id="task-graph-1",
Expand All @@ -204,10 +197,10 @@ def test_service():
)
and response.num_executors == 10
)

# Introduce a 2s delay in getting the env ready
time.sleep(2)

# Mark the environment as ready
request = erdos_scheduler_pb2.RegisterEnvironmentReadyRequest(
id="task-graph-1",
Expand All @@ -219,10 +212,10 @@ def test_service():
r"Successfully marked environment as ready for task graph 'Q4\[task-graph-1\]@1'",
response.message,
)

# Wait for 10s to get the placements for the second task graph
time.sleep(10)

# Get placements for the task, none should be placed since worker has inadequate resources
request = erdos_scheduler_pb2.GetPlacementsRequest(
timestamp=1234567890,
Expand All @@ -237,33 +230,34 @@ def test_service():
)
actual_task_ids.add(placement.task_id)
assert len(actual_task_ids) == 0

# Wait for 100 more seconds and request placements again
time.sleep(100)

# Notify task completion for task 2 in task graph 0 to trigger scheduler run again
request = erdos_scheduler_pb2.NotifyTaskCompletionRequest(
application_id="task-graph-0",
task_id=2,
timestamp=1234567890
application_id="task-graph-0", task_id=2, timestamp=1234567890
)
response = stub.NotifyTaskCompletion(request)
assert response.success

# Wait for 2 seconds to allow scheduler to process task completion and run scheduler
time.sleep(2)

# Get placements for the task, entire taskgraph should be cancelled
request = erdos_scheduler_pb2.GetPlacementsRequest(
timestamp=1234567890,
id="task-graph-1",
)
response = stub.GetPlacements(request)
print(response)
assert response.success
actual_task_ids = set()
for placement in response.placements:
assert (
placement.worker_id == "None" and placement.application_id == "task-graph-1" and placement.cancelled == True
placement.worker_id == "None"
and placement.application_id == "task-graph-1"
and placement.cancelled == True
)
actual_task_ids.add(placement.task_id)
assert actual_task_ids == {0, 1}
Expand Down

0 comments on commit f69c0a9

Please sign in to comment.