From b8b543c5fbb8a53e8a2dc70b31662c5c07ffb747 Mon Sep 17 00:00:00 2001 From: Olivier Michaud Date: Mon, 21 Oct 2024 10:27:50 -0400 Subject: [PATCH] Init DB before init the StandaloneWorkerManagerClient --- src/saturn_engine/models/__init__.py | 8 ++------ src/saturn_engine/worker/services/api_client.py | 16 +++++++++++++++- src/saturn_engine/worker/worker_manager.py | 12 ------------ 3 files changed, 17 insertions(+), 19 deletions(-) diff --git a/src/saturn_engine/models/__init__.py b/src/saturn_engine/models/__init__.py index ee4a6ae8..f6d6261d 100644 --- a/src/saturn_engine/models/__init__.py +++ b/src/saturn_engine/models/__init__.py @@ -2,10 +2,6 @@ from .job import Job from .job_cursor_state import JobCursorState from .queue import Queue +from .topology_patches import TopologyPatch -__all__ = [ - "Base", - "Job", - "Queue", - "JobCursorState", -] +__all__ = ["Base", "Job", "Queue", "JobCursorState", "TopologyPatch"] diff --git a/src/saturn_engine/worker/services/api_client.py b/src/saturn_engine/worker/services/api_client.py index 1ed7194a..c4333462 100644 --- a/src/saturn_engine/worker/services/api_client.py +++ b/src/saturn_engine/worker/services/api_client.py @@ -3,6 +3,7 @@ from sqlalchemy.orm import sessionmaker from saturn_engine.client.worker_manager import WorkerManagerClient +from saturn_engine.models.base import Base from saturn_engine.worker.services.databases import Databases from saturn_engine.worker.services.tasks_runner import TasksRunnerService from saturn_engine.worker.worker_manager import StandaloneWorkerManagerClient @@ -48,17 +49,30 @@ class StandaloneApiClient(Service[StandaloneServices, None]): SYNC_DELAY = 60 async def open(self) -> None: + await self.init_db() self.client = StandaloneWorkerManagerClient( config=self.services.config, sessionmaker=sessionmaker(self.services.databases.sync_engine()), ) - await self.client.init_db() await self.client.sync_jobs() self.services.tasks_runner.create_task( self._sync_jobs(), name="StandaloneClient.sync-jobs" ) + async def init_db(self) -> None: + # TODO: Eventually figure out some nice monadic pattern to support both + # sync/async IO in stores. + def _sync_init_db() -> None: + Base.metadata.create_all( + bind=sessionmaker(self.services.databases.sync_engine()).kw["bind"] + ) + + return await asyncio.get_event_loop().run_in_executor( + None, + _sync_init_db, + ) + async def _sync_jobs(self) -> None: while True: await asyncio.sleep(self.SYNC_DELAY) diff --git a/src/saturn_engine/worker/worker_manager.py b/src/saturn_engine/worker/worker_manager.py index 3cd5e3f8..15bc6355 100644 --- a/src/saturn_engine/worker/worker_manager.py +++ b/src/saturn_engine/worker/worker_manager.py @@ -10,7 +10,6 @@ from saturn_engine.core.api import JobsStatesSyncResponse from saturn_engine.core.api import LockInput from saturn_engine.core.api import LockResponse -from saturn_engine.models.base import Base from saturn_engine.stores import jobs_store from saturn_engine.worker_manager.context import WorkerManagerContext from saturn_engine.worker_manager.services.lock import lock_jobs @@ -32,12 +31,6 @@ def __init__( with self.sessionmaker() as session: self.context.load_static_definition(session=session) - async def init_db(self) -> None: - return await asyncio.get_event_loop().run_in_executor( - None, - self._sync_init_db, - ) - async def lock(self) -> LockResponse: return await asyncio.get_event_loop().run_in_executor( None, @@ -66,11 +59,6 @@ async def sync_jobs(self) -> None: self._sync_jobs, ) - # TODO: Eventually figure out some nice monadic pattern to support both - # sync/async IO in stores. - def _sync_init_db(self) -> None: - Base.metadata.create_all(bind=self.sessionmaker.kw["bind"]) - def _sync_lock(self) -> LockResponse: with self.sessionmaker() as session: lock = lock_jobs(