-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
7 changed files
with
189 additions
and
24 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
178 changes: 178 additions & 0 deletions
178
backend/onyx/background/celery/tasks/tenant_provisioning/tasks.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,178 @@ | ||
""" | ||
Periodic tasks for tenant pre-provisioning. | ||
""" | ||
import asyncio | ||
import datetime | ||
import uuid | ||
|
||
from celery import shared_task | ||
from celery import Task | ||
from redis.lock import Lock as RedisLock | ||
from sqlalchemy.orm import Session | ||
|
||
from ee.onyx.server.tenants.provisioning import setup_tenant | ||
from ee.onyx.server.tenants.schema_management import create_schema_if_not_exists | ||
from ee.onyx.server.tenants.schema_management import get_current_alembic_version | ||
from onyx.background.celery.apps.app_base import task_logger | ||
from onyx.configs.app_configs import JOB_TIMEOUT | ||
from onyx.configs.app_configs import TARGET_AVAILABLE_TENANTS | ||
from onyx.configs.constants import OnyxCeleryPriority | ||
from onyx.configs.constants import OnyxCeleryQueues | ||
from onyx.configs.constants import OnyxCeleryTask | ||
from onyx.configs.constants import OnyxRedisLocks | ||
from onyx.db.engine import get_sqlalchemy_engine | ||
from onyx.db.models import AvailableTenant | ||
from onyx.redis.redis_pool import get_redis_client | ||
from shared_configs.configs import MULTI_TENANT | ||
from shared_configs.configs import TENANT_ID_PREFIX | ||
|
||
# Default number of pre-provisioned tenants to maintain | ||
DEFAULT_TARGET_AVAILABLE_TENANTS = 5 | ||
|
||
# Soft time limit for tenant pre-provisioning tasks (in seconds) | ||
_TENANT_PROVISIONING_SOFT_TIME_LIMIT = 60 * 5 # 5 minutes | ||
# Hard time limit for tenant pre-provisioning tasks (in seconds) | ||
_TENANT_PROVISIONING_TIME_LIMIT = 60 * 10 # 10 minutes | ||
|
||
|
||
@shared_task( | ||
name=OnyxCeleryTask.CHECK_AVAILABLE_TENANTS, | ||
ignore_result=True, | ||
soft_time_limit=JOB_TIMEOUT, | ||
trail=False, | ||
bind=True, | ||
) | ||
def check_available_tenants(self: Task) -> None: | ||
""" | ||
Check if we have enough pre-provisioned tenants available. | ||
If not, trigger the pre-provisioning of new tenants. | ||
""" | ||
task_logger.info("STARTING CHECK_AVAILABLE_TENANTS") | ||
if not MULTI_TENANT: | ||
task_logger.info( | ||
"Multi-tenancy is not enabled, skipping tenant pre-provisioning" | ||
) | ||
return | ||
|
||
r = get_redis_client() | ||
lock_check: RedisLock = r.lock( | ||
OnyxRedisLocks.CHECK_AVAILABLE_TENANTS_LOCK, | ||
timeout=_TENANT_PROVISIONING_SOFT_TIME_LIMIT, | ||
) | ||
|
||
# These tasks should never overlap | ||
if not lock_check.acquire(blocking=False): | ||
task_logger.info( | ||
"Skipping check_available_tenants task because it is already running" | ||
) | ||
return | ||
|
||
try: | ||
# Get the current count of available tenants | ||
with Session(get_sqlalchemy_engine()) as db_session: | ||
available_tenants_count = db_session.query(AvailableTenant).count() | ||
|
||
# Get the target number of available tenants | ||
target_available_tenants = getattr( | ||
TARGET_AVAILABLE_TENANTS, "value", DEFAULT_TARGET_AVAILABLE_TENANTS | ||
) | ||
|
||
# Calculate how many new tenants we need to provision | ||
tenants_to_provision = max( | ||
0, target_available_tenants - available_tenants_count | ||
) | ||
|
||
task_logger.info( | ||
f"Available tenants: {available_tenants_count}, " | ||
f"Target: {target_available_tenants}, " | ||
f"To provision: {tenants_to_provision}" | ||
) | ||
|
||
# Trigger pre-provisioning tasks for each tenant needed | ||
for _ in range(tenants_to_provision): | ||
pre_provision_tenant.apply_async( | ||
priority=OnyxCeleryPriority.LOW, | ||
) | ||
|
||
except Exception as e: | ||
task_logger.exception(f"Error in check_available_tenants task: {e}") | ||
|
||
finally: | ||
lock_check.release() | ||
|
||
|
||
@shared_task( | ||
name=OnyxCeleryTask.PRE_PROVISION_TENANT, | ||
ignore_result=True, | ||
soft_time_limit=_TENANT_PROVISIONING_SOFT_TIME_LIMIT, | ||
time_limit=_TENANT_PROVISIONING_TIME_LIMIT, | ||
queue=OnyxCeleryQueues.PRIMARY, | ||
bind=True, | ||
) | ||
def pre_provision_tenant(self: Task) -> None: | ||
""" | ||
Pre-provision a new tenant and store it in the NewAvailableTenant table. | ||
This function fully sets up the tenant with all necessary configurations, | ||
so it's ready to be assigned to a user immediately. | ||
""" | ||
task_logger.info("STARTING PRE_PROVISION_TENANT") | ||
if not MULTI_TENANT: | ||
task_logger.info( | ||
"Multi-tenancy is not enabled, skipping tenant pre-provisioning" | ||
) | ||
return | ||
r = get_redis_client() | ||
lock_provision: RedisLock = r.lock( | ||
OnyxRedisLocks.PRE_PROVISION_TENANT_LOCK, | ||
timeout=_TENANT_PROVISIONING_SOFT_TIME_LIMIT, | ||
) | ||
|
||
# Allow multiple pre-provisioning tasks to run, but ensure they don't overlap | ||
if not lock_provision.acquire(blocking=False): | ||
task_logger.info( | ||
"Skipping pre_provision_tenant task because it is already running" | ||
) | ||
return | ||
|
||
try: | ||
# Generate a new tenant ID | ||
tenant_id = TENANT_ID_PREFIX + str(uuid.uuid4()) | ||
task_logger.info(f"Starting pre-provisioning for tenant {tenant_id}") | ||
|
||
# Import here to avoid circular imports | ||
|
||
# Create the schema for the new tenant | ||
schema_created = create_schema_if_not_exists(tenant_id) | ||
if schema_created: | ||
task_logger.info(f"Created schema for tenant '{tenant_id}'") | ||
else: | ||
task_logger.info(f"Schema already exists for tenant '{tenant_id}'") | ||
|
||
# Set up the tenant with all necessary configurations | ||
task_logger.info(f"Setting up tenant configuration for '{tenant_id}'") | ||
asyncio.run(setup_tenant(tenant_id)) | ||
task_logger.info(f"Tenant configuration completed for '{tenant_id}'") | ||
|
||
# Get the current Alembic version | ||
alembic_version = get_current_alembic_version(tenant_id) | ||
task_logger.info( | ||
f"Tenant '{tenant_id}' using Alembic version: {alembic_version}" | ||
) | ||
|
||
# Store the pre-provisioned tenant in the database | ||
task_logger.info(f"Storing pre-provisioned tenant '{tenant_id}' in database") | ||
with Session(get_sqlalchemy_engine()) as db_session: | ||
new_tenant = AvailableTenant( | ||
tenant_id=tenant_id, | ||
alembic_version=alembic_version, | ||
date_created=datetime.datetime.now(), | ||
) | ||
db_session.add(new_tenant) | ||
db_session.commit() | ||
|
||
task_logger.info(f"Successfully pre-provisioned tenant {tenant_id}") | ||
|
||
except Exception as e: | ||
task_logger.exception(f"Error in pre_provision_tenant task: {e}") | ||
finally: | ||
lock_provision.release() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters