From ff8258dea5c4e50d9d617a4a43e390125224702c Mon Sep 17 00:00:00 2001 From: Kevin Cameron Grismore Date: Wed, 29 Jan 2025 10:02:05 -0600 Subject: [PATCH 01/10] add dbt profiles utilities --- .../prefect-dbt/prefect_dbt/core/profiles.py | 228 +++++++ .../prefect-dbt/tests/core/test_profiles.py | 564 ++++++++++++++++++ 2 files changed, 792 insertions(+) create mode 100644 src/integrations/prefect-dbt/prefect_dbt/core/profiles.py create mode 100644 src/integrations/prefect-dbt/tests/core/test_profiles.py diff --git a/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py b/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py new file mode 100644 index 000000000000..90bf932fb718 --- /dev/null +++ b/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py @@ -0,0 +1,228 @@ +import contextlib +import os +import shutil +import tempfile +from pathlib import Path +from typing import TYPE_CHECKING, Any, Optional, TypeVar, Union + +import slugify +import yaml + +from prefect.client.utilities import inject_client +from prefect.utilities.annotations import NotSet +from prefect.utilities.asyncutils import run_coro_as_sync +from prefect.utilities.collections import get_from_dict +from prefect.utilities.templating import ( + PlaceholderType, + find_placeholders, + resolve_variables, +) + +if TYPE_CHECKING: + from prefect.client.orchestration import PrefectClient + + +T = TypeVar("T", str, int, float, bool, dict[Any, Any], list[Any], None) + +BLOCK_DOCUMENT_PLACEHOLDER_PREFIX = "prefect.blocks." +VARIABLE_PLACEHOLDER_PREFIX = "prefect.variables." + + +def get_profiles_dir() -> str: + """Get the dbt profiles directory from environment or default location.""" + profiles_dir = os.getenv("DBT_PROFILES_DIR") + if not profiles_dir: + profiles_dir = os.path.expanduser("~/.dbt") + return profiles_dir + + +def load_profiles_yml(profiles_dir: Optional[str]) -> dict[str, Any]: + """ + Load and parse the profiles.yml file. + + Args: + profiles_dir: Path to the directory containing profiles.yml. + If None, uses the default profiles directory. + + Returns: + Dict containing the parsed profiles.yml contents + + Raises: + ValueError: If profiles.yml is not found + """ + if profiles_dir is None: + profiles_dir = get_profiles_dir() + + profiles_path = os.path.join(profiles_dir, "profiles.yml") + if not os.path.exists(profiles_path): + raise ValueError(f"No profiles.yml found at {profiles_path}") + + with open(profiles_path, "r") as f: + return yaml.safe_load(f) + + +@contextlib.asynccontextmanager +async def aresolve_profiles_yml(profiles_dir: Optional[str] = None): + """ + Asynchronous context manager that creates a temporary directory with a resolved profiles.yml file. + + Args: + profiles_dir: Path to the directory containing profiles.yml. + If None, uses the default profiles directory. + + Yields: + str: Path to temporary directory containing the resolved profiles.yml. + Directory and contents are automatically cleaned up after context exit. + + Example: ```python + async with aresolve_profiles_yml() as temp_dir: + # temp_dir contains resolved profiles.yml + # use temp_dir for dbt operations + # temp_dir is automatically cleaned up ``` + """ + temp_dir = Path(tempfile.mkdtemp()) + try: + profiles_yml: dict[str, Any] = load_profiles_yml(profiles_dir) + profiles_yml = await resolve_block_document_references(profiles_yml) + profiles_yml = await resolve_variables(profiles_yml) + + temp_profiles_path = temp_dir / "profiles.yml" + temp_profiles_path.write_text(yaml.dump(profiles_yml, default_flow_style=False)) + + yield str(temp_dir) + finally: + shutil.rmtree(temp_dir) + + +@contextlib.contextmanager +def resolve_profiles_yml(profiles_dir: Optional[str] = None): + """ + Synchronous context manager that creates a temporary directory with a resolved profiles.yml file. + + Args: + profiles_dir: Path to the directory containing profiles.yml. + If None, uses the default profiles directory. + + Yields: + str: Path to temporary directory containing the resolved profiles.yml. + Directory and contents are automatically cleaned up after context exit. + + Example: ```python + with resolve_profiles_yml() as temp_dir: + # temp_dir contains resolved profiles.yml + # use temp_dir for dbt operations + # temp_dir is automatically cleaned up ``` + """ + temp_dir = Path(tempfile.mkdtemp()) + try: + profiles_yml: dict[str, Any] = load_profiles_yml(profiles_dir) + profiles_yml = run_coro_as_sync(resolve_block_document_references(profiles_yml)) + profiles_yml = run_coro_as_sync(resolve_variables(profiles_yml)) + + temp_profiles_path = temp_dir / "profiles.yml" + temp_profiles_path.write_text( + yaml.dump(profiles_yml, default_style=None, default_flow_style=False) + ) + + yield str(temp_dir) + finally: + shutil.rmtree(temp_dir) + + +@inject_client +async def resolve_block_document_references( + template: T, client: Optional["PrefectClient"] = None +) -> Union[T, dict[str, Any]]: + """ + Resolve block document references in a template by replacing each reference with + template text that calls dbt's env_var function, like + {{ env_var('PREFECT_BLOCK_SECRET_MYSECRET') }}. Creates environment variables + for each block document reference, with a name in the format + PREFECT_BLOCKS_BLOCK_TYPE_SLUG_BLOCK_DOCUMENT_NAME and optionally BLOCK_DOCUMENT_KEYPATH. + + Recursively searches for block document references in dictionaries and lists. + + Args: + template: The template to resolve block documents in + + Returns: + The template with block documents resolved + """ + if TYPE_CHECKING: + # The @inject_client decorator takes care of providing the client, but + # the function signature must mark it as optional to callers. + assert client is not None + + if isinstance(template, dict): + block_document_id = template.get("$ref", {}).get("block_document_id") + if block_document_id: + block_document = await client.read_block_document(block_document_id) + return block_document.data + updated_template: dict[str, Any] = {} + for key, value in template.items(): + updated_value = await resolve_block_document_references( + value, client=client + ) + updated_template[key] = updated_value + return updated_template + elif isinstance(template, list): + return [ + await resolve_block_document_references(item, client=client) + for item in template + ] + elif isinstance(template, str): + placeholders = find_placeholders(template) + has_block_document_placeholder = any( + placeholder.type is PlaceholderType.BLOCK_DOCUMENT + for placeholder in placeholders + ) + if not (placeholders and has_block_document_placeholder): + return template + elif ( + len(placeholders) == 1 + and list(placeholders)[0].full_match == template + and list(placeholders)[0].type is PlaceholderType.BLOCK_DOCUMENT + ): + # value_keypath will be a list containing a dot path if additional + # attributes are accessed and an empty list otherwise. + [placeholder] = placeholders + parts = placeholder.name.replace( + BLOCK_DOCUMENT_PLACEHOLDER_PREFIX, "" + ).split(".", 2) + block_type_slug, block_document_name, *value_keypath = parts + block_document = await client.read_block_document_by_name( + name=block_document_name, block_type_slug=block_type_slug + ) + data = block_document.data + value: Union[T, dict[str, Any]] = data + + # resolving system blocks to their data for backwards compatibility + if len(data) == 1 and "value" in data: + # only resolve the value if the keypath is not already pointing to "value" + if not (value_keypath and value_keypath[0].startswith("value")): + data = value = value["value"] + + # resolving keypath/block attributes + if value_keypath: + from_dict: Any = get_from_dict(data, value_keypath[0], default=NotSet) + if from_dict is NotSet: + raise ValueError( + f"Invalid template: {template!r}. Could not resolve the" + " keypath in the block document data." + ) + value = from_dict + + env_var_name = slugify.slugify(placeholder[0], separator="_").upper() + + os.environ[env_var_name] = str(value) + + template_text = f"{{{{ env_var('{env_var_name}') }}}}" + + return template_text + else: + raise ValueError( + f"Invalid template: {template!r}. Only a single block placeholder is" + " allowed in a string and no surrounding text is allowed." + ) + + return template diff --git a/src/integrations/prefect-dbt/tests/core/test_profiles.py b/src/integrations/prefect-dbt/tests/core/test_profiles.py new file mode 100644 index 000000000000..41a076c3b5d5 --- /dev/null +++ b/src/integrations/prefect-dbt/tests/core/test_profiles.py @@ -0,0 +1,564 @@ +import os +import warnings +from datetime import datetime +from pathlib import Path + +import pytest +import yaml +from prefect_dbt.core.profiles import ( + aresolve_profiles_yml, + get_profiles_dir, + load_profiles_yml, + resolve_block_document_references, + resolve_profiles_yml, +) + +from prefect._internal.compatibility.deprecated import PrefectDeprecationWarning +from prefect.blocks.core import Block +from prefect.blocks.system import JSON, DateTime, Secret, String +from prefect.blocks.webhook import Webhook +from prefect.client.orchestration import get_client + + +def should_reraise_warning(warning): + """ + Determine if a deprecation warning should be reraised based on the date. + + Deprecation warnings that have passed the date threshold should be reraised to + ensure the deprecated code paths are removed. + """ + message = str(warning.message) + try: + # Extract the date from the new message format + date_str = message.split("not be available in new releases after ")[1].strip( + "." + ) + # Parse the date + deprecation_date = datetime.strptime(date_str, "%b %Y").date().replace(day=1) + + # Check if the current date is after the start of the month following the deprecation date + current_date = datetime.now().date().replace(day=1) + return current_date > deprecation_date + except Exception: + # Reraise in cases of failure + return True + + +@pytest.fixture +def ignore_prefect_deprecation_warnings(): + """ + Ignore deprecation warnings from the agent module to avoid + test failures. + """ + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("ignore", category=PrefectDeprecationWarning) + yield + for warning in w: + if isinstance(warning.message, PrefectDeprecationWarning): + if should_reraise_warning(warning): + warnings.warn(warning.message, warning.category, stacklevel=2) + + +SAMPLE_PROFILE = { + "jaffle_shop": { + "outputs": { + "dev": { + "type": "duckdb", + "path": "jaffle_shop.duckdb", + "schema": "main", + "threads": 4, + } + } + } +} + +VARIABLES_PROFILE = { + "jaffle_shop_with_variable_reference": { + "outputs": { + "dev": { + "type": "duckdb", + "path": "jaffle_shop.duckdb", + "schema": "main", + "threads": 4, + } + }, + "target": "{{ prefect.variables.target }}", + } +} + +BLOCKS_PROFILE = { + "jaffle_shop_with_variable_reference": { + "outputs": { + "dev": { + "type": "duckdb", + "path": "jaffle_shop.duckdb", + "schema": "main", + "threads": 4, + "password": "{{ prefect.blocks.secret.my-password }}", + } + }, + "target": "dev", + } +} + + +@pytest.fixture +def temp_profiles_dir(tmp_path): + profiles_dir = tmp_path / ".dbt" + profiles_dir.mkdir() + + profiles_path = profiles_dir / "profiles.yml" + with open(profiles_path, "w") as f: + yaml.dump(SAMPLE_PROFILE, f) + + return str(profiles_dir) + + +@pytest.fixture +def temp_variables_profiles_dir(tmp_path): + profiles_dir = tmp_path / ".dbt" + profiles_dir.mkdir() + + profiles_path = profiles_dir / "profiles.yml" + with open(profiles_path, "w") as f: + yaml.dump(VARIABLES_PROFILE, f) + + return str(profiles_dir) + + +@pytest.fixture +def temp_blocks_profiles_dir(tmp_path): + """Create a temporary profiles directory with a profile that references a block.""" + profiles_dir = tmp_path / ".dbt" + profiles_dir.mkdir() + + profiles_path = profiles_dir / "profiles.yml" + with open(profiles_path, "w") as f: + yaml.dump(BLOCKS_PROFILE, f) + + return str(profiles_dir) + + +def test_get_profiles_dir_default(): + if "DBT_PROFILES_DIR" in os.environ: + del os.environ["DBT_PROFILES_DIR"] + + expected = os.path.expanduser("~/.dbt") + assert get_profiles_dir() == expected + + +def test_get_profiles_dir_from_env(monkeypatch): + test_path = "/custom/path" + monkeypatch.setenv("DBT_PROFILES_DIR", test_path) + assert get_profiles_dir() == test_path + + +def test_load_profiles_yml_success(temp_profiles_dir): + profiles = load_profiles_yml(temp_profiles_dir) + assert profiles == SAMPLE_PROFILE + + +def test_load_profiles_yml_default_dir(monkeypatch, temp_profiles_dir): + monkeypatch.setenv("DBT_PROFILES_DIR", temp_profiles_dir) + profiles = load_profiles_yml(None) + assert profiles == SAMPLE_PROFILE + + +def test_load_profiles_yml_file_not_found(): + nonexistent_dir = "/path/that/does/not/exist" + with pytest.raises( + ValueError, + match=f"No profiles.yml found at {os.path.join(nonexistent_dir, 'profiles.yml')}", + ): + load_profiles_yml(nonexistent_dir) + + +def test_load_profiles_yml_invalid_yaml(temp_profiles_dir): + profiles_path = Path(temp_profiles_dir) / "profiles.yml" + with open(profiles_path, "w") as f: + f.write("invalid: yaml: content:\nindentation error") + + with pytest.raises(yaml.YAMLError): + load_profiles_yml(temp_profiles_dir) + + +async def test_aresolve_profiles_yml_success( + temp_profiles_dir, +): + """Test that aresolve_profiles_yml creates and cleans up temporary directory.""" + async with aresolve_profiles_yml(temp_profiles_dir) as temp_dir: + temp_path = Path(temp_dir) + profiles_path = temp_path / "profiles.yml" + + # Check temporary directory exists and contains profiles.yml + assert temp_path.exists() + assert profiles_path.exists() + + # Verify contents match expected profiles + loaded_profiles = yaml.safe_load(profiles_path.read_text()) + assert loaded_profiles == SAMPLE_PROFILE + + # Verify cleanup happened + assert not temp_path.exists() + + +def test_resolve_profiles_yml_success(temp_profiles_dir): + """Test that resolve_profiles_yml creates and cleans up temporary directory.""" + with resolve_profiles_yml(temp_profiles_dir) as temp_dir: + temp_path = Path(temp_dir) + profiles_path = temp_path / "profiles.yml" + + # Check temporary directory exists and contains profiles.yml + assert temp_path.exists() + assert profiles_path.exists() + + # Verify contents match expected profiles + loaded_profiles = yaml.safe_load(profiles_path.read_text()) + assert loaded_profiles == SAMPLE_PROFILE + + # Verify cleanup happened + assert not temp_path.exists() + + +async def test_aresolve_profiles_yml_error_cleanup(temp_profiles_dir): + """Test that temporary directory is cleaned up even if an error occurs.""" + temp_path = None + + with pytest.raises(ValueError): + async with aresolve_profiles_yml(temp_profiles_dir) as temp_dir: + temp_path = Path(temp_dir) + assert temp_path.exists() + raise ValueError("Test error") + + # Verify cleanup happened despite error + assert temp_path is not None + assert not temp_path.exists() + + +def test_resolve_profiles_yml_error_cleanup(temp_profiles_dir): + """Test that temporary directory is cleaned up even if an error occurs.""" + temp_path = None + + with pytest.raises(ValueError): + with resolve_profiles_yml(temp_profiles_dir) as temp_dir: + temp_path = Path(temp_dir) + assert temp_path.exists() + raise ValueError("Test error") + + # Verify cleanup happened despite error + assert temp_path is not None + assert not temp_path.exists() + + +async def test_aresolve_profiles_yml_resolves_variables(temp_variables_profiles_dir): + """Test that variables in profiles.yml are properly resolved.""" + # Create a variable + async with get_client() as client: + await client._client.post( + "/variables/", json={"name": "target", "value": "dev"} + ) + + # Use the context manager and verify variable resolution + async with aresolve_profiles_yml(temp_variables_profiles_dir) as temp_dir: + temp_path = Path(temp_dir) + profiles_path = temp_path / "profiles.yml" + + # Verify contents have resolved variables + loaded_profiles = yaml.safe_load(profiles_path.read_text()) + assert loaded_profiles["jaffle_shop_with_variable_reference"]["target"] == "dev" + + +def test_resolve_profiles_yml_resolves_variables(temp_variables_profiles_dir): + with resolve_profiles_yml(temp_variables_profiles_dir) as temp_dir: + temp_path = Path(temp_dir) + profiles_path = temp_path / "profiles.yml" + + loaded_profiles = yaml.safe_load(profiles_path.read_text()) + assert loaded_profiles["jaffle_shop_with_variable_reference"]["target"] == "dev" + + +async def test_aresolve_profiles_yml_resolves_blocks(temp_blocks_profiles_dir): + from prefect.blocks.system import Secret + + secret_block = Secret(value="super-secret-password") + await secret_block.save("my-password", overwrite=True) + + async with aresolve_profiles_yml(temp_blocks_profiles_dir) as temp_dir: + temp_path = Path(temp_dir) + profiles_path = temp_path / "profiles.yml" + + loaded_profiles = yaml.safe_load(profiles_path.read_text()) + assert ( + loaded_profiles["jaffle_shop_with_variable_reference"]["outputs"]["dev"][ + "password" + ] + == "{{ env_var('PREFECT_BLOCKS_SECRET_MY_PASSWORD') }}" + ) + + +def test_resolve_profiles_yml_resolves_blocks(temp_blocks_profiles_dir): + from prefect.blocks.system import Secret + + secret_block = Secret(value="super-secret-password") + secret_block.save("my-password", overwrite=True) + + with resolve_profiles_yml(temp_blocks_profiles_dir) as temp_dir: + temp_path = Path(temp_dir) + profiles_path = temp_path / "profiles.yml" + + loaded_profiles = yaml.safe_load(profiles_path.read_text()) + assert ( + loaded_profiles["jaffle_shop_with_variable_reference"]["outputs"]["dev"][ + "password" + ] + == "{{ env_var('PREFECT_BLOCKS_SECRET_MY_PASSWORD') }}" + ) + + +class TestResolveBlockDocumentReferences: + @pytest.fixture(autouse=True) + def ignore_deprecation_warnings(self, ignore_prefect_deprecation_warnings): + """Remove references to deprecated blocks when deprecation period is over.""" + pass + + @pytest.fixture() + async def block_document_id(self): + class ArbitraryBlock(Block): + a: int + b: str + + return await ArbitraryBlock(a=1, b="hello").save( + name="arbitrary-block", overwrite=True + ) + + async def test_resolve_block_document_references_with_no_block_document_references( + self, + ): + assert await resolve_block_document_references({"key": "value"}) == { + "key": "value" + } + + async def test_resolve_block_document_references_with_one_block_document_reference( + self, block_document_id + ): + async with get_client() as prefect_client: + assert { + "key": {"a": 1, "b": "hello"} + } == await resolve_block_document_references( + {"key": {"$ref": {"block_document_id": block_document_id}}}, + client=prefect_client, + ) + + async def test_resolve_block_document_references_with_nested_block_document_references( + self, block_document_id + ): + async with get_client() as prefect_client: + template = { + "key": { + "nested_key": {"$ref": {"block_document_id": block_document_id}}, + "other_nested_key": { + "$ref": {"block_document_id": block_document_id} + }, + } + } + block_document = await prefect_client.read_block_document(block_document_id) + + result = await resolve_block_document_references( + template, client=prefect_client + ) + + assert result == { + "key": { + "nested_key": block_document.data, + "other_nested_key": block_document.data, + } + } + + async def test_resolve_block_document_references_with_list_of_block_document_references( + self, block_document_id + ): + async with get_client() as prefect_client: + template = [{"$ref": {"block_document_id": block_document_id}}] + block_document = await prefect_client.read_block_document(block_document_id) + + result = await resolve_block_document_references( + template, client=prefect_client + ) + + assert result == [block_document.data] + + async def test_resolve_block_document_references_with_dot_delimited_syntax( + self, block_document_id + ): + async with get_client() as prefect_client: + template = {"key": "{{ prefect.blocks.arbitraryblock.arbitrary-block }}"} + + result = await resolve_block_document_references( + template, client=prefect_client + ) + + assert result == { + "key": "{{ env_var('PREFECT_BLOCKS_ARBITRARYBLOCK_ARBITRARY_BLOCK') }}" + } + + async def test_resolve_block_document_references_raises_on_multiple_placeholders( + self, block_document_id + ): + async with get_client() as prefect_client: + template = { + "key": ( + "{{ prefect.blocks.arbitraryblock.arbitrary-block }} {{" + " another_placeholder }}" + ) + } + + with pytest.raises( + ValueError, + match=( + "Only a single block placeholder is allowed in a string and no" + " surrounding text is allowed." + ), + ): + await resolve_block_document_references(template, client=prefect_client) + + async def test_resolve_block_document_references_raises_on_extra_text( + self, block_document_id + ): + async with get_client() as prefect_client: + template = { + "key": "{{ prefect.blocks.arbitraryblock.arbitrary-block }} extra text" + } + + with pytest.raises( + ValueError, + match=( + "Only a single block placeholder is allowed in a string and no" + " surrounding text is allowed." + ), + ): + await resolve_block_document_references(template, client=prefect_client) + + async def test_resolve_block_document_references_does_not_change_standard_placeholders( + self, + ): + template = {"key": "{{ standard_placeholder }}"} + + result = await resolve_block_document_references(template) + + assert result == template + + async def test_resolve_block_document_unpacks_system_blocks(self): + await JSON(value={"key": "value"}).save(name="json-block") + await Secret(value="N1nj4C0d3rP@ssw0rd!").save(name="secret-block") + await DateTime(value="2020-01-01T00:00:00Z").save(name="datetime-block") + await String(value="hello").save(name="string-block") + + template = { + "json": "{{ prefect.blocks.json.json-block }}", + "secret": "{{ prefect.blocks.secret.secret-block }}", + "datetime": "{{ prefect.blocks.date-time.datetime-block }}", + "string": "{{ prefect.blocks.string.string-block }}", + } + + result = await resolve_block_document_references(template) + assert result == { + "datetime": "{{ env_var('PREFECT_BLOCKS_DATE_TIME_DATETIME_BLOCK') }}", + "json": "{{ env_var('PREFECT_BLOCKS_JSON_JSON_BLOCK') }}", + "secret": "{{ env_var('PREFECT_BLOCKS_SECRET_SECRET_BLOCK') }}", + "string": "{{ env_var('PREFECT_BLOCKS_STRING_STRING_BLOCK') }}", + } + + async def test_resolve_block_document_system_block_resolves_dict_keypath(self): + # for backwards compatibility system blocks can be referenced directly + # they should still be able to access nested keys + await JSON(value={"key": {"nested-key": "nested_value"}}).save( + name="nested-json-block" + ) + template = { + "value": "{{ prefect.blocks.json.nested-json-block}}", + "keypath": "{{ prefect.blocks.json.nested-json-block.key }}", + "nested_keypath": "{{ prefect.blocks.json.nested-json-block.key.nested-key }}", + } + + result = await resolve_block_document_references(template) + assert result == { + "keypath": "{{ env_var('PREFECT_BLOCKS_JSON_NESTED_JSON_BLOCK_KEY') }}", + "nested_keypath": "{{ env_var('PREFECT_BLOCKS_JSON_NESTED_JSON_BLOCK_KEY_NESTED_KEY') }}", + "value": "{{ env_var('PREFECT_BLOCKS_JSON_NESTED_JSON_BLOCK') }}", + } + + async def test_resolve_block_document_resolves_dict_keypath(self): + await JSON(value={"key": {"nested-key": "nested_value"}}).save( + name="nested-json-block-2" + ) + template = { + "value": "{{ prefect.blocks.json.nested-json-block-2.value }}", + "keypath": "{{ prefect.blocks.json.nested-json-block-2.value.key }}", + "nested_keypath": ( + "{{ prefect.blocks.json.nested-json-block-2.value.key.nested-key }}" + ), + } + + result = await resolve_block_document_references(template) + assert result == { + "keypath": "{{ env_var('PREFECT_BLOCKS_JSON_NESTED_JSON_BLOCK_2_VALUE_KEY') }}", + "nested_keypath": "{{ env_var('PREFECT_BLOCKS_JSON_NESTED_JSON_BLOCK_2_VALUE_KEY_NESTED_KEY') }}", + "value": "{{ env_var('PREFECT_BLOCKS_JSON_NESTED_JSON_BLOCK_2_VALUE') }}", + } + + async def test_resolve_block_document_resolves_list_keypath(self): + await JSON(value={"key": ["value1", "value2"]}).save(name="json-list-block") + await JSON(value=["value1", "value2"]).save(name="list-block") + await JSON( + value={"key": ["value1", {"nested": ["value2", "value3"]}, "value4"]} + ).save(name="nested-json-list-block") + template = { + "json_list": "{{ prefect.blocks.json.json-list-block.value.key[0] }}", + "list": "{{ prefect.blocks.json.list-block.value[1] }}", + "nested_json_list": ( + "{{ prefect.blocks.json.nested-json-list-block.value.key[1].nested[1] }}" + ), + } + + result = await resolve_block_document_references(template) + assert result == { + "json_list": "{{ env_var('PREFECT_BLOCKS_JSON_JSON_LIST_BLOCK_VALUE_KEY_0') }}", + "list": "{{ env_var('PREFECT_BLOCKS_JSON_LIST_BLOCK_VALUE_1') }}", + "nested_json_list": "{{ env_var('PREFECT_BLOCKS_JSON_NESTED_JSON_LIST_BLOCK_VALUE_KEY_1_NESTED_1') }}", + } + + async def test_resolve_block_document_raises_on_invalid_keypath(self): + await JSON(value={"key": {"nested_key": "value"}}).save( + name="nested-json-block-3" + ) + json_template = { + "json": "{{ prefect.blocks.json.nested-json-block-3.value.key.does_not_exist }}", + } + with pytest.raises(ValueError, match="Could not resolve the keypath"): + await resolve_block_document_references(json_template) + + await JSON(value=["value1", "value2"]).save(name="index-error-block") + index_error_template = { + "index_error": "{{ prefect.blocks.json.index-error-block.value[3] }}", + } + with pytest.raises(ValueError, match="Could not resolve the keypath"): + await resolve_block_document_references(index_error_template) + + await Webhook(url="https://example.com").save(name="webhook-block") + webhook_template = { + "webhook": "{{ prefect.blocks.webhook.webhook-block.value }}", + } + with pytest.raises(ValueError, match="Could not resolve the keypath"): + await resolve_block_document_references(webhook_template) + + async def test_resolve_block_document_resolves_block_attribute(self): + await Webhook(url="https://example.com").save(name="webhook-block-2") + + template = { + "block_attribute": "{{ prefect.blocks.webhook.webhook-block-2.url }}", + } + result = await resolve_block_document_references(template) + + assert result == { + "block_attribute": "{{ env_var('PREFECT_BLOCKS_WEBHOOK_WEBHOOK_BLOCK_2_URL') }}", + } From 1e13f5d7f122702de381cf8a082dea0f472af003 Mon Sep 17 00:00:00 2001 From: Kevin Cameron Grismore Date: Wed, 29 Jan 2025 10:15:16 -0600 Subject: [PATCH 02/10] add module docstring --- src/integrations/prefect-dbt/prefect_dbt/core/profiles.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py b/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py index 90bf932fb718..6029deab8ac8 100644 --- a/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py +++ b/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py @@ -1,3 +1,8 @@ +""" +Utilities for working with dbt profiles.yml files, including resolving +block document and variable references. +""" + import contextlib import os import shutil From 35e6cce83168350404aaa6e63a77fb52248066bc Mon Sep 17 00:00:00 2001 From: Kevin Cameron Grismore Date: Wed, 29 Jan 2025 10:35:48 -0600 Subject: [PATCH 03/10] remove unused const --- src/integrations/prefect-dbt/prefect_dbt/core/profiles.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py b/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py index 6029deab8ac8..4e2e53c19d93 100644 --- a/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py +++ b/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py @@ -30,7 +30,6 @@ T = TypeVar("T", str, int, float, bool, dict[Any, Any], list[Any], None) BLOCK_DOCUMENT_PLACEHOLDER_PREFIX = "prefect.blocks." -VARIABLE_PLACEHOLDER_PREFIX = "prefect.variables." def get_profiles_dir() -> str: From 125c0b13e97ac3d9643015627487f62a194ab8f5 Mon Sep 17 00:00:00 2001 From: Kevin Cameron Grismore Date: Wed, 29 Jan 2025 18:26:48 -0600 Subject: [PATCH 04/10] context managaer, naming, typing, remove inject_client --- .../prefect-dbt/prefect_dbt/core/profiles.py | 194 +++++++++--------- .../prefect-dbt/tests/core/test_profiles.py | 54 ++--- 2 files changed, 129 insertions(+), 119 deletions(-) diff --git a/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py b/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py index 4e2e53c19d93..c88dbe4ffafe 100644 --- a/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py +++ b/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py @@ -5,15 +5,22 @@ import contextlib import os -import shutil import tempfile from pathlib import Path -from typing import TYPE_CHECKING, Any, Optional, TypeVar, Union +from typing import ( + TYPE_CHECKING, + Any, + AsyncGenerator, + Generator, + Optional, + TypeVar, + Union, +) import slugify import yaml -from prefect.client.utilities import inject_client +from prefect import get_client from prefect.utilities.annotations import NotSet from prefect.utilities.asyncutils import run_coro_as_sync from prefect.utilities.collections import get_from_dict @@ -66,7 +73,9 @@ def load_profiles_yml(profiles_dir: Optional[str]) -> dict[str, Any]: @contextlib.asynccontextmanager -async def aresolve_profiles_yml(profiles_dir: Optional[str] = None): +async def aresolve_profiles_yml( + profiles_dir: Optional[str] = None, +) -> AsyncGenerator[str, None]: """ Asynchronous context manager that creates a temporary directory with a resolved profiles.yml file. @@ -84,22 +93,23 @@ async def aresolve_profiles_yml(profiles_dir: Optional[str] = None): # use temp_dir for dbt operations # temp_dir is automatically cleaned up ``` """ - temp_dir = Path(tempfile.mkdtemp()) - try: + with tempfile.TemporaryDirectory() as temp_dir: + temp_dir_path = Path(temp_dir) profiles_yml: dict[str, Any] = load_profiles_yml(profiles_dir) - profiles_yml = await resolve_block_document_references(profiles_yml) + profiles_yml = await convert_block_references_to_env_vars(profiles_yml) profiles_yml = await resolve_variables(profiles_yml) - temp_profiles_path = temp_dir / "profiles.yml" - temp_profiles_path.write_text(yaml.dump(profiles_yml, default_flow_style=False)) - - yield str(temp_dir) - finally: - shutil.rmtree(temp_dir) + temp_profiles_path = temp_dir_path / "profiles.yml" + temp_profiles_path.write_text( + yaml.dump(profiles_yml, default_style=None, default_flow_style=False) + ) + yield str(temp_dir_path) @contextlib.contextmanager -def resolve_profiles_yml(profiles_dir: Optional[str] = None): +def resolve_profiles_yml( + profiles_dir: Optional[str] = None, +) -> Generator[str, None, None]: """ Synchronous context manager that creates a temporary directory with a resolved profiles.yml file. @@ -117,24 +127,22 @@ def resolve_profiles_yml(profiles_dir: Optional[str] = None): # use temp_dir for dbt operations # temp_dir is automatically cleaned up ``` """ - temp_dir = Path(tempfile.mkdtemp()) - try: + with tempfile.TemporaryDirectory() as temp_dir: + temp_dir_path = Path(temp_dir) profiles_yml: dict[str, Any] = load_profiles_yml(profiles_dir) - profiles_yml = run_coro_as_sync(resolve_block_document_references(profiles_yml)) + profiles_yml = run_coro_as_sync( + convert_block_references_to_env_vars(profiles_yml) + ) profiles_yml = run_coro_as_sync(resolve_variables(profiles_yml)) - temp_profiles_path = temp_dir / "profiles.yml" + temp_profiles_path = temp_dir_path / "profiles.yml" temp_profiles_path.write_text( yaml.dump(profiles_yml, default_style=None, default_flow_style=False) ) - - yield str(temp_dir) - finally: - shutil.rmtree(temp_dir) + yield str(temp_dir_path) -@inject_client -async def resolve_block_document_references( +async def convert_block_references_to_env_vars( template: T, client: Optional["PrefectClient"] = None ) -> Union[T, dict[str, Any]]: """ @@ -152,81 +160,79 @@ async def resolve_block_document_references( Returns: The template with block documents resolved """ - if TYPE_CHECKING: - # The @inject_client decorator takes care of providing the client, but - # the function signature must mark it as optional to callers. - assert client is not None - - if isinstance(template, dict): - block_document_id = template.get("$ref", {}).get("block_document_id") - if block_document_id: - block_document = await client.read_block_document(block_document_id) - return block_document.data - updated_template: dict[str, Any] = {} - for key, value in template.items(): - updated_value = await resolve_block_document_references( - value, client=client + async with get_client() as client: + if isinstance(template, dict): + block_document_id = template.get("$ref", {}).get("block_document_id") + if block_document_id: + block_document = await client.read_block_document(block_document_id) + return block_document.data + updated_template: dict[str, Any] = {} + for key, value in template.items(): + updated_value = await convert_block_references_to_env_vars( + value, client=client + ) + updated_template[key] = updated_value + return updated_template + elif isinstance(template, list): + return [ + await convert_block_references_to_env_vars(item, client=client) + for item in template + ] + elif isinstance(template, str): + placeholders = find_placeholders(template) + has_block_document_placeholder = any( + placeholder.type is PlaceholderType.BLOCK_DOCUMENT + for placeholder in placeholders ) - updated_template[key] = updated_value - return updated_template - elif isinstance(template, list): - return [ - await resolve_block_document_references(item, client=client) - for item in template - ] - elif isinstance(template, str): - placeholders = find_placeholders(template) - has_block_document_placeholder = any( - placeholder.type is PlaceholderType.BLOCK_DOCUMENT - for placeholder in placeholders - ) - if not (placeholders and has_block_document_placeholder): - return template - elif ( - len(placeholders) == 1 - and list(placeholders)[0].full_match == template - and list(placeholders)[0].type is PlaceholderType.BLOCK_DOCUMENT - ): - # value_keypath will be a list containing a dot path if additional - # attributes are accessed and an empty list otherwise. - [placeholder] = placeholders - parts = placeholder.name.replace( - BLOCK_DOCUMENT_PLACEHOLDER_PREFIX, "" - ).split(".", 2) - block_type_slug, block_document_name, *value_keypath = parts - block_document = await client.read_block_document_by_name( - name=block_document_name, block_type_slug=block_type_slug - ) - data = block_document.data - value: Union[T, dict[str, Any]] = data - - # resolving system blocks to their data for backwards compatibility - if len(data) == 1 and "value" in data: - # only resolve the value if the keypath is not already pointing to "value" - if not (value_keypath and value_keypath[0].startswith("value")): - data = value = value["value"] - - # resolving keypath/block attributes - if value_keypath: - from_dict: Any = get_from_dict(data, value_keypath[0], default=NotSet) - if from_dict is NotSet: - raise ValueError( - f"Invalid template: {template!r}. Could not resolve the" - " keypath in the block document data." + if not (placeholders and has_block_document_placeholder): + return template + elif ( + len(placeholders) == 1 + and list(placeholders)[0].full_match == template + and list(placeholders)[0].type is PlaceholderType.BLOCK_DOCUMENT + ): + # value_keypath will be a list containing a dot path if additional + # attributes are accessed and an empty list otherwise. + [placeholder] = placeholders + parts = placeholder.name.replace( + BLOCK_DOCUMENT_PLACEHOLDER_PREFIX, "" + ).split(".", 2) + block_type_slug, block_document_name, *value_keypath = parts + block_document = await client.read_block_document_by_name( + name=block_document_name, block_type_slug=block_type_slug + ) + data = block_document.data + value: Union[T, dict[str, Any]] = data + + # resolving system blocks to their data for backwards compatibility + if len(data) == 1 and "value" in data: + # only resolve the value if the keypath is not already pointing to "value" + if not (value_keypath and value_keypath[0].startswith("value")): + data = value = value["value"] + + # resolving keypath/block attributes + if value_keypath: + from_dict: Any = get_from_dict( + data, value_keypath[0], default=NotSet ) - value = from_dict + if from_dict is NotSet: + raise ValueError( + f"Invalid template: {template!r}. Could not resolve the" + " keypath in the block document data." + ) + value = from_dict - env_var_name = slugify.slugify(placeholder[0], separator="_").upper() + env_var_name = slugify.slugify(placeholder[0], separator="_").upper() - os.environ[env_var_name] = str(value) + os.environ[env_var_name] = str(value) - template_text = f"{{{{ env_var('{env_var_name}') }}}}" + template_text = f"{{{{ env_var('{env_var_name}') }}}}" - return template_text - else: - raise ValueError( - f"Invalid template: {template!r}. Only a single block placeholder is" - " allowed in a string and no surrounding text is allowed." - ) + return template_text + else: + raise ValueError( + f"Invalid template: {template!r}. Only a single block placeholder is" + " allowed in a string and no surrounding text is allowed." + ) return template diff --git a/src/integrations/prefect-dbt/tests/core/test_profiles.py b/src/integrations/prefect-dbt/tests/core/test_profiles.py index 41a076c3b5d5..cc69893d09e8 100644 --- a/src/integrations/prefect-dbt/tests/core/test_profiles.py +++ b/src/integrations/prefect-dbt/tests/core/test_profiles.py @@ -7,9 +7,9 @@ import yaml from prefect_dbt.core.profiles import ( aresolve_profiles_yml, + convert_block_references_to_env_vars, get_profiles_dir, load_profiles_yml, - resolve_block_document_references, resolve_profiles_yml, ) @@ -331,25 +331,25 @@ class ArbitraryBlock(Block): name="arbitrary-block", overwrite=True ) - async def test_resolve_block_document_references_with_no_block_document_references( + async def test_convert_block_references_to_env_vars_with_no_block_document_references( self, ): - assert await resolve_block_document_references({"key": "value"}) == { + assert await convert_block_references_to_env_vars({"key": "value"}) == { "key": "value" } - async def test_resolve_block_document_references_with_one_block_document_reference( + async def test_convert_block_references_to_env_vars_with_one_block_document_reference( self, block_document_id ): async with get_client() as prefect_client: assert { "key": {"a": 1, "b": "hello"} - } == await resolve_block_document_references( + } == await convert_block_references_to_env_vars( {"key": {"$ref": {"block_document_id": block_document_id}}}, client=prefect_client, ) - async def test_resolve_block_document_references_with_nested_block_document_references( + async def test_convert_block_references_to_env_vars_with_nested_block_document_references( self, block_document_id ): async with get_client() as prefect_client: @@ -363,7 +363,7 @@ async def test_resolve_block_document_references_with_nested_block_document_refe } block_document = await prefect_client.read_block_document(block_document_id) - result = await resolve_block_document_references( + result = await convert_block_references_to_env_vars( template, client=prefect_client ) @@ -374,26 +374,26 @@ async def test_resolve_block_document_references_with_nested_block_document_refe } } - async def test_resolve_block_document_references_with_list_of_block_document_references( + async def test_convert_block_references_to_env_vars_with_list_of_block_document_references( self, block_document_id ): async with get_client() as prefect_client: template = [{"$ref": {"block_document_id": block_document_id}}] block_document = await prefect_client.read_block_document(block_document_id) - result = await resolve_block_document_references( + result = await convert_block_references_to_env_vars( template, client=prefect_client ) assert result == [block_document.data] - async def test_resolve_block_document_references_with_dot_delimited_syntax( + async def test_convert_block_references_to_env_vars_with_dot_delimited_syntax( self, block_document_id ): async with get_client() as prefect_client: template = {"key": "{{ prefect.blocks.arbitraryblock.arbitrary-block }}"} - result = await resolve_block_document_references( + result = await convert_block_references_to_env_vars( template, client=prefect_client ) @@ -401,7 +401,7 @@ async def test_resolve_block_document_references_with_dot_delimited_syntax( "key": "{{ env_var('PREFECT_BLOCKS_ARBITRARYBLOCK_ARBITRARY_BLOCK') }}" } - async def test_resolve_block_document_references_raises_on_multiple_placeholders( + async def test_convert_block_references_to_env_vars_raises_on_multiple_placeholders( self, block_document_id ): async with get_client() as prefect_client: @@ -419,9 +419,11 @@ async def test_resolve_block_document_references_raises_on_multiple_placeholders " surrounding text is allowed." ), ): - await resolve_block_document_references(template, client=prefect_client) + await convert_block_references_to_env_vars( + template, client=prefect_client + ) - async def test_resolve_block_document_references_raises_on_extra_text( + async def test_convert_block_references_to_env_vars_raises_on_extra_text( self, block_document_id ): async with get_client() as prefect_client: @@ -436,14 +438,16 @@ async def test_resolve_block_document_references_raises_on_extra_text( " surrounding text is allowed." ), ): - await resolve_block_document_references(template, client=prefect_client) + await convert_block_references_to_env_vars( + template, client=prefect_client + ) - async def test_resolve_block_document_references_does_not_change_standard_placeholders( + async def test_convert_block_references_to_env_vars_does_not_change_standard_placeholders( self, ): template = {"key": "{{ standard_placeholder }}"} - result = await resolve_block_document_references(template) + result = await convert_block_references_to_env_vars(template) assert result == template @@ -460,7 +464,7 @@ async def test_resolve_block_document_unpacks_system_blocks(self): "string": "{{ prefect.blocks.string.string-block }}", } - result = await resolve_block_document_references(template) + result = await convert_block_references_to_env_vars(template) assert result == { "datetime": "{{ env_var('PREFECT_BLOCKS_DATE_TIME_DATETIME_BLOCK') }}", "json": "{{ env_var('PREFECT_BLOCKS_JSON_JSON_BLOCK') }}", @@ -480,7 +484,7 @@ async def test_resolve_block_document_system_block_resolves_dict_keypath(self): "nested_keypath": "{{ prefect.blocks.json.nested-json-block.key.nested-key }}", } - result = await resolve_block_document_references(template) + result = await convert_block_references_to_env_vars(template) assert result == { "keypath": "{{ env_var('PREFECT_BLOCKS_JSON_NESTED_JSON_BLOCK_KEY') }}", "nested_keypath": "{{ env_var('PREFECT_BLOCKS_JSON_NESTED_JSON_BLOCK_KEY_NESTED_KEY') }}", @@ -499,7 +503,7 @@ async def test_resolve_block_document_resolves_dict_keypath(self): ), } - result = await resolve_block_document_references(template) + result = await convert_block_references_to_env_vars(template) assert result == { "keypath": "{{ env_var('PREFECT_BLOCKS_JSON_NESTED_JSON_BLOCK_2_VALUE_KEY') }}", "nested_keypath": "{{ env_var('PREFECT_BLOCKS_JSON_NESTED_JSON_BLOCK_2_VALUE_KEY_NESTED_KEY') }}", @@ -520,7 +524,7 @@ async def test_resolve_block_document_resolves_list_keypath(self): ), } - result = await resolve_block_document_references(template) + result = await convert_block_references_to_env_vars(template) assert result == { "json_list": "{{ env_var('PREFECT_BLOCKS_JSON_JSON_LIST_BLOCK_VALUE_KEY_0') }}", "list": "{{ env_var('PREFECT_BLOCKS_JSON_LIST_BLOCK_VALUE_1') }}", @@ -535,21 +539,21 @@ async def test_resolve_block_document_raises_on_invalid_keypath(self): "json": "{{ prefect.blocks.json.nested-json-block-3.value.key.does_not_exist }}", } with pytest.raises(ValueError, match="Could not resolve the keypath"): - await resolve_block_document_references(json_template) + await convert_block_references_to_env_vars(json_template) await JSON(value=["value1", "value2"]).save(name="index-error-block") index_error_template = { "index_error": "{{ prefect.blocks.json.index-error-block.value[3] }}", } with pytest.raises(ValueError, match="Could not resolve the keypath"): - await resolve_block_document_references(index_error_template) + await convert_block_references_to_env_vars(index_error_template) await Webhook(url="https://example.com").save(name="webhook-block") webhook_template = { "webhook": "{{ prefect.blocks.webhook.webhook-block.value }}", } with pytest.raises(ValueError, match="Could not resolve the keypath"): - await resolve_block_document_references(webhook_template) + await convert_block_references_to_env_vars(webhook_template) async def test_resolve_block_document_resolves_block_attribute(self): await Webhook(url="https://example.com").save(name="webhook-block-2") @@ -557,7 +561,7 @@ async def test_resolve_block_document_resolves_block_attribute(self): template = { "block_attribute": "{{ prefect.blocks.webhook.webhook-block-2.url }}", } - result = await resolve_block_document_references(template) + result = await convert_block_references_to_env_vars(template) assert result == { "block_attribute": "{{ env_var('PREFECT_BLOCKS_WEBHOOK_WEBHOOK_BLOCK_2_URL') }}", From c2f9b69de1c575b1354ef966d65d8107c33d7f0c Mon Sep 17 00:00:00 2001 From: Kevin Cameron Grismore Date: Fri, 31 Jan 2025 14:46:58 -0600 Subject: [PATCH 05/10] block reference resolution uses callable to replace template text --- .../prefect-dbt/prefect_dbt/core/profiles.py | 160 +++------- .../prefect-dbt/tests/core/test_profiles.py | 282 ++---------------- src/prefect/utilities/templating.py | 21 +- 3 files changed, 78 insertions(+), 385 deletions(-) diff --git a/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py b/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py index c88dbe4ffafe..a647e4e5f3bd 100644 --- a/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py +++ b/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py @@ -8,36 +8,21 @@ import tempfile from pathlib import Path from typing import ( - TYPE_CHECKING, Any, AsyncGenerator, Generator, Optional, - TypeVar, - Union, ) import slugify import yaml -from prefect import get_client -from prefect.utilities.annotations import NotSet from prefect.utilities.asyncutils import run_coro_as_sync -from prefect.utilities.collections import get_from_dict from prefect.utilities.templating import ( - PlaceholderType, - find_placeholders, + resolve_block_document_references, resolve_variables, ) -if TYPE_CHECKING: - from prefect.client.orchestration import PrefectClient - - -T = TypeVar("T", str, int, float, bool, dict[Any, Any], list[Any], None) - -BLOCK_DOCUMENT_PLACEHOLDER_PREFIX = "prefect.blocks." - def get_profiles_dir() -> str: """Get the dbt profiles directory from environment or default location.""" @@ -72,6 +57,27 @@ def load_profiles_yml(profiles_dir: Optional[str]) -> dict[str, Any]: return yaml.safe_load(f) +def replace_with_env_var_call(placeholder: str, value: Any) -> str: + """ + A block reference replacement function that returns template text for an env var call. + + Args: + placeholder: The placeholder text to replace + value: The value to replace the placeholder with + + Returns: + The template text for an env var call + """ + print("this is being called") + env_var_name = slugify.slugify(placeholder, separator="_").upper() + + os.environ[env_var_name] = str(value) + + template_text = f"{{{{ env_var('{env_var_name}') }}}}" + + return template_text + + @contextlib.asynccontextmanager async def aresolve_profiles_yml( profiles_dir: Optional[str] = None, @@ -81,22 +87,26 @@ async def aresolve_profiles_yml( Args: profiles_dir: Path to the directory containing profiles.yml. - If None, uses the default profiles directory. + If None, uses the default profiles directory. Yields: str: Path to temporary directory containing the resolved profiles.yml. - Directory and contents are automatically cleaned up after context exit. + Directory and contents are automatically cleaned up after context exit. - Example: ```python + Example: + ```python async with aresolve_profiles_yml() as temp_dir: # temp_dir contains resolved profiles.yml # use temp_dir for dbt operations - # temp_dir is automatically cleaned up ``` + # temp_dir is automatically cleaned up + ``` """ with tempfile.TemporaryDirectory() as temp_dir: temp_dir_path = Path(temp_dir) profiles_yml: dict[str, Any] = load_profiles_yml(profiles_dir) - profiles_yml = await convert_block_references_to_env_vars(profiles_yml) + profiles_yml = await resolve_block_document_references( + profiles_yml, replace_with_env_var_call + ) profiles_yml = await resolve_variables(profiles_yml) temp_profiles_path = temp_dir_path / "profiles.yml" @@ -115,23 +125,25 @@ def resolve_profiles_yml( Args: profiles_dir: Path to the directory containing profiles.yml. - If None, uses the default profiles directory. + If None, uses the default profiles directory. Yields: str: Path to temporary directory containing the resolved profiles.yml. - Directory and contents are automatically cleaned up after context exit. + Directory and contents are automatically cleaned up after context exit. - Example: ```python + Example: + ```python with resolve_profiles_yml() as temp_dir: # temp_dir contains resolved profiles.yml # use temp_dir for dbt operations - # temp_dir is automatically cleaned up ``` + # temp_dir is automatically cleaned up + ``` """ with tempfile.TemporaryDirectory() as temp_dir: temp_dir_path = Path(temp_dir) profiles_yml: dict[str, Any] = load_profiles_yml(profiles_dir) profiles_yml = run_coro_as_sync( - convert_block_references_to_env_vars(profiles_yml) + resolve_block_document_references(profiles_yml, replace_with_env_var_call) ) profiles_yml = run_coro_as_sync(resolve_variables(profiles_yml)) @@ -140,99 +152,3 @@ def resolve_profiles_yml( yaml.dump(profiles_yml, default_style=None, default_flow_style=False) ) yield str(temp_dir_path) - - -async def convert_block_references_to_env_vars( - template: T, client: Optional["PrefectClient"] = None -) -> Union[T, dict[str, Any]]: - """ - Resolve block document references in a template by replacing each reference with - template text that calls dbt's env_var function, like - {{ env_var('PREFECT_BLOCK_SECRET_MYSECRET') }}. Creates environment variables - for each block document reference, with a name in the format - PREFECT_BLOCKS_BLOCK_TYPE_SLUG_BLOCK_DOCUMENT_NAME and optionally BLOCK_DOCUMENT_KEYPATH. - - Recursively searches for block document references in dictionaries and lists. - - Args: - template: The template to resolve block documents in - - Returns: - The template with block documents resolved - """ - async with get_client() as client: - if isinstance(template, dict): - block_document_id = template.get("$ref", {}).get("block_document_id") - if block_document_id: - block_document = await client.read_block_document(block_document_id) - return block_document.data - updated_template: dict[str, Any] = {} - for key, value in template.items(): - updated_value = await convert_block_references_to_env_vars( - value, client=client - ) - updated_template[key] = updated_value - return updated_template - elif isinstance(template, list): - return [ - await convert_block_references_to_env_vars(item, client=client) - for item in template - ] - elif isinstance(template, str): - placeholders = find_placeholders(template) - has_block_document_placeholder = any( - placeholder.type is PlaceholderType.BLOCK_DOCUMENT - for placeholder in placeholders - ) - if not (placeholders and has_block_document_placeholder): - return template - elif ( - len(placeholders) == 1 - and list(placeholders)[0].full_match == template - and list(placeholders)[0].type is PlaceholderType.BLOCK_DOCUMENT - ): - # value_keypath will be a list containing a dot path if additional - # attributes are accessed and an empty list otherwise. - [placeholder] = placeholders - parts = placeholder.name.replace( - BLOCK_DOCUMENT_PLACEHOLDER_PREFIX, "" - ).split(".", 2) - block_type_slug, block_document_name, *value_keypath = parts - block_document = await client.read_block_document_by_name( - name=block_document_name, block_type_slug=block_type_slug - ) - data = block_document.data - value: Union[T, dict[str, Any]] = data - - # resolving system blocks to their data for backwards compatibility - if len(data) == 1 and "value" in data: - # only resolve the value if the keypath is not already pointing to "value" - if not (value_keypath and value_keypath[0].startswith("value")): - data = value = value["value"] - - # resolving keypath/block attributes - if value_keypath: - from_dict: Any = get_from_dict( - data, value_keypath[0], default=NotSet - ) - if from_dict is NotSet: - raise ValueError( - f"Invalid template: {template!r}. Could not resolve the" - " keypath in the block document data." - ) - value = from_dict - - env_var_name = slugify.slugify(placeholder[0], separator="_").upper() - - os.environ[env_var_name] = str(value) - - template_text = f"{{{{ env_var('{env_var_name}') }}}}" - - return template_text - else: - raise ValueError( - f"Invalid template: {template!r}. Only a single block placeholder is" - " allowed in a string and no surrounding text is allowed." - ) - - return template diff --git a/src/integrations/prefect-dbt/tests/core/test_profiles.py b/src/integrations/prefect-dbt/tests/core/test_profiles.py index cc69893d09e8..f923b4422d75 100644 --- a/src/integrations/prefect-dbt/tests/core/test_profiles.py +++ b/src/integrations/prefect-dbt/tests/core/test_profiles.py @@ -7,16 +7,13 @@ import yaml from prefect_dbt.core.profiles import ( aresolve_profiles_yml, - convert_block_references_to_env_vars, get_profiles_dir, load_profiles_yml, + replace_with_env_var_call, resolve_profiles_yml, ) from prefect._internal.compatibility.deprecated import PrefectDeprecationWarning -from prefect.blocks.core import Block -from prefect.blocks.system import JSON, DateTime, Secret, String -from prefect.blocks.webhook import Webhook from prefect.client.orchestration import get_client @@ -87,7 +84,7 @@ def ignore_prefect_deprecation_warnings(): } BLOCKS_PROFILE = { - "jaffle_shop_with_variable_reference": { + "jaffle_shop_with_blocks_reference": { "outputs": { "dev": { "type": "duckdb", @@ -289,7 +286,7 @@ async def test_aresolve_profiles_yml_resolves_blocks(temp_blocks_profiles_dir): loaded_profiles = yaml.safe_load(profiles_path.read_text()) assert ( - loaded_profiles["jaffle_shop_with_variable_reference"]["outputs"]["dev"][ + loaded_profiles["jaffle_shop_with_blocks_reference"]["outputs"]["dev"][ "password" ] == "{{ env_var('PREFECT_BLOCKS_SECRET_MY_PASSWORD') }}" @@ -308,261 +305,30 @@ def test_resolve_profiles_yml_resolves_blocks(temp_blocks_profiles_dir): loaded_profiles = yaml.safe_load(profiles_path.read_text()) assert ( - loaded_profiles["jaffle_shop_with_variable_reference"]["outputs"]["dev"][ + loaded_profiles["jaffle_shop_with_blocks_reference"]["outputs"]["dev"][ "password" ] == "{{ env_var('PREFECT_BLOCKS_SECRET_MY_PASSWORD') }}" ) -class TestResolveBlockDocumentReferences: - @pytest.fixture(autouse=True) - def ignore_deprecation_warnings(self, ignore_prefect_deprecation_warnings): - """Remove references to deprecated blocks when deprecation period is over.""" - pass - - @pytest.fixture() - async def block_document_id(self): - class ArbitraryBlock(Block): - a: int - b: str - - return await ArbitraryBlock(a=1, b="hello").save( - name="arbitrary-block", overwrite=True - ) - - async def test_convert_block_references_to_env_vars_with_no_block_document_references( - self, - ): - assert await convert_block_references_to_env_vars({"key": "value"}) == { - "key": "value" - } - - async def test_convert_block_references_to_env_vars_with_one_block_document_reference( - self, block_document_id - ): - async with get_client() as prefect_client: - assert { - "key": {"a": 1, "b": "hello"} - } == await convert_block_references_to_env_vars( - {"key": {"$ref": {"block_document_id": block_document_id}}}, - client=prefect_client, - ) - - async def test_convert_block_references_to_env_vars_with_nested_block_document_references( - self, block_document_id - ): - async with get_client() as prefect_client: - template = { - "key": { - "nested_key": {"$ref": {"block_document_id": block_document_id}}, - "other_nested_key": { - "$ref": {"block_document_id": block_document_id} - }, - } - } - block_document = await prefect_client.read_block_document(block_document_id) - - result = await convert_block_references_to_env_vars( - template, client=prefect_client - ) - - assert result == { - "key": { - "nested_key": block_document.data, - "other_nested_key": block_document.data, - } - } - - async def test_convert_block_references_to_env_vars_with_list_of_block_document_references( - self, block_document_id - ): - async with get_client() as prefect_client: - template = [{"$ref": {"block_document_id": block_document_id}}] - block_document = await prefect_client.read_block_document(block_document_id) - - result = await convert_block_references_to_env_vars( - template, client=prefect_client - ) - - assert result == [block_document.data] - - async def test_convert_block_references_to_env_vars_with_dot_delimited_syntax( - self, block_document_id - ): - async with get_client() as prefect_client: - template = {"key": "{{ prefect.blocks.arbitraryblock.arbitrary-block }}"} - - result = await convert_block_references_to_env_vars( - template, client=prefect_client - ) - - assert result == { - "key": "{{ env_var('PREFECT_BLOCKS_ARBITRARYBLOCK_ARBITRARY_BLOCK') }}" - } - - async def test_convert_block_references_to_env_vars_raises_on_multiple_placeholders( - self, block_document_id - ): - async with get_client() as prefect_client: - template = { - "key": ( - "{{ prefect.blocks.arbitraryblock.arbitrary-block }} {{" - " another_placeholder }}" - ) - } - - with pytest.raises( - ValueError, - match=( - "Only a single block placeholder is allowed in a string and no" - " surrounding text is allowed." - ), - ): - await convert_block_references_to_env_vars( - template, client=prefect_client - ) - - async def test_convert_block_references_to_env_vars_raises_on_extra_text( - self, block_document_id - ): - async with get_client() as prefect_client: - template = { - "key": "{{ prefect.blocks.arbitraryblock.arbitrary-block }} extra text" - } - - with pytest.raises( - ValueError, - match=( - "Only a single block placeholder is allowed in a string and no" - " surrounding text is allowed." - ), - ): - await convert_block_references_to_env_vars( - template, client=prefect_client - ) - - async def test_convert_block_references_to_env_vars_does_not_change_standard_placeholders( - self, - ): - template = {"key": "{{ standard_placeholder }}"} - - result = await convert_block_references_to_env_vars(template) - - assert result == template - - async def test_resolve_block_document_unpacks_system_blocks(self): - await JSON(value={"key": "value"}).save(name="json-block") - await Secret(value="N1nj4C0d3rP@ssw0rd!").save(name="secret-block") - await DateTime(value="2020-01-01T00:00:00Z").save(name="datetime-block") - await String(value="hello").save(name="string-block") - - template = { - "json": "{{ prefect.blocks.json.json-block }}", - "secret": "{{ prefect.blocks.secret.secret-block }}", - "datetime": "{{ prefect.blocks.date-time.datetime-block }}", - "string": "{{ prefect.blocks.string.string-block }}", - } - - result = await convert_block_references_to_env_vars(template) - assert result == { - "datetime": "{{ env_var('PREFECT_BLOCKS_DATE_TIME_DATETIME_BLOCK') }}", - "json": "{{ env_var('PREFECT_BLOCKS_JSON_JSON_BLOCK') }}", - "secret": "{{ env_var('PREFECT_BLOCKS_SECRET_SECRET_BLOCK') }}", - "string": "{{ env_var('PREFECT_BLOCKS_STRING_STRING_BLOCK') }}", - } - - async def test_resolve_block_document_system_block_resolves_dict_keypath(self): - # for backwards compatibility system blocks can be referenced directly - # they should still be able to access nested keys - await JSON(value={"key": {"nested-key": "nested_value"}}).save( - name="nested-json-block" - ) - template = { - "value": "{{ prefect.blocks.json.nested-json-block}}", - "keypath": "{{ prefect.blocks.json.nested-json-block.key }}", - "nested_keypath": "{{ prefect.blocks.json.nested-json-block.key.nested-key }}", - } - - result = await convert_block_references_to_env_vars(template) - assert result == { - "keypath": "{{ env_var('PREFECT_BLOCKS_JSON_NESTED_JSON_BLOCK_KEY') }}", - "nested_keypath": "{{ env_var('PREFECT_BLOCKS_JSON_NESTED_JSON_BLOCK_KEY_NESTED_KEY') }}", - "value": "{{ env_var('PREFECT_BLOCKS_JSON_NESTED_JSON_BLOCK') }}", - } - - async def test_resolve_block_document_resolves_dict_keypath(self): - await JSON(value={"key": {"nested-key": "nested_value"}}).save( - name="nested-json-block-2" - ) - template = { - "value": "{{ prefect.blocks.json.nested-json-block-2.value }}", - "keypath": "{{ prefect.blocks.json.nested-json-block-2.value.key }}", - "nested_keypath": ( - "{{ prefect.blocks.json.nested-json-block-2.value.key.nested-key }}" - ), - } - - result = await convert_block_references_to_env_vars(template) - assert result == { - "keypath": "{{ env_var('PREFECT_BLOCKS_JSON_NESTED_JSON_BLOCK_2_VALUE_KEY') }}", - "nested_keypath": "{{ env_var('PREFECT_BLOCKS_JSON_NESTED_JSON_BLOCK_2_VALUE_KEY_NESTED_KEY') }}", - "value": "{{ env_var('PREFECT_BLOCKS_JSON_NESTED_JSON_BLOCK_2_VALUE') }}", - } - - async def test_resolve_block_document_resolves_list_keypath(self): - await JSON(value={"key": ["value1", "value2"]}).save(name="json-list-block") - await JSON(value=["value1", "value2"]).save(name="list-block") - await JSON( - value={"key": ["value1", {"nested": ["value2", "value3"]}, "value4"]} - ).save(name="nested-json-list-block") - template = { - "json_list": "{{ prefect.blocks.json.json-list-block.value.key[0] }}", - "list": "{{ prefect.blocks.json.list-block.value[1] }}", - "nested_json_list": ( - "{{ prefect.blocks.json.nested-json-list-block.value.key[1].nested[1] }}" - ), - } - - result = await convert_block_references_to_env_vars(template) - assert result == { - "json_list": "{{ env_var('PREFECT_BLOCKS_JSON_JSON_LIST_BLOCK_VALUE_KEY_0') }}", - "list": "{{ env_var('PREFECT_BLOCKS_JSON_LIST_BLOCK_VALUE_1') }}", - "nested_json_list": "{{ env_var('PREFECT_BLOCKS_JSON_NESTED_JSON_LIST_BLOCK_VALUE_KEY_1_NESTED_1') }}", - } - - async def test_resolve_block_document_raises_on_invalid_keypath(self): - await JSON(value={"key": {"nested_key": "value"}}).save( - name="nested-json-block-3" - ) - json_template = { - "json": "{{ prefect.blocks.json.nested-json-block-3.value.key.does_not_exist }}", - } - with pytest.raises(ValueError, match="Could not resolve the keypath"): - await convert_block_references_to_env_vars(json_template) - - await JSON(value=["value1", "value2"]).save(name="index-error-block") - index_error_template = { - "index_error": "{{ prefect.blocks.json.index-error-block.value[3] }}", - } - with pytest.raises(ValueError, match="Could not resolve the keypath"): - await convert_block_references_to_env_vars(index_error_template) - - await Webhook(url="https://example.com").save(name="webhook-block") - webhook_template = { - "webhook": "{{ prefect.blocks.webhook.webhook-block.value }}", - } - with pytest.raises(ValueError, match="Could not resolve the keypath"): - await convert_block_references_to_env_vars(webhook_template) - - async def test_resolve_block_document_resolves_block_attribute(self): - await Webhook(url="https://example.com").save(name="webhook-block-2") - - template = { - "block_attribute": "{{ prefect.blocks.webhook.webhook-block-2.url }}", - } - result = await convert_block_references_to_env_vars(template) - - assert result == { - "block_attribute": "{{ env_var('PREFECT_BLOCKS_WEBHOOK_WEBHOOK_BLOCK_2_URL') }}", - } +def test_replace_with_env_var_call(): + """Test that replace_with_env_var_call properly creates env vars and returns template text.""" + # Test with a simple block reference + result = replace_with_env_var_call( + "prefect.blocks.secret.my-password.value", "test-value" + ) + assert result == "{{ env_var('PREFECT_BLOCKS_SECRET_MY_PASSWORD_VALUE') }}" + assert os.environ["PREFECT_BLOCKS_SECRET_MY_PASSWORD_VALUE"] == "test-value" + + # Test with a complex block instance name + result = replace_with_env_var_call( + "prefect.blocks.json.complex-name!@123.value", "complex-value" + ) + assert result == "{{ env_var('PREFECT_BLOCKS_JSON_COMPLEX_NAME_123_VALUE') }}" + assert os.environ["PREFECT_BLOCKS_JSON_COMPLEX_NAME_123_VALUE"] == "complex-value" + + # Test with non-string value + result = replace_with_env_var_call("prefect.blocks.json.number-config.value", 42) + assert result == "{{ env_var('PREFECT_BLOCKS_JSON_NUMBER_CONFIG_VALUE') }}" + assert os.environ["PREFECT_BLOCKS_JSON_NUMBER_CONFIG_VALUE"] == "42" diff --git a/src/prefect/utilities/templating.py b/src/prefect/utilities/templating.py index a009e7d03366..467a86e4d688 100644 --- a/src/prefect/utilities/templating.py +++ b/src/prefect/utilities/templating.py @@ -4,6 +4,7 @@ from typing import ( TYPE_CHECKING, Any, + Callable, Literal, NamedTuple, Optional, @@ -195,13 +196,20 @@ def apply_values( raise ValueError(f"Unexpected template type {type(template).__name__!r}") +def replace_with_value(placeholder: str, value: Any) -> Any: + """A block reference replacement function that returns the value unchanged.""" + return value + + @inject_client async def resolve_block_document_references( - template: T, client: Optional["PrefectClient"] = None + template: T, + replacement_function: Callable[[str, Any], Any] = replace_with_value, + client: Optional["PrefectClient"] = None, ) -> Union[T, dict[str, Any]]: """ Resolve block document references in a template by replacing each reference with - the data of the block document. + the return value of the replacement function. Recursively searches for block document references in dictionaries and lists. @@ -258,6 +266,7 @@ async def resolve_block_document_references( Args: template: The template to resolve block documents in + replacement_function: A function that takes the block placeholder and the block value and returns replacement text for the template Returns: The template with block documents resolved @@ -275,13 +284,15 @@ async def resolve_block_document_references( updated_template: dict[str, Any] = {} for key, value in template.items(): updated_value = await resolve_block_document_references( - value, client=client + value, replacement_function=replacement_function, client=client ) updated_template[key] = updated_value return updated_template elif isinstance(template, list): return [ - await resolve_block_document_references(item, client=client) + await resolve_block_document_references( + item, replacement_function=replacement_function, client=client + ) for item in template ] elif isinstance(template, str): @@ -326,7 +337,7 @@ async def resolve_block_document_references( ) value = from_dict - return value + return replacement_function(placeholder.full_match, value) else: raise ValueError( f"Invalid template: {template!r}. Only a single block placeholder is" From 176e51acedaf7363d48dcabf7327c776db0bee85 Mon Sep 17 00:00:00 2001 From: Kevin Cameron Grismore Date: Fri, 31 Jan 2025 14:48:35 -0600 Subject: [PATCH 06/10] remove print --- src/integrations/prefect-dbt/prefect_dbt/core/profiles.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py b/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py index a647e4e5f3bd..6e693e18c9f1 100644 --- a/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py +++ b/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py @@ -68,7 +68,6 @@ def replace_with_env_var_call(placeholder: str, value: Any) -> str: Returns: The template text for an env var call """ - print("this is being called") env_var_name = slugify.slugify(placeholder, separator="_").upper() os.environ[env_var_name] = str(value) From f4e8e2020a0498aa0e2f13fd761e464b238a30bd Mon Sep 17 00:00:00 2001 From: Kevin Cameron Grismore Date: Fri, 31 Jan 2025 14:50:21 -0600 Subject: [PATCH 07/10] remove extra fixure and imports --- .../prefect-dbt/tests/core/test_profiles.py | 43 ------------------- 1 file changed, 43 deletions(-) diff --git a/src/integrations/prefect-dbt/tests/core/test_profiles.py b/src/integrations/prefect-dbt/tests/core/test_profiles.py index f923b4422d75..7c049975115f 100644 --- a/src/integrations/prefect-dbt/tests/core/test_profiles.py +++ b/src/integrations/prefect-dbt/tests/core/test_profiles.py @@ -1,6 +1,4 @@ import os -import warnings -from datetime import datetime from pathlib import Path import pytest @@ -13,49 +11,8 @@ resolve_profiles_yml, ) -from prefect._internal.compatibility.deprecated import PrefectDeprecationWarning from prefect.client.orchestration import get_client - -def should_reraise_warning(warning): - """ - Determine if a deprecation warning should be reraised based on the date. - - Deprecation warnings that have passed the date threshold should be reraised to - ensure the deprecated code paths are removed. - """ - message = str(warning.message) - try: - # Extract the date from the new message format - date_str = message.split("not be available in new releases after ")[1].strip( - "." - ) - # Parse the date - deprecation_date = datetime.strptime(date_str, "%b %Y").date().replace(day=1) - - # Check if the current date is after the start of the month following the deprecation date - current_date = datetime.now().date().replace(day=1) - return current_date > deprecation_date - except Exception: - # Reraise in cases of failure - return True - - -@pytest.fixture -def ignore_prefect_deprecation_warnings(): - """ - Ignore deprecation warnings from the agent module to avoid - test failures. - """ - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter("ignore", category=PrefectDeprecationWarning) - yield - for warning in w: - if isinstance(warning.message, PrefectDeprecationWarning): - if should_reraise_warning(warning): - warnings.warn(warning.message, warning.category, stacklevel=2) - - SAMPLE_PROFILE = { "jaffle_shop": { "outputs": { From 1536c71e39d1edb0c5b2fa12f0b49fb62ed13c3c Mon Sep 17 00:00:00 2001 From: Kevin Cameron Grismore Date: Fri, 31 Jan 2025 14:55:55 -0600 Subject: [PATCH 08/10] remove space --- src/integrations/prefect-dbt/prefect_dbt/core/profiles.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py b/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py index 6e693e18c9f1..4844ce018abc 100644 --- a/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py +++ b/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py @@ -1,5 +1,5 @@ """ -Utilities for working with dbt profiles.yml files, including resolving +Utilities for working with dbt profiles.yml files, including resolving block document and variable references. """ From b651af7350b42a3ad5fd8735c87fb8ff43181fc1 Mon Sep 17 00:00:00 2001 From: Kevin Cameron Grismore Date: Fri, 31 Jan 2025 16:06:09 -0600 Subject: [PATCH 09/10] rearrange args --- .../prefect-dbt/prefect_dbt/core/profiles.py | 6 ++++-- src/prefect/utilities/templating.py | 18 ++++++++---------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py b/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py index 4844ce018abc..355bf24f71e2 100644 --- a/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py +++ b/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py @@ -104,7 +104,7 @@ async def aresolve_profiles_yml( temp_dir_path = Path(temp_dir) profiles_yml: dict[str, Any] = load_profiles_yml(profiles_dir) profiles_yml = await resolve_block_document_references( - profiles_yml, replace_with_env_var_call + profiles_yml, value_transformer=replace_with_env_var_call ) profiles_yml = await resolve_variables(profiles_yml) @@ -142,7 +142,9 @@ def resolve_profiles_yml( temp_dir_path = Path(temp_dir) profiles_yml: dict[str, Any] = load_profiles_yml(profiles_dir) profiles_yml = run_coro_as_sync( - resolve_block_document_references(profiles_yml, replace_with_env_var_call) + resolve_block_document_references( + profiles_yml, value_transformer=replace_with_env_var_call + ) ) profiles_yml = run_coro_as_sync(resolve_variables(profiles_yml)) diff --git a/src/prefect/utilities/templating.py b/src/prefect/utilities/templating.py index 467a86e4d688..db176afcb2ea 100644 --- a/src/prefect/utilities/templating.py +++ b/src/prefect/utilities/templating.py @@ -196,16 +196,11 @@ def apply_values( raise ValueError(f"Unexpected template type {type(template).__name__!r}") -def replace_with_value(placeholder: str, value: Any) -> Any: - """A block reference replacement function that returns the value unchanged.""" - return value - - @inject_client async def resolve_block_document_references( template: T, - replacement_function: Callable[[str, Any], Any] = replace_with_value, client: Optional["PrefectClient"] = None, + value_transformer: Optional[Callable[[str, Any], Any]] = None, ) -> Union[T, dict[str, Any]]: """ Resolve block document references in a template by replacing each reference with @@ -266,7 +261,7 @@ async def resolve_block_document_references( Args: template: The template to resolve block documents in - replacement_function: A function that takes the block placeholder and the block value and returns replacement text for the template + value_transformer: A function that takes the block placeholder and the block value and returns replacement text for the template Returns: The template with block documents resolved @@ -284,14 +279,14 @@ async def resolve_block_document_references( updated_template: dict[str, Any] = {} for key, value in template.items(): updated_value = await resolve_block_document_references( - value, replacement_function=replacement_function, client=client + value, value_transformer=value_transformer, client=client ) updated_template[key] = updated_value return updated_template elif isinstance(template, list): return [ await resolve_block_document_references( - item, replacement_function=replacement_function, client=client + item, value_transformer=value_transformer, client=client ) for item in template ] @@ -337,7 +332,10 @@ async def resolve_block_document_references( ) value = from_dict - return replacement_function(placeholder.full_match, value) + if value_transformer: + value = value_transformer(placeholder.full_match, value) + + return value else: raise ValueError( f"Invalid template: {template!r}. Only a single block placeholder is" From 052c778baca43e34319ad5f3088780579f37d1cb Mon Sep 17 00:00:00 2001 From: Kevin Cameron Grismore Date: Fri, 31 Jan 2025 16:10:58 -0600 Subject: [PATCH 10/10] update docstring --- src/prefect/utilities/templating.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/prefect/utilities/templating.py b/src/prefect/utilities/templating.py index db176afcb2ea..c721478d65b5 100644 --- a/src/prefect/utilities/templating.py +++ b/src/prefect/utilities/templating.py @@ -204,7 +204,7 @@ async def resolve_block_document_references( ) -> Union[T, dict[str, Any]]: """ Resolve block document references in a template by replacing each reference with - the return value of the replacement function. + its value or the return value of the transformer function if provided. Recursively searches for block document references in dictionaries and lists.