You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Supabase async client stops receiving real-time events after remaining idle for a few hours. As far as I can tell, this issue arises randomly after the last time the client gets realtime events correctly.
To Reproduce
Here's the code I am using:
from supabase import acreate_client
from supabase.client import ClientOptions
import asyncio
from gotrue import AsyncMemoryStorage
clients = {}
client_tasks = {}
# A global lock to prevent multiple tasks from re-creating the same client
CLIENT_CREATION_LOCK = asyncio.Lock()
async def setup_listeners(s_user):
logger.info("[SL_001] Setting up listeners")
await s_user.channel('schema-db-changes').on_postgres_changes("*", schema="public", callback=handle_event_wrapper).subscribe()
logger.info("[SL_002] Listeners setup complete")
async def _create_new_supabase_user():
"""
Actually creates a brand-new Supabase client, signs in, sets up listeners, etc.
(Extracted out so we can call it from both get_supabase_user() and the listen loop.)
"""
SUPABASE_URL = os.getenv('SUPABASE_URL')
SUPABASE_KEY = os.getenv('SUPABASE_API_KEY')
s_client = await acreate_client(
SUPABASE_URL,
SUPABASE_KEY,
options=ClientOptions(
postgrest_client_timeout=100000,
storage_client_timeout=100000,
schema="public",
realtime={"hb_interval": 30, "auto_reconnect": True},
storage=AsyncMemoryStorage()
)
)
await s_client.auth.sign_in_with_password({
"email": os.getenv('SUPABASE_USER_EMAIL'),
"password": os.getenv('SUPABASE_USER_PASSWORD')
})
await s_client.realtime.connect()
# Import here to avoid circular dependency
await setup_listeners(s_client)
return s_client
async def _realtime_listen_loop(client_name: str, s_user):
"""
This is the indefinite loop that calls s_user.realtime.listen()
"""
logger.info(f"[_realtime_listen_loop] Starting for {client_name}")
try:
# This call blocks until the connection drops or error occurs
await s_user.realtime.listen()
except (
websockets.exceptions.ConnectionClosedError,
websockets.exceptions.ConnectionClosedOK,
asyncio.IncompleteReadError,
Exception
) as e:
logger.error(f"[_realtime_listen_loop] Connection dropped: {e}")
finally:
# Cleanup here (the loop is done)
logger.info(f"[_realtime_listen_loop] Shutting down for {client_name}")
try:
await s_user.auth.sign_out()
except Exception as e2:
logger.warning(f"[_realtime_listen_loop] Error on disconnect: {e2}")
if client_tasks.get(client_name) is not None:
logger.info(f'[_realtime_listen_loop] Deleting client task from dictionary since there was an error')
del client_tasks[client_name]
async def get_supabase_user(client_name="supabase_user"):
"""
Return a valid Supabase client. If none exists or if the current one is
invalid, create a new one. Also ensure there's a background task
listening for realtime changes.
"""
async with CLIENT_CREATION_LOCK:
# 1. Do we already have a client?
s_client = clients.get(client_name)
if s_client:
# Check if it's still valid (e.g., session not expired)
try:
session_res = await s_client.auth.get_session()
if session_res and session_res.access_token:
# If the background loop is missing or finished, restart it
loop_task = client_tasks.get(client_name)
if not loop_task or loop_task.done():
logger.info(f"[get_supabase_user] Re-starting loop for existing {client_name} client.")
loop_task = asyncio.create_task(_realtime_listen_loop(client_name, s_client))
client_tasks[client_name] = loop_task
# Return the existing valid client
logger.debug(f"[get_supabase_user] Using existing {client_name} client.")
return s_client
else:
logger.warning(f"[get_supabase_user] Session is None or invalid. Will re-create.")
except Exception as e:
logger.warning(f"[get_supabase_user] Existing client session invalid: {e}")
# If we got here, the existing client is no good.
# Cleanup references. We do NOT wait for the loop to cancel,
# because it might have died already.
if client_name in client_tasks:
old_task = client_tasks[client_name]
if not old_task.done():
# Cancel it
logger.info(f"[get_supabase_user] Cancelling old _realtime_listen_loop for {client_name}.")
old_task.cancel()
# (Optionally await it, but be mindful about locking/cycles)
del client_tasks[client_name]
if client_name in clients:
logger.info(f"[get_supabase_user] Deleting old client for {client_name} since something failed and starting new.")
del clients[client_name]
# 2. If we reach here, we need to create a brand-new client
logger.info(f"[get_supabase_user] Creating new client for {client_name}.")
new_client = await _create_new_supabase_user()
clients[client_name] = new_client
# Start the background realtime listening in a task
loop_task = asyncio.create_task(_realtime_listen_loop(client_name, new_client))
client_tasks[client_name] = loop_task
return new_client
Expected behavior
Expected: The client always receives realtime events properly.
The extra setup I've added in is to account for JWT expired / Refresh token not found errors that pop up occasionally. I have confirmed that the set up works after the JWT expired errors occur, it receives realtime events after the errors, but after a few hours of remaining idle it stops receiving events. There are no other errors that occur before it stops receiving events. The most recent case where it stopped working, the client was idle (received no new realtime events) for ~7 hours before I tested it again and it failed to receive the events.
To ensure the connection stays alive and client is refreshed, I have a task running which calls the supabase client created above every 15 seconds.
Screenshots
N/A
System information
Version of supabase-py: 2.11.0
Version of realtime: 2.1.0
The text was updated successfully, but these errors were encountered:
realtime._async.client - ERROR - Failed to establish WebSocket connection after 5 attempts: 'ClientConnection' object has no attribute 'open' from the realtime library here
This error occurs in the await s_client.realtime.connect() call. I'm not sure why but I'm consistently getting this error when I deploy locally, but not in my production environment. The production environment which is running the same code receives realtime events, but the local deployment which got this error does not receive the realtime events. It is possible that the production environment also eventually receives this error which causes the listeners to not get set up in my code.
UPDATE: This does not seem to be the cause of error on my production instance. It stops receiving realtime events but no errors are caught on the await s_client.realtime.connect() call.
2nd UPDATE: The websocket connection error was because of using websockets-14.2. Downgrading to 13.1 fixed the issue. Please ignore this comment.
Bug report
Describe the bug
Supabase async client stops receiving real-time events after remaining idle for a few hours. As far as I can tell, this issue arises randomly after the last time the client gets realtime events correctly.
To Reproduce
Here's the code I am using:
Expected behavior
Expected: The client always receives realtime events properly.
The extra setup I've added in is to account for JWT expired / Refresh token not found errors that pop up occasionally. I have confirmed that the set up works after the JWT expired errors occur, it receives realtime events after the errors, but after a few hours of remaining idle it stops receiving events. There are no other errors that occur before it stops receiving events. The most recent case where it stopped working, the client was idle (received no new realtime events) for ~7 hours before I tested it again and it failed to receive the events.
To ensure the connection stays alive and client is refreshed, I have a task running which calls the supabase client created above every 15 seconds.
Screenshots
N/A
System information
The text was updated successfully, but these errors were encountered: