Skip to content

Commit

Permalink
Init DB before init the StandaloneWorkerManagerClient
Browse files Browse the repository at this point in the history
  • Loading branch information
Olivier Michaud authored and infherny committed Oct 21, 2024
1 parent 5775460 commit b8b543c
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 19 deletions.
8 changes: 2 additions & 6 deletions src/saturn_engine/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@
from .job import Job
from .job_cursor_state import JobCursorState
from .queue import Queue
from .topology_patches import TopologyPatch

__all__ = [
"Base",
"Job",
"Queue",
"JobCursorState",
]
__all__ = ["Base", "Job", "Queue", "JobCursorState", "TopologyPatch"]
16 changes: 15 additions & 1 deletion src/saturn_engine/worker/services/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from sqlalchemy.orm import sessionmaker

from saturn_engine.client.worker_manager import WorkerManagerClient
from saturn_engine.models.base import Base
from saturn_engine.worker.services.databases import Databases
from saturn_engine.worker.services.tasks_runner import TasksRunnerService
from saturn_engine.worker.worker_manager import StandaloneWorkerManagerClient
Expand Down Expand Up @@ -48,17 +49,30 @@ class StandaloneApiClient(Service[StandaloneServices, None]):
SYNC_DELAY = 60

async def open(self) -> None:
await self.init_db()
self.client = StandaloneWorkerManagerClient(
config=self.services.config,
sessionmaker=sessionmaker(self.services.databases.sync_engine()),
)

await self.client.init_db()
await self.client.sync_jobs()
self.services.tasks_runner.create_task(
self._sync_jobs(), name="StandaloneClient.sync-jobs"
)

async def init_db(self) -> None:
# TODO: Eventually figure out some nice monadic pattern to support both
# sync/async IO in stores.
def _sync_init_db() -> None:
Base.metadata.create_all(
bind=sessionmaker(self.services.databases.sync_engine()).kw["bind"]
)

return await asyncio.get_event_loop().run_in_executor(
None,
_sync_init_db,
)

async def _sync_jobs(self) -> None:
while True:
await asyncio.sleep(self.SYNC_DELAY)
Expand Down
12 changes: 0 additions & 12 deletions src/saturn_engine/worker/worker_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from saturn_engine.core.api import JobsStatesSyncResponse
from saturn_engine.core.api import LockInput
from saturn_engine.core.api import LockResponse
from saturn_engine.models.base import Base
from saturn_engine.stores import jobs_store
from saturn_engine.worker_manager.context import WorkerManagerContext
from saturn_engine.worker_manager.services.lock import lock_jobs
Expand All @@ -32,12 +31,6 @@ def __init__(
with self.sessionmaker() as session:
self.context.load_static_definition(session=session)

async def init_db(self) -> None:
return await asyncio.get_event_loop().run_in_executor(
None,
self._sync_init_db,
)

async def lock(self) -> LockResponse:
return await asyncio.get_event_loop().run_in_executor(
None,
Expand Down Expand Up @@ -66,11 +59,6 @@ async def sync_jobs(self) -> None:
self._sync_jobs,
)

# TODO: Eventually figure out some nice monadic pattern to support both
# sync/async IO in stores.
def _sync_init_db(self) -> None:
Base.metadata.create_all(bind=self.sessionmaker.kw["bind"])

def _sync_lock(self) -> LockResponse:
with self.sessionmaker() as session:
lock = lock_jobs(
Expand Down

0 comments on commit b8b543c

Please sign in to comment.