diff --git a/fastdeploy/__main__.py b/fastdeploy/__main__.py index 3ecd141..7dedfc0 100644 --- a/fastdeploy/__main__.py +++ b/fastdeploy/__main__.py @@ -193,7 +193,7 @@ def build_docker_image(): ENTRYPOINT ["/bin/sh", "-c"] -CMD ["ulimit -n 1000000 && python3 -m fastdeploy --recipe /recipe --loop & python3 -m fastdeploy --recipe /recipe --rest"] +CMD ["ulimit -n 1000000 && python3 -m fastdeploy --recipe /recipe --rest & python3 -m fastdeploy --recipe /recipe --loop"] """ ) f.flush() diff --git a/fastdeploy/_infer.py b/fastdeploy/_infer.py index cf8a39b..3d25aeb 100644 --- a/fastdeploy/_infer.py +++ b/fastdeploy/_infer.py @@ -30,6 +30,8 @@ time.sleep(1) +_utils.logger.info(f"pids: {_utils.get_fd_pids()}") + class Infer: started_at_time = started_at_time diff --git a/fastdeploy/_loop.py b/fastdeploy/_loop.py index 63caed9..ee59660 100644 --- a/fastdeploy/_loop.py +++ b/fastdeploy/_loop.py @@ -101,7 +101,8 @@ def fetch_batch( query={ "-1.predicted_at": 0, # prediction not yet done "last_predictor_success": True, # last predictor success - "last_predictor_sequence": predictor_sequence - 1, # last predictor sequence + "last_predictor_sequence": predictor_sequence + - 1, # last predictor sequence "timedout_in_queue": {"$ne": True}, # not timedout in queue }, n=optimal_batch_size, diff --git a/fastdeploy/_rest.py b/fastdeploy/_rest.py index 072cddf..892723a 100644 --- a/fastdeploy/_rest.py +++ b/fastdeploy/_rest.py @@ -175,7 +175,7 @@ def on_get(self, req, resp): number_of_requests_timedout_in_last_x_seconds = _utils.MAIN_INDEX.count( query={ - "-1.received_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME}, + "-1.predicted_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME}, "timedout_in_queue": True, } ) @@ -184,6 +184,10 @@ def on_get(self, req, resp): query={"-1.received_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME}} ) + requests_processed_in_last_x_seconds = _utils.MAIN_INDEX.count( + query={"-1.predicted_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME}} + ) + requests_received_in_last_x_seconds_that_failed = _utils.MAIN_INDEX.count( query={ "-1.received_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME}, @@ -191,6 +195,13 @@ def on_get(self, req, resp): } ) + requests_processed_in_last_x_seconds_that_failed = _utils.MAIN_INDEX.count( + query={ + "-1.predicted_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME}, + "last_predictor_success": False, + } + ) + requests_received_in_last_x_seconds_that_are_pending = _utils.MAIN_INDEX.count( query={ "-1.predicted_at": 0, @@ -205,6 +216,17 @@ def on_get(self, req, resp): "-1.predicted_at": {"$ne": 0}, "last_predictor_success": True, "-1.received_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME}, + "timedout_in_queue": {"$ne": True}, + } + ) + ) + + requests_processed_in_last_x_seconds_that_are_successful = ( + _utils.MAIN_INDEX.count( + query={ + "-1.predicted_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME}, + "last_predictor_success": True, + "timedout_in_queue": {"$ne": True}, } ) ) @@ -217,6 +239,7 @@ def on_get(self, req, resp): query={ "-1.received_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME}, "-1.predicted_at": {"$ne": 0}, + "timedout_in_queue": {"$ne": True}, }, ) @@ -226,6 +249,7 @@ def on_get(self, req, resp): query={ "-1.received_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME}, "-1.predicted_at": {"$ne": 0}, + "timedout_in_queue": {"$ne": True}, }, ) @@ -243,6 +267,7 @@ def on_get(self, req, resp): query={ "-1.received_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME}, "-1.predicted_at": {"$ne": 0}, + "timedout_in_queue": {"$ne": True}, }, ) @@ -252,6 +277,7 @@ def on_get(self, req, resp): query={ "-1.received_at": {"$gt": LAST_X_SECONDS, "$lt": CURRENT_TIME}, "-1.predicted_at": {"$ne": 0}, + "timedout_in_queue": {"$ne": True}, }, ) @@ -265,6 +291,10 @@ def on_get(self, req, resp): # TYPE requests_received_in_last_x_seconds gauge requests_received_in_last_x_seconds {requests_received_in_last_x_seconds} +# HELP requests_processed_in_last_x_seconds The number of requests processed in last {_LAST_X_SECONDS} seconds. +# TYPE requests_processed_in_last_x_seconds gauge +requests_processed_in_last_x_seconds {requests_processed_in_last_x_seconds} + # HELP number_of_requests_timedout_in_last_x_seconds The number of requests timedout at predictor(s) in last {_LAST_X_SECONDS} seconds. # TYPE number_of_requests_timedout_in_last_x_seconds gauge number_of_requests_timedout_in_last_x_seconds {number_of_requests_timedout_in_last_x_seconds} @@ -273,6 +303,10 @@ def on_get(self, req, resp): # TYPE requests_received_in_last_x_seconds_that_failed gauge requests_received_in_last_x_seconds_that_failed {requests_received_in_last_x_seconds_that_failed} +# HELP requests_processed_in_last_x_seconds_that_failed The number of requests processed in last {_LAST_X_SECONDS} seconds that failed. +# TYPE requests_processed_in_last_x_seconds_that_failed gauge +requests_processed_in_last_x_seconds_that_failed {requests_processed_in_last_x_seconds_that_failed} + # HELP requests_received_in_last_x_seconds_that_are_pending The number of requests received in last {_LAST_X_SECONDS} seconds that are pending. # TYPE requests_received_in_last_x_seconds_that_are_pending gauge requests_received_in_last_x_seconds_that_are_pending {requests_received_in_last_x_seconds_that_are_pending} @@ -281,6 +315,10 @@ def on_get(self, req, resp): # TYPE requests_received_in_last_x_seconds_that_are_successful gauge requests_received_in_last_x_seconds_that_are_successful {requests_received_in_last_x_seconds_that_are_successful} +# HELP requests_processed_in_last_x_seconds_that_are_successful The number of requests processed in last {_LAST_X_SECONDS} seconds that are successful. +# TYPE requests_processed_in_last_x_seconds_that_are_successful gauge +requests_processed_in_last_x_seconds_that_are_successful {requests_processed_in_last_x_seconds_that_are_successful} + # HELP avg_total_time_per_req_for_reqs_in_last_x_seconds The average total time per request for requests in last {_LAST_X_SECONDS} seconds. # TYPE avg_total_time_per_req_for_reqs_in_last_x_seconds gauge avg_total_time_per_req_for_reqs_in_last_x_seconds {avg_total_time_per_req_for_reqs_in_last_x_seconds} @@ -411,6 +449,14 @@ def on_get(self, req, resp): } +class Die(object): + def on_get(self, req, resp): + if req.params.get("die", "false").lower()[0] == "t": + resp.status = falcon.HTTP_200 + resp.media = {"status": "killed"} + _utils.kill_fd(loop=True, rest=True) + + app = falcon.App( middleware=falcon.CORSMiddleware(allow_origins="*", allow_credentials="*"), ) @@ -418,9 +464,11 @@ def on_get(self, req, resp): infer_api = Infer() prometheus_metrics = PrometheusMetrics() health_api = Health() +die_api = Die() app.add_route("/infer", infer_api) app.add_route("/sync", infer_api) app.add_route("/prometheus_metrics", prometheus_metrics) app.add_route("/health", health_api) app.add_route("/meta", Meta()) +app.add_route("/die", die_api) diff --git a/fastdeploy/_utils.py b/fastdeploy/_utils.py index 1e2ed78..924d563 100644 --- a/fastdeploy/_utils.py +++ b/fastdeploy/_utils.py @@ -13,6 +13,7 @@ import glob import json import time +import psutil from datetime import datetime from liteindex import DefinedIndex, KVIndex @@ -122,6 +123,34 @@ GLOBAL_METRICS_INDEX["total_predictor_up_for_hours"] = 0 +def get_fd_pids(): + # get pids of processes with fastdeploy and rest or loop in their full cmdline + pids = { + "rest": [], + "loop": [] + } + + for proc in psutil.process_iter(): + try: + full_cmdline = " ".join(proc.cmdline()) + if "fastdeploy" in full_cmdline and "--rest" in full_cmdline: + pids["rest"].append(proc.pid) + elif "fastdeploy" in full_cmdline and "--loop" in full_cmdline: + pids["loop"].append(proc.pid) + except Exception as e: + pass + + return pids + + +def kill_fd(loop=True, rest=True): + pids = get_fd_pids() + if loop and pids["loop"]: + os.system(f"kill -9 {' '.join([str(pid) for pid in pids['loop']])}") + if rest and pids["rest"]: + os.system(f"kill -9 {' '.join([str(pid) for pid in pids['rest']])}") + + def warmup(predictor, example_input, n=3): """ Run warmup prediction on the model. @@ -182,12 +211,12 @@ def check_if_requests_timedout_in_last_x_seconds_is_more_than_y( ): time_before_x_seconds = time.time() - last_x_seconds requests_received_in_last_x_seconds = MAIN_INDEX.count( - query={"-1.received_at": {"$gte": time_before_x_seconds}} + query={"-1.predicted_at": {"$gte": time_before_x_seconds}} ) requests_timedout_in_last_x_seconds = MAIN_INDEX.count( query={ - "-1.received_at": {"$gte": time_before_x_seconds}, + "-1.predicted_at": {"$gte": time_before_x_seconds}, "timedout_in_queue": True, } ) @@ -211,7 +240,7 @@ def check_if_percentage_of_requests_failed_in_last_x_seconds_is_more_than_y( ): time_before_x_seconds = time.time() - last_x_seconds requests_received_in_last_x_seconds = MAIN_INDEX.count( - query={"-1.received_at": {"$gte": time_before_x_seconds}} + query={"-1.predicted_at": {"$gte": time_before_x_seconds}} ) if requests_received_in_last_x_seconds == 0: @@ -219,7 +248,7 @@ def check_if_percentage_of_requests_failed_in_last_x_seconds_is_more_than_y( requests_received_in_last_x_seconds_that_failed = MAIN_INDEX.count( query={ - "-1.received_at": {"$gte": time_before_x_seconds}, + "-1.predicted_at": {"$gte": time_before_x_seconds}, "last_predictor_success": False, } )