Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supabase async python client stops receiving realtime events #1034

Open
2 tasks done
ritikd2 opened this issue Jan 23, 2025 · 2 comments
Open
2 tasks done

Supabase async python client stops receiving realtime events #1034

ritikd2 opened this issue Jan 23, 2025 · 2 comments
Labels
bug Something isn't working

Comments

@ritikd2
Copy link

ritikd2 commented Jan 23, 2025

Bug report

  • I confirm this is a bug with Supabase, not with my own application.
  • I confirm I have searched the Docs, GitHub Discussions, and Discord.

Describe the bug

Supabase async client stops receiving real-time events after remaining idle for a few hours. As far as I can tell, this issue arises randomly after the last time the client gets realtime events correctly.

To Reproduce

Here's the code I am using:

from supabase import acreate_client
from supabase.client import ClientOptions
import asyncio
from gotrue import AsyncMemoryStorage

clients = {}
client_tasks = {}

# A global lock to prevent multiple tasks from re-creating the same client
CLIENT_CREATION_LOCK = asyncio.Lock()

async def setup_listeners(s_user):
    logger.info("[SL_001] Setting up listeners")
    await s_user.channel('schema-db-changes').on_postgres_changes("*", schema="public", callback=handle_event_wrapper).subscribe()
    logger.info("[SL_002] Listeners setup complete")

async def _create_new_supabase_user():
    """
    Actually creates a brand-new Supabase client, signs in, sets up listeners, etc.
    (Extracted out so we can call it from both get_supabase_user() and the listen loop.)
    """
    SUPABASE_URL = os.getenv('SUPABASE_URL')
    SUPABASE_KEY = os.getenv('SUPABASE_API_KEY')
    
    s_client = await acreate_client(
        SUPABASE_URL,
        SUPABASE_KEY,
        options=ClientOptions(
            postgrest_client_timeout=100000, 
            storage_client_timeout=100000, 
            schema="public", 
            realtime={"hb_interval": 30, "auto_reconnect": True},
            storage=AsyncMemoryStorage()
        )
    )
    
    await s_client.auth.sign_in_with_password({
        "email": os.getenv('SUPABASE_USER_EMAIL'),
        "password": os.getenv('SUPABASE_USER_PASSWORD')
    })
    
    await s_client.realtime.connect()

    # Import here to avoid circular dependency
    await setup_listeners(s_client)
    
    return s_client

async def _realtime_listen_loop(client_name: str, s_user):
    """
    This is the indefinite loop that calls s_user.realtime.listen()
    """
    logger.info(f"[_realtime_listen_loop] Starting for {client_name}")
    try:
        # This call blocks until the connection drops or error occurs
        await s_user.realtime.listen()
    except (
        websockets.exceptions.ConnectionClosedError,
        websockets.exceptions.ConnectionClosedOK,
        asyncio.IncompleteReadError,
        Exception
    ) as e:
        logger.error(f"[_realtime_listen_loop] Connection dropped: {e}")
    finally:
        # Cleanup here (the loop is done)
        logger.info(f"[_realtime_listen_loop] Shutting down for {client_name}")
        try:
            await s_user.auth.sign_out()
        except Exception as e2:
            logger.warning(f"[_realtime_listen_loop] Error on disconnect: {e2}")
        if client_tasks.get(client_name) is not None:
            logger.info(f'[_realtime_listen_loop] Deleting client task from dictionary since there was an error')
            del client_tasks[client_name]

async def get_supabase_user(client_name="supabase_user"):
    """
    Return a valid Supabase client. If none exists or if the current one is
    invalid, create a new one. Also ensure there's a background task
    listening for realtime changes.
    """
    async with CLIENT_CREATION_LOCK:
        # 1. Do we already have a client?
        s_client = clients.get(client_name)
        if s_client:
            # Check if it's still valid (e.g., session not expired)
            try:
                session_res = await s_client.auth.get_session()
                if session_res and session_res.access_token:
                    # If the background loop is missing or finished, restart it
                    loop_task = client_tasks.get(client_name)
                    if not loop_task or loop_task.done():
                        logger.info(f"[get_supabase_user] Re-starting loop for existing {client_name} client.")
                        loop_task = asyncio.create_task(_realtime_listen_loop(client_name, s_client))
                        client_tasks[client_name] = loop_task
                    
                    # Return the existing valid client
                    logger.debug(f"[get_supabase_user] Using existing {client_name} client.")
                    return s_client
                else:
                    logger.warning(f"[get_supabase_user] Session is None or invalid. Will re-create.")
            except Exception as e:
                logger.warning(f"[get_supabase_user] Existing client session invalid: {e}")
            
            # If we got here, the existing client is no good.
            # Cleanup references. We do NOT wait for the loop to cancel, 
            # because it might have died already. 
            if client_name in client_tasks:
                old_task = client_tasks[client_name]
                if not old_task.done():
                    # Cancel it
                    logger.info(f"[get_supabase_user] Cancelling old _realtime_listen_loop for {client_name}.")
                    old_task.cancel()
                    # (Optionally await it, but be mindful about locking/cycles)
                del client_tasks[client_name]

            if client_name in clients:
                logger.info(f"[get_supabase_user] Deleting old client for {client_name} since something failed and starting new.")
                del clients[client_name]

        # 2. If we reach here, we need to create a brand-new client
        logger.info(f"[get_supabase_user] Creating new client for {client_name}.")
        new_client = await _create_new_supabase_user()
        clients[client_name] = new_client

        # Start the background realtime listening in a task
        loop_task = asyncio.create_task(_realtime_listen_loop(client_name, new_client))
        client_tasks[client_name] = loop_task

        return new_client

Expected behavior

Expected: The client always receives realtime events properly.

The extra setup I've added in is to account for JWT expired / Refresh token not found errors that pop up occasionally. I have confirmed that the set up works after the JWT expired errors occur, it receives realtime events after the errors, but after a few hours of remaining idle it stops receiving events. There are no other errors that occur before it stops receiving events. The most recent case where it stopped working, the client was idle (received no new realtime events) for ~7 hours before I tested it again and it failed to receive the events.

To ensure the connection stays alive and client is refreshed, I have a task running which calls the supabase client created above every 15 seconds.

Screenshots

N/A

System information

  • Version of supabase-py: 2.11.0
  • Version of realtime: 2.1.0
@ritikd2
Copy link
Author

ritikd2 commented Jan 23, 2025

More recently it stopped receiving events after 20 minutes of being idle. There were no errors indicating something went wrong as far as I can tell.

@ritikd2
Copy link
Author

ritikd2 commented Jan 23, 2025

also catching this error randomly sometimes:

realtime._async.client - ERROR - Failed to establish WebSocket connection after 5 attempts: 'ClientConnection' object has no attribute 'open' from the realtime library here

This error occurs in the await s_client.realtime.connect() call. I'm not sure why but I'm consistently getting this error when I deploy locally, but not in my production environment. The production environment which is running the same code receives realtime events, but the local deployment which got this error does not receive the realtime events. It is possible that the production environment also eventually receives this error which causes the listeners to not get set up in my code.

UPDATE: This does not seem to be the cause of error on my production instance. It stops receiving realtime events but no errors are caught on the await s_client.realtime.connect() call.

2nd UPDATE: The websocket connection error was because of using websockets-14.2. Downgrading to 13.1 fixed the issue. Please ignore this comment.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

1 participant