From af64005b9cfd964a53aa4b24af9bd56c3b8f05e4 Mon Sep 17 00:00:00 2001 From: nareka Date: Mon, 4 Dec 2023 11:43:41 -0800 Subject: [PATCH 1/9] fix: loadbalance right method --- .../runtimes/gateway/request_handling.py | 36 +++++++++---------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/jina/serve/runtimes/gateway/request_handling.py b/jina/serve/runtimes/gateway/request_handling.py index b5c63af086ef8..772a33b62d34a 100644 --- a/jina/serve/runtimes/gateway/request_handling.py +++ b/jina/serve/runtimes/gateway/request_handling.py @@ -1,6 +1,8 @@ import itertools from typing import TYPE_CHECKING, AsyncIterator, Dict +from aiohttp.client import _RequestContextManager + from jina.enums import ProtocolType from jina.helper import get_full_version from jina.proto import jina_pb2 @@ -157,18 +159,19 @@ async def _load_balance(self, request): try: async with aiohttp.ClientSession() as session: - if request.method == 'GET': - request_kwargs = {} - try: - payload = await request.json() - if payload: - request_kwargs['json'] = payload - except Exception: - self.logger.debug('No JSON payload found in request') - - async with session.get( - url=target_url, **request_kwargs - ) as response: + request_kwargs = {} + try: + payload = await request.json() + if payload: + request_kwargs['json'] = payload + except Exception: + self.logger.debug('No JSON payload found in request') + + async with _RequestContextManager( + session._request(request.method, target_url, **request_kwargs) + ) as response: + if request.content_type.endswith('stream'): + # Create a StreamResponse with the same headers and status as the target response stream_response = web.StreamResponse( status=response.status, @@ -185,14 +188,7 @@ async def _load_balance(self, request): # Close the stream response once all chunks are sent await stream_response.write_eof() return stream_response - - elif request.method == 'POST': - d = await request.read() - import json - - async with session.post( - url=target_url, json=json.loads(d.decode()) - ) as response: + else: content = await response.read() return web.Response( body=content, From 4b8574135f9c2dc8c20315f62d5b0b9aa6cf12c9 Mon Sep 17 00:00:00 2001 From: nareka Date: Mon, 4 Dec 2023 11:59:45 -0800 Subject: [PATCH 2/9] docs: add comment for content type --- jina/serve/runtimes/gateway/request_handling.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/jina/serve/runtimes/gateway/request_handling.py b/jina/serve/runtimes/gateway/request_handling.py index 772a33b62d34a..9d1078692a734 100644 --- a/jina/serve/runtimes/gateway/request_handling.py +++ b/jina/serve/runtimes/gateway/request_handling.py @@ -170,6 +170,7 @@ async def _load_balance(self, request): async with _RequestContextManager( session._request(request.method, target_url, **request_kwargs) ) as response: + # Looking for application/octet-stream, text/event-stream, text/stream if request.content_type.endswith('stream'): # Create a StreamResponse with the same headers and status as the target response @@ -188,13 +189,12 @@ async def _load_balance(self, request): # Close the stream response once all chunks are sent await stream_response.write_eof() return stream_response - else: - content = await response.read() - return web.Response( - body=content, - status=response.status, - content_type=response.content_type, - ) + content = await response.read() + return web.Response( + body=content, + status=response.status, + content_type=response.content_type, + ) except aiohttp.ClientError as e: return web.Response(text=f'Error: {str(e)}', status=500) From 8292fcfaf29e96e6448d87aee0ea7522f90e566f Mon Sep 17 00:00:00 2001 From: nareka Date: Mon, 4 Dec 2023 12:58:53 -0800 Subject: [PATCH 3/9] fix: use response content --- jina/serve/runtimes/gateway/request_handling.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/jina/serve/runtimes/gateway/request_handling.py b/jina/serve/runtimes/gateway/request_handling.py index 9d1078692a734..5f402eea9de33 100644 --- a/jina/serve/runtimes/gateway/request_handling.py +++ b/jina/serve/runtimes/gateway/request_handling.py @@ -159,11 +159,14 @@ async def _load_balance(self, request): try: async with aiohttp.ClientSession() as session: - request_kwargs = {} + request_kwargs = { + 'headers': request.headers, + 'params': request.query, + } try: - payload = await request.json() + payload = await request.content.read() if payload: - request_kwargs['json'] = payload + request_kwargs['data'] = payload except Exception: self.logger.debug('No JSON payload found in request') @@ -171,7 +174,7 @@ async def _load_balance(self, request): session._request(request.method, target_url, **request_kwargs) ) as response: # Looking for application/octet-stream, text/event-stream, text/stream - if request.content_type.endswith('stream'): + if response.content_type.endswith('stream'): # Create a StreamResponse with the same headers and status as the target response stream_response = web.StreamResponse( From 42f444a0da1baf81bbed8b5ad2b5902f4e1f177f Mon Sep 17 00:00:00 2001 From: nareka Date: Tue, 5 Dec 2023 12:30:18 -0800 Subject: [PATCH 4/9] fix: use correct protocol --- tests/integration/docarray_v2/test_issues.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/docarray_v2/test_issues.py b/tests/integration/docarray_v2/test_issues.py index 906a895b8ed3a..c646e5d9dc690 100644 --- a/tests/integration/docarray_v2/test_issues.py +++ b/tests/integration/docarray_v2/test_issues.py @@ -178,7 +178,7 @@ async def test_issue_6090_get_params(streaming_deployment): docs = [] url = ( - f"htto://localhost:{streaming_deployment.port}/stream-simple?text=my_input_text" + f"http://localhost:{streaming_deployment.port}/stream-simple?text=my_input_text" ) async with aiohttp.ClientSession() as session: From 7fbe42f920ca4d05889524a1d2563e5dfba4ef13 Mon Sep 17 00:00:00 2001 From: nareka Date: Tue, 5 Dec 2023 13:24:58 -0800 Subject: [PATCH 5/9] fix: user session.request --- jina/serve/runtimes/gateway/request_handling.py | 5 +++-- tests/integration/docarray_v2/test_issues.py | 4 +--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/jina/serve/runtimes/gateway/request_handling.py b/jina/serve/runtimes/gateway/request_handling.py index 5f402eea9de33..c18fa71ab649b 100644 --- a/jina/serve/runtimes/gateway/request_handling.py +++ b/jina/serve/runtimes/gateway/request_handling.py @@ -170,8 +170,8 @@ async def _load_balance(self, request): except Exception: self.logger.debug('No JSON payload found in request') - async with _RequestContextManager( - session._request(request.method, target_url, **request_kwargs) + async with session.request( + request.method, target_url, **request_kwargs ) as response: # Looking for application/octet-stream, text/event-stream, text/stream if response.content_type.endswith('stream'): @@ -197,6 +197,7 @@ async def _load_balance(self, request): body=content, status=response.status, content_type=response.content_type, + headers=response.headers, ) except aiohttp.ClientError as e: return web.Response(text=f'Error: {str(e)}', status=500) diff --git a/tests/integration/docarray_v2/test_issues.py b/tests/integration/docarray_v2/test_issues.py index c646e5d9dc690..7153300d9dcc3 100644 --- a/tests/integration/docarray_v2/test_issues.py +++ b/tests/integration/docarray_v2/test_issues.py @@ -184,13 +184,11 @@ async def test_issue_6090_get_params(streaming_deployment): async with session.get(url) as resp: async for chunk in resp.content.iter_any(): - print(chunk) events = chunk.split(b'event: ')[1:] for event in events: if event.startswith(b'update'): - parsed = event[HTTPClientlet.UPDATE_EVENT_PREFIX:].decode() + parsed = event[HTTPClientlet.UPDATE_EVENT_PREFIX :].decode() parsed = SimpleInput.parse_raw(parsed) - print(parsed) docs.append(parsed) elif event.startswith(b'end'): pass From 676ca4c8ef124e9296cd810e81425a5109d150c8 Mon Sep 17 00:00:00 2001 From: nareka Date: Tue, 5 Dec 2023 14:38:11 -0800 Subject: [PATCH 6/9] fix: passing json --- jina/serve/runtimes/gateway/request_handling.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/jina/serve/runtimes/gateway/request_handling.py b/jina/serve/runtimes/gateway/request_handling.py index c18fa71ab649b..228c411b170ab 100644 --- a/jina/serve/runtimes/gateway/request_handling.py +++ b/jina/serve/runtimes/gateway/request_handling.py @@ -1,4 +1,5 @@ import itertools +import json from typing import TYPE_CHECKING, AsyncIterator, Dict from aiohttp.client import _RequestContextManager @@ -164,9 +165,9 @@ async def _load_balance(self, request): 'params': request.query, } try: - payload = await request.content.read() + payload = await request.read() if payload: - request_kwargs['data'] = payload + request_kwargs['json'] = json.loads(payload.decode()) except Exception: self.logger.debug('No JSON payload found in request') From ac1febe931b7a359037092db830708cbd9c5b344 Mon Sep 17 00:00:00 2001 From: nareka Date: Wed, 6 Dec 2023 05:10:30 -0800 Subject: [PATCH 7/9] fix: always stream response --- .../runtimes/gateway/request_handling.py | 49 +++++++------------ 1 file changed, 18 insertions(+), 31 deletions(-) diff --git a/jina/serve/runtimes/gateway/request_handling.py b/jina/serve/runtimes/gateway/request_handling.py index 228c411b170ab..ffdf50451b91c 100644 --- a/jina/serve/runtimes/gateway/request_handling.py +++ b/jina/serve/runtimes/gateway/request_handling.py @@ -2,8 +2,6 @@ import json from typing import TYPE_CHECKING, AsyncIterator, Dict -from aiohttp.client import _RequestContextManager - from jina.enums import ProtocolType from jina.helper import get_full_version from jina.proto import jina_pb2 @@ -159,11 +157,7 @@ async def _load_balance(self, request): try: async with aiohttp.ClientSession() as session: - - request_kwargs = { - 'headers': request.headers, - 'params': request.query, - } + request_kwargs = {} try: payload = await request.read() if payload: @@ -172,34 +166,27 @@ async def _load_balance(self, request): self.logger.debug('No JSON payload found in request') async with session.request( - request.method, target_url, **request_kwargs + request.method, + url=target_url, + headers=request.headers, + **request_kwargs, ) as response: - # Looking for application/octet-stream, text/event-stream, text/stream - if response.content_type.endswith('stream'): - - # Create a StreamResponse with the same headers and status as the target response - stream_response = web.StreamResponse( - status=response.status, - headers=response.headers, - ) - - # Prepare the response to send headers - await stream_response.prepare(request) - - # Stream the response from the target server to the client - async for chunk in response.content.iter_any(): - await stream_response.write(chunk) - - # Close the stream response once all chunks are sent - await stream_response.write_eof() - return stream_response - content = await response.read() - return web.Response( - body=content, + # Create a StreamResponse with the same headers and status as the target response + stream_response = web.StreamResponse( status=response.status, - content_type=response.content_type, headers=response.headers, ) + + # Prepare the response to send headers + await stream_response.prepare(request) + + # Stream the response from the target server to the client + async for chunk in response.content.iter_any(): + await stream_response.write(chunk) + + # Close the stream response once all chunks are sent + await stream_response.write_eof() + return stream_response except aiohttp.ClientError as e: return web.Response(text=f'Error: {str(e)}', status=500) From c94ea5f054305360fa1dea7afb24565e6e226855 Mon Sep 17 00:00:00 2001 From: nareka Date: Wed, 6 Dec 2023 05:20:30 -0800 Subject: [PATCH 8/9] fix: don't pass request headers --- jina/serve/runtimes/gateway/request_handling.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/jina/serve/runtimes/gateway/request_handling.py b/jina/serve/runtimes/gateway/request_handling.py index ffdf50451b91c..64fc295f91e54 100644 --- a/jina/serve/runtimes/gateway/request_handling.py +++ b/jina/serve/runtimes/gateway/request_handling.py @@ -1,5 +1,4 @@ import itertools -import json from typing import TYPE_CHECKING, AsyncIterator, Dict from jina.enums import ProtocolType @@ -157,19 +156,17 @@ async def _load_balance(self, request): try: async with aiohttp.ClientSession() as session: + request_kwargs = {} try: - payload = await request.read() + payload = await request.json() if payload: - request_kwargs['json'] = json.loads(payload.decode()) + request_kwargs['json'] = payload except Exception: self.logger.debug('No JSON payload found in request') async with session.request( - request.method, - url=target_url, - headers=request.headers, - **request_kwargs, + request.method, url=target_url, **request_kwargs ) as response: # Create a StreamResponse with the same headers and status as the target response stream_response = web.StreamResponse( @@ -187,6 +184,7 @@ async def _load_balance(self, request): # Close the stream response once all chunks are sent await stream_response.write_eof() return stream_response + except aiohttp.ClientError as e: return web.Response(text=f'Error: {str(e)}', status=500) From bc91179e6bc8559b9a87f1e2cae331661a380b03 Mon Sep 17 00:00:00 2001 From: nareka Date: Wed, 6 Dec 2023 10:25:48 -0800 Subject: [PATCH 9/9] fix: disable autodecompress --- jina/serve/runtimes/gateway/request_handling.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/jina/serve/runtimes/gateway/request_handling.py b/jina/serve/runtimes/gateway/request_handling.py index 64fc295f91e54..ad294e5f5ef01 100644 --- a/jina/serve/runtimes/gateway/request_handling.py +++ b/jina/serve/runtimes/gateway/request_handling.py @@ -166,7 +166,10 @@ async def _load_balance(self, request): self.logger.debug('No JSON payload found in request') async with session.request( - request.method, url=target_url, **request_kwargs + request.method, + url=target_url, + auto_decompress=False, + **request_kwargs, ) as response: # Create a StreamResponse with the same headers and status as the target response stream_response = web.StreamResponse(