Skip to content

Commit

Permalink
fix create_markdown_artifact usage in prefect-dbt (#16926)
Browse files Browse the repository at this point in the history
  • Loading branch information
zzstoatzz authored Jan 31, 2025
1 parent 0d553de commit b87312c
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 13 deletions.
12 changes: 6 additions & 6 deletions src/integrations/prefect-dbt/prefect_dbt/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from pydantic import Field

from prefect import task
from prefect.artifacts import create_markdown_artifact
from prefect.artifacts import acreate_markdown_artifact
from prefect.logging import get_run_logger
from prefect.states import Failed
from prefect.utilities.asyncutils import sync_compatible
Expand Down Expand Up @@ -194,9 +194,8 @@ def _stream_output(event):
if create_summary_artifact and isinstance(result.result, ExecutionResult):
run_results = consolidate_run_results(result)
markdown = create_summary_markdown(run_results, command)
artifact_id = await create_markdown_artifact(
markdown=markdown,
key=summary_artifact_key,
artifact_id = await acreate_markdown_artifact(
markdown=markdown, key=summary_artifact_key
)
if not artifact_id:
logger.error(f"Summary Artifact was not created for dbt {command} task")
Expand Down Expand Up @@ -848,12 +847,13 @@ def dbt_test_flow():
return results


def create_summary_markdown(run_results: dict, command: str) -> str:
def create_summary_markdown(run_results: dict[str, Any], command: str) -> str:
"""
Creates a Prefect task artifact summarizing the results
of the above predefined prefrect-dbt task.
"""
markdown = f"# dbt {command} Task Summary\n"
prefix = "dbt" if not command.startswith("dbt") else ""
markdown = f"# {prefix} {command} Task Summary\n"
markdown += _create_node_summary_table_md(run_results=run_results)

if (
Expand Down
45 changes: 39 additions & 6 deletions src/integrations/prefect-dbt/tests/cli/test_commands.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import datetime
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock

import pytest
Expand Down Expand Up @@ -216,7 +217,9 @@ async def mock_dbt_runner_ls_success():


@pytest.fixture
def dbt_runner_model_result(monkeypatch, mock_dbt_runner_model_success):
def dbt_runner_model_result(
monkeypatch: pytest.MonkeyPatch, mock_dbt_runner_model_success: dbtRunnerResult
) -> None:
_mock_dbt_runner_invoke_success = MagicMock(
return_value=mock_dbt_runner_model_success
)
Expand All @@ -226,13 +229,17 @@ def dbt_runner_model_result(monkeypatch, mock_dbt_runner_model_success):


@pytest.fixture
def dbt_runner_ls_result(monkeypatch, mock_dbt_runner_ls_success):
def dbt_runner_ls_result(
monkeypatch: pytest.MonkeyPatch, mock_dbt_runner_ls_success: dbtRunnerResult
) -> None:
_mock_dbt_runner_ls_result = MagicMock(return_value=mock_dbt_runner_ls_success)
monkeypatch.setattr("dbt.cli.main.dbtRunner.invoke", _mock_dbt_runner_ls_result)


@pytest.fixture
def dbt_runner_freshness_error(monkeypatch, mock_dbt_runner_freshness_error):
def dbt_runner_freshness_error(
monkeypatch: pytest.MonkeyPatch, mock_dbt_runner_freshness_error: dbtRunnerResult
) -> None:
_mock_dbt_runner_freshness_error = MagicMock(
return_value=mock_dbt_runner_freshness_error
)
Expand All @@ -242,7 +249,9 @@ def dbt_runner_freshness_error(monkeypatch, mock_dbt_runner_freshness_error):


@pytest.fixture
def dbt_runner_freshness_success(monkeypatch, mock_dbt_runner_freshness_success):
def dbt_runner_freshness_success(
monkeypatch: pytest.MonkeyPatch, mock_dbt_runner_freshness_success: dbtRunnerResult
) -> None:
_mock_dbt_runner_freshness_success = MagicMock(
return_value=mock_dbt_runner_freshness_success
)
Expand All @@ -253,7 +262,7 @@ def dbt_runner_freshness_success(monkeypatch, mock_dbt_runner_freshness_success)


@pytest.fixture
def dbt_runner_failed_result(monkeypatch):
def dbt_runner_failed_result(monkeypatch: pytest.MonkeyPatch) -> None:
_mock_dbt_runner_invoke_failed = MagicMock(
return_value=dbtRunnerResult(
success=False,
Expand All @@ -265,7 +274,7 @@ def dbt_runner_failed_result(monkeypatch):


@pytest.fixture
def profiles_dir(tmp_path):
def profiles_dir(tmp_path: Path) -> str:
return str(tmp_path) + "/.dbt"


Expand Down Expand Up @@ -558,6 +567,30 @@ def test_append_dirs_to_commands(
)


@pytest.mark.usefixtures("dbt_runner_freshness_success")
def test_sync_dbt_cli_command_creates_artifact(
profiles_dir: str, dbt_cli_profile: Any
) -> None:
@flow
def test_flow() -> None:
trigger_dbt_cli_command(
command="dbt source freshness",
profiles_dir=profiles_dir,
dbt_cli_profile=dbt_cli_profile,
summary_artifact_key="foo",
create_summary_artifact=True,
)

test_flow()
assert (a := Artifact.get(key="foo"))
assert a.type == "markdown"
assert isinstance(a.data, str) and a.data.startswith(
"# dbt source freshness Task Summary"
)
assert "my_first_dbt_model" in a.data
assert "Successful Nodes" in a.data


@pytest.mark.usefixtures("dbt_runner_model_result")
async def test_run_dbt_build_creates_artifact(profiles_dir, dbt_cli_profile_bare):
@flow
Expand Down
2 changes: 1 addition & 1 deletion src/integrations/prefect-dbt/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def dbt_cli_profile():


@pytest.fixture
def dbt_cli_profile_bare():
def dbt_cli_profile_bare() -> DbtCliProfile:
target_configs = TargetConfigs(
type="custom", schema="my_schema", extras={"account": "fake"}
)
Expand Down

0 comments on commit b87312c

Please sign in to comment.