update timeout calculation logics
Signed-off-by: Praneeth Bedapudi <[email protected]>
bedapudi6788 committed Nov 7, 2024
1 parent 992958f commit fc0faa6
Showing 5 changed files with 87 additions and 7 deletions.
2 changes: 1 addition & 1 deletion fastdeploy/__main__.py
@@ -193,7 +193,7 @@ def build_docker_image():
ENTRYPOINT ["/bin/sh", "-c"]
CMD ["ulimit -n 1000000 && python3 -m fastdeploy --recipe /recipe --loop & python3 -m fastdeploy --recipe /recipe --rest"]
CMD ["ulimit -n 1000000 && python3 -m fastdeploy --recipe /recipe --rest & python3 -m fastdeploy --recipe /recipe --loop"]
"""
)
f.flush()
2 changes: 2 additions & 0 deletions fastdeploy/_infer.py
@@ -30,6 +30,8 @@
time.sleep(1)


_utils.logger.info(f"pids: {_utils.get_fd_pids()}")

class Infer:
started_at_time = started_at_time

3 changes: 2 additions & 1 deletion fastdeploy/_loop.py
@@ -101,7 +101,8 @@ def fetch_batch(
query={
"-1.predicted_at": 0, # prediction not yet done
"last_predictor_success": True, # last predictor success
"last_predictor_sequence": predictor_sequence - 1, # last predictor sequence
"last_predictor_sequence": predictor_sequence
- 1, # last predictor sequence
"timedout_in_queue": {"$ne": True}, # not timedout in queue
},
n=optimal_batch_size,
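For readability, the selection criteria of the updated fetch_batch query amount to the following standalone predicate. This is an illustrative sketch, not code from the commit; the record is assumed to be a flat dict keyed the same way as the query above.

def is_eligible_for_batch(record, predictor_sequence):
    # Mirrors the fetch_batch query: not yet predicted, previous predictor
    # succeeded at the expected step, and not already timed out in the queue.
    return (
        record.get("-1.predicted_at") == 0
        and record.get("last_predictor_success") is True
        and record.get("last_predictor_sequence") == predictor_sequence - 1
        and record.get("timedout_in_queue") is not True  # the {"$ne": True} clause
    )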
50 changes: 49 additions & 1 deletion fastdeploy/_rest.py
@@ -175,7 +175,7 @@ def on_get(self, req, resp):

number_of_requests_timedout_in_last_x_seconds = _utils.MAIN_INDEX.count(
query={
"-1.received_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME},
"-1.predicted_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME},
"timedout_in_queue": True,
}
)
@@ -184,13 +184,24 @@ def on_get(self, req, resp):
query={"-1.received_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME}}
)

requests_processed_in_last_x_seconds = _utils.MAIN_INDEX.count(
query={"-1.predicted_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME}}
)

requests_received_in_last_x_seconds_that_failed = _utils.MAIN_INDEX.count(
query={
"-1.received_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME},
"last_predictor_success": False,
}
)

requests_processed_in_last_x_seconds_that_failed = _utils.MAIN_INDEX.count(
query={
"-1.predicted_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME},
"last_predictor_success": False,
}
)

requests_received_in_last_x_seconds_that_are_pending = _utils.MAIN_INDEX.count(
query={
"-1.predicted_at": 0,
@@ -205,6 +216,17 @@ def on_get(self, req, resp):
"-1.predicted_at": {"$ne": 0},
"last_predictor_success": True,
"-1.received_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME},
"timedout_in_queue": {"$ne": True},
}
)
)

requests_processed_in_last_x_seconds_that_are_successful = (
_utils.MAIN_INDEX.count(
query={
"-1.predicted_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME},
"last_predictor_success": True,
"timedout_in_queue": {"$ne": True},
}
)
)
@@ -217,6 +239,7 @@ def on_get(self, req, resp):
query={
"-1.received_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME},
"-1.predicted_at": {"$ne": 0},
"timedout_in_queue": {"$ne": True},
},
)

@@ -226,6 +249,7 @@ def on_get(self, req, resp):
query={
"-1.received_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME},
"-1.predicted_at": {"$ne": 0},
"timedout_in_queue": {"$ne": True},
},
)

@@ -243,6 +267,7 @@ def on_get(self, req, resp):
query={
"-1.received_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME},
"-1.predicted_at": {"$ne": 0},
"timedout_in_queue": {"$ne": True},
},
)

@@ -252,6 +277,7 @@ def on_get(self, req, resp):
query={
"-1.received_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME},
"-1.predicted_at": {"$ne": 0},
"timedout_in_queue": {"$ne": True},
},
)

@@ -265,6 +291,10 @@ def on_get(self, req, resp):
# TYPE requests_received_in_last_x_seconds gauge
requests_received_in_last_x_seconds {requests_received_in_last_x_seconds}
# HELP requests_processed_in_last_x_seconds The number of requests processed in last {_LAST_X_SECONDS} seconds.
# TYPE requests_processed_in_last_x_seconds gauge
requests_processed_in_last_x_seconds {requests_processed_in_last_x_seconds}
# HELP number_of_requests_timedout_in_last_x_seconds The number of requests timedout at predictor(s) in last {_LAST_X_SECONDS} seconds.
# TYPE number_of_requests_timedout_in_last_x_seconds gauge
number_of_requests_timedout_in_last_x_seconds {number_of_requests_timedout_in_last_x_seconds}
@@ -273,6 +303,10 @@ def on_get(self, req, resp):
# TYPE requests_received_in_last_x_seconds_that_failed gauge
requests_received_in_last_x_seconds_that_failed {requests_received_in_last_x_seconds_that_failed}
# HELP requests_processed_in_last_x_seconds_that_failed The number of requests processed in last {_LAST_X_SECONDS} seconds that failed.
# TYPE requests_processed_in_last_x_seconds_that_failed gauge
requests_processed_in_last_x_seconds_that_failed {requests_processed_in_last_x_seconds_that_failed}
# HELP requests_received_in_last_x_seconds_that_are_pending The number of requests received in last {_LAST_X_SECONDS} seconds that are pending.
# TYPE requests_received_in_last_x_seconds_that_are_pending gauge
requests_received_in_last_x_seconds_that_are_pending {requests_received_in_last_x_seconds_that_are_pending}
@@ -281,6 +315,10 @@ def on_get(self, req, resp):
# TYPE requests_received_in_last_x_seconds_that_are_successful gauge
requests_received_in_last_x_seconds_that_are_successful {requests_received_in_last_x_seconds_that_are_successful}
# HELP requests_processed_in_last_x_seconds_that_are_successful The number of requests processed in last {_LAST_X_SECONDS} seconds that are successful.
# TYPE requests_processed_in_last_x_seconds_that_are_successful gauge
requests_processed_in_last_x_seconds_that_are_successful {requests_processed_in_last_x_seconds_that_are_successful}
# HELP avg_total_time_per_req_for_reqs_in_last_x_seconds The average total time per request for requests in last {_LAST_X_SECONDS} seconds.
# TYPE avg_total_time_per_req_for_reqs_in_last_x_seconds gauge
avg_total_time_per_req_for_reqs_in_last_x_seconds {avg_total_time_per_req_for_reqs_in_last_x_seconds}
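The new requests_processed_* gauges pair each received_at-based gauge with a predicted_at-based counterpart. As a minimal consumer-side sketch, the gauges exposed at /prometheus_metrics can be scraped and compared like this, assuming the REST worker is reachable at http://localhost:8080 (host and port are assumptions, not taken from this commit) and that the requests package is installed.

import requests  # assumed to be available; not part of this commit

def scrape_gauge(name, base_url="http://localhost:8080"):
    # Fetch the /prometheus_metrics exposition and return one gauge's value.
    text = requests.get(f"{base_url}/prometheus_metrics", timeout=5).text
    for line in text.splitlines():
        if line.startswith(name + " "):
            return float(line.split()[-1])
    return None

processed = scrape_gauge("requests_processed_in_last_x_seconds")
failed = scrape_gauge("requests_processed_in_last_x_seconds_that_failed")
if processed and failed is not None:
    print(f"failure rate over processed requests: {failed / processed:.2%}")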
@@ -411,16 +449,26 @@ def on_get(self, req, resp):
}


class Die(object):
def on_get(self, req, resp):
if req.params.get("die", "false").lower()[0] == "t":
resp.status = falcon.HTTP_200
resp.media = {"status": "killed"}
_utils.kill_fd(loop=True, rest=True)


app = falcon.App(
middleware=falcon.CORSMiddleware(allow_origins="*", allow_credentials="*"),
)

infer_api = Infer()
prometheus_metrics = PrometheusMetrics()
health_api = Health()
die_api = Die()

app.add_route("/infer", infer_api)
app.add_route("/sync", infer_api)
app.add_route("/prometheus_metrics", prometheus_metrics)
app.add_route("/health", health_api)
app.add_route("/meta", Meta())
app.add_route("/die", die_api)
37 changes: 33 additions & 4 deletions fastdeploy/_utils.py
@@ -13,6 +13,7 @@
import glob
import json
import time
import psutil
from datetime import datetime
from liteindex import DefinedIndex, KVIndex

@@ -122,6 +123,34 @@
GLOBAL_METRICS_INDEX["total_predictor_up_for_hours"] = 0


def get_fd_pids():
# get pids of processes with fastdeploy and rest or loop in their full cmdline
pids = {
"rest": [],
"loop": []
}

for proc in psutil.process_iter():
try:
full_cmdline = " ".join(proc.cmdline())
if "fastdeploy" in full_cmdline and "--rest" in full_cmdline:
pids["rest"].append(proc.pid)
elif "fastdeploy" in full_cmdline and "--loop" in full_cmdline:
pids["loop"].append(proc.pid)
except Exception as e:
pass

return pids


def kill_fd(loop=True, rest=True):
pids = get_fd_pids()
if loop and pids["loop"]:
os.system(f"kill -9 {' '.join([str(pid) for pid in pids['loop']])}")
if rest and pids["rest"]:
os.system(f"kill -9 {' '.join([str(pid) for pid in pids['rest']])}")


def warmup(predictor, example_input, n=3):
"""
Run warmup prediction on the model.
@@ -182,12 +211,12 @@ def check_if_requests_timedout_in_last_x_seconds_is_more_than_y(
):
time_before_x_seconds = time.time() - last_x_seconds
requests_received_in_last_x_seconds = MAIN_INDEX.count(
query={"-1.received_at": {"$gte": time_before_x_seconds}}
query={"-1.predicted_at": {"$gte": time_before_x_seconds}}
)

requests_timedout_in_last_x_seconds = MAIN_INDEX.count(
query={
"-1.received_at": {"$gte": time_before_x_seconds},
"-1.predicted_at": {"$gte": time_before_x_seconds},
"timedout_in_queue": True,
}
)
@@ -211,15 +240,15 @@ def check_if_percentage_of_requests_failed_in_last_x_seconds_is_more_than_y(
):
time_before_x_seconds = time.time() - last_x_seconds
requests_received_in_last_x_seconds = MAIN_INDEX.count(
query={"-1.received_at": {"$gte": time_before_x_seconds}}
query={"-1.predicted_at": {"$gte": time_before_x_seconds}}
)

if requests_received_in_last_x_seconds == 0:
return False

requests_received_in_last_x_seconds_that_failed = MAIN_INDEX.count(
query={
"-1.received_at": {"$gte": time_before_x_seconds},
"-1.predicted_at": {"$gte": time_before_x_seconds},
"last_predictor_success": False,
}
)
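Both helpers now bucket requests by -1.predicted_at rather than -1.received_at, so requests that are still pending no longer dilute the timeout and failure percentages. The visible hunks end before the return statements, so the comparison below is only an assumed sketch of the kind of threshold check such a helper performs, not the commit's actual code.

def percentage_exceeds(part_count, total_count, threshold_percent):
    # Assumed shape of the final comparison: guard against division by zero,
    # then compare the percentage against the configured threshold ("y").
    if total_count == 0:
        return False
    return (part_count / total_count) * 100 >= threshold_percent

# e.g. percentage_exceeds(requests_timedout_in_last_x_seconds,
#                         requests_received_in_last_x_seconds, y)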
