From 26b35216e2b38f7ed2d8f8df3893d9120bc4d60a Mon Sep 17 00:00:00 2001 From: blublinsky Date: Fri, 14 Feb 2025 11:51:58 +0000 Subject: [PATCH 1/7] removing nested-asyncio from Model Registry client Signed-off-by: blublinsky --- .../src/model_registry/_async_task_runner.py | 102 ++++++++++++++++++ clients/python/src/model_registry/_client.py | 52 ++++----- 2 files changed, 124 insertions(+), 30 deletions(-) create mode 100644 clients/python/src/model_registry/_async_task_runner.py diff --git a/clients/python/src/model_registry/_async_task_runner.py b/clients/python/src/model_registry/_async_task_runner.py new file mode 100644 index 000000000..1334ad8af --- /dev/null +++ b/clients/python/src/model_registry/_async_task_runner.py @@ -0,0 +1,102 @@ +# from https://gist.github.com/blink1073/969aeba85f32c285235750626f2eadd8 + +""" +Copyright (c) 2022 Steven Silvester +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +3. Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +""" + +import asyncio +from typing import Coroutine, Optional, Any +from threading import Thread, Lock +import atexit + + +class AsyncTaskRunner: + """ + A singleton task runner that runs an asyncio event loop on a background thread. + """ + __instance = None + + @staticmethod + def get_instance(): + """ + Get an AsyncTaskRunner (singleton) + """ + if AsyncTaskRunner.__instance is None: + AsyncTaskRunner() + assert AsyncTaskRunner.__instance is not None + return AsyncTaskRunner.__instance + + def __init__(self): + """ + Initialize + """ + # make sure it is a singleton + if AsyncTaskRunner.__instance is not None: + raise Exception("This class is a singleton!") + else: + AsyncTaskRunner.__instance = self + # initialize variables + self.__io_loop: Optional[asyncio.AbstractEventLoop] = None + self.__runner_thread: Optional[Thread] = None + self.__lock = Lock() + # register exit handler + atexit.register(self._close) + + def _close(self): + """ + Clean up. Stop the loop if running + """ + if self.__io_loop: + self.__io_loop.stop() + + def _runner(self) -> None: + """ + Function to run in a thread + """ + loop = self.__io_loop + assert loop is not None + try: + loop.run_forever() + finally: + loop.close() + + def run(self, coro: Coroutine) -> Any: + """ + Synchronously run a coroutine on a background thread. + """ + with self.__lock: + if self.__io_loop is None: + # If the asyncio loop does not exist + self.__io_loop = asyncio.new_event_loop() + self.__runner_thread = Thread(target=self._runner, daemon=True) + self.__runner_thread.start() + # run coroutine thread safe inside a thread. This return concurrent future + fut = asyncio.run_coroutine_threadsafe(coro, self.__io_loop) + # get concurrent future result + return fut.result() diff --git a/clients/python/src/model_registry/_client.py b/clients/python/src/model_registry/_client.py index a6aa73477..ba9a087e2 100644 --- a/clients/python/src/model_registry/_client.py +++ b/clients/python/src/model_registry/_client.py @@ -6,7 +6,7 @@ import os from collections.abc import Mapping from pathlib import Path -from typing import Any, TypeVar, Union, get_args +from typing import TypeVar, Union, get_args from warnings import warn from .core import ModelRegistryAPIClient @@ -19,6 +19,8 @@ RegisteredModel, SupportedTypes, ) +from ._async_task_runner import AsyncTaskRunner + ModelTypes = Union[RegisteredModel, ModelVersion, ModelArtifact] TModel = TypeVar("TModel", bound=ModelTypes) @@ -74,17 +76,16 @@ def __init__( author: Name of the author. is_secure: Whether to use a secure connection. Defaults to True. user_token: The PEM-encoded user token as a string. - user_token_envvar: Environment variable to read the user token from if it's not passed as an arg. Defaults to KF_PIPELINES_SA_TOKEN_PATH. + user_token_envvar: Environment variable to read the user token from if it's not passed as an arg. + Defaults to KF_PIPELINES_SA_TOKEN_PATH. custom_ca: Path to the PEM-encoded root certificates as a string. custom_ca_envvar: Environment variable to read the custom CA from if it's not passed as an arg. log_level: Log level. Defaults to logging.WARNING. """ logger.setLevel(log_level) - - import nest_asyncio - logger.debug("Setting up reentrant async event loop") - nest_asyncio.apply() + + self.runner = AsyncTaskRunner.get_instance() # TODO: get remaining args from env self._author = author @@ -127,16 +128,6 @@ def __init__( ) self.get_registered_models().page_size(1)._next_page() - def async_runner(self, coro: Any) -> Any: - import asyncio - - try: - loop = asyncio.get_event_loop() - except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - return loop.run_until_complete(coro) - async def _register_model(self, name: str, **kwargs) -> RegisteredModel: if rm := await self._api.get_registered_model_by_params(name): return rm @@ -210,8 +201,8 @@ def register_model( Returns: Registered model. """ - rm = self.async_runner(self._register_model(name, owner=owner or self._author)) - mv = self.async_runner( + rm = self.runner.run(self._register_model(name, owner=owner or self._author)) + mv = self.runner.run( self._register_new_version( rm, version, @@ -220,7 +211,7 @@ def register_model( custom_properties=metadata or {}, ) ) - self.async_runner( + self.runner.run( self._register_model_artifact( mv, name, @@ -244,10 +235,10 @@ def update(self, model: TModel) -> TModel: msg = f"Model must be one of {get_args(ModelTypes)}" raise StoreError(msg) if isinstance(model, RegisteredModel): - return self.async_runner(self._api.upsert_registered_model(model)) + return self.runner.run(self._api.upsert_registered_model(model)) if isinstance(model, ModelVersion): - return self.async_runner(self._api.upsert_model_version(model, None)) - return self.async_runner(self._api.upsert_model_artifact(model)) + return self.runner.run(self._api.upsert_model_version(model, None)) + return self.runner.run(self._api.upsert_model_artifact(model)) def register_hf_model( self, @@ -289,8 +280,8 @@ def register_hf_model( from huggingface_hub import HfApi, hf_hub_url, utils except ImportError as e: msg = """package `huggingface-hub` is not installed. - To import models from Hugging Face Hub, start by installing the `huggingface-hub` package, either directly or as an - extra (available as `model-registry[hf]`), e.g.: + To import models from Hugging Face Hub, start by installing the `huggingface-hub` package, + either directly or as an extra (available as `model-registry[hf]`), e.g.: ```sh !pip install --pre model-registry[hf] ``` @@ -363,7 +354,7 @@ def get_registered_model(self, name: str) -> RegisteredModel | None: Returns: Registered model. """ - return self.async_runner(self._api.get_registered_model_by_params(name)) + return self.runner.run(self._api.get_registered_model_by_params(name)) def get_model_version(self, name: str, version: str) -> ModelVersion | None: """Get a model version. @@ -382,7 +373,7 @@ def get_model_version(self, name: str, version: str) -> ModelVersion | None: msg = f"Model {name} does not exist" raise StoreError(msg) assert rm.id - return self.async_runner(self._api.get_model_version_by_params(rm.id, version)) + return self.runner.run(self._api.get_model_version_by_params(rm.id, version)) def get_model_artifact(self, name: str, version: str) -> ModelArtifact | None: """Get a model artifact. @@ -401,7 +392,7 @@ def get_model_artifact(self, name: str, version: str) -> ModelArtifact | None: msg = f"Version {version} does not exist" raise StoreError(msg) assert mv.id - return self.async_runner(self._api.get_model_artifact_by_params(name, mv.id)) + return self.runner.run(self._api.get_model_artifact_by_params(name, mv.id)) def get_registered_models(self) -> Pager[RegisteredModel]: """Get a pager for registered models. @@ -411,7 +402,7 @@ def get_registered_models(self) -> Pager[RegisteredModel]: """ def rm_list(options: ListOptions) -> list[RegisteredModel]: - return self.async_runner(self._api.get_registered_models(options)) + return self.runner.run(self._api.get_registered_models(options)) return Pager[RegisteredModel](rm_list) @@ -432,8 +423,9 @@ def get_model_versions(self, name: str) -> Pager[ModelVersion]: raise StoreError(msg) def rm_versions(options: ListOptions) -> list[ModelVersion]: - # type checkers can't restrict the type inside a nested function: https://mypy.readthedocs.io/en/stable/common_issues.html#narrowing-and-inner-functions + # type checkers can't restrict the type inside a nested function: + # https://mypy.readthedocs.io/en/stable/common_issues.html#narrowing-and-inner-functions assert rm.id - return self.async_runner(self._api.get_model_versions(rm.id, options)) + return self.runner.run(self._api.get_model_versions(rm.id, options)) return Pager[ModelVersion](rm_versions) From d1f771741e9d7350891ad0aab41cfdf55728146b Mon Sep 17 00:00:00 2001 From: blublinsky Date: Fri, 14 Feb 2025 13:12:41 +0000 Subject: [PATCH 2/7] fixed formatting Signed-off-by: blublinsky --- .../src/model_registry/_async_task_runner.py | 42 +++++++------------ clients/python/src/model_registry/_client.py | 5 +-- 2 files changed, 18 insertions(+), 29 deletions(-) diff --git a/clients/python/src/model_registry/_async_task_runner.py b/clients/python/src/model_registry/_async_task_runner.py index 1334ad8af..b00be5897 100644 --- a/clients/python/src/model_registry/_async_task_runner.py +++ b/clients/python/src/model_registry/_async_task_runner.py @@ -1,7 +1,7 @@ # from https://gist.github.com/blink1073/969aeba85f32c285235750626f2eadd8 -""" -Copyright (c) 2022 Steven Silvester +"""Copyright (c) 2022 Steven Silvester. + All rights reserved. Redistribution and use in source and binary forms, with or without @@ -29,38 +29,34 @@ OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ - import asyncio -from typing import Coroutine, Optional, Any -from threading import Thread, Lock import atexit +from collections.abc import Coroutine +from threading import Lock, Thread +from typing import Any, Optional + +SINGLETON = "This class is a singleton!" class AsyncTaskRunner: - """ - A singleton task runner that runs an asyncio event loop on a background thread. - """ + """A singleton task runner that runs an asyncio event loop on a background thread.""" + __instance = None @staticmethod def get_instance(): - """ - Get an AsyncTaskRunner (singleton) - """ + """Get an AsyncTaskRunner (singleton).""" if AsyncTaskRunner.__instance is None: AsyncTaskRunner() assert AsyncTaskRunner.__instance is not None return AsyncTaskRunner.__instance def __init__(self): - """ - Initialize - """ + """Initialize.""" # make sure it is a singleton if AsyncTaskRunner.__instance is not None: - raise Exception("This class is a singleton!") - else: - AsyncTaskRunner.__instance = self + raise Exception(SINGLETON) + AsyncTaskRunner.__instance = self # initialize variables self.__io_loop: Optional[asyncio.AbstractEventLoop] = None self.__runner_thread: Optional[Thread] = None @@ -69,16 +65,12 @@ def __init__(self): atexit.register(self._close) def _close(self): - """ - Clean up. Stop the loop if running - """ + """Clean up. Stop the loop if running.""" if self.__io_loop: self.__io_loop.stop() def _runner(self) -> None: - """ - Function to run in a thread - """ + """Function to run in a thread.""" loop = self.__io_loop assert loop is not None try: @@ -87,9 +79,7 @@ def _runner(self) -> None: loop.close() def run(self, coro: Coroutine) -> Any: - """ - Synchronously run a coroutine on a background thread. - """ + """Synchronously run a coroutine on a background thread.""" with self.__lock: if self.__io_loop is None: # If the asyncio loop does not exist diff --git a/clients/python/src/model_registry/_client.py b/clients/python/src/model_registry/_client.py index ba9a087e2..5a3817446 100644 --- a/clients/python/src/model_registry/_client.py +++ b/clients/python/src/model_registry/_client.py @@ -9,6 +9,7 @@ from typing import TypeVar, Union, get_args from warnings import warn +from ._async_task_runner import AsyncTaskRunner from .core import ModelRegistryAPIClient from .exceptions import StoreError from .types import ( @@ -19,8 +20,6 @@ RegisteredModel, SupportedTypes, ) -from ._async_task_runner import AsyncTaskRunner - ModelTypes = Union[RegisteredModel, ModelVersion, ModelArtifact] TModel = TypeVar("TModel", bound=ModelTypes) @@ -280,7 +279,7 @@ def register_hf_model( from huggingface_hub import HfApi, hf_hub_url, utils except ImportError as e: msg = """package `huggingface-hub` is not installed. - To import models from Hugging Face Hub, start by installing the `huggingface-hub` package, + To import models from Hugging Face Hub, start by installing the `huggingface-hub` package, either directly or as an extra (available as `model-registry[hf]`), e.g.: ```sh !pip install --pre model-registry[hf] From 008dc13bb9d3f0b2ea57e400bc3b3705a3c45aa8 Mon Sep 17 00:00:00 2001 From: blublinsky Date: Sun, 16 Feb 2025 10:15:27 +0000 Subject: [PATCH 3/7] Add support for pluggable AsyncTaskRunner Signed-off-by: blublinsky --- .../_async_task_runner_asyncio.py | 37 +++++++++++++++++++ .../model_registry/_async_task_runner_base.py | 14 +++++++ .../_async_task_runner_factory.py | 13 +++++++ ...runner.py => _async_task_runner_thread.py} | 16 ++++---- clients/python/src/model_registry/_client.py | 4 +- 5 files changed, 75 insertions(+), 9 deletions(-) create mode 100644 clients/python/src/model_registry/_async_task_runner_asyncio.py create mode 100644 clients/python/src/model_registry/_async_task_runner_base.py create mode 100644 clients/python/src/model_registry/_async_task_runner_factory.py rename clients/python/src/model_registry/{_async_task_runner.py => _async_task_runner_thread.py} (89%) diff --git a/clients/python/src/model_registry/_async_task_runner_asyncio.py b/clients/python/src/model_registry/_async_task_runner_asyncio.py new file mode 100644 index 000000000..83a9bf3b3 --- /dev/null +++ b/clients/python/src/model_registry/_async_task_runner_asyncio.py @@ -0,0 +1,37 @@ +import asyncio +from collections.abc import Coroutine +from typing import Any + +from ._async_task_runner_base import AsyncTaskRunnerBase + +SINGLETON = "This class is a singleton!" + + +class AsyncTaskRunnerAcyncio(AsyncTaskRunnerBase): + """A singleton task runner that runs an asyncio event loop on a background thread.""" + + __instance = None + + @staticmethod + def get_instance(): + """Get an AsyncTaskRunner (singleton).""" + if AsyncTaskRunnerAcyncio.__instance is None: + return AsyncTaskRunnerAcyncio() + return AsyncTaskRunnerAcyncio.__instance + + def __init__(self): + """Initialize.""" + if AsyncTaskRunnerAcyncio.__instance is not None: + raise Exception(SINGLETON) + import nest_asyncio + nest_asyncio.apply() + AsyncTaskRunnerAcyncio.__instance = self + + def run(self, coro: Coroutine) -> Any: + """Synchronously run a coroutine on a background thread.""" + try: + loop = asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + return loop.run_until_complete(coro) diff --git a/clients/python/src/model_registry/_async_task_runner_base.py b/clients/python/src/model_registry/_async_task_runner_base.py new file mode 100644 index 000000000..f638fbb31 --- /dev/null +++ b/clients/python/src/model_registry/_async_task_runner_base.py @@ -0,0 +1,14 @@ +from collections.abc import Coroutine +from typing import Any + +NOT_IMPLEMENTED = "Must be implemented by subclass" + + +class AsyncTaskRunnerBase: + """A base task runner that runs an asyncio event loop on a background thread. + + A user can add his own representation of this class + """ + def run(self, coro: Coroutine) -> Any: + """Synchronously run a coroutine on a background thread.""" + raise ValueError(NOT_IMPLEMENTED) diff --git a/clients/python/src/model_registry/_async_task_runner_factory.py b/clients/python/src/model_registry/_async_task_runner_factory.py new file mode 100644 index 000000000..93cf8d692 --- /dev/null +++ b/clients/python/src/model_registry/_async_task_runner_factory.py @@ -0,0 +1,13 @@ +from ._async_task_runner_thread import AsyncTaskRunnerThread +from ._async_task_runner_base import AsyncTaskRunnerBase + + +class AsyncTaskRunnerFactory: + """A factory to create an AsyncTaskRunner. + + A user can overwrite it to use his own AsyncTaskRunner implementation + """ + @staticmethod + def get_instance() -> AsyncTaskRunnerBase: + return AsyncTaskRunnerThread.get_instance() + diff --git a/clients/python/src/model_registry/_async_task_runner.py b/clients/python/src/model_registry/_async_task_runner_thread.py similarity index 89% rename from clients/python/src/model_registry/_async_task_runner.py rename to clients/python/src/model_registry/_async_task_runner_thread.py index b00be5897..73afd1f20 100644 --- a/clients/python/src/model_registry/_async_task_runner.py +++ b/clients/python/src/model_registry/_async_task_runner_thread.py @@ -35,10 +35,12 @@ from threading import Lock, Thread from typing import Any, Optional +from ._async_task_runner_base import AsyncTaskRunnerBase + SINGLETON = "This class is a singleton!" -class AsyncTaskRunner: +class AsyncTaskRunnerThread(AsyncTaskRunnerBase): """A singleton task runner that runs an asyncio event loop on a background thread.""" __instance = None @@ -46,17 +48,17 @@ class AsyncTaskRunner: @staticmethod def get_instance(): """Get an AsyncTaskRunner (singleton).""" - if AsyncTaskRunner.__instance is None: - AsyncTaskRunner() - assert AsyncTaskRunner.__instance is not None - return AsyncTaskRunner.__instance + if AsyncTaskRunnerThread.__instance is None: + AsyncTaskRunnerThread() + assert AsyncTaskRunnerThread.__instance is not None + return AsyncTaskRunnerThread.__instance def __init__(self): """Initialize.""" # make sure it is a singleton - if AsyncTaskRunner.__instance is not None: + if AsyncTaskRunnerThread.__instance is not None: raise Exception(SINGLETON) - AsyncTaskRunner.__instance = self + AsyncTaskRunnerThread.__instance = self # initialize variables self.__io_loop: Optional[asyncio.AbstractEventLoop] = None self.__runner_thread: Optional[Thread] = None diff --git a/clients/python/src/model_registry/_client.py b/clients/python/src/model_registry/_client.py index 5a3817446..21fc143c2 100644 --- a/clients/python/src/model_registry/_client.py +++ b/clients/python/src/model_registry/_client.py @@ -9,7 +9,7 @@ from typing import TypeVar, Union, get_args from warnings import warn -from ._async_task_runner import AsyncTaskRunner +from ._async_task_runner_factory import AsyncTaskRunnerFactory from .core import ModelRegistryAPIClient from .exceptions import StoreError from .types import ( @@ -84,7 +84,7 @@ def __init__( logger.setLevel(log_level) logger.debug("Setting up reentrant async event loop") - self.runner = AsyncTaskRunner.get_instance() + self.runner = AsyncTaskRunnerFactory.get_instance() # TODO: get remaining args from env self._author = author From 289bcfdeac3e33c6b9ef474a6243a92f6d6b323c Mon Sep 17 00:00:00 2001 From: blublinsky Date: Sun, 16 Feb 2025 10:44:46 +0000 Subject: [PATCH 4/7] Add support for pluggable AsyncTaskRunner Signed-off-by: blublinsky --- .../_async_task_runner_asyncio.py | 37 ------------------- .../model_registry/_async_task_runner_base.py | 5 +++ .../_async_task_runner_factory.py | 13 ------- clients/python/src/model_registry/_client.py | 8 ++-- 4 files changed, 10 insertions(+), 53 deletions(-) delete mode 100644 clients/python/src/model_registry/_async_task_runner_asyncio.py delete mode 100644 clients/python/src/model_registry/_async_task_runner_factory.py diff --git a/clients/python/src/model_registry/_async_task_runner_asyncio.py b/clients/python/src/model_registry/_async_task_runner_asyncio.py deleted file mode 100644 index 83a9bf3b3..000000000 --- a/clients/python/src/model_registry/_async_task_runner_asyncio.py +++ /dev/null @@ -1,37 +0,0 @@ -import asyncio -from collections.abc import Coroutine -from typing import Any - -from ._async_task_runner_base import AsyncTaskRunnerBase - -SINGLETON = "This class is a singleton!" - - -class AsyncTaskRunnerAcyncio(AsyncTaskRunnerBase): - """A singleton task runner that runs an asyncio event loop on a background thread.""" - - __instance = None - - @staticmethod - def get_instance(): - """Get an AsyncTaskRunner (singleton).""" - if AsyncTaskRunnerAcyncio.__instance is None: - return AsyncTaskRunnerAcyncio() - return AsyncTaskRunnerAcyncio.__instance - - def __init__(self): - """Initialize.""" - if AsyncTaskRunnerAcyncio.__instance is not None: - raise Exception(SINGLETON) - import nest_asyncio - nest_asyncio.apply() - AsyncTaskRunnerAcyncio.__instance = self - - def run(self, coro: Coroutine) -> Any: - """Synchronously run a coroutine on a background thread.""" - try: - loop = asyncio.get_event_loop() - except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - return loop.run_until_complete(coro) diff --git a/clients/python/src/model_registry/_async_task_runner_base.py b/clients/python/src/model_registry/_async_task_runner_base.py index f638fbb31..7ecbc3363 100644 --- a/clients/python/src/model_registry/_async_task_runner_base.py +++ b/clients/python/src/model_registry/_async_task_runner_base.py @@ -9,6 +9,11 @@ class AsyncTaskRunnerBase: A user can add his own representation of this class """ + @staticmethod + def get_instance(): + """Get an AsyncTaskRunner (singleton).""" + raise ValueError(NOT_IMPLEMENTED) + def run(self, coro: Coroutine) -> Any: """Synchronously run a coroutine on a background thread.""" raise ValueError(NOT_IMPLEMENTED) diff --git a/clients/python/src/model_registry/_async_task_runner_factory.py b/clients/python/src/model_registry/_async_task_runner_factory.py deleted file mode 100644 index 93cf8d692..000000000 --- a/clients/python/src/model_registry/_async_task_runner_factory.py +++ /dev/null @@ -1,13 +0,0 @@ -from ._async_task_runner_thread import AsyncTaskRunnerThread -from ._async_task_runner_base import AsyncTaskRunnerBase - - -class AsyncTaskRunnerFactory: - """A factory to create an AsyncTaskRunner. - - A user can overwrite it to use his own AsyncTaskRunner implementation - """ - @staticmethod - def get_instance() -> AsyncTaskRunnerBase: - return AsyncTaskRunnerThread.get_instance() - diff --git a/clients/python/src/model_registry/_client.py b/clients/python/src/model_registry/_client.py index 21fc143c2..57dea9ee8 100644 --- a/clients/python/src/model_registry/_client.py +++ b/clients/python/src/model_registry/_client.py @@ -9,7 +9,8 @@ from typing import TypeVar, Union, get_args from warnings import warn -from ._async_task_runner_factory import AsyncTaskRunnerFactory +from ._async_task_runner_base import AsyncTaskRunnerBase +from ._async_task_runner_thread import AsyncTaskRunnerThread from .core import ModelRegistryAPIClient from .exceptions import StoreError from .types import ( @@ -64,6 +65,7 @@ def __init__( custom_ca: str | None = None, custom_ca_envvar: str | None = None, log_level: int = logging.WARNING, + async_task_runner: type[AsyncTaskRunnerBase] = AsyncTaskRunnerThread ): """Constructor. @@ -80,11 +82,11 @@ def __init__( custom_ca: Path to the PEM-encoded root certificates as a string. custom_ca_envvar: Environment variable to read the custom CA from if it's not passed as an arg. log_level: Log level. Defaults to logging.WARNING. + async_task_runner: implementation of async task runner. Default - AsyncTaskRunnerThread """ logger.setLevel(log_level) - logger.debug("Setting up reentrant async event loop") - self.runner = AsyncTaskRunnerFactory.get_instance() + self.runner = async_task_runner.get_instance() # TODO: get remaining args from env self._author = author From 65921b6452bb16c6667a7d6fb3ecb95d23fdc405 Mon Sep 17 00:00:00 2001 From: blublinsky Date: Sun, 16 Feb 2025 13:38:01 +0000 Subject: [PATCH 5/7] Making AsyncTaskRunnerBase public Signed-off-by: blublinsky --- clients/python/src/model_registry/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/clients/python/src/model_registry/__init__.py b/clients/python/src/model_registry/__init__.py index b7c4fce21..8d14c092d 100644 --- a/clients/python/src/model_registry/__init__.py +++ b/clients/python/src/model_registry/__init__.py @@ -3,7 +3,9 @@ __version__ = "0.2.14" from ._client import ModelRegistry +from ._async_task_runner_base import AsyncTaskRunnerBase __all__ = [ "ModelRegistry", -] + "AsyncTaskRunnerBase", +] \ No newline at end of file From 0b95b4ca3fb2d9ceaaf47df2be7fea99cdce4c6a Mon Sep 17 00:00:00 2001 From: blublinsky Date: Sun, 16 Feb 2025 15:54:28 +0000 Subject: [PATCH 6/7] Updating documentation Signed-off-by: blublinsky --- clients/python/README.md | 14 ++++++++++++++ .../model_registry/_async_task_runner_thread.py | 2 -- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/clients/python/README.md b/clients/python/README.md index e7df330c3..0a3538e97 100644 --- a/clients/python/README.md +++ b/clients/python/README.md @@ -57,6 +57,20 @@ Or you can set the `is_secure` flag to `False` to connect **without** TLS (not r registry = ModelRegistry("http://server-address", 8080, author="Ada Lovelace", is_secure=False) # insecure port set to 8080 ``` +ModelRegistry client is using asynch execution of HTTP API calls getting results synchronously. To do this, the +client's implementation is leveraging [AsyncTaskRunnerThread](src/model_registry/_async_task_runner_thread.py), +based on this [gist](https://gist.github.com/blink1073/969aeba85f32c285235750626f2eadd8), that works for both +standard [asyncio](https://docs.python.org/3/library/asyncio.html) and [uviloop](https://github.com/MagicStack/uvloop). +If you would like to overwrite it, you can create `ModelRegistry` using the following code: + +```py +registry = ModelRegistry("http://server-address", 8080, author="Ada Lovelace", is_secure=False, async_task_runner=MyAsyncTaskRunner) +``` + +Where MyAsyncTaskRunner is an implementation, that should extend +[AsyncTaskRunnerBase](src/model_registry/_async_task_runner_base.py) implementing both `get_instance` +and `run` method. + ### Registering models To register your first model, you can use the `register_model` method: diff --git a/clients/python/src/model_registry/_async_task_runner_thread.py b/clients/python/src/model_registry/_async_task_runner_thread.py index 73afd1f20..fe1e6e527 100644 --- a/clients/python/src/model_registry/_async_task_runner_thread.py +++ b/clients/python/src/model_registry/_async_task_runner_thread.py @@ -1,5 +1,3 @@ -# from https://gist.github.com/blink1073/969aeba85f32c285235750626f2eadd8 - """Copyright (c) 2022 Steven Silvester. All rights reserved. From 9c5298945a17b794a2c5c9633671a9a4132d17c2 Mon Sep 17 00:00:00 2001 From: blublinsky Date: Mon, 17 Feb 2025 08:40:47 +0000 Subject: [PATCH 7/7] fixed formatting Signed-off-by: blublinsky --- clients/python/src/model_registry/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/python/src/model_registry/__init__.py b/clients/python/src/model_registry/__init__.py index 8d14c092d..f6e12c406 100644 --- a/clients/python/src/model_registry/__init__.py +++ b/clients/python/src/model_registry/__init__.py @@ -2,10 +2,10 @@ __version__ = "0.2.14" -from ._client import ModelRegistry from ._async_task_runner_base import AsyncTaskRunnerBase +from ._client import ModelRegistry __all__ = [ "ModelRegistry", "AsyncTaskRunnerBase", -] \ No newline at end of file +]