
Commit

prometheus and health update
Signed-off-by: Praneeth Bedapudi <[email protected]>
bedapudi6788 committed Feb 21, 2024
1 parent 5035ff6 commit eede5be
Showing 5 changed files with 176 additions and 55 deletions.
5 changes: 5 additions & 0 deletions fastdeploy/_infer.py
@@ -8,13 +8,16 @@

from . import _utils

started_at_time = time.time()

for predictor_file, predictor_sequence in _utils.PREDICTOR_FILE_TO_SEQUENCE.items():
log_printed = False
while True:
try:
time_per_example = _utils.META_INDEX.get(
f"{predictor_sequence}", select_keys=["time_per_example"]
)[f"{predictor_sequence}"]["time_per_example"]
started_at_time = time.time()
break
except:
if not log_printed:
@@ -24,6 +27,8 @@


class Infer:
started_at_time = started_at_time

def __init__(
self,
timeout=float(os.getenv("TIMEOUT", 0)),
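The _infer.py change records when the predictor metadata became readable and exposes it as Infer.started_at_time, which the health endpoint below uses for its uptime check. A minimal sketch of reading it in-process (assuming fastdeploy._infer is importable in the serving process):

import time

from fastdeploy import _infer

# Approximate seconds since the predictor metadata became available,
# i.e. since started_at_time was last refreshed at import time.
uptime_seconds = time.time() - _infer.Infer.started_at_time
print(f"predictor uptime: {uptime_seconds:.1f}s")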
2 changes: 1 addition & 1 deletion fastdeploy/_loop.py
@@ -100,7 +100,7 @@ def start_loop(
unique_id_wise_input_count[unique_id] = len(
data[f"{predictor_sequence - 1}.outputs"]
)
input_batch += data[f"{predictor_sequence - 1}.outputs"]
input_batch.extend(data[f"{predictor_sequence - 1}.outputs"])

current_batch_length = len(input_batch)

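The only functional line in the _loop.py hunk swaps += for .extend(); for a Python list the two mutate in place the same way, but .extend() makes the intent explicit. A small illustrative sketch with hypothetical values:

input_batch = ["a", "b"]
outputs = ["c", "d"]

input_batch.extend(outputs)   # equivalent to: input_batch += outputs
print(input_batch)            # ['a', 'b', 'c', 'd']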
176 changes: 123 additions & 53 deletions fastdeploy/_rest.py
@@ -53,27 +53,90 @@ def on_post(self, req, resp):

class PrometheusMetrics(object):
def on_get(self, req, resp):
_LAST_X_SECONDS = 300
_LAST_X_SECONDS = int(req.params.get("last_x_seconds", 60))
CURRENT_TIME = time.time()
LAST_X_SECONDS = time.time() - _LAST_X_SECONDS

requests_received_in_last_x_seconds = _utils.MAIN_INDEX.count(
query={"-1.received_at": {"$gt": LAST_X_SECONDS}}
query={"-1.received_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME}}
)

requests_received_in_last_x_seconds_that_failed = _utils.MAIN_INDEX.count(
query={
"-1.received_at": {"$gt": LAST_X_SECONDS},
"last_predictor_success": False
"-1.received_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME},
"last_predictor_success": False,
}
)

requests_received_in_last_x_seconds_that_are_pending = _utils.MAIN_INDEX.count(
query={"-1.predicted_at": 0, "last_predictor_success": {"$ne": False}}
query={
"-1.predicted_at": 0,
"last_predictor_success": {"$ne": False},
"-1.received_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME},
}
)

requests_received_in_last_x_seconds_that_are_successful = (
_utils.MAIN_INDEX.count(
query={
"-1.predicted_at": {"$ne": 0},
"last_predictor_success": True,
"-1.received_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME},
}
)
)

requests_received_in_last_x_seconds_that_are_successful = _utils.MAIN_INDEX.count(
query={"-1.predicted_at": {"$ne": 0}, "last_predictor_success": True}
avg_total_time_per_req_for_reqs_in_last_x_seconds = 0

__sum_of_received_at = _utils.MAIN_INDEX.math(
"-1.received_at",
"sum",
query={
"-1.received_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME},
"-1.predicted_at": {"$ne": 0},
},
)

__sum_of_predicted_at = _utils.MAIN_INDEX.math(
"-1.predicted_at",
"sum",
query={
"-1.received_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME},
"-1.predicted_at": {"$ne": 0},
},
)

if __sum_of_received_at and __sum_of_predicted_at:
avg_total_time_per_req_for_reqs_in_last_x_seconds = (
__sum_of_predicted_at - __sum_of_received_at
) / requests_received_in_last_x_seconds_that_are_successful

avg_actual_total_time_per_req_for_reqs_in_last_x_seconds = 0

for executor_n in [0]:
_temp_sum_of_received_at = _utils.MAIN_INDEX.math(
f"{executor_n}.received_at",
"sum",
query={
"-1.received_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME},
"-1.predicted_at": {"$ne": 0},
},
)

_temp_sum_of_predicted_at = _utils.MAIN_INDEX.math(
f"{executor_n}.predicted_at",
"sum",
query={
"-1.received_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME},
"-1.predicted_at": {"$ne": 0},
},
)

if _temp_sum_of_received_at and _temp_sum_of_predicted_at:
avg_actual_total_time_per_req_for_reqs_in_last_x_seconds = (
_temp_sum_of_predicted_at - _temp_sum_of_received_at
) / requests_received_in_last_x_seconds_that_are_successful

prometheus_text = f"""# HELP pending_requests The number of pending requests.
# TYPE pending_requests gauge
pending_requests {_utils.MAIN_INDEX.count(query={"-1.predicted_at": 0, "last_predictor_success": True})}
@@ -93,6 +156,26 @@ def on_get(self, req, resp):
# HELP requests_received_in_last_x_seconds_that_failed The number of requests received in last {_LAST_X_SECONDS} seconds that failed.
# TYPE requests_received_in_last_x_seconds_that_failed gauge
requests_received_in_last_x_seconds_that_failed {requests_received_in_last_x_seconds_that_failed}
# HELP requests_received_in_last_x_seconds_that_are_pending The number of requests received in last {_LAST_X_SECONDS} seconds that are pending.
# TYPE requests_received_in_last_x_seconds_that_are_pending gauge
requests_received_in_last_x_seconds_that_are_pending {requests_received_in_last_x_seconds_that_are_pending}
# HELP requests_received_in_last_x_seconds_that_are_successful The number of requests received in last {_LAST_X_SECONDS} seconds that are successful.
# TYPE requests_received_in_last_x_seconds_that_are_successful gauge
requests_received_in_last_x_seconds_that_are_successful {requests_received_in_last_x_seconds_that_are_successful}
# HELP avg_total_time_per_req_for_reqs_in_last_x_seconds The average total time per request for requests in last {_LAST_X_SECONDS} seconds.
# TYPE avg_total_time_per_req_for_reqs_in_last_x_seconds gauge
avg_total_time_per_req_for_reqs_in_last_x_seconds {avg_total_time_per_req_for_reqs_in_last_x_seconds}
# HELP avg_actual_total_time_per_req_for_reqs_in_last_x_seconds The average actual total time per request for requests in last {_LAST_X_SECONDS} seconds.
# TYPE avg_actual_total_time_per_req_for_reqs_in_last_x_seconds gauge
avg_actual_total_time_per_req_for_reqs_in_last_x_seconds {avg_actual_total_time_per_req_for_reqs_in_last_x_seconds}
# HELP requests_received_in_last_x_seconds The number of requests received in last {_LAST_X_SECONDS} seconds.
# TYPE requests_received_in_last_x_seconds gauge
requests_received_in_last_x_seconds {requests_received_in_last_x_seconds}
"""

resp.status = falcon.HTTP_200
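The two latency gauges are window averages computed as (sum of predicted_at - sum of received_at) divided by the count of successful requests in the window; everything is emitted as plain Prometheus exposition text. A minimal sketch of scraping it ad hoc with the standard library (the /prometheus_metrics path, host, and port are assumptions, not taken from this diff):

import urllib.request

# Hypothetical URL; adjust host, port, and path to your deployment.
url = "http://localhost:8080/prometheus_metrics?last_x_seconds=60"
with urllib.request.urlopen(url) as resp:
    metrics_text = resp.read().decode("utf-8")

# Print only the samples, skipping the # HELP / # TYPE comment lines.
for line in metrics_text.splitlines():
    if line and not line.startswith("#"):
        print(line)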
@@ -102,69 +185,56 @@ def on_get(self, req, resp):

class Health(object):
def on_get(self, req, resp):
fail_if_any_request_takes_more_than_x_seconds_param = req.params.get(
"fail_if_any_request_takes_more_than_x_seconds", None
fail_if_percentage_of_requests_failed_in_last_x_seconds_is_more_than_y_param = req.params.get(
"fail_if_percentage_of_requests_failed_in_last_x_seconds_is_more_than_y",
None,
)

fail_if_percentage_of_requests_failed_in_last_x_seconds_param = req.params.get(
"fail_if_percentage_of_requests_failed_in_last_x_seconds", None
fail_if_requests_older_than_x_seconds_pending_param = req.params.get(
"fail_if_requests_older_than_x_seconds_pending", None
)

if fail_if_percentage_of_requests_failed_in_last_x_seconds_param:
fail_if_percentage_of_requests_failed_in_last_x_seconds_param = fail_if_percentage_of_requests_failed_in_last_x_seconds_param.split(",")
x_seconds_back_time = time.time() - int(
fail_if_percentage_of_requests_failed_in_last_x_seconds_param[1]
)
fail_if_up_time_more_than_x_seconds_param = req.params.get(
"fail_if_up_time_more_than_x_seconds", None
)

max_percentage_of_failed_requests = int(
fail_if_percentage_of_requests_failed_in_last_x_seconds_param[0]
)
is_predictor_is_up_param = req.params.get("is_predictor_is_up", None)

requests_received_in_last_x_seconds = _utils.MAIN_INDEX.count(
query={"-1.received_at": {"$gt": x_seconds_back_time}}
if fail_if_percentage_of_requests_failed_in_last_x_seconds_is_more_than_y_param:
(
x,
y,
) = fail_if_percentage_of_requests_failed_in_last_x_seconds_is_more_than_y_param.split(
","
)
requests_received_in_last_x_seconds_that_failed = _utils.MAIN_INDEX.count(
query={
"-1.received_at": {"$gt": x_seconds_back_time},
"last_predictor_success": False
x, y = int(x), int(y)
if _utils.check_if_percentage_of_requests_failed_in_last_x_seconds_is_more_than_y(
x, y
):
resp.status = falcon.HTTP_503
resp.media = {
"reason": f"More than {y}% requests failed in last {x} seconds"
}
)
return

if requests_received_in_last_x_seconds and (requests_received_in_last_x_seconds_that_failed / requests_received_in_last_x_seconds) * 100 >= max_percentage_of_failed_requests:
elif fail_if_requests_older_than_x_seconds_pending_param:
if _utils.check_if_requests_older_than_x_seconds_pending(
int(fail_if_requests_older_than_x_seconds_pending_param)
):
resp.status = falcon.HTTP_503
resp.media = {
"status": "error",
"message": f"More than {max_percentage_of_failed_requests}% of requests failed in last {fail_if_percentage_of_requests_failed_in_last_x_seconds_param[1]} seconds."
"reason": f"Requests older than {fail_if_requests_older_than_x_seconds_pending_param} seconds are pending"
}
return

return

if fail_if_any_request_takes_more_than_x_seconds_param:
x_seconds_back_time = time.time() - int(
fail_if_any_request_takes_more_than_x_seconds_param
)

if _utils.MAIN_INDEX.count(
query={
"-1.received_at": {"$lt": x_seconds_back_time},
"-1.predicted_at": 0,
"last_predictor_success": {
"$ne": False
}
}
elif fail_if_up_time_more_than_x_seconds_param:
if time.time() - Infer.started_at_time > int(
fail_if_up_time_more_than_x_seconds_param
):
resp.status = falcon.HTTP_503
resp.media = {
"status": "error",
"message": f"Request took more than {fail_if_any_request_takes_more_than_x_seconds_param} seconds to process."
"reason": f"Up time more than {fail_if_up_time_more_than_x_seconds_param} seconds"
}
return

fail_if_percentage_of_requests_failed_in_last_x_seconds_param = req.params.get(
"fail_if_percentage_of_requests_failed_in_same_x_seconds", None
)



resp.status = falcon.HTTP_200
resp.media = {"status": "ok"}
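The reworked health check is driven entirely by query parameters and evaluates the first matching one per request (the handler is an if/elif chain), which makes it straightforward to wire into a liveness or readiness probe. A minimal sketch of calling it from Python (the /health path, host, and port are assumptions; the parameter name and its "seconds,percentage" value format come from the handler above):

import urllib.error
import urllib.request

# Hypothetical base URL; 300 is the window in seconds, 10 the max % of failed requests.
url = (
    "http://localhost:8080/health"
    "?fail_if_percentage_of_requests_failed_in_last_x_seconds_is_more_than_y=300,10"
)

try:
    with urllib.request.urlopen(url) as resp:
        print("healthy:", resp.status)                          # 200, {"status": "ok"}
except urllib.error.HTTPError as err:
    print("unhealthy:", err.code, err.read().decode("utf-8"))   # 503 with a "reason"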
46 changes: 46 additions & 0 deletions fastdeploy/_utils.py
@@ -86,6 +86,9 @@
db_path=os.path.join("fastdeploy_dbs", f"main_index.db"),
)

META_INDEX.clear()
MAIN_INDEX.clear()

MAIN_INDEX.optimize_for_query(["last_predictor_success", "last_predictor_sequence"])
MAIN_INDEX.optimize_for_query(["-1.predicted_at", "last_predictor_success"])

@@ -143,3 +146,46 @@ def calculate_optimum_batch_sizes(
)

return max_batch_size, time_per_example


def check_if_percentage_of_requests_failed_in_last_x_seconds_is_more_than_y(
last_x_seconds, max_percentage_of_failed_requests
):
time_before_x_seconds = time.time() - last_x_seconds
requests_received_in_last_x_seconds = MAIN_INDEX.count(
query={"-1.received_at": {"$gte": time_before_x_seconds}}
)

if requests_received_in_last_x_seconds == 0:
return False

requests_received_in_last_x_seconds_that_failed = MAIN_INDEX.count(
query={
"-1.received_at": {"$gte": time_before_x_seconds},
"last_predictor_success": False,
}
)

if (
requests_received_in_last_x_seconds_that_failed
/ requests_received_in_last_x_seconds
) * 100 >= max_percentage_of_failed_requests:
return True

return False


def check_if_requests_older_than_x_seconds_pending(x):
time_before_x_seconds = time.time() - x

requests_older_than_x_seconds_pending = MAIN_INDEX.count(
query={
"-1.received_at": {"$lte": time_before_x_seconds},
"-1.predicted_at": 0,
"last_predictor_success": {"$ne": False},
}
)

if requests_older_than_x_seconds_pending > 0:
return True
return False
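Both helpers reduce to threshold checks over MAIN_INDEX counts. A worked example of the percentage check, with hypothetical counts standing in for the index queries:

# Hypothetical counts for a 300-second window.
requests_received_in_window = 40
requests_failed_in_window = 3

failure_percentage = (requests_failed_in_window / requests_received_in_window) * 100  # 7.5

# With max_percentage_of_failed_requests=5, the check returns True (7.5 >= 5),
# so a health probe using this threshold would be answered with HTTP 503.
print(failure_percentage >= 5)  # True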
2 changes: 1 addition & 1 deletion setup.py
@@ -18,7 +18,7 @@
EMAIL = "[email protected]"
AUTHOR = "BEDAPUDI PRANEETH"
REQUIRES_PYTHON = ">=3.6.0"
VERSION = "3.0.4"
VERSION = "3.0.5"

# What packages are required for this module to be executed?
REQUIRED = ["falcon", "liteindex", "zstandard", "gunicorn[gevent]", "msgpack"]
