Add optional timeout parameter for isvc creation and corresponding changes #135

Closed · wants to merge 4 commits
96 changes: 20 additions & 76 deletions tests/model_serving/model_server/utils.py
@@ -2,7 +2,6 @@
 
 import json
 import re
-from concurrent.futures import ThreadPoolExecutor, as_completed
 from contextlib import contextmanager
 from string import Template
 from typing import Any, Generator, Optional
@@ -34,60 +33,48 @@
 LOGGER = get_logger(name=__name__)
 
 
-def verify_no_failed_pods(client: DynamicClient, isvc: InferenceService, runtime_name: str | None) -> None:
+def verify_no_failed_pods(
+    client: DynamicClient, isvc: InferenceService, runtime_name: str | None, timeout: int = Timeout.TIMEOUT_5MIN
+) -> None:
     """
     Verify no failed pods.
 
     Args:
         client (DynamicClient): DynamicClient object
         isvc (InferenceService): InferenceService object
         runtime_name (str): ServingRuntime name
+        timeout (int): Time to wait for the pod.
 
     Raises:
         FailedPodsError: If any pod is in failed state
 
     """
+    failed_pods: dict[str, Any] = {}
+
     LOGGER.info("Verifying no failed pods")
     for pods in TimeoutSampler(
-        wait_timeout=Timeout.TIMEOUT_5MIN,
+        wait_timeout=timeout,
         sleep=10,
         func=get_pods_by_isvc_label,
         client=client,
         isvc=isvc,
         runtime_name=runtime_name,
     ):
-        ready_pods = 0
-        failed_pods: dict[str, Any] = {}
-
         if pods:
-            for pod in pods:
-                for condition in pod.instance.status.conditions:
-                    if condition.type == pod.Status.READY and condition.status == pod.Condition.Status.TRUE:
-                        ready_pods += 1
-
-            if ready_pods == len(pods):
+            if all([pod.instance.status.phase == pod.Status.RUNNING for pod in pods]):
                 return
 
             for pod in pods:
                 pod_status = pod.instance.status
 
                 if pod_status.containerStatuses:
                     for container_status in pod_status.containerStatuses:
-                        is_waiting_pull_back_off = (
-                            wait_state := container_status.state.waiting
-                        ) and wait_state.reason == pod.Status.IMAGE_PULL_BACK_OFF
-
-                        is_terminated_error = (
-                            terminate_state := container_status.state.terminated
-                        ) and terminate_state.reason in (pod.Status.ERROR, pod.Status.CRASH_LOOPBACK_OFF)
-
-                        if is_waiting_pull_back_off or is_terminated_error:
+                        if (state := container_status.state.waiting) and state.reason == pod.Status.IMAGE_PULL_BACK_OFF:
                             failed_pods[pod.name] = pod_status
 
-                    if init_container_status := pod_status.initContainerStatuses:
-                        if container_terminated := init_container_status[0].lastState.terminated:
-                            if container_terminated.reason == "Error":
-                                failed_pods[pod.name] = pod_status
+                        if init_container_status := pod_status.initContainerStatuses:
+                            if container_terminated := init_container_status[0].lastState.terminated:
+                                if container_terminated.reason == "Error":
+                                    failed_pods[pod.name] = pod_status
 
                 elif pod_status.phase in (
                     pod.Status.CRASH_LOOPBACK_OFF,
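
With the new parameter, call sites control how long the sampler polls before raising FailedPodsError. A minimal sketch of the two call styles; the client and isvc fixtures and the runtime name here are hypothetical:

```python
from utilities.constants import Timeout

# Default window stays at Timeout.TIMEOUT_5MIN, matching the old hard-coded value.
verify_no_failed_pods(client=client, isvc=isvc, runtime_name="my-runtime")

# Slow-starting model servers can opt into a longer window.
verify_no_failed_pods(
    client=client,
    isvc=isvc,
    runtime_name="my-runtime",
    timeout=Timeout.TIMEOUT_15MIN,
)
```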
@@ -125,6 +112,7 @@ def create_isvc(
     wait_for_predictor_pods: bool = True,
     autoscaler_mode: Optional[str] = None,
     multi_node_worker_spec: Optional[dict[str, int]] = None,
+    timeout: int = Timeout.TIMEOUT_15MIN,
 ) -> Generator[InferenceService, Any, Any]:
     """
     Create InferenceService object.
@@ -153,6 +141,7 @@ def create_isvc(
         autoscaler_mode (str): Autoscaler mode
         multi_node_worker_spec (dict[str, int]): Multi node worker spec
         wait_for_predictor_pods (bool): Wait for predictor pods
+        timeout (int): Time to wait for the model inference deployment to be ready.
 
     Yields:
         InferenceService: InferenceService object
@@ -232,8 +221,10 @@ def create_isvc(
         label=labels,
     ) as inference_service:
         if wait_for_predictor_pods:
-            verify_no_failed_pods(client=client, isvc=inference_service, runtime_name=runtime)
-            wait_for_inference_deployment_replicas(client=client, isvc=inference_service, runtime_name=runtime)
+            verify_no_failed_pods(client=client, isvc=inference_service, runtime_name=runtime, timeout=timeout)
+            wait_for_inference_deployment_replicas(
+                client=client, isvc=inference_service, runtime_name=runtime, timeout=timeout
+            )
 
         if wait:
             # Modelmesh 2nd server in the ns will fail to be Ready; isvc needs to be re-applied
@@ -258,7 +249,7 @@ def create_isvc(
             inference_service.wait_for_condition(
                 condition=inference_service.Condition.READY,
                 status=inference_service.Condition.Status.TRUE,
-                timeout=15 * 60,
+                timeout=timeout,
             )
 
         yield inference_service
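
Since the same timeout is threaded through verify_no_failed_pods, wait_for_inference_deployment_replicas, and the final READY condition, a caller tunes one knob for the whole creation flow. A hedged sketch of a call site; fixture names are hypothetical and required arguments (model format, storage, deployment mode, and so on) are omitted:

```python
with create_isvc(
    client=client,                   # hypothetical DynamicClient fixture
    name="example-isvc",             # hypothetical InferenceService name
    namespace=model_namespace.name,  # hypothetical namespace fixture
    runtime="my-runtime",            # hypothetical ServingRuntime name
    timeout=Timeout.TIMEOUT_10MIN,
) as inference_service:
    ...  # run inference against the service
```

Note that the value bounds each wait separately, not their sum, so the worst-case wall-clock time is a small multiple of timeout.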
@@ -416,50 +407,3 @@ def verify_inference_response(
 
     else:
         raise InferenceResponseError(f"Inference response output not found in response. Response: {res}")
-
-
-def run_inference_multiple_times(
-    isvc: InferenceService,
-    inference_config: dict[str, Any],
-    inference_type: str,
-    protocol: str,
-    model_name: str,
-    iterations: int,
-    run_in_parallel: bool = False,
-) -> None:
-    """
-    Run inference multiple times.
-
-    Args:
-        isvc (InferenceService): Inference service.
-        inference_config (dict[str, Any]): Inference config.
-        inference_type (str): Inference type.
-        protocol (str): Protocol.
-        model_name (str): Model name.
-        iterations (int): Number of iterations.
-        run_in_parallel (bool, optional): Run inference in parallel.
-
-    """
-    futures = []
-
-    with ThreadPoolExecutor() as executor:
-        for iteration in range(iterations):
-            infer_kwargs = {
-                "inference_service": isvc,
-                "inference_config": inference_config,
-                "inference_type": inference_type,
-                "protocol": protocol,
-                "model_name": model_name,
-                "use_default_query": True,
-            }
-
-            if run_in_parallel:
-                futures.append(executor.submit(verify_inference_response, **infer_kwargs))
-            else:
-                verify_inference_response(**infer_kwargs)
-
-        if futures:
-            for result in as_completed(futures):
-                _exception = result.exception()
-                if _exception:
-                    LOGGER.error(f"Failed to run inference. Error: {_exception}")
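
run_inference_multiple_times is deleted along with its concurrent.futures import. Tests that still need repeated inference can inline the loop; a rough sequential equivalent built only from verify_inference_response, with the keyword arguments taken from the removed helper:

```python
for _ in range(iterations):
    verify_inference_response(
        inference_service=isvc,
        inference_config=inference_config,
        inference_type=inference_type,
        protocol=protocol,
        model_name=model_name,
        use_default_query=True,
    )
```

Unlike the old parallel branch, which only logged exceptions collected from the executor, this loop fails fast on the first bad response.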
6 changes: 4 additions & 2 deletions utilities/constants.py
@@ -140,9 +140,11 @@ class KserveAuth:
 
 
 class Timeout:
-    TIMEOUT_1MIN = 60
-    TIMEOUT_10MIN = 10 * TIMEOUT_1MIN
+    TIMEOUT_1MIN: int = 60
+    TIMEOUT_2MIN: int = 2 * TIMEOUT_1MIN
+    TIMEOUT_5MIN: int = 5 * TIMEOUT_1MIN
+    TIMEOUT_10MIN: int = 10 * TIMEOUT_1MIN
+    TIMEOUT_15MIN: int = 15 * TIMEOUT_1MIN
 
 
 MODEL_REGISTRY: str = "model-registry"
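
Annotating the constants and deriving each one from TIMEOUT_1MIN keeps the values consistent and lets other modules drop ad-hoc literals such as infra.py's TIMEOUT_2MIN = 2 * 60 and create_isvc's 15 * 60. A quick sanity check:

```python
from utilities.constants import Timeout

assert Timeout.TIMEOUT_2MIN == 120
assert Timeout.TIMEOUT_5MIN == 300
assert Timeout.TIMEOUT_15MIN == 900
```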
11 changes: 6 additions & 5 deletions utilities/infra.py
@@ -28,12 +28,11 @@
 from pyhelper_utils.shell import run_command
 from pytest_testconfig import config as py_config
 from simple_logger.logger import get_logger
 
+from utilities.constants import Timeout
 import utilities.general
 from utilities.general import create_isvc_label_selector_str
 
 LOGGER = get_logger(name=__name__)
-TIMEOUT_2MIN = 2 * 60

@contextmanager
@@ -68,7 +67,7 @@ def create_ns(
             teardown=teardown,
             delete_timeout=delete_timeout,
         )
-        project.wait_for_status(status=project.Status.ACTIVE, timeout=TIMEOUT_2MIN)
+        project.wait_for_status(status=project.Status.ACTIVE, timeout=Timeout.TIMEOUT_2MIN)
         yield project
 
     else:
@@ -79,7 +78,7 @@ def create_ns(
             teardown=teardown,
             delete_timeout=delete_timeout,
         ) as ns:
-            ns.wait_for_status(status=Namespace.Status.ACTIVE, timeout=TIMEOUT_2MIN)
+            ns.wait_for_status(status=Namespace.Status.ACTIVE, timeout=Timeout.TIMEOUT_2MIN)
             yield ns


@@ -88,6 +87,7 @@ def wait_for_inference_deployment_replicas(
     isvc: InferenceService,
     runtime_name: str | None,
     expected_num_deployments: int = 1,
+    timeout: int = Timeout.TIMEOUT_5MIN,
 ) -> list[Deployment]:
     """
     Wait for inference deployment replicas to complete.
@@ -97,6 +97,7 @@
         isvc (InferenceService): InferenceService object
         runtime_name (str): ServingRuntime name.
         expected_num_deployments (int): Expected number of deployments per InferenceService.
+        timeout (int): Time to wait for the deployment.
 
     Returns:
         list[Deployment]: List of Deployment objects for InferenceService.
@@ -117,7 +118,7 @@
     if len(deployments) == expected_num_deployments:
         for deployment in deployments:
             if deployment.exists:
-                deployment.wait_for_replicas()
+                deployment.wait_for_replicas(timeout=timeout)
 
         return deployments

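Because the timeout is forwarded to every deployment.wait_for_replicas call, the total wait can approach expected_num_deployments * timeout in the worst case. A hedged call sketch; the fixtures and runtime name are hypothetical:

```python
deployments = wait_for_inference_deployment_replicas(
    client=client,
    isvc=isvc,
    runtime_name="my-runtime",
    expected_num_deployments=2,     # e.g. an InferenceService backed by two Deployments
    timeout=Timeout.TIMEOUT_10MIN,  # applied per Deployment, not overall
)
```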