diff --git a/backend/alembic_tenants/versions/3b45e0018bf1_add_new_available_tenant_table.py b/backend/alembic_tenants/versions/3b45e0018bf1_add_new_available_tenant_table.py new file mode 100644 index 00000000000..4109f64b51c --- /dev/null +++ b/backend/alembic_tenants/versions/3b45e0018bf1_add_new_available_tenant_table.py @@ -0,0 +1,33 @@ +"""add new available tenant table + +Revision ID: 3b45e0018bf1 +Revises: 34e3630c7f32 +Create Date: 2025-03-06 09:55:18.229910 + +""" +import sqlalchemy as sa + +from alembic import op + + +# revision identifiers, used by Alembic. +revision = "3b45e0018bf1" +down_revision = "34e3630c7f32" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # Create new_available_tenant table + op.create_table( + "available_tenant", + sa.Column("tenant_id", sa.String(), nullable=False), + sa.Column("alembic_version", sa.String(), nullable=False), + sa.Column("date_created", sa.DateTime(), nullable=False), + sa.PrimaryKeyConstraint("tenant_id"), + ) + + +def downgrade() -> None: + # Drop new_available_tenant table + op.drop_table("available_tenant") diff --git a/backend/ee/onyx/server/tenants/provisioning.py b/backend/ee/onyx/server/tenants/provisioning.py index aaf007a27e2..953c9463caf 100644 --- a/backend/ee/onyx/server/tenants/provisioning.py +++ b/backend/ee/onyx/server/tenants/provisioning.py @@ -26,11 +26,12 @@ from onyx.configs.app_configs import CONTROL_PLANE_API_BASE_URL from onyx.configs.app_configs import DEV_MODE from onyx.configs.constants import MilestoneRecordType +from onyx.db.engine import get_session_with_shared_schema from onyx.db.engine import get_session_with_tenant -from onyx.db.engine import get_sqlalchemy_engine from onyx.db.llm import update_default_provider from onyx.db.llm import upsert_cloud_embedding_provider from onyx.db.llm import upsert_llm_provider +from onyx.db.models import AvailableTenant from onyx.db.models import IndexModelStatus from onyx.db.models import SearchSettings from 
onyx.db.models import UserTenantMapping @@ -60,42 +61,72 @@ async def get_or_provision_tenant( This function should only be called after we have verified we want this user's tenant to exist. It returns the tenant ID associated with the email, creating a new tenant if necessary. """ + # Early return for non-multi-tenant mode if not MULTI_TENANT: return POSTGRES_DEFAULT_SCHEMA if referral_source and request: await submit_to_hubspot(email, referral_source, request) + # First, check if the user already has a tenant + tenant_id: str | None = None try: tenant_id = get_tenant_id_for_email(email) + return tenant_id except exceptions.UserNotExists: - # If tenant does not exist and in Multi tenant mode, provision a new tenant - try: + # User doesn't exist, so we need to create a new tenant or assign an existing one + pass + + try: + # Try to get a pre-provisioned tenant + tenant_id = await get_available_tenant() + + if tenant_id: + # If we have a pre-provisioned tenant, assign it to the user + await assign_tenant_to_user(tenant_id, email, referral_source) + logger.info(f"Assigned pre-provisioned tenant {tenant_id} to user {email}") + return tenant_id + else: + # If no pre-provisioned tenant is available, create a new one on-demand tenant_id = await create_tenant(email, referral_source) - except Exception as e: - logger.error(f"Tenant provisioning failed: {e}") - raise HTTPException(status_code=500, detail="Failed to provision tenant.") + return tenant_id - if not tenant_id: + except Exception as e: + # If we've encountered an error, log and raise an exception + error_msg = "Failed to provision tenant" + logger.error(error_msg, exc_info=e) raise HTTPException( - status_code=401, detail="User does not belong to an organization" + status_code=500, + detail="Failed to provision tenant. 
Please try again later.", ) - return tenant_id - async def create_tenant(email: str, referral_source: str | None = None) -> str: + """ + Create a new tenant on-demand when no pre-provisioned tenants are available. + This is the fallback method when we can't use a pre-provisioned tenant. + + """ tenant_id = TENANT_ID_PREFIX + str(uuid.uuid4()) + logger.info(f"Creating new tenant {tenant_id} for user {email}") + try: # Provision tenant on data plane await provision_tenant(tenant_id, email) - # Notify control plane - if not DEV_MODE: + + # Notify control plane if not already done in provision_tenant + if not DEV_MODE and referral_source: await notify_control_plane(tenant_id, email, referral_source) + except Exception as e: - logger.error(f"Tenant provisioning failed: {e}") - await rollback_tenant_provisioning(tenant_id) + logger.exception(f"Tenant provisioning failed: {str(e)}") + # Attempt to rollback the tenant provisioning + try: + await rollback_tenant_provisioning(tenant_id) + except Exception: + logger.exception(f"Failed to rollback tenant provisioning for {tenant_id}") raise HTTPException(status_code=500, detail="Failed to provision tenant.") + return tenant_id @@ -109,54 +140,25 @@ async def provision_tenant(tenant_id: str, email: str) -> None: ) logger.debug(f"Provisioning tenant {tenant_id} for user {email}") - token = None try: + # Create the schema for the tenant if not create_schema_if_not_exists(tenant_id): logger.debug(f"Created schema for tenant {tenant_id}") else: logger.debug(f"Schema already exists for tenant {tenant_id}") - token = CURRENT_TENANT_ID_CONTEXTVAR.set(tenant_id) - - # Await the Alembic migrations - await asyncio.to_thread(run_alembic_migrations, tenant_id) - - with get_session_with_tenant(tenant_id=tenant_id) as db_session: - configure_default_api_keys(db_session) - - current_search_settings = ( - db_session.query(SearchSettings) - .filter_by(status=IndexModelStatus.FUTURE) - .first() - ) - cohere_enabled = ( - current_search_settings 
is not None - and current_search_settings.provider_type == EmbeddingProvider.COHERE - ) - setup_onyx(db_session, tenant_id, cohere_enabled=cohere_enabled) - - add_users_to_tenant([email], tenant_id) + # Set up the tenant with all necessary configurations + await setup_tenant(tenant_id) - with get_session_with_tenant(tenant_id=tenant_id) as db_session: - create_milestone_and_report( - user=None, - distinct_id=tenant_id, - event_type=MilestoneRecordType.TENANT_CREATED, - properties={ - "email": email, - }, - db_session=db_session, - ) + # Assign the tenant to the user + await assign_tenant_to_user(tenant_id, email) except Exception as e: logger.exception(f"Failed to create tenant {tenant_id}") raise HTTPException( status_code=500, detail=f"Failed to create tenant: {str(e)}" ) - finally: - if token is not None: - CURRENT_TENANT_ID_CONTEXTVAR.reset(token) async def notify_control_plane( @@ -187,20 +189,74 @@ async def notify_control_plane( async def rollback_tenant_provisioning(tenant_id: str) -> None: - # Logic to rollback tenant provisioning on data plane + """ + Logic to rollback tenant provisioning on data plane. + Handles each step independently to ensure maximum cleanup even if some steps fail. + """ logger.info(f"Rolling back tenant provisioning for tenant_id: {tenant_id}") + + # Track if any part of the rollback fails + rollback_errors = [] + + # 1. Try to drop the tenant's schema try: - # Drop the tenant's schema to rollback provisioning drop_schema(tenant_id) + logger.info(f"Successfully dropped schema for tenant {tenant_id}") + except Exception as e: + error_msg = f"Failed to drop schema for tenant {tenant_id}: {str(e)}" + logger.error(error_msg) + rollback_errors.append(error_msg) + + # 2. 
Try to remove tenant mapping + try: + with get_session_with_shared_schema() as db_session: + db_session.begin() + try: + db_session.query(UserTenantMapping).filter( + UserTenantMapping.tenant_id == tenant_id + ).delete() + db_session.commit() + logger.info( + f"Successfully removed user mappings for tenant {tenant_id}" + ) + except Exception as e: + db_session.rollback() + raise e + except Exception as e: + error_msg = f"Failed to remove user mappings for tenant {tenant_id}: {str(e)}" + logger.error(error_msg) + rollback_errors.append(error_msg) - # Remove tenant mapping - with Session(get_sqlalchemy_engine()) as db_session: - db_session.query(UserTenantMapping).filter( - UserTenantMapping.tenant_id == tenant_id - ).delete() - db_session.commit() + # 3. If this tenant was in the available tenants table, remove it + try: + with get_session_with_shared_schema() as db_session: + db_session.begin() + try: + available_tenant = ( + db_session.query(AvailableTenant) + .filter(AvailableTenant.tenant_id == tenant_id) + .first() + ) + + if available_tenant: + db_session.delete(available_tenant) + db_session.commit() + logger.info( + f"Removed tenant {tenant_id} from available tenants table" + ) + except Exception as e: + db_session.rollback() + raise e except Exception as e: - logger.error(f"Failed to rollback tenant provisioning: {e}") + error_msg = f"Failed to remove tenant {tenant_id} from available tenants table: {str(e)}" + logger.error(error_msg) + rollback_errors.append(error_msg) + + # Log summary of rollback operation + if rollback_errors: + logger.error(f"Tenant rollback completed with {len(rollback_errors)} errors") + else: + logger.info(f"Tenant rollback completed successfully for tenant {tenant_id}") def configure_default_api_keys(db_session: Session) -> None: @@ -353,3 +409,110 @@ async def delete_user_from_control_plane(tenant_id: str, email: str) -> None: raise Exception( f"Failed to delete tenant on control plane: {error_text}" ) + + +async def 
get_available_tenant() -> str | None: +    """ +    Get an available pre-provisioned tenant from the AvailableTenant table. +    Returns the tenant_id if one is available, None otherwise. +    Uses row-level locking to prevent race conditions when multiple processes +    try to get an available tenant simultaneously. +    """ +    if not MULTI_TENANT: +        return None + +    with get_session_with_shared_schema() as db_session: +        try: +            db_session.begin() + +            # Get the oldest available tenant with FOR UPDATE lock to prevent race conditions +            available_tenant = ( +                db_session.query(AvailableTenant) +                .order_by(AvailableTenant.date_created) +                .with_for_update(skip_locked=True)  # Skip locked rows to avoid blocking +                .first() +            ) + +            if available_tenant: +                tenant_id = available_tenant.tenant_id +                # Remove the tenant from the available tenants table +                db_session.delete(available_tenant) +                db_session.commit() +                logger.info(f"Using pre-provisioned tenant {tenant_id}") +                return tenant_id +            else: +                db_session.rollback() +                return None +        except Exception: +            logger.exception("Error getting available tenant") +            db_session.rollback() +            return None + + +async def setup_tenant(tenant_id: str) -> None: +    """ +    Set up a tenant with all necessary configurations. +    This is a centralized function that handles all tenant setup logic. 
+ """ + token = None + try: + token = CURRENT_TENANT_ID_CONTEXTVAR.set(tenant_id) + + # Run Alembic migrations + await asyncio.to_thread(run_alembic_migrations, tenant_id) + + # Configure the tenant with default settings + with get_session_with_tenant(tenant_id=tenant_id) as db_session: + # Configure default API keys + configure_default_api_keys(db_session) + + # Set up Onyx with appropriate settings + current_search_settings = ( + db_session.query(SearchSettings) + .filter_by(status=IndexModelStatus.FUTURE) + .first() + ) + cohere_enabled = ( + current_search_settings is not None + and current_search_settings.provider_type == EmbeddingProvider.COHERE + ) + setup_onyx(db_session, tenant_id, cohere_enabled=cohere_enabled) + + except Exception as e: + logger.exception(f"Failed to set up tenant {tenant_id}") + raise e + finally: + if token is not None: + CURRENT_TENANT_ID_CONTEXTVAR.reset(token) + + +async def assign_tenant_to_user( + tenant_id: str, email: str, referral_source: str | None = None +) -> None: + """ + Assign a tenant to a user and perform necessary operations. + Uses transaction handling to ensure atomicity and includes retry logic + for control plane notifications. 
+ """ + # First, add the user to the tenant in a transaction + try: + add_users_to_tenant([email], tenant_id) + + # Create milestone record in the same transaction context as the tenant assignment + with get_session_with_tenant(tenant_id=tenant_id) as db_session: + create_milestone_and_report( + user=None, + distinct_id=tenant_id, + event_type=MilestoneRecordType.TENANT_CREATED, + properties={ + "email": email, + }, + db_session=db_session, + ) + except Exception: + logger.exception(f"Failed to assign tenant {tenant_id} to user {email}") + raise Exception("Failed to assign tenant to user") + + # Notify control plane with retry logic + if not DEV_MODE: + await notify_control_plane(tenant_id, email, referral_source) diff --git a/backend/ee/onyx/server/tenants/schema_management.py b/backend/ee/onyx/server/tenants/schema_management.py index 80712556ea2..1e9952df963 100644 --- a/backend/ee/onyx/server/tenants/schema_management.py +++ b/backend/ee/onyx/server/tenants/schema_management.py @@ -74,3 +74,21 @@ def drop_schema(tenant_id: str) -> None: text("DROP SCHEMA IF EXISTS %(schema_name)s CASCADE"), {"schema_name": tenant_id}, ) + + +def get_current_alembic_version(tenant_id: str) -> str: + """Get the current Alembic version for a tenant.""" + from alembic.runtime.migration import MigrationContext + from sqlalchemy import text + + engine = get_sqlalchemy_engine() + + # Set the search path to the tenant's schema + with engine.connect() as connection: + connection.execute(text(f'SET search_path TO "{tenant_id}"')) + + # Get the current version from the alembic_version table + context = MigrationContext.configure(connection) + current_rev = context.get_current_revision() + + return current_rev or "head" diff --git a/backend/ee/onyx/server/tenants/user_mapping.py b/backend/ee/onyx/server/tenants/user_mapping.py index b5b0fe19682..77758c9fa44 100644 --- a/backend/ee/onyx/server/tenants/user_mapping.py +++ b/backend/ee/onyx/server/tenants/user_mapping.py @@ -38,13 +38,39 @@ 
def user_owns_a_tenant(email: str) -> bool: def add_users_to_tenant(emails: list[str], tenant_id: str) -> None: + """ + Add users to a tenant with proper transaction handling. + Checks if users already have a tenant mapping to avoid duplicates. + """ with get_session_with_tenant(tenant_id=POSTGRES_DEFAULT_SCHEMA) as db_session: try: + # Start a transaction + db_session.begin() + for email in emails: - db_session.add(UserTenantMapping(email=email, tenant_id=tenant_id)) + # Check if the user already has a mapping to this tenant + existing_mapping = ( + db_session.query(UserTenantMapping) + .filter( + UserTenantMapping.email == email, + UserTenantMapping.tenant_id == tenant_id, + ) + .with_for_update() + .first() + ) + + if not existing_mapping: + # Only add if mapping doesn't exist + db_session.add(UserTenantMapping(email=email, tenant_id=tenant_id)) + + # Commit the transaction + db_session.commit() + logger.info(f"Successfully added users {emails} to tenant {tenant_id}") + except Exception: logger.exception(f"Failed to add users to tenant {tenant_id}") - db_session.commit() + db_session.rollback() + raise def remove_users_from_tenant(emails: list[str], tenant_id: str) -> None: diff --git a/backend/onyx/background/celery/apps/light.py b/backend/onyx/background/celery/apps/light.py index b6b99ca4c7e..7b7f4fc62dc 100644 --- a/backend/onyx/background/celery/apps/light.py +++ b/backend/onyx/background/celery/apps/light.py @@ -111,5 +111,6 @@ def on_setup_logging( "onyx.background.celery.tasks.vespa", "onyx.background.celery.tasks.connector_deletion", "onyx.background.celery.tasks.doc_permission_syncing", + "onyx.background.celery.tasks.tenant_provisioning", ] ) diff --git a/backend/onyx/background/celery/apps/monitoring.py b/backend/onyx/background/celery/apps/monitoring.py index edaa01225d2..cd42f72f176 100644 --- a/backend/onyx/background/celery/apps/monitoring.py +++ b/backend/onyx/background/celery/apps/monitoring.py @@ -92,5 +92,6 @@ def on_setup_logging( 
celery_app.autodiscover_tasks( [ "onyx.background.celery.tasks.monitoring", + "onyx.background.celery.tasks.tenant_provisioning", ] ) diff --git a/backend/onyx/background/celery/tasks/beat_schedule.py b/backend/onyx/background/celery/tasks/beat_schedule.py index 689e9436be8..60473bb3aad 100644 --- a/backend/onyx/background/celery/tasks/beat_schedule.py +++ b/backend/onyx/background/celery/tasks/beat_schedule.py @@ -167,6 +167,16 @@ def make_cloud_generator_task(task: dict[str, Any]) -> dict[str, Any]: "expires": BEAT_EXPIRES_DEFAULT, }, }, + { + "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-available-tenants", + "task": OnyxCeleryTask.CHECK_AVAILABLE_TENANTS, + "schedule": timedelta(minutes=10), + "options": { + "queue": OnyxCeleryQueues.MONITORING, + "priority": OnyxCeleryPriority.HIGH, + "expires": BEAT_EXPIRES_DEFAULT, + }, + }, ] # tasks that only run self hosted diff --git a/backend/onyx/background/celery/tasks/periodic/__init__.py b/backend/onyx/background/celery/tasks/periodic/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/backend/onyx/background/celery/tasks/tenant_provisioning/tasks.py b/backend/onyx/background/celery/tasks/tenant_provisioning/tasks.py new file mode 100644 index 00000000000..26efd7c786d --- /dev/null +++ b/backend/onyx/background/celery/tasks/tenant_provisioning/tasks.py @@ -0,0 +1,185 @@ +""" +Periodic tasks for tenant pre-provisioning. 
+""" +import asyncio +import datetime +import uuid + +from celery import shared_task +from celery import Task +from redis.lock import Lock as RedisLock + +from ee.onyx.server.tenants.provisioning import setup_tenant +from ee.onyx.server.tenants.schema_management import create_schema_if_not_exists +from ee.onyx.server.tenants.schema_management import get_current_alembic_version +from onyx.background.celery.apps.app_base import task_logger +from onyx.configs.app_configs import JOB_TIMEOUT +from onyx.configs.app_configs import TARGET_AVAILABLE_TENANTS +from onyx.configs.constants import OnyxCeleryPriority +from onyx.configs.constants import OnyxCeleryQueues +from onyx.configs.constants import OnyxCeleryTask +from onyx.configs.constants import OnyxRedisLocks +from onyx.db.engine import get_session_with_shared_schema +from onyx.db.models import AvailableTenant +from onyx.redis.redis_pool import get_redis_client +from shared_configs.configs import MULTI_TENANT +from shared_configs.configs import TENANT_ID_PREFIX + +# Default number of pre-provisioned tenants to maintain +DEFAULT_TARGET_AVAILABLE_TENANTS = 5 + +# Soft time limit for tenant pre-provisioning tasks (in seconds) +_TENANT_PROVISIONING_SOFT_TIME_LIMIT = 60 * 5 # 5 minutes +# Hard time limit for tenant pre-provisioning tasks (in seconds) +_TENANT_PROVISIONING_TIME_LIMIT = 60 * 10 # 10 minutes + + +@shared_task( + name=OnyxCeleryTask.CHECK_AVAILABLE_TENANTS, + queue=OnyxCeleryQueues.MONITORING, + ignore_result=True, + soft_time_limit=JOB_TIMEOUT, + trail=False, + bind=True, +) +def check_available_tenants(self: Task) -> None: + """ + Check if we have enough pre-provisioned tenants available. + If not, trigger the pre-provisioning of new tenants. 
+    """ +    task_logger.info("STARTING CHECK_AVAILABLE_TENANTS") +    if not MULTI_TENANT: +        task_logger.info( +            "Multi-tenancy is not enabled, skipping tenant pre-provisioning" +        ) +        return + +    r = get_redis_client() +    lock_check: RedisLock = r.lock( +        OnyxRedisLocks.CHECK_AVAILABLE_TENANTS_LOCK, +        timeout=_TENANT_PROVISIONING_SOFT_TIME_LIMIT, +    ) + +    # These tasks should never overlap +    if not lock_check.acquire(blocking=False): +        task_logger.info( +            "Skipping check_available_tenants task because it is already running" +        ) +        return + +    try: +        # Get the current count of available tenants +        with get_session_with_shared_schema() as db_session: +            available_tenants_count = db_session.query(AvailableTenant).count() + +        # Get the target number of available tenants +        target_available_tenants = getattr( +            TARGET_AVAILABLE_TENANTS, "value", DEFAULT_TARGET_AVAILABLE_TENANTS +        ) + +        # Calculate how many new tenants we need to provision +        tenants_to_provision = max( +            0, target_available_tenants - available_tenants_count +        ) + +        task_logger.info( +            f"Available tenants: {available_tenants_count}, " +            f"Target: {target_available_tenants}, " +            f"To provision: {tenants_to_provision}" +        ) + +        # Trigger pre-provisioning tasks for each tenant needed +        for _ in range(tenants_to_provision): +            from celery import current_app + +            current_app.send_task( +                OnyxCeleryTask.PRE_PROVISION_TENANT, +                priority=OnyxCeleryPriority.LOW, +            ) + +    except Exception: +        task_logger.exception("Error in check_available_tenants task") + +    finally: +        lock_check.release() + + +@shared_task( +    name=OnyxCeleryTask.PRE_PROVISION_TENANT, +    ignore_result=True, +    soft_time_limit=_TENANT_PROVISIONING_SOFT_TIME_LIMIT, +    time_limit=_TENANT_PROVISIONING_TIME_LIMIT, +    queue=OnyxCeleryQueues.MONITORING, +    bind=True, +) +def pre_provision_tenant(self: Task) -> None: +    """ +    Pre-provision a new tenant and store it in the AvailableTenant table. 
+ This function fully sets up the tenant with all necessary configurations, + so it's ready to be assigned to a user immediately. + """ + # The MULTI_TENANT check is now done at the caller level (check_available_tenants) + # rather than inside this function + + r = get_redis_client() + lock_provision: RedisLock = r.lock( + OnyxRedisLocks.PRE_PROVISION_TENANT_LOCK, + timeout=_TENANT_PROVISIONING_SOFT_TIME_LIMIT, + ) + + # Allow multiple pre-provisioning tasks to run, but ensure they don't overlap + if not lock_provision.acquire(blocking=False): + task_logger.debug( + "Skipping pre_provision_tenant task because it is already running" + ) + return + + try: + # Generate a new tenant ID + tenant_id = TENANT_ID_PREFIX + str(uuid.uuid4()) + task_logger.info(f"Pre-provisioning tenant: {tenant_id}") + + # Create the schema for the new tenant + schema_created = create_schema_if_not_exists(tenant_id) + if schema_created: + task_logger.debug(f"Created schema for tenant: {tenant_id}") + else: + task_logger.debug(f"Schema already exists for tenant: {tenant_id}") + + # Set up the tenant with all necessary configurations + task_logger.debug(f"Setting up tenant configuration: {tenant_id}") + asyncio.run(setup_tenant(tenant_id)) + task_logger.debug(f"Tenant configuration completed: {tenant_id}") + + # Get the current Alembic version + alembic_version = get_current_alembic_version(tenant_id) + task_logger.debug( + f"Tenant {tenant_id} using Alembic version: {alembic_version}" + ) + + # Store the pre-provisioned tenant in the database + task_logger.debug(f"Storing pre-provisioned tenant in database: {tenant_id}") + with get_session_with_shared_schema() as db_session: + # Use a transaction to ensure atomicity + db_session.begin() + try: + new_tenant = AvailableTenant( + tenant_id=tenant_id, + alembic_version=alembic_version, + date_created=datetime.datetime.now(), + ) + db_session.add(new_tenant) + db_session.commit() + task_logger.info(f"Successfully pre-provisioned tenant: 
{tenant_id}") + except Exception: + db_session.rollback() + task_logger.error( + f"Failed to store pre-provisioned tenant: {tenant_id}", + exc_info=True, + ) + raise + + except Exception: + task_logger.error("Error in pre_provision_tenant task", exc_info=True) + finally: + lock_provision.release() diff --git a/backend/onyx/configs/app_configs.py b/backend/onyx/configs/app_configs.py index 4de6dfdc1b7..de720430ee6 100644 --- a/backend/onyx/configs/app_configs.py +++ b/backend/onyx/configs/app_configs.py @@ -643,3 +643,6 @@ DEFAULT_IMAGE_ANALYSIS_MAX_SIZE_MB = 20 + +# Number of pre-provisioned tenants to maintain +TARGET_AVAILABLE_TENANTS = int(os.environ.get("TARGET_AVAILABLE_TENANTS", "5")) diff --git a/backend/onyx/configs/constants.py b/backend/onyx/configs/constants.py index c47e4d6e52a..1da5af0652c 100644 --- a/backend/onyx/configs/constants.py +++ b/backend/onyx/configs/constants.py @@ -321,6 +321,8 @@ class OnyxRedisLocks: "da_lock:check_connector_external_group_sync_beat" ) MONITOR_BACKGROUND_PROCESSES_LOCK = "da_lock:monitor_background_processes" + CHECK_AVAILABLE_TENANTS_LOCK = "da_lock:check_available_tenants" + PRE_PROVISION_TENANT_LOCK = "da_lock:pre_provision_tenant" CONNECTOR_DOC_PERMISSIONS_SYNC_LOCK_PREFIX = ( "da_lock:connector_doc_permissions_sync" @@ -383,6 +385,7 @@ class OnyxCeleryTask: CLOUD_MONITOR_CELERY_QUEUES = ( f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_monitor_celery_queues" ) + CHECK_AVAILABLE_TENANTS = f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check_available_tenants" CHECK_FOR_CONNECTOR_DELETION = "check_for_connector_deletion_task" CHECK_FOR_VESPA_SYNC_TASK = "check_for_vespa_sync_task" @@ -399,6 +402,9 @@ class OnyxCeleryTask: MONITOR_BACKGROUND_PROCESSES = "monitor_background_processes" MONITOR_CELERY_QUEUES = "monitor_celery_queues" + # Tenant pre-provisioning + PRE_PROVISION_TENANT = "pre_provision_tenant" + KOMBU_MESSAGE_CLEANUP_TASK = "kombu_message_cleanup_task" CONNECTOR_PERMISSION_SYNC_GENERATOR_TASK = ( 
"connector_permission_sync_generator_task" diff --git a/backend/onyx/db/models.py b/backend/onyx/db/models.py index 484d246209a..0fbef749377 100644 --- a/backend/onyx/db/models.py +++ b/backend/onyx/db/models.py @@ -2310,6 +2310,17 @@ def validate_email(self, key: str, value: str) -> str: return value.lower() if value else value +class AvailableTenant(Base): + __tablename__ = "available_tenant" + """ + These entries will only exist ephemerally and are meant to be picked up by new users on registration. + """ + + tenant_id: Mapped[str] = mapped_column(String, primary_key=True, nullable=False) + alembic_version: Mapped[str] = mapped_column(String, nullable=False) + date_created: Mapped[datetime.datetime] = mapped_column(DateTime, nullable=False) + + # This is a mapping from tenant IDs to anonymous user paths class TenantAnonymousUserPath(Base): __tablename__ = "tenant_anonymous_user_path" diff --git a/web/src/lib/connectors/connectors.tsx b/web/src/lib/connectors/connectors.tsx index e26631864a6..dcfc75eeb54 100644 --- a/web/src/lib/connectors/connectors.tsx +++ b/web/src/lib/connectors/connectors.tsx @@ -1259,21 +1259,18 @@ export function createConnectorInitialValues( name: "", groups: [], access_type: "public", - ...configuration.values.reduce( - (acc, field) => { - if (field.type === "select") { - acc[field.name] = null; - } else if (field.type === "list") { - acc[field.name] = field.default || []; - } else if (field.type === "checkbox") { - acc[field.name] = field.default || false; - } else if (field.default !== undefined) { - acc[field.name] = field.default; - } - return acc; - }, - {} as { [record: string]: any } - ), + ...configuration.values.reduce((acc, field) => { + if (field.type === "select") { + acc[field.name] = null; + } else if (field.type === "list") { + acc[field.name] = field.default || []; + } else if (field.type === "checkbox") { + acc[field.name] = field.default || false; + } else if (field.default !== undefined) { + acc[field.name] = 
field.default; + } + return acc; + }, {} as { [record: string]: any }), }; } @@ -1285,28 +1282,25 @@ export function createConnectorValidationSchema( return Yup.object().shape({ access_type: Yup.string().required("Access Type is required"), name: Yup.string().required("Connector Name is required"), - ...configuration.values.reduce( - (acc, field) => { - let schema: any = - field.type === "select" - ? Yup.string() - : field.type === "list" - ? Yup.array().of(Yup.string()) - : field.type === "checkbox" - ? Yup.boolean() - : field.type === "file" - ? Yup.mixed() - : Yup.string(); - - if (!field.optional) { - schema = schema.required(`${field.label} is required`); - } - - acc[field.name] = schema; - return acc; - }, - {} as Record - ), + ...configuration.values.reduce((acc, field) => { + let schema: any = + field.type === "select" + ? Yup.string() + : field.type === "list" + ? Yup.array().of(Yup.string()) + : field.type === "checkbox" + ? Yup.boolean() + : field.type === "file" + ? Yup.mixed() + : Yup.string(); + + if (!field.optional) { + schema = schema.required(`${field.label} is required`); + } + + acc[field.name] = schema; + return acc; + }, {} as Record), // These are advanced settings indexingStart: Yup.string().nullable(), pruneFreq: Yup.number().min(0, "Prune frequency must be non-negative"),