fix create_markdown_artifact usage in prefect-dbt #16926

Merged · 2 commits · Jan 31, 2025
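This PR fixes the summary-artifact creation in `trigger_dbt_cli_command`: the async code path was `await`ing the synchronous `create_markdown_artifact` helper, and now calls the async variant `acreate_markdown_artifact` instead. It also adjusts the `create_summary_markdown` heading so commands that already start with "dbt" are not double-prefixed, and adds a regression test that runs the task from a synchronous flow. A minimal sketch of the corrected calling pattern, assuming a Prefect version where `prefect.artifacts` exposes `acreate_markdown_artifact` (as the diff below imports); the flow and key names are illustrative:

```python
from prefect import flow
from prefect.artifacts import acreate_markdown_artifact


@flow
async def summarize_run() -> None:
    # In async code, await the async helper; awaiting the synchronous
    # create_markdown_artifact here is the usage this PR fixes.
    artifact_id = await acreate_markdown_artifact(
        markdown="# dbt run Task Summary\n",
        key="dbt-run-summary",  # illustrative key
    )
    if not artifact_id:
        raise RuntimeError("summary artifact was not created")
```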
12 changes: 6 additions & 6 deletions src/integrations/prefect-dbt/prefect_dbt/cli/commands.py
@@ -13,7 +13,7 @@
 from pydantic import Field
 
 from prefect import task
-from prefect.artifacts import create_markdown_artifact
+from prefect.artifacts import acreate_markdown_artifact
 from prefect.logging import get_run_logger
 from prefect.states import Failed
 from prefect.utilities.asyncutils import sync_compatible
@@ -194,9 +194,8 @@ def _stream_output(event):
     if create_summary_artifact and isinstance(result.result, ExecutionResult):
         run_results = consolidate_run_results(result)
         markdown = create_summary_markdown(run_results, command)
-        artifact_id = await create_markdown_artifact(
-            markdown=markdown,
-            key=summary_artifact_key,
+        artifact_id = await acreate_markdown_artifact(
+            markdown=markdown, key=summary_artifact_key
         )
         if not artifact_id:
             logger.error(f"Summary Artifact was not created for dbt {command} task")
@@ -848,12 +847,13 @@ def dbt_test_flow():
     return results
 
 
-def create_summary_markdown(run_results: dict, command: str) -> str:
+def create_summary_markdown(run_results: dict[str, Any], command: str) -> str:
     """
     Creates a Prefect task artifact summarizing the results
    of the above predefined prefect-dbt task.
     """
-    markdown = f"# dbt {command} Task Summary\n"
+    prefix = "dbt " if not command.startswith("dbt") else ""
+    markdown = f"# {prefix}{command} Task Summary\n"
     markdown += _create_node_summary_table_md(run_results=run_results)
 
     if (
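The new heading logic is easy to spot-check in isolation. Below is a standalone restatement of the prefix branch (a hypothetical helper name, not an import from `prefect_dbt`), showing that a bare subcommand gains the `dbt` prefix while a full `dbt ...` command is left alone:

```python
def summary_heading(command: str) -> str:
    # Mirrors the prefix logic above: prepend "dbt " only when the
    # command does not already start with "dbt".
    prefix = "dbt " if not command.startswith("dbt") else ""
    return f"# {prefix}{command} Task Summary"


assert summary_heading("run") == "# dbt run Task Summary"
assert summary_heading("dbt source freshness") == "# dbt source freshness Task Summary"
```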
45 changes: 39 additions & 6 deletions src/integrations/prefect-dbt/tests/cli/test_commands.py
@@ -1,5 +1,6 @@
 import datetime
 from pathlib import Path
+from typing import Any
 from unittest.mock import MagicMock
 
 import pytest
@@ -216,7 +217,9 @@ async def mock_dbt_runner_ls_success():
 
 
 @pytest.fixture
-def dbt_runner_model_result(monkeypatch, mock_dbt_runner_model_success):
+def dbt_runner_model_result(
+    monkeypatch: pytest.MonkeyPatch, mock_dbt_runner_model_success: dbtRunnerResult
+) -> None:
     _mock_dbt_runner_invoke_success = MagicMock(
         return_value=mock_dbt_runner_model_success
     )
@@ -226,13 +229,17 @@ def dbt_runner_model_result(monkeypatch, mock_dbt_runner_model_success):
 
 
 @pytest.fixture
-def dbt_runner_ls_result(monkeypatch, mock_dbt_runner_ls_success):
+def dbt_runner_ls_result(
+    monkeypatch: pytest.MonkeyPatch, mock_dbt_runner_ls_success: dbtRunnerResult
+) -> None:
     _mock_dbt_runner_ls_result = MagicMock(return_value=mock_dbt_runner_ls_success)
     monkeypatch.setattr("dbt.cli.main.dbtRunner.invoke", _mock_dbt_runner_ls_result)
 
 
 @pytest.fixture
-def dbt_runner_freshness_error(monkeypatch, mock_dbt_runner_freshness_error):
+def dbt_runner_freshness_error(
+    monkeypatch: pytest.MonkeyPatch, mock_dbt_runner_freshness_error: dbtRunnerResult
+) -> None:
     _mock_dbt_runner_freshness_error = MagicMock(
         return_value=mock_dbt_runner_freshness_error
     )
@@ -242,7 +249,9 @@ def dbt_runner_freshness_error(monkeypatch, mock_dbt_runner_freshness_error):
 
 
 @pytest.fixture
-def dbt_runner_freshness_success(monkeypatch, mock_dbt_runner_freshness_success):
+def dbt_runner_freshness_success(
+    monkeypatch: pytest.MonkeyPatch, mock_dbt_runner_freshness_success: dbtRunnerResult
+) -> None:
     _mock_dbt_runner_freshness_success = MagicMock(
         return_value=mock_dbt_runner_freshness_success
     )
@@ -253,7 +262,7 @@ def dbt_runner_freshness_success(monkeypatch, mock_dbt_runner_freshness_success):
 
 
 @pytest.fixture
-def dbt_runner_failed_result(monkeypatch):
+def dbt_runner_failed_result(monkeypatch: pytest.MonkeyPatch) -> None:
     _mock_dbt_runner_invoke_failed = MagicMock(
         return_value=dbtRunnerResult(
             success=False,
@@ -265,7 +274,7 @@ def dbt_runner_failed_result(monkeypatch):
 
 
 @pytest.fixture
-def profiles_dir(tmp_path):
+def profiles_dir(tmp_path: Path) -> str:
     return str(tmp_path) + "/.dbt"
 
 
@@ -558,6 +567,30 @@ def test_append_dirs_to_commands(
     )
 
 
+@pytest.mark.usefixtures("dbt_runner_freshness_success")
+def test_sync_dbt_cli_command_creates_artifact(
+    profiles_dir: str, dbt_cli_profile: Any
+) -> None:
+    @flow
+    def test_flow() -> None:
+        trigger_dbt_cli_command(
+            command="dbt source freshness",
+            profiles_dir=profiles_dir,
+            dbt_cli_profile=dbt_cli_profile,
+            summary_artifact_key="foo",
+            create_summary_artifact=True,
+        )
+
+    test_flow()
+    assert (a := Artifact.get(key="foo"))
+    assert a.type == "markdown"
+    assert isinstance(a.data, str) and a.data.startswith(
+        "# dbt source freshness Task Summary"
+    )
+    assert "my_first_dbt_model" in a.data
+    assert "Successful Nodes" in a.data
+
+
 @pytest.mark.usefixtures("dbt_runner_model_result")
 async def test_run_dbt_build_creates_artifact(profiles_dir, dbt_cli_profile_bare):
     @flow
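The new sync test drives `trigger_dbt_cli_command` from a regular (non-async) flow and then reads the summary artifact back by key. `Artifact.get` returns `None` when nothing is stored under the key, so the walrus assertion fetches and null-checks in one step. A minimal sketch of that retrieval pattern, assuming a Prefect server or test harness is available:

```python
from prefect.artifacts import Artifact

# "foo" matches the summary_artifact_key used in the test above.
if artifact := Artifact.get(key="foo"):
    assert artifact.type == "markdown"
    print(artifact.data)  # the rendered markdown summary
else:
    raise AssertionError("no artifact found under key 'foo'")
```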
2 changes: 1 addition & 1 deletion src/integrations/prefect-dbt/tests/conftest.py
@@ -86,7 +86,7 @@ def dbt_cli_profile():
 
 
 @pytest.fixture
-def dbt_cli_profile_bare():
+def dbt_cli_profile_bare() -> DbtCliProfile:
     target_configs = TargetConfigs(
         type="custom", schema="my_schema", extras={"account": "fake"}
     )