Skip to content

Commit

Permalink
k
Browse files Browse the repository at this point in the history
  • Loading branch information
pablonyx committed Mar 6, 2025
1 parent 1bb496a commit e361e69
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
def upgrade() -> None:
# Create new_available_tenant table
op.create_table(
"new_available_tenant",
"available_tenant",
sa.Column("tenant_id", sa.String(), nullable=False),
sa.Column("alembic_version", sa.String(), nullable=False),
sa.Column("date_created", sa.DateTime(), nullable=False),
Expand All @@ -30,4 +30,4 @@ def upgrade() -> None:

def downgrade() -> None:
# Drop new_available_tenant table
op.drop_table("new_available_tenant")
op.drop_table("available_tenant")
6 changes: 3 additions & 3 deletions backend/ee/onyx/server/tenants/provisioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
from onyx.db.llm import update_default_provider
from onyx.db.llm import upsert_cloud_embedding_provider
from onyx.db.llm import upsert_llm_provider
from onyx.db.models import AvailableTenant
from onyx.db.models import IndexModelStatus
from onyx.db.models import NewAvailableTenant
from onyx.db.models import SearchSettings
from onyx.db.models import UserTenantMapping
from onyx.llm.llm_provider_options import ANTHROPIC_MODEL_NAMES
Expand Down Expand Up @@ -361,8 +361,8 @@ async def get_available_tenant() -> str | None:
with Session(get_sqlalchemy_engine()) as db_session:
# Get the oldest available tenant
available_tenant = (
db_session.query(NewAvailableTenant)
.order_by(NewAvailableTenant.date_created)
db_session.query(AvailableTenant)
.order_by(AvailableTenant.date_created)
.first()
)

Expand Down
2 changes: 1 addition & 1 deletion backend/onyx/background/celery/apps/light.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,6 @@ def on_setup_logging(
"onyx.background.celery.tasks.vespa",
"onyx.background.celery.tasks.connector_deletion",
"onyx.background.celery.tasks.doc_permission_syncing",
"onyx.background.celery.tasks.periodic.tenant_provisioning",
"onyx.background.celery.tasks.tenant_provisioning",
]
)
13 changes: 0 additions & 13 deletions backend/onyx/background/celery/apps/primary.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,6 @@
celery_app.config_from_object("onyx.background.celery.configs.primary")
celery_app.Task = app_base.TenantAwareTask # type: ignore [misc]

# Import tasks to ensure they are registered with Celery
import onyx.background.celery.tasks.connector_deletion.tasks # noqa
import onyx.background.celery.tasks.doc_permission_syncing.tasks # noqa
import onyx.background.celery.tasks.external_group_syncing.tasks # noqa
import onyx.background.celery.tasks.indexing.tasks # noqa
import onyx.background.celery.tasks.llm_model_update.tasks # noqa
import onyx.background.celery.tasks.monitoring.tasks # noqa
import onyx.background.celery.tasks.periodic.tasks # noqa
import onyx.background.celery.tasks.periodic.tenant_provisioning # noqa
import onyx.background.celery.tasks.pruning.tasks # noqa
import onyx.background.celery.tasks.shared.tasks # noqa
import onyx.background.celery.tasks.vespa.tasks # noqa


@signals.task_prerun.connect
def on_task_prerun(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisLocks
from onyx.db.engine import get_sqlalchemy_engine
from onyx.db.models import NewAvailableTenant
from onyx.db.models import AvailableTenant
from onyx.redis.redis_pool import get_redis_client
from shared_configs.configs import MULTI_TENANT
from shared_configs.configs import TENANT_ID_PREFIX
Expand Down Expand Up @@ -67,7 +67,7 @@ def check_available_tenants(self: Task) -> None:
try:
# Get the current count of available tenants
with Session(get_sqlalchemy_engine()) as db_session:
available_tenants_count = db_session.query(NewAvailableTenant).count()
available_tenants_count = db_session.query(AvailableTenant).count()

# Get the target number of available tenants
target_available_tenants = getattr(
Expand Down Expand Up @@ -162,7 +162,7 @@ def pre_provision_tenant(self: Task) -> None:
# Store the pre-provisioned tenant in the database
task_logger.warning(f"Storing pre-provisioned tenant '{tenant_id}' in database")
with Session(get_sqlalchemy_engine()) as db_session:
new_tenant = NewAvailableTenant(
new_tenant = AvailableTenant(
tenant_id=tenant_id,
alembic_version=alembic_version,
date_created=datetime.datetime.now(),
Expand Down
178 changes: 178 additions & 0 deletions backend/onyx/background/celery/tasks/tenant_provisioning/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
"""
Periodic tasks for tenant pre-provisioning.
"""
import asyncio
import datetime
import uuid

from celery import shared_task
from celery import Task
from redis.lock import Lock as RedisLock
from sqlalchemy.orm import Session

from ee.onyx.server.tenants.provisioning import setup_tenant
from ee.onyx.server.tenants.schema_management import create_schema_if_not_exists
from ee.onyx.server.tenants.schema_management import get_current_alembic_version
from onyx.background.celery.apps.app_base import task_logger
from onyx.configs.app_configs import JOB_TIMEOUT
from onyx.configs.app_configs import TARGET_AVAILABLE_TENANTS
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisLocks
from onyx.db.engine import get_sqlalchemy_engine
from onyx.db.models import AvailableTenant
from onyx.redis.redis_pool import get_redis_client
from shared_configs.configs import MULTI_TENANT
from shared_configs.configs import TENANT_ID_PREFIX

# Default number of pre-provisioned tenants to maintain
DEFAULT_TARGET_AVAILABLE_TENANTS = 5

# Soft time limit for tenant pre-provisioning tasks (in seconds)
_TENANT_PROVISIONING_SOFT_TIME_LIMIT = 60 * 5 # 5 minutes
# Hard time limit for tenant pre-provisioning tasks (in seconds)
_TENANT_PROVISIONING_TIME_LIMIT = 60 * 10 # 10 minutes


@shared_task(
name=OnyxCeleryTask.CHECK_AVAILABLE_TENANTS,
ignore_result=True,
soft_time_limit=JOB_TIMEOUT,
trail=False,
bind=True,
)
def check_available_tenants(self: Task) -> None:
"""
Check if we have enough pre-provisioned tenants available.
If not, trigger the pre-provisioning of new tenants.
"""
task_logger.info("STARTING CHECK_AVAILABLE_TENANTS")
if not MULTI_TENANT:
task_logger.info(
"Multi-tenancy is not enabled, skipping tenant pre-provisioning"
)
return

r = get_redis_client()
lock_check: RedisLock = r.lock(
OnyxRedisLocks.CHECK_AVAILABLE_TENANTS_LOCK,
timeout=_TENANT_PROVISIONING_SOFT_TIME_LIMIT,
)

# These tasks should never overlap
if not lock_check.acquire(blocking=False):
task_logger.info(
"Skipping check_available_tenants task because it is already running"
)
return

try:
# Get the current count of available tenants
with Session(get_sqlalchemy_engine()) as db_session:
available_tenants_count = db_session.query(AvailableTenant).count()

# Get the target number of available tenants
target_available_tenants = getattr(
TARGET_AVAILABLE_TENANTS, "value", DEFAULT_TARGET_AVAILABLE_TENANTS
)

# Calculate how many new tenants we need to provision
tenants_to_provision = max(
0, target_available_tenants - available_tenants_count
)

task_logger.info(
f"Available tenants: {available_tenants_count}, "
f"Target: {target_available_tenants}, "
f"To provision: {tenants_to_provision}"
)

# Trigger pre-provisioning tasks for each tenant needed
for _ in range(tenants_to_provision):
pre_provision_tenant.apply_async(
priority=OnyxCeleryPriority.LOW,
)

except Exception as e:
task_logger.exception(f"Error in check_available_tenants task: {e}")

finally:
lock_check.release()


@shared_task(
name=OnyxCeleryTask.PRE_PROVISION_TENANT,
ignore_result=True,
soft_time_limit=_TENANT_PROVISIONING_SOFT_TIME_LIMIT,
time_limit=_TENANT_PROVISIONING_TIME_LIMIT,
queue=OnyxCeleryQueues.PRIMARY,
bind=True,
)
def pre_provision_tenant(self: Task) -> None:
"""
Pre-provision a new tenant and store it in the NewAvailableTenant table.
This function fully sets up the tenant with all necessary configurations,
so it's ready to be assigned to a user immediately.
"""
task_logger.info("STARTING PRE_PROVISION_TENANT")
if not MULTI_TENANT:
task_logger.info(
"Multi-tenancy is not enabled, skipping tenant pre-provisioning"
)
return
r = get_redis_client()
lock_provision: RedisLock = r.lock(
OnyxRedisLocks.PRE_PROVISION_TENANT_LOCK,
timeout=_TENANT_PROVISIONING_SOFT_TIME_LIMIT,
)

# Allow multiple pre-provisioning tasks to run, but ensure they don't overlap
if not lock_provision.acquire(blocking=False):
task_logger.info(
"Skipping pre_provision_tenant task because it is already running"
)
return

try:
# Generate a new tenant ID
tenant_id = TENANT_ID_PREFIX + str(uuid.uuid4())
task_logger.info(f"Starting pre-provisioning for tenant {tenant_id}")

# Import here to avoid circular imports

# Create the schema for the new tenant
schema_created = create_schema_if_not_exists(tenant_id)
if schema_created:
task_logger.info(f"Created schema for tenant '{tenant_id}'")
else:
task_logger.info(f"Schema already exists for tenant '{tenant_id}'")

# Set up the tenant with all necessary configurations
task_logger.info(f"Setting up tenant configuration for '{tenant_id}'")
asyncio.run(setup_tenant(tenant_id))
task_logger.info(f"Tenant configuration completed for '{tenant_id}'")

# Get the current Alembic version
alembic_version = get_current_alembic_version(tenant_id)
task_logger.info(
f"Tenant '{tenant_id}' using Alembic version: {alembic_version}"
)

# Store the pre-provisioned tenant in the database
task_logger.info(f"Storing pre-provisioned tenant '{tenant_id}' in database")
with Session(get_sqlalchemy_engine()) as db_session:
new_tenant = AvailableTenant(
tenant_id=tenant_id,
alembic_version=alembic_version,
date_created=datetime.datetime.now(),
)
db_session.add(new_tenant)
db_session.commit()

task_logger.info(f"Successfully pre-provisioned tenant {tenant_id}")

except Exception as e:
task_logger.exception(f"Error in pre_provision_tenant task: {e}")
finally:
lock_provision.release()
4 changes: 2 additions & 2 deletions backend/onyx/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2310,8 +2310,8 @@ def validate_email(self, key: str, value: str) -> str:
return value.lower() if value else value


class NewAvailableTenant(Base):
__tablename__ = "new_available_tenant"
class AvailableTenant(Base):
__tablename__ = "available_tenant"
"""
These entries will only exist ephemerally and are meant to be picked up by new users on registration.
"""
Expand Down

0 comments on commit e361e69

Please sign in to comment.