From c699010cc1112d0b3372aff5a6d7aa227ea71d5e Mon Sep 17 00:00:00 2001 From: Mathieu Gascon-Lefebvre Date: Tue, 30 Jul 2024 14:23:46 -0400 Subject: [PATCH] Add SaturnDynamicTopology that allows to dynamic topology generation from python code --- .../worker_manager/config/declarative.py | 10 ++++++++ .../config/declarative_dynamic_topology.py | 23 +++++++++++++++++++ .../config/static_definitions.py | 10 ++++++++ .../config/dynamic_definition.py | 17 ++++++++++++++ .../worker_manager/config/test_declarative.py | 14 +++++++++++ 5 files changed, 74 insertions(+) create mode 100644 src/saturn_engine/worker_manager/config/declarative_dynamic_topology.py create mode 100644 tests/worker_manager/config/dynamic_definition.py diff --git a/src/saturn_engine/worker_manager/config/declarative.py b/src/saturn_engine/worker_manager/config/declarative.py index bdf5e2e8..e76b60a2 100644 --- a/src/saturn_engine/worker_manager/config/declarative.py +++ b/src/saturn_engine/worker_manager/config/declarative.py @@ -9,6 +9,7 @@ from saturn_engine.utils.declarative_config import load_uncompiled_objects_from_str from saturn_engine.utils.options import fromdict +from .declarative_dynamic_topology import DynamicTopology from .declarative_executor import Executor from .declarative_inventory import Inventory from .declarative_job import Job @@ -103,6 +104,15 @@ def compile_static_definitions( resources_provider_item ) + for uncompiled_dynamic_topology in objects_by_kind.pop( + "SaturnDynamicTopology", + dict(), + ).values(): + dynamic_topology: DynamicTopology = fromdict( + uncompiled_dynamic_topology.data, DynamicTopology + ) + definitions.update(dynamic_topology.to_static_definitions()) + for object_kind in objects_by_kind.keys(): raise Exception(f"Unsupported kind {object_kind}") diff --git a/src/saturn_engine/worker_manager/config/declarative_dynamic_topology.py b/src/saturn_engine/worker_manager/config/declarative_dynamic_topology.py new file mode 100644 index 00000000..0bce91c3 --- /dev/null +++ b/src/saturn_engine/worker_manager/config/declarative_dynamic_topology.py @@ -0,0 +1,23 @@ +import typing as t + +import dataclasses + +from saturn_engine.utils import inspect as extra_inspect +from saturn_engine.utils.declarative_config import BaseObject +from saturn_engine.worker_manager.config.static_definitions import StaticDefinitions + + +@dataclasses.dataclass +class DynamicTopologySpec: + module: str + + +@dataclasses.dataclass +class DynamicTopology(BaseObject): + spec: DynamicTopologySpec + + def to_static_definitions(self) -> StaticDefinitions: + return t.cast( + t.Callable[[], StaticDefinitions], + extra_inspect.import_name(self.spec.module), + )() diff --git a/src/saturn_engine/worker_manager/config/static_definitions.py b/src/saturn_engine/worker_manager/config/static_definitions.py index d542c92f..ddcea5e1 100644 --- a/src/saturn_engine/worker_manager/config/static_definitions.py +++ b/src/saturn_engine/worker_manager/config/static_definitions.py @@ -26,3 +26,13 @@ class StaticDefinitions: resources_by_type: dict[str, list[t.Union[ResourceItem, ResourcesProviderItem]]] = ( dataclasses.field(default_factory=lambda: defaultdict(list)) ) + + def update(self, other: "StaticDefinitions") -> None: + self.executors.update(other.executors) + self.inventories.update(other.inventories) + self.topics.update(other.topics) + self.job_definitions.update(other.job_definitions) + self.jobs.update(other.jobs) + self.resources_providers.update(other.resources_providers) + self.resources.update(other.resources) + self.resources_by_type.update(other.resources_by_type) diff --git a/tests/worker_manager/config/dynamic_definition.py b/tests/worker_manager/config/dynamic_definition.py new file mode 100644 index 00000000..2676ecdb --- /dev/null +++ b/tests/worker_manager/config/dynamic_definition.py @@ -0,0 +1,17 @@ +from saturn_engine.utils.declarative_config import ObjectMetadata +from saturn_engine.worker_manager.config.declarative_inventory import Inventory +from saturn_engine.worker_manager.config.declarative_inventory import InventorySpec +from saturn_engine.worker_manager.config.static_definitions import StaticDefinitions + + +def build() -> StaticDefinitions: + return StaticDefinitions( + inventories={ + "test-inventory": Inventory( + metadata=ObjectMetadata(name="test-inventory"), + apiVersion="saturn.flared.io/v1alpha1", + kind="SaturnInventory", + spec=InventorySpec(type="testtype"), + ).to_core_object() + } + ) diff --git a/tests/worker_manager/config/test_declarative.py b/tests/worker_manager/config/test_declarative.py index 72c1df69..67df81cc 100644 --- a/tests/worker_manager/config/test_declarative.py +++ b/tests/worker_manager/config/test_declarative.py @@ -592,3 +592,17 @@ def test_resources_provider() -> None: static_definitions = load_definitions_from_str(resources_provider_str) assert len(static_definitions.resources_providers) == 1 assert len(static_definitions.resources_by_type["TestApiKey"]) == 1 + + +def test_dynamic_definition() -> None: + resources_provider_str = """ + apiVersion: saturn.flared.io/v1alpha1 + kind: SaturnDynamicTopology + metadata: + name: test-dynamic-topology + spec: + module: tests.worker_manager.config.dynamic_definition.build + """ + static_definitions = load_definitions_from_str(resources_provider_str) + assert "test-inventory" in static_definitions.inventories + assert static_definitions.inventories["test-inventory"].name == "test-inventory"