Skip to content

Commit

Permalink
Increase max message size in Firehose (#57)
Browse files Browse the repository at this point in the history
  • Loading branch information
MarshalX authored Jun 1, 2023
1 parent 1783a84 commit 63537e4
Showing 1 changed file with 7 additions and 5 deletions.
12 changes: 7 additions & 5 deletions atproto/firehose/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from atproto.xrpc_client.models.common import XrpcError

_BASE_WEBSOCKET_URL = 'https://bsky.social/xrpc'
_MAX_MESSAGE_SIZE_BYTES = 1024 * 1024 * 5 # 5MB

OnMessageCallback = t.Callable[['MessageFrame'], t.Generator[t.Any, None, t.Any]]
AsyncOnMessageCallback = t.Callable[['MessageFrame'], t.Coroutine[t.Any, t.Any, t.Any]]
Expand Down Expand Up @@ -49,9 +50,10 @@ def _handle_firehose_error_or_stop(exception: Exception) -> bool: # noqa: C901
if isinstance(exception, httpx.TimeoutException):
return False
if isinstance(exception, (CBORDecodingError, DAGCBORDecodingError)):
# FIXME(Marshal): Sometimes firehouse client can't decode CBOR frame.
# Until it's not investigated let's make skip for such frames (it rarely happens)
# https://github.com/MarshalX/atproto/issues/53
# Reconnect will be occurred on DAG-CBOR decoding error.

# Decoding error could occur when bytes len more than _MAX_MESSAGE_SIZE_BYTES (5MB for now).
# More info: https://github.com/MarshalX/atproto/issues/53
return False
if isinstance(exception, WebSocketInvalidTypeReceived):
raise FirehoseError from exception
Expand Down Expand Up @@ -80,10 +82,10 @@ def __init__(
self._on_callback_error_callback: t.Optional[OnCallbackErrorCallback] = None

def _get_client(self):
return connect_ws(self._url, params=self._params)
return connect_ws(self._url, params=self._params, max_message_size_bytes=_MAX_MESSAGE_SIZE_BYTES)

def _get_async_client(self):
return aconnect_ws(self._url, params=self._params)
return aconnect_ws(self._url, params=self._params, max_message_size_bytes=_MAX_MESSAGE_SIZE_BYTES)

def _get_reconnection_delay(self) -> int:
base_sec = 2**self._reconnect_no
Expand Down

0 comments on commit 63537e4

Please sign in to comment.