Skip to content

Commit

Permalink
default to json input_type
Browse files Browse the repository at this point in the history
Signed-off-by: Praneeth Bedapudi <[email protected]>
  • Loading branch information
bedapudi6788 committed Nov 29, 2023
1 parent 923f0c8 commit c792452
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 109 deletions.
2 changes: 1 addition & 1 deletion clients/python/fdclient/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .client import FDClient
from .client import FDClient
28 changes: 21 additions & 7 deletions clients/python/fdclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def infer(self, data, unique_id=None, is_async=False):
params={
"unique_id": unique_id,
"async": is_async,
"pickled": True,
"input_type": "pickle",
"compressed": True if zstandard is not None else False,
},
data=self._compressor.compress(data) if zstandard is not None else data,
Expand Down Expand Up @@ -85,11 +85,25 @@ def infer_background_multiple(self, data_list, unique_ids=None):

if __name__ == "__main__":
client = FDClient("http://localhost:8080")
x = client.infer_background(["this", "is", "some", "data"])

print(x.result())
s = time.time()
print("infer", client.infer(["this", "is", "some", "data"]), time.time() - s)

for _ in range(10):
s = time.time()
client.infer(["this", "is", "some", "data"])
print(time.time() - s)
s = time.time()
x = client.infer_background(["this", "is", "some", "data"])
print("infer_background", x.result(), time.time() - s)

s = time.time()

print(
"infer_background_multiple 40",
len(
[
_.result()
for _ in client.infer_background_multiple(
[["this", "is", "some", "data"]] * 40
)
]
),
time.time() - s,
)
2 changes: 1 addition & 1 deletion clients/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
EMAIL = "[email protected]"
AUTHOR = "BEDAPUDI PRANEETH"
REQUIRES_PYTHON = ">=3.6.0"
VERSION = "3.0.0rc1"
VERSION = "3.0.0rc2"

# What packages are required for this module to be executed?
REQUIRED = ["zstandard", "requests"]
Expand Down
246 changes: 151 additions & 95 deletions fastdeploy/_infer.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,116 +48,172 @@ def _decompressor(self):
self.local_storage.decompressor = zstandard.ZstdDecompressor()
return self.local_storage.decompressor

def infer(
self,
inputs: bytes,
unique_id: str,
is_pickled_input: bool,
is_compressed: bool,
is_async_request: bool,
):
request_received_at = time.time()

if is_pickled_input:
def read_inputs(self, inputs, input_type, is_compressed):
if input_type == "pickle":
inputs = pickle.loads(
inputs if not is_compressed else self._decompressor.decompress(inputs)
)
else:
elif input_type == "msgpack":
inputs = msgpack.unpackb(
inputs if not is_compressed else self._decompressor.decompress(inputs),
use_list=False,
raw=False,
)
elif input_type == "json":
inputs = json.loads(
inputs if not is_compressed else self._decompressor.decompress(inputs)
)
else:
inputs = None

if not isinstance(inputs, (list, tuple)):
response = {
"success": False,
"reason": "inputs have to be a list or tuple",
"unique_id": unique_id,
"prediction": None,
}

if not inputs:
response = {
"success": True,
"reason": "empty request",
"prediction": [],
"unique_id": unique_id,
}
return inputs

else:
_utils.MAIN_INDEX.update(
{
unique_id: {
"-1.outputs": inputs,
"-1.received_at": request_received_at,
"-1.predicted_in_batch_of": len(inputs),
"-1.predicted_at": 0,
"is_async_request": is_async_request,
"last_predictor_sequence": -1,
"last_predictor_success": True,
}
}
)
def create_response(self, response, is_compressed, input_type):
success = response["success"]
if input_type == "pickle":
response = pickle.dumps(response)
elif input_type == "msgpack":
response = msgpack.packb(response, use_bin_type=True)
elif input_type == "json":
response = json.dumps(response)

while True:
current_results = _utils.MAIN_INDEX.get(
unique_id,
select_keys=[
f"{_utils.LAST_PREDICTOR_SEQUENCE}.outputs",
"last_predictor_success",
"last_predictor_sequence",
],
)[unique_id]

if (
current_results["last_predictor_success"] is True
and current_results["last_predictor_sequence"]
== _utils.LAST_PREDICTOR_SEQUENCE
):
response = {
"success": True,
"unique_id": unique_id,
"prediction": current_results[
f"{_utils.LAST_PREDICTOR_SEQUENCE}.outputs"
],
"reason": None,
}
if is_compressed:
response = self._compressor.compress(response)

_utils.MAIN_INDEX.update(
{unique_id: {"-1.predicted_at": time.time()}}
)
break
elif current_results["last_predictor_success"] is False:
response = {
return success, response

def infer(
self,
inputs: bytes,
unique_id: str,
input_type: str,
is_compressed: bool,
is_async_request: bool,
):
try:
request_received_at = time.time()

inputs = self.read_inputs(inputs, input_type, is_compressed)

if inputs is None:
return self.create_response(
{
"success": False,
"reason": f"prediction failed predictor {current_results['last_predictor_sequence']}",
"reason": "inputs have to be pickled, msgpack or json",
"unique_id": unique_id,
"prediction": None,
},
is_compressed,
input_type,
)

if not isinstance(inputs, (list, tuple)):
return self.create_response(
{
"success": False,
"reason": "inputs have to be a list or tuple",
"unique_id": unique_id,
"prediction": None,
},
is_compressed,
input_type,
)

if not inputs:
return self.create_response(
{
"success": True,
"reason": "empty inputs",
"unique_id": unique_id,
"prediction": [],
},
is_compressed,
input_type,
)

else:
_utils.MAIN_INDEX.update(
{
unique_id: {
"-1.outputs": inputs,
"-1.received_at": request_received_at,
"-1.predicted_in_batch_of": len(inputs),
"-1.predicted_at": 0,
"is_async_request": is_async_request,
"last_predictor_sequence": -1,
"last_predictor_success": True,
}
}
break
else:
)

while True:
current_results = _utils.MAIN_INDEX.get(
unique_id,
select_keys=[
f"{_utils.LAST_PREDICTOR_SEQUENCE}.outputs",
"last_predictor_success",
"last_predictor_sequence",
],
)[unique_id]

if (
self.timeout > 0
and self.timeout
and time.time() - request_received_at >= self.timeout
current_results["last_predictor_success"] is True
and current_results["last_predictor_sequence"]
== _utils.LAST_PREDICTOR_SEQUENCE
):
response = {
"success": False,
"reason": "timeout",
"unique_id": unique_id,
"prediction": None,
}
break

time.sleep(self.result_polling_interval)

return response["success"], msgpack.packb(
response, use_bin_type=True
) if not is_compressed else self._compressor.compress(
msgpack.packb(response, use_bin_type=True)
) if not is_pickled_input else pickle.dumps(
response
) if not is_compressed else self._compressor.compress(
pickle.dumps(response)
)
_utils.MAIN_INDEX.update(
{unique_id: {"-1.predicted_at": time.time()}}
)
return self.create_response(
{
"success": True,
"unique_id": unique_id,
"prediction": current_results[
f"{_utils.LAST_PREDICTOR_SEQUENCE}.outputs"
],
"reason": None,
},
is_compressed,
input_type,
)
elif current_results["last_predictor_success"] is False:
return self.create_response(
{
"success": False,
"reason": f"prediction failed predictor {current_results['last_predictor_sequence']}",
"unique_id": unique_id,
"prediction": None,
},
is_compressed,
input_type,
)
else:
if (
self.timeout > 0
and self.timeout
and time.time() - request_received_at >= self.timeout
):
return self.create_response(
{
"success": False,
"reason": "timeout",
"unique_id": unique_id,
"prediction": None,
},
is_compressed,
input_type,
)
break

time.sleep(self.result_polling_interval)
except Exception as ex:
return self.create_response(
{
"success": False,
"reason": str(ex),
"unique_id": unique_id,
"prediction": None,
},
is_compressed,
input_type,
)
19 changes: 15 additions & 4 deletions fastdeploy/_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,30 @@ def on_post(self, req, resp):
unique_id = str(req.params.get("unique_id", uuid.uuid4()))

is_async_request = ONLY_ASYNC or req.params.get("async", "f")[0].lower() == "t"
is_pickled_input = req.params.get("pickled", "f")[0].lower() == "t"
is_compressed = req.params.get("compressed", "f")[0].lower() == "t"
input_type = req.params.get("input_type", "json")

success, response = self._infer.infer(
inputs=req.stream.read(),
unique_id=unique_id,
is_pickled_input=is_pickled_input,
input_type=input_type,
is_compressed=is_compressed,
is_async_request=is_async_request,
)

resp.data = response
resp.content_type = "application/msgpack"
if is_compressed:
resp.data = response
resp.content_type = "application/octet-stream"

elif input_type == "json":
resp.media = response
elif input_type == "pickle":
resp.data = response
resp.content_type = "application/pickle"
elif input_type == "msgpack":
resp.data = response
resp.content_type = "application/msgpack"

resp.status = falcon.HTTP_200 if success else falcon.HTTP_400


Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
EMAIL = "[email protected]"
AUTHOR = "BEDAPUDI PRANEETH"
REQUIRES_PYTHON = ">=3.6.0"
VERSION = "3.0.0rc1"
VERSION = "3.0.0rc2"

# What packages are required for this module to be executed?
REQUIRED = ["falcon", "liteindex", "zstandard", "gunicorn[gevent]"]
Expand Down

0 comments on commit c792452

Please sign in to comment.