Merge pull request #1004 from roboflow/feature/improve_communication_with_rf_api

Add ability of RF API client to retry transient errors
PawelPeczek-Roboflow authored Feb 6, 2025
2 parents 4d410f7 + c151553 commit 9b48382
Showing 7 changed files with 210 additions and 21 deletions.
36 changes: 22 additions & 14 deletions docs/server_configuration/environmental_variables.md
@@ -4,17 +4,25 @@

Below is a list of environment variables that require more in-depth explanation.

Environmental variable | Description | Default
-----------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------
`ONNXRUNTIME_EXECUTION_PROVIDERS` | List of execution providers in priority order; a warning message is displayed if a provider is not supported on the user's platform | See [here](https://github.com/roboflow/inference/blob/main/inference/core/env.py#L262)
`SAM2_MAX_EMBEDDING_CACHE_SIZE` | The number of SAM2 embeddings that will be held in memory. The embeddings are held in GPU memory. Each embedding takes 16777216 bytes. | 100
`SAM2_MAX_LOGITS_CACHE_SIZE` | The number of SAM2 logits that will be held in memory. The logits are held in CPU memory. Each logit takes 262144 bytes. | 1000
`DISABLE_SAM2_LOGITS_CACHE` | If set to True, disables the caching of SAM2 logits. This can be useful for debugging or in scenarios where memory usage needs to be minimized, but may result in slower performance for repeated similar requests. | False
`ENABLE_WORKFLOWS_PROFILING` | If set to True, allows the `inference` server to output Workflows profiler traces to the client; when running the Python package with `InferencePipeline`, it enables profiling. | False
`WORKFLOWS_PROFILER_BUFFER_SIZE` | Size of the profiler buffer (number of consecutive Workflows Execution Engine `run(...)` invocations to trace). | 64
`RUNS_ON_JETSON` | Boolean flag to tell if `inference` runs on a Jetson device - set to `True` in all docker builds for the Jetson architecture. | False
`WORKFLOWS_DEFINITION_CACHE_EXPIRY` | Number of seconds to cache Workflows definitions as a result of a `get_workflow_specification(...)` function call | `15 * 60` - 15 minutes
`DOCKER_SOCKET_PATH` | Path to the local socket mounted to the container - empty by default; if provided, enables polling docker container stats from the docker daemon socket. See more [here](./server_configuration/container_statistics.md) | Not Set
`ENABLE_PROMETHEUS` | Boolean flag to enable the Prometheus `/metrics` endpoint. | True for docker images in Docker Hub
`ENABLE_STREAM_API` | Flag to enable the Stream Management API in the `inference` server - see [more](/workflows/video_processing/overview.md). | False
`STREAM_API_PRELOADED_PROCESSES` | In the context of the Stream API - this environment variable controls how many idle processes are warmed up, ready to serve as workers for `InferencePipeline` - helps speed up worker process startup on GPU | 0
`TRANSIENT_ROBOFLOW_API_ERRORS` | Comma-separated list of HTTP codes from the Roboflow API that should be retried (only applicable to GET endpoints) | `None`
`RETRY_CONNECTION_ERRORS_TO_ROBOFLOW_API` | Flag to decide if connection errors for the Roboflow API should be retried (only applicable to GET endpoints) | `False`
`ROBOFLOW_API_REQUEST_TIMEOUT` | Timeout (in seconds, given as an integer) for requests to the Roboflow API | `None`
`TRANSIENT_ROBOFLOW_API_ERRORS_RETRIES` | Number of times transient errors (connection errors and transient HTTP codes) to the Roboflow API will be retried (only applicable to GET endpoints) | `3`
`TRANSIENT_ROBOFLOW_API_ERRORS_RETRY_INTERVAL` | Delay interval in seconds between retries (for connection errors and transient HTTP codes) of Roboflow API requests (only applicable to GET endpoints) | `1`
`METRICS_ENABLED` | Flag to control Roboflow Model Monitoring | `True`
`MODEL_VALIDATION_DISABLED` | Flag that can make model loading faster by skipping trial inference | `False`
`DISABLE_VERSION_CHECK` | Flag to disable the `inference` version check in a background thread | `False`
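The new retry-related variables can be set together. As a minimal sketch (assuming a hypothetical deployment that wants to retry gateway errors), the parsing below mirrors how `inference/core/env.py` in this PR interprets the values:

```python
import os

# Example values for the new retry-related variables (hypothetical deployment).
os.environ["TRANSIENT_ROBOFLOW_API_ERRORS"] = "502,503,504"
os.environ["TRANSIENT_ROBOFLOW_API_ERRORS_RETRIES"] = "5"

# Parsing mirrors inference/core/env.py in this PR: comma-separated HTTP
# codes become a set of ints; an empty value yields an empty set (no retries).
transient_errors = {
    int(code)
    for code in os.environ.get("TRANSIENT_ROBOFLOW_API_ERRORS", "").split(",")
    if code
}
retries = int(os.environ.get("TRANSIENT_ROBOFLOW_API_ERRORS_RETRIES", "3"))

print(sorted(transient_errors))  # [502, 503, 504]
print(retries)                   # 5
```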
17 changes: 17 additions & 0 deletions inference/core/env.py
@@ -479,3 +479,20 @@
)

LOAD_ENTERPRISE_BLOCKS = str2bool(os.getenv("LOAD_ENTERPRISE_BLOCKS", "False"))
TRANSIENT_ROBOFLOW_API_ERRORS = set(
    int(e)
    for e in os.getenv("TRANSIENT_ROBOFLOW_API_ERRORS", "").split(",")
    if len(e) > 0
)
RETRY_CONNECTION_ERRORS_TO_ROBOFLOW_API = str2bool(
    os.getenv("RETRY_CONNECTION_ERRORS_TO_ROBOFLOW_API", "False")
)
TRANSIENT_ROBOFLOW_API_ERRORS_RETRIES = int(
    os.getenv("TRANSIENT_ROBOFLOW_API_ERRORS_RETRIES", "3")
)
TRANSIENT_ROBOFLOW_API_ERRORS_RETRY_INTERVAL = int(
    os.getenv("TRANSIENT_ROBOFLOW_API_ERRORS_RETRY_INTERVAL", "1")
)
ROBOFLOW_API_REQUEST_TIMEOUT = os.getenv("ROBOFLOW_API_REQUEST_TIMEOUT")
if ROBOFLOW_API_REQUEST_TIMEOUT:
    ROBOFLOW_API_REQUEST_TIMEOUT = int(ROBOFLOW_API_REQUEST_TIMEOUT)
18 changes: 18 additions & 0 deletions inference/core/exceptions.py
@@ -1,3 +1,6 @@
from typing import Optional


class ContentTypeInvalid(Exception):
    """Raised when the content type is invalid.
@@ -152,6 +155,10 @@ class RoboflowAPIConnectionError(RoboflowAPIRequestError):
    pass


class RoboflowAPITimeoutError(RoboflowAPIRequestError):
    pass


class RoboflowAPIImageUploadRejectionError(RoboflowAPIRequestError):
    pass

@@ -190,3 +197,14 @@ class ActiveLearningConfigurationError(ActiveLearningError):

class CannotInitialiseModelError(Exception):
    pass


class RetryRequestError(Exception):

    def __init__(self, message: str, inner_error: Exception):
        super().__init__(message)
        self._inner_error = inner_error

    @property
    def inner_error(self) -> Exception:
        return self._inner_error
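`RetryRequestError` exists so the retry layer can signal "retry this" while still letting the original failure surface once retries are exhausted. A minimal, self-contained sketch of the wrap/unwrap pattern (the class body reproduces the one added in this diff; the surrounding demo is illustrative only):

```python
class RetryRequestError(Exception):
    """Wraps a transient error so a retry layer can re-raise the original."""

    def __init__(self, message: str, inner_error: Exception):
        super().__init__(message)
        self._inner_error = inner_error

    @property
    def inner_error(self) -> Exception:
        return self._inner_error


# Once retries are exhausted, callers unwrap and surface the original error:
try:
    raise RetryRequestError("HTTP 503", inner_error=ConnectionError("api down"))
except RetryRequestError as error:
    original = error.inner_error

print(type(original).__name__)  # ConnectionError
```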
9 changes: 9 additions & 0 deletions inference/core/interfaces/http/http_api.py
@@ -153,6 +153,7 @@
    RoboflowAPIConnectionError,
    RoboflowAPINotAuthorizedError,
    RoboflowAPINotNotFoundError,
    RoboflowAPITimeoutError,
    RoboflowAPIUnsuccessfulRequestError,
    ServiceConfigurationError,
    WorkspaceLoadError,
@@ -437,6 +438,14 @@ async def wrapped_route(*args, **kwargs):
                },
            )
            traceback.print_exc()
        except RoboflowAPITimeoutError:
            resp = JSONResponse(
                status_code=504,
                content={
                    "message": "Timeout when attempting to connect to Roboflow API."
                },
            )
            traceback.print_exc()
        except StepExecutionError as error:
            content = WorkflowErrorResponse(
                message=error.public_message,
52 changes: 45 additions & 7 deletions inference/core/roboflow_api.py
@@ -5,8 +5,9 @@
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union

import backoff
import requests
from requests import Response
from requests import Response, Timeout
from requests_toolbelt import MultipartEncoder

from inference.core import logger
@@ -23,20 +24,27 @@
from inference.core.env import (
    API_BASE_URL,
    MODEL_CACHE_DIR,
    RETRY_CONNECTION_ERRORS_TO_ROBOFLOW_API,
    ROBOFLOW_API_EXTRA_HEADERS,
    ROBOFLOW_API_REQUEST_TIMEOUT,
    TRANSIENT_ROBOFLOW_API_ERRORS,
    TRANSIENT_ROBOFLOW_API_ERRORS_RETRIES,
    TRANSIENT_ROBOFLOW_API_ERRORS_RETRY_INTERVAL,
    USE_FILE_CACHE_FOR_WORKFLOWS_DEFINITIONS,
    WORKFLOWS_DEFINITION_CACHE_EXPIRY,
)
from inference.core.exceptions import (
    MalformedRoboflowAPIResponseError,
    MalformedWorkflowResponseError,
    MissingDefaultModelError,
    RetryRequestError,
    RoboflowAPIConnectionError,
    RoboflowAPIIAlreadyAnnotatedError,
    RoboflowAPIIAnnotationRejectionError,
    RoboflowAPIImageUploadRejectionError,
    RoboflowAPINotAuthorizedError,
    RoboflowAPINotNotFoundError,
    RoboflowAPITimeoutError,
    RoboflowAPIUnsuccessfulRequestError,
    WorkspaceLoadError,
)
@@ -86,7 +94,14 @@ def wrap_roboflow_api_errors(
    def decorator(function: callable) -> callable:
        def wrapper(*args, **kwargs) -> Any:
            try:
                try:
                    return function(*args, **kwargs)
                except RetryRequestError as error:
                    raise error.inner_error
            except Timeout as error:
                raise RoboflowAPITimeoutError(
                    "Timeout when attempting to connect to Roboflow API."
                ) from error
            except (requests.exceptions.ConnectionError, ConnectionError) as error:
                raise RoboflowAPIConnectionError(
                    "Could not connect to Roboflow API."
@@ -150,6 +165,7 @@ def add_custom_metadata(
            ]
        },
        headers=build_roboflow_api_headers(),
        timeout=ROBOFLOW_API_REQUEST_TIMEOUT,
    )
    api_key_safe_raise_for_status(response=response)

@@ -361,6 +377,7 @@ def register_image_at_roboflow(
        url=wrapped_url,
        data=m,
        headers=headers,
        timeout=ROBOFLOW_API_REQUEST_TIMEOUT,
    )
    api_key_safe_raise_for_status(response=response)
    parsed_response = response.json()
@@ -403,6 +420,7 @@ def annotate_image_at_roboflow(
        wrapped_url,
        data=annotation_content,
        headers=headers,
        timeout=ROBOFLOW_API_REQUEST_TIMEOUT,
    )
    api_key_safe_raise_for_status(response=response)
    parsed_response = response.json()
@@ -638,12 +656,31 @@ def get_from_url(
    return _get_from_url(url=url, json_response=json_response)


@backoff.on_exception(
    backoff.constant,
    exception=RetryRequestError,
    max_tries=TRANSIENT_ROBOFLOW_API_ERRORS_RETRIES,
    interval=TRANSIENT_ROBOFLOW_API_ERRORS_RETRY_INTERVAL,
)
def _get_from_url(url: str, json_response: bool = True) -> Union[Response, dict]:
    try:
        response = requests.get(
            wrap_url(url),
            headers=build_roboflow_api_headers(),
            timeout=ROBOFLOW_API_REQUEST_TIMEOUT,
        )
    except (ConnectionError, Timeout, requests.exceptions.ConnectionError) as error:
        if RETRY_CONNECTION_ERRORS_TO_ROBOFLOW_API:
            raise RetryRequestError(
                message="Connectivity error", inner_error=error
            ) from error
        raise error
    try:
        api_key_safe_raise_for_status(response=response)
    except Exception as error:
        if response.status_code in TRANSIENT_ROBOFLOW_API_ERRORS:
            raise RetryRequestError(message=str(error), inner_error=error) from error
        raise error
    if json_response:
        return response.json()
    return response
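The `backoff.on_exception(backoff.constant, ...)` decorator retries `RetryRequestError` a fixed number of times with a constant delay before letting the exception propagate. A stdlib-only sketch of those semantics (the `retry_on` helper below is hypothetical, not part of `inference` or the `backoff` library):

```python
import time
from functools import wraps


def retry_on(exc_type, max_tries=3, interval=0.0):
    # Stdlib approximation of backoff.on_exception(backoff.constant, ...):
    # re-invoke the function on exc_type up to max_tries total attempts,
    # sleeping a constant interval between attempts, then re-raise.
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_tries + 1):
                try:
                    return func(*args, **kwargs)
                except exc_type:
                    if attempt == max_tries:
                        raise
                    time.sleep(interval)
        return wrapper
    return decorator


class RetryRequestError(Exception):
    pass


calls = []


@retry_on(RetryRequestError, max_tries=3, interval=0)
def flaky():
    # Fails twice with a transient error, then succeeds on the third try.
    calls.append(1)
    if len(calls) < 3:
        raise RetryRequestError("transient")
    return "ok"


print(flaky())     # ok
print(len(calls))  # 3
```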
@@ -673,6 +710,7 @@ def send_inference_results_to_model_monitoring(
        url=api_url,
        json=inference_data,
        headers=build_roboflow_api_headers(),
        timeout=ROBOFLOW_API_REQUEST_TIMEOUT,
    )
    api_key_safe_raise_for_status(response=response)

1 change: 1 addition & 0 deletions requirements/_requirements.txt
@@ -40,3 +40,4 @@ twilio~=9.3.7
httpx>=0.25.1,<0.28.0 # must be pinned as bc in 0.28.0 is causing Anthropics to fail
pylogix==1.0.5
pymodbus>=3.6.9,<=3.8.3
backoff~=2.2.0
