Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenthebuilder committed Apr 23, 2024
1 parent 76fcadf commit 9f70eb5
Showing 1 changed file with 15 additions and 13 deletions.
28 changes: 15 additions & 13 deletions replit_river/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def __init__(

# book keeping
self._seq_manager = SeqManager()
self._msg_lock = asyncio.Lock()
self._buffer = MessageBuffer(self._transport_options.buffer_size)
self._task_manager = BackgroundTaskManager()

Expand Down Expand Up @@ -338,11 +339,20 @@ async def send_message(
procedureName=procedure_name,
)
try:
await self._send_transport_message(
msg,
ws,
prefix_bytes=self._transport_options.get_prefix_bytes(),
)
# We need this lock to ensure the buffer order and message sending order
# are the same.
async with self._msg_lock:
try:
await self._buffer.put(msg)
except Exception:
# We should close the session when there are too many messages in buffer
await self.close(True)
return
await self._send_transport_message(
msg,
ws,
prefix_bytes=self._transport_options.get_prefix_bytes(),
)
except ConnectionClosed as e:
logging.error(
f"Connection closed while sending message : {e}, waiting for "
Expand All @@ -352,14 +362,6 @@ async def send_message(
logging.error(
f"Failed sending message : {e}, waiting for retry from buffer"
)
finally:
# We need to put this later to guarantee the ordering of message sent
try:
await self._buffer.put(msg)
except Exception:
# We should close the session when there are too many messages in buffer
await self.close(True)
return

async def _send_responses_from_output_stream(
self,
Expand Down

0 comments on commit 9f70eb5

Please sign in to comment.